作者回复: 非常好的问题~ 先赞一个 👍 先说结论,你说的没错,通过在Driver端创建全局变量,然后在map中进行引用,也可以达到mapPartitions同样的效果。原理就是你说的函数闭包,其实本质上,它就是个对象,包含了外部数据结构的函数对象。 但是,需要注意的是,通过这种方式创建的闭包,一定要保证Driver端创建的全局变量是可以序列化的,也就是实现了Serializable接口。只有满足这个前提,Spark才能在这个闭包之上创建分布式任务,否则会报“Task not serializable”错误。 这个错误其实很好理解,要把Driver创建的对象带到Executors,这个对象一定是要可序列化的才行。不过遗憾的是,很多共享对象,比如咱们本讲的MD5,都没有实现Serializable接口,所以Driver端创建的MD5实例,Spark“带不动”(没法从Driver带到Executors),这一点你不妨动手试一试~ 不过这个点提得非常好,对于那些自定义的Class,我们可以让它实现Serializable接口,从而可以轻松地在Driver创建Spark可以“带的动”的全局变量。 说到这里,我追问老弟一句:在Driver创建MD5实例,map中直接引用,Spark会报“Task not Serializable”;那为什么在mapPartitions里面创建MD5实例,map引用这个对象,Spark却没有报“Task not Serializable”错误呢? 老弟不妨再想一想,咱们评论区继续讨论~
作者回复: Perfect,正解!就是这么回事~
作者回复: 不可以,严禁这么做。 RDD里面,禁止嵌套定义新的RDD,这个是Spark分布式开发的大忌。 Spark的开发模式、或者是开发规范,是利用transformation,来完成分布式数据集之间的转换,从而达到处理数据的目的~ RDD内嵌套RDD,这种属于单机思维哈,必须要跳出来~
作者回复: 两份代码都是满分💯,👍 感谢老弟分享、贡献Python代码,后面我们一起把它整理到GitHub里面去~ 感谢!!!
作者回复: 对的~ 确实,Scala的语法比较简洁(偷懒),return不是必须的,如果函数体没有return,那么最后一行代码的输出,就是整个函数的返回值~ 具体到这个匿名函数,它的返回结果就是你说的Array[String],后续flatMap再把这里的Array展平
作者回复: 可以参考下面的方式来实现: import scala.collection.mutable.ArrayBuffer def f(iter: Iterator[Int]) : Iterator[Int] = { val dataSeq: ArrayBuffer[Int] = new ArrayBuffer() while (iter.hasNext) { val cur = iter.next; dataSeq += cur } // 你的处理逻辑 val res = yourRPCFunction(dataSeq) // 以迭代器的形式,返回计算结果 res.iterator } // 处理RDD的时候,用mapPartitions调用f即可 data.mapPartitions(f)
作者回复: Scala语法确实比较灵活,一般来说,简单表达式用(),复杂表达式用{}。 比如,简单函数体:(x => x + 1)、(_ + 1),等等; 复杂函数体:{case x: String => “一大堆关于x的转换” }
作者回复: 是的,没有问题~ JAVA8开始支持函数式编程,它的stream本质上和咱们这里的算子是一样的~
作者回复: 好问题, 1)(K,V)可以理解成是元组,这个没有问题 2)关于reduceByKey为啥不支持RDD[Map[String,Int]],这个其实老弟需要充分理解PairRDD,就RDD[Map[String,Int]]这种类型来说,本质上,他其实是RDD[KeyType],而不是RDD[(KeyType,ValueType)]。也就是说,RDD[Map[String,Int]]不是PairRDD,所以不仅reduceByKey不支持,其他*ByKey类操作也不支持。如果把它变换一下,比如RDD[(String,Map[String,Int])],这样是可以的,因为这个类型属于PairRDD,KeyType是String,而ValueType是Map[String,Int]~
作者回复: 好问题~ 你说的对,这里笔误了,已经让编辑帮忙改成了“word.getbytes”哈~ 感谢纠正~