作者回复: 好问题,思考的很深入,你是第一个问这个问题的同学,先赞一个~ 首先,分区合并是在reduce阶段(这块原文我可能没说清楚,我的锅),也就是每个reduce task把数据拉回来,aqe发现有些task分片小于合并的“目标大小”,这个时候,物理策略Coalesce Shuffle Partitions就开始生效。这部分细节后面的aqe还会展开,这一讲因为focus在配置项,所以没说的特别细。 然后,对于你说的问题,我们来分类讨论。 第一种,假设是单表shuffle,比如reduceByKey这种,那么合并之后不影响,即便一个分片有多个key,也不要紧,只要保证同一个key的payload都在一个分区就行。 第二种,两表join。这个时候就有意思了,你说的担心就会成立,比如,外表大、内表小,外表合并之后,两个分区,第一个分区包含key=1、2、3,第二个分区,包含key=4、5、6;因为内表小,因此内表合并之后,一个分区就包含了1-6。其实,这样不要紧,为啥呢?如果你熟悉join实现原理的话(26讲会展开),在一个进程内,不论是nlj、smj还是hash join,外表,也就是驱动表,会驱动内表的“全量扫描”,带引号是因为效率不一样,smj、hj不用全量,但意思是一样的,就是不管内表有多少分区,都会被外表驱动着去被遍历。因此,不轮内表数据分配到了哪个分区,其实都还是在一个executor进程内,所以,不影响join逻辑,也不影响效率。nlj还是慢,hash join还是快,不会因为数据“看上去”打散了,而影响功能和性能。 手机徒手打字,可能没说特别清楚,有问题再留言哈~
作者回复: 好问题,先说为什么参数归类为Reduce端,因为这个参数spark.shuffle.sort.bypassMergeThreshold,它指定的是Reduce阶段的并行度限制,所以归类到Reduce端参数。但它实际控制的,是在Map阶段,能否byPass排序的操作。 再来说说,为啥Reduce端的并行度为什么不能太大。这个其实怪我没有在本讲交代更多bypass的实现机制。如果Shuffle走bypass的code path,那么对于每一个Task,它的执行过程是: a)为每一个reduce task生成一个临时文件 b)为每个临时文件创建写buffer、以及一个serializer对象,用于序列化数据 c)保持所有的临时文件打开,将map阶段的数据,按照reduce端partitionId的不同,依次写入到这些临时文件 d)最后,map task计算完毕,把所有这些临时文件合并到一起,生成data文件,并同时生成index文件 因此,你看,如果你的reduce阶段并行度非常的高,那么map task的计算开销会非常大,要同时打开非常多的临时文件、建立非常多的写buffer、新建非常多个serializer,这些都是开销。 这也是为什么,要启用shuffle的bypass,spark会要求你的reduce并行度,要小于参数的设置:spark.shuffle.sort.bypassMergeThreshold,目的其实就是说:如果你的reduce并行度非常高,那么开启bypass,即便节省了排序的开销,但是因此而引入的诸多额外开销,实际上整体上算下来是划不来的。
作者回复: 没错,在超大规模的shuffle里,这几个参数都可以贡献任务执行的稳定性,不过compress我印象中默认是enabled。其他两个参数对于稳定性很有帮助~
作者回复: 好问题,这里其实有两个话题,我们分别来讨论。不过,首先感谢老弟的补充!这块是我遗漏了,非常感谢! 先说第一个话题,就是AQE判断倾斜的条件比较苛刻。这块我同意你的说法,就是AQE对于倾斜的判定,非常的谨慎,除了对“倾斜的candidates”有要求,也就是大于skewedPartitionThresholdInBytes,同时还要大于中位数的一定倍数、等等;还对非倾斜的部分也有要求,也就是你补充的那部分。我说说我的理解,AQE之所以采用这种“严进”(也就是不会轻易地断定场景就是倾斜的),根本原因恰恰是你说的第二个话题,也就是倾斜的处理是有性能开销的。比如数据复制的开销、数据分发的开销,等等。所以对于倾斜的判定,AQE采取了比较保守的策略。应该说,策略这种东西,无所谓优劣,因为这要看场景。我们其实很难说,是保守的策略好,还是激进的策略好。这其实要看倾斜处理带来的收益有多大,这就要说到第二个话题。 也就是倾斜处理的开销问题,其实不管是AQE的自动倾斜处理,还是我们开发者手工处理数据倾斜(可以参考第29讲的“两阶段Shuffle”),实际上在处理倾斜的过程中,都会引入不同程度的计算开销。核心问题在于:数据倾斜带来的负载倾斜是否构成整个关联计算的性能瓶颈?如果这个问题的答案是肯定的,那么AQE的自动倾斜处理也好、开发者的手工处理也罢,就都是值得的。相反,那就成了“费力不讨好”。 以上是我的一些想法,欢迎老弟继续讨论~ 再次感谢老弟补充完善!
作者回复: 好问题,看的很细~ 你是第一个深究这个参数的同学,赞一个~ 👍 这个是我的锅,我没有交代清楚,spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,这是个internal的配置项,Spark社区并没有把它暴露给开发者,不过我们还是可以通过调整这个参数的设置,来调整Spark SQL的优化行为。
作者回复: 可以参考刚刚留言的那个预估公式: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes 这里estimated,就是数据集在内存中的预估大小,你可以参考这个数值,来设置Driver内存~
作者回复: 好问题,我觉得几个渠道吧,一个是官方的release notes,一个是databricks的官方博客,再有就是源码啦~
作者回复: 对,确实有这个问题。 这块其实后面广播那几讲会有介绍。就是如果RDD、DataFrame没有压缩,Spark判断的存储大小,是数据集对应的源文件在文件系统上面的磁盘存储大小。如果用Parquet、ORC这种压缩比比较高的数据,就容易出现你说的问题。 我们后面会有一个数据集大小预估办法,你可以参考那个办法,来预估数据集在内存中的大小~ 具体来说: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes
作者回复: 大小是相对的哈,参与Join的两张表,如果左右表的尺寸相差3倍以上,就可以考虑用“大表Join小表”的思路来解决这类关联问题了。我们在后面的第27讲,专门讲解“大表Join小表”的各种优化思路。 你说的情况,是和广播阈值作对比,就是能放进广播变量的,都可以算是小表。 所以,回答你的问题,10M我觉得算是小表了,和数据条目的数量没有关系,和整个数据集的内存存储大小有关系。Spark不会限制说,你的数据集条目超过了多少行,就不能做优化,主要还是看存储大小。
作者回复: 爱莫能助,得升级到3.0~