Spark性能调优实战
吴磊
FreeWheel机器学习团队负责人
新⼈⾸单¥59.9
1358 人已学习
课程目录
已更新 23 讲 / 共 32 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | Spark性能调优,你该掌握这些“套路”
免费
01 | 性能调优的必要性:Spark本身就很快,为啥还需要我调优?
02 | 性能调优的本质:调优的手段五花八门,该从哪里入手?
原理篇 (5讲)
03 | RDD:为什么你必须要理解弹性分布式数据集?
04 | DAG与流水线:到底啥叫“内存计算”?
05 | 调度系统:“数据不动代码动”到底是什么意思?
06 | 存储系统:空间换时间,还是时间换空间?
07 | 内存管理基础:Spark如何高效利用有限的内存空间?
通用性能调优篇 (12讲)
08 | 应用开发三原则:如何拓展自己的开发边界?
09 | 调优一筹莫展,配置项速查手册让你事半功倍!(上)
10 |调优一筹莫展,配置项速查手册让你事半功倍!(下)
11 | Shuffle的工作原理:为什么说Shuffle是一时无两的性能杀手?
12 | 广播变量(一):克制Shuffle,如何一招制胜!
13 | 广播变量(二):有哪些途径让Spark SQL选择Broadcast Joins?
14 | CPU视角:如何高效地利用CPU?
15 | 内存视角(一):如何最大化内存的使用效率?
16 | 内存视角(二):如何有效避免Cache滥用?
17 | 内存视角(三):OOM都是谁的锅?怎么破?
18 | 磁盘视角:如果内存无限大,磁盘还有用武之地吗?
19 | 网络视角:如何有效降低网络开销?
Spark SQL 性能调优篇 (3讲)
20 | RDD和DataFrame:既生瑜、何生亮
21 | Catalyst逻辑计划:你的SQL语句是怎么被优化的?(上)
22 | Catalyst物理计划:你的SQL语句是怎么被优化的(下)?
Spark性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

22 | Catalyst物理计划:你的SQL语句是怎么被优化的(下)?

吴磊 2021-05-03
你好,我是吴磊。
上一讲我们说了,Catalyst 优化器的逻辑优化过程包含两个环节:逻辑计划解析和逻辑计划优化。逻辑优化的最终目的就是要把 Unresolved Logical Plan 从次优的 Analyzed Logical Plan 最终变身为执行高效的 Optimized Logical Plan。
但是,逻辑优化的每一步仅仅是从逻辑上表明 Spark SQL 需要“做什么”,并没有从执行层面说明具体该“怎么做”。因此,为了把逻辑计划交付执行,Catalyst 还需要把 Optimized Logical Plan 转换为物理计划。物理计划比逻辑计划更具体,它明确交代了 Spark SQL 的每一步具体该怎么执行。
物理计划阶段
今天这一讲,我们继续追随小 Q 的脚步,看看它经过 Catalyst 的物理优化阶段之后,还会发生哪些变化。

优化 Spark Plan

物理阶段的优化是从逻辑优化阶段输出的 Optimized Logical Plan 开始的,因此我们先来回顾一下小 Q 的原始查询和 Optimized Logical Plan。
val userFile: String = _
val usersDf = spark.read.parquet(userFile)
usersDf.printSchema
/**
root
|-- userId: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- gender: string (nullable = true)
|-- email: string (nullable = true)
*/
val users = usersDf
.select("name", "age", "userId")
.filter($"age" < 30)
.filter($"gender".isin("M"))
val txFile: String = _
val txDf = spark.read.parquet(txFile)
txDf.printSchema
/**
root
|-- txId: integer (nullable = true)
|-- userId: integer (nullable = true)
|-- price: float (nullable = true)
|-- volume: integer (nullable = true)
*/
val result = txDF.select("price", "volume", "userId")
.join(users, Seq("userId"), "inner")
.groupBy(col("name"), col("age")).agg(sum(col("price") * col("volume")).alias("revenue"))
result.write.parquet("_")
两表关联的查询语句经过转换之后,得到的 Optimized Logical Plan 如下图所示。注意,在逻辑计划的根节点,出现了“Join Inner”字样,Catalyst 优化器明确了这一步需要做内关联。但是,怎么做内关联,使用哪种 Join 策略来进行关联,Catalyst 并没有交代清楚。因此,逻辑计划本身不具备可操作性。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Spark性能调优实战》,如需阅读全部文章,
请订阅文章所属专栏新⼈⾸单¥59.9
立即订阅
登录 后留言

精选留言(3)

  • kingcall
    回答问题:
    个人认为没有必要:因为被广播出去的数据集合都很小,可以通过Hash 或者 Nested Loop 来实现,而这二者之间的区别在于
    1. Hash 主要适用于等值关联
    2. Nested Loop 就可以用来实现笛卡尔积或者是不等值关联
    提问:
    Sort Merge 个人认为是在大数据量下催生出来的一个解决方案,但是想不通为什么在大数量的情况下,> 或者是< 的这种关联为什么会退化成CartesianProduct,而不是SortMergeJoin
    因为我觉得这个时候使用SortMergeJoin还是比CartesianProduct好的,除非是!= 这样的关联使用CartesianProduct,当然spark 可能是考虑到了排序的成本
    2021-05-03
    2
  • Fendora范东_
    一、从原理和复杂度看(假设单节点大表规模m,小表规模n)
    1、数据分发:
          broadcast把整个小表发送到大表数据所在节点;
          shuffle大小表按照同样的分区方式、数进行数据重新 划分。
    2、join实现
        hash join小表建立hash表,大表遍历;(时间复杂度为构建hash表时间O(n)+大小表比较时间O(m)*O(1),空间复杂依赖hash map)
        sort merge join 先排序,然后采用MergeSort中合并操作进行比较;(时间复杂度为 排序时间max{O(m*lgm),O(n*lgn)} + 大小表比较时间O(m+n),其实时空复杂度依赖具体排序算法。。。)
        nested loop join大小表双层for循环依次比较。O(mn)

    二、BSMJ分析(小表总规模N)
    1.实现
       1.1小表先broadcast,所有节点再分别进行排序、合并。
       1.2小表先排序再broadcast,最后两表进行合并。
    2.原因
       2.1先broadcast,再排序,最坏时相比BNLJ多了每个节点的O(N*lgN)小表排序耗时;最好时max{O(N*lgN),O(m*lgm)}+O(M+N)不见得就一定比O(Mn)效果好。
       2.2如果先排序,那driver端就需要排序耗时O(N*lgN),driver极有可能是整个集群的瓶颈。

    磊哥看看哪里有问题没
    2021-05-03
  • Z宇锤锤
    Broadcast Sort Merger Join中Broadcast指的是数据分发的方式,SMB指的是Join实现机制。
    Sort Merge Join的原理是将两张表的数据按照相同的分区算法,分发到各个Executor上。如果使用Broadcast传输,被广播的表会先在Executor端进行数据的拆分,拆分完成以后,所有的分区会被Collect到Driver端,再向每一个Executor分发完整数据。这使被广播的表数据即便拆分了,还是被聚合分发,浪费时间。
    2021-05-03
收起评论
3
返回
顶部