Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

27 | 大表Join小表:广播变量容不下小表怎么办?

你好,我是吴磊。
在数据分析领域,大表 Join 小表的场景非常普遍。不过,大小是个相对的概念,通常来说,大表与小表尺寸相差 3 倍以上,我们就将其归类为“大表 Join 小表”的计算场景。因此,大表 Join 小表,仅仅意味着参与关联的两张表尺寸悬殊。
对于大表 Join 小表这种场景,我们应该优先考虑 BHJ,它是 Spark 支持的 5 种 Join 策略中执行效率最高的。BHJ 处理大表 Join 小表时的前提条件是,广播变量能够容纳小表的全量数据。但是,如果小表的数据量超过广播阈值,我们又该怎么办呢?
今天这一讲,我们就结合 3 个真实的业务案例,来聊一聊这种情况的解决办法。虽然这 3 个案例不可能覆盖“大表 Join 小表”场景中的所有情况,但是,分析并汇总案例的应对策略和解决办法,有利于我们在调优的过程中开阔思路、发散思维,从而避免陷入“面对问题无所适从”的窘境。

案例 1:Join Key 远大于 Payload

在第一个案例中,大表 100GB、小表 10GB,它们全都远超广播变量阈值(默认 10MB)。因为小表的尺寸已经超过 8GB,在大于 8GB 的数据集上创建广播变量,Spark 会直接抛出异常,中断任务执行,所以 Spark 是没有办法应用 BHJ 机制的。那我们该怎么办呢?先别急,我们来看看这个案例的业务需求。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了在数据分析领域中常见的大表Join小表场景,并结合三个真实业务案例,提出了解决这种情况的实用建议。文章首先介绍了一个业务案例,涉及流量预测和时序序列生成,强调了在大表Join小表场景中遇到的问题。针对小表数据量超过广播阈值的情况,文章提出了一种解决方案,即通过生成哈希值来替代原始Join Keys,从而将SMJ转化为BHJ,避免Shuffle过程,提高执行效率。此外,还对可能出现的哈希冲突问题进行了讨论,并提出了消除哈希冲突的方法。通过这些案例分析和解决方案的讨论,读者可以在调优过程中开阔思路、发散思维,避免陷入“面对问题无所适从”的窘境。整体而言,本文通过实际案例和技术讨论,为读者提供了解决大表Join小表场景中遇到的问题的实用建议,具有一定的技术深度和实用性。文章还介绍了利用AQE和DPP机制进行优化的方法,以及在小表数据分布均匀的情况下使用Join Hints强制选择SHJ策略进行关联计算的技巧。这些方法和技巧为读者提供了在实际工作中优化大表Join小表场景的参考和指导。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(15)

  • 最新
  • 精选
  • Fendora范东_
    问题回答 1.给join keys每个字段维护一个字典,每个字段值在字典内对应一个唯一的整数。拿到每个字段指定的种整数,然后组装起来,作为新的join key。 2.把维度表查询sql单独拿出来,缓存其df,查看其屋里计划,可以知道它结果集大小。 3.参考AQE的自动倾斜处理特性,把数据倾斜的分区拆分开,然后再进行SHJ

    作者回复: 第一题超赞~ 这个方法非常好,实际上我觉得比较hash的方法还棒!因为它天然能避开冲突的问题,而且存储效率很高,机智如你~ 真的特别好的方法,其实本质上这可以看成是一种编码方式了,老弟你也搞机器学习吗?编码的思路在Machine Learning用的非常普遍~ 当然这里有一个字典维护的成本,不过如果Join Keys的cardinality不是很夸张的话,其实还好~ 2、3的思路都没问题~

    2021-05-15
    5
    10
  • Sam
    问题回答。 第一题: 对每个字段的值生成一个独热编码,以字典来维护,然后再组合各个字段的独热编码,形成一个新的join key。当然这里不一定用独热编码,也能用二进制编码,如果想更加节省空间更加折腾,可以使用 哈夫曼编码,哈夫曼编码 从本质上讲,是将最宝贵的资源(最短的编码)给出现概率最大的信息。 第二题: 先 Cache,再查看执行计划,有示例代码,在第17篇内存视角(三):OMM都是谁的锅?怎么破? 第三题: AQE的自动倾斜处理,在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。(阀值可自行设置的)

    作者回复: 前两题满分💯~ 第三题的话,AQE是需要Shuffle才能生效的,因此,要想利用AQE消除数据倾斜,得先对小表做repartition才行。 另外一种可行的办法,就是可以参考后面29讲负隅顽抗里面的“两阶段加盐Shuffle”,通过加盐把数据打散,让原本不均衡的数据,变得均衡,然后再用SHJ去做关联。当然,这种方法的开发成本会高很多,不过算是一种比较通用的方法。 这些方法,这些“招儿”,其实不好说孰优孰劣,还要结合具体数据量、场景,结合实际效果去看。不过,艺多不压身,调优比较重要的一点,就是要思考广、“招儿”多,然后通过对比不同优化方法的优劣势和效果差异,来更好地理解Spark,这样后面再有类似情况,就能轻车熟路啦~

    2021-07-16
    2
    3
  • 子兮
    老师,SHJ 要求小表数据均匀,防止某一分区OOM,是因为小表hash 后,会把相同的key, reduce到一个分区吗?那么这样SMJ 同样也会在shuffle 后有oom 的风险呀?

    作者回复: SHJ和SMJ,最主要的区别,是SMJ可以利用外排,或者说可以依赖磁盘,对于内存的依赖,没那个重。但是SHJ就不同,HJ的build phase,必须完全依赖内存,这也是为什么会对小表的数量体量、数据分布有要求。一旦小表某一个分区,超过Task内存上限,就直接报OOM,整个Job也就fail了

    2021-11-26
    2
  • 超级丹
    老师,我有个疑问,dpp,不会导致inode过高吗? 特别是在用户上做分区这种。。。

    作者回复: 确实,本质上就是Join Key和分区键的矛盾。 分区键要求Cardinality不能太高,否则就会出现文件分布过细、过散,分布式文件系统负担过重。但是另一方面,Join Key往往是userId、orderId、txId这类Cardinality很大的字段,这个是由业务逻辑决定的,很难改变。 因此,分区键和Join Key就会存在博弈,换句话说,是牺牲文件系统效率来强行触发DPP,还是保证文件系统效率而放弃DPP,这里面就会有一个取舍的问题。

    2021-06-22
    3
    2
  • wow_xiaodi
    第一题首先打开aqe,对基表df.dropDupilcates(***).withColumn("code", monotonically_increasing_id()),这样就完成全局编码了。然后驱动表和基表都join上这个字典表,将组合join keys转换为字典值

    作者回复: 编码的思路很赞~ 不过用自增列是不行的,同样的Join Key,应该有同样的编码,这样才不影响原有的关联逻辑,自增列会导致同样的Join Key,有不同的编码。 比如,Join Key是order_id,那么在transactions表中,会有多个同样的order_id出现,也就是多笔交易同属于一个订单,这个时候,自增列编码是不行的。

    2021-08-15
    1
  • CycleGAN
    我们生产端用的2.4也没有AQE机制,但是我觉得减小维度表的大小也能模拟出来,要在维度表上做设计,最有效的还是分区做好,比如我们按时间分区,每个分区的数据局小很多,然后摘出来需要的列,列上的长str做编码,再反映射,列上的长join key做编码等。感觉把维表做小还是值得的,性价比很高。 然后分析大小的话,一是看cache后看大小,主要还是ui页面里的sql计划里看是不是Broadcast,倒是也不需要非常精准。

    作者回复: 对,做小维表的目的,实际上还是把Shuffle joins转化成Broadcast joins,这个转化带来的收益实在是太大了!所以千方百计地尝试做Broadcast joins,确实是非常值得的一件事~ 预估大小UI没问题,另外还可以用executePlan查看执行计划的方法做预估。

    2021-05-22
    1
  • 老师,这些都是基于spark3.0的新特性,那对于3.0之前的版本又该怎么解决呢,毕竟3.0版本是新出的,对于大多数公司不一定使用了该版本

    作者回复: 咱们举了3个案例,其中第二个案例需要用到3.0的AQE,具体来说是Join策略调整;第三个案例是用join hints把Shuffle SMJ转化为Shuffle HJ。这两个case利用的新特性,确实是3.0才支持。 不过第一个,也就是Join Keys比Payload大很多,这个技巧并不需要3.0哈~

    2021-05-19
    1
  • zxk
    老师我想请问下,在第二个案例中,如果我将 join 条件写成 on orders.userId = users.id and users.type = ‘Head User’ ,是否能实现维度表提前过滤并达到 Broadcast 的要求?

    作者回复: 不可以哈,Spark SQL目前还没有那么智能,你这么写的效果,和单独把users.type = ‘Head User’当作过滤条件的效果是一样的,都是通过Spark SQL AQE来动态触发Join策略调整,还做不到在静态优化阶段就提前决定在运行时把Shuffle Joins转化为Broadcast Joins。

    2021-05-16
    2
    1
  • Unknown element
    老师您好 您在评论 @陈威洋 对问题3的回答时说 "AQE是需要Shuffle才能生效的,因此,要想利用AQE消除数据倾斜,得先对小表做repartition才行",但是问题3的背景已经是小表尺寸大于广播阈值 因此必须走shuffle了呀,那AQE在这个shuffle的map文件输出之后进行分区倾斜处理不就可以了吗?为什么要额外对小表进行一次 repartition 呢?谢谢老师~

    作者回复: 确实,可以利用AQE的自动倾斜处理来handle 20G的小表,不过其实我们在下一章节会介绍,AQE的自动倾斜处理,只能handle task粒度的倾斜,没有办法处理Executors粒度的倾斜,这个时候,还是需要依赖两阶段的Shuffle来完成,具体的加盐操作可以参考后面一章哈

    2022-01-19
  • wow_xiaodi
    老师,对于哈希处理join keys或许要考虑基表的列数量和每列数据类型,因为毕竟两个哈希函数计算出来的结果再拼接,长度不下于32甚至大于64。假如原始列内容大部分都是英文和数字,而且大小或长度都不厉害,哈希处理后存储开销会不会更加大了呢?最后有一个问题需要确认下,在tungsten的数据结构里,一个整形数据,是否用到了32bit的存储空间呢?一个utf-8编码的中文是否占用2-4个字节?

    作者回复: 是的, 这个use case比较特殊,就是Join Keys特别多,而且都是String类型,因此整体数据体量很大。确实,像你说的,使用哈希的目的,就是缩减Join Keys大小,那么咱们自然要确认缩减之后的Join Keys,一定要远远小于原始的Join Keys,否则做这件事就没有意义啦~ Tungsten那个,整型占用4个字节,这个和数据类型有关,其实和Tungsten没什么关系的。

    2021-08-14
收起评论
显示
设置
留言
15
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部