shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package com.iplatform.gather.cache;
 
import com.iplatform.gather.service.StoreServiceImpl;
import com.iplatform.model.po.Sdc_meta_db;
import com.iplatform.model.po.Sdc_meta_table;
import com.walker.cache.AbstractCacheProvider;
import com.walker.cache.Cachable;
import com.walker.cache.Cache;
import com.walker.connector.Address;
import com.walker.infrastructure.utils.StringUtils;
 
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 由于采集元数据缓存较复杂,所以暂时不实现:Redis方式。
 * @author 时克英
 * @date 2022-09-20
 */
public class LocalMetaDataCacheProvider extends AbstractCacheProvider<StoreObject> {
 
    // 标识是否第一次调用检查数据库
    private AtomicBoolean first = new AtomicBoolean(true);
 
    private StoreServiceImpl storeService;
 
    public void setStoreService(StoreServiceImpl storeService) {
        this.storeService = storeService;
    }
 
    @Override
    protected int loadDataToCache(Cache cache) {
        List<Sdc_meta_db> metaDbList = this.storeService.queryAllMetaDbs();
        if(metaDbList != null){
            StoreObject so = null;
            for(Sdc_meta_db m : metaDbList){
                so = (StoreObject)cache.get(m.getStore_id());
                if(so == null){
                    so = new StoreObject();
                    cache.put(m.getStore_id(), so);
                }
                so.addDb(createWrapAddress(m), m.getId());
            }
 
            List<Sdc_meta_table> metaTableList = this.storeService.queryAllMetaTables();
            for(Sdc_meta_table m : metaTableList){
                so = (StoreObject)cache.get(m.getStore_id());
                so.addTable(m.getDb_id(), m.getTable_name());
            }
            return metaDbList.size();
        }
        return 0;
    }
 
    @Override
    public String getProviderName() {
        return "cache.gather.meta_data";
    }
 
    @Override
    public Class<?> getProviderType() {
        return StoreObject.class;
    }
 
    private Address createWrapAddress(Sdc_meta_db metaDb){
        String[] hostInfo = metaDb.getHost_info().split(StringUtils.SEPARATOR_COLON);
        Address a = new Address();
        a.setUrl(hostInfo[0]);
        a.setPort(Integer.parseInt(hostInfo[1]));
        a.setService(metaDb.getDatabase_name());
        a.setUsing(metaDb.getIs_using() == 1);
        a.setAuthentication(metaDb.getUsername());
        a.setCertification(metaDb.getPassword());
        return a;
    }
 
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    //~ 以下为缓存特定方法。2022-09-20
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
    public void addAddress(String storeId, Address address, long metaDbId){
        StoreObject so = this.getCacheData(storeId);
        if(so == null){
            so = new StoreObject();
            this.putCacheData(storeId, so);
        }
        so.addDb(address, metaDbId);
    }
 
    public void addTable(String storeId, Address address, String destTableName){
        StoreObject so = this.getCacheData(storeId);
        so.addTable(address, destTableName);
    }
 
    /**
     * 根据地址返回该地址对应的metaDbId
     * @param storeId
     * @param address
     * @return
     */
    public Long getMetaDbId(String storeId, Address address){
        return this.getCacheData(storeId).getMetaDbId(address);
    }
 
    /**
     * 返回在存储系统中,给定主机上有几个数据库
     * @param address
     * @return
     */
    public int getDatabaseSize(Address address){
        int i = 0;
        StoreObject so = null;
        for(Iterator<Cachable> it = this.getCache().getIterator(); it.hasNext();){
            so = (StoreObject)it.next();
            i = i + so.getDatabaseSize(address);
        }
        logger.debug("主机'" + address + "'中有数据库:" + i + "个");
        return i;
    }
 
    /**
     * 返回给定的数据库中,元数据记录了有多少表
     * @param address
     * @return
     */
    public int getTableSize(Address address){
        int i = 0;
        StoreObject so = null;
        Cachable cachable = null;
        for(Iterator<Cachable> it = this.getCache().getIterator(); it.hasNext();){
            cachable = it.next();
            if(cachable == null){
                continue;
            }
            so = (StoreObject)cachable.getValue();
            i = i + so.getTableSize(address);
        }
        logger.debug("数据库'" + address + "'中有表:" + i + "个");
        return i;
    }
 
    /**
     * 返回当前存储正在使用的数据库信息
     * @return
     */
    public Address getUsingAddress(String storeId){
        StoreObject so = this.getCacheData(storeId);
        if(so == null){
            logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
            return null;
        }
        Address a = so.getUsingAddress();
        logger.debug("存储正在使用的数据库:" + a + ", storeId = " + storeId);
        return a;
    }
 
    public Address getDatabaseAddress(String storeId, long metaDbId){
        StoreObject so = this.getCacheData(storeId);
        if(so == null){
            logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
            return null;
        }
        Address db = so.getDbCache(metaDbId);
        return db;
    }
 
    public boolean isExistDatabase(String storeId, Address address) {
        StoreObject so = this.getCacheData(storeId);
        if(so == null){
            if(first.get()){
                first.compareAndSet(true, false);
                return false;
            } else {
                throw new IllegalStateException("状态错误:缓存中未找到StoreObject,storeId = " + storeId);
            }
        }
        return so.isExistDatabase(address);
    }
 
    public boolean isExistTable(String storeId, Address address, String tableName) {
        StoreObject so = this.getCacheData(storeId);
        return so.isExistTable(storeId, address, tableName);
    }
 
    /**
     * 从缓存中删除一个表数据
     * @param storeId
     * @param address
     * @param tableName
     */
    public void removeTable(String storeId, Address address, String tableName){
        StoreObject so = this.getCacheData(storeId);
        if(so == null){
            logger.debug("缓存中未找到存储对象'StoreObject',可能还没有创建,storeId = " + storeId);
            return;
        }
        so.removeTable(address, tableName);
    }
}