Spark性能调优实战
吴磊
FreeWheel机器学习团队负责人
立即订阅
1216 人已学习
课程目录
已更新 13 讲 / 共 32 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | Spark性能调优,你该掌握这些“套路”
免费
01 | 性能调优的必要性:Spark本身就很快,为啥还需要我调优?
02 | 性能调优的本质:调优的手段五花八门,该从哪里入手?
原理篇 (5讲)
03 | RDD:为什么你必须要理解弹性分布式数据集?
04 | DAG与流水线:到底啥叫“内存计算”?
05 | 调度系统:“数据不动代码动”到底是什么意思?
06 | 存储系统:空间换时间,还是时间换空间?
07 | 内存管理基础:Spark如何高效利用有限的内存空间?
通用性能调优篇 (5讲)
08 | 应用开发三原则:如何拓展自己的开发边界?
09 | 调优一筹莫展,配置项速查手册让你事半功倍!(上)
10 |调优一筹莫展,配置项速查手册让你事半功倍!(下)
11 | Shuffle的工作原理:为什么说Shuffle是一时无两的性能杀手?
12 | 广播变量(一):克制Shuffle,如何一招制胜!
Spark性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

12 | 广播变量(一):克制Shuffle,如何一招制胜!

吴磊 2021-04-09
你好,我是吴磊。
在数据分析领域,数据关联(Joins)是 Shuffle 操作的高发区,二者如影随从。可以说,有 Joins 的地方,就有 Shuffle。
我们说过,面对 Shuffle,开发者应当“能省则省、能拖则拖”。我们已经讲过了怎么拖,拖指的就是,把应用中会引入 Shuffle 的操作尽可能地往后面的计算步骤去拖。那具体该怎么省呢?
在数据关联场景中,广播变量就可以轻而易举地省去 Shuffle。所以今天这一讲,我们就先说一说广播变量的含义和作用,再说一说它是如何帮助开发者省去 Shuffle 操作的。

如何理解广播变量?

接下来,咱们借助一个小例子,来讲一讲广播变量的含义与作用。这个例子和 Word Count 有关,它可以说是分布式编程里的 Hello world 了,Word Count 就是用来统计文件中全部单词的,你肯定已经非常熟悉了,所以,我们例子中的需求增加了一点难度,我们要对指定列表中给定的单词计数。
val dict = List(“spark”, “tune”)
val words = spark.sparkContext.textFile(“~/words.csv”)
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect
按照这个需求,同学小 A 实现了如上的代码,一共有 4 行,我们逐一来看。第 1 行在 Driver 端给定待查单词列表 dict;第 2 行以 textFile API 读取分布式文件,内容包含一列,存储的是常见的单词;第 3 行用列表 dict 中的单词过滤分布式文件内容,只保留 dict 中给定的单词;第 4 行调用 reduceByKey 对单词进行累加计数。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Spark性能调优实战》,如需阅读全部文章,
请订阅文章所属专栏
立即订阅
登录 后留言

精选留言(6)

  • Bennan
    1. 改成由driver获取到数据分布,然后通知各个executor之间进行拉取,这样可以利用多个executor网络,避免只有driver组装以后再一个一个发送效率过低

    2.当两个需要join的数据集都很大时,使用broadcast join需要将一个很大的数据集进行网络分发多次,已经远超出了shuffle join需要传输的数据

    作者回复: Perfect!满分💯,两道题答的都很好~

    2021-04-09
    1
  • Geek_d794f8
    老师有两个问题请教一下:
    1.文中提到两个表join,两个表数据量相差很大呀,为什么他们的的分区数是一致的,而且分区数不是根据hadoop的切片规则去划分的吗?
    2.广播join不是默认开启的吗,好像小表默认10M;还需像文中代码val bcUserDF = broadcast(userDF)这样声明吗?
    希望得到您的指导,多谢!
    2021-04-10
  • Jack
    老师,对于第1题,看了下spark的源码,目前Broadcast只有一个实现类TorrentBroadcast,看代码的注释,这个类通过使用类似Bit-torrent协议的方法解决了Driver成为瓶颈的问题。目前Spark还会存在广播变量的数据太大造成Driver成为瓶颈的问题吗?

    /**
     * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
     *
     * The mechanism is as follows:
     *
     * The driver divides the serialized object into small chunks and
     * stores those chunks in the BlockManager of the driver.
     *
     * On each executor, the executor first attempts to fetch the object from its BlockManager. If
     * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
     * other executors if available. Once it gets the chunks, it puts the chunks in its own
     * BlockManager, ready for other executors to fetch from.
     *
     * This prevents the driver from being the bottleneck in sending out multiple copies of the
     * broadcast data (one per executor).
     *
     * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
     *
     * @param obj object to broadcast
     * @param id A unique identifier for the broadcast variable.
     */
    private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
      extends Broadcast[T](id) with Logging with Serializable {

    作者回复: 非常赞哈👍哈~ 凡是看源码的同学,都先给个赞~

    这块非常值得探讨。我的理解是这样的,代码层面,spark确实已经有code在尝试用p2p的方式来分发广播变量,从而减轻driver负担。

    但是,据我观察,这部分代码尚未生效。细节可以参考这个ticket:【Executor side broadcast for broadcast joins】https://issues.apache.org/jira/browse/SPARK-17556,看上去还是进行中的状态。

    另外,从代码看,目前还是先用collect拉到driver,然后再分发出去:

    BroadcastExchangeExec中的relationFuture用于获取广播变量内容
    在relationFuture内部:
      1. 先是调用executeCollectIterator生成内容relation;
         其中,executeCollectIterator调用collect把结果集收集到driver端
      2. 然后用sparkContext.broadcast(relation),把生成好的内容广播到各个Executors
    并没有看到哪里从Executors拉取数据分片、来减轻driver负载。

    并且,这里还有提示driver内存不够的exception:
    new OutOfMemoryError("Not enough memory to build and broadcast the table to all " + "worker nodes. As a workaround, you can either disable broadcast by setting " + s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " + s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.").initCause(oe.getCause))

    你贴的这段代码确实在尝试用片2片,不过,需要仔细看看,它在哪里调用,被谁调用~

    2021-04-09
    2
  • Jack
    老师,第2题@Bennan的回答有疑惑。为什么当两个需要join的数据集都很大时,broadcast join会超出shuffle join需要传输的数据。
    假设有A,B两表,各自大小都是100G,有4个executor,每个executor有4个task对应4个分区,总共16个task。,把shuffle的过程用二分图刻画,两边都各自有16个顶点,边有16*16条。
    对于shuffle join,假设shuffle前后分区个数不变,每个task需要去其他12个task(有3个task和自己在同一个executor上)拉取数据,把shuffle的过程看做是二分图,两边都各自有16个顶点,边有16*16条(task到task)。
    对于broadcast join,driver从各个分区获取数据,有16条从分区到driver的边(相当于16条task到task的边),然后driver广播给各个executor,总共有4条driver到executor的边。
    broadcast join的粒度是executor,shuffle的粒度是task,感觉还是broadcast join的数据少一点。因为同一份数据,即使两个task在同一个executor,对于shuffle,还是会在同一个executor上有两份相同的数据,而broadcast,在一个executor上只有一份数据。

    作者回复: 思考的很深入,先赞一个~ 我的理解是这样的,shuffle分发的,是部分数据。广播分发的,是全量数据。当然,像你说的,广播的粒度大,是executor level。shuffle粒度小,是task level。如果全量数据很大,即使能塞进广播变量,即便是executor粒度,我觉得它的开销,也远大于它的收益,甚至还有driver端oom的风险,所以这种情况,还不如采用shuffle join。 当然,这么说比较笼统,并没有精确的开销和收益的计算。这里其实更多的,是想强调广播不是银弹,不要滥用广播。

    2021-04-09
    3
  • 斯盖丸
    原来小表和大表join是节省了大表的shuffle,不然大表只能根据join的列在所有机器上重新分布一遍,现在懂了

    作者回复: 是的,以小博大

    2021-04-09
  • 🚤
    Executor端的Broadcast这个功能,目前的spark版本是不是没有这个功能呀?

    作者回复: code其实有,但是就我理解,出于各种原因,并没有生效,具体细节可以参考给Jack的回复哈~

    2021-04-09
收起评论
6
返回
顶部