package com.walker.di;
|
|
import com.walker.infrastructure.utils.KeyValue;
|
import com.walker.infrastructure.utils.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.TreeMap;
|
|
public abstract class AbstractDataImportor implements DataImportor{
|
|
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
protected static final long MAX_BATCH_SIZE = 1024;
|
|
private boolean brokenContinue = false;
|
private String brokenId = null;
|
private boolean batchSupport = false;
|
// 批量处理的数量
|
private long batchSize = MAX_BATCH_SIZE;
|
|
private Object source = null;
|
|
private List<String> fieldNames = null;
|
|
private long saveSizeOnce = 200;
|
|
// 忽略的行数,默认不忽略。
|
private int ignoreRows = 0;
|
// 批量导入线程休眠毫秒数
|
private long sleepMillSeconds = 0;
|
|
private BatchLoadListener batchLoadListener = null;
|
|
// 已导入数据量
|
private long importSize = 0;
|
|
// 导入唯一标识,由业务确定,在导入错误时,运行把错误结果重新导入
|
private String id;
|
|
private boolean showError = true;
|
private String errorFile = null; // 错误结果文件路径
|
private String saveFileFolder = null; // 系统保存文件的文件夹路径
|
private ErrorWriter errorWriter = null;
|
|
private int headRowNumber = 2;
|
|
private UpdateType updateType = UpdateType.Ignore;
|
|
public void setBatchLoadListener(BatchLoadListener batchLoadListener) {
|
this.batchLoadListener = batchLoadListener;
|
}
|
|
@Override
|
public void load() throws DataImportException {
|
if(this.source == null){
|
throw new IllegalArgumentException("加载源不存在:source == null", null);
|
}
|
if(this.brokenContinue && StringUtils.isEmpty(this.brokenId)){
|
throw new IllegalArgumentException("在断点续传(导)模式下,brokenId必须设置");
|
}
|
if(StringUtils.isEmpty(this.id)){
|
throw new IllegalArgumentException("id必须设置:用于区分业务多次导入");
|
}
|
|
if(this.batchSupport){
|
if(this.batchLoadListener == null){
|
throw new IllegalArgumentException("batchLoadListener未设置");
|
}
|
this.acquireImportBatch(this.batchLoadListener, this.source);
|
this.closeErrorWriter();
|
return;
|
}
|
|
List<Object[]> rows = this.acquireImportDataList(this.source);
|
if(StringUtils.isEmptyList(rows)){
|
logger.warn("未加载到任何导入数据");
|
return;
|
}
|
|
// 校验放到 this.doExecuteImport()方法中了 2023-02-02
|
// if(!this.doValidate(rows)){
|
// throw new DataImportException("导入错误,数据校验未通过", null);
|
// }
|
|
this.doExecuteImport(rows, this.getFieldNames());
|
this.closeErrorWriter();
|
}
|
|
/**
|
* 执行一次导入保存操作,无论一次还是批量都调用该方法。
|
* <pre>
|
* 1)一次性的仅执行一次
|
* 2)批量的会多次执行
|
* </pre>
|
* @param rows
|
* @param fieldNames
|
* @throws DataImportException
|
*/
|
protected void doExecuteImport(List<Object[]> rows, List<String> fieldNames) throws DataImportException{
|
if(StringUtils.isEmptyList(fieldNames)){
|
throw new DataImportException("导入数据中未找到标题列信息", null);
|
}
|
if(this.showError && this.errorWriter == null){
|
logger.debug("初始化: ErrorWriter...");
|
this.errorWriter = this.acquireErrorWriter(this.id, fieldNames);
|
if(this.errorWriter == null){
|
throw new UnsupportedOperationException("ErrorWriter未实现代码,请修改");
|
}
|
}
|
|
// 校验数据
|
this.doValidate(rows);
|
|
List<Object[]> savedList = null;
|
long brokenIndex = 0;
|
|
for(Object[] row : rows){
|
if(savedList == null){
|
savedList = new ArrayList<>(256);
|
}
|
|
// 当导入数量不满足系统设置的批量大小时,最后一个元素也要加入保存队列
|
// 以便下面触发保存方法。
|
if(savedList.size() == (rows.size()-1)){
|
savedList.add(row);
|
brokenIndex ++;
|
if(this.logger.isDebugEnabled()){
|
this.logger.debug("最后一批不超过'" + this.saveSizeOnce + "',最后一个元素加入保存队列");
|
}
|
}
|
|
// 当保存集合满了,或者与原始数据
|
if(savedList.size() >= this.saveSizeOnce || savedList.size() == rows.size()){
|
logger.debug("开始保存一次:" + savedList.size());
|
try{
|
this.saveImportData(savedList, fieldNames);
|
this.increaseImportSize(savedList.size());
|
savedList.clear();
|
savedList = new ArrayList<>(256);
|
|
} catch (Exception ex){
|
if(this.brokenContinue){
|
logger.error("导入出现错误,启用断点续传(导),记录当前记录位置:" + brokenIndex);
|
this.saveBrokenInfo(brokenIndex);
|
}
|
if(ex instanceof BusinessImportException){
|
throw ex;
|
}
|
throw new DataImportException("导入数据出现异常:" + ex.getMessage(), ex);
|
}
|
}
|
|
savedList.add(row);
|
brokenIndex ++;
|
}
|
}
|
|
protected boolean doValidate(List<Object[]> rows){
|
boolean success = true;
|
Map<String, String> map = null;
|
List<KeyValue<String, String>> kv = null;
|
Object[] srcObject = null;
|
String error = null;
|
// for(Object[] row : rows){
|
for(Iterator<Object[]> it = rows.iterator(); it.hasNext();){
|
srcObject = it.next();
|
map = this.getDataMap(srcObject);
|
kv = this.getKeyValueList(srcObject, this.fieldNames);
|
// error = this.validateData(map);
|
error = this.validateData(map);
|
if(error != null){
|
if(success){
|
success = false;
|
}
|
// 2023-02-02 直接调用错误记录器写入错误
|
// this.recordUnValidateData(map);
|
if(this.showError){
|
this.errorWriter.write(kv, error);
|
}
|
// 2023-02-01 校验不通过,从集合删除
|
it.remove();
|
continue;
|
}
|
}
|
return success;
|
}
|
|
/**
|
* 把加载的数据转换成 Map 对象,key = column_name, value = data_object
|
* @param data
|
* @return
|
*/
|
protected TreeMap<String, String> getDataMap(Object[] data){
|
if(this.fieldNames == null){
|
throw new IllegalArgumentException("不存在列信息,无法获取数据Map");
|
}
|
// 注意:这里必须用有序的 TreeMap
|
TreeMap<String, String> map = new TreeMap<>();
|
for(int i=0; i<this.fieldNames.size(); i++){
|
map.put(this.fieldNames.get(i), data[i] == null? null : data[i].toString());
|
}
|
return map;
|
}
|
|
protected List<KeyValue<String, String>> getKeyValueList(Object[] data, List<String> fieldNames){
|
List<KeyValue<String, String>> list = new ArrayList<>();
|
KeyValue<String, String> kv = null;
|
for(int i=0; i<this.fieldNames.size(); i++){
|
kv = new KeyValue<>(fieldNames.get(i), data[i] == null? null : data[i].toString());
|
list.add(kv);
|
}
|
return list;
|
}
|
|
@Override
|
public boolean isBatchSupport() {
|
return this.batchSupport;
|
}
|
|
@Override
|
public boolean isBrokenContinue() {
|
return this.brokenContinue;
|
}
|
|
@Override
|
public void setBatchEnabled() {
|
this.setBatchSize(MAX_BATCH_SIZE);
|
}
|
|
@Override
|
public void setBatchSize(long batchSize) {
|
if(batchSize < 0){
|
throw new IllegalArgumentException("batchSize不能小于0");
|
}
|
if(batchSize > MAX_BATCH_SIZE){
|
throw new IllegalArgumentException("batchSize建议不要大于:" + MAX_BATCH_SIZE);
|
}
|
if(batchSize <= this.saveSizeOnce){
|
throw new IllegalArgumentException("批量大小必须大于'每个保存数量':" + this.saveSizeOnce);
|
}
|
this.batchSize = batchSize;
|
this.batchSupport = true;
|
}
|
|
@Override
|
public void setBrokenContinue(boolean allow) {
|
this.brokenContinue = allow;
|
}
|
|
@Override
|
public void setBrokenId(String brokenId){
|
// throw new UnsupportedOperationException("未写代码呢?");
|
this.brokenId = brokenId;
|
}
|
|
@Override
|
public void setFieldNames(List<String> fieldNames){
|
if(this.fieldNames != null){
|
return;
|
}
|
if(StringUtils.isEmptyList(fieldNames)){
|
throw new IllegalArgumentException("fieldNames 不能为空");
|
}
|
this.fieldNames = fieldNames;
|
}
|
|
@Override
|
public void setIgnoreRows(int number){
|
if(number <= 0){
|
throw new IllegalArgumentException("忽略行数必须大于0");
|
}
|
this.ignoreRows = number;
|
}
|
|
@Override
|
public void setBatchSleepMills(long millSeconds){
|
if(millSeconds <= 0 || millSeconds >= 10000){
|
throw new IllegalArgumentException("请设置合理批量导入间隔时间,一般在: 100 - 5000 之间");
|
}
|
this.sleepMillSeconds = millSeconds;
|
}
|
|
@Override
|
public List<String> getFieldNames(){
|
return this.fieldNames;
|
}
|
|
/**
|
* 返回批量加载导入数据的数量(每次)
|
* @return
|
* @date 2023-02-01
|
*/
|
@Override
|
public long getBatchSize(){
|
return this.batchSize;
|
}
|
|
@Override
|
public long getSuccessSize() {
|
return importSize;
|
}
|
|
@Override
|
public String getId() {
|
return id;
|
}
|
|
@Override
|
public void setId(String id) {
|
this.id = id;
|
}
|
|
@Override
|
public void setShowError(boolean showError){
|
this.showError = showError;
|
}
|
|
@Override
|
public void setSaveFileFolder(String saveFileFolder) {
|
this.saveFileFolder = saveFileFolder;
|
}
|
|
/**
|
* 返回错误文件路径,如: d:/file/12344566_error.csv
|
* @return
|
* @date 2023-02-01
|
*/
|
@Override
|
public String getErrorFile(){
|
if(StringUtils.isEmpty(this.saveFileFolder)){
|
throw new IllegalStateException("saveFileFolder未设置,无法保存错误文件");
|
}
|
if(errorFile == null){
|
errorFile = new StringBuilder(this.saveFileFolder).append(this.getId())
|
.append(StringUtils.UNDERLINE).append(this.getImportFileSuffix()).toString();
|
}
|
return errorFile;
|
}
|
|
protected String getSaveFileFolder(){
|
return this.saveFileFolder;
|
}
|
|
/**
|
* 返回忽略导入多少行数据。
|
* @return
|
*/
|
public int getIgnoreRows(){
|
return this.ignoreRows;
|
}
|
|
/**
|
* 返回批量导入时,每次保存数据,线程休息毫秒时间,如果0表示不休眠。
|
* @return
|
*/
|
public long getSleepMillSeconds(){
|
return this.sleepMillSeconds;
|
}
|
|
/**
|
* 设置加载数据的原始参数,可能是文件路径、表名等。
|
* @param source
|
*/
|
protected void setSource(Object source){
|
if(source == null){
|
throw new IllegalArgumentException("source is not null!");
|
}
|
this.source = source;
|
}
|
|
/**
|
* 设置每次保存记录的数量,通常持久化到数据库中是一批次操作,系统默认:200
|
* @param saveSizeOnce
|
*/
|
public void setSaveSizeOnce(long saveSizeOnce) {
|
if(saveSizeOnce < 0 || saveSizeOnce > this.batchSize){
|
throw new IllegalArgumentException("每次保存数据量只能在:0-" + this.batchSize + "之间");
|
}
|
this.saveSizeOnce = saveSizeOnce;
|
}
|
|
@Override
|
public void setHeadRowNumber(int headRowNumber){
|
this.headRowNumber = headRowNumber;
|
}
|
|
@Override
|
public void setUpdateType(UpdateType updateType){
|
this.updateType = updateType;
|
}
|
|
/**
|
* 判断数据更新类型,在需要检查数据是否存在时使用。
|
* @return
|
* @date 2023-02-05
|
*/
|
public UpdateType getUpdateType(){
|
return this.updateType;
|
}
|
|
/**
|
* 返回表头有几行。
|
* @return
|
*/
|
public int getHeadRowNumber(){
|
return this.headRowNumber;
|
}
|
|
public long getSaveSizeOnce(){
|
return this.saveSizeOnce;
|
}
|
|
/**
|
* 增加导入数量,表示已经成功导入数据量。
|
* @param addSize
|
*/
|
protected void increaseImportSize(long addSize){
|
this.importSize += addSize;
|
}
|
|
protected boolean isShowError(){
|
return this.showError;
|
}
|
|
/**
|
* 由子类加载具体导入数据集合。
|
* @param source 原始输入源(参数)
|
* @return
|
* @throws DataImportException
|
*/
|
protected abstract List<Object[]> acquireImportDataList(Object source) throws DataImportException;
|
|
/**
|
* 批量加载具体导入数据集合。
|
* @param batchLoadListener
|
* @param source
|
* @throws DataImportException
|
*/
|
protected abstract void acquireImportBatch(BatchLoadListener batchLoadListener, Object source) throws DataImportException;
|
|
/**
|
* 判断一条数据是否符合验证,由业务实现。
|
* @param data
|
* @return 返回校验不通过原因,如果校验正确则返回: null
|
*/
|
protected abstract String validateData(Map<String, String> data);
|
// protected abstract String validateData(List<KeyValue<String, String>> data);
|
|
// /**
|
// * 记录验证不符合条件的记录,后续会让用户下载并参考。
|
// * @param data
|
// */
|
// protected abstract void recordUnValidateData(Map<String, String> data);
|
|
/**
|
* 保存导入的数据集合。
|
* @param dataList
|
*/
|
protected abstract void saveImportData(List<Object[]> dataList, List<String> fieldNames) throws BusinessImportException;
|
|
/**
|
* 保存断点信息,后续重新导入需要使用。
|
* @param index 当前记录索引值
|
*/
|
protected abstract void saveBrokenInfo(long index);
|
|
/**
|
* 获取一个错误记录器实现对象。
|
* @param id 导入唯一业务标识
|
* @param fieldNames 数据标题列集合
|
* @return
|
* @date 2023-02-02
|
*/
|
protected abstract ErrorWriter acquireErrorWriter(String id, List<String> fieldNames);
|
|
/**
|
* 获得导入文(以及错误结果)文件后缀名,如: error.csv | error.xlsx
|
* @return
|
* @date 2023-02-08
|
*/
|
protected abstract String getImportFileSuffix();
|
|
/**
|
* 注意:这里需要关闭错误记录器,但不能设置csvProcessor为异步任务,否则这里关闭后就无法继续记录了。
|
* @date 2023-02-02
|
*/
|
private void closeErrorWriter(){
|
if(this.errorWriter != null){
|
this.errorWriter.close();
|
this.logger.debug("关闭:errorWriter");
|
}
|
}
|
}
|