Learn about model creation with SambaFlow

In this tutorial you examine one of the examples included in your SambaFlow installation. We’ll look at the Python code for the logreg.py example with a focus on how creating a model for RDU differs from creating a model for other hardware.

Prerequisites

Ensure that the sambaflow package is installed, and that the SambaNova Daemon is running. See Prepare your environment for details.

In Hello SambaFlow! Compile and run a model, you set up your environment and get started with logreg.py. The example uses:

  • A Python program, available at /opt/sambaflow/apps/starters/logreg/logreg.py

  • A simple image classification dataset (MNIST), which is available in Torchvision. The example application downloads the dataset when you run the model.

In this tutorial you learn what’s inside the Python code.

Explore model development

Developing a model with SambaFlow is similar to developing a model with the PyTorch NN modules. If you’re repurposing a PyTorch model, you have to convert PyTorch tensors to SambaTensors and likely make other changes so that the model can run on RDU instead of CPU. See Convert existing models to SambaFlow.
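
For a sense of what that conversion involves, here is a minimal sketch (not part of logreg.py): model parameters are synced to RDU memory with samba.from_torch_model_, and input data is wrapped as SambaTensors, as you’ll see in the training loop below.

Porting sketch
import torch.nn as nn
from sambaflow import samba

model = nn.Linear(784, 10)       # an existing PyTorch module
samba.from_torch_model_(model)   # sync its parameters with RDU memory

# Input data is likewise wrapped as SambaTensors with samba.from_torch_tensor
# before being passed to samba.session.run (see the train() function below).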

If you want to run the model and you don’t have access to the internet in your environment, download the MNIST dataset on a system that has access, and make the data available to your system. See (Optional) Download model data for details.

Model definition

The model definition specifies the layers in the model and the number of features in each layer. This model consists of a single linear layer. We calculate the loss inside the model so that the loss calculation is also performed on the RDU.

Here’s the Python code:

LogReg class
class LogReg(nn.Module):
    """
    Define the model architecture i.e. the layers in the model and the
    number of features in each layer

    :ivar lin_layer: Linear layer
    :ivar criterion: Cross Entropy loss layer
    """
    def __init__(self, num_features: int, num_classes: int):
        """

        :param num_features: Number of input features for the model
        :param num_classes: Number of output labels the model classifies inputs into
        """
        super().__init__()

        #: Linear layer for predicting target class of inputs
        self.lin_layer = nn.Linear(in_features=num_features,
                                   out_features=num_classes)

        #: Cross Entropy layer for loss computation
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, inputs: torch.Tensor,
                targets: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass of the model for the given inputs. The forward pass
        predicts the class labels for the inputs and computes the loss
        between the correct and predicted class labels.

        :param inputs: The input samples in the dataset
        :type inputs: torch.Tensor

        :param targets: The correct labels for the inputs
        :type targets: torch.Tensor

        :return: The loss and predicted classes of the inputs
        :rtype: Tuple[torch.Tensor, torch.Tensor]
        """
        out = self.lin_layer(inputs)
        loss = self.criterion(out, targets)
        return loss, out
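
Because forward() returns the loss together with the predictions, you can sanity-check the model with plain PyTorch tensors on CPU. A minimal sketch (the batch size of 32 is arbitrary):

CPU sanity check (sketch)
import torch

model = LogReg(num_features=784, num_classes=10)
images = torch.randn(32, 784)              # a batch of flattened 28x28 images
labels = torch.randint(0, 10, (32,))       # random target classes

loss, out = model(images, labels)
print(loss.item())    # scalar cross-entropy loss
print(out.shape)      # torch.Size([32, 10]), one score per class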

Imports

Our model imports several Python modules. Here’s the Python code, followed by an explanation of each import.

Imports
import argparse
import sys
from typing import Tuple

import sambaflow.samba.utils as utils
import torch
import torch.distributed as dist
import torch.nn as nn
import torchvision
from sambaflow import samba
from sambaflow.samba.utils.argparser import parse_app_args
from sambaflow.samba.utils.dataset.mnist import dataset_transform
from sambaflow.samba.utils.pef_utils import get_pefmeta

  • sambaflow.samba is the set of SambaFlow modules.

  • sambaflow.samba.utils contains utilities such as tracing.

  • parse_app_args is our built-in argument parsing support for each supported execution mode (more details below).

  • dataset_transform is a utility function to transform the data.

  • get_pefmeta saves the model’s metadata in the resulting executable file (PEF file).

Command-line argument specifications

The add_args and add_run_args functions define parameters for use with this model.

add_args function
def add_args(parser: argparse.ArgumentParser) -> None:

    # General args
    parser.add_argument('--lr',
                        type=float,
                        default=0.001,
                        help="Learning rate for training")
    parser.add_argument('--momentum',
                        type=float,
                        default=0.0,
                        help="Momentum value for training")
    parser.add_argument('--weight-decay',
                        type=float,
                        default=1e-4,
                        help="Weight decay for training")
    parser.add_argument('-e', '--num-epochs', type=int, default=1)
    parser.add_argument('--num-features', type=int, default=784)
    parser.add_argument('--num-classes', type=int, default=10)
    parser.add_argument('--weight-norm',
                        action="store_true",
                        help="Enable weight normalization")
    parser.add_argument(
        '--acc-test',
        action='store_true',
        help='Option for accuracy guard test in RDU regression.')
    # end args

def add_run_args(parser: argparse.ArgumentParser) -> None:

    # Runtime args
    parser.add_argument('--data-folder',
                        type=str,
                        default='mnist_data',
                        help="The folder to download the MNIST dataset to.")
    # end args

Users of the model can specify command-line arguments to set model parameters.

  • --num-epochs or -e specifies the number of epochs to run the training loop.

  • --num-features specifies the embedding dimension of the input data.

  • --num-classes is the number of classes in the classification problem. For MNIST, there are ten classes, one for each digit from 0 through 9.

  • --data-folder specifies the download location for the MNIST data.
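
To see how these definitions behave on their own, you can attach add_args and add_run_args to a plain ArgumentParser. A minimal sketch (parse_app_args, used in main(), adds further built-in options on top of these):

Argument parsing sketch
import argparse

parser = argparse.ArgumentParser()
add_args(parser)        # model hyperparameters (--lr, --num-epochs, ...)
add_run_args(parser)    # runtime options (--data-folder)

args = parser.parse_args(['--num-epochs', '2', '--lr', '0.01'])
print(args.num_epochs, args.lr, args.num_classes, args.data_folder)
# 2 0.01 10 mnist_data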

Data preparation

Data preparation is pretty standard (and familiar to those who’ve worked with PyTorch datasets).

prepare_dataloader() function
def prepare_dataloader(
        args: argparse.Namespace
) -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader]:
    """
    Prep work to train the logreg model with the `MNIST dataset <http://yann.lecun.com/exdb/mnist/>`__:

    We'll split the dataset into train and test sets and return the corresponding data loaders

    :param args: argument specifying the location of the dataset
    :type args: argparse.Namespace

    :return: Train and test data loaders
    :rtype: Tuple[torch.utils.data.DataLoader]
    """

    # Get the train & test data (images and labels) from the MNIST dataset
    train_dataset = torchvision.datasets.MNIST(
        root=f'{args.data_folder}',
        train=True,
        transform=dataset_transform(args),
        download=True)
    test_dataset = torchvision.datasets.MNIST(
        root=f'{args.data_folder}',
        train=False,
        transform=dataset_transform(args))

    # Get the train & test data loaders (input pipeline)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=args.batch_size,
                                               shuffle=True)
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                              batch_size=args.batch_size,
                                              shuffle=False)
    return train_loader, test_loader
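
The resulting loaders yield ordinary PyTorch batches. A minimal sketch of what one batch looks like, assuming args comes from parse_app_args and that dataset_transform flattens each 28x28 image into a 784-element vector (which is what the --num-features default of 784 suggests):

Data loader sketch
train_loader, test_loader = prepare_dataloader(args)

images, labels = next(iter(train_loader))
print(images.shape)   # expected: torch.Size([batch_size, 784])
print(labels.shape)   # expected: torch.Size([batch_size])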

Train function definition

The train() function defines the training logic. It is similar to a typical PyTorch training loop.

  • The outer loop iterates over the number of epochs provided by the --num-epochs argument.

  • The inner loop iterates over the training data.

Let’s look at the annotated code first, and then explore some details that correspond to the numbers.

train() function
def train(args: argparse.Namespace, model: nn.Module) -> None:
    """
    Train the model.
    At the end of a training loop, the model will be able
    to correctly predict the class labels for any input, within a certain
    accuracy.

    :param args: Hyperparameter values and accuracy test behavior controls
    :type args: argparse.Namespace

    :param model: Model to be trained
    :type model: torch.nn.Module

    """

    # Get data loaders for training and test data
    train_loader, test_loader = prepare_dataloader(args)

    # Total training steps (iterations) per epoch
    total_step = len(train_loader)

    hyperparam_dict = {
        "lr": args.lr,
        "momentum": args.momentum,
        "weight_decay": args.weight_decay
    }

    # Train and test for specified number of epochs
    for epoch in range(args.num_epochs):
        avg_loss = 0

        # Train the model for all samples in the train data loader
        for i, (images, labels) in enumerate(train_loader):
            sn_images = samba.from_torch_tensor(images, name='image', batch_dim=0)  (1)
            sn_labels = samba.from_torch_tensor(labels, name='label', batch_dim=0)

            loss, outputs = samba.session.run(       (2)
                input_tensors=[sn_images, sn_labels],
                output_tensors=model.output_tensors,
                hyperparam_dict=hyperparam_dict,
                data_parallel=args.data_parallel,
                reduce_on_rdu=args.reduce_on_rdu)

            # Sync the loss and outputs with host memory
            loss, outputs = samba.to_torch(loss), samba.to_torch(outputs)
            avg_loss += loss.mean()

            # Print loss per 10,000th sample in every epoch
            if (i + 1) % 10000 == 0 and args.local_rank <= 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, args.num_epochs, i + 1, total_step,
                    avg_loss / (i + 1)))

        # Check the accuracy of the trained model for all samples in the test data loader
        # Sync the model parameters with host memory
        samba.session.to_cpu(model)
        test_acc = 0.0
        with torch.no_grad():
            correct = 0
            total = 0
            total_loss = 0
            for images, labels in test_loader:
                loss, outputs = model(images, labels)
                loss, outputs = samba.to_torch(loss), samba.to_torch(outputs)  (3)
                total_loss += loss.mean()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum()

            test_acc = 100.0 * correct / total

            if args.local_rank <= 0:
                print(
                    'Test Accuracy: {:.2f}'.format(test_acc),
                    ' Loss: {:.4f}'.format(total_loss.item() /
                                           (len(test_loader))))

        if args.acc_test:
            assert args.num_epochs == 1, "Accuracy test only supported for 1 epoch"
            assert test_acc > 91.0 and test_acc < 92.0, "Test accuracy not within specified bounds."

Here’s some detail on the code fragments annotated with a number in the code sample above.

1. We use the function from_torch_tensor to create SambaFlow tensors (SambaTensors) from PyTorch tensors. This function is similar to torch.from_numpy in PyTorch, which creates a PyTorch tensor from a NumPy array. In the other direction, samba.to_torch creates a PyTorch tensor from a SambaTensor.
2. When we run the model on the device, we call the samba.session.run function:
loss, outputs = samba.session.run(
                input_tensors=[sn_images, sn_labels],
                output_tensors=model.output_tensors,
                hyperparam_dict=hyperparam_dict,
                data_parallel=args.data_parallel,
                reduce_on_rdu=args.reduce_on_rdu)

Rather than calling the model directly, as you would in a typical PyTorch training loop, the code passes the SambaTensors to samba.session.run, which executes the compiled model on the RDU and returns the loss and the model outputs.

3. To collect the loss and output values and print them, we convert back from SambaTensors to PyTorch tensors.
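
The two helpers are symmetric, so data moves between the host and the RDU with the same pattern in both directions. A minimal sketch of the round trip used in the training loop (the tensor values are illustrative):

Tensor conversion sketch
import torch
from sambaflow import samba

images = torch.randn(32, 784)                                             # host-side PyTorch tensor
sn_images = samba.from_torch_tensor(images, name='image', batch_dim=0)    # SambaTensor passed to samba.session.run

# ... samba.session.run(...) consumes and returns SambaTensors ...

host_images = samba.to_torch(sn_images)   # back to a PyTorch tensor on the host
print(host_images.shape)                  # torch.Size([32, 784])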

Main function

The main function runs in different modes depending on the command-line input. The two main execution modes are compile and run.

Here’s how compiling and running a model works:

  1. For compilation, the compiler needs to know the shapes of the input and output tensors. Our code generates random SambaTensors (ipt and tgt) and passes them to the compilation command.

  2. You can compile the model by running the script with the compile command.

  3. After the compile run has produced a PEF file, you can run the script again with the run command, passing in the PEF file name as a parameter to do the training run.
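
For example, a compile run followed by a training run looks roughly like this (the exact PEF path and any additional compile options depend on your installation; see Hello SambaFlow! Compile and run a model for the full commands):

Example invocation (sketch)
python logreg.py compile
python logreg.py run --pef <path-to-generated-pef-file>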

main() function
def main(argv):
    """
    :param argv: Command line arguments (`compile`, `test` or `run`)
    """
    args = parse_app_args(argv=argv,
                          common_parser_fn=add_args,
                          run_parser_fn=add_run_args)

    # When not running in distributed mode, the local rank is -1.
    args.local_rank = dist.get_rank() if dist.is_initialized() else -1

    # Create random input and output data for testing
    ipt = samba.randn(args.batch_size,
                      args.num_features,
                      name='image',
                      batch_dim=0,
                      named_dims=('B', 'F')).bfloat16().float()
    tgt = samba.randint(args.num_classes, (args.batch_size, ),
                        name='label',
                        batch_dim=0,
                        named_dims=('B', ))

    ipt.host_memory = False
    tgt.host_memory = False

    # Instantiate the model
    model = LogReg(args.num_features, args.num_classes)

    # Sync model parameters with RDU memory
    samba.from_torch_model_(model)

    # Annotate parameters if weight normalization is on
    if args.weight_norm:
        utils.weight_norm_(model.lin_layer)

    inputs = (ipt, tgt)

    # Instantiate an optimizer if the model will be trained
    if args.inference:
        optimizer = None
    else:
        # We use the SGD optimizer to update the weights of the model
        optimizer = samba.optim.SGD(model.parameters(),
                                    lr=args.lr,
                                    momentum=args.momentum,
                                    weight_decay=args.weight_decay)

    if args.command == "compile":
        #  Compile the model to generate a PEF (Plasticine Executable Format) binary
        samba.session.compile(model,
                              inputs,
                              optimizer,
                              name='logreg_torch',
                              app_dir=utils.get_file_dir(__file__),
                              config_dict=vars(args),
                              pef_metadata=get_pefmeta(args, model))

    elif args.command in ["test", "run"]:
        # Trace the compiled graph to initialize the model weights and input/output tensors
        # for execution on the RDU.
        # The PEF required for tracing is the binary generated during compilation
        # Mapping refers to how the model layers are arranged in a pipeline for execution.
        # Valid options: 'spatial' or 'section'
        utils.trace_graph(model,
                          inputs,
                          optimizer,
                          pef=args.pef,
                          mapping=args.mapping)

        if args.command == "test":
            # Test the model's functional correctness. This tests if the result of execution
            # on the RDU is comparable to that on a CPU. CPU run results are used as reference.
            # Note that this test is different from testing model fit during training.
            # Given the same initial weights and inputs, this tests if the graph execution
            # on RDU generates outputs that are comparable to those generated on a CPU.
            outputs = model.output_tensors
            test(args, model, inputs, outputs)

        elif args.command == "run":

            # Train the model on RDU. This is where the model will be trained
            # i.e. weights will be learned to fit the input dataset
            train(args, model)


if __name__ == '__main__':
    main(sys.argv[1:])

For discussion of a main() function that’s very similar to the function above, see Tie the pieces together with main().

Examine the complete example

You can examine the complete example here:

logreg.py complete example
"""
In this example, we will walk you through the process of defining a model,
compiling it, training and testing it on SN RDU
"""

import argparse
import sys
from typing import Tuple

import sambaflow.samba.utils as utils
import torch
import torch.distributed as dist
import torch.nn as nn
import torchvision
from sambaflow import samba
from sambaflow.samba.utils.argparser import parse_app_args
from sambaflow.samba.utils.dataset.mnist import dataset_transform
from sambaflow.samba.utils.pef_utils import get_pefmeta


class LogReg(nn.Module):
    """
    Define the model architecture i.e. the layers in the model and the
    number of features in each layer

    :ivar lin_layer: Linear layer
    :ivar criterion: Cross Entropy loss layer
    """
    def __init__(self, num_features: int, num_classes: int):
        """

        :param num_features: Number of input features for the model
        :param num_classes: Number of output labels the model classifies inputs into
        """
        super().__init__()

        #: Linear layer for predicting target class of inputs
        self.lin_layer = nn.Linear(in_features=num_features,
                                   out_features=num_classes)

        #: Cross Entropy layer for loss computation
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, inputs: torch.Tensor,
                targets: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass of the model for the given inputs. The forward pass
        predicts the class labels for the inputs and computes the loss
        between the correct and predicted class labels.

        :param inputs: The input samples in the dataset
        :type inputs: torch.Tensor

        :param targets: The correct labels for the inputs
        :type targets: torch.Tensor

        :return: The loss and predicted classes of the inputs
        :rtype: Tuple[torch.Tensor, torch.Tensor]
        """
        out = self.lin_layer(inputs)
        loss = self.criterion(out, targets)
        return loss, out


def add_args(parser: argparse.ArgumentParser) -> None:

    # Compile time args
    parser.add_argument('--lr',
                        type=float,
                        default=0.001,
                        help="Learning rate for training")
    parser.add_argument('--momentum',
                        type=float,
                        default=0.0,
                        help="Momentum value for training")
    parser.add_argument('--weight-decay',
                        type=float,
                        default=1e-4,
                        help="Weight decay for training")
    parser.add_argument('-e', '--num-epochs', type=int, default=1)
    parser.add_argument('--num-features', type=int, default=784)
    parser.add_argument('--num-classes', type=int, default=10)
    parser.add_argument('--weight-norm',
                        action="store_true",
                        help="Enable weight normalization")
    parser.add_argument(
        '--acc-test',
        action='store_true',
        help='Option for accuracy guard test in RDU regression.')
    # end args


def add_run_args(parser: argparse.ArgumentParser) -> None:

    # Runtime args
    parser.add_argument('--data-folder',
                        type=str,
                        default='mnist_data',
                        help="The folder to download the MNIST dataset to.")
    # end args


def prepare_dataloader(
        args: argparse.Namespace
) -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader]:
    """
    Prep work to train the logreg model with the `MNIST dataset <http://yann.lecun.com/exdb/mnist/>`__:

    We'll split the dataset into train and test sets and return the corresponding data loaders

    :param args: argument specifying the location of the dataset
    :type args: argparse.Namespace

    :return: Train and test data loaders
    :rtype: Tuple[torch.utils.data.DataLoader]
    """

    # Get the train & test data (images and labels) from the MNIST dataset
    train_dataset = torchvision.datasets.MNIST(
        root=f'{args.data_folder}',
        train=True,
        transform=dataset_transform(args),
        download=True)
    test_dataset = torchvision.datasets.MNIST(
        root=f'{args.data_folder}',
        train=False,
        transform=dataset_transform(args))

    # Get the train & test data loaders (input pipeline)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=args.batch_size,
                                               shuffle=True)
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                              batch_size=args.batch_size,
                                              shuffle=False)
    return train_loader, test_loader


def train(args: argparse.Namespace, model: nn.Module) -> None:
    """
    Train the model.
    At the end of a training loop, the model will be able
    to correctly predict the class labels for any input, within a certain
    accuracy.

    :param args: Hyperparameter values and accuracy test behavior controls
    :type args: argparse.Namespace

    :param model: Model to be trained
    :type model: torch.nn.Module

    """

    # Get data loaders for training and test data
    train_loader, test_loader = prepare_dataloader(args)

    # Total training steps (iterations) per epoch
    total_step = len(train_loader)

    hyperparam_dict = {
        "lr": args.lr,
        "momentum": args.momentum,
        "weight_decay": args.weight_decay
    }

    # Train and test for specified number of epochs
    for epoch in range(args.num_epochs):
        avg_loss = 0

        # Train the model for all samples in the train data loader
        for i, (images, labels) in enumerate(train_loader):
            sn_images = samba.from_torch_tensor(images, name='image', batch_dim=0)
            sn_labels = samba.from_torch_tensor(labels, name='label', batch_dim=0)

            loss, outputs = samba.session.run(
                input_tensors=[sn_images, sn_labels],
                output_tensors=model.output_tensors,
                hyperparam_dict=hyperparam_dict,
                data_parallel=args.data_parallel,
                reduce_on_rdu=args.reduce_on_rdu)

            # Sync the loss and outputs with host memory
            loss, outputs = samba.to_torch(loss), samba.to_torch(outputs)
            avg_loss += loss.mean()

            # Print loss per 10,000th sample in every epoch
            if (i + 1) % 10000 == 0 and args.local_rank <= 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, args.num_epochs, i + 1, total_step,
                    avg_loss / (i + 1)))

        # Check the accuracy of the trained model for all samples in the test data loader
        # Sync the model parameters with host memory
        samba.session.to_cpu(model)
        test_acc = 0.0
        with torch.no_grad():
            correct = 0
            total = 0
            total_loss = 0
            for images, labels in test_loader:
                loss, outputs = model(images, labels)
                loss, outputs = samba.to_torch(loss), samba.to_torch(outputs)
                total_loss += loss.mean()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum()

            test_acc = 100.0 * correct / total

            if args.local_rank <= 0:
                print(
                    'Test Accuracy: {:.2f}'.format(test_acc),
                    ' Loss: {:.4f}'.format(total_loss.item() /
                                           (len(test_loader))))

        if args.acc_test:
            assert args.num_epochs == 1, "Accuracy test only supported for 1 epoch"
            assert test_acc > 91.0 and test_acc < 92.0, "Test accuracy not within specified bounds."


def test(args: argparse.Namespace, model: nn.Module,
         inputs: Tuple[samba.SambaTensor],
         outputs: Tuple[samba.SambaTensor]) -> None:
    """
    Test the outputs generated on SN RDU against golden reference outputs generated on CPU

    :param args: Arguments that control the test behavior
    :type args: argparse.Namespace

    :param model: Model instance
    :type model: torch.nn.Module

    :param inputs: RDU memory for the inputs
    :type inputs: Tuple[samba.SambaTensor]

    :param outputs: RDU memory for the outputs
    :type outputs: Tuple[samba.SambaTensor]
    """

    samba.session.tracing = False
    outputs_gold = model(*inputs)

    outputs_samba = samba.session.run(input_tensors=inputs,
                                      output_tensors=outputs,
                                      data_parallel=args.data_parallel,
                                      reduce_on_rdu=args.reduce_on_rdu)

    # Check that all RDU and CPU outputs match numerically
    for i, (output_samba,
            output_gold) in enumerate(zip(outputs_samba, outputs_gold)):
        print('samba:', output_samba)
        print('gold:', output_gold)
        utils.assert_close(output_samba,
                           output_gold,
                           f'forward output #{i}',
                           threshold=5e-3)

    if args.weight_norm:
        # Perform extra checks for verifying weight norm implementation
        g_output, v_output = samba.session.get_tensors_by_name([
            "logreg__lin_layer__weight_magnitude",
            "logreg__lin_layer__weight_direction"
        ])
        print("Magnitude_tensor", g_output)
        print("Direction_tensor", v_output)


def main(argv):
    """
    :param argv: Command line arguments (`compile`, `test` or `run`)
    """
    args = parse_app_args(argv=argv,
                          common_parser_fn=add_args,
                          run_parser_fn=add_run_args)

    # When not running in distributed mode, the local rank is -1.
    args.local_rank = dist.get_rank() if dist.is_initialized() else -1

    # Create random input and output data for testing
    ipt = samba.randn(args.batch_size,
                      args.num_features,
                      name='image',
                      batch_dim=0,
                      named_dims=('B', 'F')).bfloat16().float()
    tgt = samba.randint(args.num_classes, (args.batch_size, ),
                        name='label',
                        batch_dim=0,
                        named_dims=('B', ))

    ipt.host_memory = False
    tgt.host_memory = False

    # Instantiate the model
    model = LogReg(args.num_features, args.num_classes)

    # Sync model parameters with RDU memory
    samba.from_torch_model_(model)

    # Annotate parameters if weight normalization is on
    if args.weight_norm:
        utils.weight_norm_(model.lin_layer)

    inputs = (ipt, tgt)

    # Instantiate an optimizer if the model will be trained
    if args.inference:
        optimizer = None
    else:
        # We use the SGD optimizer to update the weights of the model
        optimizer = samba.optim.SGD(model.parameters(),
                                    lr=args.lr,
                                    momentum=args.momentum,
                                    weight_decay=args.weight_decay)

    if args.command == "compile":
        #  Compile the model to generate a PEF (Plasticine Executable Format) binary
        samba.session.compile(model,
                              inputs,
                              optimizer,
                              name='logreg_torch',
                              app_dir=utils.get_file_dir(__file__),
                              config_dict=vars(args),
                              pef_metadata=get_pefmeta(args, model))

    elif args.command in ["test", "run"]:
        # Trace the compiled graph to initialize the model weights and input/output tensors
        # for execution on the RDU.
        # The PEF required for tracing is the binary generated during compilation
        # Mapping refers to how the model layers are arranged in a pipeline for execution.
        # Valid options: 'spatial' or 'section'
        utils.trace_graph(model,
                          inputs,
                          optimizer,
                          pef=args.pef,
                          mapping=args.mapping)

        if args.command == "test":
            # Test the model's functional correctness. This tests if the result of execution
            # on the RDU is comparable to that on a CPU. CPU run results are used as reference.
            # Note that this test is different from testing model fit during training.
            # Given the same initial weights and inputs, this tests if the graph execution
            # on RDU generates outputs that are comparable to those generated on a CPU.
            outputs = model.output_tensors
            test(args, model, inputs, outputs)

        elif args.command == "run":

            # Train the model on RDU. This is where the model will be trained
            # i.e. weights will be learned to fit the input dataset
            train(args, model)


if __name__ == '__main__':
    main(sys.argv[1:])