SpringBatch 是一個大數據量的并行處理框架。通常用于數據的離線遷移,和數據處理,?持事務、并發、流程、監控、縱向和橫向擴展,提供統?的接?管理和任務管理;SpringBatch是SpringSource和埃森哲為了統一業界并行處理標準為廣大開發者提供方便開發的一套框架。
官方地址:github.com/spring-projects/spring-batch
- SpringBatch 本身提供了重試,異常處理,跳過,重啟、任務處理統計,資源管理等特性,這些特性開發者看重他的主要原因;
- SpringBatch 是一個輕量級的批處理框架;
- SpringBatch 結構分層,業務與處理策略、結構分離;
- 任務的運行的實例狀態,執行數據,參數都會落地到數據庫;
快速入門
pom.xml 添加
1
2
3
4
|
< dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-batch</ artifactId > </ dependency > |
創建BatchConfig(可以是其他類名)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
@Configuration @EnableBatchProcessing public class BatchConfig { // tag::readerwriterprocessor[] @Bean public FlatFileItemReader<Person> flatFileItemReader() { FlatFileItemReader<Person> reader = new FlatFileItemReader<>(); reader.setResource( new ClassPathResource( "sample-data.csv" )); FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer(); reader.setLineMapper( new DefaultLineMapper<Person>() {{ setLineTokenizer( new DelimitedLineTokenizer() {{ setNames( new String[]{ "firstName" , "lastName" }); }}); setFieldSetMapper( new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person. class ); }}); }}); return reader; } @Bean public JdbcPagingItemReader<Person> jdbcPagingItemReader(DataSource dataSource) { JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>(); reader.setDataSource(dataSource); reader.setFetchSize( 100 ); reader.setQueryProvider( new MySqlPagingQueryProvider() {{ setSelectClause( "SELECT person_id,first_name,last_name" ); setFromClause( "from people" ); setWhereClause( "last_name=:lastName" ); setSortKeys( new HashMap<String, Order>() {{ put( "person_id" , Order.ASCENDING); }}); }}); reader.setParameterValues( new HashMap<String, Object>() {{ put( "lastName" , "DOE" ); }}); reader.setRowMapper( new BeanPropertyRowMapper<>(Person. class )); return reader; } @Bean public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource) { JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql( "INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)" ); writer.setDataSource(dataSource); return writer; } /*@Bean public FlatFileItemWriter<Person> flatFileItemWriter(DataSource dataSource) { FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>(); writer.setAppendAllowed(true); writer.setEncoding("UTF-8"); // writer.set(dataSource); return writer; }*/ // end::readerwriterprocessor[] // tag::jobstep[] @Bean public Job importUserJob(JobBuilderFactory jobBuilderFactory, JobCompletionNotificationListener listener, Step step) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .start(step) .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, PersonItemProcessor processor, ItemWriter jdbcBatchItemWriter, ItemReader flatFileItemReader) { /*CompositeItemProcessor compositeItemProcessor = new CompositeItemProcessor(); compositeItemProcessor.setDelegates(Lists.newArrayList(processor, processor));*/ return stepBuilderFactory.get( "step1" ) .<Person, Person>chunk( 10 ) .reader(flatFileItemReader) .processor(processor) .writer(jdbcBatchItemWriter) .build(); } // end::jobstep[] } |
Spring Batch的分層架構
- Insfrastructure 策略管理:包括任務的失敗重試,異常處理,事務,skip,以及數據的輸入輸出(文本文件,DB,Message)
- Core: springBatch 的核心,包括JobLauch,job,step等等
- Application: 業務處理,創建任務,決定任務的執行方式(定時任務,手動觸發等)
Spring Batch執行流程
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:http://blog.didispace.com/spring-batch-1/