diff --git a/ray.sub b/ray.sub index 8efae160d0..c53a3319b5 100644 --- a/ray.sub +++ b/ray.sub @@ -122,8 +122,18 @@ if [[ -n "${UV_CACHE_DIR_OVERRIDE:-}" ]]; then fi # Create logs directory +# NOTE: LOG_DIR must be on a shared filesystem visible to both the submit host and +# all compute nodes. File-based signaling between the submit host and the containers +# (STARTED_RAY_HEAD, ray_worker_units, ENDED, sandbox readiness) relies on this. +# On systems without a shared FS, srun-based polling would be needed instead. BASE_LOG_DIR=${BASE_LOG_DIR:-$SLURM_SUBMIT_DIR} -LOG_DIR="$BASE_LOG_DIR/$SLURM_JOB_ID-logs" +# On a Slurm requeue/restart, suffix the restart count so we don't clobber the +# previous attempt's logs (and signal files) under the same job ID. +if [[ -n "${SLURM_RESTART_COUNT:-}" ]]; then + LOG_DIR="$BASE_LOG_DIR/$SLURM_JOB_ID-$SLURM_RESTART_COUNT-logs" +else + LOG_DIR="$BASE_LOG_DIR/$SLURM_JOB_ID-logs" +fi mkdir -p $LOG_DIR # Write setup commands to a file so they can be executed inside each container @@ -135,6 +145,15 @@ if [[ -n "$SETUP_COMMAND" ]]; then chmod +x "$SETUP_COMMAND_FILE" fi +# Write COMMAND to a file so the head container can run it without heredoc/quoting +# issues. Its presence is also how the head decides non-interactive vs interactive. +DRIVER_COMMAND_FILE="" +if [[ -n "$COMMAND" ]]; then + DRIVER_COMMAND_FILE="$LOG_DIR/driver_command.sh" + printf '%s' "$COMMAND" > "$DRIVER_COMMAND_FILE" + chmod +x "$DRIVER_COMMAND_FILE" +fi + # Number of GPUs per worker node GPUS_PER_NODE=${GPUS_PER_NODE:-8} @@ -158,7 +177,13 @@ COMMON_SRUN_ARGS+=" -A $SLURM_JOB_ACCOUNT" # Number of CPUs per worker node CPUS_PER_WORKER=${CPUS_PER_WORKER:-$((GPUS_PER_NODE * 16))} +# The head is retried a few times. Workers may need more attempts since they can +# race the head's GCS coming up: `ray start --address` fails until GCS is listening. num_retries=3 +WORKER_NUM_RETRIES=20 + +# Total Ray worker_units expected once every node has registered with the head. +NUM_ACTORS=$((GPUS_PER_NODE * SLURM_JOB_NUM_NODES)) # Track backgrounded srun client PIDs for head and workers declare -A SRUN_PIDS @@ -306,13 +331,19 @@ RAY_RESOURCES+='}' export RAY_RESOURCES TOPO_PROBE_EOF +# Set up the sandbox ports/signal directory on the shared FS before building the +# head script, so the head can gate the driver on sandbox readiness. Arbitrary +# extra mounts are supported via SANDBOX_EXTRA_MOUNTS (e.g. a shared temp dir). +SANDBOX_PORTS_DIR="" +if [[ -n "${SANDBOX_CONTAINER:-}" ]] && [[ -n "${SANDBOX_COMMAND:-}" ]]; then + SANDBOX_PORTS_DIR="$LOG_DIR/sandbox" + mkdir -p "$SANDBOX_PORTS_DIR" +fi + # First we start the head of the ray cluster on one of the physical nodes # Give the head node actual resources to make it schedulable head_cmd=$(cat < "$LOG_DIR/.ray_worker_units.tmp" + mv "$LOG_DIR/.ray_worker_units.tmp" "$LOG_DIR/ray_worker_units" + if [[ -f "$LOG_DIR/ENDED" ]]; then break; fi + done +} +ray-status-sidecar & + # Patch nsight.py before starting Ray head sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py source $LOG_DIR/topology_probe.sh -cat </dev/null | grep "worker_units" | awk -F'[/. ]' '{print \$4}' || echo 0) + echo "[INFO] Number of actors online: \$worker_units/$NUM_ACTORS" + if [[ "\$worker_units" -eq "$NUM_ACTORS" ]]; then + break + fi + if (( SECONDS > RAY_STATUS_DEADLINE )); then + echo "[ERROR] Timed out waiting for all workers to connect (\$worker_units/$NUM_ACTORS after 30 min)" + touch "$LOG_DIR/ENDED" + exit 1 + fi + sleep 5 + done + + echo "All workers connected!" + + # Gate on every sandbox instance being ready. Each per-node sandbox task touches + # SANDBOX_READY_ once its port is listening (see the sandbox srun below). + if [[ -n "$SANDBOX_PORTS_DIR" ]]; then + echo "[INFO] Waiting for sandbox to be ready on all $SLURM_JOB_NUM_NODES nodes..." + SANDBOX_DEADLINE=\$((SECONDS + 600)) + while true; do + ready_count=\$(ls -1 "$SANDBOX_PORTS_DIR"/SANDBOX_READY_* 2>/dev/null | wc -l) + if [[ "\$ready_count" -ge "$SLURM_JOB_NUM_NODES" ]]; then + break + fi + echo "[INFO] Sandbox ready on \$ready_count/$SLURM_JOB_NUM_NODES nodes..." + if (( SECONDS > SANDBOX_DEADLINE )); then + echo "[ERROR] Timed out waiting for sandbox (\$ready_count/$SLURM_JOB_NUM_NODES after 10 min)" + touch "$LOG_DIR/ENDED" + exit 1 + fi + sleep 5 + done + echo "[INFO] All $SLURM_JOB_NUM_NODES sandbox instances ready." + fi + + set +e + bash "$DRIVER_COMMAND_FILE" > "$LOG_DIR/ray-driver.log" 2>&1 + exit_code=\$? + set -e + exit \$exit_code +else + # Interactive: keep the head container alive for user attachment via the attach + # helper (a separate overlapping srun). sleep infinity is invisible to the + # attached session. + sleep infinity +fi EOF ) -srun $COMMON_SRUN_ARGS --container-name=ray-head --nodes=1 --ntasks=1 --cpus-per-task=$CPUS_PER_WORKER -w "$head_node" -o $LOG_DIR/ray-head.log bash -x -c "$head_cmd" & -SRUN_PIDS["ray-head"]=$! - -NUM_ACTORS=$((GPUS_PER_NODE * SLURM_JOB_NUM_NODES)) -# Start Ray worker nodes -# We want 1 Ray worker node per physical node (excluding the head node) -# Worker nodes are started with ray start but without the --head flag -# Start from node 1 since node 0 is running the head -for ((i = 1; i < SLURM_JOB_NUM_NODES; i++)); do - node_i=${nodes_array[$i]} - - worker_cmd=$(cat <&2; exit 1; } +bash -n <(printf '%s' "$worker_cmd") || { echo "[FATAL] Worker script has syntax errors" >&2; exit 1; } ######################################################## # Optional sandbox sidecar for NeMo-Skills-backed Gym resources. +# Launched first so it boots in parallel with the Ray head/workers. Each per-node +# task starts the sandbox, polls its local port, and on success touches +# SANDBOX_READY_ so the head can gate the driver on all instances. ######################################################## export SLURM_MASTER_NODE=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n1) +SANDBOX_PORT="${NEMO_SKILLS_SANDBOX_PORT:-6000}" if [[ -n "${SANDBOX_CONTAINER:-}" ]] && [[ -n "${SANDBOX_COMMAND:-}" ]]; then - SANDBOX_PORTS_DIR="$LOG_DIR/sandbox" - mkdir -p "$SANDBOX_PORTS_DIR" - SANDBOX_MOUNTS="$SANDBOX_PORTS_DIR:$SANDBOX_PORTS_DIR" if [[ -n "${SANDBOX_EXTRA_MOUNTS:-}" ]]; then SANDBOX_MOUNTS="${SANDBOX_EXTRA_MOUNTS},${SANDBOX_MOUNTS}" @@ -550,9 +666,9 @@ if [[ -n "${SANDBOX_CONTAINER:-}" ]] && [[ -n "${SANDBOX_COMMAND:-}" ]]; then SANDBOX_EXPORTS="${SANDBOX_EXPORTS},${SANDBOX_ENV_VARS}" fi - echo "[INFO] Starting sandbox sidecars on all allocated nodes (ports_dir=$SANDBOX_PORTS_DIR)..." - srun --output "$LOG_DIR/sandbox.log" \ - --error "$LOG_DIR/sandbox.log" \ + echo "[INFO] Starting sandbox sidecars on all allocated nodes (ports_dir=$SANDBOX_PORTS_DIR, port=$SANDBOX_PORT)..." + srun --output "$SANDBOX_PORTS_DIR/sandbox-%t.log" \ + --error "$SANDBOX_PORTS_DIR/sandbox-%t.log" \ --container-image="$SANDBOX_CONTAINER" \ --container-mounts="$SANDBOX_MOUNTS" \ --no-container-mount-home \ @@ -565,49 +681,97 @@ if [[ -n "${SANDBOX_CONTAINER:-}" ]] && [[ -n "${SANDBOX_COMMAND:-}" ]]; then --nodes="$SLURM_JOB_NUM_NODES" \ --ntasks-per-node=1 \ --export="$SANDBOX_EXPORTS" \ - bash -c "$SANDBOX_COMMAND" & + bash -xc ' + ('"$SANDBOX_COMMAND"') & + SANDBOX_PID=$! + deadline=$((SECONDS + 300)) + while ! (echo > /dev/tcp/localhost/'"$SANDBOX_PORT"') 2>/dev/null; do + if ! kill -0 $SANDBOX_PID 2>/dev/null; then + echo "[ERROR] Sandbox process died before becoming ready on $(hostname)" + exit 1 + fi + if (( SECONDS > deadline )); then + echo "[ERROR] Sandbox not ready on $(hostname) after 5 min" + exit 1 + fi + sleep 2 + done + touch '"$SANDBOX_PORTS_DIR"'/SANDBOX_READY_$(hostname) + echo "[INFO] Sandbox ready on $(hostname):'"$SANDBOX_PORT"'" + wait $SANDBOX_PID + ' & SRUN_PIDS["sandbox"]=$! echo "[INFO] Sandbox sidecar started in background (PID: ${SRUN_PIDS["sandbox"]})" else echo "[INFO] SANDBOX_CONTAINER or SANDBOX_COMMAND not defined, skipping sandbox startup" fi -# At this stage the Ray cluster bringup has started on the physical nodes in the allocation -# Before we launch a job on this cluster we need to make sure that the bringup is complete -# We do so by querying the number of worker_units in the ray cluster and asserting = NUM_ACTORS -extract_worker_units() { - status_output=$(srun --overlap --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" ray status) - if echo "$status_output" | grep -q "worker_units"; then - worker_units=$(echo "$status_output" | grep "worker_units" | awk -F'[/. ]' '{print $4}') - echo $worker_units - else - echo 0 - fi -} +# Start the Ray head. +srun $COMMON_SRUN_ARGS --container-name=ray-head --nodes=1 --ntasks=1 --cpus-per-task=$CPUS_PER_WORKER -w "$head_node" -o $LOG_DIR/ray-head.log bash -x -c "$head_cmd" & +SRUN_PIDS["ray-head"]=$! -# Poll to make sure that all Ray worker nodes have connected to the head. -# All workers have connected when number of GPUs in ray cluster -# is equal to NUM_ACTORS. We use the utility function above -# to check how many GPUs have come online in the ray cluster -while true; do - worker_units=$(extract_worker_units) - echo "[INFO] Number of actors online: $worker_units/$NUM_ACTORS" - if [[ "$worker_units" -eq "$NUM_ACTORS" ]]; then - break - fi - check_srun_processes +# Start the Ray workers (all nodes except the head) with a SINGLE batched srun +# instead of one srun per node. This keeps srun/slurmctld RPCs constant rather than +# scaling them with the node count. In single-node mode there are no workers: the +# head already registers its GPUs as Ray resources. +_num_workers=$((SLURM_JOB_NUM_NODES - 1)) +if [[ $_num_workers -gt 0 ]]; then + # srun -o with %t produces per-task log files; zero-pad to the worker-count + # width so ray-worker-*.log filenames sort naturally (e.g. ray-worker-007.log). + _task_digits=${#_num_workers} + srun $COMMON_SRUN_ARGS --container-name=ray-worker --exact \ + --nodes=$_num_workers \ + --ntasks=$_num_workers \ + --ntasks-per-node=1 \ + --cpus-per-task=$CPUS_PER_WORKER \ + --kill-on-bad-exit=0 \ + --exclude="$head_node" \ + -o "$LOG_DIR/ray-worker-%0${_task_digits}t.log" \ + bash -x -c "$worker_cmd" & + SRUN_PIDS["ray-workers"]=$! +else + echo "[INFO] Single-node mode: head node is the only compute node, no workers to launch" +fi + +# Wait for the head container to report that the Ray head GCS is ready. This is a +# cheap shared-FS file check rather than an srun --overlap into the head node. +while check_srun_processes && ! test -f "$LOG_DIR/STARTED_RAY_HEAD"; do + echo "[INFO][$(date)] Waiting for Ray head GCS to be ready..." sleep 2 done -echo "All workers connected!" - -# We can now launch a job on this cluster -# We do so by launching a driver process on the physical node that the head node is on -# This driver process is responsible for launching a job on the Ray cluster -CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID | grep -oP 'WorkDir=\K[^ ]+' | head -1) if [[ -n "$COMMAND" ]]; then - srun --no-container-mount-home --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/ray-driver.log bash -c "$COMMAND" + # Non-interactive: the head container polls for workers, gates on sandbox + # readiness, runs the driver inline (-> ray-driver.log), and exits with the + # command's status. We just propagate that status (saves a dedicated driver srun). + set +e + wait ${SRUN_PIDS["ray-head"]} + exit_code=$? + set -e + exit $exit_code else + # Interactive: poll for workers from the submit host by reading the file the + # head's ray-status-sidecar publishes (no srun ray-status RPCs), then write the + # attach helper and idle. + RAY_STATUS_DEADLINE=$((SECONDS + 1800)) + while true; do + worker_units=$(cat "$LOG_DIR/ray_worker_units" 2>/dev/null || echo -1) + echo "[INFO] Number of actors online: $worker_units/$NUM_ACTORS" + if [[ "$worker_units" -eq "$NUM_ACTORS" ]]; then + break + fi + check_srun_processes + if (( SECONDS > RAY_STATUS_DEADLINE )); then + echo "[ERROR] Timed out waiting for all workers to connect ($worker_units/$NUM_ACTORS after 30 min)" + touch "$LOG_DIR/ENDED" + exit 1 + fi + sleep 5 + done + + echo "All workers connected!" + + CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID | grep -oP 'WorkDir=\K[^ ]+' | head -1) echo "[INFO]: Ray Cluster is idled, run this on the slurm head node to get a shell to the head node:" cat <$SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh # No args launches on the head node (node 0) @@ -615,31 +779,36 @@ else # Optional: set COMMAND='...' to run non-interactively instead of opening an interactive shell WORKER_NUM=\${1:-} if [[ -z "\$WORKER_NUM" ]]; then - # Empty means we are on the head node if [[ -n "\${COMMAND:-}" ]]; then srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" --jobid $SLURM_JOB_ID bash -c "\$COMMAND" else srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty bash fi +elif [[ $SLURM_JOB_NUM_NODES -eq 1 ]]; then + echo "Error: Single-node mode — only the head node is available. Run without arguments." + exit 1 else - # Worker numbers 1 through N-1 correspond to ray-worker-1 through ray-worker-(N-1) - # and use nodes_array[1] through nodes_array[N-1] + # All workers share the container name 'ray-worker'; target a specific one by node. + # Log files are 0-indexed: worker K maps to ray-worker-(K-1).log if [[ \$WORKER_NUM -lt 1 || \$WORKER_NUM -ge $SLURM_JOB_NUM_NODES ]]; then echo "Error: WORKER_NUM must be between 1 and $((SLURM_JOB_NUM_NODES-1))" exit 1 fi nodes_array=($nodes) + node="\${nodes_array[\$WORKER_NUM]}" if [[ -n "\${COMMAND:-}" ]]; then - srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker-\$WORKER_NUM --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\${nodes_array[\$WORKER_NUM]}" --jobid $SLURM_JOB_ID bash -c "\$COMMAND" + srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\$node" --jobid $SLURM_JOB_ID bash -c "\$COMMAND" else - srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker-\$WORKER_NUM --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\${nodes_array[\$WORKER_NUM]}" --jobid $SLURM_JOB_ID --pty bash + srun --no-container-mount-home $GRES_ARG -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\$node" --jobid $SLURM_JOB_ID --pty bash fi fi EOF chmod +x $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh echo " COMMAND='echo hello' bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh # run a non-interactive command on head node" echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh # to attach to head node (i.e., 'worker 0')" - echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 1 # to attach to worker 1" - echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 2 # to attach to worker 2, etc." + if [[ $SLURM_JOB_NUM_NODES -gt 1 ]]; then + echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 1 # to attach to worker 1" + echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 2 # to attach to worker 2, etc." + fi sleep infinity fi