package com.walker.di.univocity;
|
|
import com.univocity.parsers.common.processor.BatchedColumnProcessor;
|
import com.univocity.parsers.common.processor.ObjectRowListProcessor;
|
import com.univocity.parsers.csv.CsvParser;
|
import com.univocity.parsers.csv.CsvParserSettings;
|
import com.walker.di.BatchLoadListener;
|
import com.walker.di.Constants;
|
import com.walker.di.DataImportException;
|
import com.walker.di.ErrorWriter;
|
import com.walker.di.support.InputStreamDataImportor;
|
import com.walker.di.univocity.util.BatchLoadUtils;
|
import com.walker.infrastructure.ApplicationRuntimeException;
|
import com.walker.infrastructure.utils.StringUtils;
|
|
import java.io.BufferedOutputStream;
|
import java.io.File;
|
import java.io.FileNotFoundException;
|
import java.io.FileOutputStream;
|
import java.io.IOException;
|
import java.io.InputStream;
|
import java.io.OutputStream;
|
import java.util.Arrays;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* univocity实现的CSV文件导入。
|
* @author 时克英
|
* @date 2023-01-31
|
*/
|
public abstract class CsvDataImportor extends InputStreamDataImportor {
|
|
private String batchError = null;
|
|
@Override
|
public String getBatchError(){
|
return this.batchError;
|
}
|
|
@Override
|
public void setBatchEnabled() {
|
this.setBatchSize(MAX_BATCH_SIZE);
|
}
|
|
@Override
|
public void setBatchSize(long batchSize) {
|
super.setBatchSize(batchSize);
|
// 2023-02-01 设置默认的批量加载监听器实现。
|
this.setBatchLoadListener(new InternalBatchListenerImpl());
|
}
|
|
@Override
|
public String getImportFileSuffix(){
|
return Constants.IMPORT_ERROR_SUFFIX_CSV;
|
}
|
|
@Override
|
protected List<Object[]> acquireImportDataList(Object source) throws DataImportException {
|
this.checkSource(source);
|
try{
|
ObjectRowListProcessor rowListProcessor = new ObjectRowListProcessor();
|
CsvParserSettings csvParserSettings = new CsvParserSettings();
|
csvParserSettings.setHeaderExtractionEnabled(true);
|
// csvParserSettings.setCommentCollectionEnabled(true);
|
csvParserSettings.setMaxColumns(64);
|
// 忽略第一行
|
if(this.getIgnoreRows() > 0){
|
csvParserSettings.setNumberOfRowsToSkip(this.getIgnoreRows());
|
}
|
csvParserSettings.setProcessor(rowListProcessor);
|
CsvParser csvParser = new CsvParser(csvParserSettings);
|
csvParser.parse((InputStream) source);
|
|
String[] headers = rowListProcessor.getHeaders();
|
if(headers == null || headers.length == 0){
|
throw new DataImportException("导入文件未找到列名称行", null);
|
}
|
this.setFieldNames(Arrays.asList(headers));
|
|
return rowListProcessor.getRows();
|
|
} catch (Exception ex){
|
throw new DataImportException("CsvParser解析异常:" + ex.getMessage(), ex);
|
} finally {
|
this.releaseSource(source);
|
}
|
}
|
|
@Override
|
protected void acquireImportBatch(BatchLoadListener batchLoadListener, Object source) throws DataImportException {
|
this.checkSource(source);
|
long sleepMillSeconds = this.getSleepMillSeconds();
|
|
CsvParserSettings csvParserSettings = new CsvParserSettings();
|
csvParserSettings.setMaxColumns(64);
|
// 忽略第一行
|
if(this.getIgnoreRows() > 0){
|
csvParserSettings.setNumberOfRowsToSkip(this.getIgnoreRows());
|
}
|
csvParserSettings.setHeaderExtractionEnabled(true);
|
BatchedColumnProcessor batchedColumnProcessor = new BatchedColumnProcessor((int)CsvDataImportor.this.getBatchSize()) {
|
@Override
|
public void batchProcessed(int dataSize) {
|
// 设置列信息。
|
CsvDataImportor.this.setFieldNames(Arrays.asList(this.getHeaders()));
|
|
Map<String, List<String>> columnsByName = this.getColumnValuesAsMapOfNames();
|
List<Object[]> rows = batchLoadListener.onBatchLoad(columnsByName, this.getHeaders(), dataSize);
|
if(StringUtils.isEmptyList(rows)){
|
logger.error("未批量解析到任何数据:batchLoadListener.onBatchLoad, dataSize=" + dataSize);
|
return;
|
}
|
logger.debug("批量解析到数据量:" + rows.size());
|
|
// 校验没批数据,出现问题直接记录到文件中。
|
// 校验放到 this.doExecuteImport()方法中了 2023-02-02
|
// CsvDataImportor.this.doValidate(rows);
|
|
try {
|
CsvDataImportor.this.doExecuteImport(rows, getFieldNames());
|
} catch (DataImportException e) {
|
batchError = e.getMessage();
|
throw new ApplicationRuntimeException("批量导入监听错误:" + e.getMessage(), e);
|
} finally {
|
releaseSource(source);
|
}
|
|
if(sleepMillSeconds > 0){
|
try {
|
TimeUnit.MILLISECONDS.sleep(sleepMillSeconds);
|
logger.debug("-----> sleep: " + sleepMillSeconds);
|
} catch (InterruptedException e) {
|
logger.error("间隔等待(防止CUP过高),出现异常:" + e.getMessage());
|
}
|
}
|
|
// for(Map.Entry<String, List<String>> entry : columnsByName.entrySet()){
|
// System.out.println("column = " + entry.getKey());
|
// System.out.println("values = " + entry.getValue());
|
// System.out.println("------------------------------");
|
// }
|
}
|
};
|
csvParserSettings.setProcessor(batchedColumnProcessor);
|
CsvParser csvParser = new CsvParser(csvParserSettings);
|
try{
|
csvParser.parse((InputStream) source);
|
} catch (Exception ex){
|
throw new DataImportException(ex.getMessage(), ex);
|
} finally {
|
this.releaseSource(source);
|
}
|
}
|
|
// @Override
|
// protected void recordUnValidateData(Map<String, String> data) {
|
// }
|
|
// private void checkSource(Object source){
|
// if(!(source instanceof InputStream)){
|
// throw new IllegalArgumentException("source必须是:InputStream");
|
// }
|
// }
|
|
private class InternalBatchListenerImpl implements BatchLoadListener{
|
@Override
|
public List<Object[]> onBatchLoad(Map<String, List<String>> columnsByName, String[] fieldNames, int dataSize) {
|
return BatchLoadUtils.translateTo(columnsByName, fieldNames, dataSize);
|
}
|
}
|
|
@Override
|
protected ErrorWriter acquireErrorWriter(String id, List<String> fieldNames){
|
InputStream templateFileStream = null;
|
OutputStream errorFileStream = null;
|
File errorFile = null;
|
try {
|
// templateFile = ResourceUtils.getFile(Constants.IMPORT_ERROR_FILE_TEMPLATE);
|
// templateFileStream = this.getClass().getClassLoader().getResourceAsStream(Constants.IMPORT_ERROR_FILE_TEMPLATE);
|
errorFile = new File(this.getErrorFile());
|
errorFileStream = new BufferedOutputStream(new FileOutputStream(errorFile));
|
// 这里不能关闭输出流,否则无法写入数据,后续会一直写入。
|
// FileCopyUtils.copy(templateFileStream, errorFileStream, false);
|
return new CsvErrorWriter(errorFileStream, fieldNames);
|
// return new TsvErrorWriter(errorFileStream, fieldNames);
|
|
} catch (FileNotFoundException e) {
|
logger.error("未找到'错误模板文件':" + Constants.IMPORT_ERROR_FILE_TEMPLATE, e);
|
return null;
|
}
|
// catch (IOException ex){
|
// logger.error(errorFile.getAbsolutePath());
|
// logger.error("通过模板拷贝错误文件失败:" + Constants.IMPORT_ERROR_FILE_TEMPLATE, ex);
|
// return null;
|
// }
|
finally {
|
if(templateFileStream != null){
|
try {
|
templateFileStream.close();
|
} catch (IOException e) {}
|
}
|
// if(errorFileStream != null){
|
// try {
|
// errorFileStream.close();
|
// } catch (IOException e) {}
|
// }
|
}
|
}
|
|
}
|