1.使用reduceByKey/aggregateByKey替代groupByKey
groupByKey算子只是单纯对数据进行分组,而reduceByKey和aggregateByKey会先按照相同的key值合并数据,然后再进行分组,更适用于数据量大的场景。
groupByKey没有预先聚合,shuffle操作会更加严重。
2.用广播变量存储和处理外来数据
Driver每次分发任务的时候会把task和数据发送给各个Executor。因此,在每个Executor中,有多少个task就有多少个Driver端数据的副本。过多重复的副本会占用大量的内存,严重影响性能。而广播变量在最开始只保存了一个副本在Driver上,当task使用广播变量的时候,会从Driver拉取广播变量的副本,并保存在本地的BlockManager上。此后每次运行Executor上的task,Executor都会从本地的BlockManager获取广播变量的副本,如果本地没有,也可以从其他节点的BlockManager上拉取。由于广播变量的内容可以跨作业共享,且广播变量的存储不需要生成很多副本,所以广播变量的使用节省了内存空间,且增加了跨节点之间数据传输的速率。
3.collect函数容易导致内存不足
collect操作会将Executor的全部数据发送到Driver端,如果Driver端内存不足,会产生out of memory报错。我工作时用到的Driver端,内存只配了4G。当要处理的数据量很大的时候,使用toLocalIterator算子会更加合适。toLocalIterator以迭代器的方式处理数据,可以把数据按分区来进行迭代处理。
4.用mapPartitions代替map
map处理数据的时候,对数据进行的是逐个遍历,效率比mapPartitions低。
mapPartitions是对整个RDD的所有分区的数据,以迭代器的方式逐一进行操作,实现了批量处理。当数据量特别大的时候,只能用map,用mapPartitions可能会出现内存溢出。当数据量占用内存空间不太大的时候,如果为了提升执行效率,更推荐使用mapPartitions。
5.使用filter + coalesce操作
coalesce可以对数据进行重新分区,用来解决数据在每个分区不均匀的问题,且可以指定分区数和是否进行shuffle操作。重新分区可以减少任务调度成本,提高任务对RDD的处理速度。在coalesce前使用filter,可以减少coalesce任务处理的数据量,提升性能。
6.对多次使用的Rdd数据进行持久化
持久化常用的算子:Rdd.cache() & Rdd.persist()
如果RDD在一个计算任务中,需要被多次使用,使用cache算子或者persist算子可以把数据持久化,让数据被缓存在内存中。当RDD被持久化后,每个节点的其它分区都可以使用这个RDD在内存中进行计算。持久化的方式方便了不同分区间的数据读写,加快了RDD处理的速度。
声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/206828.html