• Geek_d794f8
    2021-04-16
    老师,在考虑并行度,内存,线程数三者之间的平衡时,spark.sql.shuffle.partitions的值是shuffle的reducer阶段的并行度,那么对于从数据源读取(比如读hive表)这个起始的map阶段的并行度是否需要考虑?这个阶段spark底层有某种默认的切片规则吗,需要在代码中人为的干预吗(比如coalesce)? 我使用的是DataFrame和DataSet的api。

    作者回复: 都需要考虑,结合你这个例子,整体逻辑是这样的。 首先,Spark读取分布式文件,获取数据源,这个时候,并行度就是文件在分布式文件系统上的并行度,比如HDFS、比如S3。HDFS的分片可能是128M或是256M,那么它的并行度,就取决于文件总大小和分片大小的商。 这个时候,由于分片大小是固定的,你可以结合分片大小,去设置执行内存和并发度(executor线程池),让他们满足(1/N/2,1/N)的关系。 然后,你设置的spark.sql.shuffle.partitions,会控制Joins之中的Shuffle Reduce阶段并行度,这个参数设置多少。其实取决于你Shuffle过后每个Reduce Task需要处理的数据分片大小。由于你之前是按照128M或是256M来设置的执行内存, 和并发度。这个时候,在设置spark.sql.shuffle.partitions这个值的时候,只要保证数据分片大小还是在128M或是256M左右(shuffle前可能有过滤、过程中还会有聚合,所以原来的并行度就不合适了),就依然能维持“三足鼎立”的计算平衡。 所以说,核心是维持这个平衡。在你这个case,核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。 在最开始,“定下来的”是并行度,因为这个是文件系统决定的,但是执行内存和并发度未定,所以你可以相应地去调整这两类参数。 后面Shuffle的时候,执行内存和并发度已经定下来了,但是spark.sql.shuffle.partitions未定,你再结合那个公式,去定这个参数的值就好了。思路就是这么个思路,其实还是万变不离其宗。

    共 6 条评论
    27
  • zxk
    2021-04-17
    问题一:并发度决定了数据分片的大小: - 在每个线程都分配到了最大内存,即 M/N 的内存时,如果 task 还需要更多的内存,那么就会发生 OOM。 - 在每个线程都分配到了最少内存,即 M/2N的内存时,如果 task 还需要更多的内存,此时又没有其他线程释放内存供其使用,那么也会导致OOM。

    作者回复: 没问题,满分💯 ~

    共 3 条评论
    14
  • qconljk
    2021-04-14
    首先,在一个 Executor 中,每个 CPU 线程能够申请到的内存比例是有上下限的,最高不超过 1/N,最低不少于 1/N/2,其中 N 代表线程池大小。这个除以2,2代表什么?

    作者回复: 好问题,其实没有什么特别的含义,就是一种公平机制,就是保证至少有均值的1/2可以满足,否则就不进行计算。 其实你说把它改成3成不成,我觉得也没什么不可以。但是,改成3之后,task最低内存保障更低了,即便你有1/3给它,它也完不成计算,其实还是得挂起。 1/2的保底,其实更make sense,因为1/2的内存相比均值来说,只差了一半,这样有些任务你先分配了1/2,运行的过程中,其他task还会释放内存,这个时候,这个task还是可以成功执行的。但如果是1/3,你亏空的内存更多,需要等待的概率越大,挂起的概率也就越大。

    共 5 条评论
    9
  • wow_xiaodi
    2021-08-02
    最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间 这里很不解为何内存的大小和分片大小有直接联系,无论是计算过程还是shuffle过程,都是用到一些内存占用较小的数据结构去做的,就算内存不够用,也会有gc去保证。这个公式感觉就是让数据分片大小和执行内存等价了,让所有数据都在待在内存中一次性批处理,而不是处理一部分溢出落盘再继续处理?请老师指正

    作者回复: 好问题~ 是这样,这个公式的目的,主要是让每个Task能够拿到并处理适量的数据,不至于因为数据分布本身,而带来OOM。 D/P ~ (M/N/2, M/N),也就是数据分片大小,让他与M/N在同一个当量。这样基本能够保证每个Task处理的数据是适量的。 怎么理解适量呢,就是在消耗一定内存(如AppendOnlyMap)的基础上,有少量的溢出。我们知道,D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。 整体上是这么个逻辑,不确定我说清楚了没,有问题再讨论哈~

    
    8
  • 斯盖丸
    2021-04-14
    这讲看得有些迷,想请问下老师如何从UI角度看出来我任务的并行度是否合适呢?

    作者回复: Spark UI有专门的配置页,记录了不同配置项的设置值,其中有默认并行度设置。 但重点不在这,重点是协调、平衡“三足鼎立”,也就是并行度、并发度、执行内存,去提升CPU利用率,所以你更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优。

    
    8
  • 小灵芝
    2021-04-30
    “在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间。这样,在运行时,我们的 CPU 利用率往往不会太差。” 请问老师,这里的M, N, D 都是针对一个executor而言的对吧?

    作者回复: 稍有差别,M、N是针对Executor的,也就是Executor的执行内存M、线程池大小N。 D和P不一样,它指的是你的分布式数据集,D是数据总量,比如20GB;而P指的是这份数据集的并行度,比如200,那么你的每个数据分片的大小D/P,就是20GB/200 = 100MB。 如果你的M是2GB,也就是2GB的执行内存,N是20,也就是20个线程,那么这个时候,M/N就是100MB。那么,你的D/P就刚好坐落在(M/N/2, M/N)这个区间里。

    
    4
  • Geek_d794f8
    2021-04-16
    我的理解是每个线程申请的内存上限是M/N,那么当数据分片过少,某个task需要处理的数据量较大,M/N的上限执行内存也不够时,就会出现OOM。 不知道这么理解对不对?

    作者回复: 对,没错~

    
    4
  • Sampson
    2022-01-19
    老师您好,请教一下,在上文中有提到spill机制可以保护oom 的溢出,这个是怎么判断的呢

    作者回复: 这个需要熟悉shuffle里面,用到的内存数据结构,比如AppendOnlyMap,PartitionedPairBuffer,等等。在Shuffle map task阶段,Spark会利用类似的数据结构,来计算数据,当这些数据结构空间不足的时候,Spark会成倍扩容这些数据结构,但是只会扩容一次。想象一下,如果没有Spill机制,实际上Spark很容易OOM,因为扩容一次之后,也很难容下单个数据分片的全部数据

    共 2 条评论
    2
  • 斯盖丸
    2021-05-28
    老师,顺着王天雨同学的问题我接着问。我的executor memory为9G,executor的off heap memory也为9G,executor cores为5个,executor instances为30个。 以上是我的配置。 照您的公式,我的数据分片的大小就应该在(9G+9G)/5/2=1.8G到(9G+9G)/5=3.6,即数据分片在(1.8G,3.6G)的范围内吗,那进一步说,我在Spark UI里,找到第一个读取parquet的任务,看shuffle read size这个指标,如果在(1.8G,3.6G)这个区间之内,说明就是可以的,是这样吗? 感觉这个分片好大哦~~

    作者回复: 好问题,这个公式的作用其实是双向的,也即是给定资源,你可以算出并行度;反过来,找到合适的并行度,你也可以指导Spark集群资源的设定。 你说的没错,2GB的分片,确实太大了,通常来说,分片大小在200MB左右,是比较合适的,推荐把分片大小设定在这个范围。 有了分片大小,其实就可以反向指导Spark资源设定了。在你的例子里面,我理解资源是提前设定好了,也就是你说的堆内外分别9G,5个cores,也许这个设定是基于经验、也许是出于直觉。 如果用200MB分片大小反推过来的话,其实你可以考虑降低Executors的内存配置,或是提高它的CPU cores配置,这样把内存与CPU做到一个均衡配比,相比现在一个task要处理2GB的数据,效果要更好~ 毕竟咱们性能调优的本质,是让硬件资源之间尽量去平衡,调节并行度也好、分片大小也好、各种资源参数也好,实际上都是手段。

    共 3 条评论
    2
  • kingcall
    2021-04-14
    M 的下限是 Execution Memory 初始值,上限是 spark.executor.memory * spark.memory.fraction 划定的所有内存区域。这个老师笔误,写错了吧!

    作者回复: 没写错哟,上限就是 spark.executor.memory * spark.memory.fraction ,也就是storage memory + execution memory之和。Unified memory manager,在统一管理模式下,大家是可以互相抢占的,因此,如果没有分布式数据集缓存,storage memory那片内存区域,执行任务是都可以抢过来的,所以上限就是两者之和。

    共 3 条评论
    2