Shuffle核心概念、Shuffle調優(yōu)及故障排除
六、bypass機制開啟閾值
對于SortShuffleManager,如果shuffle reduce task的數量小于某一閾值則shuffle write過程中不會進行排序操作,而是直接按照未經優(yōu)化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨的索引文件。
當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于shuffle read task的數量,那么此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。
SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort.bypassMergeThreshold這一參數進行設置,默認值為200,該參數的設置方法如下:
reduce端拉取數據等待間隔配置:
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
數據傾斜
就是數據分到各個區(qū)的數量不太均勻,可以自定義分區(qū)器,想怎么分就怎么分。
Spark中的數據傾斜問題主要指shuffle過程中出現的數據傾斜問題,是由于不同的key對應的數據量不同導致的不同task所處理的數據量不同的問題。
例如,reduced端一共要處理100萬條數據,第一個和第二個task分別被分配到了1萬條數據,計算5分鐘內完成,第三個task分配到了98萬數據,此時第三個task可能需要10個小時完成,這使得整個Spark作業(yè)需要10個小時才能運行完成,這就是數據傾斜所帶來的后果。
注意,要區(qū)分開數據傾斜與數據過量這兩種情況,數據傾斜是指少數task被分配了絕大多數的數據,因此少數task運行緩慢;數據過量是指所有task被分配的數據量都很大,相差不多,所有task都運行緩慢。
數據傾斜的表現:
Spark作業(yè)的大部分task都執(zhí)行迅速,只有有限的幾個task執(zhí)行的非常慢,此時可能出現了數據傾斜,作業(yè)可以運行,但是運行得非常慢;Spark作業(yè)的大部分task都執(zhí)行迅速,但是有的task在運行過程中會突然報出OOM,反復執(zhí)行幾次都在某一個task報出OOM錯誤,此時可能出現了數據傾斜,作業(yè)無法正常運行。定位數據傾斜問題:查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據代碼邏輯判斷此處是否會出現數據傾斜;查看Spark作業(yè)的log文件,log文件對于錯誤的記錄會精確到代碼的某一行,可以根據異常定位到的代碼位置來明確錯誤發(fā)生在第幾個stage,對應的shuffle算子是哪一個;1. 預聚合原始數據
1. 避免shuffle過程
絕大多數情況下,Spark作業(yè)的數據來源都是Hive表,這些Hive表基本都是經過ETL之后的昨天的數據。為了避免數據傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發(fā)生數據傾斜問題的可能。
如果Spark作業(yè)的數據來源于Hive表,那么可以先在Hive表中對數據進行聚合,例如按照key進行分組,將同一key對應的所有value用一種特殊的格式拼接到一個字符串里去,這樣,一個key就只有一條數據了;之后,對一個key的所有value進行處理時,只需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會發(fā)生任何的數據傾斜問題。
對于Hive表中數據的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數據進行累計計算。要區(qū)分開,處理的數據量大和數據傾斜的區(qū)別。
2. 增大key粒度(減小數據傾斜可能性,增大每個task的數據量)
如果沒有辦法對每個key聚合出來一條數據,在特定場景下,可以考慮擴大key的聚合粒度。
例如,目前有10萬條用戶數據,當前key的粒度是(省,城市,區(qū),日期),現在我們考慮擴大粒度,將key的粒度擴大為(省,城市,日期),這樣的話,key的數量會減少,key之間的數據量差異也有可能會減少,由此可以減輕數據傾斜的現象和問題。(此方法只針對特定類型的數據有效,當應用場景不適宜時,會加重數據傾斜)
2. 預處理導致傾斜的key
1. 過濾
如果在Spark作業(yè)中允許丟棄某些數據,那么可以考慮將可能導致數據傾斜的key進行過濾,濾除可能導致數據傾斜的key對應的數據,這樣,在Spark作業(yè)中就不會發(fā)生數據傾斜了。
2. 使用隨機key
當使用了類似于groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機key實現雙重聚合,如下圖所示:
隨機key實現雙重聚合
首先,通過map算子給每個數據的key添加隨機數前綴,對key進行打散,將原先一樣的key變成不一樣的key,然后進行第一次聚合,這樣就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合;隨后,去除掉每個key的前綴,再次進行聚合。
此方法對于由groupByKey、reduceByKey這類算子造成的數據傾斜有比較好的效果,僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。
3. sample采樣對傾斜key單獨進行join
在Spark中,如果某個RDD只有一個key,那么在shuffle過程中會默認將此key對應的數據打散,由不同的reduce端task進行處理。
所以當由單個key導致數據傾斜時,可有將發(fā)生數據傾斜的key單獨提取出來,組成一個RDD,然后用這個原本會導致傾斜的key組成的RDD和其他RDD單獨join,此時,根據Spark的運行機制,此RDD中的數據會在shuffle階段被分散到多個task中去進行join操作。
傾斜key單獨join的流程如下圖所示:
傾斜key單獨join流程
適用場景分析:
對于RDD中的數據,可以將其轉換為一個中間表,或者是直接使用countByKey()的方式,看一下這個RDD中各個key對應的數據量,此時如果你發(fā)現整個RDD就一個key的數據量特別多,那么就可以考慮使用這種方法。
當數據量非常大時,可以考慮使用sample采樣獲取10%的數據,然后分析這10%的數據中哪個key可能會導致數據傾斜,然后將這個key對應的數據單獨提取出來。
不適用場景分析:
如果一個RDD中導致數據傾斜的key很多,那么此方案不適用。
3. 提高reduce并行度
當方案一和方案二對于數據傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數量,那么每個task分配到的數據量就會相應減少,由此緩解數據傾斜問題。
1. reduce端并行度的設置
在大部分的shuffle算子中,都可以傳入一個并行度的設置參數,比如reduceByKey(500),這個參數會決定shuffle過程中reduce端的并行度,在進行shuffle操作的時候,就會對應著創(chuàng)建指定數量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的并行度,該值默認是200,對于很多場景來說都有點過小。
增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。
舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執(zhí)行時間都會變短了。
2. reduce端并行度設置存在的缺陷
提高reduce端并行度并沒有從根本上改變數據傾斜的本質和問題(方案一和方案二從根本上避免了數據傾斜的發(fā)生),只是盡可能地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題,適用于有較多key對應的數據量都比較大的情況。
該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬,那么無論你的task數量增加到多少,這個對應著100萬數據的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數據傾斜的。所以這種方案只能說是在發(fā)現數據傾斜時嘗試使用的一種手段,嘗試去用最簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。
在理想情況下,reduce端并行度提升后,會在一定程度上減輕數據傾斜的問題,甚至基本消除數據傾斜;但是,在一些情況下,只會讓原來由于數據傾斜而運行緩慢的task運行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試后面的方案。
4. 使用map join
正常情況下,join操作都會執(zhí)行shuffle過程,并且執(zhí)行的是reduce join,也就是先將所有相同的key和對應的value匯聚到一個reduce task中,然后再進行join。普通join的過程如下圖所示:
普通join過程
普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數據傾斜。
注意:RDD是并不能直接進行廣播的,只能將RDD內部的數據通過collect拉取到Driver內存然后再進行廣播。
1. 核心思路:
不使用join算子進行連接操作,而使用broadcast變量與map類算子實現join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數據傾斜的發(fā)生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創(chuàng)建一個broadcast變量;接著對另外一個RDD執(zhí)行map類算子,在算子函數內,從broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。
根據上述思路,根本不會發(fā)生shuffle操作,從根本上杜絕了join操作可能導致的數據傾斜問題。
當join操作有數據傾斜問題并且其中一個RDD的數據量較小時,可以優(yōu)先考慮這種方式,效果非常好。
map join的過程如下圖所示:
map join過程
2. 不適用場景分析:
由于Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數據量都比較大,那么如果將一個數據量比較大的RDD做成廣播變量,那么很有可能會造成內存溢出。
故障排除1. 避免OOM-out of memory
在Shuffle過程,reduce端task并不是等到map端task將其數據全部寫入磁盤后再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,然后立即進行后面的聚合、算子函數的使用等操作。
reduce端task能夠拉取多少數據,由reduce拉取數據的緩沖區(qū)buffer來決定,因為拉取過來的數據都是先放在buffer中,然后再進行后續(xù)的處理,buffer的默認大小為48MB。
reduce端task會一邊拉取一邊計算,不一定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。
雖然說增大reduce端緩沖區(qū)大小可以減少拉取次數,提升Shuffle性能,但是有時map端的數據量非常大,寫出的速度非?,此時reduce端的所有task在拉取的時候,有可能全部達到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執(zhí)行的聚合函數的代碼,可能會創(chuàng)建大量的對象,這可能會導致內存溢出,即OOM。
如果一旦出現reduce端內存溢出的問題,我們可以考慮減小reduce端拉取數據緩沖區(qū)的大小,例如減少為12MB。
在實際生產環(huán)境中是出現過這種問題的,這是典型的以性能換執(zhí)行的原理。reduce端拉取數據的緩沖區(qū)減小,不容易導致OOM,但是相應的,reudce端的拉取次數增加,造成更多的網絡傳輸開銷,造成性能的下降。
注意,要保證任務能夠運行,再考慮性能的優(yōu)化。
2. 避免GC導致的shuffle文件拉取失敗
在Spark作業(yè)中,有時會出現shuffle file not found的錯誤,這是非常常見的一個報錯,有時出現這種錯誤以后,選擇重新執(zhí)行一遍,就不再報出這種錯誤。
出現上述問題可能的原因是Shuffle操作中,后面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執(zhí)行GC,執(zhí)行GC會導致Executor內所有的工作現場全部停止,比如BlockManager、基于netty的網絡通信等,這就會導致后面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執(zhí)行就不會再出現這種錯誤。
可以通過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增加,并且每次失敗后等待的時間間隔加長。
JVM GC導致的shuffle文件拉取失敗調整數據重試次數和reduce端拉取數據時間間隔:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
.set("spark.shuffle.io.retryWait", "60s")
3. YARN-CLIENT模式導致的網卡流量激增問題
在YARN-client模式下,Driver啟動在本地機器上,而Driver負責所有的任務調度,需要與YARN集群上的多個Executor進行頻繁的通信。
假設有100個Executor,1000個task,那么每個Executor分配到10個task,之后,Driver要頻繁地跟Executor上運行的1000個task進行通信,通信數據非常多,并且通信品類特別高。這就導致有可能在Spark任務運行過程中,由于頻繁大量的網絡通訊,本地機器的網卡流量會激增。
注意,YARN-client模式只會在測試環(huán)境中使用,而之所以使用YARN-client模式,是由于可以看到詳細全面的log信息,通過查看log,可以鎖定程序中存在的問題,避免在生產環(huán)境下發(fā)生故障。
在生產環(huán)境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機器網卡流量激增問題,如果YARN-cluster模式下存在網絡通信的問題,需要運維團隊進行解決。
4. YARN-CLUSTER模式的JVM棧內存溢出無法執(zhí)行問題
當Spark作業(yè)中包含SparkSQL的內容時,可能會碰到YARN-client模式下可以運行,但是YARN-cluster模式下無法提交運行(報出OOM錯誤)的情況。
YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置,是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運行在YARN集群的某個節(jié)點上,使用的是沒有經過配置的默認設置,PermGen永久代大小為82MB。
SparkSQL的內部要進行很復雜的SQL的語義解析、語法樹轉換等等,非常復雜,如果sql語句本身就非常復雜,那么很有可能會導致性能的損耗和內存的占用,特別是對PermGen的占用會比較大。
所以,此時如果PermGen占用好過了82MB,但是又小于128MB,就會出現YARN-client模式下可以運行,YARN-cluster模式下無法運行的情況。
解決上述問題的方法是增加PermGen(永久代)的容量,需要在spark-submit腳本中對相關參數進行設置,設置方法如下:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
通過上述方法就設置了Driver永久代的大小,默認為128MB,最大256MB,這樣就可以避免上面所說的問題。
5. 避免SparkSQL JVM棧內存溢出
當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。
JVM棧內存溢出基本上就是由于調用的方法層級過多,產生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉換為語法樹或者進行執(zhí)行計劃的生成的時候,對于or的處理是遞歸,or非常多時,會發(fā)生大量的遞歸)
此時,建議將一條sql語句拆分為多條sql語句來執(zhí)行,每條sql語句盡量保證100個以內的子句。根據實際的生產環(huán)境試驗,一條sql語句的or關鍵字控制在100個以內,通常不會導致JVM棧內存溢出。

請輸入評論內容...
請輸入評論/評論長度6~500個字
最新活動更多
-
10月23日火熱報名中>> 2025是德科技創(chuàng)新技術峰會
-
10月23日立即報名>> Works With 開發(fā)者大會深圳站
-
10月24日立即參評>> 【評選】維科杯·OFweek 2025(第十屆)物聯網行業(yè)年度評選
-
11月27日立即報名>> 【工程師系列】汽車電子技術在線大會
-
12月18日立即報名>> 【線下會議】OFweek 2025(第十屆)物聯網產業(yè)大會
-
精彩回顧立即查看>> 【限時福利】TE 2025國際物聯網展·深圳站