目标:在 FlagCX 内构建一套统一的、多后端多线程的 IBRC P2P 传输栈,兼容 nixl backend 的 batchGet 与 flagcx connector 的 batchPut 调用。


1. 背景

1.1 两类上层调用

┌────────────────────────────────────────────────────────────────────┐
│                     nixl backend (C++ plugin)                       │
│  postXfer(local_descs, remote_descs)                                │
│   └─ 已切好的 IOV 列表(每个 desc 由 connector 内                     │
│       block_offset = block_id * block_len_per_layer 计算得到)       │
│   └─ 每个 IOV 对应一次独立 RDMA READ                                  │
└──────────────────────────────────┬─────────────────────────────────┘
                                   │ flagcxP2pEngineReadVector(...)
                                   ▼
┌────────────────────────────────────────────────────────────────────┐
│                mooncake/flagcx connector (Python wrapper)          │
│  flagcxBatchPut(comm, peer, src_offs, dst_offs, sizes,              │
│                 src_mrs, dst_mrs, count)                            │
│   └─ 已通过 group_concurrent_contiguous() 把多个连续 block 拼接为      │
│       较大的 entry,每个 entry 仍可能 >> 64K                          │
│   └─ 每个 entry 需要进一步切成 64K slice 后再下发                      │
└───────────────────────────────────────────────────────────────────┘

1.2 各后端当前丢下来的数据

(a) nixl backend — 一个 desc 对应一个 IOV

postXfer 中每个 local[i] / remote[i] 已是上层切好的最小单位。prepXfer 阶段把远端描述符反序列化为 FlagcxP2pRdmaDesc

AsyncTransferTask                          ← 一次 ReadVector/WriteVector 调用
  conn: FlagcxP2pConn*                     ← 连接句柄(含 sendComm/QP)
  op: ASYNC_XFER_READ | ASYNC_XFER_WRITE
  numIovs: int                             ← iov 总数
  dataVec: vector<void*>                   ← 本地目标地址[numIovs]
  sizeVec: vector<size_t>                  ← 每个 iov 的字节数[numIovs]
  descs: vector<FlagcxP2pRdmaDesc>         ← 远端描述符[numIovs]
    └── FlagcxP2pRdmaDesc
          addr: uint64_t                   ← 远端内存虚拟地址
          size: uint32_t
          rkey: uint32_t                   ← 远端 MR 的 rkey
  localEntries: vector<FlagcxP2pMemRegEntry>  ← 本地 MR 查表结果[numIovs]
    └── FlagcxP2pMemRegEntry
          mhandle: void*                   ← 指向 FlagcxP2pMrHandleView
            └── FlagcxP2pMrHandleView
                  baseVa: uintptr_t        ← 本地 MR 基址
                  lkey: uint32_t           ← 本地 lkey
                  rkey: uint32_t
                  ibDevN: int              ← 绑定的 IB 设备号
          ibDevN: int                      ← 校验用
  done: atomic<bool>                       ← 完成标志(worker 写,调用方轮询)
  result: atomic<int>0 成功 / -1 失败

进入 flagcxP2pEngineReadVector 时持有:

字段来源含义
dstVec[i]local[i].addr本地 VA(READ 目标)
sizeVec[i]local[i].len == remote[i].len长度(已 ≤ block 粒度)
mrIds[i]lmd->mr_id本地 MR ID → lkey
descs[i]FlagcxP2pRdmaDesc远端 (addr, rkey)

特征:上层已切好,IOV size 通常等于 KV cache 单 block 字节数(数 KB ~ 数百 KB),不需要再切

(b) mooncake backend — 已 group 后的 entry,在 entry 上继续 64k 切

BatchDesc                          ← 一次 batch_transfer_sync_write 调用
  └── task_list: vector<TransferTask>   ← 每个 (src_ptr, dst_ptr, length) entry 一个 task
        └── TransferRequest              ← task 持有原始请求 { source, target_id, target_offset, length }
        └── slice_list: vector<Slice*>  ← task 被切成多个 64KB Slice,每个 Slice = 1 个 RDMA WR
              ├── source_addr            ← 本地源地址(TransferRequest.source + offset)
              ├── length                 ← 最多 64KB
              └── rdma.dest_addr        ← 远端目标地址
              └── rdma.dest_rkey        ← 远端 rkey(在 submitPostSend 时填入)

1.3 总图

2. 第一步:统一的 Slice 数据结构

2.1 核心数据结构

  1. 为什么还要 FlagcxSlice? desc 是 nixl 内序列化和反序列化的结构体,我没法直接拓展完放在 flagcx connector 内使用。

  2. 为什么现在的线程不行? 现在的视角看 只有一个 asyncWorker 线程去下 wr,Mooncake 有 2 个,ucx 有 4 个去下 wr。见: https://zhuanlan.zhihu.com/p/1939988652114580803 mooncake 作者自己也说到了他们验证过后要么提高 batchSize 要么提高 task submission thread 数量。所以除了线性拓展现在的asyncWorker 数量还要兼容 flagcx connector。

// 前向声明(FlagcxSlice 与 FlagcxTransferTask 互引)
struct FlagcxSlice;
 
// ============================================================================
// 1) 任务:聚合多个 slice 的完成状态
//    flagcxP2pEngineXferStatus(xferId) 直接查这个结构的计数
// ============================================================================
struct FlagcxTransferTask {
  // —— 核心计数(slice 完成时原子增长)——————————————————————
  std::atomic<uint64_t> sliceCount{0};         // buildSlices 期间累加
  std::atomic<uint64_t> doneSliceCount{0};     // markSuccess + markFailed 共用
  // —— Slice 容器(只在 buildSlices 阶段写入;XferStatus 命中后批量 delete)—
  std::vector<FlagcxSlice*> sliceList;
 
  inline bool isAllDone() const {
    const auto total = sliceCount.load(std::memory_order_acquire);
    const auto done  = doneSliceCount.load(std::memory_order_acquire);
    return total > 0 && done >= total;
  }
};
 
// ============================================================================
// 2) Slice 主体
//    sizeof ≈ 52B + std::string (peerNicPath)
//
//   说明:FlagCX 是 GPU↔最近 NIC 亲和模型,每个 engine 只有一个 WorkerPool
//   (绑定到 GPU 的最近 NIC)。slice 不需要带本地 ibDevN——它隐含在「我属于
//   这个 pool」里。peerNicPath 仍然要带,因为它是 shard hash key 和远端
//   endpoint 查找 key。
// ============================================================================
struct FlagcxSlice {
  uint64_t srcVa;         // WRITE: local src VA;  READ: local dst VA
  uint64_t dstVa;         // WRITE: remote dst VA; READ: remote src VA
  uint32_t length;        // 单次 WR 字节数(≤ blockSize,由 buildSlices 决定)
  uint32_t lkey;          // 本地 MR lkey
  uint32_t rkey;          // 远端 MR rkey
  uint8_t  opcode;        // IBV_WR_RDMA_READ / IBV_WR_RDMA_WRITE
 
  // ─── 路由 ─────────────────────────────────────────────────────────
  std::string       peerNicPath;  // shard hash key + adaptorGetEndpoint key
 
  // ─── 完成追踪 ───────────────────────────────────────────────────
  FlagcxTransferTask *task{nullptr};   // 反指 task(计数聚合)
 
  // ─── QP 深度(worker 一对多 QP 时用)──────────────────────────────
  // 单 worker 拥有多 QP 时,adaptor 在 ibv_post_send 后 ++(*qpDepth),
  // poll 完成后 --(*qpDepth),让 worker 在 post 时按 QP 深度做 round-robin
  volatile int *qpDepth{nullptr};
 
  // ─── 完成回调(任务级聚合)────────────────────────────────────────
  inline void markSuccess() {
    task->doneSliceCount.fetch_add(1, std::memory_order_relaxed);
  }
  inline void markFailed() {
    task->doneSliceCount.fetch_add(1, std::memory_order_relaxed);
  }
};
 
 
// ============================================================================
// 3) 变化图
//    sizeof ≈ 64B + std::string (peerNicPath)
// ============================================================================
  AsyncTransferTask                          FlagcxTransferTask
  ├─ conn                          ─────────► ├─ (conn 不再放 task 里;slice 内部携带
  ├─ op                                       │   peerNicPath 做 shard hash + endpoint 路由)
  ├─ numIovs (== iov 数)            ─────────►├─ sliceCount (== slice 数,可能 > iov 数)
  ├─ dataVec[]      ┐                         │
  ├─ sizeVec[]      ├─ 三个并行 vector,按 i  ─►├─ sliceList[]
  ├─ descs[]        │  对齐表示一个 IOV         │  每个 entry 是一个 *FlagcxSlice
  ├─ localEntries[] ┘                         │
  ├─ done                          ─────────►├─ isAllDone()
  └─ result                        ─────────►└─ doneSliceCount
 
  FlagcxP2pRdmaDesc (wire, 64B, 不动)         FlagcxSlice (runtime, ~52B + str, 新增)
                                              ├─ srcVa / dstVa / length / lkey / rkey / opcode
                                              ├─ peerNicPath
                                              └─ task* / qpDepth*

2.3 Template Policy:兼容两种切法

// ─── Policy A:nixl 路径,1 desc = 1 slice,不再二次切 ─────────────────
//   语义:保持 nixl block 粒度,便于在 4 个 QP 上并行打到 4 个 desc
struct NixlSlicePolicy {
  static constexpr bool   kFurtherCut    = false;
  static constexpr size_t kBlockSize     = SIZE_MAX;
  static constexpr size_t kFragmentSize  = 0;
};
 
// ─── Policy B:connector 路径,按 64K 切,末尾合并 ───────────────
//   语义:与 Mooncake submitTransferTask() 同款切法,上层 mooncake 丢下来的可能是连续的几个 block
struct FlagcxSlicePolicy {
  static constexpr bool   kFurtherCut    = true;
  static constexpr size_t kBlockSize     = 64 * 1024;
  static constexpr size_t kFragmentSize  = 4  * 1024;
};

2.4 切片接口

// 唯一的 slice 构造接口(声明在 flagcx_p2p.h,实现在 flagcx_p2p.cc)
//
// 责任:把一段「绝对化的 (srcVa, dstVa, length)」切成若干个 FlagcxSlice,
//       挂到 task->sliceList,并 task->sliceCount += N。
// 不负责:建 task、submitPostSend、释放 slice(由 caller / XferStatus 负责)。
template <typename Policy>
void buildSlices(
    FlagcxTransferTask        *task,            // 上层已 new
    uint64_t                   srcVa,           // 已绝对化的本地 VA
    uint64_t                   dstVa,           // 已绝对化的远端 VA
    size_t                     totalLen,        // 待切的总长
    uint32_t                   lkey,            // 本地 MR lkey
    uint32_t                   rkey,            // 远端 MR rkey
    uint8_t                    opcode,          // READ / WRITE
    const std::string         &peerNicPath);    // shard hash + endpoint key
 
// 实现伪码:
//   if constexpr (!Policy::kFurtherCut):
//       // 1:1 路径
//       FlagcxSlice *s = new FlagcxSlice;
//       s->{srcVa, dstVa, length=totalLen, lkey, rkey, opcode,
//           peerNicPath, task} = ...
//       task->sliceList.push_back(s);
//       task->sliceCount.fetch_add(1);
//       return;
//   else:
//       // 64K 切 + 末尾合并
//       size_t off = 0;
//       while (off < totalLen):
//           bool merge = (totalLen - off) <= kBlockSize + kFragmentSize;
//           size_t len = merge ? (totalLen - off) : kBlockSize;
//           new slice with off-relative srcVa/dstVa, length=len ...
//           off += len;
//           if (merge) break;

2.5 两类调用方 build slice 的入口

flagcxP2pEngineWriteVector 和 flagcxP2pEngineReadVector内部 task 结构补充以上字段即可,然后 ensureAsyncWorkerStarted

(a) nixl 路径(READ / WRITE 都走 NixlSlicePolicy

// 入口是 flagcxP2pEngineReadVector
auto *task = new FlagcxTransferTask;
for (int i = 0; i < numIovs; ++i) {
  auto &le = localEntries[i];
  uint32_t lkey = ((FlagcxP2pMrHandleView*)le.mhandle)->lkey;
 
  buildSlices<NixlSlicePolicy>(
      task,
      /*srcVa=*/  isWrite ? (uint64_t)dstVec[i] : descs[i].addr,
      /*dstVa=*/  isWrite ? descs[i].addr        : (uint64_t)dstVec[i],
      /*totalLen=*/ sizeVec[i],
      lkey, descs[i].rkey,
      isWrite ? IBV_WR_RDMA_WRITE : IBV_WR_RDMA_READ,
      conn->peerNicPath);
}

(b) flagcx connector 路径(统一 WRITE,走 FlagcxSlicePolicy

// 入口是 flagcxP2pEngineWriteVector
auto *task = new FlagcxTransferTask;
for (int i = 0; i < numIovs; ++i) { // entry 粒度
  auto &le = localEntries[i];
  uint32_t lkey = ((FlagcxP2pMrHandleView*)le.mhandle)->lkey;
 
  buildSlices<FlagcxSlicePolicy>(
      task,
      /*srcVa=*/   (uint64_t)dstVec[i],         // connector Python 已绝对化
      /*dstVa=*/   descs[i].addr,
      /*totalLen=*/ sizeVec[i],
      lkey, descs[i].rkey,
      IBV_WR_RDMA_WRITE,
      conn->peerNicPath);
}

3. 第二步:WorkerPool 设计

只放在 flagcx_p2p.cc 内部,类声明也写在/Users/joker/Desktop/project/baai/FlagCX/flagcx/include/flagcx_p2p.h(不外暴露)。

3.1 架构(per-engine 单 pool)

FlagCX 是 GPU↔最近 NIC 亲和模型:每个进程一个 engine,每个 engine 绑定到 GPU 的最近 NIC,因此只有一个 WorkerPool。所有线程生命周期统一由 WorkerPool 管理,engine 本身不持有任何线程。

FlagcxP2pEngine (per-process / per-GPU)
├─ adaptor (flagcxNetIbP2p)
└─ FlagcxWorkerPool   ← 单例,绑到 GPU 最近的 NIC
    ├─ shared_cq_                       ← 本 pool 拥有,所有 conn × QP 共享同一个 CQ
    ├─ notifWorker                       ← engine 启动时一并 spawn(仅 nixl 路径用)
    ├─ transferWorker[0]  ─┐
    ├─ transferWorker[1]   │  循环:performPostSend(本 worker shard)
    │   ...                │      + performPollCq(shared_cq_)
    └─ transferWorker[N-1] ┘  worker 数:env FLAGCX_P2P_WORKERS_PER_POOL,默认 2

无 monitorWorker。

生命周期规则

  • flagcxP2pEngineCreate:选定本进程 GPU 的最近 NIC,建一个 WorkerPool;pool ctor 内 ibv_create_cq + spawn N 个 transferWorker + 1 个 notifWorker。engine 不持有任何 std::thread
  • flagcxP2pEngineDestroy:销毁 pool(dtor 退出全部 transferWorker + notifWorker → CQ 销毁)。

未来扩展:如果 GPU 跨多 NIC 平摊(multi-rail),engine 可以持有多个 pool(std::vector<FlagcxWorkerPool>)。slice 加 int poolIdx 做路由即可,目前不做

3.2 类骨架(写在 flagcx_p2p.cc)

不含任何容错/重试/健康统计字段。失败 slice 直接 markFailed() 计数,不 redispatch。

class FlagcxWorkerPool {
 public:
  FlagcxWorkerPool(int ibDevN, ibv_context *ctx, int numaSocketId,
                   FlagcxP2pEngine *engine);
  // ctor 内:
  //   shared_cq_ = ibv_create_cq(ctx, kSharedCqDepth, ...);
  //   spawn N transferWorker + 1 notifWorker
  ~FlagcxWorkerPool();
 
  // adaptor 建 conn 时调,拿共享 CQ 用于 ibv_create_qp
  ibv_cq *getSharedCq() const { return shared_cq_; }
 
  // adaptor 建好 QP 后注册到 pool,pool 按 round-robin 把 qp 归到某个 worker
  int  registerQp(ibv_qp *qp);
  void unregisterQp(ibv_qp *qp);
 
  // 上层提交 slice
  int submitPostSend(const std::vector<FlagcxSlice*> &slices);
 
 private:
  void transferWorker(int threadId);
  void performPostSend(int threadId);
  void performPollCq(int threadId);
  void notifWorker();   // 逻辑与原 notifPollThreadFunc 相同
 
  // ─── 共享资源 ──────────────────────────────────────────────────────
  ibv_cq                           *shared_cq_;
  std::vector<std::vector<ibv_qp*>> qpShards_;
  std::atomic<int>                  qpRegisterCounter_{0};
  FlagcxP2pEngine                  *engine_;       // notifWorker 需要访问 engine
 
  // ─── Sharded slice queue(hash(peerNicPath) → shard)────────────────
  static constexpr int kShardCount = 8;
  using SliceList = std::vector<FlagcxSlice*>;
 
  std::unordered_map<std::string, SliceList> slice_queue_[kShardCount];
  std::atomic<uint64_t>                      slice_queue_count_[kShardCount];
  TicketLock                                 slice_queue_lock_[kShardCount];
 
  // ─── 进度计数(决定 worker 是否进 cond_var 休眠)────────────────────
  std::atomic<uint64_t>     submitted_slice_count_{0};
  std::atomic<uint64_t>     processed_slice_count_{0};
  std::atomic<int>          suspended_flag_{0};
  std::condition_variable   cond_var_;
  std::mutex                cond_mutex_;
 
  // ─── 线程句柄 ────────────────────────────────────────────────────────
  std::atomic<bool>         workers_running_{true};
  std::vector<std::thread>  worker_threads_;   // transferWorker[0..N-1] + notifWorker
};

3.3 主流程语义

(a) submitPostSend —— 生产侧

1. 按 hash(peerNicPath) % kShardCount 分桶到 8 个 shard
2. 每个 shard 拿一次 TicketLock,整体并入 slice_queue_,
   slice_queue_count_[sh].fetch_add(N)
3. submitted_slice_count_.fetch_add(totalN, release)
4. if (suspended_flag_.load(acquire) > 0)
       cond_var_.notify_all();

(b) transferWorker

void FlagcxWorkerPool::transferWorker(int thread_id) {
  bindToSocket(numa_socket_id_);
  const static uint64_t kWaitPeriodInNano = 100000000;  // 100ms
  uint64_t last_wait_ts = getCurrentTimeInNano();
  while (workers_running_.load(std::memory_order_relaxed)) {
    auto processed = processed_slice_count_.load(std::memory_order_relaxed);
    auto submitted = submitted_slice_count_.load(std::memory_order_relaxed);
    if (processed == submitted) {
      uint64_t now = getCurrentTimeInNano();
      if (now - last_wait_ts > kWaitPeriodInNano) {
        std::unique_lock<std::mutex> lock(cond_mutex_);
        suspended_flag_.fetch_add(1);
        // double-check 防 lost wakeup
        if (processed_slice_count_.load(std::memory_order_relaxed) ==
            submitted_slice_count_.load(std::memory_order_relaxed)) {
          cond_var_.wait_for(lock, std::chrono::seconds(1));
        }
        suspended_flag_.fetch_sub(1);
        last_wait_ts = now;
      }
      continue;
    }
    performPostSend(thread_id);
#ifndef USE_FAKE_POST_SEND
    performPollCq(thread_id);
#endif
  }
}

(c) performPostSend —— 按 opcode 分发到 adaptor

1. 从本线程负责的 shards(thread_id, thread_id+W, thread_id+2W, ...)
   拉 slices 出来(每个 shard 加一次 TicketLock)
2. 按 peerNicPath 分组,对每组:
       reads, writes = splitByOpcode(slices)
       if !reads.empty():
           flagcxP2pIgetBatch(sendComm, reads.size(), reads.data())
       if !writes.empty():
           flagcxP2pIputBatch(sendComm, writes.size(), writes.data())
   adaptor 内部:
       - 在本 worker 拥有的 QP 子集上 round-robin 下 WR;
       - wr_id = (uintptr_t)slice;
       - ibv_post_send 失败 → slice->markFailed() + processed_slice_count_++

不做 redispatch;ibv_post_send 失败直接 markFailed,task 计数体现失败。

(d) performPollCq —— 收割 shared CQ

n = ibv_poll_cq(shared_cq_, wc[64], 64)   // 直接 poll pool 的共享 CQ
for i in 0..n:
    slice = (FlagcxSlice*) wc[i].wr_id
    --(*slice->qpDepth)
    if wc[i].status != IBV_WC_SUCCESS:
        slice->markFailed()
    else:
        slice->markSuccess()
    processed_slice_count_.fetch_add(1)

shared CQ 下,任何 worker poll 到的 WC 都直接处理,不需要按 cq shard 路由。ibv_poll_cq 本身线程安全,每个 WC 只会被一个 caller 拿走。

3.4 ibrc_p2p_adaptor.cc 的改动

(a) vtable 槽位填充

struct flagcxNetAdaptor flagcxNetIbP2p = {
    "IB_P2P",
    flagcxP2pInit,
    /* ...原有不变... */
    flagcxP2pIput, flagcxP2pIget, flagcxP2pIputSignal,
    flagcxP2pGetDevFromName,
 
    flagcxP2pIputBatch,   // ← 原 nullptr,新增实现
    flagcxP2pTestBatch,   // ← 暂保留,新路径不使用
    flagcxP2pIgetBatch,   // ← 重构:签名换成 slice 风格
};

(b) flagcxP2pIgetBatch / flagcxP2pIputBatch —— 统一 slice 风格签名

⚠️ 这是相对 v1 的关键变更flagcxP2pIgetBatch 旧签名(uint64_t *srcOffs / void **requests 等)整体替换为 slice 风格。由于 nixl plugin 通过 flagcxP2pEngineReadVector 间接调用、没有任何上层直接打 adaptor,这个改动对 nixl plugin 零感知

// 唯一签名,两个函数对称
flagcxResult_t flagcxP2pIgetBatch(
    void           *sendComm,   // FlagcxP2pCommView*
    int             count,
    FlagcxSlice   **slices);    // 已 build 好的 READ slice 列表
 
flagcxResult_t flagcxP2pIputBatch(
    void           *sendComm,
    int             count,
    FlagcxSlice   **slices);    // 已 build 好的 WRITE slice 列表

实现要点(两函数共享 helper):

  1. caller(worker pool)传入的 slice 列表,已按所有权过滤为本 worker 拥有的 QP 子集——即 caller 已经知道「这批要打哪些 QP」。adaptor 内部只在这个 QP 子集上 round-robin。
  2. 每 QP 一次链式 ibv_post_send,最多 FLAGCX_P2P_IGET_BATCH_MAX_WR = 64 个 WR。
  3. 每个 WR:
    wr.wr_id              = (uintptr_t)slice;      // 关键:shared CQ 直接拿 slice
    wr.opcode             = slice->opcode;          // READ or WRITE
    wr.send_flags         = IBV_SEND_SIGNALED;
    wr.sg_list[0]         = {slice->srcVa, slice->length, slice->lkey};
    wr.wr.rdma            = {slice->dstVa, slice->rkey};
    slice->qpDepth        = &qp_depth[qp_idx];      // 用于 poll 后回扣
    
  4. ibv_post_send 失败时:直接 slice->markFailed() 并把它的完成计入 processed_slice_count_不做 retry,不回 failed 列表。task 的 doneSliceCount 自动累加,XferStatusisAllDone() 后清理(成功失败一视同仁)。

不引入失败 slice 回收路径。容错策略:失败 → 计数 → 上层感知。后续要做重传也由上层重新提交一个新的 task,不在 worker pool 内做循环重试。

3.5 CQ 拓扑与 QP 分片

CQ 拓扑:per-engine 单 CQ(绑定到本 GPU 最近 NIC)

本 engine 的全部 conn(每 conn FLAGCX_P2P_QPS_PER_CONN 个 QP)
    conn0.qp0 ─┐
    conn0.qp1  │
    conn0.qp2  │  全部挂到
    conn0.qp3  │  WorkerPool.shared_cq_  ← ibv_create_cq(ctx, kSharedCqDepth)
    conn1.qp0  │
    ...        ┘
  • kSharedCqDepth 参照 Mooncake 的大小,env FLAGCX_P2P_CQ_DEPTH 可调。
  • 建 conn 时 flagcxP2pAccept/Connectengine->workerPool->getSharedCq() 拿到 CQ,传给 ibv_create_qp。CQ 生命周期归 pool,conn 拆时不 destroy。
  • 当前 ibrc_p2p_adaptor.ccchannels[i].cq = ibv_create_cq(...) 这行删掉;同时把 struct flagcxP2pChannel channels[FLAGCX_P2P_QPS_PER_CONN] 重命名为 std::vector<ibv_qp*> qp_list_(与 Mooncake RdmaEndPoint::qp_list_ 对齐),删去 channel 概念——本来 channel 里就只剩 QP 一个有效字段,CQ 已上移到 pool。

QP → worker 静态分片(round-robin)

registerQp(qp) 内部:
  int slot = qpRegisterCounter_.fetch_add(1) % transferWorkerCount_;
  qpShards_[slot].push_back(qp);
  • 4 QP + 2 worker → worker0: {qp0, qp2};worker1: {qp1, qp3}
  • 4 QP + 4 worker → worker_i: {qp_i}(1:1)
  • QP count 和 worker count 由两个独立 env 控制:
    • FLAGCX_P2P_QPS_PER_CONN(默认 2)
    • FLAGCX_P2P_WORKERS_PER_POOL(默认 2)
  • unregisterQp:从对应 shard 中 erase,无需重平衡(残余 QP 仍均匀分布)。

3.6 迁移步骤(1→2→3,每步可编可跑)

Step 0:CQ 共享化(topology only)

  • WorkerPool 空壳:只加 ibv_create_cq + getSharedCq() + registerQp/unregisterQp,不起 worker 线程。
  • flagcxP2pAccept/Connect 内建 QP 时改从 pool 拿 CQ,不再自建;channels[i].cq 不 destroy。channels[] 数组同步重命名为 qp_list_(对齐 Mooncake RdmaEndPoint::qp_list_),删除 flagcxP2pChannel 结构体。
  • gCqPoller 仍在,注册时从「N×4 个 per-QP CQ」减到「所有 ibDev 的 shared CQ」(数量大幅减少)。
  • 验证:nixl/perf test 行为不变,只是 CQ 拓扑已换。

Step 1:slice 签名

  • flagcxP2pIputBatch/IgetBatch 内部:wr.wr_id = (uintptr_t)slice
  • gCqPoller 的 poll 回调:拿到 wr_id 后调 slice->markSuccess/markFailedprocessed_slice_count_ 暂不在这里计(worker pool 还没起)。
  • gAsyncWorker 在调用 iputBatch/igetBatch 前先 build slice + attach task。
  • 验证:nixl test / perf test 通过;transfer 完成路径走 slice 计数。

Step 2:引入 WorkerPool 接管 post + poll

  • WorkerPool ctor 起 N 个 transferWorker。
  • WriteVector/ReadVector 改:build slice → pool->submitPostSend(),不再走 gAsyncWorker
  • 停止向 gCqPoller 注册新 CQ(shared CQ 改由 transferWorker 自己 poll);gCqPoller 空转直到 entries 清空。
  • 验证:worker pool 真正 drain 流量;FLAGCX_DEBUG=INFO 下可见 worker poll 日志。

Step 3:清场

  • gCqPoller / cqPollerFunc / ensureCqPollerStarted / cqPollerRegister/Unregister/Stop
  • gAsyncWorker / AsyncTransferTask / asyncWorkerFunc
  • 验证:grep -r gCqPoller flagcx/ 返回空;grep -r gAsyncWorker flagcx/ 返回空。

3.7 全局配置 FlagcxP2pGlobalConfig(对齐 Mooncake globalConfig()

目标:把当前散落在 ibrc_p2p_adaptor.cc 顶部的 #defineFLAGCX_P2P_QPS_PER_CONN / FLAGCX_P2P_MAX_REQUESTS / FLAGCX_P2P_BATCH_POLL_SIZE / FLAGCX_P2P_IGET_BATCH_MAX_WR / FLAGCX_P2P_READ_BATCH_WINDOW)、设计文档里提到但还没落地的 env(FLAGCX_P2P_WORKERS_PER_POOL / FLAGCX_P2P_CQ_DEPTH / slice/fragment)、以及 IB QP attr 一起收拢成一个全局结构 + 一个 once_flag 懒加载,与 mooncake::globalConfig() 形状对齐;env 走 FlagCX 现成的 FLAGCX_PARAM 宏(带原子缓存、统一 FLAGCX_ 前缀、env file 透传)。

(a) 放置位置 — 结构体声明落在 flagcx_p2p.h

内容文件备注
struct FlagcxP2pGlobalConfig(POD)flagcx/include/flagcx_p2p.hFlagcxP2pRdmaDesc 等已有公共结构同级;不引入 <infiniband/verbs.h>,MTU 用裸 int 表示
flagcxP2pGlobalConfig() 访问器声明flagcx/include/flagcx_p2p.hconst FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig();
flagcxP2pDumpGlobalConfig() 声明flagcx/include/flagcx_p2p.hINFO 日志一次性 dump,便于排障
flagcxP2pClampToDeviceLimits() 声明flagcx/include/flagcx_p2p.h入参全 plain int(max_qp_wr / max_sge / max_cqe),verbs 不进 public 头
所有 FLAGCX_PARAM(...) + loadGlobalConfig + 单例flagcx/core/flagcx_p2p.cc 顶部 anonymous ns不另立 flagcx_p2p_config.cc;与 gXferMap 等已有 file-local 状态同位置
ibv_mtu 转换(mtuFromIntflagcx/core/flagcx_p2p.cc仅在内部使用;adaptor 读 c.mtu_length 是 int,建 QP 前自己转 ibv_mtu

(b) 结构体(写进 flagcx_p2p.h

/* ------------------------------------------------------------------ */
/*  Global runtime configuration                                      */
/*  All fields env-overridable via FLAGCX_P2P_* (see flagcx_p2p.cc).  */
/*  Lazy-loaded once on first flagcxP2pGlobalConfig() call.           */
/* ------------------------------------------------------------------ */
struct FlagcxP2pGlobalConfig {
  /* —— Worker pool / QP 拓扑 —— */
  int    qpsPerConn         = 4;        // FLAGCX_P2P_QPS_PER_CONN
  int    workersPerPool     = 2;        // FLAGCX_P2P_WORKERS_PER_POOL
  int    shardCount         = 8;        // FLAGCX_P2P_SHARD_COUNT
 
  /* —— CQ / WR / 完成队列 —— */
  size_t sharedCqDepth      = 4096;     // FLAGCX_P2P_CQ_DEPTH
  size_t maxWrPerPost       = 64;       // FLAGCX_P2P_MAX_WR_PER_POST
  size_t maxRequests        = 256;      // FLAGCX_P2P_MAX_REQUESTS
  size_t batchPollSize      = 32;       // FLAGCX_P2P_BATCH_POLL_SIZE
  size_t readBatchWindow    = 8;        // FLAGCX_P2P_READ_BATCH_WINDOW
 
  /* —— Slice 切片策略 —— */
  size_t sliceSize          = 64 * 1024; // FLAGCX_P2P_SLICE_SIZE
  size_t fragmentLimit      =  4 * 1024; // FLAGCX_P2P_FRAGMENT_LIMIT
 
  /* —— IB QP attr(用 plain int,避免引入 verbs 到 public 头)—— */
  size_t  maxSge            = 4;        // FLAGCX_P2P_MAX_SGE
  size_t  maxInline         = 64;       // FLAGCX_P2P_MAX_INLINE
  uint8_t ibPort            = 1;        // FLAGCX_P2P_IB_PORT
  int     gidIndex          = -1;       // FLAGCX_P2P_GID_INDEX
  int     mtuLength         = 4096;     // FLAGCX_P2P_MTU (512/1024/2048/4096)
  int     ibTrafficClass    = -1;       // FLAGCX_P2P_IB_TC
  int     retryCnt          = 7;        // FLAGCX_P2P_RETRY_CNT
 
  /* —— Notification —— */
  int     notifMaxPeers     = 64;       // FLAGCX_P2P_NOTIF_MAX_PEERS
 
  /* —— 杂项 —— */
  bool    enableDestDeviceAffinity = false;  // FLAGCX_P2P_DEST_DEV_AFFINITY
};
 
/* 等价于 mooncake::globalConfig();首次调用懒加载 + std::once_flag 保护 */
const FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig();
 
/* INFO 日志 dump 一次(loadGlobalConfig 末尾自动调用,也可外部主动调) */
void flagcxP2pDumpGlobalConfig();
 
/* 设备能力夹紧:把 maxWrPerPost / maxSge / sharedCqDepth 限到设备实际值。
   入参全 plain int,verbs 头不进 public header。
   adaptor 在 ibv_query_device 后调用一次。 */
void flagcxP2pClampToDeviceLimits(uint32_t maxQpWr,
                                  uint32_t maxSge,
                                  uint32_t maxCqe,
                                  uint32_t maxQp);

(c) 加载实现(写在 flagcx_p2p.cc 顶部 anonymous namespace)

与 §1 引用的 mooncake loadGlobalConfig 同形:每个字段一行 FLAGCX_PARAM,loader 只做 narrowing + 范围检查 + 写结构体。

namespace {
 
FLAGCX_PARAM(P2pQpsPerConn,        "P2P_QPS_PER_CONN",        4);
FLAGCX_PARAM(P2pWorkersPerPool,    "P2P_WORKERS_PER_POOL",    2);
FLAGCX_PARAM(P2pShardCount,        "P2P_SHARD_COUNT",         8);
FLAGCX_PARAM(P2pCqDepth,           "P2P_CQ_DEPTH",            4096);
FLAGCX_PARAM(P2pMaxWrPerPost,      "P2P_MAX_WR_PER_POST",     64);
FLAGCX_PARAM(P2pMaxRequests,       "P2P_MAX_REQUESTS",        256);
FLAGCX_PARAM(P2pBatchPollSize,     "P2P_BATCH_POLL_SIZE",     32);
FLAGCX_PARAM(P2pReadBatchWindow,   "P2P_READ_BATCH_WINDOW",   8);
FLAGCX_PARAM(P2pSliceSize,         "P2P_SLICE_SIZE",          64 * 1024);
FLAGCX_PARAM(P2pFragmentLimit,     "P2P_FRAGMENT_LIMIT",       4 * 1024);
FLAGCX_PARAM(P2pMaxSge,            "P2P_MAX_SGE",             4);
FLAGCX_PARAM(P2pMaxInline,         "P2P_MAX_INLINE",          64);
FLAGCX_PARAM(P2pIbPort,            "P2P_IB_PORT",             1);
FLAGCX_PARAM(P2pGidIndex,          "P2P_GID_INDEX",          -1);
FLAGCX_PARAM(P2pMtu,               "P2P_MTU",                 4096);
FLAGCX_PARAM(P2pIbTc,              "P2P_IB_TC",              -1);
FLAGCX_PARAM(P2pRetryCnt,          "P2P_RETRY_CNT",           7);
FLAGCX_PARAM(P2pNotifMaxPeers,     "P2P_NOTIF_MAX_PEERS",     64);
FLAGCX_PARAM(P2pDestDevAffinity,   "P2P_DEST_DEV_AFFINITY",   0);
 
template <typename T>
inline T clampParam(int64_t v, T lo, T hi, T deft, const char* name) {
  if (v < (int64_t)lo || v > (int64_t)hi) {
    INFO(FLAGCX_INIT,
         "Ignore FLAGCX_%s=%lld (out of [%lld,%lld]), use default %lld",
         name, (long long)v, (long long)lo, (long long)hi, (long long)deft);
    return deft;
  }
  return (T)v;
}
 
void loadGlobalConfig(FlagcxP2pGlobalConfig& c) {
  c.qpsPerConn        = clampParam(flagcxParamP2pQpsPerConn(),       1, 32, 4,  "P2P_QPS_PER_CONN");
  c.workersPerPool    = clampParam(flagcxParamP2pWorkersPerPool(),   1,  8, 2,  "P2P_WORKERS_PER_POOL");
  c.shardCount        = clampParam(flagcxParamP2pShardCount(),       1, 64, 8,  "P2P_SHARD_COUNT");
  c.sharedCqDepth     = clampParam<size_t>(flagcxParamP2pCqDepth(),  1, 1<<20, 4096, "P2P_CQ_DEPTH");
  c.maxWrPerPost      = clampParam<size_t>(flagcxParamP2pMaxWrPerPost(), 1, 1024,  64,  "P2P_MAX_WR_PER_POST");
  c.maxRequests       = clampParam<size_t>(flagcxParamP2pMaxRequests(),  1, 1<<16, 256, "P2P_MAX_REQUESTS");
  c.batchPollSize     = clampParam<size_t>(flagcxParamP2pBatchPollSize(), 1, 256,  32,  "P2P_BATCH_POLL_SIZE");
  c.readBatchWindow   = clampParam<size_t>(flagcxParamP2pReadBatchWindow(), 1, 256, 8,  "P2P_READ_BATCH_WINDOW");
  c.sliceSize         = clampParam<size_t>(flagcxParamP2pSliceSize(),   1<<10, 1<<26, 64*1024, "P2P_SLICE_SIZE");
  c.fragmentLimit     = clampParam<size_t>(flagcxParamP2pFragmentLimit(), 0, c.sliceSize, 4*1024, "P2P_FRAGMENT_LIMIT");
  c.maxSge            = clampParam<size_t>(flagcxParamP2pMaxSge(),    1, 32, 4,   "P2P_MAX_SGE");
  c.maxInline         = clampParam<size_t>(flagcxParamP2pMaxInline(), 0, 1024, 64, "P2P_MAX_INLINE");
  c.ibPort            = clampParam<uint8_t>(flagcxParamP2pIbPort(),   1, 255, 1,  "P2P_IB_PORT");
  c.gidIndex          = clampParam<int>(flagcxParamP2pGidIndex(),    -1, 255, -1, "P2P_GID_INDEX");
  {
    int64_t mv = flagcxParamP2pMtu();
    if (mv == 512 || mv == 1024 || mv == 2048 || mv == 4096) c.mtuLength = (int)mv;
    else { WARN("FLAGCX_P2P_MTU=%lld invalid, fallback 4096", (long long)mv); c.mtuLength = 4096; }
  }
  c.ibTrafficClass    = clampParam<int>(flagcxParamP2pIbTc(),       -1, 255, -1, "P2P_IB_TC");
  c.retryCnt          = clampParam<int>(flagcxParamP2pRetryCnt(),    0,   7, 7,  "P2P_RETRY_CNT");
  c.notifMaxPeers     = clampParam<int>(flagcxParamP2pNotifMaxPeers(), 1, 1024, 64, "P2P_NOTIF_MAX_PEERS");
  c.enableDestDeviceAffinity = (flagcxParamP2pDestDevAffinity() != 0);
}
 
FlagcxP2pGlobalConfig& mutableGlobalConfig() {
  static FlagcxP2pGlobalConfig cfg;
  static std::once_flag once;
  std::call_once(once, [] { loadGlobalConfig(cfg); flagcxP2pDumpGlobalConfig(); });
  return cfg;
}
 
}  // namespace
 
const FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig() {
  return mutableGlobalConfig();
}
 
void flagcxP2pClampToDeviceLimits(uint32_t maxQpWr, uint32_t maxSge,
                                  uint32_t maxCqe, uint32_t /*maxQp*/) {
  auto& c = mutableGlobalConfig();
  if (c.maxWrPerPost  > (size_t)maxQpWr) c.maxWrPerPost  = maxQpWr;
  if (c.maxSge        > (size_t)maxSge)  c.maxSge        = maxSge;
  if (c.sharedCqDepth > (size_t)maxCqe)  c.sharedCqDepth = maxCqe;
}
 
void flagcxP2pDumpGlobalConfig() {
  const auto& c = flagcxP2pGlobalConfig();
  INFO(FLAGCX_INIT, "=== FlagCX P2P GlobalConfig ===");
  INFO(FLAGCX_INIT, "qpsPerConn=%d workersPerPool=%d shardCount=%d",
       c.qpsPerConn, c.workersPerPool, c.shardCount);
  INFO(FLAGCX_INIT, "sharedCqDepth=%zu maxWrPerPost=%zu maxRequests=%zu "
                    "batchPollSize=%zu readBatchWindow=%zu",
       c.sharedCqDepth, c.maxWrPerPost, c.maxRequests,
       c.batchPollSize, c.readBatchWindow);
  INFO(FLAGCX_INIT, "sliceSize=%zu fragmentLimit=%zu", c.sliceSize, c.fragmentLimit);
  INFO(FLAGCX_INIT, "ibPort=%u gidIndex=%d mtu=%d tc=%d retry=%d "
                    "maxSge=%zu maxInline=%zu",
       (unsigned)c.ibPort, c.gidIndex, c.mtuLength,
       c.ibTrafficClass, c.retryCnt, c.maxSge, c.maxInline);
  INFO(FLAGCX_INIT, "notifMaxPeers=%d destDevAffinity=%d",
       c.notifMaxPeers, (int)c.enableDestDeviceAffinity);
}

(d) env 变量总表(FLAGCX_ 前缀由宏自动加)

env默认范围作用
FLAGCX_P2P_QPS_PER_CONN4[1,32]每条 conn 上 QP 数;与 worker 数共同决定 QP→worker 静态分片
FLAGCX_P2P_WORKERS_PER_POOL2[1,8]每个 WorkerPool 起的 transferWorker 线程数
FLAGCX_P2P_SHARD_COUNT8[1,64]submit 侧 sharded slice queue 桶数
FLAGCX_P2P_CQ_DEPTH4096[1,1M]共享 CQ ibv_create_cq 深度
FLAGCX_P2P_MAX_WR_PER_POST64[1,1024]单次链式 ibv_post_send 最大 WR 数
FLAGCX_P2P_MAX_REQUESTS256[1,65536]每 conn 待完成 request 上限
FLAGCX_P2P_BATCH_POLL_SIZE32[1,256]每次 ibv_poll_cq 一次拿走的 WC 数
FLAGCX_P2P_READ_BATCH_WINDOW8[1,256]READ 路径攒批窗口
FLAGCX_P2P_SLICE_SIZE64K[1K,64M]FlagcxSlicePolicy::kBlockSize(connector 路径切片粒度)
FLAGCX_P2P_FRAGMENT_LIMIT4K[0, sliceSize]末尾合并阈值;剩余 ≤ slice+frag 时合并不二切
FLAGCX_P2P_MAX_SGE4[1,32]QP cap.max_send_sge
FLAGCX_P2P_MAX_INLINE64[0,1024]QP cap.max_inline_data
FLAGCX_P2P_IB_PORT1[1,255]IB port number
FLAGCX_P2P_GID_INDEX-1(auto)[-1,255]RoCE GID index;-1 走自动选
FLAGCX_P2P_MTU4096{512,1024,2048,4096}path MTU
FLAGCX_P2P_IB_TC-1(off)[-1,255]IB traffic class;-1 不设置
FLAGCX_P2P_RETRY_CNT7[0,7]QP retry count
FLAGCX_P2P_NOTIF_MAX_PEERS64[1,1024]epoll notif 监听 peer 上限
FLAGCX_P2P_DEST_DEV_AFFINITY0{0,1}是否启用目的设备亲和路由(占位,对应后续待办 §6.1)

(e) 调用侧改造对照表

旧(编译期常量)新(运行时读 config)
for (int i=0; i<FLAGCX_P2P_QPS_PER_CONN; ++i)auto& C = flagcxP2pGlobalConfig(); for (int i=0; i<C.qpsPerConn; ++i)
struct flagcxP2pChannel channels[FLAGCX_P2P_QPS_PER_CONN]std::vector<ibv_qp*> qp_list_(C.qpsPerConn)(设计文档 §3.5 已要求改名为 qp_list_
struct flagcxP2pRequest reqs[FLAGCX_P2P_MAX_REQUESTS]std::unique_ptr<flagcxP2pRequest[]> reqs(new flagcxP2pRequest[C.maxRequests])
struct ibv_wc wcs[FLAGCX_P2P_BATCH_POLL_SIZE]std::array<ibv_wc, 256> wcs; + 实际 poll 数取 min(C.batchPollSize, 256)(栈数组取上界)
ibv_create_cq(ctx, kSharedCqDepth, ...)ibv_create_cq(ctx, C.sharedCqDepth, ...)
slice_queue_[8] / slice_queue_count_[8] / slice_queue_lock_[8]全部 std::vector 化,构造时 resize(C.shardCount);hash 路由 % C.shardCount
ibv_query_device 后无操作flagcxP2pClampToDeviceLimits(d.max_qp_wr, d.max_sge, d.max_cqe, d.max_qp);
static_assert(2 * FLAGCX_P2P_IGET_BATCH_MAX_WR <= 2 * MAX_REQUESTS)改为运行期 check:assert(2 * C.maxWrPerPost <= 2 * C.maxRequests) 在 pool ctor 头部

⚠️ 避坑:原有 static_assert 是编译期常量约束,迁到运行时配置后必须在 pool 构造时同步 assert,否则用户给出非法组合会沉默崩。

(f) 与 mooncake 写法的对照(一句话总览)

维度mooncake本方案
env 解析方式loadGlobalConfig 手写 getenv+atoi+范围每个字段一行 FLAGCX_PARAM(...);宏负责原子缓存、env file、FLAGCX_ 前缀
入口形状globalConfig() + once_flag完全一致(flagcxP2pGlobalConfig()
命名空间mooncake::文件作用域(flagcx_p2p.h 不带 namespace)
设备能力夹紧updateGlobalConfig(ibv_device_attr&)flagcxP2pClampToDeviceLimits(maxQpWr,maxSge,maxCqe,maxQp),verbs 不进 public 头
日志glogFlagCX 自有 INFO(FLAGCX_INIT, ...) / WARN(...)
MTU 表示ibv_mtu 枚举int (512/1024/2048/4096),ibv_mtu 转换在 adaptor 内部

(g) 加载时机 / 线程安全

  • 首次 flagcxP2pGlobalConfig() 调用触发懒加载;std::call_once 保证只装一次。
  • 推荐在 flagcxP2pEngineCreate 第一行调用一次(在建 WorkerPool 之前),把”读 env”的开销和日志都集中在 engine 创建路径。
  • flagcxP2pClampToDeviceLimits唯一允许写的接口,仅在 IB context 初始化路径调一次;之后所有读路径走 const& 不需要锁。
  • 不暴露任何 setter;要改值只能改 env + 重启。

4. 第三步:flagcx connector 改造

参考:3. flagcx ibrc p2p RPC 服务 + flagcx connector 改动


5. 文件改动清单

路径变更类型说明
flagcx/include/flagcx_p2p.h增量FlagcxTransferTask / FlagcxSlice / 两个 Policy / buildSlices<> 声明 / flagcxP2pEngineReg 增可选出参 / WriteVector 增 policyKind 入参;新增 FlagcxP2pGlobalConfig 结构 + flagcxP2pGlobalConfig() / flagcxP2pDumpGlobalConfig() / flagcxP2pClampToDeviceLimits() 三个函数声明(详见 §3.7)
flagcx/core/flagcx_p2p.cc重构gAsyncWorker + AsyncTransferTask;加 FlagcxWorkerPool(cc-only);engineCreate 内 eager 建 pool;改 ReadVector / WriteVector 走 slice + WorkerPool;改 XferStatus;改 Reg 出参;顶部 anonymous ns 加 19 个 FLAGCX_PARAM(P2pXxx, "P2P_XXX", ...) + loadGlobalConfig + mutableGlobalConfig 单例(详见 §3.7);engineCreate 第一行调一次 flagcxP2pGlobalConfig() 触发懒加载
flagcx/adaptor/net/ibrc_p2p_adaptor.cc改造Step 0:channels[i].cq 改从 pool 拿(不再自建/自毁),channels[] 重命名为 qp_list_(对齐 Mooncake),删除 flagcxP2pChannel 结构体;Step 1:igetBatch/iputBatch 改 slice 签名;Step 3:删 gCqPoller 相关全部代码;vtable iputBatch 槽填上;删除文件顶部全部 #define FLAGCX_P2P_QPS_PER_CONN/MAX_REQUESTS/BATCH_POLL_SIZE/IGET_BATCH_MAX_WR/READ_BATCH_WINDOW,引用全部替换为 flagcxP2pGlobalConfig().xxx;静态数组(channels[K]/reqs[K])改运行时容量;编译期 static_assert 改为 pool ctor 内运行时 assert;ibv_query_device 后调 flagcxP2pClampToDeviceLimits(...) 一次
plugin/interservice/flagcx_wrapper.py增量加 4 类(engine / RPC / register / transfer)共 7 个新 ctypes 绑定;旧 flagcxOneSideRegister / flagcxBatchPut 保留兼容老 caller
connector Python 业务代码替换flagcxOneSideRegister + flagcxBatchPut 切换过来

6 nixl backend会用的 flagcx 接口

NIXL 函数调用的 FlagCX 接口 / 类型用途
nixlFlagcxEngine::nixlFlagcxEngine()flagcxP2pEngineCreate()创建 P2P engine
nixlFlagcxEngine::~nixlFlagcxEngine()flagcxP2pEngineStopAccept()停止 accept
nixlFlagcxEngine::~nixlFlagcxEngine()flagcxP2pEngineMrDestroy()清理已注册 MR
nixlFlagcxEngine::~nixlFlagcxEngine()flagcxP2pEngineConnDestroy()清理连接
nixlFlagcxEngine::~nixlFlagcxEngine()flagcxP2pEngineDestroy()销毁 engine
nixlFlagcxEngine::startListener()flagcxP2pEngineAccept()被动接受连接
nixlFlagcxEngine::startListener()flagcxP2pEngineStartListener()启动连接监听
nixlFlagcxEngine::getConnInfo()flagcxP2pEngineGetMetadata()导出本端连接信息
nixlFlagcxEngine::loadRemoteConnInfo()flagcxP2pEngineConnect()主动连接远端
nixlFlagcxEngine::loadRemoteConnInfo()flagcxP2pEngineStartListener()启动连接监听
nixlFlagcxEngine::disconnect()flagcxP2pEngineConnDestroy()主动断开连接
nixlFlagcxEngine::registerMem()flagcxP2pEngineReg()注册本地内存
nixlFlagcxEngine::registerMem()flagcxP2pEnginePrepareDesc()生成本地内存的 remote desc
nixlFlagcxEngine::registerMem()flagcxP2pEngineMrDestroy()desc 生成失败时回滚 MR
nixlFlagcxEngine::deregisterMem()flagcxP2pEngineMrDestroy()注销本地 MR
nixlFlagcxEngine::prepXfer()flagcxP2pDeserializeRdmaDesc()解析远端 desc
nixlFlagcxEngine::prepXfer()flagcxP2pEngineUpdateDesc()更新远端 addr/size
nixlFlagcxEngine::postXfer()flagcxP2pEngineReadVector()发起 NIXL_READ
nixlFlagcxEngine::postXfer()flagcxP2pEngineWriteVector()发起 NIXL_WRITE
nixlFlagcxEngine::checkXfer()flagcxP2pEngineXferStatus()查询传输完成
nixlFlagcxEngine::checkXfer()flagcxP2pEngineSendNotif()传输完成后发送通知
nixlFlagcxEngine::getNotifs()flagcxP2pEngineGetNotifs()获取收到的通知
nixlFlagcxEngine::genNotif()flagcxP2pEngineSendNotif()主动发送通知