大规模数据处理实战
蔡元楠
Google Brain资深工程师
立即订阅
8443 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 从这里开始,带你走上硅谷一线系统架构师之路
免费
模块一 | 直通硅谷大规模数据处理技术 (3讲)
01 | 为什么MapReduce会被硅谷一线公司淘汰?
02 | MapReduce后谁主沉浮:怎样设计下一代数据处理技术?
03 | 大规模数据处理初体验:怎样实现大型电商热销榜?
模块二 | 实战学习大规模数据处理基本功 (8讲)
04 | 分布式系统(上):学会用服务等级协议SLA来评估你的系统
05 | 分布式系统(下):架构师不得不知的三大指标
06 | 如何区分批处理还是流处理?
07 | Workflow设计模式:让你在大规模数据世界中君临天下
08 | 发布/订阅模式:流处理架构中的瑞士军刀
09 | CAP定理:三选二,架构师必须学会的取舍
10 | Lambda架构:Twitter亿级实时数据分析架构背后的倚天剑
11 | Kappa架构:利用Kafka锻造的屠龙刀
模块三 | 抽丝剥茧剖析Apache Spark设计精髓 (10讲)
12 | 我们为什么需要Spark?
13 | 弹性分布式数据集:Spark大厦的地基(上)
14 | 弹性分布式数据集:Spark大厦的地基(下)
15 | Spark SQL:Spark数据查询的利器
16 | Spark Streaming:Spark的实时流计算API
17 | Structured Streaming:如何用DataFrame API进行实时数据分析?
18 | Word Count:从零开始运行你的第一个Spark应用
19 | 综合案例实战:处理加州房屋信息,构建线性回归模型
20 | 流处理案例实战:分析纽约市出租车载客信息
21 | 深入对比Spark与Flink:帮你系统设计两开花
模块四 | Apache Beam为何能一统江湖 (8讲)
22 | Apache Beam的前世今生
23 | 站在Google的肩膀上学习Beam编程模型
24 | PCollection:为什么Beam要如此抽象封装数据?
25 | Transform:Beam数据转换操作的抽象方法
26 | Pipeline:Beam如何抽象多步骤的数据流水线?
27 | Pipeline I/O: Beam数据中转的设计模式
28 | 如何设计创建好一个Beam Pipeline?
29 | 如何测试Beam Pipeline?
模块五 | 决战 Apache Beam 真实硅谷案例 (7讲)
30 | Apache Beam实战冲刺:Beam如何run everywhere?
31 | WordCount Beam Pipeline实战
32 | Beam Window:打通流处理的任督二脉
33 | 横看成岭侧成峰:再战Streaming WordCount
34 | Amazon热销榜Beam Pipeline实战
35 | Facebook游戏实时流处理Beam Pipeline实战(上)
36 | Facebook游戏实时流处理Beam Pipeline实战(下)
模块六 | 大规模数据处理的挑战与未来 (4讲)
37 | 5G时代,如何处理超大规模物联网数据
38 | 大规模数据处理在深度学习中如何应用?
39 | 从SQL到Streaming SQL:突破静态数据查询的次元
40 | 大规模数据处理未来之路
专栏加餐 | 特别福利 (4讲)
FAQ第一期 | 学习大规模数据处理需要什么基础?
加油站 | Practice makes perfect!
FAQ第二期 | Spark案例实战答疑
FAQ第三期 | Apache Beam基础答疑
结束语 (1讲)
结束语 | 世间所有的相遇,都是久别重逢
大规模数据处理实战
登录|注册

20 | 流处理案例实战:分析纽约市出租车载客信息

蔡元楠 2019-06-03
你好,我是蔡元楠。
今天我要与你分享的主题是“流处理案例实战:分析纽约市出租车载客信息”。
在上一讲中,我们结合加州房屋信息的真实数据集,构建了一个基本的预测房价的线性回归模型。通过这个实例,我们不仅学习了处理大数据问题的基本流程,而且还进一步熟练了对 RDD 和 DataFrame API 的使用。
你应该已经发现,上一讲的实例是一个典型的批处理问题,因为处理的数据是静态而有边界的。今天让我们来一起通过实例,更加深入地学习用 Spark 去解决实际的流处理问题。
相信你还记得,在前面的章节中我们介绍过 Spark 两个用于流处理的组件——Spark Streaming 和 Structured Streaming。其中 Spark Streaming 是 Spark 2.0 版本前的的流处理库,在 Spark 2.0 之后,集成了 DataFrame/DataSet API 的 Structured Streaming 成为 Spark 流处理的主力。
今天就让我们一起用 Structured Streaming 对纽约市出租车的载客信息进行处理,建立一个实时流处理的 pipeline,实时输出各个区域内乘客小费的平均数来帮助司机决定要去哪里接单。

数据集介绍

今天的数据集是纽约市 2009~2015 年出租车载客的信息。每一次出行包含了两个事件,一个事件代表出发,另一个事件代表到达。每个事件都有 11 个属性,它的 schema 如下所示:
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《大规模数据处理实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(8)

  • never leave
    官网上说inner join的watermark是可选的,outer join的watermark是必选的。但是我感觉应该都是必选的吧,就像案例中的inner join一样,如果不是必须的话,旧数据一直保存在内存中,有可能导致内存不够。

    作者回复: never leave同学,感谢提问。现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
    10
  • Poleness
    请问下,这里解析kafka的value的时候,自定义了schema,但真正生产中很多数据的类型结构是很复杂的,徒手写schema的方式不一定可行。不知道有没有更优雅的方式?
    (看了源码,如果是json等格式好像可以自动推断,但是对于kafka,他的sourceSchema好像是写死的,不知大家有没有好的建议或者经验?)
    2019-06-04
    2
  • jon
    不支持完全输出是因为join的只是一个时间窗口内的数据
    在这个例子中inner join使用watermark 是必须的,left joinwatermark不是必须的

    作者回复: jon,感谢提问。

    现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
    2
  • lhk
    老师你好,请教个watermark的问题:水印是为了解决数据出现延迟时,流处理程序要等待多久。那超过这个时间的数据就丢弃了吗?程序不会再处理他们了吗?比如水印设置30分钟,那31分钟到来的数据就不管了是吧?
    2019-09-17
    1
  • Feng.X
    老师,请问join操作里有riderId了,为什么要加上endTime > startTime AND endTime <= startTime + interval 2 hours?

    作者回复: Feng.X,感谢提问。

    这个限制目的在于抛弃任何长于2个小时的出租车出行数据。

    对于这个例子来说,这样一个对事件时间的限制确实不是必须的。加入它其实是为了告诉你,在基于事件时间来join两个流时,我们一般不考虑时间跨度过大的情况,因为它们没有普遍意义,还会影响数据分析的结果。

    举例,对于一个网页广告,我们需要知道用户看到一个广告后要多长时间才会去点击它,从而评估广告的效果。这里显然有两个流:一个代表用户看到广告的事件,另一个代表用户点击广告的事件。尽管我们可以通过用户的ID来Join这两个流,但是我们需要加一个限制,就是点击广告的时间不能比看到广告的时间晚太久,否则Join的结果很可能是不准确的。比如,用户可能在1:00和2:00都看到了广告,但是只在2:01点击了它,我们应该把2:00和2:01Join起来,而不应该Join1:00和2:01,因为1:00看到的广告并没有促使他点击。

    2019-06-03
    1
  • 刘万里
    老师 您好,最近好久没用spark,有个问题请教一下,现在最新spark是否已经支持cep了
    2019-06-03
    1
  • 北京知府
    这部分数据有个不太直观的地方,那就是同一次出行会有两个记录,…
    为何会出现两个记录?用一条记录也能记录出发和到达吧?
    2019-06-12
  • Ming
    我猜:

    对于inner join来说,用不用watermark只是纯粹的一个性能考量,不影响单条数据的正确性,只影响最终分析的样本大小。
    对于outer join来说,用watermark会影响单条数据正确性,所以在逻辑上看应该是不推荐的,除非会有内存泄漏的风险。

    我倒是好奇为啥spark把这个特性叫水印

    作者回复: Ming,感谢提问。

    现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。

    Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。

    由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
收起评论
8
返回
顶部