
1

flowchart LR
  %% =========================
  %% Agent B
  %% =========================
  subgraph B["Agent B"]
    direction TB

    subgraph B1["NIXL-side changes"]
      BAgent["nixlAgent"]
      BPlugin["libplugin_flagcx.so"]
      BBackend["nixlFlagcxEngine"]
      BMD["nixlFlagcxBackendMD"]
      BReq["nixlFlagcxReqH"]
    end

    subgraph B2["FlagCX-side changes"]
      BSB["flagcx_nixl_engine"]
      BConn["peer cache / conn state"]
      BMem["local_mem / remote_mem"]
      BProg["accept_thread\nprogress_thread"]
      BCtrl["same-host ctrl block"]
      BDev["deviceAdaptor"]
      BNet["netAdaptor"]
    end

    BAgent --> BPlugin --> BBackend --> BSB
    BBackend --> BMD
    BBackend --> BReq
    BSB --> BConn
    BSB --> BMem
    BSB --> BProg
    BSB --> BCtrl
    BSB --> BDev
    BSB --> BNet
  end

  %% =========================
  %% Transfer path
  %% =========================
  AAgent -->|"4. prepXfer()"| ABackend
  ABackend -->|"assemble iov\nlocal_offset / remote_offset"| AReq
  AAgent -->|"5. postXfer()"| ABackend
  ABackend -->|"flagcx_nixl_submit(op, iovs)"| ASB

  SH --> SHPath["same-host data path\nWRITE: memcpy(remote_ptr+off, local_ptr+off)\nREAD:  memcpy(local_ptr+off, remote_ptr+off)"]
  CH --> CHPath["cross-host data path\nWRITE: WRITE_REQ -> READY -> isend -> DONE\nREAD:  irecv -> READ_REQ -> remote isend -> complete"]

  SHPath --> AProg
  CHPath --> AProg

  AAgent -->|"6. checkXfer()"| ABackend
  ABackend -->|"flagcx_nixl_poll()"| ASB
  ASB -->|"done / in-prog"| ABackend

  %% =========================
  %% Agent A
  %% =========================
  subgraph A["Agent A"]
    direction TB

    subgraph A1["NIXL-side changes"]
      AAgent["nixlAgent"]
      APlugin["libplugin_flagcx.so\nflagcx_plugin.cpp"]
      ABackend["nixlFlagcxEngine\nflagcx_backend.h/.cpp\nsupportsRemote=true\nsupportsLocal=false\nsupportsNotif=true"]
      AMD["nixlFlagcxBackendMD\nLOCAL_REG / REMOTE_IMPORTED"]
      AReq["nixlFlagcxReqH\nPREPARED / POSTED / COMPLETED"]
    end

    subgraph A2["FlagCX-side changes"]
      ASB["flagcx_nixl_engine\nflagcx_nixl_engine.h/.cc"]
      AConn["peer cache / conn state"]
      AMem["local_mem / remote_mem"]
      AProg["accept_thread\nprogress_thread"]
      ACtrl["same-host ctrl block\nshm ring / notif / error / close"]
      ADev["deviceAdaptor\nipcMemHandleCreate/Open/Close\ndeviceMemcpy"]
      ANet["netAdaptor\nctrl/data comm\nisend/irecv/test"]
    end

    AAgent --> APlugin --> ABackend --> ASB
    ABackend --> AMD
    ABackend --> AReq
    ASB --> AConn
    ASB --> AMem
    ASB --> AProg
    ASB --> ACtrl
    ASB --> ADev
    ASB --> ANet
  end

  %% =========================
  %% Metadata / lifecycle
  %% =========================
  ABackend -. "1. getConnInfo()" .-> ASB
  ASB -. "connInfo blob\nversion/agent/host_hash/device_id/\nctrl_desc/net listen handle" .-> BBackend
  BBackend -. "2. loadRemoteConnInfo()" .-> BSB

  BBackend -. "registerMem()\nflagcx_nixl_reg_mem()" .-> BSB
  BSB -. "public md blob\nmem_token/base/len/type/\noptional ipc_handle" .-> ABackend
  ABackend -. "loadRemoteMD()\nflagcx_nixl_import_mem()" .-> ASB

  AAgent -->|"3. connect(remote_agent)"| ABackend
  ABackend -->|"flagcx_nixl_connect()"| ASB
  ASB --> T{"Topology?"}

  T -->|"same-host"| SH["import peer ctrl block\nopen IPC handle from remote md\nremote_mem.ipc_mapped_ptr ready"]
  T -->|"cross-host"| CH["set up ctrl_send/recv_comm\nset up data_send/recv_comm"]

  %% =========================
  %% Notification
  %% =========================
  ABackend -->|"optional completion notif\nflagcx_nixl_send_notif()"| ASB
  ASB -->|"same-host: shm ctrl block"| BCtrl
  ASB -->|"cross-host: net ctrl plane"| BNet

  BSB -->|"flagcx_nixl_drain_notifs()"| BBackend
  BBackend -->|"getNotifs()"| BAgent
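
The same-host data path in the flowchart reduces to a copy between the local buffer and the IPC-mapped remote buffer, with the direction chosen by the operation. A minimal sketch of that idea follows. The `XferOp` and `Iov` types and the `same_host_transfer` function are hypothetical names invented for illustration, and plain `std::memcpy` stands in for the `deviceMemcpy` call that device memory would need.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical types for illustration; not the actual FlagCX definitions.
enum class XferOp { Write, Read };

struct Iov {
    std::size_t local_offset;   // offset into the local registered buffer
    std::size_t remote_offset;  // offset into the IPC-mapped remote buffer
    std::size_t len;            // number of bytes to move
};

// Same-host path: the remote buffer is assumed to be mapped into this process,
// so each iov is a copy in one direction or the other.
// Returns false (and copies nothing) if any iov would run past either buffer.
inline bool same_host_transfer(XferOp op,
                               unsigned char* local_ptr, std::size_t local_len,
                               unsigned char* remote_ptr, std::size_t remote_len,
                               const std::vector<Iov>& iovs) {
    // Validate every range first so a bad iov cannot leave a partial transfer.
    for (const Iov& v : iovs) {
        if (v.local_offset > local_len || v.len > local_len - v.local_offset) return false;
        if (v.remote_offset > remote_len || v.len > remote_len - v.remote_offset) return false;
    }
    for (const Iov& v : iovs) {
        if (op == XferOp::Write) {
            // WRITE: local -> remote
            std::memcpy(remote_ptr + v.remote_offset, local_ptr + v.local_offset, v.len);
        } else {
            // READ: remote -> local
            std::memcpy(local_ptr + v.local_offset, remote_ptr + v.remote_offset, v.len);
        }
    }
    return true;
}
```

Checking all ranges before copying is a design choice of this sketch, not something the diagram specifies.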

Excalidraw Data

Text Elements

NIXL

Flagcx

nixlAgent::createBackend()

  1. plugin manager

nixlFlagcxEngine

flagcx_nixl_engine_create(): every agent calls flagcxGetUniqueId, but rank 0's ID is the one used at connect time. The params become a long-lived flagcx_nixl_engine*

nixlBackendInitParams

  1. getConnInfo

flagcx_nixl_get_conn_info() serializes uniqueId + host + dev and returns them as a blob

3. getSupportedMems()

Returns VRAM as the supported memory type

nixlFlagcxEngine::loadRemoteConnInfo: the framework delivers the remote blob to you here; see document 0

flagcx_nixl_load_remote_conn_info() deserializes the peer's blob for local use
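
The getConnInfo / loadRemoteConnInfo pair above amounts to packing a few fields into an opaque blob on one side and unpacking it on the other. The sketch below shows one way that round trip could look. The `ConnInfo` layout, the 256-byte ID size, and both function names are assumptions made for illustration; the real blob format is not given in these notes. Fields are copied in native byte order, so the sketch also assumes both agents share endianness.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical layout for illustration; the real blob format is not shown here.
constexpr std::size_t kUidBytes = 256;  // assumed size of the opaque unique ID

struct ConnInfo {
    unsigned char uid[kUidBytes];  // opaque unique ID bytes
    std::uint64_t host_hash;       // identifies the host (same-host detection)
    std::int32_t device_id;        // local device index
};

constexpr std::size_t kBlobBytes =
    kUidBytes + sizeof(std::uint64_t) + sizeof(std::int32_t);

// Pack the fields back to back into a byte string (no struct padding is copied).
inline std::string serialize_conn_info(const ConnInfo& info) {
    std::string blob;
    blob.reserve(kBlobBytes);
    blob.append(reinterpret_cast<const char*>(info.uid), kUidBytes);
    blob.append(reinterpret_cast<const char*>(&info.host_hash), sizeof(info.host_hash));
    blob.append(reinterpret_cast<const char*>(&info.device_id), sizeof(info.device_id));
    return blob;
}

// Unpack a peer's blob; returns false if the size does not match the layout.
inline bool deserialize_conn_info(const std::string& blob, ConnInfo* out) {
    if (out == nullptr || blob.size() != kBlobBytes) return false;
    const char* p = blob.data();
    std::memcpy(out->uid, p, kUidBytes);
    p += kUidBytes;
    std::memcpy(&out->host_hash, p, sizeof(out->host_hash));
    p += sizeof(out->host_hash);
    std::memcpy(&out->device_id, p, sizeof(out->device_id));
    return true;
}
```

A production format would normally also carry a version field, as the connInfo label in the flowchart suggests.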

blob+remoteAgent(i)

also called metadata

self

nFE::connect

flagcx_nixl_connect(): this function only calls flagcxCommInitRank(comm, 2, uid, rank)

nFE::prepXferDlist

self

Uses nixlFlagcxBackendMD* to build the nixl_meta_dlist_t

self

Creates a nixlFlagcxReqH

nFE::postXferReq()

Returns a flagcx_nixl_req_t*

flagcx_nixl_submit(engine, conn, op, iovs, niov, &req)

NIXL_IN_PROG

nFE::checkXfer(): if the poll reports done, send a follow-up notify

flagcx_nixl_poll(engine, req, &done)

notify

flagcx_nixl_send_notify

flagcx_nixl_req_free

done?
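
The post / check / notify / free sequence above can be read as a small state machine over the request handle. The following is a rough sketch of that reading, not the backend's actual code: `ReqHandle`, `Status`, `post_xfer`, and `check_xfer` are hypothetical names, the `poll` callback stands in for `flagcx_nixl_poll()`, and the `send_notif` callback stands in for the notify call.

```cpp
#include <functional>
#include <string>

// Hypothetical stand-ins for the request states and NIXL status codes.
enum class ReqState { Prepared, Posted, Completed };
enum class Status { InProg, Success, Error };

struct ReqHandle {
    ReqState state = ReqState::Prepared;
    std::string notif_msg;    // empty means no completion notification was requested
    bool notif_sent = false;  // guards against sending the notification twice
};

// Post step: only a prepared request may be posted; it then reports in-progress.
inline Status post_xfer(ReqHandle& req) {
    if (req.state != ReqState::Prepared) return Status::Error;
    req.state = ReqState::Posted;
    return Status::InProg;
}

// Check step: poll the engine; on the first "done", send the follow-up
// notification (if one was requested) and mark the request completed.
inline Status check_xfer(ReqHandle& req,
                         const std::function<bool()>& poll,
                         const std::function<void(const std::string&)>& send_notif) {
    if (req.state == ReqState::Completed) return Status::Success;
    if (req.state != ReqState::Posted) return Status::Error;
    if (!poll()) return Status::InProg;
    if (!req.notif_msg.empty() && !req.notif_sent) {
        send_notif(req.notif_msg);
        req.notif_sent = true;
    }
    req.state = ReqState::Completed;
    return Status::Success;
}
```

Repeated checks after completion return success without polling again, so the notification goes out at most once per request.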

init backend

NIXL_WRITE

NIXL_READ

if reg && IBRC && Put

Put

flagcxRecv

flagcxSend

transfer
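
One possible reading of the NIXL_WRITE / NIXL_READ branch above is that a one-sided Put is chosen only when the memory is registered, the network is IBRC, and Put is available, with the two-sided send/recv pair as the fallback. The sketch below encodes that reading only; the `Op`, `Path`, and `PathCaps` types and `choose_path` are hypothetical, and limiting Put to writes is an assumption rather than something the notes state.

```cpp
// Hypothetical types for illustration; this is one reading of the notes.
enum class Op { Write, Read };
enum class Path { Put, Send, Recv };

struct PathCaps {
    bool mem_registered;  // "reg": buffers were registered with the communicator
    bool net_is_ibrc;     // "IBRC": the network adaptor is IB RC
    bool put_available;   // "Put": one-sided put is supported and enabled
};

// Pick the transfer primitive the initiator would use.
inline Path choose_path(Op op, const PathCaps& caps) {
    const bool one_sided =
        caps.mem_registered && caps.net_is_ibrc && caps.put_available;
    if (op == Op::Write && one_sided) {
        return Path::Put;  // assumption: Put only covers writes
    }
    // Two-sided fallback: a write sends from the initiator, a read receives.
    return op == Op::Write ? Path::Send : Path::Recv;
}
```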

nFE::loadRemoteMD(): only Put needs the remote_offset

metadataB

register

because Put needs window registration

one_sided?

nFE::registerMem() must return a metadataP

nixlLocalSection::addDescList()

Registers the local buffer for each desc

flagcx_nixl_reg_mem() calls flagcxCommRegister

if put

self

nFE::getPublicData()

self (the local side now has addr, len, devId, metadataP, metadataB)

from metadataP to metadataB

listener thread