字节跳动单点恢复功能及 Regional CheckPoint 优化实践

简介: 本文介绍字节跳动在过去一段时间里作的两个主要的 Feature,一是在 Network 层的单点恢复的功能,二是 Checkpoint 层的 Regional Checkpoint。性能优化

做者|廖嘉逸网络

摘要:本文介绍字节跳动在过去一段时间里作的两个主要的 Feature,一是在 Network 层的单点恢复的功能,二是 Checkpoint 层的 Regional Checkpoint。内容包括:数据结构

单点恢复机制Regional Checkpoint在 Checkpoint 的其它优化挑战 & 将来规划并发

做者分享原版视频回顾:https://www.bilibili.com/video/BV13a4y1H7XY?p=2ide

1、单点恢复机制

在字节跳动的实时推荐场景中,咱们使用 Flink 将用户特征与用户行为进行实时拼接,拼接样本做为实时模型的输入。拼接服务的时延和稳定性直接影响了线上产品对用户的推荐效果,而这种拼接服务在 Flink 中是一个相似双流 Join 的实现,Job 中的任何一个 Task 或节点出现故障,都会致使整个 Job 发生 Failover,影响对应业务的实时推荐效果。高并发

在介绍单点恢复以前,咱们回顾一下 Flink 的 Failover 策略。性能

  • Individual-Failover:

只重启出错的 Task,适用于 Task 间无链接的状况,应用场景有限。测试

  • Region-Failover:

该策略会将做业中的全部 Task 划分为数个 Region。当有 Task 发生故障时,它会尝试找出进行故障恢复须要重启的最小 Region 集合。相比于全局重启故障恢复策略,这种策略在一些场景下的故障恢复须要重启的 Task 会更少。优化

若是使用 Region-Failover 策略,但由于 Job 是一个全链接的拓扑,自己就是一个大 region。重启 region 至关于重启整个 Job,因此咱们考虑是否能够用 Flink Individual-task-failover 策略去替代 Region-failover 策略?而 Individual-task-failover 的策略在这种拓扑下是彻底不适用的。因此咱们对于如下特征的场景,须要设计开发一个新的 Failover 策略:阿里云

  • 多流 Join 拓扑
  • 流量大(30M QPS)、高并发度(16K*16K)
  • 容许短期内小量部分数据丢失
  • 对数据持续输出型要求高

在讲述技术方案以前,看一下 Flink 现有的数据传输机制。

从左往右看(SubTaskA):

  1. 当数据流入时会先被 RecordWriter 接收
  2. RecordWriter 根据数据的信息,例如 key,将数据进行 shuffle 选择对应的 channel
  3. 将数据装载到 buffer 中,并放到 channel 对应的 buffer 队列里
  4. 经过 Netty Server 向下游发送
  5. 下游 Netty Client 接收数据
  6. 根据 buffer 中的分区信息,转发发到下游对应的 channel 中
  7. 由 InputProcessor 将数据从 buffer 中取出,执行 operator 逻辑

根据上面提出的思路咱们要解决如下几个问题:

  • 如何让上游 Task 感知下游 Failure
  • 下游 Task 失败后,如何让上游 Task 向正常的 Task 发送数据
  • 上游 Task 失败后,如何让下游 Task 继续消费 buffer 中的数据
  • 上下游中不完整的数据如何处理
  • 如何创建新的链接

针对上述问题提出解决方案。

■ 如何让上游 Task 感知下游 Failure

下游 SubTask 主动将失败信息传递给上游,或者 TM 被关闭上游 Netty Server 也能够感知到。图中用 X 表示不可用的 SubPartition。

首先将 SubPartition1 和对应的 view (Netty Server 用来取 SubPartition 数据的一个结构)置为不可用。

以后当 Record Writer 接收到新数据须要向 SubPartition1 发送数据,此时须要进行一个可用性判断,当 SubPartition 状态可用则正常发送,不可用直接丢弃数据。

■ 上游 Task 接收到下游 Task 新的链接

下游 subTask 被从新调度启动后,向上游发送 Partition Request,上游 Netty Server 收到 Partition Request 后从新给下游 SubTask 建立对用的 View, 此时上游 Record Writer 就能够正常写数据。

■ 下游 Task 感知上游 Task 失败

一样的下游 Netty Client 能感知到上游有 subTask 失败了,这时找出对应的 channel ,在末尾插入一个不可用的事件(这里用感叹号来表示事件)。咱们的目的是想要尽量的少丢数据,此时 channel 中的 buffer 任能够被 InputProcessor 正常消费,直到读取到“不可用事件”。再进行 channel 不可用标记和对应的 buffer 队列清理。

■ Buffer 中有不完整的数据

首先要知道不完整的数据存放在哪里,它存在于 input process 的内部,input process 会给每个 channel 维护一个小的 buffer 队列。当收到一个 buffer ,它是不完整的数据,那么等到接收到下一个 buffer 后再拼接成一条完整的数据发往 operator。

■ 下游 Task 和上游 Task 从新链接

当上游有问题的 Task 被从新调度后,经过调用 TaskManager API 来通知下游。下游 Shuffle Environment 收到通知后判断对应的 channel 状态,若是是不可,用直接生成新的 channel 并释放掉老的。若是是可用状态,说明 channel 的 buffer 没有消费完,须要等待 buffer 消费完再进行替换操做。

业务收益

上图是以 4000 并行度的做业为例作了对比测试。业务是将一个用户展示流和一个用户行为流的进行 Join,整个做业共有 12000个 Task。

上图中 单点恢复(预留资源)是使用调度组作的一个 feature,在申请资源的时,选择额外多申请一些资源,当发生 failover 时省去了从 YARN 去申请资源的时间开销。

最后作到了做业的输出减小千分之一,恢复时间约 5 秒。由于整个恢复过程时间较短,能够基本作到下游无感知。

2、Regional Checkpoint

在一个比较经典的数据集成场景,数据导入导出。好比从 Kafka 导入到 Hive,知足下面几个特征。

  • 拓扑中没有 All-to-All 的链接
  • 强依赖 Checkpoint 来实现 Exactly-Once 语义下的数据输出
  • Checkpoint 间隔长,对成功率要求高

在这种状况下,数据没有任何的 shuffle 。

在数据集成的场景中遇到哪些问题?

  • 单个 Task Checkpoint 失败会影响全局的 Checkpoint 输出
  • 网络抖动、写入超时/失败、存储环境抖动对做业的影响过于明显
  • 2000并行以上的做业成功率明显降低,低于业务预期

在这里,咱们想到做业会根据 region-failover 策略将做业的拓扑划分为多个 region。那么 Checkpoint 是否能够采起相似的思路,将 checkpoint 以 region 的单位来管理?答案是确定的。

在这种状况下不须要等到全部 Task checkpoint 完成后才去作分区归档操做(例如 HDFS 文件 rename)。而是当某个 region 完成后便可进行 region 级别的 checkpoint 归档操做。

介绍方案以前先简单回顾 Flink 现有的 checkpoint 机制。相信你们都比较熟悉。


现有 ckp

上图中是一个 Kafka source 和 Hive sink 算子的拓扑,并行度为 4 的例子。

首先 checkpoint coordinator 触发 triggerCheckpoint 的操做,发送到各个 source task。在 Task 收到请求以后,触发 Task 内的 operator 进行 snapshot 操做。例子中有 8 个 operator 状态。


现有 ckp1

在各 operator 完成 snapshot 后,Task 发送 ACK 消息给 checkpoint coordinator 表示当前 Task 已经完成了 Checkpoint。

以后当 coordinator 收到全部 Task 成功的 ACK 消息,那么 checkpont 能够认为是成功了。最后触发 finalize 操做,保存对应的 metadata。通知全部 Task checkpoint 完成。

当咱们使用 Region 方式去管理 checkpoint 时会遇到什么问题?

  • 如何划分 Checkpoint Region

把彼此没有链接的 Task 集合,划分为 1 个 region。显而易见例子中有四个 Region。

  • 失败 Region 的 Checkpoint 结果如何处理

假设第一次 checkpoint 能正常完成,每一个 operator 对应的状态都成功写入 HDFS checkpoint1 目录中,并经过逻辑映射,将 8 个operator 映射到 4 个 checkpoint region。注意仅仅是逻辑映射,并无对物理文件作出任何移动和修改。


现有 ckp1

第二次进行 checkpoint 时 region-4-data(Kafka-4,Hive-4)checkpoint 失败。checkpoint2 (job/chk_2)目录中没有对应 Kafka-4-state 和 Hive-4-state 文件,当前 checkpoint2 是不完整的。为了保证完整,从上一次或以前成功的 checkpoint 文件中寻找 region-4-data 成功的 state 文件,并进行逻辑映射。这样当前 checkpoint 每一个 region 状态文件就完整了,能够认为 checkpoint 完成。

此时若是发生大部分或全部 region 都失败,若是都引用前一次 checkpoint 那么当前这个 checkpoint 和上一个 checkpoint 相同也就没有意义了。

经过配置 region 最大失败比例, 好比 50%,例子中 4 个 region ,最多能接受两个 region 失败。

  • 如何避免在文件系统上存储过多的 Checkpoint 历史数据

若是有某个 region 一直失败(遇到脏数据或代码逻辑问题),当前的机制会致使把全部历史 checkpoint 文件都保留下来,显然这是不合理的。

经过配置支持 region 最大连续失败次数。例如2表示 region 最多能引用前两次的 checkpoint 成功的 region 结果。

工程实现难点

  • 如何处理 Task Fail 和 checkpoint timeout
  • 同一 region 内已经 snapshot 成功的 subTask 状态如何处理
  • 如何保证和 checkpoint Coordinator 的兼容性

来看目前 Flink 是如何作的。


现有 coordinator

当发生 Task failure ,先会通知到 JobMaster FailoverStrategy,经过 FailoverStrategy 来通知 checkpoint coordinator 进行 checkpoint cancel 操做。

那么 checkpoint timeout 状况如何处理?当 coordinator 触发 checkpoint 时,会开启 checkpoint canceller。canceller 内有一个定时器,当超过预设时间而且 coordinator 还未完成 checkpoint,说明出现timeout,通知 coordinator cancel 本次 checkpoint。

不管是 Task fail 仍是 timeout 最终都会指向 pendding checkpoint,而且当前指向的 checkpoint 就会被丢弃。

在作出相应修改前先梳理 checkpoint 相关的 Message,和 checkpoint coordinator 会作出的反应。

Global checkpoint 为 Flink 现有机制。

为了保持和 checkpoint Coordinator 兼容性,添加一个 CheckpointHandle 接口。并添加了两个实现分别是 GlobalCheckpointHandle 和 RegionalCheckpointHandle 经过过滤消息的方式实现 global checkpoint 和 region checkpoint 相关操做。

region checkpoint 提一点。若是 handler 接收到失败消息,将这个 region 置为失败,并尝试从以前的 successful checkpoint 进行 region 逻辑映射。一样 coordinator 发送 nofityComplate 消息也会先通过 handler 的过滤,过滤掉发送给失败 Task 的消息。


业务收益

测试在 5000 并行度下,假设单个 Task snapshot 的成功率为 99.99%。使用 Global checkpoint 的成功率为 60.65%, 而使用 Region checkpoint 任然能保持 99.99%。

3、Checkpoint 上的其它优化

■ 并行化恢复 operator 状态

union state 是一种比较特殊的状态,在恢复时须要找到 job 全部的 Task state 再进行 union 恢复到单个 Task 中。若是 Job 并行度很是大,如 10000, 那么每一个 task 的 union state 进行恢复时至少须要读取 10000 个文件。若是串行恢复这 10000 个文件里的状态,那么恢复的耗时可想而知是很是漫长的。

虽然 OperatorState 对应的数据结构是没法进行并行操做的,可是咱们读取文件的过程是能够并行化的,在 OperatorStateBackend 的恢复过程当中,咱们将读取 HDFS 文件的过程并行化,等到全部状态文件解析到内存后,再用单线程去处理,这样咱们能够将几十分钟的状态恢复时间减小到几分钟。

■ 加强 CheckpointScheduler 并支持 Checkpoint 整点触发

Flink checkpoint 的 interval,timeout 在任务提交以后是没法修改的。但刚上线时只能根据经验值进行设置。而每每在做业高峰期时会发现 interval,timeout 等参数设置不合理。这时一般一个方法是修改参数重启任务,对业务影响比较大,显然这种方式是不合理的。

在这里,咱们对 CheckpointCoordinator 内部的 Checkpoint 触发机制作了重构,将已有的 Checkpoint 触发流程给抽象出来,使得咱们能够很快地基于抽象类对 Checkpoint 触发机制进行定制化。好比在支持数据导入的场景中,为了更快地造成 Hive 分区,咱们实现了整点触发的机制,方便下游尽快地看到数据。

还有不少优化点就不一一列举了。

4、挑战 & 将来规划

目前字节内部的做业状态最大能达到 200TB 左右的水平,而对于这种大流量和大状态的做业,直接使用 RocksDB StateBackend 是没法支撑的。因此将来,咱们会以后继续会在 state 和 checkpoint 性能优化和稳定性上作更多的工做,好比强化已有的 StateBackend、解决倾斜和反压下 Checkpoint 的速率问题、加强调试能力等。

本文为阿里云原创内容,未经容许不得转载。