作者回复: netty buffer导致的堆外内存溢出,和GC没关系哈,这个问题比较复杂,咱们一步步说。 首先,io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7633633280, max: 7635730432),这个问题,往往出现在Shuffle read阶段,spark用netty的client/server来拉取远端节点数据,并且透过java.nio.DirectByteBuffers来缓存接收到的数据块。当数据分布存在比较严重的倾斜问题的时候,就会导致某些Block过大,从而导致单个线程占用的Direct Buffer超过16MB,从而报出上面的错误。 因此,要从根本上解决问题,可以先搞定数据倾斜的问题,如果数据倾斜消除了,那么这个问题大概率自己就会消失掉。关于消除数据倾斜的方法,可以参考后面AQE那几讲,以及两阶段Shuffle那一讲。 接下来,假设你消除了Data Skew之后,这个报错还在,那么就继续用下面的办法。DirectByteBuffers默认的大小就是spark.executor.memory的大小,也就是说,它在逻辑上,会“计入”Executor memory内存的消耗。spark.executor.memory这玩意其实指定的JVM heap堆内的内存大小,而DirectByteBuffers是堆外内存,按理说两者应该区别对待,然而默认情况下,并没有。因此,如果DirectByteBuffers消耗非常的过分,那么我们可以在spark.executor.extraJavaOptions当中,特意地去指定-XX:MaxDirectMemorySize这个参数,这个参数,就是用来指定DirectByteBuffers的内存大小,可以把它设置的大一些。 再者,假设上面的设置,还不能解决问题,那么接下来,我们就得做进一步的精细化调优。接下来可能有点复杂,有些参数可能得需要你慢慢消化,不过没关系,公式比较简单。首先,把spark.reducer.maxSizeInFlight,设置成-XX:MaxDirectMemorySize / spark.executor.cores ,这个设置的意图,是降低每个线程需要缓存的数据量。然后,把spark.maxRemoteBlockSizeFetchToMem,设置成spark.reducer.maxSizeInFlight / 5,这个设置的意图,是为了把大的Block直接落盘,从而迅速释放线程占用的Direct buffer,降低Direct buffer(也就是堆外内存)的消耗,从而降低OOM的风险。没错,这个问题,本质上还是一种OOM问题。 思路大概就这些,老弟不妨一试,有结果了记得说一声哈~ 不好使的话,咱们继续再调优~ 加油!
作者回复: 是的,没错!补充的很到位,谢谢哈~
作者回复: 看上去你的部署是Hive on Spark,Stage-1_0: 0/1099是Hive根据运行时,自动计算出来的,也就是mapreduce.job.reduces这个配置项是-1的时候,Hive自动帮你算。不过当你指定mapreduce.job.reduces具体值的时候,比如你这里的1500,那么Hive就会让所有的Spark Reduce阶段都具有1500的并行度。 从这里可以看到:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Spark,Hive接管了Shuffle Reduce阶段并行度的计算: mapreduce.job.reduces Default Value: -1 (disabled) Added in: Hive 1.1.0 with HIVE-7567 Sets the number of reduce tasks for each Spark shuffle stage (e.g. the number of partitions when performing a Spark shuffle). This is set to -1 by default (disabled); instead the number of reduce tasks is dynamically calculated based on Hive data statistics. Setting this to a constant value sets the same number of partitions for all Spark shuffle stages. 就是刚才说的,如果不设置,默认-1,那么Hive自动帮你算,也就是你最开始的并行度:Stage-1_0: 0/1099;后来你手工改成1500,那么后续的并行度就都是1500。也就是说,因为这个Hive on Spark配置项的存在,Spark自己的配置项spark.default.parallelism和spark.sql.shuffle.partitions,就都被禁默了。这也是为什么你反复调整这两个参数没什么用的原因~
作者回复: 哈哈,感谢老弟的建议~
作者回复: 好问题,关于并行度、并发度与执行内存,三者之间的关系与设置,可以参考第14讲说到的“三足鼎立”:https://time.geekbang.org/column/article/362710。 200MB是一种经验之谈,也就是大部分情况下的设置。精确的计算,还是要参考三足鼎立说到的方法。其实并行度也好、并发度与执行内存也罢,他们的设置,都不是一成不变的,不然的话,咱们其实也不需要做调优了。200MB给大家的是个起始值建议,也就是说,当你无从下手的时候,不妨从200MB开始。但是有了三足鼎立这种比较系统的方法,你就可以更合理地设置相关参数,让并行度、并发度与执行内存三者之间达到平衡。 举个极端的例子,虽然你数据很大,5TB,但是你的机器内存受限,这个时候数据分片设置成200M也没有什么毛病,当然问题就是tasks数量太大,但这个是有前提的,前提就是内存受限。相反,如果你内存相对充裕,那么自然可以根据三足鼎立的算法,去相应地调整参数,这个时候,并行度算出来是多少合适,那就设置成多少就可以啦~
作者回复: 感谢老弟的认可~ 一看老弟的ID就知道你是搞机器学习的,同行你好,幸会~ 哈哈!以后找机会可以一起探讨ML、DL的应用和实战~ 问得都是好问题!先说第一个,你的sense非常好,对于PairBuffer和AppendOnlyMap这些内存数据结构,Spark确实会对他们做扩容,也就是第一次发现空间不足的时候,Spark会创建2倍于之前的数据结构,然后把之前已经聚合好的结果,迁移到这个新创建的数据结构中来,而且后续的聚合计算,都围绕着新的数据结构进行,也就是这个扩容出来的2倍的空间,之前的数据结构惨遭弃用,后续慢慢地被GC回收。不过,这种扩容,只做一次,如果后续发现还需要更多内存,那么后续的计算就只能依赖Spill来完成了。值得一提的是,如果因为内存受限、或是其他任务的内存抢占,扩容的时候失败了,这个时候就会报OOM。 再来说第二个问题,确实是好问题,不过Spark并没有采用两两归并的优化机制,确实是同时对多个文件流做合并。说说这块为啥没有做优化,我说说我的理解哈,主要是这块的性能开销其实还好,换句话说,需要合并的spills的数量,其实还好,没有想象的那么多。为什么这么说呢, 我们来举个例子。通常来说,每个Task处理的数据分片大小在200MB最佳,这个是结合经验得出的结论。那么我们就可以利用后面说的“三足鼎立”来保证每个Task的分片大小就在200MB左右。在这样的情况下,现代计算机的硬件资源基本上都比较充足,比如说,对于一个有着5g Execution Memory,32 cores的Executors来说,每个Task能分到的内存时160MB左右,对于200MB的数据分区来说,其实spills的数量是有限的,因此这种情况下,同时打开多个临时文件的压力其实还好~
作者回复: 非常赞~ 👍 思考的很到位,没错,就像你说的那样~ 首先,溢出的临时文件的内容,就是和分区数据的顺序有关,你说的没错,同样的分片,换个顺序,溢出的文件内存(对reduceByKey来说)大概率是不同的; 再者,不论溢出文件的内容相同与否,同样的分片数据,最终的中间文件是一致的,这个是由最终的Merge Sort决定的。 最后,就像你分析的,reduceByKey溢出的临时文件,一定会比groupByKey更少,原因很简单,就像你说的,因为有Map端聚合,所以自然节省空间,需要溢出的频率就降低了,溢出文件自然更少。 很棒~ 耐心地推导了reduceByKey的全部过程,赞赞赞~
作者回复: 好问题,我们一个个来看。 1. 确实都不需要排序,但是,在Spark的byPass机制下,groupByKey可以通过设置spark.shuffle.sort.bypassMergeThreshold从而跳过排序操作,但是,reduceByKey不行,因为它有Aggregate聚合操作,所以,坦白说,Sort shuffle manager对于reduceByKey这种操作确实不太公平,但是没办法,它的实现机制就是这样。 2. 没错,在默认排序的情况下,你说的这几个步骤,都会有排序,都会受排序影响。 3. spark.shuffle.sort.bypassMergeThreshold这个参数是控制shuffle的,所以不区分是rdd还是DataFrame、Dataset,如果满足byPass条件,都会生效的~
作者回复: 第一题满分💯~ map端聚合,主要是降低数据量,减少磁盘和网络的开销。归并排序主要是保证运行的稳定性,因为归并排序可以通过磁盘来做,也就是用External Sorter来做这事,这样不会因为内存受限而频繁地oom。
作者回复: 感谢老弟的认可~ 等以后有时间吧,现在有个《零基础入门Spark》的课程还没完结,实在忙不过来了,哈哈