/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link IOpenConnectionsHandler} implementation that issues an open-connection request
 * against every supplied {@link RntbdEndpoint} whose {@code channelsMetrics()} value is
 * below a caller-supplied minimum.
 */
public class RntbdOpenConnectionsHandler implements IOpenConnectionsHandler {
    private static final Logger logger = LoggerFactory.getLogger(RntbdOpenConnectionsHandler.class);

    // Stored at construction time; no method visible in this class reads it.
    private final RntbdEndpoint.Provider endpointProvider;

    /**
     * Creates a handler bound to the given endpoint provider.
     *
     * @param endpointProvider the endpoint provider; must not be {@code null}
     * @throws NullPointerException if {@code endpointProvider} is {@code null}
     */
    public RntbdOpenConnectionsHandler(RntbdEndpoint.Provider endpointProvider) {
        Preconditions.checkNotNull(endpointProvider, "Argument 'endpointProvider' can not be null");
        this.endpointProvider = endpointProvider;
    }

    /**
     * Produces one {@link OpenConnectionResponse} per endpoint.
     * <p>
     * For each endpoint, {@code endpoint.channelsMetrics()} is compared with
     * {@code minConnectionsRequiredForEndpoint}. When it is lower, a single open-connection
     * request is sent through {@code endpoint.openConnection(...)}; otherwise a response is
     * emitted immediately without touching the endpoint. A failure of the open-connection
     * future is converted into a response carrying the throwable, so it does not terminate
     * the returned {@link Flux}.
     *
     * @param collectionRid the collection resource id stamped on each open-connection request
     * @param endpoints the endpoints to process; must not be {@code null}
     * @param minConnectionsRequiredForEndpoint threshold compared against each endpoint's
     *        {@code channelsMetrics()} value
     * @return a {@link Flux} emitting the per-endpoint responses
     * @throws NullPointerException if {@code endpoints} is {@code null}
     */
    @Override
    public Flux<OpenConnectionResponse> openConnections(String collectionRid, List<RntbdEndpoint> endpoints, int minConnectionsRequiredForEndpoint) {
        Preconditions.checkNotNull(endpoints, "Argument 'endpoints' should not be null");
        if (logger.isDebugEnabled()) {
            logger.debug("Open connections for endpoints {}", StringUtils.join(endpoints, ","));
        }
        return Flux.fromIterable(endpoints)
            .flatMap(endpoint -> this.openConnectionIfNeeded(collectionRid, endpoint, minConnectionsRequiredForEndpoint));
    }

    /**
     * Opens a connection on a single endpoint unless it already reports enough channels.
     *
     * @param collectionRid the collection resource id stamped on the open-connection request
     * @param endpoint the endpoint to inspect and, if needed, connect
     * @param minConnectionsRequiredForEndpoint threshold compared against {@code endpoint.channelsMetrics()}
     * @return a {@link Mono} emitting the response for this endpoint
     */
    private Mono<OpenConnectionResponse> openConnectionIfNeeded(String collectionRid, RntbdEndpoint endpoint, int minConnectionsRequiredForEndpoint) {
        Uri addressUri = endpoint.getAddressUri();

        if (endpoint.channelsMetrics() >= minConnectionsRequiredForEndpoint) {
            // Threshold already met: answer directly and skip building a request that would never be sent.
            return Mono.just(new OpenConnectionResponse(addressUri, true, null, false, endpoint.channelsMetrics()));
        }

        RxDocumentServiceRequest openConnectionRequest = this.getOpenConnectionRequest(collectionRid, endpoint.serviceEndpoint());
        RntbdRequestArgs requestArgs = new RntbdRequestArgs(openConnectionRequest, addressUri);
        OpenConnectionRntbdRequestRecord requestRecord = endpoint.openConnection(requestArgs);

        // NOTE(review): relies on OpenConnectionRntbdRequestRecord being a
        // CompletableFuture<OpenConnectionResponse>; confirm against its declaration.
        return Mono.fromFuture(requestRecord)
            // Turn a failed future into a regular element so one bad endpoint does not fail the whole Flux.
            .onErrorResume(throwable -> Mono.just(new OpenConnectionResponse(addressUri, false, throwable, true, endpoint.channelsMetrics())))
            .doOnNext(response -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connection result: isConnected [{}], address [{}]", response.isConnected(), response.getUri());
                }
            });
    }

    /**
     * Builds the request used to open a connection: a {@code Create} operation on the
     * {@code Connection} resource type, routed to {@code serviceEndpoint} and tagged with
     * {@code collectionRid}.
     *
     * @param collectionRid value assigned to {@code requestContext.resolvedCollectionRid}
     * @param serviceEndpoint value assigned to {@code requestContext.locationEndpointToRoute} and
     *        passed to the fault injection request context
     * @return the populated request
     */
    private RxDocumentServiceRequest getOpenConnectionRequest(String collectionRid, URI serviceEndpoint) {
        RxDocumentServiceRequest openConnectionRequest = RxDocumentServiceRequest.create(null, OperationType.Create, ResourceType.Connection);
        openConnectionRequest.requestContext.locationEndpointToRoute = serviceEndpoint;
        openConnectionRequest.requestContext.resolvedCollectionRid = collectionRid;
        // The fault injection context keeps its own copy of the routed endpoint.
        openConnectionRequest.faultInjectionRequestContext.setLocationEndpointToRoute(serviceEndpoint);
        return openConnectionRequest;
    }
}

