目标:在 FlagCX 内构建一套统一的、多后端多线程的 IBRC P2P 传输栈,兼容 nixl backend 的 batchGet 与 flagcx connector 的 batchPut 调用。
1. 背景
1.1 两类上层调用
┌────────────────────────────────────────────────────────────────────┐
│ nixl backend (C++ plugin) │
│ postXfer(local_descs, remote_descs) │
│ └─ 已切好的 IOV 列表(每个 desc 由 connector 内 │
│ block_offset = block_id * block_len_per_layer 计算得到) │
│ └─ 每个 IOV 对应一次独立 RDMA READ │
└──────────────────────────────────┬─────────────────────────────────┘
│ flagcxP2pEngineReadVector(...)
▼
┌────────────────────────────────────────────────────────────────────┐
│ mooncake/flagcx connector (Python wrapper) │
│ flagcxBatchPut(comm, peer, src_offs, dst_offs, sizes, │
│ src_mrs, dst_mrs, count) │
│ └─ 已通过 group_concurrent_contiguous() 把多个连续 block 拼接为 │
│ 较大的 entry,每个 entry 仍可能 >> 64K │
│ └─ 每个 entry 需要进一步切成 64K slice 后再下发 │
└───────────────────────────────────────────────────────────────────┘
1.2 各后端当前丢下来的数据
(a) nixl backend — 一个 desc 对应一个 IOV
postXfer 中每个 local[i] / remote[i] 已是上层切好的最小单位。prepXfer 阶段把远端描述符反序列化为 FlagcxP2pRdmaDesc:
AsyncTransferTask ← 一次 ReadVector/WriteVector 调用
conn: FlagcxP2pConn* ← 连接句柄(含 sendComm/QP)
op: ASYNC_XFER_READ | ASYNC_XFER_WRITE
numIovs: int ← iov 总数
dataVec: vector<void*> ← 本地目标地址[numIovs]
sizeVec: vector<size_t> ← 每个 iov 的字节数[numIovs]
descs: vector<FlagcxP2pRdmaDesc> ← 远端描述符[numIovs]
└── FlagcxP2pRdmaDesc
addr: uint64_t ← 远端内存虚拟地址
size: uint32_t
rkey: uint32_t ← 远端 MR 的 rkey
localEntries: vector<FlagcxP2pMemRegEntry> ← 本地 MR 查表结果[numIovs]
└── FlagcxP2pMemRegEntry
mhandle: void* ← 指向 FlagcxP2pMrHandleView
└── FlagcxP2pMrHandleView
baseVa: uintptr_t ← 本地 MR 基址
lkey: uint32_t ← 本地 lkey
rkey: uint32_t
ibDevN: int ← 绑定的 IB 设备号
ibDevN: int ← 校验用
done: atomic<bool> ← 完成标志(worker 写,调用方轮询)
result: atomic<int> ← 0 成功 / -1 失败进入 flagcxP2pEngineReadVector 时持有:
| 字段 | 来源 | 含义 |
|---|---|---|
dstVec[i] | local[i].addr | 本地 VA(READ 目标) |
sizeVec[i] | local[i].len == remote[i].len | 长度(已 ≤ block 粒度) |
mrIds[i] | lmd->mr_id | 本地 MR ID → lkey |
descs[i] | FlagcxP2pRdmaDesc | 远端 (addr, rkey) |
特征:上层已切好,IOV size 通常等于 KV cache 单 block 字节数(数 KB ~ 数百 KB),不需要再切。
(b) mooncake backend — 已 group 后的 entry,在 entry 上继续 64k 切
BatchDesc ← 一次 batch_transfer_sync_write 调用
└── task_list: vector<TransferTask> ← 每个 (src_ptr, dst_ptr, length) entry 一个 task
└── TransferRequest ← task 持有原始请求 { source, target_id, target_offset, length }
└── slice_list: vector<Slice*> ← task 被切成多个 64KB Slice,每个 Slice = 1 个 RDMA WR
├── source_addr ← 本地源地址(TransferRequest.source + offset)
├── length ← 最多 64KB
└── rdma.dest_addr ← 远端目标地址
└── rdma.dest_rkey ← 远端 rkey(在 submitPostSend 时填入)1.3 总图
2. 第一步:统一的 Slice 数据结构
2.1 核心数据结构
-
为什么还要 FlagcxSlice? desc 是 nixl 内序列化和反序列化的结构体,我没法直接拓展完放在 flagcx connector 内使用。
-
为什么现在的线程不行? 现在的视角看 只有一个 asyncWorker 线程去下 wr,Mooncake 有 2 个,ucx 有 4 个去下 wr。见: https://zhuanlan.zhihu.com/p/1939988652114580803 mooncake 作者自己也说到了他们验证过后要么提高 batchSize 要么提高 task submission thread 数量。所以除了线性拓展现在的asyncWorker 数量还要兼容 flagcx connector。
// 前向声明(FlagcxSlice 与 FlagcxTransferTask 互引)
struct FlagcxSlice;
// ============================================================================
// 1) 任务:聚合多个 slice 的完成状态
// flagcxP2pEngineXferStatus(xferId) 直接查这个结构的计数
// ============================================================================
struct FlagcxTransferTask {
// —— 核心计数(slice 完成时原子增长)——————————————————————
std::atomic<uint64_t> sliceCount{0}; // buildSlices 期间累加
std::atomic<uint64_t> doneSliceCount{0}; // markSuccess + markFailed 共用
// —— Slice 容器(只在 buildSlices 阶段写入;XferStatus 命中后批量 delete)—
std::vector<FlagcxSlice*> sliceList;
inline bool isAllDone() const {
const auto total = sliceCount.load(std::memory_order_acquire);
const auto done = doneSliceCount.load(std::memory_order_acquire);
return total > 0 && done >= total;
}
};
// ============================================================================
// 2) Slice 主体
// sizeof ≈ 52B + std::string (peerNicPath)
//
// 说明:FlagCX 是 GPU↔最近 NIC 亲和模型,每个 engine 只有一个 WorkerPool
// (绑定到 GPU 的最近 NIC)。slice 不需要带本地 ibDevN——它隐含在「我属于
// 这个 pool」里。peerNicPath 仍然要带,因为它是 shard hash key 和远端
// endpoint 查找 key。
// ============================================================================
struct FlagcxSlice {
uint64_t srcVa; // WRITE: local src VA; READ: local dst VA
uint64_t dstVa; // WRITE: remote dst VA; READ: remote src VA
uint32_t length; // 单次 WR 字节数(≤ blockSize,由 buildSlices 决定)
uint32_t lkey; // 本地 MR lkey
uint32_t rkey; // 远端 MR rkey
uint8_t opcode; // IBV_WR_RDMA_READ / IBV_WR_RDMA_WRITE
// ─── 路由 ─────────────────────────────────────────────────────────
std::string peerNicPath; // shard hash key + adaptorGetEndpoint key
// ─── 完成追踪 ───────────────────────────────────────────────────
FlagcxTransferTask *task{nullptr}; // 反指 task(计数聚合)
// ─── QP 深度(worker 一对多 QP 时用)──────────────────────────────
// 单 worker 拥有多 QP 时,adaptor 在 ibv_post_send 后 ++(*qpDepth),
// poll 完成后 --(*qpDepth),让 worker 在 post 时按 QP 深度做 round-robin
volatile int *qpDepth{nullptr};
// ─── 完成回调(任务级聚合)────────────────────────────────────────
inline void markSuccess() {
task->doneSliceCount.fetch_add(1, std::memory_order_relaxed);
}
inline void markFailed() {
task->doneSliceCount.fetch_add(1, std::memory_order_relaxed);
}
};
// ============================================================================
// 3) 变化图
// sizeof ≈ 64B + std::string (peerNicPath)
// ============================================================================
AsyncTransferTask FlagcxTransferTask
├─ conn ─────────► ├─ (conn 不再放 task 里;slice 内部携带
├─ op │ peerNicPath 做 shard hash + endpoint 路由)
├─ numIovs (== iov 数) ─────────►├─ sliceCount (== slice 数,可能 > iov 数)
├─ dataVec[] ┐ │
├─ sizeVec[] ├─ 三个并行 vector,按 i ─►├─ sliceList[]
├─ descs[] │ 对齐表示一个 IOV │ 每个 entry 是一个 *FlagcxSlice
├─ localEntries[] ┘ │
├─ done ─────────►├─ isAllDone()
└─ result ─────────►└─ doneSliceCount
FlagcxP2pRdmaDesc (wire, 64B, 不动) FlagcxSlice (runtime, ~52B + str, 新增)
├─ srcVa / dstVa / length / lkey / rkey / opcode
├─ peerNicPath
└─ task* / qpDepth*2.3 Template Policy:兼容两种切法
// ─── Policy A:nixl 路径,1 desc = 1 slice,不再二次切 ─────────────────
// 语义:保持 nixl block 粒度,便于在 4 个 QP 上并行打到 4 个 desc
struct NixlSlicePolicy {
static constexpr bool kFurtherCut = false;
static constexpr size_t kBlockSize = SIZE_MAX;
static constexpr size_t kFragmentSize = 0;
};
// ─── Policy B:connector 路径,按 64K 切,末尾合并 ───────────────
// 语义:与 Mooncake submitTransferTask() 同款切法,上层 mooncake 丢下来的可能是连续的几个 block
struct FlagcxSlicePolicy {
static constexpr bool kFurtherCut = true;
static constexpr size_t kBlockSize = 64 * 1024;
static constexpr size_t kFragmentSize = 4 * 1024;
};2.4 切片接口
// 唯一的 slice 构造接口(声明在 flagcx_p2p.h,实现在 flagcx_p2p.cc)
//
// 责任:把一段「绝对化的 (srcVa, dstVa, length)」切成若干个 FlagcxSlice,
// 挂到 task->sliceList,并 task->sliceCount += N。
// 不负责:建 task、submitPostSend、释放 slice(由 caller / XferStatus 负责)。
template <typename Policy>
void buildSlices(
FlagcxTransferTask *task, // 上层已 new
uint64_t srcVa, // 已绝对化的本地 VA
uint64_t dstVa, // 已绝对化的远端 VA
size_t totalLen, // 待切的总长
uint32_t lkey, // 本地 MR lkey
uint32_t rkey, // 远端 MR rkey
uint8_t opcode, // READ / WRITE
const std::string &peerNicPath); // shard hash + endpoint key
// 实现伪码:
// if constexpr (!Policy::kFurtherCut):
// // 1:1 路径
// FlagcxSlice *s = new FlagcxSlice;
// s->{srcVa, dstVa, length=totalLen, lkey, rkey, opcode,
// peerNicPath, task} = ...
// task->sliceList.push_back(s);
// task->sliceCount.fetch_add(1);
// return;
// else:
// // 64K 切 + 末尾合并
// size_t off = 0;
// while (off < totalLen):
// bool merge = (totalLen - off) <= kBlockSize + kFragmentSize;
// size_t len = merge ? (totalLen - off) : kBlockSize;
// new slice with off-relative srcVa/dstVa, length=len ...
// off += len;
// if (merge) break;2.5 两类调用方 build slice 的入口
flagcxP2pEngineWriteVector 和 flagcxP2pEngineReadVector内部 task 结构补充以上字段即可,然后 ensureAsyncWorkerStarted
(a) nixl 路径(READ / WRITE 都走 NixlSlicePolicy)
// 入口是 flagcxP2pEngineReadVector
auto *task = new FlagcxTransferTask;
for (int i = 0; i < numIovs; ++i) {
auto &le = localEntries[i];
uint32_t lkey = ((FlagcxP2pMrHandleView*)le.mhandle)->lkey;
buildSlices<NixlSlicePolicy>(
task,
/*srcVa=*/ isWrite ? (uint64_t)dstVec[i] : descs[i].addr,
/*dstVa=*/ isWrite ? descs[i].addr : (uint64_t)dstVec[i],
/*totalLen=*/ sizeVec[i],
lkey, descs[i].rkey,
isWrite ? IBV_WR_RDMA_WRITE : IBV_WR_RDMA_READ,
conn->peerNicPath);
}(b) flagcx connector 路径(统一 WRITE,走 FlagcxSlicePolicy)
// 入口是 flagcxP2pEngineWriteVector
auto *task = new FlagcxTransferTask;
for (int i = 0; i < numIovs; ++i) { // entry 粒度
auto &le = localEntries[i];
uint32_t lkey = ((FlagcxP2pMrHandleView*)le.mhandle)->lkey;
buildSlices<FlagcxSlicePolicy>(
task,
/*srcVa=*/ (uint64_t)dstVec[i], // connector Python 已绝对化
/*dstVa=*/ descs[i].addr,
/*totalLen=*/ sizeVec[i],
lkey, descs[i].rkey,
IBV_WR_RDMA_WRITE,
conn->peerNicPath);
}3. 第二步:WorkerPool 设计
只放在 flagcx_p2p.cc 内部,类声明也写在/Users/joker/Desktop/project/baai/FlagCX/flagcx/include/flagcx_p2p.h(不外暴露)。
3.1 架构(per-engine 单 pool)
FlagCX 是 GPU↔最近 NIC 亲和模型:每个进程一个 engine,每个 engine 绑定到 GPU 的最近 NIC,因此只有一个 WorkerPool。所有线程生命周期统一由 WorkerPool 管理,engine 本身不持有任何线程。
FlagcxP2pEngine (per-process / per-GPU)
├─ adaptor (flagcxNetIbP2p)
└─ FlagcxWorkerPool ← 单例,绑到 GPU 最近的 NIC
├─ shared_cq_ ← 本 pool 拥有,所有 conn × QP 共享同一个 CQ
├─ notifWorker ← engine 启动时一并 spawn(仅 nixl 路径用)
├─ transferWorker[0] ─┐
├─ transferWorker[1] │ 循环:performPostSend(本 worker shard)
│ ... │ + performPollCq(shared_cq_)
└─ transferWorker[N-1] ┘ worker 数:env FLAGCX_P2P_WORKERS_PER_POOL,默认 2
无 monitorWorker。
生命周期规则:
flagcxP2pEngineCreate:选定本进程 GPU 的最近 NIC,建一个 WorkerPool;pool ctor 内ibv_create_cq+ spawn N 个 transferWorker + 1 个 notifWorker。engine 不持有任何std::thread。flagcxP2pEngineDestroy:销毁 pool(dtor 退出全部 transferWorker + notifWorker → CQ 销毁)。
未来扩展:如果 GPU 跨多 NIC 平摊(multi-rail),engine 可以持有多个 pool(std::vector<FlagcxWorkerPool>)。slice 加 int poolIdx 做路由即可,目前不做。
3.2 类骨架(写在 flagcx_p2p.cc)
不含任何容错/重试/健康统计字段。失败 slice 直接
markFailed()计数,不 redispatch。
class FlagcxWorkerPool {
public:
FlagcxWorkerPool(int ibDevN, ibv_context *ctx, int numaSocketId,
FlagcxP2pEngine *engine);
// ctor 内:
// shared_cq_ = ibv_create_cq(ctx, kSharedCqDepth, ...);
// spawn N transferWorker + 1 notifWorker
~FlagcxWorkerPool();
// adaptor 建 conn 时调,拿共享 CQ 用于 ibv_create_qp
ibv_cq *getSharedCq() const { return shared_cq_; }
// adaptor 建好 QP 后注册到 pool,pool 按 round-robin 把 qp 归到某个 worker
int registerQp(ibv_qp *qp);
void unregisterQp(ibv_qp *qp);
// 上层提交 slice
int submitPostSend(const std::vector<FlagcxSlice*> &slices);
private:
void transferWorker(int threadId);
void performPostSend(int threadId);
void performPollCq(int threadId);
void notifWorker(); // 逻辑与原 notifPollThreadFunc 相同
// ─── 共享资源 ──────────────────────────────────────────────────────
ibv_cq *shared_cq_;
std::vector<std::vector<ibv_qp*>> qpShards_;
std::atomic<int> qpRegisterCounter_{0};
FlagcxP2pEngine *engine_; // notifWorker 需要访问 engine
// ─── Sharded slice queue(hash(peerNicPath) → shard)────────────────
static constexpr int kShardCount = 8;
using SliceList = std::vector<FlagcxSlice*>;
std::unordered_map<std::string, SliceList> slice_queue_[kShardCount];
std::atomic<uint64_t> slice_queue_count_[kShardCount];
TicketLock slice_queue_lock_[kShardCount];
// ─── 进度计数(决定 worker 是否进 cond_var 休眠)────────────────────
std::atomic<uint64_t> submitted_slice_count_{0};
std::atomic<uint64_t> processed_slice_count_{0};
std::atomic<int> suspended_flag_{0};
std::condition_variable cond_var_;
std::mutex cond_mutex_;
// ─── 线程句柄 ────────────────────────────────────────────────────────
std::atomic<bool> workers_running_{true};
std::vector<std::thread> worker_threads_; // transferWorker[0..N-1] + notifWorker
};3.3 主流程语义
(a) submitPostSend —— 生产侧
1. 按 hash(peerNicPath) % kShardCount 分桶到 8 个 shard
2. 每个 shard 拿一次 TicketLock,整体并入 slice_queue_,
slice_queue_count_[sh].fetch_add(N)
3. submitted_slice_count_.fetch_add(totalN, release)
4. if (suspended_flag_.load(acquire) > 0)
cond_var_.notify_all();
(b) transferWorker
void FlagcxWorkerPool::transferWorker(int thread_id) {
bindToSocket(numa_socket_id_);
const static uint64_t kWaitPeriodInNano = 100000000; // 100ms
uint64_t last_wait_ts = getCurrentTimeInNano();
while (workers_running_.load(std::memory_order_relaxed)) {
auto processed = processed_slice_count_.load(std::memory_order_relaxed);
auto submitted = submitted_slice_count_.load(std::memory_order_relaxed);
if (processed == submitted) {
uint64_t now = getCurrentTimeInNano();
if (now - last_wait_ts > kWaitPeriodInNano) {
std::unique_lock<std::mutex> lock(cond_mutex_);
suspended_flag_.fetch_add(1);
// double-check 防 lost wakeup
if (processed_slice_count_.load(std::memory_order_relaxed) ==
submitted_slice_count_.load(std::memory_order_relaxed)) {
cond_var_.wait_for(lock, std::chrono::seconds(1));
}
suspended_flag_.fetch_sub(1);
last_wait_ts = now;
}
continue;
}
performPostSend(thread_id);
#ifndef USE_FAKE_POST_SEND
performPollCq(thread_id);
#endif
}
}(c) performPostSend —— 按 opcode 分发到 adaptor
1. 从本线程负责的 shards(thread_id, thread_id+W, thread_id+2W, ...)
拉 slices 出来(每个 shard 加一次 TicketLock)
2. 按 peerNicPath 分组,对每组:
reads, writes = splitByOpcode(slices)
if !reads.empty():
flagcxP2pIgetBatch(sendComm, reads.size(), reads.data())
if !writes.empty():
flagcxP2pIputBatch(sendComm, writes.size(), writes.data())
adaptor 内部:
- 在本 worker 拥有的 QP 子集上 round-robin 下 WR;
- wr_id = (uintptr_t)slice;
- ibv_post_send 失败 → slice->markFailed() + processed_slice_count_++
不做 redispatch;ibv_post_send 失败直接 markFailed,task 计数体现失败。
(d) performPollCq —— 收割 shared CQ
n = ibv_poll_cq(shared_cq_, wc[64], 64) // 直接 poll pool 的共享 CQ
for i in 0..n:
slice = (FlagcxSlice*) wc[i].wr_id
--(*slice->qpDepth)
if wc[i].status != IBV_WC_SUCCESS:
slice->markFailed()
else:
slice->markSuccess()
processed_slice_count_.fetch_add(1)
shared CQ 下,任何 worker poll 到的 WC 都直接处理,不需要按 cq shard 路由。
ibv_poll_cq本身线程安全,每个 WC 只会被一个 caller 拿走。
3.4 ibrc_p2p_adaptor.cc 的改动
(a) vtable 槽位填充
struct flagcxNetAdaptor flagcxNetIbP2p = {
"IB_P2P",
flagcxP2pInit,
/* ...原有不变... */
flagcxP2pIput, flagcxP2pIget, flagcxP2pIputSignal,
flagcxP2pGetDevFromName,
flagcxP2pIputBatch, // ← 原 nullptr,新增实现
flagcxP2pTestBatch, // ← 暂保留,新路径不使用
flagcxP2pIgetBatch, // ← 重构:签名换成 slice 风格
};(b) flagcxP2pIgetBatch / flagcxP2pIputBatch —— 统一 slice 风格签名
⚠️ 这是相对 v1 的关键变更:
flagcxP2pIgetBatch旧签名(uint64_t *srcOffs / void **requests等)整体替换为 slice 风格。由于 nixl plugin 通过flagcxP2pEngineReadVector间接调用、没有任何上层直接打 adaptor,这个改动对 nixl plugin 零感知。
// 唯一签名,两个函数对称
flagcxResult_t flagcxP2pIgetBatch(
void *sendComm, // FlagcxP2pCommView*
int count,
FlagcxSlice **slices); // 已 build 好的 READ slice 列表
flagcxResult_t flagcxP2pIputBatch(
void *sendComm,
int count,
FlagcxSlice **slices); // 已 build 好的 WRITE slice 列表实现要点(两函数共享 helper):
- caller(worker pool)传入的 slice 列表,已按所有权过滤为本 worker 拥有的 QP 子集——即 caller 已经知道「这批要打哪些 QP」。adaptor 内部只在这个 QP 子集上 round-robin。
- 每 QP 一次链式
ibv_post_send,最多FLAGCX_P2P_IGET_BATCH_MAX_WR = 64个 WR。 - 每个 WR:
wr.wr_id = (uintptr_t)slice; // 关键:shared CQ 直接拿 slice wr.opcode = slice->opcode; // READ or WRITE wr.send_flags = IBV_SEND_SIGNALED; wr.sg_list[0] = {slice->srcVa, slice->length, slice->lkey}; wr.wr.rdma = {slice->dstVa, slice->rkey}; slice->qpDepth = &qp_depth[qp_idx]; // 用于 poll 后回扣 ibv_post_send失败时:直接slice->markFailed()并把它的完成计入processed_slice_count_,不做 retry,不回 failed 列表。task 的doneSliceCount自动累加,XferStatus在isAllDone()后清理(成功失败一视同仁)。
不引入失败 slice 回收路径。容错策略:失败 → 计数 → 上层感知。后续要做重传也由上层重新提交一个新的 task,不在 worker pool 内做循环重试。
3.5 CQ 拓扑与 QP 分片
CQ 拓扑:per-engine 单 CQ(绑定到本 GPU 最近 NIC)
本 engine 的全部 conn(每 conn FLAGCX_P2P_QPS_PER_CONN 个 QP)
conn0.qp0 ─┐
conn0.qp1 │
conn0.qp2 │ 全部挂到
conn0.qp3 │ WorkerPool.shared_cq_ ← ibv_create_cq(ctx, kSharedCqDepth)
conn1.qp0 │
... ┘
kSharedCqDepth参照 Mooncake 的大小,envFLAGCX_P2P_CQ_DEPTH可调。- 建 conn 时
flagcxP2pAccept/Connect调engine->workerPool->getSharedCq()拿到 CQ,传给ibv_create_qp。CQ 生命周期归 pool,conn 拆时不 destroy。 - 当前
ibrc_p2p_adaptor.cc中channels[i].cq = ibv_create_cq(...)这行删掉;同时把struct flagcxP2pChannel channels[FLAGCX_P2P_QPS_PER_CONN]重命名为std::vector<ibv_qp*> qp_list_(与 MooncakeRdmaEndPoint::qp_list_对齐),删去 channel 概念——本来 channel 里就只剩 QP 一个有效字段,CQ 已上移到 pool。
QP → worker 静态分片(round-robin)
registerQp(qp) 内部:
int slot = qpRegisterCounter_.fetch_add(1) % transferWorkerCount_;
qpShards_[slot].push_back(qp);
- 4 QP + 2 worker → worker0: {qp0, qp2};worker1: {qp1, qp3}
- 4 QP + 4 worker → worker_i: {qp_i}(1:1)
- QP count 和 worker count 由两个独立 env 控制:
FLAGCX_P2P_QPS_PER_CONN(默认 2)FLAGCX_P2P_WORKERS_PER_POOL(默认 2)
- unregisterQp:从对应 shard 中 erase,无需重平衡(残余 QP 仍均匀分布)。
3.6 迁移步骤(1→2→3,每步可编可跑)
Step 0:CQ 共享化(topology only)
- WorkerPool 空壳:只加
ibv_create_cq+getSharedCq()+registerQp/unregisterQp,不起 worker 线程。 flagcxP2pAccept/Connect内建 QP 时改从 pool 拿 CQ,不再自建;channels[i].cq不 destroy。channels[]数组同步重命名为qp_list_(对齐 MooncakeRdmaEndPoint::qp_list_),删除flagcxP2pChannel结构体。gCqPoller仍在,注册时从「N×4 个 per-QP CQ」减到「所有 ibDev 的 shared CQ」(数量大幅减少)。- 验证:nixl/perf test 行为不变,只是 CQ 拓扑已换。
Step 1:slice 签名
flagcxP2pIputBatch/IgetBatch内部:wr.wr_id = (uintptr_t)slice。gCqPoller的 poll 回调:拿到wr_id后调slice->markSuccess/markFailed,processed_slice_count_暂不在这里计(worker pool 还没起)。gAsyncWorker在调用 iputBatch/igetBatch 前先 build slice + attach task。- 验证:nixl test / perf test 通过;transfer 完成路径走 slice 计数。
Step 2:引入 WorkerPool 接管 post + poll
- WorkerPool ctor 起 N 个 transferWorker。
WriteVector/ReadVector改:build slice →pool->submitPostSend(),不再走gAsyncWorker。- 停止向
gCqPoller注册新 CQ(shared CQ 改由 transferWorker 自己 poll);gCqPoller 空转直到 entries 清空。 - 验证:worker pool 真正 drain 流量;FLAGCX_DEBUG=INFO 下可见 worker poll 日志。
Step 3:清场
- 删
gCqPoller/cqPollerFunc/ensureCqPollerStarted/cqPollerRegister/Unregister/Stop。 - 删
gAsyncWorker/AsyncTransferTask/asyncWorkerFunc。 - 验证:
grep -r gCqPoller flagcx/返回空;grep -r gAsyncWorker flagcx/返回空。
3.7 全局配置 FlagcxP2pGlobalConfig(对齐 Mooncake globalConfig())
目标:把当前散落在
ibrc_p2p_adaptor.cc顶部的#define(FLAGCX_P2P_QPS_PER_CONN/FLAGCX_P2P_MAX_REQUESTS/FLAGCX_P2P_BATCH_POLL_SIZE/FLAGCX_P2P_IGET_BATCH_MAX_WR/FLAGCX_P2P_READ_BATCH_WINDOW)、设计文档里提到但还没落地的 env(FLAGCX_P2P_WORKERS_PER_POOL/FLAGCX_P2P_CQ_DEPTH/ slice/fragment)、以及 IB QP attr 一起收拢成一个全局结构 + 一个once_flag懒加载,与mooncake::globalConfig()形状对齐;env 走 FlagCX 现成的FLAGCX_PARAM宏(带原子缓存、统一FLAGCX_前缀、env file 透传)。
(a) 放置位置 — 结构体声明落在 flagcx_p2p.h
| 内容 | 文件 | 备注 |
|---|---|---|
struct FlagcxP2pGlobalConfig(POD) | flagcx/include/flagcx_p2p.h | 与 FlagcxP2pRdmaDesc 等已有公共结构同级;不引入 <infiniband/verbs.h>,MTU 用裸 int 表示 |
flagcxP2pGlobalConfig() 访问器声明 | flagcx/include/flagcx_p2p.h | const FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig(); |
flagcxP2pDumpGlobalConfig() 声明 | flagcx/include/flagcx_p2p.h | INFO 日志一次性 dump,便于排障 |
flagcxP2pClampToDeviceLimits() 声明 | flagcx/include/flagcx_p2p.h | 入参全 plain int(max_qp_wr / max_sge / max_cqe),verbs 不进 public 头 |
所有 FLAGCX_PARAM(...) + loadGlobalConfig + 单例 | flagcx/core/flagcx_p2p.cc 顶部 anonymous ns | 不另立 flagcx_p2p_config.cc;与 gXferMap 等已有 file-local 状态同位置 |
ibv_mtu 转换(mtuFromInt) | flagcx/core/flagcx_p2p.cc | 仅在内部使用;adaptor 读 c.mtu_length 是 int,建 QP 前自己转 ibv_mtu |
(b) 结构体(写进 flagcx_p2p.h)
/* ------------------------------------------------------------------ */
/* Global runtime configuration */
/* All fields env-overridable via FLAGCX_P2P_* (see flagcx_p2p.cc). */
/* Lazy-loaded once on first flagcxP2pGlobalConfig() call. */
/* ------------------------------------------------------------------ */
struct FlagcxP2pGlobalConfig {
/* —— Worker pool / QP 拓扑 —— */
int qpsPerConn = 4; // FLAGCX_P2P_QPS_PER_CONN
int workersPerPool = 2; // FLAGCX_P2P_WORKERS_PER_POOL
int shardCount = 8; // FLAGCX_P2P_SHARD_COUNT
/* —— CQ / WR / 完成队列 —— */
size_t sharedCqDepth = 4096; // FLAGCX_P2P_CQ_DEPTH
size_t maxWrPerPost = 64; // FLAGCX_P2P_MAX_WR_PER_POST
size_t maxRequests = 256; // FLAGCX_P2P_MAX_REQUESTS
size_t batchPollSize = 32; // FLAGCX_P2P_BATCH_POLL_SIZE
size_t readBatchWindow = 8; // FLAGCX_P2P_READ_BATCH_WINDOW
/* —— Slice 切片策略 —— */
size_t sliceSize = 64 * 1024; // FLAGCX_P2P_SLICE_SIZE
size_t fragmentLimit = 4 * 1024; // FLAGCX_P2P_FRAGMENT_LIMIT
/* —— IB QP attr(用 plain int,避免引入 verbs 到 public 头)—— */
size_t maxSge = 4; // FLAGCX_P2P_MAX_SGE
size_t maxInline = 64; // FLAGCX_P2P_MAX_INLINE
uint8_t ibPort = 1; // FLAGCX_P2P_IB_PORT
int gidIndex = -1; // FLAGCX_P2P_GID_INDEX
int mtuLength = 4096; // FLAGCX_P2P_MTU (512/1024/2048/4096)
int ibTrafficClass = -1; // FLAGCX_P2P_IB_TC
int retryCnt = 7; // FLAGCX_P2P_RETRY_CNT
/* —— Notification —— */
int notifMaxPeers = 64; // FLAGCX_P2P_NOTIF_MAX_PEERS
/* —— 杂项 —— */
bool enableDestDeviceAffinity = false; // FLAGCX_P2P_DEST_DEV_AFFINITY
};
/* 等价于 mooncake::globalConfig();首次调用懒加载 + std::once_flag 保护 */
const FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig();
/* INFO 日志 dump 一次(loadGlobalConfig 末尾自动调用,也可外部主动调) */
void flagcxP2pDumpGlobalConfig();
/* 设备能力夹紧:把 maxWrPerPost / maxSge / sharedCqDepth 限到设备实际值。
入参全 plain int,verbs 头不进 public header。
adaptor 在 ibv_query_device 后调用一次。 */
void flagcxP2pClampToDeviceLimits(uint32_t maxQpWr,
uint32_t maxSge,
uint32_t maxCqe,
uint32_t maxQp);(c) 加载实现(写在 flagcx_p2p.cc 顶部 anonymous namespace)
与 §1 引用的 mooncake
loadGlobalConfig同形:每个字段一行FLAGCX_PARAM,loader 只做 narrowing + 范围检查 + 写结构体。
namespace {
FLAGCX_PARAM(P2pQpsPerConn, "P2P_QPS_PER_CONN", 4);
FLAGCX_PARAM(P2pWorkersPerPool, "P2P_WORKERS_PER_POOL", 2);
FLAGCX_PARAM(P2pShardCount, "P2P_SHARD_COUNT", 8);
FLAGCX_PARAM(P2pCqDepth, "P2P_CQ_DEPTH", 4096);
FLAGCX_PARAM(P2pMaxWrPerPost, "P2P_MAX_WR_PER_POST", 64);
FLAGCX_PARAM(P2pMaxRequests, "P2P_MAX_REQUESTS", 256);
FLAGCX_PARAM(P2pBatchPollSize, "P2P_BATCH_POLL_SIZE", 32);
FLAGCX_PARAM(P2pReadBatchWindow, "P2P_READ_BATCH_WINDOW", 8);
FLAGCX_PARAM(P2pSliceSize, "P2P_SLICE_SIZE", 64 * 1024);
FLAGCX_PARAM(P2pFragmentLimit, "P2P_FRAGMENT_LIMIT", 4 * 1024);
FLAGCX_PARAM(P2pMaxSge, "P2P_MAX_SGE", 4);
FLAGCX_PARAM(P2pMaxInline, "P2P_MAX_INLINE", 64);
FLAGCX_PARAM(P2pIbPort, "P2P_IB_PORT", 1);
FLAGCX_PARAM(P2pGidIndex, "P2P_GID_INDEX", -1);
FLAGCX_PARAM(P2pMtu, "P2P_MTU", 4096);
FLAGCX_PARAM(P2pIbTc, "P2P_IB_TC", -1);
FLAGCX_PARAM(P2pRetryCnt, "P2P_RETRY_CNT", 7);
FLAGCX_PARAM(P2pNotifMaxPeers, "P2P_NOTIF_MAX_PEERS", 64);
FLAGCX_PARAM(P2pDestDevAffinity, "P2P_DEST_DEV_AFFINITY", 0);
template <typename T>
inline T clampParam(int64_t v, T lo, T hi, T deft, const char* name) {
if (v < (int64_t)lo || v > (int64_t)hi) {
INFO(FLAGCX_INIT,
"Ignore FLAGCX_%s=%lld (out of [%lld,%lld]), use default %lld",
name, (long long)v, (long long)lo, (long long)hi, (long long)deft);
return deft;
}
return (T)v;
}
void loadGlobalConfig(FlagcxP2pGlobalConfig& c) {
c.qpsPerConn = clampParam(flagcxParamP2pQpsPerConn(), 1, 32, 4, "P2P_QPS_PER_CONN");
c.workersPerPool = clampParam(flagcxParamP2pWorkersPerPool(), 1, 8, 2, "P2P_WORKERS_PER_POOL");
c.shardCount = clampParam(flagcxParamP2pShardCount(), 1, 64, 8, "P2P_SHARD_COUNT");
c.sharedCqDepth = clampParam<size_t>(flagcxParamP2pCqDepth(), 1, 1<<20, 4096, "P2P_CQ_DEPTH");
c.maxWrPerPost = clampParam<size_t>(flagcxParamP2pMaxWrPerPost(), 1, 1024, 64, "P2P_MAX_WR_PER_POST");
c.maxRequests = clampParam<size_t>(flagcxParamP2pMaxRequests(), 1, 1<<16, 256, "P2P_MAX_REQUESTS");
c.batchPollSize = clampParam<size_t>(flagcxParamP2pBatchPollSize(), 1, 256, 32, "P2P_BATCH_POLL_SIZE");
c.readBatchWindow = clampParam<size_t>(flagcxParamP2pReadBatchWindow(), 1, 256, 8, "P2P_READ_BATCH_WINDOW");
c.sliceSize = clampParam<size_t>(flagcxParamP2pSliceSize(), 1<<10, 1<<26, 64*1024, "P2P_SLICE_SIZE");
c.fragmentLimit = clampParam<size_t>(flagcxParamP2pFragmentLimit(), 0, c.sliceSize, 4*1024, "P2P_FRAGMENT_LIMIT");
c.maxSge = clampParam<size_t>(flagcxParamP2pMaxSge(), 1, 32, 4, "P2P_MAX_SGE");
c.maxInline = clampParam<size_t>(flagcxParamP2pMaxInline(), 0, 1024, 64, "P2P_MAX_INLINE");
c.ibPort = clampParam<uint8_t>(flagcxParamP2pIbPort(), 1, 255, 1, "P2P_IB_PORT");
c.gidIndex = clampParam<int>(flagcxParamP2pGidIndex(), -1, 255, -1, "P2P_GID_INDEX");
{
int64_t mv = flagcxParamP2pMtu();
if (mv == 512 || mv == 1024 || mv == 2048 || mv == 4096) c.mtuLength = (int)mv;
else { WARN("FLAGCX_P2P_MTU=%lld invalid, fallback 4096", (long long)mv); c.mtuLength = 4096; }
}
c.ibTrafficClass = clampParam<int>(flagcxParamP2pIbTc(), -1, 255, -1, "P2P_IB_TC");
c.retryCnt = clampParam<int>(flagcxParamP2pRetryCnt(), 0, 7, 7, "P2P_RETRY_CNT");
c.notifMaxPeers = clampParam<int>(flagcxParamP2pNotifMaxPeers(), 1, 1024, 64, "P2P_NOTIF_MAX_PEERS");
c.enableDestDeviceAffinity = (flagcxParamP2pDestDevAffinity() != 0);
}
FlagcxP2pGlobalConfig& mutableGlobalConfig() {
static FlagcxP2pGlobalConfig cfg;
static std::once_flag once;
std::call_once(once, [] { loadGlobalConfig(cfg); flagcxP2pDumpGlobalConfig(); });
return cfg;
}
} // namespace
const FlagcxP2pGlobalConfig& flagcxP2pGlobalConfig() {
return mutableGlobalConfig();
}
void flagcxP2pClampToDeviceLimits(uint32_t maxQpWr, uint32_t maxSge,
uint32_t maxCqe, uint32_t /*maxQp*/) {
auto& c = mutableGlobalConfig();
if (c.maxWrPerPost > (size_t)maxQpWr) c.maxWrPerPost = maxQpWr;
if (c.maxSge > (size_t)maxSge) c.maxSge = maxSge;
if (c.sharedCqDepth > (size_t)maxCqe) c.sharedCqDepth = maxCqe;
}
void flagcxP2pDumpGlobalConfig() {
const auto& c = flagcxP2pGlobalConfig();
INFO(FLAGCX_INIT, "=== FlagCX P2P GlobalConfig ===");
INFO(FLAGCX_INIT, "qpsPerConn=%d workersPerPool=%d shardCount=%d",
c.qpsPerConn, c.workersPerPool, c.shardCount);
INFO(FLAGCX_INIT, "sharedCqDepth=%zu maxWrPerPost=%zu maxRequests=%zu "
"batchPollSize=%zu readBatchWindow=%zu",
c.sharedCqDepth, c.maxWrPerPost, c.maxRequests,
c.batchPollSize, c.readBatchWindow);
INFO(FLAGCX_INIT, "sliceSize=%zu fragmentLimit=%zu", c.sliceSize, c.fragmentLimit);
INFO(FLAGCX_INIT, "ibPort=%u gidIndex=%d mtu=%d tc=%d retry=%d "
"maxSge=%zu maxInline=%zu",
(unsigned)c.ibPort, c.gidIndex, c.mtuLength,
c.ibTrafficClass, c.retryCnt, c.maxSge, c.maxInline);
INFO(FLAGCX_INIT, "notifMaxPeers=%d destDevAffinity=%d",
c.notifMaxPeers, (int)c.enableDestDeviceAffinity);
}(d) env 变量总表(FLAGCX_ 前缀由宏自动加)
| env | 默认 | 范围 | 作用 |
|---|---|---|---|
FLAGCX_P2P_QPS_PER_CONN | 4 | [1,32] | 每条 conn 上 QP 数;与 worker 数共同决定 QP→worker 静态分片 |
FLAGCX_P2P_WORKERS_PER_POOL | 2 | [1,8] | 每个 WorkerPool 起的 transferWorker 线程数 |
FLAGCX_P2P_SHARD_COUNT | 8 | [1,64] | submit 侧 sharded slice queue 桶数 |
FLAGCX_P2P_CQ_DEPTH | 4096 | [1,1M] | 共享 CQ ibv_create_cq 深度 |
FLAGCX_P2P_MAX_WR_PER_POST | 64 | [1,1024] | 单次链式 ibv_post_send 最大 WR 数 |
FLAGCX_P2P_MAX_REQUESTS | 256 | [1,65536] | 每 conn 待完成 request 上限 |
FLAGCX_P2P_BATCH_POLL_SIZE | 32 | [1,256] | 每次 ibv_poll_cq 一次拿走的 WC 数 |
FLAGCX_P2P_READ_BATCH_WINDOW | 8 | [1,256] | READ 路径攒批窗口 |
FLAGCX_P2P_SLICE_SIZE | 64K | [1K,64M] | FlagcxSlicePolicy::kBlockSize(connector 路径切片粒度) |
FLAGCX_P2P_FRAGMENT_LIMIT | 4K | [0, sliceSize] | 末尾合并阈值;剩余 ≤ slice+frag 时合并不二切 |
FLAGCX_P2P_MAX_SGE | 4 | [1,32] | QP cap.max_send_sge |
FLAGCX_P2P_MAX_INLINE | 64 | [0,1024] | QP cap.max_inline_data |
FLAGCX_P2P_IB_PORT | 1 | [1,255] | IB port number |
FLAGCX_P2P_GID_INDEX | -1(auto) | [-1,255] | RoCE GID index;-1 走自动选 |
FLAGCX_P2P_MTU | 4096 | {512,1024,2048,4096} | path MTU |
FLAGCX_P2P_IB_TC | -1(off) | [-1,255] | IB traffic class;-1 不设置 |
FLAGCX_P2P_RETRY_CNT | 7 | [0,7] | QP retry count |
FLAGCX_P2P_NOTIF_MAX_PEERS | 64 | [1,1024] | epoll notif 监听 peer 上限 |
FLAGCX_P2P_DEST_DEV_AFFINITY | 0 | {0,1} | 是否启用目的设备亲和路由(占位,对应后续待办 §6.1) |
(e) 调用侧改造对照表
| 旧(编译期常量) | 新(运行时读 config) |
|---|---|
for (int i=0; i<FLAGCX_P2P_QPS_PER_CONN; ++i) | auto& C = flagcxP2pGlobalConfig(); for (int i=0; i<C.qpsPerConn; ++i) |
struct flagcxP2pChannel channels[FLAGCX_P2P_QPS_PER_CONN] | std::vector<ibv_qp*> qp_list_(C.qpsPerConn)(设计文档 §3.5 已要求改名为 qp_list_) |
struct flagcxP2pRequest reqs[FLAGCX_P2P_MAX_REQUESTS] | std::unique_ptr<flagcxP2pRequest[]> reqs(new flagcxP2pRequest[C.maxRequests]) |
struct ibv_wc wcs[FLAGCX_P2P_BATCH_POLL_SIZE] | std::array<ibv_wc, 256> wcs; + 实际 poll 数取 min(C.batchPollSize, 256)(栈数组取上界) |
ibv_create_cq(ctx, kSharedCqDepth, ...) | ibv_create_cq(ctx, C.sharedCqDepth, ...) |
slice_queue_[8] / slice_queue_count_[8] / slice_queue_lock_[8] | 全部 std::vector 化,构造时 resize(C.shardCount);hash 路由 % C.shardCount |
ibv_query_device 后无操作 | flagcxP2pClampToDeviceLimits(d.max_qp_wr, d.max_sge, d.max_cqe, d.max_qp); |
static_assert(2 * FLAGCX_P2P_IGET_BATCH_MAX_WR <= 2 * MAX_REQUESTS) | 改为运行期 check:assert(2 * C.maxWrPerPost <= 2 * C.maxRequests) 在 pool ctor 头部 |
⚠️ 避坑:原有
static_assert是编译期常量约束,迁到运行时配置后必须在 pool 构造时同步 assert,否则用户给出非法组合会沉默崩。
(f) 与 mooncake 写法的对照(一句话总览)
| 维度 | mooncake | 本方案 |
|---|---|---|
| env 解析方式 | 在 loadGlobalConfig 手写 getenv+atoi+范围 | 每个字段一行 FLAGCX_PARAM(...);宏负责原子缓存、env file、FLAGCX_ 前缀 |
| 入口形状 | globalConfig() + once_flag | 完全一致(flagcxP2pGlobalConfig()) |
| 命名空间 | mooncake:: | 文件作用域(flagcx_p2p.h 不带 namespace) |
| 设备能力夹紧 | updateGlobalConfig(ibv_device_attr&) | flagcxP2pClampToDeviceLimits(maxQpWr,maxSge,maxCqe,maxQp),verbs 不进 public 头 |
| 日志 | glog | FlagCX 自有 INFO(FLAGCX_INIT, ...) / WARN(...) |
| MTU 表示 | ibv_mtu 枚举 | int (512/1024/2048/4096),ibv_mtu 转换在 adaptor 内部 |
(g) 加载时机 / 线程安全
- 首次
flagcxP2pGlobalConfig()调用触发懒加载;std::call_once保证只装一次。 - 推荐在
flagcxP2pEngineCreate第一行调用一次(在建 WorkerPool 之前),把”读 env”的开销和日志都集中在 engine 创建路径。 flagcxP2pClampToDeviceLimits是唯一允许写的接口,仅在 IB context 初始化路径调一次;之后所有读路径走const&不需要锁。- 不暴露任何 setter;要改值只能改 env + 重启。
4. 第三步:flagcx connector 改造
参考:3. flagcx ibrc p2p RPC 服务 + flagcx connector 改动
5. 文件改动清单
| 路径 | 变更类型 | 说明 |
|---|---|---|
flagcx/include/flagcx_p2p.h | 增量 | 加 FlagcxTransferTask / FlagcxSlice / 两个 Policy / buildSlices<> 声明 / flagcxP2pEngineReg 增可选出参 / WriteVector 增 policyKind 入参;新增 FlagcxP2pGlobalConfig 结构 + flagcxP2pGlobalConfig() / flagcxP2pDumpGlobalConfig() / flagcxP2pClampToDeviceLimits() 三个函数声明(详见 §3.7) |
flagcx/core/flagcx_p2p.cc | 重构 | 删 gAsyncWorker + AsyncTransferTask;加 FlagcxWorkerPool(cc-only);engineCreate 内 eager 建 pool;改 ReadVector / WriteVector 走 slice + WorkerPool;改 XferStatus;改 Reg 出参;顶部 anonymous ns 加 19 个 FLAGCX_PARAM(P2pXxx, "P2P_XXX", ...) + loadGlobalConfig + mutableGlobalConfig 单例(详见 §3.7);engineCreate 第一行调一次 flagcxP2pGlobalConfig() 触发懒加载 |
flagcx/adaptor/net/ibrc_p2p_adaptor.cc | 改造 | Step 0:channels[i].cq 改从 pool 拿(不再自建/自毁),channels[] 重命名为 qp_list_(对齐 Mooncake),删除 flagcxP2pChannel 结构体;Step 1:igetBatch/iputBatch 改 slice 签名;Step 3:删 gCqPoller 相关全部代码;vtable iputBatch 槽填上;删除文件顶部全部 #define FLAGCX_P2P_QPS_PER_CONN/MAX_REQUESTS/BATCH_POLL_SIZE/IGET_BATCH_MAX_WR/READ_BATCH_WINDOW,引用全部替换为 flagcxP2pGlobalConfig().xxx;静态数组(channels[K]/reqs[K])改运行时容量;编译期 static_assert 改为 pool ctor 内运行时 assert;ibv_query_device 后调 flagcxP2pClampToDeviceLimits(...) 一次 |
plugin/interservice/flagcx_wrapper.py | 增量 | 加 4 类(engine / RPC / register / transfer)共 7 个新 ctypes 绑定;旧 flagcxOneSideRegister / flagcxBatchPut 保留兼容老 caller |
| connector Python 业务代码 | 替换 | 从 flagcxOneSideRegister + flagcxBatchPut 切换过来 |
6 nixl backend会用的 flagcx 接口
| NIXL 函数 | 调用的 FlagCX 接口 / 类型 | 用途 |
|---|---|---|
nixlFlagcxEngine::nixlFlagcxEngine() | flagcxP2pEngineCreate() | 创建 P2P engine |
nixlFlagcxEngine::~nixlFlagcxEngine() | flagcxP2pEngineStopAccept() | 停止 accept |
nixlFlagcxEngine::~nixlFlagcxEngine() | flagcxP2pEngineMrDestroy() | 清理已注册 MR |
nixlFlagcxEngine::~nixlFlagcxEngine() | flagcxP2pEngineConnDestroy() | 清理连接 |
nixlFlagcxEngine::~nixlFlagcxEngine() | flagcxP2pEngineDestroy() | 销毁 engine |
nixlFlagcxEngine::startListener() | flagcxP2pEngineAccept() | 被动接受连接 |
nixlFlagcxEngine::startListener() | flagcxP2pEngineStartListener() | 启动连接监听 |
nixlFlagcxEngine::getConnInfo() | flagcxP2pEngineGetMetadata() | 导出本端连接信息 |
nixlFlagcxEngine::loadRemoteConnInfo() | flagcxP2pEngineConnect() | 主动连接远端 |
nixlFlagcxEngine::loadRemoteConnInfo() | flagcxP2pEngineStartListener() | 启动连接监听 |
nixlFlagcxEngine::disconnect() | flagcxP2pEngineConnDestroy() | 主动断开连接 |
nixlFlagcxEngine::registerMem() | flagcxP2pEngineReg() | 注册本地内存 |
nixlFlagcxEngine::registerMem() | flagcxP2pEnginePrepareDesc() | 生成本地内存的 remote desc |
nixlFlagcxEngine::registerMem() | flagcxP2pEngineMrDestroy() | desc 生成失败时回滚 MR |
nixlFlagcxEngine::deregisterMem() | flagcxP2pEngineMrDestroy() | 注销本地 MR |
nixlFlagcxEngine::prepXfer() | flagcxP2pDeserializeRdmaDesc() | 解析远端 desc |
nixlFlagcxEngine::prepXfer() | flagcxP2pEngineUpdateDesc() | 更新远端 addr/size |
nixlFlagcxEngine::postXfer() | flagcxP2pEngineReadVector() | 发起 NIXL_READ |
nixlFlagcxEngine::postXfer() | flagcxP2pEngineWriteVector() | 发起 NIXL_WRITE |
nixlFlagcxEngine::checkXfer() | flagcxP2pEngineXferStatus() | 查询传输完成 |
nixlFlagcxEngine::checkXfer() | flagcxP2pEngineSendNotif() | 传输完成后发送通知 |
nixlFlagcxEngine::getNotifs() | flagcxP2pEngineGetNotifs() | 获取收到的通知 |
nixlFlagcxEngine::genNotif() | flagcxP2pEngineSendNotif() | 主动发送通知 |