Survey date: 2026-06-26

Survey targets:
- DModel: /Users/joker/Desktop/project/infrawaves/DModel
- torchtitan: /Users/joker/Desktop/project/infrawaves/pytorch/torchtitan

Goals:
- Which all-to-all calls DModel makes, and what each one is for
- Whether DModel uses titan's all-to-all implementation
- How titan's device-side all-to-all is actually written
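0. One-sentence conclusion
- DModel does not use any of torchtitan's all-to-all implementations. Every all-to-all in DModel is built on native PyTorch APIs (torch.distributed.all_to_all_single and torch.distributed._functional_collectives.all_to_all_single[_autograd]). References to torchtitan appear only in comments in the DeepSeek-V3 model file (as a design reference); there is no code-level dependency.
- torchtitan provides 5 token dispatchers. Of these, only AllToAllTokenDispatcher (and TorchAOTokenDispatcher, which inherits from it) goes through NCCL's all_to_all_single; DeepEP and HybridEP wrap DeepSeek's deep_ep library; minimal_async_ep is titan's own device-side all-to-allv, which uses PyTorch SymmetricMemory plus Triton kernels to write directly into peer GPU memory, bypassing NCCL.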
1. DModel side: full all-to-all inventory
| Scenario | File | Underlying API | In the autograd graph |
|---|---|---|---|
| EP metadata exchange (token counts) | dmodel/parallelism/expert_parallel.py:451 | funcol.all_to_all_single (functional collectives) | No (no_grad) |
| EP token dispatch (payload) | dmodel/parallelism/expert_parallel.py:479 | funcol.all_to_all_single_autograd | Yes |
| EP token combine (payload) | dmodel/parallelism/expert_parallel.py:545 | funcol.all_to_all_single_autograd | Yes |
| Ulysses seq→head forward/backward | dmodel/parallelism/ulysses_parallel.py:79 / :102 | dist.all_to_all_single | Yes (custom Function) |
| Ulysses head→seq forward/backward | dmodel/parallelism/ulysses_parallel.py:144 / :166 | dist.all_to_all_single | Yes (custom Function) |
| NCCL warmup (ep / cp_ulysses) | dmodel/utils/distributed.py:263 | dist.all_to_all_single | No |
| Unit tests | tests/test_all_to_all.py:52,95 | Indirectly via functional collectives | Yes |
1.1 EP (expert parallelism): dmodel/parallelism/expert_parallel.py
The imports are PyTorch functional collectives, not titan:
# expert_parallel.py:50-56
from torch.distributed._functional_collectives import (
    all_to_all_single as funcol_all_to_all_single,
)
from torch.distributed._functional_collectives import (
    all_to_all_single_autograd as funcol_all_to_all_single_autograd,
)
from torch.distributed._functional_collectives import wait_tensor as funcol_wait_tensor
Two thin wrappers (they only add .contiguous()):
# expert_parallel.py:306-312: participates in autograd, used for the token payload
def all_to_all_single_autograd(input, output_split_sizes, input_split_sizes, group):
    return funcol_all_to_all_single_autograd(
        input.contiguous(), output_split_sizes, input_split_sizes, group,
    )
# expert_parallel.py:315-322: outside autograd, used for metadata; ends with wait_tensor
def all_to_all_single_functional(input, output_split_sizes, input_split_sizes, group):
    output = funcol_all_to_all_single(
        input.contiguous(), output_split_sizes, input_split_sizes, group,
    )
    return funcol_wait_tensor(output)
Three call sites (forward):
# expert_parallel.py:450-456: ① before dispatch, an all-to-all exchanges how many tokens each rank sends to each expert
with torch.no_grad():
    num_tokens_per_expert_group = all_to_all_single_functional(
        num_tokens_per_expert, None, None, group=device_mesh,
    )
# expert_parallel.py:478-484: ② token payload dispatch (uneven splits, derived from ①)
routed_input = all_to_all_single_autograd(
    routed_input, mod.output_splits, mod.input_splits, device_mesh,
)
# expert_parallel.py:544-550: ③ after the experts finish, combine back to the source rank (splits swapped)
routed_output = all_to_all_single_autograd(
    routed_output, mod.input_splits, mod.output_splits, device_mesh,
)
ExpertTensorParallel inherits from ExpertParallel. It only passes the ep sub-mesh of the 2D [ep, tp] mesh to the parent class and reuses the same all-to-all calls:
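# expert_parallel.py:661-662 / 683-685
return super()._token_dispatch(mod, inputs, device_mesh["ep"])
...
return super()._token_combine(mod, routed_output, device_mesh["ep"])
The autograd behavior comes from inside PyTorch functional collectives: the forward pass transfers uneven chunks according to output/input_split_sizes, and the backward pass automatically runs the reverse all-to-all with the two split lists swapped. DModel does not implement this backward itself.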
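The relationship between the metadata exchange, the uneven dispatch, and the split-swapped combine can be modeled in a single process. The following is a minimal pure-Python sketch, not DModel or PyTorch code: `simulate_all_to_all` and the toy inputs are hypothetical, and plain lists stand in for tensors and ranks.

```python
def simulate_all_to_all(inputs, input_splits):
    """Single-process model of an uneven all-to-all (hypothetical helper).

    inputs[r]          : flat list of items held by rank r
    input_splits[r][d] : number of items rank r sends to rank d
    Returns (outputs, output_splits); output_splits[r][s] is the number of
    items rank r receives from rank s.
    """
    world = len(inputs)
    # What rank dst receives from src is what src sends to dst (a transpose).
    output_splits = [[input_splits[src][dst] for src in range(world)]
                     for dst in range(world)]
    # Slice every rank's input into one chunk per destination.
    chunks = []
    for src in range(world):
        per_dst, offset = [], 0
        for n in input_splits[src]:
            per_dst.append(inputs[src][offset:offset + n])
            offset += n
        chunks.append(per_dst)
    # Each destination concatenates its chunks in source-rank order.
    outputs = [[item for src in range(world) for item in chunks[src][dst]]
               for dst in range(world)]
    return outputs, output_splits


inputs = [["a0", "a1", "a2"], ["b0", "b1"]]
input_splits = [[1, 2], [1, 1]]

# Dispatch: send with input_splits, receive with output_splits.
dispatched, output_splits = simulate_all_to_all(inputs, input_splits)
# Combine: the same operation with the two split lists swapped.
combined, returned_splits = simulate_all_to_all(dispatched, output_splits)
```

Running the same primitive a second time with the received splits as send splits returns every item to its source rank, which is why combine (and the backward of dispatch) needs no extra metadata beyond the splits computed in step ①.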
1.2 Ulysses sequence parallelism: dmodel/parallelism/ulysses_parallel.py
This path is a fully custom torch.autograd.Function built on dist.all_to_all_single (note: not the functional collectives version).
# ulysses_parallel.py:45   class ulysses_all_to_all(torch.autograd.Function): seq → head
#   forward  :79   dist.all_to_all_single(output_flat, input_flat, group=sp_group)
#   backward :102  dist.all_to_all_single(output_flat, input_flat, group=sp_group)
# ulysses_parallel.py:113  class ulysses_all_to_all_reverse(torch.autograd.Function): head → seq
#   forward  :144  dist.all_to_all_single(...)
#   backward :166  dist.all_to_all_single(...)
Data layout transform (seq→head forward):
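(bs, local_seq, n_heads, hd) → reshape (bs, local_seq, cp, local_heads, hd) → permute (cp, bs, local_seq, local_heads, hd) → flatten → all_to_all_single (equal splits) → reshape back to (bs, full_seq, local_heads, hd).
Effect: each rank goes from "a local slice of the sequence with all heads" to "the full sequence with a local slice of the heads". The reverse op is the mathematical inverse. Each of the two runs one all_to_all_single in forward and one in backward, and they are autograd duals of each other (the forward of one is the backward of the other).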
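The layout change can be checked on toy data without any GPU. The sketch below is a single-process model in pure Python, not the code in ulysses_parallel.py: the function names `seq_to_head` and `head_to_seq` here are illustrative stand-ins, the batch and head-dim axes are dropped, and each element is just a `(global_seq_pos, head)` label.

```python
def seq_to_head(shards, cp):
    """shards[r][s][h]: rank r holds its local sequence slice with all heads.
    Returns out[r][s][h]: rank r holds the full sequence with its local heads."""
    local_heads = len(shards[0][0]) // cp
    out = []
    for dst in range(cp):
        h0 = dst * local_heads
        full_seq = []
        # Chunks arrive in source-rank order, which is also sequence order.
        for src in range(cp):
            for token in shards[src]:
                full_seq.append(token[h0:h0 + local_heads])
        out.append(full_seq)
    return out


def head_to_seq(shards, cp):
    """Inverse of seq_to_head: back to local sequence slices with all heads."""
    local_seq = len(shards[0]) // cp
    out = []
    for dst in range(cp):
        s0 = dst * local_seq
        local = []
        for s in range(s0, s0 + local_seq):
            token = []
            # Head groups arrive in source-rank order, which is also head order.
            for src in range(cp):
                token.extend(shards[src][s])
            local.append(token)
        out.append(local)
    return out


cp, local_seq, n_heads = 2, 2, 4
x = [[[(r * local_seq + s, h) for h in range(n_heads)]
      for s in range(local_seq)] for r in range(cp)]

by_head = seq_to_head(x, cp)
round_trip = head_to_seq(by_head, cp)
```

After `seq_to_head`, rank 1 holds all four sequence positions but only heads 2 and 3; applying `head_to_seq` restores the original shards exactly.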
External entry points:
# ulysses_parallel.py:189  return ulysses_all_to_all.apply(x, cp_size, sp_group)          # seq_to_head
# ulysses_parallel.py:206  return ulysses_all_to_all_reverse.apply(x, cp_size, sp_group)  # head_to_seq
In UlyssesAttentionWrapper.forward, seq_to_head runs after the QKV projection (:340-342) and head_to_seq runs after attention (:375). MLAUlyssesWrapper follows the same pattern (:525-527, :559).
1.3 NCCL warmup: dmodel/utils/distributed.py
# distributed.py:214-219: the ep and cp_ulysses mesh dimensions both map to all_to_all
_OP_MAP = { ..., "cp_ulysses": "all_to_all", "ep": "all_to_all", ... }
# distributed.py:259-263: before training, a dummy all_to_all_single warms up the NCCL buffers
elif op == "all_to_all":
    buf = torch.empty(num_elements * world, device=device)
    out = torch.empty(num_elements * world, device=device)
    for _ in range(num_iters):
        dist.all_to_all_single(out, buf, group=group)
2. torchtitan side: all-to-all / token dispatch implementations
2.1 Five dispatchers (selected by comm_backend in models/common/config_utils.py:281-329)
| Implementation | Class | comm_backend | Positioning |
|---|---|---|---|
| Standard all-to-all | AllToAllTokenDispatcher | "standard" | Native PyTorch all_to_all_single over general-purpose NCCL; falls back to the local path when EP=1 |
| TorchAO quantization | TorchAOTokenDispatcher | (quantization swap) | Inherits from the above; adds token-group padding (FP8→16, MXFP8→32) to align with quantized grouped GEMM |
| DeepEP | DeepEPTokenDispatcher | "deepep" | Wraps deep_ep.Buffer; H100 / NVLink Switch; async combine overlaps with shared_experts |
| HybridEP | HybridEPTokenDispatcher | "hybridep" | Wraps deep_ep.HybridEPBuffer; GB200 / NVLink72; TMA optimizations, fused permute |
| MinimalAsyncEP | MinimalAsyncEPTokenDispatcher | "minimal_async_ep" | In-house device-side path: SymmetricMemory + Triton implementing all-to-allv (constraints: EP=DP, no TP/CP/PP/SP) |
Base classes: LocalTokenDispatcher (token_dispatcher.py:46, EP=1, no communication) and BaseEPTokenDispatcher (token_dispatcher.py:182).
2.2 Standard path: AllToAllTokenDispatcher (models/common/token_dispatcher.py)
dispatch (forward, two all-to-alls):
# :289-296  EP=1 takes the LocalTokenDispatcher fast path
# :298-305  _local_reorder(): sort local tokens by expert id into (N=T*K, D)
# First all-to-all: exchange per-expert token counts
# :334-339  (compile) all_to_all_single(num_local_tokens_per_expert_E.view(ep,-1), None, None, ep_mesh)
# :341-346  (eager)   spmd.all_to_all(...)
# :357-369  D2H sync to obtain input_splits / output_splits
# Second all-to-all: send the actual token data according to the splits
# :372-377  (compile) routed_input_RD = all_to_all_single(routed_input_ND, output_splits, input_splits, ep_mesh)
# :379-386  (eager)   spmd.all_to_all(..., output_split_sizes=..., input_split_sizes=...)
# :397-405  _permute(): rank-major → expert-major, so tokens of the same expert are contiguous before grouped_mm
combine (the reverse gather):
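# :508-517  EP=1 fast path
# :528-529  _unpermute(): inverse of _permute()
# :533-548  reverse all-to-all (input/output splits swapped) sends tokens back to the source rank
# :565-580  weight by routing scores, then deterministic_scatter_add accumulates back into the original token positions
2.3 ★ In-house device-side path: MinimalAsyncEP (distributed/minimal_async_ep/)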
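The directory contains only __init__.py / api.py / kernels.py. There are no .cu/.cpp/.h files; every device-side kernel is written in Triton. Instead of an NCCL all-to-all, it writes directly into peer GPU memory through SymmetricMemory.
(a) SymmetricMemory buffer initialization (api.py:176-277 init_buffer())
# :203      the backend must be CUDA: symm_mem.get_backend(device) == "CUDA"
# :210-218  double-buffered hidden receive buffers: symm_mem.empty(max_routed_tokens, hidden_dim, ...) ×2
# :219-224  counts receive buffer: symm_mem.empty(group.size(), num_experts, int64)
# :225-228  symm_mem.rendezvous(buf, group): establishes the cross-GPU IPC mappings
# :230-243  handle.get_buffer(peer, ...): obtains a view of each peer's buffer (remote GPU memory)
# :244-251  collects each peer buffer's data_ptr() into an int64 GPU tensor for the Triton kernels to index
How it works: memory allocated with symm_mem.empty is visible in the address space of every GPU in the group once rendezvous has established the IPC mappings. The local rank can therefore obtain raw pointers to its peers' buffers, and the Triton kernel writes straight through them.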
(b) Core all-to-all primitive, a Triton kernel (kernels.py:291-348 _copy_rows_to_peer_ptrs_kernel, verified line by line)
@triton.jit
def _copy_rows_to_peer_ptrs_kernel(
    src,
    dst_ptrs,    # int64 pointer table: base address of each peer's buffer
    dst_ranks,   # destination peer of each row
    dst_rows,    # row index inside the destination peer's buffer
    num_valid_rows, src_rows, ...,
):
    row_start = tl.program_id(0) * BLOCK_M   # :321 each program handles BLOCK_M rows × BLOCK_N columns
    ...
    src_row = row
    if HAS_SRC_ROWS:
        src_row = tl.load(src_rows + row, ...)   # :334 optional row reordering (E-major gather)
    dst_rank = tl.load(dst_ranks + row, ...)     # :335 destination peer of this row
    dst_row = tl.load(dst_rows + row, ...)       # :336 row index inside the destination peer
    values = tl.load(src + src_row[:,None]*SRC_ROW_STRIDE + col[None,:]*SRC_COL_STRIDE, mask)   # :338-340 read the local source
    dst_base = tl.load(dst_ptrs + dst_rank, ...) # :342 fetch the destination peer's buffer base address
    dst_ptr = dst_base.to(tl.pointer_type(DST_DTYPE))[:,None] + dst_row[:,None]*DST_ROW_STRIDE + col[None,:]   # :344-347
    tl.store(dst_ptr, values, mask=mask & dst_rank_mask[:,None])   # :348 ★ writes directly into remote GPU memory
In one sentence: a pointer table plus Triton tl.store writes local rows, as fine-grained point-to-point transfers, into the symmetric buffers of peer GPUs, bypassing NCCL.
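The indexing scheme of this kernel can be modeled on the host. The sketch below is a pure-Python analogue, not the Triton kernel: `copy_rows_to_peers` is a hypothetical name, Python lists stand in for the symmetric buffers that the pointer table addresses, and the rule "skip rows whose destination rank is out of range" is an assumption standing in for the kernel's dst_rank_mask.

```python
def copy_rows_to_peers(src, peer_buffers, dst_ranks, dst_rows, src_rows=None):
    """Scatter rows of `src` into per-peer receive buffers (hypothetical helper).

    peer_buffers[p] models the buffer reached through pointer-table entry p.
    Row i is read from src[src_rows[i]] (or src[i] when src_rows is None)
    and written to peer_buffers[dst_ranks[i]][dst_rows[i]].
    """
    for i in range(len(dst_ranks)):
        src_row = src_rows[i] if src_rows is not None else i
        rank = dst_ranks[i]
        # Assumption: an out-of-range rank marks a row that must not be written.
        if not 0 <= rank < len(peer_buffers):
            continue
        peer_buffers[rank][dst_rows[i]] = list(src[src_row])


src = [[0, 0], [1, 1], [2, 2], [3, 3]]
peer_buffers = [[None] * 3 for _ in range(2)]  # 2 peers, 3 rows each

copy_rows_to_peers(
    src,
    peer_buffers,
    dst_ranks=[1, 0, 1, 0, -1],  # the last row is masked out
    dst_rows=[0, 0, 1, 1, 2],
    src_rows=[2, 0, 3, 1, 0],    # optional gather order on the source side
)
```

Every row carries its own destination rank and destination row, so one launch can serve uneven per-peer counts, which is what makes this an all-to-allv rather than an equal-split all-to-all.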
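(c) Post-write synchronization barrier (api.py:336-348)
def _wait_ready(handle, channel):
    # One fused barrier kernel: signals and polls all peers at the same time,
    # so the local receive buffer is read only after every peer has finished writing to it
    handle.barrier(channel=channel)
(d) custom_op + autograd registration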
# api.py:540-627  @torch.library.custom_op("minimal_async_ep::dispatch", device_types="cuda")
#   argsort into E-major → _dispatch_metadata exchanges counts → _copy_rows_to_peers_and_wait_cuda writes directly to peers
# api.py:666-721  @torch.library.custom_op("minimal_async_ep::combine", ...)
#   _combine_to_origin writes back to the source rank → reduce_topk_slots_kernel performs the top-k reduction
# api.py:990-997  dispatch_op.register_autograd(...) / combine_op.register_autograd(...)
#   dispatch.bwd ≈ the combine data path; combine.bwd ≈ the dispatch data path (including the routing score gradient)
(e) Triton kernels in kernels.py at a glance
| Kernel | Lines | Purpose |
|---|---|---|
| _copy_full_counts_to_peer_ptrs_kernel | 27-51 | Writes per-expert counts into every peer's count buffer |
| _fill_dispatch_metadata_kernel | 54-83 | Computes the destination rank / row index for each routed row |
| _fill_combine_metadata_kernel | 86-126 | Computes the source rank / row index for each received row (used by combine) |
| _invert_flat_indices_kernel | 128-144 | Inverts indices between E-major and T-major |
| _reduce_topk_slots_kernel | 147-195 | Reduces expert-sorted rows back to tokens across top-k (fp32 accumulation) |
| _expand_topk_grad_kernel | 198-243 | Backward of the above |
| _topk_scores_grad_kernel | 246-288 | Routing score gradient |
| _copy_rows_to_peer_ptrs_kernel | 291-348 | Core: direct writes to peer GPUs through the pointer table |
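To make the top-k reduction row of the table concrete, here is a small pure-Python sketch. It is not the Triton kernel: `reduce_topk_slots` and `slot_to_row` are hypothetical names, and it assumes the reduction is a routing-score-weighted sum over each token's top-k slots, which these notes do not spell out. Python floats are double precision, so the kernel's fp32 accumulation is only approximated.

```python
def reduce_topk_slots(rows, slot_to_row, scores):
    """Reduce expert-sorted rows back to one vector per token (hypothetical helper).

    rows[i]           : expert output vector in expert-major (E-major) order
    slot_to_row[t][k] : index into `rows` for token t's k-th routed slot
    scores[t][k]      : routing score of that slot (assumed weighting)
    """
    hidden = len(rows[0])
    out = []
    for t, slots in enumerate(slot_to_row):
        acc = [0.0] * hidden  # accumulate in floating point, one token at a time
        for k, row_idx in enumerate(slots):
            weight = scores[t][k]
            for d in range(hidden):
                acc[d] += weight * rows[row_idx][d]
        out.append(acc)
    return out


rows = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
slot_to_row = [[0, 2], [3, 1]]        # 2 tokens, top-k = 2
scores = [[0.5, 0.5], [1.0, 0.0]]

tokens = reduce_topk_slots(rows, slot_to_row, scores)
```

Token 0 averages rows 0 and 2, and token 1 takes row 3 unchanged because its second slot has score 0.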
Constraint: api.py:128-136 forces MinimalAsyncEP to be paired with FullAC (full recomputation), because the symmetric memory barrier semantics are incompatible with selective AC.
2.4 DeepEP / HybridEP: wrappers around DeepSeek's deep_ep library
DeepEP (distributed/deepep/deepep.py)
# :21-26    from deep_ep import Buffer, EventHandle, EventOverlap (if the library is missing, the error points to github.com/deepseek-ai/deepep)
# :294-317  get_buffer(): creates or reuses a deep_ep.Buffer(group, num_nvl_bytes, num_rdma_bytes) keyed by group + hidden_bytes
# :70-116   @torch.library.impl(_lib,"dispatch","CUDA") → buffer.dispatch(..., async_finish=True, allocate_on_comm_stream=True)
# :165-196  combine → buffer.combine(..., async_finish=True); after_event is stored in _pending_combine_event for deferred synchronization
# :240-285  sync_combine(): MoE.forward waits only after shared_experts has finished, overlapping communication with compute
# :232-237  autograd: dispatch.bwd → buffer.combine(); combine.bwd → buffer.dispatch()
HybridEP (distributed/deepep/hybridep.py, targeting GB200 / NVLink72)
# :415-421  from deep_ep import HybridEPBuffer (branch: hybrid-ep)
# :443-454  HybridEPBuffer(..., use_fp8=fp8_dispatch, num_sms_dispatch/combine=16, enable_custom_allgather=True)
# :113-191  @torch.library.custom_op("hybridep::dispatch") → _buffer.dispatch_with_permute(...) (fused permute + a2a)
# :225-238  combine → _buffer.combine_with_unpermute(...) (fused unpermute)
# :33-61    DispatchHandle(OpaqueBase): the handle flows through the torch.compile graph as an opaque type (no global cache)
| Feature | DeepEP | HybridEP |
|---|---|---|
| Hardware | H100 / NVLink Switch | GB200 / NVLink72 |
| Underlying class | deep_ep.Buffer | deep_ep.HybridEPBuffer |
| dispatch | buffer.dispatch() | dispatch_with_permute() (fused) |
| combine | buffer.combine() | combine_with_unpermute() (fused) |
| Async combine | Yes (_pending_combine_event) | No (returns synchronously) |
| handle | Global dict _handle_cache | OpaqueBase opaque type |
| Where permute happens | Python side, _permute_tokens() | Fused inside the kernel |
2.5 How the MoE layer plugs in a dispatcher (models/common/moe.py)
# :35-56    GroupedExperts.Config.token_dispatcher → self.token_dispatcher = config.token_dispatcher.build()
# :112-158  GroupedExperts.forward(): dispatch → _experts_forward (grouped_mm) → combine
#   :135-144 dispatch; :145-148 expert compute; :149-155 combine
# :390-510  MoE.forward(): router → routing_map → experts (dispatch/combine) → shared_experts (overlapped with combine)
#   :482-491 for DeepEPTokenDispatcher with sp==1, sync_combine() is called after shared_experts
# :160-170  parallelize(): token_dispatcher.wire_meshes(ep_mesh=..., tp_mesh=...) injects the communication groups
2.6 Activation Checkpoint and all-to-all (distributed/activation_checkpoint.py:60-69)
SAC (selective activation checkpointing) marks communication ops as MUST_SAVE so that recomputation does not repeat the communication:
comm_ops = [
    torch.ops._c10d_functional.reduce_scatter_tensor.default,
    torch.ops._c10d_functional.all_to_all_single.default,   # :63
    (torch.ops, "deepep.dispatch.default"), (torch.ops, "deepep.combine.default"),       # :65-66
    (torch.ops, "hybridep.dispatch.default"), (torch.ops, "hybridep.combine.default"),   # :68-69
]
3. Side-by-side summary
| Dimension | DModel | torchtitan |
|---|---|---|
| EP all-to-all backend | PyTorch functional collectives (NCCL) | One of four: NCCL / DeepEP / HybridEP / SymmetricMemory + Triton |
| Ulysses/CP all-to-all | Custom autograd.Function + dist.all_to_all_single | (torchtitan's handling is out of scope for this survey) |
| In-house device-side kernel | None (relies entirely on NCCL / native APIs) | Yes: minimal_async_ep uses Triton to write directly into peer GPU symmetric memory |
| Third-party EP library dependency | None | Optional dependency on DeepSeek deep_ep (DeepEP/HybridEP) |
| Cross-calls | Does not call titan (comment-level reference only) | n/a |
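Answers to the three questions:
- DModel calls all-to-all in three places: EP (dispatch/combine plus metadata), Ulysses (seq↔head), and warmup.
- It does not use titan's all-to-all implementation; everything is native PyTorch APIs plus DModel's own thin wrappers.
- For titan's device-side all-to-all, look at minimal_async_ep: the core is _copy_rows_to_peer_ptrs_kernel at kernels.py:291-348, which uses SymmetricMemory to obtain raw pointers into peer GPUs and writes through them with Triton tl.store, synchronizes with handle.barrier(), and is then wrapped by torch.library.custom_op into dispatch/combine ops with autograd support.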
4. Device-side data flow diagram
                       MoE.forward() (moe.py)
                                  │
          ┌───────────────────────┼───────────────────────────┐
          ▼                       ▼                           ▼
 AllToAllDispatcher        DeepEP/HybridEP             MinimalAsyncEP
 (token_dispatcher)          Dispatcher                  Dispatcher
          │                       │                           │
          ▼                       ▼                           ▼
     torch.dist            deep_ep.Buffer         SymmetricMemory + Triton
 .all_to_all_single    .dispatch()/.combine()           (kernels.py)
       (NCCL)            (CUDA/NVSHMEM/RDMA)   _copy_rows_to_peer_ptrs_kernel
                                               └─ tl.store writes directly into peer GPU memory
                                                  handle.barrier() synchronizes