/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataAggregator;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.FlushSignal;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

class EventHubBufferedPartitionProducer
implements Closeable {
    private final ClientLogger logger;
    private final AmqpRetryOptions retryOptions;
    private final EventHubProducerAsyncClient client;
    private final String partitionId;
    private final AmqpErrorContext errorContext;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Disposable publishSubscription;
    private final Sinks.Many<EventData> eventSink;
    private final CreateBatchOptions createBatchOptions;
    private final PublishResultSubscriber publishResultSubscriber;
    private final EventHubsTracer tracer;
    private final EventDataAggregator eventDataAggregator;

    EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options, AmqpRetryOptions retryOptions, Sinks.Many<EventData> eventSink, Tracer tracer) {
        HashMap<String, String> logContext = new HashMap<String, String>();
        logContext.put("partitionId", partitionId);
        this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class, logContext);
        this.client = client;
        this.partitionId = partitionId;
        this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace());
        this.createBatchOptions = new CreateBatchOptions().setPartitionId(partitionId);
        this.retryOptions = retryOptions;
        this.eventSink = eventSink;
        this.eventDataAggregator = new EventDataAggregator((Flux<? extends EventData>)this.eventSink.asFlux(), this::createNewBatch, client.getFullyQualifiedNamespace(), options, partitionId);
        this.publishResultSubscriber = new PublishResultSubscriber(partitionId, this.eventSink, options.getSendSucceededContext(), options.getSendFailedContext(), retryOptions.getTryTimeout(), this.logger);
        this.publishSubscription = (Disposable)this.publishEvents((Flux<EventDataBatch>)this.eventDataAggregator).publishOn(Schedulers.boundedElastic(), 1).subscribeWith((Subscriber)this.publishResultSubscriber);
        this.tracer = new EventHubsTracer(tracer, client.getFullyQualifiedNamespace(), client.getEventHubName(), null);
    }

    Mono<Void> enqueueEvent(EventData eventData) {
        Mono enqueueOperation = Mono.create(sink -> {
            if (this.isClosed.get()) {
                sink.error((Throwable)new IllegalStateException(String.format("Partition publisher id[%s] is already closed. Cannot enqueue more events.", this.partitionId)));
                return;
            }
            TimeoutException e = this.publishResultSubscriber.awaitPendingFlush();
            if (e != null) {
                sink.error((Throwable)e);
                return;
            }
            if (this.isClosed.get()) {
                sink.error((Throwable)new IllegalStateException(String.format("Partition publisher id[%s] was closed between flushing events and now. Cannot enqueue events.", this.partitionId)));
                return;
            }
            this.tracer.reportMessageSpan(eventData, eventData.getContext());
            Sinks.EmitResult emitResult = this.eventSink.tryEmitNext((Object)eventData);
            if (emitResult.isSuccess()) {
                sink.success();
                return;
            }
            if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED || emitResult == Sinks.EmitResult.FAIL_OVERFLOW) {
                this.logger.atInfo().addKeyValue("emitResult", (Object)emitResult).log("Event could not be published downstream. Applying retry.");
                sink.error((Throwable)new AmqpException(true, emitResult + " occurred.", this.errorContext));
            } else {
                this.logger.atWarning().addKeyValue("emitResult", (Object)emitResult).log("Event could not be published downstream. Not retrying.", new Object[]{emitResult});
                sink.error((Throwable)new AmqpException(false, "Unable to buffer message for partition: " + this.getPartitionId(), this.errorContext));
            }
        });
        return RetryUtil.withRetry((Mono)enqueueOperation, (AmqpRetryOptions)this.retryOptions, (String)"Timed out trying to enqueue event data.", (boolean)true).onErrorMap(IllegalStateException.class, error -> new AmqpException(true, "Retries exhausted.", (Throwable)error, this.errorContext));
    }

    String getPartitionId() {
        return this.partitionId;
    }

    int getBufferedEventCount() {
        int value = (Integer)this.eventSink.scanOrDefault(Scannable.Attr.BUFFERED, (Object)0);
        int currentBatch = this.eventDataAggregator.getNumberOfEvents();
        return value + currentBatch;
    }

    Mono<Void> flush() {
        return this.publishResultSubscriber.startFlush();
    }

    @Override
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        try {
            this.publishResultSubscriber.startFlush().block(this.retryOptions.getTryTimeout());
        }
        catch (IllegalStateException e) {
            this.logger.info("Timed out waiting for flush to complete.", new Object[]{e});
        }
        finally {
            this.publishSubscription.dispose();
            this.client.close();
        }
    }

    private Flux<PublishResult> publishEvents(Flux<EventDataBatch> upstream) {
        return upstream.flatMap(batch -> {
            if (batch == EventDataBatch.EMPTY) {
                return Mono.just((Object)PublishResult.EMPTY);
            }
            return this.client.send((EventDataBatch)batch).thenReturn((Object)new PublishResult(batch.getEvents(), null)).onErrorResume(error -> Mono.just((Object)new PublishResult(batch.getEvents(), (Throwable)error)));
        }, 1, 1);
    }

    private EventDataBatch createNewBatch() {
        Mono<EventDataBatch> batch = this.client.createBatch(this.createBatchOptions);
        try {
            return (EventDataBatch)batch.toFuture().get();
        }
        catch (InterruptedException e) {
            throw this.logger.logExceptionAsError((RuntimeException)new UncheckedExecutionException(e));
        }
        catch (ExecutionException e) {
            throw this.logger.logExceptionAsError((RuntimeException)new UncheckedExecutionException(e));
        }
    }

    private static class PublishResultSubscriber
    extends BaseSubscriber<PublishResult> {
        private final String partitionId;
        private final Sinks.Many<EventData> eventSink;
        private final Consumer<SendBatchSucceededContext> onSucceed;
        private final Consumer<SendBatchFailedContext> onFailed;
        private final Duration operationTimeout;
        private final ClientLogger logger;
        private final AtomicReference<FlushSignal> pendingFlushSignal = new AtomicReference<Object>(null);
        private final Semaphore flushSemaphore = new Semaphore(1);
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        PublishResultSubscriber(String partitionId, Sinks.Many<EventData> eventSink, Consumer<SendBatchSucceededContext> onSucceed, Consumer<SendBatchFailedContext> onFailed, Duration operationTimeout, ClientLogger logger) {
            this.partitionId = partitionId;
            this.eventSink = eventSink;
            this.onSucceed = onSucceed;
            this.onFailed = onFailed;
            this.operationTimeout = operationTimeout;
            this.logger = logger;
        }

        protected void hookOnSubscribe(Subscription s) {
            this.requestUnbounded();
        }

        protected void hookOnNext(PublishResult result) {
            if (result != PublishResult.EMPTY) {
                if (result.error == null) {
                    this.onSucceed.accept(new SendBatchSucceededContext(result.events, this.partitionId));
                } else {
                    this.onFailed.accept(new SendBatchFailedContext(result.events, this.partitionId, result.error));
                }
            }
            FlushCompletionOrigin origin = result == PublishResult.EMPTY ? FlushCompletionOrigin.ON_NEXT_EMPTY : FlushCompletionOrigin.ON_NEXT;
            this.tryCompleteFlush(origin);
        }

        protected void hookOnError(Throwable t) {
            if (this.terminated.getAndSet(true)) {
                return;
            }
            this.logger.atError().log("Publishing-subscription terminated with an error.", new Object[]{t});
            this.onFailed.accept(new SendBatchFailedContext(null, this.partitionId, t));
            this.tryCompleteFlush(FlushCompletionOrigin.TERMINAL_ERROR);
        }

        protected void hookOnComplete() {
            if (this.terminated.getAndSet(true)) {
                return;
            }
            this.logger.atInfo().log("Publishing-subscription terminated.");
            this.tryCompleteFlush(FlushCompletionOrigin.TERMINAL_COMPLETION);
        }

        TimeoutException awaitPendingFlush() {
            boolean acquired;
            if (this.pendingFlushSignal.get() == null) {
                return null;
            }
            try {
                acquired = this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                return new TimeoutException("Unable to acquire flush-semaphore due to interrupted exception.");
            }
            if (!acquired) {
                return new TimeoutException("Timed out waiting for flush operation to complete.");
            }
            this.flushSemaphore.release();
            return null;
        }

        Mono<Void> startFlush() {
            return Mono.create(sink -> {
                boolean acquired;
                if (this.terminated.get()) {
                    this.logger.atInfo().log("Nothing to flush as publishing-subscription is terminated.");
                    sink.success();
                    return;
                }
                FlushSignal flushSignal = new FlushSignal((MonoSink<Void>)sink);
                if (!this.pendingFlushSignal.compareAndSet(null, flushSignal)) {
                    this.logger.atInfo().log("Another flush operation is already in progress.");
                    sink.success();
                    return;
                }
                try {
                    acquired = this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    this.pendingFlushSignal.set(null);
                    this.logger.atWarning().log("Unable to acquire flush-semaphore.");
                    sink.error((Throwable)e);
                    return;
                }
                if (!acquired) {
                    this.pendingFlushSignal.set(null);
                    sink.error((Throwable)new TimeoutException("timout waiting for acquiring flush-semaphore."));
                    return;
                }
                this.logger.atVerbose().addKeyValue("signal-id", flushSignal.getId()).addKeyValue("permits", (long)this.flushSemaphore.availablePermits()).log("Enqueuing flush.");
                Sinks.EmitResult emitResult = this.eventSink.tryEmitNext((Object)flushSignal);
                if (emitResult != Sinks.EmitResult.OK) {
                    this.pendingFlushSignal.set(null);
                    this.flushSemaphore.release();
                    sink.error((Throwable)new RuntimeException("Unable to enqueue flush: id" + flushSignal.getId() + " (" + emitResult + ")"));
                    return;
                }
            });
        }

        private void tryCompleteFlush(FlushCompletionOrigin origin) {
            FlushSignal flushSignal = this.pendingFlushSignal.getAndSet(null);
            if (flushSignal != null) {
                this.logger.atVerbose().addKeyValue("signal-id", flushSignal.getId()).addKeyValue("permits", (long)this.flushSemaphore.availablePermits()).addKeyValue("completion-origin", (Object)origin).log("Completing flush.");
                this.flushSemaphore.release();
                flushSignal.flushed();
            }
        }

        private static enum FlushCompletionOrigin {
            TERMINAL_COMPLETION,
            TERMINAL_ERROR,
            ON_NEXT_EMPTY,
            ON_NEXT;

        }
    }

    private static class PublishResult {
        private static final PublishResult EMPTY = new PublishResult();
        private final List<EventData> events;
        private final Throwable error;

        PublishResult(List<EventData> events, Throwable error) {
            this.events = Objects.requireNonNull(events);
            this.error = error;
        }

        private PublishResult() {
            this.events = null;
            this.error = null;
        }
    }
}

