如果我們只局限于會使用Hive,而不考慮性能問題,就難搭建出一個完美的數倉,所以Hive性能調優是我們大數據從業者必須掌握的技能。本文將給大家講解Hive參數與性能調優的一些方法及技巧。
一、Limit 限制調整
一般情況下,limit語句還是需要執行整個查詢語句,然后再返回部分結果。
有一個配置屬性可以開啟,避免這種情況:對數據源進行抽樣。
hive.limit.optimize.enable=true -- 開啟對數據源進行采樣的功能
hive.limit.row.max.size -- 設置最小的采樣容量
hive.limit.optimize.limit.file -- 設置最大的采樣樣本數
缺點:有可能部分數據永遠不會被處理到
二、JOIN優化
1. 使用相同的連接鍵
當對3個或者更多個表進行join連接時,如果每個on子句都使用相同的連接鍵的話,那么只會產生一個MapReduce job。
2. 盡量盡早地過濾數據
減少每個階段的數據量,對于分區表要加分區,同時只選擇需要使用到的字段。
3. 盡量原子化操作
盡量避免一個SQL包含復雜邏輯,可以使用中間表來完成復雜的邏輯。
三、小文件優化
1.小文件過多產生的影響
- 首先對底層存儲HDFS來說,HDFS本身就不適合存儲大量小文件,小文件過多會導致namenode元數據特別大, 占用太多內存,嚴重影響HDFS的性能
- 對 Hive 來說,在進行查詢時,每個小文件都會當成一個塊,啟動一個Map任務來完成,而一個Map任務啟動和初始化的時間遠遠大于邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的Map數量是受限的
2.怎么解決小文件過多
1)使用 hive 自帶的 concatenate 命令,自動合并小文件
使用方法:
#對于非分區表 alter table A concatenate; #對于分區表 alter table B partition(day=20201224) concatenate;
注意:
- concatenate 命令只支持 RCFILE 和 ORC 文件類型。
- 使用concatenate命令合并小文件時不能指定合并后的文件數量,但可以多次執行該命令。
- 當多次使用concatenate后文件數量不在變化,這個跟參數mapreduce.input.fileinputformat.split.minsize=256mb 的設置有關,可設定每個文件的最小size。
2)調整參數減少Map數量
設置map輸入合并小文件的相關參數:
#執行Map前進行小文件合并 #CombineHiveInputFormat底層是 Hadoop的 CombineFileInputFormat 方法 #此方法是在mapper中將多個文件合成一個split作為輸入 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默認 #每個Map最大輸入大小(這個值決定了合并后文件的數量) set mapred.max.split.size=256000000; -- 256M #一個節點上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合并) set mapred.min.split.size.per.node=100000000; -- 100M #一個交換機下split的至少的大小(這個值決定了多個交換機上的文件是否需要合并) set mapred.min.split.size.per.rack=100000000; -- 100M
設置map輸出和reduce輸出進行合并的相關參數:
#設置map端輸出進行合并,默認為true set hive.merge.mapfiles = true; #設置reduce端輸出進行合并,默認為false set hive.merge.mapredfiles = true; #設置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000; -- 256M #當輸出文件的平均大小小于該值時,啟動一個獨立的MapReduce任務進行文件merge set hive.merge.smallfiles.avgsize=16000000; -- 16M
啟用壓縮:
# hive的查詢結果輸出是否進行壓縮 set hive.exec.compress.output=true; # MapReduce Job的結果輸出是否使用壓縮 set mapreduce.output.fileoutputformat.compress=true;
3)減少Reduce的數量
#reduce 的個數決定了輸出的文件的個數,所以可以調整reduce的個數控制hive表的文件數量, #hive中的分區函數 distribute by 正好是控制MR中partition分區的, #然后通過設置reduce的數量,結合分區函數讓數據均衡的進入每個reduce即可。 #設置reduce的數量有兩種方式,第一種是直接設置reduce個數 set mapreduce.job.reduces=10; #第二種是設置每個reduce的大小,Hive會根據數據總大小猜測確定一個reduce個數 set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默認是1G,設置為5G #執行以下語句,將數據均衡的分配到reduce中 set mapreduce.job.reduces=10; insert overwrite table A partition(dt) select * from B distribute by rand(); 解釋:如設置reduce數量為10,則使用 rand(), 隨機生成一個數 x % 10 , 這樣數據就會隨機進入 reduce 中,防止出現有的文件過大或過小
4)使用hadoop的archive將小文件歸檔
Hadoop Archive簡稱HAR,是一個高效地將小文件放入HDFS塊中的文件存檔工具,它能夠將多個小文件打包成一個HAR文件,這樣在減少namenode內存使用的同時,仍然允許對文件進行透明的訪問。
#用來控制歸檔是否可用 set hive.archive.enabled=true; #通知Hive在創建歸檔時是否可以設置父目錄 set hive.archive.har.parentdir.settable=true; #控制需要歸檔文件的大小 set har.partfile.size=1099511627776; #使用以下命令進行歸檔 ALTER TABLE A ARCHIVE PARTITION(dt='2022-02-24', hr='12'); #對已歸檔的分區恢復為原文件 ALTER TABLE A UNARCHIVE PARTITION(dt='2022-02-24', hr='12');
注意:
歸檔的分區可以查看不能 insert overwrite,必須先unarchive
四、本地模式
有時hive的輸入數據量是非常小的。在這種情況下,為查詢出發執行任務的時間消耗可能會比實際job的執行時間要多的多。對于大多數這種情況,hive可以通過本地模式在單臺機器上處理所有的任務。對于小數據集,執行時間會明顯被縮短。
set hive.exec.mode.local.auto=true;
當一個job滿足如下條件才能真正使用本地模式:
- job的輸入數據大小必須小于參數:hive.exec.mode.local.auto.inputbytes.max (默認128MB)
- job的map數必須小于參數:hive.exec.mode.local.auto.tasks.max (默認4)
- job的reduce數必須為0或者1
可用參數 hive.mapred.local.mem (默認0)控制child jvm使用的最大內存數。
五、strict模式
開啟嚴格模式對分區表進行查詢,在where子句中沒有加分區過濾的話,將禁止提交任務(默認:nonstrict)
set hive.mapred.mode=strict 開啟嚴格模式
注:使用嚴格模式可以禁止以下三種類型的查詢:
1. 對分區表的查詢必須使用到分區相關的字段
分區表的數據量通常都比較大,對分區表的查詢必須使用到分區相關的字段,不允許掃描所有分區,想想也是如果掃描所有分區的話那么對表進行分區還有什么意義呢。
當然某些特殊情況可能還是需要掃描所有分區,這個時候就需要記得確保嚴格模式被關閉。
2. order by必須帶limit
因為要保證全局有序需要將所有的數據拉到一個Reducer上,當數據集比較大時速度會很慢。個人猜測可能是設置了limit N之后就會有一個很簡單的優化算法:每個Reducer排序取N然后再合并排序取N即可,可大大減少數據傳輸量。
3. 禁止笛卡爾積查詢(join必須有on連接條件)
Hive不會對where中的連接條件優化為on,所以join必須帶有on連接條件,不允許兩個表直接相乘。
六、并行執行優化
Hive會將一個查詢轉化成一個或者多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合并階段、limit階段。或者Hive執行過程中可能需要的其他階段。默認情況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含眾多的階段,而這些階段可能并非完全互相依賴的,也就是說有些階段是可以并行執行的,這樣可能使得整個job的執行時間縮短。如果有更多的階段可以并行執行,那么job可能就越快完成。
通過設置參數hive.exec.parallel值為true,就可以開啟并發執行。在共享集群中,需要注意下,如果job中并行階段增多,那么集群利用率就會增加。
set hive.exec.parallel=true; //打開任務并行執行 set hive.exec.parallel.thread.number=16; //同一個sql允許最大并行度,默認為8。
當然得是在系統資源比較空閑的時候才有優勢,否則沒資源,并行也起不來。
七、JVM優化
JVM重用是Hadoop調優參數的內容,其對Hive的性能具有非常大的影響,特別是對于很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短。
Hadoop的默認配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM實例在同一個job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中進行配置。通常在10-20之間,具體多少需要根據具體業務場景測試得出。
<property> <name>mapreduce.job.jvm.numtasksname> <value>10value> <description>How many tasks to run per jvm. If set to -1, there is no limit. description> property>
我們也可以在Hive中設置:
set mapred.job.reuse.jvm.num.tasks=10 設置jvm重用
這個功能的缺點是,開啟JVM重用將一直占用使用到的task插槽,以便進行重用,直到任務完成后才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那么保留的插槽就會一直空閑著卻無法被其他的job使用,直到所有的task都結束了才會釋放。
八、推測執行優化
在分布式集群環境下,因為程序bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢于其他任務(比如一個作業的某個任務進度只有50%,而其他所有任務已經運行完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖后腿”的任務,并為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份數據,并最終選用最先成功運行完成任務的計算結果作為最終結果。
設置開啟推測執行參數:Hadoop的mapred-site.xml文件中進行配置:
<property> <name>mapreduce.map.speculativename> <value>truevalue> <description>If true, then multiple instances of some map tasks may be executed in parallel.description> property> <property> <name>mapreduce.reduce.speculativename> <value>truevalue> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.description> property>
Hive本身也提供了配置項來控制reduce-side的推測執行:
set hive.mapred.reduce.tasks.speculative.execution=true
關于調優這些推測執行變量,還很難給一個具體的建議。如果用戶因為輸入數據量很大而需要執行長時間的map或者reduce task的話,那么啟動推測執行造成的浪費是非常巨大的。
九、數據傾斜優化
數據傾斜的原理都知道,就是某一個或幾個key占據了整個數據的90%,這樣整個任務的效率都會被這個key的處理拖慢,同時也可能會因為相同的key會聚合到一起造成內存溢出。
Hive的數據傾斜一般的處理方案:
常見的做法,通過參數調優:
set hive.map.aggr=true; set hive.groupby.skewindata = ture;
當選項設定為true時,生成的查詢計劃有兩個MapReduce任務。
在第一個MapReduce中,map的輸出結果集合會隨機分布到reduce中,每個reduce做部分聚合操作,并輸出結果。
這樣處理的結果是,相同的Group By Key有可能分發到不同的reduce中,從而達到負載均衡的目的;
第二個MapReduce任務再根據預處理的數據結果按照Group By Key分布到reduce中(這個過程可以保證相同的Group By Key分布到同一個reduce中),最后完成最終的聚合操作。
但是這個處理方案對于我們來說是個黑盒,無法把控。
那么在日常需求的情況下如何處理這種數據傾斜的情況呢;
- sample采樣,獲取哪些集中的key;
- 將集中的key按照一定規則添加隨機數;
- 進行join,由于打散了,所以數據傾斜避免了;
- 在處理結果中對之前的添加的隨機數進行切分,變成原始的數據。
十、動態分區調整
動態分區屬性:設置為true表示開啟動態分區功能(默認為false)
hive.exec.dynamic.partition=true;
動態分區屬性:設置為nonstrict,表示允許所有分區都是動態的(默認為strict) 設置為strict,表示必須保證至少有一個分區是靜態的
hive.exec.dynamic.partition.mode=strict;
動態分區屬性:每個mapper或reducer可以創建的最大動態分區個數
hive.exec.max.dynamic.partitions.pernode=100;
動態分區屬性:一個動態分區創建語句可以創建的最大動態分區個數
hive.exec.max.dynamic.partitions=1000;
動態分區屬性:全局可以創建的最大文件個數
hive.exec.max.created.files=100000;
十一、其他參數調優
開啟CLI提示符前打印出當前所在的數據庫名
set hive.cli.print.current.db=true;
讓CLI打印出字段名稱
hive.cli.print.header=true;
設置任務名稱,方便查找監控
set mapred.job.name=P_DWA_D_IA_S_USER_PROD;
決定是否可以在 Map 端進行聚合操作
set hive.map.aggr=true;
有數據傾斜的時候進行負載均衡
set hive.groupby.skewindata=true;
對于簡單的不需要聚合的類似SELECT col from table LIMIT n語句,不需要起MapReduce job,直接通過Fetch task獲取數據
set hive.fetch.task.conversion=more;
最后
代碼優化原則:
理透需求原則,這是優化的根本;
把握數據全鏈路原則,這是優化的脈絡;
堅持代碼的簡潔原則,這讓優化更加簡單;
沒有瓶頸時談論優化,這是自尋煩惱。
原文地址:https://mp.weixin.qq.com/s/F2WKvFxQBGlp-6nBuIU6lw