我们在单个系统中,一些业务处理可以顺序依次的进行。而涉及到跨系统(有时候系统内部亦然)的时候,会产生比较复杂数据交互(也可以理解为消息传递)的需求,这些数据的交互传递方式,可以是同步也可以是异步的。在异步传递数据的情况下,往往需要一个载体,来临时存储与分发消息。在此基础上,专门针对消息接收、存储、转发而设计与开发出来的专业应用程序,都可以理解为消息队列中间件。
目前创新互联公司已为1000+的企业提供了网站建设、域名、雅安服务器托管、网站托管、企业网站设计、涿鹿网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
引申一下:如果我们自己简单的使用一张数据库表,来记录数据,然后接受数据存储在数据表,通过定时任务再将数据表的数据分发出去,那么我们已经实现了一个最简单的消息系统(这就是本地消息表)。
我们可以认为消息中间件的基本思想就是 利用高效可靠的消息传递机制进行异步的数据传输。在这个基本思想的指导下,不同的消息中间,因为其侧重场景目的不同,在功能、性能、整体设计理念上又各有差别。
消息队列(MQ)本身是实现了生产者到消费者的单向通信模型,RabbitMQ、RocketMQ、Kafka这些常用的MQ都是指实现了这个模型的消息中间件。目前最常用的几个消息中间件主要有,RabbitMQ、RocketMQ、Kafka(分布式流处理平台)、Pulsar(分布式消息流平台)。这里我将两个流处理平台纳入其中了, 更早的一些其他消息中间件已经慢慢淡出视野。业务选型的时候我们遵循两个主要的原则:最大熟悉程度原则(便于运维、使用可靠)、业务契合原则(中间件性能可以支撑业务体量、满足业务功能需求)。
这几个常用的消息中间件选型对比,很容易找到,这里就不详细描述了。大概说一下:Pulsar目前用的不如 RabbitMQ、RocketMQ、Kafka多。RabbitMQ主要偏重是高可靠消息,RocketMQ性能和功能并重,Kafka主要是在大数据处理中应用比较多(Pulsar比较类似)。
我们先简单举例介绍一下异步、解耦、削峰的意义与价值(参考下面这张流程图):
对于一个用户注册接口,假设有2个业务点,分别是注册、发放新人福利,各需要50ms去处理逻辑。如果我们将这两个业务流程耦合在一个接口,那么总计需要100ms处理完成。但是该流程中,用户注册时候,可以不用关心自己的福利是否立即发放,只要尽快注册成功返回数据即可,后续新人福利这一部分业务可以在主流程之外处理。我们如果将其剥离出来,接口主流程中只处理登陆逻辑,并通过MQ推送一条消息,通过异步方式处理后续的发放新人福利逻辑,这样即可保证注册接口50ms左右即能获取结果。
而发放新人福利的业务,则通过异步任务慢慢处理。通过拆分业务点,我们已经做到解耦,注册的附属业务中增加或减少功能点都不会影响主流程。另外如果一个业务主流程在某个点请求并发比较高,正好通过异步方式,可以将压力分散到更长的时间段中去,达到减轻固定时间段处理压力的目的,这就是流量削峰。
另外,单线程模型的语言,通常对消息中间件的需求更强烈。多线程模型的语言,或者协程型语言,虽然可以通过自身的多线程(或协程)机制,来实现业务内部的异步处理,但是考虑到持久化问题以及管理难度,还是成熟的中间件更适合用来做异步数据通信,中间件还能实现分布式系统之间的数据异步通信。
消息中间件的应用场景主要有:
异步通信:可以用于业务系统内部的异步通信,也可以用于分布式系统信息交互。
系统解耦:将不同性质的业务进行隔离切分,提升性能,主附流程分层,按照重要性进行隔离,减少异常影响。
流量削峰:间歇性突刺流量分散处理,减少系统压力,提升系统可用性。
分布式事务一致性:RocketMQ提供的事务消息功能可以处理分布式事务一致性(如电商订单场景)。当然,也可以使用分布式事务中间件。
消息顺序收发:这是最基础的功能,先进先出,消息队列必备。
延时消息:延迟触发的业务场景,如下单后延迟取消未支付订单等。
大数据处理:日志处理,kafka。
分布式缓存同步:消费MySQLbinlog日志进行缓存同步,或者业务变动直接推送到MQ消费。
所以,如果你的业务中有以上列举的场景,或者类似的功能、性能需求,那么赶快引入「消息中间件」来提升你的业务性能吧。
虽然消息中间件引入有以上那么多好处,但是使用的时候依然会存在很多问题。例如:
当然我们对于以上的这些问题,针对业务开发者来说,可以进行提炼,得到以下几个重点问题:
常规的消息中间件和流处理中间件,本身设计一般都能支持顺序消息,但是根据中间件本身不同的设计目标,有不同的原理架构,导致我们业务中使用中间件的时候,要针对性做不同的处理。
以下几个常用消息或流中间件的顺序消息设计以及使用中乱序问题分析:
RabbitMQ的单个队列(queue)自身,可以保证消息的先进先出,在设计上,RabbitMQ所提供的单个队列数据是存储在单个broker节点上的,在开启镜像队列的情况下,镜像的队列也只是作为消息副本而存在,服务依然由主队列提供。这种情况下在单个队列上进行消费,天然就是顺序性的。不过由于单个队列支持多消费者同时消费,我们在开启多个消费者消费统一队列上的数据时候,消息分散到多个消费者上,在并发高的时候,多个消费者无法保证处理消息的顺序性。
解决方法就是对于需要强制顺序的消息,使用同一个MQ队列,并且针对单个队列只开启一个消费者消费(保证并发处理时候的顺序性,多线程同理)。由此引发的单个队列吞吐下降的问题,可以采取kafka的设计思想,针对单一任务开启一组多个队列,将需要顺序的消息按照其固定标识(例如:ID)进行路由,分散到这一组队列中,相同标识的消息进入到相同的队列,单个队列使用单个消费者消费,这样即可以保证消息的顺序与吞吐。
如图所示:
Kafka是流处理中间件,在其设计中,没有队列的概念,消息的收发依赖于Topic,单个topic可以有多个partition(分区),这些partition可以分散到多台broker节点上,并且partition还可以设置副本备份以保证其高可用。
Kafka同一个topic可以有多个消费者,甚至消费组。Kafka中消息消费一般使用消费组(消费组可以互不干涉的消费同一个topic下的消息)来进行消费,消费组中可以有多个消费者。同一个消费组消费单个topic下的多个partition时,将由kafka来调节消费组中消费者与partiton的消费进度与均衡。但是有一点是可以保证的:那就是单个partition在同一个消费组中只能被一个消费者消费。
以上的设计理念下,Kafka内部保证在同一个partition中的消息是顺序的,不保证topic下的消息的顺序性。Kafka的消息生产者发送消息的时候,是可以选择将消息发送到哪个partition中的,我们只要将需要顺序处理的消息,发送到topic下相同的partition,即可保证消息消费的顺序性。(多线程语言使用单个消费者,多线程处理数据时,需要自己去保证处理的顺序,这里略过)。
RocketMQ的一些基本概念和原理,可以通过阿里云的官网做一些了解: 什么是消息队列RocketMQ版?- 消息队列RocketMQ版 - 阿里云【1】 。
RocketMQ的消息收发也是基于Topic的,Topic下有多个 Queue, 分布在一个或多个 Broker 上,用来保证消息的高性能收发( 与Kafka的Topic-Partition机制 有些类似,但内部实现原理并不相同 )。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消息消费,可以将该主题的队列数量设置为1,牺牲高可用性。具体图解可以参考阿里云文档: 顺序消息2.0 - 消息队列RocketMQ版 - 阿里云【2】
消息丢失需要分为三部分来看:消息生产者发送消息到消息中间件的过程不发生消息丢失,消息在消息中间件中从接受存储到被消费的过程中消息不丢失, 消息消费的过程中保证能消费到中间件发送的消息而不会丢失。
生产者发送消息不丢失:
消息中间件一般都有消息发送确认机制(ACK), 对于客户端来说,只要配置好消息发送需要ACK确认,就可以根据返回的结果来判断消息是否成功发送到中间件中。这一步通常与中间件的消息接受存储流程设计有关系。根据中间件的设计,我们通常采取的措施如下:
在具体的业务设计中,如果消息发送失败,我们可以根据业务重要程度,做相应的补偿,例如:
消息中间件消息不丢失:
数消息中间件的消息接收存储机制各不相同,但是会根据其特性设计,最大限度保证消息不会丢失:
Kafka在消息接受到保存所做的设计有:
消费者消费消息不丢失:
无限阻塞的问题,可以参考RocketMQ消费失败的重试机制,对消息重试做一定的设计:
消息重复消费问题参考下一个小节。
在分析常用中间件的时候,我们往往会发现,中间件设计者将这个问题的处理,下放给中间件使用者,也就是业务开发者了。诚然,业务消费处理的逻辑比消息生产者复杂得多。生产者只需要保证将消息成功发送到中间件即可,而消费者需要在消费脚本中处理各种复杂的业务逻辑。
解决消息重复消费的问题,核心是使用唯一标识,来标记某条消息是否已经处理过。具体方案可选的则有很多,比如:
通常我们在引入消息中间件的时候,已经会评估与测试消息消费的生产与消费速率,尽量使其达到平衡。但业务也有一些不可预知的突发情况,可能会造成消息的大量积压。在这个时候,我们可以采取如下的方式,来做处理:
临时紧急扩容
消息积压预防
延迟消息这一项功能,在部分MQ中间件中有实现。延时消息和定时消息其实可以互相转换。
RocketMQ定时消息不支持任意的时间精度(出于性能考量)。只支持特定级别的延迟消息。消息延迟级别在broker端通过messageDelayLevel配置。其内部对每一个延迟级别创建对应的消息消费队列,然后创建对应延迟级别的定时任务,从消息消费队列中将消息拉取并恢复消息的原主题和原消息消费队列。
RabbitMQ实现延迟消息通常有两个方案:一是创建一个消息延迟死信队列,搭配一个死信转发队列来实现消费延时。但是该方式如果前一个消息没达到TTL时间,后一个消息即便达到了,也不会被转发到转发队列中;另一个是使用延时Exchange插件(rabbitmq_delayed_message_exchange),消息在达到TTL之后才会转发到对应的队列中并被消费。
Kafka本身不支持延时消息或定时消息, 想要实现消息的延时,需要使用其他的方案。
常用数据库的索引结构都支持数据的顺序索引。借助数据库可以很方便地实现任意时间消息的延时消费。使用一张表存储数据的消费时间,开启定时任务,在满足条件之后将该消息提取出来,后续转发到顺序队列去处理或者直接处理都可以(已处理需要做标记,后续不再出现),但是直接处理需要考虑吞吐量和并发重复性等问题。不如单个脚本转发到普通队列去处理方便。数据库支持的定时任务消息积压是可控的,但是吞吐量会有局限。
Reids的有序列表zset结构,可以实现延时消息。将消息的消费时间作为分值,把消息添加到zset中。使用 zrangebyscore 命令消费消息 # 命令格式 zrangebysocre key min max withscores limit 0 1 消费最早的一条消息 #min max 分别表示开始的分值与结束的分值区间,分别使用 0和当前时间戳,可以查出达到消费时间的消息 # withscores 表示查询的数据要带分值。limit 后面 就是查询的起始 offset 和数量 zrangebyscore key 0 {当前时间戳} withscores limit 0 1 。
当然,这个方案也有局限性,首先,redis必须配置持久化防止消息丢失(如果配置不合理不能100%保证,但是每个命令都持久化会造成性能下降,需要权衡);其次,如果延时消息过多会造成消息的积压形成大key;再次,需要自己做重复消费和消费失败的平衡处理(当然有可能,还是建议开启单个消费进程将延时消息转移到普通队列去消费)。
在很多软件中,都有基于时间轮实现定时任务的实现,使用时间轮以及多级时间轮可以实现延时任务调度。如果我们希望自己实现延时任务队列,可以考虑使用此算法来实现任务的调度,但是需要自己根据具体的需求去设计支持任务的延时上限以及调度的时间粒度(多层级)。时间轮算法我这里就先不讲解了,感兴趣的可以自己去搜索了解。
通过以上几个小节的介绍,相信各位已经能很自然的理解:消息队列、异步解耦的功能与核心思想,并且对如何使用MQ来架构自己的业务有了一定的认知。大多数MQ使用中的问题,只是要求我们多思考,将细节思虑周到,以保证业务的高可用。甚至,我们还可以在这几个解决方案中提炼一些核心出来,以便在业务中参照类似的思想,优化我们的业务。比如,消息顺序性保证 其核心是顺序消息生产者发送到唯一分区,再维持固定分区的单消费者顺序消费;避免消息丢失的核心是每个步骤的确认与降级机制;消费幂等的核心是唯一性标识与步进状态;消息积压处理的核心是快速响应应急预案;延迟消息的核心是消息排序,优化点是性能提升。
科学的方法有归纳和演绎,学习问题处理方案的过程中,提炼出相应的核心思想,并在使用中演绎,将这些归纳总结的知识点,再应用到业务中去,更加得心应手的处理相应的事务,构建出高可用的业务架构,这才是我们最需要做到的。
网站标题:消息中间件应用的常见问题与方案
当前链接:http://www.csdahua.cn/qtweb/news29/95879.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网