零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

18 | 数据关联优化:都有哪些Join策略,开发者该如何取舍?

你好,我是吴磊。
在上一讲,我们分别从关联形式与实现机制这两个方面,对数据分析进行了讲解和介绍。对于不同关联形式的用法和实现机制的原理,想必你已经了然于胸。不过,在大数据的应用场景中,数据的处理往往是在分布式的环境下进行的,在这种情况下,数据关联的计算还要考虑网络分发这个环节。
我们知道,在分布式环境中,Spark 支持两类数据分发模式。一类是我们在第 7 讲学过的 Shuffle,Shuffle 通过中间文件来完成 Map 阶段与 Reduce 阶段的数据交换,因此它会引入大量的磁盘与网络开销。另一类是我们在第 10 讲介绍的广播变量(Broadcast Variables),广播变量在 Driver 端创建,并由 Driver 分发到各个 Executors。
因此,从数据分发模式的角度出发,数据关联又可以分为 Shuffle Join 和 Broadcast Join 这两大类。将两种分发模式与 Join 本身的 3 种实现机制相结合,就会衍生出分布式环境下的 6 种 Join 策略。
那么,对于这 6 种 Join 策略,Spark SQL 是如何支持的呢?它们的优劣势与适用场景都有哪些?开发者能否针对这些策略有的放矢地进行取舍?今天这一讲,咱们就来聊聊这些话题。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了大数据处理中的数据关联优化策略,重点介绍了在分布式环境下的Join实现机制及其适用场景。首先对比了Hash Join、Sort Merge Join和Nested Loop Join三种实现机制的特性与优劣,然后介绍了Shuffle Join和Broadcast Join两大类的分布式Join策略。文章强调了Broadcast Join通过广播变量来提高执行效率的优势,并通过具体的代码示例和图示生动地解释了其执行过程和优势。此外,还详细介绍了Spark SQL支持的5种分布式Join策略及其选择偏好,以及Broadcast Join的生效前提条件。总的来说,本文通过深入浅出的方式,帮助读者了解了不同的Join策略及其适用场景,为开发者在实际应用中做出取舍提供了指导。值得注意的是,文章还提出了一个问题,即为什么Spark SQL没有选择支持Broadcast SMJ,并鼓励读者在留言区进行交流互动。整体而言,本文内容丰富,涵盖了大数据处理中的关键技术点,对于对数据关联优化感兴趣的读者具有很高的参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(13)

  • 最新
  • 精选
  • Abigail
    Broadcast Join 中相比 SMJ,HJ 并不要求参与 Join 的两张表有序,也不需要维护两个游标来判断当前的记录位置,只要在 Build 阶段构建的哈希表可以放进内存就行。这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的。因此,当Broadcast Join 的前提条件存在,在可以采用 HJ 的情况下,Spark 自然就没有必要再去用 SMJ 这种前置开销(排序)比较大的方式去完成数据关联。

    作者回复: 是的,正解~ 满分💯

    2021-10-20
    2
    15
  • 小新
    SMJ 在执行稳定性方面,远胜于 HJ,这句话怎么理解? 还有在做等值关联时,优先级是:Broadcast HJ Shuffle SMJ Shuffle HJ那什么情况下Shuffle HJ会启用呢?

    作者回复: 好问题,由于咱们这门课程的定位,是入门课,所有很多东西,都没能介绍的更深入,老弟问了个非常好的问题,这个问题在《Spark性能调优实战》有详细的介绍,我把它贴出来,供老弟参考哈 “ 在等值数据关联中,Spark 会尝试按照 BHJ > SMJ > SHJ 的顺序依次选择 Join 策略。在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。其中,SMJ 和 SHJ 策略支持所有连接类型,如全连接、Anti Join 等等。BHJ 尽管效率最高,但是有两个前提条件:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。 那为什么 SHJ 比 SMJ 执行效率高,排名却不如 SMJ 靠前呢?这是个非常好的问题。我们先来说结论,相比 SHJ,Spark 优先选择 SMJ 的原因在于,SMJ 的实现方式更加稳定,更不容易 OOM。回顾 HJ 的实现机制,在 Build 阶段,算法根据内表创建哈希表。在 Probe 阶段,为了让外表能够成功“探测”(Probe)到每一个 Hash Key,哈希表要全部放进内存才行。坦白说,这个前提还是蛮苛刻的,仅这一点要求就足以让 Spark 对其望而却步。 要知道,在不同的计算场景中,数据分布的多样性很难保证内表一定能全部放进内存。而且在 Spark 中,SHJ 策略要想被选中必须要满足两个先决条件,这两个条件都是对数据尺寸的要求。首先,外表大小至少是内表的 3 倍。其次,内表数据分片的平均大小要小于广播变量阈值。第一个条件的动机很好理解,只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。 和 SHJ 相比,SMJ 没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成。内存中放不下的数据,可以临时溢出到磁盘。单表排序的过程,我们可以参考 Shuffle Map 阶段生成中间文件的过程。在做归并关联的时候,算法可以把磁盘中的有序数据用合理的粒度,依次加载进内存完成计算。这个粒度可大可小,大到以数据分片为单位,小到逐条扫描。 正是考虑到这些因素,相比 SHJ,Spark SQL 会优先选择 SMJ。事实上,在配置项 spark.sql.join.preferSortMergeJoin 默认为 True 的情况下,Spark SQL 会用 SMJ 策略来兜底,确保作业执行的稳定性,压根就不会打算去尝试 SHJ。开发者如果想通过配置项来调整 Join 策略,需要把这个参数改为 False,这样 Spark SQL 才有可能去尝试 SHJ。 ”

    2021-12-02
    8
  • Unknown element
    “学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。” 老师问下这里 join 之前应该还得再排一次序吧?因为 map 阶段的排序只能保证 reduce task 从每个 map task 拉取过来的数据片段是有序的,但是多个数据片段之间还是无序的吧

    作者回复: 这里并不需要全局有序,只要Task内部两边有序就可以了,老弟不妨想一想为什么~ 提示:本质上,还是要理解,分布式下的Join与单机Join的不同~

    2021-10-21
    4
    2
  • 王璀璨
    老师最近在用spark重构pandas的时候遇到一个问题,在udf中使用filter查询的时候报错 temp_data = hospital_data_sheet1.groupby(['hcp_id', 'hcp_name']).count().select('hcp_id', 'hcp_name') def get_data(data1): search_data = hospital_data_sheet1.select((hospital_data_sheet1['hcp_id'] == data1)) total = data1 return total get_number = F.udf(get_data, StringType()) result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id'])) result_data.show() 最后报错 _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object 不知道为什么会这样,请老师看一下

    作者回复: get_data函数定义的有问题,你只需要把输入、输出的计算逻辑定义清楚就可以了,不要在函数里面引入外面的数据集,比如hospital_data_sheet1,spark是不可能把它序列化的。get_number这udf,后面实际的函数是get_data,所以它需要序列化这个函数:get_data。但是这个函数里面,有一个数据集,这个数据集对于Spark调度系统来说,是透明的,不可能序列化的。所以,把这个数据集去掉,而且,这个函数的计算逻辑,跟这个数据集没有任何关系的。你需要定义一个简单数据处理逻辑。然后最后在result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id']))这里面调用就好了

    2022-01-11
    1
  • LJK
    老师好,工作中碰到过ERROR BroadcastExchangeExec: Could not execute broadcast in 300 secs.的报错,请问这种报错的排查思路以及有哪些可能的原因导致的呢?

    作者回复: 广播变量创建超时~ 原因是广播变量封装的数据量太大了,导致在分发的过程中超时,两种解决思路: 1)想办法把广播变量封装的内容变小 2)增加广播变量超时时间,调大配置项:spark.sql.broadcastTimeout,他的默认值就是这里的300s,把它调大一些就好了

    2021-11-05
    1
  • 慢慢卢
    "学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。" 这段里面提到SMJ可以利用磁盘完成计算,结合前面提到内存管理,能使用磁盘的除了cache、shuffle write外,也就是说内存计算其他过程也会使用到磁盘(比如SMJ),但我理解内存计算应该完全在内存中,不然就不会有OOM了。 所以这点我没有搞懂,辛苦老师指导解释下。

    作者回复: 这里说的是Spark的Spill和外排机制,正如你所说,内存计算,理论上都该在内存中完成,但实际情况是,数据体量,往往超出内存的限制,因此通过spill和外排,来保证任务的顺利执行。Spark当中很多环节,都会利用磁盘的,比方说shuffle、cache、join、reuseExchange,等等,等等。当然,对于大多数计算,spark还是优先利用内存的,实在放不下,才考虑磁盘

    2021-12-09
  • Geek_038655
    请问:大表join大表怎么优化?

    作者回复: 这块要说声抱歉,由于是入门篇,所以咱们没有安排这部分课程。大表Join大表属于比较复杂的场景,需要特殊对待,性能篇有介绍具体怎么处理。 当然,并不是鼓励老弟去买性能篇,不是这个意思,只是交代一下,为什么大表Join大表没有录入咱们这门课。 老弟有需要的话,我把性能篇里面相关章节的核心内容,copy到评论区,希望对你有所帮助~

    2021-10-26
  • 思绪纷繁
    【由于左表与右表在并行度(分区数)上是一致的】 想问下,只要做shuffle join操作,左表和右表的的并行度一定是一样的吗?
    2023-06-06归属地:北京
    1
  • 嬴梦川
    shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”有序。这样在reduce阶段拉取数据时再做一次时间复杂度为O(N)归并排序就行了。不会改变复杂度O(M+N)的最终结果
    2023-09-24归属地:江苏
  • 嬴梦川
    第六章中, shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”是有序的吧
    2023-09-24归属地:江苏
收起评论
显示
设置
留言
13
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部