• 对方正在输入。。。
    2021-05-26
    第一题:可以先统计每个key的条目数,然后根据条目数从小到大排序,后取其序列之中位数,然后找出比中位数大n倍,同时条目数大于一定阈值的key 第二题:老师,我觉得加盐的操作好像根本没啥用,加盐的场景适合聚合操作,但是吧,一旦有了aggregator,sortShuffule的时候已经在map端提前聚合了,也不会发生倾斜了。比如,本文的例子,解决这个倾斜的问题,我理解是不是可以事先对交易表作group然后求其sum值等,这个阶段因为会在map阶段事先聚合,所以并不会倾斜,然后再将聚合的结果和orders做join

    作者回复: 第一题满分💯~ 第二题说的非常好!思考的相当到位~ 你说的没错,咱们文中的例子,特殊的地方在于,涉及aggregate的字段,都是来自Transactions表,也就是sum(tx.price * tx.quantity),因此确实可以按照你说的办法,先做Map端聚合,然后再和Orders表做关联。 不过,其实“两阶段Shuffle”的调优思路,更多地是给大家提供一种面对极端场景的解决办法。 实际上,在大多数的Join场景中,查询涉及的aggregate往往来自两张表的字段,比如把我们的例子稍微调整一下,聚合计算改成:sum(tx.price * o.batches),这个时候,我们就没办法再充分利用Map端聚合了,因为这个时候必须要先完成两张表的Shuffle,然后在Reduce端完成聚合计算。

    共 3 条评论
    5
  • ulysses
    2021-08-03
    老师想问下面一段代码,怎么能够筛选去哪些是skew的数据,哪些是even的 数据,对scala语法不太熟悉。 val skewOrderIds: Array[Int] = _ val evenOrderIds: Array[Int] = _ val skewTx: DataFrame = transactions.filter(array_contains(lit(skewOrderIds),$"orderId")) val evenTx: DataFrame = transactions.filter(array_contains(lit(evenOrderIds),$"orderId"))

    作者回复: 这块是这样的,skewOrderIds和evenOrderIds这两个列表是用来记录有倾斜和没有倾斜的OrderIds;后面那两句就是DataFrame的语法,这里面用了filter算子,用来对transactions做过滤,具体用法可以参考官方API哈。 其实这里面关键的是前面那两句,就是怎么获得倾斜列表和非倾斜列表,这块可以事先通过分组计数来获得。

    共 2 条评论
    3
  • 子兮
    2021-11-25
    老师,“两阶段 Shuffle”指的是,通过“加盐、Shuffle、关联、聚合”与“去盐化、Shuffle、聚合”这两个阶段的计算过程,第二阶段去盐后进行shuffle,不是仍然会把一个key 的所有值拉到一起吗?这和直接join最后的结果一样的呀?应该是去盐后不再进行shuffle类的操作这种加盐的操作才有意义,如理解有误,还请老师解答,谢谢

    作者回复: 两阶段Shuffle,目的是消除掉Executors级别的负载均衡问题,加盐之后,实际上数据就不再倾斜,这个时候Shuffle,数据在Executors的分布式均衡的,因此负载也是均衡的。这样的话,聚合之后,原始key的数量,会呈指数级锐减。比如,原来的key=-1,Cardinality是10,而数量是1亿,在加盐、reduce过后,数量就退化为10,这个时候,不同key之间,数量其实都差不太多,倾斜问题不再了。然后,把random去掉,也就是去盐化,再shuffle、聚合一次,也不会有倾斜问题,而计算的逻辑,与原来保持一致,可谓两全其美

    
    1
  • aof
    2021-05-22
    第一题:一般来讲,我们不会在代码一出现Join的地方就进行Key数量的统计,一般是执行任务的过程中,结合Spark WebUI上查看某个Stage中的Tasks的耗时排行,比如某个Task或某些Task的耗时是其他Task耗时的两倍以上,那我们就知道出现了数据倾斜,然后我们可以根据stage对应的代码位置来排查是哪些key出现了倾斜 第二题:如果是对数据进行分组排序这种情况,某个Key对应组的数据比较多,如果进行加盐的话,是无法保证整个组内的数据是有序的。加盐之后一组分为N组,每个组是有序的,但是最后去盐合并的时候,需要进行归并排序。

    作者回复: 第一题的思路非常赞,和传统的先用分组计数这种“先验”的方法不同,你的思路是一种更直接、更单刀直入的“后验”方法。不过,我很好奇,这个具体怎么计算?我们确实可以通过Spark UI来判断哪些Tasks是均匀的、哪些是倾斜的,但是怎么知道不同的Tasks内部,处理的都是哪些Join Keys呢?期待你的答复~ 第二题没问题,加盐之后不保序,所以凡是以排序为前提的聚合类操作,就都不能直接去用“两阶段Shuffle”,比如像你这个例子,还需要在去盐化的过程中做归并排序~

    共 5 条评论
    1
  • 王天雨
    2021-05-21
    2、比如聚合操作是取平均数 ,就不适合二次聚合了吧

    作者回复: 平均数是没问题的,平均数的计算与排序无关,其实聚合多少次都是OK的。这里的关键是“聚合的前提是排序”,举个例子,比如求分位数,25%分位、75%分位或是中位数,等等,这些计算是需要先进行全局排序,然后才可以计算的。像这些对于排序有依赖的聚合计算,就不适合“两阶段Shuffle”~

    共 2 条评论
    1
  • Unknown element
    2022-01-21
    老师您好 这两讲介绍的优化方法好像只适用于用spark原生API开发的情况?如果是hive SQL是不是就只能通过调参来优化了?谢谢老师~

    作者回复: 对,如果是纯SQL的话,只能依靠调参和引擎本身的特性来实现优化,比如说Hive的map side join机制,等等。像两阶段Shuffle这种比较定制化的方案,还是需要开发者自己来实现

    
    
  • Monster
    2022-01-20
    //内表复制加盐 var saltedskewOrders = skewOrders.withColumn(“joinKey”, concat($“orderId”, lit(“_”), lit(1)))for (i <- 2 to numExecutors) {saltedskewOrders = saltedskewOrders union skewOrders.withColumn(“joinKey”, concat($“orderId”, lit(“_”), lit(i)))} 对内表复制加盐这块的代码还是没太理解,老师可否再指导下? 举个例子说下,我理解的这个部分的代码实现目的: 例如order_id=12345,在外表加盐后变成:12345_1,那么内表复制加盐后也应该是:12345_1,否则join时候关联条件会关联不到。但还是没看懂这个内表复制加盐的代码逻辑是怎么实现的。求老师解答,非常感谢!

    作者回复: 其实就是把内表数据复制N份,每份的后缀都不同,第一份后缀是1,第二份后缀是2,以此类推;目的和你说的一样,要保持和外表的一致性。外表加了盐,内表也需要加盐,只不过内表需要“复制+加盐”,而外表的随机加盐有所不同~

    
    
  • To_Drill
    2021-11-01
    老师您好,想请教下,像以排序为前提的聚合类操作(一般都会进行全局排序)如果发生数据倾斜了,在资源固定的前提下有啥有效的方法优化呢?

    作者回复: 老弟说的“以排序为前提的聚合类操作”,能举几个例子么?方便判断你说的这些操作,适不适合做“加盐处理”,如果设计得当的话,我觉得“加盐”之后,分而治之,还是能改善问题的

    
    
  • 毛聪
    2021-05-19
    1.可以将Join Keys先group by统计一下各个不同的组合的数据量,可以取出前几个数据量特别大的作为倾斜组,剩余的作为非倾斜组。 2.“两阶段Shuffle”要对内表进行“复制加盐”,这样可能会导致内表的大小变得太大,如果内表原来的大小就超过单个Executor的大小,“复制加盐”后应该会导致OOM。

    作者回复: 第一题满分💯~ 第二题再想想哈~ 内表虽然确实多了不少副本,但是结合之前我们讲的“三足鼎立”,由于并行度跟着变大了,所以每个数据分片的大小并没有变化,因此实际上并不存在OOM的隐患。 朝着排序的方向想一想,如果Shuffle中涉及的聚合计算需要以排序为前提,那么加盐之后的优化手段,也就是“两阶段Shuffle”,会不会破坏原先的计算逻辑?

    共 3 条评论
    
  • abuff
    2021-05-19
    加盐不应该是加在前缀吗?文章怎么写是后缀呢

    作者回复: 都可以的,本质上都是把Jon Keys打散,这样哈希过后,他们会被Shuffle到不同的Executors中去。所以说,盐粒加在前、后不重要,重要的是,加盐之后的Join Keys,已经被打散了,Shuffle过后的数据分布更均匀~

    共 3 条评论
    