package com.iplatform.gather.support;
|
|
import com.iplatform.gather.util.StoreUtils;
|
import com.walker.connector.Address;
|
import com.walker.connector.support.DatabaseConnector;
|
import com.walker.db.DatabaseException;
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.store.support.DatabaseStore;
|
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
/**
|
* 默认的数据库存储对象实现,把采集的原始数据写入到目标表中。
|
* @author 时克英
|
* @date 2022-09-13
|
*/
|
public class DefaultDatabaseStore extends DatabaseStore {
|
|
private final Map<Integer, String> cachedInsertSQL = new ConcurrentHashMap<Integer, String>(32);
|
|
@Override
|
protected int onWriteDataToDB(Address address, DatabaseConnector connector, List<String> fields
|
, String destTableName, List<Object> dataList, String srcName, Object parameter) throws DatabaseException {
|
|
// 先检查是否需要更新
|
if(!this.isSupportUpdate()){
|
return this.doInsertData(connector, dataList, fields, destTableName, address);
|
}
|
|
String[] keyNames = this.getKeyNames(srcName, parameter);
|
if(keyNames == null || keyNames.length == 0){
|
throw new IllegalArgumentException("参数异常:未找到主键信息,不能执行数据更新操作:" + srcName);
|
}
|
|
List<Map<String, Object>> existDataList = StoreUtils.loadExistDataList(keyNames, destTableName, connector, dataList);
|
|
// 找出更新数据,并批量更新
|
if(existDataList != null && existDataList.size() > 0){
|
logger.debug("存在要更新的数据集合:" + existDataList.size());
|
if(keyNames.length == 1){
|
// 存储所有更新的主键集合,key = 主键值, value = 空字符串
|
Map<String, String> updateKeysMap = new HashMap<String, String>(existDataList.size());
|
for(Map<String, Object> d : existDataList){
|
updateKeysMap.put(d.get(keyNames[0]).toString(), StringUtils.EMPTY_STRING);
|
// updateParams.add(new Object[]{d.get(keyNames[0])});
|
}
|
|
// 更新SQL
|
String updateSql = null;
|
// 新插入数据集合
|
List<Object> insertDatas = new ArrayList<Object>();
|
|
// 拼接更新集合参数
|
List<Object[]> updateParams = new ArrayList<Object[]>(existDataList.size());
|
String idValue = null;
|
// JSONObject record = null;
|
Map<String, Object> record = null;
|
|
for(Object obj : dataList){
|
record = (Map<String, Object>)obj;
|
idValue = record.get(keyNames[0]).toString(); // 本记录主键的值
|
|
if(updateSql == null){
|
updateSql = StoreUtils.getUpdateSQLByOneKey(destTableName, keyNames[0], record);
|
}
|
if(updateKeysMap.get(idValue) != null){
|
// 这是一个要更新的数据
|
updateParams.add(StoreUtils.getUpdateParams(record, idValue));
|
} else {
|
// 这个是要插入的数据
|
insertDatas.add(record);
|
}
|
}
|
connector.execBatchUpdateForJdbc(updateSql, updateParams);
|
logger.debug("........更新数据库一次:" + updateSql);
|
|
// 添加新数据
|
// logger.debug("..........更新数据,只有一个主键,太好了:" + srcName + ", " + parameter);
|
return this.doInsertData(connector, insertDatas, fields, destTableName, address);
|
|
} else if(keyNames.length > 1){
|
logger.debug("..........更新数据,存在多个主键:" + srcName + ", " + parameter);
|
throw new UnsupportedOperationException("还不支持多个主键的更新操作");
|
} else {
|
throw new UnsupportedOperationException("不会到这里");
|
}
|
} else {
|
logger.debug("..........没有更新数据,直接添加:" + srcName + ", " + parameter);
|
return this.doInsertData(connector, dataList, fields, destTableName, address);
|
}
|
}
|
|
private int doInsertData(DatabaseConnector connector, List<Object> datas
|
, List<String> fields, String destTableName, Address addr){
|
if(datas != null && datas.size() > 0){
|
String sql = StoreUtils.getInsertSQL(addr, fields, destTableName, this.cachedInsertSQL);
|
// logger.debug("生成的写入SQL = " + sql);
|
|
MapSqlParameterSource[] spsArray = new MapSqlParameterSource[datas.size()];
|
int j = 0;
|
Map<String, Object> jsonObj = null;
|
for(Object map : datas){
|
jsonObj = (Map<String, Object>)map;
|
spsArray[j++] = StoreUtils.createMapParameter(fields, jsonObj);
|
}
|
return connector.execBatchInsert(sql, spsArray);
|
} else
|
return 0;
|
}
|
|
/**
|
* 返回要更新数据的“主键名字”,可能会有多个
|
* @param srcName 原始表名
|
* @param parameter 业务参数,如:customerCode
|
*
|
* @return
|
*/
|
protected String[] getKeyNames(String srcName, Object parameter){
|
return null;
|
}
|
}
|