作者回复: 好问题,是这样的: 1. 谓词下推本身,不依赖于任何文件存储格式,它本身就是Spark SQL的优化策略,DataFrame里面如果包含filter一类的操作,他们就会尽可能地被推到执行计划的最下面。 2. 但是,谓词下推的效果,和文件存储格式有关。假设是CSV这种行存格式,那么谓词下推顶多是在整个执行计划的shuffle之前,降低数据量大小。但如果是orc、Parquet这种列存文件,谓词下推能直接推到文件扫描上去,直接在磁盘扫描阶段,就降低文件扫描量,降低i/o开销,从而提升执行性能。
作者回复: 对,在这个场景下,先用distinct节省数据量更合适。咱们能省则省、能拖则拖是一般性原则哈。不过先用distinct其实还是遵循了能省则省的原则。get到核心思想就好,灵活应用~
作者回复: 这个PR描述的问题,是说: 当你读取Parquet文件的时候,Parquet filter(就是用谓词下推的时候,需要用到的功能)是需要做序列化的,在这个PR之前,序列化是在Driver端做得,但是,Driver的做法不是线程安全的,所以存在重复序列化的隐患,也就是你看到的报错。 这个PR解决的问题,就是把Parquet filters的序列化,从Dirver,挪到Executors,从而避免刚刚说的线程安全问题。 因此,如果你的业务问题,和这个PR的描述是一致的,那么,升级Spark(比如到2.3),就可以解决这个问题~
作者回复: 好问题,你说的对,hash之后,确实存在哈希冲突的隐患,具体细节可以参考27讲提到的思路:https://time.geekbang.org/column/article/373089。 简单来说,至少有两种办法来避免哈希冲突: 1)使用两种算法做哈希,最终把哈希值拼接在一起,同一个Value,经过两种不同哈希算法,得到完全一样的哈希值的概率,几乎为0。 2)不使用哈希的办法,而是给join keys每个字段维护一个字典,每个字段值在字典内对应一个唯一的整数。拿到每个字段指定的种整数,然后组装起来,作为新的join key。这个是 @Fendora范东_ 同学提供的方法,我觉得比哈希的方法更好,既缩短了Join Key,又不存在哈希冲突的问题。
作者回复: 补充的非常好~
作者回复: 可以的,没问题~ 后面搞一个~
作者回复: 你说的没错,默认是关闭的,这块细节咱们在后面的配置项章节和内存视角会有详细展开~
作者回复: 比如说: val df: DataFrame = _ // 某个转换过程的中间结果 val temp = df.cache 然后,在另一个分布式数据集(RDD、或是DataFrame)中,引用temp变量,把temp当作普通变量来用
作者回复: 好问题~ 这部分细节咱们在27讲“大表Join小表”有展开哈,不妨看一看。 简单来说,就是左右表的的Join Keys,都做相同的处理,比如文中提到的 1. 拼接Join Keys 2. 计算哈希值 就是左右两张表,都做同样的操作,这样,每张表都会多出来一个新的字段,比如把它叫做Key Hash,那么两张表就可以用新的Key Hash来做关联了~
作者回复: 看上去是shuffle fetch的过程中出了问题,总是没办法成功拉取远端数据,之所以时间长,是因为task总是retry,不过居然最后都试成功了。也就是你的task从那个host不停地拉数据、不停地失败、不停地重试,在第4次fail之前,总能成功。基于这个猜测,我觉得看看那台主机的文件系统。如果文件系统没问题,就要看那台主机的负载,需要double check下是否真的没有大gc、数据是不是真的没有倾斜。