玩转 Spring 全家桶
丁雪丰
美团研究员,《Spring Boot 实战》、《Spring 攻略》译者,腾讯云 TVP。
97978 人已学习
新⼈⾸单¥68
课程目录
已完结/共 123 讲
第十一章:Spring Cloud及Cloud Native概述 (5讲)
玩转 Spring 全家桶
登录|注册
留言
8
收藏
沉浸
阅读
分享
手机端
回顶部
当前播放: 116 | 通过Spring Cloud Stream访问Kafka
00:00 / 00:00
高清
  • 高清
1.0x
  • 2.0x
  • 1.5x
  • 1.25x
  • 1.0x
  • 0.75x
  • 0.5x
网页全屏
全屏
00:00
付费课程,可试看
01 | Spring课程介绍
02 | 一起认识Spring家族的主要成员
03 | 跟着Spring了解技术趋势
04 | 编写你的第一个Spring程序
05 | 如何配置单数据源
06 | 如何配置多数据源
07 | 那些好用的连接池们:HikariCP
08 | 那些好用的连接池们:Alibaba Druid
09 | 如何通过Spring JDBC访问数据库
10 | 什么是Spring的事务抽象(上)
11 | 什么是Spring的事务抽象(下)
12 | 了解Spring的JDBC异常抽象
13 | 课程答疑(上)
14 | 课程答疑(下)
15 | 认识Spring Data JPA
16 | 定义JPA的实体对象
17 | 开始我们的线上咖啡馆实战项目:SpringBucks
18 | 通过Spring Data JPA操作数据库
19 | Spring Data JPA的Repository是怎么从接口变成Bean的
20 | 通过MyBatis操作数据库
21 | 让MyBatis更好用的那些工具:MyBatis Generator
22 | 让MyBatis更好用的那些工具:MyBatis PageHelper
23 | SpringBucks实战项目进度小结
24 | 通过Docker辅助开发
25 | 在Spring中访问MongoDB
26 | 在Spring中访问Redis
27 | Redis的哨兵与集群模式
28 | 了解Spring的缓存抽象
29 | Redis在Spring中的其他用法
30 | SpringBucks实战项目进度小结
31 | Project Reactor介绍(上)
32 | Project Reactor介绍(下)
33 | 通过Reactive的方式访问Redis
34 | 通过Reactive的方式访问MongoDB
35 | 通过Reactive的方式访问RDBMS
36 | 通过AOP打印数据访问层的摘要(上)
37 | 通过AOP打印数据访问层的摘要(下)
38 | SpringBucks实战项目进度小结
39 | 编写第一个Spring MVC Controller
40 | 理解Spring的应用上下文
41 | 理解请求的处理机制
42 | 如何定义处理方法(上)
43 | 如何定义处理方法(下)
44 | Spring MVC中的视图解析机制(上)
45 | Spring MVC中的视图解析机制(下)
46 | Spring MVC中的常用视图(上)
47 | Spring MVC中的常用视图(下)
48 | 静态资源与缓存
49 | Spring MVC中的异常处理机制
50 | 了解Spring MVC的切入点
51 | SpringBucks实战项目进度小结
52 | 课程答疑
53 | 通过RestTemplate访问Web资源
54 | RestTemplate的高阶用法
55 | 简单定制RestTemplate
56 | 通过WebClient访问Web资源
57 | SpringBucks实战项目进度小结
58 | 设计好的RESTful Web Service(上)
59 | 设计好的RESTful Web Service(下)
60 | 什么是HATEOAS
61 | 使用Spring Data REST实现简单的超媒体服务(上)
62 | 使用Spring Data REST实现简单的超媒体服务(下)
63 | 分布式环境中如何解决Session的问题
64 | 使用WebFlux代替Spring MVC(上)
65 | 使用WebFlux代替Spring MVC(下)
66 | SpringBucks实战项目进度小结
67 | 认识Spring Boot的组成部分
68 | 了解自动配置的实现原理
69 | 动手实现自己的自动配置
70 | 如何在低版本Spring中快速实现类似自动配置的功能
71 | 了解起步依赖及其实现原理
72 | 定制自己的起步依赖
73 | 深挖Spring Boot的配置加载机制
74 | 理解配置背后的PropertySource抽象
75 | 认识Spring Boot的各类Actuator Endpoint
76 | 动手定制自己的Health Indicator
77 | 通过Micrometer获取运行数据
78 | 通过Spring Boot Admin了解程序的运行状态
79 | 如何定制Web容器的运行参数
80 | 如何配置容器支持HTTP/2(上)
81 | 如何配置容器支持HTTP/2(下)
82 | 如何编写命令行运行的程序
83 | 了解可执行Jar背后的秘密
84 | 如何将Spring Boot应用打包成Docker镜像文件
85 | SpringBucks实战项目进度小结
86 | 简单理解微服务
87 | 如何理解云原生(Cloud Native)
88 | 12-Factor App(上)
89 | 12-Factor App(下)
90 | 认识Spring Cloud的组成部分
91 | 使用Eureka作为服务注册中心
92 | 使用Spring Cloud Loadbalancer访问服务
93 | 使用Feign访问服务
94 | 深入理解服务发现背后的DiscoveryClient
95 | 使用Zookeeper作为服务注册中心
96 | 使用Consul作为服务注册中心
97 | 使用Nacos作为服务注册中心
98 | 如何定制自己的DiscoveryClient
99 | SpringBucks实战项目进度小结
100 | 使用Hystrix实现服务熔断(上)
101 | 使用Hystrix实现服务熔断(下)
102 | 如何观察服务熔断
103 | 使用Resilience4j实现服务熔断
104 | 使用Resilience4j实现服务限流(上)
105 | 使用Resilience4j实现服务限流(下)
106 | SpringBucks实战项目进度小结
107 | 基于Git的配置中心(上)
108 | 基于Git的配置中心(下)
109 | 基于Zookeeper的配置中心
110 | 深入理解Spring Cloud的配置抽象
111 | 基于Consul的配置中心
112 | 基于Nacos的配置中心
113 | SpringBucks实战项目进度小结
114 | 认识Spring Cloud Stream
115 | 通过Spring Cloud Stream访问RabbitMQ
116 | 通过Spring Cloud Stream访问Kafka
117 | SpringBucks实战项目进度小结
118 | 通过Dapper理解链路治理
119 | 使用Spring Cloud Sleuth实现链路追踪
120 | 如何追踪消息链路
121 | 除了链路还要治理什么
122 | SpringBucks实战项目进度小结
123 | 结课测试&结束语
登录 后留言

全部留言(8)

  • 最新
  • 精选
又双叒叕是一年啊
之前线上使用过kafka做泄洪处理,确实很强大。kafka也非常稳定,多副本机制也能保证消息不丢失,但是只能保证单个partition的消息有序,请问老师能kafka能保证全局的消息有序性?或者是否可以在应用层做处理? 还有如何能避免消息的重复消费问题,一般产线上消息去重如何去做 有没有好的建议

作者回复: 首先,kafka 3副本在ack1时,极端情况里也是有可能会丢消息的,真不能丢的建议ack all。其次,我建议你还是默认认为消息是乱序的会重复的,虽然程序写起来要考虑的东西会多不少,但这样才够健壮。关于重复的问题,一般消息体里的内容总是会有什么业务ID的,让业务逻辑来做幂等性控制,这个不要强行放在MQ客户端想一劳永逸。

2019-05-23
10
york
事件驱动和消息驱动,是不是本质上是差不多的? 应用程序内部,用Spring Event机制;而跨应用程序的场景,就用消息中间件?

作者回复: 如果我说消息也可以看成一种事件,你是不是就不会困惑了。我见过有自己给自己发个MQ消息的,也有很多人这么用。

2019-10-30
3
白菜炒五花肉
老师,直接用kafka-client和用spring cloud stream kafka有什么优劣, 实际产线上哪个用的多啊

作者回复: 我感觉目前接触到的情况下,好像还是前者多一点,或者是spring-kafka,用spring cloud stream的少点

2021-08-14
2
jollyja
老师你好,咨询一个场景采用哪个消息队列产品及产品特性来实现比较合适: 【场景描述】 ServiceSubscribeA订阅Message(ID=1),线程阻塞(超时则抛异常)直到收到ID=1的Message(只收指定ID的消息); ServiceSubscribeB订阅Message(ID=2),线程阻塞(超时则抛异常)直到收到ID=2的Message(只收指定ID的消息); ServicePublisher发布Message,ID=1、ID=2、。。。 subscribe和publish,无先后关系; subscribe并发数要求高,可能同时几千个线程(不同jvm实例)做subscribe; subscribe处理时效要求高,不考虑publish耗时的话,从subscribe开始到收到消息的耗时为毫秒级;

作者回复: 订阅消息为什么会有线程阻塞,没收到消息的时候监听队列就可以了呀,不用做什么阻塞。 看你的描述两个消费者订阅的是同一个队列,只是各收不同的消息,JMS规范里有message selector,可以根据消息头里的不同属性来筛选消息。 但我总觉得你可能没把需求说清楚,如果是不同的系统,通常监听不同的队列,如果监听同一个队列,那也是不同的分组,消息都会收下来的。感觉没能理解你的需求。

2021-05-04
2
test
老师,如果kafka要自己手动提交offset是不是就不能按照你的例子这样子写。

作者回复: 那可以直接使用Spring Kafka,不用Spring Cloud Stream的封装

2020-08-07
2
高峰
你好,这种binder的模式,如何设置 consumer group-Id

作者回复: 我们在kafka-barista-service的配置文件中有一个spring.cloud.stream.bindings.newOrders.group=barista-service,这里的newOrders就是Topic,group是分组名

2020-07-23
2
2
十块钱
kafka之前做过日志分析,现在公司用阿里rocketmq,使用起来比较方便,但是要做好消息幂等性
2019-05-21
2
高峰
请教一下老师,我这边单点kafka,单个消费者,单个partion, 我用spring boot kafka binder 消费一段时间后 自动退出了, 报一下错误。 Revoke previously assigned partitions xxxxxx Member consumer-xxxx-consumer-01-2-e7d8f7bf-ea52-4b3a-8aed-39059da4f772 sending LeaveGroup request to coordinator 172.16.12.170:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics 是不是长时间没有数据可以消费导致的。 allow.auto.create.topics = true auto.commit.interval.ms = 100 auto.offset.reset = earliest bootstrap.servers = [172.16.12.170:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = hot-consumer-01 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 1200000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100
2020-09-23
收起评论