package com.walker.di; import com.walker.infrastructure.utils.KeyValue; import com.walker.infrastructure.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; public abstract class AbstractDataImportor implements DataImportor{ protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); protected static final long MAX_BATCH_SIZE = 1024; private boolean brokenContinue = false; private String brokenId = null; private boolean batchSupport = false; // 批量处理的数量 private long batchSize = MAX_BATCH_SIZE; private Object source = null; private List fieldNames = null; private long saveSizeOnce = 200; // 忽略的行数,默认不忽略。 private int ignoreRows = 0; // 批量导入线程休眠毫秒数 private long sleepMillSeconds = 0; private BatchLoadListener batchLoadListener = null; // 已导入数据量 private long importSize = 0; // 导入唯一标识,由业务确定,在导入错误时,运行把错误结果重新导入 private String id; private boolean showError = true; private String errorFile = null; // 错误结果文件路径 private String saveFileFolder = null; // 系统保存文件的文件夹路径 private ErrorWriter errorWriter = null; private int headRowNumber = 2; private UpdateType updateType = UpdateType.Ignore; public void setBatchLoadListener(BatchLoadListener batchLoadListener) { this.batchLoadListener = batchLoadListener; } @Override public void load() throws DataImportException { if(this.source == null){ throw new IllegalArgumentException("加载源不存在:source == null", null); } if(this.brokenContinue && StringUtils.isEmpty(this.brokenId)){ throw new IllegalArgumentException("在断点续传(导)模式下,brokenId必须设置"); } if(StringUtils.isEmpty(this.id)){ throw new IllegalArgumentException("id必须设置:用于区分业务多次导入"); } if(this.batchSupport){ if(this.batchLoadListener == null){ throw new IllegalArgumentException("batchLoadListener未设置"); } this.acquireImportBatch(this.batchLoadListener, this.source); this.closeErrorWriter(); return; } List rows = this.acquireImportDataList(this.source); if(StringUtils.isEmptyList(rows)){ logger.warn("未加载到任何导入数据"); return; } // 校验放到 this.doExecuteImport()方法中了 2023-02-02 // if(!this.doValidate(rows)){ // throw new DataImportException("导入错误,数据校验未通过", null); // } this.doExecuteImport(rows, this.getFieldNames()); this.closeErrorWriter(); } /** * 执行一次导入保存操作,无论一次还是批量都调用该方法。 *
     *     1)一次性的仅执行一次
     *     2)批量的会多次执行
     * 
* @param rows * @param fieldNames * @throws DataImportException */ protected void doExecuteImport(List rows, List fieldNames) throws DataImportException{ if(StringUtils.isEmptyList(fieldNames)){ throw new DataImportException("导入数据中未找到标题列信息", null); } if(this.showError && this.errorWriter == null){ logger.debug("初始化: ErrorWriter..."); this.errorWriter = this.acquireErrorWriter(this.id, fieldNames); if(this.errorWriter == null){ throw new UnsupportedOperationException("ErrorWriter未实现代码,请修改"); } } // 校验数据 this.doValidate(rows); List savedList = null; long brokenIndex = 0; for(Object[] row : rows){ if(savedList == null){ savedList = new ArrayList<>(256); } // 当导入数量不满足系统设置的批量大小时,最后一个元素也要加入保存队列 // 以便下面触发保存方法。 if(savedList.size() == (rows.size()-1)){ savedList.add(row); brokenIndex ++; if(this.logger.isDebugEnabled()){ this.logger.debug("最后一批不超过'" + this.saveSizeOnce + "',最后一个元素加入保存队列"); } } // 当保存集合满了,或者与原始数据 if(savedList.size() >= this.saveSizeOnce || savedList.size() == rows.size()){ logger.debug("开始保存一次:" + savedList.size()); try{ this.saveImportData(savedList, fieldNames); this.increaseImportSize(savedList.size()); savedList.clear(); savedList = new ArrayList<>(256); } catch (Exception ex){ if(this.brokenContinue){ logger.error("导入出现错误,启用断点续传(导),记录当前记录位置:" + brokenIndex); this.saveBrokenInfo(brokenIndex); } if(ex instanceof BusinessImportException){ throw ex; } throw new DataImportException("导入数据出现异常:" + ex.getMessage(), ex); } } savedList.add(row); brokenIndex ++; } } protected boolean doValidate(List rows){ boolean success = true; Map map = null; List> kv = null; Object[] srcObject = null; String error = null; // for(Object[] row : rows){ for(Iterator it = rows.iterator(); it.hasNext();){ srcObject = it.next(); map = this.getDataMap(srcObject); kv = this.getKeyValueList(srcObject, this.fieldNames); // error = this.validateData(map); error = this.validateData(map); if(error != null){ if(success){ success = false; } // 2023-02-02 直接调用错误记录器写入错误 // this.recordUnValidateData(map); if(this.showError){ this.errorWriter.write(kv, error); } // 2023-02-01 校验不通过,从集合删除 it.remove(); continue; } } return success; } /** * 把加载的数据转换成 Map 对象,key = column_name, value = data_object * @param data * @return */ protected TreeMap getDataMap(Object[] data){ if(this.fieldNames == null){ throw new IllegalArgumentException("不存在列信息,无法获取数据Map"); } // 注意:这里必须用有序的 TreeMap TreeMap map = new TreeMap<>(); for(int i=0; i> getKeyValueList(Object[] data, List fieldNames){ List> list = new ArrayList<>(); KeyValue kv = null; for(int i=0; i(fieldNames.get(i), data[i] == null? null : data[i].toString()); list.add(kv); } return list; } @Override public boolean isBatchSupport() { return this.batchSupport; } @Override public boolean isBrokenContinue() { return this.brokenContinue; } @Override public void setBatchEnabled() { this.setBatchSize(MAX_BATCH_SIZE); } @Override public void setBatchSize(long batchSize) { if(batchSize < 0){ throw new IllegalArgumentException("batchSize不能小于0"); } if(batchSize > MAX_BATCH_SIZE){ throw new IllegalArgumentException("batchSize建议不要大于:" + MAX_BATCH_SIZE); } if(batchSize <= this.saveSizeOnce){ throw new IllegalArgumentException("批量大小必须大于'每个保存数量':" + this.saveSizeOnce); } this.batchSize = batchSize; this.batchSupport = true; } @Override public void setBrokenContinue(boolean allow) { this.brokenContinue = allow; } @Override public void setBrokenId(String brokenId){ // throw new UnsupportedOperationException("未写代码呢?"); this.brokenId = brokenId; } @Override public void setFieldNames(List fieldNames){ if(this.fieldNames != null){ return; } if(StringUtils.isEmptyList(fieldNames)){ throw new IllegalArgumentException("fieldNames 不能为空"); } this.fieldNames = fieldNames; } @Override public void setIgnoreRows(int number){ if(number <= 0){ throw new IllegalArgumentException("忽略行数必须大于0"); } this.ignoreRows = number; } @Override public void setBatchSleepMills(long millSeconds){ if(millSeconds <= 0 || millSeconds >= 10000){ throw new IllegalArgumentException("请设置合理批量导入间隔时间,一般在: 100 - 5000 之间"); } this.sleepMillSeconds = millSeconds; } @Override public List getFieldNames(){ return this.fieldNames; } /** * 返回批量加载导入数据的数量(每次) * @return * @date 2023-02-01 */ @Override public long getBatchSize(){ return this.batchSize; } @Override public long getSuccessSize() { return importSize; } @Override public String getId() { return id; } @Override public void setId(String id) { this.id = id; } @Override public void setShowError(boolean showError){ this.showError = showError; } @Override public void setSaveFileFolder(String saveFileFolder) { this.saveFileFolder = saveFileFolder; } /** * 返回错误文件路径,如: d:/file/12344566_error.csv * @return * @date 2023-02-01 */ @Override public String getErrorFile(){ if(StringUtils.isEmpty(this.saveFileFolder)){ throw new IllegalStateException("saveFileFolder未设置,无法保存错误文件"); } if(errorFile == null){ errorFile = new StringBuilder(this.saveFileFolder).append(this.getId()) .append(StringUtils.UNDERLINE).append(this.getImportFileSuffix()).toString(); } return errorFile; } protected String getSaveFileFolder(){ return this.saveFileFolder; } /** * 返回忽略导入多少行数据。 * @return */ public int getIgnoreRows(){ return this.ignoreRows; } /** * 返回批量导入时,每次保存数据,线程休息毫秒时间,如果0表示不休眠。 * @return */ public long getSleepMillSeconds(){ return this.sleepMillSeconds; } /** * 设置加载数据的原始参数,可能是文件路径、表名等。 * @param source */ protected void setSource(Object source){ if(source == null){ throw new IllegalArgumentException("source is not null!"); } this.source = source; } /** * 设置每次保存记录的数量,通常持久化到数据库中是一批次操作,系统默认:200 * @param saveSizeOnce */ public void setSaveSizeOnce(long saveSizeOnce) { if(saveSizeOnce < 0 || saveSizeOnce > this.batchSize){ throw new IllegalArgumentException("每次保存数据量只能在:0-" + this.batchSize + "之间"); } this.saveSizeOnce = saveSizeOnce; } @Override public void setHeadRowNumber(int headRowNumber){ this.headRowNumber = headRowNumber; } @Override public void setUpdateType(UpdateType updateType){ this.updateType = updateType; } /** * 判断数据更新类型,在需要检查数据是否存在时使用。 * @return * @date 2023-02-05 */ public UpdateType getUpdateType(){ return this.updateType; } /** * 返回表头有几行。 * @return */ public int getHeadRowNumber(){ return this.headRowNumber; } public long getSaveSizeOnce(){ return this.saveSizeOnce; } /** * 增加导入数量,表示已经成功导入数据量。 * @param addSize */ protected void increaseImportSize(long addSize){ this.importSize += addSize; } protected boolean isShowError(){ return this.showError; } /** * 由子类加载具体导入数据集合。 * @param source 原始输入源(参数) * @return * @throws DataImportException */ protected abstract List acquireImportDataList(Object source) throws DataImportException; /** * 批量加载具体导入数据集合。 * @param batchLoadListener * @param source * @throws DataImportException */ protected abstract void acquireImportBatch(BatchLoadListener batchLoadListener, Object source) throws DataImportException; /** * 判断一条数据是否符合验证,由业务实现。 * @param data * @return 返回校验不通过原因,如果校验正确则返回: null */ protected abstract String validateData(Map data); // protected abstract String validateData(List> data); // /** // * 记录验证不符合条件的记录,后续会让用户下载并参考。 // * @param data // */ // protected abstract void recordUnValidateData(Map data); /** * 保存导入的数据集合。 * @param dataList */ protected abstract void saveImportData(List dataList, List fieldNames) throws BusinessImportException; /** * 保存断点信息,后续重新导入需要使用。 * @param index 当前记录索引值 */ protected abstract void saveBrokenInfo(long index); /** * 获取一个错误记录器实现对象。 * @param id 导入唯一业务标识 * @param fieldNames 数据标题列集合 * @return * @date 2023-02-02 */ protected abstract ErrorWriter acquireErrorWriter(String id, List fieldNames); /** * 获得导入文(以及错误结果)文件后缀名,如: error.csv | error.xlsx * @return * @date 2023-02-08 */ protected abstract String getImportFileSuffix(); /** * 注意:这里需要关闭错误记录器,但不能设置csvProcessor为异步任务,否则这里关闭后就无法继续记录了。 * @date 2023-02-02 */ private void closeErrorWriter(){ if(this.errorWriter != null){ this.errorWriter.close(); this.logger.debug("关闭:errorWriter"); } } }