Code elements of the training program
The Generative NLP tutorial is available in our public GitHub repo. It includes two main code files: one for training and one for inference.
- The generative_train.py code, discussed here, supports compiling and training a model. As part of training, you can generate checkpoints.
- The generative_inference.py code supports compiling for inference and performing an inference run. See Code elements of the inference program.
This doc page explores the generative_train.py code.
This tutorial uses a GPT-2 Hugging Face model because the model is small and compile and train times are short. You can follow the same process with GPT-3 and later or with other LLMs.
It all starts with main()
A SambaNova model must go through both compilation and training. The main() function calls the functions to perform these tasks, and also does some preparation.
Function | Description | See |
---|---|---|
parse_app_args() | Collect the arguments coming from the command line. | Parse input arguments |
AutoModelForCausalLM.from_pretrained() or AutoConfig.from_pretrained() | Download the pretrained model from Hugging Face. | Pull the pretrained model and configuration |
patch_model() | Patch some parts of the Hugging Face model to improve performance on RDU. NOTE: In future releases of this tutorial, we expect to remove the patch functionality. | Improve efficiency on RDU by patching |
samba.from_torch_model_() | Convert the model to use the SambaFlow framework. | Convert the model to use the SambaFlow framework |
get_model_trace_inputs() | Create dummy tensors for compilation. | Create dummy tensors for compilation |
get_optimizers() | Create the optimizers. | Create the optimizers |
samba.session.compile() | If the user specifies compile, compile the model. | Define model compilation |
utils.trace_graph() and train() | If the user specifies run, trace the graph and train the model. | Define model training |
Here’s the code for main():
main() for training (compile and run)
def main(argv: List[str]) -> None:
# Parse the args
args = parse_app_args(
argv=argv, common_parser_fn=add_common_args, run_parser_fn=add_run_args
)
# Download the model from Hugging Face
if args.model_name_or_path:
model = AutoModelForCausalLM.from_pretrained(
args.model_name_or_path, cache_dir=args.cache_dir
)
model.training = True
elif args.config_name:
config = AutoConfig.from_pretrained(args.config_name, cache_dir=args.cache_dir)
# Read dropout rate from config
args.dropout = config.resid_pdrop
model = AutoModelForCausalLM.from_config(config)
else:
raise RuntimeError("Must provide --model_name_or_path or --config_name")
if not args.inference:
model = model.train()
else:
model = model.eval()
# Patch the model here
model = patch_model(model, args)
samba.from_torch_model_(model)
# Make the tracing inputs
inputs = get_model_trace_inputs(args)
# Make the optimizer
optims = get_optimizers(args, model)
if args.command == "compile":
samba.session.compile(
model, inputs, optims, name="hf_transformer", init_output_grads=True
)
elif args.command == "run":
traced_outputs = utils.trace_graph(
model, inputs, optims, pef=args.pef, init_output_grads=True
)
train(args, model, traced_outputs)
Parse input arguments
Users can run generative_train.py with input arguments to affect its behavior.
- The first argument is compile or run. Based on that argument, users can specify additional options that are predefined by the compiler or the SambaFlow layer, or by the application itself. The following arguments are available out of the box:
  - Common arguments are handled by the SambaNova compiler or the SambaFlow framework and are supported as inputs when you compile or run the model.
  - Compiler arguments are supported in conjunction with compile, for example, mymodel.py compile --o0.
  - Runtime arguments are supported in conjunction with run, for example, mymodel.py run --data-dir=/tmp/data.
- Each model can specify additional arguments, usually by using the following functions:
  - add_common_args() specifies arguments for use with either compile or run.
  - add_run_args() specifies arguments for use with run, that is, for training or inference.
  - add_compile_args() is not defined in this example. Because SambaNova supports a rich set of arguments out of the box, not all applications specify additional compiler arguments.
Here’s the code we use to support additional model-specific arguments:
Adding arguments for compile and run
def add_common_args(parser: argparse.ArgumentParser):
"""Adds common arguments to an argparser object
Args:
parser (argparse.ArgumentParser): The argument parser object to add arguments to
"""
parser.add_argument(
"--model_name_or_path",
type=str,
help="Path to pretrained model or model identifier from huggingface.co/models",
)
parser.add_argument(
"--config_name",
type=str,
help="Path to pretrained model config or model identifier from huggingface.co/models",
)
parser.add_argument(
"--cache_dir",
type=str,
help="Where to store pretrained models and data downloaded from huggingface.co",
)
parser.add_argument(
"--max_seq_length",
type=int,
default=-1,
help="The maximum total input sequence length after tokenization. "
"Data in your data dir will be truncated or padded to this length. ",
)
parser.add_argument(
"--weight_decay",
type=float,
default=0.1,
help="The weight decay to apply (if not zero) to all layers except all "
"bias and LayerNorm weights in the AdamW optimizer.",
)
parser.add_argument(
"--max_grad_norm_clip",
type=float,
default=1.0,
help="Maximum gradient norm (for gradient clipping)",
)
parser.add_argument(
"--learning_rate",
type=float,
default=7.5e-6,
help="The initial learning rate for the AdamW optimizer.",
)
parser.add_argument(
"--dropout",
type=float,
default=0.1,
help="proportion of activations to drop in dropout layers",
)
parser.add_argument(
"--prompt_loss_weight",
type=float,
default=0.0,
help='Relative weight of tokens with the "prompt" token type ID '
"during backpropagation.",
)
def add_run_args(parser: argparse.ArgumentParser):
"""Adds arguments used at runtime to an argument parser object
Args:
parser (argparse.ArgumentParser): The argument parser object to add arguments to
"""
parser.add_argument(
"--data_dir",
type=str,
help="Path to a directory containing HDF5 files of pre-tokenized text",
)
parser.add_argument(
"--steps", type=int, default=800, help="Number of training steps to take"
)
parser.add_argument(
"--min_eval_acc",
type=float,
default=0.0,
help="Minimum threshold for evaluation accuracy of a trained model. only for testing.",
)
parser.add_argument(
"--subsample_eval",
type=float,
default=0.1,
help="Proportion of the evaluation set to use for evaluation. "
"Setting a smaller poportion helps speed up evauation.",
)
parser.add_argument(
"--checkpoint_name",
type=str,
default="checkpoint.pt",
help="Path where the final trained checkpoint will be saved.",
)
Pull the pretrained model and configuration
Hugging Face supports two functions for pulling a model:
- AutoModelForCausalLM.from_pretrained() uses the Hugging Face model and configuration directly.
- AutoConfig.from_pretrained() and AutoModelForCausalLM.from_config() allow us to use our own configuration file.
Pull the model
if args.model_name_or_path:
model = AutoModelForCausalLM.from_pretrained(
args.model_name_or_path, cache_dir=args.cache_dir)
model.training = True
elif args.config_name:
config = AutoConfig.from_pretrained(args.config_name, cache_dir=args.cache_dir)
# Read dropout rate from config
args.dropout = config.resid_pdrop
model = AutoModelForCausalLM.from_config(config)
For this tutorial, the configuration is stored as configuration/gpt2_small_config.json. We’ve fine-tuned those numbers to work well on the RDU architecture.
Configuration file: gpt2_small_config.json
{
"activation_function": "gelu_new",
"architectures": [
"GPT2LMHeadModel"
],
"attn_pdrop": 0.1,
"bos_token_id": 50256,
"embd_pdrop": 0.1,
"eos_token_id": 50256,
"initializer_range": 0.02,
"layer_norm_epsilon": 1e-05,
"model_type": "gpt2",
"n_ctx": 1024,
"n_embd": 768,
"n_head": 12,
"n_layer": 12,
"n_positions": 1024,
"resid_pdrop": 0.1,
"summary_activation": null,
"summary_first_dropout": 0.1,
"summary_proj_to_labels": true,
"summary_type": "cls_index",
"summary_use_proj": true,
"task_specific_params": {
"text-generation": {
"do_sample": true,
"max_length": 50
}
},
"vocab_size": 50257
}
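If you want to inspect or adjust those values, you can load the JSON with the same Hugging Face calls that main() uses on the --config_name path. A minimal sketch (assuming you run it from the tutorial directory so the relative path resolves):
Sketch: load the local GPT-2 configuration
from transformers import AutoConfig, AutoModelForCausalLM

# Load the tutorial's local GPT-2 configuration file (path relative to the tutorial directory).
config = AutoConfig.from_pretrained("configuration/gpt2_small_config.json")

# A few of the values that matter for the recipe above.
print(config.n_layer, config.n_embd, config.n_head)   # 12 768 12
print(config.resid_pdrop)                             # 0.1 (main() copies this into args.dropout)

# Build an untrained GPT-2 model from that configuration, as main() does when
# --config_name is given instead of --model_name_or_path.
model = AutoModelForCausalLM.from_config(config)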
Improve efficiency on RDU by patching
Hugging Face supports the concept of patching a model. In this tutorial, we use patching to make the model run more efficiently on RDUs. We’ve defined gpt2_patch_helper in the gpt2_patch.py file; it patches module forward calls within a GPT-2-based transformer model to customize them for RDU.
The model runs without the patch, but the optimization improves performance.
Patch the model
def patch_model(model: nn.Module, args: argparse.Namespace) -> nn.Module:
"""Patch the Hugging Face model to make it more efficient when running on RDU.
Args:
model (nn.Module): The Hugging Face model instance
args (argparse.Namespace): The parsed command line args
Returns:
nn.Module: The patched model instance
"""
return gpt2_patch_helper(model,
args.batch_size,
inference=args.inference,
max_length=args.max_seq_length,
max_pef_length=args.max_seq_length)
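The gpt2_patch_helper() implementation lives in gpt2_patch.py and isn’t reproduced here. As a generic illustration of the technique, patching a module’s forward call usually means replacing the bound method on the module instance, roughly as in the hypothetical sketch below (not the tutorial’s actual patch):
Sketch: patching a module's forward call
import types

import torch
import torch.nn as nn

def patch_linear_forward(module: nn.Linear) -> None:
    """Replace a module's forward with a wrapper (illustrative only)."""
    original_forward = module.forward  # bound method, captured before patching

    def patched_forward(self, x):
        # A real patch would substitute an RDU-friendly computation here;
        # this sketch simply delegates to the original implementation.
        return original_forward(x)

    module.forward = types.MethodType(patched_forward, module)

layer = nn.Linear(8, 8)
patch_linear_forward(layer)
out = layer(torch.randn(2, 8))  # still produces the original Linear output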
Convert the model to use the SambaFlow framework
To convert the model to use the SambaFlow framework, we call samba.from_torch_model_(). You can find the documentation in the API Reference.
Create dummy tensors for compilation
The SambaFlow compiler maps the model graph onto an RDU. Many computations go into producing the output tensors from the input tensors, and the compiler traces all of that computation. For compilation (tracing), the compiler doesn’t require actual data, but it does need tensors of the same shape as the real inputs.
We achieve this with a call to get_model_trace_inputs().
- First, get_model_trace_inputs() calls samba.from_torch_tensor(), which takes torch tensors as input and generates SambaTensor instances as output. The SambaTensors carry additional information that the compiler uses to create a PEF.
- Then get_model_trace_inputs() performs some other conversion tasks.
The conversion from torch tensor to SambaTensor is a minimum requirement for any model you want to run on RDU.
Create dummy tensors for tracing
def get_model_trace_inputs(args: argparse.Namespace) -> Tuple[Any]:
"""Get input tensors to use for tracing the model.
Since they're only used for tracing, these tensors are composed of dummy data.
Args:
args (argparse.Namespace): Parsed command line arguments
Returns:
Tuple[Any]: Inputs to use for tracing
"""
# Make input_ids
input_ids = torch.randint(0, 5000, (
args.batch_size, args.max_seq_length), dtype=torch.int32)
input_ids = samba.from_torch_tensor(
input_ids, name="input_ids")
# Make position_ids
position_ids = torch.arange(args.max_seq_length)
position_ids = position_ids.short()
position_ids = samba.from_torch_tensor(
position_ids.unsqueeze(0).expand(input_ids.shape), name='input_position_ids')
# Make labels
labels = torch.ones(args.batch_size, args.max_seq_length, dtype=torch.int16)
labels = samba.from_torch_tensor(labels, name='labels')
# Prepare the tracing items
tracing_inputs = (
input_ids, None, None, None, position_ids, None, None, None, None, labels)
return tracing_inputs
Create the optimizers
An optimizer is an algorithm or method used to adjust the parameters (weights and biases) of a machine learning model during the training process. The optimizer:
- Uses the computed gradients to minimize the loss.
- Iteratively updates model parameters based on the gradient values.
- Adjusts the learning rate; AdamW includes an adaptive learning rate algorithm.
SambaFlow has the AdamW optimizer built in. You can instead use your own optimizer. See Examine model code with external loss function for an example.
The get_optimizers() function creates three custom optimizers based on AdamW:
- One optimizer for the embeddings.
- One optimizer for parameters that need weight decay.
- One optimizer for parameters that don’t need weight decay (when decay is 0, AdamW behaves like Adam).
Create optimizers with get_optimizers
def get_optimizers(
args: argparse.Namespace, model: torch.nn.Module) -> List[torch.optim.Optimizer]:
"""Construct the optimizers
Create separate optimizers for Embeddings, parameters that need weight decay,
and parameters that do not
Args:
args (argparse.Namespace): The parsed command line arguments
model (torch.nn.Module): The model instance
Returns:
List[torch.optim.Optimizer]: The optimizers
"""
emb_modules = [module for module in model.modules() if isinstance(
module, torch.nn.Embedding)]
emb_params = OrderedSet(itertools.chain(*[emb.parameters() for emb in emb_modules]))
other_params = OrderedSet(
[(name, param) for name, param in model.named_parameters() if param not in emb_params])
# Exclude weight decay from bias & layernorm parameters
no_decay = ["bias"]
for name, params in model.named_parameters():
if "ln" in name or "layernorm" in name or "layer_norm" in name:
no_decay.append(name)
params_w_weight_decay = OrderedSet([(n, p) for n, p in other_params if not any(
nd in n for nd in no_decay)])
params_wo_weight_decay = OrderedSet([(n, p) for n, p in other_params if any(
nd in n for nd in no_decay)])
emb_optim = samba.optim.AdamW(
emb_params,
lr=args.learning_rate,
betas=(0.9, 0.997),
eps=1e-8,
weight_decay=args.weight_decay,
max_grad_norm=args.max_grad_norm_clip)
opt_w_weight_decay = samba.optim.AdamW(
[param for (name, param) in params_w_weight_decay],
lr=args.learning_rate,
betas=(0.9, 0.997),
weight_decay=args.weight_decay,
max_grad_norm=args.max_grad_norm_clip)
opt_wo_weight_decay = samba.optim.AdamW(
[param for (name, param) in params_wo_weight_decay],
lr=args.learning_rate,
betas=(0.9, 0.997),
max_grad_norm=args.max_grad_norm_clip)
return [emb_optim, opt_w_weight_decay, opt_wo_weight_decay]
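For comparison, plain PyTorch usually expresses the same decay/no-decay split as parameter groups inside a single torch.optim.AdamW rather than separate optimizer instances. The sketch below is illustrative only; the tutorial uses samba.optim.AdamW and three optimizers so that the embeddings, decayed, and non-decayed parameters can be handled separately on RDU.
Sketch: equivalent parameter groups in plain PyTorch
import torch
import torch.nn as nn

# A toy model standing in for GPT-2; the grouping logic is what matters here.
model = nn.Sequential(nn.Linear(16, 16), nn.LayerNorm(16), nn.Linear(16, 4))

# Collect LayerNorm parameters so they can be excluded from weight decay,
# the same intent as the "ln"/"layernorm" name matching in get_optimizers().
norm_params = {
    id(p) for m in model.modules() if isinstance(m, nn.LayerNorm) for p in m.parameters()
}

decay, no_decay = [], []
for name, param in model.named_parameters():
    if name.endswith("bias") or id(param) in norm_params:
        no_decay.append(param)
    else:
        decay.append(param)

optimizer = torch.optim.AdamW(
    [
        {"params": decay, "weight_decay": 0.1},   # matches the --weight_decay default
        {"params": no_decay, "weight_decay": 0.0},
    ],
    lr=7.5e-6,                                    # matches the --learning_rate default
    betas=(0.9, 0.997),
    eps=1e-8,
)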
Define model compilation
This model file defines how to perform compilation or training based on user input.
The compile function is defined in SambaSession.compile(). See the API Reference for details.
We call compile() with the following arguments:
- model is the RDU-ready model created by samba.from_torch_model_().
- inputs are returned by get_model_trace_inputs().
- name is the name of the model.
- init_output_grads specifies whether to initialize the output gradient tensors. This argument is deprecated.
- inference specifies whether the model is compiled for inference or for training (set from args.inference).
Compile the model
if args.command == 'compile':
samba.session.compile(model,
inputs,
optims,
name='hf_transformer',
app_dir=samba.utils.get_file_dir(__file__),
init_output_grads=True,
inference=args.inference)
Define model training
To train the model, we call train() with the user-specified arguments, the model, and certain outputs of compilation.
Train the model
elif args.command == 'run':
traced_outputs = utils.trace_graph(
model, inputs, optims, pef=args.pef, init_output_grads=True)
train(args, model, traced_outputs)
Train function overview
The code for the training loop is at the heart of the model.
- At the top level, train() consists of three nested loops:
  - The outermost while loop checks for completion. Until training completes, this loop calls the evaluate() function, which prints progress information to stdout. When training completes, evaluate() prints the final eval total loss.
  - The intermediate for loop iterates over the dataloaders so all dataloaders can be processed.
  - The innermost for loop iterates over batches and feeds them to model_step(), which trains on each batch.
- The train() function leverages the Hugging Face tokenizer that is included with any model. It tracks the number of steps, loss values, and so on.
- The code also includes a progress bar (see pbar.update(1)).
- The code evaluates the model result. Evaluation accuracy must be at least the minimum defined in the command-line args (--min_eval_acc), or an error results.
- At the end, train() saves a checkpoint for the number of steps that the user asked for. A checkpoint consists of the model’s state dictionary, with SambaTensors converted back to numeric data corresponding to torch tensors. The PyTorch save() function is called to create the checkpoint.
Train function
def train(args: argparse.Namespace, model: nn.Module, traced_outputs: List[SambaTensor]):
"""Perform the training procedure
Args:
args (argparse.Namespace): The parsed command line arguments
model (nn.Module): The model instance
traced_outputs (List[SambaTensor]): The outputs of the model from tracing
"""
eval_total_loss = None
epoch = 1
total_steps_taken = 0
training_done = False
# Depend on the provided step count rather than epochs
while not training_done:
print(f"Training Epoch {epoch}")
dataloaders = get_epoch_train_iterators(args)
with build_progress_bar(dataloaders) as pbar:
for dataloader in dataloaders:
# Break out if steps exceed specified steps
if total_steps_taken >= args.steps:
if not training_done:
print(f"Finished training at {total_steps_taken} steps!")
training_done = True
break
for batch in dataloader:
# Break out if steps exceed specified steps
if total_steps_taken >= args.steps:
if not training_done:
print(f"Finished training at {total_steps_taken} steps!")
training_done = True
break
inputs, target_token_type_ids = prepare_inputs(args, batch)
inputs = [t for t in inputs if t is not None]
# Take one training step
loss = model_step(
args, model, inputs, target_token_type_ids, traced_outputs)
train_loss = loss.item()
total_steps_taken += 1
pbar.update(1)
pbar.set_description(f"Training loss: {train_loss}")
if not training_done:
eval_total_loss, eval_acc = evaluate(args, model, traced_outputs)
print(f"Evaluation Results At Step {total_steps_taken}: "
      f"Total Loss: {eval_total_loss}, Eval acc: {eval_acc}")
epoch += 1
print("Finished training")
# Evaluate
eval_total_loss, eval_acc = evaluate(args, model, traced_outputs)
print(f"Final eval total loss: {eval_total_loss}\nFinal eval accuracy: {eval_acc}")
if args.min_eval_acc > 0.0:
assert eval_acc >= args.min_eval_acc, \
    f"Obtained eval_acc={eval_acc}, Expected Minimum eval_acc={args.min_eval_acc}"
# Save checkpoint
save_checkpoint(model, args.checkpoint_name)
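The save_checkpoint() helper isn’t shown in this excerpt. Based on the description above, the idea is roughly the following hypothetical sketch; the actual helper in the tutorial repo may also sync the latest weights back from the RDU before saving.
Sketch: saving and loading a checkpoint with torch.save()
import torch
import torch.nn as nn

def save_checkpoint_sketch(model: nn.Module, checkpoint_name: str) -> None:
    # Persist the model's state dictionary with the standard PyTorch save() call.
    # In the tutorial, the state dict values originate from SambaTensors and are
    # converted back to ordinary torch data before being written to disk.
    torch.save({"model_state_dict": model.state_dict()}, checkpoint_name)

def load_checkpoint_sketch(model: nn.Module, checkpoint_name: str) -> None:
    # Restore the saved weights into a model instance with the same architecture.
    checkpoint = torch.load(checkpoint_name)
    model.load_state_dict(checkpoint["model_state_dict"])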
Top-level loop: evaluate()
Top-level loop
while not training_done:
print(f"Training Epoch {epoch}")
dataloaders = get_epoch_train_iterators(args)
with build_progress_bar(dataloaders) as pbar:
....
eval_total_loss, eval_acc = evaluate(args, model, traced_outputs)
print(f"Final eval total loss: {eval_total_loss}\nFinal eval accuracy: {eval_acc}")
if args.min_eval_acc > 0.0:
assert eval_acc >= args.min_eval_acc, \
    f"Obtained eval_acc={eval_acc}, Expected Minimum eval_acc={args.min_eval_acc}"
# Save checkpoint
save_checkpoint(model, args.checkpoint_name)
Intermediate loop: prepare_inputs()
prepare_inputs() function
def prepare_inputs(
args: argparse.Namespace,
inputs: Tuple[torch.Tensor, torch.Tensor]) -> Tuple[Sequence[Optional[torch.Tensor]], torch.Tensor]:
"""Prepare a batch of torch tensors from the data loader for passing into the model.
This involves creating position IDs, shifting over the input_ids by 1 position
to create labels, and creating target_token_type_ids to match the labels. The final
tuple of tensors should match the shape of the inputs used for tracing.
Args:
args (argparse.Namespace): The parsed command line args
inputs (Tuple[torch.Tensor, torch.Tensor]): The inputs from the data loader
Returns:
Tuple[Sequence[Optional[torch.Tensor]], torch.Tensor]:
The inputs for run, and the token type IDs
"""
input_ids = inputs[0].int()
batch_size = input_ids.shape[0]
# The train dataloader does not contain the following entries
position_ids = torch.arange(args.max_seq_length)
position_ids = position_ids.unsqueeze(0).expand(input_ids.shape)
position_ids = position_ids.short()
# Prepare the attention mask for the Hugging Face Module
# set the labels to the input_ids, to be modified in the GPT model
labels = input_ids.short()
labels = labels[..., 1:]
labels = torch.cat(
(labels, torch.ones([labels.shape[0], 1], dtype=labels.dtype) * -100), dim=1)
# Construct the token type IDs
token_type_ids = inputs[1].int()
target_token_type_ids = token_type_ids[..., 1:]
target_token_type_ids = torch.cat(
(target_token_type_ids,
torch.ones(
[target_token_type_ids.shape[0], 1],
dtype=target_token_type_ids.dtype) * PADDING_TOKEN_TYPE_ID),
dim=1)
# Pad inputs to match the batch size
if batch_size < args.batch_size:
input_ids = pad_tensor(input_ids, args.batch_size, 0)
position_ids = pad_tensor(position_ids, args.batch_size, 0)
labels = pad_tensor(labels, args.batch_size, -100)
target_token_type_ids = pad_tensor(
target_token_type_ids, args.batch_size, PADDING_TOKEN_TYPE_ID)
traced_inputs = (
input_ids, None, None, None, position_ids, None, None, None, None, labels)
return traced_inputs, target_token_type_ids
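To see what the label construction in prepare_inputs() produces, here is a tiny standalone example with a single four-token sequence (the token values are arbitrary):
Sketch: label shifting on a toy sequence
import torch

input_ids = torch.tensor([[10, 11, 12, 13]], dtype=torch.int32)

# Shift the input IDs left by one position and append the ignore index (-100),
# mirroring the labels construction in prepare_inputs().
labels = input_ids.short()[..., 1:]
labels = torch.cat(
    (labels, torch.ones([labels.shape[0], 1], dtype=labels.dtype) * -100), dim=1)

print(labels)  # tensor([[  11,   12,   13, -100]], dtype=torch.int16)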
Innermost loop: feed batches to model_step()
model_step() function
def model_step(args: argparse.Namespace,
model: nn.Module,
inputs: List[torch.Tensor],
target_token_type_ids: torch.Tensor,
traced_outputs: List[SambaTensor]) -> torch.Tensor:
"""Take one training step on RDU
Args:
args (argparse.Namespace): The parsed command line arguments
model (nn.Module): The model instance
inputs (List[torch.Tensor]): The inputs for this step
target_token_type_ids (torch.Tensor): The token type IDs
traced_outputs (List[SambaTensor]): The outputs of the model from tracing
Returns:
torch.Tensor: The loss for this step
"""
inputs = [ipt for ipt in inputs if ipt is not None]
learning_rate = args.learning_rate
dropout_rate = args.dropout
hyper_dict = {'lr': learning_rate}
dropout_dict = {'p': dropout_rate}
hyperparam_dict = {**hyper_dict, **dropout_dict}
# Compute loss scale
loss_scale = compute_loss_scale(
args, inputs[-1], target_token_type_ids, model.output_tensors[0].dtype)
# Convert input tensors to SambaTensor
inputs_this_step = get_runtime_inputs(inputs)
# Set the gradient of the output
traced_outputs[0].sn_grad = loss_scale
outputs = samba.session.run(inputs_this_step,
traced_outputs,
hyperparam_dict=hyperparam_dict,
section_types=['FWD', 'BCKWD', 'GRADNORM', 'OPT'])
samba_loss = outputs[0]
loss = samba.to_torch(samba_loss).float()
loss *= loss_scale.float()
loss = loss.sum()
return loss
As part of model_step(), we compute the loss scale by calling compute_loss_scale().
compute_loss_scale() function
def compute_loss_scale(
args: argparse.Namespace,
targets: torch.Tensor,
target_token_type_ids: torch.Tensor,
output_dtype: Union[torch.dtype, str]) -> torch.Tensor:
"""Compute the scale factor of the loss, depending on the labels indicated by/
the padding tokens' ignored indices.
This is used to compute the correct value of the loss gradient to start backpropagation
Args:
args (argparse.Namespace): Parsed command line args
targets (torch.Tensor): Target tensor for training
target_token_type_ids (torch.Tensor): Token type IDs of the target tensor
output_dtype (Union[torch.dtype, str]): Data type to output the loss scale in
Returns:
torch.Tensor: The scale factor of the loss
"""
# ignore_index = -100 by default
grad_scale_not_ignored = ~targets.eq(-100)
# token_type_id = 2 identifies padding <eos> tokens
grad_scale_not_ignored[target_token_type_ids.eq(
PADDING_TOKEN_TYPE_ID)] = False
grad_scale = grad_scale_not_ignored.float()
# token_type_id = 0 identifies prompt tokens
grad_scale[target_token_type_ids.eq(
PROMPT_TOKEN_TYPE_ID)] *= args.prompt_loss_weight
# normalize so that grad_scales sum to 1
grad_scale /= torch.sum(grad_scale)
loss_scale = grad_scale.bfloat16().to(output_dtype).flatten()
return loss_scale
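As a concrete illustration, consider one sequence of four target tokens where the second target carries the ignore index, the first position is a prompt token (PROMPT_TOKEN_TYPE_ID = 0, per the comments above), and the last position is padding (PADDING_TOKEN_TYPE_ID = 2). With the default prompt_loss_weight of 0.0, only the ordinary completion token keeps a nonzero scale. The values below are illustrative only.
Sketch: loss scale on a toy batch
import torch

PROMPT_TOKEN_TYPE_ID = 0     # prompt tokens (see the comments in compute_loss_scale)
PADDING_TOKEN_TYPE_ID = 2    # padding <eos> tokens
prompt_loss_weight = 0.0     # the --prompt_loss_weight default

targets = torch.tensor([[5, -100, 7, 8]])
target_token_type_ids = torch.tensor([[0, 1, 1, 2]])

grad_scale_not_ignored = ~targets.eq(-100)                        # [True, False, True, True]
grad_scale_not_ignored[target_token_type_ids.eq(PADDING_TOKEN_TYPE_ID)] = False
grad_scale = grad_scale_not_ignored.float()                       # [1., 0., 1., 0.]
grad_scale[target_token_type_ids.eq(PROMPT_TOKEN_TYPE_ID)] *= prompt_loss_weight
grad_scale /= torch.sum(grad_scale)                               # normalize to sum to 1

print(grad_scale.flatten())  # tensor([0., 0., 1., 0.])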