deepseek mtp 源码解析
统一计算,然后拆分: 先用self.mtp一口气计算出所有层级的隐状态[H₀...Hₖ],然后用chunk拆分开。迭代滚动,匹配目标: 在循环中,通过不断地将labels向前滚动,巧妙地为每个MTP模块(H₁H₂, …)匹配上正确的未来目标(next_2next_3, …)。计算损失,但不立即反传: 为每个MTP模块计算出mtp_loss。梯度“嫁接”: 使用这个“自动求导函数钩子”,将mtp_l
_postprocess方法中的这段代码是GPTModel中实现MTP(Multi-Token Prediction)功能的关键部分。它负责计算并应用MTP的损失。
让我们来详细解释这段代码在做什么,以及为什么这么设计。
# _postprocess 方法内部
# ... (前面的代码)
# 检查是否启用了MTP功能
if mtp_in_postprocess:
# 1. 执行 MTP Block 的前向传播
hidden_states = self.mtp(
input_ids=input_ids,
position_ids=position_ids,
hidden_states=hidden_states, # 这是来自主干网络的 H₀
attention_mask=attention_mask,
# ... 其他参数 ...
embedding=self.embedding, # 传入共享的 embedding 函数
**(extra_block_kwargs or {}),
)
# 如果模型本身没有 post_process (例如在流水线并行的中间阶段),直接返回
if not self.post_process:
return hidden_states
# 如果启用了 MTP
if self.mtp_process:
# 2. 准备 MTP 任务的 labels 和 loss_mask
mtp_labels = labels.clone() # 复制一份原始 labels
hidden_states_list = torch.chunk(hidden_states, 1 + self.config.mtp_num_layers, dim=0)
hidden_states = hidden_states_list[0] # H₀ 分离出来给主模型用
if loss_mask is None:
loss_mask = torch.ones_like(mtp_labels) # 如果没有提供 loss_mask,就默认全部计算损失
# 3. 循环计算每个 MTP 模块的损失
for mtp_layer_number in range(self.config.mtp_num_layers):
# 3.1. 计算当前 MTP 模块的 logits
mtp_logits, _ = self.output_layer(
hidden_states_list[mtp_layer_number + 1], # 使用 H₁, H₂, ...
weight=output_weight,
runtime_gather_output=runtime_gather_output,
)
# 3.2. 滚动(roll) labels 和 loss_mask 以获取下一个时间步的目标
mtp_labels, _ = roll_tensor(mtp_labels, shifts=-1, dims=-1, cp_group=self.cp_group)
loss_mask, num_tokens = roll_tensor(
loss_mask, shifts=-1, dims=-1, cp_group=self.cp_group
)
# 3.3. 计算语言模型损失
mtp_loss = self.compute_language_model_loss(mtp_labels, mtp_logits)
mtp_loss = loss_mask * mtp_loss # 应用 mask
# 3.4. (训练时) 记录损失用于日志
if self.training:
MTPLossLoggingHelper.save_loss_to_tracker(...)
# 3.5. 计算损失缩放因子
mtp_loss_scale = self.config.mtp_loss_scaling_factor / self.config.mtp_num_layers
# 3.6. 将 MTP loss “注入”到反向传播图中
if self.config.calculate_per_token_loss:
hidden_states = MTPLossAutoScaler.apply(
hidden_states, mtp_loss_scale * mtp_loss
)
else:
hidden_states = MTPLossAutoScaler.apply(
hidden_states, mtp_loss_scale * mtp_loss / num_tokens
)
# 4. 计算主模型的 logits 和 loss (使用 H₀)
# ... (后续代码计算主模型的 logits 和 loss) ...
代码逻辑详解
这段代码的核心思想是在计算主模型的损失之前,先把所有MTP模块的损失计算出来,并以一种巧妙的方式将它们“附加”到计算图中。
-
执行MTP Block (
self.mtp(...)):- 这是第一步,它接收主干网络输出的隐状态
H₀,然后串行地计算出所有MTP模块的隐状态H₁,H₂, …,Hₖ。 - 如我们之前所分析的,它返回一个沿序列长度维度拼接好的大张量
[H₀, H₁, H₂, ..., Hₖ]。
- 这是第一步,它接收主干网络输出的隐状态
-
准备数据 (
torch.chunk,clone):torch.chunk将这个大张量拆分回一个包含H₀,H₁,H₂, … 的列表hidden_states_list。hidden_states = hidden_states_list[0]将H₀单独拿出来,准备给后续的主模型损失计算使用。mtp_labels = labels.clone()创建一个labels的副本,因为接下来的循环会不断地修改它。
-
循环计算MTP损失: 这是最核心的部分。循环
k次,每次处理一个MTP模块。-
3.1. 获取Logits:
hidden_states_list[mtp_layer_number + 1]取出当前MTP模块的输出,即H₁,H₂, …。- 通过共享的
self.output_layer计算出该模块的mtp_logits。
-
3.2. 滚动目标 (
roll_tensor): 这是实现多步预测的关键技巧。- 在第一次循环(
mtp_layer_number=0,处理MTP Module 1)之前,mtp_labels是原始labels的副本,代表next_1的目标。 - 调用
roll_tensor(mtp_labels, shifts=-1)将mtp_labels向前滚动一位。现在,在i位置上的 label 变成了原始序列中i+2位置的 token。这正是MTP Module 1所需的next_2目标! - 在第二次循环中,再次滚动,
mtp_labels就变成了next_3的目标,正好对应MTP Module 2。 loss_mask也以同样的方式滚动,以确保在正确的位置应用mask。
- 在第一次循环(
-
3.3. 计算损失:
- 使用滚动后的
mtp_labels和刚计算出的mtp_logits,通过compute_language_model_loss(内部就是交叉熵)计算出当前MTP模块的损失mtp_loss。
- 使用滚动后的
-
3.4. 日志记录:
MTPLossLoggingHelper是一个工具类,用于收集每个MTP模块的损失值,以便在训练日志中分别打印出来,方便监控。
-
3.6. 注入梯度 (
MTPLossAutoScaler.apply): 这是最巧妙的部分。- MTP的损失计算出来了,但它如何参与到反向传播中呢?我们不能直接
mtp_loss.backward(),因为这会和主损失的backward冲突。 MTPLossAutoScaler是一个自定义的torch.autograd.Function。它的apply方法就像一个“钩子”或“信使”。- 它将
mtp_loss(乘以一个缩放因子)作为参数传递进去,但它的前向传播什么也不做,只是原样返回hidden_states(即H₀)。 - 它的魔法在反向传播时发生。当主损失的梯度流回
H₀时,会触发MTPLossAutoScaler的backward方法。在这个方法里,它会手动创建一个梯度,这个梯度就等于我们之前传入的mtp_loss。 - 这样,
mtp_loss就被有效地、以正确的缩放比例“注入”到了主计算图的梯度流中。最终H₀接收到的梯度 = 来自主损失的梯度 + 来自所有MTP损失的梯度。
- MTP的损失计算出来了,但它如何参与到反向传播中呢?我们不能直接
-
-
计算主模型损失:
- 在循环结束后,代码会继续执行,使用
hidden_states(即H₀) 和原始的labels去计算主模型的 logits 和损失。这个过程和没有MTP时完全一样。
- 在循环结束后,代码会继续执行,使用
总结
_postprocess 方法中的这段代码通过一个精巧的流程完成了MTP的损失计算和梯度注入:
- 统一计算,然后拆分: 先用
self.mtp一口气计算出所有层级的隐状态[H₀...Hₖ],然后用chunk拆分开。 - 迭代滚动,匹配目标: 在循环中,通过
roll_tensor不断地将labels向前滚动,巧妙地为每个MTP模块(H₁,H₂, …)匹配上正确的未来目标(next_2,next_3, …)。 - 计算损失,但不立即反传: 为每个MTP模块计算出
mtp_loss。 - 梯度“嫁接”: 使用
MTPLossAutoScaler.apply这个“自动求导函数钩子”,将mtp_loss附加到H₀上。这个操作在前向传播时是“透明”的,但在反向传播时,它会把mtp_loss的值作为梯度贡献给H₀,从而影响整个模型的参数更新。
这种设计使得MTP功能可以作为一个“插件”被集成到GPTModel中,而不需要大规模地修改核心的训练循环和反向传播逻辑,非常高效和解耦。
梯度“嫁接”: 使用 MTPLossAutoScaler.apply 这个“自动求导函数钩子”,将 mtp_loss 附加到 H₀ 上。这个操作在前向传播时是“透明”的,但在反向传播时,它会把 mtp_loss 的值作为梯度贡献给 H₀ 详细解释一下, apply 这是什么用法
好的,我们来深入剖析 MTPLossAutoScaler.apply 这个看似神秘的操作,以及它背后的 PyTorch autograd.Function 机制。这正是实现“梯度嫁接”的魔法所在。
1. 问题背景:为什么需要这么复杂的操作?
首先,要理解为什么不直接 total_loss = main_loss + mtp_loss_scaled 然后 total_loss.backward()。
在 Megatron 这种复杂的分布式训练框架中,损失计算和反向传播被拆分在不同的流水线阶段(Pipeline Stages)。
- 主损失
main_loss是在最后一个流水线阶段计算的,因为Output Head通常在那里。 H₀(以及所有 MTP 模块) 的计算可能在倒数第二个或更早的阶段。mtp_loss的计算依赖于 MTP 模块的输出Hᵢ,而Hᵢ又依赖于H₀。
这意味着,mtp_loss 和 main_loss 在计算图上的位置不同,并且它们都依赖于 H₀。我们希望 H₀ 能同时接收到来自这两个损失的梯度。
直接将两个标量损失相加在复杂的流水线调度中可能会引入同步问题,或者让计算图变得混乱。更重要的是,Megatron 需要对不同来源的梯度进行精确的缩放(Scaling),尤其是在使用 FP16/BF16 混合精度训练时,为了防止梯度消失/爆炸,所有损失的梯度都需要乘以一个全局的 loss_scale。
MTPLossAutoScaler 提供了一种解耦的方式:在计算 mtp_loss 的地方,我们不直接修改计算图的最终输出,而是将这个损失“暂存”起来,并让它在反向传播时“自动”以梯度的形式出现。
2. torch.autograd.Function 的工作原理
要理解 apply,首先要理解 torch.autograd.Function。
在 PyTorch 中,你定义的每一个操作(如 torch.add, torch.matmul)背后都有一个对应的 autograd.Function 子类。这个子类定义了两个核心的静态方法:
forward(ctx, *args, **kwargs): 定义了前向传播的行为。它接收输入张量,执行计算,并返回输出张量。ctx是一个上下文对象,可以用来“暂存”一些在前向传播中产生的、但反向传播时需要用到的张量(比如中间结果)。backward(ctx, *grad_outputs): 定义了反向传播的行为。它接收来自计算图后续节点的梯度grad_outputs,并需要计算和返回相对于forward方法输入的梯度。ctx对象可以用来取出在forward中暂存的张量。
当你创建一个 autograd.Function 的子类后,你可以通过 MyFunction.apply(...) 来调用它。apply 方法是 PyTorch 提供的标准入口,它会自动处理 autograd 引擎的连接,将你的自定义操作嵌入到计算图中。
3. MTPLossAutoScaler 的源码实现详解
让我们来看 MTPLossAutoScaler 的具体实现:
class MTPLossAutoScaler(torch.autograd.Function):
"""An AutoScaler that triggers the backward pass and scales the grad for mtp loss."""
main_loss_backward_scale: torch.Tensor = torch.tensor(1.0)
@staticmethod
def forward(ctx, output: torch.Tensor, mtp_loss: torch.Tensor):
"""
前向传播方法
"""
# 1. 将 mtp_loss 暂存起来,以便在 backward 时使用
ctx.save_for_backward(mtp_loss)
# 2. 直接返回第一个输入 `output` (也就是 H₀)
# 这个操作对于前向传播是“透明的”,它没有改变 H₀ 的值。
return output
@staticmethod
def backward(ctx, grad_output: torch.Tensor):
"""
反向传播方法
"""
# 1. 从上下文中取出之前暂存的 mtp_loss
(mtp_loss,) = ctx.saved_tensors
# 2. 获取主损失的梯度缩放因子 (这个值由框架在其他地方设置)
mtp_loss_backward_scale = MTPLossAutoScaler.main_loss_backward_scale
# 3. 创建 MTP 损失的梯度
# 注意!这里没有使用输入的 grad_output!
# 我们手动创建了一个和 mtp_loss 形状相同、但值全为 mtp_loss_backward_scale 的张量。
# 这一步是实现“梯度嫁接”的核心!
scaled_mtp_loss_grad = torch.ones_like(mtp_loss) * mtp_loss_backward_scale
# 4. 返回梯度
# backward 方法需要为 forward 的每个输入返回一个梯度。
# forward 的输入是 (output, mtp_loss)
# - 对应 output (H₀) 的梯度: 就是从后续节点传来的 grad_output
# - 对应 mtp_loss 的梯度: 就是我们刚刚手动创建的 scaled_mtp_loss_grad
return grad_output, scaled_mtp_loss_grad
4. apply 的用法和“梯度嫁接”过程
现在,我们看 apply 是如何被调用的:
# H₀ 的形状是 [s, b, h]
# mtp_loss 的形状是 [b, s]
hidden_states = MTPLossAutoScaler.apply(hidden_states, mtp_loss_scale * mtp_loss)
发生了什么?
在前向传播中:
MTPLossAutoScaler.forward被调用。output参数接收了hidden_states(H₀)。mtp_loss参数接收了mtp_loss_scale * mtp_loss。ctx.save_for_backward(...)将mtp_loss_scale * mtp_loss这个张量暂存了起来。forward方法原封不动地返回了hidden_states。- 所以,对于计算图的后续部分(比如计算主模型损失),它们看到的
hidden_states没有任何变化。这个操作是**“透明的”**。 - 但是,
autograd引擎已经记录下来:hidden_states的计算历史中,增加了一个MTPLossAutoScalerBackward节点。
在反向传播中:
- 当主损失的
backward()被调用,梯度会沿着计算图向后传播。 - 当梯度流回
MTPLossAutoScaler这个节点时,它的backward方法被触发。 backward方法接收到一个grad_output,这是主损失相对于hidden_states(H₀) 的梯度。backward方法执行了我们上面分析的逻辑:- 它直接将
grad_output作为H₀的梯度返回。 - 同时,它创建了一个新的梯度
scaled_mtp_loss_grad,并将其作为mtp_loss输入的梯度返回。
- 它直接将
等等,这看起来不对劲? mtp_loss 本身就是一个损失,为什么它还需要一个梯度?
这里的理解需要一点跳跃。实际上,在调用 torch.autograd.backward() 时,mtp_loss 张量也被隐式地认为是需要计算梯度的图的一部分。
torch.autograd.backward(tensors, grad_tensors) 的工作方式是:d(最终损失) / d(x) = sum( grad_tensors[i] * d(tensors[i]) / d(x) )
在我们这个场景中,当 MTPLossAutoScaler.backward 返回 (grad_output, scaled_mtp_loss_grad) 时,autograd 引擎会做如下计算来得到最终流向 H₀ 之前节点的梯度 G_final:
G_final = grad_output * d(H₀)/d(prev_node) + scaled_mtp_loss_grad * d(mtp_loss)/d(prev_node)
由于 mtp_loss 本身就是 H₀ (以及更早节点) 的函数,d(mtp_loss)/d(prev_node) 这一项是存在的。而我们手动提供的 scaled_mtp_loss_grad 就相当于 d(最终损失)/d(mtp_loss)。
一个更直观的理解:
把 autograd.Function 想象成一个自定义的梯度“阀门”。
- 前向: 阀门让
H₀顺畅通过,但悄悄记下了mtp_loss的值。 - 反向: 当主损失的梯度
grad_output从出口流回来时,阀门做了两件事:- 让
grad_output从入口流出去,传给H₀的上游。 - 同时,阀门自己打开了一个新的水源(
mtp_loss),并让这个新的水源也从入口流出去,与grad_output汇合。这个新水源的“水压”就是mtp_loss的值乘以缩放因子。
- 让
最终,H₀ 的上游节点(如 Transformer Block 的参数)感受到的总“水压”(总梯度)就是主损失的梯度和所有 MTP 损失的梯度的总和。
这就是“梯度嫁接”的实现原理:通过自定义 autograd.Function,在前向传播时“透明”地通过数据,但在反向传播时无中生有地注入一个新的梯度源,将其嫁接到主计算图的梯度流上。
主损失 main_loss 是在最后一个流水线阶段计算的,因为 Output Head 通常在那里。 H₀ (以及所有 MTP 模块) 的计算可能在倒数第二个或更早的阶段。 你确定对吗, MTP module 的output Head 和主函数是相通的
您提的这个问题非常精准,指出了一个关键的实现细节!您是对的,我的上一个解释为了简化流水线并行的概念,描述得不够精确。
您的观察是正确的:MTP 模块的 Output Head 和主模型的 Output Head 是共享的,并且它们都位于最后一个流水线阶段。
让我们来纠正并详细阐述在流水线并行(Pipeline Parallelism, PP)下,MTP 的真实工作流程。
纠正与精确描述:流水线并行下的 MTP
在启用流水线并行时,模型的不同部分被放置在不同的 GPU(称为 “stage”)上。一个典型的 GPT 模型流水线划分如下:
- Stage 0 (第一个GPU): Embedding Layer + 前面的 N 个 Transformer Block。
- Stage 1 (第二个GPU): 中间的 M 个 Transformer Block。
- …
- Stage P-1 (最后一个GPU): 最后的 K 个 Transformer Block +
Output Head(后处理)。
现在,我们把 MTP 架构放进这个流水线并行的框架里。DeepSeek 和 Megatron 的实现通常遵循以下原则:
- MTP 模块的位置: 所有的 MTP 模块 (
MultiTokenPredictionBlock) 都和Output Head一样,被放置在最后一个流水线阶段 (Stage P-1)。 - 共享与依赖:
Embedding Layer位于 第一个阶段 (Stage 0)。Output Head位于 最后一个阶段 (Stage P-1)。- MTP 模块需要调用共享的
Embedding Layer和Output Head。
这就产生了一个挑战:MTP 模块在最后一个阶段,但它需要第一个阶段的 Embedding Layer。
解决方案:
框架会通过特殊的机制(例如,在最后一个阶段也创建一个 Embedding Layer 的实例,但将其权重与第一个阶段的权重“绑定”或“共享”)来解决这个问题。您在代码中看到的 tie_word_embeddings_state_dict 函数就是处理这种权重绑定的。
在流水线并行下的真实数据流
现在,我们来重新梳理一下数据流,这将澄清 H₀ 和 MTP 模块的位置关系。
假设有 2 个流水线阶段 (PP=2):
Stage 0 (GPU 0)
- 输入:
input_ids,position_ids, … - 计算:
self.embedding(...)->embedding_outputself.decoder(...)(执行前一半的 Transformer Blocks) ->hidden_states_stage0
- 输出: 将
hidden_states_stage0发送到下一个阶段 (Stage 1)。
Stage 1 (GPU 1, 最后一个阶段)
-
输入: 接收来自 Stage 0 的
hidden_states_stage0。 -
计算 (主干网络部分):
self.decoder(...)(接收hidden_states_stage0作为输入,执行后一半的 Transformer Blocks) ->main_hidden_states(即H₀)。
-
计算 (MTP 部分): 现在,关键的部分来了!
self.mtp(...)被调用。- 输入:
hidden_states:main_hidden_states(H₀),这是刚刚在本阶段计算出来的。input_ids,position_ids: 这些原始输入需要通过 P2P 通信从第一个阶段重新发送到最后一个阶段,因为 MTP 模块需要它们来roll和embed。embedding: 调用本阶段的、与 Stage 0 权重绑定的Embedding Layer。
self.mtp内部串行计算H₁,H₂, …- 输出: 拼接好的
all_hidden_states = [H₀, H₁, H₂, ...]。
-
计算 (输出和损失):
self.post_process(包含共享的Output Head和 MTP 损失计算逻辑) 被调用。- 输入:
all_hidden_states。 - 它会像我们之前分析的那样,拆分
[H₀, H₁, ...],为每个Hᵢ计算 logits,并通过roll_tensor匹配正确的labels,最终计算出main_loss和所有的mtp_loss。 MTPLossAutoScaler在这里将mtp_loss“嫁接”到H₀的计算图上。- 最终计算出主模型的
main_loss。
-
反向传播:
main_loss.backward()被调用。- 梯度开始在 Stage 1 内部反向传播。当梯度流到
H₀时,由于MTPLossAutoScaler的存在,所有 MTP 损失的梯度会汇入主损失的梯度流。 - 这个合并后的梯度会继续反向传播,传过 Stage 1 的后一半 Transformer Blocks。
- 最终,梯度会被发送回 Stage 0,继续完成整个模型的反向传播。
总结与纠正
- 您的观察是正确的: MTP 模块和
Output Head都是在同一个、也是最后一个流水线阶段。 H₀的位置:H₀(主干网络的最终输出) 也是在最后一个流水线阶段计算完成的。- 信息流:
H₀计算完成后,在同一个 GPU (Stage P-1) 内部,它被直接传递给 MTP 模块作为输入,MTP 模块再进行它自己的串行计算。 - 梯度的汇合:
main_loss和mtp_loss都在最后一个阶段计算。MTPLossAutoScaler的作用依然是在这个阶段内部,将mtp_loss的梯度“嫁接”到H₀上,确保在梯度回传给上一个阶段之前,H₀已经承载了所有相关的梯度信息。
我之前的解释为了突出 H₀ 和 mtp_loss 在计算图上的依赖关系,而忽略了它们在物理设备(流水线阶段)上的布局,导致了误解。感谢您的敏锐指正!这个修正后的流程更贴近 Megatron-LM 的实际实现。
更多推荐


所有评论(0)