零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

10 | 广播变量 & 累加器:共享变量是用来做什么的?

你好,我是吴磊。
今天是国庆第一天,首先祝你节日快乐。专栏上线以来,有不少同学留言说期待后续内容,所以国庆期间我们仍旧更新正文内容,让我们一起把基础知识模块收个尾。
学习过 RDD 常用算子之后,回顾这些算子,你会发现它们都是作用(Apply)在 RDD 之上的。RDD 的计算以数据分区为粒度,依照算子的逻辑,Executors 以相互独立的方式,完成不同数据分区的计算与转换。
不难发现,对于 Executors 来说,分区中的数据都是局部数据。换句话说,在同一时刻,隶属于某个 Executor 的数据分区,对于其他 Executors 来说是不可见的。
不过,在做应用开发的时候,总会有一些计算逻辑需要访问“全局变量”,比如说全局计数器,而这些全局变量在任意时刻对所有的 Executors 都是可见的、共享的。那么问题来了,像这样的全局变量,或者说共享变量,Spark 又是如何支持的呢?
今天这一讲,我就来和你聊聊 Spark 共享变量。按照创建与使用方式的不同,Spark 提供了两类共享变量,分别是广播变量(Broadcast variables)和累加器(Accumulators)。接下来,我们就正式进入今天的学习,去深入了解这两种共享变量的用法、以及它们各自的适用场景。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了Spark中的广播变量和累加器的重要性和用法。广播变量通过SparkContext的broadcast API创建,可以在不同Executor之间共享数据,避免重复分发和存储,提高作业执行性能。文章通过对比普通变量和广播变量在计算过程中的差异,清晰展示了广播变量的优势所在。另一方面,累加器用于在分布式任务中进行累加操作,例如计数器。通过实际代码示例和对比分析,生动地展示了广播变量和累加器的用法、工作原理和优势,为读者提供了深入了解这两种共享变量的适用场景和优化Spark作业的方法。读者可以通过本文快速了解广播变量和累加器在Spark中的重要作用,以及如何利用它们来优化分布式计算的性能。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(11)

  • 最新
  • 精选
  • Geek_2dfa9a
    置顶
    1.使用普通变量是无法得到预期结果的,因为lambda里的必须是final型的变量,那这里我用两种方法做个测试: 首先是原子类,因字数限制,只放关键代码: var normal = new AtomicInteger(0) val preRdd = wordRdd.filter(f => if(f.contains("scala")) { println(normal.incrementAndGet()) false } else { true } ) println(normal.get()) 可以看到Executor运行过程中normal是会正常累加的,但是最后println(normal.get())打印出来是0,这里简单分析下,spark会把闭包序列化后 传递给Executor,然后Executor再把闭包反序列化后作用在RDD上。因此Driver里的normal变量和Executor里的normal变量是多个进程里的多个变量。 然后使用自定义类对象 class IntHolder(var value: Int) {} 测试,报了一个Exception in thread "main" org.apache.spark.SparkException: Task not serializable 从侧面也证明,闭包里的对象是要实现序列化的。变量是多个进程里的多个变量改进下再试, class IntHolder(var value: Int) extends Serializable {} 发现每个Task都是从0开始计数,更说明每个Task里的对象是Driver丢过来的副本。这里我多想了下Task里的函数是串行执行还是并行执行的,如果我的IntHolder对象 不是线程安全的,那在Task里有无数据竞争?从我的例子看是串行运行的,但是一时找不到看哪些代码,请老师指正。 多讲两句,Spark闭包序列化和反序列化真的是很重要的知识,打开了我的视野,我感觉Spark是Faas一种非常巧妙的场景,函数式编程真的也很符合Spark 把函数作用在RDD上,存算分离的思路,UCB出品真的名不虚传。 2.Spark基本支持任何类型的广播变量,但是不支持RDD类型的广播变量,从代码中可以看到,有一个验证会判断变量是否为RDD类型,如果想要广播RDD类型的话,可以先 collect收集到driver,再作为普通集合广播。 def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }

    作者回复: 实验做的非常细致,有观察、有思考、思考很深入,赞👍,置顶🔝 先说Task串行、并行的问题,Task与Task之间,是并行的,Task内部,是串行。这个其实好理解,毕竟,一个Task占用一个CPU core,不存在并发上的物理条件~ 再者,特别同意你说的FAAS和打开视野,我对此深有同感~ 用你的话说,是Spark对于序列化和反序列化的支持。对我来说,Spark的调度系统,对我个人影响巨大,让我真正认识到分布式系统的玩法、特色和核心。在我看来,调度系统是学习Spark最应该学的部分,没有之一~ 当然,仁者见仁,大家的喜好和偏好都不同~ 就打开视野这方面,确实调度系统对于我个人的提升,或者说给我个人留下的印象,非常非常的深刻。 广播变量答的也很好~ RDD需要先collect,这一点其实想吐槽社区,完全可以做得跟DataFrame一样,直接封装就好了,非要开发者自行collect一波,然后再封装,麻烦!DataFrame就比较直接,Spark在背后帮开发者做了collect这一步,就开发者使用体验来说,还是要好些~

    2021-10-03
    23
  • Geek_f39659
    第一题: driver中声明的非广播变量,如果executor需要用的话,会给每一个分区拷贝一个副本,所以每个分区都会给自己的这份副本加一加一。最后这些副本随着executor 进程的结束就都丢失了。所以driver 中的这个变量仍然还是0.

    作者回复: 没错,满分💯~

    2021-10-03
    12
  • 火炎焱燚
    python 版的代码对累加器这儿有些不一样,貌似没有longAccumulator,double之分,我这儿是这样的: file='~~~~/chapter01/wikiOfSpark.txt' lineRDD=sc.textFile(file) lineRDD.first() # 会打印出lineRDD的第一行: u'Apache Spark',如果没有打印出来,则报错 wordRDD=lineRDD.flatMap(lambda line: line.split(" ")) # 定义累加器 ac = sc.accumulator(0) # 这儿的定义方式不一样 # 定义filter算子的判定函数f,注意,f的返回类型必须是Boolean def f(x): if x=='': ac.add(1) return False else: return True # 使用f对RDD进行过滤 cleanWordRDD = wordRDD.filter(f) kvRDD=cleanWordRDD.map(lambda word:(word,1)) wordCounts=kvRDD.reduceByKey(lambda x,y:x+y) # 获取计算结果 wordCounts.collect() # 结果:[('Spark', 63), ('Apache', 34)] # wordCounts.map(lambda (k,v):(v,k)).sortByKey(False).take(5) # 作业执行完毕,通过调用value获取累加器结果 ac.value # 79

    作者回复: 赞👍,满分💯,又收获一份Python代码,感谢老弟!后续我们一起把代码整理到Github~

    2021-10-12
    3
  • welldo
    以前我写累加器,都是手动继承AccumulatorV2,现在找到更简单的方法了😁

    作者回复: 继承AccumulatorV2也是个方法,其实还蛮高级的~

    2021-10-29
    2
  • GAC·DU
    写程序测试了一下老师的第一个思考题,在本机的idea中可以得到正确的结果79,但是在spark- shell或者提交到yarn集群上结果却是0,好纠结,请老师解惑,代码如下。 第二个思考题,我认为广播变量是只读数据,参加计算时不能随着RDD一起变换形态破坏数据的一致性。 import org.apache.spark.sql.SparkSession object BcAcOpt { var n: Long = _ // 全局变量 def main(args: Array[String]): Unit = { val spark = SparkSession.builder.master("local[*]").appName("board").getOrCreate() val lineRDD = spark.sparkContext.textFile("wikiOfSpark.txt") println(lineRDD.partitions.length) val wordRDD = lineRDD.flatMap(word => word.split(" ")) val filterRDD = wordRDD.filter(acf) val kvRDD = filterRDD.map(word => (word, 1)) val wordCount = kvRDD.reduceByKey((x, y) => x + y) wordCount.take(10) println(n) spark.stop() } def acf(word: String): Boolean = { if ("".equals(word)) { n += 1 false } else { true } } }

    作者回复: 非常赞~ 完美地复现了问题~ 这正是我想要的,哈哈~ 关于第一题,代码写的没有任何问题,就是想让大家对比这里的 var n: Long = _ // 全局变量,与累加器的区别所在。你的执行结果,完全符合预期,即本机跑,完全OK,但是,分布式环境就不行。这里我先不给老弟回复,老弟不妨花时间想想,没想明白的话,说明这一讲累加器没有吃透,再想想哈~ 想不清楚也不要紧,我们留言区继续讨论~ 第二个问题的话,广播变量是read-only的,只能读,不能写,其实这里的问题,不是想问它和累加器的区别,而是想问,广播变量能不能封装RDD、DataFrame这种分布式数据集,答案是肯定的哈~ 除了普通变量,分布式数据集之上,也可以创建广播变量~

    2021-10-02
    4
    2
  • welldo
    老师,广播变量有"只读" / "不可变"特性, 但广播引用数据类型时, 广播的实际上是地址值,那么地址值肯定不可变,而地址值指向的内容是可变的; 我今天在idea和shell里做了一个测试,证明了这个说法是对的, 代码如下: (scala-List是不可变的,所以代码里我用了可变的scala-ListBuffer) var buffer: ListBuffer[String] = ListBuffer() var bufferBroad = spark.sparkContext.broadcast(buffer) val cleanWordRDD2: RDD[String] = wordRDD.filter(word => { if (bc.value.contains(word)) { //bc是文章里的广播变量 bufferBroad.value.append("test") //关键的一行 true } else { false } }) println(cleanWordRDD2.count())//触发这条dag println(buffer) //97(34+63)个test println(bufferBroad.value) //97(34+63)个test 老师,这个说法和我的证明没有问题吧? 这个算不算“不可变”特性的漏洞 呢?

    作者回复: 哈哈,先赞👍一下老弟爱钻研的精神!!! 不过呢,所谓的漏洞说法有问题,证明更有问题,哈哈。根本原因还是,你的环境是单机、local,所以你怎么玩儿,都是单机的玩儿法。你比如这句: bufferBroad.value.append("test") 推广到分布式的环境,肯定会报错的。所以说做实验的话,建议还是在分布式下去做,这样很多结论,才站得住脚~

    2021-10-31
    1
  • welldo
    老师,关于第一题, 我的答案:driver和executor是不一样的进程,普通变量会拷贝副本到executor上, “原本”和“副本”没有任何关系, spark-shell打印出来的是“原本”,数值是初始化时的值. 我的问题:为何idea能计算正确呢?

    作者回复: idea其实就是单机,单机上,用普通变量,和用累加器,效果是一样的。但是在分布式的环境下,高下立现

    2021-10-29
    1
  • Sun
    老师您好。关于第一题的实验,我使用自定义类,如下: class MyCount implements Serializable{ int num; public MyCount(){ this.num = 0; } public void add(){ this.num++; } } 将它用于代替广播变量,是不行的,因为executor最后销毁了这些对象。 但是我将这个对象置为静态对象,放在Driver中再运行后,这个对象成功得到了最后的结果。 请问老师,为什么设置为静态对象就可以获取结果呢?静态对象在Driver和Executor是怎么一个工作机制?
    2022-04-20
    1
    1
  • 唐方刚
    广播变量是以executor为粒度分发的,那么累加器是怎么分发的?最终的结果又是怎么算出来的?
    2022-08-12归属地:广东
  • 杨帅
    老师,我有一个问题:一个worker上可以有多个executor吗?在读取文件阶段,一个executor(一个进程)里可以有多个task(多个线程)读取不同分区(RDD的分区)的数据吗?那RDD的分区的定义是什么呢,是不同机器上的数据,还是?
    2022-06-18
收起评论
显示
设置
留言
11
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部