Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

15 | 内存视角(一):如何最大化内存的使用效率?

你好,我是吴磊。
上一讲我们说,想要提升 CPU 利用率,最重要的就是合理分配执行内存,但是,执行内存只是 Spark 内存分区的一部分。因此,想要合理分配执行内存,我们必须先从整体上合理划分好 Spark 所有的内存区域。
可在实际开发应用的时候,身边有不少同学向我抱怨:“Spark 划分不同内存区域的原理我都知道,但我还是不知道不同内存区域的大小该怎么设置,纠结来、纠结去。最后,所有跟内存有关的配置项,我还是保留了默认值。”
这种不能把原理和实践结合起来的情况很常见,所以今天这一讲,我就从熟悉的 Label Encoding 实例出发,一步步带你去分析不同情况下,不同内存区域的调整办法,帮你归纳出最大化内存利用率的常规步骤。这样,你在调整内存的时候,就能结合应用的需要,做到有章可循、有的放矢。

从一个实例开始

我们先来回顾一下第 5 讲中讲过的 Label Encoding。在 Label Encoding 的业务场景中,我们需要对用户兴趣特征做 Encoding。依据模板中兴趣字符串及其索引位置,我们的任务是把千亿条样本中的用户兴趣转换为对应的索引值。模板文件的内容示例如下所示。
//模板文件
//用户兴趣
体育-篮球-NBA-湖人
军事-武器-步枪-AK47
实现的代码如下所示,注意啦,这里的代码是第 5 讲中优化后的版本。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了如何最大化内存的使用效率,通过具体的实例分析和性能调优方法,帮助读者了解了如何在Spark内存分区中最大化内存利用率。文章首先回顾了Label Encoding的业务场景和代码实现思路,然后分析了findIndex函数存在的性能隐患,即User Memory的内存消耗问题。接着,文章提出了性能调优的方法,即使用广播变量来优化代码,减少对User Memory内存区域的占用。通过对比优化前后的代码实现方式,文章展示了优化后的版本对内存消耗的改进,减少了对User Memory内存区域的占用,省掉了大量内存消耗。文章还详细介绍了内存规划的两步走,包括预估内存占用和调整内存配置项的具体操作步骤。最后,强调了合理划分Spark所有的内存区域对提升CPU与内存利用率的重要性,并提出了每日一练的问题,引发读者思考。整体而言,本文内容丰富,深入浅出,适合技术人员学习参考。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(19)

  • 最新
  • 精选
  • 金角大王
    老师,为何UserMemory中自定义数据结构不能像bc那样在StorageMemory中只存一份?

    作者回复: 好问题,我们换个角度讨论这个问题。你的问题其实是:为什么广播变量可以复用,但是User Memory中的数据却不行。 想要复用数据,前提是有“一个人”或是“一个地方”,明确记录了需要复用的这份数据,存储在什么地方,只有这样,后面的任务才能复用这份数据,这样的信息,又叫元数据、元信息。 这就好比,你去宜家买家具,你是需要根据家具的Id,去仓库中自提货物的。家具Id上明确标记了,你需要的货物,存在哪个货架、哪个层、哪个位置。如果没有这样的元信息,偌大的宜家家居超市,你不可能找到你需要的东西。 广播变量这个“货物”的具体地址,BlockManager会帮忙记录,所以“后来的人”(Task),如果需要访问广播变量,它能从BlockManager那里迅速地知道广播变量存储在哪里(哪个executor的什么位置),可以迅速地访问数据。 但是,User Memory也好,Storage、Execution memory也罢,他们本质上就是JVM heap。JVM heap虽然也会记录对象引用对应的存储地址,但是,它没有办法区分,两份数据,在内容上,是不是一样的。实际上,JVM也没有这样的义务。 因此,同一个Task当中两份完全一样的数据,到了JVM那里,它并不知道:“OK,这两份数据一样,后来的人只要访问已有的数据就OK了”。对他来说,这是两份不同的数据,即便内容都一样,他照样会单独花费存储空间来存储。根本原因在于,他并没有Spark的运行时上下文, 不能像BlockManager那样维护全局的元数据。

    2021-04-16
    36
  • Fendora范东_
    还有个疑问,想请教下磊哥 文中说的数据集大小是内存中的数据集吧 文件落盘后数据集大小可以很方便查看,那内存中数据集大小怎么看呢

    作者回复: 对,没错,文中说的数据集大小是内存中的数据集。 这块咱们有过介绍哈~ 最精确的办法: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark .sessionState .executePlan(plan) .optimizedPlan .stats .sizeInBytes 就是先Cache,再查看物理执行计划,可以获得数据集在内存中存储大小。

    2021-04-16
    26
  • zxk
    老师,这边想请教两个问题。 问题一:User Memory、Execution Memory、Storage Memory 是属于 Spark 自身对内存区域的划分,但 Spark 的 executor 实际上又是一个 JVM,假如我把 User Memory 设置的非常小,又自定义了一个很大的数据结构,此时 User Memory 不够用了,而 Execution Memory、Storage Memory 还有很大的空闲,那么这时候会不会 OOM?如果是 GC 又不太符合 JVM 的 GC 条件。 问题二:在使用 mapPartition 算子的时候,如果我在进入迭代前外部定义了一个 map,然后迭代中往这个 map 添加数据,那么这个 map 又是占用哪部分内存的?

    作者回复: 都是非常好的问题。 问题一:你说的没错,不管是哪片内存区域,实际上都是JVM Heap的一部分。回答你的问题:如果User Memory空间不足,但是Spark Memory(Storage + Execution)空闲,会不会OOM?对于这种情况,即使你自定的数据结构其实超过了User Memory,但实际上,并不会立即报OOM,因为Spark对于堆内内存的预估,没有那么精确。这里有些Tricky,对于Spark划定的各种“线”,也就是通过配置项设置的不同区域的百分比,它类似于一种“软限制”,也就是建议你遵守它划出的一道道“线”,但如果你没有遵守,强行“跨线”,只要其他区域还有空间,你真的抢占了,Spark也不会立即阻止你。这就好比“中国式”过马路,红绿灯是一种“软限制”,即使是红灯,只要没有机动车通行,咱们照样可以凑一堆人过马路,但如果突然窜出来一辆车,把咱撞了,那就是咱们理亏,因为我们是“闯红灯”的那一方。Spark的“线”也类似,是一种“软限制”。但是,“出来混迟早是要还的”,你的自定义数据结构,占了本该属于Spark Memory的地盘,那么Spark在执行任务或是缓存的时候,很有可能就跟着倒霉,最后(加重加粗)“看上去”是执行任务或是缓存任务OOM,但本质上,是因为你的自定义数据结构,提前占了人家的地方。这种时候,你会很难debug。这也就是为什么Spark要求开发者要遵循各个配置项的设置,不要“越线”。 问题二:还是User Memory,那个map就是自定义数据结构,它会消耗User Memory。你的那种用法,本质上就是闭包。

    2021-04-18
    20
  • 静心
    老师,我发现14讲中,计算合理的并行度,依赖之一是在执行内存大小给定的前提下。而15讲中,计算执行内存大小,依赖之一是在并行度给定的前提下。所以到底如何破局呢?

    作者回复: 好问题,是这样的。 其实这里的关键因素,是谁先谁后的问题,一旦确定了哪个为先,我们就可以参考14讲“三足鼎立”的思路,去调整相关配置项。我们可以分类讨论。 比如在大厂,很多时候,硬件资源是受限的,因此你能拿到多少计算资源,其实是有quota的,尤其是在大厂多个团队共享一个超大集群的情况下。这个时候,你就得让数据去迁就计算资源,那这就意味着,Executors相关的参数,得先定,然后再去倒推并行度相关的参数。 相反,如果你是初创,或者团队独享分布式集群,说白了就是“家里有矿”。这个时候,你可以让计算资源去迁就数据,也就是,先定数据相关的,比如并行度相关参数,然后,再去定Executors相关的并发度、内存大小,等等。 总之,case by case,核心就是谁先谁后,不同的场景,结合实际情况,由你来决定谁优先调整,谁是被动地跟着调整。

    2021-05-12
    17
  • 西南偏北
    老师讲的预估内存占用非常细,但就像老师给出的第二题中说的那样,如果Spark应用程序中的计算逻辑很多,这样预估自然是很精确,但是会花费大量时间,成本巨大!分享一下自己平时粗略估算内存占用的方法(如有不对,还望老师纠正): 1、Storage Memory估算:将要缓存到内存的RDD/Dataset/Dataframe或广播变量进行cache,然后在Spark WEBUI的Storage标签页下直接查看所有的内存占用,大致就对应Storage Memory。 2、Execution Memory估算:有了Storage Memory,因为默认情况下Execution Memory和Storage Memory占用Spark Memory的比例是相同的,这里可以将Execution Memory和Storage Memory设置为相同。 3、User Memory:如果应用中没有使用太多自定义数据类型,保持默认值即可;如果使用了很多自定义数据类型,按老师说的方式进行估算即可。 上面只是一个粗略的估算,可能需要根据任务的执行情况进行一些调整。

    作者回复: 标准答案了,💯。你说的这个方法很有参考价值,虽然是粗略估算,但对于大多数情况来说,其实八九不离十了~ 非常赞,看上去“粗略”,但很实用~

    2021-05-04
    15
  • 快跑
    REPL 中,通过 Java 的常规方法估算数据存储大小 老师,这个过程具体是怎么做呢。

    作者回复: 有不少类库可以“拿来即用”,比如Java类库:Instrumentation,或者第三方库,比如RamUsageEstimator,可以多搜一搜,这样的工具还是蛮多的~

    2021-04-19
    14
  • licl1008
    老师 课后思考中提到内存规划第一步很麻烦 您在留言回复中提到可以根据sparkUI估算'数据放大倍数',然后粗略估算内存。请问具体是如何操作? 根据您这个方法,可以同时得出 user execution storage三个空间的内存大小? 谢谢指导

    作者回复: 咱们先说“内存膨胀系数”,我们可以根据Spark UI,来估算磁盘上的数据,load到内存之后,会膨胀多少倍。具体怎么做呢? 打开Spark UI,找到某一个Stage,任何一个都可以,然后进入Description页面,里面会有一些详细的metrics,其中有两个非常的重要,就是我们拿来粗略计算“内存膨胀系数”用的。也就是Shuffle spill (memory) 和Shuffle spill (disk),我们说的“内存膨胀系数”,就是二者的商,也就是: Memory Expansion Ratio = Shuffle spill (memory) / Shuffle spill (disk) 原理其实特别简单,这两个metrics,针对的是同一份数据进行统计的,只不过一个是在内存中的大小,一个是在磁盘上的大小,因此两者的商,就是内存膨胀系数。 有了这个“内存膨胀系数”,结合你磁盘上的分布式数据集,以及哪些需要执行、哪些需要缓存,你就可以很方便的计算出,他们在内存中的大小,从而相应地去设置Storage Memory和Execution Memory。 当然,这是一种粗略的办法,不过虽然粗略,但是很高效。拿到这个系数之后,对于数据在内存中的占用,估算起来就很方便了。

    2021-05-10
    4
    13
  • Geek_d794f8
    老师,我有一个疑惑,对于有多个stage的任务,每个stage的内存预估的情况可能不一样。那样就无法给一个比较适合所有stage得内存配置?

    作者回复: 好问题,确实,并发度、并行度、执行内存,三者“三足鼎立”。并发度和执行内存,都是一开始就设置好了,在整个作业的执行过程中,都是一样的,不会改变。但是,并行度不是,并行度可以随着作业的进展,随时调整。 所以,在不同的Stages内,为了维持3者的平衡,我们可以通过调整并行度,来维持“三足鼎立”的平衡。那么问题来,在不同的Stages,咋调整并行度呢? 有两种办法: 1. spark.sql.shuffle.partitions 通过spark.conf.set(spark.sql.shuffle.partitions, XXX)来控制每个Stages的并行度大小。这么做有个缺点,就是当你Stages比较多得时候,这个配置项频繁地改来改去,应用的维护成本会很高。 2. AQE的自动分区合并,也就是让Spark自动帮你算合适的并行度。但是这个需要相关的配置项,比如你期望的每个分片大小,等等。配置项的具体细节可以参考“配置项”的第二讲哈~

    2021-04-18
    2
    4
  • licl1008
    老师 我是接着昨天的提问再问一下😂 execution mem内存估算中,#dataset不用算#bc过的数据集。但我的疑惑是:#bc只是把文件系统的数据搬到了内存,同样只是一个Relation而已,和执行要的内存似乎没什么联系。execution时,比如一些aggregate操作,不是同样也要把这个#bc relation的数据读出来,然后驻留在内存中处理吗。那岂不是也是要算入执行内存?或者我没懂算执行内存的理论依据到底是什么?我理解的执行内存是做aggregate sort等操作需要的内存空间,也是就pairedbuffer这之类的结构的大小 谢谢老师不吝赐教😄

    作者回复: 对,你说的没错,执行内存所消耗的,就是PartitionedPairBuffer、PartitionedAppendOnlyMap这些数据结构,以及排序等操作所必需的内存空间。对于Execution Memory的计算,其实咱们说的一直都是估算,因为我们没有办法做精确地计算,因为你比如像刚刚说的那些数据结构,实际上因为有Spill的机制来做保护,因此并不是数据分片大于这些数据结构就会立即OOM。 咱们本讲推荐的计算方法,实际上是一种保守的计算不同内存区域的估算方法,目的是让不同区域分配相对均衡,避免潜在的OOM隐患。给出的估算公式:#Execution = #threads * #dataset / #N;更多地是从平衡和稳定性的角度出发,因为在内存区域与数据集大小配比均衡的情况下,性能往往不会太差~

    2021-05-20
    3
  • 老师,结合这一节内容和之前的,自定义数据结构其实和hive中的表关系不大,不涉及自定义,那么这时候是不是就可以把存储自定义这部分的内存匀出来给到统一内存身上

    作者回复: 对,如果没有自定义的数据结构,可以把spark.memory.fraction调高,多给Spark的执行任务和缓存任务分配些内存空间。 毕竟,没有自定义结构,User memory留着也是浪费。

    2021-04-16
    9
    2
收起评论
显示
设置
留言
19
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部