shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package com.iplatform.gather.support;
 
import com.iplatform.gather.cache.LocalMetaDataCacheProvider;
import com.iplatform.gather.service.StoreServiceImpl;
import com.iplatform.model.po.S_host;
import com.iplatform.model.po.Sdc_meta_db;
import com.iplatform.model.po.Sdc_meta_table;
import com.walker.cache.Cachable;
import com.walker.cache.CacheProvider;
import com.walker.connector.Address;
import com.walker.db.DatabaseException;
import com.walker.db.TableInfo;
import com.walker.db.page.GenericPager;
import com.walker.infrastructure.utils.NumberGenerator;
import com.walker.store.AbstractMetaDataEngine;
 
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
public class DefaultMetaDataEngine extends AbstractMetaDataEngine {
 
    private StoreServiceImpl storeService;
    private LocalMetaDataCacheProvider metaDataCacheProvider;
    private CacheProvider<S_host> hostCacheProvider;
 
    private Map<Integer, List<String>> cachedFields = new ConcurrentHashMap<Integer, List<String>>();
 
    // 缓存的字段集合最大值,超过此值系统会清除缓存重新慢慢积累
    private final int maxCacheFields = 1024;
 
    public void setStoreService(StoreServiceImpl storeService) {
        this.storeService = storeService;
    }
 
    public void setMetaDataCacheProvider(LocalMetaDataCacheProvider metaDataCacheProvider) {
        this.metaDataCacheProvider = metaDataCacheProvider;
    }
 
    public void setHostCacheProvider(CacheProvider<S_host> hostCacheProvider) {
        this.hostCacheProvider = hostCacheProvider;
    }
 
 
    @Override
    public Address getUsingAddress(String storeId) {
        return this.metaDataCacheProvider.getUsingAddress(storeId);
    }
 
    @Override
    public void saveNewAddress(String storeId, Address address) throws DatabaseException {
        Sdc_meta_db metaDb = new Sdc_meta_db();
        metaDb.setId(NumberGenerator.getSequenceNumber());
        metaDb.setCreate_time(System.currentTimeMillis());
        metaDb.setDatabase_name(address.getServiceName());
        metaDb.setHost_info(address.getUrl() + ":" + address.getPort());
        metaDb.setStore_id(storeId);
        metaDb.setSummary(TIP_ADD_METADB);
        metaDb.setTable_count(0);
        metaDb.setUsed(1);
        this.storeService.execSaveMetaDb(metaDb);
        logger.info("引擎添加元数据,metaDB = " + metaDb);
 
        // 更新缓存
        this.metaDataCacheProvider.addAddress(storeId, address, metaDb.getId());
    }
 
    @Override
    public void saveNewTable(String storeId, Address addr, String destTableName) throws DatabaseException {
        Long metaDbId = this.metaDataCacheProvider.getMetaDbId(storeId, addr);
        if(metaDbId == null){
            throw new IllegalArgumentException("缓存错误:metaDbId不存在,address = " + addr);
        }
        Sdc_meta_table mt = new Sdc_meta_table();
        mt.setCreate_time(System.currentTimeMillis());
        mt.setDb_id(metaDbId);
        mt.setStore_id(storeId);
        mt.setSummary(TIP_ADD_METATABLE);
        mt.setTable_name(destTableName);
        logger.info("引擎添加元数据,metaTable = " + mt);
        this.storeService.execSaveMetaTable(mt);
 
        // 更新缓存
        this.metaDataCacheProvider.addTable(storeId, addr, destTableName);
    }
 
    @Override
    public List<String> getFields(Address address, String tableName) {
        int hashCode = address.hashCode() + tableName.hashCode();
        List<String> fields = cachedFields.get(hashCode);
        if(fields == null){
//            logger.debug("getFields, address = " + address + ", tableName = " + tableName);
            fields = this.getDatabaseMetaEngine().getFields(address, tableName);
            if(fields == null){
                throw new IllegalStateException("元数据引擎错误:查找数据库未找到表字段信息。 address = " + address + ", tableName = " + tableName);
            }
            if(cachedFields.size() >= maxCacheFields){
                cachedFields.clear();
            }
            cachedFields.put(hashCode, fields);
        }
        return fields;
    }
 
    @Override
    public int getTableSize(Address address) {
        return this.metaDataCacheProvider.getTableSize(address);
    }
 
    @Override
    public int getDatabaseSize(Address address) {
        return this.metaDataCacheProvider.getDatabaseSize(address);
    }
 
    @Override
    public Map<String, TableInfo> getTableRows(String storeId, long metaDbId, List<String> tableNameList) {
        Address address = this.metaDataCacheProvider.getDatabaseAddress(storeId, metaDbId);
        if(address == null){
            logger.debug("或者从数据库中直接查询host主机信息,去掉异常");
            throw new IllegalStateException("dcMetaCacheProvider未找到缓存的数据库地址信息。storeId: " + storeId + ", metaDbId: " + metaDbId);
        }
        this.combinAddressAuthentication(address);
        return this.getDatabaseMetaEngine().getTableRows(address, tableNameList);
    }
 
    /**
     * 设置数据库的用户名、密码信息
     * @param address
     */
    private void combinAddressAuthentication(Address address){
//        Iterator<Cachable> it = hostCacheProvider.getCache().getIterator();
        S_host host = null;
        Cachable cache = null;
        for(Iterator<Cachable> it = hostCacheProvider.getCache().getIterator(); it.hasNext();){
            cache = it.next();
            if(cache != null){
                host = (S_host)cache.getValue();
                if(address.getUrl().equals(host.getUrl())
                        && address.getPort() == host.getPort()
//                        && address.getServiceName().equals(host.getServiceName())
                ){
                    // 把数据库服务器主机的用户名、密码设置给address对象
                    address.setAuthentication(host.getAuthentication());
                    address.setCertification(host.getCertification());
                    logger.debug(".........设置了address用户信息:" + host.getAuthentication());
                }
            }
        }
    }
 
    @Override
    public boolean isExistDatabase(String storeId, Address address) {
        boolean result = this.metaDataCacheProvider.isExistDatabase(storeId, address);
//        logger.debug("是否存在数据库'" + address + ", storeId = " + storeId);
        return result;
    }
 
    @Override
    public boolean isExistTable(String storeId, Address address, String tableName) {
        boolean result = this.metaDataCacheProvider.isExistTable(storeId, address, tableName);
//        logger.debug("是否存在表'" + tableName + ", storeId = " + storeId + ", address = " + address);
        return result;
    }
 
    private final String TIP_ADD_METADB = "系统自动创建数据库并添加元数据";
    private final String TIP_ADD_METATABLE = "系统自动创建表添加元数据";
 
 
    /**
     * 分页获取采集元数据表信息,该方法从: StoreServiceImpl中迁移到该方法,避免循环依赖。
     * @param storeId
     * @param metaDbId
     * @return
     * @date 2022-09-21
     */
    public GenericPager<Sdc_meta_table> queryPageMetaTables(String storeId, long metaDbId) {
        // 查出来本页所有表中的数据量大小
        GenericPager<Sdc_meta_table> result = this.storeService.queryPageMetaTables(storeId);
        List<Sdc_meta_table> datas = result.getDatas();
 
        // 获取得到的数据表名字集合,为下面查询记录总数准备
        List<String> tableNameList = new ArrayList<String>();
        for (Sdc_meta_table mt : datas) {
            tableNameList.add(mt.getTable_name());
        }
 
        // 把找到的每个表的记录数设置到属性中
        // Address address = metaCache.getDatabaseAddress(storeId, metaDbId);
        TableInfo ti = null;
        try {
            Map<String, TableInfo> map = this.getTableRows(storeId, metaDbId, tableNameList);
            for (Sdc_meta_table mt : datas) {
                ti = map.get(mt.getTable_name());
                if (ti == null) {
                    mt.setRow_count((long)-1);
                } else {
                    mt.setRow_count(ti.getRows());
                }
            }
        } catch (Exception ex) {
            logger.error("获取表结构元数据出现错误,可能物理表不存在", ex);
            for (Sdc_meta_table mt : datas) {
                mt.setRow_count((long)-1);
            }
        }
        return result;
    }
}