如何用RabbitMQ解决分布式事务?

今天松哥想通过一个简单的案例,来和大家聊一聊如何通过消息中间件来处理分布式事务。

在栖霞等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供网站制作、成都网站制作 网站设计制作按需设计网站,公司网站建设,企业网站建设,成都品牌网站建设,成都全网营销,外贸营销网站建设,栖霞网站建设费用合理。

1. 思路分析

先来说说整体思路。

有一个名词叫做消息驱动的微服务,相信很多小伙伴都听说过。怎么理解呢?

在微服务系统中,服务之间的互相调用,我们可以使用 HTTP 的方式,例如 OpenFeign,也可以使用 RPC 的方式,例如 Dubbo,除了这些方案之外,我们也可以使用消息驱动,这是一种典型的响应式系统设计方案。

在消息驱动的微服务中,服务之间不再互相直接调用,当服务之间需要通信时,就把通信内容发送到消息中间件上,另一个服务则通过监听消息中间件中的消息队列,来完成相应的业务逻辑调用,过程就是这么个过程,并不难,具体怎么玩,我们继续往下看。

2. 业务分析

折腾了半天,后来松哥在网上找到了一个别人写好的例子,我觉得用来演示这个问题特别合适,所以我就没有自己写案例了,直接用别人的代码,我们来逐个分析,跟前面讲分布式事务 Seata 的方式一致。

首先我们来看如下一张流程图,这是一个用户购票的案例:

当用户想要购买一张票时:

  • 向新订单队列中写入一条数据。
  • Order Service 负责消费这个队列中的消息,完成订单的创建,然后再向新订单缴费队列中写入一条消息。
  • User Service 负责消费新订单缴费队列中的消息,在 User Service 中完成对用户账户余额的划扣,然后向新订单转移票队列中写入一条消息。
  • Ticket Service 负责消费新订单转移票队列,在 Ticket Service 中完成票的转移,然后发送一条消息给订单完成队列。
  • 最后 Order Service 中负责监听订单完成队列,处理完成后的订单。

这就是一个典型的消息驱动微服务,也是一个典型的响应式系统。在这个系统中,一共有三个服务,分别是:

  • Order Service
  • User Service
  • Ticket Service

这三个服务之间不会进行任何形式的直接调用,大家有事都是直接发送到消息中间件,其他服务则从消息中间件中获取自己想要的消息然后进行处理。

具体到我们的实践中,则多了一个检查票是否够用的流程,如下图:

创建订单时,先由 Ticket 服务检查票是否够用,没问题的话再继续发起订单的创建。其他过程我就不说了。

另外还需要注意,在售票系统中,由于每张票都不同,例如每张票可能有座位啥的,因此一张票在数据库中往往是被设计成一条记录。

3. 实践

流程我已经说明白了,接下来我们就来看看具体的代码实践。

3.1 准备数据库

首先我们准备三个数据库,分别是:

javaboy_order:订单库,用户创建订单等操作,在这个数据库中完成。

javaboy_ticket:票务库,这个库中保存着所有的票据信息,每一张票都是一条记录,都保存在这个库中。

javaboy_user:用户库,这里保存着用户的账户余额以及付款记录等信息。

每个库中都有各自对应的表,为了操作方便,这些表不用自己创建,将来等项目启动了,利用 JPA 自动创建即可。

3.2 项目概览

我们先来整体上看下这个项目,公众号后台回复 mq_tran 可以下载完整代码:

一共有五个服务:

  • eureka:注册中心
  • order:订单服务
  • service:公共模块
  • ticket:票务服务
  • user:用户服务
  • 下面分别来说。

3.3 注册中心

有人说,都消息驱动了,还要注册中心干嘛?

消息驱动没错,消息驱动微服务之后每个服务只管把消息往消息中间件上扔,每个服务又只管消费消息中间件上的消息,这个时候对于服务注册中心似乎不是那么强需要。不过在我们这个案例中,消息驱动主要用来处理事务问题,其他常规需求我们还是用 OpenFeign 来处理,所以这里我们依然需要一个注册中心。

这里的注册中心我就选择常见的 Eureka,省事一些。由于本文主要是和大家聊分布式事务,所以涉及到微服务的东西我就简单介绍下,不会占用过多篇幅,如果大家还不熟悉 Spring Cloud 的用法,可以在公众号后台回复 vhr 有一套视频介绍。

服务注册中心的创建记得加上 Spring Security,将自己的服务注册中心保护起来。

这块有一个小小的细节和大家多说两句。

Eureka 用 Spring Security 保护起来之后,以后其他服务注册都是通过 Http Basic 来认证,所以我们要在代码中开启 Http Basic 认证,如下(以前旧版本不需要下面这段代码,但是新版本需要):

3.4 购票服务

接下来我们就来看看购票服务。

@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.authorizeRequests()
.anyRequest().authenticated()
.and()
.httpBasic()
.and().formLogin().and().csrf().disable();
}
}

购票是从下订单开始,所以我们就先从订单服务 order 开始整个流程的分析。

3.4.1 新订单处理(order)

当用户发起一个购票请求后,这个请求发送到 order 服务上,order 服务首先会向 order:new 队列发送一条消息,开启一个订单的处理流程。代码如下:

@Transactional
@PostMapping("")
public void create(@RequestBody OrderDTO dto) {
dto.setUuid(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("order:new", dto);
}

上面设置的 UUID 是整个订单在处理过程中的一个唯一标志符,也算是一条主线。

order:new​ 队列中的消息将被 ticket 服务消费,ticket 服务消费 order:new​ 中的消息,并进行锁票操作(锁票的目的防止有两个消费同时购买同一张票),锁票成功后,ticket 服务将向 order:locked​ 队列发送一条消息,表示锁票成功;否则向 order:fail 队列发送一条消息表示锁票失败。

这里的 OrderDTO 对象将贯穿整个购票过程。

3.4.2 锁票(ticket)

锁票操作是在 ticket 服务中完成的,代码如下:

@Transactional
@RabbitListener(queues = "order:new")
public void handleTicketLock(OrderDTO msg) {
LOG.info("Get new order for ticket lock:{}", msg);
int lockCount = ticketRepository.lockTicket(msg.getCustomerId(), msg.getTicketNum());
if (lockCount == 0) {
msg.setStatus("TICKET_LOCK_FAIL");
rabbitTemplate.convertAndSend("order:fail", msg);
} else {
msg.setStatus("TICKET_LOCKED");
rabbitTemplate.convertAndSend("order:locked", msg);
}
}

先调用 lockTicket 方法去数据库中锁票,所谓的锁票就是将要购买的票的 lock_user 字段设置为 customer_id(购买者的 id)。

如果锁票成功(即数据库修改成功),设置 msg 的状态为 TICKET_LOCKED​,同时发送消息到 order:locked 队列,表示锁票成功。

如果锁票失败(即数据库修改失败),设置 msg 的状态为 TICKET_LOCK_FAIL​,同时发送消息到 order:fail 队列,表示锁票失败。

3.4.2 锁票成功(order)

接下来,由 order 服务消费 order:locked 队列中的消息,也就是锁票成功后接下来的操作。

@Transactional
@RabbitListener(queues = "order:locked")
public void handle(OrderDTO msg) {
LOG.info("Get new order to create:{}", msg);
if (orderRepository.findOneByUuid(msg.getUuid()) != null) {
LOG.info("Msg already processed:{}", msg);
} else {
Order order = newOrder(msg);
orderRepository.save(order);
msg.setId(order.getId());
}
msg.setStatus("NEW");
rabbitTemplate.convertAndSend("order:pay", msg);
}

锁票成功后,先根据订单的 UUID 去订单数据库查询,是否已经有订单记录了,如果有,说明这条消息已经被处理了,可以防止订单的重复处理(这块主要是解决幂等性问题)。

如果订单还没有被处理,则创建一个新的订单对象,并保存到数据库中,创建新订单对象的时候,需要设置订单的 status 为 NEW。

最后设置 msg 的 status 为 NEW,然后向 order:pay​ 队列发送一条消息开启付款流程,付款是由 user 服务提供的。user 服务中会检查用户的账户余额是否够用,如果不够用,就会发送消息到 order:ticket_error​ 队列,表示订票失败;如果余额够用,则进行正常的付款操作,并在付款成功后发送消息到 order:ticket_move 队列,开启票的转移。

3.4.3 缴费(user)

锁票成功后,接下来就是付费了,付费服务由 user 提供。

@Transactional
@RabbitListener(queues = "order:pay")
public void handle(OrderDTO msg) {
LOG.info("Get new order to pay:{}", msg);
// 先检查payInfo判断重复消息。
PayInfo pay = payInfoRepository.findOneByOrderId(msg.getId());
if (pay != null) {
LOG.warn("Order already paid, duplicated message.");
return;
}
Customer customer = customerRepository.getById(msg.getCustomerId());
if (customer.getDeposit() < msg.getAmount()) {
LOG.info("No enough deposit, need amount:{}", msg.getAmount());
msg.setStatus("NOT_ENOUGH_DEPOSIT");
rabbitTemplate.convertAndSend("order:ticket_error", msg);
return;
}
pay = new PayInfo();
pay.setOrderId(msg.getId());
pay.setAmount(msg.getAmount());
pay.setStatus("PAID");
payInfoRepository.save(pay);
customerRepository.charge(msg.getCustomerId(), msg.getAmount());
msg.setStatus("PAID");
rabbitTemplate.convertAndSend("order:ticket_move", msg);
}

这里的执行步骤如下:

  • 首先根据订单 id 去查找付款信息,检查当前订单是否已经完成付款,如果已经完成服务,则直接 return,这一步也是为了处理幂等性问题。
  • 根据顾客的 id,查找到顾客的完整信息,包括顾客的账户余额。
  • 检查顾客的账户余额是否足够支付票价,如果不够,则设置 msg 的 status 为 NOT_ENOUGH_DEPOSIT,同时向order:ticket_error 队列发送消息,表示订票失败。
  • 如果顾客账户余额足够支付票价,则创建一个 PayInfo 对象,设置相关的支付信息,并存入pay_info 表中。
  • 调用 charge 方法完成顾客账户余额的扣款。
  • 发送消息到order:ticket_move 队列中,开启交票操作。

3.4.4 交票(ticket)

@Transactional
@RabbitListener(queues = "order:ticket_move")
public void handleTicketMove(OrderDTO msg) {
LOG.info("Get new order for ticket move:{}", msg);
int moveCount = ticketRepository.moveTicket(msg.getCustomerId(), msg.getTicketNum());
if (moveCount == 0) {
LOG.info("Ticket already transferred.");
}
msg.setStatus("TICKET_MOVED");
rabbitTemplate.convertAndSend("order:finish", msg);
}

调用 moveTicket 方法完成交票操作,也就是设置 ticket 表中票的 owner 为 customerId。

交票成功后,发送消息到 order:finish 队列,表示交票完成。

3.4.5 订单完成(order)

@Transactional
@RabbitListener(queues = "order:finish")
public void handleFinish(OrderDTO msg) {
LOG.info("Get finished order:{}", msg);
Order order = orderRepository.getById(msg.getId());
order.setStatus("FINISH");
orderRepository.save(order);
}

这里的处理就比较简单,订单完成后,就设置订单的状态为 FINISH 即可。

上面介绍的是一条主线,顺利的话,消息顺着这条线走一遍,一个订单就处理完成了。

不顺利的话,就有各种幺蛾子,我们分别来看。

3.4.6 锁票失败(order)

锁票是在 ticket 服务中完成的,如果锁票失败,就会直接向 order:fail 队列发送消息,该队列的消息由 order 服务负责消费。

3.4.7 扣款失败(ticket)

扣款操作是在 user 中完成的,扣款失败就会向 order:ticket_error 队列中发送消息,该队列的消息由 ticket 服务负责消费。

@Transactional
@RabbitListener(queues = "order:ticket_error")
public void handleError(OrderDTO msg) {
LOG.info("Get order error for ticket unlock:{}", msg);
int count = ticketRepository.unMoveTicket(msg.getCustomerId(), msg.getTicketNum());
if (count == 0) {
LOG.info("Ticket already unlocked:", msg);
}
count = ticketRepository.unLockTicket(msg.getCustomerId(), msg.getTicketNum());
if (count == 0) {
LOG.info("Ticket already unmoved, or not moved:", msg);
}
rabbitTemplate.convertAndSend("order:fail", msg);
}

当扣款失败的时候,做三件事:

  • 撤销票的转移,也就是把票的 owner 字段重新置为 null。
  • 撤销锁票,也就是把票的lock_user 字段重新置为 null。
  • 向order:fail 队列发送订单失败的消息。

3.4.8 下单失败(order)

下单失败的处理在 order 服务中,有三种情况会向 order:fail 队列发送消息:

  • 锁票失败
  • 扣款失败(客户账户余额不足)
  • 订单超时
@Transactional
@RabbitListener(queues = "order:fail")
public void handleFailed(OrderDTO msg) {
LOG.info("Get failed order:{}", msg);
Order order;
if (msg.getId() == null) {
order = newOrder(msg);
order.setReason("TICKET_LOCK_FAIL");
} else {
order = orderRepository.getById(msg.getId());
if (msg.getStatus().equals("NOT_ENOUGH_DEPOSIT")) {
order.setReason("NOT_ENOUGH_DEPOSIT");
}
}
order.setStatus("FAIL");
orderRepository.save(order);
}

该方法的具体处理逻辑如下:

  • 首先查看是否有订单 id,如果连订单 id 都没有,就说明是锁票失败,给订单设置 reason 属性的值为TICKET_LOCK_FAIL。
  • 如果有订单 id,则根据 id 查询订单信息,并判断订单状态是否为NOT_ENOUGH_DEPOSIT​,这个表示扣款失败,如果订单状态是NOT_ENOUGH_DEPOSIT,则设置失败的 reason 也为此。
  • 最后设置订单状态为 FAIL,然后更新数据库中的订单信息即可。

3.4.9 订单超时(order)

order 服务中还有一个定时任务,定时去数据库中捞取那些处理失败的订单,如下:

@Scheduled(fixedDelay = 10000L)
public void checkInvalidOrder() {
ZonedDateTime checkTime = ZonedDateTime.now().minusMinutes(1L);
List orders = orderRepository.findAllByStatusAndCreatedDateBefore("NEW", checkTime);
orders.stream().forEach(order -> {
LOG.error("Order timeout:{}", order);
OrderDTO dto = new OrderDTO();
dto.setId(order.getId());
dto.setTicketNum(order.getTicketNum());
dto.setUuid(order.getUuid());
dto.setAmount(order.getAmount());
dto.setTitle(order.getTitle());
dto.setCustomerId(order.getCustomerId());
dto.setStatus("TIMEOUT");
rabbitTemplate.convertAndSend("order:ticket_error", dto);
});
}

可以看到,这里是去数据库中捞取那些状态为 NEW 并且是 1 分钟之前的订单,根据前面的分析,当锁票成功后,就会将订单的状态设置为 NEW 并且存入数据库中。换言之,当锁票成功一分钟之后,这张票还没有卖掉,就设置订单超时,同时向 order:ticket_error 队列发送一条消息,这条消息在 ticket 服务中被消费,最终完成撤销交票、撤销锁票等操作。

这就是大致的代码处理流程。

再来回顾一下前面那张图:

结合着代码来看这张图是不是就很容易懂了。

3.5 测试

接下来我们来进行一个简单的测试。

先来一个订票失败的测试,如下:

由于用户只有 1000 块钱,这张票要 10000,所以购票必然失败。请求执行成功后,我们查看 order 表,多了如下一条记录:

可以看到,订单失败的理由就是账户余额不足。此时查看 ticket 和 user 表,发现都完好如初(如果需要,则已经反向补偿了)。

接下来我们手动给 ticket 表中 lock_user 字段设置一个值,如下:

这个表示这张票已经被人锁定了。

然后我们发起一次购票请求(这次可以把金额设置到合理范围,其实不设置也行,反正这次失败还没走到付款这一步):

请求发送成功后,接下来我们去查看 order 表,多了如下一条记录:

可以看到,这次下单失败的理由是锁票失败。此时查看 ticket 和 user 表,发现都完好如初(如果需要,则已经反向补偿了)。

最后再来一次成功测试,先把 ticket 表中的 lock_user 字段置空,然后发送如下请求:

这次购票成功,查看 ticket 表,发票已经票有所属:

查看订单表:

可以多了一条成功的购票记录。

查看用户表:

用户账户已扣款。

查看支付记录表:

可以看到已经有了支付记录。

4. 总结

整体上来说,上面这个案例,技术上并没有什么难的,复杂之处在于设计。一开始要设计好消息的处理流程以及消息处理失败后如何进行补偿,这个是比较考验大家技术的。

另外上面案例中,消息的发送和消费都用到了 RabbitMQ 中的事务机制(确保消息消费成功)以及 Spring 中的事务机制(确保消息发送和数据保存同时成功),这些我就不再赘述了。

总之,通过消息中间件处理分布式事务,这种方式通过牺牲数据的强一致性换取性能的大幅提升,但是实现这种方式的成本和复杂度是比较高的,使用时还要看实际业务情况。

本文标题:如何用RabbitMQ解决分布式事务?
标题路径:http://www.csdahua.cn/qtweb/news28/490628.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网