package com.iplatform.gather.support; import com.iplatform.gather.util.StoreUtils; import com.walker.connector.Address; import com.walker.connector.support.DatabaseConnector; import com.walker.db.DatabaseException; import com.walker.infrastructure.utils.StringUtils; import com.walker.store.support.DatabaseStore; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 默认的数据库存储对象实现,把采集的原始数据写入到目标表中。 * @author 时克英 * @date 2022-09-13 */ public class DefaultDatabaseStore extends DatabaseStore { private final Map cachedInsertSQL = new ConcurrentHashMap(32); @Override protected int onWriteDataToDB(Address address, DatabaseConnector connector, List fields , String destTableName, List dataList, String srcName, Object parameter) throws DatabaseException { // 先检查是否需要更新 if(!this.isSupportUpdate()){ return this.doInsertData(connector, dataList, fields, destTableName, address); } String[] keyNames = this.getKeyNames(srcName, parameter); if(keyNames == null || keyNames.length == 0){ throw new IllegalArgumentException("参数异常:未找到主键信息,不能执行数据更新操作:" + srcName); } List> existDataList = StoreUtils.loadExistDataList(keyNames, destTableName, connector, dataList); // 找出更新数据,并批量更新 if(existDataList != null && existDataList.size() > 0){ logger.debug("存在要更新的数据集合:" + existDataList.size()); if(keyNames.length == 1){ // 存储所有更新的主键集合,key = 主键值, value = 空字符串 Map updateKeysMap = new HashMap(existDataList.size()); for(Map d : existDataList){ updateKeysMap.put(d.get(keyNames[0]).toString(), StringUtils.EMPTY_STRING); // updateParams.add(new Object[]{d.get(keyNames[0])}); } // 更新SQL String updateSql = null; // 新插入数据集合 List insertDatas = new ArrayList(); // 拼接更新集合参数 List updateParams = new ArrayList(existDataList.size()); String idValue = null; // JSONObject record = null; Map record = null; for(Object obj : dataList){ record = (Map)obj; idValue = record.get(keyNames[0]).toString(); // 本记录主键的值 if(updateSql == null){ updateSql = StoreUtils.getUpdateSQLByOneKey(destTableName, keyNames[0], record); } if(updateKeysMap.get(idValue) != null){ // 这是一个要更新的数据 updateParams.add(StoreUtils.getUpdateParams(record, idValue)); } else { // 这个是要插入的数据 insertDatas.add(record); } } connector.execBatchUpdateForJdbc(updateSql, updateParams); logger.debug("........更新数据库一次:" + updateSql); // 添加新数据 // logger.debug("..........更新数据,只有一个主键,太好了:" + srcName + ", " + parameter); return this.doInsertData(connector, insertDatas, fields, destTableName, address); } else if(keyNames.length > 1){ logger.debug("..........更新数据,存在多个主键:" + srcName + ", " + parameter); throw new UnsupportedOperationException("还不支持多个主键的更新操作"); } else { throw new UnsupportedOperationException("不会到这里"); } } else { logger.debug("..........没有更新数据,直接添加:" + srcName + ", " + parameter); return this.doInsertData(connector, dataList, fields, destTableName, address); } } private int doInsertData(DatabaseConnector connector, List datas , List fields, String destTableName, Address addr){ if(datas != null && datas.size() > 0){ String sql = StoreUtils.getInsertSQL(addr, fields, destTableName, this.cachedInsertSQL); // logger.debug("生成的写入SQL = " + sql); MapSqlParameterSource[] spsArray = new MapSqlParameterSource[datas.size()]; int j = 0; Map jsonObj = null; for(Object map : datas){ jsonObj = (Map)map; spsArray[j++] = StoreUtils.createMapParameter(fields, jsonObj); } return connector.execBatchInsert(sql, spsArray); } else return 0; } /** * 返回要更新数据的“主键名字”,可能会有多个 * @param srcName 原始表名 * @param parameter 业务参数,如:customerCode * * @return */ protected String[] getKeyNames(String srcName, Object parameter){ return null; } }