package com.iplatform.gather.cache;
|
|
import com.iplatform.gather.service.StoreServiceImpl;
|
import com.iplatform.model.po.Sdc_meta_db;
|
import com.iplatform.model.po.Sdc_meta_table;
|
import com.walker.cache.AbstractCacheProvider;
|
import com.walker.cache.Cachable;
|
import com.walker.cache.Cache;
|
import com.walker.connector.Address;
|
import com.walker.infrastructure.utils.StringUtils;
|
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
/**
|
* 由于采集元数据缓存较复杂,所以暂时不实现:Redis方式。
|
* @author 时克英
|
* @date 2022-09-20
|
*/
|
public class LocalMetaDataCacheProvider extends AbstractCacheProvider<StoreObject> {
|
|
// 标识是否第一次调用检查数据库
|
private AtomicBoolean first = new AtomicBoolean(true);
|
|
private StoreServiceImpl storeService;
|
|
public void setStoreService(StoreServiceImpl storeService) {
|
this.storeService = storeService;
|
}
|
|
@Override
|
protected int loadDataToCache(Cache cache) {
|
List<Sdc_meta_db> metaDbList = this.storeService.queryAllMetaDbs();
|
if(metaDbList != null){
|
StoreObject so = null;
|
for(Sdc_meta_db m : metaDbList){
|
so = (StoreObject)cache.get(m.getStore_id());
|
if(so == null){
|
so = new StoreObject();
|
cache.put(m.getStore_id(), so);
|
}
|
so.addDb(createWrapAddress(m), m.getId());
|
}
|
|
List<Sdc_meta_table> metaTableList = this.storeService.queryAllMetaTables();
|
for(Sdc_meta_table m : metaTableList){
|
so = (StoreObject)cache.get(m.getStore_id());
|
so.addTable(m.getDb_id(), m.getTable_name());
|
}
|
return metaDbList.size();
|
}
|
return 0;
|
}
|
|
@Override
|
public String getProviderName() {
|
return "cache.gather.meta_data";
|
}
|
|
@Override
|
public Class<?> getProviderType() {
|
return StoreObject.class;
|
}
|
|
private Address createWrapAddress(Sdc_meta_db metaDb){
|
String[] hostInfo = metaDb.getHost_info().split(StringUtils.SEPARATOR_COLON);
|
Address a = new Address();
|
a.setUrl(hostInfo[0]);
|
a.setPort(Integer.parseInt(hostInfo[1]));
|
a.setService(metaDb.getDatabase_name());
|
a.setUsing(metaDb.getIs_using() == 1);
|
a.setAuthentication(metaDb.getUsername());
|
a.setCertification(metaDb.getPassword());
|
return a;
|
}
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
//~ 以下为缓存特定方法。2022-09-20
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
public void addAddress(String storeId, Address address, long metaDbId){
|
StoreObject so = this.getCacheData(storeId);
|
if(so == null){
|
so = new StoreObject();
|
this.putCacheData(storeId, so);
|
}
|
so.addDb(address, metaDbId);
|
}
|
|
public void addTable(String storeId, Address address, String destTableName){
|
StoreObject so = this.getCacheData(storeId);
|
so.addTable(address, destTableName);
|
}
|
|
/**
|
* 根据地址返回该地址对应的metaDbId
|
* @param storeId
|
* @param address
|
* @return
|
*/
|
public Long getMetaDbId(String storeId, Address address){
|
return this.getCacheData(storeId).getMetaDbId(address);
|
}
|
|
/**
|
* 返回在存储系统中,给定主机上有几个数据库
|
* @param address
|
* @return
|
*/
|
public int getDatabaseSize(Address address){
|
int i = 0;
|
StoreObject so = null;
|
for(Iterator<Cachable> it = this.getCache().getIterator(); it.hasNext();){
|
so = (StoreObject)it.next();
|
i = i + so.getDatabaseSize(address);
|
}
|
logger.debug("主机'" + address + "'中有数据库:" + i + "个");
|
return i;
|
}
|
|
/**
|
* 返回给定的数据库中,元数据记录了有多少表
|
* @param address
|
* @return
|
*/
|
public int getTableSize(Address address){
|
int i = 0;
|
StoreObject so = null;
|
Cachable cachable = null;
|
for(Iterator<Cachable> it = this.getCache().getIterator(); it.hasNext();){
|
cachable = it.next();
|
if(cachable == null){
|
continue;
|
}
|
so = (StoreObject)cachable.getValue();
|
i = i + so.getTableSize(address);
|
}
|
logger.debug("数据库'" + address + "'中有表:" + i + "个");
|
return i;
|
}
|
|
/**
|
* 返回当前存储正在使用的数据库信息
|
* @return
|
*/
|
public Address getUsingAddress(String storeId){
|
StoreObject so = this.getCacheData(storeId);
|
if(so == null){
|
logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
|
return null;
|
}
|
Address a = so.getUsingAddress();
|
logger.debug("存储正在使用的数据库:" + a + ", storeId = " + storeId);
|
return a;
|
}
|
|
public Address getDatabaseAddress(String storeId, long metaDbId){
|
StoreObject so = this.getCacheData(storeId);
|
if(so == null){
|
logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
|
return null;
|
}
|
Address db = so.getDbCache(metaDbId);
|
return db;
|
}
|
|
public boolean isExistDatabase(String storeId, Address address) {
|
StoreObject so = this.getCacheData(storeId);
|
if(so == null){
|
if(first.get()){
|
first.compareAndSet(true, false);
|
return false;
|
} else {
|
throw new IllegalStateException("状态错误:缓存中未找到StoreObject,storeId = " + storeId);
|
}
|
}
|
return so.isExistDatabase(address);
|
}
|
|
public boolean isExistTable(String storeId, Address address, String tableName) {
|
StoreObject so = this.getCacheData(storeId);
|
return so.isExistTable(storeId, address, tableName);
|
}
|
|
/**
|
* 从缓存中删除一个表数据
|
* @param storeId
|
* @param address
|
* @param tableName
|
*/
|
public void removeTable(String storeId, Address address, String tableName){
|
StoreObject so = this.getCacheData(storeId);
|
if(so == null){
|
logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
|
return;
|
}
|
so.removeTable(address, tableName);
|
}
|
}
|