2018-03-06 15:09:58 1686瀏覽
如今學(xué)習(xí)大數(shù)據(jù)開發(fā)的人不斷的增加,但是關(guān)于大數(shù)據(jù)也有不少的小伙伴不是很了解,本篇文章小編就和大家一塊來看一下扣丁學(xué)堂大數(shù)據(jù)分析之2018大數(shù)據(jù)Spark性能調(diào)優(yōu)之?dāng)?shù)據(jù)傾斜,希望可以幫到喜歡或者準(zhǔn)備參加大數(shù)據(jù)培訓(xùn)學(xué)習(xí)大數(shù)據(jù)的小伙伴們。
絕大多數(shù)task執(zhí)行得都非??欤珎€別task執(zhí)行極慢。比如總共有1000個task,997個task都在1分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個task卻要一兩個小時,這種情況很常見。2018大數(shù)據(jù)Spark性能調(diào)優(yōu)之?dāng)?shù)據(jù)傾斜,扣丁學(xué)堂大數(shù)據(jù)培訓(xùn)老師分享給大家。
原本能夠正常執(zhí)行的Spark作業(yè),某天突然報出OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的。這種情況比較少見,數(shù)據(jù)傾斜發(fā)生的原因:
在進行shuffle的時候,必須將各個節(jié)點上相同的key拉取到某個節(jié)點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應(yīng)的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜,因此出現(xiàn)數(shù)據(jù)傾斜的時候,Spark作業(yè)看起來會運行得非常緩慢,甚至可能因為某個task處理的數(shù)據(jù)量過大導(dǎo)致內(nèi)存溢出。
數(shù)據(jù)傾斜只會發(fā)生在shuffle過程中 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
問題分析:
某個task執(zhí)行特別慢的情況。
首先要看的,就是數(shù)據(jù)傾斜發(fā)生在第幾個stage中。
如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到當(dāng)前運行到了第幾個stage。
如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當(dāng)前運行到了第幾個stage。
此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當(dāng)前這個stage各個task分配的數(shù)據(jù)量,從而進一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
知道數(shù)據(jù)傾斜發(fā)生在哪一個stage之后,接著我們就需要根據(jù)stage劃分原理,推算出來發(fā)生傾斜的那個stage對應(yīng)代碼中的哪一部分(Spark是根據(jù)shuffle類算子來進行stage的劃分)。
某個task莫名其妙內(nèi)存溢出的情況。
看log的異常棧,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出。然后在那行代碼附近找找,一般也會有shuffle類算子,此時很可能就是這個算子導(dǎo)致了數(shù)據(jù)傾斜。不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜。因為自己編寫的代碼的bug,以及偶然出現(xiàn)的數(shù)據(jù)異常,也可能會導(dǎo)致內(nèi)存溢出。
查看導(dǎo)致數(shù)據(jù)傾斜的key的數(shù)據(jù)分布情況。
知道了數(shù)據(jù)傾斜發(fā)生在哪里之后,通常需要分析一下哪個執(zhí)行了shuffle操作并且導(dǎo)致了數(shù)據(jù)傾斜的RDD表。查看一下其中key的分布情況,這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)。針對不同的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術(shù)方案來解決。
有很多種查看key分布的方式
如果是Spark SQL中的group by、join語句導(dǎo)致的數(shù)據(jù)傾斜,那么就查詢一下SQL中使用的表的key分布情況。
如果是對Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()。然后對統(tǒng)計出來的各個key出現(xiàn)的次數(shù),collect/take到客戶端打印一下,就可以看到key的分布情況。
解決方案
方案一:使用Hive ETL預(yù)處理數(shù)據(jù)
適用場景:導(dǎo)致數(shù)據(jù)傾斜的是hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻,而且業(yè)務(wù)場景需要頻繁使用Spark對Hive表執(zhí)行某個分析操作,那么比較適合使用這種技術(shù)方案。
實現(xiàn)思路:可以評估一下,是否可以通過Hive來進行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對數(shù)據(jù)按照key進行聚合,或者是預(yù)先和其他表進行join),然后在Spark作業(yè)中針對的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后的Hive表。此時由于數(shù)據(jù)已經(jīng)預(yù)先進行過聚合或join操作了,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。
方案優(yōu)點:實現(xiàn)起來簡單便捷,效果還非常好,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會大幅度提升。
方案缺點:Hive ETL中還是會發(fā)生數(shù)據(jù)傾斜。
在一些Java系統(tǒng)與Spark結(jié)合使用的項目中,會出現(xiàn)Java代碼頻繁調(diào)用Spark作業(yè)的場景,而且對Spark作業(yè)的執(zhí)行性能要求很高,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的Hive ETL,每天僅執(zhí)行一次,只有那一次是比較慢的,而之后每次Java調(diào)用Spark作業(yè)時,執(zhí)行速度都會很快,能夠提供更好的用戶體驗。
用戶通過Java Web系統(tǒng)提交數(shù)據(jù)分析統(tǒng)計任務(wù),后端通過Java提交Spark作業(yè)進行數(shù)據(jù)分析統(tǒng)計。要求Spark作業(yè)速度必須要快。
方案二:過濾少數(shù)導(dǎo)致傾斜的key
適用場景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個,而且對計算本身的影響并不大的話。
實現(xiàn)思路:將導(dǎo)致數(shù)據(jù)傾斜的key給過濾掉之后,這些key就不會參與計算了。
方案優(yōu)點:實現(xiàn)簡單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。
方案缺點:適用場景不多,大多數(shù)情況下,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個。
方案三:提高shuffle操作的并行度
處理數(shù)據(jù)傾斜最簡單的一種方案
實現(xiàn)思路:在對RDD執(zhí)行shuffle算子時,給shuffle算子傳入一個參數(shù),比如reduceByKey(1000),該參數(shù)就設(shè)置了這個shuffle算子執(zhí)行時shuffle read task的數(shù)量。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對于很多場景來說都有點過小。
實現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。
方案優(yōu)點:實現(xiàn)起來比較簡單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響。
方案缺點:只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)實踐經(jīng)驗來看,其效果有限。
該方案通常無法徹底解決數(shù)據(jù)傾斜,因為如果出現(xiàn)一些極端情況,比如某個key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。
方案四:兩階段聚合(局部聚合+全局聚合)
適用場景:對RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
實現(xiàn)思路:這個方案的核心實現(xiàn)思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數(shù),比如10以內(nèi)的隨機數(shù),此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結(jié)果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。
方案優(yōu)點:對于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜,效果是非常不錯的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜,將Spark作業(yè)的性能提升數(shù)倍以上。
方案缺點:僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
// 第一步,給RDD中的每個key都打上一個隨機前綴。
JavaPairRDD randomPrefixRdd = rdd.mapToPair(
new PairFunction, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2(prefix + "_" + tuple._1, tuple._2);
}
});// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});// 第三步,去除RDD中每個key的隨機前綴。
JavaPairRDD removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2(originalKey, tuple._2);
}
});// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
方案五:將reduce join轉(zhuǎn)為map join
適用場景:在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數(shù)據(jù)量比較小(比如幾百M或者一兩G),比較適用此方案。
實現(xiàn)思路:不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現(xiàn)join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。
實現(xiàn)原理:普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。
方案優(yōu)點:對join操作導(dǎo)致的數(shù)據(jù)傾斜,效果非常好,因為根本就不會發(fā)生shuffle,也就根本不會發(fā)生數(shù)據(jù)傾斜。
方案缺點:適用場景較少,因為這個方案只適用于一個大表和一個小表的情況
方案六:采樣傾斜key并分拆join操作
適用場景:兩個RDD/Hive表進行join的時候,如果數(shù)據(jù)量都比較大,那么此時可以看一下兩個RDD/Hive表中的key分布情況。如果出現(xiàn)數(shù)據(jù)傾斜,是因為其中某一個RDD/Hive表中的少數(shù)幾個key的數(shù)據(jù)量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。
實現(xiàn)原理:對于join導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個key導(dǎo)致了傾斜,可以將少數(shù)幾個key分拆成獨立RDD,并附加隨機前綴打散成n份去進行join,此時這幾個key對應(yīng)的數(shù)據(jù)就不會集中在少數(shù)幾個task上,而是分散到多個task進行join了。
方案優(yōu)點:對于join導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個key導(dǎo)致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數(shù)傾斜key對應(yīng)的數(shù)據(jù)進行擴容n倍,不需要對全量數(shù)據(jù)進行擴容。避免了占用過多內(nèi)存。
方案缺點:如果導(dǎo)致傾斜的key特別多的話,比如成千上萬個key都導(dǎo)致數(shù)據(jù)傾斜,那么這種方式也不適合。
方案七:使用隨機前綴和擴容RDD進行join
適用場景:如果在進行join操作時,RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了。
實現(xiàn)原理:將原先一樣的key通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。該方案與“解決方案六”的不同之處就在于,上一種方案是盡量只對少數(shù)傾斜key對應(yīng)的數(shù)據(jù)進行特殊處理,由于處理過程需要擴容RDD,因此上一種方案擴容RDD后對內(nèi)存的占用并不大;而這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數(shù)據(jù)擴容,對內(nèi)存資源要求很高。
方案優(yōu)點:對join類型的數(shù)據(jù)傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。
方案缺點:該方案更多的是緩解數(shù)據(jù)傾斜,而不是徹底避免數(shù)據(jù)傾斜。而且需要對整個RDD進行擴容,對內(nèi)存資源要求很高。
好了,關(guān)于2018大數(shù)據(jù)Spark性能調(diào)優(yōu)之?dāng)?shù)據(jù)傾斜就先為大家說到這里,喜歡大數(shù)據(jù)或者想要學(xué)習(xí)的小伙伴可以觀看扣丁學(xué)堂的大數(shù)據(jù)視頻教程學(xué)習(xí),希望大家都能學(xué)有所成!扣丁學(xué)堂大數(shù)據(jù)學(xué)習(xí)群:209080834。
【關(guān)注微信公眾號獲取更多學(xué)習(xí)資料】
查看更多關(guān)于“大數(shù)據(jù)培訓(xùn)資訊”的相關(guān)文章>>