⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’

Excalidraw Data

Text Elements

req from precoess2 main thread

是self.request 记录的新请求吗

yes

no

2.streaming_queue

1.waiting queue(当前等待队列)

抢占策略 当reqA, reqB, reqC 已经进 running

reqA 到最后的 reqC 每次自己算完要新的 block 存 cache 的时候就会

reqA

reqB

reqC

从左往右给每个请求 需要多少 kv block 算好

情况 A: 遍历到第二个请求 block 就不够了 就把最后的请求 C丢回 waitting

情况 B: C 后面请求的丢回后还不够 把自己丢回 waitting

running loop

waiting loop

1.num_tokens_with_spec - num_computed_tokens (还差多少 token 算完=目标-当前) 2. config 的切块阈值 3. token_budget 4. max_model_len

取 min

num_new_token

kv_cache_manager

allocate_slots

token_budget 减少当前num_new_token

(tips:decode 就是 1)

本地:get_computed_blocks 看 prefix cache 命中的 block和 token 远端:走 xxxConnector 的 get_num_new_matched_tokens 看远端 prefill帮忙计算完的 token 数

prefill 算了

allocate_slots (delay block)

skipped_waiting 或waiting 队列取出高优请求

_try_promote_blocked_waiting_request 看当前 request 在不在finished_recving_kv_req_ids 内

在,kv cache 传完了,正常往下调度 status 改成 waiting (tips:cache block,把 delay 给补登记)

不在,kv cache没传完 标记为 WAITING_FOR_REMOTE_KVS 放到step_skipped_waiting

1token

if round 0

else:

看 status,不是 等待状态就直接放到 running

tips:这里存在其他更多判断条件决定当前的请求 需要先放到step_skipped_waiting,例如 grammar 没好,lora放不下,多模态没 prefetch 等

所有step_skipped_waiting 放进 skipped_waiting 等下次 schedule

get_num_common_prefix_blocks 计算 cascade 公共前缀

拆 new/cache,新请求发全量,旧请求发 delta

1.打包各类信息为schedulerOutput 2.调用connector.build_connector_meta(), 把计划告诉 xxx_connector, worker 靠这个指导搬运 kv cache

调度后推进状态 提前假设已调度的已经被完成 让下一个 step 继续调度时知 道之前算到哪了

skipped_waiting queue(已跳过的,最高优)

step_skipped_waiting(中转站)

waiting 等待队列

running 正在被处理的请求队列

scheduler

executor

如果分配失败

tips:假设被抢的 request 的 kv block3,7,8,9 被释放; 即在 free_block_queue 内的对应 block 的 ref count==0

free_block_queue

B9

B8

B7

B3

token0 token1

token4 token5

token6 token7

token2 token3

B3

B7

B8

B9

h0 = hash(种子, [t0..t1])

h1 = hash(h0, [t2..t3])

h2 = hash(h1, [t4..t5])

h3 = hash(h2, [t6..t7])

waiting 的命中缓存和 running 的抢占说明: 被抢占后的请求再次进 waiting 只发现 B7 能在hash表内找到,于是 就直接取出 B7,然后 get_new_block(在free_block_queue里面 popleft) 取出 B9 和 B8 这两个 block 给重计算使用。

reuse

re

cached_block_hash_to_block

B9

B8

B7

B3

h0

h1

h2

h3

对应关系

hash 表

空闲队列

connector

  1. synchronize 一次 并插一个cuda event
  2. _update_states(scheduler_output)
  3. _prepare_inputs(), 返回 logits_indices和spec_decode_metadata
  4. _determine_batch_execution_and_padding() 决定 cudaGraphMode+padding 策略
  5. _build_attention_metadata() 构建 attention 的 metadata
  6. _preprocess() 组装 input_ids, positions, inputs_embeds

with synchronize_input_prep

with set_forward_context

_model_forward 去调用 self.model(input_ids, positions, …) 拿到hidden_states

logits=compute_logits(sample_hidden_states)

sample_hidden_states=hidden_states[logits_indices]

以上的输出打包成execute_model_state

batch tensor 组装

slot_mapping[flat_token_i] = block_table[req_i, position_i // block_size] * block_size + position_i % block_size

prefill

decode

/v1/completions {do_remote_decode, max_tokens=1, 完整prompt}

/v1/completions {do_remote_prefill, remote_host=P:8998, 同request_id, 完整prompt}

router.py

client

scheduler(step):1. 8 条 8k 输入,分配 576x8 个 block

scheduler(step):1. 8 条 8k 输入,分配 576x8 个 block

             3.WAITING_FOR_REMOTE_KVS ^JIX5zNXj

Paged kv cache

B3

t0 k/v

t1 k/v

t15 k/v

slots: 0 1 … 15

  1. build_connector_meta,维护reqs_to_send

  2. build_connector_meta,维护reqs_to_recv

worker

start_load_kv _receive_kv encode{D地址, req id, D的block_ids}

worker

start_load_kv 检查reqs_to_send 有东西吗?

forward 如果好了就更新 _reqs_need_send[id]=block_ids

ZMQ

10 个后台 sender 线程

batch_transfer_sync_write

ZMQ

done,go on decode attention

总的就是 all kv cache

layer_0 tensor: [block_0][block_1]…[block_147455] ← 1.125GB

layer_1 tensor: [block_0][block_1]…[block_147455] ← 1.125GB

layer_47 tensor: [block_0][block_1]…[block_147455] ← 1.125GB

block_table 里存的就是这个 block_id 列表,所有 layer 的 attention 计算都读同一张 block_table

一层内的 block 之间地址连续,不同层之间的 block 不连续所以每个 token 在不同层上都要单独用一个 block,用 block_table 保存

mooncake

get_rpc_port + batch_register_memory

get_rpc_port + batch_register_memory

各自整块 kv cache的 mr 地址范围,rkey,NIC GID 等注册 在 host:port 等对端 的RPC 服务来取

mooncake

batch_transfer_sync_write

openSegment

allocateBatchID

submitTransferWithNotify or submitTransfer

closeSegment

RdmaTransport:: submitTransferTask

rdma

transport

ibv_post_send

request1—n

slice1

slice2

slice3 …

lkey

dst_addr src_addr

lkey

dst_addr src_addr

lkey

dst_addr src_addr

slice1

slice2

slice3 …

lkey

dst_addr src_addr

lkey

dst_addr src_addr

lkey

dst_addr src_addr

WorkerPool:: submitPostSend()

slice1

slice2

slice3 …

lkey

dst_addr src_addr

lkey

dst_addr src_addr

lkey

dst_addr src_addr

rkey peer_nic retry_cnt

rkey peer_nic retry_cnt

rkey peer_nic retry_cnt

worker pool thread:: transferWorker 从当前 slice que 里面取出来多个 slice 然后走 ibv_post_send

getBatchTransferStatus

ibv_poll_cq

第一次会拿回 整个kv cache buffer 的地址范围和 rkey

workpool 以 RNIC 为对象 创建资源如下:

2QP 512 wr per QP 1CQ 2thread

slice 就是指针,QP 内有 depth 数组 来存 slicewr 的在飞数量,同步更新 到depth内(原子计数),在下次post_send 前通过看这个给不同的 QP 再次灌满 wr

tips:1.mooncake 这里一个 gpu 可选一堆 workpool,一个 nic 坏了直接换个 pool 2.mooncake 每个 thead 即下 wr 又 pool cq(看一眼 wc,非强行推到对应的 wr_id处) 3.ucx(nixl)/uccl等是 4thread 下 wr,1 个 thread pool cq 4.mooncake多线程下batch_transfer_sync_write,其他多线程下 wr,乐

slice queue

shard0

shard1

shard2

shard3

shard4

shard5

shard6

shard7

nic0

nic1

nic2

nic3

nic4

nic5

nic6

nic7

上层10 个线程写各自 tprank 通讯的 shard,底层每个 nic 的 workpool 内的资源去处理各自的 shard 内的 slice,每个 slice 又会携带计数,状态告知上层,成功就继续 不成功就容错

mooncake 的多线程大致示意图

step

step

shape of layer i = (2, num_blocks, block_size, num_kv_heads, head_dim)

① blocks-first:

② kv-first:

shape of layer i = (num_blocks, 2, block_size, num_kv_heads, head_dim)