Data parallel applications in SambaFlow

Some applications run much faster in data parallel mode than in the default mode (model parallel). This page discusses what it means to use data parallel mode, when it makes sense, and how you can compile and run in data parallel mode.

What is data parallel?

Deep learning can use data parallelism or model parallelism.

  • For any training, the data is broken up into minibatches. For data parallel, the entire training set is partitioned or sharded among the replicas, and then each replica further breaks its partition or shard into minibatches.

  • For model parallel learning, we split the model. In the case of a deep neural network, we split layers into groups of neurons while maintaining connectivity to preceding and succeeding layers. Each model split runs on a computational resource and is served the full dataset.
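
The data parallel split described above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the helper names shard and minibatches are hypothetical, the dataset is a list of integers standing in for training samples, and the strided assignment of samples to replicas is one possible partitioning scheme, not necessarily the one your application uses.

```python
from typing import List, Sequence


def shard(dataset: Sequence[int], num_replicas: int, rank: int) -> List[int]:
    """Return the strided shard of `dataset` that belongs to replica `rank`."""
    return list(dataset[rank::num_replicas])


def minibatches(samples: Sequence[int], batch_size: int) -> List[List[int]]:
    """Split one replica's shard into consecutive minibatches."""
    return [list(samples[i:i + batch_size])
            for i in range(0, len(samples), batch_size)]


dataset = list(range(12))   # stand-in for 12 training samples
num_replicas = 3            # hypothetical replica count

for rank in range(num_replicas):
    local = shard(dataset, num_replicas, rank)
    # Each replica sees only its own shard, then batches it locally.
    print(rank, minibatches(local, batch_size=2))
```

Every sample lands in exactly one shard, so the replicas together cover the full training set while each one processes only a fraction of it.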

See Resources for links to technical papers and other background information.

How does SambaFlow use data parallel operations?

SambaFlow™ has built-in support for data parallel operations. Many ML applications can take advantage of this mode for improved performance.

Internally, SambaFlow makes use of the following:

  • The MPICH MPI library. As of early 2023, SambaFlow ships with MPICH-3.4.3.

  • The torch.distributed framework. All functionality provided by torch.distributed is available in a SambaFlow data parallel application. One feature that data parallel applications rely on is the ability to create a distributed data sampler that feeds data to the replicas.

    MPICH and torch.distributed launch application replicas and handle basic communication between those replicas.

  • A custom communications library (the Collective Communication Library, or CCL). The CCL provides low-level features that accelerate gradient tensor syncing between replicas.

    Gradient tensors held in Reconfigurable Dataflow Unit™ (RDU) memory can be shared directly; they don’t have to use host CPU and memory. The ALL-GATHER and ALL-REDUCE functions that operate on the gradient tensors are also parallelized. The CCL enables this acceleration across RDUs in the same node, as well as between RDUs in different nodes.
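
To make the gradient sync step more concrete, here is a conceptual sketch of what an all-reduce does to per-replica gradients. It is a plain Python simulation, not the CCL implementation: the function name all_reduce_mean is hypothetical, and averaging (rather than summing) is an assumption made for the illustration.

```python
from typing import List


def all_reduce_mean(per_replica_grads: List[List[float]]) -> List[List[float]]:
    """Simulate an all-reduce that averages gradients element-wise.

    Input: one gradient vector per replica.
    Output: the vector each replica holds afterwards (all identical).
    """
    num_replicas = len(per_replica_grads)
    # zip(*...) groups the i-th gradient element from every replica.
    mean = [sum(values) / num_replicas for values in zip(*per_replica_grads)]
    return [list(mean) for _ in range(num_replicas)]


grads = [[1.0, 2.0], [3.0, 4.0]]   # gradients computed by two replicas
synced = all_reduce_mean(grads)
print(synced)                      # [[2.0, 3.0], [2.0, 3.0]]
```

After the operation every replica holds the same reduced gradients, which is what keeps the model weights identical across replicas from one step to the next.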

Leverage this functionality by making a few modifications to the application.

Modify an application to work in data parallel mode

To support data parallel mode, make these changes:

  1. Ensure that the application makes use of the PyTorch torch.utils.data.distributed framework.

  2. Consider sharding data among replicas, for example, by using DistributedSampler().

    The MPI framework that SambaNova uses for data parallel operations is supported as part of PyTorch. The framework allows for easy distribution of dataset samples to all replicas.

  3. Use the PyTorch DataLoader() (recommended).

  4. Modify the call to samba.session.run() so that it passes along the --data-parallel and --reduce-on-rdu arguments when they are supplied at the command line.

    loss, pred = samba.session.run(input_tensors=[X_mb, y_mb],
                                   output_tensors=model.output_tensors,
                                   hyperparam_dict=hyperparam_dict,
                                   data_parallel=args.data_parallel,
                                   reduce_on_rdu=args.reduce_on_rdu)
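
Steps 2 and 3 might look roughly like the following sketch, which uses only standard PyTorch classes. The toy dataset, batch size, and replica count are assumptions chosen for illustration. In a real data parallel run the process group already exists, so num_replicas and rank would normally be omitted and read from it; they are passed explicitly here only so that the sketch runs as a single process.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy dataset (assumption): 8 samples with 4 features each, plus labels.
features = torch.arange(32, dtype=torch.float32).reshape(8, 4)
labels = torch.arange(8)
dataset = TensorDataset(features, labels)

# The sampler hands this replica its own shard of the dataset.
# num_replicas and rank are hard-coded only for this standalone sketch.
sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=False)

# The DataLoader then breaks the shard into minibatches.
loader = DataLoader(dataset, batch_size=2, sampler=sampler)

for X_mb, y_mb in loader:
    # Each (X_mb, y_mb) pair is what a training loop would pass on
    # as input_tensors to the run call.
    print(y_mb.tolist())
```

With two replicas and shuffling disabled, replica 0 receives every second sample, so its minibatches contain labels [0, 2] and [4, 6]; replica 1 would receive the remaining samples.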

The next sections describe the compile and run command-line arguments in detail.

Compile a data parallel application

To compile an application for data parallel operations, pass these arguments to the SambaFlow compile command:

--data-parallel

Causes the compiler to add the necessary Gather and Reduce sections and buffers to the dataflow graph.

-ws

Short for world size, this argument defines the minimum number of application replicas to be launched when the model is trained in data parallel mode. For compilation, set the value to 2. The actual number of replicas to be launched is defined at runtime.

A compilation command for data parallel mode looks like this:

python model.py compile --data-parallel -ws 2 --pef-name=$PEF_NAME

Run a data parallel application

Data parallel applications are launched via the standard MPI command mpirun. The command takes arguments specific to data parallel operations, as well as the standard SambaFlow run command:

-np

Number of replicas to be launched. The number must be at least as large as the -ws argument specified during compilation, but it can be larger, usually up to the total number of available RDUs. If the application fits in a single tile of an RDU, it is possible to launch as many replicas as there are tiles available.

--data-parallel

Enables data parallel execution at runtime, launching the specified number of replicas.

--reduce-on-rdu

Enables direct syncing of gradient tensors between RDUs and their associated device memories (rather than syncing via the host), greatly improving performance. If this optional argument is not specified, gradient tensor syncing happens via the host.

SambaNova recommends that you enable --reduce-on-rdu in conjunction with --data-parallel.

--host

Allows a user to specify the host(s) on which replicas should be launched. If this optional argument is not specified, replicas are launched on the host that the user is logged into.

A run command for data parallel mode might look like this:

/opt/mpich-3.4.3/bin/mpirun -np $X --host $HOST_NAMES python parallel.py run --data-parallel --reduce-on-rdu --pef $PEF_NAME

We frequently update the version of the MPICH library we ship, so the MPICH path on your system might differ from the one shown here.
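
If you launch runs from a script, a small helper can assemble the command and check the -np constraint before calling mpirun. The following is a minimal sketch: build_mpirun_command is a hypothetical helper, the host names and PEF path are placeholders, the default of 2 for the compiled world size mirrors the compile example on this page, and joining several hosts with commas is an assumption about how your MPICH installation expects the host list.

```python
import shlex
from typing import List, Optional, Sequence


def build_mpirun_command(num_replicas: int,
                         script: str,
                         pef: str,
                         compiled_world_size: int = 2,
                         hosts: Optional[Sequence[str]] = None,
                         mpirun: str = "/opt/mpich-3.4.3/bin/mpirun",
                         reduce_on_rdu: bool = True) -> List[str]:
    """Assemble the argv list for a data parallel run (hypothetical helper)."""
    # -np must be at least as large as the -ws value used at compile time.
    if num_replicas < compiled_world_size:
        raise ValueError("-np must be at least the -ws value used at compile time")
    cmd = [mpirun, "-np", str(num_replicas)]
    if hosts:
        # Assumption: multiple hosts are given as a comma-separated list.
        cmd += ["--host", ",".join(hosts)]
    cmd += ["python", script, "run", "--data-parallel"]
    if reduce_on_rdu:
        cmd.append("--reduce-on-rdu")
    cmd += ["--pef", pef]
    return cmd


# Placeholder host names and PEF path, for illustration only.
cmd = build_mpirun_command(4, "parallel.py", "out/model.pef",
                           hosts=["node1", "node2"])
print(shlex.join(cmd))
```

The returned list can be passed to subprocess.run() as is; the printed form is convenient for logging the exact command that was launched.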

Data parallel best practices

Follow these best practices when running applications in data parallel mode:

  • When mpirun is called with a specified number of replicas (via the -np argument), that number will be passed to the distributed sampler to divide the dataset as appropriate.

  • Ensure that training data is accessible to each replica, e.g., on a networked volume accessible to each SambaNova node. The compiled PEF must also be available to each node.

  • In data parallel mode, an application runs concurrently on multiple RDUs, so certain actions are repeated by each replica:

    • mpirun merges the stdout of all replicas, making it easy to see all output at once or redirect it to a file. The following notes apply when the application itself opens a file.

    • To avoid the merge to stdout, you can designate one of the replicas (or “ranks” in MPI parlance) to perform the logging.

    • You can use torch.distributed.get_rank() to get a unique identifier for each process in the sampler process group to create unique filenames or paths for each replica (each process created by the DistributedSampler will be mapped to a replica via mpirun).

  • Synchronization between replicas (beyond the automatic gradient sync carried out by SambaFlow) is available via torch.distributed.barrier().

    All replicas must call barrier(). For example, suppose replica 1 performs some work and then calls barrier(), while the other replicas only call barrier(). When the barrier() call exits, each replica is assured that replica 1’s work is complete.

  • Parameters that are common to all replicas can be passed via the command line (see Run a data parallel application), a common file, or broadcast via torch.distributed.broadcast().

  • An application can use any torch.distributed call with the exception of torch.distributed.init_process_group(). SambaFlow calls torch.distributed.init_process_group() automatically when you pass in the --data-parallel flag.
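
The practices above that involve torch.distributed can be sketched as three small helpers. This is an illustration under stated assumptions: the function names, file names, and the scalar being broadcast are hypothetical, and the helpers assume that the process group has already been initialized (as SambaFlow does when --data-parallel is passed), so they deliberately do not call torch.distributed.init_process_group().

```python
import os

import torch
import torch.distributed as dist


def replica_log_path(log_dir: str) -> str:
    """Return a log file path that is unique to the calling replica."""
    return os.path.join(log_dir, f"train_rank{dist.get_rank()}.log")


def prepare_once(marker_path: str, worker_rank: int) -> None:
    """Let one replica do shared work while the others wait at the barrier."""
    if dist.get_rank() == worker_rank:
        # Stand-in for the real work, e.g. preprocessing a shared file.
        with open(marker_path, "w") as f:
            f.write("ready\n")
    # Every replica calls barrier(); it returns once all have arrived,
    # so the work done by worker_rank is complete at that point.
    dist.barrier()


def share_value(value: float, src: int = 0) -> float:
    """Broadcast a scalar chosen by replica `src` to every replica."""
    t = torch.tensor([value], dtype=torch.float64)
    dist.broadcast(t, src=src)   # overwrites t on all replicas except src
    return t.item()
```

Inside a data parallel run, calling prepare_once(path, worker_rank=1) mirrors the barrier() scenario described above, and share_value() is one way to distribute a parameter that only one replica has computed.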

Data parallel example command

Here’s an example of a command you might use to run the transformers_hook.py example in data parallel mode.

/opt/mpich-3.4.3/bin/mpirun -np 4 python transformers_hook.py run \
    --config_name modules/configs/sweep_configs/mlmns_bert_base_config.json \
    --tokenizer_name bert-large-uncased \
    --module_name mlm_ns \
    --task_name mlm_ns \
    --max_seq_length 512 -b 32 \
    --output_dir=hf_output \
    --overwrite_output_dir \
    --do_train \
    --per_device_train_batch_size 32 \
    --input_dir /import/snvm-sc-publicmoduledata-scratch/snvm_rgrs/bert/mlm/hdf5_lower_case_1_seq_len_512_max_pred_80_masked_lm_prob_0.15_random_seed_12345_dupe_factor_5/wikicorpus_en/ \
    --cache /import/snvm-sc-publicmoduledata-scratch/snvm_rgrs/bert/mlm/wikitext-103-raw/cache \
    --max_predictions_per_seq 20 \
    --save_steps -1 \
    --warmup_steps 0 \
    --logging_steps 1 \
    --weight_decay 0.01 \
    --learning_rate 0.00035 \
    --non_split_head \
    --dense_adam \
    --adam_beta2 0.98 \
    --max_grad_norm_clip 1.0 \
    --validate_stat_perf \
    --validate_tying_plus_embed_train \
    --skip_checkpoint -p /import/snvm-fn-scratch1/michaelc/pef_tgm/main_2/bert-tp-weight-dp.pef \
    --max_steps 10 \
    --steps_this_run 10 \
    --data-parallel \
    --reduce-on-rdu

See Run a data parallel application for a list of arguments that are specific to data parallel mode. The other arguments are application-specific; run the application itself with --help for information about them.

Resources

Our engineering team suggests the following resources for background information: