flowchart LR
%% =========================
%% Agent A
%% =========================
subgraph A["Agent A"]
  direction TB
  subgraph A1["NIXL-side changes"]
    AAgent["nixlAgent"]
    APlugin["libplugin_flagcx.so\nflagcx_plugin.cpp"]
    ABackend["nixlFlagcxEngine\nflagcx_backend.h/.cpp\nsupportsRemote=true\nsupportsLocal=false\nsupportsNotif=true"]
    AMD["nixlFlagcxBackendMD\nLOCAL_REG / REMOTE_IMPORTED"]
    AReq["nixlFlagcxReqH\nPREPARED / POSTED / COMPLETED"]
  end
  subgraph A2["FlagCX-side changes"]
    ASB["flagcx_nixl_engine\nflagcx_nixl_engine.h/.cc"]
    AConn["peer cache / conn state"]
    AMem["local_mem / remote_mem"]
    AProg["accept_thread\nprogress_thread"]
    ACtrl["same-host ctrl block\nshm ring / notif / error / close"]
    ADev["deviceAdaptor\nipcMemHandleCreate/Open/Close\ndeviceMemcpy"]
    ANet["netAdaptor\nctrl/data comm\nisend/irecv/test"]
  end
  AAgent --> APlugin --> ABackend --> ASB
  ABackend --> AMD
  ABackend --> AReq
  ASB --> AConn
  ASB --> AMem
  ASB --> AProg
  ASB --> ACtrl
  ASB --> ADev
  ASB --> ANet
end
%% =========================
%% Agent B
%% =========================
subgraph B["Agent B"]
  direction TB
  subgraph B1["NIXL-side changes"]
    BAgent["nixlAgent"]
    BPlugin["libplugin_flagcx.so"]
    BBackend["nixlFlagcxEngine"]
    BMD["nixlFlagcxBackendMD"]
    BReq["nixlFlagcxReqH"]
  end
  subgraph B2["FlagCX-side changes"]
    BSB["flagcx_nixl_engine"]
    BConn["peer cache / conn state"]
    BMem["local_mem / remote_mem"]
    BProg["accept_thread\nprogress_thread"]
    BCtrl["same-host ctrl block"]
    BDev["deviceAdaptor"]
    BNet["netAdaptor"]
  end
  BAgent --> BPlugin --> BBackend --> BSB
  BBackend --> BMD
  BBackend --> BReq
  BSB --> BConn
  BSB --> BMem
  BSB --> BProg
  BSB --> BCtrl
  BSB --> BDev
  BSB --> BNet
end
%% =========================
%% Metadata / lifecycle
%% =========================
ABackend -. "1. getConnInfo()" .-> ASB
ASB -. "connInfo blob\nversion/agent/host_hash/device_id/\nctrl_desc/net listen handle" .-> BBackend
BBackend -. "2. loadRemoteConnInfo()" .-> BSB
BBackend -. "registerMem()\nflagcx_nixl_reg_mem()" .-> BSB
BSB -. "public md blob\nmem_token/base/len/type/\noptional ipc_handle" .-> ABackend
ABackend -. "loadRemoteMD()\nflagcx_nixl_import_mem()" .-> ASB
AAgent -->|"3. connect(remote_agent)"| ABackend
ABackend -->|"flagcx_nixl_connect()"| ASB
ASB --> T{"Topology?"}
T -->|"same-host"| SH["import peer ctrl block\nopen IPC handle from remote md\nremote_mem.ipc_mapped_ptr ready"]
T -->|"cross-host"| CH["set up ctrl_send/recv_comm\nset up data_send/recv_comm"]
%% =========================
%% Transfer path
%% =========================
AAgent -->|"4. prepXfer()"| ABackend
ABackend -->|"assemble iov\nlocal_offset / remote_offset"| AReq
AAgent -->|"5. postXfer()"| ABackend
ABackend -->|"flagcx_nixl_submit(op, iovs)"| ASB
SH --> SHPath["same-host data path\nWRITE: memcpy(remote_ptr+off, local_ptr+off)\nREAD: memcpy(local_ptr+off, remote_ptr+off)"]
CH --> CHPath["cross-host data path\nWRITE: WRITE_REQ -> READY -> isend -> DONE\nREAD: irecv -> READ_REQ -> remote isend -> complete"]
SHPath --> AProg
CHPath --> AProg
AAgent -->|"6. checkXfer()"| ABackend
ABackend -->|"flagcx_nixl_poll()"| ASB
ASB -->|"done / in-prog"| ABackend
%% =========================
%% Notification
%% =========================
ABackend -->|"optional completion notif\nflagcx_nixl_send_notif()"| ASB
ASB -->|"same-host: shm ctrl block"| BCtrl
ASB -->|"cross-host: net ctrl plane"| BNet
BSB -->|"flagcx_nixl_drain_notifs()"| BBackend
BBackend -->|"getNotifs()"| BAgent
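The request lifecycle (PREPARED / POSTED / COMPLETED) and the same-host data path from the flowchart can be modelled in a few lines. This is only a toy sketch in Python, not the real C/C++ code: `ToyRequest`, `Iov`, `post_xfer` and `check_xfer` are hypothetical names, the copy is done synchronously rather than on a progress thread, and plain `bytearray` buffers stand in for IPC-mapped device memory.

```python
from dataclasses import dataclass
from enum import Enum


class ReqState(Enum):
    PREPARED = "PREPARED"
    POSTED = "POSTED"
    COMPLETED = "COMPLETED"


@dataclass
class Iov:
    """One transfer segment: offsets into the local and remote regions."""
    local_offset: int
    remote_offset: int
    length: int


class ToyRequest:
    """Hypothetical stand-in for nixlFlagcxReqH (not the real class)."""

    def __init__(self, op, iovs):
        if op not in ("WRITE", "READ"):
            raise ValueError(f"unsupported op: {op}")
        self.op = op
        self.iovs = list(iovs)
        self.state = ReqState.PREPARED  # state after the prepXfer() step


def post_xfer(req, local_mem, remote_mem):
    """Model of postXfer() on the same-host path: copy every iov, mark POSTED."""
    if req.state is not ReqState.PREPARED:
        raise RuntimeError("request must be PREPARED before it is posted")
    for iov in req.iovs:
        l0, r0, n = iov.local_offset, iov.remote_offset, iov.length
        if min(l0, r0, n) < 0 or l0 + n > len(local_mem) or r0 + n > len(remote_mem):
            raise ValueError("iov falls outside a registered region")
        if req.op == "WRITE":
            # WRITE: memcpy(remote_ptr + off, local_ptr + off)
            remote_mem[r0:r0 + n] = local_mem[l0:l0 + n]
        else:
            # READ: memcpy(local_ptr + off, remote_ptr + off)
            local_mem[l0:l0 + n] = remote_mem[r0:r0 + n]
    req.state = ReqState.POSTED


def check_xfer(req):
    """Model of checkXfer(): a POSTED request is reported done exactly once polled."""
    if req.state is ReqState.POSTED:
        req.state = ReqState.COMPLETED
    return req.state is ReqState.COMPLETED
```

Calling `check_xfer` on a request that was never posted returns `False`, which roughly corresponds to the "in-prog" answer in the diagram.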
Excalidraw Data
Text Elements
NIXL
FlagCX
nixlAgent::createBackend()
- plugin manager
nixlFlagcxEngine
flagcx_nixl_engine_create(): every agent calls flagcxGetUniqueId, but at use time all of them use rank 0's. params becomes a long-lived flagcx_nixl_engine*
nixlBackendInitParams
- getConnInfo
flagcx_nixl_get_conn_info() serializes uniqueId + host + dev and returns the result as a blob
3. getSupportedMems()
Reports VRAM as supported
nixlFlagcxEngine::loadRemoteConnInfo: here the framework delivers the remote blob for you; see document 0
flagcx_nixl_load_remote_conn_info() deserializes the peer's blob for local use
blob+remoteAgent(i)
also called metadata
self
nFE::connect
flagcx_nixl_connect() only calls flagcxCommInitRank(comm, 2, uid, rank)
nFE::prepXferDlist
self
Builds the nixl_meta_dlist_t from nixlFlagcxBackendMD*
self
Creates a nixlFlagcxReqH
nFE::postXferReq()
Returns a flagcx_nixl_req_t*
flagcx_nixl_submit(engine, conn, op, iovs, niov, &req)
NIXL_IN_PROG
nFE::checkXfer(): if the poll reports done, send a follow-up notify
flagcx_nixl_poll(engine, req, &done)
notify
flagcx_nixl_send_notify
flagcx_nixl_req_free
done?
init backend
NIXL_WRITE
NIXL_READ
if reg && IBRC && Put
Put
flagcxRecv
flagcxSend
transfer
nFE::loadRemoteMD(): only Put needs remote_offset
metadataB
register
because Put needs window registration
one_sided?
nFE::registerMem() must return a metadataP
nixlLocalSection::addDescList()
Registers the local buffer for each desc
flagcx_nixl_reg_mem() calls flagcxCommRegister
if put
self
nFE::getPublicData()
self (the local side now holds addr, len, devId, metadataP, metadataB)
from metadataP to metadataB
listener thread
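The conn-info exchange noted above (serialize uniqueId + host + dev into a blob, hand it to the peer, deserialize it there) can be illustrated with a small round-trip. This is a minimal sketch under assumptions: the byte layout, the `ConnInfo` type and the helper names are hypothetical and are not the format produced by flagcx_nixl_get_conn_info(); the `same_host` check also assumes that the same-host / cross-host decision is made by comparing host_hash values, which the diagram suggests but does not state.

```python
import struct
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnInfo:
    version: int
    agent: str
    host_hash: int
    device_id: int
    unique_id: bytes


# Hypothetical fixed header: version, host_hash, device_id, len(agent), len(unique_id)
_FIXED = struct.Struct("<IQiHH")


def pack_conn_info(info):
    """Serialize a ConnInfo into an opaque blob (layout is an assumption)."""
    agent = info.agent.encode("utf-8")
    header = _FIXED.pack(info.version, info.host_hash, info.device_id,
                         len(agent), len(info.unique_id))
    return header + agent + info.unique_id


def unpack_conn_info(blob):
    """Deserialize a peer's blob, rejecting truncated or oversized input."""
    if len(blob) < _FIXED.size:
        raise ValueError("blob shorter than the fixed header")
    version, host_hash, device_id, n_agent, n_uid = _FIXED.unpack_from(blob)
    end_agent = _FIXED.size + n_agent
    end_uid = end_agent + n_uid
    if len(blob) != end_uid:
        raise ValueError("blob length does not match its header")
    agent = blob[_FIXED.size:end_agent].decode("utf-8")
    return ConnInfo(version, agent, host_hash, device_id, bytes(blob[end_agent:end_uid]))


def same_host(a, b):
    """Assumed topology test: equal host hashes select the same-host path."""
    return a.host_hash == b.host_hash
```

Validating the total length before slicing keeps a malformed remote blob from being silently accepted as a shorter agent name or unique ID.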