Example: Bulk-Importing Data into Elasticsearch with Spring Batch

2021-04-06 13:10 | 彭超 | Java Tutorials

This article walks through an example of bulk-importing data into Elasticsearch with Spring Batch, shared here for reference.

1. Introduction

When a system needs to import a large volume of data from a database into Elasticsearch, Spring Batch can make the import more efficient. Spring Batch reads data page by page through an ItemReader and writes it in batches through an ItemWriter. Because Spring Batch does not ship an ItemReader or ItemWriter for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) for bulk import.
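The read-then-write-in-batches idea can be illustrated without any framework. The following is a minimal plain-Java sketch, not Spring Batch code: the class `ChunkLoopSketch` and its `process` method are hypothetical names introduced only for illustration. It pulls items one at a time from a reader, buffers them, and hands each full buffer (a chunk) to the writer in a single call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java illustration of chunk-oriented processing (hypothetical class, not Spring Batch API).
class ChunkLoopSketch {

    /**
     * Reads items one by one, buffers them, and passes each chunk to the writer
     * in one call. Returns the number of writer calls that were made.
     */
    public static <T> int process(Iterator<T> reader, Consumer<List<T>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int writes = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                // Hand the writer its own copy so clearing the buffer cannot affect it
                writer.accept(new ArrayList<>(chunk));
                chunk.clear();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {
            // Flush the final, partially filled chunk
            writer.accept(new ArrayList<>(chunk));
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            rows.add(i);
        }
        int writes = process(rows.iterator(),
                chunk -> System.out.println("bulk write of " + chunk.size() + " items"),
                10);
        System.out.println("writer calls: " + writes);
    }
}
```

With 25 items and a chunk size of 10, the writer is called three times (10, 10, and 5 items), which is why a larger chunk size means fewer round trips to the target store.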

2. Example

2.1 pom.xml

This article connects to ES with spring data jest (spring data elasticsearch would also work); the ES version is 5.5.3.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.hfcsbc.estl</groupId>
  <artifactId>es-etl</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
 
  <name>es-etl</name>
  <description>Demo project for Spring Boot</description>
 
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
 
    <dependency>
      <groupId>com.github.vanroy</groupId>
      <artifactId>spring-boot-starter-data-jest</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>
 
    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>5.3.2</version>
    </dependency>
 
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
 
  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
 
  </repositories>
 
  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

2.2 Entity classes and repositories

package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;
 
/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
  @Id
  private Long id;
  private String name;
  @OneToOne
  @Field(type = FieldType.Nested)
  private Address address;
}

package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
  @Id
  private Long id;
  private String name;
}

package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 Configuring the ElasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {
 
  private EsPersonRepository personRepository;
 
  public ElasticsearchItemWriter(EsPersonRepository personRepository) {
    this.personRepository = personRepository;
  }
 
  @Override
  public void beforeWrite(List<? extends Person> items) {
 
  }
 
  @Override
  public void afterWrite(List<? extends Person> items) {
 
  }
 
  @Override
  public void onWriteError(Exception exception, List<? extends Person> items) {
 
  }
 
  @Override
  public void beforeStep(StepExecution stepExecution) {
 
  }
 
  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }
 
  @Override
  public void write(List<? extends Person> items) throws Exception {
    // saveAll in the implementing class AbstractElasticsearchRepository calls elasticsearchOperations.bulkIndex(queries), so the whole chunk is indexed in bulk
    personRepository.saveAll(items);
  }
}
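For context on what a bulk index amounts to on the wire, the Elasticsearch `_bulk` endpoint accepts newline-delimited JSON: one action line followed by one document line per item. The sketch below builds such a body in plain Java. It is an illustration only: the client libraries used in this article assemble the request for you, and the class `BulkBodySketch`, its `bulkBody` method, and the pre-serialized JSON strings are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: builds the newline-delimited body expected by the _bulk endpoint.
class BulkBodySketch {

    /**
     * For every document, emits an "index" action line followed by the document
     * source line. Each line, including the last one, ends with a newline.
     * The documents are assumed to be already serialized as single-line JSON.
     */
    public static String bulkBody(String index, String type, Map<Long, String> jsonById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<Long, String> entry : jsonById.entrySet()) {
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(entry.getKey())
                .append("\"}}\n");
            body.append(entry.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the output is predictable
        Map<Long, String> docs = new LinkedHashMap<>();
        docs.put(1L, "{\"name\":\"Ann\"}");
        docs.put(2L, "{\"name\":\"Bob\"}");
        System.out.print(bulkBody("person", "person", docs));
    }
}
```

Sending one such body per chunk, instead of one request per document, is what makes the write side of the job efficient.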

2.4 Configuring the ElasticsearchItemReader (not used in this example; for reference only)

package com.hfcsbc.esetl.itemReader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.ClassUtils;
import java.util.Iterator;
/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
 
  private final ElasticsearchOperations elasticsearchOperations;
 
  private final SearchQuery query;
 
  private final Class<? extends T> targetType;
 
  public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
    // A name is needed so the reader can save its state in the execution context
    setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
  }
 
  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<T> doPageRead() {
    // Request only the current page; otherwise every call returns the same results and the reader never finishes
    query.setPageable(PageRequest.of(page, pageSize));
    return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
  }
 
  @Override
  public void afterPropertiesSet() throws Exception {
  }
}
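The paging contract behind a reader like this can be shown in plain Java. The sketch below is not Spring Batch code: `PagedReaderSketch`, its `read` method, and the `slice` helper are hypothetical names, and an in-memory list stands in for the search backend. It fetches one page at a time, advances the page index after each fetch, and returns null once a page comes back empty, mirroring the convention that `ItemReader.read()` returns null when the input is exhausted.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java illustration of page-by-page reading (hypothetical class, not Spring Batch API).
class PagedReaderSketch<T> {

    private final BiFunction<Integer, Integer, List<T>> pageFetcher; // (page, pageSize) -> items
    private final int pageSize;
    private int page = 0;
    private Iterator<T> current = Collections.emptyIterator();

    public PagedReaderSketch(BiFunction<Integer, Integer, List<T>> pageFetcher, int pageSize) {
        this.pageFetcher = pageFetcher;
        this.pageSize = pageSize;
    }

    /** Returns the next item, or null when no more pages are available. */
    public T read() {
        if (!current.hasNext()) {
            // Current page is used up: fetch the next one and advance the page index
            current = pageFetcher.apply(page, pageSize).iterator();
            page++;
        }
        return current.hasNext() ? current.next() : null;
    }

    /** Stand-in for a backend query: returns the requested slice of an in-memory list. */
    public static <E> List<E> slice(List<E> all, int page, int size) {
        int from = page * size;
        if (from >= all.size()) {
            return Collections.emptyList();
        }
        return all.subList(from, Math.min(from + size, all.size()));
    }

    public static void main(String[] args) {
        List<Integer> data = java.util.Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        PagedReaderSketch<Integer> reader =
                new PagedReaderSketch<>((p, s) -> slice(data, p, s), 3);
        Integer item;
        while ((item = reader.read()) != null) {
            System.out.println("read " + item);
        }
    }
}
```

The key point is that each fetch must ask for a different page; a fetch that ignores the page index would keep returning the first page and the loop would never end.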

2.5 Spring Batch configuration

package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
  @Autowired
  private EsPersonRepository personRepository;
 
  @Bean
  public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
    JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
    String sqlQuery = "select * from person";
    try {
      JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
      queryProvider.setSqlQuery(sqlQuery);
      queryProvider.setEntityClass(Person.class);
      queryProvider.afterPropertiesSet();
      reader.setEntityManagerFactory(entityManagerFactory);
      reader.setPageSize(10000);
      reader.setQueryProvider(queryProvider);
      reader.afterPropertiesSet();
      reader.setSaveState(true);
    } catch (Exception e) {
      e.printStackTrace();
    }
 
    return reader;
  }
 
  @Bean
  public ElasticsearchItemWriter itemWriter(){
    return new ElasticsearchItemWriter(personRepository);
  }
 
  @Bean
  public Step step(StepBuilderFactory stepBuilderFactory,
           ItemReader itemReader,
           ItemWriter itemWriter){
    return stepBuilderFactory
        .get("step1")
        .chunk(10000)
        .reader(itemReader)
        .writer(itemWriter)
        .build();
  }
 
  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory, Step step){
    return jobBuilderFactory
        .get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }
 
  /**
   * Spring Batch creates some tables of its own at runtime; this specifies where they are created: dataSource
   * @param dataSource
   * @param manager
   * @return
   */
  @Bean
  public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(manager);
    jobRepositoryFactoryBean.setDatabaseType("postgres");
    try {
      return jobRepositoryFactoryBean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
}

2.6 Configuring the database and ES connection addresses

spring:
 redis:
  host: 192.168.1.222
 data:
  jest:
   uri: http://192.168.1.222:9200
   username: elastic
   password: changeme
 
 jpa:
  database: POSTGRESQL
  show-sql: true
  hibernate:
   ddl-auto: update
 
 datasource:
  platform: postgres
  url: jdbc:postgresql://192.168.1.222:5433/person
  username: hfcb
  password: hfcb
  driver-class-name: org.postgresql.Driver
  max-active: 2
 
spring.batch.initialize-schema: always

2.7 Entry-point class

package com.hfcsbc.esetl;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
 
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {
 
  public static void main(String[] args) {
    SpringApplication.run(EsEtlApplication.class, args);
  }
}

That is all for this article. We hope it is helpful for your learning.

Original link: http://www.wisely.top/2018/02/24/spring-batch-elasticsearch
