package com.hcl.products.test.it.kafka;

import com.ghc.a3.a3core.A3Message;
import com.ghc.a3.a3core.TransportListener;
import com.ghc.a3.ipsocket.server.ReferenceCountedTCPServer;
import com.ghc.a3.ipsocket.utils.SocketOptions;
import com.ghc.config.SimpleXMLConfig;
import com.ghc.ssl.SslSettings;
import com.hcl.products.test.it.kafka.response.KafkaResponseV9;
import com.hcl.products.test.it.kafka.response.KafkaResponseV9Below;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/hcl/products/test/it/kafka/KafkaStubServer.class */
/**
 * TCP server that accepts connections carrying length-prefixed request frames, forwards the
 * decoded messages to registered {@link TransportListener}s, and writes a stub response back
 * on the same connection.
 *
 * <p>Each accepted socket is served by its own {@link KafkaStubEventWorkerThread}. The worker
 * owns the socket for the lifetime of the connection and closes it when the peer disconnects,
 * when a malformed frame is received, or when any exception occurs.
 */
public class KafkaStubServer extends ReferenceCountedTCPServer {
    static final Logger log = Logger.getLogger(KafkaStubServer.class.getName());

    /** Source of the numeric suffix used in worker thread names. */
    private static final AtomicInteger id = new AtomicInteger(1);

    /** Size in bytes of the big-endian length prefix that starts every frame. */
    private static final int LENGTH_PREFIX_BYTES = 4;

    /**
     * Absolute offset (length prefix included) of the 16-bit value that selects the response
     * builder. NOTE(review): in the Kafka request header this position holds the api_version
     * field (after the 4-byte size and 2-byte api_key) - confirm against the protocol guide.
     */
    private static final int VERSION_OFFSET = 6;

    /** Smallest payload that still contains the 16-bit value read at {@link #VERSION_OFFSET}. */
    private static final int MIN_PAYLOAD_BYTES = VERSION_OFFSET + 2 - LENGTH_PREFIX_BYTES;

    /** Versions below this use {@link KafkaResponseV9Below}; all others use {@link KafkaResponseV9}. */
    private static final short FIRST_V9_VERSION = 9;

    private final KafkaMessageNotifier proxiedMessageNotifier;
    private final String transportID;

    /**
     * Serves a single accepted connection: reads frames in a loop, notifies listeners for each
     * decoded message, and answers every frame with a generated response.
     */
    private class KafkaStubEventWorkerThread extends Thread {
        private final Socket socket;

        /**
         * @param socket the accepted connection; it is closed by this thread when {@link #run()} ends
         */
        public KafkaStubEventWorkerThread(Socket socket) {
            super("KafkaWorker-" + KafkaStubServer.id.getAndIncrement());
            this.socket = socket;
        }

        /**
         * Processes frames until the peer closes the stream, an invalid length prefix is seen,
         * or an exception is raised. The socket is always closed on exit.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            KafkaStubServer.log.log(Level.FINER, "New connection accepted from proxy at {0}", this.socket.getInetAddress());
            // try-with-resources guarantees the socket (and therefore both of its streams) is
            // released on every exit path; previously the connection was leaked.
            try (Socket connection = this.socket) {
                DataInputStream in = new DataInputStream(new BufferedInputStream(connection.getInputStream()));
                OutputStream out = connection.getOutputStream();
                while (true) {
                    int payloadLength;
                    try {
                        payloadLength = in.readInt();
                    } catch (EOFException endOfStream) {
                        // The peer closed the connection; this is the normal way for the loop to end.
                        return;
                    }
                    KafkaStubServer.log.log(Level.FINEST, "Received length indication of {0}", Integer.valueOf(payloadLength));

                    // Reject lengths that cannot describe a usable frame before allocating:
                    // negative values, values whose prefixed size overflows an int, and payloads
                    // too short to contain the version field read below.
                    if (payloadLength < MIN_PAYLOAD_BYTES || payloadLength > Integer.MAX_VALUE - LENGTH_PREFIX_BYTES) {
                        KafkaStubServer.log.log(Level.WARNING, "Closing connection: invalid frame length {0}", Integer.valueOf(payloadLength));
                        return;
                    }

                    byte[] frame = readFrame(in, payloadLength);

                    for (Iterator<A3Message> messages = KafkaA3MessageBuilder.fromMessageFrame(frame).iterator(); messages.hasNext();) {
                        KafkaStubServer.this.proxiedMessageNotifier.notifyKafkaMessage(messages.next(), KafkaStubServer.this.transportID);
                    }

                    ByteBuf request = Unpooled.wrappedBuffer(frame);
                    short version = request.getShort(VERSION_OFFSET);
                    out.write((version < FIRST_V9_VERSION ? new KafkaResponseV9Below() : new KafkaResponseV9()).createKafkaResponseBytes(request, version));
                    out.flush();
                    KafkaStubServer.log.log(Level.FINEST, "Written response");
                }
            } catch (Exception e) {
                KafkaStubServer.log.log(Level.SEVERE, "Exception in KafkaStubServer : ", (Throwable) e);
            }
        }

        /**
         * Rebuilds the complete frame: the big-endian length prefix followed by the payload.
         *
         * @param in            stream positioned immediately after the length prefix
         * @param payloadLength number of payload bytes to read; already validated by the caller
         * @return a new array of {@code payloadLength + 4} bytes
         * @throws IOException if the stream ends before the whole payload is read, or on any other I/O error
         */
        private byte[] readFrame(DataInputStream in, int payloadLength) throws IOException {
            byte[] frame = new byte[payloadLength + LENGTH_PREFIX_BYTES];
            frame[0] = (byte) (payloadLength >> 24);
            frame[1] = (byte) (payloadLength >> 16);
            frame[2] = (byte) (payloadLength >> 8);
            frame[3] = (byte) payloadLength;
            in.readFully(frame, LENGTH_PREFIX_BYTES, payloadLength);
            return frame;
        }
    }

    /**
     * Creates a server using a default {@link SimpleXMLConfig}.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally package-private.
     *
     * @param i    port, passed to the superclass as a string; {@code SO_REUSEADDR} is enabled
     *             only when this is non-zero
     * @param str  forwarded unchanged as the last superclass constructor argument
     *             (NOTE(review): meaning not visible here - confirm against ReferenceCountedTCPServer)
     * @param str2 transport ID reported to listeners with every message
     */
    public KafkaStubServer(int i, String str, String str2) {
        super(String.valueOf(i), new SimpleXMLConfig(), new SocketOptions.Builder().so_reuseaddr(i != 0).build(), str);
        this.proxiedMessageNotifier = new KafkaMessageNotifier();
        this.transportID = str2;
    }

    /**
     * Creates a server configured with the supplied SSL settings.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally package-private.
     *
     * @param i           port, passed to the superclass as a string; {@code SO_REUSEADDR} is
     *                    enabled only when this is non-zero
     * @param str         forwarded unchanged as the last superclass constructor argument
     *                    (NOTE(review): meaning not visible here - confirm against ReferenceCountedTCPServer)
     * @param str2        transport ID reported to listeners with every message
     * @param sslSettings SSL settings handed to the superclass
     */
    public KafkaStubServer(int i, String str, String str2, SslSettings sslSettings) {
        super(String.valueOf(i), sslSettings, new SocketOptions.Builder().so_reuseaddr(i != 0).build(), str);
        this.proxiedMessageNotifier = new KafkaMessageNotifier();
        this.transportID = str2;
    }

    /**
     * Starts a dedicated worker thread for a newly accepted connection.
     *
     * @param socket the accepted connection; ownership passes to the worker, which closes it
     */
    public void requestAccepted(Socket socket) {
        new KafkaStubEventWorkerThread(socket).start();
    }

    /**
     * Registers a listener with the message notifier.
     *
     * @param transportListener listener to register
     * @param str               value passed through to the notifier together with the listener
     */
    public void addListener(TransportListener transportListener, String str) {
        this.proxiedMessageNotifier.addMessageListener(transportListener, str);
    }

    /**
     * Unregisters a listener from the message notifier.
     *
     * @param transportListener listener to remove
     */
    public void removeListener(TransportListener transportListener) {
        this.proxiedMessageNotifier.removeMessageListener(transportListener);
    }

    /**
     * @return the local port reported by the inherited {@code m_server}
     */
    public int getPort() {
        return this.m_server.getLocalPort();
    }
}
