徐昀泽 360云计算 java
女主宣言算法
Kafka 做为一个支持实时处理大量请求的分布式流处理平台,须要一个设计良好的定时器来处理异步任务。本文做者将基于 Kafka 1.1.0 版本的源码来介绍 Kafka 中定时器的基础数据结构——时间轮的原理和实现。数组
PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!数据结构
1app
简单时间轮异步
简单时间轮是时间任务桶的循环链表,又被称为 桶(bucket) 。令 u 为时间单元大小,一个大小为 n 的时间轮有 n 个桶,可以持有 n * u 个定时任务,每一个任务的过时时间会落在一个时间间隔内。(注:下文的 u 和 n 沿用这个定义)每一个桶持有进入相应时间范围的定时任务。第一个桶持有 [0, u) 范围的任务,第二个桶持有 [u, 2u) 范围的任务……第 n 个桶持有 [u * (n - 1), u * n) 范围的任务。每过一个时间单元 u,定时器会推动并移动到下个桶,而后第一个桶中全部的定时任务都会过时。因为任务已通过期,此时定时器不会插入任务到当前桶中。定时器会马上运行过时的任务。由于空桶在下一轮是可用的,因此若是当前的桶对应时间 t,那么它会在推动后变成 [t + u * n, t + (n + 1) * u) 的桶。分布式
本质上时间轮就是个哈希表,经过对任务的过时时间求哈希,落到对应的位置。而每一个位置对应的 bucket 是链表,所以时间轮插入/删除定时任务的时间复杂度是 O(1)。而基于优先队列的定时器,好比 java.util.concurrent.DelayQueue 和 java.util.Timer 插入/删除的时间复杂度是 O(log n)。ide
2函数
分层时间轮性能
简单时间轮的主要缺点是它假定定时器请求是在从当前时刻开始的 n * u 时间间隔内,若是定时器请求超出了这个间隔就会产生溢出,致使任务没法放入时间轮中。分层时间轮会处理这种溢出,它以层次来组织时间轮,最底层的精度更高,层数越高,表示的精度更低。这里用精度来指代时间单元大小。举例说明,令 u = 1, n = 3,设起始时刻是 c,则各层次的桶为
层次 |
桶 | 精度 |
1 |
[c,c] [c+1,c+1] [c+2,c+2] | 1 |
2 | [c,c+2] [c+3,c+5] [c+6,c+8] | 3 |
3 | [c,c+8] [c+9,c+17] [c+18,c+26] | 9 |
PS:这里沿用了代码注释里的表示,即闭区间,而前面讲述原理时都是左闭右开区间,二者是等价的,只是表示不一致。
在 c+1 时刻,桶 [c,c]、[c,c+2]、[c,c+8]过时了,以后:
1 层的时钟移动到 c+1,而且建立新的桶 [c+3,c+3];
二、3 层的时钟仍然在 c 处,由于他们没彻底过时。
此时各层次的桶为:
层次 |
桶 | 精度 |
1 |
[c+1,c+1] [c+2,c+2] [c+3,c+3] | 1 |
2 | [c,c+2] [c+3,c+5] [c+6,c+8] | 3 |
3 |
[c,c+8] [c+9,c+17] [c+18,c+26] | 9 |
注意,桶 [c,c+2] 不会接收任何任务,由于此时时刻是 c+1,只有过时时间为 c+1 和 c+2 才会被分配到该桶,然而 1 层的两个桶 [c+1,c+1] [c+2,c+2] 会优先接收任务。相似地,3 层的 [c+1,c+8] 也不会接收任何任务,由于这个范围被 2 层的桶覆盖了。
对单层时间轮,插入/删除定时任务的时间复杂度都是 O(1)。对分层时间轮,令 m 是时间轮的数量,则插入的时间复杂度是 O(m),由于最多向上插 m 次。相比起系统中请求的数量,m 一般是小不少的。而删除的时间复杂度是 O(1)。
像时钟就是一个典型的三层时间轮,秒针能表示 0 到 59 秒,可是对 60 秒以上则须要分针进一步表示,再进一步即时针,一共能表示的时间范围为 0 到 43199 秒,精度为 1 秒。从秒针到分针到时针,表示精度是依次下降的,秒针精度为 1 秒,有 60 格,所以分针精度是 1 * 60 = 60 秒,相似地,时钟精度是 3600 秒。
3
TimingWheel 的实现
内部字段
名称 |
类型 |
说明 |
tickMs | Long |
时间单元 u |
wheelSize | Int |
桶的数量 n |
startMs | Long |
毫秒级时间戳 |
taskCounter | AtomicInteger | 任务数量,即全部桶的节点数量之和 |
queue | DelayQueue[TimerTaskList] | 标准库的延时队列 |
经过上述主构造参数能够计算出如下私有字段(private[this],能够被包内其余类访问)
// 当前时间轮的整个时间跨度,即更高一层时间轮的 tickMs private[this] val interval = tickMs * wheelSize // 建立 wheelSize 个桶(定时任务链表) private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
// 向下取整,使起始时间戳能被 tickMs 整除 private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
// 高一层时间轮,用来保存超过 interval 的任务 @volatile private[this] var overflowWheel: TimingWheel = null
经过 addOverflowWheel 建立高一层时间轮:
private[this] def addOverflowWheel(): Unit = { synchronized { if (overflowWheel == null) { // 双重检查上锁 overflowWheel = new TimingWheel( // 仅有 tickMs 不是原封不动地转发低层时间轮的字段,由于高层时间轮的时间单元粒度更粗(即精度更低) // 仍是参考时钟,时针的 tickMs 是分针 tickMs 的 60 倍 tickMs = interval, wheelSize = wheelSize, startMs = currentTime, taskCounter = taskCounter, queue ) } } }
添加定时任务
在 Kafka 中,定时任务被抽象为 TimerTaskEntry 类,而桶(定时任务链表)则被抽象为 TimerTaskList 类,在代码中命名都是 bucket(桶)。bucket 实现了 java.util.concurrent.Delayed 接口:
def getDelay(unit: TimeUnit): Long = { unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS) }
所以 bucket 可以被加入延时队列中,延时队列在调用 poll 时,会调用内部对象的 getDelay 方法来判断对象是否能够被弹出。再看看实际的 add 实现:
def add(timerTaskEntry: TimerTaskEntry): Boolean = { // 定时任务的过时时间戳 val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) { // Entry 绑定的 TimerTask 调用了 cancel() 方法主动将 Entry 从链表中移除 false } else if (expiration < currentTime + tickMs) { // 过时时间在第一个桶的范围内,表示已通过期,此时无需加入时间轮 false } else if (expiration < currentTime + interval) { // 过时时间在当前时间轮能表示的时间范围内,加入到其中一个桶 // 注意按照这个算法,第一个桶的时间范围是 [c+u,c+u*2),由于 [c,c+u) 范围内被视为已过时 // 并且第一个桶对应 buckets 的下标并不必定是 0,由于数组只是做为循环队列的存储方式,起始下标无所谓 val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry)
// 设置过时时间,这里也取整了,便可以被 tickMs 整除 if (bucket.setExpiration(virtualId * tickMs)) { // 仅在新的过时时间和以前的不一样才返回 true // 因为进行了取整,同一个 bucket 全部节点的过时时间都相同,所以仅在 bucket 的第一个节点加入时才会进入此 if 块 // 所以保证了每一个桶只会被加入一次到 queue 中,queue 存放全部包含定时任务节点的 bucket // 借助 DelayQueue 来检测 bucket 是否过时,bucket 时遍历便可取出全部节点 queue.offer(bucket) } true } else { // 过时时间在当前时间轮表示的范围以外,即溢出,须要建立高一层时间轮来加入 if (overflowWheel == null) addOverflowWheel() // 双重检查上锁的第一层检查 overflowWheel.add(timerTaskEntry) // 注意高一层时间轮也可能没法容纳,所以可能会递归建立更高层级的时间轮 } }
能够看到 DelayQueue 对象 queue 在时间轮的做用是,保存包含定时任务节点的桶,桶能够来自不一样层次的时间轮,固然,全部层次时间轮也共享这个队列。
TimingWheel 自己没有实现推动功能,而是借助延迟队列 DelayQueue 来实现时间的推移,假设有 M 个定时任务分布在 N 个桶中,那么插入的时间复杂度为 O(M + N * log N),其中 M >= N。若是把任务全存到延迟队列中,那么插入的时间复杂度为 O(M * log M),所以 Kafka 时间轮的优化是有意义的。
时间轮的推动
def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { // timeMs 超过了当前 bucket 的时间范围 currentTime = timeMs - (timeMs % tickMs) // 修改当前时间,即原先的第一个桶已经失效
// 若存在更高层的时间轮,则也会向前运转 if (overflowWheel != null) overflowWheel.advanceClock(currentTime) } }
仅仅是修改 currentTime,该字段决定内部的 bucket 是否过时,见前面的 add 方法实现。
4
时间轮在 Kafka 管理定时任务中的做用
private class ExpiredOperationReaper extends ShutdownableThread(/* ... */) { // doWork 方法会在线程函数,即基类 Thread 的 run 方法中循环调用 override def doWork() { advanceClock(200L) // 200 ms } }而 purgatory 是使用 kafka.utils.timer 包下的 SystemTimer 类做为定时器的:
def apply[T <: DelayedOperation](purgatoryName: String, /* ... */): DelayedOperationPurgatory[T] = { val timer = new SystemTimer(purgatoryName) new DelayedOperationPurgatory[T](purgatoryName, timer, /* ... */) }而在 SystemTimer 中,关键字段为时间轮对象 timingWheel:
// java.util.concurrent 包提供的延时队列 private[this] val delayQueue = new DelayQueue[TimerTaskList]() private[this] val taskCounter = new AtomicInteger(0) // Kafka 在 kafka.utils.timer 包中自行实现的时间轮 private[this] val timingWheel = new TimingWheel( tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue )其 advanceClock 方法其实是调用 timingWheel.advanceClock 方法:
def advanceClock(timeoutMs: Long): Boolean = { // 从延时队列中等待 timeout 毫秒,如有过时 bucket 则取出 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { // 存在过时 bucket writeLock.lock() try { while (bucket != null) { // 推动当前时间轮,内部可能会递归推动更高一层时间轮,currentTime 被修改 timingWheel.advanceClock(bucket.getExpiration()) bucket.flush(reinsert) // 默认 timeout 为 0,即非阻塞,也就是说尽量取出当前时刻全部过时的 buckets bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false } }可见,SystemTimer 对象在调用 advancedClock 推动时间时,实际上是从延时队列中取出推动的时间内全部过时的 bucket,而后 flush:
// Remove all task entries and apply the supplied function to each of them def flush(f: (TimerTaskEntry)=>Unit): Unit = { synchronized { // 遍历整个 bucket(链表),remove 删除全部节点 var head = root.next while (head ne root) { remove(head) f(head) head = root.next } expiration.set(-1L) } }注意到,传入 flush 的是 reinsert 函数:
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)问题来了,为啥删除以后又要从新插入呢?由于若是取出的这个 bucket 是属于高层时间轮的,因为高层时间轮精度不够,此时 bucket 可能并未过时。举个两层时间轮的例子(单位:毫秒):
层次 |
桶 |
1 |
[0,1) [1,2) |
2 | [0,2) [2,4) |
层次 |
桶 |
1 | [2,3) [3,4) |
2 | [2,4) [4,6) |
5
总结
若是你们有什么建议或疑问,能够在下方留言交流。