數據倉庫有四個基本的特征:面向主題的、集成的、相對穩定的、反映歷史變化的。其中數據集成是數據倉庫構建的首要前提,指將多個分散的、異構的數據源整合在一起以便于后續的數據分析。將數據集成過程平臺化,將極大提升數據開發人員的效率,本文主要內容為:
- 數據集成 VS 數據同步
- 集成需求
- 數據集成 V1
- 數據集成 V2
- 線上效果
- 總結
A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.
—— Bill Inmon
一、數據集成 VS 數據同步
- 「數據集成」往往和「數據同步」在概念上存在一定的混淆,為此我們對這二者進行了區分:
- 「數據集成」特指面向數據倉庫 ODS 層的數據同步過程;
- 「數據同步」面向的是一般化的 Source 到 Sink 的數據傳輸過程。
二者的關系如下圖所示:
「數據同步平臺」提供基礎能力,不摻雜具體的業務邏輯。
「數據集成平臺」是構建在「數據同步平臺」之上的,除了將原始數據同步之外還包含了一些聚合的邏輯 (如通過數據庫的日志數據對快照數據進行恢復,下文將會詳細展開) 以及數倉規范相關的內容 (如數倉 ODS 層庫表命名規范) 等。
目前「數據同步平臺」的建設正在我們的規劃之中,但這并不影響「數據集成平臺」的搭建,一些同步的需求可提前在「實時計算平臺」創建,以「約定」的方式解耦。
值得一提的是「數據集成」也應當涵蓋「數據采集」(由特定的工具支持) 和「數據清洗」(由采集粒度、日志規范等因素決定) 兩部分內容,這兩部分內容各個公司都有自己的實現,本文將不做詳細介紹。
二、集成需求
目前伴魚內部數據的集成需求主要體現在三塊:Stat Log (業務標準化日志或稱統計日志)、TiDB 及 MongoDB。除此之外還有一些 Service Log、Nginx Log 等,此類不具備代表性不在本文介紹。另外,由于實時數倉正處于建設過程中,目前「數據集成平臺」只涵蓋離線數倉 (Hive)。
Stat Log:業務落盤的日志將由 FileBeat 組件收集至 Kafka。由于日志為 Append Only 類型, 因此 Stat Log 集成相對簡單,只需將 Kafka 數據同步至 Hive 即可。
DB (TiDB、MongoDB):DB 數據相對麻煩,核心訴求是數倉中能夠存在業務數據庫的鏡像,即存在業務數據庫中某一時刻(天級 or 小時級)的數據快照,當然有時也有對數據變更過程的分析需求。因此 DB 數據集成需要將這兩個方面都考慮進去。
由于以上兩種類型的數據集成方式差異較大,下文將分別予以討論。
三、數據集成 V1
伴魚早期「數據集成平臺」已具備雛形,這個階段主要是借助一系列開源的工具實現。隨著時間推進,這個版本暴露的問題也逐漸增多,接下來將主要從數據流的角度對 V1 進行闡述,更多的細節問題將在 V2 版本的設計中體現。
3.1 Stat Log
日志的集成并未接入平臺,而是煙囪式的開發方式,數據集成的鏈路如下圖所示:
Kafka 中的數據先經過 Flume 同步至 HDFS,再由 Spark 任務將數據從 HDFS 導入至 Hive 并創建分區。整體鏈路較長且引入了第三方組件(Flume)增加了運維的成本,另外 Kafka 的原始數據在 HDFS 冗余存儲也增加了存儲的開銷。
3.2 DB
DB 數據的集成主要是基于查詢的方式(批的方式,通過 Select 查詢進行全表掃描得到快照數據)實現,其鏈路如下圖所示:
用戶通過平臺提交集成任務,由 Airflow 定時任務掃描集成平臺元數據庫,生成對應的取數任務 (TiDB 的數據通過 Sqoop 工具,MongoDB 的數據則通過 Mongoexport 工具)。可以看到 V1 版本并沒有獲取數據庫的變更的日志數據,不能滿足對數據變更過程的分析訴求。
由于 Sqoop 任務最終要從 TiDB 生產環境的業務數據庫獲取數據,數據量大的情況下勢必對業務數據庫造成一定的影響。Mongoexport 任務直接作用在 MongoDB 的隱藏節點 (無業務數據請求),對于線上業務的影響可以忽略不計。基于此,DBA 單獨搭建了一套 TiDB 大數據集群,用于將體量較大的業務數據庫同步至此 (基于 TiDB Pump 和 Drainer 組件),因此部分 Sqoop 任務可以從此集群拉群數據以消除對業務數據庫的影響。從數據流的角度,整個過程如下圖所示:
是否將生產環境 TiDB 業務數據庫同步至 TiDB 大數據集群由數倉的需求以及 DBA 對于數據量評估決定。可以看出,這種形式也存在著大量數據的冗余,集群的資源隨著同步任務的增加時長達到瓶頸。并且隨著后續的演進,TiDB 大數據集群也涵蓋一部分數據應用生產環境的業務數據庫,集群作用域逐漸模糊。
四、數據集成 V2
V2 版本我們引入了 Flink,將同步的鏈路進行了簡化,DB 數據集成從之前的基于查詢的方式改成了基于日志的方式 (流的方式),大大降低了冗余的存儲。
4.1 Stat Log
借助 Flink 1.11 版本后對于 Hive Integration 的支持,我們可以輕松的將 Kafka 的數據寫入 Hive,因此 Stat Log 的集成也就變得異常簡單 (相比 V1 版本,去除了對 Flume 組件的依賴,數據冗余也消除了),同時 Flink Exactly-Once 的語義也確保了數據的準確性。從數據流的角度,整個過程如下圖所示:
目前按照小時粒度生成日志分區,幾項 Flink 任務配置參數如下:
- checkpoint: 10 min
- watermark: 1 min
- partition.time-extractor.kind: ‘custom’
- sink.partition-commit.delay: ‘3600s’
- sink.partition-commit.policy.kind: ‘metastore,success-file’
- sink.partition-commit.trigger: ‘partition-time’
4.2 DB
基于日志的方式對 DB 數據進行集成,意味著需要采集 DB 的日志數據,在我們目前的實現中 TiDB 基于 Pump 和 Drainer 組件(目前生產環境數據庫集群版本暫不支持開啟 TICDC),MongoDB 基于 MongoShake 組件,采集的數據將輸送至 Kafka。
采用這種方式,一方面降低了業務數據庫的查詢壓力,另一方面可以捕捉數據的變更過程,同時冗余的數據存儲也消除了。不過由于原始數據是日志數據,需要通過一定的手段還原出快照數據。新的鏈路如下圖所示:
用戶提交集成任務后將同步創建三個任務:
- 增量任務 (流):「增量任務」將 DB 日志數據由 Kafka 同步至 Hive。由于采集組件都是按照集群粒度進行采集,且集群數量有限,目前都是手動的方式將同步的任務在「實時計算平臺」創建,集成任務創建時默認假定同步任務已經 ready,待「數據同步平臺」落地后可以同步做更多的自動化操作和校驗。
- 存量任務 (批):要想還原出快照數據則至少需要一份初始的快照數據,因此「存量任務」的目的是從業務數據庫拉取集成時數據的初始快照數據。
- Merge 任務 (批):「Merge 任務」將存量數據和增量數據進行聚合以還原快照數據。還原后的快照數據可作為下一日的存量,因此「存量任務」只需調度執行一次,獲取初始快照數據即可。
「存量任務」和「Merge 任務」由離線調度平臺 Dolphinscheduler (簡稱 DS) 調度執行,任務執行過程中將從集成任務的元數據庫中獲取所需的信息。目前「Merge 任務」按小時粒度調度,即每小時還原快照數據。
從數據流的角度,整個過程如下圖所示:
DB 的數據集成相較于 Stat Log 復雜性高,接下來以 TiDB 的數據集成為例講述設計過程中的一些要點 (MongoDB 流程類似,區別在于存量同步工具及數據解析)。
4.2.1 需求表達
對于用戶而言,集成任務需要提供以下兩類信息:
- TiDB 源信息:包括集群、庫、表。
- 集成方式:集成方式表示的是快照數據的聚合粒度,包括全量和增量。全量表示需要將存量的快照數據與今日的增量日志數據聚合,而增量表示只需要將今日的增量日志數據聚合 (即便增量方式無需和存量的快照數據聚合,但初始存量的獲取依舊是有必要的,具體的使用形式由數倉人員自行決定)。
4.2.2 存量任務
存量任務雖然有且僅執行一次,但為了完全消除數據集成對業務數據庫的影響,我們選擇數據庫的備份-恢復機制來實現。公司內部數據庫的備份和恢復操作已經平臺化,集群將定期進行備份 (天粒度),通過平臺可以查詢到集群的最新備份,并且可由接口觸發備份恢復操作,故存量的獲取可直接作用于恢復的數據庫。
由于數據庫備份的時間點與集成任務提交的時間點并不一定是同一天,這之間存在著一定的時間差將導致存量快照數據不符合我們的預期,各時間點的關系如下圖所示:
按照我們的設定,存量快照數據應當是包含 T4 之前的全部數據,而實際備份的快照數據僅包含 T1 之前的全部數據,這之間存在這 N 天的數據差。
注:這里之所以不說數據差集為 T1 至 T4 區間的數據,是因為增量的 Binlog 數據是以整點為分區的,在 Merge 的時候也是將整點的分區數據與存量數據進行聚合,并支持了數據去重。因此 T1 時刻的存量數據與 T0-T3 之間的增量數據的 Merge 結果等效于 T0 時刻的存量數據與 T0-T3 之間的增量數據的 Merge 結果。所以 T1 至 T4 的數據差集等效 T0 至 T3 的數據差集,即圖示中的 N 天數據。
對于缺失的這部分數據實則是可以在「存量任務」中進行補全,仔細分析這其實是可以通過執行的 「Merge 任務」的補數操作實現。
整個「存量任務」的工作流如下圖所示:
- 同步觸發數據庫平臺進行備份恢復,產生回執 ID;
- 通過回執 ID 輪訓備份恢復狀態,恢復失敗需要 DBA 定位異常,故將下線整個工作流,待恢復成功可在平臺重新恢復執行「存量任務」。恢復進行中,工作流直接退出,借助 DS 定時調度等待下次喚醒。恢復成功,進入后續邏輯;
- 從恢復庫中拉取存量,判定存量是否存在數據差,若存在則執行 Merge 任務的補數操作,整個操作可冪等執行,如若失敗退出此次工作流,等待下次調度;
- 成功,下線整個工作流,任務完成。
4.2.3 Merge 任務
Merge 任務的前提是存量數據與增量數據都已經 ready,我們通過 _SUCCESS 文件進行標記。整個「Merge 任務」的工作流如下圖所示:
- 校驗文件標記是否存在,若不存在說明數據未 ready ,進行報警并退出工作流等待下次調度;
- 執行 Merge 操作,失敗報警并退出工作流等待下次調度;
- 成功,退出工作流等待下次調度。
Merge 操作通過 Flink DataSet API 實現。核心邏輯如下:
- 加載存量、增量數據,統一數據格式(核心字段:主鍵 Key 作為同一條數據的聚合字段;CommitTs 標識 binlog 的提交時間,存量數據默認為 0 早于增量數據;OpType 標識數據操作類型,包括:Insert、Update、Delete,存量數據默認為 Insert 類型),將兩份數據進行 union;
- 按照主鍵聚合;
- 保留聚合后 CommitTs 最大的數據條目,其余丟棄;
- 過濾 OpType 為 Delete 類型的數據條目;
- 輸出聚合結果。
核心代碼:
- allMergedData.groupBy(x -> x.getKeyCols())
-
.reduce(new ReduceFunction
() { - public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {
- if (value1.getCommitTS() > value2.getCommitTS()){
- return value1;
- }
- return value2;
- }
- })
-
.filter(new FilterFunction
() { //增量:過濾掉 op=delete - public boolean filter(MergeTransform merge) throws Exception {
- if (merge.getOpType().equals(OPType.DELETE)){
- return false;
- }
- return true;
- }
- })
- .map(x -> x.getHiveColsText())
- .writeAsText(outPath);
主要思想為「后來者居上」,針對于 Insert、Update 操作,最新值直接覆蓋舊值,針對 Delete 操作,直接丟棄。這種方式也天然的實現了數據去重操作。
4.2.4 容錯性與數據一致性保證
我們大體可以從三個任務故障場景下的處理方式來驗證方案的容錯性。
- 「存量任務」異常失敗:通常是備份恢復失敗導致,DS 任務將發送失敗報警,因「數據庫平臺」暫不支持恢復重試,需人工介入處理。同時「Merge 任務」檢測不到存量的 _SUCCESS 標記,工作流不會向后推進。
- 「增量任務」異常失敗:Flink 自身的容錯機制以及「實時計算平臺」的外部檢測機制保障「增量任務」的容錯性。若在「Merge 任務」調度執行期間「增量任務」尚未恢復,將誤以為該小時無增量數據跳過執行,此時相當于快照更新延遲(Merge 是將全天的增量數據與存量聚合,在之后的調度時間點如果「增量任務」恢復又可以聚合得到最新的快照),或者在「增量任務」恢復后可人為觸發「Merge 任務」補數。
- 「Merge 任務」異常失敗:任務具有冪等性,通過設置 DS 任務失敗后的重試機制保障容錯性,同時發送失敗報警。
以上,通過自動恢復機制和報警機制確保了整個工作流的正確執行。接下來我們可以從數據的角度看一下方案對于一致性的保障。
數據的一致性體現在 Merge 操作。兩份數據聚合,從代碼層面一定可以確保算法的正確性 (這是可驗證的、可測試的),那么唯一可能導致數據不一致的情況出現在兩份輸入的數據上,即存量和增量,存在兩種情況:
- 存量和增量數據有交疊:體現在初始存量與整點的增量數據聚合場景,由于算法天然的去重性可以保證數據的一致。
- 存量和增量數據有缺失:體現在增量數據的缺失上,而增量數據是由 Flink 將 Kafka 數據寫入 Hive 的,這個過程中是有一定的可能性造成數據的不一致,即分區提交后的亂序數據。雖然說亂序數據到來后的下一次 checkpoint 時間點分區將再次提交,但下游任務一般是檢測到首次分區提交就會觸發執行,造成下游任務的數據不一致。
針對 Flink 流式寫 Hive 過程中的亂序數據處理可以采取兩種手段:
- 一是 Kafka 設置單分區,多分區是產生導致亂序的根因,通過避免多分區消除數據亂序。
- 二是報警補償,亂序一旦產生流式任務是無法完全避免的 (可通過 watermark 設置亂序容忍時間,但終有一個界限),那么只能通過報警做事后補償。
問題轉換成了如何感知到亂序,我們可以進一步分析,既然亂序數據會觸發前一個分區的二次提交,那么只需要在提交分區的時候檢測前一個分區是否存在 _SUCCESS 標記便可以知曉是否是亂序數據以及觸發報警。
五、線上效果
總覽
存量任務
Merge 任務
六、總結
本文闡述了伴魚「數據集成平臺」核心設計思路,整個方案還有一些細節未在文章中體現,如數據 Schema 的變更、DB 日志數據的解析等,這些細節對于平臺構建也至關重要。目前伴魚絕大部分的集成任務已切換至新的方式并穩定運行。我們也正在推進實時數倉集成任務的接入,以提供更統一的體驗。
原文地址:https://mp.weixin.qq.com/s/THWRYEgfXwP6-pRYFQorkg