• wow_xiaodi
    2021-08-07
    老师,从第一讲看到这里,貌似有个东西还没介绍,假设并行度、executor core和每个core均摊的execution memory都估算好了,那么我要几个executor,每个executor几个core好呢,就是说我executor多,然后每个executor的core少点,还是反过来好呢,是否有经验之谈?

    作者回复: 好问题,思考的很细致~ 一般来说,在这种情况,我们倾向于“广撒网”,就是Executors个数多一些,每个Executors的资源相对小一些,比较轻量。 这么做的目的,主要是保证分布式计算的鲁棒性与扩展性,轻量Executors失败、重建的开销,要小于重量级的Executors。尤其是在开启了Dynamic Allocation的情况下,更是如此。

    
    15
  • 苏子浩
    2021-04-27
    老师好,我有一下三个问题: (i)关于Task获取内存方面中“每个线程分到的可用内存有一定的上下限,下限是 M/N/2,上限是 M/N,也就是属于[M/(2*N), M/N].”,同时注意到上述的M和N其实是随着Executor中task的状态动态变化的,根据前文提到的“执行内存总量M动态变化,由于Execution Memory可以占用Storage Memory以及抢占的优先级,所以ExecutionMemory的下限是 Execution Memory初始值,上限是 spark.executor.memory * spark.memory.fraction”和“N~是Executor 内当前的并发度”。但是在具体的例子中怎么task的内存分配我不是很理解。比如本文中提到的“实例 1:数据倾斜”所提到的“Executor 线程池大小为 3,因此每个 Reduce Task 最多可获得 360MB * 1 / 3 = 120MB 的内存空间。Task1、Task2 获取到的内存空间足以容纳分片 1、分片 2,因此可以顺利完成任务。”我不理解的是,此时的Task是以“一批”的形式同时进入Executor吗?所以是“360MB/3=120MB”。为什么不是Task1‘到的早’,它刚来的时候N=1,所以它最多可以拿到整个执行内存,即360MB呢?但是Task1实际只需要100MB,所以分100MB给Task1。此时可用的‘动态执行内存总量’变成260MB(如果是这样,那么接着的内存构成是:(1)80MBStorage Memory + 180Execution Memory还是(2)180MBStorage Memory + 80MBExecution Memory?i.e.其实我想问的是在内存分配上的优先级是怎么分配?先去‘贪心’地抢先占用Storage Memory等用完以后再使用Execution Memory还是分配Execution Memory再开始用Execution Memory?根据您的黄小己招租种地的规则应该是先去占用自己所属的部分的内存吧?) (ii)同时,实例二:数据膨胀例子中,“task1之所以能拿到300MB,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。“那么task1实际是拿到了多少内存呢?是300MB还是360MB?是按需分配还是给360MB?如果是分配了300MB,那么此时“动态执行内存总量”变成了多少呢,是60MB吗?那么此时Task2进入时,假设Task3还没有进入,N等于2了,所以Task2分配到的执行是60 / 2 = 30MB吗? (iii)Executor中的Task是同时进入的吗?我的意思是Driver是否会一次性生成所有的的Task,并将Task全部都发送到Executor去执行?还是Driver不完全发送所有Task,根据Executor的并发度(基本上取决于Executor的cores个数)去发送,按照Executor的执行情况去发送Task,执行完一个Task再发送一个Task?虽然之前有提到其实Task本身是自带任务调度意愿的。 打了很多字,确实自己没有想明白,麻烦老师了,不好意思!谢谢!
    展开

    作者回复: 都是很好的问题~ 没事,不用怕字数多,像这样的讨论多多益善~ 咱们一个一个来说。 i)先说执行内存的上限,你也说了,是Execution Memory、外加Storage Memory空闲的部分。再说内存抢占,这两部分内存的消耗,一定是优先消耗Execution Memory,然后再去抢占Storage Memory,而且,抢占的时候,还只能抢Storage Memory空闲的部分(Storage Memory的空间也是动态变化的,比如中间有缓存任务往Storage Memory里面灌数据)。这块其实可以想想黄小乙的占地协议,他必须先把自己的地种满,然后才能去抢人家的地,不能说,一上来,就把人家麻子的地给占了,那也不公平嘛!再者,关于任务的内存消耗,思考的很深入,挺好~ 我们要区分两个概念,一个,是Task可以获取到的内存上限、也就是1/N;一个,是Task计算对于内存的消耗需要。换句话说,一个是供给上限,一个是计算需求。内存的分配,是按照需求来分配的,也就是说,你需要多少,就逐步给你分配多少。并不是说,你的上限是120MB,Memory Manager一下子就把这120MB全部划给你、预留给你了。1/N只是说明,你最多可以拿这么多,但是你具体消耗多少,取决于你手里有多少农作物的种子。这就好比,你手里就1亩地的种子,却想从黄小乙那里租赁10亩地,那黄小乙也不能同意呀!剩下的9亩地,不是白白浪费掉了么。 ii)好像已经把问题ii)回答了,哈哈,就是你说的那样,消耗300MB,还剩60MB。如果Task2到了、Task3还没到,就是你算的结果60MB/2 = 30MB。 iii)这个确实是个好问题,不过这个问题也暴露了你调度系统那一讲没吃透 😃,这部分其实要结合调度系统去理解。还记得DAGScheduler、TaskScheduler和SchedulerBackend吗?Executors的空闲状态,是由SchedulerBackend维护的,TaskScheduler拿到DAGScheduler拆解的Tasks之后,开始去SchedulerBackend那里尝试拿ResourceOffer,然后再从Tasks里面去挑,挑那些满足本地性级别要求的任务,然后再调度过去。你可以把DAGScheduler理解为任务执行的需求端,把SchedulerBackend理解成计算资源的供给端,而TaskScheduler就是中间的媒人、中介、撮合商。回答你的问题,首先,不是Driver一股脑把Tasks都分发过去,因为Tasks的执行需求,要先看SchedulerBackend有没有足够的资源供给。实际上,这个流程恰恰相反,是SchedulerBackend先提供ResourceOffer,然后TaskScheduler才能去挑Tasks,然后把任务调度过去。所以实际的调度情况,和你后者说的差不多,也就是你说的:“按照Executor的执行情况去发送Task”。任务调度确实比较难,除了前面的第5讲《调度系统》,还可以看看infoQ的这篇,这篇《权力的游戏》更详尽:https://www.infoq.cn/article/5aOHzQIaXX6NlHriLtSI

    
    14
  • sky_sql
    2021-04-21
    老师好!麻烦问下 Shuffle 文件寻址有个参数spark.reducer.maxSizeInFlight 默认48m,这个buffer缓冲每次拉取48m数据。是Execution Memory剩余部分不够48m就会oom吗?这个和1/N的有啥关联?

    作者回复: 好问题,你说的是对的,这部分buffer也算作Execution Memory的一部分,也会记到Execution Memory的“账上”。因此,如果像你说的这种edge case,连48M都没有了,那拉数据的时候确实会OOM。这部分就没有1/N的关系了哈,每个task都是这么大的buffer,一旦不够,也就OOM了。

    
    8
  • 王天雨
    2021-04-21
    1、执行内存总量是动态变化的,最大是spark.executor.memory * spark.memory.fraction 本例中最大360M。 其次并发度N是固定不变,但是Executor中当前并行执行的任务数是小于等于N的, 上下限公式的计算是根据Executor中当前并行执行的并发度来计算的。 因此先拿到任务的线程能够申请更多的资源,极端情况下,本例单个Task可享受360M内存。

    作者回复: 正解~ 赞~ 👍 老弟对于执行内存的(1/N/2,1/N)理解得相当到位~

    共 4 条评论
    7
  • 在路上
    2021-12-03
    老师好,请教下关于driver端oom问题,生产中遇到过这样问题,数据表数据量大,每次扫描近一年分区,几十个表关联场景,每次任务启动都一直初始化,有的时候还超时失败,任务一只run不起来,我记得没修改逻辑的时候是把driver调到100多g解决的。后续把代码扫描分区缩短了没有了,想问一下这种情况driver在计算分片吗?为啥这么久啊。

    作者回复: 老弟说的生产环境,我理解是物理上的分布式集群哈。如果是真正的分布式集群,那肯定就不是Driver在读数据。坦白地说,你说的这种情况,我觉得挺蹊跷的,把Driver端调大,就能解决问题,但是,把需要扫描的数据量缩小,问题也不见了。所以听上去,是数据量太大,可问题是,Driver并不需要扫描数据,需要扫描的数据量多少,也不会给Driver增减负担,所以这个就很奇怪了。结合你的描述:“每次任务启动都一直初始化,有的时候还超时失败,任务一只run不起来”,倒是有一种可能,就是DAG过于复杂(几十个表关联),导致任务调度失败(比如Task过大,Task分发超时,等等),不过这个需要老弟确认,你说的超时,具体是什么超时?Task分发超时,还是心跳连接超时?有了这些信息,更好判断一些~

    
    5
  • 冯杰
    2021-10-08
    老师好,文章中提到:“Task3 的数据分片大小远超内存上限,即便 Spark 在 Reduce 阶段支持 Spill 和外排,120MB 的内存空间也无法满足 300MB 数据最基本的计算需要,如 PairBuffer 和 AppendOnlyMap 等数据结构的内存消耗,以及数据排序的临时内存消耗等等。” 关于上述这段话有点疑问: 1、shuffle read 阶段,reduce task去fetch数据时,是可以支持spill到磁盘的。 但在实际工作中,经常出现fetch fail的异常,增加内存后或者同等内存下换为堆外内存也可以解决问题; 2、为什么支持spill操作,还会导致OOM呢? 看老师的解答是需要一个最基础的内存需求,比如:300MB的数据需要120M+50M内存,这块儿不是很明白

    作者回复: 好问题~ 其实这两个问题,都可以归结为一个问题,就是JVM堆内内存的使用与占用预估。其实课程中咱们提到过,就是Spark对于JVM堆内内存的估计是不够准确的,这主要是为了牺牲一定的精度,而照顾效率上的考虑。 而堆外内存的计算,就不存在这样的隐患,堆外内存的计算,是非常精确的。因此,JVM堆内内存的OOM,相比堆外会更加的频发。 Task之间的内存争抢,在“逻辑上”会尊重(M/N/2, M/N)的限制,但实际在JVM里面,不同Task所占用的内存,是没有明确边界的。再加上Spark对于内存预估的不准确,OOM也就在所难免。 Spill,仅仅是在Spark的预估值明确表示内存不足的时候,才会触发。而很多时候,OOM的根源在于,“看上去”内存是够的,但真要用的时候,发现不够。 除了预估不准以外,一些内存数据结构的扩展,也会触发OOM。比方说,AppendOnlyMap、PartitionedPairBuffer,他们在spill之前,会有一次扩容,在内存中的扩容,目的就是为了承载更多的用户数据。如果扩容的过程中,发现内存不足,那就只能OOM了。因为这里没有Spill的逻辑,纯粹的内存扩容。 说得不够系统,头上一句、腚上一句,希望对老弟有所帮助~

    
    4
  • 快跑
    2021-04-21
    老师好! 实例1中 1、为什么提到“线程池大小设置为 1 是不可取的”? 2、假如spark.executor.cores设置成 1 ,有3个Task,串行。 第一个Task执行完成后,360M的内存会全部都释放么?会不会有垃圾还没有回收的情况,导致Task2的内存没有360M可以

    作者回复: 1. 如果并发度是1,单个Executor就是纯粹的串行计算了,Spark“分布式”计算引擎的并行计算就失去了意义。即便还是可以有多个Executors,但是如果每个Executor在同一时间只有一个线程在工作,这种“分布式”计算的效率也是极其低下的。 2. 好问题,确实有这个隐患。不过呢,因为任务执行涉及的对象都是short-lived,也就是生命周期都很短,不像Storage Memory存储的缓存对象、生命周期很长。因此,执行任务涉及到的对象,多是放在年轻代,Minor GC的触发会频繁一些,回收效率也更高。不像long-lived objects,往往需要触发Full GC才能回收。因此,即便有还未回收的内存,也不影响任务执行,因为Minor GC频繁、且轻量,能够相对及时地把内存回收。

    共 2 条评论
    4
  • 苏子浩
    2021-04-27
    老师,我想问一下,数据膨胀导致 OOM 的例子中,一定会出现OOM吗?既然Task1 能获取到 300MB 的内存空间,那么挂起Task2线程和Task3线程,等待Task1内存释放,然后依次完成3个Task呢?

    作者回复: 你说的对,如果Spark可以保证依次运行这3个Tasks,确实不会OOM。不过,这里的关键是“依次”,当Executors线程池大于1的时候,Spark不能保证“依次”处理task。Task确实有先来后到,但是要是严格保证“依次”可就难了。 把spark.executor.cores置为1,就是咱们这讲举的极端的例子,Spark可以严格保证“依次”,不过就像咱们说的,它其实已经失去并行计算的意义了。

    
    3
  • Fendora范东_
    2021-04-21
    1.task1首先运行的,拿到自己1/N发现还不够,就继续申请内存。task2/task3后面运行,发现可用内存满足下限,就跑去运行了,结果task1抢先申请到300M,task2,task3在运行时需要更多内存,不能得到满足,导致OOM。 2.driver端oom: 遇到过执行查询SQL,结果集太大,oom。通过调maxResultSize大小来解决。 executor端oom: 之前没细究过,oom了就给executor调大内存就完了。。。。😅

    作者回复: 1. 不完全对,task1之所以能拿到,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。task2、task3不行,是因为他们来得晚,他们到的时候,N至少等于2了,所以“每人”最多拿执行内存的一半,所以肯定会OOM。这里其实有点tricky,主要想考大家对于(1/N/2,1/N)的理解是不是足够透彻。 2. 对,driver OOM调整maxResultSize比较典型。学过这一讲,Executors内存调优可以做得更细致一些了,就不必每次都是单纯调大executor.memory~

    
    2
  • Unknown element
    2022-01-04
    老师您好,我有几个问题: (1)文章中举的例子,“每个 core 有两个线程”是怎么设置的?spark.task.cpus=0.5吗? (2)我看官方文档对于spark.executor.cores的定义是The number of cores to use on each executor. 现在spark.executor.cores=3但是一个机器上只有两个core,那这时候创建executor的资源好像不够?另外您说spark.executor.cores=1就失去并行的意义了,但是我们目前spark.executor.cores就设置的是1(捂脸)运维说这是经过慎重考虑的默认参数,在 https://mp.weixin.qq.com/s/gNxQKTH9JkNsDaBKttvAsQ 这篇文章的 06 规范优化 => 03 参数滥用问题 有提到,不知道老师对这个设置怎么看呢? (3)您对 To_Drill 和 狗哭 两位同学的问题回答感觉有一点矛盾呢~在 狗哭 的问题的回答中您说 “task申请不到额外内存,不得不进入waiting list,等待别的task把内存释放,这个时候,CPU也会挂起”,也就是说task开始计算之后发现内存不够但是又申请不到额外内存就会被挂起,但是在 To_Drill 的问题的回答中您说 “task为了容纳整个数据分片,需要不停地申请内存,如果内存不够,任务不能再挂起了,因为挂起来,内存也不能释放,别的task也进不来,挂起没有意义,所以只能硬着头皮往下执行,直到把内存撑爆为止”,意思应该是task开始计算之后发现内存不够但是又申请不到额外内存这时就直接抛oom?不知道是不是我没理解对... 谢谢老师!!
    展开

    作者回复: 好问题,老弟看的非常仔细、认真,赞一个先👍,咱们一个个说 1)是的,如果一个core,要跑两个task,就设置0.5,不过这个需要每个core切实可以超线程到至少两个线程,物理上才可行 2)先说结论,spark.executor.cores设置为1,是没问题的,只要Executors数量足够多,也可以充分利用硬件资源,做到并行计算。我又回顾了下原文,原文的说法,确实有问题,漏了一句话。就是在咱们OOM数据倾斜的例子里,本意是说,在一个物理机的那一个Executor中,把spark.executor.cores设置为1,那么那个Executor就从并行计算,退化到串行计算,因为只有一个cpu嘛。但是,实际上一个物理机,可以起多个Executors,每个Executors只有一个cpu,也是OK的。只不过我们这里,为了说明OOM的root cause,把场景和问题都简化了,就假设一个物理机,只起一个Executor,串行起来的话,就不会有倾斜导致的OOM问题。总结来说,在实际工业部署中,一个集群,多个Executors,每一个Executors一个CPU core,是完全OK的。 3)我看了下,这里确实有矛盾的地方,可能当时回答“狗哭”同学的时候,比较草率了。这里double confirm下,给“To_Drill”的答复是正确的哈,参考后者的答复即可~ 挂起,只存在于task启动前(拿不到N/2),一旦拿到N/2内存,task即开始执行,执行过程中,如果内存申请持续得不到满足,就抛OOM

    
    1