Java 中自定義 OutputFormat 的實例詳解
實例代碼:
package com.ccse.hadoop.outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; public class MySelfOutputFormatApp { public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput" ; public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput" ; public final static String OUTPUT_FILENAME = "/abc" ; public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get( new URI(OUTPUT_PATH), conf); fileSystem.delete( new Path(OUTPUT_PATH), true ); Job job = new Job(conf, MySelfOutputFormatApp. class .getSimpleName()); job.setJarByClass(MySelfOutputFormatApp. class ); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper. class ); job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(LongWritable. class ); job.setReducerClass(MyReducer. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(LongWritable. class ); job.setOutputFormatClass(MyselfOutputFormat. 
class ); job.waitForCompletion( true ); } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text word = new Text(); private LongWritable writable = new LongWritable( 1 ); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { if (value != null ) { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, writable); } } } } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long sum = 0 ; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { private FSDataOutputStream outputStream = null ; @Override public RecordWriter<Text, LongWritable> getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { try { FileSystem fileSystem = FileSystem.get( new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); //指定文件的輸出路徑 final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH + MySelfOutputFormatApp.OUTPUT_FILENAME); this .outputStream = fileSystem.create(path, false ); } catch (URISyntaxException e) { e.printStackTrace(); } return new MySelfRecordWriter(outputStream); } @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter( new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); } } public static class 
MySelfRecordWriter extends RecordWriter<Text, LongWritable> { private FSDataOutputStream outputStream = null ; public MySelfRecordWriter(FSDataOutputStream outputStream) { this .outputStream = outputStream; } @Override public void write(Text key, LongWritable value) throws IOException, InterruptedException { this .outputStream.writeBytes(key.toString()); this .outputStream.writeBytes( "\t" ); this .outputStream.writeLong(value.get()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { this .outputStream.close(); } } } |
2.OutputFormat是用于處理各種輸出目的地的。
2.1 OutputFormat需要寫出去的鍵值對來自于Reducer類:Reducer每輸出一對k和v,就會交給RecordWriter的write(...)方法寫出。
2.2 RecordWriter中的write(...)方法只有k和v,寫到哪里去呢?這要通過單獨傳入OutputStream來處理。write就是把k和v寫入到OutputStream中。
2.3 RecordWriter是由OutputFormat的getRecordWriter(...)方法創建並返回的。因此,我們自定義的OutputFormat必須繼承OutputFormat類型,而流對象必須在getRecordWriter(...)方法中獲得。
以上就是java 中自定義OutputFormat的實例,如有疑問請留言或者到本站社區交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
原文鏈接:http://blog.csdn.net/woshisap/article/details/42320129