使用Apache Spark和Apache Hudi構建分析數據湖

1. 引入

大多數現代數據湖都是基於某種分佈式文件系統(DFS),如HDFS或基於雲的存儲,如AWS S3構建的。遵循的基本原則之一是文件的“一次寫入多次讀取”訪問模型。這對於處理海量數據非常有用,如數百GB到TB的數據。

但是在構建分析數據湖時,更新數據並不罕見。根據不同場景,這些更新頻率可能是每小時一次,甚至可能是每天或每周一次。另外可能還需要在最新視圖、包含所有更新的歷史視圖甚至僅是最新增量視圖上運行分析。

通常這會導致使用用於流和批處理的多個系統,前者處理增量數據,而後者處理歷史數據。

處理存儲在HDFS上的數據時,維護增量更新的常見工作流程是這裏所述的Ingest-Reconcile-Compact-Purge策略。

Apache Hudi之類的框架在這裏便可發揮作用。它在後台為我們管理此工作流程,從而使我們的核心應用程序代碼更加簡潔,Hudi支持對最新數據視圖的查詢以及查詢在某個時間點的增量更改。

這篇文章將介紹Hudi的核心概念以及如何在Copy-On-Write模式下進行操作。

本篇文章項目源代碼放在github。

2. 大綱

  • 先決條件和框架版本
  • Hudi核心概念
  • 初始設置和依賴項
  • 使用CoW表

2.1 先決條件和框架版本

如果你事先了解如何使用scala編寫spark作業以及讀取和寫入parquet文件,那麼本篇文章理解起來將非常容易。

框架版本如下

  • JDK: openjdk 1.8.0_242
  • Scala: 2.12.8
  • Spark: 2.4.4
  • Hudi Spark bundle: 0.5.2-incubating

注意:在撰寫本文時,AWS EMR與Hudi v0.5.0-incubating集成在一起,該軟件包具有一個bug會導致upsert操作卡死或花費很長時間才能完成,可查看相關issue了解更多,該問題已在當前版本的Hudi(0.5.2-incubating及之後版本)中修復。如果計劃在AWS EMR上運行代碼,則可能要考慮用最新版本覆蓋默認的集成版本。

2.2 Hudi核心概念

先從一些需要理解的核心概念開始。

1. 表類型

Hudi支持兩種表類型

  • 寫時複製(CoW):寫入CoW表時,將運行Ingest-Reconcile-Compact-Purge周期。每次寫操作后,CoW表中的數據始終是最新記錄,對於需要儘快讀取最新數據的場景,可首選此模式。數據僅以列文件格式(parquet)存儲在CoW表中,由於每個寫操作都涉及壓縮和覆蓋,因此此模式產生的文件最少。

  • 讀時合併(MoR):MoR表專註於快速寫操作。寫入這些表將創建增量文件,隨後將其壓縮以生成讀取時的最新數據,壓縮操作可以同步或異步完成,數據以列文件格式(parquet)和基於行的文件格式(avro)組合存儲。

這是Hudi文檔中提到的兩種表格格式之間的權衡取捨。

Trade-off CoW MoR
數據延遲 Higher Lower
更新開銷 (I/O) Higher (重寫整個parquet文件) Lower (追加到delta log文件)
Parquet文件大小 Smaller (高update(I/0) 開銷) Larger (低更新開銷)
Write Amplification Higher Lower (由compaction策略決定)

2. 查詢類型

Hudi支持兩種主要類型的查詢:“快照查詢”和“增量查詢”。除兩種主要查詢類型外,MoR表還支持“讀優化查詢”。

  • 快照查詢:對於CoW表,快照查詢返回數據的最新視圖,而對於MoR表,則返回接近實時的視圖。 對於MoR表,快照查詢將即時合併基本文件和增量文件,因此可能會有一些讀取延遲。使用CoW,由於寫入負責合併,因此讀取很快,只需要讀取基本文件。

  • 增量查詢:增量查詢使您可以通過指定“開始”時間或在特定時間點通過指定“開始”和“結束”時間來查看特定提交時間之後的數據。

  • 讀優化查詢:對於MoR表,讀取優化查詢返回一個視圖,該視圖僅包含基本文件中的數據,而不合併增量文件。

3. 以Hudi格式寫入時的關鍵屬性

  • hoodie.datasource.write.table.type,定義表的類型-默認值為COPY_ON_WRITE。對於MoR表,將此值設置為MERGE_ON_READ。

  • hoodie.table.name,這是必填字段,每個表都應具有唯一的名稱。

  • hoodie.datasource.write.recordkey.field,將此視為表的主鍵。此屬性的值是DataFrame中列的名稱,該列是主鍵。

  • hoodie.datasource.write.precombine.field,更新數據時,如果存在兩個具有相同主鍵的記錄,則此列中的值將決定更新哪個記錄。選擇諸如時間戳記的列將確保選擇具有最新時間戳記的記錄。

  • hoodie.datasource.write.operation,定義寫操作的類型。值可以為upsert,insert,bulk_insert和delete,默認值為upsert。

2.3 初始設置和依賴項

1. 依賴說明

為了在Spark作業中使用Hudi,需要使用spark-sql,hudi-spark-bundle和spark-avro依賴項,此外還需要將Spark配置為使用KryoSerializer。

pom.xml大致內容如下

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.8</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
        <version>2.4.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark-bundle_${scala.compat.version}</artifactId>
        <version>0.5.2-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.compat.version}</artifactId>
        <version>2.4.4</version>
    </dependency>
</dependencies>

2. 設置Schema

我們使用下面的Album類來表示表的schema。

case class Album(albumId: Long, title: String, tracks: Array[String], updateDate: Long)   

3. 生成測試數據

創建一些用於upsert操作的數據。

  • INITIAL_ALBUM_DATA有兩個記錄,鍵為801。
  • UPSERT_ALBUM_DATA包含一個更新的記錄和兩個新的記錄。
def dateToLong(dateString: String): Long = LocalDate.parse(dateString, formatter).toEpochDay

private val INITIAL_ALBUM_DATA = Seq(
    Album(800, "6 String Theory", Array("Lay it down", "Am I Wrong", "68"), dateToLong("2019-12-01")),
    Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts"), dateToLong("2019-12-01")),
    Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts", "Go to sleep"), dateToLong("2019-12-03"))
  )

  private val UPSERT_ALBUM_DATA = Seq(
    Album(800, "6 String Theory - Special", Array("Jumpin' the blues", "Bluesnote", "Birth of blues"), dateToLong("2020-01-03")),
    Album(802, "Best Of Jazz Blues", Array("Jumpin' the blues", "Bluesnote", "Birth of blues"), dateToLong("2020-01-04")),
    Album(803, "Birth of Cool", Array("Move", "Jeru", "Moon Dreams"), dateToLong("2020-02-03"))
  )

4. 初始化SparkContext

最後初始化Spark上下文。這裏要注意的重要一點是KryoSerializer的使用。

val spark: SparkSession = SparkSession.builder()
    .appName("hudi-datalake")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false") // Uses Hive SerDe, this is mandatory for MoR tables
    .getOrCreate()

2.4 使用CoW表

本節將處理CoW表的記錄,如讀取和刪除記錄。

1. basePath(基本路徑)和Upsert方法

定義一個basePath,upsert方法會將表數據寫入該路徑,該方法將以org.apache.hudi格式寫入Dataframe,請確保上面討論的所有Hudi屬性均已設置。

val basePath = "/tmp/store"

private def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
    albumDf.write
      .format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      // Ignore this property for now, the default is too high when experimenting on your local machine
      // Set this to a lower value to improve performance.
      // I'll probably cover Hudi tuning in a separate post.
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save(s"$basePath/$tableName/")
  }

2. 初始化upsert

插入INITIAL_ALBUM_DATA,我們應該創建2條記錄,對於801,該記錄的日期為2019-12-03。

val tableName = "Album"
upsert(INITIAL_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate")
spark.read.format("hudi").load(s"$basePath/$tableName/*").show()

讀取CoW表就像使用格式(“hudl”)的常規spark.read一樣簡單。

// Output
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|            title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+
|     20200412182343|  20200412182343_0_1|               801|               default|65841d0a-0083-447...|    801|Hail to the Thief|[2+2=5, Backdrift...|     18233|
|     20200412182343|  20200412182343_0_2|               800|               default|65841d0a-0083-447...|    800|  6 String Theory|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+

另一種確定的方法是查看Workload profile的日誌輸出,內容大致如下

Workload profile :WorkloadProfile {globalStat=WorkloadStat {numInserts=2, numUpdates=0}, partitionStat={default=WorkloadStat {numInserts=2, numUpdates=0}}}

3. 更新記錄

upsert(UPSERT_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate")

查看Workload profile的日誌輸出,並驗證它是否符合預期

Workload profile :WorkloadProfile {globalStat=WorkloadStat {numInserts=2, numUpdates=1}, partitionStat={default=WorkloadStat {numInserts=2, numUpdates=1}}}

查詢輸出如下

spark.read.format("hudi").load(s"$basePath/$tableName/*").show()

//Output
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|               title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+
|     20200412183510|  20200412183510_0_1|               801|               default|65841d0a-0083-447...|    801|   Hail to the Thief|[2+2=5, Backdrift...|     18233|
|     20200412184040|  20200412184040_0_1|               800|               default|65841d0a-0083-447...|    800|6 String Theory -...|[Jumpin' the blue...|     18264|
|     20200412184040|  20200412184040_0_2|               802|               default|65841d0a-0083-447...|    802|  Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20200412184040|  20200412184040_0_3|               803|               default|65841d0a-0083-447...|    803|       Birth of Cool|[Move, Jeru, Moon...|     18295|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+

4. 查詢記錄

我們在上面查看數據的方式稱為“快照查詢”,這是默認設置,另外還支持“增量查詢”。

4.1 增量查詢

要執行增量查詢,我們需要在讀取時將hoodie.datasource.query.type屬性設置為incremental,並指定hoodie.datasource.read.begin.instanttime屬性。 這將在指定的即時時間之後讀取所有記錄,對於本示例,我們將instantTime指定為20200412183510

spark.read
    .format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200412183510")
    .load(s"$basePath/$tableName")
    .show()

這將在提交時間20200412183510之後返回所有記錄。

+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|               title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+
|     20200412184040|  20200412184040_0_1|               800|               default|65841d0a-0083-447...|    800|6 String Theory -...|[Jumpin' the blue...|     18264|
|     20200412184040|  20200412184040_0_2|               802|               default|65841d0a-0083-447...|    802|  Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20200412184040|  20200412184040_0_3|               803|               default|65841d0a-0083-447...|    803|       Birth of Cool|[Move, Jeru, Moon...|     18295|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+

5. 刪除記錄

我們要查看的最後一個操作是刪除,刪除類似於upsert,需要一個待刪除記錄的DataFrame,如下面的示例代碼所示,不需要整行,只需要主鍵即可。

val deleteKeys = Seq(
    Album(803, "", null, 0l),
    Album(802, "", null, 0l)
)

import spark.implicits._

val df = deleteKeys.toDF()

df.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "albumId")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    // Set the option "hoodie.datasource.write.operation" to "delete"
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
    .mode(SaveMode.Append) // Only Append Mode is supported for Delete.
    .save(s"$basePath/$tableName/")

spark.read.format("hudi").load(s"$basePath/$tableName/*").show()

這是本部分介紹的全部內容。後面我們將探討在MERGE-ON-READ表進行操作。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

您可能也會喜歡…