package org.apache.seata.discovery.registry.consul;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.exception.ConfigNotFoundException;
import org.apache.seata.discovery.registry.RegistryHeartBeats;
import org.apache.seata.discovery.registry.RegistryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.class */
/**
 * {@link RegistryService} implementation that registers Seata server addresses in Consul and
 * discovers them again through Consul health queries.
 *
 * <p>The class is a lazily created singleton (see {@link #getInstance()}). For every subscribed
 * cluster a {@link ConsulNotifier} repeatedly queries Consul and forwards the returned health
 * services to the {@link ConsulListener}s registered for that cluster.
 */
public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener> {
    private static volatile ConsulRegistryServiceImpl instance;
    private static volatile ConsulClient client;
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRegistryServiceImpl.class);
    private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
    private static final String FILE_ROOT_REGISTRY = "registry";
    private static final String FILE_CONFIG_SPLIT_CHAR = ".";
    private static final String REGISTRY_TYPE = "consul";
    private static final String SERVER_ADDR_KEY = "serverAddr";
    private static final String REGISTRY_CLUSTER = "cluster";
    private static final String DEFAULT_CLUSTER_NAME = "default";
    private static final String SERVICE_TAG = "services";
    private static final String ACL_TOKEN = "aclToken";
    private static final String FILE_CONFIG_KEY_PREFIX = "registry.consul.";
    private static final int THREAD_POOL_NUM = 1;
    private static final int MAP_INITIAL_CAPACITY = 8;
    private static final String DEFAULT_CHECK_INTERVAL = "10s";
    private static final String DEFAULT_CHECK_TIMEOUT = "1s";
    private static final String DEFAULT_DEREGISTER_TIME = "20s";
    /** Wait time handed to {@link QueryParams} for the notifier and subscription queries. */
    private static final int DEFAULT_WATCH_TIMEOUT = 60;
    /** Pause between two polling attempts after a failed one, so a dead agent is not hammered. */
    private static final long ERROR_RETRY_INTERVAL_MILLIS = 1000L;
    /** Index used for the queries that should not wait for a change. */
    private static final long NO_INDEX = -1L;

    /** Last group passed to {@link #lookup(String)}; read again on the notifier thread. */
    private volatile String transactionServiceGroup;
    private final ConcurrentMap<String, List<InetSocketAddress>> clusterAddressMap =
        new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
    private final ConcurrentMap<String, Set<ConsulListener>> listenerMap =
        new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
    private final ConcurrentMap<String, ConsulNotifier> notifiers =
        new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
    // NOTE(review): every notifier loops until stopped and the pool has THREAD_POOL_NUM (1) thread,
    // so a notifier for a second cluster stays queued until the first one stops - confirm intended.
    private final ExecutorService notifierExecutor = newNotifierExecutor();

    /**
     * Polls Consul for one cluster and pushes the resulting service list to the cluster's
     * listeners whenever the reported index advances or the previous attempt failed.
     */
    public class ConsulNotifier implements Runnable {
        private final String cluster;
        // consulIndex and hasError are only touched by the thread executing run().
        private long consulIndex;
        private boolean hasError = false;
        // Written by stop() from other threads, hence volatile.
        private volatile boolean running = true;

        /**
         * @param str cluster (Consul service name) to watch
         * @param j   index the first query starts from
         */
        ConsulNotifier(String str, long j) {
            this.cluster = str;
            this.consulIndex = j;
        }

        /** Keeps polling until {@link #stop()} is called or the thread is interrupted. */
        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    processService();
                } catch (Exception e) {
                    this.hasError = true;
                    // Trailing throwable keeps the stack trace in the log.
                    LOGGER.error("consul refresh services error:{}", e.getMessage(), e);
                    pauseAfterError();
                }
            }
        }

        /**
         * Runs one query and notifies the listeners when the index moved forward or when the
         * previous round failed (so listeners are brought up to date after an error).
         */
        private void processService() {
            Response<List<HealthService>> response =
                getHealthyServices(this.cluster, this.consulIndex, DEFAULT_WATCH_TIMEOUT);
            Long latestIndex = response.getConsulIndex();
            boolean indexAdvanced = latestIndex != null && latestIndex > this.consulIndex;
            if (!indexAdvanced && !this.hasError) {
                return;
            }
            this.hasError = false;
            if (latestIndex != null) {
                // A missing index must not blow up the recovery path; keep the old one instead.
                this.consulIndex = latestIndex;
            }
            List<HealthService> services = response.getValue();
            Set<ConsulListener> listeners = listenerMap.get(this.cluster);
            if (listeners == null) {
                // All listeners were removed while the query was in flight.
                return;
            }
            for (ConsulListener listener : listeners) {
                listener.onEvent(services);
            }
        }

        /** Sleeps briefly after a failed round; an interrupt ends the notifier. */
        private void pauseAfterError() {
            try {
                TimeUnit.MILLISECONDS.sleep(ERROR_RETRY_INTERVAL_MILLIS);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                this.running = false;
            }
        }

        /** Asks the polling loop to end after the round currently in progress. */
        void stop() {
            this.running = false;
        }
    }

    private ConsulRegistryServiceImpl() {
    }

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code ConsulRegistryServiceImpl}
     */
    public static ConsulRegistryServiceImpl getInstance() {
        if (instance == null) {
            synchronized (ConsulRegistryServiceImpl.class) {
                if (instance == null) {
                    instance = new ConsulRegistryServiceImpl();
                }
            }
        }
        return instance;
    }

    /**
     * Registers the address in Consul and schedules {@link #doRegister} as its heartbeat action.
     *
     * @param inetSocketAddress address to publish; validated with {@link NetUtil#validAddress}
     * @throws Exception if validation or the registration fails
     */
    public void register(InetSocketAddress inetSocketAddress) throws Exception {
        NetUtil.validAddress(inetSocketAddress);
        doRegister(inetSocketAddress);
        RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, inetSocketAddress, this::doRegister);
    }

    /** Sends the service definition for the address to the Consul agent. */
    private void doRegister(InetSocketAddress inetSocketAddress) {
        getConsulClient().agentServiceRegister(createService(inetSocketAddress), getAclToken());
    }

    /**
     * Deregisters the service id derived from the address.
     *
     * @param inetSocketAddress address that was registered before
     * @throws Exception if validation or the deregistration fails
     */
    public void unregister(InetSocketAddress inetSocketAddress) throws Exception {
        NetUtil.validAddress(inetSocketAddress);
        getConsulClient().agentServiceDeregister(createServiceId(inetSocketAddress), getAclToken());
    }

    /**
     * Adds a listener for a cluster and makes sure a notifier is polling that cluster.
     *
     * <p>If the notifier cannot be started the listener is removed again, so a later
     * {@link #lookup(String)} retries the subscription instead of assuming it exists.
     *
     * @param str            cluster name
     * @param consulListener listener to notify with the cluster's health services
     * @throws Exception if the initial Consul query or the task submission fails
     */
    public void subscribe(String str, ConsulListener consulListener) throws Exception {
        // compute() keeps "create set + add" atomic with respect to removeListener().
        this.listenerMap.compute(str, (key, listeners) -> {
            Set<ConsulListener> target = listeners;
            if (target == null) {
                // Concurrent set: the notifier thread iterates while callers add/remove.
                target = ConcurrentHashMap.newKeySet();
            }
            target.add(consulListener);
            return target;
        });
        try {
            startNotifierIfAbsent(str);
        } catch (RuntimeException e) {
            removeListener(str, consulListener);
            throw e;
        }
    }

    /**
     * Removes a listener; the cluster's notifier is stopped once no listener is left.
     *
     * @param str            cluster name
     * @param consulListener listener to remove
     * @throws Exception declared for interface compatibility
     */
    public void unsubscribe(String str, ConsulListener consulListener) throws Exception {
        removeListener(str, consulListener);
    }

    /**
     * Resolves the cluster mapped to the transaction service group and returns its addresses.
     *
     * @param str transaction service group
     * @return addresses currently cached for the mapped cluster; may be {@code null} when nothing
     *         has been cached for it
     * @throws Exception if no cluster is configured for the group or the Consul query fails
     */
    public List<InetSocketAddress> lookup(String str) throws Exception {
        this.transactionServiceGroup = str;
        String serviceGroup = getServiceGroup(str);
        if (serviceGroup == null) {
            throw new ConfigNotFoundException("%s configuration item is required", new String[]{"service.vgroupMapping." + str});
        }
        return lookupByCluster(serviceGroup);
    }

    /** Loads and subscribes the cluster on first use, then answers from the address cache. */
    private List<InetSocketAddress> lookupByCluster(String cluster) throws Exception {
        if (!this.listenerMap.containsKey(cluster)) {
            refreshCluster(cluster);
            subscribe(cluster, services -> refreshCluster(cluster, services));
        }
        return this.clusterAddressMap.get(cluster);
    }

    /** Creates the notifier for the cluster and submits it, unless one is already registered. */
    private void startNotifierIfAbsent(String cluster) {
        if (this.notifiers.containsKey(cluster)) {
            return;
        }
        Long index = getHealthyServices(cluster, NO_INDEX, DEFAULT_WATCH_TIMEOUT).getConsulIndex();
        ConsulNotifier notifier = new ConsulNotifier(cluster, index != null ? index : NO_INDEX);
        // Submit only the instance that actually won the slot, and only once.
        if (this.notifiers.putIfAbsent(cluster, notifier) == null) {
            this.notifierExecutor.submit(notifier);
        }
    }

    /** Drops the listener and stops the cluster's notifier when the cluster has none left. */
    private void removeListener(String cluster, ConsulListener listener) {
        Set<ConsulListener> remaining = this.listenerMap.computeIfPresent(cluster, (key, listeners) -> {
            listeners.remove(listener);
            return listeners.isEmpty() ? null : listeners;
        });
        if (remaining == null) {
            ConsulNotifier notifier = this.notifiers.remove(cluster);
            if (notifier != null) {
                notifier.stop();
            }
        }
    }

    /** Builds the executor that runs the notifiers. */
    private static ExecutorService newNotifierExecutor() {
        ThreadFactory threadFactory = new NamedThreadFactory("services-consul-notifier", THREAD_POOL_NUM);
        return new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
    }

    /** Returns the shared client, creating it from {@code registry.consul.serverAddr} if needed. */
    private ConsulClient getConsulClient() {
        if (client == null) {
            synchronized (ConsulRegistryServiceImpl.class) {
                if (client == null) {
                    String serverAddr = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY);
                    InetSocketAddress serverAddress = NetUtil.toInetSocketAddress(serverAddr);
                    client = new ConsulClient(serverAddress.getHostName(), serverAddress.getPort());
                }
            }
        }
        return client;
    }

    /** Reads {@code registry.consul.cluster}, falling back to {@code "default"}. */
    private String getClusterName() {
        String clusterKey = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
        return FILE_CONFIG.getConfig(clusterKey, DEFAULT_CLUSTER_NAME);
    }

    /** Service id format: {@code <clusterName>-<address as string>}. */
    private String createServiceId(InetSocketAddress inetSocketAddress) {
        return getClusterName() + "-" + NetUtil.toStringAddress(inetSocketAddress);
    }

    /**
     * Resolves the ACL token: the {@code aclToken} system property wins over the
     * {@code registry.consul.aclToken} configuration entry.
     *
     * @return the token, or {@code null} when neither source provides a non-blank value
     */
    private static String getAclToken() {
        String token = System.getProperty(ACL_TOKEN);
        if (!StringUtils.isNotBlank(token)) {
            token = FILE_CONFIG.getConfig(String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, ACL_TOKEN));
        }
        return StringUtils.isNotBlank(token) ? token : null;
    }

    /** Builds the Consul service definition (id, name, tag, address, port, check). */
    private NewService createService(InetSocketAddress inetSocketAddress) {
        NewService newService = new NewService();
        newService.setId(createServiceId(inetSocketAddress));
        newService.setName(getClusterName());
        newService.setTags(Collections.singletonList(SERVICE_TAG));
        newService.setPort(Integer.valueOf(inetSocketAddress.getPort()));
        newService.setAddress(NetUtil.toIpAddress(inetSocketAddress));
        newService.setCheck(createCheck(inetSocketAddress));
        return newService;
    }

    /** Builds the TCP check attached to the service, using the default interval and timeouts. */
    private NewService.Check createCheck(InetSocketAddress inetSocketAddress) {
        NewService.Check check = new NewService.Check();
        check.setTcp(NetUtil.toStringAddress(inetSocketAddress));
        check.setInterval(DEFAULT_CHECK_INTERVAL);
        check.setTimeout(DEFAULT_CHECK_TIMEOUT);
        check.setDeregisterCriticalServiceAfter(DEFAULT_DEREGISTER_TIME);
        return check;
    }

    /**
     * Queries Consul for the passing instances of a service carrying the {@code services} tag.
     *
     * @param str service (cluster) name
     * @param j   index passed to {@link QueryParams}
     * @param j2  wait time passed to {@link QueryParams}
     * @return the Consul response holding the health services
     */
    public Response<List<HealthService>> getHealthyServices(String str, long j, long j2) {
        HealthServicesRequest request = HealthServicesRequest.newBuilder()
            .setTag(SERVICE_TAG)
            .setQueryParams(new QueryParams(j2, j))
            .setPassing(true)
            .setToken(getAclToken())
            .build();
        return getConsulClient().getHealthServices(str, request);
    }

    /** Fetches the cluster's current services and refreshes the address cache from them. */
    private void refreshCluster(String cluster) {
        if (StringUtils.isBlank(cluster)) {
            return;
        }
        Response<List<HealthService>> response = getHealthyServices(cluster, NO_INDEX, -1L);
        if (response == null) {
            return;
        }
        refreshCluster(cluster, response.getValue());
    }

    /**
     * Replaces the cached addresses of the cluster with the ones of the given services and lets
     * {@code removeOfflineAddressesIfNecessary} handle addresses that disappeared.
     */
    private void refreshCluster(String cluster, List<HealthService> services) {
        if (cluster == null || services == null) {
            return;
        }
        List<InetSocketAddress> addresses = services.stream()
            .map(HealthService::getService)
            .map(service -> new InetSocketAddress(service.getAddress(), service.getPort()))
            .collect(Collectors.toList());
        this.clusterAddressMap.put(cluster, addresses);
        removeOfflineAddressesIfNecessary(this.transactionServiceGroup, cluster, addresses);
    }

    /**
     * Stops all notifiers, forgets the listener registrations and drops the shared client.
     *
     * <p>The executor itself is left running because this instance is the singleton handed out by
     * {@link #getInstance()} and may be used again after {@code close()}.
     *
     * @throws Exception declared for interface compatibility
     */
    public void close() throws Exception {
        for (ConsulNotifier notifier : this.notifiers.values()) {
            notifier.stop();
        }
        this.notifiers.clear();
        // Cleared so that a later lookup() subscribes again instead of trusting stale state.
        this.listenerMap.clear();
        client = null;
    }
}
