• Abigail
    2021-10-20
    Broadcast Join 中相比 SMJ,HJ 并不要求参与 Join 的两张表有序,也不需要维护两个游标来判断当前的记录位置,只要在 Build 阶段构建的哈希表可以放进内存就行。这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的。因此,当Broadcast Join 的前提条件存在,在可以采用 HJ 的情况下,Spark 自然就没有必要再去用 SMJ 这种前置开销(排序)比较大的方式去完成数据关联。

    作者回复: 是的,正解~ 满分💯

    
    14
  • 小新
    2021-12-02
    SMJ 在执行稳定性方面,远胜于 HJ,这句话怎么理解? 还有在做等值关联时,优先级是:Broadcast HJ Shuffle SMJ Shuffle HJ那什么情况下Shuffle HJ会启用呢?

    作者回复: 好问题,由于咱们这门课程的定位,是入门课,所有很多东西,都没能介绍的更深入,老弟问了个非常好的问题,这个问题在《Spark性能调优实战》有详细的介绍,我把它贴出来,供老弟参考哈 “ 在等值数据关联中,Spark 会尝试按照 BHJ > SMJ > SHJ 的顺序依次选择 Join 策略。在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。其中,SMJ 和 SHJ 策略支持所有连接类型,如全连接、Anti Join 等等。BHJ 尽管效率最高,但是有两个前提条件:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。 那为什么 SHJ 比 SMJ 执行效率高,排名却不如 SMJ 靠前呢?这是个非常好的问题。我们先来说结论,相比 SHJ,Spark 优先选择 SMJ 的原因在于,SMJ 的实现方式更加稳定,更不容易 OOM。回顾 HJ 的实现机制,在 Build 阶段,算法根据内表创建哈希表。在 Probe 阶段,为了让外表能够成功“探测”(Probe)到每一个 Hash Key,哈希表要全部放进内存才行。坦白说,这个前提还是蛮苛刻的,仅这一点要求就足以让 Spark 对其望而却步。 要知道,在不同的计算场景中,数据分布的多样性很难保证内表一定能全部放进内存。而且在 Spark 中,SHJ 策略要想被选中必须要满足两个先决条件,这两个条件都是对数据尺寸的要求。首先,外表大小至少是内表的 3 倍。其次,内表数据分片的平均大小要小于广播变量阈值。第一个条件的动机很好理解,只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。 和 SHJ 相比,SMJ 没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成。内存中放不下的数据,可以临时溢出到磁盘。单表排序的过程,我们可以参考 Shuffle Map 阶段生成中间文件的过程。在做归并关联的时候,算法可以把磁盘中的有序数据用合理的粒度,依次加载进内存完成计算。这个粒度可大可小,大到以数据分片为单位,小到逐条扫描。 正是考虑到这些因素,相比 SHJ,Spark SQL 会优先选择 SMJ。事实上,在配置项 spark.sql.join.preferSortMergeJoin 默认为 True 的情况下,Spark SQL 会用 SMJ 策略来兜底,确保作业执行的稳定性,压根就不会打算去尝试 SHJ。开发者如果想通过配置项来调整 Join 策略,需要把这个参数改为 False,这样 Spark SQL 才有可能去尝试 SHJ。 ”

    
    7
  • Unknown element
    2021-10-21
    “学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。” 老师问下这里 join 之前应该还得再排一次序吧?因为 map 阶段的排序只能保证 reduce task 从每个 map task 拉取过来的数据片段是有序的,但是多个数据片段之间还是无序的吧

    作者回复: 这里并不需要全局有序,只要Task内部两边有序就可以了,老弟不妨想一想为什么~ 提示:本质上,还是要理解,分布式下的Join与单机Join的不同~

    共 3 条评论
    2
  • 王璀璨
    2022-01-11
    老师最近在用spark重构pandas的时候遇到一个问题,在udf中使用filter查询的时候报错 temp_data = hospital_data_sheet1.groupby(['hcp_id', 'hcp_name']).count().select('hcp_id', 'hcp_name') def get_data(data1): search_data = hospital_data_sheet1.select((hospital_data_sheet1['hcp_id'] == data1)) total = data1 return total get_number = F.udf(get_data, StringType()) result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id'])) result_data.show() 最后报错 _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object 不知道为什么会这样,请老师看一下

    作者回复: get_data函数定义的有问题,你只需要把输入、输出的计算逻辑定义清楚就可以了,不要在函数里面引入外面的数据集,比如hospital_data_sheet1,spark是不可能把它序列化的。get_number这udf,后面实际的函数是get_data,所以它需要序列化这个函数:get_data。但是这个函数里面,有一个数据集,这个数据集对于Spark调度系统来说,是透明的,不可能序列化的。所以,把这个数据集去掉,而且,这个函数的计算逻辑,跟这个数据集没有任何关系的。你需要定义一个简单数据处理逻辑。然后最后在result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id']))这里面调用就好了

    
    1
  • LJK
    2021-11-05
    老师好,工作中碰到过ERROR BroadcastExchangeExec: Could not execute broadcast in 300 secs.的报错,请问这种报错的排查思路以及有哪些可能的原因导致的呢?

    作者回复: 广播变量创建超时~ 原因是广播变量封装的数据量太大了,导致在分发的过程中超时,两种解决思路: 1)想办法把广播变量封装的内容变小 2)增加广播变量超时时间,调大配置项:spark.sql.broadcastTimeout,他的默认值就是这里的300s,把它调大一些就好了

    
    1
  • 慢慢卢
    2021-12-09
    "学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。" 这段里面提到SMJ可以利用磁盘完成计算,结合前面提到内存管理,能使用磁盘的除了cache、shuffle write外,也就是说内存计算其他过程也会使用到磁盘(比如SMJ),但我理解内存计算应该完全在内存中,不然就不会有OOM了。 所以这点我没有搞懂,辛苦老师指导解释下。

    作者回复: 这里说的是Spark的Spill和外排机制,正如你所说,内存计算,理论上都该在内存中完成,但实际情况是,数据体量,往往超出内存的限制,因此通过spill和外排,来保证任务的顺利执行。Spark当中很多环节,都会利用磁盘的,比方说shuffle、cache、join、reuseExchange,等等,等等。当然,对于大多数计算,spark还是优先利用内存的,实在放不下,才考虑磁盘

    
    
  • Geek_038655
    2021-10-26
    请问:大表join大表怎么优化?

    作者回复: 这块要说声抱歉,由于是入门篇,所以咱们没有安排这部分课程。大表Join大表属于比较复杂的场景,需要特殊对待,性能篇有介绍具体怎么处理。 当然,并不是鼓励老弟去买性能篇,不是这个意思,只是交代一下,为什么大表Join大表没有录入咱们这门课。 老弟有需要的话,我把性能篇里面相关章节的核心内容,copy到评论区,希望对你有所帮助~

    
    
  • 思绪纷繁
    2023-06-06 来自北京
    【由于左表与右表在并行度(分区数)上是一致的】 想问下,只要做shuffle join操作,左表和右表的的并行度一定是一样的吗?
    
    1
  • 18736416569
    2023-02-24 来自新加坡
    老师好,前几天跑任务时遇到了关于join的一个问题,很是不解:逻辑是这样的:a表id去left join b表的id_1或者id_2,我用的是a.id = b.id_1 OR a.id = b.id_2,发现spark采用的是BroadCastNestedLoopJoin,难道这不算是等值连接吗?
    
    
  • 小麦
    2022-11-19 来自浙江
    【由于左表与右表在并行度(分区数)上是一致的】 想问下,如果是 Hadoop RDD,左表数据量很大,以 128M 划分成10个分区,而右表只有2个分区。如何进行后续的计算呢?
    共 1 条评论
    