package com.iplatform.gather.support; import com.iplatform.gather.cache.LocalMetaDataCacheProvider; import com.iplatform.gather.service.StoreServiceImpl; import com.iplatform.model.po.S_host; import com.iplatform.model.po.Sdc_meta_db; import com.iplatform.model.po.Sdc_meta_table; import com.walker.cache.Cachable; import com.walker.cache.CacheProvider; import com.walker.connector.Address; import com.walker.db.DatabaseException; import com.walker.db.TableInfo; import com.walker.db.page.GenericPager; import com.walker.infrastructure.utils.NumberGenerator; import com.walker.store.AbstractMetaDataEngine; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DefaultMetaDataEngine extends AbstractMetaDataEngine { private StoreServiceImpl storeService; private LocalMetaDataCacheProvider metaDataCacheProvider; private CacheProvider hostCacheProvider; private Map> cachedFields = new ConcurrentHashMap>(); // 缓存的字段集合最大值,超过此值系统会清除缓存重新慢慢积累 private final int maxCacheFields = 1024; public void setStoreService(StoreServiceImpl storeService) { this.storeService = storeService; } public void setMetaDataCacheProvider(LocalMetaDataCacheProvider metaDataCacheProvider) { this.metaDataCacheProvider = metaDataCacheProvider; } public void setHostCacheProvider(CacheProvider hostCacheProvider) { this.hostCacheProvider = hostCacheProvider; } @Override public Address getUsingAddress(String storeId) { return this.metaDataCacheProvider.getUsingAddress(storeId); } @Override public void saveNewAddress(String storeId, Address address) throws DatabaseException { Sdc_meta_db metaDb = new Sdc_meta_db(); metaDb.setId(NumberGenerator.getSequenceNumber()); metaDb.setCreate_time(System.currentTimeMillis()); metaDb.setDatabase_name(address.getServiceName()); metaDb.setHost_info(address.getUrl() + ":" + address.getPort()); metaDb.setStore_id(storeId); metaDb.setSummary(TIP_ADD_METADB); metaDb.setTable_count(0); metaDb.setUsed(1); this.storeService.execSaveMetaDb(metaDb); logger.info("引擎添加元数据,metaDB = " + metaDb); // 更新缓存 this.metaDataCacheProvider.addAddress(storeId, address, metaDb.getId()); } @Override public void saveNewTable(String storeId, Address addr, String destTableName) throws DatabaseException { Long metaDbId = this.metaDataCacheProvider.getMetaDbId(storeId, addr); if(metaDbId == null){ throw new IllegalArgumentException("缓存错误:metaDbId不存在,address = " + addr); } Sdc_meta_table mt = new Sdc_meta_table(); mt.setCreate_time(System.currentTimeMillis()); mt.setDb_id(metaDbId); mt.setStore_id(storeId); mt.setSummary(TIP_ADD_METATABLE); mt.setTable_name(destTableName); logger.info("引擎添加元数据,metaTable = " + mt); this.storeService.execSaveMetaTable(mt); // 更新缓存 this.metaDataCacheProvider.addTable(storeId, addr, destTableName); } @Override public List getFields(Address address, String tableName) { int hashCode = address.hashCode() + tableName.hashCode(); List fields = cachedFields.get(hashCode); if(fields == null){ // logger.debug("getFields, address = " + address + ", tableName = " + tableName); fields = this.getDatabaseMetaEngine().getFields(address, tableName); if(fields == null){ throw new IllegalStateException("元数据引擎错误:查找数据库未找到表字段信息。 address = " + address + ", tableName = " + tableName); } if(cachedFields.size() >= maxCacheFields){ cachedFields.clear(); } cachedFields.put(hashCode, fields); } return fields; } @Override public int getTableSize(Address address) { return this.metaDataCacheProvider.getTableSize(address); } @Override public int getDatabaseSize(Address address) { return this.metaDataCacheProvider.getDatabaseSize(address); } @Override public Map getTableRows(String storeId, long metaDbId, List tableNameList) { Address address = this.metaDataCacheProvider.getDatabaseAddress(storeId, metaDbId); if(address == null){ logger.debug("或者从数据库中直接查询host主机信息,去掉异常"); throw new IllegalStateException("dcMetaCacheProvider未找到缓存的数据库地址信息。storeId: " + storeId + ", metaDbId: " + metaDbId); } this.combinAddressAuthentication(address); return this.getDatabaseMetaEngine().getTableRows(address, tableNameList); } /** * 设置数据库的用户名、密码信息 * @param address */ private void combinAddressAuthentication(Address address){ // Iterator it = hostCacheProvider.getCache().getIterator(); S_host host = null; Cachable cache = null; for(Iterator it = hostCacheProvider.getCache().getIterator(); it.hasNext();){ cache = it.next(); if(cache != null){ host = (S_host)cache.getValue(); if(address.getUrl().equals(host.getUrl()) && address.getPort() == host.getPort() // && address.getServiceName().equals(host.getServiceName()) ){ // 把数据库服务器主机的用户名、密码设置给address对象 address.setAuthentication(host.getAuthentication()); address.setCertification(host.getCertification()); logger.debug(".........设置了address用户信息:" + host.getAuthentication()); } } } } @Override public boolean isExistDatabase(String storeId, Address address) { boolean result = this.metaDataCacheProvider.isExistDatabase(storeId, address); // logger.debug("是否存在数据库'" + address + ", storeId = " + storeId); return result; } @Override public boolean isExistTable(String storeId, Address address, String tableName) { boolean result = this.metaDataCacheProvider.isExistTable(storeId, address, tableName); // logger.debug("是否存在表'" + tableName + ", storeId = " + storeId + ", address = " + address); return result; } private final String TIP_ADD_METADB = "系统自动创建数据库并添加元数据"; private final String TIP_ADD_METATABLE = "系统自动创建表添加元数据"; /** * 分页获取采集元数据表信息,该方法从: StoreServiceImpl中迁移到该方法,避免循环依赖。 * @param storeId * @param metaDbId * @return * @date 2022-09-21 */ public GenericPager queryPageMetaTables(String storeId, long metaDbId) { // 查出来本页所有表中的数据量大小 GenericPager result = this.storeService.queryPageMetaTables(storeId); List datas = result.getDatas(); // 获取得到的数据表名字集合,为下面查询记录总数准备 List tableNameList = new ArrayList(); for (Sdc_meta_table mt : datas) { tableNameList.add(mt.getTable_name()); } // 把找到的每个表的记录数设置到属性中 // Address address = metaCache.getDatabaseAddress(storeId, metaDbId); TableInfo ti = null; try { Map map = this.getTableRows(storeId, metaDbId, tableNameList); for (Sdc_meta_table mt : datas) { ti = map.get(mt.getTable_name()); if (ti == null) { mt.setRow_count((long)-1); } else { mt.setRow_count(ti.getRows()); } } } catch (Exception ex) { logger.error("获取表结构元数据出现错误,可能物理表不存在", ex); for (Sdc_meta_table mt : datas) { mt.setRow_count((long)-1); } } return result; } }