package com.iplatform.gather.support;
|
|
import com.iplatform.gather.cache.LocalMetaDataCacheProvider;
|
import com.iplatform.gather.service.StoreServiceImpl;
|
import com.iplatform.model.po.S_host;
|
import com.iplatform.model.po.Sdc_meta_db;
|
import com.iplatform.model.po.Sdc_meta_table;
|
import com.walker.cache.Cachable;
|
import com.walker.cache.CacheProvider;
|
import com.walker.connector.Address;
|
import com.walker.db.DatabaseException;
|
import com.walker.db.TableInfo;
|
import com.walker.db.page.GenericPager;
|
import com.walker.infrastructure.utils.NumberGenerator;
|
import com.walker.store.AbstractMetaDataEngine;
|
|
import java.util.ArrayList;
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
public class DefaultMetaDataEngine extends AbstractMetaDataEngine {
|
|
private StoreServiceImpl storeService;
|
private LocalMetaDataCacheProvider metaDataCacheProvider;
|
private CacheProvider<S_host> hostCacheProvider;
|
|
private Map<Integer, List<String>> cachedFields = new ConcurrentHashMap<Integer, List<String>>();
|
|
// 缓存的字段集合最大值,超过此值系统会清除缓存重新慢慢积累
|
private final int maxCacheFields = 1024;
|
|
public void setStoreService(StoreServiceImpl storeService) {
|
this.storeService = storeService;
|
}
|
|
public void setMetaDataCacheProvider(LocalMetaDataCacheProvider metaDataCacheProvider) {
|
this.metaDataCacheProvider = metaDataCacheProvider;
|
}
|
|
public void setHostCacheProvider(CacheProvider<S_host> hostCacheProvider) {
|
this.hostCacheProvider = hostCacheProvider;
|
}
|
|
|
@Override
|
public Address getUsingAddress(String storeId) {
|
return this.metaDataCacheProvider.getUsingAddress(storeId);
|
}
|
|
@Override
|
public void saveNewAddress(String storeId, Address address) throws DatabaseException {
|
Sdc_meta_db metaDb = new Sdc_meta_db();
|
metaDb.setId(NumberGenerator.getSequenceNumber());
|
metaDb.setCreate_time(System.currentTimeMillis());
|
metaDb.setDatabase_name(address.getServiceName());
|
metaDb.setHost_info(address.getUrl() + ":" + address.getPort());
|
metaDb.setStore_id(storeId);
|
metaDb.setSummary(TIP_ADD_METADB);
|
metaDb.setTable_count(0);
|
metaDb.setUsed(1);
|
this.storeService.execSaveMetaDb(metaDb);
|
logger.info("引擎添加元数据,metaDB = " + metaDb);
|
|
// 更新缓存
|
this.metaDataCacheProvider.addAddress(storeId, address, metaDb.getId());
|
}
|
|
@Override
|
public void saveNewTable(String storeId, Address addr, String destTableName) throws DatabaseException {
|
Long metaDbId = this.metaDataCacheProvider.getMetaDbId(storeId, addr);
|
if(metaDbId == null){
|
throw new IllegalArgumentException("缓存错误:metaDbId不存在,address = " + addr);
|
}
|
Sdc_meta_table mt = new Sdc_meta_table();
|
mt.setCreate_time(System.currentTimeMillis());
|
mt.setDb_id(metaDbId);
|
mt.setStore_id(storeId);
|
mt.setSummary(TIP_ADD_METATABLE);
|
mt.setTable_name(destTableName);
|
logger.info("引擎添加元数据,metaTable = " + mt);
|
this.storeService.execSaveMetaTable(mt);
|
|
// 更新缓存
|
this.metaDataCacheProvider.addTable(storeId, addr, destTableName);
|
}
|
|
@Override
|
public List<String> getFields(Address address, String tableName) {
|
int hashCode = address.hashCode() + tableName.hashCode();
|
List<String> fields = cachedFields.get(hashCode);
|
if(fields == null){
|
// logger.debug("getFields, address = " + address + ", tableName = " + tableName);
|
fields = this.getDatabaseMetaEngine().getFields(address, tableName);
|
if(fields == null){
|
throw new IllegalStateException("元数据引擎错误:查找数据库未找到表字段信息。 address = " + address + ", tableName = " + tableName);
|
}
|
if(cachedFields.size() >= maxCacheFields){
|
cachedFields.clear();
|
}
|
cachedFields.put(hashCode, fields);
|
}
|
return fields;
|
}
|
|
@Override
|
public int getTableSize(Address address) {
|
return this.metaDataCacheProvider.getTableSize(address);
|
}
|
|
@Override
|
public int getDatabaseSize(Address address) {
|
return this.metaDataCacheProvider.getDatabaseSize(address);
|
}
|
|
@Override
|
public Map<String, TableInfo> getTableRows(String storeId, long metaDbId, List<String> tableNameList) {
|
Address address = this.metaDataCacheProvider.getDatabaseAddress(storeId, metaDbId);
|
if(address == null){
|
logger.debug("或者从数据库中直接查询host主机信息,去掉异常");
|
throw new IllegalStateException("dcMetaCacheProvider未找到缓存的数据库地址信息。storeId: " + storeId + ", metaDbId: " + metaDbId);
|
}
|
this.combinAddressAuthentication(address);
|
return this.getDatabaseMetaEngine().getTableRows(address, tableNameList);
|
}
|
|
/**
|
* 设置数据库的用户名、密码信息
|
* @param address
|
*/
|
private void combinAddressAuthentication(Address address){
|
// Iterator<Cachable> it = hostCacheProvider.getCache().getIterator();
|
S_host host = null;
|
Cachable cache = null;
|
for(Iterator<Cachable> it = hostCacheProvider.getCache().getIterator(); it.hasNext();){
|
cache = it.next();
|
if(cache != null){
|
host = (S_host)cache.getValue();
|
if(address.getUrl().equals(host.getUrl())
|
&& address.getPort() == host.getPort()
|
// && address.getServiceName().equals(host.getServiceName())
|
){
|
// 把数据库服务器主机的用户名、密码设置给address对象
|
address.setAuthentication(host.getAuthentication());
|
address.setCertification(host.getCertification());
|
logger.debug(".........设置了address用户信息:" + host.getAuthentication());
|
}
|
}
|
}
|
}
|
|
@Override
|
public boolean isExistDatabase(String storeId, Address address) {
|
boolean result = this.metaDataCacheProvider.isExistDatabase(storeId, address);
|
// logger.debug("是否存在数据库'" + address + ", storeId = " + storeId);
|
return result;
|
}
|
|
@Override
|
public boolean isExistTable(String storeId, Address address, String tableName) {
|
boolean result = this.metaDataCacheProvider.isExistTable(storeId, address, tableName);
|
// logger.debug("是否存在表'" + tableName + ", storeId = " + storeId + ", address = " + address);
|
return result;
|
}
|
|
private final String TIP_ADD_METADB = "系统自动创建数据库并添加元数据";
|
private final String TIP_ADD_METATABLE = "系统自动创建表添加元数据";
|
|
|
/**
|
* 分页获取采集元数据表信息,该方法从: StoreServiceImpl中迁移到该方法,避免循环依赖。
|
* @param storeId
|
* @param metaDbId
|
* @return
|
* @date 2022-09-21
|
*/
|
public GenericPager<Sdc_meta_table> queryPageMetaTables(String storeId, long metaDbId) {
|
// 查出来本页所有表中的数据量大小
|
GenericPager<Sdc_meta_table> result = this.storeService.queryPageMetaTables(storeId);
|
List<Sdc_meta_table> datas = result.getDatas();
|
|
// 获取得到的数据表名字集合,为下面查询记录总数准备
|
List<String> tableNameList = new ArrayList<String>();
|
for (Sdc_meta_table mt : datas) {
|
tableNameList.add(mt.getTable_name());
|
}
|
|
// 把找到的每个表的记录数设置到属性中
|
// Address address = metaCache.getDatabaseAddress(storeId, metaDbId);
|
TableInfo ti = null;
|
try {
|
Map<String, TableInfo> map = this.getTableRows(storeId, metaDbId, tableNameList);
|
for (Sdc_meta_table mt : datas) {
|
ti = map.get(mt.getTable_name());
|
if (ti == null) {
|
mt.setRow_count((long)-1);
|
} else {
|
mt.setRow_count(ti.getRows());
|
}
|
}
|
} catch (Exception ex) {
|
logger.error("获取表结构元数据出现错误,可能物理表不存在", ex);
|
for (Sdc_meta_table mt : datas) {
|
mt.setRow_count((long)-1);
|
}
|
}
|
return result;
|
}
|
}
|