本文转载自微信公众号「菜鸟飞呀飞」,作者刘进坤。转载本文请联系菜鸟飞呀飞公众号。
创新互联主营海盐网站建设的网络公司,主营网站建设方案,重庆APP软件开发,海盐h5微信小程序搭建,海盐网站营销推广欢迎海盐等地区企业咨询
前言
在面过的几家大厂中,几乎每轮的面试官(「没写错,几乎是每轮面试官」)都问了同样一个问题:你们的系统是分布式的系统吗?
答:是。
面试官:那么你们分布式的系统是如何解决分布式事务这个问题的呢?也就是如何保证数据的一致性。
答:我们的系统中通过 RocketMQ 的事务消息来保证数据的最终一致性。
面试官:那你说说它是如何来保证数据的最终一致性的?
答:分两部分来回答,第一部分先回答事务消息的实现流程,第二部分解释为什么它能保证数据的最终一致性。
事务消息的实现流程
事务消息
如何保证数据的最终一致性
实现流程说完了,可能你现在有各种各样的疑惑?
Q: half 消息是个啥?
A: 它和我们正常发送的普通消息是一样的,都是存储在 MQ 中,唯一不同的是 half 在 MQ 中不会立马被消费者消费到,除非这个 half 消息被 commit 了。(至于为什么未 commit 的 half 消息无法被消费者读取到,这是因为在 MQ 内部,对于事务消息而言,在 commit 之前,会先放在一个内部队列中,只有 commit 了,才会真正将消息放在消费者能读取到的 topic 队列中)
Q: 为什么要先发送 half 消息?
A: 前面已经解释过了,主要是为了保证服务 A 和 MQ 之间是否能正常通信,如果两者之间都不能正常通信,后面还玩个锤子,直接返回异常就可以了。
Q: 如果 MQ 接收到了 half 消息,但是在返回 success 响应的时候,因为网络原因,导致服务 A 没有接收到 success 响应,这个时候是什么现象?
A: 当服务 A 发送 half 消息后,它会等待 MQ 给自己返回 success 响应,如果没有接收到,那么服务 A 也会直接结束,返回异常,不再执行后续逻辑。不执行后续逻辑,这样服务 A 也就不会提交 commit 消息给 MQ,MQ 长时间没接收到 commit 消息,那么它就会主动回调服务 A 的一个接口,服务 A 通过接口,查询本地数据后,发现这条消息对应的业务并没有正常执行,那么就告诉 MQ,这个 half 消息不能 commit,需要 rollback,MQ 知道后,就将 half 消息进行删除。
Q: 如果服务 A 本地事务执行失败了,怎么办?
A: 服务 A 本地事务执行失败后,先对自己本地事务进行回滚,然后再向 MQ 发送 rollback 操作。
Q: 服务 A 本地事务提交成功或失败后,向 MQ 发送的 commit 或者 rollback 消息,因为网络问题丢失了,又该怎么处理?
A: 和上一个问题一样,MQ 长时间没有接收到 half 消息的 commit 或者 rollback 消息,MQ 会主动回调服务 A 的接口,通过这个接口来判断自己该对这个 half 消息如何处理。
Q: 前面说的全是事务消息的实现流程,这和事务消息如何保证数据的最终一致性有什么关系呢?
A: 有关系。首先,服务 A 执行本地事务并提交和向 MQ 中发送消息这是两个写操作,然后通过 RocketMQ 的事务消息,我们保证了这两个写操作要么都执行成功,要么都执行失败。然后让其他系统,如服务 B 通过消费 MQ 中的消息,然后再去执行自己本地的事务,这样到最后,服务 A 和服务 B 这两个系统的数据状态是不是达到了一致?这就是最终一致性的含义。
如果要求服务 A 和服务 B 的数据状态,在服务 A 返回给客户端之间,这两者就达到一致,这是强一致性,RocketMQ 是没法保证强一致性的。
目前通过「可靠消息来保证数据的最终一致性」是很多大厂都采用的方案,基本都是通过 MQ 和补偿机制来保证数据的一致性。(所谓的可靠消息,就是消息不丢失,如何保证 MQ 的消息不丢失,下篇文章会写,这也是面试常考题)
Q: 服务 B 本地事务提交失败了,怎么办?
A: 如果服务 B 本地事务提交失败了,可以进行多次重试,直到成功。如果重试多次后,还是提交失败,例如此时服务 B 对应的 DB 宕机了,这个时候只要服务 B 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 会在一定时间后,继续将这条消息推送给服务 B,服务 B 就可以继续执行本地事务并提交了,直到成功。这样,依旧是保证了服务 A 和服务 B 数据的最终一致性。
代码实现
使用 RokcetMQ 的事务消息主要涉及到两个部分:
如何发送半事务消息,这个可以通过「TransactionMQProducer」 类来实现。
- TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup");
- TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null);
- // 通过result来判断half消息是否发送成功
- if(result.getSendStatus() == SendStatus.SEND_OK){
- // 成功
- }else{
- // 失败
- }
在前面我们提到了服务 A 需要提供一个接口,用来供 MQ 回调服务 A,实际上这个接口就是一个监听器:「TransactionListener」的方法。这是一个接口,提供了两个方法。
- public interface TransactionListener {
- // 当half消息发送成功后,我们在这里实现自己的业务逻辑,然后commit或者rollback 给MQ
- LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
- // 这个方法就是供MQ回调的方法,MQ通过回调该方法来判断half消息的状态
- // 可以看到,这个方法的参数是MessageExt,也就是half消息的内容,如果根据MessageExt,我们完全能在服务A中判断之前的业务是否处理成功
- LocalTransactionState checkLocalTransaction(final MessageExt msg);
- }
实际使用时,我们需要实现该接口,例如:
- public class MyTransactionListener implements TransactionListener {
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- try{
- // 处理业务逻辑
- // ....
- // 业务逻辑处理成功,commit
- return LocalTransactionState.COMMIT_MESSAGE;
- }catch (Exception e){
- }
- // 业务处理失败,rollback
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- return null;
- }
- }
另外,在创建 producer 时,指定我们实现实现的监听器
- TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup");
- transactionMQProducer.setTransactionListener(new MyTransactionListener());
总结
分布式事务是大厂面试必问题,而目前大部分公司都是采用可靠消息来保证数据的最终一种性,通常会采用 RocketMQ 来实现。如果想去阿里的同学,建议 MQ 这块,选择 RocketMQ 多复习一下。
新闻标题:RocketMQ事务消息如何保证数据的一致性
当前地址:http://www.csdahua.cn/qtweb/news6/173756.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网