作者回复: 好问题,我们换个角度讨论这个问题。你的问题其实是:为什么广播变量可以复用,但是User Memory中的数据却不行。 想要复用数据,前提是有“一个人”或是“一个地方”,明确记录了需要复用的这份数据,存储在什么地方,只有这样,后面的任务才能复用这份数据,这样的信息,又叫元数据、元信息。 这就好比,你去宜家买家具,你是需要根据家具的Id,去仓库中自提货物的。家具Id上明确标记了,你需要的货物,存在哪个货架、哪个层、哪个位置。如果没有这样的元信息,偌大的宜家家居超市,你不可能找到你需要的东西。 广播变量这个“货物”的具体地址,BlockManager会帮忙记录,所以“后来的人”(Task),如果需要访问广播变量,它能从BlockManager那里迅速地知道广播变量存储在哪里(哪个executor的什么位置),可以迅速地访问数据。 但是,User Memory也好,Storage、Execution memory也罢,他们本质上就是JVM heap。JVM heap虽然也会记录对象引用对应的存储地址,但是,它没有办法区分,两份数据,在内容上,是不是一样的。实际上,JVM也没有这样的义务。 因此,同一个Task当中两份完全一样的数据,到了JVM那里,它并不知道:“OK,这两份数据一样,后来的人只要访问已有的数据就OK了”。对他来说,这是两份不同的数据,即便内容都一样,他照样会单独花费存储空间来存储。根本原因在于,他并没有Spark的运行时上下文, 不能像BlockManager那样维护全局的元数据。
作者回复: 对,没错,文中说的数据集大小是内存中的数据集。 这块咱们有过介绍哈~ 最精确的办法: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark .sessionState .executePlan(plan) .optimizedPlan .stats .sizeInBytes 就是先Cache,再查看物理执行计划,可以获得数据集在内存中存储大小。
作者回复: 都是非常好的问题。 问题一:你说的没错,不管是哪片内存区域,实际上都是JVM Heap的一部分。回答你的问题:如果User Memory空间不足,但是Spark Memory(Storage + Execution)空闲,会不会OOM?对于这种情况,即使你自定的数据结构其实超过了User Memory,但实际上,并不会立即报OOM,因为Spark对于堆内内存的预估,没有那么精确。这里有些Tricky,对于Spark划定的各种“线”,也就是通过配置项设置的不同区域的百分比,它类似于一种“软限制”,也就是建议你遵守它划出的一道道“线”,但如果你没有遵守,强行“跨线”,只要其他区域还有空间,你真的抢占了,Spark也不会立即阻止你。这就好比“中国式”过马路,红绿灯是一种“软限制”,即使是红灯,只要没有机动车通行,咱们照样可以凑一堆人过马路,但如果突然窜出来一辆车,把咱撞了,那就是咱们理亏,因为我们是“闯红灯”的那一方。Spark的“线”也类似,是一种“软限制”。但是,“出来混迟早是要还的”,你的自定义数据结构,占了本该属于Spark Memory的地盘,那么Spark在执行任务或是缓存的时候,很有可能就跟着倒霉,最后(加重加粗)“看上去”是执行任务或是缓存任务OOM,但本质上,是因为你的自定义数据结构,提前占了人家的地方。这种时候,你会很难debug。这也就是为什么Spark要求开发者要遵循各个配置项的设置,不要“越线”。 问题二:还是User Memory,那个map就是自定义数据结构,它会消耗User Memory。你的那种用法,本质上就是闭包。
作者回复: 好问题,是这样的。 其实这里的关键因素,是谁先谁后的问题,一旦确定了哪个为先,我们就可以参考14讲“三足鼎立”的思路,去调整相关配置项。我们可以分类讨论。 比如在大厂,很多时候,硬件资源是受限的,因此你能拿到多少计算资源,其实是有quota的,尤其是在大厂多个团队共享一个超大集群的情况下。这个时候,你就得让数据去迁就计算资源,那这就意味着,Executors相关的参数,得先定,然后再去倒推并行度相关的参数。 相反,如果你是初创,或者团队独享分布式集群,说白了就是“家里有矿”。这个时候,你可以让计算资源去迁就数据,也就是,先定数据相关的,比如并行度相关参数,然后,再去定Executors相关的并发度、内存大小,等等。 总之,case by case,核心就是谁先谁后,不同的场景,结合实际情况,由你来决定谁优先调整,谁是被动地跟着调整。
作者回复: 标准答案了,💯。你说的这个方法很有参考价值,虽然是粗略估算,但对于大多数情况来说,其实八九不离十了~ 非常赞,看上去“粗略”,但很实用~
作者回复: 有不少类库可以“拿来即用”,比如Java类库:Instrumentation,或者第三方库,比如RamUsageEstimator,可以多搜一搜,这样的工具还是蛮多的~
作者回复: 咱们先说“内存膨胀系数”,我们可以根据Spark UI,来估算磁盘上的数据,load到内存之后,会膨胀多少倍。具体怎么做呢? 打开Spark UI,找到某一个Stage,任何一个都可以,然后进入Description页面,里面会有一些详细的metrics,其中有两个非常的重要,就是我们拿来粗略计算“内存膨胀系数”用的。也就是Shuffle spill (memory) 和Shuffle spill (disk),我们说的“内存膨胀系数”,就是二者的商,也就是: Memory Expansion Ratio = Shuffle spill (memory) / Shuffle spill (disk) 原理其实特别简单,这两个metrics,针对的是同一份数据进行统计的,只不过一个是在内存中的大小,一个是在磁盘上的大小,因此两者的商,就是内存膨胀系数。 有了这个“内存膨胀系数”,结合你磁盘上的分布式数据集,以及哪些需要执行、哪些需要缓存,你就可以很方便的计算出,他们在内存中的大小,从而相应地去设置Storage Memory和Execution Memory。 当然,这是一种粗略的办法,不过虽然粗略,但是很高效。拿到这个系数之后,对于数据在内存中的占用,估算起来就很方便了。
作者回复: 好问题,确实,并发度、并行度、执行内存,三者“三足鼎立”。并发度和执行内存,都是一开始就设置好了,在整个作业的执行过程中,都是一样的,不会改变。但是,并行度不是,并行度可以随着作业的进展,随时调整。 所以,在不同的Stages内,为了维持3者的平衡,我们可以通过调整并行度,来维持“三足鼎立”的平衡。那么问题来,在不同的Stages,咋调整并行度呢? 有两种办法: 1. spark.sql.shuffle.partitions 通过spark.conf.set(spark.sql.shuffle.partitions, XXX)来控制每个Stages的并行度大小。这么做有个缺点,就是当你Stages比较多得时候,这个配置项频繁地改来改去,应用的维护成本会很高。 2. AQE的自动分区合并,也就是让Spark自动帮你算合适的并行度。但是这个需要相关的配置项,比如你期望的每个分片大小,等等。配置项的具体细节可以参考“配置项”的第二讲哈~
作者回复: 对,你说的没错,执行内存所消耗的,就是PartitionedPairBuffer、PartitionedAppendOnlyMap这些数据结构,以及排序等操作所必需的内存空间。对于Execution Memory的计算,其实咱们说的一直都是估算,因为我们没有办法做精确地计算,因为你比如像刚刚说的那些数据结构,实际上因为有Spill的机制来做保护,因此并不是数据分片大于这些数据结构就会立即OOM。 咱们本讲推荐的计算方法,实际上是一种保守的计算不同内存区域的估算方法,目的是让不同区域分配相对均衡,避免潜在的OOM隐患。给出的估算公式:#Execution = #threads * #dataset / #N;更多地是从平衡和稳定性的角度出发,因为在内存区域与数据集大小配比均衡的情况下,性能往往不会太差~
作者回复: 对,如果没有自定义的数据结构,可以把spark.memory.fraction调高,多给Spark的执行任务和缓存任务分配些内存空间。 毕竟,没有自定义结构,User memory留着也是浪费。