20 | 流处理案例实战:分析纽约市出租车载客信息
该思维导图由 AI 生成,仅供参考
数据集介绍
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了使用Spark的Structured Streaming对纽约市出租车载客信息进行实时流处理的案例。文章首先介绍了数据集的结构和特点,然后详细讲解了流数据输入的处理过程,包括使用Kafka模拟实时流数据源,并创建Streaming DataFrame。接着,文章讲解了数据清洗的过程,包括根据数据类型创建schema,将数据分割并加入相应的列,以及使用DataFrame的filter函数进行数据清洗。通过这些步骤,读者可以了解如何利用Spark的Structured Streaming处理实时流数据,并从中获取有用的信息。 在文章中,还介绍了流与流的Join(Stream-stream join)的支持,以及如何利用数据水印(Watermark)来解决流数据不完整的问题。通过实例代码,读者可以了解如何对两个数据流进行Join操作,并在其中使用水印来控制数据延迟,以避免内存泄漏。 另外,文章还介绍了如何对地理位置信息进行处理,将纽约市划分成区域,并计算实时的小费最高区域。最后,通过滑动窗口操作,输出每隔一段时间内每个区域内的平均小费,以帮助司机决定下一步去哪里接单。 总的来说,本文通过实际案例向读者展示了如何使用Spark进行流处理,具有一定的技术深度和实用性。读者可以从中了解到流处理的复杂性和对延迟性的要求,以及如何利用Structured Streaming对真实数据集进行流处理。
《大规模数据处理实战》,新⼈⾸单¥59
全部留言(13)
- 最新
- 精选
- never leave官网上说inner join的watermark是可选的,outer join的watermark是必选的。但是我感觉应该都是必选的吧,就像案例中的inner join一样,如果不是必须的话,旧数据一直保存在内存中,有可能导致内存不够。
作者回复: never leave同学,感谢提问。现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。
2019-06-0323 - FengX老师,请问join操作里有riderId了,为什么要加上endTime > startTime AND endTime <= startTime + interval 2 hours?
作者回复: Feng.X,感谢提问。 这个限制目的在于抛弃任何长于2个小时的出租车出行数据。 对于这个例子来说,这样一个对事件时间的限制确实不是必须的。加入它其实是为了告诉你,在基于事件时间来join两个流时,我们一般不考虑时间跨度过大的情况,因为它们没有普遍意义,还会影响数据分析的结果。 举例,对于一个网页广告,我们需要知道用户看到一个广告后要多长时间才会去点击它,从而评估广告的效果。这里显然有两个流:一个代表用户看到广告的事件,另一个代表用户点击广告的事件。尽管我们可以通过用户的ID来Join这两个流,但是我们需要加一个限制,就是点击广告的时间不能比看到广告的时间晚太久,否则Join的结果很可能是不准确的。比如,用户可能在1:00和2:00都看到了广告,但是只在2:01点击了它,我们应该把2:00和2:01Join起来,而不应该Join1:00和2:01,因为1:00看到的广告并没有促使他点击。
2019-06-0328 - jon不支持完全输出是因为join的只是一个时间窗口内的数据 在这个例子中inner join使用watermark 是必须的,left joinwatermark不是必须的
作者回复: jon,感谢提问。 现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。
2019-06-032 - Ming我猜: 对于inner join来说,用不用watermark只是纯粹的一个性能考量,不影响单条数据的正确性,只影响最终分析的样本大小。 对于outer join来说,用watermark会影响单条数据正确性,所以在逻辑上看应该是不推荐的,除非会有内存泄漏的风险。 我倒是好奇为啥spark把这个特性叫水印
作者回复: Ming,感谢提问。 现阶段不仅Inner-join不支持完全输出模式,任何类型的Join都不支持完全输出模式。因为完全输出模式要求每当有新数据输入时,输出完整的结果表。而对于无边界数据,我们很难把所有历史数据存在内存中。所以,一般Join的都是在某个时间窗口内的流数据,这就是引入watermarking的原因。希望将来Spark可以引入新的机制来支持这一点。 Outer join是要在Inner Join的基础上,把没有匹配的行的对应列设为NULL。但是由于流数据的无边界性,Spark永远无法知道在未来会不会找到匹配的数据。所以为了保证Outer Join的正确性,加水印是必须的。这样Spark的执行引擎只要在水印的有效期内没找到与之匹配的数据,就可以把对应的列设为NULL并输出。 由于Inner Join不需要连接两个表中所有的行,所以在Spark官网的叙述中,水印和事件时间的限制不是必须的。但是如果不加任何限制,流数据会不断被读入内存,这样不安全的。所以,即便是Inner Join,我也推荐你加水印和事件时间的限制。
2019-06-032 - Poleness请问下,这里解析kafka的value的时候,自定义了schema,但真正生产中很多数据的类型结构是很复杂的,徒手写schema的方式不一定可行。不知道有没有更优雅的方式? (看了源码,如果是json等格式好像可以自动推断,但是对于kafka,他的sourceSchema好像是写死的,不知大家有没有好的建议或者经验?)2019-06-0415
- 谢志斌老师好,纽约市出租车第一个数据集链接,无法访问。2020-04-232
- lhk老师你好,请教个watermark的问题:水印是为了解决数据出现延迟时,流处理程序要等待多久。那超过这个时间的数据就丢弃了吗?程序不会再处理他们了吗?比如水印设置30分钟,那31分钟到来的数据就不管了是吧?2019-09-172
- 刘万里老师 您好,最近好久没用spark,有个问题请教一下,现在最新spark是否已经支持cep了2019-06-031
- YX比如说,如果定义水印是 10 分钟,数据 A 的事件时间是 1:00,数据 B 的事件时间是 1:10,由于数据传输发生了延迟,我们在 1:15 才收到了 A 和 B,那么我们将只处理数据 B 并更新结果,A 会被无视。 ----------------------------------- 这里对水印的表述存在一定的不准确,应该是和具体收到的时间无关,而是「max event time seen by the engine 」系统当前最大的event time。2021-10-19
- 天敌老师,数据集下载不了了,能再分享一下吗?2021-03-24