Spark性能调优实战
吴磊
FreeWheel机器学习团队负责人
立即订阅
1105 人已学习
课程目录
已更新 7 讲 / 共 32 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | Spark性能调优,你该掌握这些“套路”
免费
01 | 性能调优的必要性:Spark本身就很快,为啥还需要我调优?
02 | 性能调优的本质:调优的手段五花八门,该从哪里入手?
原理篇 (4讲)
03 | RDD:为什么你必须要理解弹性分布式数据集?
04 | DAG与流水线:到底啥叫“内存计算”?
05 | 调度系统:“数据不动代码动”到底是什么意思?
06 | 存储系统:空间换时间,还是时间换空间?
Spark性能调优实战
15
15
1.0x
00:00/00:00
登录|注册

01 | 性能调优的必要性:Spark本身就很快,为啥还需要我调优?

吴磊 2021-03-15
你好,我是吴磊。
在日常的开发工作中,我发现有个现象很普遍。很多开发者都认为 Spark 的执行性能已经非常强了,实际工作中只要按部就班地实现业务功能就可以了,没有必要进行性能调优。
你是不是也这么认为呢?确实,Spark 的核心竞争力就是它的执行性能,这主要得益于 Spark 基于内存计算的运行模式和钨丝计划的锦上添花,以及 Spark SQL 上的专注与发力。
但是,真如大家所说,开发者只要把业务逻辑实现了就万事大吉了吗?这样,咱们先不急于得出结论,你先跟着我一起看两个日常开发中常见的例子,最后我们再来回答这个问题。
在数据应用场景中,ETL(Extract Transform Load)往往是打头阵的那个,毕竟源数据经过抽取和转换才能用于探索和分析,或者是供养给机器学习算法进行模型训练,从而挖掘出数据深层次的价值。我们今天要举的两个例子,都取自典型 ETL 端到端作业中常见的操作和计算任务。

开发案例 1:数据抽取

第一个例子很简单:给定数据条目,从中抽取特定字段。这样的数据处理需求在平时的 ETL 作业中相当普遍。想要实现这个需求,我们需要定义一个函数 extractFields:它的输入参数是 Seq[Row]类型,也即数据条目序列;输出结果的返回类型是 Seq[(String, Int)],也就是(String, Int)对儿的序列;函数的计算逻辑是从数据条目中抽取索引为 2 的字符串和索引为 4 的整型。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Spark性能调优实战》,如需阅读全部文章,
请订阅文章所属专栏
立即订阅
登录 后留言

精选留言(14)

  • Will
    第二个例子,可以利用map join,让小数据分发到每个worker上,这样不用shuffle数据

    作者回复: 没错,Broadcast joins可以进一步提升性能。

    2021-03-15
    1
    7
  • TaoInsight
    如果pai rDF的startDate和endDate范围有限,可以把日期范围展开,将非等值join转成等值join

    作者回复: 这块能展开说说吗?具体怎么转化为等值join?可以举个例子哈~

    2021-03-17
    3
    2
  • 葛聂
    Case 1为什么性能差一倍呢

    作者回复: 好问题,其实改动非常小,开销相比正例也不大,但这里的关键在于,这个函数会被反反复复调用上百次,累积下来,开销就上去了。所以,关键不在于点小不小,而是这个点,是不是瓶颈。

    2021-03-16
    1
    2
  • 刘吉超
    我们每天有9T数据,用如下代码做ETL json平铺,花很长时间
    val adArr = ArrayBuffer[Map[String, String]]()
    if (ads != null) {
      val adnum = ads.length
      for (i <- 0 until adnum) {
        val addObj = ads.getJSONObject(i)
        val adMap = THashMap[String, String]()
        addObj.entrySet().foreach(x => adMap += (x.getKey -> (if(x.getValue==null) "" else x.getValue.toString ) ))
        adArr += adMap.toMap
      }
    }
    基于老师讲的,避免副作用,改为如下代码
    import org.apache.flink.streaming.api.scala._
    import scala.collection.JavaConversions._
     val adArr = (0 until ads.size()).map(i => ads.getJSONObject(i).toMap.map(entry => entry._1 -> (if(entry._2==null) "" else entry._2.toString)))
    尝试后没啥效果,希望老师指导一下

    编辑回复: 兄弟我是作者哈,第二份代码,我有几个疑问:
    1. 两个import语句的作用是什么?
    2. ads具体是什么内容?是RDD,还是数组,还是什么?
    3. 没有看到哪里定义分布式数据集,所有计算看上去是基于(0 until ads.size())这个List,那么后续所有的计算,map,map里面的toMap,都是在driver计算的,如果你9T数据都在driver计算,那结果。。。
    4. toMap之后,又加了个map,我理解是为了把value中的null替换为空字符串,如果是这样的话,map里面只处理value就好了,不用带着key

    不知道我理解的对不对哈,期待老弟提供更多信息~

    2021-03-24
    1
    1
  • Elon
    函数式的副作用指的是不修改入参吧?在函数内部是可以定义变量、修改变量的。因此fields变量在函数内部,应该不算副作用吧?

    作者回复: 是的,你说的没错。函数的副作用指的是对外部变量、外部环境的影响,内部状态的改变和转换不算。文中这块表述的不严谨哈,这里主要是想强调可变变量fields带来的计算开销。

    2021-03-22
    1
  • fsc2016
    请问老师,这个课程需要哪些基础,我平时使用过pysaprk 做过一些机器学习相关数据处理练习,对于我这种使用spark不多的,可以消化吸收嘛

    作者回复: 可以,没问题,接触过Spark就行。放心吧,原理部分会有大量的生活化类比和故事,尽可能地让你“边玩边学”。另外,咱们有微信群,有问题可以随时探讨~

    2021-03-18
    1
  • 对方正在输入。。。
    可以先将pairdf collect到driver,再将数组按照startdate排序,然后再将其广播。然后在factdf.map里面实现一个方法来从广播的数组里面二分查找到eventdate所属的时间对子。最后就可以根据这个时间对子以及其他的维度属性进行分组聚合了

    作者回复: 广播的思路很赞。不过二分查找这里值得商榷哈,咱们目的是过滤出满足条件的event date,然后和其他维度一起、分组聚合。这里关键不在于过滤和查找效率,关键在于大表的重复扫描,只要解决这个核心痛点,性能问题就迎刃而解。

    2021-03-15
    1
  • 方得始终
    错过了老师上星期的直播,请问在哪里可以看到回放吗?

    编辑回复: 上传到了B站:https://www.bilibili.com/video/BV1AX4y1G7Ks/

    2021-03-21
  • 方得始终
    代码会集中放到GitHub上面吗?这样方便于以后学习查找。我主要用 pyspark, 这个课程只用Scala吗?请问有关于Scala spark的入门教程吗?

    作者回复: 可以的,后面会统一放到git上,方便大家查阅。对,目前专栏里的代码示例都是Scala,主要是简洁,用很短的代码就能示意。如果习惯用pyspark开发,其实不用刻意去学Scala,API其实大同小异,区别不大。pyspark比较熟悉的话,Scala的代码也很容易看懂~ 另外,执行性能pyspark也不差,和Scala,Java相当。除了udf需要跨进程、有一些开销之外,整体还好。

    2021-03-20
  • 天渡
    可否将小表进行broadcast,将reduce端join变为map端join。

    作者回复: 广播的思路没问题~

    2021-03-18
  • LJK
    同一个application如果action多的话一定会影响效率吗?

    作者回复: 不一定的,action个数不是关键,关键是数据的访问效率。关于提升数据访问效率,咱们专栏后面的内容会讲哈~

    2021-03-17
  • Shockang
    Case1里面除了老师讲的副作用外,我认为Scala在处理闭包时也会存在一定的性能损耗,Case2里面把大变量广播出去是一种常见的操作,另外,filter之后加上coalesce 也是比较常见的优化手段

    作者回复: case1的副作用本身其实还好,主要是它引入的开销,就是你说的性能损耗。case2的广播思路没问题~

    coalesce这里有待商榷哈,正例里面已经没有filter,反例里面加coalesce也于事无补。

    2021-03-16
  • rb@31
    另外,不知道老师的更新频率。大概多久能全部更新好?

    编辑回复: 你好,课程详情页处有更新安排哦,每周一,三,五更新。预计更新到5月份

    2021-03-16
  • Geek_92df49
    四个维度分组为什么要加上开始时间和结束时间?
    .groupBy("dim1", "dim2", "dim3", "event_date", "startDate", "endDate")

    编辑回复: 兄弟我是作者哈,你说的没错,分组只需要前4个字段,但是你看最后,instances.write.partitionBy("endDate", "startDate").parquet(rootPath),需要用开始和结束时间这两个字段去做分区存储,因此,在前一步分组的时候,把这两个字段保留了下来。

    2021-03-15
    9
收起评论
14
返回
顶部