Distributed Jobs on Many VMs#

SkyPilot supports multi-node cluster provisioning and distributed execution on many VMs.

For example, here is a simple PyTorch Distributed training job:

name: resnet-distributed-app

resources:
  accelerators: V100:4

num_nodes: 2

setup: |
  pip3 install --upgrade pip
  git clone https://github.com/michaelzhiluo/pytorch-distributed-resnet
  cd pytorch-distributed-resnet
  # SkyPilot's default image on AWS/GCP has CUDA 11.6 (Azure 11.5).
  pip3 install -r requirements.txt torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
  mkdir -p data  && mkdir -p saved_models && cd data && \
    wget -c --quiet https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
  tar -xvzf cifar-10-python.tar.gz

run: |
  cd pytorch-distributed-resnet

  num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l`
  master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1`
  python3 -m torch.distributed.launch \
    --nproc_per_node=${SKYPILOT_NUM_GPUS_PER_NODE} \
    --node_rank=${SKYPILOT_NODE_RANK} \
    --nnodes=$num_nodes \
    --master_addr=$master_addr \
    --master_port=8008 \
    resnet_ddp.py --num_epochs 20

In the above,

  • num_nodes: 2 specifies that this task is to be run on 2 nodes, with each node having 4 V100s;

  • The environment variables used in the run section (SKYPILOT_NODE_IPS, SKYPILOT_NODE_RANK, and SKYPILOT_NUM_GPUS_PER_NODE) are commonly useful for launching distributed training; they are explained below.
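Note that torch.distributed.launch is deprecated in newer PyTorch releases in favor of torchrun. A roughly equivalent run section using the same SkyPilot environment variables could look like the following (a sketch, assuming the installed PyTorch version ships torchrun, which torch>=1.10 does):

run: |
  cd pytorch-distributed-resnet

  num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l`
  master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1`
  torchrun \
    --nproc_per_node=${SKYPILOT_NUM_GPUS_PER_NODE} \
    --node_rank=${SKYPILOT_NODE_RANK} \
    --nnodes=$num_nodes \
    --master_addr=$master_addr \
    --master_port=8008 \
    resnet_ddp.py --num_epochs 20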

Note

If you encounter the error [Errno 24] Too many open files, your process has exceeded the maximum number of open file descriptors allowed by the system. This often occurs in high-load scenarios, e.g., when launching a large number of nodes, such as 100.

To resolve this issue, run the following command, and try again:

ulimit -n 65535

Environment variables#

SkyPilot exposes these environment variables that can be accessed in a task’s run commands:

  • SKYPILOT_NODE_RANK: rank (an integer ID from 0 to num_nodes-1) of the node executing the task.

  • SKYPILOT_NODE_IPS: a string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address.

    • You can retrieve the number of nodes by echo "$SKYPILOT_NODE_IPS" | wc -l and the IP address of the third node by echo "$SKYPILOT_NODE_IPS" | sed -n 3p.

    • To manipulate these IP addresses, you can also store them in a file in the run command with echo "$SKYPILOT_NODE_IPS" >> ~/sky_node_ips.

  • SKYPILOT_NUM_GPUS_PER_NODE: number of GPUs reserved on each node to execute the task; the same as the count in accelerators: <name>:<count> (rounded up if a fraction).
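For instance, a run section can combine these variables to derive the cluster topology (a minimal sketch; the echoed message is purely illustrative):

run: |
  # Count the reserved nodes and pick the first one as the master.
  num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l`
  master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1`
  echo "Node ${SKYPILOT_NODE_RANK} of ${num_nodes}; master at ${master_addr}; ${SKYPILOT_NUM_GPUS_PER_NODE} GPU(s) reserved on this node."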

Launching a multi-node task (new cluster)#

When using sky launch to launch a multi-node task on a new cluster, the following happens in sequence:

  1. Nodes are provisioned. (barrier)

  2. Workdir/file_mounts are synced to all nodes. (barrier)

  3. setup commands are executed on all nodes. (barrier)

  4. run commands are executed on all nodes.
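For example, launching the PyTorch task above on a fresh 2-node cluster follows exactly these steps (a sketch; the cluster name and YAML file name are illustrative):

# Provision a new 2-node cluster and run the task on it.
$ sky launch -c resnet-cluster resnet_distributed.yaml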

Launching a multi-node task (existing cluster)#

When using sky launch to launch a multi-node task on an existing cluster, the cluster may have more nodes than the current task’s num_nodes requirement.

The following happens in sequence:

  1. SkyPilot checks that the runtime on all nodes is up-to-date. (barrier)

  2. Workdir/file_mounts are synced to all nodes. (barrier)

  3. setup commands are executed on all nodes of the cluster. (barrier)

  4. run commands are executed on the subset of nodes scheduled to execute the task, which may be fewer than the cluster size.

Tip

To skip rerunning the setup commands, use either sky launch --no-setup ... (performs steps 1, 2, 4 above) or sky exec (performs step 2 (workdir only) and step 4).
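For example (a sketch; the cluster name and YAML file name are illustrative):

# Reuse the existing cluster, skipping the setup commands (steps 1, 2, 4).
$ sky launch --no-setup -c resnet-cluster resnet_distributed.yaml

# Or submit the task directly; only the workdir is synced before running.
$ sky exec resnet-cluster resnet_distributed.yaml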

Executing a task on the head node only#

To execute a task on the head node only (a common scenario for tools like mpirun), use the SKYPILOT_NODE_RANK environment variable as follows:

...

num_nodes: <n>

run: |
  if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
      # Launch the head-only command here.
  fi
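As a concrete sketch, an MPI-style launch from the head node can build a hostfile from SKYPILOT_NODE_IPS. Here my_mpi_program is a placeholder, and this assumes MPI (Open MPI syntax below) and passwordless SSH between the nodes are already set up:

run: |
  if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
    # One reserved IP per line in SKYPILOT_NODE_IPS becomes one host per line.
    echo "$SKYPILOT_NODE_IPS" > ~/hostfile
    # Launch one process per reserved GPU on each node.
    mpirun --hostfile ~/hostfile --npernode ${SKYPILOT_NUM_GPUS_PER_NODE} ./my_mpi_program
  fi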

SSH into worker nodes#

In addition to the head node, the SSH configurations for the worker nodes of a multi-node cluster are also added to ~/.ssh/config as <cluster_name>-worker<n>. This allows you to SSH into the worker nodes directly, if required.

# Assuming 3 nodes in a cluster named mycluster

# Head node.
$ ssh mycluster

# Worker nodes.
$ ssh mycluster-worker1
$ ssh mycluster-worker2
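This is handy for one-off debugging on a specific worker, for example:

# Check GPU utilization on the first worker node.
$ ssh mycluster-worker1 nvidia-smi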

Executing a Distributed Ray Program#

To execute a distributed Ray program on many VMs, download the training script and launch the task YAML:

$ wget https://raw.githubusercontent.com/skypilot-org/skypilot/master/examples/distributed_ray_train/train.py
$ sky launch ray_train.yaml

ray_train.yaml:

resources:
  accelerators: L4:2
  memory: 64+

num_nodes: 2

workdir: .

setup: |
  conda activate ray
  if [ $? -ne 0 ]; then
    conda create -n ray python=3.10 -y
    conda activate ray
  fi

  pip install "ray[train]"
  pip install tqdm
  pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

run: |
  sudo chmod 777 -R /var/tmp
  head_ip=`echo "$SKYPILOT_NODE_IPS" | head -n1`
  num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l`
  if [ "$SKYPILOT_NODE_RANK" == "0" ]; then
    ps aux | grep ray | grep 6379 &> /dev/null || ray start --head  --disable-usage-stats --port 6379
    sleep 5
    python train.py --num-workers $num_nodes
  else
    sleep 5
    ps aux | grep ray | grep 6379 &> /dev/null || ray start --address $head_ip:6379 --disable-usage-stats
  fi
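Once the job is running, you can optionally verify that both nodes have joined the Ray cluster by checking ray status on the head node. This is a quick sanity check, assuming the cluster was launched with an explicit name such as mycluster (e.g., sky launch -c mycluster ray_train.yaml) and that conda is initialized in the login shell:

$ ssh mycluster
$ conda activate ray
$ ray status --address 127.0.0.1:6379   # Should list 2 nodes once the worker has joined.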

Warning

Avoid Installing Ray in Base Environment: Before running a distributed Ray program, make sure that Ray is not installed in the base environment. Installing a different version of Ray in the base environment can lead to an abnormal cluster status.

It is highly recommended to create a dedicated virtual environment for Ray and its dependencies (as in the example above), and to avoid calling ray stop, as that will also cause issues with the cluster.