SpringBoot如何使用RocketMQ

这篇文章主要为大家展示了“SpringBoot如何使用RocketMQ”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SpringBoot如何使用RocketMQ”这篇文章吧。

创新互联建站专注于包头企业网站建设,成都响应式网站建设公司,商城网站建设。包头网站建设公司,为包头等地区提供建站服务。全流程定制制作,专业设计,全程项目跟踪,创新互联建站专业和态度为您提供的服务

什么是RocketMQ?#

官方说明:

随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。

具有以下特性:

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型  能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递  提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式  单一队列百万消息的堆积能力,亿级消息堆积能力  支持多种消息协议,如 JMS、MQTT 等  分布式高可用的部署架构,满足至少一次消息传递语义

RocketMQ环境安装#

下载地址:https://rocketmq.apache.org/dowloading/releases/

从官方下载二进制或者源码来进行使用。源码编译需要Maven3.2x,JDK8

在根目录进行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。

使用rocketmq-4.6.0.zip:

启动名称服务 mqnamesrv.cmd  启动数据中心 mqbroker.cmd -n localhost:9876

SpringBoot环境中使用RocketMQ#

SpringBoot 入门:https://www.jb51.net/article/177449.htm

SpringBoot 常用start:https://www.jb51.net/article/177451.htm

当前环境版本为:

SpringBoot 2.0.6.RELEASE  SpringCloud Finchley.RELEASE  SpringCldod Alibaba 0.2.1.RELEASE  RocketMQ 4.3.0

在项目工程中导入:

<!-- MQ Begin --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version></dependency><!-- MQ End -->

由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用RocketMQ。

创建RocketMQProperties配置属性类,类中内容如下:

@ConfigurationProperties(prefix = "rocketmq")public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerConsumeThreadMin = 5; private int consumerConsumeThreadMax = 30; private int consumerConsumeMessageBatchMaxSize = 1; //省略get set}

现在我们所有子系统中的生产者,消费者对应:

isEnable 是否开启mq

namesrvAddr 集群地址

groupName 分组名称

设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下:

#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示rocketmq.groupName=please_rename_unique_group_name#是否开启自动配置rocketmq.isEnable=true#mq的nameserver地址rocketmq.namesrvAddr=127.0.0.1:9876#消息最大长度 默认1024*4(4M)rocketmq.producer.maxMessageSize=4096#发送消息超时时间,默认3000rocketmq.producer.sendMsgTimeout=3000#发送消息失败重试次数,默认2rocketmq.producer.retryTimesWhenSendFailed=2#消费者线程数量rocketmq.consumer.consumeThreadMin=5rocketmq.consumer.consumeThreadMax=32#设置一次消费消息的条数,默认为1条rocketmq.consumer.consumeMessageBatchMaxSize=1

创建消费者接口 RocketConsumer.java 该接口用户约束消费者需要的核心步骤:

/** * 消费者接口 *  * @author SimpleWu * */public interface RocketConsumer {/** * 初始化消费者 */ public abstract void init(); /** * 注册监听 *  * @param messageListener */ public void registerMessageListener(MessageListener messageListener);}

创建抽象消费者 AbstractRocketConsumer.java:

/** * 消费者基本信息 *  * @author SimpelWu */public abstract class AbstractRocketConsumer implements RocketConsumer { protected String topics; protected String tags; protected MessageListener messageListener; protected String consumerTitel; protected MQPushConsumer mqPushConsumer; /** * 必要的信息 *  * @param topics * @param tags * @param consumerTitel */ public void necessary(String topics, String tags, String consumerTitel) { this.topics = topics; this.tags = tags; this.consumerTitel = consumerTitel; } public abstract void init(); @Override public void registerMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } }

在类中我们必须指定这个topics,tags与消息监听逻辑

public abstract void init();该方法是用于初始化消费者,由子类实现。

接下来我们编写自动配置类RocketMQConfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。

@EnableConfigurationProperties({ RocketMQProperties.class }) 使用该配置文件

@Configuration 标注为配置类

@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有当配置中指定rocketmq.isEnable = true的时候才会生效

核心内容如下:

/** * mq配置 *  * @author SimpleWu */@Configuration@EnableConfigurationProperties({ RocketMQProperties.class })@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")public class RocketMQConfiguation { private RocketMQProperties properties; private ApplicationContext applicationContext; private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class); public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) { this.properties = properties; this.applicationContext = applicationContext; } /** * 注入一个默认的消费者 * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) {  throw new MQClientException(-1, "groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) {  throw new MQClientException(-1, "nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 如果发送消息失败,设置重试次数,默认为2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try {  producer.start();  log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),   properties.getNamesrvAddr()); } catch (MQClientException e) {  log.error(String.format("producer is error {}", e.getMessage(), e));  throw e; } return producer; } /** * SpringBoot启动时加载所有消费者 */ @PostConstruct public void initConsumer() { Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) {  log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) {  String beanName = (String) beans.next();  AbstractRocketConsumer consumer = consumers.get(beanName);  consumer.init();  createConsumer(consumer);  log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,   consumer.topics); } } /** * 通过消费者信心创建消费者 *  * @param consumerPojo */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName()); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax()); consumer.registerMessageListener(arc.messageListenerConcurrently); /**  * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费  */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /**  * 设置消费模型,集群还是广播,默认为集群  */ // consumer.setMessageModel(MessageModel.CLUSTERING); /**  * 设置一次消费消息的条数,默认为1条  */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize()); try {  consumer.subscribe(arc.topics, arc.tags);  consumer.start();  arc.mqPushConsumer=consumer; } catch (MQClientException e) {  log.error("info consumer title {}", arc.consumerTitel, e); } }}

然后在src/main/resources文件夹中创建目录与文件META-INF/spring.factories里面添加自动配置类即可开启启动配置,我们只需要导入依赖即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.xcloud.config.rocketmq.RocketMQConfiguation

接下来在服务中导入依赖,然后通过我们的抽象类获取所有必要信息对消费者进行创建,该步骤会在所有消费者初始化完成后进行,且只会管理是Spring Bean的消费者。

下面我们看看如何创建一个消费者,创建消费者的步骤非常简单,只需要继承AbstractRocketConsumer然后再加上Spring的@Component就能够完成消费者的创建,我们可以在类中自定义消费的主题与标签。

在项目可以根据需求当消费者创建失败的时候是否继续启动工程。

创建一个默认的消费者 DefaultConsumerMQ.java

@Componentpublic class DefaultConsumerMQ extends AbstractRocketConsumer { /** * 初始化消费者 */ @Override public void init() { // 设置主题,标签与消费者标题 super.necessary("TopicTest", "*", "这是标题"); //消费者具体执行逻辑 registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  msgs.forEach(msg -> {   System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));  });  // 标记该消息已经被成功消费  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  } }); }}

super.necessary("TopicTest", "*", "这是标题"); 是必须要设置的,代表该消费者监听TopicTest主题下所有tags,标题那个字段是我自己定义的,所以对于该配置来说没什么意义。

我们可以在这里注入Spring的Bean来进行任意逻辑处理。

创建一个消息发送类进行测试

@Overridepublic String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到一个Broker SendResult sendResult = defaultMQProducer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); return null;}

我们来通过Http请求测试:

http://localhost:10001/demo/base/mq/hello consumer message boyd hello http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿

好了到这里简单的start算是设计完成了,后面还有一些:顺序消息生产,顺序消费消息,异步消息生产等一系列功能,官人可参照官方去自行处理。

ActiveMQ 没经过大规模吞吐量场景的验证,社区不高不活跃。  RabbitMQ 集群动态扩展麻烦,且与当前程序语言不至于难以定制化。  kafka 支持主要的MQ功能,功能无法达到程序需求的要求,所以不使用,且与当前程序语言不至于难以定制化。  rocketMQ 经过全世界的女人的洗礼,已经很强大;MQ功能较为完善,还是分布式的,扩展性好;支持复杂MQ业务场景。(业务复杂可做首选)

以上是“SpringBoot如何使用RocketMQ”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!

本文名称:SpringBoot如何使用RocketMQ
URL分享:https://www.cdcxhl.com/article44/pjepee.html

成都网站建设公司_创新互联,为您提供做网站企业网站制作网站维护移动网站建设小程序开发网站营销

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

成都app开发公司