零基础入门 Spark
吴磊
前 FreeWheel 机器学习研发经理
19171 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 38 讲
零基础入门 Spark
15
15
1.0x
00:00/00:00
登录|注册

11 | 存储系统:数据到底都存哪儿了?

你好,我是吴磊。
感谢你在国庆假期仍然坚持学习,今天这一讲,我们来学习存储系统,与调度系统一样,它也是 Spark 重要的基础设施之一。不过,你可能会好奇:“掌握 Spark 应用开发,需要去了解这么底层的知识吗?”坦白地说,还真需要,为什么这么说呢?
我们前面学了 Shuffle 管理、RDD Cache 和广播变量,这些功能与特性,对 Spark 作业的执行性能有着至关重要的影响。而想要实现这些功能,底层的支撑系统正是 Spark 存储系统。
学习和熟悉存储系统,不单单是为了完善我们的知识体系,它还能直接帮你更好地利用 RDD Cache 和广播变量这些特性。在未来,这些知识也能为你做 Shuffle 的调优奠定良好的基础。
既然存储系统这么重要,那要怎样高效快速地掌握它呢?本着学以致用的原则,我们需要先了解系统的服务对象,说白了就是存储系统是用来存什么东西的。

服务对象

笼统地说,Spark 存储系统负责维护所有暂存在内存与磁盘中的数据,这些数据包括 Shuffle 中间文件、RDD Cache 以及广播变量
对于上述三类数据,我们并不陌生。我们先回顾一下什么是 Shuffle 中间文件,在 Shuffle 的计算过程中,Map Task 在 Shuffle Write 阶段生产 data 与 index 文件。接下来,根据 index 文件提供的分区索引,Shuffle Read 阶段的 Reduce Task 从不同节点拉取属于自己的分区数据。而 Shuffle 中间文件,指的正是两个阶段为了完成数据交换所仰仗的 data 与 index 文件。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了Spark存储系统的构成和重要组件,以生动的类比和详实的内容帮助读者深入了解存储系统的工作原理。文章从MemoryStore和DiskStore两个重要组件入手,分别介绍了它们在内存数据访问和磁盘数据访问中的作用和工作原理。通过形象的比喻和示意图,读者可以清晰地了解RDD Cache和Shuffle中间文件的存储过程,以及数据块与磁盘文件之间的映射关系。此外,文章还重点回顾了存储系统的核心组件和数据存取过程,帮助读者全面理解存储系统的重要性和工作原理。整体而言,本文内容详实,适合希望深入了解Spark存储系统的读者。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《零基础入门 Spark》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(10)

  • 最新
  • 精选
  • Geek_2dfa9a
    置顶
    今天的题好像面试啊,LinkedHashMap是HashMap的增强版,直接继承了HashMap类,多提供了一个可以按插入顺序的遍历,这个遍历是通过 两方面实现的: 1.直接扩展HashMap的Map.Entry,增加了before,after的指针,这样就在不改变本身数组的结构下 (哈希表本身通过数组实现常数级查询的时间复杂度,因此会打乱插入顺序),又变成了一个双链表,双链表的顺序就是插入顺序。 2.记录链头,链尾,这样就可以从头/尾按顺序遍历了。 这里LinkedHashMap是怎么组织链表的值得提一下,LinkedHashMap没有覆盖HashMap的put方法,HashMap使用了模版设计模式, 很好的实现了扩展和解耦,通过提供空方法扩展点afterNodeAccess,afterNodeInsertion,afterNodeRemoval, LinkedHashMap是通过重写这些扩展点实现了链表的插入和删除。 最后回一下上节课和老师交流的打开视野,目前来说,Spark对我来说有点像盲人摸象,现在只摸到了象腿,所以说大象像根柱子, 但是Spark可能还有很多精彩的地方我没摸到,期待老师带我们从多角度深入了解Spark这头大象。

    作者回复: 老弟谦虚啦!到目前为止,你的回答最为精彩,每次都让我印象很深,真的很棒👍!老弟得空可以加我微信,搜索“方块K”或是“rJunior”,我把你拉进读者群~ 在那里可以跟更多的同学探讨,分享你对于技术的理解和把握~ LinkedHashMap分析的非常到位,很深入,满分💯,置顶🔝。不过,提醒一点,LinkedHashMap的采用,很重要的一点,是用来实现RDD Cache的LRU机制,老弟不妨就这一点,再展开思考,我们评论区继续讨论~

    2021-10-11
    2
    9
  • Geek_2dfa9a
    MemoryStore这块涉及大量的nio,看得我头皮发麻。简单点讲,里面有个核心的常量LinkedHashMap,作为LRU缓存,存储所有Block。 putBytes()这个方法主要用来写数据,方法入参分别是blockId(数据块的标识),size(数据块长度),memoryMode(存放在堆上还是堆外), _bytes(具体内存分配的闭包),具体实现逻辑是,先检查blockId对应的数据块是否已缓存,然后通过memoryManager(1.6以前是StaticMemoryManager, 不能支持堆外内存,1.6以后默认UnifiedMemoryManager,可以通过spark.memory.useLegacyMode指定)确认内存是否够缓存,在后通过 _bytes把数据拷贝到DirectByteBuffer,如果数据本来就在堆外的话就省略这个逻辑,最后把blockId作为key,ChunkedByteBuffer (就是一个DirectByteBuffer数组,里面是要缓存的数据)作为SerializedMemoryEntry存到LinkedHashMap里,这里注意, 为了保证线程安全,LinkedHashMap需要加锁,这里是一个细粒度锁加到少部分代码上减少开销。 上面的是缓存序列化块的逻辑,putIterator既可以缓存序列化的值(堆内/堆外),也可以直接缓存对象(只能存放在堆内),具体逻辑和LinkedHashMap 不太相关了,篇幅有限就不分析了。 作为一个LRU缓存,Spark肯定还要有一个容量满后的清除操作,触发点在put时校验空间是否足够,具体逻辑在evictBlocksToFreeSpace, 入参有三个blockId(失效后需要缓存的block),space(需要缓存的block的长度),memoryMode。缓存失效这块影响比较大,需要加两个锁, 在外层给memoryManager加锁,之后再给LinkedHashMap加锁,因为put的时候memoryManager没加锁,如果正在put的时候清理缓存会发生数据竞争, 因此LinkedHashMap也需要加锁,保证同一时间LinkedHashMap只能有一个操作,之后的操作就简单了,拿到LinkedHashMap的迭代器, 从第一个Entry开始判断,如果可以失效()的话就记录下blockId并在blockInfoManager针对blockId加锁, 一直到释放的内存达到space。接下来判断如果清理这些失效的block能否拿到需要的space,不能的话就返回0,表示不能清理出所需空间,如果能拿到的话, 就开始dropBlock(),有些block可能同时在磁盘和内存缓存,如果只清除一处的话blockInfo无需删除解锁即可,如果磁盘和内存都没有了则需删除blockInfo, 如果清理过程中发生异常,则把还没清理的blockInfo解锁(这块逻辑放在finally里,并且作为一个编程技巧注释出来)。 另外有一点值得注意,Spark为了避免MaxDirectMemorySize的限制,使用了反射拿到了DirectByteBuffer的私有构造方法 private DirectByteBuffer(long addr, int cap),这样就避开了allocateDirect方法里面Bits.reserveMemory的限制。

    作者回复: 分析得很透彻,memoryStore缓存数据、evict数据的过程说得很清楚~ 赞👍!!!

    2021-10-12
    10
  • Alvin-L
    原本存储在mysql里的数据如何转存储到spark里?老师可否加餐讲一讲mysql转存spark的相关内容

    作者回复: 首先明确一点哈,Spark是纯粹的计算引擎,它本身是没有存储引擎的。当然,它有自己的存储系统(BlockManager那一套体系),但是这个存储系统跟存储引擎(比如S3、HDFS)有着本质的区别,Spark需要外部存储引擎来获取数据源,Spark自己的BlockManager仅仅是为了更好地辅助分布式计算。 老弟说的MySQL数据,我理解是物化到磁盘的数据记录。要把MySQL的数据,dump到S3,或是HDFS,其实有很多方法。这个过程,可以通过Spark来做,也可以用其他大数据组件来做,比较MapReduce或是早期的Sqoop,当然,现在Sqoop逐渐退出历史舞台。 如果要用Spark做的话,也就是把MySQL数据dump到HDFS或是S3,其实就是read API + write API这么简单,中间都没啥计算逻辑其实,就是纯粹的读 + 写,搞定~ 不知道问题我理解的对不对哈,咱们评论区继续讨论~

    2021-10-18
    2
    1
  • Neo-dqy
    【BlockManagerMaster 与众多 BlockManager 之间通过**心跳**来完成信息交换】,可以问一下老师这个心跳机制是什么呀?具体是怎么实现的呢?

    作者回复: 好问题~ 2.0之前,spark用akka来实现不同组件之间的通信。2.0以后,spark抛弃akka,基于netty实现了自己的rpc系统,主要对象有: rpcEndpoint,对标akka的actor rpcEndpointRef,对标akk的actorRef rpcEnv,对标akka的actorSystem Spark RPC解耦了Spark Core和底层通信系统,让spark不同组件之间(包括存储系统,blockmanager,等)实现高效异步通信

    2021-10-05
    1
  • qinsi
    LinkedHashMap 可用来实现 LRU Cache

    作者回复: 正解,满分💯~

    2021-10-04
    1
  • 小强
    请问老师,RDD Cache 以及广播变量是存储在storage memory, 那shuffle中间文件是否是存储在execution memory里啊?

    作者回复: 并不是哈,shuffle中间文件是物化到spark.local.dir这个配置项设置的地址,主要是磁盘中的文件系统目录,当然,也可以是Ramdisk(把内存当磁盘用,开辟内存文件系统)。总之,具体存在哪里,是由spark.local.dir决定的,而这个配置项,往往是磁盘

    2021-10-26
  • 钱鹏 Allen
    LinkedHashMap可以类比HashMap前者有序,后者无序

    作者回复: 可以参考qinsi的回复~ LinkedHashMap特有的结构,Spark主要利用它实现LRU

    2021-10-07
    2
  • Unknown element
    老师在文章倒数第二张图片里shuffle中间文件的blockid里有个属性叫 是否为shuffle block ,这里的shuffle block是什么呢?为什么shuffle中间文件不属于shuffle block?

    作者回复: blockid是对数据块的统一封装(类),可以代表rdd cache block、广播变量block、shuffle block。是否为shuffle block这个字段,就是用来判断封装的数据块是不是用于shuffle的~

    2021-10-06
    3
  • Wangyf
    插个眼。 为嘛内存的那个 MemoryStore 是自己干活,但磁盘的 DiskStore 却要维护另一个对象来干活?那它自己又去干嘛了?
    2022-07-20
    1
  • qingtama
    老师有个问题想请教下,如果使用LinkedHashMap是为了LRU的话就是说数据会有被淘汰的情况,但是我理解不过是内存的数据还是磁盘的数据都不能因为达到了LinkedHashMap存储上限而被清理掉吧,尤其数据都在被使用着的时候。
    2022-04-03
收起评论
显示
设置
留言
10
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部