Batch Import: the Approach
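I recently had to import data in bulk, so I built a reflection-based utility class for it. The idea is as follows. First define annotation interfaces and put the annotations on the bean class. At runtime, read the annotations of the bean passed in through reflection, generate the INSERT SQL automatically, and submit the parameter values in batches. No table-specific SQL has to be written and there is no DAO implementation, so the batch import is completely decoupled from any particular database table. The SQL actually executed in the batch is: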
insert into company_candidate(company_id,user_id,card_id,facebook_id,type,create_time,weight,score) VALUES (?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE type=?,weight=?,score=?
Step 1: define the annotation interfaces
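The Table annotation defines the database name and the table name. RetentionPolicy.RUNTIME keeps the annotation available at runtime, which is required because its values are read at runtime to generate the SQL.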
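import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Table {
    // database name
    String dbName() default "";
    // table name; if empty, the lower-cased class name is used
    String tableName() default "";
}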
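Why RUNTIME retention matters can be checked directly. A minimal sketch using only the JDK: RuntimeTable mirrors the article's @Table, while ClassTable is a hypothetical CLASS-retained counterpart; AnnotatedModel and RetentionDemo are illustrative names. Class.getAnnotation() only finds the RUNTIME one.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Same shape as the article's @Table: kept in the class file and visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RuntimeTable {
    String dbName() default "";
    String tableName() default "";
}

// Hypothetical variant: stored in the class file, but NOT readable through reflection.
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@interface ClassTable {
    String tableName() default "";
}

@RuntimeTable(dbName = "company", tableName = "company_candidate")
@ClassTable(tableName = "company_candidate")
class AnnotatedModel {}

class RetentionDemo {
    // Table name as seen through reflection, or null if the annotation is not visible at runtime
    static String runtimeTableName() {
        RuntimeTable t = AnnotatedModel.class.getAnnotation(RuntimeTable.class);
        return t == null ? null : t.tableName();
    }

    static String classTableName() {
        ClassTable t = AnnotatedModel.class.getAnnotation(ClassTable.class);
        return t == null ? null : t.tableName();
    }

    public static void main(String[] args) {
        System.out.println(runtimeTableName()); // company_candidate
        System.out.println(classTableName());   // null
    }
}
```

With CLASS retention, the utility would see no table name at all and could not build the SQL.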
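The TableField annotation defines, for each field, the corresponding column name in the DB table and whether the field is ignored. An ignored column is left out of the INSERT and filled with the default value defined in the table schema, so ignore must not be set to true on a column declared NOT NULL. The update attribute marks the columns to update when the primary key already exists in the DB. The pk attribute flags primary-key columns; the utility shown in this article does not read it.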
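import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface TableField {
    // column name in the DB table
    String fieldName() default "";
    // whether the column is part of the primary key
    boolean pk() default false;
    // true: leave the column out of the INSERT so the DB default is used
    boolean ignore() default false;
    // true: update the column in ON DUPLICATE KEY UPDATE
    boolean update() default false;
}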
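A minimal sketch of how these flags decide a field's role, assuming the rules described above. The FlagSample bean and the FieldRoles.roleOf helper are hypothetical, and TableField is re-declared so the example compiles on its own. Attributes left out of the annotation take their defaults, so a field with only fieldName set is inserted but never updated.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Copy of the article's @TableField so the sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface TableField {
    String fieldName() default "";
    boolean pk() default false;
    boolean ignore() default false;
    boolean update() default false;
}

// Hypothetical bean covering each combination of flags.
class FlagSample {
    private static final long serialVersionUID = 1L; // no annotation
    @TableField(fieldName = "company_id", pk = true)
    private int companyId;
    @TableField(fieldName = "type", update = true)
    private int type;
    @TableField(fieldName = "update_time", ignore = true)
    private java.util.Date updateTime;
}

class FieldRoles {
    // How the batch SQL treats a field, following the rules described in the text.
    static String roleOf(Field field) {
        TableField tf = field.getAnnotation(TableField.class);
        if (tf == null || tf.ignore() || tf.fieldName().isEmpty()) {
            return "skipped"; // not in the INSERT column list (an ignored column gets its DB default)
        }
        return tf.update() ? "insert+update" : "insert";
    }

    static String roleOf(String javaFieldName) {
        try {
            return roleOf(FlagSample.class.getDeclaredField(javaFieldName));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        for (Field f : FlagSample.class.getDeclaredFields()) {
            System.out.println(f.getName() + " -> " + roleOf(f));
        }
    }
}
```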
Step 2: annotate the bean
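Annotate the bean (getters/setters and other boilerplate are omitted for brevity). @TableField(fieldName = "company_id") maps the companyId field to the DB column "company_id". The annotation on updateTime contains ignore = true, so that property is skipped. serialVersionUID has no @TableField annotation, so it is also skipped when writing to the DB.
Code: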
import java.io.Serializable;
import java.util.Date;

@Table(dbName = "company", tableName = "company_candidate")
public class CompanyCandidateModel implements Serializable {
    // no @TableField, so it is never written to the DB
    private static final long serialVersionUID = -1234554321773322135L;
    @TableField(fieldName = "company_id")
    private int companyId;
    // the types of userId, cardId, type, weight and score are assumed;
    // those declarations are missing from the original listing
    @TableField(fieldName = "user_id")
    private int userId;
    @TableField(fieldName = "card_id")
    private int cardId;
    @TableField(fieldName = "facebook_id")
    private long facebookId;
    @TableField(fieldName = "type", update = true)
    private int type;
    @TableField(fieldName = "create_time")
    private Date createTime;
    // ignore = true: left out of the INSERT, so the DB default is used
    @TableField(fieldName = "update_time", ignore = true)
    private Date updateTime;
    @TableField(fieldName = "weight", update = true)
    private int weight;
    @TableField(fieldName = "score", update = true)
    private int score;
    // getters/setters omitted
}
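The bean's fields are private, and the utility does not go through getters: the later steps read the values directly with reflection after calling setAccessible(true). A minimal sketch of that mechanism, with a hypothetical CandidateBean and PrivateFieldReader helper:

```java
import java.lang.reflect.Field;

// Hypothetical, trimmed-down bean: private fields and no getters.
class CandidateBean {
    private int companyId;
    private long facebookId;

    CandidateBean(int companyId, long facebookId) {
        this.companyId = companyId;
        this.facebookId = facebookId;
    }
}

class PrivateFieldReader {
    private static Object readRaw(Object bean, String javaFieldName, boolean makeAccessible)
            throws ReflectiveOperationException {
        Field field = bean.getClass().getDeclaredField(javaFieldName);
        if (makeAccessible) {
            field.setAccessible(true); // lift the private access check, as ReflectUtil does
        }
        return field.get(bean);        // primitive values come back boxed (int -> Integer)
    }

    // Reads a private field like batchInsertSQL() does: setAccessible(true), then Field.get().
    static Object read(Object bean, String javaFieldName) {
        try {
            return readRaw(bean, javaFieldName, true);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // True if reading the private field without setAccessible(true) is rejected.
    static boolean deniedWithoutAccess(Object bean, String javaFieldName) {
        try {
            readRaw(bean, javaFieldName, false);
            return false;
        } catch (IllegalAccessException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        CandidateBean bean = new CandidateBean(42, 7L);
        System.out.println(read(bean, "companyId"));                // 42
        System.out.println(deniedWithoutAccess(bean, "companyId")); // true
    }
}
```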
Step 3: a reflection utility that reads the annotations
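This reflection utility reads the annotations on the bean class from Step 2. It calls getAnnotation(TableField.class) to read the annotation metadata in preparation for assembling the batch SQL.
getTableBeanFieldMap() builds a LinkedHashMap so that the column order of the generated INSERT is fixed, and the parameters can later be bound in exactly the same order without misaligned values. getSqlParamFields() works in a similar way: it returns the ordered list of fields used to set the PreparedStatement parameters, with the ON DUPLICATE KEY UPDATE fields appended at the end.
Code: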
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReflectUtil {
    /** Cache of <Class, <DB column name, bean Field>>; ConcurrentHashMap because it is shared across threads */
    private static final Map<Class<?>, Map<String, Field>> classTableBeanFieldMap = new ConcurrentHashMap<>();
    /** Cache of <Class, ordered list of Fields bound as SQL parameters> */
    private static final Map<Class<?>, List<Field>> sqlParamFieldsMap = new ConcurrentHashMap<>();

    private ReflectUtil() {}

    /**
     * Returns every field of the class that carries @TableField and is not ignored.
     * <br />The result is an ordered LinkedHashMap
     * <br />whose key is the DB column name and whose value is the bean's Field object.
     */
    public static Map<String, Field> getTableBeanFieldMap(Class<?> clazz) {
        Map<String, Field> fieldsMap = classTableBeanFieldMap.get(clazz);
        if (fieldsMap == null) {
            fieldsMap = new LinkedHashMap<>();
            for (Field field : clazz.getDeclaredFields()) {
                TableField annotation = field.getAnnotation(TableField.class);
                if (annotation != null && !annotation.ignore() && !"".equals(annotation.fieldName())) {
                    // the fields are private; allow Field.get() to read them
                    field.setAccessible(true);
                    fieldsMap.put(annotation.fieldName(), field);
                }
            }
            classTableBeanFieldMap.put(clazz, fieldsMap);
        }
        return fieldsMap;
    }

    /**
     * Returns the fields bound as PreparedStatement parameters: every non-ignored @TableField
     * field in column order, followed by the fields updated in ON DUPLICATE KEY UPDATE.
     * <br />The result is an ordered ArrayList of Field objects.
     */
    public static List<Field> getSqlParamFields(Class<?> clazz) {
        List<Field> sqlParamFields = sqlParamFieldsMap.get(clazz);
        if (sqlParamFields == null) {
            Map<String, Field> fieldsMap = getTableBeanFieldMap(clazz);
            sqlParamFields = new ArrayList<>(fieldsMap.size() * 2);
            List<Field> updateParamFields = new ArrayList<>();
            for (Field field : fieldsMap.values()) {
                // one parameter per column in VALUES (...)
                sqlParamFields.add(field);
                // every field in fieldsMap carries @TableField, so this is never null
                if (field.getAnnotation(TableField.class).update()) {
                    updateParamFields.add(field);
                }
            }
            // the update parameters come after all VALUES parameters
            sqlParamFields.addAll(updateParamFields);
            sqlParamFieldsMap.put(clazz, sqlParamFields);
        }
        return sqlParamFields;
    }

    /** Returns @Table's tableName, or the lower-cased simple class name if it is not set. */
    public static String getTableName(Class<?> clazz) {
        Table table = clazz.getAnnotation(Table.class);
        if (table != null && !"".equals(table.tableName())) {
            return table.tableName();
        }
        return clazz.getSimpleName().toLowerCase();
    }

    /** Returns @Table's dbName, the database the table lives in. */
    public static String getDBName(Class<?> clazz) {
        Table table = clazz.getAnnotation(Table.class);
        // assumed: null when the class has no @Table (this branch is missing from the original listing)
        return table != null ? table.dbName() : null;
    }
}
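The ordering that getSqlParamFields() produces can be sketched with column names in place of Field objects (ParamOrderDemo and its methods are hypothetical). For the company_candidate example the statement has 8 + 3 = 11 placeholders, and type is bound twice, at positions 5 and 9. Note that the Javadoc of Class.getDeclaredFields() says the returned array is not in any particular order; the scheme still stays consistent because the SQL text and the parameter list are both derived from the same cached map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ParamOrderDemo {
    /**
     * Mirrors getSqlParamFields() with column names instead of Field objects:
     * every column once (VALUES part), then the update columns again (UPDATE part).
     */
    static List<String> paramColumns(Map<String, Boolean> columnIsUpdate) {
        List<String> params = new ArrayList<>(columnIsUpdate.keySet());
        for (Map.Entry<String, Boolean> e : columnIsUpdate.entrySet()) {
            if (e.getValue()) {
                params.add(e.getKey());
            }
        }
        return params;
    }

    static Map<String, Boolean> companyCandidateColumns() {
        // LinkedHashMap keeps insertion order, so VALUES and UPDATE follow the same sequence
        Map<String, Boolean> cols = new LinkedHashMap<>();
        for (String c : Arrays.asList("company_id", "user_id", "card_id", "facebook_id")) {
            cols.put(c, false);
        }
        cols.put("type", true);
        cols.put("create_time", false);
        cols.put("weight", true);
        cols.put("score", true);
        return cols;
    }

    public static void main(String[] args) {
        List<String> params = paramColumns(companyCandidateColumns());
        for (int i = 0; i < params.size(); i++) {
            // JDBC parameter indexes are 1-based
            System.out.println((i + 1) + " -> " + params.get(i));
        }
    }
}
```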
Step 4: generate the SQL statement
Using the methods from the previous step, build the SQL statement that is actually executed:
insert into company_candidate(company_id,user_id,card_id,facebook_id,type,create_time,weight,score) VALUES (?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE type=?,weight=?,score=?
Code:
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SQLUtil {
    private static final char COMMA = ',';
    private static final char BRACKETS_BEGIN = '(';
    private static final char BRACKETS_END = ')';
    private static final char QUESTION_MARK = '?';
    private static final char EQUAL_SIGN = '=';
    private static final String INSERT_BEGIN = "INSERT INTO ";
    private static final String INSERT_VALUES = " VALUES ";
    private static final String DUPLICATE_UPDATE = " ON DUPLICATE KEY UPDATE ";
    /** Cache of <table name, INSERT statement> */
    private static final Map<String, String> tableInsertSqlMap = new ConcurrentHashMap<>();

    /**
     * Returns the INSERT statement (cached per table name). Column names come from
     * @TableField's fieldName; fields without @TableField are skipped, so every
     * persisted field must be annotated, e.g. @TableField(fieldName = "company_id").
     */
    public static String getInsertSql(String tableName, Map<String, Field> fieldsMap) {
        String sql = tableInsertSqlMap.get(tableName);
        if (sql == null) {
            StringBuilder sbSql = new StringBuilder(300).append(INSERT_BEGIN);
            StringBuilder sbValue = new StringBuilder(INSERT_VALUES);
            StringBuilder sbUpdate = new StringBuilder(100).append(DUPLICATE_UPDATE);
            sbSql.append(tableName).append(BRACKETS_BEGIN);
            sbValue.append(BRACKETS_BEGIN);
            for (Map.Entry<String, Field> entry : fieldsMap.entrySet()) {
                String tableFieldName = entry.getKey();
                sbSql.append(tableFieldName).append(COMMA);
                sbValue.append(QUESTION_MARK).append(COMMA);
                TableField tableField = entry.getValue().getAnnotation(TableField.class);
                if (tableField != null && tableField.update()) {
                    sbUpdate.append(tableFieldName).append(EQUAL_SIGN)
                            .append(QUESTION_MARK).append(COMMA);
                }
            }
            // drop the trailing commas and close the brackets
            sbSql.deleteCharAt(sbSql.length() - 1).append(BRACKETS_END);
            sbValue.deleteCharAt(sbValue.length() - 1).append(BRACKETS_END);
            sbSql.append(sbValue);
            // append ON DUPLICATE KEY UPDATE only if at least one column has update = true
            if (sbUpdate.length() > DUPLICATE_UPDATE.length()) {
                sbUpdate.deleteCharAt(sbUpdate.length() - 1);
                sbSql.append(sbUpdate);
            }
            sql = sbSql.toString();
            tableInsertSqlMap.put(tableName, sql);
        }
        return sql;
    }
}
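The same string can be built more compactly with java.util.StringJoiner (Java 8+), which handles separators itself, so no trailing comma has to be deleted. This is a sketch: InsertSqlSketch.build is a hypothetical helper that takes plain column names rather than the article's Map<String, Field>.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

class InsertSqlSketch {
    /**
     * Builds "INSERT INTO t(c1,c2) VALUES (?,?) ON DUPLICATE KEY UPDATE c2=?"
     * from the ordered column list and the subset of columns to update.
     */
    static String build(String table, List<String> columns, List<String> updateColumns) {
        StringJoiner cols = new StringJoiner(",", "(", ")");
        StringJoiner marks = new StringJoiner(",", "(", ")");
        for (String c : columns) {
            cols.add(c);
            marks.add("?");
        }
        String sql = "INSERT INTO " + table + cols + " VALUES " + marks;
        // the UPDATE clause is only emitted when some column is marked update = true
        if (!updateColumns.isEmpty()) {
            StringJoiner upd = new StringJoiner(",", " ON DUPLICATE KEY UPDATE ", "");
            for (String c : updateColumns) {
                upd.add(c + "=?");
            }
            sql += upd;
        }
        return sql;
    }

    public static void main(String[] args) {
        System.out.println(build("company_candidate",
                Arrays.asList("company_id", "user_id", "card_id", "facebook_id",
                        "type", "create_time", "weight", "score"),
                Arrays.asList("type", "weight", "score")));
    }
}
```

For the company_candidate columns this yields the same statement as SQLUtil.getInsertSql().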
Step 5: implement the batch insert
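Get a Connection from the connection pool, obtain the SQL to execute from SQLUtil.getInsertSql(), and fill in the PreparedStatement parameters in the order given by sqlParamFields. Whenever batchNum records have been collected, the batch is executed and committed; the remaining records are submitted once the last record has been added.
Code: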
// A method of the author's batch template class (batchJdbcTemplate in the usage example).
// getConnection(dbName), log, CallBackImpl and BatchDBException belong to that project and are not shown.
// Imports needed: java.lang.reflect.Field, java.sql.Connection, java.sql.PreparedStatement, java.util.*
/**
 * Batch insert; if the primary key already exists, the row is updated instead.
 * Returns the number of records submitted (assumed; the original counting code is not shown).<br />
 */
public int batchInsertSQL(List<?> dataList, int batchNum) throws Exception {
    if (dataList == null || dataList.isEmpty()) {
        return 0;
    }
    Class<?> clazz = dataList.get(0).getClass();
    String tableName = ReflectUtil.getTableName(clazz);
    String dbName = ReflectUtil.getDBName(clazz);
    // <column name, Field> in column order
    Map<String, Field> fieldsMap = ReflectUtil.getTableBeanFieldMap(clazz);
    String sql = SQLUtil.getInsertSql(tableName, fieldsMap);
    log.debug("prepare to start batch operation, sql = " + sql + ", dbName = " + dbName);
    // VALUES parameters followed by ON DUPLICATE KEY UPDATE parameters
    List<Field> sqlParamFields = ReflectUtil.getSqlParamFields(clazz);
    // records of the current, not yet committed batch; reported if it fails
    List<Object> errorsRecords = new ArrayList<>(batchNum);
    int innerCount = 0;
    int submittedCount = 0;
    Connection connection = null;
    PreparedStatement preparedStatement = null;
    try {
        connection = this.getConnection(dbName);
        connection.setAutoCommit(false);
        preparedStatement = connection.prepareStatement(sql);
        int totalRecordCount = dataList.size();
        for (int current = 0; current < totalRecordCount; current++) {
            Object object = dataList.get(current);
            // JDBC parameter indexes restart at 1 for every row
            int parameterIndex = 1;
            for (Field field : sqlParamFields) {
                preparedStatement.setObject(parameterIndex++, field.get(object));
            }
            errorsRecords.add(object);
            preparedStatement.addBatch();
            innerCount++;
            // submit when the batch is full or the last record has been added
            if (innerCount >= batchNum || current >= totalRecordCount - 1) {
                preparedStatement.executeBatch();
                connection.commit();
                preparedStatement.clearBatch();
                submittedCount += innerCount;
                innerCount = 0;
                errorsRecords.clear();
            }
            // drop the reference so the record can be garbage-collected (modifies the caller's list)
            dataList.set(current, null);
        }
    } catch (Exception e) {
        // hand the failed batch to the callback, undo it, and rethrow
        CallBackImpl.getInstance().exectuer(sql, errorsRecords, e);
        if (connection != null) {
            connection.rollback();
        }
        throw new BatchDBException("batch run error, dbName = " + dbName + " sql = " + sql, e);
    } finally {
        if (preparedStatement != null) {
            preparedStatement.clearBatch();
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
    return submittedCount;
}
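The flush condition in the loop (innerCount >= batchNum, or the last record reached) can be simulated without a database to see how many executeBatch()/commit() round trips a given batchNum produces. A sketch; BatchScheduleSketch.batchSizes is a hypothetical helper that mirrors only the counting logic of batchInsertSQL():

```java
import java.util.ArrayList;
import java.util.List;

class BatchScheduleSketch {
    /**
     * Simulates the flush condition of batchInsertSQL() and returns the size of each
     * submitted batch (one executeBatch() + commit() per entry).
     */
    static List<Integer> batchSizes(int totalRecordCount, int batchNum) {
        List<Integer> sizes = new ArrayList<>();
        int innerCount = 0;
        for (int current = 0; current < totalRecordCount; current++) {
            innerCount++;                 // addBatch()
            if (innerCount >= batchNum || current >= totalRecordCount - 1) {
                sizes.add(innerCount);    // executeBatch() + commit()
                innerCount = 0;
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(7, 3));             // [3, 3, 1]
        System.out.println(batchSizes(50000, 50).size()); // 1000
    }
}
```

For 50,000 records and batchNum = 50 this gives 1,000 batches of 50 records each.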
Finally, a usage example of the batch utility
Tested against MySQL in a development environment: 50,000 records took about 13 seconds.
List<CompanyCandidateModel> updateDataList = new ArrayList<>(50000);
// ... fill updateDataList with the CompanyCandidateModel records to import ...
int result = batchJdbcTemplate.batchInsertSQL(updateDataList, 50);
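Rough arithmetic on the reported figure, assuming the 13 seconds covers the whole batchInsertSQL() call with batchNum = 50:

```latex
\begin{aligned}
\text{batches} &= \frac{50\,000}{50} = 1000 \\
\text{throughput} &\approx \frac{50\,000\ \text{records}}{13\ \text{s}} \approx 3846\ \text{records/s} \\
\text{time per batch} &\approx \frac{13\ \text{s}}{1000} = 13\ \text{ms}
\end{aligned}
```

So each executeBatch() plus commit() cycle, including parameter binding, took about 13 ms on average in that environment.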
Summary
That is all for this article. I hope it serves as a useful reference for your study or work. Thank you for your support of 服務器之家.