作者回复: 好问题,老弟思考很深入~ 你说的对,内存页表,不是Tungsten.hashmap,更不是普通的Map。它实际上是MemoryBlock数组,也就是MemoryBlock[]。 MemoryBlock,它就是一段连续的内存区域,这个对象本身的引用,就是这块内存区域的起始地址。这个对象有个属性,length,它记录了这块内存区域有多大。 简单理解,你可把MemoryBlock当成一个超级大的字节数组,这个字节数组,就是所谓的“连续内存区域”。数组的起始地址,也即是index为0的地址,就是MemoryBlock这个对象的引用。 再说回Tungsten 128位地址,前64位,你可以理解成MemoryBlock[],也即是MemoryBlock数组的下标,用来寻址MemoryBlock;而后64位,你可以理解成定位到的MemoryBlock这个字节数组内部的偏移地址,来定位你的Unsafe Row,或是其他数据结构。
作者回复: 好问题,这里确实有点绕,我们从头来说~ 首先,一般来说,Tungsten的Page Table,也就是内存页,不会直接(加粗)存储数据条目,说白了就是数据表中的行(Rows),Tungsten的Page Table,存储的往往是Shuffle Map阶段计算过程中用到的各类数据结构, 比如AppendOnlyMap、PairBuffer等等。不过,这些数据结构,他们都会携带(Carry)数据行,也就是把数据行当做是Payload。通常来说,这些Map类型的数据结构,他们的Key往往是Join Key,而Payload,往往就是数据行。这是其一。 然后,对于这些Map类型的数据结构,比如AppendOnlyMap、或是PairBuffer,在Tungsten机制下,他们的实现,就是用文中说的Tungsten HashMap来实现的。具体的实现方式是,一个数组用来存储(Hash Code,Pointer),Hash code就是Map当中Key的哈希值,而Pointer,就是Tungsten内存地址,也就是128位的地址,其中前64位是Memory Page也即内存页地址,而后64位就是定位到具体(Key、Value)对的引用。也就是说,这个Pointer,会定位到具体的数据条目(数据行)。 总结下来,Tungsten机制下,消耗内存的数据结构是HashMap,HashMap中的Pointer是内存地址,内存地址用来定位内存页和具体的数据条目,而每个内存页都是一个JVM Object,因此,就像你说的,一百万行的数据,会被分散到几十个内存页,每个内存页存储几万条数据条目。不知道这么说,能解答你的疑问吗? 关于这部分:“最后,如果我想找id=3的那行数据(假设id唯一),那就再去内存页的后64位找偏移量为3,也就是指针挪动3个单位,来确定最终要找的那一行是吗?”。 这个是不对的,内存地址的偏移量和你的数据条目本身没有关系,也就是说,你不能用id=3去寻址。这里的偏移地址,指的是,相比Memory Page的偏移地址,Tungsten会寻址到某一条Unsafe Row,而Unsafe Row里面的字段,比如包括了id、name、age等等,这个跟寻址没有关系,Tungsten寻址到Unsafe Row,会根据Schema和二进制序列化规则,自行去反序列化所需的字段,Unsafe Row内部的数据访问,和寻址已经没有关系了。这是两个层级的事情。 简单来说,寻址是去定位到Unsafe Row;而找到Unsafe Row,怎么去获取其中的字段,那是另外一回事了,结合Schema就可以搞定~
作者回复: 哈哈,没错,这里挖了个坑,老弟成功地填上了~ 赞👍 机智如你~ 赞锲而不舍的钻研精神~ 666
作者回复: 好问题~ 目前比较遗憾,堆外、堆内的划分,不是以Task为粒度的,而是以作业为粒度。换句话说,如果开启了堆外内存,在一个作业内,对于所有的Tasks,它只会尝试使用堆外内存,而不会去使用堆内内存,这个是由现在的实现机制(MemoryManager指定内存模式,而所有TaskMemoryManager继承了MemoryManager的内存模式,而MemoryManager的作用范围,是整个作业)决定的。 因此,这也是为什么Spark社区不鼓励开启堆外,一方面因为隐患比较大,对于作业稳定性影响不好;再者,在经过Tungsten优化之后,堆内上面的执行性能,一点不比堆外差。 如果非要开启堆外,也是可以的,不过这可能就需要对内存占用有个比较精确的估计。
作者回复: 老弟研究得挺深入,赞一个~ 👍 先来说HashAggregate,对于一般的数据类型来说,也就是Primitive的类型,比如int、double、float这种,Spark默认会采用HashAggregate来实现聚合计算。如果聚合的目标是对象,比方说String,那Spark就会退化到ObjectHashAggregate,来完成计算。原因很简单,HashAggregate并不支持对象类型。 另外,这二者都是用内存数据结构,来完成聚合计算,当内存不足的时候,或者Key的数量,大于一定数值的时候,这两种实现都会退化到SortAggregate,其实这也好理解,内存不足,自然需要溢出。 在Spark中,溢出的处理往往是外排,也就是先把内存中的数据排序,再溢出,最后所有溢出文件与内存中数据的聚合,再用Sort Merge来完成。因此,一旦内存不足,涉及到溢出,聚合操作自然退化到SortAggregate。 我们知道,排序往往会消耗额外的CPU和内存,因此,相比前两者,SortAggregate的性能一定更差。 关于语句UnsafeRow.isMutable(field.dataType()),现在的Spark中,会强制Aggregate的计算,要利用Tungsten的数据结构,比方说UnsafeRow,Tungsten HashMap等等。这里主要是判定字段的数据类型,是不是Primitive的,如果是,才能用HashAggregate,如果字段是诸如String类型的字段,就得退化到刚刚说的ObjectHashAggregate~ 大体上就是这些,希望对老弟有所帮助哈~
作者回复: 好问题,这里为了突出说明Tungsten HashMap的优势,我们简化了一些细节,并没有提哈希冲突的问题。 实际上,Tungsten HashMap解决冲突的方式,跟传统Java HashMap并没有本质区别,也是用链表来存储多个内存地址,从而解决冲突的问题。
作者回复: 太赞了👍,老弟V5~
作者回复: 表达式Codegen,和WSCG的回答没问题,两者一个局部,一个全局。实际上,WSCG在执行过程中,会利用到局部的表达式Codegen,两者是部分和整体的关系。 关于几个疑问: 1. 会生效。尽管Tungsten设计了自己的数据结构,比如Unsafe Row,比如HashMap,但在实现机制上,(如果是堆内内存),仍然逃不脱JVM机制的管控,比如GC效率还是跟对象数成反比,再比如你说的指针压缩机制,都是同样适用的。 2. 好问题,确实有64位是null,但是使用统一的内存地址抽象,方便Spark对于内存的统一管理。不论堆内还是堆外,使用Tungsten地址可以做到统一寻址,在代码项目的实现与维护上更加高效,避免仅仅因为内存空间的不同,就需要实现并维护两套不同的代码。
作者回复: 不是哈,Object引用对应的是内存页(Memory Page)地址,通过Object引用来寻址内存页,而偏移地址,你可以理解成:内存页里面的Unsafe Row的起始地址。至于说Unsafe Row内部的数据列如何寻址、访问,这个就是Unsafe Row二进制字节序列本身的事情了,就是定长字段按序访问、变长字段先得到Unsafe Row内的Offset,再去拿字段长度和具体内容,比如字符串“Mike”。 一个是128位Tungsten地址的偏移地址,一个是Unsafe Row内部的偏移地址,虽然都叫Offset,但是含义完全不同哈~
作者回复: 对,projection,就是列剪枝,选出需要的字段,实际上就是这个意思,只不过名字听起来高大上一点,哈哈