RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程
成都创新互联公司主要从事网站设计、网站建设、网页设计、企业做网站、公司建网站等业务。立足成都服务白碱滩,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220
在Broker启动的时候会拉起相关服务
流程如下:
流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000
由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。
Broker启动流程
以发送消息为例
Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand proce***equest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
...
switch (request.getCode()) {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
继续看sendMessage..
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
...
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
调用MessageStore.putMessage(msgInner)
当前文章:52.源代码解读-RocketMQ消息写入机制
文章起源:https://www.cdcxhl.com/article48/pojohp.html
成都网站建设公司_创新互联,为您提供动态网站、网站收录、软件开发、网站排名、微信公众号、营销型网站建设
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联