使用結構化流的大規模Lakehouse實現

2021年5月27日下午05:00(太平洋時間)

下載幻燈片

業務主管、高管、分析師和數據科學家依靠最新的信息來做出業務決策、適應市場、滿足客戶需求或運行有效的供應鏈運營。beplay体育app下载地址

來聽聽Asurion是如何使用Delta、Structured Streaming、AutoLoader和SQL Analytics來將生產數據延遲從- 1天提高到接近實時的Asurion的技術團隊將分享戰鬥測試的技巧和技巧,你隻有在一定規模的情況下才能獲得。Asurion數據湖執行超過4000個流作業,並在AWS的生產數據湖中托管超過4000個表。

在本節中請注意:
Tomasz Magdanski, Asurion工程總監

成績單

托馬斯·馬格丹斯克:你好。我要告訴你的,你在任何書或博客裏都找不到。我將與您分享您在嚐試構建大規模數據湖時會遇到的實際障礙。我簡單講一下《亞蘇裏安》我們是怎麼走到這一步的?如何經營規模大、性價比高的湖屋?最重要的是,我們在這一過程中吸取了教訓。
我們是一家大型保險和支持公司。每天,我們有超過10,000名專家參與支持會議,幫助全球超過3億人,我們的服務範圍從技術支持到當天的設備維修和更換。為了讓您了解這個項目的規模,我們的湖屋從100多個數據庫中攝取了4000多個表。我們在數據湖中創建了7500個表,其中隻有1 - 2個表,我稍後會講到。我們從Kafka, Kinesis, SNS和SQS中攝取流數據,以及來自api的文件,平麵文件和數據。
我們甚至從其他雲提供商那裏獲取數據。我們的數據庫來源從SQL server, Oracle, PostGreSQL, MySQL動態開始Redshift。在我們的數據倉庫中,我們將數千個這樣的表組合在一起,生成300多個數據模型和600多個數據集市。最後,在我們的消費層,我們有超過10,000個數據視圖和超過2000個報告。我們之前的架構是基於Lambda架構的。如您所知,Lambda體係結構包含速度層和批處理層,這其中存在幾個問題。首先,每件事都要處理兩次。您還必須對數據進行兩次驗證,並且通常必須使用不同的技術來實現這一點。或者必須以不同的方式處理後期數據,並且必須擔心在大部分不可變空間中重新處理數據。你必須擔心調度,重寫和查詢。 Data updates are also really difficult, which makes the data compliance more difficult.
我們沒有真正的計算機存儲分離,這使得可伸縮性變得困難和昂貴。我們的數據延遲主要是D-1,我們還有一個非常廣泛的技術堆棧。從紅移到Lambda, Kinesis, Kinesis Firehose, EMR, RDSS, Hive, Spectrum。這很難管理。然後我們看了湖屋建築,我們隻有一個管道。我們擁有接近實時的數據延遲能力,Apache Spark的可擴展性,高度集成的生態係統,而且技術堆棧非常狹窄。這是非常非常有希望的。
另一個有助於開發的重要進步是直接利用生產數據。我們的目標是盡可能減少數據移動,並利用計算機存儲分離。開發數據平台,獲取真實的生產數據Beplay体育安卓版本來識別和解決許多微妙的問題,以及處理實際規模,這些都是我們在開發環境中無法重現的。所以這個結果對我們來說是非常理想的。我們使用IAM角色和掛載點以隻讀模式透明地將預產品計算集群連接到生產數據,同時還將數據寫回生產數據包。因此,數據實際上從未長期駐留在環境中,處於真正的計算和存儲分離部分。這張圖很重要,我們會在某節課上再講。
在我們之前的體係結構中,每個表都有一個ETL映射作業,這使得平台非常嚴格。Beplay体育安卓版本我們實際上有4000個映射,如果您需要在許多映射中進行更改,那麼工作量很大,而且平台有點抗拒更改,因為必須更改的事情很複雜。Beplay体育安卓版本在Lake House中,我們希望創建一個單一的spark ingestion作業,它能夠流處理、批處理和從所有源中讀取數據,並且是完全可配置的,我們使用Scala編寫了這個作業,使用了設計模式和大量依賴注入,允許非常豐富的可配置性。我們選擇結構化流來利用檢查點,精確或至少一種語義,我們已經統一了我們的接口,我們的著陸區域,到S3和Kafka。因此,數據庫、CDC管道、api、平麵文件都上傳到S3、SNS和SQS, Kinesis都上傳到Kafka。然後,我們使用Databricks作為股作業和臨時集群來調度攝取作業。因此,這使得代碼具有高度的可測試性、易於維護和非常靈活。
數據湖中的所有表都是delta表。作為數據湖的一部分,我們需要保持一個緩慢變化的維度類型,隻附加表來跟蹤所有行、所有列和分數的變化,我們稱之為L1表,但我們也需要存儲每一行的相關版本,就像搜索引擎[聽不清楚]類型,通過合並到目標表的變化,我們稱之為L2表。所以我們有一個選擇。你是否將數據從著陸區流到L1,然後從L1流到L2?但這實際上會使我們的工作崗位數量翻倍,從4000個增加到8000個。因此,在經過大量測試後,我們決定利用每個批處理API的強大功能同時編寫兩個表。如果你……Intellect box被大大簡化了,但我們實際上沒有使用直接的Spark。我們有讀者和作者圍繞Spark。我們還通過在forge批處理中添加強製元列來大量裝飾我們的數據。
我們要做的下一個選擇是關於我們的觸發選擇一種原生的解決這個問題的方法是取出攝取作業,那是一個流作業並將它作為一個短暫的作業發送出去。現在的挑戰是,我們有4000個表要運行,而Databricks隻允許在碎片中有1000個作業。最重要的是,每個短暫的集群都必須有一個驅動程序和至少兩個節點,這將使我們處於一個12,000個節點的環境中,這有點讓人望而卻步,我們不認為這是對資源的最佳利用。所以下一步我們考慮在筆記本電腦中把這些流作業組合在一起,並把它們作為一個臨時作業來運行,我們找到了一個驅動程序處理大約40個流的最佳位置。
當然,集群必須更大才能處理所有流,但我們也注意到相當高的計算機浪費,因為在這40個流中,有些流可能並不經常有數據,但我們讓它們一直運行。這種方法的另一個問題是,如果我們想禁用單個流,我們實際上必須停止作業,這將停止所有40個流。因此,我們希望找到一種更好的方法,能夠在不需要停止所有其他作業的情況下,切換一個作業,或將它從一個筆記本移動到另一個筆記本,或從一個集群移動到另一個集群。最後,我們選擇了觸發方式,對於那些可能不知道觸發方式的人它是結構化流,它會收集所有數據,直到你的最大流或文件或字節數,處理這些數據,然後檢查進度,然後終止作業。
因此,當您希望微批運行時,您實際上負責調度它們。這意味著沒有持續執行,這允許我們將筆記本中的數百個作業放在一個臨時集群中。現在,因為這些作業是按計劃運行的,我們實際上可以在兩輪之間在筆記本電腦之間遷移它們。我們還可以在每一輪中刷新每一輪的衝突,這樣我們所做的唯一配置更改就會立即生效,而不需要重新啟動。同時,我們也在使用機器學習來振興不同集群的工作,以確保我們滿足我們的數據sli。我們有五種類型的筆記本作為臨時作業運行,這些筆記本有一個分配給它們的組。他們進入數據庫,配置數據庫,收集所有屬於該組的作業,然後他們以[聽不清]係列運行這些作業,或者通過使用電源收集,我們可以讓作業執行癱瘓,當然,在驅動器上,通常是同時運行16到32個作業。
我們有五種口味的筆記本。我們有一個用於非常頻繁更新的東西,實際上我們可以在一個筆記本電腦上放大約60個表。我們的目標是在60分鍾內完成所有的更新和合並。我們有更新頻率較低的表,可以放入300到500個表。然後是不經常更新的表,我們可以放1000個。這種方法的好處是,我們可以獲得一個頻率不斷增長的表,並在這些組之間無縫地移動它,並無縫地重新平衡這些工作,而不會對該組中運行的任何其他工作造成資源或影響。
我們還可以通過在y循環中運行作業的筆記本來實現所謂的偽流,我們已經測試過了,性能非常接近於運行一個處理觸發器,讓Spark處理微批的執行。例外的是,我們可以在每個y循環的頂部檢查配置,這意味著如果我們有五個作業正在運行,我們想要禁用其中一個,我們隻禁用其中一個,下一次迭代將不會運行,而其他四個不會重新啟動。
好的。所以讓我們繼續討論我們在建造如此大規模的湖泊環境時所學到的經驗教訓。首先,讓我們談談雲文件。你們可能不知道,雲文件是Databricks提供的Auto Loader的一部分,當你使用AWS並試圖從文件夾中讀取時,Databricks會自動為該文件夾創建一個S3通知,使其成為一個SNS,然後它會有一個訂閱了訂閱了SNS的SQS的SNS。現在,首先,AWS隻允許每個桶100個通知。因此,您將不得不編寫某種自動化程序,以便知道您在後端已經有了飽和的通知,並且您可能需要在下一個bucket中部署一種新的作業類型。
其次,SQS和SNS在默認情況下沒有標記,至少現在是這樣。因此,如果你像我們一樣,或者要求你所有的資源都被標記,你將不得不使用某種Lambda或某種函數來注意和檢測Databricks在你的帳戶上創建的資源,並適當地標記它們。最後,AWS SNS對api, ListSubscriptions和ListSubscriptionsByTopic有硬性限製,Databricks使用這些api來檢查是否已經…SNS已經有SQS訂閱。如果你運行足夠多的工作,就像我們同時運行成千上萬個工作一樣,我們已經看到我們達到這些限製,我們的工作失敗了。所以今天處理這個問題的唯一選擇就是看看是否有變化不大的非常慢的表,我們可以禁用通知,或者隻是及時地分散它們來避免這種情況。但在某些時候,我們會遇到這樣的問題。
好的。我們從雲文件中學到的另一個教訓。我要回到我之前講過的數據計算種族隔離的幻燈片。因此,當你從另一個帳戶預戳計算,從生產帳戶請求數據時,Databricks會設置一個通知,它會設置一個SNS,然後它會設置一個SQS。但是如果您注意的話,就會發現SNS和SQS位於計算的帳戶中,而不是存儲的帳戶中。
這樣就可以了,您的測試也可以了,很好,您已經準備好部署到生產環境中了。將作業部署到生產環境中。您對數據提出了相同的請求,但這次Databricks隻創建了一個SQS隊列,因為已經存在一個SNS,對嗎?所以現在,是的,你的生產數據也可以工作,但通知和整個雲文件都是通過預刺激環境運行的,這當然是有問題的。因此,在部署過程中,我們必須清理通知,首先運行生產作業,讓生產作業設置SNS和SQS,然後啟動預生產跳轉以訂閱該SNS主題。這是最後一步,但這是我們在部署過程中必須執行和自動化的額外步驟。
好的,下一個。我們通過AWS DMS從成千上萬的數據庫表中獲取數據,我們使用CDC更改數據捕獲流。所以當你第一次嚐試在桌子上啟用CDC時,你必須做兩件事。你必須啟動負載和疾控中心。load的意思是,它是給定時間表的快照,然後CDC是數據庫記錄的過程,在對每個角色進行更改後跳過。現在,我們在這個設置中發現的挑戰是加載文件可能需要幾個小時,CDC文件在你開始工作時就開始被跟蹤。所以有可能在管道的CDC部分中有一個對象[聽不清]它會有一個時間戳,在加載文件完成之前,而加載文件分配了現在的時間戳。
所以你有一個加載文件版本的行,它的時間戳在CDC更新版本的行之前。因此,我們將加載文件的DMS時間戳重置為零,以避免這種競爭條件。我們從DMS和CDC中學到的另一個經驗是數據類型轉換,由於DMS可以連接到幾乎任何數據庫,我們發現有時它不能正確地轉換數據類型。例如,我們是一個SQL服務器,一個小整數轉換為UINT,我們已經看到了一些溢出,我們必須應用你在幻燈片底部看到的規則來強製它返回一個整數。例如,在Oracle中,數值被轉換為DECIMAL(38,10),你可以選擇將其一直設置為38,38,但例如,在我們的Oracle數據庫中,數值列的精度是50。所以我們沒有辦法把數據帶來。我們必須將名為numberDataTypeScale的設置設置為負2,這有效地將其轉換為字符串。
這又是一個教訓。加載文件可能很大,讀取時可能會導致數據傾斜。所以你可能需要加些鹽。DMS文件不是分區的,所以要麼考慮壓縮,要麼注意Spark會花費很長時間來讀取DMS存儲桶。如果你在幾個月後重新啟動,會有成千上萬的小文件。我們將DMS設置為在開始任務時刪除所有文件,隻是為了重新開始,隻是為了緩解這個問題和這個問題,並盡量減少數據重複。數據庫源上的某些源可能有大型事務。
讓我們回顧一下。當我們在一個微批處理中對單行有多個更新時,我們必須確定哪一個被延遲了。這個說我們想要合並到目標表中,最初我們認為我們可以使用時間戳,因為它們通常以毫秒或微秒為單位,但我們發現,如果您手動在數據庫中打開一個大型事務,並在該事務中對同一行進行大量更新,然後關閉事務,所有這些更新都帶有完全相同的時間戳。所以你不可能確定哪個是最新的。因此,我們必須通過DMS為所有數據庫引入LSN,以確保我們有某種確定性的方式來合並並獲得最新的行。
現在,我們學到的另一個教訓是,如果數據庫沒有主鍵,但是有一個唯一的約束,你可以用來合並,這個約束包含空值,空值是很多的源,Databrick的merge Delta在匹配時不會識別null和null,它每次都會插入新行。所以我們要做的就是將所有的空值替換為字符串空值或者空字符串來進行更確定的歸並。
關於卡夫卡的教訓。所以如果你使用Kafka進行某種CDC管道模式,這也是可能的,你可能有一些沒有很多流量的表,你不想為這個小表配置大量的分區和浪費大量的資源,但初始數據負載實際上可能會帶來數百萬行的數據。現在你有了一個沒有大量分區的主題,但它有很多數據,你不一定要為了一次讀取而重新分區該主題。因此,我們建議將每個觸發器的最小分區和最大偏移量設置為較大的數字。在我們的例子中,我認為我們分區了4000個,抵消了10,000個,以迫使Spark麻痹這個主題的權重,從而在不需要重新分區的情況下加快它的速度。
我們還使用了我們編寫的第一個L1表。我們為每個批處理優化裏麵的表,然後我們把它作為L2歸並的源。那就是避免一個動作,第二個動作,回到原點,因為原點可能很慢。DMS可能因為這個文件數量而停止,或者Kafka在這種特殊情況下也可能很慢。所以一般來說,我們采用了這樣的模式,即向數據幀中添加一個批處理ID列,寫入L1,如果需要的話進行優化,然後使用相同的批處理ID從L1中篩選數據進行合並。我們發現這種模式比返回源要快得多,如果你有大量的數據,緩存也會更慢。
我們從卡夫卡身上學到的其他經驗。好吧,我們希望在所有東西上都使用觸發器一次,這樣我們就有了選擇,而Kinesis和其他來源不支持這個開箱操作。所以我們必須使用Kafka Connect將我們所有的SNS, SQS, Kinesis移動到Kafka中,然後我們可以使用支持,我們可以使用Kafka觸發器一次,使這個工作以與我們在DMS中完全相同的方式工作。
從達美航空吸取的教訓。當你第一次帶來數據時,手動優化你的表因為你會得到更快的[聽不清]然後啟用Delta Optimized Writes因為合並會重寫很多數據。Optimized Writes是很好的合並和計算文件大小,所以你不需要經常做壓縮優化。移動您的批處理ID和合並列到您的數據框架的前麵。
因此,Databrick收集數據幀的第一行的統計信息。如果你有一個非常寬的數據幀,你不會得到最後的列的統計信息,特別是如果你剛剛添加了批處理ID,那將是最後一列。例如,您需要該批處理ID進行篩選。所以把你要合並和搜索的所有東西都移到數據框架的前麵。如果你使用增量合並列,比如自動更新數字,等等,你也可以使用排序來將數據放在一起,進一步減少需要讀取的文件數量,它有數據跳過。對於Delta,我們總是建議使用分區和帶有IO緩存的i3實例類型。
還有其他的經驗教訓。如果你有其他需要讀取Delta的工具,比如我們用Presto,當你在Hive中注冊Delta表時,你必須寫S3路徑。因為Presto不會理解DBFS,它不會理解[聽不清],所以我們必須手動創建表定義並將S3路徑放入其中。如果你想讓你的Delta文件被諸如Athena、Hive或者Spectrum這樣的東西讀寫,你需要生成清單文件,並啟用對這些清單文件的自動更新。這樣,非delta或非delta事務,本地[聽不清]技術仍然可以顯示文件來讀取parquet文件。
Presto和Spark視圖目前不兼容。這是需要注意的,因為如果你在Spark中創建一個視圖,Presto將不能利用它,反之亦然。我們還發現,將像行數、最後修改的行數這樣的增量統計信息提取到熱緩存中實際上是非常可取的,因為我們的工作負載需要更新許多先前的表,許多依賴的表需要首先更新。假設您有一個TL,它需要首先更新30,40個表。擁有一個正在運行的集群並發出40個已描述的命令,僅僅是為了確定是否可以運行作業,這是很慢的。通過在每個任務結束時將這些統計數據提交給熱緩存,下一個任務隻需讀取現金並確定所有表都已更新,我就可以開始調度任務了。
好的。最後,Delta和Spark一起工作,將Delta從Delta表中輸出,但目前,到今天為止,它隻對append有效。這是有意義的,因為當您向Delta表追加內容時,您將向表和事務日誌中插入新文件,並且偵聽該表的任何人都知道有新文件,並且這些新文件被讀取。這很有道理。至於歸並,這有點複雜因為歸並會重寫所有數據。是的,創建了一個新文件這個文件可能有一百萬行,但我們隻更新了1000行。所以如果你在文件中沿著管道走,你必須從百萬行中篩選到實際發生變化的一千行。我們要怎麼做呢?
同樣,我們使用了我們之前添加的批處理ID來知道新的批處理是27,我隻需要提取27的數據,因此將這個龐大的文件過濾成一個小文件。它不是最有效的,但它運行得很好。最後是SQL分析,簡單介紹一下我們如何使用SQL分析。因此,我們的數據集市是由超過1000個SQL查詢和語句的集合構建的。我們需要一種方法將數據集市從以前的實驗室平台提升並轉移到這個平台,因此我們需要一個良好的可伸縮SQL執行引擎。Beplay体育安卓版本當然,Spark就是這樣,我們希望利用現有的框架,通過JDBC連接器將SQL語句提交到Spark集群。第一個選擇是使用交互式集群,但它們相當昂貴。
所以我們更傾向於使用開源Spark和Delta的EMR。當SQL Analytics產品進入我們的範圍時,它仍然支持JDBC連接,所以它非常適合我們能夠發送那些SQL查詢並創建那些數據集市。到目前為止,我們學到的一些經驗教訓是,因為它是一個早期的產品,我們必須自己從api中收集所有的指標,並將它們放在Delta表中,以進行一些監控和性能。每個SQL工作區隻允許使用一個元存儲,這意味著如果你有多個不同的SQL端點,它們都將共享一個元存儲,這與集群不同,我們可以在每個集群中配置不同的元存儲。因此,我們在分離計算和存儲以及跨帳戶和使用元存儲將它們結合在一起方麵受到了一些限製。
它也不支持UDF。因此,如果你需要一個[聽不清],你仍然必須依靠交互式Spark SQL集群,在那裏你可以附加jar,這次它將沒有jar來進行SQL分析。最後,您還必須學習如何排除Spark故障。因此,您仍然需要學習如何理解dag以及Spark UI上的Spark job和SQL視圖,因為這仍然隻是一個底層的Spark job。因此,為了有效地找到查詢中的瓶頸,您仍然需要這樣做。
好的,非常感謝。現在是問答時間。謝謝你,請提供你的反饋。我們想要你的回複。我們希望提高我們與大家分享的內容的質量,如果你有任何問題,如果你想離線聯係我,請隨時在領英上與我聯係。謝謝你!

托馬斯Magdanski

Tomasz是一位經驗豐富的技術領導者,專注於大數據、實時應用和機器學習技術的現實世界實現。他部署和管理生產應用程序…
閱讀更多

Baidu
map