• 慢慢卢
    2021-06-19
    任务调度的时候不考虑可用内存大小吗

    作者回复: 好问题~ 我们分资源调度和任务调度两种情况来说。 Spark在做任务调度之前,SchedulerBackend封装的调度器,比如Yarn、Mesos、Standalone,实际上已经完成了资源调度,换句话说,整个集群有多少个containers/executors,已经是一件确定的事情了。而且,每个Executors的CPU和内存,也都是确定的了(因为你启动Spark集群的时候,使用配置项指定了每个Executors的CPU和内存分别是多少)。资源调度器在做资源调度的时候,确实是同时需要CPU和内存信息的。 资源调度完成后,Spark开始任务调度,你的问题,其实是任务调度范畴的问题。也就是TaskScheduler在准备调度任务的时候,要事先知道都有哪些Executors可用,注意,是可用。也就是TaskScheduler的核心目的,在于获取“可用”的Executors。 现在来回答你的问题,也就是:为什么ExecutorData不存储于内存相关的信息。答案是:不需要。一来,TaskScheduler要达到目的,它只需知道Executors是否有空闲CPU、有几个空闲CPU就可以了,有这些信息就足以让他决定是否把tasks调度到目标Executors上去。二来,每个Executors的内存总大小,在Spark集群启动的时候就确定了,因此,ExecutorData自然是没必要记录像Total Memory这样的冗余信息。 再来说Free Memory,首先,我们说过,Spark对于内存的预估不准,再者,每个Executors的可用内存都会随着GC的执行而动态变化,因此,ExecutorData记录的Free Memory,永远都是过时的信息,TaskScheduler拿到这样的信息,也没啥用。一者是不准,二来确实没用,因为TaskScheduler拿不到数据分片大小这样的信息,TaskScheduler在Driver端,而数据分片是在目标Executors,所以TaskScheduler拿到Free Memory也没啥用,因为它也不能判断说:task要处理的数据分片,是不是超过了目标Executors的可用内存。 综上,ExecutorData的数据结构中,只保存了CPU信息,而没有记录内存消耗等信息。不知道这些能不能解答你的问题?有问题再聊哈~

    共 2 条评论
    50
  • Geek_d794f8
    2021-03-25
    老师,数据尽量不动,比如有部分数据在节点A,那么移动计算难道不是要在节点A上启动Excutor才可以进行计算吗?但是Excutor不是在申请资源的时候就确定了在哪几个节点上启动Excutor吗?老师请指教一下

    作者回复: 好问题,是这样的,资源调度和任务调度是分开的。 资源调度主要看哪些节点可以启动executors,是否能满足executors所需的cpu数量要求,这个时候,不会考虑任务、数据本地性这些因素。 资源调度完成之后,在任务调度阶段,spark负责计算每个任务的本地性,效果就是task明确知道自己应该调度到哪个节点,甚至是哪个executors。最后scheduler Backend会把task代码,分发到目标节点的目标executors,完成任务调度,实现数据不动代码动。 所以,二者是独立的,不能混为一谈哈~

    共 6 条评论
    21
  • L3nvy
    2021-03-24
    1. 位置信息通过特定的字符串前缀格式标识 executor_[hostname]_[executorid] [hostname] hdfs_cache_[hostname] DAGScheduler会尝试获取RDD的每个Partition的偏好位置信息,a.如果RDD被缓存,通过缓存的位置信息获取每个分区的位置信息;b.如果RDD有preferredLocations属性,通过preferredLocations获取每个分区的位置信息;c. 遍历RDD的所有是NarrowDependency的父RDD,找到第一个满足a,b条件的位置信息 DAGScheduler将生成好的TaskSet提交给TaskSetManager进行任务的本地性级别计算 2. 感觉像是Spark on Kubernetes这种场景 应该和相关存储配置有关;不太了解,猜想的话。如果是配置的Spark中间过程使用的存储是分布式存储,Node Local应该不成立;如果就是单个容器的内部空间,或者挂载到主机上的空间,应该可以成立

    作者回复: 第一题给满分💯,看答案就知道认真去读源码了,赞一个~ 第二题也答对了。第二题,再想想,还有其他cases吗?

    共 3 条评论
    17
  • 斯盖丸
    2021-04-18
    老师,我这是二刷您的课程了,但我想说课程的例子没看懂。第二种用部分函数的例子里,是节约了哪步操作呢?读文件应该只要Driver读一次就够了。但是zipWithIndex生成的map呢,由于没有把它广播出去,那应该还是每个task都会被拷贝一份全量的map吧。我这样的理解对吗?如果是对的,那感觉性能提升也不应该那么明显吧…

    作者回复: 非常好,思考的很深入~ 你说的没错,由于map没有用广播,所以每个task都会携带这个map,有额外的网络和内存存储开销。 但是,你要看跟谁比,跟广播比,第二种写法确实不如广播,但如果你跟第一种实现比,性能提升会非常明显。在第一种实现下,task不会携带map,而是在Executor临时去读文件、临时创建那个map,这个重复的计算开销,远大于task分发携带map带来的网络和内存开销。 简言之, 你说的非常对,不过咱们这一讲要强调的关键是调度系统,因此后来并没有用广播进一步优化,讲道理来说,一定是广播的实现方式是最优的。

    共 6 条评论
    16
  • 小学生敬亭山
    2021-03-26
    老师正例这个,先建map,再broadcast map 是不是一样的逻辑

    作者回复: 好思路,完全没问题~

    
    9
  • wow_xiaodi
    2021-07-20
    老师,请问对于第一种函数的写法和调用,为何是每个executor只处理一次,而不是对RDD里的每一条数据都去运行一遍函数,然后都加载一次map呢?请问这个函数在spark内核里如何解析和运作的呢,他如何知道里面有个map只去初始化一次,而不是每条数据都运行一次呢?

    作者回复: 非常好的问题~ 你说的是对的,这里是我没有说清楚。不管是实现方式1、还是实现方式2,都是RDD里的每一条数据都去运行一次函数。原文中“这意味着集群中的每一个 Executors 都需要执行函数中封装的两个计算步骤”这个表述,是不准确的。准确的说法,应该是像你说的,每条数据,都需要执行这两个计算步骤。感谢老弟的提醒和纠正~ 你说的是对的,就按你自己的思路来理解就好~ 另外,要想让每个Executor只处理一次,那么咱们就只能依赖广播变量了,只有广播变量才能做到这一点。

    
    6
  • Z宇锤锤
    2021-04-11
    /** * Create a TaskLocation from a string returned by getPreferredLocations. * These strings have the form executor_[hostname]_[executorid], [hostname], or * hdfs_cache_[hostname], depending on whether the location is cached. */ 终于找到了榜一所说的location信息

    作者回复: 666,不容易,赞坚持不懈~ 👍

    
    6
  • 来世愿做友人 A
    2021-03-24
    第一题:因为是为每个 partition 建立一个task,所以在建立task之前,都会获取每个partition的位置偏好信息。首先判断 rdd 是否被缓存过,通过 rddId + splitIndex 组合成 blockId 判断。如果没有,判断preferredLocations,看起来是判断是否 checkpoint 过。如果还没有,向上获取父rdd,如果是窄依赖,循环上面的判断逻辑。这里想问个问题,代码里直到task分发,似乎没有看到关于shuffle的位置偏好。比如中间有个shuffle过程,shuffle结果写在磁盘小文件,是不是下个 stage 的 task 应该发到父 stage 的所在 executor 更合适?目前没看到这个逻辑,想问问老师

    作者回复: 如果你说的是shuffle read阶段的locality,咱们换个角度思考这个问题。shuffle map阶段,每个map task把中间文件写到本地盘。shuffle read阶段,每个reduce task需要从集群的所有节点拉数据,走网络。这个过程是由shuffle的实现机制决定的。 因此,从reduce视角看过去,所有中间文件都是它的“数据源”,这些数据源,散落在集群的每一个节点,因此,每个reduce task的locality,在最好的情况下,能做到rack local,最差的时候,那就是any。 正是因为这样,所以我们会强调,竭尽全力避免shuffle。因为它的实现机制,决定了reduce task的计算一定会让数据走网络。 不知道这么说能不能回答你的问题哈~ 要是我理解错了,就再at我,咱们继续讨论哈~

    共 3 条评论
    6
  • 张笑笑
    2021-09-11
    吴老师,您给的这个案例中,第二个实现方式上使用了高阶函数,看了几次,确实还是没明白,为什么使用这种写法,它只在driver端做一次计算?为什么就省去了读取文件,创建字典的开小了,迷惑中...

    作者回复: 其实不用特别在意高阶函数,它的核心作用,其实是创建出一个包含了查询字典的函数对象。真正分发到Executors的代码,是这个包含了字典的函数对象。 而创建字典的过程,只在Driver端做一次,因为传递给高阶函数第一个参数、生成“带查询字典的函数对象”的动作,只做了一次,所以创建字典的开销,只有在Driver的那一次。

    共 3 条评论
    5
  • Fendora范东_
    2021-04-04
    关于任务调度: 默认情况下,会先调度process local那批tasks;然后依次是node,rack,any。 在调度了最契合locality的tasks后还有空闲executor。下一批task本来是有资源可用的,但最适合执行task的executor已被占用,此时会评估下一批tasks等待时间和在空闲executor执行数据传输时间,如果等待时间大于数据传输则直接调度到空闲executor,否则继续等待。 把wait参数设置为0,则可以不进行等待,有资源时直接调度执行 这块逻辑一直有点乱。磊哥看下哪有问题嘛?

    作者回复: 没问题,就是locality wait,就是有些task是有调度倾向的,preferredLocations。但是,它想要去的executors,可能正在忙,没有空闲cpu。这个时候两个选择,要么,等executors忙完;要么放弃,调度到其他节点或是executors,退而求其次。locality wait默认3s,但是可以调。不过一般3s就行。除非有些io密集型,必须要node local,这个时候,可以适当调大,多等等。 其实就是平衡,等待时间和执行时间的平衡,看你具体场景。

    
    5