记一次线上Java程序故障惊心动魄两小时

周日早上醒来,明媚的阳光从卧室的窗户直射进来,久违的好天气。穿好衣服我开始筹划今天去哪里转转。一周忙碌的工作几乎没有时间陪家人,今天该好好陪陪家人了。

10年的江城网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都全网营销的优势是能够根据用户设备显示端的尺寸不同,自动调整江城建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联从事“江城网站设计”,“江城网站推广”以来,每个客户项目都认真落实执行。

当我起来收拾好一切准备出发的时候,我瞄了一眼手机,发现手机的邮箱里有一份报警邮件,报警邮件显示线上最近10分钟流量有异常,而且是多个渠道。有突然有一种不祥的预感:线上kafka出问题了。我让媳妇和孩子下楼在车里等我,我赶紧打开了电脑,查看线上系统。果然不出意外,kafka已经积压了几千万的数据。因为我们的业务分为实时数据和离线数据,实时数据是FileBeat负责收集日志发到Kafka,然后我们这个业务系统消费Kafka统计数据,实时数据对于当前流量分析、预算控制、熔断有非常重要的作用,如果实时数据异常,其它业务系统都会受到一定的影响。

定位到报警邮件是由于kafka消息积压而导致实时数据异常触发的,我立马连上了我们消费Kafka的业务系统(data-collect)。这是一个运行时间很长了的Java服务,它的作用就是实时消费kafka数据,然后经过一定的业务逻辑处理,将最终结果更新到mongodb中。进到服务器以后,我发现这个服务已经处于假死状态,最后一条日志显示系统发生了OOM,也就是服务器内存爆了。

关于data-collect这个Java服务的核心逻辑我在这里详细说明一下。这个系统的代码是很早的一位同事写的,因为早期我们的数据体量还不是很大,所以,他采用了一种简单的处理方式。先消费数据,处理完成以后放到一个Map中,然后,启动了一个每10s执行一次的定时任务,定时任务读取Map数据更新到mongo中,然后清空Map(ConcurrentMap)。这样做的优点是将消费Kafka的操作和入库操作分开了,可以防止因为入库时间太长而导致消费速度变慢,但是,这种做法有一个致命的缺点:内存不可控。如果定时任务因为Mongo操作时间太长而没有及时清空Map,Map中会积累大量的数据,最终耗尽内存,系统发生OOM。这时候如果系统自启了,也会丢失大量的数据。

其实,这个问题我很早有意识到,但是系统一直运行良好,没有出现任何问题,我们认为在现有数据体量下它是安全的。而碰巧的是,就在前一天我们升级了Mongo的配置,mongo机器进行了一个主从切换。同时,有一些大表清理和TTL索引重建的工作还在mongo后台运行。这就导致了我们操作mongo耗时的增加。进而导致了我们一直认为安全的系统出现了这个问题。

回到data-collect这个系统的设计上。可能有的同学会在这里有个疑问,为什么不直接消费出来就入库操作呢?这里我们有一个重要的处理逻辑:为了防止频繁的更新mongo,我们会将消费出来的数据在内存中进行一个合并处理,你可以简单的理解为一个Map,如果key存在,我们就进行++的操作。最终操作mongo是$inc的操作,不是insert和update的操作。这也是我们需要一个ConcurrentMap的原因。也就是我们大概消费了1000万条数据,但是最终我们处理完成以后只有10万条数据,很多key相同的数据我们都进行了合并处理,这样我们mongo的操作就减少了很多。

data-collect发生了OOM,我只能第一时间重启,重启以后,消费正常,系统开始有了数据。但是大概运行了几分钟以后,又发生了OOM。原因很简单:kafka积压的了大量的消息,消费很快,但是异步如mongo太耗时,所以导致数据全部挤压在了这个Map内存中。看到这里,我想只能动手改造代码了。改造的最终要达到的结果是:系统在不发生OOM的前提下,消费积压在kafka中的数据,完成mongo操作。

改造的思路很简单,就是干掉定时任务。在消费kafka消息中增加一个逻辑,每当消费消息并且内存进行数据合并完成以后,我们判断Map的大小,如果Map的大小超过我们设定的限制以后,开始触发mongo操作。之前的代码mongo操作是单线程执行,为了提升mongo插入操作,我们开启20个线程并行执行,所以我们这里需要一个带阻塞队列的线程池。改造后的代码如下:

这里是SpringBoot集成Kafka的消费代码。

这是内存处理完成以后入mongo的操作。因为我们的topic有20个分区,所以代码中的listenPartition0是多线程执行的。如果没有synchronized的同步代码块,那assembleyAdxTrafficVo方法就会多线程执行,这就会导致数据重复插入mongo,具体大家可以体会assembleyAdxTrafficVo方法的逻辑。

而插入mongo操作的用了线程池ExecutorService,注意这里我们executorService对象的定义。

为什么要自己定义一个阻塞队列CustomeBlockQueue?这相比很多人碰到过这个问题,如果采用默认的阻塞队列,例如:ArrayBlockingQueue,当队列长度长度超过设置的值时,ArrayBlockingQueue会拒绝新的数据进入,并且抛出异常,所以我们需要自己定义CustomeBlockQueue,并且重写他的offer方法(BlockingQueue默认采用offer方法将元素增加到队列),offer方法不会阻塞,put方法会阻塞,所以我们需要重写offer方法,并且内部采用put方法实现。关于这一点,大家可以多尝试。ArrayBlockingQueue和LinkedBlockingQueue都有很多坑等大家去踩。

按照上述代码处理完成上线以后,系统开始正常运行,kafka积压的消息也开始慢慢降低,系统趋于恢复正常,而这时已经是12点了,惊心动魄的2小时总算过去了,阿弥陀佛。

新闻名称:记一次线上Java程序故障惊心动魄两小时
链接地址:http://www.csdahua.cn/qtweb/news15/539315.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网