Flink主要組件
作業管理器(JobManager)
(1) 控制一個應用程序執行的主進程,也就是說,每個應用程序 都會被一個不同的Jobmanager所控制執行
(2) Jobmanager會先接收到要執行的應用程序,這個應用程序會包括:作業圖( Job Graph)、邏輯數據流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
(3) Jobmanager會把 Jobgraph轉換成一個物理層面的 數據流圖,這個圖被叫做 “執行圖”(Executiongraph),包含了所有可以并發執行的任務。Job Manager會向資源管理器( Resourcemanager)請求執行任務必要的資源,也就是 任務管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執行圖分發到真正運行它們的 Taskmanager上。而在運行過程中Jobmanagera會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。
任務管理器(Taskmanager)
(1) Flink中的工作進程。通常在 Flink中會有多個 Taskmanageria運行, 每個 Taskmanageri都包含了一定數量的插槽( slots)。插槽的數量限制了Taskmanageri能夠執行的任務數量。
(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調用。Jobmanager就可以向插槽分配任務( tasks)來執行了。
(3) 在執行過程中, 一個 Taskmanagera可以跟其它運行同一應用程序的Taskmanager交換數據。
資源管理器(Resource Manager)
(1) 主要負責管理任務管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。
(2) Flink 為不同的環境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。
(3) 當 Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發起會話,以提供啟動 Taskmanager進程的容器。
分發器(Dispatcher)
(1) 可以跨作業運行,它為應用提交提供了REST接口。
(2)當一個應用被提交執行時,分發器就會啟動并將應用移交給Jobmanage
(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監控作業執行的信息。
任務提交流程
- 提交應用
- 啟動并提交應用
- 請求slots
- 任務啟動
- 注冊slots
- 發出提供slot的指令
- 提供slots
- 提交要在slots中執行的任務
- 交換數據
任務提交流程(YARN)
a. Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置
b. 隨后向 Yarn ResourceManager提交任務ResourceManager分配 Container資源并通知對應的NodeManager啟動
c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構建環境
d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager
e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager
f. NodeManager 加載 Flink 的 Jar 包和配置構建環境并啟動 TaskManager
g. TaskManager 啟動后向 JobManager 發送心跳包,并等待 JobManager 向其分配任務。
源碼分析--集群啟動 JobManager 啟動分析
JobManager 的內部包含非常重要的三大組件
- WebMonitorEndpoint
- ResourceManager
- Dispatcher
入口,啟動主類:StandaloneSessionClusterEntrypoint
- // 入 口
- StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint);
- clusterEntrypoint.startCluster();
- runCluster(configuration, pluginManager);
- // 第一步:初始化各種服務
- /**
- * 初始化了 主節點對外提供服務的時候所需要的 三大核心組件啟動時所需要的基礎服務
- * 初始化服務,如 JobManager 的 Akka RPC 服務,HA 服務,心跳檢查服務,metric service
- * 這些服務都是 Master 節點要使用到的一些服務
- * 1、commonRpcService: 基于 Akka 的 RpcService 實現。RPC 服務啟動 Akka 參與者來接收從 RpcGateway 調用 RPC
- * 2、haServices: 提供對高可用性所需的所有服務的訪問注冊,分布式計數器和領導人選舉
- * 3、blobServer: 負責偵聽傳入的請求生成線程來處理這些請求。它還負責創建要存儲的目錄結構 blob 或臨時緩存它們
- * 4、heartbeatServices: 提供心跳所需的所有服務。這包括創建心跳接收器和心跳發送者。
- * 5、metricRegistry: 跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter
- * 6、archivedExecutionGraphStore: 存儲執行圖ExecutionGraph的可序列化形式。
- */
- initializeServices(configuration, pluginManager);
- // 創建 DispatcherResourceManagerComponentFactory, 初始化各種組件的
- 工廠實例
- // 其實內部包含了三個重要的成員變量:
- // 創建 ResourceManager 的工廠實例
- // 創建 Dispatcher 的工廠實例
- // 創建 WebMonitorEndpoint 的工廠實例
- createDispatcherResourceManagerComponentFactory(configuration);
- // 創建 集群運行需要的一些組件:Dispatcher, ResourceManager 等
- // 創 建 ResourceManager
- // 創 建 Dispatcher
- // 創 建 WebMonitorEndpoint
- clusterComponent = dispatcherResourceManagerComponentFactory.create(...)
1. initializeServices():初始化各種服務
- // 初 始 化 和 啟 動 AkkaRpcService, 內 部 其 實 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...)
- // 初始化一個負責 IO 的線程池
- ioExecutor = Executors.newFixedThreadPool(...)
- // 初始化 HA 服務組件,負責 HA 服務的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor);
- // 初始化 BlobServer 服務端
- blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start();
- // 初始化心跳服務組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration);
- // 初始化一個用來存儲 ExecutionGraph 的 Store, 實現是:
- FileArchivedExecutionGraphStore
- archivedExecutionGraphStore = createSerializableExecutionGraphStore(...)
2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實例
- 1、DispatcherRunnerFactory,默認實現:DefaultDispatcherRunnerFactory
- 2、ResourceManagerFactory,默認實現:StandaloneResourceManagerFactory
- 3、RestEndpointFactory,默認實現:SessionRestEndpointFactory
- clusterComponent = dispatcherResourceManagerComponentFactory
- .create(configuration, ioExecutor, commonRpcService, haServices,
- blobServer, heartbeatServices, metricRegistry,
- archivedExecutionGraphStore,
- new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
- this);
3. 創建 WebMonitorEndpoint
- /*************************************************
- * 創建 WebMonitorEndpoint 實例, 在 Standalone 模式下:DispatcherRestEndpoint
- * 1、restEndpointFactory = SessionRestEndpointFactory
- * 2、webMonitorEndpoint = DispatcherRestEndpoint
- * 3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService
- * 當前這個 DispatcherRestEndpoint 的作用是:
- * 1、初始化的過程中,會一大堆的 Handler
- * 2、啟動一個 Netty 的服務端,綁定了這些 Handler
- * 3、當 client 通過 flink 命令執行了某些操作(發起 restful 請求), 服務端由 webMonitorEndpoint 來執行處理
- * 4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執行處理
- * 5、補充一個:job 由 JobSubmitHandler 執行完畢之后,轉交給 Dispatcher 去調度執行
- */
- webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
- configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever,
- blobServer, executor, metricFetcher,
- highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
- fatalErrorHandler
- );
4. 創建 resourceManager
- /*************************************************
- * 創建 StandaloneResourceManager 實例對象
- * 1、resourceManager = StandaloneResourceManager
- * 2、resourceManagerFactory = StandaloneResourceManagerFactory
- */
- resourceManager = resourceManagerFactory.createResourceManager(
- configuration, ResourceID.generate(),
- rpcService, highAvailabilityServices, heartbeatServices,
- fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()),
- webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname
- );
- protected ResourceManager<ResourceID> createResourceManager(
- Configuration configuration,
- ResourceID resourceId,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- FatalErrorHandler fatalErrorHandler,
- ClusterInformation clusterInformation,
- @Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup,
- ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
- final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
- /*************************************************
- * 注釋: 得到一個 StandaloneResourceManager 實例對象
- */
- return new StandaloneResourceManager(
- rpcService,
- resourceId,
- highAvailabilityServices,
- heartbeatServices,
- resourceManagerRuntimeServices.getSlotManager(),
- ResourceManagerPartitionTrackerImpl::new,
- resourceManagerRuntimeServices.getJobLeaderIdService(),
- clusterInformation,
- fatalErrorHandler,
- resourceManagerMetricGroup,
- standaloneClusterStartupPeriodTime,
- AkkaUtils.getTimeoutAsTime(configuration)
- );
- }
- /**
- requestSlot():接受 solt請求
- sendSlotReport(..): 將solt請求發送TaskManager
- registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應用程序
- registerTaskExecutor(...): 注冊task執行者。
- **/
- public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
- JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler,
- ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) {
- /*************************************************
- * 注釋: 當執行完畢這個構造方法的時候,會觸發調用 onStart() 方法執行
- */
- super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);
- protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
- this.rpcService = checkNotNull(rpcService, "rpcService");
- this.endpointId = checkNotNull(endpointId, "endpointId");
- /*************************************************
- * 注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實現
- * 以 ResourceManager 為例說明:
- * 啟動 ResourceManager 的 RPCServer 服務
- * 這里啟動的是 ResourceManager 的 Rpc 服務端。
- * 接收 TaskManager 啟動好了而之后, 進行注冊和心跳,來匯報 Taskmanagaer 的資源情況
- * 通過動態代理的形式構建了一個Server
- */
- this.rpcServer = rpcService.startServer(this);
5. 在創建resourceManager同級:啟動任務接收器Starting Dispatcher
- /*************************************************
- * 創建 并啟動 Dispatcher
- * 1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager
- * 2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory
- * 第一個參數:ZooKeeperLeaderElectionService
- * -
- * 老版本: 這個地方是直接創建一個 Dispatcher 對象然后調用 dispatcher.start() 來啟動
- * 新版本: 直接創建一個 DispatcherRunner, 內部就是要創建和啟動 Dispatcher
- * -
- * DispatcherRunner 是對 Dispatcher 的封裝。
- * DispatcherRunner被創建的代碼的內部,會創建 Dispatcher并啟動
- */
- log.debug("Starting Dispatcher.");
- dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
- highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
- // TODO_ZYM 注釋: 注意第三個參數
- new HaServicesJobGraphStoreFactory(highAvailabilityServices),
- ioExecutor, rpcService, partialDispatcherServices
- );
Dispatcher 啟動后,將會等待任務提交,如果有任務提交,則會經過submitJob(...)函數進入后續處理。
提交(一個Flink應用的提交必須經過三個graph的轉換)
首先看下一些名詞
StreamGraph
是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。可以用一個 DAG 來表示),DAG 的頂點是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。
- StreamNode:用來代表 operator 的類,并具有所有相關的屬性,如并發度、入邊和出邊等。
- StreamEdge:表示連接兩個StreamNode的邊。
DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構造出一棵 StreamTransformation 樹,通過這棵樹轉換成 StreamGraph
以map方法為例,看看源碼
- public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
- // 通過java reflection抽出mapper的返回值類型
- TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
- Utils.getCallLocationName(), true);
- // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實現類
- return transform("Map", outType, new StreamMap<>(clean(mapper)));
- }
- public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
- // read the output type of the input Transform to coax out errors about MissingTypeInfo
- transformation.getOutputType();
- // 新的transformation會連接上當前DataStream中的transformation,從而構建成一棵樹
- OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
- this.transformation,
- operatorName,
- operator,
- outTypeInfo,
- environment.getParallelism());
- @SuppressWarnings({ "unchecked", "rawtypes" })
- SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
- // 所有的transformation都會存到 env 中,調用execute時遍歷該list生成StreamGraph
- getExecutionEnvironment().addOperator(resultTransform);
- return returnStream;
- }
map轉換將用戶自定義的函數MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當調用env.execute時,遍歷其中的transformation集合構造出StreamGraph
JobGraph
(1) StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點。
- 將并不涉及到 shuffle 的算子進行合并。
- 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執行。
- 對于不在同一個 operator chain 里的算子,會在不同的 task 中執行。
(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(JobVertex)、中間結果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。
(3) JobGraph 定義作業級別的配置,而每個頂點和中間結果定義具體操作和中間數據的設置。
JobVertex
JobVertex 相當于是 JobGraph 的頂點。經過優化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
IntermediateDataSet
JobVertex的輸出,即經過operator處理產生的數據集。
JobEdge
job graph中的一條數據傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數據通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。
(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。
(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉換都是在客戶端完成。
(3)最后會將JobGraph轉換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。
ExecutionJobVertex
ExecutionJobVertex一一對應JobGraph中的JobVertex
ExecutionVertex
一個ExecutionJobVertex對應n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務的一個子任務
Execution
Execution 是對 ExecutionVertex 的一次執行,通過 ExecutionAttemptId 來唯一標識。
IntermediateResult
在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產者,每個生產者的在相應的 IntermediateResult 上的輸出對應一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區
ExecutionEdge
ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進而在不同的 ExecutionVertex 之間建立聯系。
ExecutionGraph的構建
- 構建JobInformation
- 構建ExecutionGraph
- 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合
- // ExecutionGraphBuilder
- public static ExecutionGraph buildGraph(
- @Nullable ExecutionGraph prior,
- JobGraph jobGraph,
- ...) throws JobExecutionException, JobException {
- // 構建JobInformation
- // 構建ExecutionGraph
- // 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合
- List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
- executionGraph.attachJobGraph(sortedTopology);
- return executionGraph;
- }
構建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex
- //ExecutionGraph
- public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
- for (JobVertex jobVertex : topologiallySorted) {
- // 構建ExecutionJobVertex
- ExecutionJobVertex ejv = new ExecutionJobVertex(
- this,
- jobVertex,
- 1,
- maxPriorAttemptsHistoryLength,
- rpcTimeout,
- globalModVersion,
- createTimestamp);
- // 連接IntermediateResultPartition和ExecutionVertex
- ev.connectToPredecessors(this.intermediateResults);
- }
- // ExecutionJobVertex
- public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
- List<JobEdge> inputs = jobVertex.getInputs();
- for (int num = 0; num < inputs.size(); num++) {
- JobEdge edge = inputs.get(num);
- IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
- this.inputs.add(ires);
- int consumerIndex = ires.registerConsumer();
- for (int i = 0; i < parallelism; i++) {
- ExecutionVertex ev = taskVertices[i];
- ev.connectSource(num, ires, edge, consumerIndex);
- }
- }
- }
拆分計劃(可執行能力)
- // ExecutionVertex
- public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
- final DistributionPattern pattern = edge.getDistributionPattern();
- final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
- ExecutionEdge[] edges;
- switch (pattern) {
- // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE
- case POINTWISE:
- edges = connectPointwise(sourcePartitions, inputNumber);
- break;
- // 每一個并行的ExecutionVertex節點都會鏈接到源節點產生的所有中間結果IntermediateResultPartition
- case ALL_TO_ALL:
- edges = connectAllToAll(sourcePartitions, inputNumber);
- break;
- default:
- throw new RuntimeException("Unrecognized distribution pattern.");
- }
- inputEdges[inputNumber] = edges;
- for (ExecutionEdge ee : edges) {
- ee.getSource().addConsumer(ee, consumerNumber);
- }
- }
- private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- final int numSources = sourcePartitions.length;
- final int parallelism = getTotalNumberOfParallelSubtasks();
- // 如果并發數等于partition數,則一對一進行連接
- if (numSources == parallelism) {
- return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
- }
- // 如果并發數大于partition數,則一對多進行連接
- else if (numSources < parallelism) {
- int sourcePartition;
- if (parallelism % numSources == 0) {
- int factor = parallelism / numSources;
- sourcePartition = subTaskIndex / factor;
- }
- else {
- float factor = ((float) parallelism) / numSources;
- sourcePartition = (int) (subTaskIndex / factor);
- }
- return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
- }
- // 果并發數小于partition數,則多對一進行連接
- else {
- if (numSources % parallelism == 0) {
- int factor = numSources / parallelism;
- int startIndex = subTaskIndex * factor;
- ExecutionEdge[] edges = new ExecutionEdge[factor];
- for (int i = 0; i < factor; i++) {
- edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
- }
- return edges;
- }
- else {
- float factor = ((float) numSources) / parallelism;
- int start = (int) (subTaskIndex * factor);
- int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
- sourcePartitions.length :
- (int) ((subTaskIndex + 1) * factor);
- ExecutionEdge[] edges = new ExecutionEdge[end - start];
- for (int i = 0; i < edges.length; i++) {
- edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
- }
- return edges;
- }
- }
- }
- private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
- for (int i = 0; i < sourcePartitions.length; i++) {
- IntermediateResultPartition irp = sourcePartitions[i];
- edges[i] = new ExecutionEdge(irp, this, inputNumber);
- }
- return edges;
- }
返回ExecutionGraph
TaskManager
TaskManager啟動
- public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
- //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor
- final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId);
- //調用TaskExecutor的start()方法
- taskManagerRunner.start();
- }
TaskExecutor :submitTask()
接著的重要函數是shumitTask()函數,該函數會通過AKKA機制,向TaskManager發出一個submitTask的消息請求,TaskManager收到消息請求后,會執行submitTask()方法。(省略了部分代碼)。
- public CompletableFuture<Acknowledge> submitTask(
- TaskDeploymentDescriptor tdd,
- JobMasterId jobMasterId,
- Time timeout) {
- jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
- taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
- TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx);
- InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx);
- TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
- CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
- LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
- ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
- PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
- final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
- jobId,
- tdd.getAllocationId(),
- taskInformation.getJobVertexId(),
- tdd.getSubtaskIndex());
- final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
- final TaskStateManager taskStateManager = new TaskStateManagerImpl(
- jobId,
- tdd.getExecutionAttemptId(),
- localStateStore,
- taskRestore,
- checkpointResponder);
- //新建一個Task
- Task task = new Task(xxxx);
- log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
- boolean taskAdded;
- try {
- taskAdded = taskSlotTable.addTask(task);
- } catch (SlotNotFoundException | SlotNotActiveException e) {
- throw new TaskSubmissionException("Could not submit task.", e);
- }
- if (taskAdded) {
- //啟動任務
- task.startTaskThread();
- return CompletableFuture.completedFuture(Acknowledge.get());
- }
最后創建執行Task的線程,然后調用startTaskThread()來啟動具體的執行線程,Task線程內部的run()方法承載了被執行的核心邏輯。
Task是執行在TaskExecutor進程里的一個線程,下面來看看其run方法
(1) 檢測當前狀態,正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態將拋異常。
(2) 讀取DistributedCache文件。
(3) 啟動ResultPartitionWriter和InputGate。
(4) 向taskEventDispatcher注冊partitionWriter。
(5) 根據nameOfInvokableClass加載對應的類并實例化。
(6) 將狀態置為RUNNING并執行invoke方法。
- public void run() {
- while (true) {
- ExecutionState current = this.executionState;
- invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
- network.registerTask(this);
- Environment env = new RuntimeEnvironment(. . . . );
- invokable.setEnvironment(env);
- // actual task core work
- if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
- }
- // notify everyone that we switched to running
- notifyObservers(ExecutionState.RUNNING, null);
- executingThread.setContextClassLoader(userCodeClassLoader);
- // run the invokable
- invokable.invoke();
- if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
- notifyObservers(ExecutionState.FINISHED, null);
- }
- Finally{
- // free the network resources
- network.unregisterTask(this);
- // free memory resources
- if (invokable != null) {
- memoryManager.releaseAll(invokable);
- }
- libraryCache.unregisterTask(jobId, executionId);
- removeCachedFiles(distributedCacheEntries, fileCache);
總結
整體的流程與架構可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現是艱辛的。源碼的復雜度和當初設計框架的抓狂感,我們只有想象。現在我們只是站在巨人的肩膀上去學習。
本篇的主題是"Flink架構與執行流程",做下小結,Flink on Yarn的提交執行流程:
1 Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置。
2 向Yarn ResourceManager提交任務。
3 ResourceManager分配Container資源并通知對應的NodeManager啟動ApplicationMaster。
4 ApplicationMaster啟動后加載Flink的Jar包和配置構建環境。
5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。
6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager。
7 NodeManager加載Flink的Jar包和配置構建環境并啟動TaskManager。
8 TaskManager啟動后向JobManager發送心跳包,并等待JobManager向其分配任務。
原文鏈接:https://mp.weixin.qq.com/s/iA_nMURBPMeqkSJ-kQUEjw