package tech.powerjob.worker.core.tracker.manager; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import tech.powerjob.worker.core.tracker.processor.ProcessorTracker; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Supplier; /** * 持有 Processor 对象 * instanceId -> Processor * * @author tjq * @since 2020/3/20 */ public class ProcessorTrackerManager { /** * instanceId -> (TaskTrackerAddress -> ProcessorTracker) * 处理脑裂情况下同一个 Instance 存在多个 TaskTracker 的情况 */ private static final Map> PROCESSOR_TRACKER_CONTAINER = Maps.newHashMap(); /** * 获取 ProcessorTracker,如果不存在则创建 */ public static synchronized ProcessorTracker getProcessorTracker(Long instanceId, String address, Supplier creator) { ProcessorTracker processorTracker = PROCESSOR_TRACKER_CONTAINER.getOrDefault(instanceId, Collections.emptyMap()).get(address); if (processorTracker == null) { processorTracker = creator.get(); PROCESSOR_TRACKER_CONTAINER.computeIfAbsent(instanceId, ignore -> Maps.newHashMap()).put(address, processorTracker); } return processorTracker; } public static synchronized List removeProcessorTracker(Long instanceId) { List res = Lists.newLinkedList(); Map ttAddress2Pt = PROCESSOR_TRACKER_CONTAINER.remove(instanceId); if (ttAddress2Pt != null) { res.addAll(ttAddress2Pt.values()); ttAddress2Pt.clear(); } return res; } }