Flink 靠什么征服饿了么工程师?

平台现状

下面是目前饿了么平台现状架构图:java

clipboard.png

来源于多个数据源的数据写到kafka里,计算引擎主要是Storm,Spark和Flink,计算引擎出来的结果数据再落地到各类存储上。python

目前Storm任务大概有100多个,Spark任务有50个左右,Flink暂时还比较少。mysql

目前咱们集群规模天天数据量有60TB,计算次数有1000000000,节点有400个。这里要提一下,Spark和Flink都是on yarn的,其中Flink onyarn主要是用做任务间jobmanager隔离, Storm是standalone模式。程序员

应用场景

1.一致性语义redis

在讲述咱们应用场景以前,先强调实时计算一个重要概念, 一致性语义:sql

1) at-most-once:即fire and forget,咱们一般写一个java的应用,不去考虑源头的offset管理,也不去考虑下游的幂等性的话,就是简单的at-most-once,数据来了,无论中间状态怎样,写数据的状态怎样,也没有ack机制。apache

2) at-least-once: 重发机制,重发数据保证每条数据至少处理一次。api

3) exactly-once: 使用粗Checkpoint粒度控制来实现exactly-once,咱们讲的exactly-once大多数指计算引擎内的exactly-once,即每一步的operator内部的状态是否能够重放;上一次的job若是挂了,可否从上一次的状态顺利恢复,没有涉及到输出到sink的幂等性概念。性能优化

4) at-least-one + idempotent = exactly-one:若是咱们能保证说下游有幂等性的操做,好比基于mysql实现 update on duplicate key;或者你用es, cassandra之类的话,能够经过主键key去实现upset的语义, 保证at-least-once的同时,再加上幂等性就是exactly-once。架构

2. Storm

饿了么早期都是使用Storm,16年以前仍是Storm,17年才开始有Sparkstreaming, Structed-streaming。Storm用的比较早,主要有下面几个概念:

1) 数据是tuple-based

2) 毫秒级延迟

3) 主要支持java, 如今利用apache beam也支持python和go。

4) Sql的功能还不完备,咱们本身内部封装了typhon,用户只须要扩展咱们的一些接口,就可使用不少主要的功能;flux是Storm的一个比较好的工具,只须要写一个yaml文件,就能够描述一个Storm任务,某种程度上说知足了一些需求,但仍是要求用户是会写java的工程师,数据分析师就使用不了。

2.1 总结

1) 易用性:由于使用门槛高,从而限制了它的推广。

2)StateBackend:更多的须要外部存储,好比redis之类的kv存储。

3) 资源分配方面:用worker和slot提早设定的方式,另外因为优化点作的较少,引擎吞吐量相对比较低一点。

3. Sparkstreaming

有一天有个业务方过来提需求说 咱们能不能写个sql,几分钟内就能够发布一个实时计算任务。 因而咱们开始作Sparkstreaming。它的主要概念以下:

1) Micro-batch:须要提早设定一个窗口,而后在窗口内处理数据。

2) 延迟是秒级级别,比较好的状况是500ms左右。

3) 开发语言是java和scala。

4)streaming SQL,主要是咱们的工做,咱们但愿提供streaming SQL的平台。

特色:

1) Spark生态和SparkSQL: 这是Spark比较好的地方,技术栈是统一的,SQL,图计算,machine learning的包都是能够互调的。由于它先作的是批处理,和Flink不同,因此它自然的实时和离线的api是统一的。

2) Checkpointon hdfs。

3) onyarn:Spark是属于hadoop生态体系,和yarn集成度高。

4) 高吞吐: 由于它是Micro-batch的方式,吞吐也是比较高的。

下面给你们大体展现一下咱们平台用户快速发布一个实时任务的操做页面,它须要哪些步骤。咱们这里不是写DDL和DML语句,而是ui展现页面的方式。

clipboard.png

页面里面会让用户选一些必要的参数, 首先会选哪个kafka集群,每一个分区消费多少,反压也是默认开启的。消费位置须要让用户每次去指定,有可能用户下一次重写实时任务的时候,能够根据业务需求去选择offset消费点。

中间就是让用户描述pipeline。 SQL就是kafka的多个topic,输出选择一个输出表,SQL把上面消费的kafka DStream注册成表,而后写一串pipeline,最后咱们帮用户封装了一些对外sink(刚刚提到的各类存储都支持,若是存储能实现upsert语义的话,咱们都是支持了的)。

3.1 MultiStream-Join

虽然刚刚知足通常无状态批次内的计算要求,但就有用户想说, 我想作流的join怎么办, 早期的Spark1.5能够参考Spark-streamingsql这个开源项目把 DStream注册为一个表,而后对这个表作join的操做,但这只支持1.5以前的版本,Spark2.0推出structured streaming以后项目就废弃了。咱们有一个tricky的方式:

clipboard.png

让Sparkstreaming去消费多个topic,可是我根据一些条件把消费的DStream里面的每一个批次RDD转化为DataFrame,这样就能够注册为一张表,根据特定的条件,切分为两张表,就能够简单的作个join,这个join的问题彻底依赖于本次消费的数据,它们join的条件是不可控的,是比较tricky的方式。好比说下面这个例子,消费两个topic,而后简单经过filer条件,拆成两个表,而后就能够作个两张表的join,但它本质是一个流。

clipboard.png

3.2 Exactly-once

clipboard.png

exactly-once须要特别注意一个点:

咱们必需要求数据sink到外部存储后,offset才能commit,不论是到zk,仍是mysql里面,你最好保证它在一个transaction里面,并且必须在输出到外部存储(这里最好保证一个upsert语义,根据unique key来实现upset语义)以后,而后这边源头driver再根据存储的offeset去产生kafka RDD,executor再根据kafka每一个分区的offset去消费数据。若是知足这些条件,就能够实现端到端的exactly-once. 这是一个大前提。

3.3 总结

1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):咱们要实现跨批次带状态的计算的话,在1.X版本,咱们经过这两个接口去作,但仍是须要把这个状态存到hdfs或者外部去,实现起来比较麻烦一点。

2) Real Multi-Stream Join:没办法实现真正的多个流join的语义。

3)End-To-End Exactly-Once Semantics:它的端到端的exactly-once语义实现起来比较麻烦,须要sink到外部存储后还须要手动的在事务里面提交offset。

4. STRUCTUREDSTREAMING

咱们调研而后并去使用了Spark2.X以后带状态的增量计算。下面这个图是官方网站的:

clipboard.png

全部的流计算都参照了Google的 data flow,里面有个重要的概念:数据的processing time和event time,即数据的处理时间和真正的发生时间有个gap。因而流计算领域还有个watermark,当前进来的事件水位须要watermark来维持,watermark能够指定时间delay的范围,在延迟窗口以外的数据是能够丢弃的,在业务上晚到的数据也是没有意义的。

下面是structuredstreaming的架构图:

clipboard.png

这里面就是把刚才Sparkstreaming讲exactly-once的步骤1,2,3都实现了,它本质上仍是分批的batch方式,offset本身维护,状态存储用的hdfs,对外的sink没有作相似的幂等操做,也没有写完以后再去commit offset,它只是再保证容错的同时去实现内部引擎的exactly-once。

4.1 特色

1) Stateful Processing SQL&DSL:能够知足带状态的流计算

2) Real Multi-Stream Join:能够经过Spark2.3实现多个流的join,多个流的join作法和Flink相似,你须要先定义两个流的条件(主要是时间做为一个条件),好比说有两个topic的流进来,而后你但愿经过某一个具体的schema中某个字段(一般是event time)来限定须要buffer的数据,这样能够实现真正意义上的流的join。

3)比较容易实现端到端的exactly-once的语义,只须要扩展sink的接口支持幂等操做是能够实现exactly-once的。

特别说一下,Structuredstreaming和原生的streaming的api有一点区别,它create表的Dataframe的时候,是须要指定表的schema的,意味着你须要提早指定schema。另外它的watermark是不支持SQL的,因而咱们加了一个扩展,实现彻底写sql,能够从左边到右边的转换(下图),咱们但愿用户不止是程序员,也但愿不会写程序的数据分析师等同窗也能用到。

clipboard.png

4.2 总结

1) Trigger(Processing Time、 Continuous ):2.3以前主要基于processing Time,每一个批次的数据处理完了立马触发下一批次的计算。2.3推出了record by record的持续处理的trigger。

2)Continuous Processing (Only Map-Like Operations):目前它只支持map like的操做,同时sql的支持度也有些限制。

3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保证须要本身作一些额外的扩展, 咱们发现kafka0.11版本提供了事务的功能,是能够从基于这方面考虑从而去实现从source到引擎再到sink,真正意义上的端到端的exactly-once。

4) CEP(Drools):咱们发现有业务方须要提供cep 这样复琐事件处理的功能,目前咱们的语法没法直接支持,咱们让用户使用规则引擎Drools,而后跑在每一个executor上面,依靠规则引擎功能去实现cep。

因而基于以上几个Spark structuredstreaming的特色和缺点,咱们考虑使用Flink来作这些事情。

5.Flink

clipboard.png

Flink目标是对标Spark,流这块是领先比较多,它野心也比较大,图计算,机器学习等它都有,底层也是支持yarn,tez等。对于社区用的比较多的存储,Flink社区官方都支持比较好,相对来讲。

Flink的框架图:

clipboard.png

Flink中的JobManager,至关于Spark的driver角色,taskManger至关于executor,里面的task也有点相似Spark的那些task。 不过Flink用的rpc是akka,同时Flink core自定义了内存序列化框架,另外task无需像Spark每一个stage的task必须相互等待而是处理完后即往下游发送数据。

Flink binary data处理operator:

clipboard.png

Spark的序列化用户通常会使用kryo或者java默认的序列化,同时也有Tungsten项目对Spark程序作一jvm层面以及代码生成方面的优化。相对于Spark,Flink本身实现了基于内存的序列化框架,里面维护着key和pointer的概念,它的key是连续存储,在cpu层面会作一些优化,cache miss几率极低。比较和排序的时候不须要比较真正的数据,先经过这个key比较,只有当它相等的时候,才会从内存中把这个数据反序列化出来,再去对比具体的数据,这是个不错的性能优化点。

Flink task chain:

clipboard.png

Task中operatorchain,是比较好的概念。若是上下游数据分布不须要从新shuffle的话,好比图中source是kafka source,后面跟的map只是一个简单的数据filter,咱们把它放在一个线程里面,就能够减小线程上下文切换的代价。

并行度概念

clipboard.png

好比说这里面会有5个task,就会有几个并发线程去跑,chain起来的话放在一个线程去跑就能够提高数据传输性能。Spark是黑盒的,每一个operator没法设并发度,而Flink能够对每一个operator设并发度,这样能够更灵活一点,做业运行起来对资源利用率也更高一点。

Spark 通常经过Spark.default.parallelism来调整并行度,有shuffle操做的话,并行度通常是通Spark.sql.shuffle.partitions参数来调整,实时计算的话其实应该调小一点,好比咱们生产中和kafka的partition数调的差很少,batch在生产上会调得大一点,咱们设为1000,左边的图咱们设并发度为2,最大是10,这样首先分2个并发去跑,另外根据key作一个分组的概念,最大分为10组,就能够作到把数据尽可能的打散。

State & Checkpoint

由于Flink的数据是一条条过来处理,因此Flink中的每条数据处理完了立马发给下游,而不像spark,须要等该operator所在的stage全部的task都完成了再往下发。

Flink有粗粒度的checkpoint机制,以很是小的代价为每一个元素赋予一个snapshot概念,只有当属于本次snapshot的全部数据都进来后才会触发计算,计算完后,才把buffer数据往下发,目前Flink sql没有提供控制buffer timeout的接口,即个人数据要buffer多久才往下发。能够在构建Flink context时,指定buffer timeout为0,处理完的数据才会立马发下去,不须要等达到必定阈值后再往下发。

Backend默认是维护在jobmanager内存,咱们更多使用的的是写到hdfs上,每一个operator的状态写到rocksdb上,而后异步周期增量同步到外部存储。

容错

clipboard.png

图中左半部分的红色节点发生了failover,若是是at-least-once,则其最上游把数据重发一次就好;但若是是exactly-once,则须要每一个计算节点从上一次失败的时机重放。

Exactly Once Two-Phase Commit

clipboard.png

Flink1.4以后有两阶段提交来支持exactly-once.它的概念是从上游kafka消费数据后,每一步都会发起一次投票,来记录状态,经过checkpoint的屏障来处理标记,只有最后再写到kafka(0.11以后的版本),只有最后完成以后,才会把每一步的状态让jobmanager中的cordinator去通知能够固化下来,这样实现exactly-once。

Savepoints
还有一点Flink比较好的就是,基于它的checkpoint来实现savepoint功能。业务方须要每一个应用恢复节点不同,但愿恢复到的版本也是能够指定的,这是比较好的。这个savepoint不仅是数据的恢复,也有计算状态的恢复。

特色:

1) Trigger (Processing Time、 Event Time、IngestionTime):对比下,Flink支持的流式语义更丰富,不只支持Processing Time, 也支持Event time和Ingestion Time。

2)Continuous Processing & Window:支持纯意义上的持续处理,recordby record的,window也比Spark处理的好。

3) Low End-To-End Latency With Exactly-Once Guarantees:由于有两阶段提交,用户是能够选择在牺牲必定吞吐量的状况下,根据业务需求状况来调整来保证端到端的exactly-once。

4) CEP:支持得好。

5) Savepoints:能够根据业务的需求作一些版本控制。

也有作的还很差的:

1)SQL (Syntax Function、Parallelism):SQL功能还不是很完备,大部分用户是从hive迁移过来,Spark支持hive覆盖率达到99%以上。 SQL函数不支持,目前还没法对单个operator作并行度的设置。

2) ML、Graph等:机器学习,图计算等其余领域比Spark要弱一点,但社区也在着力持续改进这个问题。

本文做者:易伟平

阅读原文

本文来自云栖社区合做伙伴“阿里技术”,如需转载请联系原做者。