1. Background and core approach
Phase 0 uses a dual-path engine: by default it runs over bilateral flagcxSend/Recv; when the underlying network adapter supports one-sided RDMA (IBRC), WRITE is upgraded to flagcxHeteroPut + putSignal.
Two data paths:
- Bilateral path (Send/Recv): the two upper-layer ranks each call postXfer, which maps to flagcxSend/flagcxRecv. Supported by every network adapter; no memory registration needed.
- One-sided path (Put): the WRITE side calls flagcxHeteroPut; the READ side only polls a signal until the data arrives. Supported only by the IBRC adapter; the memory window must be registered up front (flagcxCommRegister).
Mapping rules:
| Path | Initiator call | Engine-internal action | Peer must do |
|---|---|---|---|
| Bilateral | postXfer(WRITE, ...) | flagcxSend(addr, len, ...) | peer postXfer(READ, ...) → flagcxRecv |
| Bilateral | postXfer(READ, ...) | flagcxRecv(addr, len, ...) | peer postXfer(WRITE, ...) → flagcxSend |
| One-sided | postXfer(WRITE, ...) | flagcxHeteroPut(srcOff, dstOff, size) + putSignal | peer postXfer(READ, ...) → waitValue |
| One-sided | postXfer(READ, ...) | poll waitValue(signal_offset) | peer postXfer(WRITE, ...) → Put |
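To make the dispatch concrete before the full engine code in section 2.2, here is a minimal sketch of how a caller might pick the operation at runtime using only the API declared in 2.1 (pick_op and is_write are illustrative names, not part of the proposal):
/* Sketch: choose the op for a transfer, upgrading to the one-sided path
 * only when the adapter supports it and the window is registered. */
static flagcx_nixl_op_t pick_op(flagcx_nixl_engine_t *eng,
                                flagcx_nixl_conn_t *conn,
                                int is_write) {
    if (flagcx_nixl_supports_one_sided(eng, conn))
        return is_write ? FLAGCX_NIXL_OP_PUT : FLAGCX_NIXL_OP_WAIT;
    return is_write ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV;
}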
2. What needs to change
2.1 FlagCX side: transfer_engine.h
/* ---------- Opaque handles ---------- */
typedef struct flagcx_nixl_engine_s flagcx_nixl_engine_t;
typedef struct flagcx_nixl_conn_s flagcx_nixl_conn_t;
typedef struct flagcx_nixl_req_s flagcx_nixl_req_t;
/* ---------- Return codes ---------- */
typedef enum {
FLAGCX_NIXL_SUCCESS = 0,
FLAGCX_NIXL_IN_PROG = 1,
FLAGCX_NIXL_ERR = -1,
} flagcx_nixl_status_t;
/* ---------- Operation types ---------- */
typedef enum {
FLAGCX_NIXL_OP_SEND = 0, // bilateral: NIXL_WRITE → flagcxSend
FLAGCX_NIXL_OP_RECV = 1, // bilateral: NIXL_READ → flagcxRecv
FLAGCX_NIXL_OP_PUT = 2, // one-sided: NIXL_WRITE → flagcxHeteroPut + putSignal
FLAGCX_NIXL_OP_WAIT = 3, // one-sided: NIXL_READ → waitValue (poll the signal)
} flagcx_nixl_op_t;
/* ---------- IOV ---------- */
typedef struct {
void *addr; // local buffer address
size_t length; // byte count
size_t remote_offset; // offset within the remote window (OP_PUT only; ignored for SEND/RECV)
} flagcx_nixl_iov_t;
/* ---------- Engine lifecycle ---------- */
flagcx_nixl_status_t
flagcx_nixl_engine_create(const char *local_agent,
int device_id,
flagcx_nixl_engine_t **engine);
void
flagcx_nixl_engine_destroy(flagcx_nixl_engine_t *engine);
/* ---------- connInfo exchange ---------- */
flagcx_nixl_status_t
flagcx_nixl_get_conn_info(flagcx_nixl_engine_t *engine,
char **blob, size_t *blob_len);
void
flagcx_nixl_free_conn_blob(flagcx_nixl_engine_t *engine, char *blob);
flagcx_nixl_status_t
flagcx_nixl_load_remote_conn_info(flagcx_nixl_engine_t *engine,
const char *remote_agent,
const char *blob, size_t blob_len);
/* ---------- Connection ---------- */
flagcx_nixl_status_t
flagcx_nixl_connect(flagcx_nixl_engine_t *engine,
const char *remote_agent,
flagcx_nixl_conn_t **conn);
void
flagcx_nixl_disconnect(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn);
/* ---------- Memory registration (one-sided path; after connect, before submit; collective operation) ---------- */
flagcx_nixl_status_t
flagcx_nixl_reg_mem(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn,
void *buf, size_t size);
bool
flagcx_nixl_supports_one_sided(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn);
/* ---------- Transfer ---------- */
flagcx_nixl_status_t
flagcx_nixl_submit(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn,
flagcx_nixl_op_t op,
const flagcx_nixl_iov_t *iovs, size_t iov_count,
flagcx_nixl_req_t **req);
flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine,
flagcx_nixl_req_t *req);
void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine,
flagcx_nixl_req_t *req);
2.2 FlagCX side: transfer_engine.cpp — full pseudocode
#include "flagcx.h"
#include <string>
#include <mutex>
#include <unordered_map>
#include <cstring>
/* ---- Cache of deserialized remote connInfo ---- */
struct remote_conn_blob {
int32_t device_id;
flagcxUniqueId uid; // 256B; the result of the peer's flagcxGetUniqueId at engine_create time
};
The engine object
struct flagcx_nixl_engine_s {
std::string local_agent;
int device_id;
flagcxHandlerGroup_t handler; // holds devHandle + uniqueId
std::mutex conn_mutex;
std::unordered_map<std::string, remote_conn_blob> remote_infos;
std::unordered_map<std::string, flagcx_nixl_conn_t *> peers;
};
The conn object
struct flagcx_nixl_conn_s {
std::string remote_agent;
int local_rank; // 0 or 1
int remote_rank; // 1 - local_rank
flagcxComm_t comm;
flagcxStream_t stream; // one dedicated stream per conn
};
The req object
struct flagcx_nixl_req_s {
flagcxEvent_t event; // recorded at the tail of submit; queried by poll
bool completed;
};
① engine_create — initialize the engine and generate a uniqueId
flagcx_nixl_status_t
flagcx_nixl_engine_create(const char *local_agent,
int device_id,
flagcx_nixl_engine_t **engine)
{
auto *e = new flagcx_nixl_engine_s;
e->local_agent = local_agent;
e->device_id = device_id;
// Initialize the handler: load the device plugin and obtain devHandle
flagcxHandleInit(&e->handler);
e->handler->devHandle->setDevice(device_id);
// ★ Unconditionally generate a uniqueId (both sides generate one; connect uses only rank-0's)
flagcxGetUniqueId(&e->handler->uniqueId);
*engine = e;
return FLAGCX_NIXL_SUCCESS;
}
Key point: no matter whether this end ultimately becomes rank-0 or rank-1, engine_create always generates a uniqueId. connect() then decides whose copy to use, by lexicographic order of agent names.
② engine_destroy — tear down all connections and release resources
void
flagcx_nixl_engine_destroy(flagcx_nixl_engine_t *engine)
{
// Walk all established connections and tear each one down
for (auto &[name, conn] : engine->peers) {
flagcxCommFinalize(conn->comm);
flagcxCommDestroy(conn->comm);
engine->handler->devHandle->streamDestroy(conn->stream);
delete conn;
}
engine->peers.clear();
engine->remote_infos.clear();
flagcxHandleFree(engine->handler);
delete engine;
}
③ get_conn_info — serialize local-side info into a blob
NIXL's getConnInfo() calls this function to pack local info into a binary blob,
which NIXL's metadata exchange channel (etcd / TCP / manual) then delivers to the peer.
Blob format: [4B version][256B agent_name][4B device_id][256B flagcxUniqueId]
Total length = 520 bytes
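For reference, the same 520-byte layout written out as byte offsets (a sketch; the enum names are illustrative, not part of the ABI):
enum {
    BLOB_OFF_VERSION = 0,   /* 4B   uint32 version (= 1) */
    BLOB_OFF_AGENT   = 4,   /* 256B \0-padded agent name */
    BLOB_OFF_DEVICE  = 260, /* 4B   int32 device_id */
    BLOB_OFF_UID     = 264, /* 256B flagcxUniqueId */
    BLOB_TOTAL_BYTES = 520
};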
flagcx_nixl_status_t
flagcx_nixl_get_conn_info(flagcx_nixl_engine_t *engine,
char **blob, size_t *blob_len)
{
constexpr size_t VER_SZ = 4;
constexpr size_t AGENT_SZ = 256;
constexpr size_t DEV_SZ = 4;
constexpr size_t UID_SZ = FLAGCX_UNIQUE_ID_BYTES; // 256
constexpr size_t TOTAL = VER_SZ + AGENT_SZ + DEV_SZ + UID_SZ; // 520
char *buf = (char *)calloc(1, TOTAL);
size_t off = 0;
// version
uint32_t ver = 1;
memcpy(buf + off, &ver, VER_SZ); off += VER_SZ;
// agent name (\0-padded to 256B)
strncpy(buf + off, engine->local_agent.c_str(), AGENT_SZ);
off += AGENT_SZ;
// device_id
int32_t dev = engine->device_id;
memcpy(buf + off, &dev, DEV_SZ); off += DEV_SZ;
// uniqueId (256B; the result of flagcxGetUniqueId at engine_create time)
memcpy(buf + off, engine->handler->uniqueId->internal, UID_SZ);
off += UID_SZ;
*blob = buf;
*blob_len = TOTAL;
return FLAGCX_NIXL_SUCCESS;
}
Both sides put their own uniqueId into the blob; the peer caches it on receipt, and connect uses only the rank-0 side's copy, per the rule above.
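Stitching ③–⑤ together, the blob round-trip between two agents looks roughly like this (a sketch; engA/engB live in different processes, recv_buf/recv_len hold Agent B's received copy, and the transport that moves the blob is elided):
/* On Agent A: serialize local info and hand it to the metadata channel. */
char *blob = NULL; size_t blob_len = 0;
flagcx_nixl_get_conn_info(engA, &blob, &blob_len);
/* ... ship (blob, blob_len) to Agent B via etcd / TCP / a file ... */
flagcx_nixl_free_conn_blob(engA, blob);

/* On Agent B: cache the received copy under the peer's agent name. */
flagcx_nixl_load_remote_conn_info(engB, "AgentA", recv_buf, recv_len);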
④ free_conn_blob — free the blob memory
void
flagcx_nixl_free_conn_blob(flagcx_nixl_engine_t *engine, char *blob)
{
free(blob);
}
⑤ load_remote_conn_info — deserialize the remote blob and cache it in the map
NIXL's loadRemoteConnInfo() calls this function.
flagcx_nixl_status_t
flagcx_nixl_load_remote_conn_info(flagcx_nixl_engine_t *engine,
const char *remote_agent,
const char *blob, size_t blob_len)
{
constexpr size_t EXPECTED = 4 + 256 + 4 + FLAGCX_UNIQUE_ID_BYTES; // 520
if (blob_len < EXPECTED) return FLAGCX_NIXL_ERR;
size_t off = 0;
// version check
uint32_t ver;
memcpy(&ver, blob + off, 4); off += 4;
if (ver != 1) return FLAGCX_NIXL_ERR;
// agent name (skipped; the caller already supplies remote_agent)
off += 256;
// device_id
int32_t remote_dev;
memcpy(&remote_dev, blob + off, 4); off += 4;
// uniqueId
flagcxUniqueId remote_uid;
memcpy(remote_uid.internal, blob + off, FLAGCX_UNIQUE_ID_BYTES);
off += FLAGCX_UNIQUE_ID_BYTES;
// cache into the remote_infos map
std::lock_guard<std::mutex> lk(engine->conn_mutex);
engine->remote_infos[remote_agent] = {remote_dev, remote_uid};
return FLAGCX_NIXL_SUCCESS;
}
⑥ connect — determine ranks, pick the uniqueId, create the comm
This is the most critical function of the bilateral scheme. Both ends call connect() independently; internally it blocks in flagcxCommInitRank until both sides have arrived.
Rank assignment rule: the lexicographically smaller agent name = rank 0
uniqueId selection: both sides use the rank-0 side's uniqueId
flagcx_nixl_status_t
flagcx_nixl_connect(flagcx_nixl_engine_t *engine,
const char *remote_agent,
flagcx_nixl_conn_t **conn)
{
std::lock_guard<std::mutex> lk(engine->conn_mutex);
// Idempotent: if already connected, return the existing conn
auto it = engine->peers.find(remote_agent);
if (it != engine->peers.end()) {
*conn = it->second;
return FLAGCX_NIXL_SUCCESS;
}
// Look up the remote info (cached by load_remote_conn_info)
auto ri = engine->remote_infos.find(remote_agent);
if (ri == engine->remote_infos.end())
return FLAGCX_NIXL_ERR;
// ① Determine ranks: lexicographically smaller agent name = rank 0
int local_rank = (engine->local_agent < std::string(remote_agent)) ? 0 : 1;
int remote_rank = 1 - local_rank;
// ② Pick the uniqueId: both sides use the rank-0 side's
//    - this end is rank 0 → use the uid generated at our own engine_create
//    - this end is rank 1 → use the uid deserialized from the remote blob
flagcxUniqueId uid;
if (local_rank == 0) {
memcpy(&uid, engine->handler->uniqueId->internal,
FLAGCX_UNIQUE_ID_BYTES);
} else {
uid = ri->second.uid; // the remote (rank-0) side's uid
}
// ③ flagcxCommInitRank — blocks! Both sides must arrive together
// nranks=2, because a NIXL conn is point-to-point
flagcxComm_t comm;
flagcxCommInitRank(&comm, /*nranks=*/2, &uid, local_rank);
// ④ Create the dedicated stream (via the devHandle interface, portable across device vendors)
flagcxStream_t stream;
engine->handler->devHandle->streamCreate(&stream);
// ⑤ Assemble the conn object
auto *c = new flagcx_nixl_conn_s;
c->remote_agent = remote_agent;
c->local_rank = local_rank;
c->remote_rank = remote_rank;
c->comm = comm;
c->stream = stream;
engine->peers[remote_agent] = c;
*conn = c;
return FLAGCX_NIXL_SUCCESS;
}
Core point: the uniqueId travels over NIXL's own getConnInfo / loadRemoteConnInfo metadata exchange channel; no separate control channel is needed.
⑦ disconnect — tear down a single connection
void
flagcx_nixl_disconnect(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn)
{
// Finalize before destroy (NCCL semantics: finalize flushes outstanding operations)
flagcxCommFinalize(conn->comm);
flagcxCommDestroy(conn->comm);
// Destroy the dedicated stream
engine->handler->devHandle->streamDestroy(conn->stream);
// Remove from the peers map
engine->peers.erase(conn->remote_agent);
delete conn;
}
⑧ submit — post Send/Recv operations
flagcx_nixl_status_t
flagcx_nixl_submit(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn,
flagcx_nixl_op_t op,
const flagcx_nixl_iov_t *iovs, size_t iov_count,
flagcx_nixl_req_t **req)
{
// Multiple IOVs are batched into one submission via group calls to cut launch overhead
if (iov_count > 1) flagcxGroupStart(conn->comm);
for (size_t i = 0; i < iov_count; i++) {
if (op == FLAGCX_NIXL_OP_SEND) {
flagcxSend(iovs[i].addr, iovs[i].length,
flagcxUint8, // transfer as raw bytes
conn->remote_rank, // peer rank (the peer's rank in the 2-rank comm)
conn->comm, conn->stream);
} else {
flagcxRecv(iovs[i].addr, iovs[i].length,
flagcxUint8,
conn->remote_rank,
conn->comm, conn->stream);
}
}
if (iov_count > 1) flagcxGroupEnd(conn->comm);
// Create an event and record it on the stream; later poll uses eventQuery to detect completion
auto *r = new flagcx_nixl_req_s;
r->completed = false;
engine->handler->devHandle->eventCreate(&r->event,
flagcxEventDisableTiming);
engine->handler->devHandle->eventRecord(r->event, conn->stream);
*req = r;
return FLAGCX_NIXL_IN_PROG; // asynchronous: return IN_PROG
}
The IOV carries only local addr+length — no remote address/token — which is the essential difference between bilateral Send/Recv and one-sided RDMA Put/Get.
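For example, a two-segment bilateral send needs nothing but local addresses (a sketch; key_buf/val_buf and their lengths are hypothetical):
flagcx_nixl_iov_t iovs[2] = {
    { .addr = key_buf, .length = key_len, .remote_offset = 0 }, /* remote_offset ignored for SEND */
    { .addr = val_buf, .length = val_len, .remote_offset = 0 },
};
flagcx_nixl_req_t *req = NULL;
flagcx_nixl_submit(engine, conn, FLAGCX_NIXL_OP_SEND, iovs, 2, &req);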
⑨ poll — query request completion status
flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine,
flagcx_nixl_req_t *req)
{
if (req->completed) return FLAGCX_NIXL_SUCCESS;
// eventQuery: SUCCESS means every operation before this event on the stream has completed
flagcxResult_t ret = engine->handler->devHandle->eventQuery(req->event);
if (ret == flagcxSuccess) {
req->completed = true;
return FLAGCX_NIXL_SUCCESS;
}
// flagcxInProgress or anything else → not finished yet
return FLAGCX_NIXL_IN_PROG;
}
⑩ release_req — release request resources
void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine,
flagcx_nixl_req_t *req)
{
engine->handler->devHandle->eventDestroy(req->event);
delete req;
}
Function-call sequence overview
NIXL-side call                          FlagCX transfer_engine.cpp internals
─────────────────────────               ─────────────────────────────────
createBackend("FlagCX")
 └→ engine_create()                     flagcxHandleInit → devHandle
                                        flagcxGetUniqueId → generate uid
getConnInfo()
 └→ get_conn_info()                     serialize [ver|agent|dev|uid] → 520B blob
(NIXL metadata exchange carries the blob to the peer)
loadRemoteConnInfo("AgentB", blob)
 └→ load_remote_conn_info()             deserialize blob → remote_infos["AgentB"]
registerMem()
connect("AgentB")
 └→ connect()                           local_rank = (local < remote) ? 0 : 1
                                        uid = the rank-0 side's uniqueId
                                        flagcxCommInitRank(2, uid, rank) ← blocks!
                                        devHandle->streamCreate
                                        if Put is needed → flagcx_nixl_reg_mem()
postXfer(WRITE, local, remote)
 └→ submit(OP_SEND, iovs)               flagcxGroupStart (if multi-iov)
                                        flagcxSend × N
                                        flagcxGroupEnd
                                        eventCreate + eventRecord
checkXfer(req)
 └→ poll(req)                           devHandle->eventQuery(event)
                                        → SUCCESS / IN_PROG
releaseReqH(req)
 └→ release_req(req)                    devHandle->eventDestroy + delete
destroyBackend()
 └→ engine_destroy()                    tear down every conn
                                        flagcxHandleFree
One-sided path (Put) addendum
The following describes how the engine automatically switches from the bilateral path to one-sided flagcxHeteroPut when the underlying IBRC adapter is available.
Struct changes (new fields added on top of the existing ones):
engine:
struct flagcx_nixl_engine_s {
// ... as above ...
bool one_sided_avail = false; // detected after connect + reg_mem via netAdaptor->put
};
conn:
struct flagcx_nixl_conn_s {
// ... as above ...
bool mem_registered = false;
void *window_base = nullptr; // base address of the locally registered window
size_t window_size = 0;
void *reg_handle = nullptr; // returned by flagcxCommRegister
};
req:
struct flagcx_nixl_req_s {
// ... as above ...
bool is_put_path = false; // true = taking the one-sided path
size_t signal_offset = 0; // signal offset for the Put path
int local_rank = 0; // for waitValue (used in poll)
flagcxHeteroComm_t hetero_comm = nullptr; // for netAdaptor access (used in poll)
};
⑪ reg_mem — register the memory window (collective operation)
Called after connect() and before submit(). Both ends must call it together (the internal bootstrapAllGather is a collective operation).
flagcx_nixl_status_t
flagcx_nixl_reg_mem(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn,
void *buf, size_t size)
{
// Enable one-sided registration support
setenv("FLAGCX_ENABLE_ONE_SIDE_REGISTER", "1", 1);
void *handle = nullptr;
flagcxCommRegister(conn->comm, buf, size, &handle);
// └→ flagcxOneSideRegister()
//    ├→ netAdaptor->regMr(buf, size)  // IB memory registration → rkey/lkey
//    └→ bootstrapAllGather()          // exchange every rank's baseVa/rkey/lkey
//       → fills globalOneSideHandles
conn->mem_registered = true;
conn->window_base = buf;
conn->window_size = size;
conn->reg_handle = handle;
// Detect one-sided capability: is netAdaptor->put available?
flagcxHeteroComm_t hc = conn->comm->heteroComm;
engine->one_sided_avail = (hc->netAdaptor && hc->netAdaptor->put);
return FLAGCX_NIXL_SUCCESS;
}
⑫ supports_one_sided — query whether the current connection can take the one-sided path
bool
flagcx_nixl_supports_one_sided(flagcx_nixl_engine_t *engine,
flagcx_nixl_conn_t *conn)
{
return engine->one_sided_avail && conn->mem_registered;
}
⑧' submit — dual-path version
flagcx_nixl_status_t
flagcx_nixl_submit(/* ... same signature ... */)
{
auto *r = new flagcx_nixl_req_s;
r->completed = false;
bool use_put = engine->one_sided_avail && conn->mem_registered;
// ═══════ One-sided Put path (WRITE side) ═══════
if (op == FLAGCX_NIXL_OP_PUT && use_put)
{
flagcxHeteroComm_t hc = conn->comm->heteroComm;
for (size_t i = 0; i < iov_count; i++) {
size_t src_off = (uintptr_t)iovs[i].addr
- (uintptr_t)conn->window_base;
flagcxHeteroPut(hc, conn->remote_rank,
src_off, iovs[i].remote_offset, iovs[i].length);
}
// Write the signal: by convention it lives in the last 8B of the remote window
size_t sig_off = conn->window_size - 8; // TODO: proper signal allocation
flagcxHeteroPutSignal(hc, conn->remote_rank, sig_off);
r->is_put_path = true;
r->signal_offset = sig_off;
r->local_rank = conn->local_rank;
r->hetero_comm = hc;
r->event = nullptr;
*req = r;
return FLAGCX_NIXL_IN_PROG;
}
// ═══════ One-sided Wait path (READ side) ═══════
if (op == FLAGCX_NIXL_OP_WAIT && use_put)
{
// The READ side moves no data — RDMA writes it straight into the local window.
// Only signal polling needs to be set up.
size_t sig_off = conn->window_size - 8;
r->is_put_path = true;
r->signal_offset = sig_off;
r->local_rank = conn->local_rank;
r->hetero_comm = conn->comm->heteroComm;
r->event = nullptr;
*req = r;
return FLAGCX_NIXL_IN_PROG;
}
// ═══════ Bilateral Send/Recv path (fallback) ═══════
// Also taken when OP_PUT is requested but use_put==false (auto-degrade to Send)
r->is_put_path = false;
if (iov_count > 1) flagcxGroupStart(conn->comm);
for (size_t i = 0; i < iov_count; i++) {
if (op == FLAGCX_NIXL_OP_SEND || op == FLAGCX_NIXL_OP_PUT) {
flagcxSend(iovs[i].addr, iovs[i].length, flagcxUint8,
conn->remote_rank, conn->comm, conn->stream);
} else {
flagcxRecv(iovs[i].addr, iovs[i].length, flagcxUint8,
conn->remote_rank, conn->comm, conn->stream);
}
}
if (iov_count > 1) flagcxGroupEnd(conn->comm);
engine->handler->devHandle->eventCreate(&r->event, flagcxEventDisableTiming);
engine->handler->devHandle->eventRecord(r->event, conn->stream);
*req = r;
return FLAGCX_NIXL_IN_PROG;
}
When OP_PUT is requested but use_put == false (IBRC unavailable), the engine automatically falls back to flagcxSend (the bilateral path); OP_WAIT likewise degrades to flagcxRecv in the else branch. Both ends must reach the same use_put decision — since reg_mem is collective and the capability comes from the shared adapter, the two sides should observe the same value.
⑨' poll — dual-path version
flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine, flagcx_nixl_req_t *req)
{
if (req->completed) return FLAGCX_NIXL_SUCCESS;
if (req->is_put_path) {
// Put path: poll the local signal offset
// waitValue checks globalOneSideHandles[local_rank].baseVa + signal_offset
void **gHandles = (void **)globalOneSideHandles;
flagcxResult_t ret = req->hetero_comm->netAdaptor->waitValue(
gHandles, req->local_rank, req->signal_offset, /*expected=*/1);
if (ret == flagcxSuccess) {
req->completed = true;
return FLAGCX_NIXL_SUCCESS;
}
return FLAGCX_NIXL_IN_PROG;
}
// Send/Recv path: eventQuery
flagcxResult_t ret = engine->handler->devHandle->eventQuery(req->event);
if (ret == flagcxSuccess) {
req->completed = true;
return FLAGCX_NIXL_SUCCESS;
}
return FLAGCX_NIXL_IN_PROG;
}
⑩' release_req — dual-path version
void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine, flagcx_nixl_req_t *req)
{
if (!req->is_put_path && req->event) {
engine->handler->devHandle->eventDestroy(req->event);
}
// The Put path never created an event, so there is nothing to release
delete req;
}
One-sided path call sequence
Agent A (WRITE side)                              Agent B (READ side)
──────────────────                                ─────────────────
engine_create                                     engine_create
get_conn_info → blob_A                            get_conn_info → blob_B
                     (exchange blobs)
loadRemoteConnInfo("B", blob_B)                   loadRemoteConnInfo("A", blob_A)
connect("B")  ←── flagcxCommInitRank blocks ──→   connect("A")
reg_mem(gpu_buf_a, size) ←── allGather blocks ──→ reg_mem(gpu_buf_b, size)
 │ regMr + exchange rkey/lkey/baseVa               │ regMr + exchange rkey/lkey/baseVa
 │ one_sided_avail = true ✓                        │ one_sided_avail = true ✓
submit(OP_PUT, iovs)                              submit(OP_WAIT, iovs)
 │ flagcxHeteroPut(srcOff, dstOff, size)           │ (no data op; only sets signal_offset)
 │ flagcxHeteroPutSignal(sig_off)                  │
 └→ req {is_put_path=true}                         └→ req {is_put_path=true}
poll(req) → SUCCESS (initiator: fire-and-forget)  poll(req) → waitValue(sig_off)
                                                   → IN_PROG / SUCCESS
Bilateral vs one-sided comparison
| Dimension | Bilateral Send/Recv | One-sided Put |
|---|---|---|
| Adapter requirement | all | IBRC only |
| Memory registration | not needed | requires flagcxCommRegister (collective) |
| Address model | local addr+len | srcOffset + dstOffset (offsets within the window) |
| FIFO constraint | yes — Send/Recv must pair up | none — RDMA Write is address-based |
| Completion detection | eventQuery (stream event) | waitValue (signal polling) |
| WRITE side | flagcxSend | flagcxHeteroPut + putSignal |
| READ side | flagcxRecv | no data operation, only waitValue |
| Automatic fallback | — | falls back to Send when use_put==false |
2.3 NIXL side: changes to flagcx_backend.h / flagcx_backend.cpp
- Remove remote-memory logic
  nixlFlagcxBackendMD no longer needs the remote_mem, remote_token, or exported_public_md fields. loadRemoteMD() no longer calls flagcx_nixl_import_mem(); it only records remote_agent + base_addr + length. getPublicData() still serializes base_addr/length/device_id (so the remote side knows the range for validation, but no token is transferred).
- postXfer() simplified
  nixl_status_t
  nixlFlagcxEngine::postXfer(const nixl_xfer_op_t &operation,
                             const nixl_meta_dlist_t &local,
                             const nixl_meta_dlist_t &remote,
                             const std::string &remote_agent,
                             nixlBackendReqH *&handle,
                             const nixl_opt_b_args_t *opt_args) const
  {
      // ... prepXfer if needed (build the IOV) ...
      // Mapping: NIXL_WRITE → SEND, NIXL_READ → RECV
      flagcx_nixl_op_t sb_op = (operation == NIXL_WRITE)
                             ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV;
      // The IOV only needs local addr + length — no remote_token/remote_offset
      std::vector<flagcx_nixl_iov_t> iovs(lcnt);
      for (int i = 0; i < lcnt; i++) {
          iovs[i].addr = reinterpret_cast<void*>(local[i].addr);
          iovs[i].length = local[i].len;
      }
      flagcx_nixl_submit(engine_, conn, sb_op, iovs.data(), iovs.size(), &req_h->req);
      return NIXL_IN_PROG;
  }
  The IOV no longer carries remote_token or remote_offset, because the data plane is flagcxSend/Recv and does not need to know remote addresses.
- supportsNotif() changed to false
  Phase 0 does not use notifications. If the upper layer needs to know that the peer is done, it synchronizes at the application level.
- nixlFlagcxReqH simplified
  Drop the want_notif, notif_msg, and notif_sent fields.
- Dual-path (Put) support
  registerMem(): remains a no-op (does not call flagcx registration) but caches addr/size/devId so reg_mem can be called after connect.
  Once connect() returns, immediately call flagcx_nixl_reg_mem(engine, conn, cached_buf, cached_size) to register the memory window (a collective operation; both ends in sync).
  postXfer() path selection (see the sketch after this list):
  if (flagcx_nixl_supports_one_sided(engine, conn)):
      WRITE → OP_PUT   (IOV.remote_offset = remote[i].addr - remote_window_base)
      READ  → OP_WAIT
  else:
      WRITE → OP_SEND
      READ  → OP_RECV
  checkXfer(): unchanged — the underlying poll() distinguishes the Put/Send paths automatically.
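Putting the path selection into C++, the dual-path postXfer body could look like the sketch below (conn, req_h, lcnt, and remote_window_base follow the naming of the snippets above; this is an assumption-laden sketch, not the final implementation):
// Dual-path op selection inside nixlFlagcxEngine::postXfer (sketch).
bool one_sided = flagcx_nixl_supports_one_sided(engine_, conn);
flagcx_nixl_op_t op = one_sided
    ? ((operation == NIXL_WRITE) ? FLAGCX_NIXL_OP_PUT  : FLAGCX_NIXL_OP_WAIT)
    : ((operation == NIXL_WRITE) ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV);

std::vector<flagcx_nixl_iov_t> iovs(lcnt);
for (int i = 0; i < lcnt; i++) {
    iovs[i].addr   = reinterpret_cast<void*>(local[i].addr);
    iovs[i].length = local[i].len;
    // Only the Put path consumes remote_offset: the offset inside the peer's window.
    iovs[i].remote_offset = one_sided
        ? (size_t)(remote[i].addr - remote_window_base) : 0;
}
flagcx_nixl_submit(engine_, conn, op, iovs.data(), iovs.size(), &req_h->req);
return NIXL_IN_PROG;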
3. How connInfo carries the uniqueId
This is the most critical design point of the bilateral scheme.
NIXL's metadata exchange flow is:
Agent A: getConnInfo() → blob_A
Agent B: getConnInfo() → blob_B
(exchange blob_A ↔ blob_B externally, e.g. via etcd / gRPC / the filesystem)
Agent A: loadRemoteConnInfo("AgentB", blob_B)
Agent B: loadRemoteConnInfo("AgentA", blob_A)
Agent A: connect("AgentB")
Agent B: connect("AgentA")
getConnInfo() unconditionally serializes the flagcxUniqueId into the blob (the id itself is generated at engine_create).
In connect():
local_rank = (local_agent < remote_agent) ? 0 : 1
if local_rank == 0:
    uid = taken from our own getConnInfo result (generated at engine_create)
else:
    uid = parsed from the peer's loadRemoteConnInfo blob
flagcxCommInitRank(&comm, 2, &uid, local_rank)
Suggested blob format (the 8B host_hash extends the 520B layout in ③ to 528 bytes; see the offset sketch below):
[4B version][256B agent_name][8B host_hash][4B device_id][256B flagcxUniqueId]
Both sides generate a uniqueId, but by convention only the rank-0 side's copy is used. This makes the result deterministic regardless of which side calls getConnInfo() first.
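If the host_hash field is adopted, the offsets shift accordingly; a sketch of the proposed 528-byte layout (the enum names are illustrative):
enum {
    BLOB2_OFF_VERSION   = 0,   /* 4B   uint32 version (bumped) */
    BLOB2_OFF_AGENT     = 4,   /* 256B \0-padded agent name */
    BLOB2_OFF_HOST_HASH = 260, /* 8B   host hash */
    BLOB2_OFF_DEVICE    = 268, /* 4B   int32 device_id */
    BLOB2_OFF_UID       = 272, /* 256B flagcxUniqueId */
    BLOB2_TOTAL_BYTES   = 528
};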
4. Bilateral coordination constraint
The only hard requirement of Scheme 1 (the bilateral path) is that both sides must pair Send/Recv in the same order.
FlagCX (inheriting NCCL semantics) pairs Send/Recv in FIFO order — the N-th Send on a given comm matches the N-th Recv on the peer.
This means the upper-layer usage pattern must be:
# Agent A (sender)
nixl_agent_a.postXfer(NIXL_WRITE, local_buf_1, remote_desc_1, "AgentB") # → flagcxSend #1
nixl_agent_a.postXfer(NIXL_WRITE, local_buf_2, remote_desc_2, "AgentB") # → flagcxSend #2
# Agent B (receiver) — MUST post in the same order
nixl_agent_b.postXfer(NIXL_READ, local_buf_1, remote_desc_1, "AgentA") # → flagcxRecv #1
nixl_agent_b.postXfer(NIXL_READ, local_buf_2, remote_desc_2, "AgentA") # → flagcxRecv #2
If the order differs (e.g. B posts #2 before #1), the result is deadlock or corrupted data.
For the KV-cache scenario this constraint is easy to satisfy — the prefill/decode transfer pattern is predefined.
The one-sided (Put) path is not subject to this constraint — RDMA Write is address-based (explicit srcOffset/dstOffset), so there is no FIFO pairing problem. The bilateral fallback path, however, remains subject to it.
5. Upper-layer usage example
// ========== Agent A (prefill; WRITEs to Agent B) ==========
nixlAgent agentA("AgentA");
agentA.createBackend("FlagCX", {{"device_id", "0"}});
// Register the local GPU buffer
nixlBlobDesc send_buf = {.addr = gpu_ptr_a, .len = 4096, .devId = 0};
nixlRegDList send_reg(VRAM_SEG, {send_buf});
agentA.registerMem(send_reg);
// Obtain connInfo + publicMD from Agent B
agentA.loadRemoteConnInfo("AgentB", blob_b);
// Build the remote descriptor (only base_addr and length are needed, for range validation)
nixlBlobDesc recv_desc = {.addr = gpu_ptr_b, .len = 4096, .devId = 0};
// ... loadRemoteMD ...
// Start the transfer
auto req = agentA.createXferReq(NIXL_WRITE, send_reg, recv_descs, "AgentB");
agentA.postXferReq(req); // → internally: flagcxSend(gpu_ptr_a, 4096, ...)
while (agentA.checkXferReq(req) == NIXL_IN_PROG) { /* spin */ }
// ========== Agent B (decode; READs from Agent A) ==========
nixlAgent agentB("AgentB");
agentB.createBackend("FlagCX", {{"device_id", "0"}});
nixlBlobDesc recv_buf = {.addr = gpu_ptr_b, .len = 4096, .devId = 0};
nixlRegDList recv_reg(VRAM_SEG, {recv_buf});
agentB.registerMem(recv_reg);
agentB.loadRemoteConnInfo("AgentA", blob_a);
auto req = agentB.createXferReq(NIXL_READ, recv_reg, send_descs, "AgentA");
agentB.postXferReq(req); // → internally: flagcxRecv(gpu_ptr_b, 4096, ...)
while (agentB.checkXferReq(req) == NIXL_IN_PROG) { /* spin */ }
6. Development task breakdown
Task 1: FlagCX side — transfer_engine.h (new version)
- Slimmed-down C ABI header, keeping only the 12 functions listed in 2.1
- Drop all control-message, remote-memory, and notification-related types
Task 2: FlagCX side — transfer_engine.cpp
Implement the following functions (estimated ~300 lines):
| Function | Implementation notes |
|---|---|
| engine_create | record agent name / device_id; generate a flagcxUniqueId |
| engine_destroy | clean up the peers map |
| get_conn_info | serialize agent + host_hash + device_id + uniqueId |
| free_conn_blob | free(blob) |
| load_remote_conn_info | parse the blob; cache it into the map |
| connect | take the uid from the rank-0 side's blob → flagcxCommInitRank(2, uid, rank) → create the stream |
| disconnect | flagcxCommFinalize + flagcxCommDestroy + destroy the stream |
| reg_mem | flagcxCommRegister → IB regMr + AllGather of rkeys (collective operation) |
| supports_one_sided | query whether netAdaptor->put is available |
| submit | dual path: OP_PUT → flagcxHeteroPut + putSignal; OP_SEND/RECV → flagcxSend/Recv + eventRecord |
| poll | dual path: Put → waitValue; Send/Recv → eventQuery |
| release_req | eventDestroy (Send/Recv path only) + free |
Task 3: NIXL side — adapt flagcx_backend.h / .cpp
- supportsNotif() → false
- Drop the remote_token / remote_mem fields and related logic
- postXfer's IOV fills in local addr + length only
- Drop the genNotif / getNotifs implementations (return NOT_SUPPORTED)
- Drop the notif-sending logic in checkXfer
Task 4: build & test
- FlagCX side builds libflagcx_nixl.so
- NIXL side's meson.build links against this lib
- Write a minimal two-process test — Agent A WRITE + Agent B READ — and verify data correctness (a sketch follows)
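A sketch of what that minimal test might look like, one process per agent (exchange_blobs() is a hypothetical out-of-band transport helper; buffer allocation and error checking are omitted):
/* Run with me="AgentA", peer="AgentB", is_writer=1 in one process,
 * and the mirrored arguments in the other. */
static void run_peer(const char *me, const char *peer,
                     int is_writer, void *gpu_buf, size_t len) {
    flagcx_nixl_engine_t *eng; flagcx_nixl_conn_t *conn; flagcx_nixl_req_t *req;
    flagcx_nixl_engine_create(me, /*device_id=*/0, &eng);

    char *blob; size_t blob_len;
    char peer_blob[520]; size_t peer_blob_len;
    flagcx_nixl_get_conn_info(eng, &blob, &blob_len);
    exchange_blobs(blob, blob_len, peer_blob, &peer_blob_len); /* hypothetical */
    flagcx_nixl_load_remote_conn_info(eng, peer, peer_blob, peer_blob_len);
    flagcx_nixl_free_conn_blob(eng, blob);

    flagcx_nixl_connect(eng, peer, &conn); /* blocks until both sides arrive */

    flagcx_nixl_iov_t iov = { .addr = gpu_buf, .length = len, .remote_offset = 0 };
    flagcx_nixl_submit(eng, conn,
                       is_writer ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV,
                       &iov, 1, &req);
    while (flagcx_nixl_poll(eng, req) == FLAGCX_NIXL_IN_PROG) { /* spin */ }
    flagcx_nixl_release_req(eng, req);
    /* Reader side: verify gpu_buf against the expected pattern here. */

    flagcx_nixl_disconnect(eng, conn);
    flagcx_nixl_engine_destroy(eng);
}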
7. Follow-on evolution path
Phase 0 (current)                          Phase 1
Dual path: Send/Recv + Put                 Full one-sided + control plane
Auto-upgrade to Put when IBRC available    Put/Get both complete
One-sided after reg_mem                    Control channel + progress thread
Bilateral fallback as the safety net       Engine pairs operations internally, automatically