10 | 广播变量 & 累加器:共享变量是用来做什么的?
- 深入了解
- 翻译
- 解释
- 总结
本文深入介绍了Spark中的广播变量和累加器的重要性和用法。广播变量通过SparkContext的broadcast API创建,可以在不同Executor之间共享数据,避免重复分发和存储,提高作业执行性能。文章通过对比普通变量和广播变量在计算过程中的差异,清晰展示了广播变量的优势所在。另一方面,累加器用于在分布式任务中进行累加操作,例如计数器。通过实际代码示例和对比分析,生动地展示了广播变量和累加器的用法、工作原理和优势,为读者提供了深入了解这两种共享变量的适用场景和优化Spark作业的方法。读者可以通过本文快速了解广播变量和累加器在Spark中的重要作用,以及如何利用它们来优化分布式计算的性能。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(11)
- 最新
- 精选
- Geek_2dfa9a置顶1.使用普通变量是无法得到预期结果的,因为lambda里的必须是final型的变量,那这里我用两种方法做个测试: 首先是原子类,因字数限制,只放关键代码: var normal = new AtomicInteger(0) val preRdd = wordRdd.filter(f => if(f.contains("scala")) { println(normal.incrementAndGet()) false } else { true } ) println(normal.get()) 可以看到Executor运行过程中normal是会正常累加的,但是最后println(normal.get())打印出来是0,这里简单分析下,spark会把闭包序列化后 传递给Executor,然后Executor再把闭包反序列化后作用在RDD上。因此Driver里的normal变量和Executor里的normal变量是多个进程里的多个变量。 然后使用自定义类对象 class IntHolder(var value: Int) {} 测试,报了一个Exception in thread "main" org.apache.spark.SparkException: Task not serializable 从侧面也证明,闭包里的对象是要实现序列化的。变量是多个进程里的多个变量改进下再试, class IntHolder(var value: Int) extends Serializable {} 发现每个Task都是从0开始计数,更说明每个Task里的对象是Driver丢过来的副本。这里我多想了下Task里的函数是串行执行还是并行执行的,如果我的IntHolder对象 不是线程安全的,那在Task里有无数据竞争?从我的例子看是串行运行的,但是一时找不到看哪些代码,请老师指正。 多讲两句,Spark闭包序列化和反序列化真的是很重要的知识,打开了我的视野,我感觉Spark是Faas一种非常巧妙的场景,函数式编程真的也很符合Spark 把函数作用在RDD上,存算分离的思路,UCB出品真的名不虚传。 2.Spark基本支持任何类型的广播变量,但是不支持RDD类型的广播变量,从代码中可以看到,有一个验证会判断变量是否为RDD类型,如果想要广播RDD类型的话,可以先 collect收集到driver,再作为普通集合广播。 def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }
作者回复: 实验做的非常细致,有观察、有思考、思考很深入,赞👍,置顶🔝 先说Task串行、并行的问题,Task与Task之间,是并行的,Task内部,是串行。这个其实好理解,毕竟,一个Task占用一个CPU core,不存在并发上的物理条件~ 再者,特别同意你说的FAAS和打开视野,我对此深有同感~ 用你的话说,是Spark对于序列化和反序列化的支持。对我来说,Spark的调度系统,对我个人影响巨大,让我真正认识到分布式系统的玩法、特色和核心。在我看来,调度系统是学习Spark最应该学的部分,没有之一~ 当然,仁者见仁,大家的喜好和偏好都不同~ 就打开视野这方面,确实调度系统对于我个人的提升,或者说给我个人留下的印象,非常非常的深刻。 广播变量答的也很好~ RDD需要先collect,这一点其实想吐槽社区,完全可以做得跟DataFrame一样,直接封装就好了,非要开发者自行collect一波,然后再封装,麻烦!DataFrame就比较直接,Spark在背后帮开发者做了collect这一步,就开发者使用体验来说,还是要好些~
2021-10-0323 - Geek_f39659第一题: driver中声明的非广播变量,如果executor需要用的话,会给每一个分区拷贝一个副本,所以每个分区都会给自己的这份副本加一加一。最后这些副本随着executor 进程的结束就都丢失了。所以driver 中的这个变量仍然还是0.
作者回复: 没错,满分💯~
2021-10-0312 - 火炎焱燚python 版的代码对累加器这儿有些不一样,貌似没有longAccumulator,double之分,我这儿是这样的: file='~~~~/chapter01/wikiOfSpark.txt' lineRDD=sc.textFile(file) lineRDD.first() # 会打印出lineRDD的第一行: u'Apache Spark',如果没有打印出来,则报错 wordRDD=lineRDD.flatMap(lambda line: line.split(" ")) # 定义累加器 ac = sc.accumulator(0) # 这儿的定义方式不一样 # 定义filter算子的判定函数f,注意,f的返回类型必须是Boolean def f(x): if x=='': ac.add(1) return False else: return True # 使用f对RDD进行过滤 cleanWordRDD = wordRDD.filter(f) kvRDD=cleanWordRDD.map(lambda word:(word,1)) wordCounts=kvRDD.reduceByKey(lambda x,y:x+y) # 获取计算结果 wordCounts.collect() # 结果:[('Spark', 63), ('Apache', 34)] # wordCounts.map(lambda (k,v):(v,k)).sortByKey(False).take(5) # 作业执行完毕,通过调用value获取累加器结果 ac.value # 79
作者回复: 赞👍,满分💯,又收获一份Python代码,感谢老弟!后续我们一起把代码整理到Github~
2021-10-123 - welldo以前我写累加器,都是手动继承AccumulatorV2,现在找到更简单的方法了😁
作者回复: 继承AccumulatorV2也是个方法,其实还蛮高级的~
2021-10-292 - GAC·DU写程序测试了一下老师的第一个思考题,在本机的idea中可以得到正确的结果79,但是在spark- shell或者提交到yarn集群上结果却是0,好纠结,请老师解惑,代码如下。 第二个思考题,我认为广播变量是只读数据,参加计算时不能随着RDD一起变换形态破坏数据的一致性。 import org.apache.spark.sql.SparkSession object BcAcOpt { var n: Long = _ // 全局变量 def main(args: Array[String]): Unit = { val spark = SparkSession.builder.master("local[*]").appName("board").getOrCreate() val lineRDD = spark.sparkContext.textFile("wikiOfSpark.txt") println(lineRDD.partitions.length) val wordRDD = lineRDD.flatMap(word => word.split(" ")) val filterRDD = wordRDD.filter(acf) val kvRDD = filterRDD.map(word => (word, 1)) val wordCount = kvRDD.reduceByKey((x, y) => x + y) wordCount.take(10) println(n) spark.stop() } def acf(word: String): Boolean = { if ("".equals(word)) { n += 1 false } else { true } } }
作者回复: 非常赞~ 完美地复现了问题~ 这正是我想要的,哈哈~ 关于第一题,代码写的没有任何问题,就是想让大家对比这里的 var n: Long = _ // 全局变量,与累加器的区别所在。你的执行结果,完全符合预期,即本机跑,完全OK,但是,分布式环境就不行。这里我先不给老弟回复,老弟不妨花时间想想,没想明白的话,说明这一讲累加器没有吃透,再想想哈~ 想不清楚也不要紧,我们留言区继续讨论~ 第二个问题的话,广播变量是read-only的,只能读,不能写,其实这里的问题,不是想问它和累加器的区别,而是想问,广播变量能不能封装RDD、DataFrame这种分布式数据集,答案是肯定的哈~ 除了普通变量,分布式数据集之上,也可以创建广播变量~
2021-10-0242 - welldo老师,广播变量有"只读" / "不可变"特性, 但广播引用数据类型时, 广播的实际上是地址值,那么地址值肯定不可变,而地址值指向的内容是可变的; 我今天在idea和shell里做了一个测试,证明了这个说法是对的, 代码如下: (scala-List是不可变的,所以代码里我用了可变的scala-ListBuffer) var buffer: ListBuffer[String] = ListBuffer() var bufferBroad = spark.sparkContext.broadcast(buffer) val cleanWordRDD2: RDD[String] = wordRDD.filter(word => { if (bc.value.contains(word)) { //bc是文章里的广播变量 bufferBroad.value.append("test") //关键的一行 true } else { false } }) println(cleanWordRDD2.count())//触发这条dag println(buffer) //97(34+63)个test println(bufferBroad.value) //97(34+63)个test 老师,这个说法和我的证明没有问题吧? 这个算不算“不可变”特性的漏洞 呢?
作者回复: 哈哈,先赞👍一下老弟爱钻研的精神!!! 不过呢,所谓的漏洞说法有问题,证明更有问题,哈哈。根本原因还是,你的环境是单机、local,所以你怎么玩儿,都是单机的玩儿法。你比如这句: bufferBroad.value.append("test") 推广到分布式的环境,肯定会报错的。所以说做实验的话,建议还是在分布式下去做,这样很多结论,才站得住脚~
2021-10-311 - welldo老师,关于第一题, 我的答案:driver和executor是不一样的进程,普通变量会拷贝副本到executor上, “原本”和“副本”没有任何关系, spark-shell打印出来的是“原本”,数值是初始化时的值. 我的问题:为何idea能计算正确呢?
作者回复: idea其实就是单机,单机上,用普通变量,和用累加器,效果是一样的。但是在分布式的环境下,高下立现
2021-10-291 - Sun老师您好。关于第一题的实验,我使用自定义类,如下: class MyCount implements Serializable{ int num; public MyCount(){ this.num = 0; } public void add(){ this.num++; } } 将它用于代替广播变量,是不行的,因为executor最后销毁了这些对象。 但是我将这个对象置为静态对象,放在Driver中再运行后,这个对象成功得到了最后的结果。 请问老师,为什么设置为静态对象就可以获取结果呢?静态对象在Driver和Executor是怎么一个工作机制?2022-04-2011
- 唐方刚广播变量是以executor为粒度分发的,那么累加器是怎么分发的?最终的结果又是怎么算出来的?2022-08-12归属地:广东
- 杨帅老师,我有一个问题:一个worker上可以有多个executor吗?在读取文件阶段,一个executor(一个进程)里可以有多个task(多个线程)读取不同分区(RDD的分区)的数据吗?那RDD的分区的定义是什么呢,是不同机器上的数据,还是?2022-06-18