如何在 Apache Flink 中使用 Python API?

做者:孙金城(金竹) 整理:韩非java

本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink PMC,阿里巴巴高级技术专家 孙金城 分享。重点为你们介绍 Flink Python API 的现状及将来规划,主要内容包括:Apache Flink Python API 的前世此生和将来发展;Apache Flink Python API 架构及开发环境搭建;Apache Flink Python API 核心算子介绍及应用。git

一.Apache Flink Python API 的前世此生和将来发展

1.Flink 为何选择支持 Python

Apache Flink 是流批统一的开源大数据计算引擎,在 Flink 1.9.0 版本开启了新的 ML 接口和全新的Python API架构。那么为何 Flink 要增长对 Python 的支持,下文将进行详细分析。github

  • 最流行的开发语言

1.jpg

Python 自己是很是优秀的开发语言,据 RedMonk 数据统计,除 Java 和 JavaScript 以外,受欢迎度排名第三。shell

RedMonk 是著名的以开发人员为中心的行业分析公司,其更详细的分析信息,你们在拿到个人PPT以后,能够点击连接进行详细查阅。好了,那么Python的火热,与咱们今天向你们分享的流批统一的大数据计算引擎,Apache Flink有什么关系呢?带着这个问题,咱们你们想一想目前与大数据相关的著名的开源组件有哪些呢?好比说最先期的批处理框架Hadoop?流计算平台Storm,最近异常火热的Spark?异或其余领域数仓的Hive,KV存储的HBase?这些都是很是著名的开源项目,那么这些项目都无一例外的进行了Python API的支持。apache

  • 众多开源项目支持

2.jpg

Python 的生态已相对完善,基于此,Apache Flink 在 1.9 版本中也投入了大量的精力,去推出了一个全新的 Pyflink。除大数据外,人工智能与Python也有十分密切的关系。windows

  • ML青睐的语言

3.jpg

从上图统计数据能够发现,Python API 自己已经占机器学习岗位需求语言的 0.129%。相对于 R 语言,Python 语言彷佛更受青睐。Python 做为解释型语言,语法的设计哲学是”用一种方法而且只有一种方法来作一件事”。其简洁和易用性使其成为了世界上最受欢迎的语言,在大数据计算领域都有着很好的生态建设,同时Python在机器学习 在机器学习方面也有很好的前景,因此咱们在近期发布的Apache Flink 1.9 以全新的架构推出新的 Python API微信

Flink 是一款流批统一的计算引擎,社区很是重视和关注 Flink 用户,除 Java 语言或者 Scala 语言,社区但愿提供多种入口,多种途径,让更多的用户更方便的使用 Flink,并收获 Flink 在大数据算力上带来的价值。所以 Flink 1.9 开始,Flink 社区以一个全新的技术体系来推出 Python API,而且已经支持了大部分经常使用的一些算子,好比如 JOIN,AGG,WINDOW 等。数据结构

2.Python API – RoadMap

4.jpg

在 Flink 1.9 中虽然 Python 可使用 Java 的 User-defined Function,可是还缺少 Python native 的 User-defined function 的定义,因此咱们计划在 Flink 1.10 中进行支持 Python User-defined function 的支持。并技术增长对数据分析工具类库 Pandas 的支持,在 Flink 1.11 增长对 DataStream API 和 ML API 的支持。架构

二.Python API架构及开发环境搭建

1.Python Table API架构

5.jpg

新的 Python API 架构分为用户 API 部分,PythonVM 和 Java VM 的通信部分,和最终将做业提交到 Flink 集群进行运行的部分。那么 PythonVM 和 JavaVM 是怎样通信的呢?咱们在Python 端会会有一个 Python 的 Gateway 用于保持和 Java 通信的连接,在 Java 部分有一个 GateWayServer 用于接收 Python 部分的调用请求。框架

关于 Python API 的架构部分,在 1.9 以前,Flink 的 DataSet 和 DataStream 已经有了对 Python API 的支持,可是拥有 DataSet API 和 DataStream API 两套不一样的 API。对于 Flink 这样一个流批统一的流式计算引擎来说,统一的架构相当重要。而且对于已有的 Python DataSet API 和 DataStream API 而言,采用了JPython 的技术体系架构,而 JPython 自己对目前 Python 的 3.X 系列没法很好的支持,因此 Flink 1.9 发布后,决定将原有的 Python API 体系架构废弃,以全新的技术架构出现。这套全新的 Python API 基于 Table API 之上。

Table API 和 Python API 之间的通信采用了一种简单的办法,利用 Python VM 和 Java VM 进行通讯。在 Python API 的书写或者调用过程当中,以某种方式来与 Java API 进行通信。操做 Python API 就像操做 Java 的 Table API同样。新架构中能够确保如下内容:

  • 不须要另外建立一套新的算子,能够轻松与 Java 的 Table API 的功能保持一致;
  • 得益于现有的 Java Table API 优化模型,Python 写出来的API,能够利用 Java API 优化模型进行优化,能够确保 Python 的 API 写出来的 Job 也可以具有极致性能。

5.jpg

如图,当 Python 发起对Java的对象请求时候,在 Java 段建立对象并保存在一个存储结构中,并分配一个 ID 给 Python 端,Python 端在拿到 Java 对象的 ID 后就能够对这个对象进行操做,也就是说 Python 端能够操做任何 Java 端的对象,这也就是为何新的架构能够保证Python Table API 和 Java Table API功能一致,而且能过服用现有的优化模型。

6.jpg

在新的架构和通信模型下,Python API 调用 Java API 只须要在持有 Java 对象的 ID,将调用方法的名字和参数传递给 Java VM,就能完成对 Java Table API 的调用,因此在这样的架构中开发 Python Table API 与开发 Java Table API 的方式彻底一致,接下来我为你们详细介绍如何开发一个简单的 Python API 做业。

2.Python Table API – Job开发

7.jpg

一般来说一个 Python Table Job 通常会分红四个部分,首先要根据目前的现状,要决定这个Job 是以批的方式运行,仍是流的方式运行。固然后续版本用户能够不考虑,但当前 1.9 版本仍是须要考虑。

在决定第一步以怎样的方式执行 Job 后,咱们须要了解数据从哪里来,如何定义 Source、结构数据类型等信息。而后须要写计算逻辑,而后就是对数据进行计算操做,但最终计算的结果须要持久化到某个系统。最后定义 Sink,与 Source 相似,咱们须要定义 Sink Schema,以及每个字段类型。

下面将详细分享如何用 Python API 写每一步?首先,咱们建立一个执行环境,对于执行环境自己来说,首先须要一个 ExecutionEnvironment,根本上咱们须要一个 TableEnvironment。那么在 TableEnvironment 中,有一个参数 Table Config,Table Config 中会有一些在执行过程当中的配置参数,能够传递到 RunTime 层。除此以外,还提供了一些个性化的配置项,能够在实际业务开发中进行使用。

8.jpg

在拿到 Environment 后,须要对数据源表进行定义,以 CSV 格式文件为例,用"逗号"分隔,用 Field 来代表这个文件中有哪些字段。那么会看到,目前里面用逗号分隔,而且只有一个字段叫 word,类型是 String。

9.jpg

在定义并描述完数据源数据结构转换成 Table 数据结构后,也就是说转换到 Table API 层面以后是怎样的数据结构和数据类型?下面将经过 with_schema 添加字段及字段类型。这里只有一个字段,数据类型也是 String,最终注册成一个表,注册到 catlog 中,就能够供后面的查询计算使用了。

10.jpg

建立结果表,当计算完成后须要将这些结果存储到持久化系统中,以 WordCount 为例,首先存储表会有一个 word 以及它的计数两个字段,一个是 String 类型的 word,另外一个是 Bigint 的计数,而后把它注册成 Sink。

11.jpg

编写注册完 Table Sink 后,再来看如何编写逻辑。其实用 Python API 写 WordCount 和 Table API 同样很是简单。由于相对于 DataSream 而言 Python API 写一个 WordCount 只须要一行。好比 group by,先扫描Source表,而后 group by 一个 Word,再进行 Select word 并加上聚合统计Count ,最终将最数据结果插入到结果表里面中。

3.Python Table API – 环境搭建

12.jpg

那么WordCount 怎样才能真正的运行起来?首先须要搭建开发环境,不一样的机器上可能安装的软件版本不同,这里列出来了一些版本的需求和要求,其中括号中是示例机器上的版本。

14.jpg

第二步,构建一个 Java 的二进制发布包,以从源代码进行构建,那么这一页面就是从原代码获取咱们的主干代码,而且拉取 1.9 的分支。固然你们能够用 Mater,可是 Master 不够稳定,仍是建议你们在本身学习的过程当中,最好是用 1.9 的分支去作。接下来进行实战演练环节,首先验证 PPT 的正确性。首先编译代码,示例以下:

//下载源代码
git clone https://github.com/apache/flink.git
// 拉取1.9分支
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//构建二进制发布包
mvn clean install -DskipTests -Dfast
复制代码

编译完成后,须要在相应目录下找到发布包:

cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
复制代码

在构建完 Java 的 API 以后进行检验,咱们要构建一个 Python 的发布包。

15.jpg

由于大多数 Python 的用户咱们都知道咱们须要 pip install 方式,将须要的依赖库进行与本地的 Python 环境进行集成或者安装。

那么 Flink 也是同样,PyFlink 也须要打包一个 Pypip 可以识别的资源进行安装,在实际的使用中,也能够按这种命令去拷贝,在本身的环境中尝试。

cd flink-Python;Python setup.py sdist
复制代码

这个过程只是将 Java 包囊括进来,再把本身 PyFlink 自己模块的一些 Java 的包和 Python 包打包成一块儿,它会在 dist 目录下,有一个 apache-flink-1.9.dev0.tar.gz。

cd dist/
复制代码

在 dist 目录的 apache-flink-1.9.dev0.tar.gz 就是咱们能够用于 pip install 的 PyFlink 包。在1.9版本,除了 Flink Table,还有 Flink Table Blink。Flink 同时会支持两个 plan,若是你们能够尝试,咱们能够自由的切换是 Flink 原有的 Planner,仍是 Blink 的 Planner,你们能够去尝试。完成打包后,就能够尝试把包安装到咱们的实际环境当中。

16.jpg

接下来是一个很是简单的命令,首先检查命令的正确性,在执行以前,咱们用 pip 检查一下 list,咱们要看在已有的包里有没有,如今尝试把刚才打包的包再安装。在实际的使用过程当中,若是升级版,也要有这个过程,要把新的包要进行安装。

pip install dist/*.tar.gz pip list|grep flink 复制代码

17.jpg

安装完成后,就能够用刚才写的 WordCount 例子来验证环境是否正确。验证一下刚才的正确性,怎么验证?为了你们方便,能够直接克隆 enjoyment.code 仓库。

git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py
复制代码

接下来体验并尝试。在这个目录下,咱们刚才开发的 WordCount 例子。直接用 Python 或检验环境是否 OK。这个时候 Flink Python API 会启动一个 Mini 的 Cluster,会将刚才 WordCount Job 进行执行,提交到一个 Mini Cluster 进行执行。如今 Run 的过程当中其实已经在集群上进行执行了。其实在这个代码里面是读了一个 Source 文件,把结果写到 CSV 文件,在当前目录,是有一个 Sink CSV 的。具体的操做步骤能够查看Flink中文社区视频Apache Flink Python API 现状及规划

18.jpg

IDE 的配置在正常的开发过程当中,其实咱们大部分仍是在本地进行开发的,这里推荐你们仍是用 Pychram 来开发 Python 相关的逻辑或者 Job。

同时因为有很大量的截图存在,也把这些内容整理到了博客当中,你们能够扫描二维码去关注和查看那么一些详细的注意事项,博客详细地址:enjoyment.cool。这里有一个很关键的地方,你们要注意,就是可能你的环境中有多种 Python 的环境,这时候选择的环境必定是刚才 pip install 环境。具体操做详见Apache Flink Python API 现状及规划。

4.Python Table API – 做业提交

19.jpg

还有哪些方式来提交 Job 呢?这是一个 CLI 的方式,也就是说真正的提交到一个现有的集群。首先启动一个集群。构建的目录通常在 target 目录下,若是要启动一个集群,直接启动就能够。这里要说一点的是,其中一个集群外部有个 Web Port,它的端口的地址都是在 flink-conf.yaml 配置的。按照 PPT 中命令,能够去查看日志,看是否启动成功,而后从外部的网站访问。若是集群正常启动,接下来看如何提交 Job 。

20.jpg

Flink 经过 run 提交做业,示例代码以下:

./bin/flink run -py  ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
复制代码

用命令行方式去执行,除了用 PY 参数,还能够指定 Python 的 module,以及其余一些依赖的资源文件、JAR等。

21.jpg

在 1.9 版本中还为你们提供一种更便利的方式,就是以 Python Shell 交互式的方式来写 Python API 拿到结果。有两种方式可执行,第一种方式是 Local,第二种方式 Remote,其实这两种没有本质的差别。首先来看 Local ,命令以下:

bin/pyflink-shell.sh local
复制代码

启动一个mini Cluster ,当输出后,会出来一个 Python 的 Flink CLI 同时会有一些示例程序,供你们来体验,按照上面的案例就可以达到正确的输出和提交,既能够写 Streaming,也能够写 Batch。详细步骤你们参考视频操做便可。

到目前为止,你们应该已经对 Flink 1.9 上 Python API 架构有了大概了解,同时也了解到如何搭建 Python API 环境。而且以一个简单的 WordCount 示例,体验如何在 IDE 里面去执行程序,如何以 Flink run 和交互式的方式去提交 Job。同时也体验了现有一些交互上的一种方式来使用 Flink Python API。那么介绍完了整个 Flink 的一些环境搭建和一个简单的示例后。接下来详细介绍一下在1.9里面全部的核心算子。

三.Flink Python API 核心算子介绍及应用

1.Python Table API 算子

22.jpg

上面分享建立一个 Job 的过程,第一要选择执行的方式是Streaming仍是Batch;第二个要定义使用的表,Source、Schema、数据类型;第三是开发逻辑,同时在写 WordCount 时,使用 Count 的函数。最后,在 Python API 里面内置了不少聚合函数,可使用count,sum, max,min等等。

因此在目前 Flink 1.9 版本中,已经可以知足大多数常规需求。除了刚才讲到的 count。Flink Table API 算子 1.9 中也已经支持。关于 Flink Table API 算子,不管是 Python Table API 仍是 Java 的Table API,都有如下几种类型的操做。第一单流上的操做,好比说作一些SELECT、Filter,同时还能够在流上作一些聚合,包括开窗函数的 windows 窗口聚合以及列的一些操做,好比最下面的 add_columns 和 drop_columns。

除了单流,还有双流的操做,好比说双流 JOIN、双流 minus、union ,这些算子在Python Table API 里面都提供了很好的支持。Python Table API 在 Flink 1.9 中,从功能的角度看几乎彻底等同于Java Table API,下面以实际代码来看上述算子是怎么编写的以及怎么去开发Python算子。 2.Python Table API 算子-Watermark定义

23.jpg

细心的同窗可能会注意到,咱们还没有提到流的一个特质性 -> 时序。流的特性是来的顺序是可能乱序,而这种乱序又是流上客观存在的一种状态。在 Flink 中通常采用 Watermark 机制来解决这种乱序的问题。

在 Python API 中如何定义 Watermark?假设有一个 JSON 数据,a 字段 String,time 字段 datetime。这个时候定义 Watermark 就要在增长 Schema 时增长 rowtime 列。rowtime 必须是 timestamps 类型。

Watermark 有多种定义方式,上图中 watermarks_periodic_bounded 即会周期性的去发 Watermark,6万单位是毫秒。若是数据是乱序的,可以处理一分钟以内的乱序,因此这个值调的越大,数据乱序接受程度越高,可是有一点数据的延迟也会越高。关于 Watermark 原理你们能够查看个人blog: 1t.click/7dM。

3.Python Table API – Java UDF

24.jpg

最后,跟你们分享一下 Java UDF在 Flink 1.9 版本中的应用, 虽然在1.9中不支持 Python 的 UDF ,但 Flink 为你们提供了能够在 Python 中使用 Java UDF。在 Flink 1.9 中,对 Table 模块进行了优化和重构,目前开发 Java UDF 只须要引入 Flink common 依赖就能够进行 Python API 开发。

25.jpg

接下来以一个具体的示例给你们介绍利用 Java UDF 开发 Python API UDF,假设咱们开发一个求字符串长度的 UDF,在 Python 中须要用 Java 中的 register_java_function,function 的名字是包全路径。而后在使用时,就能够用注册的名字完成UDF的调用,详细能够查阅个人Blog: 1t.click/HQF。

26.jpg

那怎样来执行?能够用 Flink run 命令去执行,同时须要将UDF的JAR包携带上去。

Java UDF 只支持 Scalar Function?其实否则,在 Java UDF中既支持 Scalar Function,也支持 Table Function和Aggregate Function。以下所示:

27.jpg

4.Python Table API 经常使用连接

28.jpg

上面所讲到的一些东西,有一些长链的文档和连接,也放在PPT上方便你们查阅,同时最下面我也有我的博客。但愿对你们有帮助。

四.总结

简单的总结一下,本篇首先是介绍了Apache Flink Python API 历史发展的过程,介绍了Apache Flink Python API架构变动的缘由以及当前架构模型;任何对将来 Flink Python API 是的规划与功能特性继续详细介绍,最后指望你们能在QA环节能给一些建议和意见,谢谢!更多细节内容,欢迎订阅个人博客: enjoyment.cool/


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域盛会 Flink Forward Asia 2019 将于 11月28-30日在北京举办,阿里、腾讯、美团、字节跳动、百度、英特尔、DellEMC、Lyft、Netflix 及 Flink 创始团队等近 30 家知名企业资深技术专家齐聚国际会议中心,与全球开发者共同探讨大数据时代核心技术与开源生态。了解更多精彩议程请点击:

developer.aliyun.com/special/ffa…

Flink 社区公众号后台回复“门票”,少许免费门票抢先拿。

Flink 社区官方微信公众号

Ververica公众号二维码.jpg