業(yè)務(wù)背景
目前移動端的使用場景中會用到大量的消息推送,push消息可以幫助運營人員更高效地實現(xiàn)運營目標(biāo)(比如給用戶推送營銷活動或者提醒APP新功能)。
對于推送系統(tǒng)來說需要具備以下兩個特性:
-
消息秒級送到用戶,無延時,支持每秒百萬推送,單機百萬長連接。
-
支持通知、文本、自定義消息透傳等展現(xiàn)形式。正是由于以上原因,對于系統(tǒng)的開發(fā)和維護帶來了挑戰(zhàn)。下圖是推送系統(tǒng)的簡單描述(API->推送模塊->手機)。
問題背景
推送系統(tǒng)中長連接集群在穩(wěn)定性測試、壓力測試階運行一段時間后隨機會出現(xiàn)一個進程掛掉的情況,概率較小(頻率為一個月左右發(fā)生一次),這會影響部分客戶端消息送到的時效。
推送系統(tǒng)中的長連接節(jié)點(Broker系統(tǒng))是基于Netty開發(fā),此節(jié)點維護了服務(wù)端和手機終端的長連接,線上問題出現(xiàn)后,添加Netty內(nèi)存泄露監(jiān)控參數(shù)進行問題排查,觀察多天但并未排查出問題。
由于長連接節(jié)點是Netty開發(fā),為便于讀者理解,下面簡單介紹一下Netty。
Netty介紹
Netty是一個高性能、異步事件驅(qū)動的NIO框架,基于Java NIO提供的API實現(xiàn)。它提供了對TCP、UDP和文件傳輸?shù)闹С郑鳛楫?dāng)前最流行的NIO框架,Netty在互聯(lián)網(wǎng)領(lǐng)域、大數(shù)據(jù)分布式計算領(lǐng)域、游戲行業(yè)、通信行業(yè)等獲得了廣泛的應(yīng)用,HBase,Hadoop,Bees,Dubbo等開源組件也基于Netty的NIO框架構(gòu)建。
問題分析
猜想
最初猜想是長連接數(shù)導(dǎo)致的,但經(jīng)過排查日志、分析代碼,發(fā)現(xiàn)并不是此原因造成。
長連接數(shù):39萬,如下圖:
每個channel字節(jié)大小1456, 按40萬長連接計算,不致于產(chǎn)生內(nèi)存過大現(xiàn)象。
查看GC日志
查看GC日志,發(fā)現(xiàn)進程掛掉之前頻繁full GC(頻率5分鐘一次),但內(nèi)存并未降低,懷疑堆外內(nèi)存泄露。
分析heap內(nèi)存情況
ChannelOutboundBuffer對象占將近5G內(nèi)存,泄露原因基本可以確定:ChannelOutboundBuffer的entry數(shù)過多導(dǎo)致,查看ChannelOutboundBuffer的源碼可以分析出,是ChannelOutboundBuffer中的數(shù)據(jù)。
沒有寫出去,導(dǎo)致一直積壓;
ChannelOutboundBuffer內(nèi)部是一個鏈表結(jié)構(gòu)。
從上圖分析數(shù)據(jù)未寫出去,為什么會出現(xiàn)這種情況?
代碼中實際有判斷連接是否可用的情況(Channel.isActive),并且會對超時的連接進行關(guān)閉。從歷史經(jīng)驗來看,這種情況發(fā)生在連接半打開(客戶端異常關(guān)閉)的情況比較多---雙方不進行數(shù)據(jù)通信無問題。
按上述猜想,測試環(huán)境進行重現(xiàn)和測試。
1)模擬客戶端集群,并與長連接服務(wù)器建立連接,設(shè)置客戶端節(jié)點的防火墻,模擬服務(wù)器與客戶端網(wǎng)絡(luò)異常的場景(即要模擬Channel.isActive調(diào)用成功,但數(shù)據(jù)實際發(fā)送不出去的情況)。
2)調(diào)小堆外內(nèi)存,持續(xù)發(fā)送測試消息給之前的客戶端。消息大小(1K左右)。
3)按照128M內(nèi)存來計算,實際上調(diào)用9W多次就會出現(xiàn)。
問題解決
啟用autoRead機制
當(dāng)channel不可寫時,關(guān)閉autoRead;
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- if (!ctx.channel().isWritable()) {
- Channel channel = ctx.channel();
- ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
- String clientId = "";
- if (channelInfo != null) {
- clientId = channelInfo.getClientId();
- }
- LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId);
- channel.config().setAutoRead(false);
- }
- }
當(dāng)數(shù)據(jù)可寫時開啟autoRead;
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
- {
- Channel channel = ctx.channel();
- ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
- String clientId = "";
- if (channelInfo != null) {
- clientId = channelInfo.getClientId();
- }
- if (channel.isWritable()) {
- LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId);
- channel.config().setAutoRead(true);
- }
- }
說明:
autoRead的作用是更精確的速率控制,如果打開的時候Netty就會幫我們注冊讀事件。當(dāng)注冊了讀事件后,如果網(wǎng)絡(luò)可讀,則Netty就會從channel讀取數(shù)據(jù)。那如果autoread關(guān)掉后,則Netty會不注冊讀事件。
這樣即使是對端發(fā)送數(shù)據(jù)過來了也不會觸發(fā)讀事件,從而也不會從channel讀取到數(shù)據(jù)。當(dāng)recv_buffer滿時,也就不會再接收數(shù)據(jù)。
設(shè)置高低水位
- serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));
注:高低水位配合后面的isWritable使用
增加channel.isWritable()的判斷
channel是否可用除了校驗channel.isActive()還需要加上channel.isWrite()的判斷,isActive只是保證連接是否激活,而是否可寫由isWrite來決定。
- private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) {
- Channel channel = ctx.channel();
- //增加channel.isWritable()的判斷
- if (channel.isActive() && channel.isWritable()) {
- ChannelFuture cf = channel.writeAndFlush(message);
- if (cf.isDone() && cf.cause() != null) {
- LOGGER.error("channelWrite error!", cf.cause());
- ctx.close();
- }
- }
- }
注:isWritable可以來控制ChannelOutboundBuffer,不讓其無限制膨脹。其機制就是利用設(shè)置好的channel高低水位來進行判斷。
問題驗證
修改后再進行測試,發(fā)送到27W次也并不報錯;
解決思路分析
一般Netty數(shù)據(jù)處理流程如下:將讀取的數(shù)據(jù)交由業(yè)務(wù)線程處理,處理完成再發(fā)送出去(整個過程是異步的),Netty為了提高網(wǎng)絡(luò)的吞吐量,在業(yè)務(wù)層與socket之間增加了一個ChannelOutboundBuffer。
在調(diào)用channel.write的時候,所有寫出的數(shù)據(jù)其實并沒有寫到socket,而是先寫到ChannelOutboundBuffer。當(dāng)調(diào)用channel.flush的時候才真正的向socket寫出。因為這中間有一個buffer,就存在速率匹配了,而且這個buffer還是無界的(鏈表),也就是你如果沒有控制channel.write的速度,會有大量的數(shù)據(jù)在這個buffer里堆積,如果又碰到socket寫不出數(shù)據(jù)的時候(isActive此時判斷無效)或者寫得慢的情況。
很有可能的結(jié)果就是資源耗盡,而且如果ChannelOutboundBuffer存放的是
DirectByteBuffer,這會讓問題更加難排查。
流程可抽象如下:
從上面的分析可以看出,步驟一寫太快(快到處理不過來)或者下游發(fā)送不出數(shù)據(jù)都會造成問題,這實際是一個速率匹配問題。
Netty源碼說明
超過高水位
當(dāng)ChannelOutboundBuffer的容量超過高水位設(shè)定閾值后,isWritable()返回false,設(shè)置channel不可寫(setUnwritable),并且觸發(fā)fireChannelWritabilityChanged()。
- private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
- if (size == 0) {
- return;
- }
- long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
- if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
- setUnwritable(invokeLater);
- }
- }
- private void setUnwritable(boolean invokeLater) {
- for (;;) {
- final int oldValue = unwritable;
- final int newValue = oldValue | 1;
- if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
- if (oldValue == 0 && newValue != 0) {
- fireChannelWritabilityChanged(invokeLater);
- }
- break;
- }
- }
- }
低于低水位
當(dāng)ChannelOutboundBuffer的容量低于低水位設(shè)定閾值后,isWritable()返回true,設(shè)置channel可寫,并且觸發(fā)fireChannelWritabilityChanged()。
- private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
- if (size == 0) {
- return;
- }
- long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
- if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
- setWritable(invokeLater);
- }
- }
- private void setWritable(boolean invokeLater) {
- for (;;) {
- final int oldValue = unwritable;
- final int newValue = oldValue & ~1;
- if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
- if (oldValue != 0 && newValue == 0) {
- fireChannelWritabilityChanged(invokeLater);
- }
- break;
- }
- }
- }
總結(jié)
當(dāng)ChannelOutboundBuffer的容量超過高水位設(shè)定閾值后,isWritable()返回false,表明消息產(chǎn)生堆積,需要降低寫入速度。
當(dāng)ChannelOutboundBuffer的容量低于低水位設(shè)定閾值后,isWritable()返回true,表明消息過少,需要提高寫入速度。通過以上三個步驟修改后,部署線上觀察半年未發(fā)生問題出現(xiàn)。
原文鏈接:https://mp.weixin.qq.com/s?__biz=MzI0NzEyODIyOA==&mid=2247487422&idx=1&sn=f9684b46fc38f347036c21b4fb3c3979&utm_source=tuicool&utm_medium=referral