DLRM Source Code Walkthrough

Argument Parsing

import argparse
parser = argparse.ArgumentParser(
    description="Train Deep Learning Recommendation Model (DLRM)"
)

parser.add_argument("--arch-sparse-feature-size", type=int, default=2)
...
parser.add_argument("--lr-num-decay-steps", type=int, default=0)

# Parse the command-line arguments and return a Namespace object holding the argument values

args = parser.parse_args()
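
As a quick illustration (a hypothetical invocation; it only parses once all of the add_argument calls elided above are defined), the parser can also be fed an explicit flag list, which is handy for experimenting interactively. The values below match the small example used throughout this walkthrough:

# hypothetical example: parse an explicit flag list instead of sys.argv
args = parser.parse_args([
    "--arch-sparse-feature-size", "2",
    "--arch-embedding-size", "4-3-2",
    "--arch-mlp-bot", "4-3-2",
    "--arch-mlp-top", "4-2-1",
    "--data-generation", "random",
    "--mini-batch-size", "2",
    "--num-batches", "3",
])
print(args.arch_sparse_feature_size)  # -> 2 (dashes become underscores)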

Data Generation

Random

train_data, train_ld, test_data, test_ld = dp.make_random_data_and_loader(
    args, ln_emb, m_den
)

def make_random_data_and_loader(
    args,
    ln_emb,
    m_den,
    offset_to_length_converter=False,
):
	
    # Construct the custom dataset
    
    train_data = RandomDataset(
        m_den,
        ln_emb,
        args.data_size,
        args.num_batches,
        args.mini_batch_size,
        args.num_indices_per_lookup,
        args.num_indices_per_lookup_fixed,
        1,  # num_targets
        args.round_targets,
        args.data_generation,
        args.data_trace_file,
        args.data_trace_enable_padding,
        reset_seed_on_access=True,
        rand_data_dist=args.rand_data_dist,
        rand_data_min=args.rand_data_min,
        rand_data_max=args.rand_data_max,
        rand_data_mu=args.rand_data_mu,
        rand_data_sigma=args.rand_data_sigma,
        rand_seed=args.numpy_rand_seed,
    )  # WARNING: generates a batch of lookups at once

	...
    
    # Load data from the custom dataset in mini-batches
    
    train_loader = torch.utils.data.DataLoader(
        train_data,
        batch_size=1,
        shuffle=False,
        num_workers=args.num_workers,
        collate_fn=collate_wrapper_random,
        pin_memory=False,
        drop_last=False,  # True
    )

	...
    
    return train_data, train_loader, test_data, test_loader

# Custom dataset: inherits from Dataset and overrides the __getitem__ and __len__ methods

class RandomDataset(Dataset):
    def __init__(
        self,
        m_den,
        ln_emb,
        data_size,
        num_batches,
        mini_batch_size,
        num_indices_per_lookup,
        num_indices_per_lookup_fixed,
        num_targets=1,
        round_targets=False,
        data_generation="random",
        trace_file="",
        enable_padding=False,
        reset_seed_on_access=False,
        rand_data_dist="uniform",
        rand_data_min=1,
        rand_data_max=1,
        rand_data_mu=-1,
        rand_data_sigma=1,
        rand_seed=0,
    ):
        # compute batch size
        
        nbatches = int(np.ceil((data_size * 1.0) / mini_batch_size))
        if num_batches != 0:
            nbatches = num_batches
            data_size = nbatches * mini_batch_size
            # print("Total number of batches %d" % nbatches)

        # save args (recompute data_size if needed)
        
        self.m_den = m_den
        self.ln_emb = ln_emb
        self.data_size = data_size
        self.num_batches = nbatches
        self.mini_batch_size = mini_batch_size
        self.num_indices_per_lookup = num_indices_per_lookup
        self.num_indices_per_lookup_fixed = num_indices_per_lookup_fixed
        self.num_targets = num_targets
        self.round_targets = round_targets
        self.data_generation = data_generation
        self.trace_file = trace_file
        self.enable_padding = enable_padding
        self.reset_seed_on_access = reset_seed_on_access
        self.rand_seed = rand_seed
        self.rand_data_dist = rand_data_dist
        self.rand_data_min = rand_data_min
        self.rand_data_max = rand_data_max
        self.rand_data_mu = rand_data_mu
        self.rand_data_sigma = rand_data_sigma

    def reset_numpy_seed(self, numpy_rand_seed):
        np.random.seed(numpy_rand_seed)
        # torch.manual_seed(numpy_rand_seed)

    # Return the data sample(s) corresponding to the given index
    
    def __getitem__(self, index):

        # If index is a slice, call __getitem__ for each position and return a list
        
        if isinstance(index, slice):
            return [
                # calls __getitem__ on a single index
                
                self[idx]
                # if index.start is None, default to 0
                
                for idx in range(
                    index.start or 0, index.stop or len(self), index.step or 1
                )
            ]

		...

        # number of data points in a batch
        
        n = min(self.mini_batch_size, self.data_size - (index * self.mini_batch_size))

        # generate a batch of dense and sparse features
        
        if self.data_generation == "random":
            (X, lS_o, lS_i) = generate_dist_input_batch(
                self.m_den,
                self.ln_emb,
                n,
                self.num_indices_per_lookup,
                self.num_indices_per_lookup_fixed,
                rand_data_dist=self.rand_data_dist,
                rand_data_min=self.rand_data_min,
                rand_data_max=self.rand_data_max,
                rand_data_mu=self.rand_data_mu,
                rand_data_sigma=self.rand_data_sigma,
            )
		...

        # generate a batch of target (probability of a click)
        
        T = generate_random_output_batch(n, self.num_targets, self.round_targets)

        return (X, lS_o, lS_i, T)
	
    # Return the number of batches
    
    def __len__(self):
        # WARNING: note that we produce batches of outputs in __getitem__
        # therefore we should use num_batches rather than data_size below
        
        return self.num_batches

# random data from uniform or gaussian distribution (input data)

def generate_dist_input_batch(
    m_den,
    ln_emb,
    n,
    num_indices_per_lookup,
    num_indices_per_lookup_fixed,
    rand_data_dist,
    rand_data_min,
    rand_data_max,
    rand_data_mu,
    rand_data_sigma,
):
    # dense feature
    # uniform distribution on [0, 1): n samples, each with m_den dense features
    
    Xt = torch.tensor(ra.rand(n, m_den).astype(np.float32))

    # sparse feature (sparse indices)
    # lS_emb_offsets and lS_emb_indices are lists storing the offsets and indices of the sparse features
    
    lS_emb_offsets = []
    lS_emb_indices = []
    # for each embedding generate a list of n lookups,
    # where each lookup is composed of multiple sparse indices
    
    for size in ln_emb:
        lS_batch_offsets = []
        lS_batch_indices = []
        offset = 0
        for _ in range(n):
            # num of sparse indices to be used per embedding (between 1 and num_indices_per_lookup)
            
            if num_indices_per_lookup_fixed:
                sparse_group_size = np.int64(num_indices_per_lookup)
            else:
                # random in [1, num_indices_per_lookup]
                
                r = ra.random(1)
                sparse_group_size = np.int64(
                    np.round(max([1.0], r * min(size, num_indices_per_lookup)))
                )
            # sparse indices to be used per embedding
            
            if rand_data_dist == "gaussian":
                if rand_data_mu == -1:
                    rand_data_mu = (rand_data_max + rand_data_min) / 2.0
                r = ra.normal(rand_data_mu, rand_data_sigma, sparse_group_size)
                sparse_group = np.clip(r, rand_data_min, rand_data_max)
                sparse_group = np.unique(sparse_group).astype(np.int64)
            elif rand_data_dist == "uniform":
                r = ra.random(sparse_group_size)
                sparse_group = np.unique(np.round(r * (size - 1)).astype(np.int64))
            else:
                raise ValueError(
                    str(rand_data_dist)
                    + " distribution is not supported. "
                    + "Please select uniform or gaussian."
                )

            # reset sparse_group_size in case some index duplicates were removed
            
            sparse_group_size = np.int64(sparse_group.size)
            # store lengths and indices
            
            lS_batch_offsets += [offset]
            lS_batch_indices += sparse_group.tolist()
            # update offset for next iteration
            
            offset += sparse_group_size
        lS_emb_offsets.append(torch.tensor(lS_batch_offsets))
        lS_emb_indices.append(torch.tensor(lS_batch_indices))

    return (Xt, lS_emb_offsets, lS_emb_indices)

The generated random features look like the following, corresponding to batch_size=2, dense_feature_num=4, sparse_feature_num=3, and sparse feature cardinalities of (4, 3, 2):

Xt:
tensor([[4.17022e-01, 7.20325e-01, 1.14375e-04, 3.02333e-01],
        [1.46756e-01, 9.23386e-02, 1.86260e-01, 3.45561e-01]])
        
lS_emb_indices:
[tensor([1, 2, 0, 1, 3]), tensor([1, 0]), tensor([0, 1, 1])]

lS_emb_offsets:
[tensor([0, 2]), tensor([0, 1]), tensor([0, 2])]
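
The (offsets, indices) pair is simply a flattened form of the per-sample index lists. A small standalone sketch (the tensors are copied from the example above; the loop itself is not part of the DLRM source) recovers the per-sample groups:

import torch

lS_emb_offsets = [torch.tensor([0, 2]), torch.tensor([0, 1]), torch.tensor([0, 2])]
lS_emb_indices = [torch.tensor([1, 2, 0, 1, 3]), torch.tensor([1, 0]), torch.tensor([0, 1, 1])]

for offsets, indices in zip(lS_emb_offsets, lS_emb_indices):
    # append the total length so every slice has an upper bound
    bounds = offsets.tolist() + [len(indices)]
    groups = [indices[bounds[i]:bounds[i + 1]].tolist() for i in range(len(offsets))]
    print(groups)
# [[1, 2], [0, 1, 3]]
# [[1], [0]]
# [[0, 1], [1]]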

Aligning Layer Dimensions

Before building the network, the script checks that the configured dimensions are consistent with each other. For example:

# sanity check: dense feature sizes and mlp dimensions must match

if m_den != ln_bot[0]:
    sys.exit(
        "ERROR: arch-dense-feature-size "
        + str(m_den)
        + " does not match first dim of bottom mlp "
        + str(ln_bot[0])
    )

# the sparse feature embedding size must match the dense feature output size

if m_spa != m_den_out:
    sys.exit(
        "ERROR: arch-sparse-feature-size "
        + str(m_spa)
        + " does not match last dim of bottom mlp "
        + str(m_den_out)
    )
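
The first dimension of the top MLP must in turn equal the number of features produced by the interaction step. A rough sketch of the arithmetic (assuming arch_interaction_op="dot" and no self-interaction) behind the [8 4 2 1] top MLP shown below:

import numpy as np

ln_emb = np.array([4, 3, 2])   # 3 embedding tables (sparse features)
ln_bot = np.array([4, 3, 2])   # bottom MLP, so the processed dense feature has 2 dims
num_fea = ln_emb.size + 1      # sparse features + the processed dense feature
# unique pairwise dot products (lower triangle) + the dense feature itself
num_int = (num_fea * (num_fea - 1)) // 2 + ln_bot[-1]
print(num_int)                 # 8, i.e. ln_top[0]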

The script then prints:

model arch:
mlp top arch 3 layers, with input to output dimensions:
[8 4 2 1]
# of interactions
8
mlp bot arch 2 layers, with input to output dimensions:
[4 3 2]
# of features (sparse and dense)
4
dense feature size
4
sparse feature size
2
# of embeddings (= # of sparse features) 3, with dimensions 2x:
[4 3 2]
data (inputs and targets):
mini-batch: 0
# dense features: 4 features per sample
tensor([[4.1702e-01, 7.2032e-01, 1.1437e-04, 3.0233e-01],
        [1.4676e-01, 9.2339e-02, 1.8626e-01, 3.4556e-01]])
# sparse features: 3 features (the offsets are printed as per-sample lengths)
tensor([[2, 3],
        [1, 1],
        [2, 1]], dtype=torch.int32)
# sparse features: indices of the whole mini-batch are stored together per table; with the offsets they can be converted into the form above
[tensor([1, 2, 0, 1, 3]), tensor([1, 0]), tensor([0, 1, 1])]
# prediction targets (probability of a click):
tensor([[0.8946],
        [0.0850]])
        
mini-batch: 1
tensor([[0.0391, 0.1698, 0.8781, 0.0983],
        [0.4211, 0.9579, 0.5332, 0.6919]])
tensor([[1, 3],
        [2, 1],
        [1, 1]], dtype=torch.int32)
[tensor([2, 0, 2, 3]), tensor([1, 2, 1]), tensor([0, 0])]
tensor([[0.6788],
        [0.2116]])
        
mini-batch: 2
tensor([[0.2655, 0.4916, 0.0534, 0.5741],
        [0.1467, 0.5893, 0.6998, 0.1023]])
tensor([[2, 1],
        [2, 2],
        [1, 1]], dtype=torch.int32)
[tensor([1, 2, 2]), tensor([1, 2, 0, 2]), tensor([1, 0])]
tensor([[0.9275],
        [0.3478]])

Model Training

Overall Flow

First, initialize the global DLRM model object:

global dlrm
dlrm = DLRM_Net(
    m_spa,
    ln_emb,
    ln_bot,
    ln_top,
    arch_interaction_op=args.arch_interaction_op,
    arch_interaction_itself=args.arch_interaction_itself,
    sigmoid_bot=-1,
    sigmoid_top=ln_top.size - 2,
    sync_dense_params=args.sync_dense_params,
    loss_threshold=args.loss_threshold,
    ndevices=ndevices,
    qr_flag=args.qr_flag,
    qr_operation=args.qr_operation,
    qr_collisions=args.qr_collisions,
    qr_threshold=args.qr_threshold,
    md_flag=args.md_flag,
    md_threshold=args.md_threshold,
    weighted_pooling=args.weighted_pooling,
    loss_function=args.loss_function
)
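
For context, ln_bot, ln_emb and ln_top are numpy arrays parsed from the dash-separated command-line strings. A condensed sketch (not verbatim from dlrm_s_pytorch.py) of how they are derived:

import numpy as np

ln_bot = np.fromstring(args.arch_mlp_bot, dtype=int, sep="-")         # "4-3-2" -> array([4, 3, 2])
ln_emb = np.fromstring(args.arch_embedding_size, dtype=int, sep="-")  # "4-3-2" -> array([4, 3, 2])
m_den = ln_bot[0]                        # dense feature size
m_spa = args.arch_sparse_feature_size    # embedding dimension shared by all sparse features
num_fea = ln_emb.size + 1                # sparse features + processed dense feature
num_int = (num_fea * (num_fea - 1)) // 2 + ln_bot[-1]   # interaction count, as above
# the interaction count is prepended to --arch-mlp-top, e.g. "4-2-1" -> array([8, 4, 2, 1])
ln_top = np.fromstring(str(num_int) + "-" + args.arch_mlp_top, dtype=int, sep="-")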

Then enter the epoch loop and train mini-batch by mini-batch:

while k < args.nepochs:
    ...
    for j, inputBatch in enumerate(train_ld):
        ...

Forward pass: X holds the dense features, with shape torch.Size([batch_size, feature_num]); lS_o and lS_i are the offsets and indices of the sparse features.

Z = dlrm_wrap(
    X,
    lS_o,
    lS_i,
    use_gpu,
    device,
    ndevices=ndevices,
)

def dlrm_wrap(X, lS_o, lS_i, use_gpu, device, ndevices=1):
    ...
    # Calling the model invokes nn.Module.__call__(), which passes the inputs to forward() and returns its output
    
    return dlrm(X.to(device), lS_o, lS_i)

Compute the loss:

E = loss_fn_wrap(Z, T, use_gpu, device)
# compute loss and accuracy

L = E.detach().cpu().numpy()  # numpy array

Backward pass: compute the gradients and update the model parameters:

E.backward()

if (args.mlperf_logging and (j + 1) % args.mlperf_grad_accum_iter == 0) or not args.mlperf_logging:
    optimizer.step()
    lr_scheduler.step()
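
Put together, one optimization step follows the standard PyTorch pattern. A simplified sketch (it omits the mlperf gradient-accumulation gating, testing and logging, and assumes each batch unpacks into (X, lS_o, lS_i, T) as produced by RandomDataset.__getitem__):

for j, inputBatch in enumerate(train_ld):
    X, lS_o, lS_i, T = inputBatch
    Z = dlrm_wrap(X, lS_o, lS_i, use_gpu, device, ndevices=ndevices)  # forward
    E = loss_fn_wrap(Z, T, use_gpu, device)                           # loss
    optimizer.zero_grad()   # clear gradients from the previous step
    E.backward()            # backward: compute gradients
    optimizer.step()        # update parameters
    lr_scheduler.step()     # update the learning rate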

Network Initialization

Initializing the embeddings

# m_spa: embedding size of the sparse features
# ln_emb: list whose length is the number of sparse features and whose entries are each feature's number of possible values (cardinality)

self.emb_l, w_list = self.create_emb(m_spa, ln_emb, weighted_pooling)

def create_emb(self, m, ln, weighted_pooling=None):
    ...
    for i in range(0, ln.size):
        n = ln[i]
        ...
        # sparse=True: gradients w.r.t. the embedding weights are sparse tensors, so only the looked-up rows are updated
        
        EE = nn.EmbeddingBag(n, m, mode="sum", sparse=True)
        # initialize embeddings
        
        W = np.random.uniform(
            low=-np.sqrt(1 / n), high=np.sqrt(1 / n), size=(n, m)
        ).astype(np.float32)
        # approach 1
        EE.weight.data = torch.tensor(W, requires_grad=True)
        ...
        emb_l.append(EE)
    
    return emb_l, v_W_l
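
nn.EmbeddingBag fuses the table lookup with the sum pooling, which is exactly what the (offsets, indices) format calls for. A standalone sketch using the first table of the running example (4 rows, embedding dim 2; the weights here are random, not from the source):

import torch
import torch.nn as nn

E = nn.EmbeddingBag(4, 2, mode="sum", sparse=True)  # 4 rows, embedding dim 2
indices = torch.tensor([1, 2, 0, 1, 3])             # all lookups of the batch, flattened
offsets = torch.tensor([0, 2])                      # sample 0 -> rows [1, 2], sample 1 -> rows [0, 1, 3]
V = E(indices, offsets)
print(V.shape)  # torch.Size([2, 2]): one summed embedding vector per sample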

Initializing the bottom and top MLPs

# ln_bot: number of units in each layer of the bottom mlp, e.g. [4, 3, 2]

self.bot_l = self.create_mlp(ln_bot, sigmoid_bot)
self.top_l = self.create_mlp(ln_top, sigmoid_top)

def create_mlp(self, ln, sigmoid_layer):
    # build MLP layer by layer
    
    layers = nn.ModuleList()
    for i in range(0, ln.size - 1):
        n = ln[i]
        m = ln[i + 1]

        # construct fully connected operator
        
        LL = nn.Linear(int(n), int(m), bias=True)

        # initialize the weights
        # with torch.no_grad():
        # custom Xavier input, output or two-sided fill
        
        mean = 0.0  # std_dev = np.sqrt(variance)
        std_dev = np.sqrt(2 / (m + n))  # np.sqrt(1 / m) # np.sqrt(1 / n)
        W = np.random.normal(mean, std_dev, size=(m, n)).astype(np.float32)
        std_dev = np.sqrt(1 / m)  # np.sqrt(2 / (m + 1))
        bt = np.random.normal(mean, std_dev, size=m).astype(np.float32)
        # approach 1
        LL.weight.data = torch.tensor(W, requires_grad=True)
        LL.bias.data = torch.tensor(bt, requires_grad=True)
        # approach 2
        # LL.weight.data.copy_(torch.tensor(W))
        # LL.bias.data.copy_(torch.tensor(bt))
        # approach 3
        # LL.weight = Parameter(torch.tensor(W),requires_grad=True)
        # LL.bias = Parameter(torch.tensor(bt),requires_grad=True)
        
        layers.append(LL)

        # construct sigmoid or relu operator
        
        if i == sigmoid_layer:
            layers.append(nn.Sigmoid())
        else:
            layers.append(nn.ReLU())

    # approach 1: use ModuleList
    # return layers
    # approach 2: use Sequential container to wrap all layers

    return torch.nn.Sequential(*layers)
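
For the running example, the two MLPs come out as a Linear/ReLU stack, with a final Sigmoid only on the top MLP. A quick way to inspect them (assuming dlrm has been constructed as above; this creates fresh modules purely for illustration, since the constructor already stores self.bot_l and self.top_l):

import numpy as np

ln_bot = np.array([4, 3, 2])
ln_top = np.array([8, 4, 2, 1])
bot_l = dlrm.create_mlp(ln_bot, -1)               # sigmoid_bot=-1: ReLU after every layer
top_l = dlrm.create_mlp(ln_top, ln_top.size - 2)  # Sigmoid after the last layer
print(bot_l)  # Sequential(Linear(4, 3), ReLU, Linear(3, 2), ReLU)
print(top_l)  # Sequential(Linear(8, 4), ReLU, Linear(4, 2), ReLU, Linear(2, 1), Sigmoid)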

Specifying the loss function

if self.loss_function == "mse":
    self.loss_fn = torch.nn.MSELoss(reduction="mean")
elif self.loss_function == "bce":
    self.loss_fn = torch.nn.BCELoss(reduction="mean")
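
With bce the model output is already a probability (the top MLP ends in a sigmoid), so BCELoss is applied directly. A tiny example with made-up numbers:

import torch

loss_fn = torch.nn.BCELoss(reduction="mean")
Z = torch.tensor([[0.89], [0.09]])  # predicted click probabilities (made-up)
T = torch.tensor([[1.0], [0.0]])    # targets
print(loss_fn(Z, T))                # scalar: mean binary cross-entropy over the batch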

Forward Pass

def sequential_forward(self, dense_x, lS_o, lS_i):
    # process dense features (using bottom mlp), resulting in a row vector
    
    x = self.apply_mlp(dense_x, self.bot_l)
    # debug prints
    # print("intermediate")
    # print(x.detach().cpu().numpy())

    # process sparse features (using embeddings), resulting in a list of row vectors
    
    ly = self.apply_emb(lS_o, lS_i, self.emb_l, self.v_W_l)
    # for y in ly:
    #     print(y.detach().cpu().numpy())

    # interact features (dense and sparse)
    
    z = self.interact_features(x, ly)
    # print(z.detach().cpu().numpy())

    # obtain probability of a click (using top mlp)
    
    p = self.apply_mlp(z, self.top_l)

    # clamp output if needed
    
    if 0.0 < self.loss_threshold and self.loss_threshold < 1.0:
        z = torch.clamp(p, min=self.loss_threshold, max=(1.0 - self.loss_threshold))
    else:
        z = p

    return z

def apply_mlp(self, x, layers):
    return layers(x)

# Look up the embeddings of the sparse features

def apply_emb(self, lS_o, lS_i, emb_l, v_W_l):
    ly = []
    for k, sparse_index_group_batch in enumerate(lS_i):
        sparse_offset_group_batch = lS_o[k]
      	...
        E = emb_l[k]
        V = E(
            sparse_index_group_batch,
            sparse_offset_group_batch,
            per_sample_weights=per_sample_weights,
        )

        ly.append(V)
    
    return ly

Feature Interaction

def interact_features(self, x, ly):

    if self.arch_interaction_op == "dot":
        # concatenate dense and sparse features
        
        (batch_size, d) = x.shape
        T = torch.cat([x] + ly, dim=1).view((batch_size, -1, d))
        # perform a dot product
        
        Z = torch.bmm(T, torch.transpose(T, 1, 2))
        # append dense feature with the interactions (into a row vector)
        # approach 1: all
        # Zflat = Z.view((batch_size, -1))
        # approach 2: unique
        
        _, ni, nj = Z.shape
        # approach 1: tril_indices
        # offset = 0 if self.arch_interaction_itself else -1
        # li, lj = torch.tril_indices(ni, nj, offset=offset)
        # approach 2: custom
        
        offset = 1 if self.arch_interaction_itself else 0
        li = torch.tensor([i for i in range(ni) for j in range(i + offset)])
        lj = torch.tensor([j for i in range(nj) for j in range(i + offset)])
        Zflat = Z[:, li, lj]
        # concatenate dense features and interactions
        
        R = torch.cat([x] + [Zflat], dim=1)
    elif self.arch_interaction_op == "cat":
        # concatenation features (into a row vector)

        R = torch.cat([x] + ly, dim=1)
    else:
        sys.exit(
            "ERROR: --arch-interaction-op="
            + self.arch_interaction_op
            + " is not supported"
        )

    return R
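
For the running example (batch_size=2, bottom-MLP output d=2, three embedding tables of dimension 2) the shapes work out as follows. This is a standalone sketch with random tensors, not the module method itself:

import torch

batch_size, d = 2, 2
x = torch.rand(batch_size, d)                       # bottom-MLP output
ly = [torch.rand(batch_size, d) for _ in range(3)]  # one pooled embedding per sparse feature

T = torch.cat([x] + ly, dim=1).view((batch_size, -1, d))  # (2, 4, 2)
Z = torch.bmm(T, torch.transpose(T, 1, 2))                 # (2, 4, 4): pairwise dot products
ni = T.shape[1]
li = torch.tensor([i for i in range(ni) for j in range(i)])  # strictly lower triangle
lj = torch.tensor([j for i in range(ni) for j in range(i)])
R = torch.cat([x, Z[:, li, lj]], dim=1)
print(R.shape)  # torch.Size([2, 8]): 2 dense dims + 6 unique interactions = ln_top[0]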

Backward Pass

def loss_fn_wrap(Z, T, use_gpu, device):
    with record_function("DLRM loss compute"):
        if args.loss_function == "mse" or args.loss_function == "bce":
            return dlrm.loss_fn(Z, T.to(device))
        elif args.loss_function == "wbce":
            loss_ws_ = dlrm.loss_ws[T.data.view(-1).long()].view_as(T).to(device)
            loss_fn_ = dlrm.loss_fn(Z, T.to(device))
            loss_sc_ = loss_ws_ * loss_fn_
            return loss_sc_.mean()

Learning-rate scheduler: it adjusts the optimizer's learning rate over the course of training; a well-chosen schedule speeds up convergence.

class LRPolicyScheduler(_LRScheduler):
    def __init__(self, optimizer, num_warmup_steps, decay_start_step, num_decay_steps):
        self.num_warmup_steps = num_warmup_steps
        self.decay_start_step = decay_start_step
        self.decay_end_step = decay_start_step + num_decay_steps
        self.num_decay_steps = num_decay_steps

        if self.decay_start_step < self.num_warmup_steps:
            sys.exit("Learning rate warmup must finish before the decay starts")

        super(LRPolicyScheduler, self).__init__(optimizer)

    def get_lr(self):
        step_count = self._step_count
        if step_count < self.num_warmup_steps:
            # warmup
            scale = 1.0 - (self.num_warmup_steps - step_count) / self.num_warmup_steps
            lr = [base_lr * scale for base_lr in self.base_lrs]
            self.last_lr = lr
        elif self.decay_start_step <= step_count and step_count < self.decay_end_step:
            # decay
            decayed_steps = step_count - self.decay_start_step
            scale = ((self.num_decay_steps - decayed_steps) / self.num_decay_steps) ** 2
            min_lr = 0.0000001
            lr = [max(min_lr, base_lr * scale) for base_lr in self.base_lrs]
            self.last_lr = lr
        else:
            if self.num_decay_steps > 0:
                # freeze at last, either because we're after decay
                # or because we're between warmup and decay
                lr = self.last_lr
            else:
                # do not adjust
                lr = self.base_lrs
        return lr
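
A small usage sketch (dummy parameter and made-up step counts, not from the source) showing the three phases of the policy: linear warmup, a frozen plateau between warmup and decay, then quadratic decay down to the 1e-7 floor:

import torch

param = torch.nn.Parameter(torch.zeros(1))
optimizer = torch.optim.SGD([param], lr=0.1)
lr_scheduler = LRPolicyScheduler(
    optimizer, num_warmup_steps=4, decay_start_step=8, num_decay_steps=4
)

for step in range(12):
    optimizer.step()
    lr_scheduler.step()
    print(step, [round(g["lr"], 6) for g in optimizer.param_groups])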

Inference

Quantization

dlrm = torch.quantization.quantize_dynamic(dlrm, {torch.nn.Linear}, quantize_dtype)
dlrm.quantize_embedding(args.quantize_emb_with_bit)
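
torch.quantization.quantize_dynamic swaps the Linear layers for dynamically quantized versions at inference time (quantize_dtype in the source is chosen from the quantization bit-width flags). A standalone illustration on a throwaway model, not DLRM itself:

import torch

model = torch.nn.Sequential(
    torch.nn.Linear(8, 4), torch.nn.ReLU(), torch.nn.Linear(4, 1), torch.nn.Sigmoid()
)
qmodel = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)
print(qmodel)  # the Linear layers are replaced by DynamicQuantizedLinear modules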

use GPU

Move the model to the GPU:

dlrm = dlrm.to(device)

Move the data to the GPU:

X.to(device)

https://developer.nvidia.com/blog/optimizing-dlrm-on-nvidia-gpus/

https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Recommendation/DLRM

Experiments

CPU

| data-size | mini-batch-size | arch-sparse-feature-size | arch-mlp-bot | arch-embedding-size | nepochs | runtime (s) |
|---|---|---|---|---|---|---|
| 1000000 | 1000 | 2 | 4-3-2 | 4-3-2 | 10 | 900 |
| 1000000 | 100 | 2 | 4-3-2 | 4-3-2 | 1 | 120 |
| 1000000 | 128 | 16 | 13-512-256-64-16 | 100-100-100-100-100-100-100-100-100-100-100-100-1000-1000-1000 | 1 | 520 |
| 10000000 | 128 | 16 | 13-512-256-64-16 | 100-100-100-100-100-100-100-100-100-100-100-100-1000-1000-1000 | 1 | 5052 |

Even with 10 million samples there is no memory pressure: the random batches are generated on the fly, and the embedding tables in this configuration are small (sparse=True only makes the gradient updates sparse; the weights themselves are dense).

Training DLRM on a single RTX 3050:

https://zhuanlan.zhihu.com/p/575091968

https://www.hpc-ai.tech/blog/embedding-training-with-1-gpu-memory-and-10-times-less-budget-an-open-source-solution-for