• 来世愿做友人 A
    2021-04-07
    请教老师一个问题,对于小文件合并,上边说的是某个 executor 的大小排序后合并。比如两个 executorA 和 B,分别有两个 task 运行,groupby 并且各自产生了3个分区,分别是 a.0,a.1,a.2 和 b.0,b.1,b.2,在没有合并小分区的情况下,reduce端会有三个任务拉取各自的012分区。但是,打开小分区合并,在满足合并的条件下,a.0和a.1合并成 a.01,b.1和b.2合并成b.12。这时候两个 task 各有两个分区,但是他们的分区 key 相当于混在一起了。shuffle的 reduce 是怎么拉取。因为目前只看过 raw rdd的相关,目前没想到是怎么解决这个问题的?比如又会多引入一层 shuffle?或者有其它判断,最终判断这次的 reduce 只能有一个 task,然后拉取所有 map 端的分区?

    作者回复: 好问题,思考的很深入,你是第一个问这个问题的同学,先赞一个~ 首先,分区合并是在reduce阶段(这块原文我可能没说清楚,我的锅),也就是每个reduce task把数据拉回来,aqe发现有些task分片小于合并的“目标大小”,这个时候,物理策略Coalesce Shuffle Partitions就开始生效。这部分细节后面的aqe还会展开,这一讲因为focus在配置项,所以没说的特别细。 然后,对于你说的问题,我们来分类讨论。 第一种,假设是单表shuffle,比如reduceByKey这种,那么合并之后不影响,即便一个分片有多个key,也不要紧,只要保证同一个key的payload都在一个分区就行。 第二种,两表join。这个时候就有意思了,你说的担心就会成立,比如,外表大、内表小,外表合并之后,两个分区,第一个分区包含key=1、2、3,第二个分区,包含key=4、5、6;因为内表小,因此内表合并之后,一个分区就包含了1-6。其实,这样不要紧,为啥呢?如果你熟悉join实现原理的话(26讲会展开),在一个进程内,不论是nlj、smj还是hash join,外表,也就是驱动表,会驱动内表的“全量扫描”,带引号是因为效率不一样,smj、hj不用全量,但意思是一样的,就是不管内表有多少分区,都会被外表驱动着去被遍历。因此,不轮内表数据分配到了哪个分区,其实都还是在一个executor进程内,所以,不影响join逻辑,也不影响效率。nlj还是慢,hash join还是快,不会因为数据“看上去”打散了,而影响功能和性能。 手机徒手打字,可能没说特别清楚,有问题再留言哈~

    共 5 条评论
    21
  • CRT
    2021-05-11
    spark.shuffle.sort.bypassMergeThreshold 这个阈值为什么是跟Reduce 端的分区数有关,Reduce 端的分区数过大的话,取消排序会有不好的影响吗?

    作者回复: 好问题,先说为什么参数归类为Reduce端,因为这个参数spark.shuffle.sort.bypassMergeThreshold,它指定的是Reduce阶段的并行度限制,所以归类到Reduce端参数。但它实际控制的,是在Map阶段,能否byPass排序的操作。 再来说说,为啥Reduce端的并行度为什么不能太大。这个其实怪我没有在本讲交代更多bypass的实现机制。如果Shuffle走bypass的code path,那么对于每一个Task,它的执行过程是: a)为每一个reduce task生成一个临时文件 b)为每个临时文件创建写buffer、以及一个serializer对象,用于序列化数据 c)保持所有的临时文件打开,将map阶段的数据,按照reduce端partitionId的不同,依次写入到这些临时文件 d)最后,map task计算完毕,把所有这些临时文件合并到一起,生成data文件,并同时生成index文件 因此,你看,如果你的reduce阶段并行度非常的高,那么map task的计算开销会非常大,要同时打开非常多的临时文件、建立非常多的写buffer、新建非常多个serializer,这些都是开销。 这也是为什么,要启用shuffle的bypass,spark会要求你的reduce并行度,要小于参数的设置:spark.shuffle.sort.bypassMergeThreshold,目的其实就是说:如果你的reduce并行度非常高,那么开启bypass,即便节省了排序的开销,但是因此而引入的诸多额外开销,实际上整体上算下来是划不来的。

    
    20
  • kingcall
    2021-04-09
    其实我们的调优很多都是发生在数据规模比较大的情况下,对于比较大的shuffle 可以对下面的参数进行调节,提高整个shuffle 的健壮性 spark.shuffle.compress 是否对shuffle 的中间结果进行压缩,如果压缩的话使用`spark.io.compression.codec` 的配置进行压缩 spark.shuffle.io.maxRetries io 失败的重试次数,在大型shuffle遇到网络或者GC 问题的时候很有用。 spark.shuffle.io.retryWait io 失败的时候的等待时间

    作者回复: 没错,在超大规模的shuffle里,这几个参数都可以贡献任务执行的稳定性,不过compress我印象中默认是enabled。其他两个参数对于稳定性很有帮助~

    共 2 条评论
    10
  • 苏子浩
    2021-05-26
    老师好!我想讨论一下文中“自动数据倾斜处理”部分。其中我们提到“advisoryPartitionSizeInBytes”这个参数。我通过查看源代码发现:拆分的时候我们具体使用的拆分粒度(targetSize)不仅会考虑该参数的数值,同时会考虑非倾斜的分区(non-skewedPartition)的平均大小。用数学表示的话应该是“Math.max(advisortSize, nonSkewSizes.sum / nonSkewSizes.length)”。其中nonSkewSizes表示“所有分区中过滤掉倾斜分区后所剩余分区,其分区大小所构成的列表”。 我想表达的是:在‘自动倾斜处理’中所用到的思想与‘自动分区合并’中相似! 并不是指定了 advisoryPartitionSizeInBytes 是多少,Spark 就会完全尊重开发者的意见,还要考虑非倾斜分区的平均大小。 那么这样来看的话,文中所举的例子“检测到倾斜分区之后,接下来就是对它拆分,拆分的时候还会用到 advisoryPartitionSizeInBytes 参数。假设我们将这个参数的值设置为 256MB,那么,刚刚那个 512MB 的倾斜分区会以 256MB 为粒度拆分成多份,因此,这个大分区会被拆成 2 个小分区( 512MB / 256MB =2)。拆分之后,原来的数据表就由 3 个分区变成了 4 个分区,每个分区的尺寸都不大于 256MB。“其实在这里其实是比较了Math( ((80+100) / 2), 256) = 256后,我们才最终确定以 256MB 为粒度拆分存在倾斜的分区。 接着是我的一点看法,AQE中对于认定倾斜分区的条件看起来非常苛刻,首先要满足该分区的大小高于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数的设定值。同时,取所有数据分区大小排序后的中位数作为放大基数,尺寸大于中位数指定倍数的分区才会被判定为倾斜分区。那么是不是可以看到其实做倾斜分区处理这件事的成本还是很高的。因为在数据关联场景中,如果两边的表都存在数据倾斜的话,会出现笛卡尔积的显现。哪怕是只有一边的表存在数据倾斜,另外一边的分区复制也是不小的开销。在关联场景中涉及到更多的网络开销。以及需要涉及到reducer任务从一个分区中读取部分数据,其中涉及到的数据划分成本很高? 其实我看到的第一反应是想到了Shuffle Hash Join激活的先决条件,感觉激活的条件都非常苛刻。
    展开

    作者回复: 好问题,这里其实有两个话题,我们分别来讨论。不过,首先感谢老弟的补充!这块是我遗漏了,非常感谢! 先说第一个话题,就是AQE判断倾斜的条件比较苛刻。这块我同意你的说法,就是AQE对于倾斜的判定,非常的谨慎,除了对“倾斜的candidates”有要求,也就是大于skewedPartitionThresholdInBytes,同时还要大于中位数的一定倍数、等等;还对非倾斜的部分也有要求,也就是你补充的那部分。我说说我的理解,AQE之所以采用这种“严进”(也就是不会轻易地断定场景就是倾斜的),根本原因恰恰是你说的第二个话题,也就是倾斜的处理是有性能开销的。比如数据复制的开销、数据分发的开销,等等。所以对于倾斜的判定,AQE采取了比较保守的策略。应该说,策略这种东西,无所谓优劣,因为这要看场景。我们其实很难说,是保守的策略好,还是激进的策略好。这其实要看倾斜处理带来的收益有多大,这就要说到第二个话题。 也就是倾斜处理的开销问题,其实不管是AQE的自动倾斜处理,还是我们开发者手工处理数据倾斜(可以参考第29讲的“两阶段Shuffle”),实际上在处理倾斜的过程中,都会引入不同程度的计算开销。核心问题在于:数据倾斜带来的负载倾斜是否构成整个关联计算的性能瓶颈?如果这个问题的答案是肯定的,那么AQE的自动倾斜处理也好、开发者的手工处理也罢,就都是值得的。相反,那就成了“费力不讨好”。 以上是我的一些想法,欢迎老弟继续讨论~ 再次感谢老弟补充完善!

    共 2 条评论
    6
  • 天翼
    2021-05-03
    老师,请问一下,spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 这个配置在官网的 Configuration 中没有找到,是改名了吗?

    作者回复: 好问题,看的很细~ 你是第一个深究这个参数的同学,赞一个~ 👍 这个是我的锅,我没有交代清楚,spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,这是个internal的配置项,Spark社区并没有把它暴露给开发者,不过我们还是可以通过调整这个参数的设置,来调整Spark SQL的优化行为。

    共 2 条评论
    6
  • 木子中心
    2021-09-08
    老师好,有一个问题:文章里面说建议设置spark.sql.autoBroadcastJoinThreshold为2G,如果数据是parquet格式的话,将数据加载到内存中会膨胀比较大,这个时候,driver端内存应该配置多少才能不oom呢?

    作者回复: 可以参考刚刚留言的那个预估公式: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes 这里estimated,就是数据集在内存中的预估大小,你可以参考这个数值,来设置Driver内存~

    
    5
  • 斯盖丸
    2021-06-15
    老师,我看Spark 3.1.1的文档,spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin这个参数已经被去除了。。由此想到一个重大问题:像这种重大的参数配置变更老师是怎么第一时间获悉的并跟上版本更新的节奏,而不像我们这样没人告知,只能被动地获悉呢?

    作者回复: 好问题,我觉得几个渠道吧,一个是官方的release notes,一个是databricks的官方博客,再有就是源码啦~

    
    5
  • 木子中心
    2021-09-08
    吴老师好!有个问题想请教下: spark广播join经常造成driver oom,spark.sql.autoBroadcastJoinThreshold使用默认值,driver为4-5G左右,文件存储格式为parquet。查阅资料发现spark是直接通过扫描文件的总大小及多少列来判断是否小于阈值,进行广播。由于使用了parquet格式,在扫描少数列的情况下,由于压缩率较高,在某些情况下,上百万数据的结果集也进行广播,造成driver段oom。

    作者回复: 对,确实有这个问题。 这块其实后面广播那几讲会有介绍。就是如果RDD、DataFrame没有压缩,Spark判断的存储大小,是数据集对应的源文件在文件系统上面的磁盘存储大小。如果用Parquet、ORC这种压缩比比较高的数据,就容易出现你说的问题。 我们后面会有一个数据集大小预估办法,你可以参考那个办法,来预估数据集在内存中的大小~ 具体来说: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes

    
    3
  • 耳东
    2021-04-16
    问下老师,如果某个表的大小是10m,但是它的条数超过了1千万条,这种表属不属于小表

    作者回复: 大小是相对的哈,参与Join的两张表,如果左右表的尺寸相差3倍以上,就可以考虑用“大表Join小表”的思路来解决这类关联问题了。我们在后面的第27讲,专门讲解“大表Join小表”的各种优化思路。 你说的情况,是和广播阈值作对比,就是能放进广播变量的,都可以算是小表。 所以,回答你的问题,10M我觉得算是小表了,和数据条目的数量没有关系,和整个数据集的内存存储大小有关系。Spark不会限制说,你的数据集条目超过了多少行,就不能做优化,主要还是看存储大小。

    共 2 条评论
    3
  • 辰
    2021-04-13
    这个aqe规则是在3.0版本才有的,但是我公司目前用的版本是2.2,有什么其他的参数设置吗

    作者回复: 爱莫能助,得升级到3.0~

    
    3