/*
 * Copyright © 2022 - Zimproov.
 * All rights reserved.
 */

// Import the RxJS module.
import {
    BehaviorSubject,
    concatMap,
    filter,
    firstValueFrom,
    from,
    fromEvent,
    map,
    Observable,
    retry,
    Subscriber, timer
} from "rxjs";
// Import the UUID generator.
import { v4 } from "uuid";
// Import the isomorphic websocket definition.
import WebSocket from "isomorphic-ws";
// Import the conversion interface.
import {
    AssetConversionRequest,
    AssetConversionResult,
    AssetConversionStatus,
    Conversion
} from "@andromeda/asset-conversion";
// Import the WebAsset interface.
import { AssetExchange, WebAsset } from "@andromeda/converter-lib";
// Import the asset resource.
import { Asset } from "@andromeda/resources";
// Import the JSON:API message interface.
import { MessageHelper, messageValidator } from "@andromeda/json-api";
// Import the JSON schema checker.
import { check } from "@andromeda/validation";

// Import the web asset modules.
import { loadWebAsset, storeWebAsset, uploadWebAsset } from "../web-asset";
// Import the info status.
import {
    ConversionInfo,
    ConversionInfoConverting,
    ConversionInfoDone,
    ConversionInfoDownloading,
    ConversionInfoPreparing,
    ConversionInfoQueued,
    ConversionInfoStoring,
    ConversionInfoUninitialized,
    ConversionInfoUploading,
    ConversionStatus
} from "./status";
import { getConverterSocket } from "./socket";


/**
 * Converts a given {@link WebAsset}.
 *
 * @param {WebAsset} asset The asset to convert.
 * @param {Conversion} conversion The conversion to run.
 * @return {Promise<WebAsset[]>} A promise that resolves with a list of all the converted assets.
 */
export function convert(asset: WebAsset, conversion: Conversion): BehaviorSubject<ConversionInfo> {
    // Build the conversion cold observable.
    const observable = new Observable(conversionRunner);

    // Prepare a new wrapper behaviour subject.
    const conversionSubject = new BehaviorSubject<ConversionInfo>({ status: ConversionStatus.uninitialized });
    observable.pipe(retry({
        delay: e => {
            console.warn("Failed to run a conversion. Trying again in 5 seconds");
            console.warn(e);
            return timer(5000);
        }
    })).subscribe(conversionSubject);

    // Return the subject.
    return conversionSubject;

    /** Inner function used to run the conversion. */
    function conversionRunner(subscriber: Subscriber<ConversionInfo>): void {
        subscriber.next({ status: ConversionStatus.preparing });
        getConverterSocket()
            .then(async socket => {
                // Fail the observer whenever the socket closes.
                socket.addEventListener("close", subscriber.error.bind(subscriber));

                // Run the generator function.
                for await (const status of generator(socket)) { subscriber.next(status); }

                // Finish the operation.
                subscriber.complete(); socket.close();
            })
            .catch(subscriber.error.bind(subscriber));

        // Generator function used to return all the statuses one after the other.
        async function* generator(socket: WebSocket): AsyncGenerator<ConversionInfo> {
            // Store the asset locally.
            await storeWebAsset(asset);
            const assetManager = new AssetExchange(socket, { get: id => loadWebAsset(id) });

            // Build the asset conversion request.
            yield { status: ConversionStatus.queued };
            const request: AssetConversionRequest = {
                type: AssetConversionRequest.Type, id: v4(),
                attributes: conversion,
                relationships: { asset: { data: { type: Asset.Type, id: asset.resource.id } } }
            };
            socket.send(JSON.stringify(new MessageHelper(request)));

            // Wait for the conversion updates.
            yield { status: ConversionStatus.uploading, progress: 1 };
            const updates = (fromEvent(socket, "message") as Observable<WebSocket.MessageEvent>)
                .pipe(
                    filter((event): event is WebSocket.MessageEvent & { data: string } => {
                        return typeof event.data === "string"
                    }),
                    map(event => event.data),
                    map(message => JSON.parse(message)),
                    map(message => {
                        check(message, messageValidator, true);
                        return message;
                    }),
                    map(message => MessageHelper.findFrom(
                        message.data ?? [], AssetConversionStatus.Type, AssetConversionStatus.validate
                    )),
                    concatMap(value => from(value))
                );
            updates.subscribe(stage => {
                switch (stage.attributes.stage) {
                case AssetConversionStatus.ConversionStage.starting:
                    conversionSubject.next({ status: ConversionStatus.converting, progress: 1 });
                    break;
                }
            });

            // Wait for the conversion result.
            const result = await firstValueFrom((fromEvent(socket, "message") as Observable<WebSocket.MessageEvent>)
                .pipe(
                    filter((event): event is WebSocket.MessageEvent & { data: string } => {
                        return typeof event.data === "string";
                    }),
                    map(event => event.data),
                    map(message => JSON.parse(message)),
                    map(message => {
                        check(message, messageValidator, true);
                        return message;
                    }),
                    map(message => MessageHelper.findFrom(
                        message.data ?? [], AssetConversionResult.Type, AssetConversionResult.validate
                    )),
                    concatMap(message => from(message))
                )
            );
            conversionSubject.next({ status: ConversionStatus.downloading, progress: 1 });

            // Read all the assets.
            const assets = await Promise.all(
                result.relationships.assets.data.map(asset => assetManager.query(asset.id))
            );

            // Store all the assets.
            let progress = 0;
            conversionSubject.next({ status: ConversionStatus.storing, progress });
            if (assets.length > 0) {
                await Promise.all(assets.map(async asset => {
                    // Store the asset.
                    await uploadWebAsset(asset).then(() => storeWebAsset(asset));
                    // await Promise.all([ storeWebAsset(asset), uploadWebAsset(asset) ]);

                    progress += 1 / assets.length;
                    conversionSubject.next({ status: ConversionStatus.storing, progress });
                }));
            }

            conversionSubject.next({ status: ConversionStatus.done, assets });
        }
    }
}

export type {
    ConversionInfo,
    ConversionInfoDone,
    ConversionInfoStoring,
    ConversionInfoDownloading,
    ConversionInfoConverting,
    ConversionInfoUninitialized,
    ConversionInfoUploading,
    ConversionInfoQueued,
    ConversionInfoPreparing
};
export { ConversionStatus };
