零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

06 | Shuffle管理:为什么Shuffle是性能瓶颈?

你好,我是吴磊。
在上一讲,我们拜访了斯巴克国际建筑集团总公司,结识了 Spark 调度系统的三巨头:DAGScheduler、TaskScheduler 和 SchedulerBackend。相信你已经感受到,调度系统组件众多且运作流程精密而又复杂。
任务调度的首要环节,是 DAGScheduler 以 Shuffle 为边界,把计算图 DAG 切割为多个执行阶段 Stages。显然,Shuffle 是这个环节的关键。那么,我们不禁要问:“Shuffle 是什么?为什么任务执行需要 Shuffle 操作?Shuffle 是怎样一个过程?”
今天这一讲,我们转而去“拜访”斯巴克国际建筑集团的分公司,用“工地搬砖的任务”来理解 Shuffle 及其工作原理。由于 Shuffle 的计算几乎需要消耗所有类型的硬件资源,比如 CPU、内存、磁盘与网络,在绝大多数的 Spark 作业中,Shuffle 往往是作业执行性能的瓶颈,因此,我们必须要掌握 Shuffle 的工作原理,从而为 Shuffle 环节的优化打下坚实基础。

什么是 Shuffle

我们先不急着给 Shuffle 下正式的定义,为了帮你迅速地理解 Shuffle 的含义,从而达到事半功倍的效果,我们不妨先去拜访斯巴克集团的分公司,去看看“工地搬砖”是怎么一回事。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入探讨了Spark中Shuffle的重要性以及为什么Shuffle成为性能瓶颈。通过生动的比喻和具体的示例,使得抽象的技术概念变得易于理解。首先通过“工地搬砖”的比喻,直观解释了Shuffle的概念,将不同类型的砖头在分公司之间搬运的过程与分布式计算中的Shuffle进行了类比。随后,阐述了Shuffle的工作原理,以Word Count的例子为例,详细分析了reduceByKey算子中涉及的Shuffle操作。文章指出,Shuffle是Map阶段与Reduce阶段之间的数据交换,强调了Shuffle在分布式计算中的关键作用。最后,文章提出了计算过程中为何需要引入Shuffle操作的原因,强调了在绝大多数的业务场景中,Shuffle操作都是必需的、无法避免的。通过本文,读者可以深入了解Shuffle在Spark中的重要性和工作原理,以及为何Shuffle成为性能瓶颈。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(29)

  • 最新
  • 精选
  • Geek_2dfa9a
    置顶
    官网配置文档 https://spark.apache.org/docs/3.1.2/configuration.html Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. Note: This will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. 临时目录,用来存map output文件(shuffle产生)和save RDD到磁盘的时候会用到。应该配置成快速的本地磁盘。支持‘,’分隔的多个目录。 注意:这个配置会被SPARK_LOCAL_DIRS(Standalone部署),MESOS_SANDBOX(Mesos),LOCAL_DIRS (YARN)替换。 既然shuffle会带来很高的开销,除了优化driver程序也可以考虑优化系统配置。 首先/tmp是会被系统清理的(取决于不同linux分发版的清理策略),如果作业运行时/tmp中的文件被清除了,那就要重新shuffle或 重新缓存RDD(这块没有仔细研究,简单猜测下缓存失效策略是重新缓存),因此,不适合将配置spark.local.dir设置为默认的/tmp 其次,部署文档有提该配置项支持多个目录,那可以考虑配置多块硬盘(或者SSD),再把挂载不同硬盘的目录配置到spark.local.dir, 这样可以显著提升shuffle和RDD缓存的性能。请大家指教。

    作者回复: 完美,满分💯!置顶🔝 老弟功底十分扎实~👍👍👍

    2021-09-22
    7
    37
  • qinsi
    看到哈希那边有个问题,就是遇到不均匀的数据会怎么样?比如对这篇论文执行word count:https://isotropic.org/papers/chicken.pdf 原本可能指望所有工人一起搬砖,结果发现只有一个工人在搬砖? 另外前几讲说过大数据的精髓在于数据不动代码动,但这讲说还是无法避免shuffle搬运数据,这个要怎么理解?

    作者回复: 好问题~ 第一个问题实际上,就是数据倾斜,data skew,倾斜会导致你说的,闲的闲死、忙的忙死,忙的那个拖累作业整体性能。两种思路,一个是用spark3.0的AQE,join自动倾斜处理。另一个是手工加盐。这两种方法,其实在《性能篇》都有详细的介绍。稍后我把那边比较核心的讲解,给你贴过来。这会在地铁上,不好操作。 第二个,其实是两个层面的事情。一个是调度系统,说的是代码调度,调度到数据所在的地方。而shuffle呢,数据移动是刚需,是计算逻辑需要。换句话说,这个时候,代码动不动,数据都要动。这个其实已经超出调度系统范畴,纯粹是计算逻辑需要。两个层面的问题哈~

    2021-09-23
    2
    12
  • Unknown element
    老师您在shuffle read部分说“不同的 Reduce Task 正是根据 index 文件中的起始索引来确定哪些数据内容是属于自己的”,这一步具体是怎么实现的呢?以文中的index文件举例,文中的index文件是0,3,7,那么不同的reduce task是各自有一个编号,然后按编号大小顺序确定自己应该拉取哪一部分数据吗?比如编号为0的reduce task拉取index文件的第一个索引到第二个索引之间的数据,也就是index为0,1,2的数据;编号为1的reduce task拉取index文件的第二个索引到第三个索引之间的数据,也就是index为3,4,5,6的数据;编号为2的reduce task拉取index文件的第三个索引到最后的数据,也就是index为7,8,9的数据?这样的话如果map task计算出来没有数据应该被发到第二个reduce task那index文件是0,3,3吗?

    作者回复: 好问题~ 是的, 就像你分析的那样,如果map task计算阶段,发现没有某些reduce task的数据,那么index文件中的索引,就会一直顺延~ 思考的很深入,赞👍~

    2021-10-08
    10
  • Unknown element
    老师您好 我有两个问题想问下: 1. 对于所有 Map Task 生成的中间文件,Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容 那不同的reduce task是怎么知道哪些内容是属于自己的呢?比如对于文中的例子,reduce阶段的3个任务怎么知道自己应该拉取中间文件的哪些记录? 2. 对于评论区AIK的问题,您说shuffle过程不是数据交换,而是数据流转,那意思是在map阶段 所有将要执行reduce task的节点都是空闲的吗(等待map task生成shuffle中间文件)?那他们是不是在这个stage的整个计算过程中都是空闲的?这样的话岂不是没有发挥出集群的最大算力?

    作者回复: 好问题~ 先说第一个,这个就是index文件的作用,它记录的就是隶属于不同Reduce task的数据索引,Reduce task基于这些索引来判断,哪些数据属于他。举例来说,Reduce task 3,那么data文件中,index从3到4之间的数据,就是属于这个Reduce task 3的。 第二个,好问题。这个其实是“静态思维”惹的祸。要知道,不管是Map task,还是Reduce task,消耗的计算资源,都是同一个集群、同样节点上同样的一批Executors。 Reduce tasks对于Map tasks是有依赖的,同一批Executors,执行完Map tasks之后,数据落盘到了spark.local.dir配置的目录。接下来,还是这同一批Executors,启动Reduce tasks,跨网络去不同节点上拉取属于自己的数据。 因此,Executors一直没闲着,不存在资源浪费的问题。要动态地看待Map & Reduce过程,他们在时间线上,是有前后关系的。而所有任务,消耗的都是同一批硬件资源~

    2021-09-29
    3
    6
  • 钟强
    如果集群中只有一个executor, 但是executor上面有多个map task, 这样的环境是不是不需要shuffle?

    作者回复: shuffle与executors数量无关哈,即便是一个executors,像groupByKey、join、reduceByKey这些操作,照样会引入shuffle,只不过shuffle都在同一个executors发生,省去了网络I/O的开销,但是磁盘开销还是会有

    2022-03-08
    3
  • Ebdaoli
    磊哥,关于 spark shuffle write 阶段有个问题不太理解,①:Maptask的 任务数的并行度由什么来决定的?根据文件大小来切分划分的吗? ②:为什么最终数据需要进行 sort?合并的方式选择的是 归并排序?

    作者回复: 一个个来看哈 1)Map task,其并行度由其Stage中的首个RDD决定,如果Map task是读取HDFS,那么并行度就是分布式文件block数量; 2)是的,中间文件的合并,可以理解为归并排序;另外,对于Join策略,Spark通常默认选取SMJ,因此Sort有利于后期做数据关联

    2022-03-06
    3
  • A
    因rdd得依赖属性shuffle划分了两个stage0和1 运行stage0的executor产生的数据称作建材,结束后driver继续提交stage1,运行stage1的executor全集群得去拉去各自所需的建材,可以这样理解嘛老师? 那stage0产生的临时data、index是记录在哪里?如何返回给driver的呢?以及stage1提交时是如何获取的呢? 目前想到的是重新封装但是又说不过去 还望老师给条路,自己再去研究一下! 感谢老师!

    作者回复: Quote “运行stage0的executor产生的数据称作建材,结束后driver继续提交stage1,运行stage1的executor全集群得去拉去各自所需的建材,可以这样理解嘛老师?” 是的,理解是对的~ 完全正确 对于每个map task来说,data、index都存储在本机磁盘,具体目录由spark.local.dir配置项来确定。哪些文件,存储在哪里,尺寸大小,这些meta data,都会由存储系统当中的BlockManager来记录,每个Executors都有自己的BlockManager。各个Executors的BlockManager会向Driver的BlockManagerMaster定期汇报这些meta data。reduce task在尝试拉取data、index文件时,需要通过Executors的BlockManager去拿到这些元信息,然后完成数据拉取。如果Executors的BlockManager没有这些元信息,BlockManager回去找Driver端的BlockManagerMaster,从而拿到全局元信息~

    2022-01-15
    2
    3
  • 猫太太
    您好,讲的很清楚,看完都有点想去去自己看源码了。这句有点不太理解,可以解释一下么:“Reduce 阶段不同于 Reduce Task 拉取数据的过程,往往也被叫做 Shuffle Read。” 请问shuffle read属于reduce阶段么?reduce task拉取数据的过程不包括在reduce阶段么?reduce task拉取数据的过程不是shuffle read么?reduce阶段有什么事情发生呢?谢谢~

    作者回复: 老弟说的是对的~ Reduce Task拉取数据的过程就是Shuffle Read。 这里的写法有问题,我回头让编辑帮忙改下,这里应该是最后的总结部分,改得比较仓促,出typo了,感谢老弟提醒~

    2021-12-05
    3
  • 阿狸弟弟的包子店
    Shuffle算法感觉需要补一补,看评论有hash得,还有sort-base的,还有其他的吗?

    作者回复: 之前有hash shuffle manager和Tungsten shuffle manager,不过现在都统一到sort shuffle manager里面了。hash很早就deprecate掉了,对于文件的消耗太大了;Tungsten shuffle manager,基本运行机制与sort shuffle manager一致,主要是在计算过程中,尽可能地利用了Tungsten的数据结构和内存寻址方式。在计算流程上,Tungsten based shuffle和sort based shuffle是一样的~

    2022-01-27
    2
  • 实数
    这个讲的是hashshuffle是吗,那么第二代的sortshuffle相比较如何呢。是不是两代的shuffle都不能保证全局有序啊

    作者回复: Hash-based shuffle已经deprecate了哈,现在默认的都是Sort-based shuffle。Shuffle的目的不是排序,单纯的Shuffle,做不到全局有序。Sort-based只是shuffle的一种实现方式~ Sort-based实现方式至少有两个收益: 1)为后续可能的Sort Merge Join奠定基础 2)为后续可能的全局排序,奠定基础

    2021-11-29
    2
收起评论
显示
设置
留言
29
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部