Post

Tokio runtime 设计与实现

Tokio runtime 设计与实现

Runtime 核心数据

Work

每个 worker 线程对应一个 Worker 实例,它包含:

  • 本地任务队列 local

    • 类型:LifoDeque,最大容量 256。
    • 用于存储由本线程创建或唤醒的任务。
    • 优先被调度执行,只有在空时才访问全局队列。
    • 支持任务的 工作窃取(work stealing)。
  • LIFO Slot(后进先出槽)

    • 类型:Option
    • 用于快速调度刚被当前线程唤醒的任务,避免任务落入队列从而节省调度开销。
    • 最多只能存一个任务;若已有任务,会将旧任务退回本地队列。
    • 每个线程独享,不可被其它线程窃取
  • 运行中的任务计数和调度指标

    • 用于决定是否切换调度来源,例如是否从本地队列换到全局队列。

GlobalQueue: 跨线程任务共享队列

  • 类型:Injector(来自 crossbeam 的 work-stealing 队列实现)。

  • 用途:所有线程可并发向此队列推入任务;当某线程本地队列为空时,会尝试从此队列中拉取任务。

  • 特殊用例:来自非 worker 线程的 tokio::spawn 会将任务直接插入全局队列。

Scheduler: 调度器结构

每个 Tokio 多线程 runtime 启动时会创建一个 Scheduler,它持有:

  • 所有的 Worker 实例(例如一个数组 [Worker; N]);
  • 一个全局队列 GlobalQueue;
  • 控制调度行为的参数(如 global_queue_interval、event_interval);
  • Parker 与 Unparker,用于线程休眠与唤醒;
  • 用于任务 stealing 的原子引用等辅助数据结构。

调度流程简略:

1
2
3
4
5
6
7
loop {
    // 优先从 lifo slot 取任务
    // 然后尝试从本地队列 pop
    // 如果空了,每隔 global_queue_interval 次尝试拉取全局任务
    // 若还空,尝试从其他 worker 的本地队列偷任务(steal)
    // 若全部失败,则线程休眠
}

Task:可调度的异步任务

  • Tokio 内部封装了每个 Future 为 Task。

  • Task 实际是指针封装(Arc),里面包含:

    • RawTask 的状态控制器;
    • Waker;
    • Future 对象;
    • 执行上下文等。
  • Task 被压入队列、从队列取出并 poll,是 Tokio 调度的最小单元。

Shared:所有线程共享的调度上下文

  • 多线程调度器中的某些结构(如全局队列、metrics、配置项)被封装在一个共享的 Shared 结构中,所有 Worker 持有其引用。

  • 类型典型是 Arc

可视化结构关系(简化)

1
2
3
4
5
6
7
8
9
10
11
12
Runtime
│
├─ Scheduler
│  ├─ GlobalQueue (Injector<Task>)
│  ├─ [Worker; N]
│      ├─ local: LifoDeque<Task>
│      ├─ lifo_slot: Option<Task>
│      ├─ metrics
│
└─ Shared
   ├─ runtime config
   ├─ event loop driver

Tokio 多线程调度器通过 本地队列 + 全局队列 + LIFO slot + 工作窃取 的组合设计,实现了:

  • 高并发、低延迟的任务调度;
  • 局部性优化(lifo + 本地队列);
  • 线程饥饿避免与全局公平性(全局队列 + stealing);
  • IO / timer 驱动集成(driver + event loop);

它是一个 lightweight 且高度优化的任务调度器,适合处理大规模的异步负载。

Task Steal流程分析

steal_into2 是 Tokio 多线程调度器中工作窃取逻辑的核心实现之一。这个函数通过原子操作安全地从一个 worker 的本地任务队列中“偷”任务,并把它们搬运到另一个线程的本地缓冲区中。下面我将逐步拆解并解释它的关键逻辑与数据结构含义。

数据结构

1
2
3
4
5
pub(crate) struct Inner<T> {
    head: AtomicUnsignedLong,  // 两个 UnsignedShort,打包成一个 u32/u64
    tail: AtomicUnsignedShort, // 本地 thread 使用
    buffer: Box<[UnsafeCell<MaybeUninit<Notified<T>>>; LOCAL_QUEUE_CAPACITY]>
}

其中 head 被编码为一个打包结构:(steal_head, real_head),防止 ABA 问题 + 多线程冲突: • real_head: 当前任务头部位置(被 local 线程或 stealer 消费) • steal_head: 如果两者不同,说明一个 stealing 过程正在进行,其他偷线程必须等待

steal_into2 步骤详解

Step 1:尝试原子声明“我要偷任务了”

1
2
let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);

• 若 src_head_steal != src_head_real:说明别人已经在偷,不能同时进行。 • 计算可偷数量 n = (tail - head) / 2,防止过度搬运。

然后构造新的 head:

1
2
let steal_to = src_head_real.wrapping_add(n);
let next_packed = pack(src_head_steal, steal_to);

• 注意 steal_to 是你要“抢占到”的尾部,接下来用 CAS 尝试写入。

Step 2:原子地声明“我偷到了这些任务段”

1
self.0.head.compare_exchange(prev_packed, next_packed, AcqRel, Acquire)

• 成功后,当前线程就“独占”了 [src_head_real, steal_to) 这段任务。 • 所有其他 stealer 会发现 head_steal != head_real,而自动跳过。

Step 3:真正去 buffer 里读取任务、写入目标缓冲区

1
2
3
4
5
6
7
8
9
10
for i in 0..n {
    let src_idx = src_pos & MASK;
    let dst_idx = dst_pos & MASK;

let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

dst.inner.buffer[dst_idx]
    .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });

}

​ 这里读写都是 UnsafeCell,因为 Rust 的类型系统不允许 &T 下的修改。我们确保当前没有数据竞争,因此使用 unsafe 是合理的。

Step 4:原子更新 head_steal = head_real,宣告“偷取完成”

1
2
let head = unpack(prev_packed).1;
let next_packed = pack(head, head);

再次使用 compare_exchange 将 head_steal 设置为 head_real,其他线程就可以继续尝试偷了。

注意的点 • 如果偷取过程中失败了(CAS失败或被别人抢先偷),会重试或放弃; • 每次只偷任务队列中一半(n = n - n / 2),让本地线程保留一半任务,提升缓存局部性; • 用 AtomicUnsignedLong + pack/unpack 技巧避免使用两个原子变量,同时解决同步和 ABA 问题; • 整体保证:无锁、多线程安全、零拷贝搬运

This post is licensed under CC BY 4.0 by the author.