作者回复: 好问题~ 我们分资源调度和任务调度两种情况来说。 Spark在做任务调度之前,SchedulerBackend封装的调度器,比如Yarn、Mesos、Standalone,实际上已经完成了资源调度,换句话说,整个集群有多少个containers/executors,已经是一件确定的事情了。而且,每个Executors的CPU和内存,也都是确定的了(因为你启动Spark集群的时候,使用配置项指定了每个Executors的CPU和内存分别是多少)。资源调度器在做资源调度的时候,确实是同时需要CPU和内存信息的。 资源调度完成后,Spark开始任务调度,你的问题,其实是任务调度范畴的问题。也就是TaskScheduler在准备调度任务的时候,要事先知道都有哪些Executors可用,注意,是可用。也就是TaskScheduler的核心目的,在于获取“可用”的Executors。 现在来回答你的问题,也就是:为什么ExecutorData不存储于内存相关的信息。答案是:不需要。一来,TaskScheduler要达到目的,它只需知道Executors是否有空闲CPU、有几个空闲CPU就可以了,有这些信息就足以让他决定是否把tasks调度到目标Executors上去。二来,每个Executors的内存总大小,在Spark集群启动的时候就确定了,因此,ExecutorData自然是没必要记录像Total Memory这样的冗余信息。 再来说Free Memory,首先,我们说过,Spark对于内存的预估不准,再者,每个Executors的可用内存都会随着GC的执行而动态变化,因此,ExecutorData记录的Free Memory,永远都是过时的信息,TaskScheduler拿到这样的信息,也没啥用。一者是不准,二来确实没用,因为TaskScheduler拿不到数据分片大小这样的信息,TaskScheduler在Driver端,而数据分片是在目标Executors,所以TaskScheduler拿到Free Memory也没啥用,因为它也不能判断说:task要处理的数据分片,是不是超过了目标Executors的可用内存。 综上,ExecutorData的数据结构中,只保存了CPU信息,而没有记录内存消耗等信息。不知道这些能不能解答你的问题?有问题再聊哈~
作者回复: 好问题,是这样的,资源调度和任务调度是分开的。 资源调度主要看哪些节点可以启动executors,是否能满足executors所需的cpu数量要求,这个时候,不会考虑任务、数据本地性这些因素。 资源调度完成之后,在任务调度阶段,spark负责计算每个任务的本地性,效果就是task明确知道自己应该调度到哪个节点,甚至是哪个executors。最后scheduler Backend会把task代码,分发到目标节点的目标executors,完成任务调度,实现数据不动代码动。 所以,二者是独立的,不能混为一谈哈~
作者回复: 第一题给满分💯,看答案就知道认真去读源码了,赞一个~ 第二题也答对了。第二题,再想想,还有其他cases吗?
作者回复: 非常好,思考的很深入~ 你说的没错,由于map没有用广播,所以每个task都会携带这个map,有额外的网络和内存存储开销。 但是,你要看跟谁比,跟广播比,第二种写法确实不如广播,但如果你跟第一种实现比,性能提升会非常明显。在第一种实现下,task不会携带map,而是在Executor临时去读文件、临时创建那个map,这个重复的计算开销,远大于task分发携带map带来的网络和内存开销。 简言之, 你说的非常对,不过咱们这一讲要强调的关键是调度系统,因此后来并没有用广播进一步优化,讲道理来说,一定是广播的实现方式是最优的。
作者回复: 好思路,完全没问题~
作者回复: 非常好的问题~ 你说的是对的,这里是我没有说清楚。不管是实现方式1、还是实现方式2,都是RDD里的每一条数据都去运行一次函数。原文中“这意味着集群中的每一个 Executors 都需要执行函数中封装的两个计算步骤”这个表述,是不准确的。准确的说法,应该是像你说的,每条数据,都需要执行这两个计算步骤。感谢老弟的提醒和纠正~ 你说的是对的,就按你自己的思路来理解就好~ 另外,要想让每个Executor只处理一次,那么咱们就只能依赖广播变量了,只有广播变量才能做到这一点。
作者回复: 666,不容易,赞坚持不懈~ 👍
作者回复: 如果你说的是shuffle read阶段的locality,咱们换个角度思考这个问题。shuffle map阶段,每个map task把中间文件写到本地盘。shuffle read阶段,每个reduce task需要从集群的所有节点拉数据,走网络。这个过程是由shuffle的实现机制决定的。 因此,从reduce视角看过去,所有中间文件都是它的“数据源”,这些数据源,散落在集群的每一个节点,因此,每个reduce task的locality,在最好的情况下,能做到rack local,最差的时候,那就是any。 正是因为这样,所以我们会强调,竭尽全力避免shuffle。因为它的实现机制,决定了reduce task的计算一定会让数据走网络。 不知道这么说能不能回答你的问题哈~ 要是我理解错了,就再at我,咱们继续讨论哈~
作者回复: 其实不用特别在意高阶函数,它的核心作用,其实是创建出一个包含了查询字典的函数对象。真正分发到Executors的代码,是这个包含了字典的函数对象。 而创建字典的过程,只在Driver端做一次,因为传递给高阶函数第一个参数、生成“带查询字典的函数对象”的动作,只做了一次,所以创建字典的开销,只有在Driver的那一次。
作者回复: 没问题,就是locality wait,就是有些task是有调度倾向的,preferredLocations。但是,它想要去的executors,可能正在忙,没有空闲cpu。这个时候两个选择,要么,等executors忙完;要么放弃,调度到其他节点或是executors,退而求其次。locality wait默认3s,但是可以调。不过一般3s就行。除非有些io密集型,必须要node local,这个时候,可以适当调大,多等等。 其实就是平衡,等待时间和执行时间的平衡,看你具体场景。