Spark學習筆記(三)-Spark Streaming

Spark Streaming支持實時數據流的可擴展(scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。

 

                                                    架構圖

 

特性如下:

 

  • 可線性伸縮至超過數百個節點;

  • 實現亞秒級延遲處理;

  • 可與Spark批處理和交互式處理無縫集成;

  • 提供簡單的API實現複雜算法;

  • 更多的流方式支持,包括Kafka、Flume、Kinesis、Twitter、ZeroMQ等。

 

原理

 

Spark在接收到實時輸入數據流后,將數據劃分成批次(divides the data into batches),然後轉給Spark Engine處理,按批次生成最後的結果流(generate the final stream of results in batches)。 

 

 

API

 

DStream

 

DStream(Discretized Stream,離散流)是Spark Stream提供的高級抽象連續數據流。

 

  • 組成:一個DStream可看作一個RDDs序列。

  • 核心思想:將計算作為一系列較小時間間隔的、狀態無關的、確定批次的任務,每個時間間隔內接收的輸入數據被可靠存儲在集群中,作為一個輸入數據集。

 

 

  • 特性:一個高層次的函數式編程API、強一致性以及高校的故障恢復。

  • 應用程序模板:

  • 模板1

  • 模板2

 

WordCount示例

 

 

Input DStream

 

Input DStream是一種從流式數據源獲取原始數據流的DStream,分為基本輸入源(文件系統、Socket、Akka Actor、自定義數據源)和高級輸入源(Kafka、Flume等)。

 

  • Receiver:
  • 每個Input DStream(文件流除外)都會對應一個單一的Receiver對象,負責從數據源接收數據並存入Spark內存進行處理。應用程序中可創建多個Input DStream并行接收多個數據流。

  • 每個Receiver是一個長期運行在Worker或者Executor上的Task,所以會佔用該應用程序的一個核(core)。如果分配給Spark Streaming應用程序的核數小於或等於Input DStream個數(即Receiver個數),則只能接收數據,卻沒有能力全部處理(文件流除外,因為無需Receiver)。

  • Spark Streaming已封裝各種數據源,需要時參考官方文檔。

 

Transformation Operation

 

  • 常用Transformation

 

* map(func) :對源DStream的每個元素,採用func函數進行轉換,得到一個新的DStream;

* flatMap(func):與map相似,但是每個輸入項可用被映射為0個或者多個輸出項;

* filter(func):返回一個新的DStream,僅包含源DStream中滿足函數func的項;

* repartition(numPartitions):通過創建更多或者更少的分區改變DStream的并行程度;

* union(otherStream):返回一個新的DStream,包含源DStream和其他DStream的元素;

* count():統計源DStream中每個RDD的元素數量;

* reduce(func):利用函數func聚集源DStream中每個RDD的元素,返回一個包含單元素RDDs的新DStream;

* countByValue():應用於元素類型為K的DStream上,返回一個(K,V)鍵值對類型的新DStream,每個鍵的值是在原DStream的每個RDD中的出現次數;

* reduceByKey(func, [numTasks]):當在一個由(K,V)鍵值對組成的DStream上執行該操作時,返回一個新的由(K,V)鍵值對組成的DStream,每一個key的值均由給定的recuce函數(func)聚集起來;

* join(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, (V, W))鍵值對的新DStream;

* cogroup(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, Seq[V], Seq[W])的元組;

* transform(func):通過對源DStream的每個RDD應用RDD-to-RDD函數,創建一個新的DStream。支持在新的DStream中做任何RDD操作。

 

  • updateStateByKey(func)

  • updateStateByKey可對DStream中的數據按key做reduce,然後對各批次數據累加

  • WordCount的updateStateByKey版本

 

  • transform(func)

  • 通過對原DStream的每個RDD應用轉換函數,創建一個新的DStream。

  • 官方文檔代碼舉例

 

  • Window operations

  • 窗口操作:基於window對數據transformation(個人認為與Storm的tick相似,但功能更強大)。

  • 參數:窗口長度(window length)和滑動時間間隔(slide interval)必須是源DStream批次間隔的倍數。

  • 舉例說明:窗口長度為3,滑動時間間隔為2;上一行是原始DStream,下一行是窗口化的DStream。

  • 常見window operation

有狀態轉換包括基於滑動窗口的轉換和追蹤狀態變化(updateStateByKey)的轉換。

基於滑動窗口的轉換

* window(windowLength, slideInterval) 基於源DStream產生的窗口化的批數據,計算得到一個新的DStream;

* countByWindow(windowLength, slideInterval) 返迴流中元素的一個滑動窗口數;

* reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流。利用函數func聚集滑動時間間隔的流的元素創建這個單元素流。函數func必須滿足結合律,從而可以支持并行計算;

* reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應用到一個(K,V)鍵值對組成的DStream上時,會返回一個由(K,V)鍵值對組成的新的DStream。每一個key的值均由給定的reduce函數(func函數)進行聚合計算。注意:在默認情況下,這個算子利用了Spark默認的併發任務數去分組。可以通過numTasks參數的設置來指定不同的任務數;

* reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每個窗口的reduce值,是基於先前窗口的reduce值進行增量計算得到的;它會對進入滑動窗口的新數據進行reduce操作,並對離開窗口的老數據進行“逆向reduce”操作。但是,只能用於“可逆reduce函數”,即那些reduce函數都有一個對應的“逆向reduce函數”(以InvFunc參數傳入);

* countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當應用到一個(K,V)鍵值對組成的DStream上,返回一個由(K,V)鍵值對組成的新的DStream。每個key的值都是它們在滑動窗口中出現的頻率。

  • 官方文檔代碼舉例 

 

  • join(otherStream, [numTasks])

  • 連接數據流

  • 官方文檔代碼舉例1

  • 官方文檔代碼舉例2

 

Output Operation

 

 

緩存與持久化

 

  • 通過persist()將DStream中每個RDD存儲在內存。

  • Window operations會自動持久化在內存,無需显示調用persist()。

  • 通過網絡接收的數據流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)執行persist()時,默認在兩個節點上持久化序列化后的數據,實現容錯。

 

Checkpoint

 

  • 用途:Spark基於容錯存儲系統(如HDFS、S3)進行故障恢復。

  • 分類:

  • 元數據檢查點:保存流式計算信息用於Driver運行節點的故障恢復,包括創建應用程序的配置、應用程序定義的DStream operations、已入隊但未完成的批次。

  • 數據檢查點:保存生成的RDD。由於stateful transformation需要合併多個批次的數據,即生成的RDD依賴於前幾個批次RDD的數據(dependency chain),為縮短dependency chain從而減少故障恢復時間,需將中間RDD定期保存至可靠存儲(如HDFS)。

  • 使用時機:

  • Stateful transformation:updateStateByKey()以及window operations。

  • 需要Driver故障恢復的應用程序。

  • 使用方法

  • Stateful transformation

streamingContext.checkpoint(checkpointDirectory)

 

  • 需要Driver故障恢復的應用程序(以WordCount舉例):如果checkpoint目錄存在,則根據checkpoint數據創建新StreamingContext;否則(如首次運行)新建StreamingContext。

 

  • checkpoint時間間隔

  • 方法:

dstream.checkpoint(checkpointInterval)

 

  • 原則:一般設置為滑動時間間隔的5-10倍。

  • 分析:checkpoint會增加存儲開銷、增加批次處理時間。當批次間隔較小(如1秒)時,checkpoint可能會減小operation吞吐量;反之,checkpoint時間間隔較大會導致lineage和task數量增長。

 

性能調優

 

降低批次處理時間

 

  • 數據接收并行度

  • 增加DStream:接收網絡數據(如Kafka、Flume、Socket等)時會對數據反序列化再存儲在Spark,由於一個DStream只有Receiver對象,如果成為瓶頸可考慮增加DStream。

  • 設置“spark.streaming.blockInterval”參數:接收的數據被存儲在Spark內存前,會被合併成block,而block數量決定了Task數量;舉例,當批次時間間隔為2秒且block時間間隔為200毫秒時,Task數量約為10;如果Task數量過低,則浪費了CPU資源;推薦的最小block時間間隔為50毫秒。

  • 顯式對Input DStream重新分區:在進行更深層次處理前,先對輸入數據重新分區。

inputStream.repartition(<number of partitions>)

 

  • 數據處理并行度:reduceByKey、reduceByKeyAndWindow等operation可通過設置“spark.default.parallelism”參數或顯式設置并行度方法參數控制。

  • 數據序列化:可配置更高效的Kryo序列化。

 

設置合理批次時間間隔

 

  • 原則:處理數據的速度應大於或等於數據輸入的速度,即批次處理時間大於或等於批次時間間隔。

  • 方法:

  • 先設置批次時間間隔為5-10秒以降低數據輸入速度;

  • 再通過查看log4j日誌中的“Total delay”,逐步調整批次時間間隔,保證“Total delay”小於批次時間間隔。

 

內存調優

 

  • 持久化級別:開啟壓縮,設置參數“spark.rdd.compress”。

  • GC策略:在Driver和Executor上開啟CMS。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

您可能也會喜歡…