Alink漫談(五) : 迭代計算和Superstep_台中搬家公司

※推薦台中搬家公司優質服務,可到府估價

台中搬鋼琴,台中金庫搬運,中部廢棄物處理,南投縣搬家公司,好幫手搬家,西屯區搬家

Alink漫談(五) : 迭代計算和Superstep

目錄

  • Alink漫談(五) : 迭代計算和Superstep
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四層執行圖
      • 2.2 Task和SubTask
      • 2.3 如何劃分 Task 的依據
      • 2.4 JobGraph
      • 2.5 BSP模型和Superstep
        • BSP模型
        • BSP模型的實現
        • Flink-Gelly
    • 0x03 Flink的迭代算法(superstep-based)
      • 3.1 Bulk Iterate
      • 3.2 迭代機制
    • 0x04 Alink如何使用迭代
    • 0x05 深入Flink源碼和runtime來驗證
      • 5.1 向Flink提交Job
      • 5.2 生成JobGraph
      • 5.3 迭代對應的Task
        • 5.3.1 IterationHeadTask
        • 5.3.2 IterationIntermediateTask
        • 5.3.3 IterationTailTask
          • 如何和Head建立聯繫
          • 如何把用戶返回的數值傳給Head
        • 5.3.4 IterationSynchronizationSinkTask
      • 5.4 superstep
    • 0x06 結合KMeans代碼看superset
      • 6.1 K-means算法概要
      • 6.2 KMeansPreallocateCentroid
      • 6.3 KMeansAssignCluster 和 KMeansUpdateCentroids
      • 6.4 KMeansOutputModel
    • 0x07 參考

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支持批式算法、流式算法的機器學習平台。迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。本文將通過Superstep入手看看Alink是如何利用Flink迭代API來實現具體算法。

因為Alink的公開資料太少,所以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。

0x01 緣由

為什麼提到 Superstep 這個概念,是因為在擼KMeans代碼的時候,發現幾個很奇怪的地方,比如以下三個步驟中,都用到了context.getStepNo(),而且會根據其數值的不同進行不同業務操作:

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        LOG.info("liuhao  KMeansPreallocateCentroid ");
        if (context.getStepNo() == 1) {
          /** 具體業務邏輯代碼
           * Allocate memory for pre-round centers and current centers.
           */        
        }
    }
}  

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

查看ComContext的源碼,發現stepNo的來源居然是runtimeContext.getSuperstepNumber()

public class ComContext {
   private final int taskId;
   private final int numTask;
   private final int stepNo; // 對,就是這裏
   private final int sessionId;
	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber(); // 這裏進行了變量初始化
	}  
	/**
	 * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
	 * @return iteration step number.
	 */
	public int getStepNo() {
		return stepNo; // 這裡是使用
	}  
}

看到這裡有的兄弟可能會虎軀一震,這不是BSP模型的概念嘛。我就是想寫個KMeans算法,怎麼除了MPI模型,還要考慮BSP模型。下面就讓我們一步一步挖掘究竟Alink都做了什麼工作。

0x02 背景概念

2.1 四層執行圖

在 Flink 中的執行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖

  • StreamGraph:Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
  • JobGraph:StreamGraph 經過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。JobGraph是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。
  • ExecutionGraph:JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。
  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,並不是一個具體的數據結構。

2.2 Task和SubTask

因為某種原因,Flink內部對這兩個概念的使用本身就有些混亂:在Task Manager里這個subtask的概念由一個叫Task的類來實現。Task Manager里談論的Task對象實際上對應的是ExecutionGraph里的一個subtask。

所以這兩個概念需要理清楚。

  • Task(任務) :Task對應JobGraph的一個節點,是一個算子Operator。Task 是一個階段多個功能相同 subTask 的集合,類似於 Spark 中的 TaskSet。
  • subTask(子任務) :subTask 是 Flink 中任務最小執行單元,是一個 Java 類的實例,這個 Java 類中有屬性和方法,完成具體的計算邏輯。在ExecutionGraph里Task被分解為多個并行執行的subtask 。每個subtask作為一個excution分配到Task Manager里執行。
  • Operator Chains(算子鏈) :沒有 shuffle 的多個算子合併在一個 subTask 中,就形成了 Operator Chains,類似於 Spark 中的 Pipeline。Operator subTask 的數量指的就是算子的并行度。同一程序的不同算子也可能具有不同的并行度(因為可以通過 setParallelism() 方法來修改并行度)。

Flink 中的程序本質上是并行的。在執行期間,每一個算子 Operator (Transformation)都有一個或多個算子subTask(Operator SubTask),每個算子的 subTask 之間都是彼此獨立,並在不同的線程中執行,並且可能在不同的機器或容器上執行。

Task( SubTask) 是一個Runnable 對象, Task Manager接受到TDD 後會用它實例化成一個Task對象, 並啟動一個線程執行Task的Run方法。

TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的數據結構。 他包含了關於Task的所有描述信息。比如:

  • TaskInfo : 包含該Task 執行的java 類,該類是某個 AbstractInvokable的實現類 , 當然也是某個operator的實現類 (比如DataSourceTask, DataSinkTask, BatchTask,StreamTask 等)。
  • IG描述 :通常包含一個或兩個InputGateDeploymentDescriptor(IGD)。
  • 目標RP的描述: ParitionId, PartitionType, RS個數等等。

2.3 如何劃分 Task 的依據

在以下情況下會重新劃分task

  • 并行度發生變化時
  • keyBy() /window()/apply() 等發生 Rebalance 重新分配;
  • 調用 startNewChain() 方法,開啟一個新的算子鏈;
  • 調用 diableChaining()方法,即:告訴當前算子操作不使用 算子鏈 操作。

比如有如下操作

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)

那麼StreamGraph的轉換流是:

 Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

其task是四個:

  • Source –> Filter –> Map
  • keyBy
  • timeWindow
  • Sink

其中每個task又會被分成分若干subtask。在執行時,一個Task會被并行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。

2.4 JobGraph

以上說了這麼多,就是要說jobGraph和subtask,因為本文中我們在分析源碼和調試時候,主要是從jobGraph這裏開始入手來看subtask

JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合併的操作,比如對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap滿足鏈接的條件時,可以可以將兩個操作符的操作放到一個線程并行執行,這樣可以減少網絡中的數據傳輸,由於在source和flatMap之間的傳輸的數據也不用序列化和反序列化,所以也提高了程序的執行效率。

相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不完全是“靜態”的數據結構了,因為它加入了中間結果集(IntermediateDataSet)這一“動態”概念。

作業頂點(JobVertex)、中間數據集(IntermediateDataSet)、作業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互為依賴:

  • 一個JobVertex關聯着若干個JobEdge作為輸入端以及若干個IntermediateDataSet作為其生產的結果集;每個JobVertex都有諸如并行度和執行代碼等屬性。
  • 一個IntermediateDataSet關聯着一個JobVertex作為生產者以及若干個JobEdge作為消費者;
  • 一個JobEdge關聯着一個IntermediateDataSet可認為是源以及一個JobVertex可認為是目標消費者;

那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式存儲了所有的JobVertex,鍵是JobVertexID:

private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

至於其它的元素,通過JobVertex都可以根據關係找尋到。需要注意的是,用於迭代的反饋邊(feedback edge)當前並不體現在JobGraph中,而是被內嵌在特殊的JobVertex中通過反饋信道(feedback channel)在它們之間建立關係。

2.5 BSP模型和Superstep

BSP模型

BSP模型是并行計算模型的一種。并行計算模型通常指從并行算法的設計和分析出發,將各種并行計算機(至少某一類并行計算機)的基本特徵抽象出來,形成一個抽象的計算模型。

BSP模型是一種異步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息傳遞系統,塊內異步并行,塊間顯式同步,該模型基於一個master協調,所有的worker同步(lock-step)執行, 數據從輸入的隊列中讀取。

BSP計算模型不僅是一種體繫結構模型,也是設計并行程序的一種方法。BSP程序設計準則是整體同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具有水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成。

BSP模型的實現

BSP模型的實現大概舉例如下:

  • Pregel :Google的大規模圖計算框架,首次提出了將BSP模型應用於圖計算,具體請看Pregel——大規模圖處理系統,不過至今未開源。
  • Apache Giraph :ASF社區的Incubator項目,由Yahoo!貢獻,是BSP的java實現,專註於迭代圖計算(如pagerank,最短連接等),每一個job就是一個沒有reducer過程的hadoop job。
  • Apache Hama :也是ASF社區的Incubator項目,與Giraph不同的是它是一個純粹的BSP模型的java實現,並且不單單是用於圖計算,意在提供一個通用的BSP模型的應用框架。

Flink-Gelly

Flink-Gelly利用Flink的高效迭代算子來支持海量數據的迭代式圖處理。目前,Flink Gelly提供了“Vertex-Centric”,“Scatter-Gather”以及“Gather-Sum-Apply”等計算模型的實現。

“Vertex-Centric”迭代模型也就是我們經常聽到的“Pregel”,是一種從Vertex角度出發的圖計算方式。其中,同步地迭代計算的步驟稱之為“superstep”。在每個“superstep”中,每個頂點都執行一個用戶自定義的函數,且頂點之間通過消息進行通信,當一個頂點知道圖中其他任意頂點的唯一ID時,該頂點就可以向其發送一條消息。

但是實際上,KMeans不是圖處理,Alink也沒有基於Flink-Gelly來構建。也許只是借鑒了其概念。所以我們還需要再探尋。

0x03 Flink的迭代算法(superstep-based)

迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。為了從大數據中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。

所謂迭代運算,就是給定一個初值,用所給的算法公式計算初值得到一个中間結果,然後將中間結果作為輸入參數進行反覆計算,在滿足一定條件的時候得到計算結果。

大數據處理框架很多,比如spark,mr。實際上這些實現迭代計算都是很困難的。

Flink直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,然後將其嵌入到迭代算子中去。有兩種迭代操作算子: Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調用step函數。

3.1 Bulk Iterate

這種迭代方式稱為全量迭代,它會將整個數據輸入,經過一定的迭代次數,最終得到你想要的結果。

迭代操作算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),然後計算得到下輪迭代的輸出(例如,map,reduce,join等)

迭代過程主要分為以下幾步:

  • Iteration Input(迭代輸入):是初始輸入值或者上一次迭代計算的結果。
  • Step Function(step函數):每次迭代都會執行step函數。它迭代計算DataSet,由一系列的operator組成,比如map,flatMap,join等,取決於具體的業務邏輯。
  • Next Partial Solution(中間結果):每一次迭代計算的結果,被發送到下一次迭代計算中。
  • Iteration Result(迭代結果):最後一次迭代輸出的結果,被輸出到datasink或者發送到下游處理。

它迭代的結束條件是:

  • 達到最大迭代次數
  • 自定義收斂聚合函數

編程的時候,需要調用iterate(int),該函數返回的是一個IterativeDataSet,當然我們可以對它進行一些操作,比如map等。Iterate函數唯一的參數是代表最大迭代次數。

迭代是一個環。我們需要進行閉環操作,那麼這時候就要用到closeWith(Dataset)操作了,參數就是需要循環迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數后終止。

3.2 迭代機制

DataSet API引進了獨特的同步迭代機制(superstep-based),僅限於用在有界的流。

我們將迭代操作算子的每個步驟函數的執行稱為單個迭代。在并行設置中,在迭代狀態的不同分區上并行計算step函數的多個實例。在許多設置中,對所有并行實例上的step函數的一次評估形成了所謂的superstep,這也是同步的粒度。因此,迭代的所有并行任務都需要在初始化下一個superstep之前完成superstep。終止準則也將被評估為superstep同步屏障。

下面是Apache原文

We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.

下面是apache原圖

概括如下:

每次迭代都是一個superstep
    每次迭代中有若干subtask在不同的partition上分別執行step
      	 每個step有一個HeadTask,若干IntermediateTask,一個TailTask
    每個superstep有一個SynchronizationSinkTask 同步,因為迭代的所有并行任務需要在下一個迭代前完成

由此我們可以知道,superstep這是Flink DataSet API的概念,但是你從這裡能夠看到BSP模型的影子,比如:

  • 在傳統的BSP模型中,一個superstep被分為3步: 本地的計算, 消息的傳遞, 同步的barrier.
  • Barrier Synchronization又叫障礙同步或柵欄同步。每一次同步也是一個超步的完成和下一個超步的開始;
  • Superstep超步 是一次計算迭代,從起始每往前步進一層對應一個超步。
  • 程序該什麼時候結束是程序自己控制

0x04 Alink如何使用迭代

KMeansTrainBatchOp.iterateICQ函數中,生成了一個IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。

return new IterativeComQueue()
   .initWithPartitionedData(TRAIN_DATA, data)
   .initWithBroadcastData(INIT_CENTROID, initCentroid)
   .initWithBroadcastData(KMEANS_STATISTICS, statistics)
   .add(new KMeansPreallocateCentroid())
   .add(new KMeansAssignCluster(distance))
   .add(new AllReduce(CENTROID_ALL_REDUCE))
   .add(new KMeansUpdateCentroids(distance))
   .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 終止條件
   .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) 
   .setMaxIter(maxIter) // 迭代最大次數
   .exec();

而BaseComQueue.exec函數中則有:

public DataSet<Row> exec() {
   IterativeDataSet<byte[]> loop // Flink 迭代API
      = loopStartDataSet(executionEnvironment)
      .iterate(maxIter);
     // 後續操作能看出來,之前添加在queue上的比如KMeansPreallocateCentroid,都是在loop之上運行的。
  		if (null == compareCriterion) {
        loopEnd = loop.closeWith...
     	} else {     
        // compare Criterion.
        DataSet<Boolean> criterion = input ... compareCriterion
        loopEnd = loop.closeWith( ... criterion ... )
      }   
}

再仔細研究代碼,我們可以看出:

superstep包括:

.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))

終止標準就是

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。

利用KMeansIterTermination構建了一個RichMapPartitionFunction作為終止標準。最後結束時候調用 KMeansOutputModel完成業務操作。

最大循環就是

.setMaxIter(maxIter)

於是我們可以得出結論,superstep-based Bulk Iterate 迭代算子是用來實現整體KMeans算法,KMeans算法就是一個superstep進行迭代。但是在superstep內容如果需要通訊或者柵欄同步,則採用了MPI的allReduce。

0x05 深入Flink源碼和runtime來驗證

我們需要深入到Flink內部去挖掘驗證,如果大家有興趣,可以參見下面調用棧,自己添加斷點來研究。

execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)

5.1 向Flink提交Job

Alink和Flink構建聯繫,是在print調用中完成的。因為是本地調試,Flink會啟動一個miniCluster,然後會做如下操作。

  • 首先生成執行計劃Plan。Plan以數據流形式來表示批處理程序,但它只是批處理程序最初的表示,然後計劃會被優化以生成更高效的方案OptimizedPlan。
  • 然後,計劃被編譯生成JobGraph。這個圖是要交給flink去生成task的圖。
  • 生成一系列配置。
  • 將JobGraph和配置交給flink集群去運行。如果不是本地運行的話,還會把jar文件通過網絡發給其他節點。
  • 以本地模式運行的話,可以看到啟動過程,如啟動性能度量、web模塊、JobManager、ResourceManager、taskManager等等。

當我們看到了submitJob調用,就知道KMeans代碼已經和Flink構建了聯繫

@Internal
public class LocalExecutor implements PipelineExecutor {

   public static final String NAME = "local";

   @Override
   public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {

      // we only support attached execution with the local executor.
      checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

      final JobGraph jobGraph = getJobGraph(pipeline, configuration);
      final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
      final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

      CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

      jobIdFuture
            .thenCompose(clusterClient::requestJobResult)
            .thenAccept((jobResult) -> clusterClient.shutDownCluster());

      return jobIdFuture.thenApply(jobID ->
            new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
   }

5.2 生成JobGraph

生成jobGraph的具體流程是:

  • IterativeDataSet.closeWith會生成一個BulkIterationResultSet。
  • PrintBatchOp.sinkFrom中會調用到ExecutionEnvironment.executeAsync
  • 調用createProgramPlan構建一個Plan
  • OperatorTranslation.translate函數發現if (dataSet instanceof BulkIterationResultSet),則調用translateBulkIteration(bulkIterationResultSet);
  • 這時候生成了執行計劃Plan
  • ExecutionEnvironment.executeAsync調用LocalExecutor.execute
  • 然後調用FlinkPipelineTranslationUtil.getJobGraph來生成jobGraph
  • GraphCreatingVisitor.preVisit中會判斷 if (c instanceof BulkIterationBase),以生成BulkIterationNode
  • PlanTranslator.translateToJobGraph會調用到JobGraphGenerator.compileJobGraph,最終調用到createBulkIterationHead就生成了迭代處理的Head。
  • 最後將jobGraph提交給Cluster ,jobGraph 變形為 ExceutionGraph在JM和TM上執行。

5.3 迭代對應的Task

前面代碼中,getJobGraph函數作用是生成了job graph。

然後 JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。

最後 JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task。

所以我們需要看看最終運行時候,迭代API對應着哪些Task。

針對IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了如下的task。

  • IterationHeadTask
  • IterationIntermediateTask
  • IterationTailTask
  • IterationSynchronizationSinkTask

5.3.1 IterationHeadTask

IterationHeadTask主要作用是協調一次迭代。

它會讀取初始輸入,和迭代Tail建立一個BlockingBackChannel。在成功處理輸入之後,它會發送EndOfSuperstep事件給自己的輸出。它在每次superstep之後會聯繫 synchronization task,等到自己收到一個用來同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示所有其他的heads已經完成了自己的迭代。

下一次迭代時候,上一次迭代中tail的輸出就經由backchannel傳輸,形成了head的輸入。何時進入到下一個迭代,是由HeadTask完成的。一旦迭代完成,head將發送TerminationEvent給所有和它關聯的task,告訴他們shutdown。

				barrier.waitForOtherWorkers();

				if (barrier.terminationSignaled()) {
					requestTermination();
					nextStepKickoff.signalTermination();
				} else {
					incrementIterationCounter();
					String[] globalAggregateNames = barrier.getAggregatorNames();
					Value[] globalAggregates = barrier.getAggregates();
					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
          // 在這裏發起下一次Superstep。
					nextStepKickoff.triggerNextSuperstep();
				}
			}

IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中構建的。其例子如下:

"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"

5.3.2 IterationIntermediateTask

IterationIntermediateTask是superstep中間段的task,其將傳輸EndOfSuperstepEvent和TerminationEvent給所有和它關聯的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代狀態。

如果迭代狀態被更新,本task的輸出將傳送回IterationHeadTask,在這種情況下,本task將作為head再次被安排。

IterationIntermediateTask的例子如下:

 "MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   

5.3.3 IterationTailTask

IterationTailTask是迭代的最末尾。如果迭代狀態被更新,本task的輸出將通過BlockingBackChannel傳送回IterationHeadTask,反饋給迭代頭就意味着一個迭代完整邏輯的完成,那麼就可以關閉這個迭代閉合環了。這種情況下,本task將在head所在的實例上重新被調度。

這裡有幾個關鍵點需要注意:

如何和Head建立聯繫

Flink有一個BlockingQueueBroker類,這是一個阻塞式的隊列代理,它的作用是對迭代併發進行控制。Broker是單例的,迭代頭任務和尾任務會生成同樣的broker ID,所以頭尾在同一個JVM中會基於相同的dataChannel進行通信。dataChannel由迭代頭創建。

IterationHeadTask中會生成BlockingBackChannel,這是一個容量為1的阻塞隊列。

// 生成channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); 

// 然後block在這裏,等待Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();

IterationTailTask則是如下:

// 在基類得到channel,因為是單例,所以會得到同一個
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());

// notify iteration head if responsible for workset update 在這裏通知Head
worksetBackChannel.notifyOfEndOfSuperstep();

而兩者都是利用如下辦法來建立聯繫,在同一個subtask中會使用同一個brokerKey,這樣首尾就聯繫起來了。

public String brokerKey() {
    if (this.brokerKey == null) {
        int iterationId = this.config.getIterationId();
        this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    return this.brokerKey;
}
如何把用戶返回的數值傳給Head

這是通過output.collect來完成的。

首先,在Tail初始化時候,會生成一個outputCollector,這個outputCollector會被設置為本task的輸出outputCollector。這樣就保證了用戶函數的輸出都會轉流到outputCollector。

而outputCollector的輸出就是worksetBackChannel的輸出,這裏設置為同一個instance。這樣用戶輸出就輸出到backChannel中。

	@Override
	protected void initialize() throws Exception {
		super.initialize();
    
		// set the last output collector of this task to reflect the iteration tail state update:
		// a) workset update,
		// b) solution set update, or
		// c) merged workset and solution set update

		Collector<OT> outputCollector = null;
		if (isWorksetUpdate) {
      // 生成一個outputCollector
			outputCollector = createWorksetUpdateOutputCollector();

			// we need the WorksetUpdateOutputCollector separately to count the collected elements
			if (isWorksetIteration) {
				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
			}
		}
    
    ......
    // 把outputCollector設置為本task的輸出
		setLastOutputCollector(outputCollector);
	}

outputCollector的輸出就是worksetBackChannel的輸出buffer,這裏設置為同一個instance。

	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
		DataOutputView outputView = worksetBackChannel.getWriteEnd();
		TypeSerializer<OT> serializer = getOutputSerializer();
		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
	}

運行時候如下:

	@Override
	public void run() throws Exception {

		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

		while (this.running && !terminationRequested()) {

      // 用戶在這裏輸出,最後會輸出到output.collect,也就是worksetBackChannel的輸出buffer。
			super.run();

      // 這時候以及輸出到channel完畢,只是通知head進行讀取。
			if (isWorksetUpdate) {
				// notify iteration head if responsible for workset update
				worksetBackChannel.notifyOfEndOfSuperstep();
			} else if (isSolutionSetUpdate) {
				// notify iteration head if responsible for solution set update
				solutionSetUpdateBarrier.notifySolutionSetUpdate();
			}

      ...
	}

IterationTailTask例子如下:

"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"

5.3.4 IterationSynchronizationSinkTask

IterationSynchronizationSinkTask作用是同步所有的iteration heads,IterationSynchronizationSinkTask被是實現成一個 output task。其只是用來協調,不處理任何數據。

在每一次superstep,IterationSynchronizationSinkTask只是等待直到它從每一個head都收到一個WorkerDoneEvent。這表示下一次superstep可以開始了。

這裏需要注意的是 SynchronizationSinkTask 如何等待各個并行度的headTask。比如Flink的并行度是5,那麼SynchronizationSinkTask怎麼做到等待這5個headTask。

在IterationSynchronizationSinkTask中,註冊了SyncEventHandler來等待head的WorkerDoneEvent。

this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);

在SyncEventHandler中,我們可以看到,在構建時候,numberOfEventsUntilEndOfSuperstep就被設置為并行度,每次收到一個WorkerDoneEvent,workerDoneEventCounter就遞增,當等於numberOfEventsUntilEndOfSuperstep,即并行度時候,就說明本次superstep中,所有headtask都成功了。

    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
        } else {
          // 每次遞增
            ++this.workerDoneEventCounter;
            String[] aggNames = workerDoneEvent.getAggregatorNames();
            Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
            if (aggNames.length != aggregates.length) {
                throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
            } else {
                for(int i = 0; i < aggNames.length; ++i) {
                    Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]);
                    aggregator.aggregate(aggregates[i]);
                }

              // numberOfEventsUntilEndOfSuperstep就是并行度,等於并行度時候就說明所有head都成功了。
                if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
                    this.endOfSuperstep = true;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

IterationSynchronizationSinkTask的例子如下:

"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"

5.4 superstep

綜上所述,我們最終得到superstep如下:

***** 文字描述如下 *****
  
每次迭代都是一個superstep
  每次迭代中有若干subtask在不同的partition上分別執行step
     每個step有一個HeadTask,若干IntermediateTask,一個TailTask
  每個superstep有一個SynchronizationSinkTask
  
***** 偽代碼大致如下 *****
  
for maxIter :
  begin superstep
      for maxSubTask :
         begin step
           IterationHeadTask
           IterationIntermediateTask
           IterationIntermediateTask
           ...
           IterationIntermediateTask
           IterationIntermediateTask
           IterationTailTask
         end step
    IterationSynchronizationSinkTask
  end superstep

0x06 結合KMeans代碼看superset

6.1 K-means算法概要

K-means算法的過程,為了盡量不用數學符號,所以描述的不是很嚴謹,大概就是這個意思,“物以類聚、人以群分”:

  1. 首先輸入k的值,即我們希望將數據集經過聚類得到k個分組。
  2. 從數據集中隨機選擇k個數據點作為初始大哥(質心,Centroid)
  3. 對集合中每一個小弟,計算與每一個大哥的距離(距離的含義後面會講),離哪個大哥距離近,就跟定哪個大哥。
  4. 這時每一個大哥手下都聚集了一票小弟,這時候召開人民代表大會,每一群選出新的大哥(其實是通過算法選出新的質心)。
  5. 如果新大哥和老大哥之間的距離小於某一個設置的閾值(表示重新計算的質心的位置變化不大,趨於穩定,或者說收斂),可以認為我們進行的聚類已經達到期望的結果,算法終止。
  6. 如果新大哥和老大哥距離變化很大,需要迭代3~5步驟。

6.2 KMeansPreallocateCentroid

KMeansPreallocateCentroid也是superstep一員,但是只有context.getStepNo() == 1的時候,才會進入實際業務邏輯,預分配Centroid。當superstep為大於1的時候,本task會執行,但不會進入具體業務代碼。

public class KMeansPreallocateCentroid extends ComputeFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);

    @Override
    public void calc(ComContext context) {
        // 每次superstep都會進到這裏
        LOG.info("  KMeansPreallocateCentroid 我每次都會進的呀   ");
        if (context.getStepNo() == 1) {
          // 實際預分配業務只進入一次
        }
    }
}

6.3 KMeansAssignCluster 和 KMeansUpdateCentroids

KMeansAssignCluster 作用是為每個點(point)計算最近的聚類中心,為每個聚類中心的點坐標的計數和求和。

KMeansUpdateCentroids 作用是基於計算出來的點計數和坐標,計算新的聚類中心。

Alink在整個計算過程中維護一個特殊節點來記住待求中心點當前的結果。

這就是為啥迭代時候需要區分奇數次和偶數次的原因了。奇數次就表示老大哥,偶數次就表示新大哥。每次superstep只會計算一批大哥,留下另外一批大哥做距離比對。

另外要注意的一點是:普通的迭代計算,是通過Tail給Head回傳用戶數據,但是KMeans這裏的實現並沒有採用這個辦法,而是把計算出來的中心點都存在共享變量中,在各個intermediate之間互相交互。

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

6.4 KMeansOutputModel

這裏要特殊說明,因為KMeansOutputModel是最終輸出模型,而KMeans算法的實現是:所有subtask都擁有所有中心點,就是說所有subtask都會有相同的模型,就沒有必要全部輸出,所以這裏限定了第一個subtask才能輸出,其他的都不輸出。

	@Override
	public List <Row> calc(ComContext context) {
    // 只有第一個subtask才輸出模型數據。
		if (context.getTaskId() != 0) {
			return null;
		}

    ....
      
		modelData.params = new KMeansTrainModelData.ParamSummary();
		modelData.params.k = k;
		modelData.params.vectorColName = vectorColName;
		modelData.params.distanceType = distanceType;
		modelData.params.vectorSize = vectorSize;
		modelData.params.latitudeColName = latitudeColName;
		modelData.params.longtitudeColName = longtitudeColName;

		RowCollector collector = new RowCollector();
		new KMeansModelDataConverter().save(modelData, collector);
		return collector.getRows();
	}

0x07 參考

幾種并行計算模型的區別(BSP LogP PRAM)

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚類、K-Means、例子、細節

Flink-Gelly:Iterative Graph Processing

從BSP模型到Apache Hama

Flink DataSet迭代運算

幾種并行計算模型的區別(BSP LogP PRAM)

Flink架構,源碼及debug

Flink 之 Dataflow、Task、subTask、Operator Chains、Slot 介紹

Flink 任務和調度

Flink運行時之生成作業圖

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

還在煩惱搬家費用要多少哪?台中大展搬家線上試算搬家費用,從此不再擔心「物品怎麼計費」、「多少車才能裝完」

您可能也會喜歡…