消息队列高手课
李玥
京东零售技术架构部资深架构师
立即订阅
8426 人已学习
课程目录
已完结 41 讲
0/4登录后,你可以任选4讲全文学习。
课前必读 (2讲)
开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”
免费
预习 | 怎样更好地学习这门课?
基础篇 (8讲)
01 | 为什么需要消息队列?
02 | 该如何选择消息队列?
03 | 消息模型:主题和队列有什么区别?
04 | 如何利用事务消息实现分布式事务?
05 | 如何确保消息不会丢失?
06 | 如何处理消费过程中的重复消息?
07 | 消息积压了该如何处理?
08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?
进阶篇 (21讲)
09 | 学习开源代码该如何入手?
10 | 如何使用异步设计提升系统性能?
11 | 如何实现高性能的异步网络传输?
12 | 序列化与反序列化:如何通过网络传输结构化的数据?
13 | 传输协议:应用程序之间对话的语言
14 | 内存管理:如何避免内存溢出和频繁的垃圾回收?
加餐 | JMQ的Broker是如何异步处理消息的?
15 | Kafka如何实现高性能IO?
16 | 缓存策略:如何使用缓存来减少磁盘IO?
17 | 如何正确使用锁保护共享数据,协调异步线程?
18 | 如何用硬件同步原语(CAS)替代锁?
19 | 数据压缩:时间换空间的游戏
20 | RocketMQ Producer源码分析:消息生产的实现过程
21 | Kafka Consumer源码分析:消息消费的实现过程
22 | Kafka和RocketMQ的消息复制实现的差异点在哪?
23 | RocketMQ客户端如何在集群中找到正确的节点?
24 | Kafka的协调服务ZooKeeper:实现分布式系统的“瑞士军刀”
25 | RocketMQ与Kafka中如何实现事务?
26 | MQTT协议:如何支持海量的在线IoT设备?
27 | Pulsar的存储计算分离设计:全新的消息队列设计思路
28 | 答疑解惑(二):我的100元哪儿去了?
案例篇 (7讲)
29 | 流计算与消息(一):通过Flink理解流计算的原理
30 | 流计算与消息(二):在流计算中使用Kafka链接计算任务
31 | 动手实现一个简单的RPC框架(一):原理和程序的结构
32 | 动手实现一个简单的RPC框架(二):通信与序列化
33 | 动手实现一个简单的RPC框架(三):客户端
34 | 动手实现一个简单的RPC框架(四):服务端
35 | 答疑解惑(三):主流消息队列都是如何存储消息的?
测试篇 (2讲)
期中测试丨10个消息队列热点问题自测
免费
期末测试 | 消息队列100分试卷等你来挑战!
结束语 (1讲)
结束语 | 程序员如何构建知识体系?
消息队列高手课
登录|注册

25 | RocketMQ与Kafka中如何实现事务?

李玥 2019-09-21
你好,我是李玥。
在之前《04 | 如何利用事务消息实现分布式事务?》这节课中,我通过一个小例子来和大家讲解了如何来使用事务消息。在这节课的评论区,很多同学都提出来,非常想了解一下事务消息到底是怎么实现的。不仅要会使用,还要掌握实现原理,这种学习态度,一直是我们非常提倡的,这节课,我们就一起来学习一下,在 RocketMQ 和 Kafka 中,事务消息分别是如何来实现的?

RocketMQ 的事务是如何实现的?

首先我们来看 RocketMQ 的事务。我在之前的课程中,已经给大家讲解过 RocketMQ 事务的大致流程,这里我们再一起通过代码,重温一下这个流程。
public class CreateOrderService {
@Inject
private OrderDao orderDao; // 注入订单表的DAO
@Inject
private ExecutorService executorService; //注入一个ExecutorService
private TransactionMQProducer producer;
// 初始化transactionListener 和 producer
@Init
public void init() throws MQClientException {
TransactionListener transactionListener = createTransactionListener();
producer = new TransactionMQProducer("myGroup");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
}
// 创建订单服务的请求入口
@PUT
@RequestMapping(...)
public boolean createOrder(@RequestBody CreateOrderRequest request) {
// 根据创建订单请求创建一条消息
Message msg = createMessage(request);
// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, request);
// 返回:事务是否成功
return sendResult.getSendStatus() == SendStatus.SEND_OK;
}
private TransactionListener createTransactionListener() {
return new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
CreateOrderRequest request = (CreateOrderRequest ) arg;
try {
// 执行本地事务创建订单
orderDao.createOrderInDB(request);
// 如果没抛异常说明执行成功,提交事务消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Throwable t) {
// 失败则直接回滚事务消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 反查本地事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {、
// 从消息中获得订单ID
String orderId = msg.getUserProperty("orderId");
// 去数据库中查询订单号是否存在,如果存在则提交事务;
// 如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW
//(PS:这里RocketMQ有个拼写错误:UNKNOW)
return orderDao.isOrderIdExistsInDB(orderId)?
LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
}
};
}
//....
}
在这个流程中,我们提供一个创建订单的服务,功能就是在数据库中插入一条订单记录,并发送一条创建订单的消息,要求写数据库和发消息这两个操作在一个事务内执行,要么都成功,要么都失败。在这段代码中,我们首先在 init() 方法中初始化了 transactionListener 和发生 RocketMQ 事务消息的变量 producer。真正提供创建订单服务的方法是 createOrder(),在这个方法里面,我们根据请求的参数创建一条消息,然后调用 RocketMQ producer 发送事务消息,并返回事务执行结果。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《消息队列高手课》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(7)

  • weilai
    查好像很多博客都说阿里已经把RocketMQ的这个反查接口给干掉了?老师,是这样吗?遇到这种问题,您是怎么找到答案的?

    作者回复: 以官网文档和代码为准吧,至少目前的版本是没有变化的。

    https://rocketmq.apache.org/docs/transaction-example/

    2019-10-24
    2
  • miniluo
    老师,有个疑问:文中说到rocketmq#checkLocalTransaction这个方法反查到可能本地事务还在提交中就返回了unknow,那后续呢?还会通过定时轮询检查?求解,谢谢

    作者回复: 会一直定时轮询,直到有结果或者超时。

    2019-09-21
    3
    1
  • 我已经设置了昵称
    kafka的事务消息和rocketmq的只是比rocketmq少了个反查机制。忽略内部的实现,可以这么理解吧。可是老师貌似没有讲过如何使用kafka的事务消息呢

    作者回复: 在第30节课中会讲到Kafka事务的用途。

    2019-10-29
    1
  • A9
    请问老师,失败的半消息也是在commit log中存储着吧。如果失败的事务消息存储过多,会不会导致在读取commit log时频繁触发缺页?

    作者回复: 一般来说不会,因为如果是已经关闭的事务,就不会再去读它对应的半消息了。

    由于事务的超时机制存在,一般来说,活动的事务的日志大多都在commit log的尾部。

    2019-10-27
  • 不惑ing
    ' Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。'这句话的意思理解为kafka的事务针对本地事务和发消息一致性没有rocketmq好,但是也可以用,这样理解对吗?

    作者回复: 可以这么理解,Kafka没有RocketMQ的事务反查补偿机制。

    2019-10-05
  • jack
    老师,如果仅仅把kafka作为数据源,流计算的结果保存到了其他数据库中,是不是就用不到kafka的事务了呢?

    作者回复: 是这样的

    2019-10-02
  • lmtoo
    kafka的第二阶段,事务协调者发送给每个分区的事务结束的消息,每个分区是怎么处理这个事务结束的消息的?这个事务结束的消息保存到哪儿了?是不是消费者挂机重启之后,事务结束的消息就没了?

    作者回复: 事务结束消息就是一条特殊的消息,和普通消息一样保存在分区中。同普通消息一样,事务结束消息只要不被删除,就会一直存在。

    2019-09-23
收起评论
7
返回
顶部