How to use data parallel mode

Internally, SambaFlow supports several types of model parallelization. Model parallelization makes running the model (training, fine-tuning, etc.) more efficient. You can control parallelization in these ways:

  • Data parallel mode. You can compile and run the model in data parallel mode, discussed here. Some applications run much faster in data parallel mode.

  • Tensor parallel mode. You can instruct the compiler to perform tensor parallel optimization, which also results in running the model more efficiently. See How to use tensor parallel mode (Beta).

Data parallel vs. tensor parallel

In theory, you can use both data parallel mode and tensor parallel mode in the same compile and run cycle, but note these points:

  • Tensor parallel batch mode can support only RDUs in a single node (which means up to 8 RDUs). Data parallel mode can run across multiple nodes (no limit on the number of RDUs).

  • With tensor parallel batch mode, the number of RDUs is set during compilation and cannot change at runtime. With data parallel mode, the number of chips can be assigned at runtime, so one PEF can run on the number of RDUs you specify.

  • The two modes might treat batch size differently. For example, assume the model uses batch size 8.

    • If you compile using tensor parallel batch mode on 4 RDUs, then the compiled PEF includes instructions for 4 RDUs, each with batch size 2.

    • In data parallel mode, the PEF instructions target a single RDU with batch size 8. If you deploy this PEF to run on 4 RDUs in data parallel mode, then you need to have data input with batch size 32 (4 times 8).
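The batch size arithmetic in the example above can be sketched in a few lines of Python. The function names are hypothetical and not part of SambaFlow, and the sketch assumes that the tensor parallel batch size divides evenly among the RDUs.

```python
def tensor_parallel_per_rdu_batch(model_batch_size: int, num_rdus: int) -> int:
    """Per-RDU batch size when the PEF is compiled in tensor parallel batch mode."""
    if model_batch_size % num_rdus != 0:
        raise ValueError("this sketch assumes the batch divides evenly among RDUs")
    return model_batch_size // num_rdus


def data_parallel_global_batch(pef_batch_size: int, num_replicas: int) -> int:
    """Total input batch size needed when one single-RDU PEF is replicated."""
    return pef_batch_size * num_replicas


print(tensor_parallel_per_rdu_batch(8, 4))  # 2
print(data_parallel_global_batch(8, 4))     # 32
```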

What is data parallel?

For training, the data is broken up into minibatches. In data parallel mode, multiple replicas of the application are launched and the entire training set is partitioned or sharded among the replicas. Each replica then further breaks its partition (or shard) into minibatches. Each replica runs on a computational resource but is served a unique partition of the data.
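As a rough illustration of the partitioning described above, the following sketch shards sample indices among replicas and then splits each shard into minibatches. The strided assignment and the function names are assumptions chosen for illustration; the scheme a particular sampler uses may differ.

```python
from typing import List


def shard_indices(num_samples: int, num_replicas: int, rank: int) -> List[int]:
    """Return the sample indices assigned to one replica (strided partition)."""
    return list(range(rank, num_samples, num_replicas))


def minibatches(indices: List[int], batch_size: int) -> List[List[int]]:
    """Break one replica's shard into consecutive minibatches."""
    return [indices[i:i + batch_size] for i in range(0, len(indices), batch_size)]


# 12 samples, 3 replicas, minibatch size 2: each replica sees a unique shard.
for rank in range(3):
    shard = shard_indices(12, 3, rank)
    print(rank, minibatches(shard, 2))
```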

When you run a model in data parallel mode, the compiler creates a graph that supports both model and data parallelism. At runtime, the model is split across computational resources and multiple replicas of this split model might be launched concurrently, with each replica served a unique data partition. This efficient use of resources is transparent to the user.

See Resources for links to technical papers and other background information.

Data parallel mode has an effect only during training. You see benefits if the latency of a single training iteration exceeds about 100 ms. For very low latency graphs, you don’t see much gain and might even see deterioration.

How does SambaFlow use data parallel operations?

SambaFlow™ has built-in support for data parallel operations. Many ML applications can take advantage of this mode for improved performance.

Internally, SambaFlow makes use of:

  • The MPICH MPI library.

  • The torch.distributed framework. All functionality provided by torch.distributed is available in a SambaFlow data parallel application. A required key feature of torch.distributed is the ability to create a distributed data sampler to feed data to the replicas.

    MPICH and torch.distributed launch application replicas and handle basic communication between those replicas.

  • A custom communications library (the Collective Communication Library, or CCL). The CCL library provides low-level features that enable acceleration of gradient tensor syncing between replicas.

    Gradient tensors held in Reconfigurable Dataflow Unit™ (RDU) memory can be shared directly; they don’t have to use host CPU and memory. The ALL-GATHER and ALL-REDUCE functions that operate on the gradient tensors are also parallelized. The CCL enables this acceleration across RDUs in the same node, as well as between RDUs in different nodes.
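To make the gradient syncing step more concrete, here is a conceptual, single-process simulation of an ALL-REDUCE over per-replica gradients. It is not how the CCL is implemented, and it assumes the reduction is a mean; the source does not state whether gradients are summed or averaged. The function name is hypothetical.

```python
from typing import List


def all_reduce_mean(replica_grads: List[List[float]]) -> List[List[float]]:
    """Simulate ALL-REDUCE (mean): every replica ends up with the same averaged gradient."""
    num_replicas = len(replica_grads)
    # zip(*...) groups the values for the same gradient element across replicas.
    averaged = [sum(values) / num_replicas for values in zip(*replica_grads)]
    # Each replica receives its own copy of the reduced result.
    return [list(averaged) for _ in range(num_replicas)]


grads = [[1.0, 2.0], [3.0, 6.0]]  # two replicas, two gradient elements each
print(all_reduce_mean(grads))     # [[2.0, 4.0], [2.0, 4.0]]
```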

You can leverage this functionality by making a few modifications to the application.

Modify an application to work in data parallel mode

To support data parallel mode, make these changes:

  1. Ensure that the application makes use of the PyTorch torch.utils.data.distributed framework.

  2. Consider sharding data among replicas, for example, by using DistributedSampler() (see the PyTorch documentation for details).

    The MPI framework that SambaNova uses for data parallel operations is supported as part of PyTorch. The framework allows for easy distribution of dataset samples to all replicas.

  3. Use the PyTorch DataLoader() (recommended).

  4. Modify the samba.session.run() call to accept the --data-parallel and --reduce-on-rdu arguments when they are passed in at the command line.

Here’s a sample code snippet for the samba.session.run() method. If the method includes the two arguments, you can specify arguments specific to data parallel training runs on the command line. See Run a data parallel application.

loss, pred = samba.session.run(input_tensors=[X_mb, y_mb],
                                output_tensors=model.output_tensors,
                                hyperparam_dict=hyperparam_dict,
                                data_parallel=args.data_parallel,
                                reduce_on_rdu=args.reduce_on_rdu)
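Steps 1 through 3 might look like the following minimal sketch. It assumes PyTorch is installed and uses a toy TensorDataset as a stand-in for real training data. The helper name make_loader is hypothetical and not part of SambaFlow. The rank and replica count are passed explicitly only so that the sketch runs as a single process.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler


def make_loader(rank: int, world_size: int, batch_size: int = 2) -> DataLoader:
    """Build a DataLoader that serves only this replica's shard of a toy dataset."""
    features = torch.arange(16, dtype=torch.float32).unsqueeze(1)  # 16 samples, 1 feature
    labels = torch.arange(16)
    dataset = TensorDataset(features, labels)
    # Passing num_replicas and rank explicitly keeps the sketch runnable without
    # a process group; in a launched job they can be left unset so that the
    # sampler reads them from torch.distributed.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
    return DataLoader(dataset, batch_size=batch_size, sampler=sampler)


# Replica 1 of 4 receives every fourth sample, starting at index 1.
for X_mb, y_mb in make_loader(rank=1, world_size=4):
    print(y_mb.tolist())
```

The minibatches produced by such a loader are what would be passed as input_tensors to samba.session.run() in the snippet above.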

Compile a data parallel application

To compile an application for data parallel operations, pass these arguments to the SambaFlow compile command:

--data-parallel

Causes the compiler to annotate gradients within the application and to add buffers to the dataflow graph.

-ws

Short for world size, this argument defines the minimum number of application replicas to be launched when the model is trained in data parallel mode. For compilation, set the value to 2. The actual number of replicas to be launched is defined at runtime.

A compilation command for data parallel mode looks like this:

python model.py compile --data-parallel -ws 2 --pef-name=$PEF_NAME

Run a data parallel application

Data parallel applications are launched via the standard MPI command mpirun or another MPI-compliant launcher such as Slurm or Kubernetes with appropriate plugins. The MPI launcher creates a separate process for each application replica and enables each process to later communicate with the others. The mpirun command takes arguments specific to data parallel operations, as well as the standard SambaFlow run command.

-np

Number of replicas to be launched. The number must be at least as large as the -ws argument specified during compilation but it can be larger, usually up to the total number of available RDUs. When mpirun is called with a specified number of replicas, that number is passed to the distributed sampler to divide the dataset.

--data-parallel

Enables data parallel execution at runtime. SambaFlow will automatically handle gradient synchronization during the session.run() call.

--reduce-on-rdu

Enables direct syncing of gradient tensors between RDUs and their associated device memories using CCL (rather than syncing via the host), greatly improving performance. If this optional argument is not specified, gradient tensor syncing happens via the host.

SambaNova recommends that you enable --reduce-on-rdu in conjunction with --data-parallel.

--host

Allows a user to specify the host(s) on which replicas should be launched. If --host is omitted, mpirun launches all processes and replicas on the local machine. With --host (which takes a comma-separated list of host names), mpirun logs in to those hosts and launches the specified number of processes on those machines, evenly divided across them. You don’t even need to be logged into one of the hosts you’ve specified.

A run command for data parallel mode might look like this:

$ /opt/mpich-3.4.3/bin/mpirun -np $X --host $HOST_NAMES python parallel.py run --data-parallel --reduce-on-rdu --pef $PEF_NAME
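If you launch runs from a script, the command above could be assembled programmatically. The following is a sketch only: build_mpirun_command is a hypothetical helper, node1 and node2 are placeholder host names, and the flags are the ones listed in this section. The check mirrors the rule that -np must be at least the -ws value used at compile time.

```python
import shlex
from typing import List, Optional


def build_mpirun_command(num_replicas: int, compiled_world_size: int, pef: str,
                         script: str = "parallel.py",
                         hosts: Optional[List[str]] = None,
                         mpirun: str = "/opt/mpich-3.4.3/bin/mpirun") -> List[str]:
    """Assemble the mpirun argument list for a data parallel run."""
    if num_replicas < compiled_world_size:
        raise ValueError("-np must be at least the -ws value used at compile time")
    cmd = [mpirun, "-np", str(num_replicas)]
    if hosts:
        # --host takes a comma-separated list of host names.
        cmd += ["--host", ",".join(hosts)]
    cmd += ["python", script, "run", "--data-parallel", "--reduce-on-rdu", "--pef", pef]
    return cmd


cmd = build_mpirun_command(4, 2, "MY_PEF.pef", hosts=["node1", "node2"])
print(shlex.join(cmd))
```

The resulting list can be passed to subprocess.run() as is, which avoids shell quoting issues.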

Data parallel best practices

Follow these best practices when running applications in data parallel mode:

  • Ensure that each replica can access the training data and the PEF. On a networked volume, each SambaNova node must have access to the training data and the compiled PEF.

  • In data parallel mode, an application runs concurrently on multiple RDUs, so certain actions, such as writing output, are repeated by every replica:

    • mpirun merges the stdout of all replicas, making it easy to see all output at once or redirect to a file.

    • To avoid the merge to stdout, you can designate one of the replicas (or “ranks” in MPI parlance) to perform the logging.

    • Use torch.distributed.get_rank() to get a unique identifier for each process in the MPI process group to create unique filenames or paths for each replica.

  • Synchronization between replicas (beyond the automatic gradient sync carried out by SambaFlow) is available via torch.distributed.barrier().

  • Parameters that are common to all replicas can be passed via the command line (see Run a data parallel application), read from a common file, or broadcast via torch.distributed.broadcast().

  • An application can use any torch.distributed call with the exception of torch.distributed.init_process_group(). SambaFlow calls torch.distributed.init_process_group() automatically when you pass in the --data-parallel flag.
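The logging and file naming practices above can be sketched as follows. The helper names and the directory layout are hypothetical. In a launched job the rank would come from torch.distributed.get_rank(); here it is looped over so that the sketch runs as a single process.

```python
import os
import tempfile


def replica_output_path(base_dir: str, rank: int, name: str = "train.log") -> str:
    """Return a per-replica file path so that replicas never write to the same file."""
    return os.path.join(base_dir, f"rank{rank}", name)


def is_logging_rank(rank: int, logging_rank: int = 0) -> bool:
    """Only the designated rank writes progress messages to stdout."""
    return rank == logging_rank


base_dir = tempfile.mkdtemp()
for rank in range(4):  # stand-in for four replicas
    path = replica_output_path(base_dir, rank)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(f"replica {rank} started\n")
    if is_logging_rank(rank):
        print(f"[rank {rank}] logging to {path}")
```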

Data parallel example command

Here’s an example of a command you might use to run the transformers_hook.py example in data parallel mode.

/opt/mpich-3.4.3/bin/mpirun -np 4 python transformers_hook.py run \
    --config_name modules/configs/sweep_configs/.json \
    --tokenizer_name bert-large-uncased \
    --module_name mlm_ns \
    --task_name mlm_ns \
    --max_seq_length 512 -b 32 \
    --output_dir=hf_output \
    --overwrite_output_dir \
    --do_train \
    --per_device_train_batch_size 32 \
    --input_dir \
    --cache \
    --max_predictions_per_seq 20 \
    --save_steps -1 \
    --warmup_steps 0 \
    --logging_steps 1 \
    --weight_decay 0.01 \
    --learning_rate 0.00035 \
    --non_split_head \
    --dense_adam \
    --adam_beta2 0.98 \
    --max_grad_norm_clip 1.0 \
    --validate_stat_perf \
    --validate_tying_plus_embed_train \
    --skip_checkpoint -p MY_PEF.pef \
    --max_steps 10 \
    --steps_this_run 10 \
    --data-parallel \
    --reduce-on-rdu

See Run a data parallel application for a list of arguments that are specific to data parallel mode. The other arguments are application specific. Run the application itself with --help for some information.

Resources

Our engineering team suggests the following resources for background information: