Argument parsing
import argparse
parser = argparse.ArgumentParser(
description="Train Deep Learning Recommendation Model (DLRM)"
)
parser.add_argument("--arch-sparse-feature-size", type=int, default=2)
...
parser.add_argument("--lr-num-decay-steps", type=int, default=0)
# parse the command-line arguments, returning a Namespace object with the argument values
args = parser.parse_args()
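A self-contained sketch, using the two flags shown above, of how the dashed option names turn into Namespace attributes:

```python
import argparse

parser = argparse.ArgumentParser(description="DLRM argparse sketch")
parser.add_argument("--arch-sparse-feature-size", type=int, default=2)
parser.add_argument("--lr-num-decay-steps", type=int, default=0)

# Dashes in option names become underscores on the Namespace.
args = parser.parse_args(["--arch-sparse-feature-size", "16"])
print(args.arch_sparse_feature_size)  # 16
print(args.lr_num_decay_steps)        # 0 (default)
```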
Data generation
Random
train_data, train_ld, test_data, test_ld = dp.make_random_data_and_loader(
args, ln_emb, m_den
)
def make_random_data_and_loader(
args,
ln_emb,
m_den,
offset_to_length_converter=False,
):
    # construct the custom dataset
train_data = RandomDataset(
m_den,
ln_emb,
args.data_size,
args.num_batches,
args.mini_batch_size,
args.num_indices_per_lookup,
args.num_indices_per_lookup_fixed,
1, # num_targets
args.round_targets,
args.data_generation,
args.data_trace_file,
args.data_trace_enable_padding,
reset_seed_on_access=True,
rand_data_dist=args.rand_data_dist,
rand_data_min=args.rand_data_min,
rand_data_max=args.rand_data_max,
rand_data_mu=args.rand_data_mu,
rand_data_sigma=args.rand_data_sigma,
rand_seed=args.numpy_rand_seed,
) # WARNING: generates a batch of lookups at once
...
    # load mini-batches from the custom dataset
train_loader = torch.utils.data.DataLoader(
train_data,
batch_size=1,
shuffle=False,
num_workers=args.num_workers,
collate_fn=collate_wrapper_random,
pin_memory=False,
drop_last=False, # True
)
...
return train_data, train_loader, test_data, test_loader
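Note the batch_size=1 above: each RandomDataset item is already a full mini-batch (see the WARNING comment), so the DataLoader only has to unwrap it. A minimal sketch of the same pattern, with a hypothetical toy dataset standing in for RandomDataset:

```python
import torch
from torch.utils.data import Dataset, DataLoader

class PreBatchedDataset(Dataset):
    """Toy dataset whose __getitem__ returns a whole mini-batch at once."""

    def __init__(self, num_batches, mini_batch_size, num_features):
        self.num_batches = num_batches
        self.mini_batch_size = mini_batch_size
        self.num_features = num_features

    def __getitem__(self, index):
        # one item == one mini-batch of dense features
        return torch.rand(self.mini_batch_size, self.num_features)

    def __len__(self):
        return self.num_batches  # number of batches, not samples

loader = DataLoader(
    PreBatchedDataset(num_batches=3, mini_batch_size=2, num_features=4),
    batch_size=1,                       # items are already batched
    collate_fn=lambda batch: batch[0],  # unwrap the singleton list
)
for X in loader:
    print(X.shape)  # torch.Size([2, 4])
```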
# Custom dataset: subclass Dataset and override the __getitem__ and __len__ methods
class RandomDataset(Dataset):
def __init__(
self,
m_den,
ln_emb,
data_size,
num_batches,
mini_batch_size,
num_indices_per_lookup,
num_indices_per_lookup_fixed,
num_targets=1,
round_targets=False,
data_generation="random",
trace_file="",
enable_padding=False,
reset_seed_on_access=False,
rand_data_dist="uniform",
rand_data_min=1,
rand_data_max=1,
rand_data_mu=-1,
rand_data_sigma=1,
rand_seed=0,
):
        # compute the number of batches
nbatches = int(np.ceil((data_size * 1.0) / mini_batch_size))
if num_batches != 0:
nbatches = num_batches
data_size = nbatches * mini_batch_size
# print("Total number of batches %d" % nbatches)
# save args (recompute data_size if needed)
self.m_den = m_den
self.ln_emb = ln_emb
self.data_size = data_size
self.num_batches = nbatches
self.mini_batch_size = mini_batch_size
self.num_indices_per_lookup = num_indices_per_lookup
self.num_indices_per_lookup_fixed = num_indices_per_lookup_fixed
self.num_targets = num_targets
self.round_targets = round_targets
self.data_generation = data_generation
self.trace_file = trace_file
self.enable_padding = enable_padding
self.reset_seed_on_access = reset_seed_on_access
self.rand_seed = rand_seed
self.rand_data_dist = rand_data_dist
self.rand_data_min = rand_data_min
self.rand_data_max = rand_data_max
self.rand_data_mu = rand_data_mu
self.rand_data_sigma = rand_data_sigma
def reset_numpy_seed(self, numpy_rand_seed):
np.random.seed(numpy_rand_seed)
# torch.manual_seed(numpy_rand_seed)
    # return the data sample for a given index
def __getitem__(self, index):
        # if index is a slice, call __getitem__ for each position and return a list
if isinstance(index, slice):
return [
            # recursive __getitem__ call
self[idx]
            # if index.start is None, default to 0
for idx in range(
index.start or 0, index.stop or len(self), index.step or 1
)
]
...
# number of data points in a batch
n = min(self.mini_batch_size, self.data_size - (index * self.mini_batch_size))
# generate a batch of dense and sparse features
if self.data_generation == "random":
(X, lS_o, lS_i) = generate_dist_input_batch(
self.m_den,
self.ln_emb,
n,
self.num_indices_per_lookup,
self.num_indices_per_lookup_fixed,
rand_data_dist=self.rand_data_dist,
rand_data_min=self.rand_data_min,
rand_data_max=self.rand_data_max,
rand_data_mu=self.rand_data_mu,
rand_data_sigma=self.rand_data_sigma,
)
...
# generate a batch of target (probability of a click)
T = generate_random_output_batch(n, self.num_targets, self.round_targets)
return (X, lS_o, lS_i, T)
    # return the number of batches
def __len__(self):
        # WARNING: note that we produce batches of outputs in __getitem__
# therefore we should use num_batches rather than data_size below
return self.num_batches
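The slice branch of __getitem__ above is just the standard Python indexing protocol: ds[0:2] passes a slice object to __getitem__. A toy illustration with a hypothetical class:

```python
class Toy:
    def __getitem__(self, index):
        if isinstance(index, slice):
            # iterate over the slice range and collect individual items
            return [self[i] for i in range(index.start or 0,
                                           index.stop or len(self),
                                           index.step or 1)]
        return index * 10

    def __len__(self):
        return 3

t = Toy()
print(t[1])    # 10
print(t[0:3])  # [0, 10, 20]
print(t[:])    # same: start/stop default to 0 and len(self)
```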
# random data from a uniform or gaussian distribution (input data)
def generate_dist_input_batch(
m_den,
ln_emb,
n,
num_indices_per_lookup,
num_indices_per_lookup_fixed,
rand_data_dist,
rand_data_min,
rand_data_max,
rand_data_mu,
rand_data_sigma,
):
# dense feature
    # uniform samples in [0, 1): n rows, m_den dense features each
Xt = torch.tensor(ra.rand(n, m_den).astype(np.float32))
# sparse feature (sparse indices)
    # lS_emb_offsets and lS_emb_indices hold the offsets and indices of the sparse features
lS_emb_offsets = []
lS_emb_indices = []
# for each embedding generate a list of n lookups,
# where each lookup is composed of multiple sparse indices
for size in ln_emb:
lS_batch_offsets = []
lS_batch_indices = []
offset = 0
for _ in range(n):
            # num of sparse indices to be used per embedding (between 1 and num_indices_per_lookup)
if num_indices_per_lookup_fixed:
sparse_group_size = np.int64(num_indices_per_lookup)
else:
                # random in [1, num_indices_per_lookup]
r = ra.random(1)
sparse_group_size = np.int64(
np.round(max([1.0], r * min(size, num_indices_per_lookup)))
)
# sparse indices to be used per embedding
if rand_data_dist == "gaussian":
if rand_data_mu == -1:
rand_data_mu = (rand_data_max + rand_data_min) / 2.0
r = ra.normal(rand_data_mu, rand_data_sigma, sparse_group_size)
sparse_group = np.clip(r, rand_data_min, rand_data_max)
sparse_group = np.unique(sparse_group).astype(np.int64)
elif rand_data_dist == "uniform":
r = ra.random(sparse_group_size)
sparse_group = np.unique(np.round(r * (size - 1)).astype(np.int64))
else:
                raise ValueError(
                    rand_data_dist
                    + " distribution is not supported; please select uniform or gaussian"
                )
# reset sparse_group_size in case some index duplicates were removed
sparse_group_size = np.int64(sparse_group.size)
# store lengths and indices
lS_batch_offsets += [offset]
lS_batch_indices += sparse_group.tolist()
# update offset for next iteration
offset += sparse_group_size
lS_emb_offsets.append(torch.tensor(lS_batch_offsets))
lS_emb_indices.append(torch.tensor(lS_batch_indices))
return (Xt, lS_emb_offsets, lS_emb_indices)
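The uniform branch is worth a closer look: np.unique not only removes duplicate indices but also sorts them, which is why sparse_group_size is recomputed afterwards. An isolated sketch with toy numbers:

```python
import numpy as np
import numpy.random as ra

ra.seed(123)
size, sparse_group_size = 4, 3
r = ra.random(sparse_group_size)  # 3 draws in [0, 1)
sparse_group = np.unique(np.round(r * (size - 1)).astype(np.int64))
print(sparse_group)        # [1 2] -- deduplicated and sorted
print(sparse_group.size)   # 2, possibly smaller than the requested 3
```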
The generated random features look as follows, for batch_size=2, dense_feature_num=4, sparse_feature_num=3, and sparse-feature cardinalities of (4, 3, 2):
Xt:
tensor([[4.17022e-01, 7.20325e-01, 1.14375e-04, 3.02333e-01],
[1.46756e-01, 9.23386e-02, 1.86260e-01, 3.45561e-01]])
lS_emb_indices:
[tensor([1, 2, 0, 1, 3]), tensor([1, 0]), tensor([0, 1, 1])]
lS_emb_offsets:
[tensor([0, 2]), tensor([0, 1]), tensor([0, 2])]
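As a sanity check, this offset/index form can be decoded back into per-sample index groups: for each embedding table, the offsets mark where each sample's lookups begin inside the flat index tensor. A small script using the exact tensors above:

```python
import torch

lS_emb_indices = [torch.tensor([1, 2, 0, 1, 3]),
                  torch.tensor([1, 0]),
                  torch.tensor([0, 1, 1])]
lS_emb_offsets = [torch.tensor([0, 2]),
                  torch.tensor([0, 1]),
                  torch.tensor([0, 2])]

for indices, offsets in zip(lS_emb_indices, lS_emb_offsets):
    # append the total length so each sample has a [start, end) range
    bounds = torch.cat([offsets, torch.tensor([len(indices)])])
    groups = [indices[bounds[i]:bounds[i + 1]].tolist()
              for i in range(len(offsets))]
    print(groups)
# [[1, 2], [0, 1, 3]]
# [[1], [0]]
# [[0], [1, 1]]
```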
Aligning network dimensions
For example:
# sanity check: dense feature sizes and mlp dimensions must match
if m_den != ln_bot[0]:
sys.exit(
"ERROR: arch-dense-feature-size "
+ str(m_den)
+ " does not match first dim of bottom mlp "
+ str(ln_bot[0])
)
# the sparse-feature embedding size must match the last dim of the bottom MLP
if m_spa != m_den_out:
sys.exit(
"ERROR: arch-sparse-feature-size "
+ str(m_spa)
+ " does not match last dim of bottom mlp "
+ str(m_den_out)
)
The model then prints:
model arch:
mlp top arch 3 layers, with input to output dimensions:
[8 4 2 1]
# of interactions
8
mlp bot arch 2 layers, with input to output dimensions:
[4 3 2]
# of features (sparse and dense)
4
dense feature size
4
sparse feature size
2
# of embeddings (= # of sparse features) 3, with dimensions 2x:
[4 3 2]
data (inputs and targets):
mini-batch: 0
# dense features: 4 per sample
tensor([[4.1702e-01, 7.2032e-01, 1.1437e-04, 3.0233e-01],
[1.4676e-01, 9.2339e-02, 1.8626e-01, 3.4556e-01]])
# sparse features: 3 per sample
tensor([[2, 3],
[1, 1],
[2, 1]], dtype=torch.int32)
# sparse features: indices for the whole batch are stored contiguously; the offsets recover the per-sample form above
[tensor([1, 2, 0, 1, 3]), tensor([1, 0]), tensor([0, 1, 1])]
# prediction targets (probability of a click):
tensor([[0.8946],
[0.0850]])
mini-batch: 1
tensor([[0.0391, 0.1698, 0.8781, 0.0983],
[0.4211, 0.9579, 0.5332, 0.6919]])
tensor([[1, 3],
[2, 1],
[1, 1]], dtype=torch.int32)
[tensor([2, 0, 2, 3]), tensor([1, 2, 1]), tensor([0, 0])]
tensor([[0.6788],
[0.2116]])
mini-batch: 2
tensor([[0.2655, 0.4916, 0.0534, 0.5741],
[0.1467, 0.5893, 0.6998, 0.1023]])
tensor([[2, 1],
[2, 2],
[1, 1]], dtype=torch.int32)
[tensor([1, 2, 2]), tensor([1, 2, 0, 2]), tensor([1, 0])]
tensor([[0.9275],
[0.3478]])
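The "# of interactions 8" printed under the model arch above (the top MLP's input width, ln_top[0]) can be reproduced by hand. A small arithmetic check, restating the dot-interaction sizing logic in my own words:

```python
# 3 embedding tables + 1 dense vector, all of width m_den_out = 2
num_fea = 3 + 1
m_den_out = 2
# "dot" interaction without self-interaction: one value per unordered pair
# of features, concatenated with the dense vector itself
num_int = (num_fea * (num_fea - 1)) // 2 + m_den_out
print(num_int)  # 8 -> ln_top = [8 4 2 1]
```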
Model training
Overall flow
First, initialize the DLRM model object:
global dlrm
dlrm = DLRM_Net(
m_spa,
ln_emb,
ln_bot,
ln_top,
arch_interaction_op=args.arch_interaction_op,
arch_interaction_itself=args.arch_interaction_itself,
sigmoid_bot=-1,
sigmoid_top=ln_top.size - 2,
sync_dense_params=args.sync_dense_params,
loss_threshold=args.loss_threshold,
ndevices=ndevices,
qr_flag=args.qr_flag,
qr_operation=args.qr_operation,
qr_collisions=args.qr_collisions,
qr_threshold=args.qr_threshold,
md_flag=args.md_flag,
md_threshold=args.md_threshold,
weighted_pooling=args.weighted_pooling,
loss_function=args.loss_function
)
Then enter the epoch loop and train mini-batch by mini-batch:
while k < args.nepochs:
...
for j, inputBatch in enumerate(train_ld):
...
Forward pass. X holds the dense features, with shape torch.Size([batch_size, feature_num]); lS_o and lS_i are the offsets and indices of the sparse features:
Z = dlrm_wrap(
X,
lS_o,
lS_i,
use_gpu,
device,
ndevices=ndevices,
)
def dlrm_wrap(X, lS_o, lS_i, use_gpu, device, ndevices=1):
...
    # nn.Module's __call__() passes the input to the model's forward() and returns its output
return dlrm(X.to(device), lS_o, lS_i)
Compute the loss:
E = loss_fn_wrap(Z, T, use_gpu, device)
# compute loss and accuracy
L = E.detach().cpu().numpy() # numpy array
Backward pass: compute the gradients and update the model parameters:
E.backward()
if (args.mlperf_logging and (j + 1) % args.mlperf_grad_accum_iter == 0) or not args.mlperf_logging:
optimizer.step()
lr_scheduler.step()
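The same forward / loss / backward / step pattern, condensed into a self-contained toy sketch (a stand-in linear model rather than the DLRM wrappers):

```python
import torch

model = torch.nn.Sequential(torch.nn.Linear(4, 1), torch.nn.Sigmoid())
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10)
loss_fn = torch.nn.MSELoss()

X = torch.rand(2, 4)   # dense features: [batch_size, feature_num]
T = torch.rand(2, 1)   # targets: click probabilities

Z = model(X)           # forward pass
E = loss_fn(Z, T)      # compute the loss
optimizer.zero_grad()
E.backward()           # backward pass: compute gradients
optimizer.step()       # update the parameters
lr_scheduler.step()    # advance the learning-rate schedule
print(E.item())
```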
Network initialization
Initializing the embeddings
# m_spa: embedding size of each sparse feature
# ln_emb: list whose length is the number of sparse features and whose entries are each feature's cardinality (number of possible values)
self.emb_l, w_list = self.create_emb(m_spa, ln_emb, weighted_pooling)
def create_emb(self, m, ln, weighted_pooling=None):
...
for i in range(0, ln.size):
n = ln[i]
...
        # sparse=True: gradients w.r.t. the embedding weights are sparse tensors, so only rows looked up in the batch receive gradient entries
EE = nn.EmbeddingBag(n, m, mode="sum", sparse=True)
# initialize embeddings
W = np.random.uniform(
low=-np.sqrt(1 / n), high=np.sqrt(1 / n), size=(n, m)
).astype(np.float32)
# approach 1
EE.weight.data = torch.tensor(W, requires_grad=True)
...
emb_l.append(EE)
return emb_l, v_W_l
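nn.EmbeddingBag fuses lookup and pooling: given the flat indices and per-sample offsets from the data section, it returns one pooled (here summed) embedding row per sample. A toy check with n=4 rows and m=2 dimensions:

```python
import torch
import torch.nn as nn

emb = nn.EmbeddingBag(4, 2, mode="sum", sparse=True)
indices = torch.tensor([1, 2, 0, 1, 3])  # flat lookup indices for the batch
offsets = torch.tensor([0, 2])           # sample 0 -> [1, 2]; sample 1 -> [0, 1, 3]
out = emb(indices, offsets)
print(out.shape)  # torch.Size([2, 2]): one pooled row vector per sample
```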
Initializing the bottom MLP and the top MLP
# ln_bot: number of units in each layer of the bottom MLP, e.g. [4, 3, 2]
self.bot_l = self.create_mlp(ln_bot, sigmoid_bot)
self.top_l = self.create_mlp(ln_top, sigmoid_top)
def create_mlp(self, ln, sigmoid_layer):
# build MLP layer by layer
layers = nn.ModuleList()
for i in range(0, ln.size - 1):
n = ln[i]
m = ln[i + 1]
# construct fully connected operator
LL = nn.Linear(int(n), int(m), bias=True)
# initialize the weights
# with torch.no_grad():
# custom Xavier input, output or two-sided fill
mean = 0.0 # std_dev = np.sqrt(variance)
std_dev = np.sqrt(2 / (m + n)) # np.sqrt(1 / m) # np.sqrt(1 / n)
W = np.random.normal(mean, std_dev, size=(m, n)).astype(np.float32)
std_dev = np.sqrt(1 / m) # np.sqrt(2 / (m + 1))
bt = np.random.normal(mean, std_dev, size=m).astype(np.float32)
# approach 1
LL.weight.data = torch.tensor(W, requires_grad=True)
LL.bias.data = torch.tensor(bt, requires_grad=True)
# approach 2
# LL.weight.data.copy_(torch.tensor(W))
# LL.bias.data.copy_(torch.tensor(bt))
# approach 3
# LL.weight = Parameter(torch.tensor(W),requires_grad=True)
# LL.bias = Parameter(torch.tensor(bt),requires_grad=True)
layers.append(LL)
# construct sigmoid or relu operator
if i == sigmoid_layer:
layers.append(nn.Sigmoid())
else:
layers.append(nn.ReLU())
# approach 1: use ModuleList
# return layers
# approach 2: use Sequential container to wrap all layers
return torch.nn.Sequential(*layers)
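The custom init above is Xavier-style (weight std sqrt(2 / (m + n)), bias std sqrt(1 / m)). For ln_bot = [4, 3, 2] with sigmoid_bot = -1, no layer index ever equals -1, so every Linear is followed by ReLU; the resulting container is equivalent to this sketch:

```python
import torch

bot_l = torch.nn.Sequential(
    torch.nn.Linear(4, 3), torch.nn.ReLU(),
    torch.nn.Linear(3, 2), torch.nn.ReLU(),
)
print(bot_l(torch.rand(2, 4)).shape)  # torch.Size([2, 2])
```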
Specifying the loss function
if self.loss_function == "mse":
self.loss_fn = torch.nn.MSELoss(reduction="mean")
elif self.loss_function == "bce":
self.loss_fn = torch.nn.BCELoss(reduction="mean")
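Note that BCELoss expects probabilities in (0, 1), which is why sigmoid_top above puts a Sigmoid after the last top-MLP layer. A quick toy check:

```python
import torch

loss_fn = torch.nn.BCELoss(reduction="mean")
Z = torch.tensor([[0.8946], [0.0850]])  # predicted click probabilities
T = torch.tensor([[1.0], [0.0]])        # ground-truth labels
print(loss_fn(Z, T).item())             # mean binary cross-entropy
```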
Forward pass
def sequential_forward(self, dense_x, lS_o, lS_i):
# process dense features (using bottom mlp), resulting in a row vector
x = self.apply_mlp(dense_x, self.bot_l)
# debug prints
# print("intermediate")
# print(x.detach().cpu().numpy())
    # process sparse features (using embeddings), resulting in a list of row vectors
ly = self.apply_emb(lS_o, lS_i, self.emb_l, self.v_W_l)
# for y in ly:
# print(y.detach().cpu().numpy())
# interact features (dense and sparse)
z = self.interact_features(x, ly)
# print(z.detach().cpu().numpy())
# obtain probability of a click (using top mlp)
p = self.apply_mlp(z, self.top_l)
# clamp output if needed
if 0.0 < self.loss_threshold and self.loss_threshold < 1.0:
z = torch.clamp(p, min=self.loss_threshold, max=(1.0 - self.loss_threshold))
else:
z = p
return z
def apply_mlp(self, x, layers):
return layers(x)
# look up the pooled embeddings of the sparse features
def apply_emb(self, lS_o, lS_i, emb_l, v_W_l):
ly = []
for k, sparse_index_group_batch in enumerate(lS_i):
sparse_offset_group_batch = lS_o[k]
...
E = emb_l[k]
V = E(
sparse_index_group_batch,
sparse_offset_group_batch,
per_sample_weights=per_sample_weights,
)
ly.append(V)
return ly
Feature interaction
def interact_features(self, x, ly):
if self.arch_interaction_op == "dot":
# concatenate dense and sparse features
(batch_size, d) = x.shape
T = torch.cat([x] + ly, dim=1).view((batch_size, -1, d))
# perform a dot product
Z = torch.bmm(T, torch.transpose(T, 1, 2))
# append dense feature with the interactions (into a row vector)
# approach 1: all
# Zflat = Z.view((batch_size, -1))
# approach 2: unique
_, ni, nj = Z.shape
# approach 1: tril_indices
# offset = 0 if self.arch_interaction_itself else -1
# li, lj = torch.tril_indices(ni, nj, offset=offset)
# approach 2: custom
offset = 1 if self.arch_interaction_itself else 0
li = torch.tensor([i for i in range(ni) for j in range(i + offset)])
lj = torch.tensor([j for i in range(nj) for j in range(i + offset)])
Zflat = Z[:, li, lj]
# concatenate dense features and interactions
R = torch.cat([x] + [Zflat], dim=1)
elif self.arch_interaction_op == "cat":
# concatenation features (into a row vector)
R = torch.cat([x] + ly, dim=1)
else:
sys.exit(
"ERROR: --arch-interaction-op="
+ self.arch_interaction_op
+ " is not supported"
)
return R
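A compact worked example of the "dot" branch (toy numbers, using torch.tril_indices rather than the custom index lists): batch_size=2, d=2, one dense vector plus three pooled embeddings, reproducing the width-8 input expected by ln_top:

```python
import torch

x = torch.rand(2, 2)                       # dense output of the bottom MLP
ly = [torch.rand(2, 2) for _ in range(3)]  # pooled sparse embeddings
T = torch.cat([x] + ly, dim=1).view((2, -1, 2))  # [batch, num_fea=4, d=2]
Z = torch.bmm(T, torch.transpose(T, 1, 2))       # all pairwise dot products
li, lj = torch.tril_indices(4, 4, offset=-1)     # strictly-lower pairs, no self
R = torch.cat([x, Z[:, li, lj]], dim=1)
print(R.shape)  # torch.Size([2, 8]): d + C(4, 2) = 2 + 6
```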
Loss computation and backward pass
def loss_fn_wrap(Z, T, use_gpu, device):
with record_function("DLRM loss compute"):
if args.loss_function == "mse" or args.loss_function == "bce":
return dlrm.loss_fn(Z, T.to(device))
elif args.loss_function == "wbce":
loss_ws_ = dlrm.loss_ws[T.data.view(-1).long()].view_as(T).to(device)
loss_fn_ = dlrm.loss_fn(Z, T.to(device))
loss_sc_ = loss_ws_ * loss_fn_
return loss_sc_.mean()
Learning-rate scheduler: adjusts the optimizer's learning rate during training; a sensible schedule speeds up convergence.
class LRPolicyScheduler(_LRScheduler):
def __init__(self, optimizer, num_warmup_steps, decay_start_step, num_decay_steps):
self.num_warmup_steps = num_warmup_steps
self.decay_start_step = decay_start_step
self.decay_end_step = decay_start_step + num_decay_steps
self.num_decay_steps = num_decay_steps
if self.decay_start_step < self.num_warmup_steps:
sys.exit("Learning rate warmup must finish before the decay starts")
super(LRPolicyScheduler, self).__init__(optimizer)
def get_lr(self):
step_count = self._step_count
if step_count < self.num_warmup_steps:
# warmup
scale = 1.0 - (self.num_warmup_steps - step_count) / self.num_warmup_steps
lr = [base_lr * scale for base_lr in self.base_lrs]
self.last_lr = lr
elif self.decay_start_step <= step_count and step_count < self.decay_end_step:
# decay
decayed_steps = step_count - self.decay_start_step
scale = ((self.num_decay_steps - decayed_steps) / self.num_decay_steps) ** 2
min_lr = 0.0000001
lr = [max(min_lr, base_lr * scale) for base_lr in self.base_lrs]
self.last_lr = lr
else:
if self.num_decay_steps > 0:
# freeze at last, either because we're after decay
# or because we're between warmup and decay
lr = self.last_lr
else:
# do not adjust
lr = self.base_lrs
return lr
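A standalone simulation of the policy above (toy step counts): warmup scales the base LR linearly up to 1, then a quadratic decay runs from decay_start_step for num_decay_steps, after which the real scheduler freezes the LR at its last value:

```python
base_lr = 0.1
num_warmup_steps, decay_start_step, num_decay_steps = 4, 6, 4

for step_count in range(1, 10):
    if step_count < num_warmup_steps:
        scale = 1.0 - (num_warmup_steps - step_count) / num_warmup_steps
    elif decay_start_step <= step_count < decay_start_step + num_decay_steps:
        decayed_steps = step_count - decay_start_step
        scale = ((num_decay_steps - decayed_steps) / num_decay_steps) ** 2
    else:
        scale = 1.0  # between warmup and decay: use the base LR
    print(step_count, round(base_lr * scale, 4))
```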
Inference
Quantization
dlrm = torch.quantization.quantize_dynamic(dlrm, {torch.nn.Linear}, quantize_dtype)
dlrm.quantize_embedding(args.quantize_emb_with_bit)
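A minimal dynamic-quantization sketch on a stand-in model (not DLRM itself): Linear weights are converted to int8 ahead of time, while activations are quantized on the fly at inference:

```python
import torch

model = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
qmodel = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)
print(qmodel)                    # Linear replaced by a dynamic quantized Linear
print(qmodel(torch.rand(1, 4)))  # inference runs with int8 weights
```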
Using the GPU
Move the model to the GPU:
dlrm = dlrm.to(device)
Move the data to the GPU:
X.to(device)
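The usual device-selection pattern behind these two lines, as a sketch: pick CUDA when available, and keep the model and every batch on the same device:

```python
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.nn.Linear(4, 1).to(device)  # move the parameters once
X = torch.rand(2, 4).to(device)           # move each batch as it is loaded
print(model(X).device)
```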
https://developer.nvidia.com/blog/optimizing-dlrm-on-nvidia-gpus/
https://github.com/NVIDIA/DeepLearningExamples/tree/master/PyTorch/Recommendation/DLRM
Experiments
CPU
data-size | mini-batch-size | arch-sparse-feature-size | arch-mlp-bot | arch-embedding-size | nepochs | runtime (s) |
---|---|---|---|---|---|---|
1000000 | 1000 | 2 | 4-3-2 | 4-3-2 | 10 | 900 |
1000000 | 100 | 2 | 4-3-2 | 4-3-2 | 1 | 120 |
1000000 | 128 | 16 | 13-512-256-64-16 | 100-100-100-100-100-100-100-100-100-100-100-100-1000-1000-1000 | 1 | 520 |
10000000 | 128 | 16 | 13-512-256-64-16 | 100-100-100-100-100-100-100-100-100-100-100-100-1000-1000-1000 | 1 | 5052 |
Even with 10M samples there is no memory pressure: memory is dominated by the embedding-table sizes rather than the data size, and with sparse=True the embedding gradients are kept as sparse tensors.
Training DLRM on a single RTX 3050:
https://zhuanlan.zhihu.com/p/575091968
https://www.hpc-ai.tech/blog/embedding-training-with-1-gpu-memory-and-10-times-less-budget-an-open-source-solution-for