作者回复: 都需要考虑,结合你这个例子,整体逻辑是这样的。 首先,Spark读取分布式文件,获取数据源,这个时候,并行度就是文件在分布式文件系统上的并行度,比如HDFS、比如S3。HDFS的分片可能是128M或是256M,那么它的并行度,就取决于文件总大小和分片大小的商。 这个时候,由于分片大小是固定的,你可以结合分片大小,去设置执行内存和并发度(executor线程池),让他们满足(1/N/2,1/N)的关系。 然后,你设置的spark.sql.shuffle.partitions,会控制Joins之中的Shuffle Reduce阶段并行度,这个参数设置多少。其实取决于你Shuffle过后每个Reduce Task需要处理的数据分片大小。由于你之前是按照128M或是256M来设置的执行内存, 和并发度。这个时候,在设置spark.sql.shuffle.partitions这个值的时候,只要保证数据分片大小还是在128M或是256M左右(shuffle前可能有过滤、过程中还会有聚合,所以原来的并行度就不合适了),就依然能维持“三足鼎立”的计算平衡。 所以说,核心是维持这个平衡。在你这个case,核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。 在最开始,“定下来的”是并行度,因为这个是文件系统决定的,但是执行内存和并发度未定,所以你可以相应地去调整这两类参数。 后面Shuffle的时候,执行内存和并发度已经定下来了,但是spark.sql.shuffle.partitions未定,你再结合那个公式,去定这个参数的值就好了。思路就是这么个思路,其实还是万变不离其宗。
作者回复: 没问题,满分💯 ~
作者回复: 好问题,其实没有什么特别的含义,就是一种公平机制,就是保证至少有均值的1/2可以满足,否则就不进行计算。 其实你说把它改成3成不成,我觉得也没什么不可以。但是,改成3之后,task最低内存保障更低了,即便你有1/3给它,它也完不成计算,其实还是得挂起。 1/2的保底,其实更make sense,因为1/2的内存相比均值来说,只差了一半,这样有些任务你先分配了1/2,运行的过程中,其他task还会释放内存,这个时候,这个task还是可以成功执行的。但如果是1/3,你亏空的内存更多,需要等待的概率越大,挂起的概率也就越大。
作者回复: 好问题~ 是这样,这个公式的目的,主要是让每个Task能够拿到并处理适量的数据,不至于因为数据分布本身,而带来OOM。 D/P ~ (M/N/2, M/N),也就是数据分片大小,让他与M/N在同一个当量。这样基本能够保证每个Task处理的数据是适量的。 怎么理解适量呢,就是在消耗一定内存(如AppendOnlyMap)的基础上,有少量的溢出。我们知道,D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。 整体上是这么个逻辑,不确定我说清楚了没,有问题再讨论哈~
作者回复: Spark UI有专门的配置页,记录了不同配置项的设置值,其中有默认并行度设置。 但重点不在这,重点是协调、平衡“三足鼎立”,也就是并行度、并发度、执行内存,去提升CPU利用率,所以你更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优。
作者回复: 稍有差别,M、N是针对Executor的,也就是Executor的执行内存M、线程池大小N。 D和P不一样,它指的是你的分布式数据集,D是数据总量,比如20GB;而P指的是这份数据集的并行度,比如200,那么你的每个数据分片的大小D/P,就是20GB/200 = 100MB。 如果你的M是2GB,也就是2GB的执行内存,N是20,也就是20个线程,那么这个时候,M/N就是100MB。那么,你的D/P就刚好坐落在(M/N/2, M/N)这个区间里。
作者回复: 对,没错~
作者回复: 这个需要熟悉shuffle里面,用到的内存数据结构,比如AppendOnlyMap,PartitionedPairBuffer,等等。在Shuffle map task阶段,Spark会利用类似的数据结构,来计算数据,当这些数据结构空间不足的时候,Spark会成倍扩容这些数据结构,但是只会扩容一次。想象一下,如果没有Spill机制,实际上Spark很容易OOM,因为扩容一次之后,也很难容下单个数据分片的全部数据
作者回复: 好问题,这个公式的作用其实是双向的,也即是给定资源,你可以算出并行度;反过来,找到合适的并行度,你也可以指导Spark集群资源的设定。 你说的没错,2GB的分片,确实太大了,通常来说,分片大小在200MB左右,是比较合适的,推荐把分片大小设定在这个范围。 有了分片大小,其实就可以反向指导Spark资源设定了。在你的例子里面,我理解资源是提前设定好了,也就是你说的堆内外分别9G,5个cores,也许这个设定是基于经验、也许是出于直觉。 如果用200MB分片大小反推过来的话,其实你可以考虑降低Executors的内存配置,或是提高它的CPU cores配置,这样把内存与CPU做到一个均衡配比,相比现在一个task要处理2GB的数据,效果要更好~ 毕竟咱们性能调优的本质,是让硬件资源之间尽量去平衡,调节并行度也好、分片大小也好、各种资源参数也好,实际上都是手段。
作者回复: 没写错哟,上限就是 spark.executor.memory * spark.memory.fraction ,也就是storage memory + execution memory之和。Unified memory manager,在统一管理模式下,大家是可以互相抢占的,因此,如果没有分布式数据集缓存,storage memory那片内存区域,执行任务是都可以抢过来的,所以上限就是两者之和。