作者回复: 好问题,思考的很细致~ 一般来说,在这种情况,我们倾向于“广撒网”,就是Executors个数多一些,每个Executors的资源相对小一些,比较轻量。 这么做的目的,主要是保证分布式计算的鲁棒性与扩展性,轻量Executors失败、重建的开销,要小于重量级的Executors。尤其是在开启了Dynamic Allocation的情况下,更是如此。
作者回复: 都是很好的问题~ 没事,不用怕字数多,像这样的讨论多多益善~ 咱们一个一个来说。 i)先说执行内存的上限,你也说了,是Execution Memory、外加Storage Memory空闲的部分。再说内存抢占,这两部分内存的消耗,一定是优先消耗Execution Memory,然后再去抢占Storage Memory,而且,抢占的时候,还只能抢Storage Memory空闲的部分(Storage Memory的空间也是动态变化的,比如中间有缓存任务往Storage Memory里面灌数据)。这块其实可以想想黄小乙的占地协议,他必须先把自己的地种满,然后才能去抢人家的地,不能说,一上来,就把人家麻子的地给占了,那也不公平嘛!再者,关于任务的内存消耗,思考的很深入,挺好~ 我们要区分两个概念,一个,是Task可以获取到的内存上限、也就是1/N;一个,是Task计算对于内存的消耗需要。换句话说,一个是供给上限,一个是计算需求。内存的分配,是按照需求来分配的,也就是说,你需要多少,就逐步给你分配多少。并不是说,你的上限是120MB,Memory Manager一下子就把这120MB全部划给你、预留给你了。1/N只是说明,你最多可以拿这么多,但是你具体消耗多少,取决于你手里有多少农作物的种子。这就好比,你手里就1亩地的种子,却想从黄小乙那里租赁10亩地,那黄小乙也不能同意呀!剩下的9亩地,不是白白浪费掉了么。 ii)好像已经把问题ii)回答了,哈哈,就是你说的那样,消耗300MB,还剩60MB。如果Task2到了、Task3还没到,就是你算的结果60MB/2 = 30MB。 iii)这个确实是个好问题,不过这个问题也暴露了你调度系统那一讲没吃透 😃,这部分其实要结合调度系统去理解。还记得DAGScheduler、TaskScheduler和SchedulerBackend吗?Executors的空闲状态,是由SchedulerBackend维护的,TaskScheduler拿到DAGScheduler拆解的Tasks之后,开始去SchedulerBackend那里尝试拿ResourceOffer,然后再从Tasks里面去挑,挑那些满足本地性级别要求的任务,然后再调度过去。你可以把DAGScheduler理解为任务执行的需求端,把SchedulerBackend理解成计算资源的供给端,而TaskScheduler就是中间的媒人、中介、撮合商。回答你的问题,首先,不是Driver一股脑把Tasks都分发过去,因为Tasks的执行需求,要先看SchedulerBackend有没有足够的资源供给。实际上,这个流程恰恰相反,是SchedulerBackend先提供ResourceOffer,然后TaskScheduler才能去挑Tasks,然后把任务调度过去。所以实际的调度情况,和你后者说的差不多,也就是你说的:“按照Executor的执行情况去发送Task”。任务调度确实比较难,除了前面的第5讲《调度系统》,还可以看看infoQ的这篇,这篇《权力的游戏》更详尽:https://www.infoq.cn/article/5aOHzQIaXX6NlHriLtSI
作者回复: 好问题,你说的是对的,这部分buffer也算作Execution Memory的一部分,也会记到Execution Memory的“账上”。因此,如果像你说的这种edge case,连48M都没有了,那拉数据的时候确实会OOM。这部分就没有1/N的关系了哈,每个task都是这么大的buffer,一旦不够,也就OOM了。
作者回复: 正解~ 赞~ 👍 老弟对于执行内存的(1/N/2,1/N)理解得相当到位~
作者回复: 老弟说的生产环境,我理解是物理上的分布式集群哈。如果是真正的分布式集群,那肯定就不是Driver在读数据。坦白地说,你说的这种情况,我觉得挺蹊跷的,把Driver端调大,就能解决问题,但是,把需要扫描的数据量缩小,问题也不见了。所以听上去,是数据量太大,可问题是,Driver并不需要扫描数据,需要扫描的数据量多少,也不会给Driver增减负担,所以这个就很奇怪了。结合你的描述:“每次任务启动都一直初始化,有的时候还超时失败,任务一只run不起来”,倒是有一种可能,就是DAG过于复杂(几十个表关联),导致任务调度失败(比如Task过大,Task分发超时,等等),不过这个需要老弟确认,你说的超时,具体是什么超时?Task分发超时,还是心跳连接超时?有了这些信息,更好判断一些~
作者回复: 好问题~ 其实这两个问题,都可以归结为一个问题,就是JVM堆内内存的使用与占用预估。其实课程中咱们提到过,就是Spark对于JVM堆内内存的估计是不够准确的,这主要是为了牺牲一定的精度,而照顾效率上的考虑。 而堆外内存的计算,就不存在这样的隐患,堆外内存的计算,是非常精确的。因此,JVM堆内内存的OOM,相比堆外会更加的频发。 Task之间的内存争抢,在“逻辑上”会尊重(M/N/2, M/N)的限制,但实际在JVM里面,不同Task所占用的内存,是没有明确边界的。再加上Spark对于内存预估的不准确,OOM也就在所难免。 Spill,仅仅是在Spark的预估值明确表示内存不足的时候,才会触发。而很多时候,OOM的根源在于,“看上去”内存是够的,但真要用的时候,发现不够。 除了预估不准以外,一些内存数据结构的扩展,也会触发OOM。比方说,AppendOnlyMap、PartitionedPairBuffer,他们在spill之前,会有一次扩容,在内存中的扩容,目的就是为了承载更多的用户数据。如果扩容的过程中,发现内存不足,那就只能OOM了。因为这里没有Spill的逻辑,纯粹的内存扩容。 说得不够系统,头上一句、腚上一句,希望对老弟有所帮助~
作者回复: 1. 如果并发度是1,单个Executor就是纯粹的串行计算了,Spark“分布式”计算引擎的并行计算就失去了意义。即便还是可以有多个Executors,但是如果每个Executor在同一时间只有一个线程在工作,这种“分布式”计算的效率也是极其低下的。 2. 好问题,确实有这个隐患。不过呢,因为任务执行涉及的对象都是short-lived,也就是生命周期都很短,不像Storage Memory存储的缓存对象、生命周期很长。因此,执行任务涉及到的对象,多是放在年轻代,Minor GC的触发会频繁一些,回收效率也更高。不像long-lived objects,往往需要触发Full GC才能回收。因此,即便有还未回收的内存,也不影响任务执行,因为Minor GC频繁、且轻量,能够相对及时地把内存回收。
作者回复: 你说的对,如果Spark可以保证依次运行这3个Tasks,确实不会OOM。不过,这里的关键是“依次”,当Executors线程池大于1的时候,Spark不能保证“依次”处理task。Task确实有先来后到,但是要是严格保证“依次”可就难了。 把spark.executor.cores置为1,就是咱们这讲举的极端的例子,Spark可以严格保证“依次”,不过就像咱们说的,它其实已经失去并行计算的意义了。
作者回复: 1. 不完全对,task1之所以能拿到,是因为它“到的早”,它刚来的时候N=1,所以它最多可以拿到整个执行内存。task2、task3不行,是因为他们来得晚,他们到的时候,N至少等于2了,所以“每人”最多拿执行内存的一半,所以肯定会OOM。这里其实有点tricky,主要想考大家对于(1/N/2,1/N)的理解是不是足够透彻。 2. 对,driver OOM调整maxResultSize比较典型。学过这一讲,Executors内存调优可以做得更细致一些了,就不必每次都是单纯调大executor.memory~
作者回复: 好问题,老弟看的非常仔细、认真,赞一个先👍,咱们一个个说 1)是的,如果一个core,要跑两个task,就设置0.5,不过这个需要每个core切实可以超线程到至少两个线程,物理上才可行 2)先说结论,spark.executor.cores设置为1,是没问题的,只要Executors数量足够多,也可以充分利用硬件资源,做到并行计算。我又回顾了下原文,原文的说法,确实有问题,漏了一句话。就是在咱们OOM数据倾斜的例子里,本意是说,在一个物理机的那一个Executor中,把spark.executor.cores设置为1,那么那个Executor就从并行计算,退化到串行计算,因为只有一个cpu嘛。但是,实际上一个物理机,可以起多个Executors,每个Executors只有一个cpu,也是OK的。只不过我们这里,为了说明OOM的root cause,把场景和问题都简化了,就假设一个物理机,只起一个Executor,串行起来的话,就不会有倾斜导致的OOM问题。总结来说,在实际工业部署中,一个集群,多个Executors,每一个Executors一个CPU core,是完全OK的。 3)我看了下,这里确实有矛盾的地方,可能当时回答“狗哭”同学的时候,比较草率了。这里double confirm下,给“To_Drill”的答复是正确的哈,参考后者的答复即可~ 挂起,只存在于task启动前(拿不到N/2),一旦拿到N/2内存,task即开始执行,执行过程中,如果内存申请持续得不到满足,就抛OOM