/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.util.Context;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.implementation.MessageUtils;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsMetricsProvider;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationScope;
import com.azure.messaging.eventhubs.implementation.instrumentation.OperationName;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.time.Instant;
import java.util.function.BiConsumer;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Tracing and metrics instrumentation for an Event Hubs consumer.
 *
 * <p>Wraps an {@link EventHubsTracer} and an {@link EventHubsMetricsProvider} built from the same
 * namespace, entity and consumer group, and hands out {@link InstrumentationScope} instances that
 * tie a span to a metrics callback. When neither tracing nor metrics is enabled, every entry point
 * short-circuits and returns either the shared no-op scope or the unmodified publisher.</p>
 */
public final class EventHubsConsumerInstrumentation {
    private static final Symbol ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL =
        Symbol.valueOf(AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());

    /** Shared scope with no tracer, meter or callback; returned whenever instrumentation is disabled. */
    private static final InstrumentationScope NOOP_SCOPE = new InstrumentationScope(null, null, null);

    private final EventHubsTracer tracer;
    private final EventHubsMetricsProvider meter;
    private final boolean isSync;

    /**
     * Creates the instrumentation for one consumer.
     *
     * @param tracer tracer passed to the {@link EventHubsTracer} wrapper.
     * @param meter meter passed to the {@link EventHubsMetricsProvider} wrapper.
     * @param fullyQualifiedName fully qualified namespace name forwarded to both wrappers.
     * @param entityName Event Hub name forwarded to both wrappers.
     * @param consumerGroup consumer group forwarded to both wrappers.
     * @param isSyncConsumer {@code true} when the owning consumer is synchronous; in that case
     *     {@link #startAsyncConsume(Message, String)} neither starts a process span nor reports a
     *     process metric.
     */
    public EventHubsConsumerInstrumentation(Tracer tracer, Meter meter, String fullyQualifiedName,
        String entityName, String consumerGroup, boolean isSyncConsumer) {
        this.tracer = new EventHubsTracer(tracer, fullyQualifiedName, entityName, consumerGroup);
        this.meter = new EventHubsMetricsProvider(meter, fullyQualifiedName, entityName, consumerGroup);
        this.isSync = isSyncConsumer;
    }

    /**
     * Gets the tracer wrapper used by this instrumentation.
     *
     * @return the {@link EventHubsTracer} created in the constructor.
     */
    public EventHubsTracer getTracer() {
        return tracer;
    }

    /**
     * Creates a scope bound to this instance's tracer and meter.
     *
     * @param reportMetricsCallback callback handed to the new scope together with the meter.
     * @return a new scope when instrumentation is enabled, otherwise the shared no-op scope.
     */
    public InstrumentationScope createScope(
        BiConsumer<EventHubsMetricsProvider, InstrumentationScope> reportMetricsCallback) {
        if (!isEnabled()) {
            return NOOP_SCOPE;
        }
        return new InstrumentationScope(tracer, meter, reportMetricsCallback);
    }

    /**
     * Starts instrumentation for a single AMQP message delivered to the consumer.
     *
     * <p>Lag is reported whenever the message carries an enqueued time. The process span and the
     * process metric are only produced for asynchronous consumers.</p>
     *
     * @param message the received AMQP message.
     * @param partitionId partition the message was received from.
     * @return the scope for this message, or the no-op scope when instrumentation is disabled.
     */
    public InstrumentationScope startAsyncConsume(Message message, String partitionId) {
        if (!isEnabled()) {
            return NOOP_SCOPE;
        }

        InstrumentationScope scope = createScope((m, s) -> {
            if (!isSync) {
                m.reportProcess(1, partitionId, s);
            }
        });

        Instant enqueuedTime = readEnqueuedTime(message);
        if (!isSync) {
            ApplicationProperties properties = message.getApplicationProperties();
            Context span = tracer.startProcessSpan(
                properties == null ? null : properties.getValue(), enqueuedTime, partitionId);
            scope.setSpan(span).makeSpanCurrent();
        }

        if (enqueuedTime != null) {
            meter.reportLag(enqueuedTime, partitionId, scope);
        }
        return scope;
    }

    /**
     * Instruments a synchronous receive operation.
     *
     * <p>A scope is created on subscription. Each emitted event is counted and, when tracing is
     * enabled, linked to the receive span. The span itself is started in the cleanup step, using
     * the start timestamp captured on subscription, after the batch size and partition id
     * attributes have been set; the scope is then closed.</p>
     *
     * @param events the events being received.
     * @param partitionId partition the events are received from.
     * @return the instrumented flux, or {@code events} unchanged when instrumentation is disabled.
     */
    public Flux<PartitionEvent> syncReceive(Flux<PartitionEvent> events, String partitionId) {
        if (!isEnabled()) {
            return events;
        }

        StartSpanOptions startOptions = tracer.isEnabled()
            ? tracer.createStartOptions(SpanKind.CLIENT, OperationName.RECEIVE, partitionId)
            : null;
        // Single-element array so the lambdas below can mutate the counter.
        int[] receivedCount = new int[] {0};

        return Flux.using(
            () -> {
                if (startOptions != null) {
                    startOptions.setStartTimestamp(Instant.now());
                }
                return createScope((m, s) -> m.reportReceive(receivedCount[0], partitionId, s));
            },
            scope -> events
                .doOnNext(partitionEvent -> {
                    // Count every event, not only when tracing is on: the metrics callback above
                    // reads this counter and would otherwise always report zero received events
                    // for metrics-only configurations.
                    receivedCount[0]++;
                    if (startOptions != null) {
                        EventData data = partitionEvent.getData();
                        startOptions.addLink(tracer.createLink(data.getProperties(), data.getEnqueuedTime()));
                    }
                })
                .doOnError(scope::setError)
                .doOnCancel(scope::setCancelled),
            scope -> {
                if (startOptions != null) {
                    startOptions.setAttribute("messaging.batch.message_count", receivedCount[0]);
                    startOptions.setAttribute("messaging.destination.partition.id", partitionId);
                    scope.setSpan(tracer.startSpan(OperationName.RECEIVE, startOptions, Context.NONE));
                }
                scope.close();
            });
    }

    /**
     * Starts instrumentation for processing a batch of events.
     *
     * @param batchContext the batch about to be processed.
     * @return a scope whose span has been made current, or the no-op scope when the batch is empty
     *     or instrumentation is disabled.
     */
    public InstrumentationScope startProcess(EventBatchContext batchContext) {
        if (batchContext.getEvents().isEmpty() || !isEnabled()) {
            return NOOP_SCOPE;
        }

        InstrumentationScope scope = createScope((m, s) -> m.reportProcess(
            batchContext.getEvents().size(), batchContext.getPartitionContext().getPartitionId(), s));
        return scope.setSpan(tracer.startProcessSpan(batchContext)).makeSpanCurrent();
    }

    /**
     * Starts instrumentation for processing a single event.
     *
     * @param eventContext context of the event about to be processed.
     * @return a scope whose span has been made current, or the no-op scope when the context carries
     *     no event or instrumentation is disabled.
     */
    public InstrumentationScope startProcess(EventContext eventContext) {
        EventData event = eventContext.getEventData();
        if (event == null || !isEnabled()) {
            return NOOP_SCOPE;
        }

        InstrumentationScope scope = createScope(
            (m, s) -> m.reportProcess(1, eventContext.getPartitionContext().getPartitionId(), s));
        Context span = tracer.startProcessSpan(event.getProperties(), event.getEnqueuedTime(),
            eventContext.getPartitionContext().getPartitionId());
        return scope.setSpan(span).makeSpanCurrent();
    }

    /**
     * Wraps a {@link Mono} with a span and a duration metric for a generic operation.
     *
     * <p>The scope is created per subscription, records errors and cancellation, exposes its span
     * to the wrapped publisher through the Reactor context under the {@code "trace-context"} key,
     * and is closed when the publisher terminates.</p>
     *
     * @param publisher the operation to instrument.
     * @param operationName name of the operation used for the span and the metric.
     * @param partitionId partition the operation targets.
     * @param <T> element type of the publisher.
     * @return the instrumented mono, or {@code publisher} unchanged when instrumentation is disabled.
     */
    public <T> Mono<T> instrumentMono(Mono<T> publisher, OperationName operationName, String partitionId) {
        if (!isEnabled()) {
            return publisher;
        }

        return Mono.using(
            () -> createScope((m, s) -> m.reportGenericOperationDuration(operationName, partitionId, s))
                .setSpan(tracer.startGenericOperationSpan(operationName, partitionId, Context.NONE)),
            scope -> publisher
                .doOnError(scope::setError)
                .doOnCancel(scope::setCancelled)
                .contextWrite(c -> c.put("trace-context", scope.getSpan())),
            InstrumentationScope::close);
    }

    /**
     * Indicates whether any instrumentation is active.
     *
     * @return {@code true} when tracing or metrics (or both) are enabled.
     */
    public boolean isEnabled() {
        return tracer.isEnabled() || meter.isEnabled();
    }

    /**
     * Reads the enqueued time from the message annotations.
     *
     * @return the enqueued time, or {@code null} when the message has no annotations section.
     */
    private static Instant readEnqueuedTime(Message message) {
        // Guard the annotations section the same way application properties are guarded, so a
        // message without annotations does not make instrumentation throw.
        if (message.getMessageAnnotations() == null) {
            return null;
        }
        return MessageUtils.getEnqueuedTime(message.getMessageAnnotations().getValue(),
            ENQUEUED_TIME_UTC_ANNOTATION_NAME_SYMBOL);
    }
}

