大家好,我是君哥。
今天来分享 RocketMQ 中一个关键的知识点,消费者的启动过程。
多数消息队列中,消费者和 Broker 通信的方式有两种,PUSH 模式和 PULL 模式:
首先看下面这张图:
图中可以看出,消费者需要注册到 Name Server,拉取消息的时候可以从 Broker 主节点拉取,也可以从 Broker 从节点拉取。
在 RocketMQ 的源码中,拉模式有两个消费者相关的类,其中 DefaultMQPullCons umer 类已经被废弃,官方推荐使用 Defau ltLitePullConsumer 类。下面代码来自官方示例:
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
//启动方法
litePullConsumer.start();
try {
while (running) {
//这里可以看到,PULL 模式下消费者需要业务代码主动去拉取消息
ListmessageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
上面代码中消费者属于消费组 lite_pull _consumer_test,订阅了【TopicTest 】这个 Topic 下的所有 tag。下面一起看一下启动方法。下图是消费者启动过程中类调用关系图,图中心的 pullRequestQueu e 是核心,pull 请求会先发送到这个队列,然后循环地拉取处理。
消费者启动时首先会检查配置,检查的配置项如下:
这部分源代码见 DefaultLitePullConsum erImpl#checkConfig。
如果是集群模式,实例名称改为【进程 ID + “ #” + 系统时间(纳秒 )】,代码如下:
//ClientConfig类
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
创建一个 MQClientInstance 实例,然后把消费者注册到 MQClientInstance。
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
}
对 RebalanceLitePullImpl 实例初始化,给下面的参数赋值:
负载均衡线程启动后,默认每 20s 做一次负载均衡,见如下代码:
//RebalanceService 类
public void run() {
while (!this.isStopped()) {
//waitInterval 默认 20s,可以配置
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
PullAPIWrapper 这个 Wrapper 类是 MQ-ClientInstance 类的 Wrapper 类,类中 pullKernelImpl 方法对 MQClientInstance 类中的 pullMessage 方法进行了装饰,这个装饰类主要增加了下面功能:
代码如下 :
//PullAPIWrapper
public PullResult pullKernelImpl(
//省略所有参数
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//1.获取 Broker 地址
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
//省略从 Name sever 更新本地 Broker 缓存逻辑
if (findBrokerResult != null) {
{
//2.检查 RocketMQ 版本
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
//3.把偏移量的位改为 0,(偏移量 0x1)
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
//4.封装请求 header
PullMessageRequestHeader = new PullMessageRequestHeader();
//省略封装 requestHeader
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
//5.获取 filterServer 地址
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
offset 存储器的 UML 类图如下:
有两个实现类分别对应集群模式和广播模式,本文讨论的集群模式的实现类是 RemoteBrokerOffsetStore。offset 可以存储在本地或者远端服务器。
启动 MQ 客户端主要包括如下步骤:
//PullMessageService类
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
从下面的代码可以看出,PULL 拉取消息最终使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的逻辑是一样的。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
5.启动 MessageQueue 负载均衡线程。
6.启动生产者线程;7.把 serviceState 改为 Running。
7.源码参考 MQClientInstance#start。
这个定时任务默认每 30s 执行一次,用于监听每个 Topic 下的 MessageQueue 是否发生变化。代码见 startScheduleTask 方法。
轨迹消息主要用于跟踪消息发送、消息消费的轨迹,用于记录详细日志。代码如下:
//AsyncTraceDispatcher 类
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
这里不详细展开了,后面再单独讨论。
本文通过源码分析讲解了 RocketMQ 中 PULL 模式下的消费者启动过程,在生产上使用比较多的还是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一样,不同的是 PULL 模式需要应用程序进行拉取动作,可以通过 PULL 模式的学习更容易的理解 PUSH 模式。最后,分析一个 PULL 模式启动过程涉及的 UML 类图:
文章题目:五张图带你理解RocketMQ消费者启动过程
分享网址:http://www.csdahua.cn/qtweb/news28/416428.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网