Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

24 | Spark 3.0(一):AQE的3个特性怎么才能用好?

你好,我是吴磊。
目前,距离 Spark 3.0 版本的发布已经将近一年的时间了,这次版本升级添加了自适应查询执行(AQE)、动态分区剪裁(DPP)和扩展的 Join Hints 等新特性。利用好这些新特性,可以让我们的性能调优如虎添翼。因此,我会用三讲的时间和你聊聊它们。今天,我们先来说说 AQE。
我发现,同学们在使用 AQE 的时候总会抱怨说:“AQE 的开关打开了,相关的配置项也设了,可应用性能还是没有提升。”这往往是因为我们对于 AQE 的理解不够透彻,调优总是照葫芦画瓢,所以这一讲,我们就先从 AQE 的设计初衷说起,然后说说它的工作原理,最后再去探讨怎样才能用好 AQE。

Spark 为什么需要 AQE?

在 2.0 版本之前,Spark SQL 仅仅支持启发式、静态的优化过程,就像我们在第 21、22、23 三讲介绍的一样。
启发式的优化又叫 RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。也就是说,启发式的优化实际上算是一种经验主义
经验主义的弊端就是不分青红皂白、胡子眉毛一把抓,对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.2 版本中推出了 CBO(Cost Based Optimization,基于成本的优化)。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Spark 3.0版本引入了自适应查询执行(AQE)等新特性,为了充分利用这些特性,我们需要了解AQE的设计初衷、工作原理以及如何使用。AQE作为一种动态优化机制,可以在运行时根据Shuffle Map阶段的统计信息动态调整、修正逻辑计划和物理计划,从而提升性能。AQE的三大特性包括Join策略调整、自动分区合并和自动倾斜处理。Join策略调整通过DemoteBroadcastHashJoin和OptimizeLocalShuffleReader规则,将Shuffle Joins降级为Broadcast Joins,并优化Shuffle操作,提升性能。自动分区合并利用CoalesceShufflePartitions策略,在Reduce阶段合并小于目标尺寸的分区,提高效率。自动倾斜处理则通过OptimizeSkewedJoin策略,将大分区拆分为多个小分区,平衡Task之间的计算负载。然而,在复杂的数据倾斜场景下,需要权衡AQE的自动化机制与手工处理倾斜之间的利害得失。因此,了解AQE的设计初衷、工作原理以及如何使用这些特性,可以帮助我们更好地优化Spark SQL查询性能。AQE是Spark SQL的一种动态优化机制,它的诞生解决了RBO、CBO,这些启发式、静态优化机制的局限性。想要用好AQE,我们就要掌握它的特点,以及它支持的三种优化特性的工作原理和使用方法。如果用一句话来概括AQE的定义,就是每当Shuffle Map阶段执行完毕,它都会结合这个阶段的统计信息,根据既定的规则和策略动态地调整、修正尚未执行的逻辑计划和物理计划,从而完成对原始查询语句的运行时优化。也因此,只有当你的查询语句会引入Shuffle操作的时候,Spark SQL才会触发AQE。AQE支持的三种优化特性分别是Join策略调整、自动分区合并和自动倾斜处理。关于Join策略调整,我们首先要知道DemoteBroadcastHashJoin规则仅仅适用于Shuffle Sort Merge Join这种关联机制,对于其他Shuffle Joins类型,AQE暂不支持把它们转化为Broadcast Joins。其次,为了确保AQE的Join策略调整正常运行,我们要确保spark.sql.adaptive.localShuffleReader.enabled配置项始终为开启状态。关于自动分区合并,我们要知道,在Shuffle Map阶段完成之

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(18)

  • 最新
  • 精选
  • 周俊
    老师, //订单表与用户表关联 select sum(order.price * order.volume), user.id from order inner join user on order.userId = user.id where user.type = ‘Head Users’ group by user.id 这段代码中,catalyst会做谓词下推处理,将过滤条件放到贴近数据源扫描的地方,这里过滤之后user表的大小已经属于小表变成广播变量的阈值了吧,为什么还会有AQE呢

    作者回复: 好问题~ 推出AQE之前,Spark SQL的静态优化没有那么智能。举例来说,表A:10G;表2:2G,表2过滤之后100MB,然后比如把广播阈值设置为1GB。讲道理,理论上,过滤之后应该选择Broadcast Join才对。 但是,Spark SQL在静态优化期间,会根据表A、B的存储大小,来判断选择什么Join策略,由于表A、B都大于1GB,所以Spark SQL在优化阶段自然选择Shuffle Join。它在优化阶段,并不知道filter之后,表B会变小,小到足以放进广播变量。这个正是静态优化的限制与痛点,是AQE设计初衷之一。 但是AQE就不同,根据Shuffle过后的中间文件,Spark SQL可以动态地决定,把原理的Shuffle Join调整为Broadcast Join。

    2021-08-16
    12
  • Fendora范东_
    磊哥,请教下 Join策略调整里面,为啥只有SMJ能降级,而其他Join比如SHJ不能降级

    作者回复: 好问题~ 其实这个问题我特别想让你独立思考,你先花5分钟想想,结合咱们之前讲过的内容,然后再看答复~ —————————给你留个分割线,哈哈———————————————— 首先,Spark有个参数:spark.sql.join.prefersortmergeJoin,它默认是true,也就是说,对于Shuffle Joins,Spark SQL会prefer也就是偏向优先选择Shuffle Sort Merge Join,换句话说,如果你不动这个参数的话,那么Spark SQL根本不会去考虑用Shuffle Hash Join。这么说吧,在Spark SQL眼里,Shuffle Hash Join的地位非常的低,虽然这一点我不认可,但是Spark就是这么干的。 所以,如果你想启用Shuffle Hash Join,那就必须把spark.sql.join.prefersortmergeJoin置为false。这还不算完,即便如此,Spark SQL其实还是会倾向使用Shuffle Sort Merge Join。当且仅当如下条件成立的时候,Spark才有可能考虑Shuffle Hash Join: 1)外表比内表至少大3倍 2)内表所有数据分片,都要小于广播变量 你说说,这个条件苛刻不?因此,想要启用Shuffle Hash Join,开发者就只剩下一条路可走:Join hints。也就是使用.hint("SHUFFLE_HASH")关键字,来强制Spark SQL选择Shuffle Hash Join。但是这个时候,你都使用强制机制了,Spark SQL会走单独的code path,不用再去利用AQE的Join策略调整、去看看能不能转Broadcast Joins。 因此,总结下来,在AQE的优化机制下,只有SMJ会降级为Broadcast Hash Join,压根跟Shuffle Hash Join没啥关系~

    2021-05-09
    3
    8
  • sparkjoy
    老师,为什么不在join之前统计两个表的大小,从而决定是否用BHJ,而是map结束之后才根据shuffle文件总大小去判断呢?

    作者回复: spark的计算模式是惰性求值,在没有action的情况,对于数据集大小的判断,其实都是一种基于统计的猜测,这个就是CBO的工作原理。CBO要想work,必须要先用ANALYZE TABLE来统计数据表的各种信息。 AQE相反,放弃了这种思路,不再提取计算统计信息,而是根据运行时的反馈,来动态优化。shuffle本身,你可以把它看作是一种“action”,因为map阶段要落盘,reduce阶段在本质上,是另一个Stage的“map”阶段。下一个stage通过磁盘与上一个stage交换数据。也就是说,Spark能利用的运行时数据,只有map阶段输出的中间文件,对于数据表的大小也好,过滤之后的表大小也好,如果没有shuffle map的中间文件,spark并没有哪里可以获取关于数据表大小、分布这类信息。因此,AQE必须依赖shuffle中间文件,来完成动态的判断。

    2021-08-27
    4
  • zxk
    问题一:个人认为 AQE 可以在加载读取文件的时候获取一些运行时信息,或者做 cache 的时候。这里也有个疑问,就是 AQE 会不会根据这些信息也进行一些优化? 问题二想到两种方法,不知道是否可行: 1. Spark 有一个动态分配调整 executor 的功能,在 Shuffle Map 阶段由 Driver 端汇聚信息决定好倾斜数据的切割方式,之后部分数据发送到原有的 executor 上,切割出来的数据发送到新的 executor 上,同时也需要注意对应做关联的数据也需要复制一份传输到新的 executor 上,但这样会带来 driver 端决策的开销、新的 executor 调度开销以及关联数据额外复制并通过网络传输的开销。 2. 仍按照原来的方式进行,但在 Reduce 阶段切割数据后,起一个新的 executor 来分担切割后的数据,并通知 driver 端。如果能够在同节点上新起 executor,还可以消除网络之间的传输,只做进程间的数据传输即可。 这里想向老师请教一个关于 Join 策略调整的问题,如果 a、b 为事实表,c 为维度表,a、c 做关联后 c 从原来的 SMJ 被 AQE 优化为了 BHJ 后,如果紧接着 b 又跟 c 做关联,那么 Spark 是否会直接使用 BHJ,还是仍需要将 b、c 做 SHuffle Map 之后才能优化为 BHJ?

    作者回复: 先来说说你提的问题,好问题~ 这里面涉及到一个多表Join的执行过程问题,先说结论,无论你的查询涉及多少张表,所有的数据库引擎(包括Spark SQL在内),都是以两表Join为粒度,逐渐地完成Join的。以你的例子来说,a、b、c三张表,a、c先关联,那么这个时候,a、c会生成关联后的中间结果tmp,后续和b表关联的,是这个“tmp表”,已经不再是你说的c表了。因此,回答你的问题,a、c关联利用AQE转化为BHJ,那么后续Spark SQL需要继续关联tmp和b,这个时候,AQE会继续判定tmp结果集大小,如果小于广播阈值,那么它就会把tmp与b的关联,继续转化为BHJ。相反,如果tmp很大,那么Spark SQL就用默认的SMJ。 再回过头来说问题一,现有的AQE机制,是不会利用你说的那些信息的,不过这正是问题所在,就是AQE可不可以、应不应该利用其它的运行时信息,我个人认为,是可以的,只不过这里还是要平衡一个运行时信息统计效率的问题,这个是关键所在~ 关于问题二,你说的特性,应该是Spark的Dynamic allocation,也就是以Executors为粒度,根据任务的计算负载,动态缩放Spark的分布式集群。这个确实是个不错的思路,不过呢,如果仅仅是依赖于集群的缩放,那么其实还是解决不了以task为粒度做负载均衡的问题,因为不论你有多少Executors,倾斜的数据,还是会分发到某一个Executor上面去。这块的答案其实是“两阶段Shuffle”,第一阶段通过加盐, 把数据打散,这样数据就会均衡地分发到集群内所有的Executors,解决了Executors之间的负载均衡问题;第二阶段是去盐化,也就是把第一阶段添加的随机“盐粒”去掉,保证关联关系的一致性。关键两阶段Shuffle的细节,可以参考第29讲哈~

    2021-05-07
    4
  • sparkjoy
    老师,cbo的信息是存在表的元信息里吗?

    作者回复: 对,CBO相关的元信息,都存储在Hive表,使用ANALYZE语句分析得到的统计信息,都会记录到那些数据表里面去~

    2021-08-26
    2
  • A
    spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。分区合并的最终尺寸这个配置应该是个建议值吧?min(maxTargetSize,advisoryTargetSize)会通过这行代码来取advisoryTargetSize

    作者回复: 是的,没错,就是你说的这个逻辑~

    2021-07-21
    2
    2
  • wayne
    老师,请问 aqe可以强制指定输出文件的大小吗?比如强制设置 设置分区文件大小为128m

    作者回复: 不可以哈,AQE并不能强制每个分区大小是多少,只能是根据诸如: spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes spark.sql.adaptive.advisoryPartitionSizeInBytes spark.sql.adaptive.coalescePartitions.minPartitionNum 来决定什么时候拆分、什么时候合并 再者,实际上强制分区大小,也是没有道理的,合理的做法,是根据“三足鼎立”的原则,灵活地设置CPU、内存和并行度,来决定每个分区的大小~

    2021-12-20
    1
  • To_Drill
    老师您好,关于文章中的图-两边倾斜中的关联箭头指向有点疑问,如果两张表的关联分区都发生了倾斜,然后都进行了拆分,那么拆分之后的子分区中的是局部数据,应该和对方的全量数据都关联下才能保证原来的关联逻辑,这样的话各个子分区的关联箭头指向是不是应该如下图所示呢? 拆分-01-拷贝 -> 拆分-01 拆分-01 -> 拆分-02 拆分-02 -> 拆分-01-拷贝 拆分-02-拷贝 -> 拆分-02-拷贝

    作者回复: 对的,对的~ 老弟说的对~ 是应该像你说的那样,交叉组合,原图画的有问题,回头我改下,感谢老弟提醒~

    2021-11-30
    1
  • 西南偏北
    1. 如果不用Map阶段的输出文件的话,那应该就是实时统计了吧,比如"ANALYZE TABLE COMPUTE STATISTICS" 2. 为了防止倾斜分区都出现在同一个Executor的情况,可以考虑对倾斜数据的key进行加前缀,然后再将这些数据进行一下重分区repartition(),分区数指定为executor的个数。但是,由于使用了repartition(),也就引入了shuffle开销,这个也是一个要平衡的问题

    作者回复: 第二题满分💯,没错,要实现Executors级别的均衡,那确实需要加盐来处理,也就是把Join Keys打散,这样哈希过后再分发数据,数据在Executors中的分布就会比较均衡。 第一题实时统计的思路也没问题~ 这个思路其实就是传统DBMS的真正的CBO,实时计算每一步的costs,然后按照costs动态做优化。

    2021-05-14
    1
  • Stony.修行僧
    老师请教一个问题,一个10g的csv 文件,里面有4个字段,其中三个字段需要做匿名化。在匿名化三个字段里过程中,可以partition第四个字段来提高性能吗?求老师的意见和建议

    作者回复: 我先说说对于问题的理解哈,不确定我的理解是不是对的。 你的诉求:一个csv格式的文件,4个字段,目标是对其中3个字段做匿名化,想问按照第四个字段做分布式计算来提高执行效率行不行。 如果是这样的话,那我觉得完全没问题,如果第四个字段分布比较均匀,不存在明显的倾斜问题,我觉得完全可以啊,而且就是应该这么做来提升执行效率。 如果有倾斜的话,你还可以手工加盐处理,手工加盐的方法我们29讲会细说,到时候可以重点关注下~

    2021-05-07
    2
    1
收起评论
显示
设置
留言
18
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部