Spark:partition、task、executor关系

spark中执行任务会显示以下格式的进度:

[Stage 4:=========================>                              (12 + 11) / 24]
# 这是stage4阶段:
## 共有24个task(一个partition对应一个task,因此有24个partition)
## 当前正在并行执行的task数量为11。
## 这里没有executor数,由于一个executor里能够同时执行多个task(每一个task至少要占用一个虚拟核vcore)
## 已经有12个task执行完成

观察这个进度过程有利于看出是否存在数据倾斜:若其中1个task的完成时间明显高于其余task,说明极可能这个task处理的数据量多于其余task。java

executor和task关系:

一个executor能够并行执行多个task,实际上一个executor是一个进程,task是executor里的一个线程。
一个task至少要独占executor里的一个虚拟核心vcore。
一个executor里的核心数由spark-submit的--executor-cores参数指定。
一个task要占用几个核心,能够由.config("spark.task.cpus", 1)配置,默认是1即一个task占用一个vcore。shell

同时并行执行的task最大数量 = executor数目 * (每一个executor核数 / 每一个task占用核心数)json

任务执行快结束可能会变成这样:

[Stage 4:=============================================>          (22 + 2) / 24]

由于这时候还有2个task没有完成,此时有些executor可能已经空闲下来了。spa


DataFrameReader读取csv和json若是设了以下选项,会形成生成的DataFrame只有一个partition,也就是只有一个task:线程

.option("multiLine", true)  
//加入此行会形成生成的DataFrame只有一个partition

由于spark要考虑读取多行解析文件数据,因此不能进行文件的随意分割。code

相反,若是是单行模式则能够以任意行结束符进行分割,就能并行读取,生成的DataFrame就能有多个分区。
若是要读取的文本文件在hdfs上,生成DataFrame的分区数等于原始文件的block数。如1345MB文件,block大小128MB,会生成11个partition(1345/128=10.5)。进程

查看Dataset分区数:

ds.rdd.getNumPartitions

改变分区数:

ds.repartition #能任意改变分区数,可是速度慢
ds.coalesce #只能减小分区数,对平衡数据倾斜有效,并且是窄依赖因此速度块。