/*
 * Decompiled with CFR 0.152.
 */
package com.talpie.linker;

import com.talpie.linker.AES;
import com.talpie.linker.ClientListeners;
import com.talpie.linker.ClientListenersHandler;
import com.talpie.linker.ClientStatsListener;
import com.talpie.linker.Config;
import com.talpie.linker.DataSocketClient;
import com.talpie.linker.Message;
import com.talpie.linker.RSA;
import com.talpie.linker.Service;
import com.talpie.linker.StatiCom;
import com.talpie.linker.StreamFrame;
import com.talpie.linker.StreamSocketClient;
import com.talpie.linker.StreamStatsListener;
import com.talpie.linker.SystemInfo;
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

public class ClientService
implements Service {
    private final Config config;
    private SystemInfo systemInfo = new SystemInfo();
    private final ExecutorService socketExecutor = Executors.newSingleThreadExecutor();
    private volatile boolean running = false;
    private RSA rsa;
    private AES aes;
    private Socket socket;
    private String clientId;
    private String serverMachineId;
    private OutputStream out;
    private InputStream in;
    private final ConcurrentHashMap<String, CompletableFuture<Message>> pending = new ConcurrentHashMap();
    private static final ScheduledExecutorService SCHED = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
    private final ConcurrentHashMap<String, DataSocketClient> dataSockets = new ConcurrentHashMap();
    private final ClientListenersHandler listenersHandlers = new ClientListenersHandler();
    private final ClientStatsListener stats = new ClientStatsListener();
    private final ExecutorService pingExecutor = Executors.newSingleThreadExecutor();
    private volatile int pingMisses = 0;
    private volatile long lastRttMs = -1L;
    private final StreamStatsListener streamStats = new StreamStatsListener();
    private final ConcurrentHashMap<String, StreamSocketClient> streams = new ConcurrentHashMap();

    public Config getConfig() {
        return this.config;
    }

    public boolean isRunning() {
        return this.running;
    }

    public SystemInfo getSystemInfo() {
        return this.systemInfo;
    }

    public RSA getRsa() {
        return this.rsa;
    }

    public AES getAes() {
        return this.aes;
    }

    public String getClientId() {
        return this.clientId;
    }

    public ClientListenersHandler getListenersHandlers() {
        return this.listenersHandlers;
    }

    public String getServerMachineId() {
        return this.serverMachineId;
    }

    public ClientStatsListener getStats() {
        return this.stats;
    }

    public StreamStatsListener getStreamStats() {
        return this.streamStats;
    }

    public StreamSocketClient getStream(String streamId) {
        return this.streams.get(streamId);
    }

    void removeStream(String streamId) {
        this.streams.remove(streamId);
    }

    public ClientService(String hostname, int port) {
        this.config = new Config(hostname, port);
        this.listenersHandlers.register(this.stats);
        this.listenersHandlers.register(this.stats);
        this.listenersHandlers.register(this.stats);
        this.listenersHandlers.register(this.streamStats);
        this.listenersHandlers.register(this.streamStats);
        this.listenersHandlers.register(this.streamStats);
    }

    @Override
    public void start() {
        this.running = true;
        try {
            this.rsa = new RSA();
            this.aes = new AES();
            this.socket = new Socket(this.config.getHostname(), this.config.getPort());
            this.out = this.socket.getOutputStream();
            this.in = this.socket.getInputStream();
            this.socket.setReceiveBufferSize(0x100000);
            this.socket.setSendBufferSize(0x100000);
            this.getListenersHandlers().start(this);
            this.getListenersHandlers().connected(this);
            if (this.handshake()) {
                this.socketExecutor.submit(this::socketReceiveLoop);
                this.pingExecutor.submit(this::pingLoop);
            }
        }
        catch (Exception e) {
            this.getListenersHandlers().error(this, e);
        }
    }

    @Override
    public void stop() {
        this.stop("User stop.");
    }

    public void stop(String cause) {
        this.running = false;
        try {
            IllegalStateException ex = new IllegalStateException("Connection closed");
            this.pending.forEach((id, fut) -> fut.completeExceptionally(ex));
            this.pending.clear();
            this.pingExecutor.shutdownNow();
            this.socketExecutor.shutdownNow();
            this.sendExecutor.shutdownNow();
            if (this.out != null) {
                this.out.close();
            }
            if (this.in != null) {
                this.in.close();
            }
            if (this.socket != null) {
                this.socket.close();
            }
            this.getListenersHandlers().stop(this);
            this.getListenersHandlers().disconnected(this, new Exception(cause));
        }
        catch (Exception e) {
            this.getListenersHandlers().error(this, e);
        }
    }

    @Override
    public Service.Type getType() {
        return Service.Type.CLIENT;
    }

    public CompletableFuture<Message> sendRequest(String route, byte[] data) {
        return this.sendRequest(route, data, 0L);
    }

    public CompletableFuture<Message> sendRequest(String route, byte[] data, long timeoutMillis) {
        String socketId = UUID.randomUUID().toString();
        CompletableFuture<Message> openFut = this.sendControlRequest("#_SYS++DS/OPEN", socketId.getBytes(StandardCharsets.UTF_8), timeoutMillis);
        CompletionStage result = openFut.thenComposeAsync(ack -> {
            try {
                Socket s = new Socket(this.getConfig().getHostname(), this.getConfig().getPort());
                s.setReceiveBufferSize(0x100000);
                s.setSendBufferSize(0x100000);
                DataSocketClient dsc = new DataSocketClient(this, s, socketId);
                dsc.start();
                this.dataSockets.put(socketId, dsc);
                CompletableFuture<Message> xfer = dsc.sendRequest(route, data, timeoutMillis);
                return xfer.whenComplete((msg, err) -> {
                    this.dataSockets.remove(socketId);
                    try {
                        s.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                });
            }
            catch (Exception openErr) {
                CompletableFuture failed = new CompletableFuture();
                failed.completeExceptionally(openErr);
                return failed;
            }
        });
        if (timeoutMillis > 0L) {
            SCHED.schedule(() -> ClientService.lambda$sendRequest$2((CompletableFuture)result, socketId), timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return result;
    }

    public CompletableFuture<Message> sendControlRequest(String route, byte[] payload) {
        return this.sendControlRequest(route, payload, 0L);
    }

    public CompletableFuture<Message> sendControlRequest(String route, byte[] payload, long timeoutMillis) {
        Message req = new Message(route, payload);
        req.setPayload(payload);
        CompletableFuture<Message> fut = new CompletableFuture<Message>();
        this.pending.put(req.getMessageId(), fut);
        this.sendExecutor.submit(() -> {
            block2: {
                try {
                    this.writeMessage(req);
                }
                catch (Exception e) {
                    CompletableFuture<Message> p = this.pending.remove(req.getMessageId());
                    if (p == null || p.isDone()) break block2;
                    p.completeExceptionally(e);
                }
            }
        });
        if (timeoutMillis > 0L) {
            SCHED.schedule(() -> {
                CompletableFuture<Message> p = this.pending.remove(req.getMessageId());
                if (p != null && !p.isDone()) {
                    p.completeExceptionally(new Exception("Timeout waiting for response: " + req.getMessageId()));
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return fut;
    }

    private boolean handshake() {
        try {
            StatiCom.writeLine(this.out, "#_START-MAIN-SOCKET++");
            StatiCom.writeLine(this.out, this.rsa.getPublicKeyBase64());
            this.rsa.setPublicKeyBase64(StatiCom.readLineString(this.in));
            StatiCom.writeLine(this.out, this.rsa.encrypt(this.aes.getKeyBase64()));
            this.aes.setCounterKeyBase64(this.rsa.decrypt(StatiCom.readLineString(this.in)));
            StatiCom.writeLine(this.out, this.aes.encrypt(this.systemInfo.getMachineId()));
            String data = this.aes.decrypt(StatiCom.readLineString(this.in));
            this.serverMachineId = data.substring(0, 36);
            this.clientId = data.substring(36, 72);
        }
        catch (Exception e) {
            this.getListenersHandlers().handshakeFailed(this, e);
            return false;
        }
        this.getListenersHandlers().handshakeCompleted(this);
        return true;
    }

    private void socketReceiveLoop() {
        while (this.running) {
            try {
                String encHeaderStr = StatiCom.readLineString(this.in);
                byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
                byte[] headerPlain = this.aes.decrypt(encHeader, null);
                String header = new String(headerPlain, StandardCharsets.UTF_8);
                Message msg = Message.fromHeader(header);
                int total = msg.getLength().intValueExact();
                byte[] encPayload = total > 0 ? this.readPayloadWithProgress(this.in, total, msg) : new byte[]{};
                byte[] plain = total > 0 ? this.aes.decrypt(encPayload, encHeader) : new byte[]{};
                msg.setPayload(plain);
                try {
                    this.stats.snapshot();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                this.stats.onMsgRx(msg.getRoute());
                if (msg.getType() == '1') {
                    CompletableFuture<Message> fut = this.pending.remove(msg.getMessageId());
                    if (fut != null) {
                        fut.complete(msg);
                    }
                    if (msg.getRoute().equals("#_SYS++PING")) {
                        this.getListenersHandlers().ping(this, msg);
                        continue;
                    }
                    this.getListenersHandlers().controlResponse(this, msg);
                    continue;
                }
                this.handleRequest(msg);
            }
            catch (SocketTimeoutException encHeaderStr) {
            }
            catch (Exception e) {
                this.stop(e.getLocalizedMessage());
            }
        }
    }

    private void handleRequest(Message req) {
        try {
            switch (req.getRoute()) {
                case "#_SYS++DS/OPEN-REQ": {
                    String socketId = new String(req.getPayload(), StandardCharsets.UTF_8);
                    try {
                        Socket s = new Socket(this.getConfig().getHostname(), this.getConfig().getPort());
                        DataSocketClient dsc = new DataSocketClient(this, s, socketId);
                        dsc.start();
                        this.dataSockets.put(socketId, dsc);
                        req.setResponse("OK".getBytes(StandardCharsets.UTF_8));
                        this.writeMessage(req);
                    }
                    catch (Exception e) {
                        req.setError("DS-OPEN-FAILED".getBytes(StandardCharsets.UTF_8));
                        try {
                            this.writeMessage(req);
                        }
                        catch (Exception dsc) {}
                    }
                    break;
                }
                case "#_SYS++STREAM/OPEN-REQ": {
                    String streamId = new String(req.getPayload(), StandardCharsets.UTF_8);
                    try {
                        Socket s = new Socket(this.getConfig().getHostname(), this.getConfig().getPort());
                        StreamSocketClient sc = new StreamSocketClient(this, s, streamId, 256);
                        sc.start();
                        this.streams.put(streamId, sc);
                        req.setResponse("OK".getBytes(StandardCharsets.UTF_8));
                        this.writeMessage(req);
                    }
                    catch (Exception e) {
                        req.setError("STREAM-OPEN-FAILED".getBytes(StandardCharsets.UTF_8));
                        try {
                            this.writeMessage(req);
                        }
                        catch (Exception exception) {}
                    }
                    break;
                }
                case "#_SYS++STREAM/CLOSE": {
                    String streamId = new String(req.getPayload(), StandardCharsets.UTF_8);
                    StreamSocketClient sc = this.streams.remove(streamId);
                    if (sc != null) {
                        sc.stop();
                    }
                    req.setResponse("CLOSE-ACK".getBytes(StandardCharsets.UTF_8));
                    this.writeMessage(req);
                    break;
                }
                default: {
                    this.writeMessage(this.getListenersHandlers().controlRequest(this, req));
                }
            }
        }
        catch (Exception e) {
            this.getListenersHandlers().error(this, e);
        }
    }

    private byte[] readPayloadWithProgress(InputStream in, int total, Message msg) throws Exception {
        int got;
        byte[] buf = new byte[total];
        long recvd = 0L;
        int lastPercent = -1;
        for (int off = 0; off < total; off += got) {
            int want = Math.min(1024, total - off);
            got = in.read(buf, off, want);
            if (got != -1) continue;
            throw new EOFException("EOF during payload read");
        }
        return buf;
    }

    private void onReceiveProgress(Message msg, long recvd, long total, int percent) {
        this.getListenersHandlers().progressRx(this, msg, recvd, total, percent);
    }

    private void onSendProgress(Message msg, long sent, long total, int percent) {
        this.getListenersHandlers().progressTx(this, msg, sent, total, percent);
    }

    private void writeMessage(Message msg) throws Exception {
        if (msg == null) {
            return;
        }
        try {
            this.stats.onMsgTx(msg.getRoute());
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        byte[] plainPayload = msg.getPayload();
        int encLen = this.aes.encryptedLength(plainPayload.length);
        byte[] saved = plainPayload;
        msg.setPayload(new byte[encLen]);
        String headerStr = msg.getHeader();
        msg.setPayload(saved);
        String encHeaderStr = this.aes.encrypt(headerStr);
        StatiCom.writeLine(this.out, encHeaderStr);
        byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
        byte[] encPayload = this.aes.encrypt(plainPayload, encHeader);
        assert (encPayload.length == encLen);
        msg.setPayload(encPayload);
        BigInteger lenBI = msg.getLength();
        if (lenBI.signum() > 0) {
            int total = msg.getPayload().length;
            if (lenBI.compareTo(BigInteger.valueOf(total)) != 0) {
                throw new IllegalStateException("Header length mismatch: header=" + String.valueOf(lenBI) + " bytes, actual=" + total);
            }
            long sent = 0L;
            int lastPercent = -1;
            for (int offset = 0; offset < total; offset += 1024) {
                int chunk = Math.min(1024, total - offset);
                this.out.write(msg.getPayload(), offset, chunk);
                this.out.flush();
                int percent = (int)((sent += (long)chunk) * 100L / (long)total);
                if (percent == lastPercent) continue;
                this.onSendProgress(msg, sent, total, percent);
                lastPercent = percent;
            }
        }
    }

    public void removeDataSocket(String socketId) {
        this.dataSockets.remove(socketId);
    }

    public CompletableFuture<DataSocketClient> openDataSocketAsync(long timeoutMillis) {
        String socketId = UUID.randomUUID().toString();
        CompletableFuture<Message> req = this.sendRequest("#_SYS++DS/START", socketId.getBytes(StandardCharsets.UTF_8), timeoutMillis);
        return req.thenApplyAsync(response -> {
            try {
                Socket s = new Socket(this.getConfig().getHostname(), this.getConfig().getPort());
                DataSocketClient dsc = new DataSocketClient(this, s, socketId);
                dsc.start();
                this.dataSockets.put(socketId, dsc);
                return dsc;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void pingLoop() {
        while (this.running) {
            try {
                byte[] payload = this.getListenersHandlers().feedPing(this);
                if (payload == null) {
                    payload = new byte[]{};
                }
                long timeoutMs = Math.max(1000, this.getConfig().getPingTimeout());
                CompletableFuture<Message> f = this.sendControlRequest("#_SYS++PING", payload, timeoutMs);
                ((CompletableFuture)f.thenAccept(resp -> {
                    Timestamp echoed = resp.getRequested();
                    if (echoed != null) {
                        this.lastRttMs = System.currentTimeMillis() - echoed.getTime();
                    }
                    this.pingMisses = 0;
                })).exceptionally(ex -> {
                    int m = ++this.pingMisses;
                    this.getListenersHandlers().error(this, (Throwable)ex);
                    if (m >= this.getConfig().getMaxPingMisses()) {
                        this.stop("Ping timeout (" + m + "/" + this.getConfig().getMaxPingMisses() + ")");
                    }
                    return null;
                });
                Thread.sleep(Math.max(250, this.getConfig().getPingDelay()));
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                this.getListenersHandlers().error(this, e);
            }
        }
    }

    public CompletableFuture<StreamSocketClient> openStreamAsync(String streamId, int queueSize, long timeoutMs) {
        CompletableFuture<Message> ctrl = this.sendControlRequest("#_SYS++STREAM/OPEN-REQ", streamId.getBytes(StandardCharsets.UTF_8), timeoutMs);
        return ctrl.thenApplyAsync(ok -> {
            try {
                Socket s = new Socket(this.getConfig().getHostname(), this.getConfig().getPort());
                StreamSocketClient sc = new StreamSocketClient(this, s, streamId, queueSize);
                sc.start();
                this.streams.put(streamId, sc);
                return sc;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public AutoCloseable listenStream(final BiConsumer<String, StreamFrame> onFrame) {
        return this.getListenersHandlers().register(new ClientListeners.StreamMessage(){

            @Override
            public void streamFrame(Object caller, Timestamp ts, StreamSocketClient sc, StreamFrame frame) {
                if (onFrame != null) {
                    onFrame.accept(frame.streamId, frame);
                }
            }
        });
    }

    public boolean sendThroughStream(String streamId, StreamFrame frame) {
        StreamSocketClient sc = this.streams.get(streamId);
        return sc != null && sc.offerFrame(frame);
    }

    public CompletableFuture<Void> closeStreamAsync(String streamId, long timeoutMs) {
        StreamSocketClient sc = this.streams.remove(streamId);
        if (sc != null) {
            sc.stop();
        }
        return this.sendControlRequest("#_SYS++STREAM/CLOSE", streamId.getBytes(StandardCharsets.UTF_8), timeoutMs).thenAccept(x -> {});
    }

    private static /* synthetic */ void lambda$sendRequest$2(CompletableFuture result, String socketId) {
        if (!result.isDone()) {
            result.completeExceptionally(new TimeoutException("Timeout in sendDataRequest (overall): " + socketId));
        }
    }
}

