使用Apache Hudi構建大規模、事務性數據湖

一個近期由Hudi PMC & Uber Senior Engineering Manager Nishith Agarwal分享的Talk

關於Nishith Agarwal更詳細的介紹,主要從事數據方面的工作,包括攝取標準化,數據湖原語等。

什麼是數據湖?數據湖是一個集中式的存儲,允許以任意規模存儲結構化和非結構化數據。你可以存儲原始數據,而不需要先轉化為結構化的數據,基於數據湖之上可以運行多種類型的分析,如dashboard、大數據處理的可視化、實時分析、機器學習等。

接着看看對於構建PB級數據湖有哪些關鍵的要求

第一個要求:增量攝取(CDC)

企業中高價值的數據往往存儲在OLTP中,例如下圖中,users表包含用戶ID,國家/地區,修改時間和其他詳細信息,但OLTP系統並未針對大批量分析進行優化,因此可能需要引入數據湖。同時一些企業採用備份在線數據庫的方式,並將其存儲到數據湖中的方法來攝取數據,但這種方式無法擴展,同時它給上游數據庫增加了沉重的負擔,也導致數據重寫的浪費,因此需要一種增量攝取數據的方法。

第二個要求:Log Event去重

考慮分析大規模時間序列數據的場景,這些事件被寫入數據管道,並且數量非常大,可達數十億,每秒可達百萬的量。但流中可能有重複項,可能是由於至少一次(atleast-once)保證,數據管道或客戶端失敗重試處理等發送了重複的事件,如果不對日誌流進行重複處理,則對這些數據集進行的分析會有正確性問題。下圖是一個示例日誌事件流,其中事件ID為唯一鍵,帶有事件時間和其他有效負載。

第三個要求:存儲管理(自動管理DFS上文件)

我們已經了解了如何攝取數據,那麼如何管理數據的存儲以擴展整個生態系統呢?其中小文件是個大問題,它們會導致查詢引擎的開銷並增加文件系統元數據的壓力。而如果寫入較大的文件,則可能導致攝取延遲增加。一種常見的策略是先攝取小文件,然後再進行合併,這種方法沒有標準,並且在某些情況下是非原子行為,會導致一致性問題。無論如何,當我們寫小文件並且在合併這些文件之前,查詢性能都會受到影響。

第四個要求:事務寫(ACID能力)

傳統數據湖在數據寫入時的事務性方面做得不太好,但隨着越來越多的業務關鍵處理流程移至數據湖,情況也在發生變化,我們需要一種機制來原子地發布一批數據,即僅保存有效數據,部分失敗必須回滾而不會損壞已有數據集。同時查詢的結果必須是可重複的,查詢端看不到任何部分提取的數據,任何提交的數據都必須可靠地寫入。Hudi提供了強大的ACID能力。

第五個要求:更快地派生/ETL數據(增量處理)

僅僅能快速攝取數據還不夠,我們還需要具有計算派生數據的能力,沒有這個能力,數據工程師通常會繞過原始表來構建其派生/ETL並最終破壞整個體繫結構。下面示例中,我們看到原始付款表(貨幣未標準化)和發生貨幣轉換的派生表。

擴展此類數據管道時很有挑戰,如僅對變更進行計算,或者基於窗口的Join的挑戰。對基礎數據集進行大規模重新處理不太可能,這會浪費計算資源。需要在數據湖上進行抽象以支持對上游表中已更改的行(數據)進行智能計算。

第六個要求:法律合規/數據刪除(更新&刪除)

近年來隨着新的數據保護法規生效,對數據保留有了嚴格的規定,需要刪除原始記錄,修複數據的正確性等,當需要在PB級數據湖中高效執行合規性時非常困難,如同大海撈針一般,需要高效的刪除,如進行索引,對掃描進行優化,將刪除記錄有效地傳播到下游表的機制。

要求回顧(匯總)

  • 支持增量數據庫變更日誌攝取。
  • 從日誌事件中刪除所有重複項。
  • Data Lake必須為其數據集提供有效的存儲管理
  • 支持事務寫入
  • 必須提供嚴格的SLA,以確保原始表和派生表的數據新鮮度
  • 任何數據合規性需求都需要得到有效的支持
  • 支持唯一鍵約束
  • 有效處理遲到的數據

有沒有能滿足上面所有需求的系統呢?接下來我們引入Apache Hudi,HUDI代表Hadoop Upserts Deletes and Incrementals。從高層次講,HUDI允許消費數據庫和kafa事件中的變更事件,也可以增量消費其他HUDI數據集中的變更事件,並將其提取到存儲在Hadoop兼容,如HDFS和雲存儲中。在讀取方面,它提供3種不同的視圖:增量視圖,快照視圖和實時視圖。

HUDI支持2種存儲格式:“寫時複製”和“讀時合併”。

首先來看看寫時複製。如下圖所示,HUDI管理了數據集,並嘗試將一批數據寫入數據湖,HUDI維護稱為“提交時間軸(commit timeline)”的內容,以跟蹤HUDI管理的數據集上發生的操作/更改,它在提交時間軸上標記了一個“inflight”文件,表示操作已開始,HUDI會寫2個parquet文件,然後將“inflight”文件標記為已完成,這從原子上使該新數據寫入HUDI管理的數據集中,並可用於查詢。正如我們提到的,RO視圖優化查詢性能,並提供parquet的基本原始列存性能,無需增加任何額外成本。
現在假設需要更新另一批數據,HUDI在提交時間軸上標記了一個“inflight”文件,並開始合併這些更新並重寫Parquet File1。此時,由於提交仍在進行中,因此用戶看不到正在寫入任何這些更新(這就是我們稱為“快照隔離”)。最終以原子方式發布提交后,就可以查詢版本為C2的新合併的parquet文件。

COW已經在Uber投入運行多年,大多數數據集都位於COW存儲類型上。

儘管COW服務於我們的大多數用例,但仍有一些因素值得我們關注。以Uber的行程表為例,可以想象這可能是一個很大的表,它在旅程的整個生命周期中獲取大量更新。每隔30分鐘,我們就會獲得一組新旅行以及對舊旅行的一些更新,在Hive上的旅行數據是按天劃分分區的,因此新旅行最終會在最新分區中寫入新文件,而某些更新會在舊分區中寫入文件。使用COW,我們只能重寫那些更新所涉及的文件,並且能夠高效地更新。由於COW最終會重寫某些文件,因此可以像合併和重寫該數據一樣快。在該用例中通常大於15分鐘。再來看另外一種情況,由於某些業務用例(例如GDPR),必須更新大量歷史行程,這些更新涉及過去幾個月數據,從而導致很高的寫入延遲,並一遍又一遍地重寫大量數據,寫放大也會導致大量的IO。若為工作負載分配的資源不足,可能就會嚴重損害攝取延遲。

在真實場景中,會將ETL鏈接在一起來構建數據管道,問題會變得更加複雜。

對問題進行總結如下:在COW中,太多的更新(尤其是雜亂的跨分區/文件)會嚴重影響提取延遲(由於作業運行時間較長且無法追趕上入流量),同時還會引起巨大的寫放大,從而影響HDFS(相同文件的48個版本+過多的IO)。合併更新和重寫parquet文件會限制我們的數據的新鮮度,因為完成此類工作需要時間 = (重寫parquet文件所花費的時間*parquet文件的數量)/(并行性)。

在COW中,我們實際上並沒有太大的parquet文件,因為即使只有一行更新也可能要重寫整個文件,因為Hudi會選擇寫入小於預期大小的文件。

MergeOnRead將所有這些更新分組到一個文件中,然後在稍後的時刻創建一個新版本。對於重更新的表,重寫大文件會導致開銷變大。

如何解決上述寫放大問題呢?除了將更新合併並重寫parquet文件之外,我們將更新寫入增量文件中,這可以幫助我們降低攝取延遲並獲得更好的新鮮度。

將更新寫入增量文件將需要在讀取端做額外的工作以便能夠讀取增量文件中記錄,這意味着我們需要構建更智能,更智能的讀取端。

首先來看看寫時複製。如下圖所示,HUDI管理了數據集,並嘗試將一批數據寫入數據湖,HUDI維護稱為“提交時間軸(commit timeline)”的內容,以跟蹤HUDI管理的數據集上發生的操作/更改,它在提交時間軸上標記了一個“inflight”文件,表示操作已開始,HUDI會寫2個parquet文件,然後將“inflight”文件標記為已完成,這從原子上使該新數據寫入HUDI管理的數據集中,並可用於查詢。正如我們提到的,RO視圖優化查詢性能,並提供parquet的基本原始列存性能,無需增加任何額外成本。

現在需要進行第二次更新,與合併和重寫新的parquet文件(如在COW中一樣)不同,這些更新被寫到與基礎parquet文件對應的增量文件中。RO視圖繼續查詢parquet文件(過時的數據),而RealTime View(Snapshot query)會合併了parquet中的數據和增量文件中的更新,以提供最新數據的視圖。可以看到,MOR是在查詢執行時間與較低攝取延遲之間的一個權衡。

那麼,為什麼我們要異步運行壓縮?我們實現了MERGE_ON_READ來提高數據攝取速度,我們希望儘快攝取較新的數據。而合併更新和創建列式文件是Hudi數據攝取的主要耗時部分。

因此我們引入了異步Compaction步驟,該步驟可以與數據攝取同時運行,減少數據攝取延遲。

Hudi將事務引入到了大規模數據處理中,實際上,我們是最早這樣做的系統之一,最近,它已通過其他項目的類似方法獲得了社區認可。

Hudi支持多行多分區的原子性提交,Hudi維護一個特殊的文件夾.hoodie,在該文件夾中記錄以單調遞增的時間戳表示的操作,Hudi使用此文件夾以原子方式公開已提交的操作;發生的部分故障會透明地回滾,並且不會影響讀者和後面的寫入;Hudi使用MVCC模型將讀取與併發攝取和壓縮隔離開來;Hudi提交協議和DFS存儲保證了數據的持久寫入。

下面介紹Hudi在Uber的使用情況

Hudi管理了超過150PB數據湖,超過10000張表,每天攝入5000億條記錄。

接着看看Hudi如何替代分析架構。利用Hudi的upsert原語,可以在攝取到數據湖中時實現<5分鐘的新鮮度,並且能繼續獲得列式數據的原始性能(parquet格式),同時使用Hudi還可以獲得實時視圖,以5-10分鐘的延遲提供dashboard,此外HUDI支持的增量視圖有助於長尾效應對數據集的突變。

為方便用戶能快速使用Hudi,Hudi提供了一些開箱即用的工具,如HoodieDeltaStreamer,在Uber內部,HoodieDeltaStreamer用來對全球網絡進行近實時分析,可用來消費DFS/Kafka中的數據。

除了DeltaStreamer,Hudi還集成了Spark Datasource,也提供了開箱即用的能力,基於Spark,可以快速構建ETL管道,同時也可無縫使用Hudi + PySpark。

接着介紹更高級的原語和特性。

如何從損壞的數據中恢復?例如線上由於bug導致寫入了不正確的數據,或者上游系統將某一列的值標記為null,Hudi也可以很好的處理上述場景,可以將表恢復到最近的一次正確時間,如Hudi提供的savepoint就可以將不同的commit保存起來,用於後續恢復,注意MoR表暫時不支持savepoint;Hudi還提供了文件的版本號,即可以保存多個版本的文件,這對於CoW和MoR表都適用,但是會佔用一些存儲空間。

Hudi還提供便於增量ETL的高級特性,通過Spark/Spark便可以輕鬆增量拉取Hudi表的變更。

除了增量拉取,Hudi也提供了時間旅行特性,同樣通過Spark/Hive便可以輕鬆查詢指定版本的數據,其中對於Hive查詢中指定hoodie.table_name.consume.end.timestamp也馬上會得到支持。

下面看看對於線上的Hudi Spark作業如何調優。

下面列舉了幾個調優手段,設置Kryo序列化器,使用Shuffle Service,利用開源的profiler來進行內存調優,當然Hudi也提供了Hudi生產環境的調優配置,可參考【調優 | Apache Hudi應用調優指南】

下面介紹社區正在進行的工作,敬請期待。

即將發布的0.6.0版本,將企業中存量的parquet表高效導入Hudi中,與傳統通過Spark讀取Parquet表然後再寫入Hudi方案相比,佔用的資源和耗時都將大幅降低。以及對於查詢計劃的O(1)時間複雜度的處理,新增列索引及統一元數據管理以消除對DFS的文件list操作。

還有一些值得關注的特性,比如支持行級別的索引,該功能將極大降低upsert的延遲;異步數據clustering以優化存儲和查詢性能;支持Presto對MoR表的快照查詢;Hudi集成Flink,通過Flink可將數據寫入Hudi數據湖。

整個分享就介紹到這裏,歡迎觀看。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

您可能也會喜歡…