作者回复: 第一题满分💯~ 第二题说的非常好!思考的相当到位~ 你说的没错,咱们文中的例子,特殊的地方在于,涉及aggregate的字段,都是来自Transactions表,也就是sum(tx.price * tx.quantity),因此确实可以按照你说的办法,先做Map端聚合,然后再和Orders表做关联。 不过,其实“两阶段Shuffle”的调优思路,更多地是给大家提供一种面对极端场景的解决办法。 实际上,在大多数的Join场景中,查询涉及的aggregate往往来自两张表的字段,比如把我们的例子稍微调整一下,聚合计算改成:sum(tx.price * o.batches),这个时候,我们就没办法再充分利用Map端聚合了,因为这个时候必须要先完成两张表的Shuffle,然后在Reduce端完成聚合计算。
作者回复: 这块是这样的,skewOrderIds和evenOrderIds这两个列表是用来记录有倾斜和没有倾斜的OrderIds;后面那两句就是DataFrame的语法,这里面用了filter算子,用来对transactions做过滤,具体用法可以参考官方API哈。 其实这里面关键的是前面那两句,就是怎么获得倾斜列表和非倾斜列表,这块可以事先通过分组计数来获得。
作者回复: 两阶段Shuffle,目的是消除掉Executors级别的负载均衡问题,加盐之后,实际上数据就不再倾斜,这个时候Shuffle,数据在Executors的分布式均衡的,因此负载也是均衡的。这样的话,聚合之后,原始key的数量,会呈指数级锐减。比如,原来的key=-1,Cardinality是10,而数量是1亿,在加盐、reduce过后,数量就退化为10,这个时候,不同key之间,数量其实都差不太多,倾斜问题不再了。然后,把random去掉,也就是去盐化,再shuffle、聚合一次,也不会有倾斜问题,而计算的逻辑,与原来保持一致,可谓两全其美
作者回复: 第一题的思路非常赞,和传统的先用分组计数这种“先验”的方法不同,你的思路是一种更直接、更单刀直入的“后验”方法。不过,我很好奇,这个具体怎么计算?我们确实可以通过Spark UI来判断哪些Tasks是均匀的、哪些是倾斜的,但是怎么知道不同的Tasks内部,处理的都是哪些Join Keys呢?期待你的答复~ 第二题没问题,加盐之后不保序,所以凡是以排序为前提的聚合类操作,就都不能直接去用“两阶段Shuffle”,比如像你这个例子,还需要在去盐化的过程中做归并排序~
作者回复: 平均数是没问题的,平均数的计算与排序无关,其实聚合多少次都是OK的。这里的关键是“聚合的前提是排序”,举个例子,比如求分位数,25%分位、75%分位或是中位数,等等,这些计算是需要先进行全局排序,然后才可以计算的。像这些对于排序有依赖的聚合计算,就不适合“两阶段Shuffle”~
作者回复: 对,如果是纯SQL的话,只能依靠调参和引擎本身的特性来实现优化,比如说Hive的map side join机制,等等。像两阶段Shuffle这种比较定制化的方案,还是需要开发者自己来实现
作者回复: 其实就是把内表数据复制N份,每份的后缀都不同,第一份后缀是1,第二份后缀是2,以此类推;目的和你说的一样,要保持和外表的一致性。外表加了盐,内表也需要加盐,只不过内表需要“复制+加盐”,而外表的随机加盐有所不同~
作者回复: 老弟说的“以排序为前提的聚合类操作”,能举几个例子么?方便判断你说的这些操作,适不适合做“加盐处理”,如果设计得当的话,我觉得“加盐”之后,分而治之,还是能改善问题的
作者回复: 第一题满分💯~ 第二题再想想哈~ 内表虽然确实多了不少副本,但是结合之前我们讲的“三足鼎立”,由于并行度跟着变大了,所以每个数据分片的大小并没有变化,因此实际上并不存在OOM的隐患。 朝着排序的方向想一想,如果Shuffle中涉及的聚合计算需要以排序为前提,那么加盐之后的优化手段,也就是“两阶段Shuffle”,会不会破坏原先的计算逻辑?
作者回复: 都可以的,本质上都是把Jon Keys打散,这样哈希过后,他们会被Shuffle到不同的Executors中去。所以说,盐粒加在前、后不重要,重要的是,加盐之后的Join Keys,已经被打散了,Shuffle过后的数据分布更均匀~