作者回复: 好问题~ 推出AQE之前,Spark SQL的静态优化没有那么智能。举例来说,表A:10G;表2:2G,表2过滤之后100MB,然后比如把广播阈值设置为1GB。讲道理,理论上,过滤之后应该选择Broadcast Join才对。 但是,Spark SQL在静态优化期间,会根据表A、B的存储大小,来判断选择什么Join策略,由于表A、B都大于1GB,所以Spark SQL在优化阶段自然选择Shuffle Join。它在优化阶段,并不知道filter之后,表B会变小,小到足以放进广播变量。这个正是静态优化的限制与痛点,是AQE设计初衷之一。 但是AQE就不同,根据Shuffle过后的中间文件,Spark SQL可以动态地决定,把原理的Shuffle Join调整为Broadcast Join。
作者回复: 好问题~ 其实这个问题我特别想让你独立思考,你先花5分钟想想,结合咱们之前讲过的内容,然后再看答复~ —————————给你留个分割线,哈哈———————————————— 首先,Spark有个参数:spark.sql.join.prefersortmergeJoin,它默认是true,也就是说,对于Shuffle Joins,Spark SQL会prefer也就是偏向优先选择Shuffle Sort Merge Join,换句话说,如果你不动这个参数的话,那么Spark SQL根本不会去考虑用Shuffle Hash Join。这么说吧,在Spark SQL眼里,Shuffle Hash Join的地位非常的低,虽然这一点我不认可,但是Spark就是这么干的。 所以,如果你想启用Shuffle Hash Join,那就必须把spark.sql.join.prefersortmergeJoin置为false。这还不算完,即便如此,Spark SQL其实还是会倾向使用Shuffle Sort Merge Join。当且仅当如下条件成立的时候,Spark才有可能考虑Shuffle Hash Join: 1)外表比内表至少大3倍 2)内表所有数据分片,都要小于广播变量 你说说,这个条件苛刻不?因此,想要启用Shuffle Hash Join,开发者就只剩下一条路可走:Join hints。也就是使用.hint("SHUFFLE_HASH")关键字,来强制Spark SQL选择Shuffle Hash Join。但是这个时候,你都使用强制机制了,Spark SQL会走单独的code path,不用再去利用AQE的Join策略调整、去看看能不能转Broadcast Joins。 因此,总结下来,在AQE的优化机制下,只有SMJ会降级为Broadcast Hash Join,压根跟Shuffle Hash Join没啥关系~
作者回复: spark的计算模式是惰性求值,在没有action的情况,对于数据集大小的判断,其实都是一种基于统计的猜测,这个就是CBO的工作原理。CBO要想work,必须要先用ANALYZE TABLE来统计数据表的各种信息。 AQE相反,放弃了这种思路,不再提取计算统计信息,而是根据运行时的反馈,来动态优化。shuffle本身,你可以把它看作是一种“action”,因为map阶段要落盘,reduce阶段在本质上,是另一个Stage的“map”阶段。下一个stage通过磁盘与上一个stage交换数据。也就是说,Spark能利用的运行时数据,只有map阶段输出的中间文件,对于数据表的大小也好,过滤之后的表大小也好,如果没有shuffle map的中间文件,spark并没有哪里可以获取关于数据表大小、分布这类信息。因此,AQE必须依赖shuffle中间文件,来完成动态的判断。
作者回复: 先来说说你提的问题,好问题~ 这里面涉及到一个多表Join的执行过程问题,先说结论,无论你的查询涉及多少张表,所有的数据库引擎(包括Spark SQL在内),都是以两表Join为粒度,逐渐地完成Join的。以你的例子来说,a、b、c三张表,a、c先关联,那么这个时候,a、c会生成关联后的中间结果tmp,后续和b表关联的,是这个“tmp表”,已经不再是你说的c表了。因此,回答你的问题,a、c关联利用AQE转化为BHJ,那么后续Spark SQL需要继续关联tmp和b,这个时候,AQE会继续判定tmp结果集大小,如果小于广播阈值,那么它就会把tmp与b的关联,继续转化为BHJ。相反,如果tmp很大,那么Spark SQL就用默认的SMJ。 再回过头来说问题一,现有的AQE机制,是不会利用你说的那些信息的,不过这正是问题所在,就是AQE可不可以、应不应该利用其它的运行时信息,我个人认为,是可以的,只不过这里还是要平衡一个运行时信息统计效率的问题,这个是关键所在~ 关于问题二,你说的特性,应该是Spark的Dynamic allocation,也就是以Executors为粒度,根据任务的计算负载,动态缩放Spark的分布式集群。这个确实是个不错的思路,不过呢,如果仅仅是依赖于集群的缩放,那么其实还是解决不了以task为粒度做负载均衡的问题,因为不论你有多少Executors,倾斜的数据,还是会分发到某一个Executor上面去。这块的答案其实是“两阶段Shuffle”,第一阶段通过加盐, 把数据打散,这样数据就会均衡地分发到集群内所有的Executors,解决了Executors之间的负载均衡问题;第二阶段是去盐化,也就是把第一阶段添加的随机“盐粒”去掉,保证关联关系的一致性。关键两阶段Shuffle的细节,可以参考第29讲哈~
作者回复: 对,CBO相关的元信息,都存储在Hive表,使用ANALYZE语句分析得到的统计信息,都会记录到那些数据表里面去~
作者回复: 对的,对的~ 老弟说的对~ 是应该像你说的那样,交叉组合,原图画的有问题,回头我改下,感谢老弟提醒~
作者回复: 是的,没错,就是你说的这个逻辑~
作者回复: 第二题满分💯,没错,要实现Executors级别的均衡,那确实需要加盐来处理,也就是把Join Keys打散,这样哈希过后再分发数据,数据在Executors中的分布就会比较均衡。 第一题实时统计的思路也没问题~ 这个思路其实就是传统DBMS的真正的CBO,实时计算每一步的costs,然后按照costs动态做优化。
作者回复: 我先说说对于问题的理解哈,不确定我的理解是不是对的。 你的诉求:一个csv格式的文件,4个字段,目标是对其中3个字段做匿名化,想问按照第四个字段做分布式计算来提高执行效率行不行。 如果是这样的话,那我觉得完全没问题,如果第四个字段分布比较均匀,不存在明显的倾斜问题,我觉得完全可以啊,而且就是应该这么做来提升执行效率。 如果有倾斜的话,你还可以手工加盐处理,手工加盐的方法我们29讲会细说,到时候可以重点关注下~
作者回复: 我们把shuffle之前的stage称作stage0,也就是shuffle map stage,把shuffle过后的stage,成为stage1,也就是reduce阶段的stage。 Stage0的并行度,或者说有多少task,是由stage0里面第一个RDD/DataFrame决定的。而Reduce阶段的stage1,它的并行度,或者说tasks数量,它是由配置项spark.sql.shuffle.partitions决定的