• RespectM
    2021-06-20
    老师,如何加快netty堆外内存的回收啊?snappy+parquet格式数据会导致,netty堆外内存增长太快,导致netty使用过多direct memory,然后报错。

    作者回复: netty buffer导致的堆外内存溢出,和GC没关系哈,这个问题比较复杂,咱们一步步说。 首先,io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7633633280, max: 7635730432),这个问题,往往出现在Shuffle read阶段,spark用netty的client/server来拉取远端节点数据,并且透过java.nio.DirectByteBuffers来缓存接收到的数据块。当数据分布存在比较严重的倾斜问题的时候,就会导致某些Block过大,从而导致单个线程占用的Direct Buffer超过16MB,从而报出上面的错误。 因此,要从根本上解决问题,可以先搞定数据倾斜的问题,如果数据倾斜消除了,那么这个问题大概率自己就会消失掉。关于消除数据倾斜的方法,可以参考后面AQE那几讲,以及两阶段Shuffle那一讲。 接下来,假设你消除了Data Skew之后,这个报错还在,那么就继续用下面的办法。DirectByteBuffers默认的大小就是spark.executor.memory的大小,也就是说,它在逻辑上,会“计入”Executor memory内存的消耗。spark.executor.memory这玩意其实指定的JVM heap堆内的内存大小,而DirectByteBuffers是堆外内存,按理说两者应该区别对待,然而默认情况下,并没有。因此,如果DirectByteBuffers消耗非常的过分,那么我们可以在spark.executor.extraJavaOptions当中,特意地去指定-XX:MaxDirectMemorySize这个参数,这个参数,就是用来指定DirectByteBuffers的内存大小,可以把它设置的大一些。 再者,假设上面的设置,还不能解决问题,那么接下来,我们就得做进一步的精细化调优。接下来可能有点复杂,有些参数可能得需要你慢慢消化,不过没关系,公式比较简单。首先,把spark.reducer.maxSizeInFlight,设置成-XX:MaxDirectMemorySize / spark.executor.cores ,这个设置的意图,是降低每个线程需要缓存的数据量。然后,把spark.maxRemoteBlockSizeFetchToMem,设置成spark.reducer.maxSizeInFlight / 5,这个设置的意图,是为了把大的Block直接落盘,从而迅速释放线程占用的Direct buffer,降低Direct buffer(也就是堆外内存)的消耗,从而降低OOM的风险。没错,这个问题,本质上还是一种OOM问题。 思路大概就这些,老弟不妨一试,有结果了记得说一声哈~ 不好使的话,咱们继续再调优~ 加油!

    共 6 条评论
    24
  • 对方正在输入。。。
    2021-04-13
    老师我补充一下,采用sortShuffle的方式时,只有满足在shuffleDependency里面aggeragator或者sort这两个字段有效时,才会根据partitionid和key排序,否则只根据partitionid排序。如不会按照key排序的算子有repartition

    作者回复: 是的,没错!补充的很到位,谢谢哈~

    共 3 条评论
    13
  • 快跑
    2021-05-21
    老师你好,针对参数配置有几个疑问,辛苦帮忙解答下疑问 SELECT userid, pvid FROM table GROUP BY user,pvid 没有额外设置配置的情况下,日志:Stage-0_0: 0(+1)/1294 Stage-1_0: 0/1099 Stage-0_0,就是读取文件的过程,Task数据根据数据块决定的。 疑问1:Stage-1_0,这个阶段的Task数量,默认是怎么计算出来的? 我想改变Stage-1_0阶段的Task并发数量。 通过设置spark.default.parallelism=2000和spark.sql.shuffle.partitions=3000都没有生效。倒是mapreduce.job.reduces=1500配置项生效了。 日志:Stage-0_0: 0(+1)/1294 Stage-1_0: 0/1500 疑问2:这种sql执行计划中的groupByKey,属于spark.sql.shuffle.partitions所描述的聚合类操作的场景么? 疑问3:spark.sql.shuffle.partitions 和 mapreduce.job.reduces 怎么理解这两个参数的使用场景
    展开

    作者回复: 看上去你的部署是Hive on Spark,Stage-1_0: 0/1099是Hive根据运行时,自动计算出来的,也就是mapreduce.job.reduces这个配置项是-1的时候,Hive自动帮你算。不过当你指定mapreduce.job.reduces具体值的时候,比如你这里的1500,那么Hive就会让所有的Spark Reduce阶段都具有1500的并行度。 从这里可以看到:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Spark,Hive接管了Shuffle Reduce阶段并行度的计算: mapreduce.job.reduces Default Value: -1 (disabled) Added in: Hive 1.1.0 with HIVE-7567 Sets the number of reduce tasks for each Spark shuffle stage (e.g. the number of partitions when performing a Spark shuffle). This is set to -1 by default (disabled); instead the number of reduce tasks is dynamically calculated based on Hive data statistics. Setting this to a constant value sets the same number of partitions for all Spark shuffle stages. 就是刚才说的,如果不设置,默认-1,那么Hive自动帮你算,也就是你最开始的并行度:Stage-1_0: 0/1099;后来你手工改成1500,那么后续的并行度就都是1500。也就是说,因为这个Hive on Spark配置项的存在,Spark自己的配置项spark.default.parallelism和spark.sql.shuffle.partitions,就都被禁默了。这也是为什么你反复调整这两个参数没什么用的原因~

    
    12
  • 慢慢卢
    2021-06-14
    建议老师以同样的思路弄一个flink的专栏

    作者回复: 哈哈,感谢老弟的建议~

    共 2 条评论
    7
  • 空
    2021-06-12
    老师,您好,学习了您的专栏受益匪浅,对于您评论里说的每个task处理的数据分片200M左右最佳,那如果我两个5T的大表做join,那我的shuffle reduce 的task数量岂不是要26215,那这个磁盘和网络开销不是大的惊人?

    作者回复: 好问题,关于并行度、并发度与执行内存,三者之间的关系与设置,可以参考第14讲说到的“三足鼎立”:https://time.geekbang.org/column/article/362710。 200MB是一种经验之谈,也就是大部分情况下的设置。精确的计算,还是要参考三足鼎立说到的方法。其实并行度也好、并发度与执行内存也罢,他们的设置,都不是一成不变的,不然的话,咱们其实也不需要做调优了。200MB给大家的是个起始值建议,也就是说,当你无从下手的时候,不妨从200MB开始。但是有了三足鼎立这种比较系统的方法,你就可以更合理地设置相关参数,让并行度、并发度与执行内存三者之间达到平衡。 举个极端的例子,虽然你数据很大,5TB,但是你的机器内存受限,这个时候数据分片设置成200M也没有什么毛病,当然问题就是tasks数量太大,但这个是有前提的,前提就是内存受限。相反,如果你内存相对充裕,那么自然可以根据三足鼎立的算法,去相应地调整参数,这个时候,并行度算出来是多少合适,那就设置成多少就可以啦~

    
    6
  • CycleGAN
    2021-05-15
    老师的这一节讲得真棒,我也看了一些博客和书,但总是乱糟糟的,专栏质量很高,兼顾了深度+清晰度+新,已推荐同事中,好好看一起跳槽。。 老师我有个两个问题,第一个是针对问题一,每一个分区的(分区 ID,Key)的枚举数目,在初始阶段就应该就确定了,partitionedAppendOnlyMap中很多时候key有时候也就几十几百,这里,是5种,而buffer只能放4种,差这么一种,就增加了很多次的落盘读盘,spark有针对buffer的动态调整吗? 第二个,是对所有临时文件和内存数据结构中剩余的数据记录做归并排序,是结合堆排序的吗,临时文件太多的时候,会不会不能同时打开这么多文件,还是用的类似优化版的两两归并呢?

    作者回复: 感谢老弟的认可~ 一看老弟的ID就知道你是搞机器学习的,同行你好,幸会~ 哈哈!以后找机会可以一起探讨ML、DL的应用和实战~ 问得都是好问题!先说第一个,你的sense非常好,对于PairBuffer和AppendOnlyMap这些内存数据结构,Spark确实会对他们做扩容,也就是第一次发现空间不足的时候,Spark会创建2倍于之前的数据结构,然后把之前已经聚合好的结果,迁移到这个新创建的数据结构中来,而且后续的聚合计算,都围绕着新的数据结构进行,也就是这个扩容出来的2倍的空间,之前的数据结构惨遭弃用,后续慢慢地被GC回收。不过,这种扩容,只做一次,如果后续发现还需要更多内存,那么后续的计算就只能依赖Spill来完成了。值得一提的是,如果因为内存受限、或是其他任务的内存抢占,扩容的时候失败了,这个时候就会报OOM。 再来说第二个问题,确实是好问题,不过Spark并没有采用两两归并的优化机制,确实是同时对多个文件流做合并。说说这块为啥没有做优化,我说说我的理解哈,主要是这块的性能开销其实还好,换句话说,需要合并的spills的数量,其实还好,没有想象的那么多。为什么这么说呢, 我们来举个例子。通常来说,每个Task处理的数据分片大小在200MB最佳,这个是结合经验得出的结论。那么我们就可以利用后面说的“三足鼎立”来保证每个Task的分片大小就在200MB左右。在这样的情况下,现代计算机的硬件资源基本上都比较充足,比如说,对于一个有着5g Execution Memory,32 cores的Executors来说,每个Task能分到的内存时160MB左右,对于200MB的数据分区来说,其实spills的数量是有限的,因此这种情况下,同时打开多个临时文件的压力其实还好~

    共 2 条评论
    6
  • 苏子浩
    2021-04-19
    老师我想确认一下,问题一所提到的reduceByKey 中 Map 阶段每一个临时文件中的内容是否和该对应数据分区中的数据记录的顺序有关呢?因为分区中数据记录的不同而导致Map 阶段每一个临时文件中的内容不同。根据在文中提到的GroupBuKey算子的计算步骤二:“PartitionedPairBuffer 填满后,如果分片中还有未处理的数据记录,就对 Buffer 中的数据记录按(目标分区 ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存。”那么类比到ReduceByKey中,就可以得到“当PartitionedAppendOnlyMap填满后,如果分片中还有未处理的数据记录,就对 Buffer 中的数据记录按(目标分区 ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存。”那么根据您文中所给出的“数据分区0”图片,所可看到的文件顺序是“红,菊,黄,紫,黄,红,紫,橙,紫,青,青,橙,橙,红,黄,黄”。那么根据上述算法,假设PartitionedAppendOnlyMap的size为4。那么在我们处理到第一个“青”的时候,触发第一次临时文件的排序与溢出,并清空数据结构。i.e. 溢写出的第一个文件为: [(分区id, 红), 2]; [(分区id, 橙), 2]; [(分区id, 黄), 2]; [(分区id, 紫), 3]。继续扫描剩下的数据记录,直到遍历所有数据分区0中所有数据记录。此时partitionedAppendOnlyMap里的数据为: [(分区id, 红), 1]; [(分区id, 橙), 2]; [(分区id, 黄), 2]; [(分区id, 蓝), 2]. 此时我们对partitionedAppendOnlyMap里的数据排序后,与溢出的临时文件进行归并排序得到输出文件(数据文件): [(分区id,红), 3]; [(分区id, 橙), 4]; [(分区id, 黄), 4]; [(分区id, 紫), 3]; [(分区id, 青), 2]. 对应的索引文件为: 0,1,2,3,4. 和 groupByKey 生成的中间文件不一样,map端聚合,降低了数据量。 不好意思,打了这么多字,谢谢!
    展开

    作者回复: 非常赞~ 👍 思考的很到位,没错,就像你说的那样~ 首先,溢出的临时文件的内容,就是和分区数据的顺序有关,你说的没错,同样的分片,换个顺序,溢出的文件内存(对reduceByKey来说)大概率是不同的; 再者,不论溢出文件的内容相同与否,同样的分片数据,最终的中间文件是一致的,这个是由最终的Merge Sort决定的。 最后,就像你分析的,reduceByKey溢出的临时文件,一定会比groupByKey更少,原因很简单,就像你说的,因为有Map端聚合,所以自然节省空间,需要溢出的频率就降低了,溢出文件自然更少。 很棒~ 耐心地推导了reduceByKey的全部过程,赞赞赞~

    共 5 条评论
    6
  • 快跑
    2021-04-15
    老师好,这节学习过后生成许多疑问,请老师帮忙解惑 想就“千女散花”的过程详细了解下spark.shuffle.sort.bypassMergeThreshold 1、“千女散花”过程中的groupByKey和reduceByKey是否都不需要引入排序操作? 目前我觉得是不需要 2、如果满足spark.shuffle.sort.bypassMergeThreshold阈值情况下,由这个参数引入排序具体发生在哪阶段? map写临时文件阶段 临时文件归并merge阶段 reduce拉取数据merge 这些过程都会有影响么 3、spark.shuffle.sort.bypassMergeThreshold生效的场景会区分spark rdd, spark sql,还是hive on spark么

    作者回复: 好问题,我们一个个来看。 1. 确实都不需要排序,但是,在Spark的byPass机制下,groupByKey可以通过设置spark.shuffle.sort.bypassMergeThreshold从而跳过排序操作,但是,reduceByKey不行,因为它有Aggregate聚合操作,所以,坦白说,Sort shuffle manager对于reduceByKey这种操作确实不太公平,但是没办法,它的实现机制就是这样。 2. 没错,在默认排序的情况下,你说的这几个步骤,都会有排序,都会受排序影响。 3. spark.shuffle.sort.bypassMergeThreshold这个参数是控制shuffle的,所以不区分是rdd还是DataFrame、Dataset,如果满足byPass条件,都会生效的~

    共 4 条评论
    5
  • 井先生
    2021-04-08
    答题1: 溢写出一个文件 [(分区id,红),3] ,[(分区id,橙),4] ,[(分区id,黄),4] ,[(分区id,紫),3] partitionedAppendOnlyMap里面的数据 [(分区id,蓝),2] 归并排序后的输出文件 [(分区id,红),3] ,[(分区id,橙),4] ,[(分区id,黄),4] ,[(分区id,紫),3],[(分区id,蓝),2] 比groupByKey生产的中间文件size小,因为做过map端的预聚合 疑问: map端做归并排序再写文件的目的是为了每一个分区的数据连续,从而让reduce端在读文件的时候读连续的记录速度更快吗?

    作者回复: 第一题满分💯~ map端聚合,主要是降低数据量,减少磁盘和网络的开销。归并排序主要是保证运行的稳定性,因为归并排序可以通过磁盘来做,也就是用External Sorter来做这事,这样不会因为内存受限而频繁地oom。

    共 4 条评论
    5
  • 福
    2021-11-09
    真的是希望老师出个flink的,写的好nice呀

    作者回复: 感谢老弟的认可~ 等以后有时间吧,现在有个《零基础入门Spark》的课程还没完结,实在忙不过来了,哈哈

    
    3