/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConsumerFactory;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ProtonSessionWrapper;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.DeliverySettleMode;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.EventHubSession;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import reactor.core.publisher.Mono;

class EventHubReactorSession
extends ReactorSession
implements EventHubSession {
    private static final Symbol EPOCH = Symbol.valueOf((String)"com.microsoft:epoch");
    private static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf((String)"com.microsoft:enable-receiver-runtime-metric");
    private static final ClientLogger LOGGER = new ClientLogger(EventHubReactorSession.class);
    private final boolean isV2;

    EventHubReactorSession(AmqpConnection amqpConnection, ProtonSessionWrapper session, ReactorHandlerProvider handlerProvider, AmqpLinkProvider linkProvider, Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, AmqpRetryOptions retryOptions, MessageSerializer messageSerializer, boolean isV2) {
        super(amqpConnection, session, handlerProvider, linkProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions);
        this.isV2 = isV2;
    }

    @Override
    public Mono<AmqpSendLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retryPolicy, String clientIdentifier) {
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(clientIdentifier, "'clientIdentifier' cannot be null.");
        HashMap<Symbol, String> properties = new HashMap<Symbol, String>();
        properties.put(AmqpConstants.CLIENT_IDENTIFIER, clientIdentifier);
        return this.createProducer(linkName, entityPath, timeout, retryPolicy, properties).cast(AmqpSendLink.class);
    }

    @Override
    public Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, EventPosition eventPosition, ReceiveOptions options, String clientIdentifier) {
        Symbol[] symbolArray;
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");
        Objects.requireNonNull(eventPosition, "'eventPosition' cannot be null.");
        Objects.requireNonNull(options, "'options' cannot be null.");
        Objects.requireNonNull(clientIdentifier, "'clientIdentifier' cannot be null.");
        String eventPositionExpression = this.getExpression(eventPosition);
        HashMap<Symbol, UnknownDescribedType> filter = new HashMap<Symbol, UnknownDescribedType>();
        filter.put(AmqpConstants.STRING_FILTER, new UnknownDescribedType((Object)AmqpConstants.STRING_FILTER, (Object)eventPositionExpression));
        HashMap<Symbol, Object> properties = new HashMap<Symbol, Object>();
        if (options.getOwnerLevel() != null) {
            properties.put(EPOCH, options.getOwnerLevel());
        }
        properties.put(AmqpConstants.CLIENT_RECEIVER_IDENTIFIER, clientIdentifier);
        if (options.getTrackLastEnqueuedEventProperties()) {
            Symbol[] symbolArray2 = new Symbol[1];
            symbolArray = symbolArray2;
            symbolArray2[0] = ENABLE_RECEIVER_RUNTIME_METRIC_NAME;
        } else {
            symbolArray = null;
        }
        Symbol[] desiredCapabilities = symbolArray;
        ConsumerFactory consumerFactory = this.isV2 ? new ConsumerFactory(DeliverySettleMode.ACCEPT_AND_SETTLE_ON_DELIVERY, false) : new ConsumerFactory();
        return this.createConsumer(linkName, entityPath, timeout, retry, filter, properties, desiredCapabilities, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND, consumerFactory);
    }

    private String getExpression(EventPosition eventPosition) {
        String isInclusiveFlag;
        String string = isInclusiveFlag = eventPosition.isInclusive() ? "=" : "";
        if (eventPosition.getOffset() != null) {
            return String.format("amqp.annotation.%s >%s '%s'", AmqpMessageConstant.OFFSET_ANNOTATION_NAME.getValue(), isInclusiveFlag, eventPosition.getOffset());
        }
        if (eventPosition.getSequenceNumber() != null) {
            return String.format("amqp.annotation.%s >%s '%s'", AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), isInclusiveFlag, eventPosition.getSequenceNumber());
        }
        if (eventPosition.getEnqueuedDateTime() != null) {
            String ms;
            try {
                ms = Long.toString(eventPosition.getEnqueuedDateTime().toEpochMilli());
            }
            catch (ArithmeticException ex) {
                throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException(String.format(Locale.ROOT, "Event position for enqueued DateTime could not be parsed. Value: '%s'", eventPosition.getEnqueuedDateTime()), ex));
            }
            return String.format("amqp.annotation.%s >%s '%s'", AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), isInclusiveFlag, ms);
        }
        throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("No starting position was set."));
    }
}

