shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package com.walker.di.univocity;
 
import com.univocity.parsers.common.processor.BatchedColumnProcessor;
import com.univocity.parsers.common.processor.ObjectRowListProcessor;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.walker.di.BatchLoadListener;
import com.walker.di.Constants;
import com.walker.di.DataImportException;
import com.walker.di.ErrorWriter;
import com.walker.di.support.InputStreamDataImportor;
import com.walker.di.univocity.util.BatchLoadUtils;
import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.utils.StringUtils;
 
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
 
/**
 * univocity实现的CSV文件导入。
 * @author 时克英
 * @date 2023-01-31
 */
public abstract class CsvDataImportor extends InputStreamDataImportor {
 
    private String batchError = null;
 
    @Override
    public String getBatchError(){
        return this.batchError;
    }
 
    @Override
    public void setBatchEnabled() {
        this.setBatchSize(MAX_BATCH_SIZE);
    }
 
    @Override
    public void setBatchSize(long batchSize) {
        super.setBatchSize(batchSize);
        // 2023-02-01 设置默认的批量加载监听器实现。
        this.setBatchLoadListener(new InternalBatchListenerImpl());
    }
 
    @Override
    public String getImportFileSuffix(){
        return Constants.IMPORT_ERROR_SUFFIX_CSV;
    }
 
    @Override
    protected List<Object[]> acquireImportDataList(Object source) throws DataImportException {
        this.checkSource(source);
        try{
            ObjectRowListProcessor rowListProcessor = new ObjectRowListProcessor();
            CsvParserSettings csvParserSettings = new CsvParserSettings();
            csvParserSettings.setHeaderExtractionEnabled(true);
//            csvParserSettings.setCommentCollectionEnabled(true);
            csvParserSettings.setMaxColumns(64);
            // 忽略第一行
            if(this.getIgnoreRows() > 0){
                csvParserSettings.setNumberOfRowsToSkip(this.getIgnoreRows());
            }
            csvParserSettings.setProcessor(rowListProcessor);
            CsvParser csvParser = new CsvParser(csvParserSettings);
            csvParser.parse((InputStream) source);
 
            String[] headers = rowListProcessor.getHeaders();
            if(headers == null || headers.length == 0){
                throw new DataImportException("导入文件未找到列名称行", null);
            }
            this.setFieldNames(Arrays.asList(headers));
 
            return rowListProcessor.getRows();
 
        } catch (Exception ex){
            throw new DataImportException("CsvParser解析异常:" + ex.getMessage(), ex);
        } finally {
            this.releaseSource(source);
        }
    }
 
    @Override
    protected void acquireImportBatch(BatchLoadListener batchLoadListener, Object source) throws DataImportException {
        this.checkSource(source);
        long sleepMillSeconds = this.getSleepMillSeconds();
 
        CsvParserSettings csvParserSettings = new CsvParserSettings();
        csvParserSettings.setMaxColumns(64);
        // 忽略第一行
        if(this.getIgnoreRows() > 0){
            csvParserSettings.setNumberOfRowsToSkip(this.getIgnoreRows());
        }
        csvParserSettings.setHeaderExtractionEnabled(true);
        BatchedColumnProcessor batchedColumnProcessor = new BatchedColumnProcessor((int)CsvDataImportor.this.getBatchSize()) {
            @Override
            public void batchProcessed(int dataSize) {
                // 设置列信息。
                CsvDataImportor.this.setFieldNames(Arrays.asList(this.getHeaders()));
 
                Map<String, List<String>> columnsByName = this.getColumnValuesAsMapOfNames();
                List<Object[]> rows = batchLoadListener.onBatchLoad(columnsByName, this.getHeaders(), dataSize);
                if(StringUtils.isEmptyList(rows)){
                    logger.error("未批量解析到任何数据:batchLoadListener.onBatchLoad, dataSize=" + dataSize);
                    return;
                }
                logger.debug("批量解析到数据量:" + rows.size());
 
                // 校验没批数据,出现问题直接记录到文件中。
                // 校验放到 this.doExecuteImport()方法中了 2023-02-02
//                CsvDataImportor.this.doValidate(rows);
 
                try {
                    CsvDataImportor.this.doExecuteImport(rows, getFieldNames());
                } catch (DataImportException e) {
                    batchError = e.getMessage();
                    throw new ApplicationRuntimeException("批量导入监听错误:" + e.getMessage(), e);
                } finally {
                    releaseSource(source);
                }
 
                if(sleepMillSeconds > 0){
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMillSeconds);
                        logger.debug("-----> sleep: " + sleepMillSeconds);
                    } catch (InterruptedException e) {
                        logger.error("间隔等待(防止CUP过高),出现异常:" + e.getMessage());
                    }
                }
 
//                for(Map.Entry<String, List<String>> entry : columnsByName.entrySet()){
//                    System.out.println("column = " + entry.getKey());
//                    System.out.println("values = " + entry.getValue());
//                    System.out.println("------------------------------");
//                }
            }
        };
        csvParserSettings.setProcessor(batchedColumnProcessor);
        CsvParser csvParser = new CsvParser(csvParserSettings);
        try{
            csvParser.parse((InputStream) source);
        } catch (Exception ex){
            throw new DataImportException(ex.getMessage(), ex);
        } finally {
            this.releaseSource(source);
        }
    }
 
//    @Override
//    protected void recordUnValidateData(Map<String, String> data) {
//    }
 
//    private void checkSource(Object source){
//        if(!(source instanceof InputStream)){
//            throw new IllegalArgumentException("source必须是:InputStream");
//        }
//    }
 
    private class InternalBatchListenerImpl implements BatchLoadListener{
        @Override
        public List<Object[]> onBatchLoad(Map<String, List<String>> columnsByName, String[] fieldNames, int dataSize) {
            return BatchLoadUtils.translateTo(columnsByName, fieldNames, dataSize);
        }
    }
 
    @Override
    protected ErrorWriter acquireErrorWriter(String id, List<String> fieldNames){
        InputStream templateFileStream = null;
        OutputStream errorFileStream = null;
        File errorFile = null;
        try {
//            templateFile = ResourceUtils.getFile(Constants.IMPORT_ERROR_FILE_TEMPLATE);
//            templateFileStream = this.getClass().getClassLoader().getResourceAsStream(Constants.IMPORT_ERROR_FILE_TEMPLATE);
            errorFile = new File(this.getErrorFile());
            errorFileStream = new BufferedOutputStream(new FileOutputStream(errorFile));
            // 这里不能关闭输出流,否则无法写入数据,后续会一直写入。
//            FileCopyUtils.copy(templateFileStream, errorFileStream, false);
            return new CsvErrorWriter(errorFileStream, fieldNames);
//            return new TsvErrorWriter(errorFileStream, fieldNames);
 
        } catch (FileNotFoundException e) {
            logger.error("未找到'错误模板文件':" + Constants.IMPORT_ERROR_FILE_TEMPLATE, e);
            return null;
        }
//        catch (IOException ex){
//            logger.error(errorFile.getAbsolutePath());
//            logger.error("通过模板拷贝错误文件失败:" + Constants.IMPORT_ERROR_FILE_TEMPLATE, ex);
//            return null;
//        }
        finally {
            if(templateFileStream != null){
                try {
                    templateFileStream.close();
                } catch (IOException e) {}
            }
//            if(errorFileStream != null){
//                try {
//                    errorFileStream.close();
//                } catch (IOException e) {}
//            }
        }
    }
 
}