package com.walker.di.univocity;

import com.univocity.parsers.common.processor.BatchedColumnProcessor;
import com.univocity.parsers.common.processor.ObjectRowListProcessor;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.walker.di.BatchLoadListener;
import com.walker.di.Constants;
import com.walker.di.DataImportException;
import com.walker.di.ErrorWriter;
import com.walker.di.support.InputStreamDataImportor;
import com.walker.di.univocity.util.BatchLoadUtils;
import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.utils.StringUtils;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * CSV file import implemented with univocity-parsers.
 *
 * <p>Two loading modes are provided: {@link #acquireImportDataList(Object)} parses the
 * whole input into memory, while {@link #acquireImportBatch(BatchLoadListener, Object)}
 * parses the input in batches and imports each batch as soon as it has been read.
 * In both modes the first parsed row is treated as the header row and becomes the
 * field-name list.</p>
 *
 * <p>NOTE(review): the element types of the raw {@code List} signatures below are declared
 * by the parent class / {@link BatchLoadListener}, which are not visible from this file;
 * they are intentionally left as they are.</p>
 *
 * @author 时克英
 * @date 2023-01-31
 */
public abstract class CsvDataImportor extends InputStreamDataImportor {

    /** Maximum number of columns per record passed to the parser settings. */
    private static final int MAX_COLUMNS = 64;

    /** Message of the last {@link DataImportException} raised while importing a batch, or {@code null}. */
    private String batchError = null;

    /**
     * @return the message recorded when a batch import failed, or {@code null} if none was recorded
     */
    @Override
    public String getBatchError() {
        return this.batchError;
    }

    /**
     * Enables batch mode by setting the batch size to {@code MAX_BATCH_SIZE}.
     */
    @Override
    public void setBatchEnabled() {
        this.setBatchSize(MAX_BATCH_SIZE);
    }

    /**
     * Sets the batch size and installs the default batch-load listener.
     *
     * <p>Every call installs a new {@link InternalBatchListenerImpl} through
     * {@code setBatchLoadListener}.</p>
     *
     * @param batchSize number of rows per batch
     */
    @Override
    public void setBatchSize(long batchSize) {
        super.setBatchSize(batchSize);
        // 2023-02-01: install the default batch-load listener implementation.
        this.setBatchLoadListener(new InternalBatchListenerImpl());
    }

    /**
     * @return the file suffix used for CSV import error files
     */
    @Override
    public String getImportFileSuffix() {
        return Constants.IMPORT_ERROR_SUFFIX_CSV;
    }

    /**
     * Parses the whole CSV input in one pass and returns all data rows.
     *
     * <p>The header row is stored through {@code setFieldNames}. The source is released
     * in all cases once parsing has finished.</p>
     *
     * @param source the input to parse; it is cast to {@link InputStream}
     * @return the parsed rows as produced by {@link ObjectRowListProcessor#getRows()}
     * @throws DataImportException if parsing fails or no header row is found
     */
    @Override
    protected List acquireImportDataList(Object source) throws DataImportException {
        this.checkSource(source);
        try {
            ObjectRowListProcessor rowListProcessor = new ObjectRowListProcessor();
            CsvParserSettings csvParserSettings = this.createParserSettings();
            csvParserSettings.setProcessor(rowListProcessor);

            CsvParser csvParser = new CsvParser(csvParserSettings);
            csvParser.parse((InputStream) source);

            String[] headers = rowListProcessor.getHeaders();
            if (headers == null || headers.length == 0) {
                // This exception is caught by the generic handler below and re-wrapped.
                throw new DataImportException("导入文件未找到列名称行", null);
            }
            this.setFieldNames(Arrays.asList(headers));
            return rowListProcessor.getRows();
        } catch (Exception ex) {
            throw new DataImportException("CsvParser解析异常:" + ex.getMessage(), ex);
        } finally {
            this.releaseSource(source);
        }
    }

    /**
     * Parses the CSV input in batches and imports each batch as soon as it is available.
     *
     * <p>For every batch the header names are published through {@code setFieldNames}, the
     * column values are converted to rows by {@code batchLoadListener}, and the rows are
     * passed to {@code doExecuteImport}. If a positive sleep interval is configured, the
     * parsing thread sleeps for that long after each batch.</p>
     *
     * <p>The source is released exactly once, after parsing has ended (normally or with an
     * error). It must not be released from inside the batch callback, because the parser
     * is still reading from it at that point.</p>
     *
     * @param batchLoadListener converts the per-column values of a batch into rows
     * @param source            the input to parse; it is cast to {@link InputStream}
     * @throws DataImportException if parsing or importing a batch fails
     */
    @Override
    protected void acquireImportBatch(BatchLoadListener batchLoadListener, Object source) throws DataImportException {
        this.checkSource(source);

        long sleepMillSeconds = this.getSleepMillSeconds();
        CsvParserSettings csvParserSettings = this.createParserSettings();

        BatchedColumnProcessor batchedColumnProcessor = new BatchedColumnProcessor((int) CsvDataImportor.this.getBatchSize()) {
            @Override
            public void batchProcessed(int dataSize) {
                // Publish the column (header) names.
                CsvDataImportor.this.setFieldNames(Arrays.asList(this.getHeaders()));

                Map<String, List<String>> columnsByName = this.getColumnValuesAsMapOfNames();
                List rows = batchLoadListener.onBatchLoad(columnsByName, this.getHeaders(), dataSize);
                if (StringUtils.isEmptyList(rows)) {
                    logger.error("未批量解析到任何数据:batchLoadListener.onBatchLoad, dataSize=" + dataSize);
                    return;
                }
                logger.debug("批量解析到数据量:" + rows.size());

                // Per-batch validation used to be called here; since 2023-02-02 it is
                // performed inside doExecuteImport().
                try {
                    CsvDataImportor.this.doExecuteImport(rows, getFieldNames());
                } catch (DataImportException e) {
                    // Remember the failure for getBatchError(), then abort parsing; the
                    // enclosing method releases the source in its finally block.
                    batchError = e.getMessage();
                    throw new ApplicationRuntimeException("批量导入监听错误:" + e.getMessage(), e);
                }

                if (sleepMillSeconds > 0) {
                    // Pause between batches to keep CPU usage down.
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMillSeconds);
                        logger.debug("-----> sleep: " + sleepMillSeconds);
                    } catch (InterruptedException e) {
                        // NOTE(review): the interrupt flag is not restored here, so the import
                        // continues with the next batch. Restoring it might make the parser
                        // stop early; confirm the desired cancellation semantics before changing.
                        logger.error("间隔等待(防止CUP过高),出现异常:" + e.getMessage());
                    }
                }
            }
        };
        csvParserSettings.setProcessor(batchedColumnProcessor);

        CsvParser csvParser = new CsvParser(csvParserSettings);
        try {
            csvParser.parse((InputStream) source);
        } catch (Exception ex) {
            throw new DataImportException(ex.getMessage(), ex);
        } finally {
            this.releaseSource(source);
        }
    }

    /**
     * Builds the parser settings shared by both loading modes: header extraction enabled,
     * at most {@link #MAX_COLUMNS} columns, and the configured number of leading rows skipped.
     */
    private CsvParserSettings createParserSettings() {
        CsvParserSettings csvParserSettings = new CsvParserSettings();
        csvParserSettings.setHeaderExtractionEnabled(true);
        csvParserSettings.setMaxColumns(MAX_COLUMNS);
        // Skip the configured number of leading rows, if any.
        if (this.getIgnoreRows() > 0) {
            csvParserSettings.setNumberOfRowsToSkip(this.getIgnoreRows());
        }
        return csvParserSettings;
    }

    /**
     * Default batch-load listener: delegates the column-to-row conversion to
     * {@link BatchLoadUtils#translateTo}.
     */
    private static class InternalBatchListenerImpl implements BatchLoadListener {
        @Override
        public List onBatchLoad(Map<String, List<String>> columnsByName, String[] fieldNames, int dataSize) {
            return BatchLoadUtils.translateTo(columnsByName, fieldNames, dataSize);
        }
    }

    /**
     * Opens the error file for writing and wraps it in a {@link CsvErrorWriter}.
     *
     * <p>The output stream is deliberately left open: the returned writer keeps writing to
     * it afterwards, so closing it here would make later writes impossible.</p>
     *
     * @param id         not used by this implementation
     * @param fieldNames field names passed to the error writer
     * @return the error writer, or {@code null} if the error file could not be opened
     */
    @Override
    protected ErrorWriter acquireErrorWriter(String id, List fieldNames) {
        File errorFile = new File(this.getErrorFile());
        try {
            OutputStream errorFileStream = new BufferedOutputStream(new FileOutputStream(errorFile));
            return new CsvErrorWriter(errorFileStream, fieldNames);
        } catch (FileNotFoundException e) {
            // The file that failed to open is the error output file, so include its path.
            logger.error("未找到'错误模板文件':" + Constants.IMPORT_ERROR_FILE_TEMPLATE
                    + ", errorFile=" + errorFile.getAbsolutePath(), e);
            return null;
        }
    }
}