shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.iplatform.gather.support;
 
import com.iplatform.gather.util.StoreUtils;
import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.db.DatabaseException;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.store.support.DatabaseStore;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 默认的数据库存储对象实现,把采集的原始数据写入到目标表中。
 * @author 时克英
 * @date 2022-09-13
 */
public class DefaultDatabaseStore extends DatabaseStore {
 
    private final Map<Integer, String> cachedInsertSQL = new ConcurrentHashMap<Integer, String>(32);
 
    @Override
    protected int onWriteDataToDB(Address address, DatabaseConnector connector, List<String> fields
            , String destTableName, List<Object> dataList, String srcName, Object parameter) throws DatabaseException {
 
        // 先检查是否需要更新
        if(!this.isSupportUpdate()){
            return this.doInsertData(connector, dataList, fields, destTableName, address);
        }
 
        String[] keyNames = this.getKeyNames(srcName, parameter);
        if(keyNames == null || keyNames.length == 0){
            throw new IllegalArgumentException("参数异常:未找到主键信息,不能执行数据更新操作:" + srcName);
        }
 
        List<Map<String, Object>> existDataList = StoreUtils.loadExistDataList(keyNames, destTableName, connector, dataList);
 
        // 找出更新数据,并批量更新
        if(existDataList != null && existDataList.size() > 0){
            logger.debug("存在要更新的数据集合:" + existDataList.size());
            if(keyNames.length == 1){
                // 存储所有更新的主键集合,key = 主键值, value = 空字符串
                Map<String, String> updateKeysMap = new HashMap<String, String>(existDataList.size());
                for(Map<String, Object> d : existDataList){
                    updateKeysMap.put(d.get(keyNames[0]).toString(), StringUtils.EMPTY_STRING);
//                    updateParams.add(new Object[]{d.get(keyNames[0])});
                }
 
                // 更新SQL
                String updateSql = null;
                // 新插入数据集合
                List<Object> insertDatas = new ArrayList<Object>();
 
                // 拼接更新集合参数
                List<Object[]> updateParams = new ArrayList<Object[]>(existDataList.size());
                String idValue = null;
//                JSONObject record = null;
                Map<String, Object> record = null;
 
                for(Object obj : dataList){
                    record = (Map<String, Object>)obj;
                    idValue = record.get(keyNames[0]).toString(); // 本记录主键的值
 
                    if(updateSql == null){
                        updateSql = StoreUtils.getUpdateSQLByOneKey(destTableName, keyNames[0], record);
                    }
                    if(updateKeysMap.get(idValue) != null){
                        // 这是一个要更新的数据
                        updateParams.add(StoreUtils.getUpdateParams(record, idValue));
                    } else {
                        // 这个是要插入的数据
                        insertDatas.add(record);
                    }
                }
                connector.execBatchUpdateForJdbc(updateSql, updateParams);
                logger.debug("........更新数据库一次:" + updateSql);
 
                // 添加新数据
//                logger.debug("..........更新数据,只有一个主键,太好了:" + srcName + ", " + parameter);
                return this.doInsertData(connector, insertDatas, fields, destTableName, address);
 
            } else if(keyNames.length > 1){
                logger.debug("..........更新数据,存在多个主键:" + srcName + ", " + parameter);
                throw new UnsupportedOperationException("还不支持多个主键的更新操作");
            } else {
                throw new UnsupportedOperationException("不会到这里");
            }
        } else {
            logger.debug("..........没有更新数据,直接添加:" + srcName + ", " + parameter);
            return this.doInsertData(connector, dataList, fields, destTableName, address);
        }
    }
 
    private int doInsertData(DatabaseConnector connector, List<Object> datas
            , List<String> fields, String destTableName, Address addr){
        if(datas != null && datas.size() > 0){
            String sql = StoreUtils.getInsertSQL(addr, fields, destTableName, this.cachedInsertSQL);
//            logger.debug("生成的写入SQL = " + sql);
 
            MapSqlParameterSource[] spsArray = new MapSqlParameterSource[datas.size()];
            int j = 0;
            Map<String, Object> jsonObj = null;
            for(Object map : datas){
                jsonObj = (Map<String, Object>)map;
                spsArray[j++] = StoreUtils.createMapParameter(fields, jsonObj);
            }
            return connector.execBatchInsert(sql, spsArray);
        } else
            return 0;
    }
 
    /**
     * 返回要更新数据的“主键名字”,可能会有多个
     * @param srcName 原始表名
     * @param parameter 业务参数,如:customerCode
     *
     * @return
     */
    protected String[] getKeyNames(String srcName, Object parameter){
        return null;
    }
}