• little_fly
    2021-05-15
    老师,能不能对Spark应用程序中经常遇到的一些报错,写篇分析文章?比如,类似如下的报错信息: 1. org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 172 bytes of memory, got 0 2. ERROR cluster.YarnScheduler: Lost executor 4 on node-3: Container killed by YARN for exceeding memory limits. 5.0 GB of 5 GB physical memory used. 3. java.lang.OutOfMemoryError: No enough memory for aggregation 4. org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: Executor is not registered 5. ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks java.io.IOException: Failed to connect to node-2/192.168.10.1:41977 等等诸如此类的报错,很多也都不是说增加资源就能解决的。希望老师能分享一下有哪些比较好的分析排错思路
    展开

    作者回复: 非常好的建议,这块我是这么想的,后续我会整理一个github repo,把专栏中涉及的代码、数据、结果、以及常见问题等内容汇总到这个项目中去。当然内容不止这些,结合大家的需要,我们还会持续不断地向其中添加诸如笔试面试题、工作机会、职业发展等内容,把potatoes项目打造成我们共有的Spark私塾。看到你的这个问题,我觉得可以多加一类内容,就是常见报错,把这些报错分门别类,报错原因、应对办法,等等。 对于这个github repo,我是打算用开源的思路来做,比如就这个常见报错,我是期望大家群策群力,一起共享工作中遇到的各种报错,然后可以一起来分析背后的root cause和解决办法~ 然后不断地把它丰富、完善~ 等专栏写完,我就会着手做这件事,到时候一起弄哈~

    共 3 条评论
    11
  • 辰
    2021-05-13
    这个dpp限制其实蛮多的,就比如拿课件中的例子,通过user.id关联,而且这个关联应该也是大部分场景都用到的,但是的事实表居然要按userid进行分区,不管是电商项目还是其他项目,userid肯定是比较多的,比如10万个user就对应10万userid,也就对应了10万个分区,这怕不是要玩爆系统哦,不知道理解的对不对

    作者回复: 理解得非常的对~ 分区键和Join Keys的选取,确实是个博弈,咱们课件中的例子,为了方便说明DPP的机制和原理,所以没有深究。但是你说的非常对,就是这个Join Keys,确实本身就是个限制。因为就像你说的,平时的数据关联,都是ID类的Join Keys比较多,而这样的数据列,往往是不适合做分区键的。 所以要想充分利用DPP,确实是得在数仓规划的最开始,就把以后要用的常用查询都考虑到,在做分区表设计的时候,尽量做到分区处理本身与查询执行性能之间的平衡。

    
    6
  • Fendora范东_
    2021-05-10
    有几个疑问,麻烦磊哥指点下 1.文中所写SQL,最终有3个stage,读数据的两个stage按理说是没有依赖关系的,资源足够情况下是可以并发运行的,这就和DPP运行逻辑相违背了,这块spark是怎么做的呢? 2.有没有这样一种可能:DPP生效,即维度表做了过滤查询,事实表基于维度表广播过来的hashtable再做过滤查询,这个过程比DPP不生效,即两张表并发进行过滤查询,过程还慢? 3.DPP感觉包含了AQE join策略调整。假如本来是SMJ,运行过程中维度表经过过滤小于广播阈值,变成了BHJ。

    作者回复: 好问题~ 先说前两个问题,其实这个问题是,Spark是先扫描事实表,还是先做DPP,答案肯定是后者。 说说DPP的意义,这里有个关键,就是Spark“一上来”、在最开始,是怎么知道谁是事实表、维度表的?其实,Spark SQL的静态优化,关于数据统计,比如针对Parquet、CSV等等不同的数据源,它其实不是什么都不做,它至少会统计“表大小”这些最基本的信息,根据这些size类型的信息,它其实可以判断哪个是事实表,哪个是维度表。 如此一来,它先去判断DPP的前提条件是否成立,成立的话,就走一波DPP。 再来说第3题,确实是好问题,不过这里其实本质上是和DPP的实现机制有关,也就是说,它用“一箭双雕”的方式,一方面利用广播来过滤,一方面顺水推舟,用ReuseExchange(内存中的reuse,跟之前的磁盘reuse不同)再次利用广播来完成Broadcast Hash Join,一石二鸟。 再者,DPP实际上是先于AQE Join策略调整的,因为这里还没有Shuffle呢,还记得AQE的触发机制是Shuffle吗?因此DPP相当于截了AQE的“胡儿”(打麻将的截胡儿),不过其实谁先谁后的,没那么重要,反正我们享受到了Broadcast Hash Join的性能提升。 再者,DPP“水到渠成”的BHJ,其实还有一个优势,就是相比AQE,它不需要Shuffle就能触发,因此不需要Shuffle Write阶段的计算,才能做优化,所以说DPP带来的BHJ,相比AQE的Join策略调整,其实是更优的优化~

    共 2 条评论
    6
  • kingcall
    2021-05-11
    回答: 问题1 Broadcast Hash Join 本身也是依赖Broadcast 来完成的,所以Broadcast 肯定是可以完成这个需求的,但是有一个问题 Broadcast Hash Join 有spark.sql.autoBroadcastJoinThreshold 这个限制条件,但是这个条件也仅仅是限制了是否自动发生Broadcast Join,所以手动Broadcast话就没这个限制了或者使用hints,所以想怎么玩就怎么玩了,不知道对不对? 但是这里依然有一个问题那就是Broadcast出去的变量过大,这也就是 Broadcast Join 为啥有一个阈值了,要是这里能有一个这样的机制就好了,首先判断能否发生PDD 优化,然后让每个executor 去主动拉去自己所需的数据就好了(满足条件的 Join Key 的key 和 value),而不是整个Broadcast 变量。 问题2 缓存、外部存储主要是saprk 没有类似flink 的 Async IO ,但是也可以自己在map 函数中实现

    作者回复: 问题1可以参考下面几个同学的思路~ 强制广播和依靠广播阈值(也就是spark.sql.autoBroadcastJoinThreshold )来广播,其实本质上区别不大。 DPP的核心机制,还是用维表的Key set来过滤事实表数据。这个Key set全集是必需的,Spark的默认实现,是采用了广播变量,这个广播的哈希表还能用于后续的BHJ,一箭双雕,但是也因此而受限于广播阈值和8G的限制。 问题2答得不错,就是用其他的方式获取Key set全集,分布式缓存、存储,其实都行,虽然都会引入数据分发,但去掉了广播的限制~ 在DPP的第二个环节,也就是两表过滤之后的Join,其实采用SMJ或者是Shuffle Hash Join,就可以再次去掉广播阈值的限制。 所以总结下来,要想去掉广播的限制,需要对两个环节进行改进,一个是Key set的全集存储、分发;一个是过滤之后两表的Join。

    
    4
  • zxk
    2021-05-11
    问题一:放弃使用 BHJ 的话,其中一个表经过裁剪过滤后,使用广播变量只广播 Join Key 而非整个表的数据,仍可以实现 DPP,但个人仍为仍需要针对广播的 Join Key 加上一个 Threshold,否则可能将 Driver 撑爆。 问题二:如果维度表的过滤条件正好是 Join key,同时也是事实表的分区目录,也可以考虑先将这个过滤条件直接推到事实表作为一个靠近数据源的 filter 条件。 有个疑问请教下老师: 1DPP 的机制就是将经过过滤后的维度表广播到事实表进行裁剪,减少扫描数据,但 Spark 怎么才知道哪个是维度表哪个是事实表?因为 Spark 一上来是不清楚表信息的,如果一开始就把事实表先扫描了,那感觉 DPP 就失去意义了,是否需要开发者共一些额外的信息?

    作者回复: 满分💯,答得好,问得也好~ 思考的很深入 先说第一题,没错,只广播Join Keys,广播阈值的限制会低很多,不过还是需要利用广播机制,来传递“过滤条件”,事实表拿到“过滤条件”来降低扫描量。因此就像你说的,虽然限制降低,但是还是有Join Keys超大撑爆广播阈值、甚至是Driver的风险。所以要想完全去掉广播,那就必须要换一种网络分发方式,举个例子,比如分布式文件系统、分布式缓存,也就是所有Executors都可以通过他们拿到“过滤条件”。当然,你可能会说,这样效率就低很多了,确实,不过其实这道题的目的,就是鼓励大家脑洞、多思考、发散思维,思维越发散,其实对于Spark为什么采用广播机制的理解就越深刻。 再说第二题,很不错的思路~ 相当于是Query rewrite,就是直接把维表Join Key上的过滤条件,在SQL查询优化阶段就传递给事实表,不错不错~ 不过,Spark并没有Query rewrite这个阶段,但是思路我很喜欢~ 其实传统的DBMS都是有Query rewrite这个环节的,Spark偷懒,这部分没做。 最后再说DPP的意义,就是你说的,Spark“一上来”、最开始怎么知道谁是事实表、维度表。其实它还真是知道的,Spark SQL的静态优化,关于数据统计,比如针对Parquet、CSV等等不同的数据源,它其实不是什么都不做,它至少会统计“表大小”这些最基本的信息,根据这些size类型的信息,它其实可以判断哪个是事实表,哪个是维度表。

    
    3
  • To_Drill
    2021-12-02
    老师,按照DPP的机制来看,DPP只支持内连接(inner join)不支持外连接(left/right join)吧?如果是的,那就有多了个限制条件,适用场景更少了。

    作者回复: 那倒没有哈~ DPP并没有关联形式(inner、left、right)上的限制

    共 3 条评论
    1
  • 张守一
    2021-05-18
    老师 如果以userid作为分区字段 相比于日期等分区字段 分区过多 但id等字段又常常作为join key 这个怎么解决呢

    作者回复: 好问题,这块确实是DPP的“硬伤”。 就像你说的,Join Keys往往是cardinality比较高的字段,比如userId;而分区键往往是要选择那些cardinality比较低的字段,否则数据的存储就会非常的分散。 一个cardinality高,一个cardinality低,两者相互矛盾。这块确实比较“蛋疼”,如果查询中的Join Keys多属于userId这种cardinality非常高的字段,坦白说咱们还真没什么好办法。 对于那些cardinality较高的Join Key,我们就需要做取舍,也就是在存储效率和DPP之间做权衡。如果查询效率是第一优先级,那么我们其实还是可以强行对cardinality较高的Join Key做分区键。但如果相反,存储效率最大的concern,那么也就只好放弃取Join Key做分区键,放弃DPP优化机制。

    
    1
  • aof
    2021-05-16
    可以在逻辑计划优化的时候,就直接将维表的过滤条件通过join条件传导到事实表,减少数据的扫描

    作者回复: 不错的思路~ 不过这里有个前提条件,就是对于维表上的过滤字段,事实表上也要有才行。比如,dim.c1 = "xxx",那么就要求fact上面也要有c1字段,否则仅在逻辑计划阶段,是做不到过滤条件传导的。 这个思路本质上其实是Query Re-writer,就是在逻辑计划阶段把查询进行重写。

    
    1
  • Unknown element
    2022-01-17
    老师您好,问下如果关联是left join那DPP还能生效吗?此时左表并不需要过滤,因为结果集应该保留左表所有数据 谢谢老师~

    作者回复: 好问题,确实,如果是left join,DPP就没有意义了,毕竟左表的数据不管能不能关联上,都需要扫描出来

    
    
  • tony
    2021-09-08
    其实,spark有定义rule的机制,在逻辑计划阶段自己实现query rewrite,通过一些前置的检验条件,把维表的非分区字段条件转为filter推到事实表的logical plan上。不过只能满足小数据量的场景。

    作者回复: 确实,其实可以借鉴传统RDBMS中经典的优化方法,query rewrite就是其中一个。Spark SQL确实缺少query rewriter这个组件,采用逻辑规则加物理策略的方式,实际上缺少了一定的灵活性~ 不过,query rewrite这块,我理解和数据量本身应该关系不大

    
    