Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

14 | CPU视角:如何高效地利用CPU?

你好,我是吴磊。
在日常的开发与调优工作中,总有同学向我抱怨:“为什么我的应用 CPU 利用率这么低?偌大的集群,CPU 利用率才 10%!”确实,较低的 CPU 利用率不仅对宝贵的硬件资源来说是一种非常大的浪费,也会让应用端到端的执行性能很难达到令人满意的效果。那么,在分布式应用开发中,我们到底该如何高效地利用 CPU?
我们说过,性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡,让硬件资源达到一种平衡、无瓶颈的状态。对于 CPU 来说,最需要协同和平衡的硬件资源非内存莫属。原因主要有两方面:一方面,在处理延迟方面,只有内存能望其项背;另一方面,在主板上内存通过数据总线直接向 CPU 寄存器供给数据。因此,理顺它们之间的关系,可以为性能调优奠定更好的基础。
那么,今天这一讲,我们就从硬件资源平衡的角度入手,去分析 CPU 与内存到底该如何合作。

CPU 与内存的平衡本质上是什么?

我们知道,Spark 将内存分成了 Execution Memory 和 Storage Memory 两类,分别用于分布式任务执行和 RDD 缓存。其中,RDD 缓存虽然最终占用的是 Storage Memory,但在 RDD 展开(Unroll)之前,计算任务消耗的还是 Execution Memory。因此,Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文从硬件资源平衡的角度出发,探讨了CPU与内存之间的协同与配比。作者指出,要实现CPU与执行内存之间的平衡,需要合理配置并行度、执行内存大小和集群的并行计算能力这三类参数。文章通过生动的比喻引出了执行内存抢占规则,并详细介绍了并行度、并发度与执行内存的概念及其配置项。此外,文章还分析了CPU低效的两个原因:线程挂起和调度开销,以及它们对CPU与计算内存之间平衡的影响。通过本文,读者可以了解到在分布式应用开发中如何合理配置硬件资源,以实现CPU与内存的高效利用。文章通过黄小乙的如意算盘的比喻,生动形象地阐述了在给定Executor线程池和执行内存大小的情况下,如何计算一个能够让数据分片平均大小在一定区间内的并行度,从而有效提升CPU利用率。总结来说,本文通过深入浅出的方式,为读者提供了优化CPU利用率的方法,帮助他们更好地理解硬件资源平衡的重要性。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(35)

  • 最新
  • 精选
  • Geek_d794f8
    老师,在考虑并行度,内存,线程数三者之间的平衡时,spark.sql.shuffle.partitions的值是shuffle的reducer阶段的并行度,那么对于从数据源读取(比如读hive表)这个起始的map阶段的并行度是否需要考虑?这个阶段spark底层有某种默认的切片规则吗,需要在代码中人为的干预吗(比如coalesce)? 我使用的是DataFrame和DataSet的api。

    作者回复: 都需要考虑,结合你这个例子,整体逻辑是这样的。 首先,Spark读取分布式文件,获取数据源,这个时候,并行度就是文件在分布式文件系统上的并行度,比如HDFS、比如S3。HDFS的分片可能是128M或是256M,那么它的并行度,就取决于文件总大小和分片大小的商。 这个时候,由于分片大小是固定的,你可以结合分片大小,去设置执行内存和并发度(executor线程池),让他们满足(1/N/2,1/N)的关系。 然后,你设置的spark.sql.shuffle.partitions,会控制Joins之中的Shuffle Reduce阶段并行度,这个参数设置多少。其实取决于你Shuffle过后每个Reduce Task需要处理的数据分片大小。由于你之前是按照128M或是256M来设置的执行内存, 和并发度。这个时候,在设置spark.sql.shuffle.partitions这个值的时候,只要保证数据分片大小还是在128M或是256M左右(shuffle前可能有过滤、过程中还会有聚合,所以原来的并行度就不合适了),就依然能维持“三足鼎立”的计算平衡。 所以说,核心是维持这个平衡。在你这个case,核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。 在最开始,“定下来的”是并行度,因为这个是文件系统决定的,但是执行内存和并发度未定,所以你可以相应地去调整这两类参数。 后面Shuffle的时候,执行内存和并发度已经定下来了,但是spark.sql.shuffle.partitions未定,你再结合那个公式,去定这个参数的值就好了。思路就是这么个思路,其实还是万变不离其宗。

    2021-04-16
    6
    27
  • zxk
    问题一:并发度决定了数据分片的大小: - 在每个线程都分配到了最大内存,即 M/N 的内存时,如果 task 还需要更多的内存,那么就会发生 OOM。 - 在每个线程都分配到了最少内存,即 M/2N的内存时,如果 task 还需要更多的内存,此时又没有其他线程释放内存供其使用,那么也会导致OOM。

    作者回复: 没问题,满分💯 ~

    2021-04-17
    3
    14
  • qconljk
    首先,在一个 Executor 中,每个 CPU 线程能够申请到的内存比例是有上下限的,最高不超过 1/N,最低不少于 1/N/2,其中 N 代表线程池大小。这个除以2,2代表什么?

    作者回复: 好问题,其实没有什么特别的含义,就是一种公平机制,就是保证至少有均值的1/2可以满足,否则就不进行计算。 其实你说把它改成3成不成,我觉得也没什么不可以。但是,改成3之后,task最低内存保障更低了,即便你有1/3给它,它也完不成计算,其实还是得挂起。 1/2的保底,其实更make sense,因为1/2的内存相比均值来说,只差了一半,这样有些任务你先分配了1/2,运行的过程中,其他task还会释放内存,这个时候,这个task还是可以成功执行的。但如果是1/3,你亏空的内存更多,需要等待的概率越大,挂起的概率也就越大。

    2021-04-14
    5
    9
  • 斯盖丸
    这讲看得有些迷,想请问下老师如何从UI角度看出来我任务的并行度是否合适呢?

    作者回复: Spark UI有专门的配置页,记录了不同配置项的设置值,其中有默认并行度设置。 但重点不在这,重点是协调、平衡“三足鼎立”,也就是并行度、并发度、执行内存,去提升CPU利用率,所以你更需要使用系统工具、监控工具,比如ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优。

    2021-04-14
    9
  • wow_xiaodi
    最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间 这里很不解为何内存的大小和分片大小有直接联系,无论是计算过程还是shuffle过程,都是用到一些内存占用较小的数据结构去做的,就算内存不够用,也会有gc去保证。这个公式感觉就是让数据分片大小和执行内存等价了,让所有数据都在待在内存中一次性批处理,而不是处理一部分溢出落盘再继续处理?请老师指正

    作者回复: 好问题~ 是这样,这个公式的目的,主要是让每个Task能够拿到并处理适量的数据,不至于因为数据分布本身,而带来OOM。 D/P ~ (M/N/2, M/N),也就是数据分片大小,让他与M/N在同一个当量。这样基本能够保证每个Task处理的数据是适量的。 怎么理解适量呢,就是在消耗一定内存(如AppendOnlyMap)的基础上,有少量的溢出。我们知道,D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。 整体上是这么个逻辑,不确定我说清楚了没,有问题再讨论哈~

    2021-08-02
    8
  • Geek_d794f8
    我的理解是每个线程申请的内存上限是M/N,那么当数据分片过少,某个task需要处理的数据量较大,M/N的上限执行内存也不够时,就会出现OOM。 不知道这么理解对不对?

    作者回复: 对,没错~

    2021-04-16
    5
  • 小灵芝
    “在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间。这样,在运行时,我们的 CPU 利用率往往不会太差。” 请问老师,这里的M, N, D 都是针对一个executor而言的对吧?

    作者回复: 稍有差别,M、N是针对Executor的,也就是Executor的执行内存M、线程池大小N。 D和P不一样,它指的是你的分布式数据集,D是数据总量,比如20GB;而P指的是这份数据集的并行度,比如200,那么你的每个数据分片的大小D/P,就是20GB/200 = 100MB。 如果你的M是2GB,也就是2GB的执行内存,N是20,也就是20个线程,那么这个时候,M/N就是100MB。那么,你的D/P就刚好坐落在(M/N/2, M/N)这个区间里。

    2021-04-30
    4
  • Sampson
    老师您好,请教一下,在上文中有提到spill机制可以保护oom 的溢出,这个是怎么判断的呢

    作者回复: 这个需要熟悉shuffle里面,用到的内存数据结构,比如AppendOnlyMap,PartitionedPairBuffer,等等。在Shuffle map task阶段,Spark会利用类似的数据结构,来计算数据,当这些数据结构空间不足的时候,Spark会成倍扩容这些数据结构,但是只会扩容一次。想象一下,如果没有Spill机制,实际上Spark很容易OOM,因为扩容一次之后,也很难容下单个数据分片的全部数据

    2022-01-19
    2
    2
  • 斯盖丸
    老师,顺着王天雨同学的问题我接着问。我的executor memory为9G,executor的off heap memory也为9G,executor cores为5个,executor instances为30个。 以上是我的配置。 照您的公式,我的数据分片的大小就应该在(9G+9G)/5/2=1.8G到(9G+9G)/5=3.6,即数据分片在(1.8G,3.6G)的范围内吗,那进一步说,我在Spark UI里,找到第一个读取parquet的任务,看shuffle read size这个指标,如果在(1.8G,3.6G)这个区间之内,说明就是可以的,是这样吗? 感觉这个分片好大哦~~

    作者回复: 好问题,这个公式的作用其实是双向的,也即是给定资源,你可以算出并行度;反过来,找到合适的并行度,你也可以指导Spark集群资源的设定。 你说的没错,2GB的分片,确实太大了,通常来说,分片大小在200MB左右,是比较合适的,推荐把分片大小设定在这个范围。 有了分片大小,其实就可以反向指导Spark资源设定了。在你的例子里面,我理解资源是提前设定好了,也就是你说的堆内外分别9G,5个cores,也许这个设定是基于经验、也许是出于直觉。 如果用200MB分片大小反推过来的话,其实你可以考虑降低Executors的内存配置,或是提高它的CPU cores配置,这样把内存与CPU做到一个均衡配比,相比现在一个task要处理2GB的数据,效果要更好~ 毕竟咱们性能调优的本质,是让硬件资源之间尽量去平衡,调节并行度也好、分片大小也好、各种资源参数也好,实际上都是手段。

    2021-05-28
    3
    2
  • kingcall
    M 的下限是 Execution Memory 初始值,上限是 spark.executor.memory * spark.memory.fraction 划定的所有内存区域。这个老师笔误,写错了吧!

    作者回复: 没写错哟,上限就是 spark.executor.memory * spark.memory.fraction ,也就是storage memory + execution memory之和。Unified memory manager,在统一管理模式下,大家是可以互相抢占的,因此,如果没有分布式数据集缓存,storage memory那片内存区域,执行任务是都可以抢过来的,所以上限就是两者之和。

    2021-04-14
    3
    2
收起评论
显示
设置
留言
35
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部