編寫一個MapReduce(MR)程序,使其能適用於大多數「把一張HBase表的數據導入到另一張HBase表」的場景。程序支持:設置導入的版本數;設置輸入表的列導入(只選取其中某幾列);設置輸出表的列導出(只選取其中某幾列)。
原始表test1數據如下:
每個row key都有兩個版本的數據,這裡只顯示了row key為1的數據。
在hbase shell 中創建數據表:
1
2
3
4
5
6
7
|
# Output tables for the seven copy jobs run by DriverTest (hbase shell comments use '#').
create 'test2', {NAME => 'cf1', VERSIONS => 10}  # no versions, no import columns, no export columns
create 'test3', {NAME => 'cf1', VERSIONS => 10}  # no versions, import columns, no export columns
create 'test4', {NAME => 'cf1', VERSIONS => 10}  # no versions, no import columns, export columns
create 'test5', {NAME => 'cf1', VERSIONS => 10}  # versions, no import columns, no export columns
create 'test6', {NAME => 'cf1', VERSIONS => 10}  # versions, import columns, no export columns
create 'test7', {NAME => 'cf1', VERSIONS => 10}  # versions, no import columns, export columns
create 'test8', {NAME => 'cf1', VERSIONS => 10}  # versions, import columns, export columns
main函數入口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package GeneralHBaseToHBase; import org.apache.hadoop.util.ToolRunner; public class DriverTest { public static void main(String[] args) throws Exception { // 無版本設置、無列導入設置,無列導出設置 String[] myArgs1= new String[]{ "test1" , // 輸入表 "test2" , // 輸出表 "0" , // 版本大小數,如果值為0,則為默認從輸入表導出最新的數據到輸出表 "-1" , // 列導入設置,如果為-1 ,則沒有設置列導入 "-1" // 列導出設置,如果為-1,則沒有設置列導出 }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs1); // 無版本設置、有列導入設置,無列導出設置 String[] myArgs2= new String[]{ "test1" , "test3" , "0" , "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14" , "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs2); // 無版本設置,無列導入設置,有列導出設置 String[] myArgs3= new String[]{ "test1" , "test4" , "0" , "-1" , "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs3); // 有版本設置,無列導入設置,無列導出設置 String[] myArgs4= new String[]{ "test1" , "test5" , "2" , "-1" , "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs4); // 有版本設置、有列導入設置,無列導出設置 String[] myArgs5= new String[]{ "test1" , "test6" , "2" , "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14" , "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs5); // 有版本設置、無列導入設置,有列導出設置 String[] myArgs6= new String[]{ "test1" , "test7" , "2" , "-1" , "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs6); // 有版本設置、有列導入設置,有列導出設置 String[] myArgs7= new String[]{ "test1" , "test8" , "2" , "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14" , "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs7); } } |
driver:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
package GeneralHBaseToHBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import util.JarUtil; public class HBaseDriver extends Configured implements Tool{ public static String FROMTABLE= "" ; //導入表 public static String TOTABLE= "" ; //導出表 public static String SETVERSION= "" ; //是否設置版本 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable} @Override public int run(String[] args) throws Exception { if (args.length!= 5 ){ System.err.println( "Usage:\n demo.job.HBaseDriver <input> <inputTable> " + "<output> <outputTable>" + "< versions >" + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> " + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>" ); return - 1 ; } Configuration conf = getConf(); FROMTABLE = args[ 0 ]; TOTABLE = args[ 1 ]; SETVERSION = args[ 2 ]; conf.set( "SETVERSION" , SETVERSION); if (!args[ 3 ].equals( "-1" )){ conf.set( "COLUMNFROMTABLE" , args[ 3 ]); } if (!args[ 4 ].equals( "-1" )){ conf.set( "COLUMNTOTABLE" , args[ 4 ]); } String jobName = "From table " +FROMTABLE+ " ,Import to " + TOTABLE; Job job = Job.getInstance(conf, jobName); job.setJarByClass(HBaseDriver. class ); Scan scan = new Scan(); // 判斷是否需要設置版本 if (SETVERSION != "0" || SETVERSION != "1" ){ scan.setMaxVersions(Integer.parseInt(SETVERSION)); } // 設置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型 TableMapReduceUtil.initTableMapperJob( FROMTABLE, scan, HBaseToHBaseMapper. class , ImmutableBytesWritable. class , Put. 
class , job); // 設置HBase表輸出:表名,reducer類 TableMapReduceUtil.initTableReducerJob(TOTABLE, null , job); // 沒有 reducers, 直接寫入到 輸出文件 job.setNumReduceTasks( 0 ); return job.waitForCompletion( true ) ? 0 : 1 ; } private static Configuration configuration; public static Configuration getConfiguration(){ if (configuration== null ){ /** * TODO 了解如何直接從Windows提交代碼到Hadoop集群 * 并修改其中的配置為實際配置 */ configuration = new Configuration(); configuration.setBoolean( "mapreduce.app-submission.cross-platform" , true ); // 配置使用跨平臺提交任務 configuration.set( "fs.defaultFS" , "hdfs://master:8020" );// 指定namenode configuration.set( "mapreduce.framework.name" , "yarn" ); // 指定使用yarn框架 configuration.set( "yarn.resourcemanager.address" , "master:8032" ); // 指定resourcemanager configuration.set( "yarn.resourcemanager.scheduler.address" , "master:8030" ); // 指定資源分配器 configuration.set( "mapreduce.jobhistory.address" , "master:10020" ); // 指定historyserver configuration.set( "hbase.master" , "master:16000" ); configuration.set( "hbase.rootdir" , "hdfs://master:8020/hbase" ); configuration.set( "hbase.zookeeper.quorum" , "slave1,slave2,slave3" ); configuration.set( "hbase.zookeeper.property.clientPort" , "2181" ); //TODO 需export->jar file ; 設置正確的jar包所在位置 configuration.set( "mapreduce.job.jar" ,JarUtil.jar(HBaseDriver. class )); // 設置jar包路徑 } return configuration; } } |
mapper:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
package GeneralHBaseToHBase; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.NavigableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> { Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper. class ); private static int versionNum = 0 ; private static String[] columnFromTable = null ; private static String[] columnToTable = null ; private static String column1 = null ; private static String column2 = null ; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); versionNum = Integer.parseInt(conf.get( "SETVERSION" , "0" )); column1 = conf.get( "COLUMNFROMTABLE" , null ); if (!(column1 == null )){ columnFromTable = column1.split( "," ); } column2 = conf.get( "COLUMNTOTABLE" , null ); if (!(column2 == null )){ columnToTable = column2.split( "," ); } } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { context.write(key, resultToPut(key,value)); } /*** * 把key,value轉換為Put * @param key * @param value * @return * @throws IOException */ private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException { HashMap<String, String> fTableMap = new HashMap<>(); HashMap<String, String> tTableMap = new HashMap<>(); Put put = new Put(key.get()); if (! 
(columnFromTable == null || columnFromTable.length == 0 )){ fTableMap = getFamilyAndColumn(columnFromTable); } if (! (columnToTable == null || columnToTable.length == 0 )){ tTableMap = getFamilyAndColumn(columnToTable); } if (versionNum== 0 ){ if (fTableMap.size() == 0 ){ if (tTableMap.size() == 0 ){ for (Cell kv : value.rawCells()) { put.add(kv); // 沒有設置版本,沒有設置列導入,沒有設置列導出 } return put; } else { return getPut(put, value, tTableMap); // 無版本、無列導入、有列導出 } } else { if (tTableMap.size() == 0 ){ return getPut(put, value, fTableMap); // 無版本、有列導入、無列導出 } else { return getPut(put, value, tTableMap); // 無版本、有列導入、有列導出 } } } else { if (fTableMap.size() == 0 ){ if (tTableMap.size() == 0 ){ return getPut1(put, value); // 有版本,無列導入,無列導出 } else { return getPut2(put, value, tTableMap); //有版本,無列導入,有列導出 } } else { if (tTableMap.size() == 0 ){ return getPut2(put,value,fTableMap); // 有版本,有列導入,無列導出 } else { return getPut2(put,value,tTableMap); // 有版本,有列導入,有列導出 } } } } /*** * 無版本設置的情況下,對于有列導入或者列導出 * @param put * @param value * @param tableMap * @return * @throws IOException */ private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{ for (Cell kv : value.rawCells()){ byte [] family = kv.getFamily(); if (tableMap.containsKey( new String(family))){ String columnStr = tableMap.get( new String(family)); ArrayList<String> columnBy = toByte(columnStr); if (columnBy.contains( new String(kv.getQualifier()))){ put.add(kv); //沒有設置版本,沒有設置列導入,有設置列導出 } } } return put; } /*** * (有版本,無列導入,有列導出)或者(有版本,有列導入,無列導出) * @param put * @param value * @param tTableMap * @return */ private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){ NavigableMap< byte [], NavigableMap< byte [], NavigableMap<Long, byte []>>> map=value.getMap(); for ( byte [] family:map.keySet()){ if (tableMap.containsKey( new String(family))){ String columnStr = tableMap.get( new String(family)); log.info( "@@@@@@@@@@@" + new String(family)+ " " +columnStr); ArrayList<String> columnBy = 
toByte(columnStr); NavigableMap< byte [], NavigableMap<Long, byte []>> familyMap = map.get(family); //列簇作為key獲取其中的列相關數據 for ( byte [] column:familyMap.keySet()){ //根據列名循壞 log.info( "!!!!!!!!!!!" + new String(column)); if (columnBy.contains( new String(column))){ NavigableMap<Long, byte []> valuesMap = familyMap.get(column); for (Entry<Long, byte []> s:valuesMap.entrySet()){ //獲取列對應的不同版本數據,默認最新的一個 System.out.println( "***:" + new String(family)+ " " + new String(column)+ " " +s.getKey()+ " " + new String(s.getValue())); put.addColumn(family, column, s.getKey(),s.getValue()); } } } } } return put; } /*** * 有版本、無列導入、無列導出 * @param put * @param value * @return */ private Put getPut1(Put put,Result value){ NavigableMap< byte [], NavigableMap< byte [], NavigableMap<Long, byte []>>> map=value.getMap(); for ( byte [] family:map.keySet()){ NavigableMap< byte [], NavigableMap<Long, byte []>> familyMap = map.get(family); //列簇作為key獲取其中的列相關數據 for ( byte [] column:familyMap.keySet()){ //根據列名循壞 NavigableMap<Long, byte []> valuesMap = familyMap.get(column); for (Entry<Long, byte []> s:valuesMap.entrySet()){ //獲取列對應的不同版本數據,默認最新的一個 put.addColumn(family, column, s.getKey(),s.getValue()); } } } return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名與列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static HashMap<String, String> getFamilyAndColumn(String[] str){ HashMap<String, String> map = new HashMap<>(); HashSet<String> set = new HashSet<>(); for (String s : str){ set.add(s.split( ":" )[ 0 ]); } Object[] ob = set.toArray(); for ( int i= 0 ; i<ob.length;i++){ String family = String.valueOf(ob[i]); String columns = "" ; for ( int j= 0 ;j < str.length;j++){ if (family.equals(str[j].split( ":" )[ 0 ])){ columns += str[j].split( ":" )[ 1 ]+ "," ; } } map.put(family, columns.substring( 0 , columns.length()- 1 )); } return map; } private static ArrayList<String> 
toByte(String s){ ArrayList<String> b = new ArrayList<>(); String[] sarr = s.split( "," ); for ( int i= 0 ;i<sarr.length;i++){ b.add(sarr[i]); } return b; } } |
程序運行完之後,在hbase shell中查看每個表,確認數據是否正確導入:
test2:(無版本、無列導入設置、無列導出設置)
test3 (無版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)
test4(無版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))
test5(有版本、無列導入設置、無列導出設置)
test6(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)
test7(有版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))
test8(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/Angelababy_huan/article/details/53236693