零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

07 | RDD常用算子(二):Spark如何实现数据聚合?

你好,我是吴磊。
积累了一定的理论基础之后,今天我们继续来学习 RDD 常用算子。在RDD 常用算子(一)那一讲,我们讲了四个算子 map、mapPartitions、flatMap 和 filter,同时留了这样一道思考题:“这些算子之间,有哪些共同点?”
今天我们就来揭晓答案。首先,在功能方面,这 4 个算子都用于 RDD 内部的数据转换,而学习过 Shuffle 的工作原理之后,我们不难发现,这 4 个算子当中,没有任何一个算子,会引入 Shuffle 计算。
而今天我们要学习的几个算子则恰恰相反,它们都会引入繁重的 Shuffle 计算。这些算子分别是 groupByKey、reduceByKey、aggregateByKey 和 sortByKey,也就是表格中加粗的部分。
我们知道,在数据分析场景中,典型的计算类型分别是分组、聚合和排序。而 groupByKey、reduceByKey、aggregateByKey 和 sortByKey 这些算子的功能,恰恰就是用来实现分组、聚合和排序的计算逻辑。
RDD算子分类表
尽管这些算子看上去相比其他算子的适用范围更窄,也就是它们只能作用(Apply)在 Paired RDD 之上,所谓 Paired RDD,它指的是元素类型为(Key,Value)键值对的 RDD。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了Spark中常用的数据聚合算子,包括groupByKey、reduceByKey、aggregateByKey和sortByKey。通过清晰的示例和解释,读者能够快速掌握这些算子的用法和计算过程。文章指出了groupByKey算子执行效率较差的问题,并重点回顾了reduceByKey和aggregateByKey算子的灵活性和功能性。此外,还介绍了sortByKey算子用于按照Key进行排序。总的来说,本文帮助读者了解了Spark中实现数据聚合的方式,为开发数据分析应用提供了重要基础。通过掌握这些算子的用法与原理,读者能够游刃有余地开发数据分析应用,为他们在实际开发中应用这些算子提供了指导和帮助。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(9)

  • 最新
  • 精选
  • Geek_2dfa9a
    置顶
    最近公司在搞黑客马拉松,我忙于做一个数仓血缘图计算的项目来晚啦。 reduceByKey和aggregateByKey底层使用了同一个方法实现:combineByKeyWithClassTag,该方法是将KV型的RDD[(K, V)]转换为RDD[(K, C)], 类似于分组聚合,既然要找到reduceByKey和aggregateByKey的联系那肯定要从下至上由共性开始分析,combineByKeyWithClassTag方法声明如下: def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { } 字数限制了,我把方法实现放在下面评论。 首先讲三个高阶函数入参:createCombiner,mergeValue,mergeCombiners数,在方法里组成了Aggregator对象,Aggregator 其实就是spark对分组聚合(Shuffle)操作的抽象,如果不清楚spark分组聚合的过程这三个高阶函数不好理解,简单点讲,RDD按照Key分组后因为 不同Partition里会有相同Key,因此对于Key=k1这个大组会有多个小组(k11,k12...k1n),首先createCombiner会给k11,k12...k1n) 每个小组赋一个初始值C0,然后mergeValue把小组内的每个记录叠加给初始值得到一个小组值C00(其实就是map端聚合),最后再把所有小组的 小组值合并成一个KV型RDD(注意这里V已经变成了C类型)。 再讲参数partitioner,了解RDD的话应该清楚,就是这个RDD的分区规则,这里的入参就是聚合后RDD的分区规则,如果相同的话,那Shuffle就完全不需要了, 直接task本地聚合一下就好了,源码里也就直接mapPartitions就结束了,如果聚合前后分区规则不相同的话,那么就会返回一个ShuffledRDD。 最后讲下参数mapSideCombine和serializer,mapSideCombine就是是否在map端聚合,方法开头的校验可以看到keyClass是数组时不支持 map端聚合和哈希分区,这里是为什么呢?不太熟悉scala的我查了下stackoverflow,原来scala里的数组和Java的一样,hashcode只是数组对象本身的 hashcode,和内容没关系,那自然没办法map端聚合了, 这里请问老师,那数组作为KVRDD的时候,reduce端的聚合是怎么完成判等的呢? serializer是指出数据如何序列化的,序列化就先不说了,不然又要讲好多。 最后总结下,reduceByKey和aggregateByKey底层实现完全相同,都是combineByKeyWithClassTag,只不过reduceByKey调用 combineByKeyWithClassTag的入参mergeValue和mergeCombiners是相等的,aggregateByKey是用户指定可以不等的,也就是说 reduceByKey是一种特殊的aggregateByKey。

    作者回复: 完美,满分💯!置顶🔝 数组的hashcode,其实就是它的内存地址,就像你说的,跟内容无关。因此,它没有办法利用hashPartitioner完成分发,所以说,key是数组的rdd,应用范围非常有限。 除了hashPartitioner,Spark还支持rangePartitioner,这种分区器,不需要hashcode,只要实现了comparator(能比较)就行。所以即便非要用hashcode来排序,也ok。这个时候,内容是否一样,已经不重要了。 reduce阶段需要聚合的场景,一定是需要hashPartitioner的,由于数组不支持hashPartitioner,它也就走不到reduce那一步,没有这样的code path~

    2021-09-26
    3
    20
  • Geek_2a0deb
    reduceByKey 和 aggregateByKey的区别在于reduceByKey在map端和reduce时的聚合函数一致,而aggregateByKey在map端和reduce端聚合函数可以不一致,联系就是reduceByKey可以认为是一种特殊的aggregateByKey(map和reduce是同一个函数)如果用算子来描述:reduceByKey(f)=aggregateByKey(初始值) (f,f)

    作者回复: 没错,满分~ 💯

    2022-01-11
    1
  • Botanic
    ``` # 使用 aggregateByKey 来实现 reduceByKey def f1(x, y): # 显示定义Map阶段聚合函数f1,求加和 return x+y import random # 实验3:aggregateByKey 使用说明 textFile = SparkContext().textFile("../wikiOfSpark.txt") wordCount = ( textFile.flatMap(lambda line: line.split(" ")) .filter(lambda word: word != "") .map(lambda word: (word, 1)) .aggregateByKey(0, f1, f1) .sortBy(lambda x: x[1], False) .take(5)) print(wordCount) ```

    作者回复: 满分,💯,赞👍~

    2021-12-22
  • pythonbug
    我感觉aggregateByKey直接作用在刚刚读取数据的RDD上的情况很少,因为刚刚从数据源读取出来的数据分区大多数时候是没啥业务含义的,所以Map阶段的聚合也没有太大意义。所以猜测,aggregateByKey可能大多数情况是跟在reduceByKey之后,那个时候已经对数据按照业务分区好了。那个时候Map阶段的聚合才有一些意义,不知道猜的对不对

    作者回复: 没什么关系哈,aggregateByKey可以理解成是reduceByKey的“升级版”,功能更灵活,能够应对的计算场景更多~ 实际上,reduceByKey完全可以用aggregateByKey来实现~

    2021-11-14
  • 十年
    请问老师,aggregateByKey的初始值有什么作用?

    作者回复: 数值的话,可以指定非零初始值,字符串的话,可以指定任意前缀字符串~ 主要看业务需要了

    2021-10-07
    2
  • 钱鹏 Allen
    reduceByKey 和 aggregateByKey的联系:将相同的key值进行聚合。不同点:reduceByKey()采用的是相同的func,在map阶段使用sum操作,reduce阶段采用max操作就不满足。 aggregateByKey可以看做是更一般的reduceByKey,

    作者回复: 对的~

    2021-09-28
  • J
    讲解aggregateByKey计算过程时,图例错写成了“reduceByKey计算过程”
    2022-08-07归属地:湖北
    1
  • 睿晞
    aggregateByKey算子中第一个默认参数的使用方法是什么啊?是直接参与到第二个聚合函数(reduce端)里面运算吗?比如,默认参数是0,然后如果第二个聚合函数是max求最大值,初始默认参数是参与比较的,用0和每个字段中的值比较,是这个意思吗?
    2022-04-27
  • Spoon
    Java实现代码 https://github.com/Spoon94/spark-practice/blob/master/src/main/java/com/spoon/spark/core/AggOpJob.java
    2022-03-27
    1
收起评论
显示
设置
留言
9
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部