/*
 * Copyright © 2022 - Zimproov.
 * All rights reserved.
 */

// Import the debug module.
import debug from "debug";
// Import the RxJS library.
import { concatMap, filter, from, fromEvent, lastValueFrom, map, Observable, retry, Subscriber, timeout } from "rxjs";
// Import the JSON:API message.
import { MessageHelper, messageValidator } from "@andromeda/json-api";
// Import the converter request resources.
import { AssetConverterRequest, AssetConverterStatus } from "@andromeda/asset-conversion";
// Import the validation error class.
import { check } from "@andromeda/validation";
// Import the target resolver.
import { resolveTarget } from "@andromeda/xhr";
import WebSocket from "isomorphic-ws";


/**
 * Returns the converter socket.
 * Opens a new socket if required.
 *
 * @returns {Promise<WebSocket>} A promise that resolves with the socket to the converter app.
 */
export async function getConverterSocket(): Promise<WebSocket> {
    return loadConverterURL().then(url => {
        return new Promise<WebSocket>((resolve, reject) => {
            const socket = new WebSocket(url);
            socket.addEventListener("close", reject);
            socket.addEventListener("open", () => {
                resolve(socket);
            });
        });
    });
}

/**
 * Returns the load-balancer socket.
 * Opens a new socket if required.
 *
 * @returns {Promise<WebSocket>} A promise that resolves with the socket to the converter app.
 */
function getLoadBalancerSocket(): Promise<WebSocket> {
    if (lbSocket) return lbSocket;

    const observable = new Observable<WebSocket>(function openLBSocket(subscriber: Subscriber<WebSocket>) {
        resolveConverterTarget().then(url => {
            // Open the socket.
            url.pathname += "/converter";
            const socket = new WebSocket(url);
            socket.addEventListener("close", () => {
                subscriber.error();
                lbSocket = null;
            });
            socket.addEventListener("open", () => {
                log("Connected to the load balancer !");
                subscriber.next(socket);
                subscriber.complete();
            });
        }).catch(subscriber.error.bind(subscriber));
    });

    return (lbSocket = lastValueFrom(observable.pipe(retry({ delay: 5000 }))));
}

/** Resolves the converter url. */
function resolveConverterTarget(): Promise<URL> {
    // Resolve the converter target.
    return resolveTarget("converter").then(config => {
        if (!config) {
            throw new Error("Could not find the converter target !");
        }

        // Build the url.
        const target  = config[1];
        const url = new URL("https://example.com");
        url.protocol = target.ssl ? "wss:" : "ws:";
        url.hostname = target.hostname;
        url.port = String(target.port ?? (target.ssl ? 443 : 80));
        url.pathname = target.root ?? "";
        if (!url.pathname.endsWith("/")) url.pathname += "/";
        return url;
    });
}

/**
 * Loads the url to the converter api.
 *
 * @return {Promise<URL>} A promise that resolves with the url.
 */
async function loadConverterURL(): Promise<URL> {
    if (converterURL) {
        return converterURL;
    }

    const observable = new Observable(function openConverterSocket(subscriber: Subscriber<URL>): void {
        getLoadBalancerSocket().then(lbSocket => {
            lbSocket.send(JSON.stringify({ data: { type: AssetConverterRequest.Type }}));

            const message = (fromEvent(lbSocket, "message") as Observable<MessageEvent>).pipe(
                filter((event: MessageEvent): event is MessageEvent & { data: string } => {
                    return typeof event.data === "string"
                }),
                map(event => event.data),
                map(message => JSON.parse(message)),
                map(message => {
                    check(message, messageValidator, true);
                    return message;
                }),
                map(message => MessageHelper.findFrom(
                    message.data ?? [], AssetConverterStatus.Type, AssetConverterStatus.validate
                )),
                concatMap(value => from(value))
            );

            // Check the message type.
            const subscription = message.subscribe(message => {
                if (message.attributes.stage === AssetConverterStatus.ReadinessStage.ready) {
                    subscription.unsubscribe();

                    // Resolve the converter target.
                    resolveConverterTarget().then(url => {
                        url.pathname += `/converter/${message.id}`;
                        subscriber.next(url);
                        subscriber.complete();
                    }).catch(subscriber.error.bind(subscriber));
                }
            });

            lbSocket.addEventListener("close", event => {
                subscriber.error(
                    new Error(`Socket closed at an unexpected time with status ${event.code} (${event.reason})` )
                );
            });
        }).catch(subscriber.error.bind(subscriber));
    });

    return converterURL = new Promise<URL>((resolve, reject) => {
        lastValueFrom(observable.pipe(timeout({ first: 90 * 1000 }), retry({ delay: 5000 })))
            .then(url => { converterURL = null; resolve(url); }).catch(reject);
    });
}

/** Memo of the used socket. */
let lbSocket: Promise<WebSocket> | null = null;
let converterURL: Promise<URL> | null = null;
const log = debug("asset-converter-client:converter:socket");
