Spark 性能调优实战
吴磊
前 FreeWheel 机器学习团队负责人
8808 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 36 讲
Spark 性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

23 | 钨丝计划:Tungsten给开发者带来了哪些福报?

你好,我是吴磊。
通过前两讲的学习,我们知道在 Spark SQL 这颗智能大脑中,“左脑”Catalyst 优化器负责把查询语句最终转换成可执行的 Physical Plan。但是,把 Physical Plan 直接丢给 Spark 去执行并不是最优的选择,最优的选择是把它交给“右脑”Tungsten 再做一轮优化。
Tungsten 又叫钨丝计划,它主要围绕内核引擎做了两方面的改进:数据结构设计和全阶段代码生成(WSCG,Whole Stage Code Generation)。
今天这一讲,我们就来说说 Tungsten 的设计初衷是什么,它的两方面改进到底解决了哪些问题,以及它给开发者到底带来了哪些性能红利。

Tungsten 在数据结构方面的设计

相比 Spark Core,Tungsten 在数据结构方面做了两个比较大的改进,一个是紧凑的二进制格式 Unsafe Row,另一个是内存页管理。我们一个一个来说。

Unsafe Row:二进制数据结构

Unsafe Row 是一种字节数组,它可以用来存储下图所示 Schema 为(userID,name,age,gender)的用户数据条目。总的来说,所有字段都会按照 Schema 中的顺序安放在数组中。其中,定长字段的值会直接安插到字节中,而变长字段会先在 Schema 的相应位置插入偏移地址,再把字段长度和字段值存储到靠后的元素中。更详细的例子我们在第 9 讲说过,你可以去看看。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Tungsten计划为开发者带来了显著的性能提升和优势。该计划主要围绕内核引擎做了两方面的改进:数据结构设计和全阶段代码生成(WSCG)。在数据结构方面,Tungsten采用了紧凑的二进制格式Unsafe Row和基于内存页的内存管理,显著降低了存储开销和GC压力,提升了数据存储效率与GC效率。WSCG则通过生成一份“手写代码”,将所有计算融合为一个统一的函数,消除了操作符之间的虚函数调用和内存随机访问,从而提高了CPU的缓存命中率和工作效率。这些改进为开发者带来了潜在性能收益,降低了存储开销和GC压力,提升了数据存储效率与GC效率,以及提高了CPU cache利用率,减少CPU中断,显著提升CPU利用率。总的来说,Tungsten计划通过优化数据结构和代码生成,解决了存储开销大、GC效率低、CPU cache命中率低等问题,为开发者带来了性能红利。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spark 性能调优实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(18)

  • 最新
  • 精选
  • Fendora范东_
    有个地方没理解 1.onheap内存寻址所说的内存页表,保存的是对象引用到jvm对象地址的映射,那它应该也是一个map结构,它是不是用下面所说的Tungsten.hashmap实现的? 2.我理解不是,下面说的Tungsten.hashmap实例,我理解它是一个完整查询结构:要查某条数据,先计算key的hashcode,然后拿到128位内存地址,然后高64位(堆内对象引用)用于在「内存页表」中查询jvm对象地址,低64位用于在查到的jvm对象中进行偏移计算拿到具体某行数据。 3.如果2我没分析错,那「内存页表」是怎么实现的呢?

    作者回复: 好问题,老弟思考很深入~ 你说的对,内存页表,不是Tungsten.hashmap,更不是普通的Map。它实际上是MemoryBlock数组,也就是MemoryBlock[]。 MemoryBlock,它就是一段连续的内存区域,这个对象本身的引用,就是这块内存区域的起始地址。这个对象有个属性,length,它记录了这块内存区域有多大。 简单理解,你可把MemoryBlock当成一个超级大的字节数组,这个字节数组,就是所谓的“连续内存区域”。数组的起始地址,也即是index为0的地址,就是MemoryBlock这个对象的引用。 再说回Tungsten 128位地址,前64位,你可以理解成MemoryBlock[],也即是MemoryBlock数组的下标,用来寻址MemoryBlock;而后64位,你可以理解成定位到的MemoryBlock这个字节数组内部的偏移地址,来定位你的Unsafe Row,或是其他数据结构。

    2021-05-05
    4
    17
  • 斯盖丸
    老师,tungsten内存页这块还是看得我很迷糊。我试着用我自己的语言复述下你看下对不对。 假设我有一张一百万行的表,用Tungsten内存,那就是会把这一百万分散到几十个内存页里去是吗,也就是一个内存页存了几万行? 其次,这几十个内存页里前64位都存着key value的键值对(共一百万个),后64位存偏移指针。其中key就是每一行的hash,row就是表的每一行对吗? 最后,如果我想找id=3的那行数据(假设id唯一),那就再去内存页的后64位找偏移量为3,也就是指针挪动3个单位,来确定最终要找的那一行是吗? 感觉自己都不能自圆其说了,要是面试这么回答估计得挂,求老师帮忙看看错在哪里…

    作者回复: 好问题,这里确实有点绕,我们从头来说~ 首先,一般来说,Tungsten的Page Table,也就是内存页,不会直接(加粗)存储数据条目,说白了就是数据表中的行(Rows),Tungsten的Page Table,存储的往往是Shuffle Map阶段计算过程中用到的各类数据结构, 比如AppendOnlyMap、PairBuffer等等。不过,这些数据结构,他们都会携带(Carry)数据行,也就是把数据行当做是Payload。通常来说,这些Map类型的数据结构,他们的Key往往是Join Key,而Payload,往往就是数据行。这是其一。 然后,对于这些Map类型的数据结构,比如AppendOnlyMap、或是PairBuffer,在Tungsten机制下,他们的实现,就是用文中说的Tungsten HashMap来实现的。具体的实现方式是,一个数组用来存储(Hash Code,Pointer),Hash code就是Map当中Key的哈希值,而Pointer,就是Tungsten内存地址,也就是128位的地址,其中前64位是Memory Page也即内存页地址,而后64位就是定位到具体(Key、Value)对的引用。也就是说,这个Pointer,会定位到具体的数据条目(数据行)。 总结下来,Tungsten机制下,消耗内存的数据结构是HashMap,HashMap中的Pointer是内存地址,内存地址用来定位内存页和具体的数据条目,而每个内存页都是一个JVM Object,因此,就像你说的,一百万行的数据,会被分散到几十个内存页,每个内存页存储几万条数据条目。不知道这么说,能解答你的疑问吗? 关于这部分:“最后,如果我想找id=3的那行数据(假设id唯一),那就再去内存页的后64位找偏移量为3,也就是指针挪动3个单位,来确定最终要找的那一行是吗?”。 这个是不对的,内存地址的偏移量和你的数据条目本身没有关系,也就是说,你不能用id=3去寻址。这里的偏移地址,指的是,相比Memory Page的偏移地址,Tungsten会寻址到某一条Unsafe Row,而Unsafe Row里面的字段,比如包括了id、name、age等等,这个跟寻址没有关系,Tungsten寻址到Unsafe Row,会根据Schema和二进制序列化规则,自行去反序列化所需的字段,Unsafe Row内部的数据访问,和寻址已经没有关系了。这是两个层级的事情。 简单来说,寻址是去定位到Unsafe Row;而找到Unsafe Row,怎么去获取其中的字段,那是另外一回事了,结合Schema就可以搞定~

    2021-06-10
    13
  • kingcall
    哈哈,昨天看了一遍没懂,然后去补了点知识 1 java unsafe 2 虚拟内存管理 3 spark 官网关于Tungsten的介绍,今天又来了! 回答:关于sort 为了更好的利用CPU 的多级缓存,Tungsten 做了关于类似pointer-key 作为元素的数组,从而避免在主存里面随机读取数据进行排序,从而可以更好的利用缓存,其实这就是Tungsten 的第二点,然而这一点老师没有介绍,估计是在这等着的吧,哈哈!

    作者回复: 哈哈,没错,这里挖了个坑,老弟成功地填上了~ 赞👍 机智如你~ 赞锲而不舍的钻研精神~ 666

    2021-05-07
    2
    5
  • Sean
    老师提到,开启了堆外之后,Spark在运行时会优先使用堆外,堆外不够再回退到堆内。我理解为这个任务一共生成了1000个task,每个task100m,堆外内存是6G,堆内2G,在执行到第50个task时,发现堆内还剩下40m,则剩下的所有task,960个都会走堆内内存,即时堆外50个task占用的内存已经释放,依然不会被使用,不知道这样理解对不对

    作者回复: 好问题~ 目前比较遗憾,堆外、堆内的划分,不是以Task为粒度的,而是以作业为粒度。换句话说,如果开启了堆外内存,在一个作业内,对于所有的Tasks,它只会尝试使用堆外内存,而不会去使用堆内内存,这个是由现在的实现机制(MemoryManager指定内存模式,而所有TaskMemoryManager继承了MemoryManager的内存模式,而MemoryManager的作用范围,是整个作业)决定的。 因此,这也是为什么Spark社区不鼓励开启堆外,一方面因为隐患比较大,对于作业稳定性影响不好;再者,在经过Tungsten优化之后,堆内上面的执行性能,一点不比堆外差。 如果非要开启堆外,也是可以的,不过这可能就需要对内存占用有个比较精确的估计。

    2021-09-04
    3
  • keeprun
    老师好,最近在看groupBy的Aggregation策略的选择,包含Hash-based Aggregation(spark 2.2.0后增加了Object-Hash-based Aggregation)和Sort-based Aggregation。其中能否使用Hash-based Aggregation的判断条件主要是UnsafeRow.isMutable(field.dataType()),主要是定长的数据类型,看到注释中提到,数据能够就地更新(Field types that can be updated in place in UnsafeRows)。是否主要是效率考虑(之前第9节有说明UnsafeRow的存储方式,定长的数据按顺序存储在字节数组中,而变长的字段需要通过offset来记录。)还是其他有其他原因?麻烦解惑。

    作者回复: 老弟研究得挺深入,赞一个~ 👍 先来说HashAggregate,对于一般的数据类型来说,也就是Primitive的类型,比如int、double、float这种,Spark默认会采用HashAggregate来实现聚合计算。如果聚合的目标是对象,比方说String,那Spark就会退化到ObjectHashAggregate,来完成计算。原因很简单,HashAggregate并不支持对象类型。 另外,这二者都是用内存数据结构,来完成聚合计算,当内存不足的时候,或者Key的数量,大于一定数值的时候,这两种实现都会退化到SortAggregate,其实这也好理解,内存不足,自然需要溢出。 在Spark中,溢出的处理往往是外排,也就是先把内存中的数据排序,再溢出,最后所有溢出文件与内存中数据的聚合,再用Sort Merge来完成。因此,一旦内存不足,涉及到溢出,聚合操作自然退化到SortAggregate。 我们知道,排序往往会消耗额外的CPU和内存,因此,相比前两者,SortAggregate的性能一定更差。 关于语句UnsafeRow.isMutable(field.dataType()),现在的Spark中,会强制Aggregate的计算,要利用Tungsten的数据结构,比方说UnsafeRow,Tungsten HashMap等等。这里主要是判定字段的数据类型,是不是Primitive的,如果是,才能用HashAggregate,如果字段是诸如String类型的字段,就得退化到刚刚说的ObjectHashAggregate~ 大体上就是这些,希望对老弟有所帮助哈~

    2021-11-27
    2
  • wow_xiaodi
    老师,有个问题,java的hashmap对于哈希冲突的元素可以通过遍历链表来定位到目标对象,那么tungsten.hashmap的value存放的却是一个128位内存地址,那么此时遇到哈希冲突,他是怎么解决的呢?是先根据128位的内容去寻址内存页的开始位置,然后一直遍历下去吗?

    作者回复: 好问题,这里为了突出说明Tungsten HashMap的优势,我们简化了一些细节,并没有提哈希冲突的问题。 实际上,Tungsten HashMap解决冲突的方式,跟传统Java HashMap并没有本质区别,也是用链表来存储多个内存地址,从而解决冲突的问题。

    2021-08-12
    2
  • Stony.修行僧
    学到很多,也参照了 《learn spark》,性能优化提高了不少,从好几个个小时job 优化到5分钟

    作者回复: 太赞了👍,老弟V5~

    2021-05-07
    2
    2
  • zxk
    问题二:Spark SQL 解析为语法树后,在不使用 Expression Codegen 的情况下,表达式节点每次执行都需要进行 Spark 内部的一些相关操作(如做一些操作类型匹配),那么 Spark 自身机制的开销可能大于我们需要执行的计算的开销,因此需要 Expression Codegen 对表达式进行代码生成,此时侧重于对表达式自身的优化;而 WSCG 则侧重与多个函数之间的合并,两者侧重点并不相同。 这里有几个疑问想请教下老师: 1. Tungsten 在堆内采用了 8 字节表示 Java Object,这跟 64 位 JVM 可以对应上,但 64 位 JVM 是有指针压缩机制的,这个对于 Tungsten 是否生效 2. Tungsten 在堆外有 64 位空间浪费了,为何 Spark 社区不针对堆内堆外区分处理,而是采用统一管理的方式?

    作者回复: 表达式Codegen,和WSCG的回答没问题,两者一个局部,一个全局。实际上,WSCG在执行过程中,会利用到局部的表达式Codegen,两者是部分和整体的关系。 关于几个疑问: 1. 会生效。尽管Tungsten设计了自己的数据结构,比如Unsafe Row,比如HashMap,但在实现机制上,(如果是堆内内存),仍然逃不脱JVM机制的管控,比如GC效率还是跟对象数成反比,再比如你说的指针压缩机制,都是同样适用的。 2. 好问题,确实有64位是null,但是使用统一的内存地址抽象,方便Spark对于内存的统一管理。不论堆内还是堆外,使用Tungsten地址可以做到统一寻址,在代码项目的实现与维护上更加高效,避免仅仅因为内存空间的不同,就需要实现并维护两套不同的代码。

    2021-05-05
    2
  • 斯盖丸
    老师,请问On heap寻址里的Object引用和偏移地址分别对应的是什么?Object引用是一条Row,偏移地址里是Row的一个字段或者说是列吗?

    作者回复: 不是哈,Object引用对应的是内存页(Memory Page)地址,通过Object引用来寻址内存页,而偏移地址,你可以理解成:内存页里面的Unsafe Row的起始地址。至于说Unsafe Row内部的数据列如何寻址、访问,这个就是Unsafe Row二进制字节序列本身的事情了,就是定长字段按序访问、变长字段先得到Unsafe Row内的Offset,再去拿字段长度和具体内容,比如字符串“Mike”。 一个是128位Tungsten地址的偏移地址,一个是Unsafe Row内部的偏移地址,虽然都叫Offset,但是含义完全不同哈~

    2021-05-05
    2
  • Unknown element
    老师问下投影是什么?我看执行计划里好像就是选出需要的字段?

    作者回复: 对,projection,就是列剪枝,选出需要的字段,实际上就是这个意思,只不过名字听起来高大上一点,哈哈

    2021-11-04
    1
收起评论
显示
设置
留言
18
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部