前文了解了 RocketMQ消息存储的相关原理,本文将讲讲消息消费的过程及相关概念。
关于消息消费,消费者组这些概念,基本和kafka 是类似的,比如:
一个消费组内可以包含多个消费者,1个消费组可订阅多个主题。消费组之间有集群模式与广播模式两种。
集群模式下,主题下的同一消息只允许被消费组内的一个消费者消费,消费进度存储在 broker 端。广播模式下,则每个消费者都可以消费该消息,消费进度存储在消费者端。
集群模式下,一个消费队列同一时间,只允许被一个消费者消费,1个消费者,可以消费多个消息队列。具体的可以看我前面的文章。
而且 rocketmq 消息服务器与消费者的消息传输有 2 种方式:推模式、拉模式。拉模式,即消费者主动向消息服务器发送请求;推模式,即消息服务器向消费者推送消息。推模式,是基于拉模式实现的。
主要就是初始化了三个组件,然后启动后台定时任务。
三个组件:
几个定时任务
对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:
Pull
即消费者每隔一定时间主动去 Broker 拉取消息
优点
消费速度、数量可控
缺点
如果间隔时间短,可能会拉空,并且频繁 RPC 请求增加网络开销 如果间隔时间长,则可能会有消息延迟 消费进度offset需要consumer自己来维护
Push
即 Broker 主动实时推送消息给消费者
优点
消息实时,保持长链接,不会频繁建立链接
缺点
如果消息数量过大,消费者吞吐量小,肯能会造成消费者缓冲区溢出。
在文章的开头我们也说了RocketMQ推模式,是基于拉模式实现的。
【PullMessageService 消息拉取】
RocketMQ 通过 PullMessageService 拉取消息。
通过代码段 PullMessageService#run可以看出:
- public void run() {
- // stopped 是 volidate 修饰的变量,用于线程间通信。
- while (!this.isStopped()) {
- // ..
- // 阻塞队列, 如果 pullRequestQueue 没有元素,则阻塞
- PullRequest pullRequest = this.pullRequestQueue.take();
- // 消息拉取
- this.pullMessage(pullRequest);
- // ...
- }
- }
关于PullRequest
- // 消费者组
- private String consumerGroup;
- // 消息队列
- private MessageQueue messageQueue;
- // 消息处理队列,从 Broker 拉取到的消息先存入 ProcessQueue,然后再提交到消费者消费池消费
- private ProcessQueue processQueue;
- // 待拉取的 MessageQueue 偏移量
- private long nextOffset;
- // 是否被锁定
- private boolean lockedFirst = false;
PullMessageService 添加 PullRequest 有两种方式:
延时添加
立即添加
【关于ProcessQueue】
ProcessQueue 是 MessageQueue 在消费端的重现、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 再将消息提交到消费者消费线程池。消息消费成功后,从 ProcessQueue 中移除。
- // 读写锁
- private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
- // 消息存储容器, k:消息偏移量,v:消息实体
- private final TreeMap
msgTreeMap = new TreeMap (); - // ProcessQueue 中消息总数
- private final AtomicLong msgCount = new AtomicLong();
- // ProcessQueue 中消息总大小
- private final AtomicLong msgSize = new AtomicLong();
- // 当前 ProcessQueue 中包含的最大队列偏移量
- private volatile long queueOffsetMax = 0L;
- // 当前 ProcessQueue 是否被丢弃
- private volatile boolean dropped = false;
- // 上一次开始消息拉取时间戳
- private volatile long lastPullTimestamp = System.currentTimeMillis();
- // 上一次消息消费时间戳
- private volatile long lastConsumeTimestamp = System.currentTimeMillis();
【对消息拉取进行流量控制】
processQueue 的消息数量 大于 1000, processQueue 的消息大小 大于 100 MB,将延迟 50 毫秒后拉取消息
processQueue 中偏移量最大的消息与偏移量最小的消息的跨度超过 2000 则延迟 50 毫秒再拉取消息。
根据主题拉取订阅的消息,如果为空,延迟 3 秒,再拉取。
【消息服务端 broker 组装消息】
代码位置:PullMessageProcessor#processRequest
【消息拉取长轮询机制】
RocketMQ 推模式是循环向消息服务端发送消息拉取请求。
消费者向 broker 拉取消息时,如果消息未到达消费队列,并且未启用 长轮询机制,则会在服务端等待 shortPollingTimeMills(默认1秒) 时间后再去判断消息是否已经到达消息队列,如果消息未到达,则提示消息拉取客户端 PULL_NOT_FOUND。
如果开启长轮询模式,rocketMQ 会每 5s 轮询检查一次消息是否可达,同时一有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,如果是则从 commitlog 文件提取消息返回给消息拉取客户端,否则直到挂起超时,超时时间由消息拉取方在消息拉取时封装在请求参数中,PUSH 模式默认 15s。
PULL 模式通过 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 设置。RocketMQ 通过在 Broker 端配置 longPollingEnable 为 true 来开启长轮询模式。
RocketMQ 的长轮询机制由 2 个线程共同完成。PullRequestHoldService、ReputMessageService。
【Push消费模式流程简析】
后台独立线程RebalanceServic根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,给当前消费者分配对应的MessageQueue,将其封装为PullRequest实例放入队列pullRequestQueue中。
Consumer端开启后台独立的线程PullMessageService不断地从队列pullRequestQueue中获取PullRequest并通过网络通信模块异步发送Pull消息的RPC请求给Broker端。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。
PullMessageService异步拉取到消息后,通过PullCallback进行回调处理,如果拉取成功,则更新消费进度,putPullRequest到阻塞队列pullRequestQueue中,接着立即进行拉取
监听器 ConsumeMessageConcurrentlyService 会一直监听回调方法 PullCallback,把拉取到的消息交给Consumerrequest进行处理,Consumerrequest会调用消费者业务方实现的consumeMessage()接口处理具体业务,消费者业务方处理完成后返回ACK给Consumerrequest,如果消费者ACK返回的失败,则在集群模式下把消息发回 Broker 进行重试(广播模型重试的成本太高),最后更新消费进度offsetTable
在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService进行二次处理。
【Push消息流程图】
普通轮询比较简单,就是定时发起请求,服务端收到请求后不论数据有没有更新都立即返回
优点就是实现简单,容易理解。
缺点就是服务端是被动的,服务端要不断的处理客户端连接,并且服务端无法控制客户端pull的频率以及客户端数量.
长轮询是对普通轮询的优化,依然由客户端发起请求,服务端收到后并不立即响应而是hold住客户端连接,等待数据产生变更后(或者超过指定时间还未产生变更)才回复客户端
说白了,就是对普通轮询加了个控制,你客户端可以随时请求我,但是回不回复我说了算,这就保证了服务端不会被客户端带节奏,导致自己的压力不可控.
在 RocketMq 中消费者主动发起pull请求,broker在处理消息拉取请求时,如果没有查询到消息,将不返回消费者任何信息,而是先hold住并且挂起请求,使其不会立即发起下一次拉取请求,会将请求信息pullRequest添加到pullRequestTable中,等待触发通知消费者的事件。
当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。
如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。
然后在Broker端,通过后台独立线程PullRequestHoldService遍历所有挂起的请求pullRequestTable,如果有消息,则返回响应给消费者。
同时,另外一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,不断的检测是否有新消息产生,如果有新消息,则从pullRequestTable通过Topic+queueId的key获取对应hold住的请求pullRequest,再根据其中的长链接channel进行通信响应。
通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。
流程如下:
当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。
比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。
首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。
然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。
主要流程如下:
RocketMQ 消息队列重新分布由 RebalanceService 线程来实现的。RebalanceService 随着 MQClientInstance 的启动而启动。RebalanceService 默认每 20 秒,执行一次 MQClientInstance#doRebalance
【主题的消息队列负载流程】
客户端执行期间 伴随着PullMessageService 与 RebalanceService 线程交互
【消费过程】
【消息确认】
客户端在发送重试消息时,封装了 ConsumerSendMsgBackRequestHeader。
- // 消息物理偏移量
- private Long offset;
- // 消费组
- private String group;
- // 延迟等级
- private Integer delayLevel;
- // 消息ID
- private String originMsgId;
- // 消息主题
- private String originTopic;
- // 最大重新消费次数,默认 16 次 SubscriptionGroupConfig.retryMaxTimes 中定义
- private Integer maxReconsumeTimes;
服务端的接收逻辑
从消息消费者和消费者组的基本概念,到消息消费的流程。我们了解了RocetMQ消息消费的相关原理。消费者客户端的启动后,会后台运行几个定时任务来处理相关的逻辑。也知道了RocetMQ消息获取有推拉两种模式,而且推模式也是建立在拉模式的基础之上。知道了普通轮询和长轮询的区别,并且了解了长轮询的实现逻辑。对消息消费和确认流程有了了解。
分享名称:消费者原理分析-RocketMQ知识体系(四)
网页路径:http://www.csdahua.cn/qtweb/news28/335828.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网