作者回复: Perfect!答得已经很完美了,不过咱们再进一步,第二题,假设不是rdd API,而是dataframe、dataset,spark对于同一个stage内的算子,会有哪些优化呢?
作者回复: 说得太好了!以史为鉴知兴替,就是这个道理。在纵向上把视角拉高,其实就更容易理解很多新概念、新办法、新框架、新引擎。说的真好,后续多讨论哈~
作者回复: 答得挺好~ 追问一句哈,第一题,spark怎么判断一个dependency是不是shuffle Dependency呢?
作者回复: 好问题,我认为算的,多个操作在内存中完成统一的数据转换,我认为这就是内存计算。mr不同的map任务之间也是需要落盘的哟~ 更何况,同一stage内部,spark还有wscg这种优化,因此即便是同一个map stage之间的比拼,效率上spark也会比mr更好。
作者回复: 两道题答得都不错~ 1. 宽窄依赖和算子类型,确实是判定Shuffle的主要依据,不过,还需要结合数据本身的分布来看。比如,你可以搜搜Collocated Join,这种Join情况是不会引入Shuffle的哈~ 2.WSCG这个提得很好,不过,这个是只有Spark SQL才能享受到的特性,也就是当你使用DataFrame、Dataset或是SQL进行开发的时候,才能享受到这个特性。对于纯粹的RDD API来说,所谓的“捏合”,其实是一种伪“捏合”,它是通过同一个Stage内部多个RDD算子compute函数嵌套的方式,来完成“捏合”。
作者回复: Map阶段的并行度,会沿用父RDD的并行度,比如沿用HadoopRDD的并行度,这样的话,就是源文件原始的分片数量。Reduce阶段,可以通过repartition来调整,如果没有调整,默认按照spark.sql.shuffle.partitions来走~
作者回复: 是的~ 父子RDD的partitioner一致,就意味着他们会划分到同一个Stage~
作者回复: 好问题,先来回答你的问题: 1. 不是数据大于内存就会溢出到磁盘,取决于分片大小和每个task的可用内存。这部分在cpu、内存视角那几讲会详细展开,怎么平衡并行度、线程池、内存消耗。到时候可以关注一下哈~ 2. shuffle的过程确实有落盘的步骤,但也仅限shuffle操作。stage内部是流水线式的内存计算,不会有落盘的动作。
作者回复: 非常好的问题,这个edge case非常有意思,我们来细说说~ 取决于你如何调用coalesce(1, shuffle = false/true),分两种情况。 1. shuffle = false,就像你说的,所有操作,从一开始,并行度都是1,都在一个executor计算,显然,这个时候,整个作业非常慢,奇慢无比 2. shuffle = true,这个时候,coalesce就会引入shuffle,切割stage。coalesce之前,用源数据DataFrame的并行度,这个时候是多个Executors真正的并行计算;coalesce之后,也就是shuffle之后,并行度下降为1,所有父RDD的分区,全部shuffle到一个executor,交给一个task去计算。显然,相比前一种,这种实现在执行效率上,更好一些。因此,如果业务应用必须要这么做,推荐这一种实现方法。
作者回复: df.cache() df.count 或是 val cacheDf = df.cache() cacheDf.count 都可以,action是必需的,没有action,不会触发缓存的计算和存储,这可不是画蛇添足哈~