• Sean
    2021-08-22
    老师上文提到了"使用 Parquet、ORC 等文件格式,去坐享谓词下推带来的数据读取效率",应该如何理解,莫非谓词下推依耐与于文件存储格式吗

    作者回复: 好问题,是这样的: 1. 谓词下推本身,不依赖于任何文件存储格式,它本身就是Spark SQL的优化策略,DataFrame里面如果包含filter一类的操作,他们就会尽可能地被推到执行计划的最下面。 2. 但是,谓词下推的效果,和文件存储格式有关。假设是CSV这种行存格式,那么谓词下推顶多是在整个执行计划的shuffle之前,降低数据量大小。但如果是orc、Parquet这种列存文件,谓词下推能直接推到文件扫描上去,直接在磁盘扫描阶段,就降低文件扫描量,降低i/o开销,从而提升执行性能。

    
    25
  • 斯盖丸
    2021-03-31
    老师如果两个超大表,但是一张表重复数据很多,那是不是先做distinct,再join会好一些?毕竟虽然distinct会shuffle但最后join的数据量也是成倍减少的

    作者回复: 对,在这个场景下,先用distinct节省数据量更合适。咱们能省则省、能拖则拖是一般性原则哈。不过先用distinct其实还是遵循了能省则省的原则。get到核心思想就好,灵活应用~

    
    6
  • 猿鸽君
    2021-05-10
    https://github.com/apache/spark/pull/21086。老师您好,请问老师知道这个可能是什么原因导致的吗?我用2.2.0版本就会出这个错。通过spark ui也看不到有task failed。看起来就像被强制终止了。

    作者回复: 这个PR描述的问题,是说: 当你读取Parquet文件的时候,Parquet filter(就是用谓词下推的时候,需要用到的功能)是需要做序列化的,在这个PR之前,序列化是在Driver端做得,但是,Driver的做法不是线程安全的,所以存在重复序列化的隐患,也就是你看到的报错。 这个PR解决的问题,就是把Parquet filters的序列化,从Dirver,挪到Executors,从而避免刚刚说的线程安全问题。 因此,如果你的业务问题,和这个PR的描述是一致的,那么,升级Spark(比如到2.3),就可以解决这个问题~

    
    5
  • John.Xiong
    2021-06-12
    老师,您说的两个表通过把多个字段拼接后hash成一个字段关联,但是hash不是只有碰撞问题吗?万一两个不同组合弄成了一个hash值不是会导致问题吗?我对碰撞不太熟悉,可能说的不对,请老师指教

    作者回复: 好问题,你说的对,hash之后,确实存在哈希冲突的隐患,具体细节可以参考27讲提到的思路:https://time.geekbang.org/column/article/373089。 简单来说,至少有两种办法来避免哈希冲突: 1)使用两种算法做哈希,最终把哈希值拼接在一起,同一个Value,经过两种不同哈希算法,得到完全一样的哈希值的概率,几乎为0。 2)不使用哈希的办法,而是给join keys每个字段维护一个字典,每个字段值在字典内对应一个唯一的整数。拿到每个字段指定的种整数,然后组装起来,作为新的join key。这个是 @Fendora范东_ 同学提供的方法,我觉得比哈希的方法更好,既缩短了Join Key,又不存在哈希冲突的问题。

    
    4
  • aof
    2021-05-04
    其实老师总结的已经很全面了。这里推荐两个比较通用的调优小技巧: 1、 Spark默认使用的是Java serialization序列化方式,我们可以考虑使用Kryo serialization序列化的方式,不过会有一些限制,比如不是支持所有的序列化类型,需要手动注册要序列化的类。 2、 尽量使用占用空间小的数据结构。比如,能使用基本数据类型的就用基本数据类型,不要用对应的包装类(int——>Integer),能用int的就不要用String,String占用的空间要大的多。

    作者回复: 补充的非常好~

    
    3
  • Cohen
    2021-04-08
    老师,能否弄个GitHub 配套代码案例

    作者回复: 可以的,没问题~ 后面搞一个~

    共 3 条评论
    3
  • October
    2021-03-31
    享受Tungsten带来的堆外内存的红利时,除了使用dataframe或dataset API之外,还需要在sparkconf中开启堆外内存吧

    作者回复: 你说的没错,默认是关闭的,这块细节咱们在后面的配置项章节和内存视角会有详细展开~

    共 4 条评论
    3
  • balabala
    2021-10-24
    老师你好。您在单机思维中提到“类似这种忽视实例化 Util 操作的行为还有很多,比如用临时变量缓存数据转换的中间结果等等”。请问可以举例一下吗?

    作者回复: 比如说: val df: DataFrame = _ // 某个转换过程的中间结果 val temp = df.cache 然后,在另一个分布式数据集(RDD、或是DataFrame)中,引用temp变量,把temp当作普通变量来用

    
    1
  • 陈威洋
    2021-06-30
    磊哥好~ 请教个基础问题,文章有一段话:“用哈希算法生成一个固定长度的字节序列,把它作为新的 Join key”。我的理解是把右表的字段名用哈希算法形式拼接起来,我在想新的Join key怎么能跟左表的key保持关联关系呢?我在用join连接表的话,这个新的key起到作用关联的作用? 希望得到磊哥的解惑!~~

    作者回复: 好问题~ 这部分细节咱们在27讲“大表Join小表”有展开哈,不妨看一看。 简单来说,就是左右表的的Join Keys,都做相同的处理,比如文中提到的 1. 拼接Join Keys 2. 计算哈希值 就是左右两张表,都做同样的操作,这样,每张表都会多出来一个新的字段,比如把它叫做Key Hash,那么两张表就可以用新的Key Hash来做关联了~

    共 2 条评论
    1
  • 乐意至极
    2021-03-31
    老师,你好。我在实际工作中遇到这个问题ERROR RetryingBlockFetcher: Exception while beginning fetch of 520 outstanding blocks (after 1 retries) java.io.IOException: Failed to connect to <HOST/IP>:38000 持续了12小时 我有以下观察: 1,这个<HOST/IP>上的Executor已经SUCCESS了 2,这个持续了12小时的task是process local 3,无长时间gc,也无明显倾斜 排查了很久。。希望老师能给点指点~

    作者回复: 看上去是shuffle fetch的过程中出了问题,总是没办法成功拉取远端数据,之所以时间长,是因为task总是retry,不过居然最后都试成功了。也就是你的task从那个host不停地拉数据、不停地失败、不停地重试,在第4次fail之前,总能成功。基于这个猜测,我觉得看看那台主机的文件系统。如果文件系统没问题,就要看那台主机的负载,需要double check下是否真的没有大gc、数据是不是真的没有倾斜。

    共 3 条评论
    1