12 | 广播变量(一):克制Shuffle,如何一招制胜!
如何理解广播变量?
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了如何通过使用广播变量来克制Shuffle操作,提高数据分析领域的性能。作者首先以电子商务场景为例,说明了在分布式环境中进行数据关联时,Shuffle Join所带来的性能瓶颈。随后,作者介绍了如何利用广播变量来避免Shuffle操作,通过将小表广播到各个Executor,避免了大表的全网分发,从而提高了运行时执行性能。文章强调了广播变量的作用,即通过缓存分布式数据集来克制Shuffle操作,提高性能。总结指出,掌握广播变量的创建方式,可以在数据关联中将Shuffle Joins转换为Broadcast Joins,以小投入换取大产出,实现性能优化。整体而言,本文深入浅出地介绍了广播变量的原理和应用,为读者提供了解决Shuffle操作的实用技巧。
《Spark 性能调优实战》,新⼈⾸单¥59
全部留言(23)
- 最新
- 精选
- Sansi1. 改成由driver获取到数据分布,然后通知各个executor之间进行拉取,这样可以利用多个executor网络,避免只有driver组装以后再一个一个发送效率过低 2.当两个需要join的数据集都很大时,使用broadcast join需要将一个很大的数据集进行网络分发多次,已经远超出了shuffle join需要传输的数据
作者回复: Perfect!满分💯,两道题答的都很好~
2021-04-09421 - Geek_d794f8磊哥,为什么我测试了广播rdd不行: 我写了个demo,广播rdd是报错的,代码如下: val userFile: String ="spark-basic/File/csv_data.csv" val df: DataFrame = spark.read.csv(userFile) val rdd = spark.sparkContext.textFile("userFile") val bc_df: Broadcast[RDD[String]] = spark.sparkContext.broadcast(rdd) bc_df.value.collect().foreach(println) 报错如下:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Can not directly broadcast RDDs; instead, call collect() and broadcast the result. 然后看了一下源码:SparkContext中的broadcast方法: def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc } 第4行的代码显示的Can not directly broadcast RDDs 是不是我哪里不太对?
作者回复: 你是对的,确实会报错。这里我没有交代清楚,RDD确实不能直接用广播变量封装,它不像DataFrame,DataFrame广播的那部分源码在内部把collect这个事做了,所以你可以直接用广播封装DataFrame,但是RDD没有,确实需要先手动collect RDD数据集,然后再在driver端用广播变量封装,我的锅,没有交代清楚~ 不过,歪打正着,通过这个例子,你可以更好地理解Driver在构建广播变量时的计算过程,也就是第一步都是把数据集collect到Driver端,不管是RDD、DataFrame、Dataset,区别无非是collect这件事是谁做的。RDD是开发者来做,而DataFrame、Dataset是Spark自己“偷偷”做了。
2021-05-12320 - Jack老师,对于第1题,看了下spark的源码,目前Broadcast只有一个实现类TorrentBroadcast,看代码的注释,这个类通过使用类似Bit-torrent协议的方法解决了Driver成为瓶颈的问题。目前Spark还会存在广播变量的数据太大造成Driver成为瓶颈的问题吗? /** * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]]. * * The mechanism is as follows: * * The driver divides the serialized object into small chunks and * stores those chunks in the BlockManager of the driver. * * On each executor, the executor first attempts to fetch the object from its BlockManager. If * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or * other executors if available. Once it gets the chunks, it puts the chunks in its own * BlockManager, ready for other executors to fetch from. * * This prevents the driver from being the bottleneck in sending out multiple copies of the * broadcast data (one per executor). * * When initialized, TorrentBroadcast objects read SparkEnv.get.conf. * * @param obj object to broadcast * @param id A unique identifier for the broadcast variable. */ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) with Logging with Serializable {
作者回复: 非常赞哈👍哈~ 凡是看源码的同学,都先给个赞~ 这块非常值得探讨。我的理解是这样的,代码层面,spark确实已经有code在尝试用p2p的方式来分发广播变量,从而减轻driver负担。 但是,据我观察,这部分代码尚未生效。细节可以参考这个ticket:【Executor side broadcast for broadcast joins】https://issues.apache.org/jira/browse/SPARK-17556,看上去还是进行中的状态。 另外,从代码看,目前还是先用collect拉到driver,然后再分发出去: BroadcastExchangeExec中的relationFuture用于获取广播变量内容 在relationFuture内部: 1. 先是调用executeCollectIterator生成内容relation; 其中,executeCollectIterator调用collect把结果集收集到driver端 2. 然后用sparkContext.broadcast(relation),把生成好的内容广播到各个Executors 并没有看到哪里从Executors拉取数据分片、来减轻driver负载。 并且,这里还有提示driver内存不够的exception: new OutOfMemoryError("Not enough memory to build and broadcast the table to all " + "worker nodes. As a workaround, you can either disable broadcast by setting " + s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " + s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.").initCause(oe.getCause)) 你贴的这段代码确实在尝试用片2片,不过,需要仔细看看,它在哪里调用,被谁调用~
2021-04-0986 - 冯杰老师你好,关于broacast join,遇到了一个特别的问题请教一下。 1、Fact(订单) 和 DIM(门店) 关联。 其中门店表量级为 3w(条数) * 10个(字段),采用Parquet存储在Hive上,大小1M左右。 2、运行参数,并行度 = 200,Executor = 50,CPU核数 = 2,内存/Executor = 6G,Drvier内存=2G。 PS:没有特别配置Broadcast 相关参数 3、执行时,有两个疑问点,不得其解 a)Spark UI 显示,并没有执行BHJ,反而执行了 hash sort merge join。 照理,如此小的数据,应该走前者 b)Spark UI 显示,走hash sort merge join后,shuffle阶段的内存计算大小为4MB,sort阶段的内存计算大小为6G。 为何sort完后,为膨胀的如此厉害。
作者回复: 先来说第一个问题~ a)你说的没错,按理说,应该走广播,毕竟广播阈值默认10MB,你的数据集足够小,磁盘上1MB,内存里4MB,怎么看都比广播阈值小。我能想到的可能的原因,就是Spark SQL对于数据集大小的误判,就是它对于DIM的预估大于10MB。你不妨用下面这个方法,计算一下DIM在Spark SQL下的预判大小: val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark .sessionState .executePlan(plan) .optimizedPlan .stats .sizeInBytes 看看能不能发现什么端倪。虽然咱们不能完全确认原因,不过要解决这个问题倒是蛮简单的。一来可以用broadcast函数,二来可以用join hints,总之就是各种花式强制广播。 b)这里就需要更多信息来判断了,6G是从哪里得到的?Spark UI的DAG图吗?sort阶段的6G,仅仅是DIM表的大小?可以加我微信“方块K”或是“rJunior”,把完整的DAG贴给我~
2021-06-105 - Geek_d794f8老师有两个问题请教一下: 1.文中提到两个表join,两个表数据量相差很大呀,为什么他们的的分区数是一致的,而且分区数不是根据hadoop的切片规则去划分的吗? 2.广播join不是默认开启的吗,好像小表默认10M;还需像文中代码val bcUserDF = broadcast(userDF)这样声明吗? 希望得到您的指导,多谢!
作者回复: 思考的很深入,👍赞一个~ 1. 我理解,你问的是这句吧?“由于左右表的分区数是一致的,因此 Shuffle 过后,一定能够保证 userID 相同的交易记录和用户数据坐落在同一个 Executors 内。” HadoopRDD的分区数、或者说并行度,确实是由HDFS文件系统决定的;但是,Shuffle过后,每个分布式数据集的并行度,就由参数spark.sql.shuffle.partitions来决定了,这个咱们在配置项哪一讲说过哟~ 因此,如果你没有手工用repartition或是Coalesce去调整并行度,默认情况下,大家Shuffle过后(在Reduce阶段)都是这个并行度。 2. 默认确实是开启的,默认值确实也是10MB,但是,这个10MB太太太太太太(太 x N)小了!很多小表其实都超过了这个阈值,因此,如果你懒得去调整这个参数,可以直接用broadcast(userDF)这种强制广播的方式,省时省力,比较方便~
2021-04-105 - 斯盖丸老师我生产中为啥从没有遇到过10000并行度那么大的stage,可能我公司比较小吧,集群最多才100多个核,多数时才几百个任务,最多时也才2000多个任务。这健康吗?
作者回复: 这里其实有两个容易混淆的概念哈~ 一个是并行度,并行度其实是从数据角度出发,表示的是你的分布式数据集划分的粒度,再直白点说,它和分区数是等效的。因此,它其实跟你集群有多少Executors,每个Executors配置了多少cores,没有关系~ 第二个是并发度,或者叫Executors线程池大小,也就是你用spark.executor.cores类似的参数,给Executors指定的cores资源。它限制了在同一时间,你的Executors中最多同时能跑多少个任务。Executors并发度乘以集群中的Executors数量,其实就是你集群的并发处理能力,很多地方也叫并行处理能力。其实蛋疼的地方在于,不同的作者、不同的上下文,并发和并行这两个词,总是混用。所以也就造成大家都比较困惑。 咱们在配置项第一讲,其实就在尝试厘清、约定这两个词的定义,一来方便大家理解,二来方便后续讨论。 所以,回答你的问题,其实没什么不健康的哈~ 10000并行度,意味着10000个分区的分布式数据集,这个应该不难见到。另外100个cores的集群,其实也不算小了~ 不过你说的2000任务我没有get到,不知道是2000并行度,还是2000的集群并发。如果是2000集群并发的话,这个数和100cores对不上。这意味着你的每个core需要20个超线程,哈哈,目前还没有这么给力的CPU。一般CPU也就2个超线程。
2021-05-0544 - 狗哭select * from (select id from table1) a -- 结果很大 left join (select id from table2) b -- 结果很小 on t1.id = t2.id; 老师请教下,这种情况b表会广播吗?如果不会怎么处理能让其广播出去呢
作者回复: 能否广播,取决于b表的存储大小,是否小于广播阈值,也就是:spark.sql.autoBroadcastJoinThreshold。如果小于这个阈值,就会广播,否则就不会。 如果懒得设阈值,还可以利用 API 强制广播,这里的具体细节,可以参考第13讲哈,就是后面的一讲~ 会详细说,怎么把Shuffle Join,转化为Broadcast Join
2021-10-2131 - 斯盖丸原来小表和大表join是节省了大表的shuffle,不然大表只能根据join的列在所有机器上重新分布一遍,现在懂了
作者回复: 是的,以小博大
2021-04-091 - 陌生的心酸1>.当数据量比较大,对数据进行广播后,同时还要接受各个Executor的中间结果上报,状态管理,导致网络繁忙,继而会发生分发任务到Executor产生失败 2> 两个大表【超过广播变量的阈值参数设置】进行join,数据需要分发多次,效率不佳
作者回复: 可以参考Bennan同学的答案哈: 1. P2P思路:改成由driver获取到数据分布,然后通知各个executor之间进行拉取,这样可以利用多个executor网络,避免只有driver组装以后再一个一个发送效率过低 2.当两个需要join的数据集都很大时,使用broadcast join需要将一个很大的数据集进行网络分发多次,已经远超出了shuffle join需要传输的数据
2022-02-21 - 子兮老师您好,在整个应用资源较为紧张,数据量较大的情况下:spark core计算过程中,生成一个较大的RDD , 它被引用一次,但我还是对它persist(disk),我在用完并且动作算子后,立刻对它进行了释放unpersist,这样操作是否能加快spark 对这个rdd 的清理,加快内存的释放,缓解内存压力?如果是persist(memory and disk),用完并且在动作算子后立即释放unpersist,是否能缓解内存压力?如果不persist,用完并且在动作算子后立即释放unpersist,是否能缓解内存压力? 文字有些长,希望没给老师造成困扰,谢谢老师
作者回复: 关于Cache(Persist)的部分,建议老弟关注第16讲:内存视角(二):如何有效避免Cache滥用?这一讲,比较系统、细致地介绍了Cache的使用场景、原则,和一般注意事项,尤其是什么时候该Cache,什么时候不能滥用Cache,老弟可以先看看哈~
2021-11-10