package org.springframework.integration.channel;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.context.Lifecycle;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/springframework/integration/channel/FluxMessageChannel.class */
public class FluxMessageChannel extends AbstractMessageChannel implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel, Lifecycle {
    private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
    private final List<Publisher<? extends Message<?>>> sourcePublishers = new ArrayList();
    private volatile Disposable.Composite upstreamSubscriptions = Disposables.composite();
    private volatile boolean active = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.integration.channel.FluxMessageChannel$1, reason: invalid class name */
    /* loaded from: input_file:org/springframework/integration/channel/FluxMessageChannel$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$reactor$core$publisher$Sinks$EmitResult = new int[Sinks.EmitResult.values().length];

        static {
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_NON_SERIALIZED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_OVERFLOW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_TERMINATED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$reactor$core$publisher$Sinks$EmitResult[Sinks.EmitResult.FAIL_CANCELLED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel
    protected boolean doSend(Message<?> message, long j) {
        Assert.state(this.active && this.sink.currentSubscriberCount() > 0, () -> {
            return "The [" + String.valueOf(this) + "] doesn't have subscribers to accept messages";
        });
        long j2 = 0;
        if (j > 0) {
            j2 = j;
        }
        long nanos = TimeUnit.MILLISECONDS.toNanos(10L);
        while (this.active && !tryEmitMessage(message)) {
            j2 -= 10;
            if (j >= 0 && j2 <= 0) {
                return false;
            }
            LockSupport.parkNanos(nanos);
        }
        return true;
    }

    private boolean tryEmitMessage(Message<?> message) {
        Message<?> message2 = message;
        ContextView captureReactorContext = IntegrationReactiveUtils.captureReactorContext();
        if (!captureReactorContext.isEmpty()) {
            message2 = MutableMessageBuilder.fromMessage(message).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, captureReactorContext).build();
        }
        if (!this.active) {
            return false;
        }
        switch (AnonymousClass1.$SwitchMap$reactor$core$publisher$Sinks$EmitResult[this.sink.tryEmitNext(message2).ordinal()]) {
            case 1:
                return true;
            case 2:
            case 3:
                return false;
            case 4:
                throw new IllegalStateException("The [" + String.valueOf(this) + "] doesn't have subscribers to accept messages");
            case 5:
            case 6:
                throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: " + String.valueOf(this.sink));
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        this.sink.asFlux().publish(1).refCount().subscribe(subscriber);
    }

    public void start() {
        this.active = true;
        this.upstreamSubscriptions = Disposables.composite();
        this.sourcePublishers.forEach(this::doSubscribeTo);
    }

    public void stop() {
        this.active = false;
        this.upstreamSubscriptions.dispose();
    }

    public boolean isRunning() {
        return this.active;
    }

    private void disposeUpstreamSubscription(AtomicReference<Disposable> atomicReference) {
        Disposable disposable = atomicReference.get();
        if (disposable != null) {
            this.upstreamSubscriptions.remove(disposable);
            disposable.dispose();
        }
    }

    @Override // org.springframework.integration.channel.ReactiveStreamsSubscribableChannel
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        this.sourcePublishers.add(publisher);
        doSubscribeTo(publisher);
    }

    private void doSubscribeTo(Publisher<? extends Message<?>> publisher) {
        Flux doOnComplete = Flux.from(publisher).doOnComplete(() -> {
            this.sourcePublishers.remove(publisher);
        });
        Sinks.Many<Message<?>> many = this.sink;
        Objects.requireNonNull(many);
        addPublisherToSubscribe(doOnComplete.delaySubscription(Mono.fromCallable(many::currentSubscriberCount).filter(num -> {
            return num.intValue() > 0;
        }).repeatWhenEmpty(flux -> {
            return this.active ? flux.delayElements(Duration.ofMillis(100L)) : flux;
        })).flatMap(message -> {
            return Mono.just(message).handle((message, synchronousSink) -> {
                sendReactiveMessage(message);
            }).contextWrite(StaticMessageHeaderAccessor.getReactorContext(message));
        }).contextCapture());
    }

    private void addPublisherToSubscribe(Flux<?> flux) {
        AtomicReference atomicReference = new AtomicReference();
        Disposable subscribe = flux.doOnTerminate(() -> {
            disposeUpstreamSubscription(atomicReference);
        }).subscribe();
        if (subscribe.isDisposed() || !this.upstreamSubscriptions.add(subscribe)) {
            return;
        }
        atomicReference.set(subscribe);
    }

    private void sendReactiveMessage(Message<?> message) {
        Message<?> message2 = message;
        if (message2.getHeaders().containsKey(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)) {
            message2 = MutableMessageBuilder.fromMessage(message).removeHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT).build();
        }
        try {
            if (!send(message2)) {
                this.logger.warn(new MessageDeliveryException(message2, "Failed to send message to channel '" + String.valueOf(this)), "Message was not delivered");
            }
        } catch (Exception e) {
            this.logger.error(e, LogMessage.format("Error during processing event: %s", message2));
        }
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.support.management.IntegrationManagement
    public void destroy() {
        this.active = false;
        this.upstreamSubscriptions.dispose();
        this.sourcePublishers.clear();
        this.sink.emitComplete(Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1L)));
        super.destroy();
    }
}
