• 来世愿做友人 A
    2021-03-22
    问题1: rdd 会有 dep 属性,用来区分是否是 shuffle 生成的 rdd. 而 dep 属性的确定主要是根据子 rdd 是否依赖父 rdd 的某一部分数据,这个就得看他两的分区器(如果 tranf/action 有的话)。如果分区器一致,就不会产生 shuffle。 问题2: 在 task 启动后,会调用 rdd iterator 进行算子链的递归生成,调用 stage 图中最后一个 rdd 的 compute 方法,一般如果是 spark 提供的 rdd,compute 函数大都会继续调用父 rdd 的 iterator 方法,直到到 stage 的根 rdd,一般都是 sourceRdd,比如 hadoopRdd,KakaRdd,就会返回 source iterator。开始返回,如果子rdd 是 map 转换的,就会组成 itr.map(f)。如果再下一个是 filter 转换,就会组成 itr.map(f1).filter(f2),以此类推。不知道这边理解对不对,有点绕

    作者回复: Perfect!答得已经很完美了,不过咱们再进一步,第二题,假设不是rdd API,而是dataframe、dataset,spark对于同一个stage内的算子,会有哪些优化呢?

    共 8 条评论
    42
  • Shockang
    2021-03-23
    正如老师在文章里面提到的一样,Hadoop MapReduce使用硬盘来存储中间结果,而 Spark自从诞生以来就一直标榜自己是内存计算,可能有些同学会比较奇怪,为什么内存明显比硬盘快,MR 不去选择内存计算,实际上 MR 也有在使用内存的,比如环形缓冲区的存在就可以说明,之所以这样做,一个很重要的原因是 MR 诞生的年代(04 年)内存比较贵,后来随着科技发展,内存价格在不断下降,大家如果仔细研究就会发现比如 Spark 比如 Redis 等充分利用内存来计算的框架都是 10 年左右出现的,就是在这个时候内存价格开始大幅度下降的。我之所以说这么多,其实就想说明,事物的发展都是有规律的,大数据的背后也潜藏着各种规律,把握好这些规律,个人认为对于理解记忆各种不同的大数据技术都是很有帮助的。

    作者回复: 说得太好了!以史为鉴知兴替,就是这个道理。在纵向上把视角拉高,其实就更容易理解很多新概念、新办法、新框架、新引擎。说的真好,后续多讨论哈~

    共 6 条评论
    37
  • 对方正在输入。。。
    2021-03-22
    问题一:每个rdd会有个dependencies的属性,deps记录的是该rdd与父rdd之间的依赖关系,deps类型是Seq[dependency], 如果dependency类型是shuffleDenpendency,那么spark就会视其操作为shuffle操作,然后进行stage的切割。 问题二:stage执行时,spark会调用该stage末尾rdd的iterator方法,然后iterator方法实现逻辑是:将该rdd的compute方法作用下父rdd的compute计算结果之上,从而得到该rdd的分区

    作者回复: 答得挺好~ 追问一句哈,第一题,spark怎么判断一个dependency是不是shuffle Dependency呢?

    共 3 条评论
    7
  • Sansi
    2021-03-22
    内存计算的第二层含义真的算内存计算吗,mr不是也可以把spark的多个map操作放到一个map任务吗,我认为只是在api层面spark更简单

    作者回复: 好问题,我认为算的,多个操作在内存中完成统一的数据转换,我认为这就是内存计算。mr不同的map任务之间也是需要落盘的哟~ 更何况,同一stage内部,spark还有wscg这种优化,因此即便是同一个map stage之间的比拼,效率上spark也会比mr更好。

    共 8 条评论
    7
  • aof
    2021-05-03
    1. DAG以Shuffle划分Stages,Shuffle的产生主要通过宽依赖和窄依赖,而宽窄依赖主要通过不同的算子来产生,比如产生窄依赖的算子:map,flatMap,filter,mapPartitions,union;产生宽依赖的算子:cogroup,join,groupyByKey,reduceByKey,combineByKey,distinct,repartition 2. 官网上看到过:WholeStageCodegen 全阶段代码生成将多个operators编译成一个Java函数来提升性能。

    作者回复: 两道题答得都不错~ 1. 宽窄依赖和算子类型,确实是判定Shuffle的主要依据,不过,还需要结合数据本身的分布来看。比如,你可以搜搜Collocated Join,这种Join情况是不会引入Shuffle的哈~ 2.WSCG这个提得很好,不过,这个是只有Spark SQL才能享受到的特性,也就是当你使用DataFrame、Dataset或是SQL进行开发的时候,才能享受到这个特性。对于纯粹的RDD API来说,所谓的“捏合”,其实是一种伪“捏合”,它是通过同一个Stage内部多个RDD算子compute函数嵌套的方式,来完成“捏合”。

    
    6
  • Geek_18fe90
    2021-12-29
    spark shuffle前后的分区数是如何计算的

    作者回复: Map阶段的并行度,会沿用父RDD的并行度,比如沿用HadoopRDD的并行度,这样的话,就是源文件原始的分片数量。Reduce阶段,可以通过repartition来调整,如果没有调整,默认按照spark.sql.shuffle.partitions来走~

    
    5
  • sparkjoy
    2021-07-13
    第一题,主要看父rdd的分区器是否一致,如果一致则生成子rdd的过程中不会产生shuffle

    作者回复: 是的~ 父子RDD的partitioner一致,就意味着他们会划分到同一个Stage~

    
    5
  • 小学生敬亭山
    2021-03-26
    老师您好,我请教个问题。既然是大数据,那么假设数据很大,无论怎么分区或者分布式,单个机器的内存都放不下,那这个时候spark是怎么计算的呢?必然会有一部分在磁盘一部分在内存吧,这种情况spark是如何避免落盘,如何提升效率的呢。

    作者回复: 好问题,先来回答你的问题: 1. 不是数据大于内存就会溢出到磁盘,取决于分片大小和每个task的可用内存。这部分在cpu、内存视角那几讲会详细展开,怎么平衡并行度、线程池、内存消耗。到时候可以关注一下哈~ 2. shuffle的过程确实有落盘的步骤,但也仅限shuffle操作。stage内部是流水线式的内存计算,不会有落盘的动作。

    共 2 条评论
    5
  • Wiggle Wiggle
    2021-04-13
    说个最极端的情况,如果对一个dataframe Read以后做了一堆不会触发shuffle 的操作,最后又调用了一下coalesce(1),然后write ,那是不是就意味着从读数据开始的所有操作都会在一个executor上完成?

    作者回复: 非常好的问题,这个edge case非常有意思,我们来细说说~ 取决于你如何调用coalesce(1, shuffle = false/true),分两种情况。 1. shuffle = false,就像你说的,所有操作,从一开始,并行度都是1,都在一个executor计算,显然,这个时候,整个作业非常慢,奇慢无比 2. shuffle = true,这个时候,coalesce就会引入shuffle,切割stage。coalesce之前,用源数据DataFrame的并行度,这个时候是多个Executors真正的并行计算;coalesce之后,也就是shuffle之后,并行度下降为1,所有父RDD的分区,全部shuffle到一个executor,交给一个task去计算。显然,相比前一种,这种实现在执行效率上,更好一些。因此,如果业务应用必须要这么做,推荐这一种实现方法。

    共 3 条评论
    4
  • 斯盖丸
    2021-04-18
    请问下老师,spark里cache的正确姿势是什么? 是直接df.cache()还是val cacheDf = df.cache()呢?另外不管cache还是persist都是lazy的,所以有必要紧接着一句df.count()让它马上执行吗?因为这样会平白无故多一个job,不知道是不是画蛇添足了

    作者回复: df.cache() df.count 或是 val cacheDf = df.cache() cacheDf.count 都可以,action是必需的,没有action,不会触发缓存的计算和存储,这可不是画蛇添足哈~

    
    2