⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’

Excalidraw Data

Text Elements

async_llm(子进程+异步)

llm_engine(子进程+同步)

SyncMPClient

AsyncMPClient

InprocClient

online http/api

无子进程

offline

offline

DPLB AsyncMPClient

DP AsyncMPClient

1.input_processor.process_inputs() 把文字 “讲个笑话” 分词变成 token id [123, 456, …] 2.await _send_input_message(下面 zmq 要发的) 3.用 asyncio.create_task 起一个常驻任务 4.用 asyncio.create_task 起一个常驻任务 5.构造一个 格式的 O1 ,然后挂起,释放 GIL

接着循环上面的步骤125,构造一个 O2,挂起,上层会等所有 O1、O2的 req

ZMQ 发送 (msgpack 序列化的 token id)

process1(EngineCoreClient进程)

process2(EngineCore 进程)

thread1(process_input_socket)

main thread(run busy loop)

thread3(preocess_output_socket)

  1. 与 process1 创建 ZMQ socket注册到 Poller

  2. Poller.recv_multipart阻塞接收数据

  3. 解码数据,并判断数据是 add 还是 abort

  4. 数据入队input_queue,是 abort 请求的话 入队aborts_queue

  5. 与 process1 创建 ZMQ socket

  6. output_queue 内拿 scheduler 返回的 推理结果output

  7. output编码后放到 zmq 的 buffer 池子

  8. socket.send_multipart把这个 buffer 池子里的数据都零拷贝给 client

1.清空input_queue内的 req,交给scheduler.add_request() 2.scheduler.schedule(), 决定这一轮跑哪些 req、多少 token、分配 kv cache block(prefix cache 命中也在这里判断); 返回SchedulerOutput(token id、block table,哪些需要 prefill/decode); 3.executor.execute_model() 把SchedulerOutput发给 executor(原来的 worker 抽象); GPU 开始 attn+ffn,返回 Future(不管结果,去干别的事情) 4.get_grammar_bitmask,计算 JSON schema / regex 的 token mask 4.5 future.result() 5.sample_tokens(),拿到Future结果,做采样,选下一个 token 6.update_from_output,Scheduler拿到新 token,更新 req 状态 7.output_queue.put()

GPU workers

nvidia

分配 kv cache block (cache 已被 xxConnector 注册过 MR)

attention+FFN

logits/tokens

采样

更新req状态

CPU 同时计算 mask

req status

sample

result

mask

请求执行

block id

ZMQ 返回 token id 和 logprobs 张量

AsyncMPClient

add_request(多少条请求就执行多少次)

v1/chat/completions

engineCoreClient

await

output_handler

1.await get_output_async,从 client 的 outputs_queue 拿一批 2.process_outputs - req_state.detokenizer.update() 解码 - req_state.logprobs_processor.update_from_output() 算 logprobs - req_state.queue.put() 塞进那个请求专属的 唤醒 O1 唤醒 O2 …

RequestOutputCollector

RequestOutputCollector

output_handler

process_outputs_socket

recv_multipart

send_multipart

send_multipart

recv_multipart

1.await output_socket.recv_multipart() 2.decode 收到的 token 3.填outputs_queue.put_nowait()

process_outputs_socket

唤醒

唤醒

BOSS

Bro1

Bro2

scheduler

execute_model

KV Cache

connector

RDMA IBverbs

循环以上步骤1-6 就是 EngineCore.step()