06 | Shuffle管理:为什么Shuffle是性能瓶颈?
什么是 Shuffle
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了Spark中Shuffle的重要性以及为什么Shuffle成为性能瓶颈。通过生动的比喻和具体的示例,使得抽象的技术概念变得易于理解。首先通过“工地搬砖”的比喻,直观解释了Shuffle的概念,将不同类型的砖头在分公司之间搬运的过程与分布式计算中的Shuffle进行了类比。随后,阐述了Shuffle的工作原理,以Word Count的例子为例,详细分析了reduceByKey算子中涉及的Shuffle操作。文章指出,Shuffle是Map阶段与Reduce阶段之间的数据交换,强调了Shuffle在分布式计算中的关键作用。最后,文章提出了计算过程中为何需要引入Shuffle操作的原因,强调了在绝大多数的业务场景中,Shuffle操作都是必需的、无法避免的。通过本文,读者可以深入了解Shuffle在Spark中的重要性和工作原理,以及为何Shuffle成为性能瓶颈。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(29)
- 最新
- 精选
- Geek_2dfa9a置顶官网配置文档 https://spark.apache.org/docs/3.1.2/configuration.html Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. Note: This will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. 临时目录,用来存map output文件(shuffle产生)和save RDD到磁盘的时候会用到。应该配置成快速的本地磁盘。支持‘,’分隔的多个目录。 注意:这个配置会被SPARK_LOCAL_DIRS(Standalone部署),MESOS_SANDBOX(Mesos),LOCAL_DIRS (YARN)替换。 既然shuffle会带来很高的开销,除了优化driver程序也可以考虑优化系统配置。 首先/tmp是会被系统清理的(取决于不同linux分发版的清理策略),如果作业运行时/tmp中的文件被清除了,那就要重新shuffle或 重新缓存RDD(这块没有仔细研究,简单猜测下缓存失效策略是重新缓存),因此,不适合将配置spark.local.dir设置为默认的/tmp 其次,部署文档有提该配置项支持多个目录,那可以考虑配置多块硬盘(或者SSD),再把挂载不同硬盘的目录配置到spark.local.dir, 这样可以显著提升shuffle和RDD缓存的性能。请大家指教。
作者回复: 完美,满分💯!置顶🔝 老弟功底十分扎实~👍👍👍
2021-09-22737 - qinsi看到哈希那边有个问题,就是遇到不均匀的数据会怎么样?比如对这篇论文执行word count:https://isotropic.org/papers/chicken.pdf 原本可能指望所有工人一起搬砖,结果发现只有一个工人在搬砖? 另外前几讲说过大数据的精髓在于数据不动代码动,但这讲说还是无法避免shuffle搬运数据,这个要怎么理解?
作者回复: 好问题~ 第一个问题实际上,就是数据倾斜,data skew,倾斜会导致你说的,闲的闲死、忙的忙死,忙的那个拖累作业整体性能。两种思路,一个是用spark3.0的AQE,join自动倾斜处理。另一个是手工加盐。这两种方法,其实在《性能篇》都有详细的介绍。稍后我把那边比较核心的讲解,给你贴过来。这会在地铁上,不好操作。 第二个,其实是两个层面的事情。一个是调度系统,说的是代码调度,调度到数据所在的地方。而shuffle呢,数据移动是刚需,是计算逻辑需要。换句话说,这个时候,代码动不动,数据都要动。这个其实已经超出调度系统范畴,纯粹是计算逻辑需要。两个层面的问题哈~
2021-09-23212 - Unknown element老师您在shuffle read部分说“不同的 Reduce Task 正是根据 index 文件中的起始索引来确定哪些数据内容是属于自己的”,这一步具体是怎么实现的呢?以文中的index文件举例,文中的index文件是0,3,7,那么不同的reduce task是各自有一个编号,然后按编号大小顺序确定自己应该拉取哪一部分数据吗?比如编号为0的reduce task拉取index文件的第一个索引到第二个索引之间的数据,也就是index为0,1,2的数据;编号为1的reduce task拉取index文件的第二个索引到第三个索引之间的数据,也就是index为3,4,5,6的数据;编号为2的reduce task拉取index文件的第三个索引到最后的数据,也就是index为7,8,9的数据?这样的话如果map task计算出来没有数据应该被发到第二个reduce task那index文件是0,3,3吗?
作者回复: 好问题~ 是的, 就像你分析的那样,如果map task计算阶段,发现没有某些reduce task的数据,那么index文件中的索引,就会一直顺延~ 思考的很深入,赞👍~
2021-10-0810 - Unknown element老师您好 我有两个问题想问下: 1. 对于所有 Map Task 生成的中间文件,Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容 那不同的reduce task是怎么知道哪些内容是属于自己的呢?比如对于文中的例子,reduce阶段的3个任务怎么知道自己应该拉取中间文件的哪些记录? 2. 对于评论区AIK的问题,您说shuffle过程不是数据交换,而是数据流转,那意思是在map阶段 所有将要执行reduce task的节点都是空闲的吗(等待map task生成shuffle中间文件)?那他们是不是在这个stage的整个计算过程中都是空闲的?这样的话岂不是没有发挥出集群的最大算力?
作者回复: 好问题~ 先说第一个,这个就是index文件的作用,它记录的就是隶属于不同Reduce task的数据索引,Reduce task基于这些索引来判断,哪些数据属于他。举例来说,Reduce task 3,那么data文件中,index从3到4之间的数据,就是属于这个Reduce task 3的。 第二个,好问题。这个其实是“静态思维”惹的祸。要知道,不管是Map task,还是Reduce task,消耗的计算资源,都是同一个集群、同样节点上同样的一批Executors。 Reduce tasks对于Map tasks是有依赖的,同一批Executors,执行完Map tasks之后,数据落盘到了spark.local.dir配置的目录。接下来,还是这同一批Executors,启动Reduce tasks,跨网络去不同节点上拉取属于自己的数据。 因此,Executors一直没闲着,不存在资源浪费的问题。要动态地看待Map & Reduce过程,他们在时间线上,是有前后关系的。而所有任务,消耗的都是同一批硬件资源~
2021-09-2936 - 钟强如果集群中只有一个executor, 但是executor上面有多个map task, 这样的环境是不是不需要shuffle?
作者回复: shuffle与executors数量无关哈,即便是一个executors,像groupByKey、join、reduceByKey这些操作,照样会引入shuffle,只不过shuffle都在同一个executors发生,省去了网络I/O的开销,但是磁盘开销还是会有
2022-03-083 - Ebdaoli磊哥,关于 spark shuffle write 阶段有个问题不太理解,①:Maptask的 任务数的并行度由什么来决定的?根据文件大小来切分划分的吗? ②:为什么最终数据需要进行 sort?合并的方式选择的是 归并排序?
作者回复: 一个个来看哈 1)Map task,其并行度由其Stage中的首个RDD决定,如果Map task是读取HDFS,那么并行度就是分布式文件block数量; 2)是的,中间文件的合并,可以理解为归并排序;另外,对于Join策略,Spark通常默认选取SMJ,因此Sort有利于后期做数据关联
2022-03-063 - A因rdd得依赖属性shuffle划分了两个stage0和1 运行stage0的executor产生的数据称作建材,结束后driver继续提交stage1,运行stage1的executor全集群得去拉去各自所需的建材,可以这样理解嘛老师? 那stage0产生的临时data、index是记录在哪里?如何返回给driver的呢?以及stage1提交时是如何获取的呢? 目前想到的是重新封装但是又说不过去 还望老师给条路,自己再去研究一下! 感谢老师!
作者回复: Quote “运行stage0的executor产生的数据称作建材,结束后driver继续提交stage1,运行stage1的executor全集群得去拉去各自所需的建材,可以这样理解嘛老师?” 是的,理解是对的~ 完全正确 对于每个map task来说,data、index都存储在本机磁盘,具体目录由spark.local.dir配置项来确定。哪些文件,存储在哪里,尺寸大小,这些meta data,都会由存储系统当中的BlockManager来记录,每个Executors都有自己的BlockManager。各个Executors的BlockManager会向Driver的BlockManagerMaster定期汇报这些meta data。reduce task在尝试拉取data、index文件时,需要通过Executors的BlockManager去拿到这些元信息,然后完成数据拉取。如果Executors的BlockManager没有这些元信息,BlockManager回去找Driver端的BlockManagerMaster,从而拿到全局元信息~
2022-01-1523 - 猫太太您好,讲的很清楚,看完都有点想去去自己看源码了。这句有点不太理解,可以解释一下么:“Reduce 阶段不同于 Reduce Task 拉取数据的过程,往往也被叫做 Shuffle Read。” 请问shuffle read属于reduce阶段么?reduce task拉取数据的过程不包括在reduce阶段么?reduce task拉取数据的过程不是shuffle read么?reduce阶段有什么事情发生呢?谢谢~
作者回复: 老弟说的是对的~ Reduce Task拉取数据的过程就是Shuffle Read。 这里的写法有问题,我回头让编辑帮忙改下,这里应该是最后的总结部分,改得比较仓促,出typo了,感谢老弟提醒~
2021-12-053 - 阿狸弟弟的包子店Shuffle算法感觉需要补一补,看评论有hash得,还有sort-base的,还有其他的吗?
作者回复: 之前有hash shuffle manager和Tungsten shuffle manager,不过现在都统一到sort shuffle manager里面了。hash很早就deprecate掉了,对于文件的消耗太大了;Tungsten shuffle manager,基本运行机制与sort shuffle manager一致,主要是在计算过程中,尽可能地利用了Tungsten的数据结构和内存寻址方式。在计算流程上,Tungsten based shuffle和sort based shuffle是一样的~
2022-01-272 - 实数这个讲的是hashshuffle是吗,那么第二代的sortshuffle相比较如何呢。是不是两代的shuffle都不能保证全局有序啊
作者回复: Hash-based shuffle已经deprecate了哈,现在默认的都是Sort-based shuffle。Shuffle的目的不是排序,单纯的Shuffle,做不到全局有序。Sort-based只是shuffle的一种实现方式~ Sort-based实现方式至少有两个收益: 1)为后续可能的Sort Merge Join奠定基础 2)为后续可能的全局排序,奠定基础
2021-11-292