package com.iplatform.gather.cache; import com.iplatform.gather.service.StoreServiceImpl; import com.iplatform.model.po.Sdc_meta_db; import com.iplatform.model.po.Sdc_meta_table; import com.walker.cache.AbstractCacheProvider; import com.walker.cache.Cachable; import com.walker.cache.Cache; import com.walker.connector.Address; import com.walker.infrastructure.utils.StringUtils; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** * 由于采集元数据缓存较复杂,所以暂时不实现:Redis方式。 * @author 时克英 * @date 2022-09-20 */ public class LocalMetaDataCacheProvider extends AbstractCacheProvider { // 标识是否第一次调用检查数据库 private AtomicBoolean first = new AtomicBoolean(true); private StoreServiceImpl storeService; public void setStoreService(StoreServiceImpl storeService) { this.storeService = storeService; } @Override protected int loadDataToCache(Cache cache) { List metaDbList = this.storeService.queryAllMetaDbs(); if(metaDbList != null){ StoreObject so = null; for(Sdc_meta_db m : metaDbList){ so = (StoreObject)cache.get(m.getStore_id()); if(so == null){ so = new StoreObject(); cache.put(m.getStore_id(), so); } so.addDb(createWrapAddress(m), m.getId()); } List metaTableList = this.storeService.queryAllMetaTables(); for(Sdc_meta_table m : metaTableList){ so = (StoreObject)cache.get(m.getStore_id()); so.addTable(m.getDb_id(), m.getTable_name()); } return metaDbList.size(); } return 0; } @Override public String getProviderName() { return "cache.gather.meta_data"; } @Override public Class getProviderType() { return StoreObject.class; } private Address createWrapAddress(Sdc_meta_db metaDb){ String[] hostInfo = metaDb.getHost_info().split(StringUtils.SEPARATOR_COLON); Address a = new Address(); a.setUrl(hostInfo[0]); a.setPort(Integer.parseInt(hostInfo[1])); a.setService(metaDb.getDatabase_name()); a.setUsing(metaDb.getIs_using() == 1); a.setAuthentication(metaDb.getUsername()); a.setCertification(metaDb.getPassword()); return a; } //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //~ 以下为缓存特定方法。2022-09-20 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ public void addAddress(String storeId, Address address, long metaDbId){ StoreObject so = this.getCacheData(storeId); if(so == null){ so = new StoreObject(); this.putCacheData(storeId, so); } so.addDb(address, metaDbId); } public void addTable(String storeId, Address address, String destTableName){ StoreObject so = this.getCacheData(storeId); so.addTable(address, destTableName); } /** * 根据地址返回该地址对应的metaDbId * @param storeId * @param address * @return */ public Long getMetaDbId(String storeId, Address address){ return this.getCacheData(storeId).getMetaDbId(address); } /** * 返回在存储系统中,给定主机上有几个数据库 * @param address * @return */ public int getDatabaseSize(Address address){ int i = 0; StoreObject so = null; for(Iterator it = this.getCache().getIterator(); it.hasNext();){ so = (StoreObject)it.next(); i = i + so.getDatabaseSize(address); } logger.debug("主机'" + address + "'中有数据库:" + i + "个"); return i; } /** * 返回给定的数据库中,元数据记录了有多少表 * @param address * @return */ public int getTableSize(Address address){ int i = 0; StoreObject so = null; Cachable cachable = null; for(Iterator it = this.getCache().getIterator(); it.hasNext();){ cachable = it.next(); if(cachable == null){ continue; } so = (StoreObject)cachable.getValue(); i = i + so.getTableSize(address); } logger.debug("数据库'" + address + "'中有表:" + i + "个"); return i; } /** * 返回当前存储正在使用的数据库信息 * @return */ public Address getUsingAddress(String storeId){ StoreObject so = this.getCacheData(storeId); if(so == null){ logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId); return null; } Address a = so.getUsingAddress(); logger.debug("存储正在使用的数据库:" + a + ", storeId = " + storeId); return a; } public Address getDatabaseAddress(String storeId, long metaDbId){ StoreObject so = this.getCacheData(storeId); if(so == null){ logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId); return null; } Address db = so.getDbCache(metaDbId); return db; } public boolean isExistDatabase(String storeId, Address address) { StoreObject so = this.getCacheData(storeId); if(so == null){ if(first.get()){ first.compareAndSet(true, false); return false; } else { throw new IllegalStateException("状态错误:缓存中未找到StoreObject,storeId = " + storeId); } } return so.isExistDatabase(address); } public boolean isExistTable(String storeId, Address address, String tableName) { StoreObject so = this.getCacheData(storeId); return so.isExistTable(storeId, address, tableName); } /** * 从缓存中删除一个表数据 * @param storeId * @param address * @param tableName */ public void removeTable(String storeId, Address address, String tableName){ StoreObject so = this.getCacheData(storeId); if(so == null){ logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId); return; } so.removeTable(address, tableName); } }