前段時間需要進行大批量數據導入,DBA給提供的是CVS文件,但是每個CVS文件都好幾個GB大小,直接進行load,數據庫很慢還會產生內存不足的問題,為了實現這個功能,寫了個快速切分文件的程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.*; import java.util.*; import java.util.concurrent.*; public class FileSplitUtil { private final static Logger log = LogManager.getLogger(FileSplitUtil. class ); private static final long originFileSize = 1024 * 1024 * 100 ; // 100M private static final int blockFileSize = 1024 * 1024 * 64 ; // 防止中文亂碼,必須取2的N次方 /** * CVS文件分隔符 */ private static final char cvsSeparator = '^' ; public static void main(String args[]){ long start = System.currentTimeMillis(); try { String fileName = "D:\\csvtest\\aa.csv" ; File sourceFile = new File(fileName); if (sourceFile.length() >= originFileSize) { String cvsFileName = fileName.replaceAll( "\\\\" , "/" ); FileSplitUtil fileSplitUtil = new FileSplitUtil(); List<String> parts=fileSplitUtil.splitBySize(cvsFileName, blockFileSize); for (String part:parts){ System.out.println( "partName is:" +part); } } System.out.println( "總文件長度" +sourceFile.length()+ ",拆分文件耗時:" + (System.currentTimeMillis() - start) + "ms." ); } catch (Exception e){ log.info(e.getStackTrace()); } } /** * 拆分文件 * * @param fileName 待拆分的完整文件名 * @param byteSize 按多少字節大小拆分 * @return 拆分后的文件名列表 */ public List<String> splitBySize(String fileName, int byteSize) throws IOException, InterruptedException { List<String> parts = new ArrayList<String>(); File file = new File(fileName); int count = ( int ) Math.ceil(file.length() / ( double ) byteSize); int countLen = (count + "" ).length(); RandomAccessFile raf = new RandomAccessFile(fileName, "r" ); long totalLen = raf.length(); CountDownLatch latch = new CountDownLatch(count); for ( int i = 0 ; i < count; i++) { String partFileName = file.getPath() + "." + leftPad((i + 1 ) + "" , countLen, '0' ) + ".cvs" ; int readSize=byteSize; long startPos=( long )i * byteSize; long nextPos=( long )(i+ 1 ) * byteSize; if (nextPos>totalLen){ readSize= ( int ) (totalLen-startPos); } new SplitRunnable(readSize, startPos, partFileName, file, latch).run(); parts.add(partFileName); } latch.await(); //等待所有文件寫完 //由于切割時可能會導致行被切斷,加工所有的的分割文件,合并行 mergeRow(parts); return parts; } /** * 分割處理Runnable * * @author supeidong */ private class SplitRunnable implements Runnable { int byteSize; String partFileName; File originFile; long startPos; CountDownLatch latch; public SplitRunnable( int byteSize, long startPos, String partFileName, File originFile, CountDownLatch latch) { this .startPos = startPos; this .byteSize = byteSize; this .partFileName = partFileName; this .originFile = originFile; this .latch = latch; } public void run() { RandomAccessFile rFile; OutputStream os; try { rFile = new RandomAccessFile(originFile, "r" ); byte [] b = new byte [byteSize]; rFile.seek(startPos); // 移動指針到每“段”開頭 int s = rFile.read(b); os = new FileOutputStream(partFileName); os.write(b, 0 , s); os.flush(); os.close(); latch.countDown(); } catch (IOException e) { log.error(e.getMessage()); latch.countDown(); } } } /** * 合并被切斷的行 * * @param parts */ private void mergeRow(List<String> parts) { List<PartFile> partFiles = new ArrayList<PartFile>(); try { //組裝被切分表對象 for ( int i= 0 ;i<parts.size();i++) { String partFileName=parts.get(i); File splitFileTemp = new File(partFileName); if (splitFileTemp.exists()) { PartFile partFile = new PartFile(); BufferedReader reader= new BufferedReader( new InputStreamReader( new FileInputStream(splitFileTemp), "gbk" )); String firstRow = reader.readLine(); String secondRow = reader.readLine(); String endRow = readLastLine(partFileName); partFile.setPartFileName(partFileName); partFile.setFirstRow(firstRow); partFile.setEndRow(endRow); if (i>= 1 ){ String prePartFile=parts.get(i - 1 ); String preEndRow = readLastLine(prePartFile); partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow)); } partFiles.add(partFile); reader.close(); } } //進行需要合并的行的寫入 for ( int i = 0 ; i < partFiles.size() - 1 ; i++) { PartFile partFile = partFiles.get(i); PartFile partFileNext = partFiles.get(i + 1 ); StringBuilder sb = new StringBuilder(); if (partFileNext.getFirstIsFull()) { sb.append( "\r\n" ); sb.append(partFileNext.getFirstRow()); } else { sb.append(partFileNext.getFirstRow()); } writeLastLine(partFile.getPartFileName(),sb.toString()); } } catch (Exception e) { log.error(e.getMessage()); } } /** * 得到某個字符出現的次數 * @param s * @return */ private int getCharCount(String s) { int count = 0 ; for ( int i = 0 ; i < s.length(); i++) { if (s.charAt(i) == cvsSeparator) { count++; } } return count; } /** * 采用BufferedInputStream方式讀取文件行數 * * @param filename * @return */ public int getFileRow(String filename) throws IOException { InputStream is = new BufferedInputStream( new FileInputStream(filename)); byte [] c = new byte [ 1024 ]; int count = 0 ; int readChars = 0 ; while ((readChars = is.read(c)) != - 1 ) { for ( int i = 0 ; i < readChars; ++i) { if (c[i] == '\n' ) ++count; } } is.close(); return count; } /** * 讀取最后一行數據 * @param filename * @return * @throws IOException */ private String readLastLine(String filename) throws IOException { // 使用RandomAccessFile , 從后找最后一行數據 RandomAccessFile raf = new RandomAccessFile(filename, "r" ); long len = raf.length(); String lastLine = "" ; if (len!=0L) { long pos = len - 1 ; while (pos > 0 ) { pos--; raf.seek(pos); if (raf.readByte() == '\n' ) { lastLine = raf.readLine(); lastLine= new String(lastLine.getBytes( "8859_1" ), "gbk" ); break ; } } } raf.close(); return lastLine; } /** * 修改最后一行數據 * @param fileName * @param lastString * @return * @throws IOException */ private void writeLastLine(String fileName,String lastString){ try { // 打開一個隨機訪問文件流,按讀寫方式 RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw" ); // 文件長度,字節數 long fileLength = randomFile.length(); //將寫文件指針移到文件尾。 randomFile.seek(fileLength); //此處必須加gbk,否則會出現寫入亂碼 randomFile.write(lastString.getBytes( "gbk" )); randomFile.close(); } catch (IOException e) { log.error(e.getMessage()); } } /** * 左填充 * * @param str * @param length * @param ch * @return */ public static String leftPad(String str, int length, char ch) { if (str.length() >= length) { return str; } char [] chs = new char [length]; Arrays.fill(chs, ch); char [] src = str.toCharArray(); System.arraycopy(src, 0 , chs, length - src.length, src.length); return new String(chs); } /** * 合并文件行內部類 */ class PartFile { private String partFileName; private String firstRow; private String endRow; private boolean firstIsFull; public String getPartFileName() { return partFileName; } public void setPartFileName(String partFileName) { this .partFileName = partFileName; } public String getFirstRow() { return firstRow; } public void setFirstRow(String firstRow) { this .firstRow = firstRow; } public String getEndRow() { return endRow; } public void setEndRow(String endRow) { this .endRow = endRow; } public boolean getFirstIsFull() { return firstIsFull; } public void setFirstIsFull( boolean firstIsFull) { this .firstIsFull = firstIsFull; } } } |
以上就是本文的全部內容,希望對大家學習java程序設計有所幫助。