作者回复: 好问题哈,第三问确实很难,这里老哥我挖了个天坑,老弟见谅哈~ Spark使用cachedData = IndexedSeq[CachedData]()来存储(LogicalPlan,InMemoryRelation)键值对列表,其中的LogicalPlan就是Analyzed Logical Plan。咱们的问题是,为什么Spark选择了使用Analyzed LP作为Key,而没有选择Optimized LP。这个问题的关键,其实是Spark如何判断一个待计算的执行计划是否已经存在缓存结果了。Spark怎么来判断呢?简单的说,就是通过规范化的Logical Plan(CanonicalizedPlan)来进行对比,如果两个查询的CanonicalizedPlan是完全一致的,那么Spark SQL认为:OK,这个计划在缓存中已经有结果了,直接拿来用就好。 那么接下来的问题就是,给定一个Logical Plan,Spark SQL如何生产CanonicalizedPlan,简单来说,就是把AST语法树中的fields、ids不太归一化的字段进行归一化,从而消除不同查询计划之间那些“无关痛痒”的细微差别,这些细微差别与查询结果无关,所以在生成CanonicalizedPlan的过程中,Spark会把这些“细微差别”去掉,从而不影响两个CanonicalizedPlan之间是否相同的对比。 但是,这个规范化或是归一化的过程,Spark SQL其实并不敢完全保证。也就是说,本来计算结果相同的两个查询,他们的CanonicalizedPlan依然有可能不一样,也就是所谓的false negative。对于两个Plan之间CanonicalizedPlan的对比,Spark SQL采取保守的策略,就是宁可错杀1000,也不能漏放一个。也就是,Spark SQL宁可把两个执行结果一样的Logical Plan判定为不同,也要100%地保证:执行结果不同的两个查询,他们的CanonicalizedPlan一定是100%是不同的,也就是不能允许出现false positive的情况。 正是因为CanonicalizedPlan无法完全确保执行计划与计算结果完全一一对应,所以cachedData这个东西越早检查越省事,如果推迟到Optimized Logical Plan之后,那么大多数情况下,要么缓存很难命中,要么缓存好不容易命中了,但是Optimized阶段的优化就都白做了。所以总结下来,把Cache检查放在Analyzed LP之后、Optimized LP之前,就是上面说的这个原因。这个确实比较变态,root cause分析起来很困难,这个其实需要结合源码去顺藤摸瓜。辛苦老弟了哈~ 另外分享给老弟一个小tips:当你发现,你的问题,百度、google或是身边的人,都没办法帮你解答的时候,源码是最好的“老师”,我觉得这个是开源项目最大的意义,分享给老弟~
作者回复: 答得都对,具体细节刚刚在上一个thread都展开了,可以看看哈~
作者回复: 好问题,RDD确实不存在Cache miss的问题;咱们文中说的Cache miss,都是围绕着DataFrame,DataFrame、Dataset、SQL都走Spark SQL优化流程。Spark SQL有单独的Cache Manager来管理Cache复用,它本身的一些缺陷,会导致上述这些API,在开发的过程中,会有Cache miss的隐患~ 关于Spark SQL的优化流程,可以参考后面Spark SQL那三讲,那几讲展开的比较细哈~
作者回复: 好问题~ 1. 是的,一个空间不够,因此继续向后扫描,扫了两个之后,发现空间够了,就停止了。 2. 这个要赞一个~ 👍,思考很细致。这里咱们图省事,我没有交代的特别清楚,实际上,blockId不是String,而是一个sealed class,也就是一个类。这个类有不少属性,其中一个是asRDDId,这个函数就可以用来获取RDD Id,从而区分扫描的Block,是属于哪个RDD的,从而实现“兔子不吃窝边草”~ 3. 对,现在的设计,是放在Analyzed LP之后、Optimized LP之前,相当于是一种逻辑优化,就像你说的,能省略掉一些Rules、或者选取一些更优的Rules。 关于Cache Manager的优化,有个思路是SQL Re-write,其实这个思路就来源于传统的DBMS,Spark SQL没有这个环节。如果能够用SQL Re-writer,把Analyzed LP重写,那么其实potentially,很多的Analyzed LP都可以共享一份数据。Query Re-writer本身也并不复杂,参考传统DBMS,就可以很快实现出一个来。如果Cache Manager参考的,不再是“纯字符串”的Analyzed LP,而是重写之后的查询,想必效果要好很多~
作者回复: 对,是的,要先用Action算子来触发缓存的计算,让Spark真正把分布式数据集缓存到内存或是磁盘中去。否则的话,就像你说的,“就算用了cachedRdd这个变量多次缓存也不起效果”。
作者回复: 1 没问题,我之前看过你的回答~ 2、3 也都对,满分💯~
作者回复: 好问题,3个土豆(更准确地说,是多个,三个麻袋里面的所有土豆)一起,构成一个RDD哈~ RDD是数据“抽象”,它是用来“囊括”分布式数据集的,分布式数据集由数据分片/分区构成,土豆工坊里面的每一个土豆,都是一个RDD数据分片,不是RDD本身,他们凑在一起,够了一个RDD。 RDD是抽象的概念,而数据分片是实体。
作者回复: 物化(Materialization):把分布式数据集的Iterator迭代器,展开并存储到内存或是磁盘的过程。
作者回复: 文中的三种方式,主要是为了区别示意哈,目的是为了说清楚缓存复用。实际开发的时候,肯定是需要Action算子来触发缓存计算的。
作者回复: 不是哈,RDD还不如DataFrame,因为DF会走Spark SQL,Cache Manager是Spark SQL的组件之一。虽然CM效率不高、容易miss,但是Spark SQL好歹还有这么个组件帮他复用Cache。 RDD更惨,没有哪个组件来帮他做“复用”这件事,所以,要想充分利用、复用RDD Cache,你只能用第三种方式,也就是变量赋值的方式,明确地引用RDD Cache。