国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - 編程技術 - Flink執行流程與源碼分析

Flink執行流程與源碼分析

2021-09-08 22:56大數據左右手王了個博 編程技術

本篇的主題是"Flink架構與執行流程",做下小結,Flink on Yarn的提交執行流程,整體的流程與架構可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現是艱辛的。源碼的復雜度和當初設計框架的抓狂感,我們只有想象。

Flink執行流程與源碼分析

Flink主要組件

Flink執行流程與源碼分析

作業管理器(JobManager)

(1) 控制一個應用程序執行的主進程,也就是說,每個應用程序 都會被一個不同的Jobmanager所控制執行

(2) Jobmanager會先接收到要執行的應用程序,這個應用程序會包括:作業圖( Job Graph)、邏輯數據流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。

(3) Jobmanager會把 Jobgraph轉換成一個物理層面的 數據流圖,這個圖被叫做 “執行圖”(Executiongraph),包含了所有可以并發執行的任務。Job Manager會向資源管理器( Resourcemanager)請求執行任務必要的資源,也就是 任務管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執行圖分發到真正運行它們的 Taskmanager上。而在運行過程中Jobmanagera會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。

任務管理器(Taskmanager)

(1) Flink中的工作進程。通常在 Flink中會有多個 Taskmanageria運行, 每個 Taskmanageri都包含了一定數量的插槽( slots)。插槽的數量限制了Taskmanageri能夠執行的任務數量。

(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調用。Jobmanager就可以向插槽分配任務( tasks)來執行了。

(3) 在執行過程中, 一個 Taskmanagera可以跟其它運行同一應用程序的Taskmanager交換數據。

資源管理器(Resource Manager)

(1) 主要負責管理任務管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。

(2) Flink 為不同的環境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 當 Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發起會話,以提供啟動 Taskmanager進程的容器。

分發器(Dispatcher)

(1) 可以跨作業運行,它為應用提交提供了REST接口。

(2)當一個應用被提交執行時,分發器就會啟動并將應用移交給Jobmanage

(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監控作業執行的信息。

任務提交流程

Flink執行流程與源碼分析

  1. 提交應用
  2. 啟動并提交應用
  3. 請求slots
  4. 任務啟動
  5. 注冊slots
  6. 發出提供slot的指令
  7. 提供slots
  8. 提交要在slots中執行的任務
  9. 交換數據

任務提交流程(YARN)

Flink執行流程與源碼分析

a. Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置

b. 隨后向 Yarn ResourceManager提交任務ResourceManager分配 Container資源并通知對應的NodeManager啟動

c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構建環境

d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager

e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager

f. NodeManager 加載 Flink 的 Jar 包和配置構建環境并啟動 TaskManager

g. TaskManager 啟動后向 JobManager 發送心跳包,并等待 JobManager 向其分配任務。

源碼分析--集群啟動 JobManager 啟動分析

JobManager 的內部包含非常重要的三大組件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,啟動主類:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各種服務 
  7.  /** 
  8.   * 初始化了 主節點對外提供服務的時候所需要的 三大核心組件啟動時所需要的基礎服務 
  9.   *  初始化服務,如 JobManager 的 Akka RPC 服務,HA 服務,心跳檢查服務,metric service 
  10.   *  這些服務都是 Master 節點要使用到的一些服務 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 實現。RPC 服務啟動 Akka 參與者來接收從 RpcGateway 調用 RPC 
  12.   *  2、haServices:    提供對高可用性所需的所有服務的訪問注冊,分布式計數器和領導人選舉 
  13.   *  3、blobServer:    負責偵聽傳入的請求生成線程來處理這些請求。它還負責創建要存儲的目錄結構 blob 或臨時緩存它們 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服務。這包括創建心跳接收器和心跳發送者。 
  15.   *  5、metricRegistry:   跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存儲執行圖ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 創建 DispatcherResourceManagerComponentFactory, 初始化各種組件的 
  21. 工廠實例 
  22. // 其實內部包含了三個重要的成員變量: 
  23. // 創建 ResourceManager 的工廠實例 
  24. // 創建 Dispatcher 的工廠實例 
  25. // 創建 WebMonitorEndpoint 的工廠實例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 創建 集群運行需要的一些組件:Dispatcher, ResourceManager 等 
  29. // 創 建 ResourceManager 
  30. // 創 建 Dispatcher 
  31. // 創 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各種服務

  1. // 初 始 化 和 啟 動 AkkaRpcService, 內 部 其 實 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一個負責 IO 的線程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服務組件,負責 HA 服務的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服務端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服務組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一個用來存儲 ExecutionGraph 的 Store, 實現是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實例

  1. 1、DispatcherRunnerFactory,默認實現:DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默認實現:StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默認實現:SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 創建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  創建 WebMonitorEndpoint 實例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  當前這個 DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的過程中,會一大堆的 Handler 
  8.   *  2、啟動一個 Netty 的服務端,綁定了這些 Handler 
  9.   *  3、當 client 通過 flink 命令執行了某些操作(發起 restful 請求), 服務端由 webMonitorEndpoint 來執行處理 
  10.   *  4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執行處理 
  11.   *  5、補充一個:job 由 JobSubmitHandler 執行完畢之后,轉交給 Dispatcher 去調度執行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 創建 resourceManager

  1. /************************************************* 
  2.  *  創建 StandaloneResourceManager 實例對象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注釋: 得到一個 StandaloneResourceManager 實例對象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt請求 
  3. sendSlotReport(..): 將solt請求發送TaskManager 
  4. registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應用程序 
  5. registerTaskExecutor(...): 注冊task執行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注釋: 當執行完畢這個構造方法的時候,會觸發調用 onStart() 方法執行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實現 
  7.   *  以 ResourceManager 為例說明: 
  8.   *  啟動 ResourceManager 的 RPCServer 服務 
  9.   *  這里啟動的是 ResourceManager 的 Rpc 服務端。 
  10.   *  接收 TaskManager 啟動好了而之后, 進行注冊和心跳,來匯報 Taskmanagaer 的資源情況 
  11.   *  通過動態代理的形式構建了一個Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在創建resourceManager同級:啟動任務接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  創建 并啟動 Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一個參數:ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 這個地方是直接創建一個 Dispatcher 對象然后調用 dispatcher.start() 來啟動 
  9.  *  新版本: 直接創建一個 DispatcherRunner, 內部就是要創建和啟動 Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是對 Dispatcher 的封裝。 
  12.  *  DispatcherRunner被創建的代碼的內部,會創建 Dispatcher并啟動 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注釋: 注意第三個參數 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 啟動后,將會等待任務提交,如果有任務提交,則會經過submitJob(...)函數進入后續處理。

提交(一個Flink應用的提交必須經過三個graph的轉換)

Flink執行流程與源碼分析

首先看下一些名詞

StreamGraph

是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。可以用一個 DAG 來表示),DAG 的頂點是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。

  • StreamNode:用來代表 operator 的類,并具有所有相關的屬性,如并發度、入邊和出邊等。
  • StreamEdge:表示連接兩個StreamNode的邊。

Flink執行流程與源碼分析

DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構造出一棵 StreamTransformation 樹,通過這棵樹轉換成 StreamGraph

以map方法為例,看看源碼

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通過java reflection抽出mapper的返回值類型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實現類 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation會連接上當前DataStream中的transformation,從而構建成一棵樹 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都會存到 env 中,調用execute時遍歷該list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map轉換將用戶自定義的函數MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當調用env.execute時,遍歷其中的transformation集合構造出StreamGraph

JobGraph

(1) StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點。

Flink執行流程與源碼分析

  • 將并不涉及到 shuffle 的算子進行合并。
  • 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執行。
  • 對于不在同一個 operator chain 里的算子,會在不同的 task 中執行。

(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(JobVertex)、中間結果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。

(3) JobGraph 定義作業級別的配置,而每個頂點和中間結果定義具體操作和中間數據的設置。

JobVertex

JobVertex 相當于是 JobGraph 的頂點。經過優化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。

IntermediateDataSet

JobVertex的輸出,即經過operator處理產生的數據集。

JobEdge

job graph中的一條數據傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數據通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。

(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。

(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉換都是在客戶端完成。

(3)最后會將JobGraph轉換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。

Flink執行流程與源碼分析

ExecutionJobVertex

ExecutionJobVertex一一對應JobGraph中的JobVertex

ExecutionVertex

一個ExecutionJobVertex對應n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務的一個子任務

Execution

Execution 是對 ExecutionVertex 的一次執行,通過 ExecutionAttemptId 來唯一標識。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產者,每個生產者的在相應的 IntermediateResult 上的輸出對應一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進而在不同的 ExecutionVertex 之間建立聯系。

ExecutionGraph的構建

  1. 構建JobInformation
  2. 構建ExecutionGraph
  3. 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 構建JobInformation 
  7.    
  8.   // 構建ExecutionGraph 
  9.    
  10.   // 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

構建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 構建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 連接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分計劃(可執行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一個并行的ExecutionVertex節點都會鏈接到源節點產生的所有中間結果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并發數等于partition數,則一對一進行連接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并發數大于partition數,則一對多進行連接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并發數小于partition數,則多對一進行連接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

Flink執行流程與源碼分析

返回ExecutionGraph

TaskManager

TaskManager啟動

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //調用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接著的重要函數是shumitTask()函數,該函數會通過AKKA機制,向TaskManager發出一個submitTask的消息請求,TaskManager收到消息請求后,會執行submitTask()方法。(省略了部分代碼)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一個Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //啟動任務 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后創建執行Task的線程,然后調用startTaskThread()來啟動具體的執行線程,Task線程內部的run()方法承載了被執行的核心邏輯。

Task是執行在TaskExecutor進程里的一個線程,下面來看看其run方法

(1) 檢測當前狀態,正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態將拋異常。

(2) 讀取DistributedCache文件。

(3) 啟動ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注冊partitionWriter。

(5) 根據nameOfInvokableClass加載對應的類并實例化。

(6) 將狀態置為RUNNING并執行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

總結

整體的流程與架構可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現是艱辛的。源碼的復雜度和當初設計框架的抓狂感,我們只有想象。現在我們只是站在巨人的肩膀上去學習。

本篇的主題是"Flink架構與執行流程",做下小結,Flink on Yarn的提交執行流程:

1 Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置。

2 向Yarn ResourceManager提交任務。

3 ResourceManager分配Container資源并通知對應的NodeManager啟動ApplicationMaster。

4 ApplicationMaster啟動后加載Flink的Jar包和配置構建環境。

5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。

6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager。

7 NodeManager加載Flink的Jar包和配置構建環境并啟動TaskManager。

8 TaskManager啟動后向JobManager發送心跳包,并等待JobManager向其分配任務。

原文鏈接:https://mp.weixin.qq.com/s/iA_nMURBPMeqkSJ-kQUEjw

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: a免费网站 | 91精品国产91久久久久久最新 | 国产精品一二三在线观看 | 成年无码av片在线 | 欧美电影免费网站 | 久久久www | 欧美一区二区三区在线观看视频 | 狠狠操一区二区三区 | 欧美自拍视频 | 国产精品高清在线观看 | 精品伊人 | 免费黄色在线观看 | 日韩av成人在线观看 | 久久久一区二区三区 | 亚洲影视一区 | 99亚洲伊人久久精品影院红桃 | 九九久久久 | 成人性生交大片免费网站 | 国产成人一区二区三区在线观看 | 国产精品久久久久久久久久久久| 日韩中文字幕在线观看 | 欧美a在线 | 精品无人区一区二区三区动漫 | 欧美在线播放 | 午夜视频在线观看免费视频 | 亚洲精品免费在线观看视频 | 爱爱视频在线 | 九九热在线观看 | 日韩第一区 | 99热在线播放| 免费一级毛片免费播放 | 日日视频| 久久aⅴ乱码一区二区三区 一区二区精品视频 | 在线观看欧美日韩 | 天天澡天天狠天天天做 | 免费视频黄 | 97久久精品午夜一区二区 | 国产一区二区三区在线免费 | 99精品一区二区 | 色综合天天综合网国产成人网 | 国产精品久久久久国产a级 成人a在线视频 |