29 | 消息驱动:如何集成 Stream 实现消息驱动?
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了如何使用Spring Cloud Stream技术集成消息驱动,并以RabbitMQ消息中间件为例进行了实践场景的讲解。文章首先指出了传统的Stream注解方式已经被淘汰,取而代之的是更为流行的Functional Programming风格,即函数式编程。作者通过一个实战案例展示了如何将两个具有关联性的业务服务改造成基于消息驱动的实现方式。接着,文章详细介绍了消息生产者和消费者的定义和实现过程,包括添加生产者逻辑、定义消费者逻辑以及配置文件的内容。通过示例代码和解释,读者可以了解到如何使用StreamBridge发送消息到RabbitMQ,以及如何定义消费者方法来消费消息事件。整体而言,本文通过实际案例和详细步骤,向读者展示了如何使用Spring Cloud Stream技术实现消息驱动,体现了文章的技术特点和实用性。文章还提到了配置文件的细节,包括Binder和Binding两部分的配置,以及消息生产者和消费者的信道定义和RabbitMQ消息队列的关系。最后,文章总结了使用functional event风格实现消息驱动的注意事项,并提出了思考题,引发读者对异常处理方式的思考。整体而言,本文内容丰富,适合对Spring Cloud Stream技术感兴趣的读者阅读学习。
《Spring Cloud 微服务项目实战》,新⼈⾸单¥59
全部留言(9)
- 最新
- 精选
- Carla约定的多了,怎么感觉反而变得麻烦了😂
作者回复: 讲真,而且很容易出错,排查起来还不容易 :)
2022-05-125 - 曹亮老师,请教一下,在微服务项目中Stream和mq结合使用,和直接在服务里将消息发送到mq,这两种有什么区别?用stream的优势体现在哪里?
作者回复: 直接发送mq可以理解为JDBC,使用Stream可以理解为在jdbc上卷了一层JPA,后一个比前一个卷,造轮子哈哈 回到MQ来看的话就相当于Stream在底层中间件之上做了一层spring-integration的抽象,屏蔽了底层细节。
2022-06-174 - 密码123456感觉,这配置好难。配了好几天都没有配好。
作者回复: 世上无难事,只要肯放弃。同学直接用我的源码吧:)
2022-05-252 - wake老师如果我其它方法名也叫addCoupon会被误认为消费者吗?或者说只有返回值为Consumer才会被认定为消费者,那如果有两个同样的返回Consumer的addCoupon方法呢
作者回复: 其实和返回值关系不大,Consumer只是函数式编程的一种实现方式,还有其他方式比如Flux、Supplier。如果方法重名,我印象中是遵循先来后到,后初始化的bean对应的consumer优先生效。同学可以本地试一下,回头告诉我们答案哈
2022-02-2221 - zkr为啥生产者发消息,消费者接受消息不用指定队列名和routing key?队列这个我还能理解,我试了下是按topic名+group名自动生成了一个,但是为啥topic模式不用指定routing key?
作者回复: routing key也可以在配置中指定,后面做消息分区的时候这部分有讲,如果需要定制的话可以参考
2022-07-26归属地:上海 - 奔跑的蚂蚁试了下spring-cloud-starter-stream-rocketmq 的 function binding 第一次发送消息会报日志 DefaultBinderFactory : Retrieving cached binder: rocketmq DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s). BeanFactoryAwareFunctionRegistry : Looking up function 'streamBridge' with acceptedOutputTypes: [application/json] .MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one. 这些都正常嘛
作者回复: 这几行日志没看到报错信息,应该是INFO log,如果有Error可以把完整log trace发出来一起看下
2022-02-242 - 奔跑的蚂蚁我们项目用了spirng.cloud.stream.rocketMq 出了异常是自动重试,老师能讲讲原理嘛 能自定义重试间隔是次数嘛(是不是mq里面配置的)
作者回复: 重试策略(次数+每次重试间隔)会在后面的一节课程里单独讲到,应该快上线了
2022-02-18 - peter请教老师几个问题: Q1:消费者标识是否可以不用方法名,另外指定一个? Q2:“采用了 Consumer 的实现方式”,“添加消息消费者”部分的这句话中,consumer的实现方式是指什么?笔误吗? Q3 :“return request -> ”,request从哪里来? CouponConsumer类中,addCoupon方法中直接用了request,此request从哪里来的? Q4:配置文件中,binders下面的my-rabbit,这个名字是任意的吗? 如果有多个,其他的也要起一个不同的名字吗? 比如my-rabbit2, my-kafka等 Q5: 交换机和队列是怎么对应的? A 两个交换机,两个队列,每个交换机都可以将消息发送到两个队列吗? B 队列的名字,用点号分成两部分,有什么含义吗?
作者回复: Q1:不走寻常路是可以,但是失去了约定大约配置的意义 Q2:Consumer是函数式编程的一种方式 Q3:函数式编程语法,建议先找一些开源资料熟悉下语法 Q4: 任意名字 Q5:交换机和队列之间可以建立绑定关系,官网的帮助文档里有详细信息https://www.rabbitmq.com/documentation.html
2022-02-182 - kimoti消费消息发生异常,消息重新进入队列重新消费2022-05-06