0. 调研

Mooncake内:

Mooncake 的 RPC 与 get_rpc_port 做了啥?

Mooncake 把通信拆成控制面数据面两条独立通路,这是整个设计的核心。

TransferEngine.initialize(hostname, "P2PHANDSHAKE", "rdma", "") 内部做两件事:建立 RDMA 资源(PD/CQ/设备),以及在一个自动选取的 TCP 端口上拉起一个握手守护线程(startDaemon,端口由 findAvailableTcpPort 分配)。get_rpc_port() 返回的就是这个守护线程监听的端口,上层用 hostname:rpc_port 作为对端的唯一会话标识(session)。

控制面就是这个守护线程:它收 JSON over TCP 的请求(Connection / Metadata / Notify 三类),在对端发起连接时交换 RDMA 连接参数(QP num、GID、LID、MTU)并解析出对方内存段的 rkey。数据面则完全是裸 RDMA verbs:batch_register_memory(ptrs, lens) 一次性把所有 KV 内存注册成 MR 并发布段描述符(地址区间 + rkey),batch_transfer_sync_write(remote_session, src_ptrs, dst_ptrs, lengths)绝对虚拟地址寻址——引擎按需对 remote_session 做一次握手(若连接已存在则复用),从对端发布的段元数据里查出 dst_ptrs 落在哪个 MR、拿到 rkey,然后 post RDMA write,阻塞到完成。

关键结论:rpc_port 只服务于数据面建链时的元数据握手,与请求级控制完全无关。vLLM 的 mooncake_connector.py 另外维护了一条 ZMQ side channel(端口 VLLM_MOONCAKE_BOOTSTRAP_PORT + dp_rank*tp_size),Decode 用它把”我要拉哪些请求的哪些 block、我的 base 地址、我的 rpc_port”打包成 MooncakeAgentMetadata 发给 Prefill;Prefill 收到后才用其中的 remote_hostname:remote_port(=rpc_port) 去驱动引擎做实际写入。两条通路职责清晰:ZMQ 管”传什么”,rpc_port 管”怎么连”。

FLAGCX内:

flagcx/core/flagcx_p2p.cc 的引擎 API(flagcxP2pEngine*,定义在 flagcx_p2p.h)

  • FlagcxP2pRdmaDesc(64 字节,addr + size + rkey)= Mooncake 的段描述符,且已有序列化函数 flagcxP2pSerializeRdmaDesc
  • gMemRegInfo(全局 unordered_map<baseAddr, FlagcxP2pMemRegEntry>)= Mooncake 的内存段表,每项已缓存预计算好的 64B desc(descBuf)。
  • flagcxP2pEngineReg / PrepareDesc / WriteVector / ReadVector / XferStatus = batch_register_memory + batch_transfer_sync_write 的全部能力,WriteVector 已支持按 (mrId, 本地源VA, size, 远端desc) 向量化提交。
  • flagcxP2pEngineConnect / Accept 已经在 flagcxSocket 上交换了 FlagcxP2pCtrlMeta(gpuIdx、notifPort、flags)并在 adaptor 内完成 QP 参数协商和 QP 状态机迁移;flagcxP2pEngineGetMetadata 已能吐出 ip:rdma_port?gpu?notif_port

flagcx 内没有中间 nixl 的话还需要提供一下功能: (1) 握手时需要交换 MR/desc 表——目前 desc 交换被甩给了应用层(现 connector 用 ZMQ 传 base 地址 + MR 索引); (2) Accept常驻守护,不是阻塞单次调用; (3) getRpcPort 按 session 复用连接的缓存 (4) flagcx engine API 导出到 flagcx_wrapper.py(Python 侧现在只能调旧的 comm API)。

1. flagcx 侧代码改动

现有 connect/accept 的 socket 握手扩展成携带 MR desc 表的控制面, 再把 accept 包装成常驻守护线程,并补一个 session 连接缓存。类似 Mooncake SocketHandShakePlugin ,只是复用 FlagCX 已有的 flagcxSocket + gMemRegInfo + FlagcxP2pRdmaDesc

rpc_port 不新开。 getRpcPort 直接返回现有 RDMA listener 的端口(即 flagcxP2pEngineGetMetadata 里那个 rdma_port,从 listeners[dev].handle 解析)。新增一个薄接口:

int flagcxP2pEngineGetRpcPort(FlagcxP2pEngine *engine);   // 返回 listener 端口

握手捎带 desc 表。flagcxP2pEngineConnectflagcxP2pEngineAccept 交换完 FlagcxP2pCtrlMeta 之后,追加一轮 flagcxSocketSendRecv:把本端 gMemRegInfo 里所有条目序列化为 {baseAddr, size, rkey} 数组发出去,同时收下对端的同样数组,存到 FlagcxP2pConn 上新增的字段 std::vector<RemoteRegion> remoteRegions。这一步用的就是 descBuf 里现成的 rkey,不需要新逻辑。

accept 守护化。 仿照已有的 notifWorkerLoop 线程模式,新增 flagcxP2pEngineStartRpcServer(engine),内部起一个线程循环调用现有的 flagcxP2pEngineAccept,每接入一个对端就完成上面的握手并把 conn 挂进引擎的 session 表。Decode 侧(写入的接收方)在注册完内存后拉起它即可。

session 缓存 + desc 查找。 引擎内维护 unordered_map<string("host:port"), FlagcxP2pConn*>。新增:

// 按 session 取连接,不存在则发起 connect(内部完成 desc 表交换),存在则复用
FlagcxP2pConn *flagcxP2pEngineGetConn(FlagcxP2pEngine *engine, const char *session);
 
// 给定远端绝对 VA + size,在 conn->remoteRegions 里定位落点,填出 desc(addr=VA, rkey=该段rkey)
int flagcxP2pEngineMakeDesc(FlagcxP2pConn *conn, uint64_t remoteVa, uint32_t size,
                            FlagcxP2pRdmaDesc *desc);

MakeDesc 就是 Mooncake “按 dst_ptr 查 rkey” 那一步的本地化实现。有了它,上层只需给绝对地址,无需再关心 MR 索引。

同步写。 WriteVector 已是非阻塞 + flagcxP2pEngineXferStatus 轮询。再包一个阻塞版对齐 batch_transfer_sync_write 即可,完成判定复用 FlagcxTransferTask::isAllDone:

int flagcxP2pEngineWriteVectorSync(FlagcxP2pConn *conn,
                                   std::vector<FlagcxP2pMr> mrIds,
                                   std::vector<void*> srcVec,
                                   std::vector<size_t> sizeVec,
                                   std::vector<FlagcxP2pRdmaDesc> descs);

最后把以上接口(Create/Destroy、Reg、GetRpcPort、StartRpcServer、GetConn、MakeDesc、WriteVectorSync)按 Function(...) 形式补进 flagcx_wrapper.py,并加一个薄 Python 包装类。整套改动不碰 ibrc_p2p adaptor 的 QP/WorkerPool 数据面,只在 flagcx_p2p.cc 控制面上做加法。

2. vllm-plugin-fl侧改动

只需”建链 + 注册 + 传输”这三处内核换掉。

注册阶段(register_kv_caches)不再只收集 (base, size),而是为每个 KV tensor 调 flagcxP2pEngineReg 拿到 mrId 列表,并记录 kv_caches_base_addr;同时 self.rpc_port = engine.get_rpc_port(),Decode 侧拉起 StartRpcServerFlagCXAgentMetadata.remote_port 从原来的 side_channel 端口改为 rpc_port(side channel 仍保留,只用于请求级信号)。

删除旧逻辑:_create_pair_comm_decode_listener_thread、uid 握手、flagcxCommInitRank_register_kv_for_commflagcxOneSideRegister/SignalRegister、以及 flagcxBatchPut + flagcxWaitCounter 那条计数器同步链路。PairCommInfo 退化为按 session 缓存的连接句柄(直接交给引擎的 session 表管理,connector 侧甚至可以不再自己存)。

传输阶段(_send_blocks)由 Prefill 发起。原来按 (src_off, dst_off, mr_idx) 拼 batch 的代码改为:先 conn = engine.get_conn(f"{remote_host}:{remote_rpc_port}")(首次触发握手 + desc 表交换,之后复用);沿用现有的 _group_contiguous 把连续 block 合并;对每个分组,本地源 = 本地KV base + 偏移,远端 desc 由 engine.make_desc(conn, 远端KV base + 偏移, size) 算出;最后 engine.write_vector_sync(conn, mrIds, srcVec, sizeVec, descs)。无需 signal buffer、无需 counter。Prefill 完成后照旧通过 ZMQ 回 TRANS_DONE,Decode 的 _receive_kv 收到即标记请求完成。

Decode 是写入接收方,对外公布自己的 host:rpc_port 和 KV base 地址;Prefill 连过去、取回 Decode 的 desc 表、用绝对地址 RDMA write 把 KV 推过去。