消息队列高手课
李玥
美团高级技术专家
52199 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 42 讲
进阶篇 (21讲)
消息队列高手课
15
15
1.0x
00:00/00:00
登录|注册

25 | RocketMQ与Kafka中如何实现事务?

发送事务消息
提交阶段
准备阶段
用于流计算中
与消息队列的Exactly Once不同
多条消息的原子性
事务反查
处理半消息
sendMessageInTransaction
checkLocalTransaction
executeLocalTransaction
创建订单
初始化transactionListener和producer
事务恢复机制
分析Kafka源代码
实现流程
事务日志主题
事务协调者
Exactly Once
Kafka的事务解决的问题
Broker处理事务消息
Producer发送事务消息
TransactionListener
创建订单服务
思考题
Kafka的事务是如何实现的?
Kafka的事务和Exactly Once
RocketMQ的事务是如何实现的?
RocketMQ与Kafka中如何实现事务?

该思维导图由 AI 生成,仅供参考

你好,我是李玥。
在之前《04 | 如何利用事务消息实现分布式事务?》这节课中,我通过一个小例子来和大家讲解了如何来使用事务消息。在这节课的评论区,很多同学都提出来,非常想了解一下事务消息到底是怎么实现的。不仅要会使用,还要掌握实现原理,这种学习态度,一直是我们非常提倡的,这节课,我们就一起来学习一下,在 RocketMQ 和 Kafka 中,事务消息分别是如何来实现的?

RocketMQ 的事务是如何实现的?

首先我们来看 RocketMQ 的事务。我在之前的课程中,已经给大家讲解过 RocketMQ 事务的大致流程,这里我们再一起通过代码,重温一下这个流程。
public class CreateOrderService {
@Inject
private OrderDao orderDao; // 注入订单表的DAO
@Inject
private ExecutorService executorService; //注入一个ExecutorService
private TransactionMQProducer producer;
// 初始化transactionListener 和 producer
@Init
public void init() throws MQClientException {
TransactionListener transactionListener = createTransactionListener();
producer = new TransactionMQProducer("myGroup");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
}
// 创建订单服务的请求入口
@PUT
@RequestMapping(...)
public boolean createOrder(@RequestBody CreateOrderRequest request) {
// 根据创建订单请求创建一条消息
Message msg = createMessage(request);
// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, request);
// 返回:事务是否成功
return sendResult.getSendStatus() == SendStatus.SEND_OK;
}
private TransactionListener createTransactionListener() {
return new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
CreateOrderRequest request = (CreateOrderRequest ) arg;
try {
// 执行本地事务创建订单
orderDao.createOrderInDB(request);
// 如果没抛异常说明执行成功,提交事务消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Throwable t) {
// 失败则直接回滚事务消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 反查本地事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {、
// 从消息中获得订单ID
String orderId = msg.getUserProperty("orderId");
// 去数据库中查询订单号是否存在,如果存在则提交事务;
// 如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW
//(PS:这里RocketMQ有个拼写错误:UNKNOW)
return orderDao.isOrderIdExistsInDB(orderId)?
LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
}
};
}
//....
}
在这个流程中,我们提供一个创建订单的服务,功能就是在数据库中插入一条订单记录,并发送一条创建订单的消息,要求写数据库和发消息这两个操作在一个事务内执行,要么都成功,要么都失败。在这段代码中,我们首先在 init() 方法中初始化了 transactionListener 和发生 RocketMQ 事务消息的变量 producer。真正提供创建订单服务的方法是 createOrder(),在这个方法里面,我们根据请求的参数创建一条消息,然后调用 RocketMQ producer 发送事务消息,并返回事务执行结果。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

RocketMQ和Kafka是两种流行的消息中间件,它们都支持事务消息。本文深入解析了RocketMQ事务消息的实现细节,包括事务消息的发送、本地事务执行、事务反查以及Broker端的处理流程。通过对RocketMQ事务消息的工作原理和实现机制进行详细介绍,读者可以深入了解RocketMQ事务消息的特点和使用方法,为使用和定制RocketMQ提供了重要参考。 Kafka的事务机制解决的是在一个事务中发送的多条消息要么都成功,要么都失败的问题。它基于两阶段提交来实现事务,利用了特殊的主题中的队列和分区来记录事务日志。不同于RocketMQ,Kafka直接将事务消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。Kafka的事务适用于实现其Exactly Once机制,主要应用于实时计算的场景中。 总的来说,本文通过对RocketMQ和Kafka事务消息的实现细节进行深入分析,帮助读者了解了它们的工作原理和实现机制,以及它们在不同场景下的适用性。这对于从事消息中间件开发或使用的技术人员来说,是一份重要的参考资料。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《消息队列高手课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(21)

  • 最新
  • 精选
  • 丁小明
    也就是说其实kafka的Exactly Once模式,是kafka的consumer通过PID去实现了一个幂等操作,原理上来说是和at last once我们业务自己通过其他唯一ID实现幂等是一样的效果,并不是正真的只传输到客户端一次,而是重复传输实现了幂等。

    作者回复: 是这样的。

    2020-05-08
    23
  • ξ!
    老师,如果本地事务是有返回值的话可不可以先执行本地事务如果有异常就抛出,再去执行发送消息,因为现在这么写获取不到执行完本地事务的结果呀

    作者回复: 考虑这种情况: 1. 客户端提交了本地事务; 2. 本地事务在数据库执行成功了。 3. 这个时候客户端宕机了; 这种情况下,就没法保证一致性了。

    2020-05-13
    2
    12
  • 二明儿
    老师好,请教个问题,kafka consumer将事务未提交的消息 在客户端过滤后不放行给业务代码消费,如果这样如果有大量未提交的消息对于客户端端内存会不会有影响?如果这个时候客户端重启或者发生reblance,offset已经提交会不会导致消息丢失?

    作者回复: 大量未提交消息对客户端内存影响不大,因为Kafka客户端有一个固定大小的buffer用来保存拉取的消息。 只要你遵循:先执行消费业务逻辑,再提交,这样的原则。 即使客户端重启或者Rebalance,也不会丢消息。

    2020-04-10
    2
    7
  • weilai
    查好像很多博客都说阿里已经把RocketMQ的这个反查接口给干掉了?老师,是这样吗?遇到这种问题,您是怎么找到答案的?

    作者回复: 以官网文档和代码为准吧,至少目前的版本是没有变化的。 https://rocketmq.apache.org/docs/transaction-example/

    2019-10-24
    2
    6
  • jack
    老师,如果仅仅把kafka作为数据源,流计算的结果保存到了其他数据库中,是不是就用不到kafka的事务了呢?

    作者回复: 是这样的

    2019-10-02
    6
  • miniluo
    老师,有个疑问:文中说到rocketmq#checkLocalTransaction这个方法反查到可能本地事务还在提交中就返回了unknow,那后续呢?还会通过定时轮询检查?求解,谢谢

    作者回复: 会一直定时轮询,直到有结果或者超时。

    2019-09-21
    4
    6
  • 不惑ing
    ' Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。'这句话的意思理解为kafka的事务针对本地事务和发消息一致性没有rocketmq好,但是也可以用,这样理解对吗?

    作者回复: 可以这么理解,Kafka没有RocketMQ的事务反查补偿机制。

    2019-10-05
    5
  • A9
    请问老师,失败的半消息也是在commit log中存储着吧。如果失败的事务消息存储过多,会不会导致在读取commit log时频繁触发缺页?

    作者回复: 一般来说不会,因为如果是已经关闭的事务,就不会再去读它对应的半消息了。 由于事务的超时机制存在,一般来说,活动的事务的日志大多都在commit log的尾部。

    2019-10-27
    3
  • lmtoo
    kafka的第二阶段,事务协调者发送给每个分区的事务结束的消息,每个分区是怎么处理这个事务结束的消息的?这个事务结束的消息保存到哪儿了?是不是消费者挂机重启之后,事务结束的消息就没了?

    作者回复: 事务结束消息就是一条特殊的消息,和普通消息一样保存在分区中。同普通消息一样,事务结束消息只要不被删除,就会一直存在。

    2019-09-23
    3
  • jian
    请问老师,这里说“消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。”假如协调者执行完第一阶段之后还没有执行第二阶段,这时候机器宕机或者进程被KILL掉了,是不是重启之后会继续执行第二阶段呢?

    作者回复: 是这样的。

    2020-07-07
    2
收起评论
显示
设置
留言
21
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部