/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.util.Context;
import com.azure.core.util.tracing.SpanKind;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentationScope;
import com.azure.messaging.eventhubs.implementation.instrumentation.OperationName;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link CheckpointStore} decorator that runs {@link #updateCheckpoint(Checkpoint)} inside an
 * {@link InstrumentationScope} and forwards every other operation to the wrapped store untouched.
 *
 * <p>Instances are obtained through {@link #create(CheckpointStore, EventHubsConsumerInstrumentation)},
 * which skips the wrapping entirely when instrumentation is disabled.
 */
public final class InstrumentedCheckpointStore implements CheckpointStore {
    /** The store that actually performs every operation. */
    private final CheckpointStore delegate;
    /** Source of the scopes opened around checkpoint updates. */
    private final EventHubsConsumerInstrumentation instrumentation;
    /** Tracer taken from {@link #instrumentation} at construction time. */
    private final EventHubsTracer tracer;

    private InstrumentedCheckpointStore(CheckpointStore delegate, EventHubsConsumerInstrumentation instrumentation) {
        this.delegate = delegate;
        this.instrumentation = instrumentation;
        this.tracer = instrumentation.getTracer();
    }

    /**
     * Wraps the given store with instrumentation when instrumentation is enabled.
     *
     * @param checkpointStore the store to wrap.
     * @param instrumentation the instrumentation used to create scopes and spans.
     * @return {@code checkpointStore} itself when {@code instrumentation.isEnabled()} is
     *     {@code false}; otherwise a new instrumented wrapper around it.
     */
    public static CheckpointStore create(CheckpointStore checkpointStore, EventHubsConsumerInstrumentation instrumentation) {
        return instrumentation.isEnabled()
            ? new InstrumentedCheckpointStore(checkpointStore, instrumentation)
            : checkpointStore;
    }

    /** Forwards to the wrapped store without any instrumentation. */
    @Override
    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return delegate.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup);
    }

    /** Forwards to the wrapped store without any instrumentation. */
    @Override
    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
        return delegate.claimOwnership(requestedPartitionOwnerships);
    }

    /** Forwards to the wrapped store without any instrumentation. */
    @Override
    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return delegate.listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroup);
    }

    /**
     * Updates the checkpoint through the wrapped store inside an instrumentation scope.
     *
     * <p>The scope is the resource of a {@code Mono.using} call: it is created by
     * {@link #openScope(Checkpoint)}, used by {@link #updateWithinScope(Checkpoint, InstrumentationScope)},
     * and released with {@link InstrumentationScope#close()}.
     *
     * @param checkpoint the checkpoint to persist.
     * @return the {@link Mono} built around the wrapped store's update.
     */
    @Override
    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        return Mono.using(
            () -> openScope(checkpoint),
            scope -> updateWithinScope(checkpoint, scope),
            InstrumentationScope::close);
    }

    /**
     * Creates the scope for one checkpoint update and attaches the span for its partition.
     *
     * <p>The callback handed to {@code createScope} calls {@code reportCheckpoint} with the
     * checkpoint and the scope. NOTE(review): when that callback fires is decided by
     * {@code createScope}/{@link InstrumentationScope}, presumably on close - confirm.
     */
    private InstrumentationScope openScope(Checkpoint checkpoint) {
        // The scope is created before the span is started; keep that order.
        return instrumentation
            .createScope((reporter, reportedScope) -> reporter.reportCheckpoint(checkpoint, (InstrumentationScope)reportedScope))
            .setSpan(startSpan(checkpoint.getPartitionId()));
    }

    /**
     * Runs the wrapped store's update, recording errors and cancellation on the scope and
     * copying the values of the scope's span into the Reactor context.
     */
    private Mono<Void> updateWithinScope(Checkpoint checkpoint, InstrumentationScope scope) {
        return delegate.updateCheckpoint(checkpoint)
            .doOnError(scope::setError)
            .doOnCancel(scope::setCancelled)
            // Evaluated lazily, when the context is written, not when the pipeline is assembled.
            .contextWrite(ctx -> ctx.putAllMap(scope.getSpan().getValues()));
    }

    /**
     * Starts an {@link SpanKind#INTERNAL} checkpoint span for the partition.
     *
     * @param partitionId the partition identifier passed to the span start options.
     * @return the context returned by the tracer, or {@link Context#NONE} when the tracer is disabled.
     */
    private Context startSpan(String partitionId) {
        if (!tracer.isEnabled()) {
            return Context.NONE;
        }
        return tracer.startSpan(
            OperationName.CHECKPOINT,
            tracer.createStartOptions(SpanKind.INTERNAL, OperationName.CHECKPOINT, partitionId),
            Context.NONE);
    }
}

