/*
 * Copyright © 2022 - Zimproov.
 * All rights reserved.
 */

// Import the isomorphic WebSocket implementation.
import WebSocket from "isomorphic-ws";
// Import the RxJS library.
import { fromEvent, Observable, Subject } from "rxjs";
// Import the JSON:API
import { MessageHelper, messageValidator } from "@andromeda/json-api";
import { check } from "@andromeda/validation";
import { Asset } from "@andromeda/resources";

// Import the WebAsset interface.
import { WebAsset } from "./web-asset";
// Import the message encoder.
import { encodeAssetMessage, decodeAssetMessage } from "./message";


/**
 * Interface used to describe the store used by the {@link AssetExchange} class.
 * A store should allow retrieval of any asset requested by the remote end of the socket.
 */
export interface AssetExchangeStore {
    /**
     * Loads the requested asset from the store.
     *
     * @param {string} id The identifier of the requested asset.
     * @return {Promise<WebAsset | null | undefined>}
     * A promise that resolves with the asset found, or with null or undefined if it does not exist.
     * The {@link AssetExchange} class treats both null and undefined as "asset not found".
     */
    get(id: string): Promise<WebAsset | null | undefined>;
}

/**
 * Class used to exchange assets over a {@link WebSocket} connection.
 * Answers incoming requests by querying the provided {@link AssetExchangeStore},
 * and requests assets from the remote exchange through {@link AssetExchange.query}.
 */
export class AssetExchange extends Subject<WebAsset> {
    /** Maximum time, in milliseconds, that a query waits for the requested asset. */
    private static readonly _QUERY_TIMEOUT = 2 * 60 * 1000;

    /** Observable to use for the message events. */
    private _message: Observable<WebSocket.MessageEvent>;

    /**
     * Class constructor.
     * Switches the socket to "arraybuffer" binary mode and starts listening for incoming messages.
     *
     * @param {WebSocket} socket The socket that the manager will be listening on.
     * @param {AssetExchangeStore} store The store used by the manager when uploading assets.
     */
    public constructor(public socket: WebSocket, public store: AssetExchangeStore) {
        super();
        this._message = fromEvent(socket, "message") as Observable<WebSocket.MessageEvent>;
        socket.binaryType = "arraybuffer";
        this._listen();
    }

    // TODO: Add the updates object.
    // public get updates(): Observable<AssetManagerStatusUpdate> {}

    /**
     * Queries the remote exchange for the given asset.
     *
     * The returned promise settles exactly once, on the first of these events:
     *  - a binary message containing the requested asset is received (resolves);
     *  - the socket is closed (rejects);
     *  - no matching asset was received within the timeout (rejects).
     * Whatever the outcome, the timer, the message subscription and the "close" listener are all released.
     *
     * @param {string} id The id of the requested asset.
     * @return {Promise<WebAsset>} A promise that resolves with the asset and its data.
     * @throws {Error} If the socket is not open, if it closes before the asset is received,
     * or if the asset is not received in time.
     */
    public async query(id: string): Promise<WebAsset> {
        // Ensure that the socket is writable.
        if (this.socket.readyState !== WebSocket.OPEN) {
            throw new Error("Socket is not writable !");
        }

        // Send the request over the socket.
        const request = new MessageHelper({ type: Asset.Type, id });
        this.socket.send(JSON.stringify(request));

        // Wait for the asset response.
        return new Promise<WebAsset>((resolve, reject): void => {
            // Releases every resource held by this query, then settles the promise.
            // All the callers below run asynchronously, after "subscription" has been initialised.
            const settle = (outcome: () => void): void => {
                clearTimeout(timer);
                subscription.unsubscribe();
                this.socket.removeEventListener("close", onClose);
                outcome();
            };

            // The observable built by "fromEvent" never completes on its own,
            // so the socket "close" event has to reject the promise explicitly.
            const onClose = (): void => settle(
                () => reject(new Error("The socket closed before the message was received."))
            );

            // Give up if the asset does not arrive in time.
            const timer: ReturnType<typeof setTimeout> = setTimeout(
                () => settle(() => reject(new Error("Failed to read the asset after 120seconds."))),
                AssetExchange._QUERY_TIMEOUT
            );

            const subscription = this._message.subscribe({
                next: (event: WebSocket.MessageEvent): void => {
                    // Asset messages are binary, text messages are requests handled elsewhere.
                    if (!(event.data instanceof ArrayBuffer)) return;

                    // Decode the message.
                    let decoded: WebAsset[];
                    try {
                        decoded = decodeAssetMessage(event.data);
                    } catch (e: unknown) {
                        console.warn("Failed to decode an asset message: ", e);
                        // Closing the socket triggers "onClose", which rejects this query.
                        this.socket.close(3100, "Invalid asset message !");
                        return;
                    }

                    // Check if any of the assets is the requested one.
                    const asset = decoded.find(candidate => candidate.resource.id === id);
                    if (asset) {
                        settle(() => resolve(asset));
                    }
                },
                complete: onClose
            });

            // Reject the query as soon as the socket is closed.
            this.socket.addEventListener("close", onClose, { once: true });
        });
    }

    /** Listens for incoming asset request messages until the socket is closed. */
    private _listen(): void {
        // Forward every message event to the request handler.
        const subscription = this._message.subscribe(this._onIncomingMessage.bind(this));

        // Unsubscribe from the events once the socket is closed.
        this.socket.addEventListener("close", subscription.unsubscribe.bind(subscription), { once: true });
    }

    /**
     * Listener used to handle incoming messages.
     * Text messages are parsed as JSON:API messages; the socket is closed with the code 3100 if that fails.
     * Every requested asset found in the store is then encoded and sent back in a single message.
     *
     * @param {WebSocket.MessageEvent} message The message event received from the socket.
     */
    private _onIncomingMessage(message: WebSocket.MessageEvent): void {
        // Check if the message is an asset request.
        if (typeof message.data !== "string") return;
        let jsonApiMessage: MessageHelper;
        try {
            const data = JSON.parse(message.data);
            check(data, messageValidator, true);
            jsonApiMessage = new MessageHelper(data.data ?? null, data.included);
        } catch (e: unknown) {
            console.warn("Received a non JSON:API message. Closing the socket.");
            this.socket.close(3100, "Non JSON:API text message received");
            return;
        }
        const assetRequests = jsonApiMessage.findMany(Asset.Type);
        if (assetRequests.length <= 0) return;

        // Load the assets from the store and send them to the remote exchange.
        Promise.all(assetRequests.map(request => this.store.get(request.id)))
            .then((assets: (WebAsset | null | undefined)[]): WebAsset[] => assets.filter(
                function filterNullAssets(asset: WebAsset | null | undefined, index: number): asset is WebAsset {
                    if (asset === null || asset === undefined) {
                        console.warn(
                            "Failed to find the asset with id \"%s\" in the store",
                            assetRequests[index].id
                        );
                        return false;
                    }
                    return true;
                }
            ))
            .then(encodeAssetMessage)
            .then(encoded => this.socket.send(encoded))
            // A failing store, encoder or socket must not surface as an unhandled rejection.
            .catch((e: unknown): void => {
                console.warn("Failed to answer an asset request: ", e);
            });
    }
}
