30|Structured Streaming:从“流动的Word Count”开始
环境准备
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了Structured Streaming框架下的流处理实例“流动的Word Count”,通过实例演示了在流处理框架下如何实现实时的Word Count计算。作者详细讲解了环境准备和实现流动的Word Count的步骤,包括使用netcat工具向本地9999端口的Socket地址发送数据,并使用Spark的Structured Streaming框架监听该端口,实时处理数据。文章重点介绍了数据加载和数据处理的过程,包括使用SparkSession的readStream API创建DataFrame、指定数据源类型和选项,以及使用DataFrame API完成Word Count计算逻辑。此外,文章还介绍了在Structured Streaming框架下,如何使用writeStream API将处理结果写入到不同类型的Sink中,以及不同的输出模式。通过对比Complete mode和Update mode的执行结果,读者可以直观地了解不同输出模式的特点和区别。整体而言,本文以实例为引,详细介绍了Structured Streaming框架下的流处理实践,对于想要了解流处理基本概念和实现方法的读者具有很好的参考价值。
《零基础入门 Spark》,新⼈⾸单¥59
全部留言(3)
- 最新
- 精选
- 东围居士老师,我一直都有个疑惑,Spark Streaming 和 Spark Structured Streaming 有什么区别呢,现在企业里面用哪个更多,如果我们现在要学习的话,是不是只学 Spark Structured Streaming 就可以了?
作者回复: 好问题,Spark Streaming、Spark MLlib(RDD based)、GraphX,和Structured Streaming、Spark MLlib(DataFrame based)、GraphFrames,都是相对的,前者都是基于Spark Core,都是基于RDD的计算子框架,而后者,都是基于DataFrame,可以共享Spark SQL性能红利的子框架。 简单来说,后者是前者的“继任者”,Spark社区官方,会推荐大家从老的RDD based子框架,迁移到新的DataFrame based、Spark SQL powered 子框架。 不过呢,出于历史原因,很多企业在很早就adopt了Spark,比如早在1.x版本,就开始采用Spark了,那个时候,大家用的还都是RDD based子框架,应用代码也都是基于那些子框架开发的,这里面有个迁移成本的问题。所以,据我所知,到现在为止,还有一些公司,出于怕麻烦的原因,还没有迁移,还在沿用老的子框架。 不过,如果是公司新的业务需求,或者新上马Spark,那么我建议都采用DataFrame based、Spark SQL powered新框架,来共享Spark SQL提供的执行性能。当然了,新框架的开发、完善,还需要一定的过程,不过到目前为止,在功能方面,新框架基本上都支持了,而且像Structured Streaming,相比老的Spark Streaming,还提供了Event time、Late data、Watermark这些更丰富的功能支持。所以说,如果受公司所限,不得不维护老框架,那这个确实比较麻烦一些,还要学老框架;但如果是新项目、新部署,那么还是推荐新框架,来同时收获完善的功能和高效的执行性能~
2021-11-236 - 大叮当老师您好,一直有个问题想请请您解惑下。 就是kafka每个主题都有个分区的概念,理论上说,一个消费者组中消费者数目和分区数一致,是效率最高的。 引申到Spark Streaming/Spark Structured Streaming,我的理解: 1、executor数目,和要消费的topic中的分区数目一致,效能最高,不知道这个点我理解对不对。 我的问题是:假设我的场景就是Spark Streaming/Spark Structured Streaming消费kafka解析其中的json数据,然后写入诸如redis,hbase这样的nosql组件。 基于这样的场景,我是不是每个executor只分配1个CPU核就可以了,比如我一个topic有3个partition,那么我指定3个executor(先不考虑driver),每个executor1个cpu核就够了,如果每个executor多个核反而浪费了,用不到?? 恳请老师解惑,谢谢
作者回复: 好问题~ 完全正确:“理论上说,一个消费者组中消费者数目和分区数一致,是效率最高的”,不过,更具体地说,Kafka Partitions的数量,应该和Spark Executors所拥有的总cores相对应。 举个例子,一个Kafka Topic,有60个Partitions,那么一个Spark集群,不管有多少个Executors,总cores可以被60整除,就是比较好的设置。比方说,有10个Executors,每个Executors有3个core,总共30 cores,这个是个好配置。再比如,30个Executors,每个Executor一个core,这个设置也行。在或者,30个Executors,每个Executor两个core,也可以的~ 不过,像你说的,如果Kafka总共60个Partitions,但是Spark集群起的Executors cores大于这个数字,那多余的CPU就会浪费掉。比如说,起20个Executors,但是每个Executors给4个core,那多出来的20个core,就完全浪费掉了。 其实,对于Kafka Partitions与Spark的对应关系,你不妨这样简单理解: 就把Kafka当成HDFS,把Kafka的Topic,当成是HDFS上的一个分布式文件,Kafka Topic的Partitions,其实可以类比分布式文件在HDFS上的数据分片。这样来理解,很多设置就迎刃而解了~
2021-11-1926 - blank想问一下老师,_spark_metadata在本地时丢失 不会影响streaming job正常运行,但在azure上,会发生unable to find batch /_spark_metadata/0的情况 这个问题要怎么处理呢。什么情况下metadata文件会丢失呢2022-09-06归属地:上海1