⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’
Excalidraw Data
Text Elements
async_llm(子进程+异步)
llm_engine(子进程+同步)
SyncMPClient
AsyncMPClient
InprocClient
online http/api
无子进程
offline
offline
DPLB AsyncMPClient
DP AsyncMPClient
1.input_processor.process_inputs() 把文字 “讲个笑话” 分词变成 token id [123, 456, …] 2.await _send_input_message(下面 zmq 要发的) 3.用 asyncio.create_task 起一个常驻任务 4.用 asyncio.create_task 起一个常驻任务 5.构造一个 格式的 O1 ,然后挂起,释放 GIL
接着循环上面的步骤125,构造一个 O2,挂起,上层会等所有 O1、O2的 req
ZMQ 发送 (msgpack 序列化的 token id)
process1(EngineCoreClient进程)
process2(EngineCore 进程)
thread1(process_input_socket)
main thread(run busy loop)
thread3(preocess_output_socket)
-
与 process1 创建 ZMQ socket注册到 Poller
-
Poller.recv_multipart阻塞接收数据
-
解码数据,并判断数据是 add 还是 abort
-
数据入队input_queue,是 abort 请求的话 入队aborts_queue
-
与 process1 创建 ZMQ socket
-
output_queue 内拿 scheduler 返回的 推理结果output
-
output编码后放到 zmq 的 buffer 池子
-
socket.send_multipart把这个 buffer 池子里的数据都零拷贝给 client
1.清空input_queue内的 req,交给scheduler.add_request() 2.scheduler.schedule(), 决定这一轮跑哪些 req、多少 token、分配 kv cache block(prefix cache 命中也在这里判断); 返回SchedulerOutput(token id、block table,哪些需要 prefill/decode); 3.executor.execute_model() 把SchedulerOutput发给 executor(原来的 worker 抽象); GPU 开始 attn+ffn,返回 Future(不管结果,去干别的事情) 4.get_grammar_bitmask,计算 JSON schema / regex 的 token mask 4.5 future.result() 5.sample_tokens(),拿到Future结果,做采样,选下一个 token 6.update_from_output,Scheduler拿到新 token,更新 req 状态 7.output_queue.put()
GPU workers
nvidia
分配 kv cache block (cache 已被 xxConnector 注册过 MR)
attention+FFN
logits/tokens
采样
更新req状态
CPU 同时计算 mask
req status
sample
result
mask
请求执行
block id
ZMQ 返回 token id 和 logprobs 张量
AsyncMPClient
add_request(多少条请求就执行多少次)
v1/chat/completions
engineCoreClient
await
output_handler
1.await get_output_async,从 client 的 outputs_queue 拿一批 2.process_outputs - req_state.detokenizer.update() 解码 - req_state.logprobs_processor.update_from_output() 算 logprobs - req_state.queue.put() 塞进那个请求专属的 唤醒 O1 唤醒 O2 …
RequestOutputCollector
RequestOutputCollector
output_handler
process_outputs_socket
recv_multipart
send_multipart
send_multipart
recv_multipart
1.await output_socket.recv_multipart() 2.decode 收到的 token 3.填outputs_queue.put_nowait()
process_outputs_socket
唤醒
唤醒
BOSS
Bro1
Bro2
scheduler
execute_model
KV Cache
connector
RDMA IBverbs
循环以上步骤1-6 就是 EngineCore.step()