⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’
Excalidraw Data
Text Elements
req from precoess2 main thread
是self.request 记录的新请求吗
yes
no
2.streaming_queue
1.waiting queue(当前等待队列)
抢占策略 当reqA, reqB, reqC 已经进 running
reqA 到最后的 reqC 每次自己算完要新的 block 存 cache 的时候就会
reqA
reqB
reqC
从左往右给每个请求 需要多少 kv block 算好
情况 A: 遍历到第二个请求 block 就不够了 就把最后的请求 C丢回 waitting
情况 B: C 后面请求的丢回后还不够 把自己丢回 waitting
running loop
waiting loop
1.num_tokens_with_spec - num_computed_tokens (还差多少 token 算完=目标-当前) 2. config 的切块阈值 3. token_budget 4. max_model_len
取 min
num_new_token
kv_cache_manager
allocate_slots
token_budget 减少当前num_new_token
(tips:decode 就是 1)
本地:get_computed_blocks 看 prefix cache 命中的 block和 token 远端:走 xxxConnector 的 get_num_new_matched_tokens 看远端 prefill帮忙计算完的 token 数
prefill 算了
allocate_slots (delay block)
skipped_waiting 或waiting 队列取出高优请求
_try_promote_blocked_waiting_request 看当前 request 在不在finished_recving_kv_req_ids 内
在,kv cache 传完了,正常往下调度 status 改成 waiting (tips:cache block,把 delay 给补登记)
不在,kv cache没传完 标记为 WAITING_FOR_REMOTE_KVS 放到step_skipped_waiting
1token
if round 0
else:
看 status,不是 等待状态就直接放到 running
tips:这里存在其他更多判断条件决定当前的请求 需要先放到step_skipped_waiting,例如 grammar 没好,lora放不下,多模态没 prefetch 等
所有step_skipped_waiting 放进 skipped_waiting 等下次 schedule
get_num_common_prefix_blocks 计算 cascade 公共前缀
拆 new/cache,新请求发全量,旧请求发 delta
1.打包各类信息为schedulerOutput 2.调用connector.build_connector_meta(), 把计划告诉 xxx_connector, worker 靠这个指导搬运 kv cache
调度后推进状态 提前假设已调度的已经被完成 让下一个 step 继续调度时知 道之前算到哪了
skipped_waiting queue(已跳过的,最高优)
step_skipped_waiting(中转站)
waiting 等待队列
running 正在被处理的请求队列
scheduler
executor
如果分配失败
tips:假设被抢的 request 的 kv block3,7,8,9 被释放; 即在 free_block_queue 内的对应 block 的 ref count==0
free_block_queue
B9
B8
B7
B3
token0 token1
token4 token5
token6 token7
token2 token3
B3
B7
B8
B9
h0 = hash(种子, [t0..t1])
h1 = hash(h0, [t2..t3])
h2 = hash(h1, [t4..t5])
h3 = hash(h2, [t6..t7])
waiting 的命中缓存和 running 的抢占说明: 被抢占后的请求再次进 waiting 只发现 B7 能在hash表内找到,于是 就直接取出 B7,然后 get_new_block(在free_block_queue里面 popleft) 取出 B9 和 B8 这两个 block 给重计算使用。
reuse
re
cached_block_hash_to_block
B9
B8
B7
B3
h0
h1
h2
h3
对应关系
hash 表
空闲队列
connector
- synchronize 一次 并插一个cuda event
- _update_states(scheduler_output)
- _prepare_inputs(), 返回 logits_indices和spec_decode_metadata
- _determine_batch_execution_and_padding() 决定 cudaGraphMode+padding 策略
- _build_attention_metadata() 构建 attention 的 metadata
- _preprocess() 组装 input_ids, positions, inputs_embeds
with synchronize_input_prep
with set_forward_context
_model_forward 去调用 self.model(input_ids, positions, …) 拿到hidden_states
logits=compute_logits(sample_hidden_states)
sample_hidden_states=hidden_states[logits_indices]
以上的输出打包成execute_model_state
batch tensor 组装
slot_mapping[flat_token_i] = block_table[req_i, position_i // block_size] * block_size + position_i % block_size
prefill
decode
/v1/completions {do_remote_decode, max_tokens=1, 完整prompt}
/v1/completions {do_remote_prefill, remote_host=P:8998, 同request_id, 完整prompt}
router.py
client
scheduler(step):1. 8 条 8k 输入,分配 576x8 个 block
scheduler(step):1. 8 条 8k 输入,分配 576x8 个 block
3.WAITING_FOR_REMOTE_KVS ^JIX5zNXj
Paged kv cache
B3
t0 k/v
t1 k/v
…
t15 k/v
slots: 0 1 … 15
-
build_connector_meta,维护reqs_to_send
-
build_connector_meta,维护reqs_to_recv
worker
start_load_kv _receive_kv encode{D地址, req id, D的block_ids}
worker
start_load_kv 检查reqs_to_send 有东西吗?
forward 如果好了就更新 _reqs_need_send[id]=block_ids
ZMQ
10 个后台 sender 线程
batch_transfer_sync_write
ZMQ
done,go on decode attention
总的就是 all kv cache
layer_0 tensor: [block_0][block_1]…[block_147455] ← 1.125GB
layer_1 tensor: [block_0][block_1]…[block_147455] ← 1.125GB
…
layer_47 tensor: [block_0][block_1]…[block_147455] ← 1.125GB
block_table 里存的就是这个 block_id 列表,所有 layer 的 attention 计算都读同一张 block_table
一层内的 block 之间地址连续,不同层之间的 block 不连续所以每个 token 在不同层上都要单独用一个 block,用 block_table 保存
mooncake
get_rpc_port + batch_register_memory
get_rpc_port + batch_register_memory
各自整块 kv cache的 mr 地址范围,rkey,NIC GID 等注册 在 host:port 等对端 的RPC 服务来取
mooncake
batch_transfer_sync_write
openSegment
allocateBatchID
submitTransferWithNotify or submitTransfer
closeSegment
RdmaTransport:: submitTransferTask
rdma
transport
ibv_post_send
request1—n
slice1
slice2
slice3 …
lkey
dst_addr src_addr
lkey
dst_addr src_addr
lkey
dst_addr src_addr
slice1
slice2
slice3 …
lkey
dst_addr src_addr
lkey
dst_addr src_addr
lkey
dst_addr src_addr
WorkerPool:: submitPostSend()
slice1
slice2
slice3 …
lkey
dst_addr src_addr
lkey
dst_addr src_addr
lkey
dst_addr src_addr
rkey peer_nic retry_cnt
rkey peer_nic retry_cnt
rkey peer_nic retry_cnt
worker pool thread:: transferWorker 从当前 slice que 里面取出来多个 slice 然后走 ibv_post_send
getBatchTransferStatus
ibv_poll_cq
第一次会拿回 整个kv cache buffer 的地址范围和 rkey
workpool 以 RNIC 为对象 创建资源如下:
2QP 512 wr per QP 1CQ 2thread
slice 就是指针,QP 内有 depth 数组 来存 slice→wr 的在飞数量,同步更新 到depth内(原子计数),在下次post_send 前通过看这个给不同的 QP 再次灌满 wr
tips:1.mooncake 这里一个 gpu 可选一堆 workpool,一个 nic 坏了直接换个 pool 2.mooncake 每个 thead 即下 wr 又 pool cq(看一眼 wc,非强行推到对应的 wr_id处) 3.ucx(nixl)/uccl等是 4thread 下 wr,1 个 thread pool cq 4.mooncake多线程下batch_transfer_sync_write,其他多线程下 wr,乐
slice queue
shard0
shard1
shard3
shard4
shard5
shard6
shard7
nic0
nic1
nic2
nic3
nic4
nic5
nic6
nic7
上层10 个线程写各自 tprank 通讯的 shard,底层每个 nic 的 workpool 内的资源去处理各自的 shard 内的 slice,每个 slice 又会携带计数,状态告知上层,成功就继续 不成功就容错
mooncake 的多线程大致示意图
step
step
shape of layer i = (2, num_blocks, block_size, num_kv_heads, head_dim)
① blocks-first:
② kv-first:
shape of layer i = (num_blocks, 2, block_size, num_kv_heads, head_dim)