Survey date: 2026-06-26. Subjects surveyed:

  • DModel: /Users/joker/Desktop/project/infrawaves/DModel
  • torchtitan: /Users/joker/Desktop/project/infrawaves/pytorch/torchtitan

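Goals:

  0. Which all-to-all calls does DModel make, and what is each one used for?
  1. Does DModel use any of the all-to-all implementations in torchtitan?
  2. How exactly is torchtitan's device-side all-to-all written?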

0. One-line conclusion

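  • DModel does not use any of torchtitan's all-to-all implementations. Every all-to-all in DModel is built on native PyTorch APIs (torch.distributed.all_to_all_single and torch.distributed._functional_collectives.all_to_all_single[_autograd]). The only references to torchtitan are comments in the DeepSeek-V3 model file that credit it as a design reference; there is no code-level dependency.
  • torchtitan provides five token dispatchers. Only AllToAllTokenDispatcher (and TorchAOTokenDispatcher, which inherits from it) goes through NCCL all_to_all_single. DeepEP and HybridEP wrap DeepSeek's deep_ep library. minimal_async_ep is torchtitan's own device-side all-to-allv: it uses PyTorch SymmetricMemory plus Triton kernels to write data directly into peer GPU memory, bypassing NCCL.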

1. DModel side: full inventory of all-to-all calls

| Scenario | File | Underlying API | In autograd graph |
|---|---|---|---|
| EP metadata exchange (token counts) | dmodel/parallelism/expert_parallel.py:451 | funcol.all_to_all_single (functional collectives) | No (no_grad) |
| EP token dispatch (payload) | dmodel/parallelism/expert_parallel.py:479 | funcol.all_to_all_single_autograd | Yes |
| EP token combine (payload) | dmodel/parallelism/expert_parallel.py:545 | funcol.all_to_all_single_autograd | Yes |
| Ulysses seq→head, forward/backward | dmodel/parallelism/ulysses_parallel.py:79 / :102 | dist.all_to_all_single | Yes (custom Function) |
| Ulysses head→seq, forward/backward | dmodel/parallelism/ulysses_parallel.py:144 / :166 | dist.all_to_all_single | Yes (custom Function) |
| NCCL warmup (ep / cp_ulysses) | dmodel/utils/distributed.py:263 | dist.all_to_all_single | |
| Unit tests | tests/test_all_to_all.py:52,95 | Indirectly via functional collectives | |

1.1 EP (expert parallelism): dmodel/parallelism/expert_parallel.py

The imports come from PyTorch functional collectives, not from torchtitan:

# expert_parallel.py:50-56
from torch.distributed._functional_collectives import (
    all_to_all_single as funcol_all_to_all_single,
)
from torch.distributed._functional_collectives import (
    all_to_all_single_autograd as funcol_all_to_all_single_autograd,
)
from torch.distributed._functional_collectives import wait_tensor as funcol_wait_tensor

Two thin wrappers (both add .contiguous(); the non-autograd one also calls wait_tensor):

# expert_parallel.py:306-312: participates in autograd; used for the token payload
def all_to_all_single_autograd(input, output_split_sizes, input_split_sizes, group):
    return funcol_all_to_all_single_autograd(
        input.contiguous(), output_split_sizes, input_split_sizes, group,
    )

# expert_parallel.py:315-322: outside autograd; used for metadata; ends with wait_tensor
def all_to_all_single_functional(input, output_split_sizes, input_split_sizes, group):
    output = funcol_all_to_all_single(
        input.contiguous(), output_split_sizes, input_split_sizes, group,
    )
    return funcol_wait_tensor(output)

Three call sites (forward):

# expert_parallel.py:450-456: ① before dispatch, an all-to-all exchanges "how many tokens each rank sends to each expert"
with torch.no_grad():
    num_tokens_per_expert_group = all_to_all_single_functional(
        num_tokens_per_expert, None, None, group=device_mesh,
    )

# expert_parallel.py:478-484: ② token payload dispatch (uneven splits, computed from ①)
routed_input = all_to_all_single_autograd(
    routed_input, mod.output_splits, mod.input_splits, device_mesh,
)

# expert_parallel.py:544-550: ③ after the experts finish, combine back to the original rank (splits swapped)
routed_output = all_to_all_single_autograd(
    routed_output, mod.input_splits, mod.output_splits, device_mesh,
)

ExpertTensorParallel inherits from ExpertParallel. It only passes the ep sub-mesh of the 2D [ep, tp] mesh to the parent class and reuses the same all-to-all code:

# expert_parallel.py:661-662 / 683-685
return super()._token_dispatch(mod, inputs, device_mesh["ep"])
...
return super()._token_combine(mod, routed_output, device_mesh["ep"])

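The autograd behavior comes from inside PyTorch functional collectives: the forward pass transfers uneven chunks according to output/input_split_sizes, and the backward pass automatically runs the reverse all-to-all with the splits swapped. DModel does not implement this backward itself.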
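The link between the count exchange (①) and the uneven payload splits (②, ③) can be illustrated without any GPU. The following is a minimal single-process sketch in plain Python, not DModel's code: the helper names `exchange_counts` and `splits_for_rank` and the toy counts are hypothetical, and it assumes experts are assigned to EP ranks in contiguous blocks, which the notes above do not spell out.

```python
# Single-process simulation of the EP metadata exchange (no torch, no NCCL).
# Assumption: experts are assigned to EP ranks in contiguous blocks.

def exchange_counts(counts_per_rank, ep_size):
    """Simulate an equal-split all-to-all over per-expert token counts.

    counts_per_rank[r][e] = number of tokens rank r routes to global expert e.
    Returns recv, where recv[r] is a flat list ordered (source rank, local expert).
    """
    num_experts = len(counts_per_rank[0])
    local = num_experts // ep_size  # experts hosted by each rank
    recv = []
    for dst in range(ep_size):
        row = []
        for src in range(ep_size):
            # Chunk `dst` of the source's count vector goes to rank `dst`.
            row.extend(counts_per_rank[src][dst * local:(dst + 1) * local])
        recv.append(row)
    return recv


def splits_for_rank(rank, counts_per_rank, recv, ep_size):
    """Derive the uneven payload splits for one rank."""
    local = len(counts_per_rank[rank]) // ep_size
    sent = counts_per_rank[rank]
    got = recv[rank]
    # Tokens this rank sends to each destination rank.
    input_splits = [sum(sent[d * local:(d + 1) * local]) for d in range(ep_size)]
    # Tokens this rank receives from each source rank.
    output_splits = [sum(got[s * local:(s + 1) * local]) for s in range(ep_size)]
    return input_splits, output_splits


# 2 EP ranks, 4 experts (experts 0-1 live on rank 0, experts 2-3 on rank 1).
counts = [[3, 1, 0, 2],   # rank 0
          [2, 1, 4, 0]]   # rank 1
recv = exchange_counts(counts, ep_size=2)
splits = [splits_for_rank(r, counts, recv, 2) for r in range(2)]
```

In this toy setup, what rank 0 expects from rank 1 equals what rank 1 sends to rank 0, which is why the combine step can reuse the same two lists in swapped order.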

1.2 Ulysses sequence parallelism: dmodel/parallelism/ulysses_parallel.py

This path is a fully custom torch.autograd.Function built on dist.all_to_all_single (note: not the functional-collectives version).

# ulysses_parallel.py:45   class ulysses_all_to_all(torch.autograd.Function)  # seq → head
#   forward  :79   dist.all_to_all_single(output_flat, input_flat, group=sp_group)
#   backward :102  dist.all_to_all_single(output_flat, input_flat, group=sp_group)

# ulysses_parallel.py:113  class ulysses_all_to_all_reverse(torch.autograd.Function)  # head → seq
#   forward  :144  dist.all_to_all_single(...)
#   backward :166  dist.all_to_all_single(...)

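Layout transform (seq→head, forward): (bs, local_seq, n_heads, hd) → reshape to (bs, local_seq, cp, local_heads, hd) → permute to (cp, bs, local_seq, local_heads, hd) → flatten → all_to_all_single (equal splits) → reshape back to (bs, full_seq, local_heads, hd). The effect is that each rank goes from holding a local slice of the sequence with all heads to holding the full sequence with a local slice of the heads. The reverse op is its mathematical inverse. Each of the two performs one all_to_all_single in forward and one in backward, and they mirror each other under autograd (the forward of one is the backward of the other).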
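The layout change can be checked on CPU with NumPy standing in for the collective. This is a sketch under stated assumptions, not the code in ulysses_parallel.py: all function names are hypothetical, the all-to-all is simulated with a list of per-rank arrays, and the sketch explicitly moves the source-rank axis next to the sequence (or head) axis before the final reshape so that the result is also correct for bs > 1.

```python
import numpy as np

def seq_to_head_chunks(x_local, cp):
    # (bs, local_seq, n_heads, hd) -> (cp, bs, local_seq, local_heads, hd)
    bs, local_seq, n_heads, hd = x_local.shape
    x = x_local.reshape(bs, local_seq, cp, n_heads // cp, hd)
    return x.transpose(2, 0, 1, 3, 4)  # chunk d holds head group d

def head_to_seq_chunks(x_local, cp):
    # (bs, full_seq, local_heads, hd) -> (cp, bs, local_seq, local_heads, hd)
    bs, full_seq, local_heads, hd = x_local.shape
    x = x_local.reshape(bs, cp, full_seq // cp, local_heads, hd)
    return x.transpose(1, 0, 2, 3, 4)  # chunk d holds sequence shard d

def simulated_all_to_all(chunks_per_rank):
    # Equal-split all-to-all: rank dst receives chunk dst from every source rank.
    cp = len(chunks_per_rank)
    return [np.stack([chunks_per_rank[src][dst] for src in range(cp)])
            for dst in range(cp)]

def seq_to_head(shards, cp):
    out = []
    for r in simulated_all_to_all([seq_to_head_chunks(s, cp) for s in shards]):
        _, bs, local_seq, local_heads, hd = r.shape  # axis 0 = source rank = sequence shard
        out.append(r.transpose(1, 0, 2, 3, 4).reshape(bs, cp * local_seq, local_heads, hd))
    return out

def head_to_seq(shards, cp):
    out = []
    for r in simulated_all_to_all([head_to_seq_chunks(s, cp) for s in shards]):
        _, bs, local_seq, local_heads, hd = r.shape  # axis 0 = source rank = head group
        out.append(r.transpose(1, 2, 0, 3, 4).reshape(bs, local_seq, cp * local_heads, hd))
    return out

cp, bs, full_seq, n_heads, hd = 2, 2, 4, 4, 3
local_seq, local_heads = full_seq // cp, n_heads // cp
full = np.arange(bs * full_seq * n_heads * hd).reshape(bs, full_seq, n_heads, hd)
seq_shards = [full[:, r * local_seq:(r + 1) * local_seq] for r in range(cp)]
head_shards = seq_to_head(seq_shards, cp)   # full sequence, local heads
round_trip = head_to_seq(head_shards, cp)   # back to local sequence, all heads
```

After `seq_to_head`, rank r holds the head slice `full[:, :, r*local_heads:(r+1)*local_heads]`, and `head_to_seq` restores the original sequence shards.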

Public entry points:

# ulysses_parallel.py:189  return ulysses_all_to_all.apply(x, cp_size, sp_group)         # seq_to_head
# ulysses_parallel.py:206  return ulysses_all_to_all_reverse.apply(x, cp_size, sp_group) # head_to_seq

UlyssesAttentionWrapper.forward calls seq_to_head after the QKV projection (:340-342) and head_to_seq after attention (:375). MLAUlyssesWrapper does the same (:525-527, :559).

1.3 NCCL warmup: dmodel/utils/distributed.py

# distributed.py:214-219: the ep and cp_ulysses mesh dimensions both map to all_to_all
_OP_MAP = { ..., "cp_ulysses": "all_to_all", "ep": "all_to_all", ... }

# distributed.py:259-263: warm up the NCCL buffers with a dummy all_to_all_single before training
elif op == "all_to_all":
    buf = torch.empty(num_elements * world, device=device)
    out = torch.empty(num_elements * world, device=device)
    for _ in range(num_iters):
        dist.all_to_all_single(out, buf, group=group)

2. torchtitan side: all-to-all / token dispatch implementations

2.1 Five dispatchers (models/common/config_utils.py:281-329, selected by comm_backend)

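| Variant | Implementation | comm_backend | Positioning |
|---|---|---|---|
| Standard all-to-all | AllToAllTokenDispatcher | "standard" | Native PyTorch all_to_all_single over NCCL, general purpose; degenerates to local when EP=1 |
| TorchAO quantized | TorchAOTokenDispatcher | (quantization swap) | Inherits from the above and adds token-group padding (FP8→16, MXFP8→32) to align with quantized grouped GEMM |
| DeepEP | DeepEPTokenDispatcher | "deepep" | Wraps deep_ep.Buffer; H100 / NVLink Switch; async combine overlapped with shared_experts |
| HybridEP | HybridEPTokenDispatcher | "hybridep" | Wraps deep_ep.HybridEPBuffer; GB200 / NVLink72; TMA optimizations, fused permute |
| MinimalAsyncEP | MinimalAsyncEPTokenDispatcher | "minimal_async_ep" | In-house device-side path: all-to-allv built on SymmetricMemory + Triton (constraints: EP=DP, no TP/CP/PP/SP) |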

Base classes: LocalTokenDispatcher (token_dispatcher.py:46, EP=1, no communication) and BaseEPTokenDispatcher (token_dispatcher.py:182).

2.2 Standard path: AllToAllTokenDispatcher (models/common/token_dispatcher.py)

dispatch (forward, two all-to-alls):

# :289-296   EP=1 takes the LocalTokenDispatcher fast path
# :298-305   _local_reorder(): sort local tokens by expert id into (N=T*K, D)

# First all-to-all: exchange per-expert token counts
# :334-339 (compile)  all_to_all_single(num_local_tokens_per_expert_E.view(ep,-1), None, None, ep_mesh)
# :341-346 (eager)    spmd.all_to_all(...)
# :357-369  D2H sync to obtain input_splits / output_splits

# Second all-to-all: send the actual token data according to the splits
# :372-377 (compile)  routed_input_RD = all_to_all_single(routed_input_ND, output_splits, input_splits, ep_mesh)
# :379-386 (eager)    spmd.all_to_all(..., output_split_sizes=..., input_split_sizes=...)

# :397-405  _permute(): rank-major → expert-major, so tokens of the same expert are contiguous before grouped_mm
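The rank-major to expert-major reordering in the last step can be sketched in plain Python. This is an illustration only, assuming the receive buffer is ordered by source rank and then by local expert; the function names and the string "tokens" are hypothetical and do not come from token_dispatcher.py.

```python
def rank_major_to_expert_major(counts, rows):
    """Reorder a flat receive buffer so each local expert's rows are contiguous.

    counts[src][e] = number of rows received from source rank `src` for local expert `e`.
    rows           = flat buffer ordered by (source rank, local expert).
    Returns (permuted_rows, perm) where permuted_rows[i] == rows[perm[i]].
    """
    num_src, num_local = len(counts), len(counts[0])
    # Start offset of every (src, expert) segment in the rank-major buffer.
    offsets, pos = {}, 0
    for src in range(num_src):
        for e in range(num_local):
            offsets[(src, e)] = pos
            pos += counts[src][e]
    # Walk the segments in expert-major order instead.
    perm = []
    for e in range(num_local):
        for src in range(num_src):
            start = offsets[(src, e)]
            perm.extend(range(start, start + counts[src][e]))
    return [rows[i] for i in perm], perm


def expert_major_to_rank_major(permuted_rows, perm):
    """Inverse of the permutation above (the role played by an unpermute step)."""
    rows = [None] * len(perm)
    for i, p in enumerate(perm):
        rows[p] = permuted_rows[i]
    return rows


counts = [[2, 1],   # from rank 0: 2 rows for expert 0, 1 row for expert 1
          [1, 2]]   # from rank 1: 1 row for expert 0, 2 rows for expert 1
rows = ["r0e0a", "r0e0b", "r0e1a", "r1e0a", "r1e1a", "r1e1b"]
permuted, perm = rank_major_to_expert_major(counts, rows)
restored = expert_major_to_rank_major(permuted, perm)
```

A grouped matmul can then process `permuted` as one contiguous slab per expert, using per-expert row counts (here 3 and 3) as group sizes.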

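combine (gathering tokens back):

# :508-517  EP=1 fast path
# :528-529  _unpermute(): inverse of permute
# :533-548  reverse all-to-all (input/output splits swapped) sends rows back to the source rank
# :565-580  weight by routing scores, then deterministic_scatter_add accumulates into the original token positions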
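The final weighting and accumulation step can be sketched with NumPy. Here `np.add.at` is a stand-in for the deterministic scatter-add mentioned above, not torchtitan's implementation; the function name `combine_topk` and the toy data are hypothetical.

```python
import numpy as np

def combine_topk(expert_out, token_ids, scores, num_tokens):
    """Weight each routed row by its routing score and add it to its source token.

    expert_out: (N, D) rows returned by the experts, N = T * K
    token_ids:  (N,)   original token index of each row
    scores:     (N,)   routing score of each row
    """
    out = np.zeros((num_tokens, expert_out.shape[1]), dtype=np.float32)
    weighted = expert_out.astype(np.float32) * scores[:, None].astype(np.float32)
    # Unbuffered add: rows that share a token index accumulate instead of overwriting.
    np.add.at(out, token_ids, weighted)
    return out


# T=2 tokens, K=2 experts per token, D=2 hidden size.
expert_out = np.array([[1, 1], [2, 2], [3, 3], [4, 4]], dtype=np.float32)
token_ids = np.array([0, 1, 0, 1])
scores = np.array([0.5, 0.25, 0.5, 0.75], dtype=np.float32)
combined = combine_topk(expert_out, token_ids, scores, num_tokens=2)
```

Token 0 receives 0.5·[1, 1] + 0.5·[3, 3] and token 1 receives 0.25·[2, 2] + 0.75·[4, 4].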

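2.3 ★ In-house device-side path: MinimalAsyncEP (distributed/minimal_async_ep/)

The directory contains only __init__.py, api.py and kernels.py. There are no .cu/.cpp/.h files; every device-side kernel is written in Triton. It does not go through NCCL all-to-all; it writes directly into peer GPU memory via SymmetricMemory.

(a) SymmetricMemory buffer initialization (api.py:176-277, init_buffer())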

# :203      the backend must be CUDA: symm_mem.get_backend(device) == "CUDA"
# :210-218  double-buffered hidden receive buffers: symm_mem.empty(max_routed_tokens, hidden_dim, ...)  ×2
# :219-224  counts receive buffer: symm_mem.empty(group.size(), num_experts, int64)
# :225-228  symm_mem.rendezvous(buf, group): establish the cross-GPU IPC mapping
# :230-243  handle.get_buffer(peer, ...): get a view of each peer's buffer (remote GPU memory)
# :244-251  collect each peer buffer's data_ptr() into one int64 GPU tensor for the Triton kernel to index

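How it works: memory allocated with symm_mem.empty becomes addressable from every GPU in the group once rendezvous establishes the IPC mapping. The local rank can therefore obtain raw pointers to the peer buffers, and the Triton kernel writes through them directly.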

(b) Core all-to-all primitive, a Triton kernel (kernels.py:291-348, _copy_rows_to_peer_ptrs_kernel; checked line by line)

@triton.jit
def _copy_rows_to_peer_ptrs_kernel(
    src,
    dst_ptrs,   # int64 pointer table: base address of each peer's buffer
    dst_ranks,  # destination peer of each row
    dst_rows,   # destination row inside the peer's buffer
    num_valid_rows, src_rows, ...,
):
    row_start = tl.program_id(0) * BLOCK_M          # :321  each block handles BLOCK_M rows × BLOCK_N columns
    ...
    src_row = row
    if HAS_SRC_ROWS:
        src_row = tl.load(src_rows + row, ...)      # :334  optional row reordering (E-major gather)
    dst_rank = tl.load(dst_ranks + row, ...)        # :335  destination peer of this row
    dst_row  = tl.load(dst_rows  + row, ...)        # :336  row index inside the destination peer
    values   = tl.load(src + src_row[:,None]*SRC_ROW_STRIDE + col[None,:]*SRC_COL_STRIDE, mask)  # :338-340 read the local source
    dst_base = tl.load(dst_ptrs + dst_rank, ...)    # :342  base address of the destination peer's buffer
    dst_ptr  = dst_base.to(tl.pointer_type(DST_DTYPE))[:,None] + dst_row[:,None]*DST_ROW_STRIDE + col[None,:]  # :344-347
    tl.store(dst_ptr, values, mask=mask & dst_rank_mask[:,None])  # :348  ★ write directly into remote GPU memory

In one sentence: a pointer table plus Triton tl.store writes local rows, as fine-grained point-to-point transfers, into the peer GPU's symmetric buffer, bypassing NCCL.
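The addressing scheme can be emulated on CPU, with a list of NumPy arrays standing in for the pointer table. This is only a sketch of the indexing logic: it is not Triton, does no real remote write, and the bounds check is an assumed reading of the dst_rank mask. The function name `copy_rows_to_peers` is hypothetical.

```python
import numpy as np

def copy_rows_to_peers(src, peer_buffers, dst_ranks, dst_rows, src_rows=None):
    """Row i of the (optionally gathered) source lands in
    peer_buffers[dst_ranks[i]][dst_rows[i]].

    peer_buffers plays the role of the pointer table: one writable buffer per peer.
    """
    for i in range(len(dst_ranks)):
        s = i if src_rows is None else src_rows[i]  # optional gather, like HAS_SRC_ROWS
        rank = dst_ranks[i]
        if not 0 <= rank < len(peer_buffers):       # assumption: skip rows with no valid peer
            continue
        peer_buffers[rank][dst_rows[i]] = src[s]    # the "remote" store


src = np.array([[1, 1], [2, 2], [3, 3]], dtype=np.float32)
peers = [np.zeros((3, 2), dtype=np.float32) for _ in range(2)]
copy_rows_to_peers(
    src, peers,
    dst_ranks=[1, 0, 1],
    dst_rows=[0, 0, 1],
    src_rows=[2, 0, 1],
)
```

Each row carries its own (peer, row) destination, so the transfer needs no per-rank split sizes on the host; that information lives in the metadata tensors instead.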

(c) Barrier after the writes (api.py:336-348)

def _wait_ready(handle, channel):
    # A single fused barrier kernel signals and polls all peers at once,
    # so every peer has finished writing into this rank's receive buffer before it is read.
    handle.barrier(channel=channel)
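The write, barrier, read ordering can be mimicked on CPU with threads. In this sketch `threading.Barrier` is only a stand-in for the symmetric-memory barrier, each thread plays one rank, and `run_ranks` is a hypothetical name; it shows the ordering guarantee, not the real API.

```python
import threading

def run_ranks(world):
    # recv[r][src] is the slot in rank r's receive buffer that rank `src` writes.
    recv = [[None] * world for _ in range(world)]
    seen = [None] * world
    barrier = threading.Barrier(world)

    def rank_main(rank):
        for peer in range(world):
            recv[peer][rank] = f"{rank}->{peer}"  # "remote" write into the peer's buffer
        barrier.wait()                            # all ranks have finished writing
        seen[rank] = list(recv[rank])             # now it is safe to read our own buffer

    threads = [threading.Thread(target=rank_main, args=(r,)) for r in range(world)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


result = run_ranks(3)
```

Without the barrier, a rank could read its buffer while some slots are still unwritten; with it, every slot is filled before any read.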

(d) custom_op and autograd registration

# api.py:540-627  @torch.library.custom_op("minimal_async_ep::dispatch", device_types="cuda")
#   argsort into E-major → _dispatch_metadata exchanges counts → _copy_rows_to_peers_and_wait_cuda writes directly to peers
# api.py:666-721  @torch.library.custom_op("minimal_async_ep::combine", ...)
#   _combine_to_origin writes back to the source rank → reduce_topk_slots_kernel performs the top-k reduction
# api.py:990-997  dispatch_op.register_autograd(...) / combine_op.register_autograd(...)
#   dispatch.bwd ≈ the combine data path; combine.bwd ≈ the dispatch data path (including routing score gradients)
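Why the backward of dispatch looks like combine can be seen by treating dispatch as a row gather, whose adjoint is a scatter-add. The NumPy sketch below is a simplified single-process model that ignores communication and routing scores; the function names are hypothetical.

```python
import numpy as np

def dispatch(x, idx):
    """Forward data path: gather one row per routed slot."""
    return x[idx]

def dispatch_backward(grad_y, idx, num_rows):
    """Backward data path: scatter-add slot gradients back to their source rows."""
    grad_x = np.zeros((num_rows, grad_y.shape[1]), dtype=grad_y.dtype)
    np.add.at(grad_x, idx, grad_y)  # repeated indices accumulate
    return grad_x


idx = np.array([2, 0, 2, 1])                           # slot i reads source row idx[i]
x = np.arange(6, dtype=np.float64).reshape(3, 2)
grad_y = np.arange(8, dtype=np.float64).reshape(4, 2)

# The same gather written as a matrix: y = P @ x, so grad_x = P.T @ grad_y.
P = np.zeros((len(idx), x.shape[0]))
P[np.arange(len(idx)), idx] = 1.0

y = dispatch(x, idx)
grad_x = dispatch_backward(grad_y, idx, x.shape[0])
```

The transpose of a gather is a scatter-add, which is the same shape of operation as sending rows back and reducing them per token.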

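(e) Triton kernels in kernels.py at a glance

| Kernel | Lines | Function |
|---|---|---|
| _copy_full_counts_to_peer_ptrs_kernel | 27-51 | Writes per-expert counts into every peer's count buffer |
| _fill_dispatch_metadata_kernel | 54-83 | Computes the destination rank and row for each routed row |
| _fill_combine_metadata_kernel | 86-126 | Computes the source rank and row for each received row (used by combine) |
| _invert_flat_indices_kernel | 128-144 | Inverts indices between E-major and T-major order |
| _reduce_topk_slots_kernel | 147-195 | Reduces expert-sorted rows back to tokens over top-k (fp32 accumulation) |
| _expand_topk_grad_kernel | 198-243 | Backward of the above |
| _topk_scores_grad_kernel | 246-288 | Routing score gradients |
| _copy_rows_to_peer_ptrs_kernel | 291-348 | Core kernel: direct writes to peer GPUs through the pointer table |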

Constraint: api.py:128-136 requires MinimalAsyncEP to be used with FullAC (full recomputation), because the symmetric-memory barrier semantics are incompatible with selective AC.

2.4 DeepEP / HybridEP: wrappers around DeepSeek deep_ep

DeepEP (distributed/deepep/deepep.py)

# :21-26   from deep_ep import Buffer, EventHandle, EventOverlap (if the library is missing, the error points to github.com/deepseek-ai/deepep)
# :294-317 get_buffer(): create or reuse deep_ep.Buffer(group, num_nvl_bytes, num_rdma_bytes) keyed by group + hidden_bytes
# :70-116  @torch.library.impl(_lib,"dispatch","CUDA")  → buffer.dispatch(..., async_finish=True, allocate_on_comm_stream=True)
# :165-196 combine → buffer.combine(..., async_finish=True); after_event is stored in _pending_combine_event for deferred sync
# :240-285 sync_combine(): MoE.forward waits only after shared_experts has finished, overlapping communication with compute
# :232-237 autograd: dispatch.bwd → buffer.combine(); combine.bwd → buffer.dispatch()

HybridEP (distributed/deepep/hybridep.py, targeting GB200 / NVLink72)

# :415-421 from deep_ep import HybridEPBuffer (branch: hybrid-ep)
# :443-454 HybridEPBuffer(..., use_fp8=fp8_dispatch, num_sms_dispatch/combine=16, enable_custom_allgather=True)
# :113-191 @torch.library.custom_op("hybridep::dispatch") → _buffer.dispatch_with_permute(...)  (fused permute+a2a)
# :225-238 combine → _buffer.combine_with_unpermute(...)  (fused unpermute)
# :33-61   DispatchHandle(OpaqueBase): the handle flows through the torch.compile graph as an opaque type (no global cache)

| Feature | DeepEP | HybridEP |
|---|---|---|
| Hardware | H100 / NVLink Switch | GB200 / NVLink72 |
| Underlying class | deep_ep.Buffer | deep_ep.HybridEPBuffer |
| dispatch | buffer.dispatch() | dispatch_with_permute() (fused) |
| combine | buffer.combine() | combine_with_unpermute() (fused) |
| Async combine | Yes (_pending_combine_event) | No (returns synchronously) |
| handle | Global dict _handle_cache | OpaqueBase opaque type |
| Where permute happens | Python side, _permute_tokens() | Fused inside the kernel |

2.5 How the MoE layer plugs in a dispatcher (models/common/moe.py)

# :35-56   GroupedExperts.Config.token_dispatcher → self.token_dispatcher = config.token_dispatcher.build()
# :112-158 GroupedExperts.forward(): dispatch → _experts_forward(grouped_mm) → combine
#   :135-144 dispatch; :145-148 expert compute; :149-155 combine
# :390-510 MoE.forward(): router → routing_map → experts(dispatch/combine) → shared_experts (overlapped with combine)
#   :482-491 for DeepEPTokenDispatcher with sp==1, sync_combine() is called after shared_experts
# :160-170 parallelize(): token_dispatcher.wire_meshes(ep_mesh=..., tp_mesh=...) injects the communication groups

2.6 Activation checkpointing and all-to-all (distributed/activation_checkpoint.py:60-69)

SAC marks the communication ops as MUST_SAVE so that recomputation does not repeat the communication:

comm_ops = [
    torch.ops._c10d_functional.reduce_scatter_tensor.default,
    torch.ops._c10d_functional.all_to_all_single.default,   # :63
    (torch.ops, "deepep.dispatch.default"),  (torch.ops, "deepep.combine.default"),    # :65-66
    (torch.ops, "hybridep.dispatch.default"),(torch.ops, "hybridep.combine.default"),  # :68-69
]

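3. Side-by-side summary

| Dimension | DModel | torchtitan |
|---|---|---|
| EP all-to-all backend | PyTorch functional collectives (NCCL) | One of four: NCCL / DeepEP / HybridEP / SymmetricMemory+Triton |
| Ulysses/CP all-to-all | Custom autograd.Function + dist.all_to_all_single | (torchtitan's side is out of scope for this survey) |
| In-house device-side kernel | None (relies entirely on NCCL / native APIs) | minimal_async_ep writes directly into peer GPU symmetric memory with Triton |
| Third-party EP library dependency | None | Optional dependency on DeepSeek deep_ep (DeepEP/HybridEP) |
| Calls into the other project | Does not call torchtitan (referenced in comments only) | |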

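Answers to the three questions:

  1. DModel calls all-to-all in three places: EP (dispatch/combine plus metadata), Ulysses (seq↔head), and warmup.
  2. It does not use torchtitan's all-to-all implementations; everything is native PyTorch APIs plus DModel's own wrappers.
  3. For torchtitan's device-side all-to-all, look at minimal_async_ep. The core is _copy_rows_to_peer_ptrs_kernel at kernels.py:291-348: SymmetricMemory provides raw pointers to peer GPU buffers, Triton tl.store writes through them directly, handle.barrier() synchronizes, and torch.library.custom_op wraps the result into dispatch/combine ops with autograd support.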

4. Device-side data flow diagram

                 MoE.forward()  (moe.py)
                       │
        ┌──────────────┼─────────────────────────┐
        ▼              ▼                         ▼
 AllToAllDispatcher  DeepEP/HybridEP        MinimalAsyncEP
 (token_dispatcher)  Dispatcher             Dispatcher
        │              │                         │
        ▼              ▼                         ▼
 torch.dist          deep_ep.Buffer         SymmetricMemory + Triton
 .all_to_all_single  .dispatch()/.combine() (kernels.py)
   (NCCL)            (CUDA/NVSHMEM/RDMA)    _copy_rows_to_peer_ptrs_kernel
                                            └─ tl.store writes directly into peer GPU memory
                                               handle.barrier() synchronizes