大规模数据处理实战
蔡元楠
硅谷资深工程师
41608 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 46 讲
大规模数据处理实战
15
15
1.0x
00:00/00:00
登录|注册

20 | 流处理案例实战:分析纽约市出租车载客信息

输出结果
滑动窗口操作
区域信息处理
Join操作
数据水印
数据过滤
创建schema
分割数据
创建Streaming DataFrame
Apache Kafka
付费信息数据集
出租车载客信息数据集
计算结果并输出
Stream-stream Join
数据清洗
流数据输入
数据集介绍
加水印是否是必须的?Outer-Join呢?
流的Inner-Join不支持完全输出模式的原因
Structured Streaming
RDD和DataFrame API的使用
思考题
Spark流处理
流处理案例实战:分析纽约市出租车载客信息

该思维导图由 AI 生成,仅供参考

你好,我是蔡元楠。
今天我要与你分享的主题是“流处理案例实战:分析纽约市出租车载客信息”。
在上一讲中,我们结合加州房屋信息的真实数据集,构建了一个基本的预测房价的线性回归模型。通过这个实例,我们不仅学习了处理大数据问题的基本流程,而且还进一步熟练了对 RDD 和 DataFrame API 的使用。
你应该已经发现,上一讲的实例是一个典型的批处理问题,因为处理的数据是静态而有边界的。今天让我们来一起通过实例,更加深入地学习用 Spark 去解决实际的流处理问题。
相信你还记得,在前面的章节中我们介绍过 Spark 两个用于流处理的组件——Spark Streaming 和 Structured Streaming。其中 Spark Streaming 是 Spark 2.0 版本前的的流处理库,在 Spark 2.0 之后,集成了 DataFrame/DataSet API 的 Structured Streaming 成为 Spark 流处理的主力。
今天就让我们一起用 Structured Streaming 对纽约市出租车的载客信息进行处理,建立一个实时流处理的 pipeline,实时输出各个区域内乘客小费的平均数来帮助司机决定要去哪里接单。

数据集介绍

今天的数据集是纽约市 2009~2015 年出租车载客的信息。每一次出行包含了两个事件,一个事件代表出发,另一个事件代表到达。每个事件都有 11 个属性,它的 schema 如下所示:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文介绍了使用Spark的Structured Streaming对纽约市出租车载客信息进行实时流处理的案例。文章首先介绍了数据集的结构和特点,然后详细讲解了流数据输入的处理过程,包括使用Kafka模拟实时流数据源,并创建Streaming DataFrame。接着,文章讲解了数据清洗的过程,包括根据数据类型创建schema,将数据分割并加入相应的列,以及使用DataFrame的filter函数进行数据清洗。通过这些步骤,读者可以了解如何利用Spark的Structured Streaming处理实时流数据,并从中获取有用的信息。 在文章中,还介绍了流与流的Join(Stream-stream join)的支持,以及如何利用数据水印(Watermark)来解决流数据不完整的问题。通过实例代码,读者可以了解如何对两个数据流进行Join操作,并在其中使用水印来控制数据延迟,以避免内存泄漏。 另外,文章还介绍了如何对地理位置信息进行处理,将纽约市划分成区域,并计算实时的小费最高区域。最后,通过滑动窗口操作,输出每隔一段时间内每个区域内的平均小费,以帮助司机决定下一步去哪里接单。 总的来说,本文通过实际案例向读者展示了如何使用Spark进行流处理,具有一定的技术深度和实用性。读者可以从中了解到流处理的复杂性和对延迟性的要求,以及如何利用Structured Streaming对真实数据集进行流处理。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《大规模数据处理实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(13)

  • 最新
  • 精选
  • never leave
    官网上说inner join的watermark是可选的,outer join的watermark是必选的。但是我感觉应该都是必选的吧,就像案例中的inner join一样,如果不是必须的话,旧数据一直保存在内存中,有可能导致内存不够。

    作者回复: never leave同学,感谢提问。现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
    23
  • FengX
    老师,请问join操作里有riderId了,为什么要加上endTime > startTime AND endTime <= startTime + interval 2 hours?

    作者回复: Feng.X,感谢提问。 这个限制目的在于抛弃任何长于2个小时的出租车出行数据。 对于这个例子来说,这样一个对事件时间的限制确实不是必须的。加入它其实是为了告诉你,在基于事件时间来join两个流时,我们一般不考虑时间跨度过大的情况,因为它们没有普遍意义,还会影响数据分析的结果。 举例,对于一个网页广告,我们需要知道用户看到一个广告后要多长时间才会去点击它,从而评估广告的效果。这里显然有两个流:一个代表用户看到广告的事件,另一个代表用户点击广告的事件。尽管我们可以通过用户的ID来Join这两个流,但是我们需要加一个限制,就是点击广告的时间不能比看到广告的时间晚太久,否则Join的结果很可能是不准确的。比如,用户可能在1:00和2:00都看到了广告,但是只在2:01点击了它,我们应该把2:00和2:01Join起来,而不应该Join1:00和2:01,因为1:00看到的广告并没有促使他点击。

    2019-06-03
    2
    8
  • jon
    不支持完全输出是因为join的只是一个时间窗口内的数据 在这个例子中inner join使用watermark 是必须的,left joinwatermark不是必须的

    作者回复: jon,感谢提问。 现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
    2
  • Ming
    我猜: 对于inner join来说,用不用watermark只是纯粹的一个性能考量,不影响单条数据的正确性,只影响最终分析的样本大小。 对于outer join来说,用watermark会影响单条数据正确性,所以在逻辑上看应该是不推荐的,除非会有内存泄漏的风险。 我倒是好奇为啥spark把这个特性叫水印

    作者回复: Ming,感谢提问。 现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。

    2019-06-03
    2
  • Poleness
    请问下,这里解析kafka的value的时候,自定义了schema,但真正生产中很多数据的类型结构是很复杂的,徒手写schema的方式不一定可行。不知道有没有更优雅的方式? (看了源码,如果是json等格式好像可以自动推断,但是对于kafka,他的sourceSchema好像是写死的,不知大家有没有好的建议或者经验?)
    2019-06-04
    1
    5
  • 谢志斌
    老师好,纽约市出租车第一个数据集链接,无法访问。
    2020-04-23
    2
  • lhk
    老师你好,请教个watermark的问题:水印是为了解决数据出现延迟时,流处理程序要等待多久。那超过这个时间的数据就丢弃了吗?程序不会再处理他们了吗?比如水印设置30分钟,那31分钟到来的数据就不管了是吧?
    2019-09-17
    2
  • 刘万里
    老师 您好,最近好久没用spark,有个问题请教一下,现在最新spark是否已经支持cep了
    2019-06-03
    1
  • YX
    比如说,如果定义水印是 10 分钟,数据 A 的事件时间是 1:00,数据 B 的事件时间是 1:10,由于数据传输发生了延迟,我们在 1:15 才收到了 A 和 B,那么我们将只处理数据 B 并更新结果,A 会被无视。 ----------------------------------- 这里对水印的表述存在一定的不准确,应该是和具体收到的时间无关,而是「max event time seen by the engine 」系统当前最大的event time。
    2021-10-19
  • 天敌
    老师,数据集下载不了了,能再分享一下吗?
    2021-03-24
收起评论
显示
设置
留言
13
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部