package org.apache.zookeeper.server;

import com.ites.helper.common.constant.HelperConstant;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
import org.apache.zookeeper.server.command.NopCommand;
import org.apache.zookeeper.server.command.SetTraceMaskCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/zookeeper-3.7.0.jar:org/apache/zookeeper/server/NettyServerCnxn.class */
public class NettyServerCnxn extends ServerCnxn {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) NettyServerCnxn.class);
    /** Netty channel backing this connection; assigned once in the constructor. */
    private final Channel channel;
    /**
     * Received bytes that have not been consumed yet. Allocated lazily in processMessage() and
     * released and reset to null in releaseQueuedBuffer(); the methods touching it assert that
     * they run on the channel's event loop.
     */
    private CompositeByteBuf queuedBuffer;
    /** True between disableRecv() and enableRecv(); while set, receiveMessage() stops consuming bytes. */
    private final AtomicBoolean throttled;
    /** Payload buffer of the packet currently being read; null while the next length prefix is awaited. */
    private ByteBuffer bb;
    /** Four-byte buffer that accumulates the length prefix of the next packet. */
    private final ByteBuffer bbLen;
    private long sessionId;
    private int sessionTimeout;
    /** Client certificate chain; copied on the way in and on the way out. */
    private Certificate[] clientChain;
    /** Set to true at the start of close() and never reset in this class. */
    private volatile boolean closingChannel;
    /** Factory that created this connection; consulted for connection bookkeeping, login and the secure flag. */
    private final NettyServerCnxnFactory factory;
    /** False until the first complete packet has been handed to processConnectRequest(). */
    private boolean initialized;
    // NOTE(review): neither read nor written in this class; presumably maintained by the
    // factory's channel handler - confirm against NettyServerCnxnFactory.
    public int readIssuedAfterReadComplete;
    /** Starts as NONE; only changed through setHandshakeState(). */
    private volatile HandshakeState handshakeState;
    /** Attached to every write issued by sendBuffer(); calls packetSent() when the write succeeded. */
    private final GenericFutureListener<Future<Void>> onSendBufferDoneListener;

    /* loaded from: input_file:BOOT-INF/lib/zookeeper-3.7.0.jar:org/apache/zookeeper/server/NettyServerCnxn$HandshakeState.class */
    /**
     * Handshake progress stored per connection. A new connection starts in {@link #NONE}; the
     * value is only changed through setHandshakeState().
     * NOTE(review): the code that advances the state is not in this file; presumably it tracks
     * the TLS handshake - confirm against the factory's handlers.
     */
    public enum HandshakeState {
        NONE,
        STARTED,
        FINISHED
    }

    /* loaded from: input_file:BOOT-INF/lib/zookeeper-3.7.0.jar:org/apache/zookeeper/server/NettyServerCnxn$ReadEvent.class */
    /**
     * User events fired into the channel pipeline by disableRecv() ({@link #DISABLE}) and
     * enableRecv() ({@link #ENABLE}). The handler reacting to them is not part of this file.
     */
    enum ReadEvent {
        DISABLE,
        ENABLE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/zookeeper-3.7.0.jar:org/apache/zookeeper/server/NettyServerCnxn$SendBufferWriter.class */
    /**
     * {@link Writer} that collects characters in memory and ships them to the client through
     * {@link NettyServerCnxn#sendBuffer(ByteBuffer...)} as UTF-8 bytes. Text is sent when it is
     * flushed or closed, or as soon as more than 2048 characters have accumulated.
     */
    public class SendBufferWriter extends Writer {
        /** Pending text; set to null once the writer has been closed. */
        private StringBuffer sb = new StringBuffer();

        private SendBufferWriter() {
        }

        /**
         * Sends the pending text if there is a reason to.
         *
         * @param z true to send whatever is pending (if anything); false to send only when the
         *          pending text exceeds 2048 characters
         */
        private void checkFlush(boolean z) {
            final int pending = this.sb.length();
            final boolean forcedWithContent = z && pending > 0;
            if (forcedWithContent || pending > 2048) {
                final byte[] utf8 = this.sb.toString().getBytes(StandardCharsets.UTF_8);
                NettyServerCnxn.this.sendBuffer(ByteBuffer.wrap(utf8));
                // Start over with an empty buffer.
                this.sb.setLength(0);
            }
        }

        /** Sends any pending text and drops the buffer; closing again is a no-op. */
        @Override // java.io.Writer, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.sb != null) {
                checkFlush(true);
                this.sb = null;
            }
        }

        /** Sends any pending text immediately. */
        @Override // java.io.Writer, java.io.Flushable
        public void flush() throws IOException {
            checkFlush(true);
        }

        /** Appends the given characters and sends the buffer once it exceeds 2048 characters. */
        @Override // java.io.Writer
        public void write(char[] cArr, int i, int i2) throws IOException {
            this.sb.append(cArr, i, i2);
            checkFlush(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyServerCnxn(Channel channel, ZooKeeperServer zooKeeperServer, NettyServerCnxnFactory nettyServerCnxnFactory) {
        super(zooKeeperServer);
        this.throttled = new AtomicBoolean(false);
        this.bbLen = ByteBuffer.allocate(4);
        this.handshakeState = HandshakeState.NONE;
        this.onSendBufferDoneListener = future -> {
            if (future.isSuccess()) {
                packetSent();
            }
        };
        this.channel = channel;
        this.closingChannel = false;
        this.factory = nettyServerCnxnFactory;
        if (this.factory.login != null) {
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(nettyServerCnxnFactory.login);
        }
        addAuthInfo(new Id(HelperConstant.IP_KEY, ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress()));
    }

    /**
     * Records why the connection is being dropped and then closes it.
     *
     * @param disconnectReason the reason stored on this connection before closing
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void close(ServerCnxn.DisconnectReason disconnectReason) {
        this.disconnectReason = disconnectReason;
        close();
    }

    /**
     * Closes this connection and removes it from the factory's and server's bookkeeping.
     *
     * <p>The connection is always marked stale and unregistered. If the factory no longer tracks
     * it, only the channel is closed. Otherwise the connection is removed from the session and
     * IP maps and from the server, and the channel is closed after an empty write has completed,
     * so that earlier writes are flushed first. The queued receive buffer is released on the
     * event loop once the channel is closed (or right away if it was not open any more).
     */
    public void close() {
        this.closingChannel = true;
        LOG.debug("close called for session id: 0x{}", Long.toHexString(this.sessionId));
        setStale();
        // Unregister unconditionally, even if the connection turns out to be gone already.
        this.factory.unregisterConnection(this);
        if (!this.factory.cnxns.remove(this)) {
            // Not tracked by the factory any more: just make sure the channel is closed.
            LOG.debug("cnxns size:{}", Integer.valueOf(this.factory.cnxns.size()));
            if (this.channel.isOpen()) {
                this.channel.close();
            }
            return;
        }
        LOG.debug("close in progress for session id: 0x{}", Long.toHexString(this.sessionId));
        this.factory.removeCnxnFromSessionMap(this);
        this.factory.removeCnxnFromIpMap(this, ((InetSocketAddress) this.channel.remoteAddress()).getAddress());
        if (this.zkServer != null) {
            this.zkServer.removeCnxn(this);
        }
        if (this.channel.isOpen()) {
            // Write an empty buffer and close only once that write completed, so that writes
            // issued before it have been processed before the channel goes away.
            this.channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new ChannelFutureListener() { // from class: org.apache.zookeeper.server.NettyServerCnxn.1
                @Override // io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(ChannelFuture channelFuture) {
                    channelFuture.channel().close().addListener(future -> {
                        NettyServerCnxn.this.releaseQueuedBuffer();
                    });
                }
            });
        } else {
            ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1L);
            // releaseQueuedBuffer() must run on the event loop.
            this.channel.eventLoop().execute(this::releaseQueuedBuffer);
        }
    }

    /** Returns the session id last assigned through setSessionId(). */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public long getSessionId() {
        return this.sessionId;
    }

    /** Returns the session timeout last assigned through setSessionTimeout(). */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    /**
     * Delivers a watch event to the client as a "notification" response and adds the number of
     * bytes reported by sendResponse to the WATCH_BYTES metric. If sending fails with an
     * {@link IOException} the connection is closed.
     *
     * @param watchedEvent the event to deliver
     */
    @Override // org.apache.zookeeper.server.ServerCnxn, org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        final ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            final String traceText = "Deliver event " + watchedEvent + " to 0x" + Long.toHexString(this.sessionId) + " through " + this;
            ZooTrace.logTraceMessage(LOG, 64L, traceText);
        }
        try {
            ServerMetrics.getMetrics().WATCH_BYTES.add(sendResponse(notificationHeader, watchedEvent.getWrapper(), "notification"));
        } catch (IOException sendFailure) {
            LOG.debug("Problem sending to {}", getRemoteSocketAddress(), sendFailure);
            close();
        }
    }

    /**
     * Serializes a response and writes it to the channel.
     *
     * @return the int read from the start of the first serialized buffer (the value callers
     *         account as bytes sent), or 0 when the connection is closing or the channel is no
     *         longer open and nothing was written
     * @throws IOException if serialization fails
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public int sendResponse(ReplyHeader replyHeader, Record record, String str, String str2, Stat stat, int i) throws IOException {
        final boolean writable = !this.closingChannel && this.channel.isOpen();
        if (!writable) {
            return 0;
        }
        final ByteBuffer[] buffers = serialize(replyHeader, record, str, str2, stat, i);
        final ByteBuffer head = buffers[0];
        // Peek at the leading int, then rewind so the buffer is sent from its beginning.
        final int leadingLength = head.getInt();
        head.rewind();
        sendBuffer(buffers);
        decrOutstandingAndCheckThrottle(replyHeader);
        return leadingLength;
    }

    /**
     * Stores the session id and registers this connection with the factory under that id.
     *
     * @param j the session id
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void setSessionId(long j) {
        this.sessionId = j;
        this.factory.addSession(j, this);
    }

    /**
     * Writes the given buffers to the channel, or closes the connection when the single buffer
     * passed is the {@code ServerCnxnFactory.closeConn} sentinel (compared by identity).
     *
     * <p>Every write gets the send-done listener attached, which counts the packet as sent once
     * the write succeeded.
     *
     * @param byteBufferArr the buffers to send, wrapped into one Netty buffer without copying
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void sendBuffer(ByteBuffer... byteBufferArr) {
        if (byteBufferArr.length == 1 && byteBufferArr[0] == ServerCnxnFactory.closeConn) {
            close(ServerCnxn.DisconnectReason.CLIENT_CLOSED_CONNECTION);
        } else {
            this.channel.writeAndFlush(Unpooled.wrappedBuffer(byteBufferArr)).addListener(this.onSendBufferDoneListener);
        }
    }

    /**
     * Checks whether the int just read as a length prefix is actually a known four-letter
     * command and, if so, runs it.
     *
     * <p>For a known command auto-read is switched off on the channel and the four bytes are
     * counted as a received packet. Commands that are not enabled are answered with a
     * "not in the whitelist" message. The set-trace-mask command reads a further 8-byte mask
     * from the buffer; all other commands are dispatched through {@code CommandExecutor}.
     *
     * @param channel the channel the bytes arrived on
     * @param byteBuf the receive buffer, positioned right after the four command bytes
     * @param i the four bytes interpreted as an int
     * @return false if {@code i} is not a known command; true if a command was started here;
     *         otherwise the result of {@code CommandExecutor.execute}
     */
    private boolean checkFourLetterWord(Channel channel, ByteBuf byteBuf, int i) {
        if (!FourLetterCommands.isKnown(i)) {
            return false;
        }
        final String commandName = FourLetterCommands.getCommandString(i);
        channel.config().setAutoRead(false);
        packetReceived(4L);
        final PrintWriter replyWriter = new PrintWriter(new BufferedWriter(new SendBufferWriter()));
        if (!FourLetterCommands.isEnabled(commandName)) {
            LOG.debug("Command {} is not executed because it is not in the whitelist.", commandName);
            final NopCommand refusal = new NopCommand(replyWriter, this, commandName + " is not executed because it is not in the whitelist.");
            refusal.start();
            return true;
        }
        LOG.info("Processing {} command from {}", commandName, channel.remoteAddress());
        if (i == FourLetterCommands.setTraceMaskCmd) {
            // The trace mask follows the command as an 8-byte long.
            final ByteBuffer maskBytes = ByteBuffer.allocate(8);
            byteBuf.readBytes(maskBytes);
            maskBytes.flip();
            final long traceMask = maskBytes.getLong();
            ZooTrace.setTextTraceLevel(traceMask);
            new SetTraceMaskCommand(replyWriter, this, traceMask).start();
            return true;
        }
        return new CommandExecutor().execute(this, replyWriter, i, this.zkServer, this.factory);
    }

    /**
     * Asserts that the caller runs on the channel's event loop.
     *
     * @param str name of the calling method, used in the error message
     * @throws IllegalStateException if called from any other thread
     */
    private void checkIsInEventLoop(String str) {
        final boolean onEventLoop = this.channel.eventLoop().inEventLoop();
        if (onEventLoop) {
            return;
        }
        throw new IllegalStateException(str + "() called from non-EventLoop thread");
    }

    /**
     * Adds a buffer to the end of the queued receive buffer and reports the queue's capacity to
     * the NETTY_QUEUED_BUFFER metric. Must be called on the event loop with the queue allocated.
     *
     * @param byteBuf the buffer to append; the queue's writer index is advanced past it
     */
    private void appendToQueuedBuffer(ByteBuf byteBuf) {
        checkIsInEventLoop("appendToQueuedBuffer");
        final CompositeByteBuf queue = this.queuedBuffer;
        final boolean componentLimitReached = queue.numComponents() == queue.maxNumComponents();
        if (componentLimitReached) {
            // Merge the existing components to make room for another one.
            queue.consolidate();
        }
        queue.addComponent(true, byteBuf);
        ServerMetrics.getMetrics().NETTY_QUEUED_BUFFER.add(queue.capacity());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Entry point for bytes read from the channel; must be called on the event loop.
     *
     * <p>While throttled, the bytes are only queued. When not throttled and a queue already
     * exists, the bytes are appended to it and the queue is processed. Otherwise the buffer is
     * processed directly, and whatever is left unread afterwards (unless the connection is
     * closing) is kept in a newly allocated queue.
     *
     * @param byteBuf the bytes just read; retained views of it are queued, it is not copied
     */
    public void processMessage(ByteBuf byteBuf) {
        checkIsInEventLoop("processMessage");
        LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(this.sessionId), this.queuedBuffer);
        if (LOG.isTraceEnabled()) {
            LOG.trace("0x{} buf {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(byteBuf));
        }

        if (this.throttled.get()) {
            LOG.debug("Received message while throttled");
            if (this.queuedBuffer == null) {
                LOG.debug("allocating queue");
                this.queuedBuffer = this.channel.alloc().compositeBuffer();
            }
            appendToQueuedBuffer(byteBuf.retainedDuplicate());
            if (LOG.isTraceEnabled()) {
                LOG.trace("0x{} queuedBuffer {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(this.queuedBuffer));
            }
        } else {
            LOG.debug("not throttled");
            if (this.queuedBuffer != null) {
                // Older bytes are still pending: keep ordering by going through the queue.
                appendToQueuedBuffer(byteBuf.retainedDuplicate());
                processQueuedBuffer();
            } else {
                receiveMessage(byteBuf);
                if (!this.closingChannel && byteBuf.isReadable()) {
                    // Not everything was consumed: keep the unread remainder for later.
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Before copy {}", byteBuf);
                    }
                    if (this.queuedBuffer == null) {
                        this.queuedBuffer = this.channel.alloc().compositeBuffer();
                    }
                    appendToQueuedBuffer(byteBuf.retainedSlice(byteBuf.readerIndex(), byteBuf.readableBytes()));
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Copy is {}", this.queuedBuffer);
                        LOG.trace("0x{} queuedBuffer {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(this.queuedBuffer));
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes the bytes held in the queued receive buffer, if there is one; must be called on
     * the event loop.
     *
     * <p>Afterwards the queue is released when it was fully consumed, or trimmed of its
     * already-read components when bytes remain. If the connection started closing while
     * processing, the queue is left untouched here.
     */
    public void processQueuedBuffer() {
        checkIsInEventLoop("processQueuedBuffer");
        if (this.queuedBuffer == null) {
            LOG.debug("queue empty");
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("processing queue 0x{} queuedBuffer {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(this.queuedBuffer));
        }
        receiveMessage(this.queuedBuffer);
        // closingChannel has to be checked first: close() may have been called by receiveMessage().
        if (this.closingChannel) {
            LOG.debug("Processed queue - channel closed, dropping remaining bytes");
        } else if (!this.queuedBuffer.isReadable()) {
            LOG.debug("Processed queue - no bytes remaining");
            releaseQueuedBuffer();
        } else {
            LOG.debug("Processed queue - bytes remaining");
            // Drop components that were read completely.
            this.queuedBuffer.discardReadComponents();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases the queued receive buffer and forgets it; does nothing when there is none.
     * Must be called on the event loop.
     */
    public void releaseQueuedBuffer() {
        checkIsInEventLoop("releaseQueuedBuffer");
        if (this.queuedBuffer == null) {
            return;
        }
        this.queuedBuffer.release();
        this.queuedBuffer = null;
    }

    /**
     * Consumes length-prefixed packets from the given buffer until it is drained or the
     * connection becomes throttled; must be called on the event loop.
     *
     * <p>Each packet is a 4-byte length followed by that many payload bytes. Both parts may
     * arrive split across several calls: partial data is kept in {@code bbLen} / {@code bb}
     * between calls. Before the connection is initialized the four length bytes are first
     * checked for a four-letter command. The first complete packet is handed to
     * {@code processConnectRequest}, every later one to {@code processPacket}.
     *
     * <p>An {@link IOException} (bad length, request rejected, server missing or not running)
     * closes the connection with {@code IO_EXCEPTION}; a {@code ClientCnxnLimitException}
     * closes it with {@code CLIENT_RATE_LIMIT}.
     *
     * @param byteBuf the bytes to consume; its reader index is advanced
     */
    private void receiveMessage(ByteBuf byteBuf) {
        checkIsInEventLoop("receiveMessage");
        while (byteBuf.isReadable() && !this.throttled.get()) {
            try {
                if (this.bb != null) {
                    // Payload phase: fill bb with as many bytes as are available.
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bb len {} {}", Integer.valueOf(byteBuf.readableBytes()), Integer.valueOf(this.bb.remaining()), this.bb);
                        ByteBuffer duplicate = this.bb.duplicate();
                        duplicate.flip();
                        LOG.trace("0x{} bb {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(duplicate)));
                    }
                    if (this.bb.remaining() > byteBuf.readableBytes()) {
                        // readBytes(ByteBuffer) fills up to the limit, so cap it at what is available.
                        this.bb.limit(this.bb.position() + byteBuf.readableBytes());
                    }
                    byteBuf.readBytes(this.bb);
                    this.bb.limit(this.bb.capacity());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable {} bb len {} {}", Integer.valueOf(byteBuf.readableBytes()), Integer.valueOf(this.bb.remaining()), this.bb);
                        ByteBuffer duplicate2 = this.bb.duplicate();
                        duplicate2.flip();
                        LOG.trace("after readbytes 0x{} bb {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(duplicate2)));
                    }
                    if (this.bb.remaining() == 0) {
                        // Payload complete: hand the packet to the server.
                        this.bb.flip();
                        packetReceived(4 + this.bb.remaining());
                        ZooKeeperServer zooKeeperServer = this.zkServer;
                        if (zooKeeperServer == null || !zooKeeperServer.isRunning()) {
                            throw new IOException("ZK down");
                        }
                        if (this.initialized) {
                            zooKeeperServer.processPacket(this, this.bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            zooKeeperServer.processConnectRequest(this, this.bb);
                            this.initialized = true;
                        }
                        this.bb = null;
                    }
                } else {
                    // Length phase: accumulate the 4-byte length prefix in bbLen.
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bblenrem {}", Integer.valueOf(byteBuf.readableBytes()), Integer.valueOf(this.bbLen.remaining()));
                        ByteBuffer duplicate3 = this.bbLen.duplicate();
                        duplicate3.flip();
                        LOG.trace("0x{} bbLen {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(duplicate3)));
                    }
                    if (byteBuf.readableBytes() < this.bbLen.remaining()) {
                        this.bbLen.limit(this.bbLen.position() + byteBuf.readableBytes());
                    }
                    byteBuf.readBytes(this.bbLen);
                    this.bbLen.limit(this.bbLen.capacity());
                    if (this.bbLen.remaining() == 0) {
                        this.bbLen.flip();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen {}", Long.toHexString(this.sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(this.bbLen)));
                        }
                        int i = this.bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen len is {}", Long.toHexString(this.sessionId), Integer.valueOf(i));
                        }
                        this.bbLen.clear();
                        // Four-letter commands are only recognised before the connect request.
                        if (!this.initialized && checkFourLetterWord(this.channel, byteBuf, i)) {
                            return;
                        }
                        if (i < 0 || i > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + i);
                        }
                        // zkServer is treated as nullable elsewhere in this class; fail the same
                        // way as the payload phase instead of with a NullPointerException.
                        ZooKeeperServer sizeChecker = this.zkServer;
                        if (sizeChecker == null) {
                            throw new IOException("ZK down");
                        }
                        sizeChecker.checkRequestSizeWhenReceivingMessage(i);
                        this.bb = ByteBuffer.allocate(i);
                    }
                }
            } catch (IOException e) {
                LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
                close(ServerCnxn.DisconnectReason.IO_EXCEPTION);
                return;
            } catch (ClientCnxnLimitException e2) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1L);
                LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e2);
                close(ServerCnxn.DisconnectReason.CLIENT_RATE_LIMIT);
                return;
            }
        }
    }

    /**
     * Throttles the connection. On the transition from not-throttled to throttled a
     * {@code ReadEvent.DISABLE} user event is fired into the pipeline; repeated calls are no-ops.
     *
     * @param z not used by this implementation
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void disableRecv(boolean z) {
        final boolean justThrottled = this.throttled.compareAndSet(false, true);
        if (!justThrottled) {
            return;
        }
        LOG.debug("Throttling - disabling recv {}", this);
        this.channel.pipeline().fireUserEventTriggered(ReadEvent.DISABLE);
    }

    /**
     * Lifts throttling. On the transition from throttled to not-throttled a
     * {@code ReadEvent.ENABLE} user event is fired into the pipeline; repeated calls are no-ops.
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void enableRecv() {
        final boolean justUnthrottled = this.throttled.compareAndSet(true, false);
        if (!justUnthrottled) {
            return;
        }
        LOG.debug("Sending unthrottle event {}", this);
        this.channel.pipeline().fireUserEventTriggered(ReadEvent.ENABLE);
    }

    /**
     * Stores the session timeout for this connection.
     *
     * @param i the timeout value to store
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void setSessionTimeout(int i) {
        this.sessionTimeout = i;
    }

    /**
     * Reports the connection's state as a bit mask: bit value 1 is set while the connection is
     * not throttled, bit value 4 while the channel is not writable.
     *
     * <p>NOTE(review): 1 and 4 are the numeric values of {@code SelectionKey.OP_READ} and
     * {@code SelectionKey.OP_WRITE}; presumably the original source used those constants.
     *
     * @return the bit mask, or 0 when there is no channel or it is not open
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public int getInterestOps() {
        if (this.channel == null || !this.channel.isOpen()) {
            return 0;
        }
        final int readBit = this.throttled.get() ? 0 : 1;
        final int writeBit = this.channel.isWritable() ? 0 : 4;
        return readBit | writeBit;
    }

    /** Returns the channel's remote address, cast to {@link InetSocketAddress}. */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public InetSocketAddress getRemoteSocketAddress() {
        return (InetSocketAddress) this.channel.remoteAddress();
    }

    /**
     * Passes the {@code ServerCnxnFactory.closeConn} sentinel to sendBuffer(), which reacts to
     * it by closing the connection instead of writing anything.
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void sendCloseSession() {
        sendBuffer(ServerCnxnFactory.closeConn);
    }

    /**
     * Returns the statistics object of the server this connection belongs to.
     *
     * @return the server's stats, or null when no server is attached
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    protected ServerStats serverStats() {
        return this.zkServer == null ? null : this.zkServer.serverStats();
    }

    /** Returns the {@code secure} flag of the factory that owns this connection. */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public boolean isSecure() {
        return this.factory.secure;
    }

    /**
     * Returns a copy of the stored client certificate chain, so callers cannot modify the
     * array held by this connection.
     *
     * @return a new array with the same certificates, or null when no chain is stored
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public Certificate[] getClientCertificateChain() {
        final Certificate[] stored = this.clientChain;
        return stored == null ? null : Arrays.copyOf(stored, stored.length);
    }

    /**
     * Stores a copy of the given client certificate chain, so later changes to the caller's
     * array do not affect this connection.
     *
     * @param certificateArr the chain to store; null clears the stored chain
     */
    @Override // org.apache.zookeeper.server.ServerCnxn
    public void setClientCertificateChain(Certificate[] certificateArr) {
        this.clientChain = certificateArr == null
                ? null
                : Arrays.copyOf(certificateArr, certificateArr.length);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the Netty channel backing this connection. */
    public Channel getChannel() {
        return this.channel;
    }

    /**
     * Returns how many unread bytes are waiting in the queued receive buffer; must be called on
     * the event loop.
     *
     * @return the readable byte count, or 0 when no queue is allocated
     */
    public int getQueuedReadableBytes() {
        checkIsInEventLoop("getQueuedReadableBytes");
        return this.queuedBuffer == null ? 0 : this.queuedBuffer.readableBytes();
    }

    /**
     * Stores the handshake state of this connection.
     *
     * @param handshakeState the new state
     */
    public void setHandshakeState(HandshakeState handshakeState) {
        this.handshakeState = handshakeState;
    }

    /** Returns the handshake state last stored; {@code NONE} for a new connection. */
    public HandshakeState getHandshakeState() {
        return this.handshakeState;
    }
}
