1. Background and Core Approach

Phase 0 uses a dual-path engine: by default it takes the two-sided flagcxSend/Recv path; when the underlying network adaptor supports one-sided RDMA (IBRC), WRITE upgrades to flagcxHeteroPut + putSignal.

The two data paths:

  • Two-sided path (Send/Recv): the two upper-layer ranks each call postXfer, which maps to flagcxSend / flagcxRecv. Supported by all network adaptors; no memory registration needed.
  • One-sided path (Put): the WRITE side calls flagcxHeteroPut; the READ side merely polls a signal until the data arrives. Supported only by the IBRC adaptor; the memory window must be registered in advance (flagcxCommRegister).

Mapping rules:

| Path | Initiator calls | Engine internal action | Peer must do |
|------|-----------------|------------------------|--------------|
| Two-sided | postXfer(WRITE, ...) | flagcxSend(addr, len, ...) | postXfer(READ, ...) → flagcxRecv |
| Two-sided | postXfer(READ, ...) | flagcxRecv(addr, len, ...) | postXfer(WRITE, ...) → flagcxSend |
| One-sided | postXfer(WRITE, ...) | flagcxHeteroPut(srcOff, dstOff, size) + putSignal | postXfer(READ, ...) → waitValue |
| One-sided | postXfer(READ, ...) | waitValue(signal_offset) polling | postXfer(WRITE, ...) → Put |
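
To make the mapping concrete, it collapses to a single branch. The sketch below is illustrative only and assumes the flagcx_nixl_op_t enum and the capability probe (flagcx_nixl_supports_one_sided) defined in section 2.1:

/* Sketch: op selection per the table above. `one_sided` stands for the
 * result of flagcx_nixl_supports_one_sided(engine, conn). */
static flagcx_nixl_op_t pick_op(bool is_write, bool one_sided) {
    if (one_sided)
        return is_write ? FLAGCX_NIXL_OP_PUT  : FLAGCX_NIXL_OP_WAIT;
    return is_write ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV;
}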

2. What Needs to Change

2.1 FlagCX side: transfer_engine.h

/* ---------- Opaque handles ---------- */
typedef struct flagcx_nixl_engine_s   flagcx_nixl_engine_t;
typedef struct flagcx_nixl_conn_s     flagcx_nixl_conn_t;
typedef struct flagcx_nixl_req_s      flagcx_nixl_req_t;
 
/* ---------- Return codes ---------- */
typedef enum {
    FLAGCX_NIXL_SUCCESS  =  0,
    FLAGCX_NIXL_IN_PROG  =  1,
    FLAGCX_NIXL_ERR      = -1,
} flagcx_nixl_status_t;
 
/* ---------- Operation types ---------- */
typedef enum {
    FLAGCX_NIXL_OP_SEND = 0,   // two-sided: NIXL_WRITE → flagcxSend
    FLAGCX_NIXL_OP_RECV = 1,   // two-sided: NIXL_READ  → flagcxRecv
    FLAGCX_NIXL_OP_PUT  = 2,   // one-sided: NIXL_WRITE → flagcxHeteroPut + putSignal
    FLAGCX_NIXL_OP_WAIT = 3,   // one-sided: NIXL_READ  → waitValue (poll the signal)
} flagcx_nixl_op_t;
 
/* ---------- IOV ---------- */
typedef struct {
    void    *addr;           // local buffer address
    size_t   length;         // byte count
    size_t   remote_offset;  // offset within the remote window (OP_PUT only; ignored by SEND/RECV)
} flagcx_nixl_iov_t;
 
/* ---------- Engine lifecycle ---------- */
flagcx_nixl_status_t
flagcx_nixl_engine_create(const char *local_agent,
                          int device_id,
                          flagcx_nixl_engine_t **engine);
 
void
flagcx_nixl_engine_destroy(flagcx_nixl_engine_t *engine);
 
/* ---------- connInfo exchange ---------- */
flagcx_nixl_status_t
flagcx_nixl_get_conn_info(flagcx_nixl_engine_t *engine,
                          char **blob, size_t *blob_len);
 
void
flagcx_nixl_free_conn_blob(flagcx_nixl_engine_t *engine, char *blob);
 
flagcx_nixl_status_t
flagcx_nixl_load_remote_conn_info(flagcx_nixl_engine_t *engine,
                                  const char *remote_agent,
                                  const char *blob, size_t blob_len);
 
/* ---------- Connection ---------- */
flagcx_nixl_status_t
flagcx_nixl_connect(flagcx_nixl_engine_t *engine,
                    const char *remote_agent,
                    flagcx_nixl_conn_t **conn);
 
void
flagcx_nixl_disconnect(flagcx_nixl_engine_t *engine,
                       flagcx_nixl_conn_t *conn);
 
/* ---------- Memory registration (one-sided path; after connect, before submit; collective op) ---------- */
flagcx_nixl_status_t
flagcx_nixl_reg_mem(flagcx_nixl_engine_t *engine,
                    flagcx_nixl_conn_t *conn,
                    void *buf, size_t size);
 
bool
flagcx_nixl_supports_one_sided(flagcx_nixl_engine_t *engine,
                                flagcx_nixl_conn_t *conn);
 
/* ---------- Transfer ---------- */
flagcx_nixl_status_t
flagcx_nixl_submit(flagcx_nixl_engine_t *engine,
                   flagcx_nixl_conn_t *conn,
                   flagcx_nixl_op_t op,
                   const flagcx_nixl_iov_t *iovs, size_t iov_count,
                   flagcx_nixl_req_t **req);
 
flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine,
                 flagcx_nixl_req_t *req);
 
void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine,
                        flagcx_nixl_req_t *req);

2.2 FlagCX side: transfer_engine.cpp full pseudocode

#include "flagcx.h"
#include <string>
#include <mutex>
#include <unordered_map>
#include <cstring>
 
/* ---- Cache of the remote connInfo after deserialization ---- */
struct remote_conn_blob {
    int32_t          device_id;
    flagcxUniqueId   uid;           // 256B, the result of flagcxGetUniqueId at the peer's engine_create
};

The engine object:

struct flagcx_nixl_engine_s {
    std::string              local_agent;
    int                      device_id;
    flagcxHandlerGroup_t     handler;    // holds devHandle + uniqueId
 
    std::mutex               conn_mutex;
    std::unordered_map<std::string, remote_conn_blob>       remote_infos;
    std::unordered_map<std::string, flagcx_nixl_conn_t *>   peers;
};

The conn object:

struct flagcx_nixl_conn_s {
    std::string      remote_agent;
    int              local_rank;     // 0 or 1
    int              remote_rank;    // 1 - local_rank
    flagcxComm_t     comm;
    flagcxStream_t   stream;         // one dedicated stream per conn
};

The req object:

struct flagcx_nixl_req_s {
    flagcxEvent_t    event;          // recorded at the tail of submit; queried in poll
    bool             completed;
};

engine_create — initialize the engine, generate the uniqueId

flagcx_nixl_status_t
flagcx_nixl_engine_create(const char *local_agent,
                          int device_id,
                          flagcx_nixl_engine_t **engine)
{
    auto *e        = new flagcx_nixl_engine_s;
    e->local_agent = local_agent;
    e->device_id   = device_id;
 
    // Initialize the handler: load the device plugin, obtain the devHandle
    flagcxHandleInit(&e->handler);
    e->handler->devHandle->setDevice(device_id);
 
    // ★ Unconditionally generate a uniqueId (both sides generate one; connect uses only rank-0's)
    flagcxGetUniqueId(&e->handler->uniqueId);
 
    *engine = e;
    return FLAGCX_NIXL_SUCCESS;
}

Key point: no matter whether this side ends up as rank 0 or rank 1, engine_create always generates a uniqueId; connect() later decides whose to use, by lexicographic order.


engine_destroy — tear down all connections, release resources

void
flagcx_nixl_engine_destroy(flagcx_nixl_engine_t *engine)
{
    // Walk all established connections and tear each one down
    for (auto &[name, conn] : engine->peers) {
        flagcxCommFinalize(conn->comm);
        flagcxCommDestroy(conn->comm);
        engine->handler->devHandle->streamDestroy(conn->stream);
        delete conn;
    }
    engine->peers.clear();
    engine->remote_infos.clear();
 
    flagcxHandleFree(engine->handler);
    delete engine;
}

get_conn_info — serialize local info into a blob

NIXL's getConnInfo() calls this function to pack local info into a binary blob, which NIXL's metadata exchange channel (etcd / TCP / manual) then delivers to the peer.

Blob format: [4B version][256B agent_name][4B device_id][256B flagcxUniqueId]
Total length = 520 bytes
flagcx_nixl_status_t
flagcx_nixl_get_conn_info(flagcx_nixl_engine_t *engine,
                          char **blob, size_t *blob_len)
{
    constexpr size_t VER_SZ   = 4;
    constexpr size_t AGENT_SZ = 256;
    constexpr size_t DEV_SZ   = 4;
    constexpr size_t UID_SZ   = FLAGCX_UNIQUE_ID_BYTES;   // 256
    constexpr size_t TOTAL    = VER_SZ + AGENT_SZ + DEV_SZ + UID_SZ;  // 520
 
    char *buf = (char *)calloc(1, TOTAL);
    size_t off = 0;
 
    // version
    uint32_t ver = 1;
    memcpy(buf + off, &ver, VER_SZ);                        off += VER_SZ;
 
    // agent name (NUL-padded to 256B)
    strncpy(buf + off, engine->local_agent.c_str(), AGENT_SZ);
                                                             off += AGENT_SZ;
 
    // device_id
    int32_t dev = engine->device_id;
    memcpy(buf + off, &dev, DEV_SZ);                        off += DEV_SZ;
 
    // uniqueId (256B, the result of flagcxGetUniqueId at engine_create)
    memcpy(buf + off, engine->handler->uniqueId->internal, UID_SZ);
                                                             off += UID_SZ;
 
    *blob     = buf;
    *blob_len = TOTAL;
    return FLAGCX_NIXL_SUCCESS;
}

Both sides put their own uniqueId into the blob; the peer caches it on receipt, and connect uses only the rank-0 side's copy per the rule.


free_conn_blob — free the blob memory

void
flagcx_nixl_free_conn_blob(flagcx_nixl_engine_t *engine, char *blob)
{
    free(blob);
}

load_remote_conn_info — deserialize the remote blob, cache it in the map

NIXL's loadRemoteConnInfo() calls this function.

flagcx_nixl_status_t
flagcx_nixl_load_remote_conn_info(flagcx_nixl_engine_t *engine,
                                  const char *remote_agent,
                                  const char *blob, size_t blob_len)
{
    constexpr size_t EXPECTED = 4 + 256 + 4 + FLAGCX_UNIQUE_ID_BYTES;  // 520
    if (blob_len < EXPECTED) return FLAGCX_NIXL_ERR;
 
    size_t off = 0;
 
    // version check
    uint32_t ver;
    memcpy(&ver, blob + off, 4);                             off += 4;
    if (ver != 1) return FLAGCX_NIXL_ERR;
 
    // agent name (skipped; the caller already supplies remote_agent)
    off += 256;
 
    // device_id
    int32_t remote_dev;
    memcpy(&remote_dev, blob + off, 4);                      off += 4;
 
    // uniqueId
    flagcxUniqueId remote_uid;
    memcpy(remote_uid.internal, blob + off, FLAGCX_UNIQUE_ID_BYTES);
                                                              off += FLAGCX_UNIQUE_ID_BYTES;
 
    // Cache into the remote_infos map
    std::lock_guard<std::mutex> lk(engine->conn_mutex);
    engine->remote_infos[remote_agent] = {remote_dev, remote_uid};
 
    return FLAGCX_NIXL_SUCCESS;
}

connect — decide the rank, pick the uniqueId, build the comm

This is the most critical function of the two-sided scheme. Each side calls connect() on its own; internally it blocks in flagcxCommInitRank until both sides have arrived.

Rank assignment rule: the lexicographically smaller agent name = rank 0
uniqueId selection: both sides use the rank-0 side's uniqueId
flagcx_nixl_status_t
flagcx_nixl_connect(flagcx_nixl_engine_t *engine,
                    const char *remote_agent,
                    flagcx_nixl_conn_t **conn)
{
    std::lock_guard<std::mutex> lk(engine->conn_mutex);
 
    // Idempotent: if already connected, return the existing conn
    auto it = engine->peers.find(remote_agent);
    if (it != engine->peers.end()) {
        *conn = it->second;
        return FLAGCX_NIXL_SUCCESS;
    }
 
    // Look up the remote info (cached by load_remote_conn_info)
    auto ri = engine->remote_infos.find(remote_agent);
    if (ri == engine->remote_infos.end())
        return FLAGCX_NIXL_ERR;
 
    // ① Decide the rank: lexicographically smaller agent name = rank 0
    int local_rank  = (engine->local_agent < std::string(remote_agent)) ? 0 : 1;
    int remote_rank = 1 - local_rank;
 
    // ② Pick the uniqueId: both sides use the rank-0 side's
    //    - this side is rank 0  → use the uid generated at our own engine_create
    //    - this side is rank 1  → use the uid deserialized from the remote blob
    flagcxUniqueId uid;
    if (local_rank == 0) {
        memcpy(uid.internal, engine->handler->uniqueId->internal,
               FLAGCX_UNIQUE_ID_BYTES);
    } else {
        uid = ri->second.uid;   // the remote (rank-0) side's uid
    }
 
    // ③ flagcxCommInitRank: blocks! Both sides must arrive together
    //    nranks=2, because a NIXL conn is point-to-point
    flagcxComm_t comm;
    flagcxCommInitRank(&comm, /*nranks=*/2, &uid, local_rank);
 
    // ④ Create a dedicated stream (via the devHandle interface, portable across device vendors)
    flagcxStream_t stream;
    engine->handler->devHandle->streamCreate(&stream);
 
    // ⑤ Assemble the conn object
    auto *c         = new flagcx_nixl_conn_s;
    c->remote_agent = remote_agent;
    c->local_rank   = local_rank;
    c->remote_rank  = remote_rank;
    c->comm         = comm;
    c->stream       = stream;
 
    engine->peers[remote_agent] = c;
    *conn = c;
    return FLAGCX_NIXL_SUCCESS;
}

Core point: the uniqueId travels over NIXL's own getConnInfo / loadRemoteConnInfo metadata exchange channel; no separate control channel is needed.


disconnect — tear down a single connection

void
flagcx_nixl_disconnect(flagcx_nixl_engine_t *engine,
                       flagcx_nixl_conn_t *conn)
{
    // Finalize first, then destroy (NCCL semantics: finalize flushes outstanding operations)
    flagcxCommFinalize(conn->comm);
    flagcxCommDestroy(conn->comm);
 
    // Destroy the dedicated stream
    engine->handler->devHandle->streamDestroy(conn->stream);
 
    // Remove from the peers map
    engine->peers.erase(conn->remote_agent);
 
    delete conn;
}

submit — post Send/Recv operations

flagcx_nixl_status_t
flagcx_nixl_submit(flagcx_nixl_engine_t *engine,
                   flagcx_nixl_conn_t *conn,
                   flagcx_nixl_op_t op,
                   const flagcx_nixl_iov_t *iovs, size_t iov_count,
                   flagcx_nixl_req_t **req)
{
    // With multiple IOVs, use a group to merge them into one kernel submission and cut launch overhead
    if (iov_count > 1) flagcxGroupStart(conn->comm);
 
    for (size_t i = 0; i < iov_count; i++) {
        if (op == FLAGCX_NIXL_OP_SEND) {
            flagcxSend(iovs[i].addr, iovs[i].length,
                       flagcxUint8,          // transfer as raw bytes
                       conn->remote_rank,    // peer rank (the peer's rank in the 2-rank comm)
                       conn->comm, conn->stream);
        } else {
            flagcxRecv(iovs[i].addr, iovs[i].length,
                       flagcxUint8,
                       conn->remote_rank,
                       conn->comm, conn->stream);
        }
    }
 
    if (iov_count > 1) flagcxGroupEnd(conn->comm);
 
    // Create an event and record it on the stream; poll later uses eventQuery to detect completion
    auto *r      = new flagcx_nixl_req_s;
    r->completed = false;
    engine->handler->devHandle->eventCreate(&r->event,
                                             flagcxEventDisableTiming);
    engine->handler->devHandle->eventRecord(r->event, conn->stream);
 
    *req = r;
    return FLAGCX_NIXL_IN_PROG;   // asynchronous; return IN_PROG
}

The IOV carries only the local addr+length, with no remote address or token; this is the essential difference between two-sided Send/Recv and one-sided RDMA Put/Get.


poll — query request completion status

flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine,
                 flagcx_nixl_req_t *req)
{
    if (req->completed) return FLAGCX_NIXL_SUCCESS;
 
    // eventQuery: SUCCESS means every operation before this event on the stream has completed
    flagcxResult_t ret = engine->handler->devHandle->eventQuery(req->event);
    if (ret == flagcxSuccess) {
        req->completed = true;
        return FLAGCX_NIXL_SUCCESS;
    }
 
    // flagcxInProgress or anything else → not yet complete
    return FLAGCX_NIXL_IN_PROG;
}

release_req — release request resources

void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine,
                        flagcx_nixl_req_t *req)
{
    engine->handler->devHandle->eventDestroy(req->event);
    delete req;
}
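
Putting the pieces together, a caller drives one transfer roughly as follows. This is a minimal illustrative sketch against the ABI above; engine/conn are assumed to come from engine_create / load_remote_conn_info / connect, gpu_buf and len are placeholders, and error handling is omitted:

/* Caller-side sketch (send side). gpu_buf and len are placeholders. */
flagcx_nixl_iov_t iov = { .addr = gpu_buf, .length = len, .remote_offset = 0 };
flagcx_nixl_req_t *req = NULL;

flagcx_nixl_submit(engine, conn, FLAGCX_NIXL_OP_SEND, &iov, 1, &req);

/* poll is non-blocking (eventQuery underneath), so spin or yield between calls */
while (flagcx_nixl_poll(engine, req) == FLAGCX_NIXL_IN_PROG)
    ;
flagcx_nixl_release_req(engine, req);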

Function call sequence overview

NIXL-side call                     Inside FlagCX transfer_engine.cpp
─────────────────────────         ─────────────────────────────────
createBackend("FlagCX")
  └→ engine_create()               flagcxHandleInit → devHandle
                                    flagcxGetUniqueId → generate uid

getConnInfo()
  └→ get_conn_info()               serialize [ver|agent|dev|uid] → 520B blob

  (NIXL metadata exchange delivers the blob to the peer)

loadRemoteConnInfo("AgentB", blob)
  └→ load_remote_conn_info()        deserialize blob → remote_infos["AgentB"]

registerMem()

connect("AgentB")
  └→ connect()                      local_rank = (local < remote) ? 0 : 1
                                    uid = the rank-0 side's uniqueId
                                    flagcxCommInitRank(2, uid, rank) ← blocks!
                                    devHandle->streamCreate
                                    flagcx_nixl_reg_mem if the Put path is needed

postXfer(WRITE, local, remote)
  └→ submit(OP_SEND, iovs)          flagcxGroupStart (if multi-iov)
                                    flagcxSend × N
                                    flagcxGroupEnd
                                    eventCreate + eventRecord

checkXfer(req)
  └→ poll(req)                      devHandle->eventQuery(event)
                                    → SUCCESS / IN_PROG

releaseReqH(req)
  └→ release_req(req)               devHandle->eventDestroy + delete

destroyBackend()
  └→ engine_destroy()               tear down every conn
                                    flagcxHandleFree

One-sided path (Put) addendum

The following describes how the engine automatically switches from the two-sided path to one-sided flagcxHeteroPut when the underlying IBRC adaptor is available.

Struct changes (fields added on top of the existing ones):

engine:

struct flagcx_nixl_engine_s {
    // ... as above ...
    bool one_sided_avail = false;  // detected after connect + reg_mem via netAdaptor->put
};

conn:

struct flagcx_nixl_conn_s {
    // ... as above ...
    bool     mem_registered = false;
    void    *window_base    = nullptr;    // base address of the locally registered window
    size_t   window_size    = 0;
    void    *reg_handle     = nullptr;    // returned by flagcxCommRegister
};

req:

struct flagcx_nixl_req_s {
    // ... as above ...
    bool               is_put_path   = false;   // true = take the one-sided path
    size_t             signal_offset = 0;       // signal offset for the Put path
    int                local_rank    = 0;       // for waitValue (used in poll)
    flagcxHeteroComm_t hetero_comm   = nullptr; // for netAdaptor access (used in poll)
};

reg_mem — register the memory window (collective op)

Called after connect() and before submit(). Both ends must call it together (the internal bootstrapAllGather is a collective operation).

flagcx_nixl_status_t
flagcx_nixl_reg_mem(flagcx_nixl_engine_t *engine,
                    flagcx_nixl_conn_t *conn,
                    void *buf, size_t size)
{
    // Enable one-sided registration support
    setenv("FLAGCX_ENABLE_ONE_SIDE_REGISTER", "1", 1);
 
    void *handle = nullptr;
    flagcxCommRegister(conn->comm, buf, size, &handle);
    //   └→ flagcxOneSideRegister()
    //       ├→ netAdaptor->regMr(buf, size)      // IB memory registration → rkey/lkey
    //       └→ bootstrapAllGather()               // exchange every rank's baseVa/rkey/lkey
    //           → fills globalOneSideHandles
 
    conn->mem_registered = true;
    conn->window_base    = buf;
    conn->window_size    = size;
    conn->reg_handle     = handle;
 
    // Detect one-sided capability: is netAdaptor->put available?
    flagcxHeteroComm_t hc = conn->comm->heteroComm;
    engine->one_sided_avail = (hc->netAdaptor && hc->netAdaptor->put);
 
    return FLAGCX_NIXL_SUCCESS;
}
supports_one_sided — query whether the current connection can take the one-sided path
bool
flagcx_nixl_supports_one_sided(flagcx_nixl_engine_t *engine,
                                flagcx_nixl_conn_t *conn)
{
    return engine->one_sided_avail && conn->mem_registered;
}

⑧’ submit — dual-path version
flagcx_nixl_status_t
flagcx_nixl_submit(/* ... same signature ... */)
{
    auto *r      = new flagcx_nixl_req_s;
    r->completed = false;
 
    bool use_put = engine->one_sided_avail && conn->mem_registered;
 
    // ═══════ One-sided Put path (WRITE side) ═══════
    if (op == FLAGCX_NIXL_OP_PUT && use_put)
    {
        flagcxHeteroComm_t hc = conn->comm->heteroComm;
 
        for (size_t i = 0; i < iov_count; i++) {
            size_t src_off = (uintptr_t)iovs[i].addr
                           - (uintptr_t)conn->window_base;
            flagcxHeteroPut(hc, conn->remote_rank,
                            src_off, iovs[i].remote_offset, iovs[i].length);
        }
 
        // Write the signal: by convention the signal sits in the last 8B of the remote window
        size_t sig_off = conn->window_size - 8;  // TODO: proper signal allocation
        flagcxHeteroPutSignal(hc, conn->remote_rank, sig_off);
 
        r->is_put_path   = true;
        r->signal_offset = sig_off;
        r->local_rank    = conn->local_rank;
        r->hetero_comm   = hc;
        r->event         = nullptr;
        *req = r;
        return FLAGCX_NIXL_IN_PROG;
    }
 
    // ═══════ One-sided Wait path (READ side) ═══════
    if (op == FLAGCX_NIXL_OP_WAIT && use_put)
    {
        // The READ side does no data transfer: data lands in the local window directly via RDMA
        // Only the signal polling needs to be set up
        size_t sig_off = conn->window_size - 8;
 
        r->is_put_path   = true;
        r->signal_offset = sig_off;
        r->local_rank    = conn->local_rank;
        r->hetero_comm   = conn->comm->heteroComm;
        r->event         = nullptr;
        *req = r;
        return FLAGCX_NIXL_IN_PROG;
    }
 
    // ═══════ Two-sided Send/Recv path (fallback) ═══════
    // Also taken when OP_PUT is requested but use_put==false (auto-downgrade to Send)
    r->is_put_path = false;
 
    if (iov_count > 1) flagcxGroupStart(conn->comm);
 
    for (size_t i = 0; i < iov_count; i++) {
        if (op == FLAGCX_NIXL_OP_SEND || op == FLAGCX_NIXL_OP_PUT) {
            flagcxSend(iovs[i].addr, iovs[i].length, flagcxUint8,
                       conn->remote_rank, conn->comm, conn->stream);
        } else {
            flagcxRecv(iovs[i].addr, iovs[i].length, flagcxUint8,
                       conn->remote_rank, conn->comm, conn->stream);
        }
    }
 
    if (iov_count > 1) flagcxGroupEnd(conn->comm);
 
    engine->handler->devHandle->eventCreate(&r->event, flagcxEventDisableTiming);
    engine->handler->devHandle->eventRecord(r->event, conn->stream);
 
    *req = r;
    return FLAGCX_NIXL_IN_PROG;
}

When OP_PUT is requested but use_put == false (IBRC unavailable), it automatically falls back to flagcxSend (the two-sided path).

⑨’ poll — dual-path version
flagcx_nixl_status_t
flagcx_nixl_poll(flagcx_nixl_engine_t *engine, flagcx_nixl_req_t *req)
{
    if (req->completed) return FLAGCX_NIXL_SUCCESS;
 
    if (req->is_put_path) {
        // Put path: poll the local signal offset
        // waitValue checks globalOneSideHandles[local_rank].baseVa + signal_offset
        void **gHandles = (void **)globalOneSideHandles;
        flagcxResult_t ret = req->hetero_comm->netAdaptor->waitValue(
            gHandles, req->local_rank, req->signal_offset, /*expected=*/1);
        if (ret == flagcxSuccess) {
            req->completed = true;
            return FLAGCX_NIXL_SUCCESS;
        }
        return FLAGCX_NIXL_IN_PROG;
    }
 
    // Send/Recv path: eventQuery
    flagcxResult_t ret = engine->handler->devHandle->eventQuery(req->event);
    if (ret == flagcxSuccess) {
        req->completed = true;
        return FLAGCX_NIXL_SUCCESS;
    }
    return FLAGCX_NIXL_IN_PROG;
}
⑩’ release_req — dual-path version
void
flagcx_nixl_release_req(flagcx_nixl_engine_t *engine, flagcx_nixl_req_t *req)
{
    if (!req->is_put_path && req->event) {
        engine->handler->devHandle->eventDestroy(req->event);
    }
    // The Put path never created an event, so there is nothing to release
    delete req;
}

One-sided path call sequence
Agent A (WRITE side)                                Agent B (READ side)
──────────────────                                  ─────────────────
engine_create                                       engine_create
get_conn_info → blob_A                              get_conn_info → blob_B
             (exchange blobs)
loadRemoteConnInfo("B", blob_B)                     loadRemoteConnInfo("A", blob_A)
connect("B") ←── flagcxCommInitRank blocks ──→      connect("A")

reg_mem(gpu_buf_a, size) ←── allGather blocks ──→   reg_mem(gpu_buf_b, size)
  │ regMr + exchange rkey/lkey/baseVa                │ regMr + exchange rkey/lkey/baseVa
  │ one_sided_avail = true ✓                         │ one_sided_avail = true ✓

submit(OP_PUT, iovs)                                submit(OP_WAIT, iovs)
  │ flagcxHeteroPut(srcOff, dstOff, size)            │ (no data op; just set signal_offset)
  │ flagcxHeteroPutSignal(sig_off)                   │
  └→ req {is_put_path=true}                          └→ req {is_put_path=true}

poll(req) → SUCCESS (initiator fire-and-forget)       poll(req) → waitValue(sig_off)
                                                       → IN_PROG / SUCCESS
Two-sided vs one-sided comparison

| Dimension | Two-sided Send/Recv | One-sided Put |
|-----------|---------------------|---------------|
| Adaptor requirement | all | IBRC only |
| Memory registration | not needed | flagcxCommRegister (collective) |
| Address model | local addr+len | srcOffset + dstOffset (offsets within the window) |
| FIFO constraint | yes: Send/Recv must pair in order | no: RDMA Write is address-based |
| Completion detection | eventQuery (stream event) | waitValue (poll the signal) |
| WRITE side | flagcxSend | flagcxHeteroPut + putSignal |
| READ side | flagcxRecv | no data op, only waitValue |
| Automatic fallback | n/a | falls back to Send when use_put==false |

2.3 NIXL side: changes to flagcx_backend.h / flagcx_backend.cpp

  1. Remove the remote-memory logic

    • nixlFlagcxBackendMD no longer needs the remote_mem / remote_token / exported_public_md fields
    • loadRemoteMD() no longer calls flagcx_nixl_import_mem(); it only records remote_agent + base_addr + length
    • getPublicData() still serializes base_addr/length/device_id (so the remote side knows the range for validation, but no token is sent)
  2. Simplify postXfer()

    nixl_status_t
    nixlFlagcxEngine::postXfer(const nixl_xfer_op_t &operation,
                                const nixl_meta_dlist_t &local,
                                const nixl_meta_dlist_t &remote,
                                const std::string &remote_agent,
                                nixlBackendReqH *&handle,
                                const nixl_opt_b_args_t *opt_args) const {
        // ... prepXfer if needed (build IOV) ...
     
        // Mapping: NIXL_WRITE → SEND, NIXL_READ → RECV
        flagcx_nixl_op_t sb_op =
            (operation == NIXL_WRITE) ? FLAGCX_NIXL_OP_SEND
                                      : FLAGCX_NIXL_OP_RECV;
     
        // The IOV needs only the local addr + length; no remote_token/remote_offset
        std::vector<flagcx_nixl_iov_t> iovs(lcnt);
        for (int i = 0; i < lcnt; i++) {
            iovs[i].addr   = reinterpret_cast<void*>(local[i].addr);
            iovs[i].length = local[i].len;
        }
     
        flagcx_nixl_submit(engine_, conn, sb_op,
                           iovs.data(), iovs.size(), &req_h->req);
     
        return NIXL_IN_PROG;
    }

    The IOV no longer carries remote_token / remote_offset, because the data plane is flagcxSend/Recv, which never needs the remote address.

  3. supportsNotif() returns false

    Phase 0 does not use notifications. On the two-sided path the READ side's own flagcxRecv completing already signals data arrival; if the upper layer needs to know the peer finished, the application layer synchronizes on its own.

  4. Simplify nixlFlagcxReqH

    Remove the want_notif / notif_msg / notif_sent fields.

  5. Dual-path (Put) support

    • registerMem(): remains a no-op (no flagcx registration), but caches addr/size/devId for the reg_mem call after connect
    • Right after connect() returns, call flagcx_nixl_reg_mem(engine, conn, cached_buf, cached_size) to register the memory window (collective op, both ends in lockstep)
    • postXfer() path selection (see the C++ sketch after this list):
      if (flagcx_nixl_supports_one_sided(engine, conn)):
          WRITE → OP_PUT  (IOV.remote_offset = remote[i].addr - remote_window_base)
          READ  → OP_WAIT
      else:
          WRITE → OP_SEND
          READ  → OP_RECV

    • checkXfer(): unchanged; the underlying poll() distinguishes the Put/Send paths on its own
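
As referenced in the list above, the path selection inside postXfer might look like the following C++ sketch; remote_window_base is an assumed value cached when the remote MD was loaded, not an existing field:

// Sketch only: dual-path op selection in nixlFlagcxEngine::postXfer.
flagcx_nixl_op_t op;
if (flagcx_nixl_supports_one_sided(engine_, conn)) {
    op = (operation == NIXL_WRITE) ? FLAGCX_NIXL_OP_PUT : FLAGCX_NIXL_OP_WAIT;
} else {
    op = (operation == NIXL_WRITE) ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV;
}

std::vector<flagcx_nixl_iov_t> iovs(lcnt);
for (int i = 0; i < lcnt; i++) {
    iovs[i].addr   = reinterpret_cast<void*>(local[i].addr);
    iovs[i].length = local[i].len;
    if (op == FLAGCX_NIXL_OP_PUT)   // the Put path needs the window-relative offset
        iovs[i].remote_offset = remote[i].addr - remote_window_base;
}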

3. How connInfo Carries the uniqueId

This is the most critical design point of the two-sided scheme.

NIXL's metadata exchange flow is:

Agent A: getConnInfo() → blob_A
Agent B: getConnInfo() → blob_B

(blob_A ↔ blob_B are exchanged externally, e.g. via etcd / gRPC / the filesystem)

Agent A: loadRemoteConnInfo("AgentB", blob_B)
Agent B: loadRemoteConnInfo("AgentA", blob_A)

Agent A: connect("AgentB")
Agent B: connect("AgentA")

getConnInfo() serializes the flagcxUniqueId (generated unconditionally at engine_create) into the blob.

Inside connect():

local_rank = (local_agent < remote_agent) ? 0 : 1

if local_rank == 0:
    uid = taken from our own getConnInfo blob (generated at engine_create)
else:
    uid = parsed from the peer's loadRemoteConnInfo blob

flagcxCommInitRank(&comm, 2, &uid, local_rank)

Suggested blob format:

[4B version][256B agent_name][8B host_hash][4B device_id][256B flagcxUniqueId]

Both sides generate a uniqueId, but by convention only the rank-0 side's copy is used. That way the result is deterministic no matter which side calls getConnInfo() first.


4. Two-sided Coordination Constraint

The scheme's only hard requirement: both sides must pair Send/Recv in the same order.

FlagCX Send/Recv (inheriting NCCL semantics) pairs in FIFO order: the N-th Send on a given comm matches the N-th Recv on the peer.

This means the upper-layer usage pattern must be:

# Agent A (sender)
nixl_agent_a.postXfer(NIXL_WRITE, local_buf_1, remote_desc_1, "AgentB")  # → flagcxSend #1
nixl_agent_a.postXfer(NIXL_WRITE, local_buf_2, remote_desc_2, "AgentB")  # → flagcxSend #2
 
# Agent B (receiver) -- MUST post in the same order
nixl_agent_b.postXfer(NIXL_READ, local_buf_1, remote_desc_1, "AgentA")   # → flagcxRecv #1
nixl_agent_b.postXfer(NIXL_READ, local_buf_2, remote_desc_2, "AgentA")   # → flagcxRecv #2

If the orders diverge (e.g. B posts #2 before #1), the result is deadlock or corrupted data.

For the KV-cache scenario this constraint is easy to satisfy: the prefill/decode transfer pattern is predefined.

The one-sided path (Put) is not subject to this constraint: RDMA Write is address-based (srcOffset/dstOffset are explicit), so there is no FIFO pairing issue. The two-sided fallback path, however, still is.


5. Upper-layer Usage Example

// ========== Agent A (prefill, WRITEs to Agent B) ==========
 
nixlAgent agentA("AgentA");
agentA.createBackend("FlagCX", {{"device_id", "0"}});
 
// Register the local GPU buffer
nixlBlobDesc send_buf = {.addr = gpu_ptr_a, .len = 4096, .devId = 0};
nixlRegDList send_reg(VRAM_SEG, {send_buf});
agentA.registerMem(send_reg);
 
// Obtain connInfo + publicMD from Agent B
agentA.loadRemoteConnInfo("AgentB", blob_b);
 
// Build the remote descriptor (only base_addr and length are needed, for alignment checks)
nixlBlobDesc recv_desc = {.addr = gpu_ptr_b, .len = 4096, .devId = 0};
// ... loadRemoteMD ...
 
// Kick off the transfer
auto req = agentA.createXferReq(NIXL_WRITE, send_reg, recv_descs, "AgentB");
agentA.postXferReq(req);             // → internally: flagcxSend(gpu_ptr_a, 4096, ...)
while (agentA.checkXferReq(req) == NIXL_IN_PROG) { /* spin */ }
 
// ========== Agent B (decode, READs from Agent A) ==========
 
nixlAgent agentB("AgentB");
agentB.createBackend("FlagCX", {{"device_id", "0"}});
 
nixlBlobDesc recv_buf = {.addr = gpu_ptr_b, .len = 4096, .devId = 0};
nixlRegDList recv_reg(VRAM_SEG, {recv_buf});
agentB.registerMem(recv_reg);
 
agentB.loadRemoteConnInfo("AgentA", blob_a);
 
auto req = agentB.createXferReq(NIXL_READ, recv_reg, send_descs, "AgentA");
agentB.postXferReq(req);             // → internally: flagcxRecv(gpu_ptr_b, 4096, ...)
while (agentB.checkXferReq(req) == NIXL_IN_PROG) { /* spin */ }

6. Development Task Breakdown

Task 1: FlagCX side — transfer_engine.h (new version)

  • The slimmed-down C ABI header, keeping only the 12 functions above
  • Remove all control-message, remote-memory, and notification-related types

Task 2: FlagCX side — transfer_engine.cpp

Implement the following functions (estimated ~300 lines):

| Function | Implementation notes |
|----------|----------------------|
| engine_create | record agent name / device_id, generate the flagcxUniqueId |
| engine_destroy | clean up the peers map |
| get_conn_info | serialize agent + host_hash + device_id + uniqueId |
| load_remote_conn_info | parse the blob, cache it in the map |
| connect | take the uid from the rank-0 side's blob → flagcxCommInitRank(2, uid, rank) → create the stream |
| disconnect | flagcxCommFinalize + flagcxCommDestroy + destroy the stream |
| reg_mem | flagcxCommRegister → IB regMr + AllGather of rkeys (collective op) |
| supports_one_sided | check whether netAdaptor->put is available |
| submit | dual path: OP_PUT → flagcxHeteroPut + putSignal; OP_SEND/RECV → flagcxSend/Recv + eventRecord |
| poll | dual path: Put → waitValue; Send/Recv → eventQuery |
| release_req | eventDestroy (Send/Recv path only) + free |

Task 3: NIXL side — adapt flagcx_backend.h / .cpp

  • supportsNotif() → false
  • Remove the remote_token / remote_mem fields and related logic
  • postXfer's IOV fills only the local addr + length
  • Drop the genNotif / getNotifs implementations (return NOT_SUPPORTED)
  • Drop the notif-sending logic in checkXfer

Task 4: Build & Test

  • Build libflagcx_nixl.so on the FlagCX side
  • Link this lib from NIXL's meson.build
  • Write a minimal two-process test: Agent A WRITE + Agent B READ, verifying data correctness (see the sketch below)
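
A possible shape for that minimal test, assuming blobs are exchanged through the filesystem; write_file / read_file_blocking and the role strings are hypothetical placeholders, not part of the ABI:

/* Two-process test sketch: launch once with role="AgentA" (the WRITE side)
 * and once with role="AgentB" (the READ side). File-based blob exchange
 * stands in for etcd/TCP. */
static int run(const char *role, const char *peer, void *gpu_buf, size_t len)
{
    flagcx_nixl_engine_t *eng;
    flagcx_nixl_conn_t   *conn;
    flagcx_nixl_req_t    *req;
    char *blob; size_t blob_len;

    flagcx_nixl_engine_create(role, /*device_id=*/0, &eng);
    flagcx_nixl_get_conn_info(eng, &blob, &blob_len);
    write_file(role, blob, blob_len);              /* publish our blob */
    flagcx_nixl_free_conn_blob(eng, blob);

    size_t plen; char *pblob = read_file_blocking(peer, &plen);
    flagcx_nixl_load_remote_conn_info(eng, peer, pblob, plen);

    flagcx_nixl_connect(eng, peer, &conn);         /* blocks until both sides arrive */

    /* The WRITE side sends, the READ side receives (two-sided Phase 0 path) */
    flagcx_nixl_iov_t iov = { gpu_buf, len, 0 };
    flagcx_nixl_op_t  op  = (strcmp(role, "AgentA") == 0)
                          ? FLAGCX_NIXL_OP_SEND : FLAGCX_NIXL_OP_RECV;
    flagcx_nixl_submit(eng, conn, op, &iov, 1, &req);
    while (flagcx_nixl_poll(eng, req) == FLAGCX_NIXL_IN_PROG) { /* spin */ }
    flagcx_nixl_release_req(eng, req);

    flagcx_nixl_disconnect(eng, conn);
    flagcx_nixl_engine_destroy(eng);
    return 0;   /* the READ side then checks gpu_buf against the expected pattern */
}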

7. Future Evolution Path

Phase 0 (current)                  Phase 1
Dual path: Send/Recv + Put         Full one-sided + control plane
Auto-upgrade to Put when IBRC up   Put/Get both complete
One-sided after reg_mem            Control channel + progress thread
Two-sided fallback as safety net   Pairing automated inside the engine