Spark 技术调优,别告诉我你不会?

目录java

 

1、性能调优sql

2、jvm调优 数据库

3、shuffle调优(优先使用前面两点,实测有效)apache

4、算子调优缓存

5、troubleshooting网络

6、数据倾斜解决方案架构


1、性能调优

1.1 配更多资源:app

    --num-executors 3 \  配置executor的数量--driver-memory 100m \  配置driver的内存(影响不大) --executor-memory 100m \  配置每一个executor的内存大小 --executor-cores 3 \  配置每一个executor的cpu core数量运维

    num-executors、executor-cores可提高任务的并行度;driver-memory、executor-memory增长内存,可缓存更多的数据减小磁盘IO,减小suffle时reduce端磁盘IO,可下降堆内存满了频繁GC,避免频繁垃圾回收jvm

1.2 调节并行度:

    一、task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)

    二、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500;

    如何设置一个Spark Application的并行度? SparkConf conf = new SparkConf()  .set("spark.default.parallelism", "500")

1.3 重构RDD架构以及RDD持久化:

    第一,RDD架构重构与优化 尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

    第二,公共RDD必定要实现持久化,对于要屡次计算和使用的公共RDD,必定要进行持久化。

    第三,持久化,是能够进行序列化的。优化使用memory,再考虑磁盘

    第四,为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化;持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;这种方式,仅仅针对你的内存资源极度充足

1.4 广播大变量

    广播变量的好处,不是每一个task一份变量副本,而是变成每一个节点的executor才一份副本。这样的话,就可让变量产生的副本大大减小。减小网络开销、内存占用、磁盘IO、GC垃圾回收的次数

1.5 使用Kryo序列化set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化

    Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。 因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。

1.6 使用fastutil优化数据格式

    fastutil尽可能提供了在任何场景下都是速度最快的集合类库

1.7 调节数据本地化等待时长

    BlockManager > PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY    

    观察日志,spark做业的运行日志,推荐你们在测试的时候,先用client模式,在本地就直接能够看到比较全的日志。日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL 观察大部分task的数据本地化级别

    new SparkConf()  .set("spark.locality.wait", "10") 

BlockManager

存放位置

详细说明

PROCESS_LOCAL

进程本地化 

task要计算的数据在同一个Executor中

NODE_LOCAL

节点本地化  

速度比PROCESS_LOCAL稍慢,由于数据须要在不一样进程之间传递或从文件中读取

NO_PREF

没有最佳位置这一说

数据从哪里访问都同样快,不须要位置优先。好比SparkSQL读取MySQL中的数据

RACK_LOCAL

本架本地化

数据在同一机架的不一样节点。须要经过网络传输数据及文件IO,比NODE_LOCAL慢

ANY

跨机架

数据在非同一机架的网络上,速度最慢


2、jvm调优 

2.1 JVM调优之下降cache操做的内存占比

    JVM调优的第一个点:下降cache操做的内存占比 spark中,堆内存又被划分红了两块儿,一起是专门用来给RDD的cache、persist操做进行RDD数据缓存用的;

    另一块儿,就是咱们刚才所说的,用来给spark算子函数的运行使用的,存放函数中本身建立的对象

    spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

2.2 JVM调优之调节executor堆外内存与链接等待时长

    --conf spark.yarn.executor.memoryOverhead=2048(最小300m),通常会调大些

    --conf spark.core.connection.ack.wait.timeout=300(某某file,not found。file lost。颇有多是有那份数据的executor在jvm gc。因此拉取数据的时候,创建不了链接。而后超过默认60s之后,直接宣告失败。)


3、shuffle调优(优先使用前面两点,实测有效)

3.1 Shuffle调优之合并map端输出文件

    new SparkConf().set("spark.shuffle.consolidateFiles", "true") 开启shuffle map端输出文件合并的机制;默认状况下,是不开启的,就是会发生如上所述的大量map端输出文件的操做,严重影响性能。

    没有开启的话,每个task都会建立一份文件,先后有多少个task建立就会有多少个文件生成;若是开启的话,就会生成task的并行度 * executor的数量 份文件

3.2 Shuffle调优之调节map端内存缓冲与reduce端内存占比

    spark.shuffle.file.buffer,默认32k,能够调成64K      // spark.shuffle.file.buffer,每次扩大一倍,而后看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。

    spark.shuffle.memoryFraction,默认0.2,能够调成0.3   // 若是数据量比较大,reduce task拉取过来的数据不少,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操做,溢写到磁盘上去。

3.3 Shuffle调优之HashShuffleManager与SortShuffleManager

  1. 在生产环境中,不建议你们贸然使用第三点和第四点:
  2. 若是你不想要你的数据在shuffle时排序,那么就本身设置一下,用hash shuffle manager。 
  3. 若是你的确是须要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;或者是什么?若是你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。调节一些其余的参数(consolidation机制)。(80%,都是用这种) 
    spark.shuffle.manager:hash、sort、tungsten-sort 
    new SparkConf().set("spark.shuffle.manager", "hash") 
    new SparkConf().set("spark.shuffle.manager", "tungsten-sort") 
    // 默认就是,new SparkConf().set("spark.shuffle.manager", "sort") 
    new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

4、算子调优

4.1 MapPartitions提高Map类操做性能

    何时比较适合用MapPartitions系列操做,就是说,数据量不是特别大的时候,均可以用这种MapPartitions系列操做,性能仍是很是不错的,是有提高的。好比原来是15分钟,(曾经有一次性能调优),12分钟。10分钟->9分钟。

4.2 filter事后使用coalesce减小分区数量

    主要就是用于在filter操做以后,针对每一个partition的数据量各不相同的状况,来压缩partition的数量。减小partition的数量,并且让每一个partition的数据量都尽可能均匀紧凑。 从而便于后面的task进行计算操做,可以必定程度的提高性能。

4.3 使用foreachPartition优化写数据库性能

    在实际生产环境中,清一色,都是使用foreachPartition操做;可是有个问题,跟mapPartitions操做同样,若是一个partition的数量真的特别特别大,好比真的是100万,那基本上就不太靠谱了。 一会儿进来,颇有可能会发生OOM,内存溢出的问题。

4.4 使用repartition解决Spark SQL低并行度的性能问题

    repartition算子,你用Spark SQL这一步的并行度和task数量,确定是没有办法去改变了。可是呢,能够将你用Spark SQL查询出来的RDD,使用repartition算子,去从新进行分区,此时能够分区成多个partition,好比从20个partition,分区成100个。

4.5 reduceByKey本地聚合

    reduceByKey,相较于普通的shuffle操做(好比groupByKey),它的一个特色,就是说,会进行map端的本地聚合。


5、troubleshooting

5.1 控制shuffle reduce端缓冲大小以免OOM

    reduce端执行的聚合函数的代码,可能会建立大量的对象。也许,一会儿,内存就撑不住了,就会OOM。这个时候,就应该减小reduce端task缓冲的大小。我宁愿多拉取几回,可是每次同时可以拉取到reduce端每一个task的数量,比较少,就不容易发生OOM内存溢出的问题。(好比,能够调节成12M)

    spark.reducer.maxSizeInFlight,48
    spark.reducer.maxSizeInFlight,24

5.2 解决JVM GC致使的shuffle文件拉取失败

spark.shuffle.io.maxRetries 3

    第一个参数,意思就是说,shuffle文件拉取的时候,若是没有拉取到(拉取失败),最多或重试几回(会从新拉取几回文件),默认是3次。 

    spark.shuffle.io.retryWait 5s 第二个参数,意思就是说,每一次重试拉取文件的时间间隔,默认是5s钟。

5.3 解决YARN队列资源不足致使的application直接失败

    可能同时提交了相同的做业,以前那个做业就占据了资源的60%,再提交一个相同的做业,确定会资源不足;或者提交了一个长时间的做业,然后面须要运行2分钟的做业

    解决:跟运维沟通,实现调度策略。zeus

5.4 解决各类序列化致使的报错

    序列化报错要注意的三个点: 

  1. 你的算子函数里面,若是使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的
  2. 若是要将自定义的类型,做为RDD的元素类型,那么自定义的类型也必须是能够序列化的
  3. 不能在上述两种状况下,去使用一些第三方的,不支持序列化的类型

    Connection是不支持序列化的

5.5 解决算子函数返回NULL致使的问题

    你们能够看到,在有些算子函数里面,是须要咱们有一个返回值的。可是,有时候,咱们可能对某些值,就是不想有什么返回值。

    咱们若是直接返回NULL的话,那么能够不幸的告诉你们,是不行的,会报错的。 Scala.Math(NULL),异常 若是碰到你的确是对于某些值,不想要有返回值的话,有一个解决的办法: 

  1. 在返回的时候,返回一些特殊的值,不要返回null,好比“-999” 
  2. 在经过算子获取到了一个RDD以后,能够对这个RDD执行filter操做,进行数据过滤。filter内,能够对数据进行断定,若是是-999,那么就返回false,给过滤掉就能够了。 
  3. 你们不要忘了,以前我们讲过的那个算子调优里面的coalesce算子,在filter以后,可使用coalesce算子压缩一下RDD的partition的数量,让各个partition的数据比较紧凑一些。也能提高一些性能。

5.6 解决yarn-client模式致使的网卡流量激增问题

    yarn-client模式下,只是用于测试时使用;

    yarn-cluster模式,就跟你的本地机器引发的网卡流量激增的问题,就没有关系了。也就是说,就算有问题,也应该是yarn运维团队和基础运维团队之间的事情了。

    使用了yarn-cluster模式之后,就不是你的本地机器运行Driver,进行task调度了。是yarn集群中,某个节点会运行driver进程,负责task调度。

5.7 解决yarn-cluster模式的JVM栈内存溢出问题

    yarn-client模式下,driver是运行在本地机器上的,JVM的永久代的大小是128M,这个是没有问题的

    yarn-cluster模式下,driver是运行在yarn集群的某个节点上的,使用的是没有通过配置的默认设置(PermGen永久代大小),82M。

    解决:spark-submit脚本中,加入如下配置便可:--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

    问题二:spark sql,调用的方法层级过多,由于产生了大量的,很是深的,超出了JVM栈深度限制的,递归。

    解决:JVM Stack Memory Overflow,栈内存溢出。 这种时候,建议不要搞那么复杂的spark sql语句。采用替代方案:将一条sql语句,拆解成多条sql语句来执行。每条sql语句,就只有100个or子句之内;一条一条SQL语句来执行。根据生产环境经验的测试,一条sql语句,100个or子句之内,是还能够的。一般状况下,不会报那个栈内存溢出。

5.8 错误的持久化方式以及checkpoint的使用


6、数据倾斜解决方案

6.1 聚合源数据以及过滤致使倾斜的key

  • 第一个方案:聚合源数据;将数据按key,将valuse进行聚合,中间使用分隔符进行拼接
  • 第二个方案:过滤致使倾斜的key

    简单。直接。效果是很是之好的。完全根除了数据倾斜的问题。

6.2 提升shuffle操做reduce并行度

    全部的shuffle算子,好比groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个参数。一个数字。那个数字,就表明了那个shuffle操做的reduce端的并行度。那么在进行shuffle操做的时候,就会对应着建立指定数量的reduce task。 这样的话,就可让每一个reduce task分配到更少的数据。基本能够缓解数据倾斜的问题。

    好比说,本来某个task分配数据特别多,直接OOM,内存溢出了,程序无法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操做,给它传入一个并行度数字,这样的话,原先那个task分配到的数据,确定会变少。就至少能够避免OOM的状况,程序至少是能够跑的。

6.3 使用随机key实现双重聚合

    第一轮聚合的时候,对key进行打散,将原先同样的key,变成不同的key,至关因而将每一个key分为多组; 

    先针对多个组,进行key的局部聚合;接着,再去除掉每一个key的前缀,而后对全部的key,进行全局的聚合。 

    对groupByKey、reduceByKey形成的数据倾斜,有比较好的效果。 

    若是说,以前的第1、第2、第三种方案,都无法解决数据倾斜的问题,那么就只能依靠这一种方式了。

6.4 将reduce join转换为map join

    若是两个RDD要进行join,其中一个RDD是比较小的。一个RDD是100万数据,一个RDD是1万数据。(一个RDD是1亿数据,一个RDD是100万数据) 

    其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据之后,就会在每一个executor的block manager中都驻留一份。要确保你的内存足够存放那个小RDD中的数据 

    这种方式下,根本不会发生shuffle操做,确定也不会发生数据倾斜;从根本上杜绝了join操做可能致使的数据倾斜的问题; 

    对于join中有数据倾斜的状况,你们尽可能第一时间先考虑这种方式,效果很是好;若是某个RDD比较小的状况下。

6.5 sample采样倾斜key单独进行join

    将发生数据倾斜的key,单独拉出来,放到一个RDD中去;就用这个本来会倾斜的key RDD跟其余RDD,单独去join一下,这个时候,key对应的数据,可能就会分散到多个task中去进行join操做。 

    就不至于说是,这个key跟以前其余的key混合在一个RDD中时,确定是会致使一个key对应的全部数据,都到一个task中去,就会致使数据倾斜。

6.6 使用随机数以及扩容表进行join

  1. 选择一个RDD,要用flatMap,进行扩容,将每条数据,映射为多条数据,每一个映射出来的数据,都带了一个n之内的随机数,一般来讲,会选择10。 
  2. 将另一个RDD,作普通的map映射操做,每条数据,都打上一个10之内的随机数。 
  3. 最后,将两个处理后的RDD,进行join操做。