这段代码是 DeepSeek 相关的模型权重转换脚本,主要用于 转换和保存 Hugging Face(HF)格式的模型权重到 safetensors 格式,并按模型并行(MP)方式进行切分

import os
import shutil
from argparse import ArgumentParser
from glob import glob
from tqdm import tqdm, trange

import torch
from safetensors.torch import safe_open, save_file


mapping = {
    "embed_tokens": ("embed", 0),
    "input_layernorm": ("attn_norm", None),
    "post_attention_layernorm": ("ffn_norm", None),
    "q_proj": ("wq", 0),
    "q_a_proj": ("wq_a", None),
    "q_a_layernorm": ("q_norm", None),
    "q_b_proj": ("wq_b", 0),
    "kv_a_proj_with_mqa": ("wkv_a", None),
    "kv_a_layernorm": ("kv_norm", None),
    "kv_b_proj": ("wkv_b", 0),
    "o_proj": ("wo", 1),
    "gate": ("gate", None),
    "gate_proj": ("w1", 0),
    "down_proj": ("w2", 1),
    "up_proj": ("w3", 0),
    "norm": ("norm", None),
    "lm_head": ("head", 0),
    "scale": ("scale", None),
}


def main(hf_ckpt_path, save_path, n_experts, mp):
    """
    Converts and saves model checkpoint files into a specified format.

    Args:
        hf_ckpt_path (str): Path to the directory containing the input checkpoint files.
        save_path (str): Path to the directory where the converted checkpoint files will be saved.
        n_experts (int): Total number of experts in the model.
        mp (int): Model parallelism factor.
        
    Returns:
        None
    """
    torch.set_num_threads(8)
    n_local_experts = n_experts // mp
    state_dicts = [{} for _ in range(mp)]

    for file_path in tqdm(glob(os.path.join(hf_ckpt_path, "*.safetensors"))):
        with safe_open(file_path, framework="pt", device="cpu") as f:
            for name in f.keys():
                if "model.layers.61" in name:
                    continue
                param: torch.Tensor = f.get_tensor(name)
                if name.startswith("model."):
                    name = name[len("model."):]
                name = name.replace("self_attn", "attn")
                name = name.replace("mlp", "ffn")
                name = name.replace("weight_scale_inv", "scale")
                name = name.replace("e_score_correction_bias", "bias")
                key = name.split(".")[-2]
                assert key in mapping, f"Key {key} not found in mapping"
                new_key, dim = mapping[key]
                name = name.replace(key, new_key)
                for i in range(mp):
                    new_param = param
                    if "experts" in name and "shared_experts" not in name:
                        idx = int(name.split(".")[-3])
                        if idx < i * n_local_experts or idx >= (i + 1) * n_local_experts:
                            continue
                    elif dim is not None:
                        assert param.size(dim) % mp == 0, f"Dimension {dim} must be divisible by {mp}"
                        shard_size = param.size(dim) // mp
                        new_param = param.narrow(dim, i * shard_size, shard_size).contiguous()
                    state_dicts[i][name] = new_param

    os.makedirs(save_path, exist_ok=True)

    for i in trange(mp):
        save_file(state_dicts[i], os.path.join(save_path, f"model{i}-mp{mp}.safetensors"))

    for file_path in glob(os.path.join(hf_ckpt_path, "*token*")):
        new_file_path = os.path.join(save_path, os.path.basename(file_path))
        shutil.copyfile(file_path, new_file_path)


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--hf-ckpt-path", type=str, required=True)
    parser.add_argument("--save-path", type=str, required=True)
    parser.add_argument("--n-experts", type=int, required=True)
    parser.add_argument("--model-parallel", type=int, required=True)
    args = parser.parse_args()
    assert args.n_experts % args.model_parallel == 0, "Number of experts must be divisible by model parallelism"
    main(args.hf_ckpt_path, args.save_path, args.n_experts, args.model_parallel)


代码解析

1. 代码的核心目标

  • 加载 HF 格式的 safetensors 模型权重
  • 重命名和转换部分权重名称
  • 按照 mp(模型并行)对权重进行切分
  • 保存处理后的 safetensors 权重到目标路径
  • 复制 *token* 相关的文件(可能是 tokenizer 相关的数据)

2. 代码关键部分解析

(1)参数解析
parser = ArgumentParser()
parser.add_argument("--hf-ckpt-path", type=str, required=True)  # 输入模型权重路径
parser.add_argument("--save-path", type=str, required=True)  # 目标保存路径
parser.add_argument("--n-experts", type=int, required=True)  # 总 expert 数量(Mixture-of-Experts 结构)
parser.add_argument("--model-parallel", type=int, required=True)  # 模型并行的份数
args = parser.parse_args()

这个部分定义了 命令行参数,并确保 n_experts 必须能被 model_parallel 整除,即每个并行部分能分配相等数量的 Experts。

(2)创建 state_dicts 存储不同 MP 份数的权重
n_local_experts = n_experts // mp
state_dicts = [{} for _ in range(mp)]  # 为每个模型并行分片创建一个存储字典
  • n_local_experts 计算 每个模型并行进程的 expert 数量
  • state_dicts 作为 一个列表,每个元素对应一个模型并行进程的权重

(3)遍历 HF safetensors 文件
for file_path in tqdm(glob(os.path.join(hf_ckpt_path, "*.safetensors"))):
    with safe_open(file_path, framework="pt", device="cpu") as f:
        for name in f.keys():
            if "model.layers.61" in name:
                continue  # 跳过特定层
            param: torch.Tensor = f.get_tensor(name)  # 加载 tensor 数据
  • 遍历 Hugging Face safetensors 文件。
  • 使用 safe_open() 安全读取权重(避免直接加载到 GPU)。
  • 过滤掉 "model.layers.61" 相关的层(可能是特殊层)。
  • 提取所有的 tensor 参数。

(4)重命名权重 & 进行切分
name = name.replace("self_attn", "attn").replace("mlp", "ffn")
name = name.replace("weight_scale_inv", "scale").replace("e_score_correction_bias", "bias")

key = name.split(".")[-2]
assert key in mapping, f"Key {key} not found in mapping"

new_key, dim = mapping[key]  # 从映射表获取新名称和切分维度
name = name.replace(key, new_key)
  • 统一模型层名称,将 Hugging Face 权重的 key 映射到新的格式,例如:
    • "self_attn""attn"
    • "mlp""ffn"
    • "weight_scale_inv""scale"
    • "e_score_correction_bias""bias"
  • 映射表 mapping 用于替换部分关键层名称,确保最终格式一致。

(5)按照 MP 进行切分
for i in range(mp):
    new_param = param
    if "experts" in name and "shared_experts" not in name:
        idx = int(name.split(".")[-3])
        if idx < i * n_local_experts or idx >= (i + 1) * n_local_experts:
            continue  # 只保留属于当前 MP 份的 experts 参数
    elif dim is not None:
        assert param.size(dim) % mp == 0, f"Dimension {dim} must be divisible by {mp}"
        shard_size = param.size(dim) // mp
        new_param = param.narrow(dim, i * shard_size, shard_size).contiguous()  # 沿指定维度切片
    state_dicts[i][name] = new_param
  • 对于 Mixture-of-Experts(MoE)结构
    • experts 相关参数按照 n_experts 进行筛选,只保留 属于当前 MP 份的 experts
  • 对于普通张量
    • 如果指定了 dim,则按 MP 进行切分(例如 q_projo_proj 通常按 hidden_dim 进行切分)。
    • narrow() 从张量的指定维度提取对应 MP 份的切片

(6)保存处理后的权重
os.makedirs(save_path, exist_ok=True)
for i in trange(mp):
    save_file(state_dicts[i], os.path.join(save_path, f"model{i}-mp{mp}.safetensors"))
  • 创建保存目录
  • 分别保存切分后的 safetensors 权重文件(每个 MP 进程一个文件)。

(7)复制 tokenizer 相关文件
for file_path in glob(os.path.join(hf_ckpt_path, "*token*")):
    new_file_path = os.path.join(save_path, os.path.basename(file_path))
    shutil.copyfile(file_path, new_file_path)
  • 复制 *token* 相关的文件,通常是 tokenizer 相关的数据,例如:
    • tokenizer.json
    • vocab.txt
    • merges.txt

总结

💡 这段代码的主要功能

1️⃣ 加载 Hugging Face safetensors 格式的权重
2️⃣ 统一命名规则(转换部分层名称)
3️⃣ 按照 mp 进行模型权重切分(特别是 MoE 结构)
4️⃣ 保存为新的 safetensors 格式
5️⃣ 复制 tokenizer 相关文件


🚀 为什么要这样做?

🔹 适配 DeepSeek 训练的并行策略(MP)
🔹 优化推理性能,让每个计算节点只加载自己需要的部分
🔹 兼容不同模型框架(DeepSeek vs. Hugging Face)
🔹 降低内存占用(通过 shard 切分大张量)


🌍 适用场景

✅ 需要在 DeepSeek 训练环境加载 Hugging Face 权重
✅ 需要在 分布式推理环境(TPU/GPU)按 MP 加载权重
✅ 需要在 大规模 MoE 结构转换和存储专家层权重


🛠 可能的优化点

🔹 支持更多模型格式转换(如 PyTorch .binsafetensors
🔹 更灵活的 expert 分配策略(动态调整 MoE 权重)
🔹 自动检测 mapping 可能遗漏的层,避免 assert key in mapping 失败


这段代码本质上是 一个 DeepSeek 训练环境的 Hugging Face 权重转换工具,为 模型部署和推理做准备。它保证了大模型 MoE 结构下的参数切分和高效加载,是 DeepSeek 分布式训练架构的一部分。 🚀

可以用于哪些生产环节

这段代码主要用于将 Hugging Face 格式的模型权重转换为 DeepSeek 训练框架所需的 safetensors 格式,并根据 模型并行(MP) 策略对模型参数进行切分。这样的操作可以在多个生产环节中提供高效的支持,尤其是在需要处理大规模模型和分布式训练或推理的场景下。以下是一些可能的生产环节:

1. 分布式训练

在训练大规模模型时,尤其是采用 Mixture-of-Experts(MoE) 的模型结构,模型的参数量非常庞大,单一设备无法加载整个模型。利用该脚本可以:

  • 将模型权重按 MP 切分,并将切分后的权重分布到多个计算节点(例如 GPU、TPU 或其他分布式计算平台),确保每个节点只加载其负责的部分。
  • 多节点并行训练 中有效地管理 模型参数的分布,减少内存压力,提高训练速度。

生产环节应用示例:

  • 大型预训练模型(如 GPT-3、BERT 大模型)训练:通过模型并行策略,分布式训练能够支持在集群或云平台中高效地训练大规模语言模型。
  • MoE 模型训练:DeepSeek 框架专门用于 MoE 模型,在这种结构下,只有一部分专家模型参与每次前向传播,脚本中的切分策略可保证每个设备加载其负责的专家层。

2. 模型推理与部署

  • 推理阶段,该脚本可以将训练好的模型权重进行切分并存储为 safetensors 格式,优化加载速度和内存占用。
  • 使用 DeepSeek 模型并行(MP) 策略,将大模型切分到多个机器或 GPU 上进行 分布式推理,以支持高效的在线服务或批量推理。

生产环节应用示例:

  • 大规模推理服务:例如,电商、金融或搜索引擎等场景中,需要大规模并行化推理以提供实时的 AI 服务。
  • 多模态推理:如结合视觉和文本的多模态大模型,通常需要将模型按层次或专家分布到多个计算资源上进行并行推理。

3. 模型压缩与优化

  • 通过模型并行化的方式,该脚本可以帮助在训练过程中或者部署时优化内存管理,使得每个设备加载部分权重,避免内存溢出。
  • 切分后的权重可以在硬件资源有限的设备(如边缘设备)上进行推理,减少模型的内存占用。

生产环节应用示例:

  • 边缘计算:在设备如智能手机、IoT 设备上部署大规模模型,脚本的并行切分可以帮助在有限的内存和计算资源上运行部分模型,从而减少内存压力,提高推理速度。
  • 服务器优化:在云计算环境中,根据设备资源,按需切分和加载模型,提高集群资源的利用率。

4. 大规模训练数据生成

  • 在生成对抗网络(GAN)、强化学习(RL)等训练框架中,需要大量的数据生成与优化过程。通过模型并行,可以提高训练过程的效率。
  • 数据处理与增强:对训练过程中的中间层权重切分、训练数据分批等操作有利于 数据增强和生成

生产环节应用示例:

  • 自动化数据生成:如基于大规模语言模型生成训练数据,用于对抗训练、强化学习等。
  • 大规模数据预处理:脚本的切分策略可应用于生成大规模数据集的分布式处理,减少数据加载时间。

5. 自定义训练框架

  • 该脚本适用于为特定场景开发自定义训练框架的任务。它可以将 Hugging Face 模型权重转化为 DeepSeek 所需格式,提供高效的分布式训练支持。

生产环节应用示例:

  • 自定义 AI 训练平台:企业或研究机构在研发特定的 AI 模型时,可能会使用自定义框架进行训练。此脚本帮助他们将 Hugging Face 模型集成到该框架中,按需切分参数以适应不同的硬件和计算资源。
  • 混合模型训练:例如,将不同专家或子模型的权重集成到一个大模型中进行训练时,模型的权重切分将优化训练流程。

6. 模型迁移与重训练

  • 将 Hugging Face 的模型迁移到 DeepSeek 等分布式训练框架中进行 迁移学习模型重训练,该脚本可以帮助 转换权重重新分配权重,适应新的训练环境。

生产环节应用示例:

  • 迁移学习:使用已经预训练的语言模型,在特定领域(如金融、医疗)上进行迁移学习时,可以利用该脚本按 MP 切分权重,提升训练效率。
  • 模型微调:在大规模数据集上进行微调时,利用并行策略加速微调过程。

7. 模型压缩与分布式存储

  • 在大规模存储系统中,分布式存储和管理模型权重变得尤为重要。该脚本可以帮助将模型按 MP 切分并保存到不同的存储节点,适合需要进行 高效存储与访问管理 的场景。

生产环节应用示例:

  • 云端模型存储与访问:对于云平台上的 AI 模型,可以按需切分存储并通过 MP 策略来管理模型的访问和加载。
  • 备份与恢复:通过切分后的模型文件,能够更加高效地进行模型的备份与恢复操作。

8. 自动化与 DevOps

  • 结合自动化部署与 DevOps 流程,帮助团队 自动化模型转换与部署,支持 CI/CD 流程中对大规模模型的自动化训练和推理。

生产环节应用示例:

  • 持续集成/持续部署(CI/CD):在机器学习模型的开发和部署过程中,使用此脚本帮助自动化地将 Hugging Face 权重转换并分布式部署到不同的计算节点。
  • 模型生命周期管理:自动化模型版本管理与迁移,确保在多种训练和推理环境中对模型进行高效管理。

总结

这段代码在多个生产环节中都有广泛的应用,特别是在 分布式训练大规模推理边缘设备部署定制化训练平台 等场景中,能够显著提升 计算效率、内存管理、模型管理和自动化部署。企业可以利用这段代码优化 大模型的存储、加载、训练和推理过程,提升生产系统的性能和可扩展性。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐