微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了。
创新互联公司专注为客户提供全方位的互联网综合服务,包含不限于网站建设、成都网站制作、万秀网络推广、小程序开发、万秀网络营销、万秀企业策划、万秀品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;创新互联公司为所有大学生创业者提供万秀建站搭建服务,24小时服务热线:18980820575,官方网址:www.cdcxhl.com
今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发送可靠性的问题。
注意,以下内容我主要和大家讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。
大家知道,RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。
大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:
如果能确认这两步,那么我们就可以认为消息发送成功了。
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
上面提出的三个步骤,第三步需要我们自己实现,前两步 RabbitMQ 则有现成的解决方案。
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:
我们分别来看。以下所有案例都在 Spring Boot 中展开,文末可以下载相关源码。
Spring Boot 中开启 RabbitMQ 事务机制的方式如下:
首先需要先提供一个事务管理器,如下:
- @Bean
- RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
- @Service
- public class MsgService {
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Transactional
- public void send() {
- rabbitTemplate.setChannelTransacted(true);
- rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
- int i = 1 / 0;
- }
- }
这里注意两点:
这就 OK 了。
在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。
2.2.1 单条消息处理
首先我们移除刚刚关于事务的代码,然后在 application.properties 中配置开启消息发送方确认机制,如下:
- spring.rabbitmq.publisher-confirm-type=correlated
- spring.rabbitmq.publisher-returns=true
第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:
接下来我们要开启两个监听,具体配置如下:
- @Configuration
- public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
- public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
- public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
- private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @Bean
- Queue queue() {
- return new Queue(JAVABOY_QUEUE_NAME);
- }
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(JAVABOY_EXCHANGE_NAME);
- }
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(JAVABOY_QUEUE_NAME);
- }
- @PostConstruct
- public void initRabbitTemplate() {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- logger.info("{}:消息成功到达交换器",correlationData.getId());
- }else{
- logger.error("{}:消息发送失败", correlationData.getId());
- }
- }
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
- }
- }
关于这个配置类,我说如下几点:
这就可以了。
接下来我们对消息发送进行测试。
首先我们尝试将消息发送到一个不存在的交换机中,像下面这样:
- rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下错误:
接下来我们给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:
- rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
注意此时第二个参数是一个字符串,不是变量。
可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列(因为队列不存在)。
这是一条消息的发送,我们再来看看消息的批量发送。
2.2.2 消息批量处理
如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
这就是 publisher-confirm 模式。
相比于事务,这种模式下的消息吞吐量会得到极大的提升。
失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
两种重试我们分别来看。
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
- spring.rabbitmq.template.retry.enabled=true
- spring.rabbitmq.template.retry.initial-interval=1000ms
- spring.rabbitmq.template.retry.max-attempts=10
- spring.rabbitmq.template.retry.max-interval=10000ms
- spring.rabbitmq.template.retry.multiplier=2
从上往下配置含义依次是:
间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
业务重试主要是针对消息没有到达交换器的情况。
如果消息没有成功到达交换器,根据我们第二小节的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
整体思路是这样:
首先创建一张表,用来记录发送到中间件上的消息,像下面这样:
每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
其他字段都很好理解,我就不一一啰嗦了。
在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
大致的思路就是上面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来处理的,完整代码大家可以参考 vhr 项目(https://github.com/lenve/vhr)。
当然这种思路有两个弊端:
当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。
好啦,这就是关于消息生产者的一些常见问题以及对应的解决方案,下篇文章松哥和大家探讨如果保证消息消费成功并解决幂等性问题。
本文涉及到的相关源代码大家可以在这里下载:https://github.com/lenve/javaboy-code-samples。
本文转载自微信公众号「江南一点雨」,可以通过以下二维码关注。转载本文请联系江南一点雨公众号。
网站题目:四种策略确保RabbitMQ消息发送可靠性!你用哪种?
链接地址:http://www.csdahua.cn/qtweb/news5/552655.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网