18 | 数据关联优化:都有哪些Join策略,开发者该如何取舍?
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了大数据处理中的数据关联优化策略,重点介绍了在分布式环境下的Join实现机制及其适用场景。首先对比了Hash Join、Sort Merge Join和Nested Loop Join三种实现机制的特性与优劣,然后介绍了Shuffle Join和Broadcast Join两大类的分布式Join策略。文章强调了Broadcast Join通过广播变量来提高执行效率的优势,并通过具体的代码示例和图示生动地解释了其执行过程和优势。此外,还详细介绍了Spark SQL支持的5种分布式Join策略及其选择偏好,以及Broadcast Join的生效前提条件。总的来说,本文通过深入浅出的方式,帮助读者了解了不同的Join策略及其适用场景,为开发者在实际应用中做出取舍提供了指导。值得注意的是,文章还提出了一个问题,即为什么Spark SQL没有选择支持Broadcast SMJ,并鼓励读者在留言区进行交流互动。整体而言,本文内容丰富,涵盖了大数据处理中的关键技术点,对于对数据关联优化感兴趣的读者具有很高的参考价值。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(13)
- 最新
- 精选
- AbigailBroadcast Join 中相比 SMJ,HJ 并不要求参与 Join 的两张表有序,也不需要维护两个游标来判断当前的记录位置,只要在 Build 阶段构建的哈希表可以放进内存就行。这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的。因此,当Broadcast Join 的前提条件存在,在可以采用 HJ 的情况下,Spark 自然就没有必要再去用 SMJ 这种前置开销(排序)比较大的方式去完成数据关联。
作者回复: 是的,正解~ 满分💯
2021-10-20215 - 小新SMJ 在执行稳定性方面,远胜于 HJ,这句话怎么理解? 还有在做等值关联时,优先级是:Broadcast HJ Shuffle SMJ Shuffle HJ那什么情况下Shuffle HJ会启用呢?
作者回复: 好问题,由于咱们这门课程的定位,是入门课,所有很多东西,都没能介绍的更深入,老弟问了个非常好的问题,这个问题在《Spark性能调优实战》有详细的介绍,我把它贴出来,供老弟参考哈 “ 在等值数据关联中,Spark 会尝试按照 BHJ > SMJ > SHJ 的顺序依次选择 Join 策略。在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。其中,SMJ 和 SHJ 策略支持所有连接类型,如全连接、Anti Join 等等。BHJ 尽管效率最高,但是有两个前提条件:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。 那为什么 SHJ 比 SMJ 执行效率高,排名却不如 SMJ 靠前呢?这是个非常好的问题。我们先来说结论,相比 SHJ,Spark 优先选择 SMJ 的原因在于,SMJ 的实现方式更加稳定,更不容易 OOM。回顾 HJ 的实现机制,在 Build 阶段,算法根据内表创建哈希表。在 Probe 阶段,为了让外表能够成功“探测”(Probe)到每一个 Hash Key,哈希表要全部放进内存才行。坦白说,这个前提还是蛮苛刻的,仅这一点要求就足以让 Spark 对其望而却步。 要知道,在不同的计算场景中,数据分布的多样性很难保证内表一定能全部放进内存。而且在 Spark 中,SHJ 策略要想被选中必须要满足两个先决条件,这两个条件都是对数据尺寸的要求。首先,外表大小至少是内表的 3 倍。其次,内表数据分片的平均大小要小于广播变量阈值。第一个条件的动机很好理解,只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。 和 SHJ 相比,SMJ 没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成。内存中放不下的数据,可以临时溢出到磁盘。单表排序的过程,我们可以参考 Shuffle Map 阶段生成中间文件的过程。在做归并关联的时候,算法可以把磁盘中的有序数据用合理的粒度,依次加载进内存完成计算。这个粒度可大可小,大到以数据分片为单位,小到逐条扫描。 正是考虑到这些因素,相比 SHJ,Spark SQL 会优先选择 SMJ。事实上,在配置项 spark.sql.join.preferSortMergeJoin 默认为 True 的情况下,Spark SQL 会用 SMJ 策略来兜底,确保作业执行的稳定性,压根就不会打算去尝试 SHJ。开发者如果想通过配置项来调整 Join 策略,需要把这个参数改为 False,这样 Spark SQL 才有可能去尝试 SHJ。 ”
2021-12-028 - Unknown element“学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。” 老师问下这里 join 之前应该还得再排一次序吧?因为 map 阶段的排序只能保证 reduce task 从每个 map task 拉取过来的数据片段是有序的,但是多个数据片段之间还是无序的吧
作者回复: 这里并不需要全局有序,只要Task内部两边有序就可以了,老弟不妨想一想为什么~ 提示:本质上,还是要理解,分布式下的Join与单机Join的不同~
2021-10-2142 - 王璀璨老师最近在用spark重构pandas的时候遇到一个问题,在udf中使用filter查询的时候报错 temp_data = hospital_data_sheet1.groupby(['hcp_id', 'hcp_name']).count().select('hcp_id', 'hcp_name') def get_data(data1): search_data = hospital_data_sheet1.select((hospital_data_sheet1['hcp_id'] == data1)) total = data1 return total get_number = F.udf(get_data, StringType()) result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id'])) result_data.show() 最后报错 _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object 不知道为什么会这样,请老师看一下
作者回复: get_data函数定义的有问题,你只需要把输入、输出的计算逻辑定义清楚就可以了,不要在函数里面引入外面的数据集,比如hospital_data_sheet1,spark是不可能把它序列化的。get_number这udf,后面实际的函数是get_data,所以它需要序列化这个函数:get_data。但是这个函数里面,有一个数据集,这个数据集对于Spark调度系统来说,是透明的,不可能序列化的。所以,把这个数据集去掉,而且,这个函数的计算逻辑,跟这个数据集没有任何关系的。你需要定义一个简单数据处理逻辑。然后最后在result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id']))这里面调用就好了
2022-01-111 - LJK老师好,工作中碰到过ERROR BroadcastExchangeExec: Could not execute broadcast in 300 secs.的报错,请问这种报错的排查思路以及有哪些可能的原因导致的呢?
作者回复: 广播变量创建超时~ 原因是广播变量封装的数据量太大了,导致在分发的过程中超时,两种解决思路: 1)想办法把广播变量封装的内容变小 2)增加广播变量超时时间,调大配置项:spark.sql.broadcastTimeout,他的默认值就是这里的300s,把它调大一些就好了
2021-11-051 - 慢慢卢"学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。" 这段里面提到SMJ可以利用磁盘完成计算,结合前面提到内存管理,能使用磁盘的除了cache、shuffle write外,也就是说内存计算其他过程也会使用到磁盘(比如SMJ),但我理解内存计算应该完全在内存中,不然就不会有OOM了。 所以这点我没有搞懂,辛苦老师指导解释下。
作者回复: 这里说的是Spark的Spill和外排机制,正如你所说,内存计算,理论上都该在内存中完成,但实际情况是,数据体量,往往超出内存的限制,因此通过spill和外排,来保证任务的顺利执行。Spark当中很多环节,都会利用磁盘的,比方说shuffle、cache、join、reuseExchange,等等,等等。当然,对于大多数计算,spark还是优先利用内存的,实在放不下,才考虑磁盘
2021-12-09 - Geek_038655请问:大表join大表怎么优化?
作者回复: 这块要说声抱歉,由于是入门篇,所以咱们没有安排这部分课程。大表Join大表属于比较复杂的场景,需要特殊对待,性能篇有介绍具体怎么处理。 当然,并不是鼓励老弟去买性能篇,不是这个意思,只是交代一下,为什么大表Join大表没有录入咱们这门课。 老弟有需要的话,我把性能篇里面相关章节的核心内容,copy到评论区,希望对你有所帮助~
2021-10-26 - 思绪纷繁【由于左表与右表在并行度(分区数)上是一致的】 想问下,只要做shuffle join操作,左表和右表的的并行度一定是一样的吗?2023-06-06归属地:北京1
- 嬴梦川shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”有序。这样在reduce阶段拉取数据时再做一次时间复杂度为O(N)归并排序就行了。不会改变复杂度O(M+N)的最终结果2023-09-24归属地:江苏
- 嬴梦川第六章中, shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”是有序的吧2023-09-24归属地:江苏