/*
 * Decompiled with CFR 0.152.
 */
package com.talpie.linker;

import com.talpie.linker.AES;
import com.talpie.linker.ClientService;
import com.talpie.linker.Message;
import com.talpie.linker.RSA;
import com.talpie.linker.StatiCom;
import com.talpie.linker.StreamFrame;
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Client side of one encrypted stream carried over a dedicated {@link Socket}.
 *
 * <p>After a successful handshake, one single-thread executor runs the receive
 * loop and another runs the transmit loop. Outgoing frames are buffered in a
 * bounded queue; when that queue is full the oldest queued frame is discarded
 * to make room for the newest one (see {@link #offerFrame(StreamFrame)}).
 *
 * <p>{@link #stop()} is idempotent and may be called from any thread, including
 * from the receive/transmit loops themselves.
 */
public class StreamSocketClient {
    /** Send/receive buffer size requested from the socket (1 MiB). */
    private static final int SOCKET_BUFFER_BYTES = 0x100000;
    /** Largest slice requested from the input stream per read while collecting a payload. */
    private static final int READ_CHUNK_BYTES = 4096;
    /** Frame header size: 1 flag byte + 8-byte sequence number + 8-byte timestamp. */
    private static final int FRAME_HEADER_BYTES = 17;
    /** Bit in the flag byte that carries {@code StreamFrame.key}. */
    private static final int FRAME_FLAG_KEY = 1;

    private static final String HANDSHAKE_INIT = "#_START-STREAM-SOCKET++";
    private static final String HANDSHAKE_READY = "#_STREAM-SOCKET-READY++";
    private static final String ROUTE_CLOSE = "#_STREAM++/CLOSE";
    private static final String ROUTE_CLOSE_ACK = "#_STREAM++/CLOSE-ACK";

    private final ClientService client;
    private final Socket socket;
    private final String streamId;
    private final String metaRoute;
    private final String dataRoute;
    private final ExecutorService rxExec = Executors.newSingleThreadExecutor();
    private final ExecutorService txExec = Executors.newSingleThreadExecutor();
    private volatile boolean running = false;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private OutputStream out;
    private InputStream in;
    private final RSA rsa;
    private final AES aes;
    // The rx counters are only touched by the receive loop, the tx counters only by the transmit loop.
    private long rxFrames = 0L;
    private long rxBytes = 0L;
    private long txFrames = 0L;
    private long txBytes = 0L;
    private final BlockingQueue<StreamFrame> txQueue;

    /**
     * Creates a stream client bound to an already-connected socket.
     *
     * <p>Socket tuning (no-delay, keep-alive, buffer sizes) is best effort: a failure is
     * reported to the client's listeners and construction still completes.
     *
     * @param client    owning client; supplies the RSA/AES helpers and the listeners
     * @param socket    socket used exclusively by this stream
     * @param streamId  identifier of the stream, used in the handshake and in message routes
     * @param queueSize capacity of the outgoing frame queue
     */
    public StreamSocketClient(ClientService client, Socket socket, String streamId, int queueSize) {
        this.client = client;
        this.socket = socket;
        this.streamId = streamId;
        this.metaRoute = "#_STREAM++/META/" + streamId;
        this.dataRoute = "#_STREAM++/DATA/" + streamId;
        this.rsa = client.getRsa();
        this.aes = client.getAes();
        this.txQueue = new ArrayBlockingQueue<>(queueSize);
        try {
            socket.setTcpNoDelay(true);
            socket.setKeepAlive(true);
            socket.setReceiveBufferSize(SOCKET_BUFFER_BYTES);
            socket.setSendBufferSize(SOCKET_BUFFER_BYTES);
        } catch (Exception e) {
            this.reportError(e);
        }
    }

    /**
     * Returns the identifier of this stream.
     *
     * @return the stream id given at construction
     */
    public String getStreamId() {
        return this.streamId;
    }

    /**
     * Performs the handshake and, on success, starts the receive and transmit loops.
     *
     * <p>If the handshake is rejected or anything throws, the stream is stopped; exceptions
     * are reported to the listeners rather than propagated.
     */
    public void start() {
        try {
            this.out = this.socket.getOutputStream();
            this.in = this.socket.getInputStream();
            if (!this.handshake()) {
                this.stop();
                return;
            }
            this.running = true;
            this.client.getListenersHandlers().streamOpen(this, this, this.streamId);
            this.rxExec.submit(this::rxLoop);
            this.txExec.submit(this::txLoop);
        } catch (Exception e) {
            this.reportError(e);
            this.stop();
        }
    }

    /**
     * Shuts the stream down. Only the first call has any effect.
     *
     * <p>Every cleanup step runs independently, so a failure while closing one resource
     * can no longer prevent the socket from being closed, the stream from being
     * unregistered, or the close notification from being delivered. Failures are
     * reported to the listeners.
     */
    public void stop() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.running = false;
        try {
            this.txQueue.clear();
            this.rxExec.shutdownNow();
            this.txExec.shutdownNow();
        } catch (Exception e) {
            this.reportError(e);
        }
        this.closeAndReport(this.out);
        this.closeAndReport(this.in);
        this.closeAndReport(this.socket);
        try {
            this.client.removeStream(this.streamId);
        } catch (Exception e) {
            this.reportError(e);
        }
        try {
            this.client.getListenersHandlers().streamClose(this, this, this.streamId);
        } catch (Exception e) {
            this.reportError(e);
        }
    }

    /**
     * Queues a frame for transmission without blocking.
     *
     * <p>If the queue is full, the oldest queued frame is dropped and the offer is retried
     * once. Once the stream has been stopped no frame is accepted, because nothing would
     * ever drain the queue.
     *
     * @param f frame to send
     * @return {@code true} if the frame was queued; {@code false} if the stream is closed
     *         or the queue was still full after evicting one frame
     */
    public boolean offerFrame(StreamFrame f) {
        if (this.closed.get()) {
            return false;
        }
        if (this.txQueue.offer(f)) {
            return true;
        }
        // Queue is full: sacrifice the oldest pending frame in favour of the newest.
        this.txQueue.poll();
        return this.txQueue.offer(f);
    }

    /**
     * Sends the signed start line and waits for the encrypted confirmation line.
     *
     * @return {@code true} only if the decrypted reply equals the expected ready marker;
     *         {@code false} otherwise, including when an exception occurred (in which case
     *         the listeners receive {@code streamHandshakeFailed})
     */
    private boolean handshake() {
        try {
            String identity = this.client.getClientId() + this.streamId;
            String init = HANDSHAKE_INIT + identity + this.rsa.sign(identity);
            StatiCom.writeLine(this.out, init);
            String confirm = this.aes.decrypt(StatiCom.readLineString(this.in));
            return HANDSHAKE_READY.equals(confirm);
        } catch (Exception e) {
            this.client.getListenersHandlers().streamHandshakeFailed(this, this, this.streamId, e);
            return false;
        }
    }

    /**
     * Receive loop: reads one message at a time (Base64 header line, then the raw encrypted
     * payload), decrypts it, updates the receive counters and dispatches on the route.
     *
     * <p>A close or close-ack route ends the loop. Any exception ends the loop as well and
     * is reported unless it was caused by the stream already being stopped. The stream is
     * always stopped when the loop exits.
     */
    private void rxLoop() {
        try {
            while (this.running) {
                String encHeaderStr = StatiCom.readLineString(this.in);
                if (encHeaderStr == null) {
                    // NOTE(review): assumes readLineString may signal end-of-stream with null — confirm.
                    throw new EOFException("Stream closed while waiting for a message header");
                }
                byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
                byte[] headerPlain = this.aes.decrypt(encHeader, null);
                Message msg = Message.fromHeader(new String(headerPlain, StandardCharsets.UTF_8));
                int total = msg.getLength().intValueExact();

                byte[] encPayload;
                byte[] plain;
                if (total > 0) {
                    encPayload = this.readPayloadWithProgress(this.in, total, msg);
                    // The encrypted header is passed along to the payload decryption call.
                    plain = this.aes.decrypt(encPayload, encHeader);
                } else {
                    encPayload = new byte[0];
                    plain = new byte[0];
                }

                ++this.rxFrames;
                // Header characters + 1 for the line terminator + payload bytes.
                this.rxBytes += (long) encHeaderStr.length() + 1L + (long) encPayload.length;
                this.client.getListenersHandlers().streamProgressRx(this, this, this.rxFrames, this.rxBytes);
                msg.setPayload(plain);

                String route = msg.getRoute();
                if (ROUTE_CLOSE.equals(route) || ROUTE_CLOSE_ACK.equals(route)) {
                    this.stop();
                    break;
                }
                if (this.metaRoute.equals(route)) {
                    this.client.getListenersHandlers().streamMeta(this, this, this.streamId, plain);
                } else if (this.dataRoute.equals(route)) {
                    StreamFrame frame = StreamSocketClient.decodeFrame(this.streamId, plain);
                    this.client.getListenersHandlers().streamFrame(this, this, frame);
                } else {
                    this.reportError(new IllegalStateException("Unexpected route: " + route));
                }
            }
        } catch (Exception e) {
            // A read that fails because stop() closed the socket is expected, not an error.
            if (!this.closed.get()) {
                this.reportError(e);
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Transmit loop: takes frames from the queue, wraps each in a data message, writes it
     * and updates the transmit counters.
     *
     * <p>An interrupt (issued by {@link #stop()} through {@code shutdownNow}) ends the loop
     * quietly. Any other failure is reported and stops the whole stream, so the receiver
     * does not keep running next to a dead sender.
     */
    private void txLoop() {
        try {
            while (this.running) {
                StreamFrame f = this.txQueue.take();
                Message m = new Message(this.dataRoute, StreamSocketClient.encodeFrame(f));
                long bytes = this.writeMessage(m);
                ++this.txFrames;
                this.txBytes += bytes;
                this.client.getListenersHandlers().streamProgressTx(this, this, this.txFrames, this.txBytes);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // A write that fails because stop() closed the socket is expected, not an error.
            if (!this.closed.get()) {
                this.reportError(e);
            }
            this.stop();
        }
    }

    /**
     * Serialises a frame as {@code [flags:1][seq:8][tsNanos:8][payload...]}; the two longs
     * are written with {@link ByteBuffer}'s default byte order.
     *
     * @param f frame to encode
     * @return newly allocated array of {@code 17 + payload.length} bytes
     */
    private static byte[] encodeFrame(StreamFrame f) {
        byte[] buf = new byte[FRAME_HEADER_BYTES + f.payload.length];
        buf[0] = (byte) (f.key ? FRAME_FLAG_KEY : 0);
        ByteBuffer.wrap(buf, 1, FRAME_HEADER_BYTES - 1).putLong(f.seq).putLong(f.tsNanos);
        System.arraycopy(f.payload, 0, buf, FRAME_HEADER_BYTES, f.payload.length);
        return buf;
    }

    /**
     * Parses a frame laid out as produced by {@link #encodeFrame(StreamFrame)}.
     *
     * @param id stream id to attach to the decoded frame
     * @param b  decrypted message payload
     * @return the decoded frame; its payload is a copy of the bytes after the header
     * @throws IllegalArgumentException if {@code b} is null or shorter than the 17-byte header
     */
    private static StreamFrame decodeFrame(String id, byte[] b) {
        if (b == null || b.length < FRAME_HEADER_BYTES) {
            throw new IllegalArgumentException("Frame too short: expected at least " + FRAME_HEADER_BYTES
                    + " bytes but got " + (b == null ? "null" : String.valueOf(b.length)));
        }
        ByteBuffer header = ByteBuffer.wrap(b, 1, FRAME_HEADER_BYTES - 1);
        long seq = header.getLong();
        long ts = header.getLong();
        boolean key = (b[0] & FRAME_FLAG_KEY) == FRAME_FLAG_KEY;
        byte[] pl = Arrays.copyOfRange(b, FRAME_HEADER_BYTES, b.length);
        return new StreamFrame(id, seq, ts, key, pl);
    }

    /**
     * Reads exactly {@code total} bytes from {@code in}, requesting at most 4096 bytes per read.
     *
     * @param in    stream to read from
     * @param total number of bytes to read
     * @param msg   message the payload belongs to (currently unused)
     * @return array of exactly {@code total} bytes
     * @throws EOFException if the stream ends before {@code total} bytes were read
     * @throws Exception    any I/O failure raised by the underlying stream
     */
    private byte[] readPayloadWithProgress(InputStream in, int total, Message msg) throws Exception {
        byte[] buf = new byte[total];
        int off = 0;
        while (off < total) {
            int got = in.read(buf, off, Math.min(READ_CHUNK_BYTES, total - off));
            if (got == -1) {
                throw new EOFException();
            }
            off += got;
        }
        return buf;
    }

    /**
     * Encrypts and writes one message: the encrypted header as a line, then the encrypted
     * payload bytes, followed by a flush.
     *
     * <p>The header is generated while the message temporarily holds a placeholder payload
     * of the encrypted length. On return the message holds the encrypted payload.
     *
     * @param msg message whose payload is still plaintext
     * @return header characters + 1 (line terminator) + encrypted payload bytes
     * @throws Exception if encryption or writing fails
     */
    private long writeMessage(Message msg) throws Exception {
        byte[] plain = msg.getPayload();
        int encLen = this.aes.encryptedLength(plain.length);
        // Placeholder of the encrypted size while the header is built, then restore.
        msg.setPayload(new byte[encLen]);
        String headerStr = msg.getHeader();
        msg.setPayload(plain);

        String encHeaderStr = this.aes.encrypt(headerStr);
        StatiCom.writeLine(this.out, encHeaderStr);
        // The encrypted header bytes are passed along to the payload encryption call.
        byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
        byte[] encPayload = this.aes.encrypt(plain, encHeader);
        msg.setPayload(encPayload);
        this.out.write(encPayload);
        this.out.flush();
        return (long) encHeaderStr.length() + 1L + (long) encPayload.length;
    }

    /** Forwards an exception to the client's listeners with this stream as the source. */
    private void reportError(Exception e) {
        this.client.getListenersHandlers().error(this, e);
    }

    /** Closes a resource if present; a failure is reported instead of aborting the shutdown. */
    private void closeAndReport(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            this.reportError(e);
        }
    }
}

