作者回复: 优秀!👍👍👍 满分💯,置顶🔝
作者回复: 好问题,你说的没错,Spark的采样,确实不会精确地控制个数。这跟他的采样原理有关,Spark采用伯努利或是泊松分布来做采样,对于每一个数据元素的采样,都是独立的。比如这里的0.1,也就是10%的采样率,Spark会对RDD每个元素抛个骰子,骰子值小于10%,才会把这个元素采集出去。因此,10%这个比例只是在统计上的一种近似,而不会精确地保证每次采样的个数,一定是原数据的10%。一个tips是,采样率越高,精确率越高,这个还是那句话,跟刚才的采样原理有关~ 如果想精确采样的话,可以这么做: rdd.takeSample(false, 1000) 这样得到的结果,一定是采集1000个数值。但是请注意,这里的返回结果,不再是RDD,而是array。takeSample的实现,实际上也是调用sample算子,只不过在最后,加入了元素个数上的检查,所以可以保证采样结果,一定是这里指定的1000。最后,要强调的是,takeSample会把结果收集到Driver端,所以要注意它对于Driver端内存的消耗~
作者回复: 非常好的问题,我们来细说说~ 取决于你如何调用coalesce(1, shuffle = false/true),分两种情况。 1. shuffle = false,这个时候,coalesce不会引入Shuffle,但是,所有操作,从一开始,并行度都是1,都在一个executor计算,显然,这个时候,整个作业非常慢,奇慢无比 2. shuffle = true,这个时候,coalesce就会引入shuffle,切割stage。coalesce之前,用源数据DataFrame的并行度,这个时候是多个Executors真正的并行计算;coalesce之后,也就是shuffle之后,并行度下降为1,所有父RDD的分区,全部shuffle到一个executor,交给一个task去计算。 显然,相比前一种,这种实现在执行效率上,更好一些。因此,如果业务应用必须要这么做,推荐这一种实现方法。回答你最初的问题,没错,coalesce(1)必然会把数据都放入同一个Executor里
作者回复: 正解,满分💯 👍
作者回复: 并无优劣之分~ foreachPartition更多地用来写DB和Kafka;而saveAsTextFile用来写文件系统。数据写入标的不同而已,没有优劣上的差别。 当然,也可以用foreachPartition来写文件系统,但是这个时候,就不如直接调用saveAsTextFile这类算子方便。
作者回复: 想必老弟对coalesce(1, shuffle = false/true)的用法和区别,已经了如指掌,不再赘述。 遗憾的是,驴和熊猫不可兼得,想要高性能的同时做到保序,确实不容易。shuffle = false/true,前者保序,后者高性能,确实两者很难兼得。 实在对不住,对于两者都需要的场景,我暂时还真想不到特别好的办法,除了你说的“先shuffle -> coalesce(1, shuffle = true) -> 再排序 -> 再收集”之外,我现在还真想不到更好的办法,抱歉
作者回复: 好问题~ coalesce(1,shuffle = false),首先,shuffle = false,通常是为了保序,否则没必要。然后,coalesce(1,shuffle = false),它的目标,是把分区数降低为1。要做到没有Shuffle,那只有从源头上就开始限制,也就是说,源头上的并行度,就是1。只有这样,才能做到最后在没有Shuffle的情况下,把并行度降低为1。 需要说明的是,这种“骚操作”会拖累整个作业的执行性能,如非必要,绝不推荐这么做~
作者回复: 正解,满分💯~
作者回复: 第一个问题,在第6讲的问题里面回复了哈~ 第6讲:06 | Shuffle管理,其实就是在介绍Sort-based shuffle的工作原理,只不过作为入门课,我们没有特别点明。bypass mergesort,实际上就是当reduce分区数小于一定阈值、且计算中不存在Aggregate操作,Spark就把Sort-based shuffle退化为Hash-based shuffle,因此,bypass mergesort实际的应用场景非常有限,因为一般来说,数据分析都少不了数据聚合,也就少不了Aggregate操作。 unsafe shuffle,其实底子还是Sort-based shuffle,但是利用了Tungsten的数据结构来做优化,Tungsten相关内存,老弟可以参考:14 | 台前幕后:DataFrame与Spark SQL的由来
作者回复: 确实,第一题跟scala语法比较相关,可以参考Geek_2dfa9a同学给出的答案哈:val unionRdd2 = Seq(rdd1, rdd2, rdd3).reduce(_.union(_))。 第二题满分💯~