定义现象
绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败,
这样的现象为数据倾斜现象。
任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长(木桶原理)。
Hive
单表数据
- 对于某些不需要计算的数据可以优先过滤
- 当任务重存在 group by 的聚合操作时,开启参数设置
是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true;
在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
有数据倾斜的时候进行负载均衡(默认是 false)
set hive.groupby.skewindata = true;
- 增加 reduce 数据
每个 Reduce 处理的数据量默认是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000
每个任务最大的 reduce 数,默认为 1009
set hive.exec.reducers.max = 1009
计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)(参数 2 指的是上面的 1009,参数 1 值得是 256M)设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
多表join数据
- 使用参数
join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=false;
如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认 100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认 10000。
set hive.skewjoin.mapjoin.map.tasks=10000;
- MapJoin
设置自动选择 MapJoin
set hive.auto.convert.join=true; #默认为 true
大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;
MapJoin 是将 Join 双方比较小的表直接分发到各个 Map 进程的内存中,在 Map 进程中进行 Join 操作,这样就不用进行 Reduce 步骤,从而提高了速度。
Spark
数据倾斜一般是发生在 shuffle 类的算子,比如 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup 等,涉及到数据重分区,如果其中某一个 key 数量特别大,就发生了数据倾斜。
单表数据
- 可以在 shuffle 之前的 map 端做预聚合操作。即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总。
比如 : reduceByKey 替代 groupByKey、两阶段聚合(加盐局部聚合+去盐全局聚合)等。
多表Join 数据
- 大小表的Join,使用 广播机制。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中。
- 拆分大 key 打散大表,扩容小表。
- 将数据倾斜的 key 和没有数据倾斜的 key 分别为两个数据集;
- 对数据倾斜的 key 在其加上随机前缀,然后与另一个表的相同 key 也做随机前缀后 join。
- 对正常的数据做join,对倾斜的数据做join,然后 union。
方案 | 简介 |
HiveETL预处理 | Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行 join,然后再 Spark作业中针对的数据就不是原来的表,而是预处理后的表,在Spark时就不需要进行原先的shuffle操作了 |
过滤少数导致数据倾斜的key | 如果少数几个数据量特别多的 key 对作业的执行和计算结果不重要,那么直接结果掉它们 |
提高shuffle 并行度 | 提高shuffle类算子并行度 |
两阶段聚合 | 先随机加前缀预聚合,第二次去掉前缀再聚合 |
将reduce join 转成 map join | 小表广播到大表所在 executor进行 mapjoin |
采样倾斜key并分拆join | 对少数key的数据加上随机前缀,另一个表也膨胀为加上随机前缀,然后进行 join,再与正常join数据进行 union |
使用随机前缀和扩容RDD进行join | 将有大量数据倾斜的 key,每条都打上一个随机前缀,将另一个RDD彭场,然后两个RDD进行join |
Flink
- 使用 LocalKeyBy
通过在 KeyBy 之前积攒一定数量的数据,然后进行聚合,减少下游的数据量,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈 - shuffle 之前发送数据倾斜
由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些partition 的数据量较少。需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。 - keyBy 后的窗口聚合操作存在数据倾斜
因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二
阶段分组依据,避免不同窗口的结果聚合到一起)
➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合
总结
综上所述,对数据倾斜的问题,首先要判断该 key 是否会对结果产生影响,对其进行过滤或者打上随机 key。然后还可以通过随机前缀的两阶段处理 和 增加 reduce, map,减少 shuffle,重分区(Flink)等。