Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

13 | 广播变量(二):如何让Spark SQL选择Broadcast Joins?

你好,我是吴磊。
上一讲我们说到,在数据关联场景中,广播变量是克制 Shuffle 的杀手锏,用 Broadcast Joins 取代 Shuffle Joins 可以大幅提升执行性能。但是,很多同学只会使用默认的广播变量,不会去调优。那么,我们该怎么保证 Spark 在运行时优先选择 Broadcast Joins 策略呢?
今天这一讲,我就围绕着数据关联场景,从配置项和开发 API 两个方面,帮你梳理出两类调优手段,让你能够游刃有余地运用广播变量。

利用配置项强制广播

我们先来从配置项的角度说一说,有哪些办法可以让 Spark 优先选择 Broadcast Joins。在 Spark SQL 配置项那一讲,我们提到过 spark.sql.autoBroadcastJoinThreshold 这个配置项。它的设置值是存储大小,默认是 10MB。它的含义是,对于参与 Join 的两张表来说,任意一张表的尺寸小于 10MB,Spark 就在运行时采用 Broadcast Joins 的实现方式去做数据关联。另外,AQE 在运行时尝试动态调整 Join 策略时,也是基于这个参数来判定过滤后的数据表是否足够小,从而把原本的 Shuffle Joins 调整为 Broadcast Joins。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了如何让Spark SQL优先选择Broadcast Joins的方法,包括通过配置项和API强制广播两种途径。首先,从配置项角度介绍了如何通过调整参数值来强制Spark选择Broadcast Joins,并强调了在设置广播阈值时需要注意内存中的存储大小与磁盘上的存储大小存在差异。其次,详细介绍了利用API强制广播的方法,包括使用Join Hints和SQL functions中的broadcast函数来指导Spark SQL选择Broadcast Joins。文章通过实际案例和代码示例,为读者提供了丰富的实践经验和技术指导。同时,强调了广播变量并非解决所有数据关联问题的银弹,需要谨慎使用。总的来说,本文内容深入浅出,适合读者快速了解并掌握相关技术特点。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(18)

  • 最新
  • 精选
  • kingcall
    第一题:可以参考JoinStrategyHint.scala BROADCAST, SHUFFLE_MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL 第二题:本质上是一样的,sql 的broadcast返回值是一个Dataset[T],而sparkContext.broadcast的返回值是一个Broadcast[T] 类型值,需要调用value方法,才能返回被广播出去的变量,所以二者在使用的使用的体现形式上,sparkContext.broadcast 需要你调用一次value 方法才能和其他DF 进行join,下面提供一个demo 进行说明 import org.apache.spark.sql.functions.broadcast val transactionsDF: DataFrame = sparksession.range(100).toDF("userID") val userDF: DataFrame = sparksession.range(10, 20).toDF("userID") val bcUserDF = broadcast(userDF) val bcUserDF2 = sparkContext.broadcast(userDF) val dataFrame = transactionsDF.join(bcUserDF, Seq("userID"), "inner") dataFrame.show() val dataFrame2 = transactionsDF.join(bcUserDF2.value, Seq("userID"), "inner") dataFrame2.show()

    作者回复: Perfect x 2!两道题都是满分💯~ 不过,第一题,我再追问一句,当然,这么追问有点过分,哈哈,毕竟咱们这节课还没有讲不同Join的实现方式(26讲会展开)。追问的问题是: SHUFFLE_MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL 这几个hints的作用和效果,分别是什么?他们分别适合哪些场景?不妨提前思考一下,等到了26讲,咱们再细聊哈~

    2021-04-12
    3
    25
  • Geek_d794f8
    老师我做了一个测试,我的表数据是parquet存储,snappy压缩的,磁盘的存储大小为133.4M。我将广播变量的阈值调到了134M,它却可以自动广播;当我将阈值调到132M,则不会自动广播。 我用老师的方法做了一个数据在内存展开的预估,大概1000M左右,那么为什么我按照磁盘的大小设定广播阈值,它能够广播呢?

    作者回复: 好问题~ 这里我先说声抱歉,看到你这个问题,我意识到我原文没有说清楚。是这样的,我们分两种情况来说。为了叙述方便,我把你的数据集记为D,也就是磁盘中133.4MB,展开到内存是1000MB。 第一种情况,数据集D上不存在Cache,这个时候那D直接参与后续的计算,比如Join,由于Spark SQL无从判断D在内存中的大小,因此它会“偷懒”地直接拿磁盘大小作为判断依据,也就是你上面说的那种情况,广播阈值大于133.4MB,就可以广播,反之则不行。不得不说,这个“懒”偷的有点过分,不过现阶段Spark SQL就是这么做的,这个和Spark的版本没有关系,3.0中Spark SQL也是这么处理的。 第二种情况,数据集之上有Cache,这个时候,Spark SQL判定的依据,就是Cache中的存储大小,也就是1000MB,所以你不妨再做个实验,也就是用Cache过后的D再去做Join,这个时候,你会发现,广播阈值必须要大于1000MB才行。 总结下来,对于数据集大小的判定,Cache与否完全不同,不Cache则直接用磁盘存储大小,Cache过后则使用Cache中的存储大小。需要特别注意的是,第一种情况,也就是不加Cache的时候,Spark SQL简单粗暴地直接用磁盘存储大小,坦白讲是存在隐患的,也就是创建广播变量创建过程中Driver端的OOM问题,或是广播变量创建过程中的Time out问题。因此,即便在那种情况之下,Spark SQL会选择参考磁盘存储大小,但是我的建议还是事先估算好数据集在内存中的存储大小,做到有备无患。

    2021-05-18
    5
    17
  • YJ
    老师,我有一个问题。 bigTableA.Join(broadcast(smallTable), ...); bigTableB.Join(broadsast(smallTable), ...); bigTableA.Join(bigTableB, ...); 这里 广播了的smallTable 会被第二个join重用吗? 还是说会被广播两次?

    作者回复: 好问题, 这种写法,Spark不会复用先前的广播变量,所以第二次的Broadcast会重复计算。 复用广播最保险的方式,是这种写法: val bcSmallTable = sparkContext.broadcast(smallTable) bigTableA.Join(bcSmallTable.value, ...); bigTableB.Join(bcSmallTable.value, ...);

    2021-04-16
    3
    16
  • Geek1185
    为什么left join的时候不能广播左边的小表呢?几百行的表左连接几亿行的表(业务上要求即便没关联上也要保留左表的记录)。 就像为什么left join时,左表在on的谓词不能下推? 我不太明白,希望老师解答

    作者回复: 好问题~ 这主要是因为left join的计算逻辑。简单的回答是,广播左表、或者说基表,没用! 为什么这么说呢,你不妨仔细想想left join的原理。它是把inner join的结果,再加上不满足关联条件的结果。 如果是广播右表的话,左表的数据分区,能看到右表的全量数据,不管是满足关联条件和不满足关联条件,左表的数据分区都能立即得到答案。 但是,反过来不行,比如把左表广播了,右表“待在原地、保持不动”,那对于左表来说,对于每一个右表的数据分区来说,左表没有全局视角,它只能知道哪些右表分区的数据满足关联条件,但是,它不知道在全局情况下,到底有没有不满足关联条件、需要生成null的数据记录。 可能说的有点绕,你不妨仔细想想left join的计算原理,就能明白,在left join下,广播左表是没有用的;同理,right join,广播右表,也没有用。因为他们都不能实现left/right join原本的计算逻辑~

    2021-08-30
    3
    11
  • 周俊
    老师,假设我有16张表需要连接,其余15张都是小表,如果我将15张小表都做成广播变量,假设他们的总数据量超过了8G,是不是会直接OOM呀,还是说只要每一个广播变量不超过8g,就不会有问题。

    作者回复: 是这样的,每个广播的上限是8G,超过这个限制,Spark直接抛异常。所以,这个8G是以广播变量为粒度的。15个小表的话,只要他们各自不超过8G就没事。不过,话说,同时搞15个广播变量,Driver不一定受得了~ 当然,要看你的每个广播变量有多大了。 不过anyway,只要你Driver内存足够大,每个广播又不超过8G,就没事。

    2021-08-17
    5
  • Jefitar
    老师,有个问题,字符串“abcd”只需要消耗 4 个字节,为什么JVM 在堆内存储这 4 个字符串总共需要消耗 48 个字节?

    作者回复: 具体细节可以参考这里哈:https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html java.lang.String object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) ... 4 4 (object header) ... 8 4 (object header) ... 12 4 char[] String.value [] 16 4 int String.hash 0 20 4 int String.hash32 0 Instance size: 24 bytes (reported by Instrumentation API)

    2021-04-19
    3
  • Unknown element
    老师您好 请问 val plan = df.queryExecution.logicalval estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes 这个查看内存占用的方法是在哪里看到的呢?我在官方文档 https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions 里把这些方法都搜了一遍没有搜到QAQ

    作者回复: 最早在哪里看到的,说实话我也忘了[允悲],太久了。不过这个方法一定是切实可行的,日常的工作中验证无数次了哈~ 需要提醒的是,要预估数据集的内存占用,需要先把数据集cache起来,否则的话,sizeInBytes给出的,实际上源文件在磁盘上的存储大小,切记切记

    2022-01-06
    2
  • 笨小孩
    老师你好 在SparkSql中使用类似with as 这样的语法 会自动广播这张表嘛

    作者回复: 不会的~ 和语法没什么关系,主要是看表大小。

    2021-05-25
    2
  • 魏海霞
    老师您好,用sparksql开发,遇到一个写了hint也不走broadcast的情况。具体情况是这样的,A表是个大表,有20多亿条记录,b,c,d都是小表,表就几个字段,数据最多也就3000多条,select /*+ broadcast(b,c,d) from a join b jion c left join d 执行计划里b c都用的是BroadcastHashJOIN,d表是SortMergeJoin。d表不走bhj的原因大概是什么?能给个思路吗?

    作者回复: 能把执行计划贴出来吗?(可以在join完成之后的DataFrame之上调用explain) 我想知道这4张表的关联顺序,讲道理,应该是: ((a inner join b) inner join c) left join d 这是我的理解,如果是这样的话,理论上应该都是Broadcast Join才对,可以把explain执行计划贴出来确认一下~

    2021-09-08
    2
    1
  • Sampson
    磊哥你好 ,请教一下,我在我的任务中设置的如下的参数提交Spark任务, --master yarn --deploy-mode cluster --num-executors 20 --executor-cores 1 --executor-memory 5G --driver-memory 2G --conf spark.yarn.executor.memoryOverhead=2048M --conf spark.sql.shuffle.partitions=30 --conf spark.default.parallelism=30 --conf spark.sql.autoBroadcastJoinThreshold=1024 按照上文中讲到的我设置了广播变量的阀值是 1024 = 1G ,但是看任务运行中的日志 storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai7:38123 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai9:38938 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai5:39487 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai7:46429 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai5:41456 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai7:40246 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai5:45320 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai4:41769 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai4:38896 (size: 14.5 KB, free: 2.5 GB) storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on miai4:35351 (size: 14.5 KB, free: 2.5 GB) 并不是我设置的1G呀 ,这是为什么呢 ?

    作者回复: 这个配置项spark.sql.autoBroadcastJoinThreshold,它的含义是,size不大于这个设置的数据集,可以被广播。从log来看,你的广播变量小于1G,所以没有问题~ 并不是说你设置多少,广播变量大小就是多少

    2022-01-21
收起评论
显示
设置
留言
18
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部