package com.integrated.zyyt.service;

import com.integrated.zyyt.enetity.StationInfo;
import com.integrated.zyyt.enetity.YyztTDjtjb;
import com.integrated.zyyt.enetity.YyztTShkdrb;
import com.integrated.zyyt.enetity.ZyytDataResult;
import com.integrated.zyyt.enetity.business.Zyyt;
import com.integrated.zyyt.util.ZyytUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

/**
 * Pulls paged data through {@link ZyytUtil} / {@link Zyyt} and writes it to the
 * STATIONINFO, YYZT_T_SHKDRB and YYZT_T_DJTJB tables with {@link JdbcTemplate}.
 *
 * <p>Every task follows the same outline: skip if the batch is reported as running,
 * insert a run record, fetch the total element count, fetch page by page, write each
 * page in chunks of {@link #BATCH_SIZE} rows, and finally update the run record with
 * the outcome.
 *
 * <p>NOTE(review): the class carries no {@code @Transactional} annotation, so a failure
 * in a later chunk does not undo chunks already written by this class — confirm whether
 * a caller supplies a transaction.
 *
 * <p>NOTE(review): the declarations of {@link Zyyt} and {@link ZyytDataResult} are not
 * visible from this file; they are used without type arguments here and the element
 * type of each page is taken from how the elements are used — confirm against their
 * declarations.
 */
@Service
@Slf4j
public class ZyytService {

    /** Number of rows sent to the database per JDBC batch. */
    private static final int BATCH_SIZE = 500;

    /** Page size used when the configured value is missing or not positive. */
    private static final int DEFAULT_PAGE_SIZE = 1000;

    /**
     * Upsert for STATIONINFO keyed on V_JGBH. An existing row is only updated when its
     * CPBD_LOAD_DT is older than the incoming one.
     * Placeholders: 1-2 = USING keys, 3-28 = UPDATE values, 29-55 = INSERT values.
     */
    private static final String STATION_INFO_MERGE_SQL = "merge into STATIONINFO A USING"
            + " (select ? V_JGBH, ? CPBD_LOAD_DT from dual ) C "
            + "ON(A.V_JGBH=C.V_JGBH) "
            + "when matched then "
            + "update SET A.CPBD_LOAD_DT=?, A.V_SFMC=?,A.V_SFDM=?,A.V_DSDM=?,A.V_DSMC=?,A.V_QXDM=?,A.V_XSMC=?,A.V_TDJGBH=?,A.V_TDJGMC=?,A.V_JGMC=?,A.V_LXRXM=?,A.V_LXDH=?,A.V_SJHM=?,A.V_LXDZ=?,A.ISCOUNTRY=?,A.C_WDLX =?,A.C_YZBM=?,A.V_YYWDBH=?,A.V_YYWDMC=?,A.V_GPSJD=?,A.V_GPSWD=?,A.YLGZDID =?,A.YLGZDMC=?,A.V_STATUS=?,A.D_SQRQ=?,A.D_SPRQ=? where A.CPBD_LOAD_DT < C.CPBD_LOAD_DT "
            + "when not matched then "
            + "insert(A.V_SFMC,A.V_SFDM,A.V_DSDM,A.V_DSMC,A.V_QXDM,A.V_XSMC,A.V_TDJGBH,A.V_TDJGMC,A.V_JGBH,A.V_JGMC,A.V_LXRXM,A.V_LXDH,A.V_SJHM,A.V_LXDZ,A.ISCOUNTRY,A.C_WDLX ,A.C_YZBM,A.V_YYWDBH,A.V_YYWDMC,A.V_GPSJD,A.V_GPSWD,A.YLGZDID ,A.YLGZDMC,A.V_STATUS,A.D_SQRQ,A.D_SPRQ,A.CPBD_LOAD_DT) values "
            + "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    /** Insert for one YYZT_T_SHKDRB row; column order matches {@link #toShkdrbRow}. */
    private static final String SHKDRB_INSERT_SQL = "INSERT INTO YYZT_T_SHKDRB (\"V_SFDM\",\"V_SFMC\",\"V_DSDM\",\"V_DSMC\",\"V_QXDM\",\"V_XSMC\",\"V_TDJGBH\",\"V_TDJGMC\",\"V_JGBH\",\"V_JGMC\",\"D_RBRQ\",\"N_ZGYZLJS\",\"N_ZGYZQJS\",\"N_SHKDLJS\",\"N_SHKDQJS\",\"N_ZTLJS\",\"N_ZTQJS\",\"N_YTLJS\",\"N_YTQJS\",\"N_STLJS\",\"N_STQJS\",\"N_BSLJS\",\"N_BSQJS\",\"N_YDLJS\",\"N_YDQJS\",\"N_YFLJS\",\"N_YFQJS\",\"N_JDLJS\",\"N_JDQJS\",\"N_JTLJS\",\"N_JTQJS\",\"N_TTLJS\",\"N_TTQJS\",\"N_DBLJS\",\"N_DBQJS\",\"N_DNLJS\",\"N_DNQJS\",\"N_FWLJS\",\"N_FWQJS\",\"N_YSLJS\",\"N_YSQJS\",\"N_QTLJS\",\"N_QTQJS\",\"RESERVED1\",\"RESERVED2\",\"RESERVED3\",\"RESERVED4\",\"BATCH_NO\") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    /** Insert for one YYZT_T_DJTJB row; column order matches {@link #toDjtjbRow}. */
    private static final String DJTJB_INSERT_SQL = "INSERT INTO YYZT_T_DJTJB (\"V_SFDM\", \"V_SFMC\", \"V_DSDM\", \"V_DSMC\", \"V_QXDM\", \"V_XSMC\", \"V_TDJGBH\", \"V_JGBH\", \"V_JGMC\", \"D_RBRQ\", \"V_YLGZDID\", \"V_YLGZDMC\", \"N_SJYWL\", \"N_TKYWL\", \"N_BKYEL\", \"N_SJZJE\", \"N_TKSR\", \"N_BKSR\", \"BATCH_NO\") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    /** Removes every YYZT_T_SHKDRB row of one batch; the batch number is bound, not concatenated. */
    private static final String SHKDRB_DELETE_SQL = "delete from YYZT_T_SHKDRB where BATCH_NO=?";

    /** Removes every YYZT_T_DJTJB row of one batch; the batch number is bound, not concatenated. */
    private static final String DJTJB_DELETE_SQL = "delete from YYZT_T_DJTJB where BATCH_NO=?";

    @Resource
    private JdbcTemplate jdbcTemplate;

    /** Number of records requested per page; overridden through {@link #setTRYNUM(Integer)}. */
    private int pageSize = DEFAULT_PAGE_SIZE;

    /**
     * Injects the page size from the {@code pullData.pageSize} property.
     *
     * @param pageSize configured page size; {@code null}, zero or negative values fall
     *                 back to {@value #DEFAULT_PAGE_SIZE}
     */
    @Value("${pullData.pageSize}")
    public void setTRYNUM(Integer pageSize) {
        this.pageSize = (pageSize == null || pageSize <= 0) ? DEFAULT_PAGE_SIZE : pageSize;
    }

    /**
     * Synchronises the station information table (STATIONINFO) for yesterday's batch.
     *
     * <p>Rows are merged (insert or update) rather than replaced, so no delete happens
     * beforehand and no total-count verification is done afterwards. Any exception is
     * logged and stored on the run record; it is not rethrown.
     */
    public void stationInfoTasks() {
        log.info("开始进行 机构同步");
        LocalDate yestDay = LocalDate.now().minus(1, ChronoUnit.DAYS);
        String batchNo = ZyytUtil.getBatchNo(yestDay);
        log.info("批次号为 {}", batchNo);
        if (ZyytUtil.isRunning("STATIONINFO", yestDay)) {
            log.info("批次{}正在运行", batchNo);
            return;
        }

        Zyyt business = ZyytUtil.getBusinessEntity("STATIONINFO", yestDay);
        Long recordId = business.insertRecord("STATIONINFO", batchNo);
        Long totalSize = 0L;

        try {
            // Ask the API for the total count first so the number of pages is known.
            Long totalElements = ZyytUtil.getApiTotalElements("STATIONINFO", yestDay);
            if (totalElements == null || totalElements == 0) {
                log.info("共0条数据");
                business.updateRecord(recordId, 0, totalSize, "未拉取到数据");
                return;
            }
            Long totalPages = ZyytUtil.calcPages(totalElements, pageSize);
            log.info("共{}条数据,每页{}条,可分{}页,", totalElements, pageSize, totalPages);

            for (int pageNum = 0; pageNum <= totalPages - 1; pageNum++) {
                ZyytDataResult page = business.getData(pageNum, pageSize);
                @SuppressWarnings("unchecked")
                List<StationInfo> stations = page.getContent();
                log.info("第{}次获取,本次获取到{}条数据", pageNum + 1, page.getContentSize());
                totalSize += page.getContentSize();

                // Write the page in chunks of BATCH_SIZE rows.
                int length = stations.size();
                for (int from = 0; from < length; from += BATCH_SIZE) {
                    int to = Math.min(from + BATCH_SIZE, length);
                    log.debug("{} === {}", from, to);
                    final List<StationInfo> chunk = stations.subList(from, to);
                    try {
                        jdbcTemplate.batchUpdate(STATION_INFO_MERGE_SQL, new BatchPreparedStatementSetter() {
                            @Override
                            public void setValues(PreparedStatement ps, int i) throws SQLException {
                                bindStationInfo(ps, chunk.get(i));
                            }

                            @Override
                            public int getBatchSize() {
                                return chunk.size();
                            }
                        });
                    } catch (Exception e) {
                        log.error("设置参数值报错了");
                        markFailed(business, recordId, e);
                        return;
                    }
                }
            }
        } catch (Exception e) {
            markFailed(business, recordId, e);
            return;
        }

        business.updateRecord(recordId, 0, totalSize, null);
        log.info("···········中邮易通 {}站点信息 任务执行完毕···········", yestDay);
    }

    /**
     * Loads the "social express daily report" table (YYZT_T_SHKDRB) for one date.
     *
     * <p>Existing rows of the batch are deleted first. After all pages are written, the
     * number of pulled rows is compared with the total announced by the API; on a
     * mismatch the batch rows are deleted again and the run is recorded as failed.
     *
     * @param date date whose batch is loaded
     */
    public void shkdrbTasks(LocalDate date) {
        log.info("开始进行 社会快递日报表");
        String batchNo = ZyytUtil.getBatchNo(date);
        if (ZyytUtil.isRunning("YYZT_T_SHKDRB", date)) {
            log.info("YYZT_T_SHKDRB 的 批次{}正在运行", batchNo);
            return;
        }

        Zyyt business = ZyytUtil.getBusinessEntity("YYZT_T_SHKDRB", date);
        Long recordId = business.insertRecord("YYZT_T_SHKDRB", batchNo);

        // Remove rows left over from an earlier run of the same batch.
        jdbcTemplate.update(SHKDRB_DELETE_SQL, batchNo);

        Long totalSize = 0L;
        Long totalElements = 0L;
        try {
            totalElements = ZyytUtil.getApiTotalElements("YYZT_T_SHKDRB", date);
            if (totalElements == null || totalElements == 0) {
                log.info("共0条数据");
                business.updateRecord(recordId, 0, totalSize, "未拉取到数据");
                return;
            }
            Long totalPages = ZyytUtil.calcPages(totalElements, pageSize);
            log.info("社会快递日报表(YYZT_T_SHKDRB) 共{}条数据,每页{}条,可分{}页,", totalElements, pageSize, totalPages);

            for (int pageNum = 0; pageNum <= totalPages - 1; pageNum++) {
                ZyytDataResult page = business.getData(pageNum, pageSize);
                @SuppressWarnings("unchecked")
                List<YyztTShkdrb> records = page.getContent();
                totalSize += page.getContentSize();
                log.info("第{}次获取,本次获取到{}条数据", pageNum + 1, page.getContentSize());

                // Write the page in chunks of BATCH_SIZE rows.
                int length = records.size();
                for (int from = 0; from < length; from += BATCH_SIZE) {
                    int to = Math.min(from + BATCH_SIZE, length);
                    List<Object[]> rows = new ArrayList<>(to - from);
                    for (YyztTShkdrb item : records.subList(from, to)) {
                        rows.add(toShkdrbRow(item, batchNo));
                    }
                    jdbcTemplate.batchUpdate(SHKDRB_INSERT_SQL, rows);
                }
            }
        } catch (Exception e) {
            markFailed(business, recordId, e);
            return;
        }

        // Compare by value: both operands are boxed Long, so != would compare references.
        if (!totalSize.equals(totalElements)) {
            log.error("拉取到的总条数:{}与接口显示的总条数:{}不一致,删除拉取到数据! ", totalSize, totalElements);
            jdbcTemplate.update(SHKDRB_DELETE_SQL, batchNo);
            business.updateRecord(recordId, 1, 0L, "拉取到的总条数与接口显示的总条数不一致!");
            return;
        }

        business.updateRecord(recordId, 0, totalSize, null);
        log.info("···········中邮易通 社会快递日报表{} 任务执行完毕···········", date);
    }

    /**
     * Loads the "consignment statistics" table (YYZT_T_DJTJB) for one date.
     *
     * <p>Same flow as {@link #shkdrbTasks(LocalDate)}: delete the batch, insert page by
     * page, then verify the pulled row count against the total announced by the API and
     * delete the batch again on a mismatch.
     *
     * @param date date whose batch is loaded
     */
    public void djtjbTasks(LocalDate date) {
        log.info("开始进行 代寄统计表 -- {}", date);
        String batchNo = ZyytUtil.getBatchNo(date);
        if (ZyytUtil.isRunning("YYZT_T_DJTJB", date)) {
            log.info("YYZT_T_DJTJB 的 批次{}正在运行", batchNo);
            return;
        }

        Zyyt business = ZyytUtil.getBusinessEntity("YYZT_T_DJTJB", date);
        Long recordId = business.insertRecord("YYZT_T_DJTJB", batchNo);

        // Remove rows left over from an earlier run of the same batch.
        jdbcTemplate.update(DJTJB_DELETE_SQL, batchNo);

        Long totalSize = 0L;
        Long totalElements = 0L;
        try {
            totalElements = ZyytUtil.getApiTotalElements("YYZT_T_DJTJB", date);
            if (totalElements == null || totalElements == 0) {
                log.info("共0条数据");
                business.updateRecord(recordId, 0, totalSize, "未拉取到数据");
                return;
            }
            Long totalPages = ZyytUtil.calcPages(totalElements, pageSize);
            log.info("共{}条数据,每页{}条,可分{}页,", totalElements, pageSize, totalPages);

            for (int pageNum = 0; pageNum <= totalPages - 1; pageNum++) {
                ZyytDataResult page = business.getData(pageNum, pageSize);
                @SuppressWarnings("unchecked")
                List<YyztTDjtjb> records = page.getContent();
                totalSize += page.getContentSize();
                log.info("第{}次获取,本次获取到{}条数据", pageNum + 1, page.getContentSize());

                // Write the page in chunks of BATCH_SIZE rows.
                int length = records.size();
                for (int from = 0; from < length; from += BATCH_SIZE) {
                    int to = Math.min(from + BATCH_SIZE, length);
                    log.debug("{} === {}", from, to);
                    List<Object[]> rows = new ArrayList<>(to - from);
                    for (YyztTDjtjb item : records.subList(from, to)) {
                        rows.add(toDjtjbRow(item, batchNo));
                    }
                    jdbcTemplate.batchUpdate(DJTJB_INSERT_SQL, rows);
                }
            }
        } catch (Exception e) {
            markFailed(business, recordId, e);
            return;
        }

        // Compare by value: both operands are boxed Long, so != would compare references.
        if (!totalSize.equals(totalElements)) {
            log.error("拉取到的总条数:{}与接口显示的总条数:{}不一致,删除拉取到数据! ", totalSize, totalElements);
            jdbcTemplate.update(DJTJB_DELETE_SQL, batchNo);
            business.updateRecord(recordId, 1, 0L, "拉取到的总条数与接口显示的总条数不一致!");
            return;
        }

        business.updateRecord(recordId, 0, totalSize, null);
        log.info("···········中邮易通 代寄统计表 -- [{}] 任务执行完毕···········", date);
    }

    /** Logs the failure with its stack trace and stores the message on the run record. */
    private void markFailed(Zyyt business, Long recordId, Exception e) {
        log.error(e.getMessage(), e);
        business.updateRecord(recordId, 1, 0L, e.getMessage());
    }

    /**
     * Binds one station to the 55 placeholders of {@link #STATION_INFO_MERGE_SQL}.
     *
     * @param ps statement prepared from the merge SQL
     * @param s  station whose values are bound
     * @throws SQLException if the driver rejects a parameter
     */
    private static void bindStationInfo(PreparedStatement ps, StationInfo s) throws SQLException {
        // Each date is converted once and reused for every placeholder that needs it.
        Date loadDt = s.getCpbdLoadDt() == null ? null : new Date(s.getCpbdLoadDt().getTime());
        Date sqrq = s.getDSqrq() == null ? null : new Date(s.getDSqrq().getTime());
        Date sprq = s.getDSprq() == null ? null : new Date(s.getDSprq().getTime());

        // 1-2: USING clause (match key and incoming load date).
        ps.setString(1, s.getVJgbh());
        ps.setDate(2, loadDt);

        // 3-28: UPDATE branch.
        ps.setDate(3, loadDt);
        ps.setString(4, s.getVSfmc());
        ps.setString(5, s.getVSfdm());
        ps.setString(6, s.getVDsdm());
        ps.setString(7, s.getVDsmc());
        ps.setString(8, s.getVQxdm());
        ps.setString(9, s.getVXsmc());
        ps.setString(10, s.getVTdjgbh());
        ps.setString(11, s.getVTdjgmc());
        ps.setString(12, s.getVJgmc());
        ps.setString(13, s.getVLxrxm());
        ps.setString(14, s.getVLxdh());
        ps.setString(15, s.getVSjhm());
        ps.setString(16, s.getVLxdz());
        ps.setString(17, s.getIscountry());
        // NOTE(review): if getCWdlx() returns a boxed Short, a null value fails here — confirm.
        ps.setShort(18, s.getCWdlx());
        ps.setString(19, s.getCYzbm());
        ps.setString(20, s.getVYywdbh());
        ps.setString(21, s.getVYywdmc());
        ps.setString(22, s.getVGpsjd());
        ps.setString(23, s.getVGpswd());
        ps.setString(24, s.getYlgzdid());
        ps.setString(25, s.getYlgzdmc());
        ps.setString(26, s.getVStatus());
        ps.setDate(27, sqrq);
        ps.setDate(28, sprq);

        // 29-55: INSERT branch (same values plus the key at 37 and the load date last).
        ps.setString(29, s.getVSfmc());
        ps.setString(30, s.getVSfdm());
        ps.setString(31, s.getVDsdm());
        ps.setString(32, s.getVDsmc());
        ps.setString(33, s.getVQxdm());
        ps.setString(34, s.getVXsmc());
        ps.setString(35, s.getVTdjgbh());
        ps.setString(36, s.getVTdjgmc());
        ps.setString(37, s.getVJgbh());
        ps.setString(38, s.getVJgmc());
        ps.setString(39, s.getVLxrxm());
        ps.setString(40, s.getVLxdh());
        ps.setString(41, s.getVSjhm());
        ps.setString(42, s.getVLxdz());
        ps.setString(43, s.getIscountry());
        ps.setShort(44, s.getCWdlx());
        ps.setString(45, s.getCYzbm());
        ps.setString(46, s.getVYywdbh());
        ps.setString(47, s.getVYywdmc());
        ps.setString(48, s.getVGpsjd());
        ps.setString(49, s.getVGpswd());
        ps.setString(50, s.getYlgzdid());
        ps.setString(51, s.getYlgzdmc());
        ps.setString(52, s.getVStatus());
        ps.setDate(53, sqrq);
        ps.setDate(54, sprq);
        ps.setDate(55, loadDt);
    }

    /** Maps one report record to the argument array expected by {@link #SHKDRB_INSERT_SQL}. */
    private static Object[] toShkdrbRow(YyztTShkdrb item, String batchNo) {
        return new Object[]{
                item.getVSfdm(), item.getVSfmc(), item.getVDsdm(), item.getVDsmc(),
                item.getVQxdm(), item.getVXsmc(), item.getVTdjgbh(), item.getVTdjgmc(),
                item.getVJgbh(), item.getVJgmc(), item.getDRbrq(),
                item.getNZgyzljs(), item.getNZgyzqjs(), item.getNShkdljs(), item.getNShkdqjs(),
                item.getNZtljs(), item.getNZtqjs(), item.getNYtljs(), item.getNYtqjs(),
                item.getNStljs(), item.getNStqjs(), item.getNBsljs(), item.getNBsqjs(),
                item.getNYdljs(), item.getNYdqjs(), item.getNYfljs(), item.getNYfqjs(),
                item.getNJdljs(), item.getNJdqjs(), item.getNJtljs(), item.getNJtqjs(),
                item.getNTtljs(), item.getNTtqjs(), item.getNDbljs(), item.getNDbqjs(),
                item.getNDnljs(), item.getNDnqjs(), item.getNFwljs(), item.getNFwqjs(),
                item.getNYsljs(), item.getNYsqjs(), item.getNQtljs(), item.getNQtqjs(),
                item.getReserved1(), item.getReserved2(), item.getReserved3(), item.getReserved4(),
                batchNo
        };
    }

    /** Maps one statistics record to the argument array expected by {@link #DJTJB_INSERT_SQL}. */
    private static Object[] toDjtjbRow(YyztTDjtjb item, String batchNo) {
        return new Object[]{
                item.getVSfdm(), item.getVSfmc(), item.getVDsdm(), item.getVDsmc(),
                item.getVQxdm(), item.getVXsmc(), item.getVTdjgbh(), item.getVJgbh(),
                item.getVJgmc(), item.getDRbrq(), item.getVYlgzdid(), item.getVYlgzdmc(),
                item.getNSjywl(), item.getNTkywl(), item.getNBkyel(), item.getNSjzje(),
                item.getNTksr(), item.getNBksr(),
                batchNo
        };
    }
}