本文由網(wǎng)易云音樂實(shí)時(shí)計(jì)算平臺(tái)研發(fā)工程師岳猛分享,主要從以下四個(gè)部分將為大家介紹 Flink + Kafka 在網(wǎng)易云音樂的應(yīng)用實(shí)戰(zhàn):
背景
- Flink + Kafka 平臺(tái)化設(shè)計(jì)
- Kafka 在實(shí)時(shí)數(shù)倉中的應(yīng)用
- 問題 & 改進(jìn)
一、背景介紹
1、流平臺(tái)通用框架
目前流平臺(tái)通用的架構(gòu)一般來說包括消息隊(duì)列、計(jì)算引擎和存儲(chǔ)三部分,通用架構(gòu)如下圖所示。客戶端或者 web 的 log 日志會(huì)被采集到消息隊(duì)列;計(jì)算引擎實(shí)時(shí)計(jì)算消息隊(duì)列的數(shù)據(jù);實(shí)時(shí)計(jì)算結(jié)果以 Append 或者 Update 的形式存放到實(shí)時(shí)存儲(chǔ)系統(tǒng)中去。
目前,我們常用的消息隊(duì)列是 Kafka,計(jì)算引擎一開始我們采用的是 Spark Streaming,隨著 Flink 在流計(jì)算引擎的優(yōu)勢(shì)越來越明顯,我們最終確定了 Flink 作為我們統(tǒng)一的實(shí)時(shí)計(jì)算引擎。
2、為什么選 Kafka?
Kafka 是一個(gè)比較早的消息隊(duì)列,但是它是一個(gè)非常穩(wěn)定的消息隊(duì)列,有著眾多的用戶群體,網(wǎng)易也是其中之一。我們考慮 Kafka 作為我們消息中間件的主要原因如下:
- 高吞吐,低延遲:每秒幾十萬 QPS 且毫秒級(jí)延遲;
- 高并發(fā):支持?jǐn)?shù)千客戶端同時(shí)讀寫;
- 容錯(cuò)性,可高性:支持?jǐn)?shù)據(jù)備份,允許節(jié)點(diǎn)丟失;
- 可擴(kuò)展性:支持熱擴(kuò)展,不會(huì)影響當(dāng)前線上業(yè)務(wù)。
3、為什么選擇 Flink?
Apache Flink 是近年來越來越流行的一款開源大數(shù)據(jù)流式計(jì)算引擎,它同時(shí)支持了批處理和流處理,考慮 Flink 作為我們流式計(jì)算引擎的主要因素是:
- 高吞吐,低延遲,高性能;
- 高度靈活的流式窗口;
- 狀態(tài)計(jì)算的 Exactly-once 語義;
- 輕量級(jí)的容錯(cuò)機(jī)制;
- 支持 EventTime 及亂序事件;
- 流批統(tǒng)一引擎。
4、Kafka + Flink 流計(jì)算體系
基于 Kafka 和 Flink 的在消息中間件以及流式計(jì)算方面的耀眼表現(xiàn),于是產(chǎn)生了圍繞 Kafka 及 Flink 為基礎(chǔ)的流計(jì)算平臺(tái)體系,如下圖所示:基于 APP、web 等方式將實(shí)時(shí)產(chǎn)生的日志采集到 Kafka,然后交由 Flink 來進(jìn)行常見的 ETL,全局聚合以及Window 聚合等實(shí)時(shí)計(jì)算。
5、網(wǎng)易云音樂使用 Kafka 的現(xiàn)狀
目前我們有 10+個(gè) Kafka 集群,各個(gè)集群的主要任務(wù)不同,有些作為業(yè)務(wù)集群,有些作為鏡像集群,有些作為計(jì)算集群等。當(dāng)前 Kafka 集群的總節(jié)點(diǎn)數(shù)達(dá)到 200+,單 Kafka 峰值 QPS 400W+。目前,網(wǎng)易云音樂基于 Kafka+Flink 的實(shí)時(shí)任務(wù)達(dá)到了 500+。
二、Flink+Kafka 平臺(tái)化設(shè)計(jì)
基于以上情況,我們想要對(duì) Kafka+Flink 做一個(gè)平臺(tái)化的開發(fā),減少用戶的開發(fā)成本和運(yùn)維成本。實(shí)際上在 2018 年的時(shí)候我們就開始基于 Flink 做一個(gè)實(shí)時(shí)計(jì)算平臺(tái),Kafka 在其中發(fā)揮著重要作用,今年,為了讓用戶更加方便、更加容易的去使用 Flink 和 Kafka,我們進(jìn)行了重構(gòu)。
基于 Flink 1.0 版本我們做了一個(gè) Magina 版本的重構(gòu),在 API 層次我們提供了 Magina SQL 和 Magina SDK 貫穿 DataStream 和 SQL 操作;然后通過自定義 Magina SQL Parser 會(huì)把這些 SQL 轉(zhuǎn)換成 Logical Plan,在將 LogicalPlan 轉(zhuǎn)化為物理執(zhí)行代碼,在這過程中會(huì)去通過 catalog 連接元數(shù)據(jù)管理中心去獲取一些元數(shù)據(jù)的信息。我們?cè)?Kafka 的使用過程中,會(huì)將 Kafka 元數(shù)據(jù)信息登記到元數(shù)據(jù)中心,對(duì)實(shí)時(shí)數(shù)據(jù)的訪問都是以流表的形式。在 Magina 中我們對(duì) Kafka 的使用主要做了三部分的工作:
- 集群 catalog 化;
- Topic 流表化;
- Message Schema 化。
用戶可以在元數(shù)據(jù)管理中心登記不同的表信息或者 catalog 信息等,也可以在 DB 中創(chuàng)建和維護(hù) Kafka 的表,用戶在使用的過程只需要根據(jù)個(gè)人需求使用相應(yīng)的表即可。下圖是對(duì) Kafka 流表的主要引用邏輯。
三、Kafka 在實(shí)時(shí)數(shù)倉中的應(yīng)用
1、在解決問題中發(fā)展
Kafka 在實(shí)時(shí)數(shù)倉使用的過程中,我們遇到了不同的問題,中間也嘗試了不同的解決辦法。
在平臺(tái)初期, 最開始用于實(shí)時(shí)計(jì)算的只有兩個(gè)集群,且有一個(gè)采集集群,單 Topic 數(shù)據(jù)量非常大;不同的實(shí)時(shí)任務(wù)都會(huì)消費(fèi)同一個(gè)大數(shù)據(jù)量的 Topic,Kafka 集群 IO 壓力異常大;
因此,在使用的過程發(fā)現(xiàn) Kafka 的壓力異常大,經(jīng)常出現(xiàn)延遲、I/O 飆升。
我們想到把大的 Topic 進(jìn)行實(shí)時(shí)分發(fā)來解決上面的問題,基于 Flink 1.5 設(shè)計(jì)了如下圖所示的數(shù)據(jù)分發(fā)的程序,也就是實(shí)時(shí)數(shù)倉的雛形。基于這種將大的 Topic 分發(fā)成小的 Topic 的方法,大大減輕了集群的壓力,提升了性能,另外,最初使用的是靜態(tài)的分發(fā)規(guī)則,后期需要添加規(guī)則的時(shí)候要進(jìn)行任務(wù)的重啟,對(duì)業(yè)務(wù)影響比較大,之后我們考慮了使用動(dòng)態(tài)規(guī)則來完成數(shù)據(jù)分發(fā)的任務(wù)。
解決了平臺(tái)初期遇到的問題之后,在平臺(tái)進(jìn)階過程中 Kafka 又面臨新的問題:
- 雖然進(jìn)行了集群的擴(kuò)展,但是任務(wù)量也在增加,Kafka 集群壓力仍然不斷上升;
- 集群壓力上升有時(shí)候出現(xiàn) I/O 相關(guān)問題,消費(fèi)任務(wù)之間容易相互影響;
- 用戶消費(fèi)不同的 Topic 過程沒有中間數(shù)據(jù)的落地,容易造成重復(fù)消費(fèi);
- 任務(wù)遷移 Kafka 困難。
針對(duì)以上問題,我們進(jìn)行了如下圖所示的 Kafka 集群隔離和數(shù)據(jù)分層處理。其過程簡(jiǎn)單來說,將集群分成 DS 集群、日志采集集群、分發(fā)集群,數(shù)據(jù)通過分發(fā)服務(wù)分發(fā)到 Flink 進(jìn)行處理,然后通過數(shù)據(jù)清洗進(jìn)入到 DW 集群,同時(shí)在 DW 寫的過程中會(huì)同步到鏡像集群,在這個(gè)過程中也會(huì)利用 Flink 進(jìn)行實(shí)時(shí)計(jì)算的統(tǒng)計(jì)和拼接,并將生成的 ADS 數(shù)據(jù)寫入在線 ADS 集群和統(tǒng)計(jì) ADS 集群。通過上面的過程,確保了對(duì)實(shí)時(shí)計(jì)算要求比較高的任務(wù)不會(huì)受到統(tǒng)計(jì)報(bào)表的影響。
通過上面的過程,確保了對(duì)實(shí)時(shí)計(jì)算要求比較高的任務(wù)不會(huì)受到統(tǒng)計(jì)報(bào)表的影響。但是我們分發(fā)了不同的集群以后就不可避免的面臨新的問題:
- 如何感知 Kafka 集群狀態(tài)?
- 如何快速分析 Job 消費(fèi)異常?
針對(duì)上面兩個(gè)問題,我們做了一個(gè) Kafka 監(jiān)控系統(tǒng),其監(jiān)控分為如下兩個(gè)維度,這樣在出現(xiàn)異常的時(shí)候就可以進(jìn)行具體判斷出現(xiàn)問題的詳細(xì)情況:
- 集群概況的監(jiān)控:可以看到不同集群對(duì)應(yīng)的 Topic 數(shù)量以及運(yùn)行任務(wù)數(shù)量,以及每個(gè) Topic 消費(fèi)任務(wù)數(shù)據(jù)量、數(shù)據(jù)流入量、流入總量和平均每條數(shù)據(jù)大小;
- 指標(biāo)監(jiān)控:可以看到 Flink 任務(wù)以及對(duì)應(yīng)的 Topic、GroupID、所屬集群、啟動(dòng)時(shí)間、輸入帶寬、InTPS、OutTPS、消費(fèi)延遲以及 Lag 情況。
2、Flink + Kafka 在 Lambda 架構(gòu)下的運(yùn)用
流批統(tǒng)一是目前非常火的概念,很多公司也在考慮這方面的應(yīng)用,目前常用的架構(gòu)要么是 Lambda 架構(gòu),要么是 Kappa 架構(gòu)。對(duì)于流批統(tǒng)一來講需要考慮的包括存儲(chǔ)統(tǒng)一和計(jì)算引擎統(tǒng)一,由于我們當(dāng)前基建沒有統(tǒng)一的存儲(chǔ),那么我們只能選擇了 Lamda 架構(gòu)。
下圖是基于 Flink 和 Kafka 的 Lambda 架構(gòu)在云音樂的具體實(shí)踐,上層是實(shí)時(shí)計(jì)算,下層是離線計(jì)算,橫向是按計(jì)算引擎來分,縱向是按實(shí)時(shí)數(shù)倉來區(qū)分。
四、問題&改進(jìn)
在具體的應(yīng)用過程中,我們也遇到了很多問題,最主要的兩個(gè)問題是:
- 多 Sink 下 Kafka Source 重復(fù)消費(fèi)問題;
- 同交換機(jī)流量激增消費(fèi)計(jì)算延遲問題。
1、多 Sink 下 Kafka Source 重復(fù)消費(fèi)問題
Magina 平臺(tái)上支持多 Sink,也就是說在操作的過程中可以將中間的任意結(jié)果插入到不同的存儲(chǔ)中。這個(gè)過程中就會(huì)出現(xiàn)一個(gè)問題,比如同一個(gè)中間結(jié)果,我們把不同的部分插入到不同的存儲(chǔ)中,那么就會(huì)有多條 DAG,雖然都是臨時(shí)結(jié)果,但是也會(huì)造成 Kafka Source 的重復(fù)消費(fèi),對(duì)性能和資源造成極大的浪費(fèi)。
于是我們想,是否可以避免臨時(shí)中間結(jié)果的多次消費(fèi)。在 1.9 版本之前,我們進(jìn)行了 StreamGraph 的重建,將三個(gè) DataSource 的 DAG 進(jìn)行了合并;在 1.9 版本,Magina 自己也提供了一個(gè)查詢和 Source 合并的優(yōu)化;但是我們發(fā)現(xiàn)如果是在同一個(gè) data update 中有對(duì)同一個(gè)表的多個(gè) Source 的引用,它自己會(huì)合并,但是如果不是在同一個(gè) data update 中,是不會(huì)立即合并的,于是在 1.9 版本之后中我們對(duì) modifyOperations 做了一個(gè) buffer 來解決這個(gè)問題。
2、同交換機(jī)流量激增消費(fèi)計(jì)算延遲問題
這個(gè)問題是最近才出現(xiàn)的問題,也可能不僅僅是同交換機(jī),同機(jī)房的情況也可能。在同一個(gè)交換機(jī)下我們部署了很多機(jī)器,一部分機(jī)器部署了 Kafka 集群,還有一部分部署了 Hadoop 集群。在 Hadoop 上面我們可能會(huì)進(jìn)行 Spark、Hive 的離線計(jì)算以及 Flink 的實(shí)時(shí)計(jì)算,F(xiàn)link 也會(huì)消費(fèi) Kafka 進(jìn)行實(shí)時(shí)計(jì)算。在運(yùn)行的過程中我們發(fā)現(xiàn)某一個(gè)任務(wù)會(huì)出現(xiàn)整體延遲的情況,排查過后沒有發(fā)現(xiàn)其他的異常,除了交換機(jī)在某一個(gè)時(shí)間點(diǎn)的瀏覽激增,進(jìn)一步排查發(fā)現(xiàn)是離線計(jì)算的瀏覽激增,又因?yàn)橥粋€(gè)交換機(jī)的帶寬限制,影響到了 Flink 的實(shí)時(shí)計(jì)算。
為解決這個(gè)問題,我們就考慮要避免離線集群和實(shí)時(shí)集群的相互影響,去做交換機(jī)部署或者機(jī)器部署的優(yōu)化,比如離線集群?jiǎn)为?dú)使用一個(gè)交換機(jī),Kafka 和 Flink 集群也單獨(dú)使用一個(gè)交換機(jī),從硬件層面保證兩者之間不會(huì)相互影響。
Q&A
Q1:Kafka 在實(shí)時(shí)數(shù)倉中的數(shù)據(jù)可靠嗎?
A1:這個(gè)問題的答案更多取決于對(duì)數(shù)據(jù)準(zhǔn)確性的定義,不同的標(biāo)準(zhǔn)可能得到不同的答案。自己首先要定義好數(shù)據(jù)在什么情況下是可靠的,另外要在處理過程中有一個(gè)很好的容錯(cuò)機(jī)制。
Q2:我們?cè)趯W(xué)習(xí)的時(shí)候如何去學(xué)習(xí)這些企業(yè)中遇到的問題?如何去積累這些問題?
A2:個(gè)人認(rèn)為學(xué)習(xí)的過程是問題推動(dòng),遇到了問題去思考解決它,在解決的過程中去積累經(jīng)驗(yàn)和自己的不足之處。
Q3:你們?cè)谔幚?Kafka 的過程中,異常的數(shù)據(jù)怎么處理,有檢測(cè)機(jī)制嗎?
A3:在運(yùn)行的過程中我們有一個(gè)分發(fā)的服務(wù),在分發(fā)的過程中我們會(huì)根據(jù)一定的規(guī)則來檢測(cè)哪些數(shù)據(jù)是異常的,哪些是正常的,然后將異常的數(shù)據(jù)單獨(dú)分發(fā)到一個(gè)異常的 Topic 中去做查詢等,后期用戶在使用的過程中可以根據(jù)相關(guān)指標(biāo)和關(guān)鍵詞到異常的 Topic 中去查看這些數(shù)據(jù)。
原文地址:https://mp.weixin.qq.com/s/0wF_C8mpYBb8KFB0KhwodA