Flink中設計了用戶自定義函數體系(User Defined Function,UDF),開發人員實現業務邏輯就是開發UDF。
一、環境對象
StreamExecutionEnvironment是Flink應用開發時的概念,表示流計算作業的執行環境,是作業開發的入口、數據源接口、生成和轉換DataStream的接口、數據Sink的接口、作業配置接口、作業啟動執行的入口。
Environment是運行時作業級別的概念,從StreamExecutionEnvironment中的配置信息衍生而來。進入到Flink作業執行的時刻,作業需要的是相關的配置信息,如作業的名稱、并行度、作業編號JobID、監控的Metric、容錯的配置信息、IO等,用StreamExecutionRuntime對象就不適合了,很多API是不需要的,所以在Flink中抽象出了Environment作為運行時刻的上下文信息。
RuntimeContext是運行時Task實例級別的概念。Environment本身仍然是比較粗粒度作業級別的配置,對于每一個Task而言,其本身有更細節的配置信息,所以Flink又抽象了RuntimeContext,每一個Task實例有自己的RuntimeContext。
環境對象關系如下:
1.1 執行環境
StreamExecutionEnvironment
Flink流計算應用的執行環境,是Flink作業開發和啟動執行的入口
開發者對StreamExecutionEnvironment的實現是無感知的。
LocalStreamEnvironment
本地執行環境,在單個JVM中使用多線程模擬Flink集群。
其基本的工作流程如下:
1) 執行Flink作業的Main函數生成Streamgraph,轉化為JobGraph。
2) 設置任務運行的配置信息。
3) 根據配置信息啟動對應的LocalFlinkMiniCluster。
4) 根據配置信息和miniCluster生成對應的MiniClusterClient。
5) 通過MiniClusterClient提交JobGraph 到MiniCluster。
RemoteStreamEnvironment
在大規模數據中心中部署的Flink生產集群的執行環境。
當將作業發布到Flink集群的時候,使用RemoteStreamEnvironment。
其基本的工作流程如下:
1) 執行Flink作業的Main函數生成Streamgraph,轉化為JobGraph。
2) 設置任務運行的配置信息。
3) 提交JobGraph到遠程的Flink集群。
StreamContextEnvironment
在Cli命令行或者單元測試時候會被使用,執行步驟同上。
StreamPlanEnvironment
在Flink Web UI管理界面中可視化展現Job的時候,專門用來生成執行計劃(實際上就是StreamGraph)
ScalaShellStreamEnvironment
這是Scala Shell執行環境,可以在命令行中交互式開發Flink作業。
其基本工作流程如下:
- 校驗部署模式,目前Scala Shell僅支持attached模式。
- 上傳每個作業需要的Jar文件。
其余步驟與RemoteStreamEnvironment類似。
1.2 運行時環境
RuntimeEnvironment
在Task開始執行時進行初始化,把Task運行相關的信息都封裝到該對象中,其中不光包含了配置信息,運行時的各種服務也會被包裝到其中。
SavepointEnvironment
SavepointEnvironment是Environment的最小化實現,在狀態處理器的API中使用。
1.3 運行時上下文
RuntimeContext是Function運行時的上下文,封裝了Function運行時可能需要的所有信息,讓Function在運行時能夠獲取到作業級別的信息,如并行度相關信息、Task名稱、執行配置信息(ExecutionConfig)、State等。
Function的每個實例都有一個RuntimeContext對象,在RichFunction中通過getRunctionContext()可以訪問該對象。
RuntimeContext的類體系圖如下:
StreamingRuntimeContext:
在流計算UDF中使用的上下文,用來訪問作業信息、狀態等。
DistributedRuntimeUDFContext:
由運行時UDF所在的批處理算子創建,在DataSet批處理中使用。
RuntimeUDFContext:
在批處理應用的UDF中使用。
SavepointRuntimeContext:
支持對檢查點和保存點進行操作,包括讀取、變更、寫入等。
CepRuntimeContext:
CEP復雜事件處理中使用的上下文。
二、數據流元素
數據流元素在Flink中叫做StreamElement
- 有數據記錄StreamRecord,
- 延遲標記LatencyMarker、Watermark、
- 流狀態標記StreamStatus這四種。
在執行層面,4種數據流元素都被序列化成二進制數據,形成混合的數據流,在算子中將混合數據流中的數據流元素反序列化出來。
StreamRecord
StreamRecord表示數據流中的一條記錄(或者叫做一個事件),也叫數據記錄。
包含以下內容:
1)數據的值本身
2)時間戳(可選)
LatencyMarker
用來近似評估延遲,LatencyMarker在Source中創建,并向下游發送,繞過業務處理邏輯,在Sink節點中使用LatencyMarker估計數據在整個DAG圖中的流轉花費的時間。
LatencyMarker包含信息如下:
1)周期性的在數據源算子中創造出來的時間戳。
2)算子編號
3)數據源算子所在的Task編號
Watermark
是一個時間戳,用來告訴算子所有時間早于等于Watermark的事件或記錄都已經達到,不會再有比Watermark更早的記錄。
StreamStatus
用來通知Task是否會繼續接收到上游的記錄或者Watermark。在數據源算子中生成,向下游沿著DataFlow傳遞。
有兩種表示狀態:
1)空閑狀態(IDLE)
2)活動狀態(ACTIVE)
三、數據轉換
數據轉換在Flink中叫做Transformation,是銜接DataStream Api和Flink內核的邏輯結構。
Transformation有兩大類:
1)物理Transformation: 會轉換成算子,繼承了PhysicalTransformation。
2)虛擬Transformation: 不會轉換成具體算子。
Tranformation包含了Flink的運行時關鍵參數:
1)name:轉換器名稱,主要用于可視化。
2)uid:用戶指定的uid,該uid的主要目的是在job重啟時再次分配跟之前相同的uid,可以持久保存狀態。
3)bufferTimeout:buffer超時時間。
4)parallelism:并行度。
5)id:跟屬性uid無關,生成方式是基于一個靜態累加器。
6)outputType:輸出類型,用來進行序列化數據。
7)slotSharingGroup:給當前的Transformation設置Slot共享組。
3.1 物理Transformation SourceTransformation
從數據源讀取數據的Transformation,是Flink作業的起點。
只有下游Transformation,沒有上游輸入。
SinkTransformation
將數據寫到外部存儲的Transformation,是Flink作業的終點。
OneInputTransformation
單流輸入的Transformation(只接收一個輸入流),跟上面的SinkTransformation構造器類似,同樣需要input和operator參數。
TwoInputTransformation
雙輸入的Transformation(接收兩種流作為輸入),分別叫做第一輸入和第二輸入。
3.2 虛擬Transformation SideOutputTransformation
在旁路輸出中轉換而來,表示上游Transformation的一個分流。
每個sideoutput通過OutputTag標識。
SplitTransformation
用來按條件切分數據流,該轉換用于將一個流拆分成多個流。
SelectTransformation
與SplitTransformation配合使用,用來在下游選擇SplitTransformation切分的數據流。
PartitionTransformation
該轉換器用于改變輸入元素的分區,其名稱為Partition。工作時除了提供一個StreamTransformation作為輸入外,還需要提供一個StreamPartitionor的實例來進行分區。
UnionTransformation
合并轉換器,該轉換器用于將多個輸入StreamTransformation進行合并,因此該轉換器接收StreamTransformation的集合。Union要求上游輸入的數據的結構必須是完全相同的。
FeedbackTransformation
表示FlinkDAG中的一個反饋點。簡單來說,就是把符合條件的數據發回上游Transformation處理,一個反饋點可以連接一個或多個上游的Transformation,這些連接關系叫反饋邊。符合反饋條件并交給上游的Transformation的數據流叫做反饋流。
FeedbackTransformation的固定名稱為Feedback
有兩個重要參數:
- input:上游輸入StreamTransformation
- waitTime:默認為0,即永遠等待,如果設置了等待時間,一旦超過該等待時間,則計算結束并且不再接收數據。
實例化FeedbackTransformation時,會自動創建一個用于存儲反饋邊的集合feedbackEdges。
FeedbackTransformation通過定義一個實力方法addFeedbackEdge來收集,
在加入的StreamTransformation的實例有一個要求,
當前FeedbackTransformation跟待加入的StreamTransformation并行度一致。
CoFeedbackTransformation
與FeedbackTransformation類似,也是FlinkDAG中的一個反饋點。
- 不同之處在于,CoFeedbackTransformation反饋給上游的數據流與上游Transformation的輸入類型不同
- 所以要求上游的Transformation必須是TwoInputTransformation。
四、算子行為
4.1 生命周期管理
1)setup:初始化環境、時間服務、注冊監控等。
2)open:該行為由各個具體的算子負責實現,包含了算子的初始化邏輯。
3)close:所有的數據處理完畢之后關閉算子,此時需要去報將所有的緩存數據向下游發送。
4)dispose:該方法在算子生命周期的最后執行階段,此時算子已經關閉,停止處理數據,進行資源的釋放。
StreamTask作為算子的容器,負責管理算子的生命周期。
4.2 異步算子
異步算子的目的是解決與外部系統交互時網絡延遲所導致的系統瓶頸問題。
異步算子的兩種輸出模式
1)順序輸出
先收到的數據先輸出,后續數據元素的異步函數調用無論是否先完成,都需要等待,順序模式可以保證消息不亂序,但是可能增加延遲,降低算子的吞吐量。
2)無序輸出
先處理完的數據元素先輸出,不保證消息順序,相比于順序模式,無序輸出模式算子延遲低、吞吐量高。無序輸出模式并不是完全無序的,仍然要保持Watermark不能超越其前面數據元素的原則。等待完成隊列將按照Watermakr切分成組,組內可以無序輸出,組之間必須嚴格保證順序。
五、處理函數
5.1 雙流Join 即時Join
邏輯如下:
1) 創建一個State對象
2)接收到輸入流 1事件后更新Sate。
3)接收到輸出流 2的事件后遍歷State,根據Join條件進行匹配,將匹配結果發送到下游。
5.2延遲雙流Join
在流式數據里,數據可能是亂序的,數據會延遲到達,并且為了提供處理效率,使用小批量模式計算,而不是每個事件觸發一次Join計算。
邏輯如下:
1)創建2個state對象,分別緩存輸入流1和輸入流2的事件。
2)創建一個定時器,等待數據的到達,定時延遲觸發Join計算。
3)接收到輸入流1事件后更新State。
4)接收到輸入流2事件后更新State。
5)定時器遍歷State1和State2,根據Join條件進行匹配,將匹配結果發送到下游。
六、數據分區
數據分區在Flink中叫做Partition。
本質上說,分布式計算就是把一個作業切分成子任務Task,將不同的數據交給不同的Task計算。
StreamParitioner
是Flink中的數據流分區抽象接口,決定了在實際運行中的數據流分發模式。
自定義分區
使用用戶自定義分區函數,為每一個元組選擇目標分區。
ForwardParitioner
用于在同一個OperatorChain中上下游算子之間的數據轉發, 實際上數據是直接傳遞給下游的。
ShufflePartitioner
隨機將元素進行分區,可以確保下游的Task能夠均勻的獲取數據。
ReblancePartitioner
以Round-robin的方式為每個元素分配分區,確保下游的Task可以均勻的獲取數據,以免數據傾斜。
RescalingPartitioner
根據上下游Task的數據進行分區。
使用Round-robin選擇下游的一個Task進行數據分區,
如上游有2個Source,下游有6個Map,那么每個Source會分配3個固定下游的map,
不會向未分配給自己的分區寫入數據。
BroadcastPartitioner
將該記錄廣播給所有分區,即有N個分區,就把數據復制N份,每個分區1分
KeyGroupStreamPartitioner
keyedStream根據KeyGroup索引編號進行分區,該分區器不是提供給用戶來用。
KeyedStream在構造Transformation的時候默認使用KeyedGroup分區形式,從而在底層上支持作業Rescale功能。
七、分布式ID
到此這篇關于解析Flink內核原理與實現核心抽象的文章就介紹到這了,更多相關Flink內核原理核心抽象內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/qq_34635236/article/details/119839288