import { Injectable } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { environment } from '../../environments/environment';
import { Topic } from '@weavix/models/src/topic/topic';
import { Utils } from '../utils/utils';

export interface TopicPayload<T> {
    topic: string[]; // Regex group matches (topic[0] is full topic string, topic[1] is first wildcard, topic[2] is second wildcard, etc.)
    payload: T;
}

interface TopicSubscription {
    subject: Subject<TopicPayload<any>>;
    regex: RegExp;
    qos: Paho.MQTT.Qos;
    created: Date;
    removed?: Date;
    raw: boolean;
}

const SUB_TIMEOUT = 30000;
const REFRESH_TIME = 1800000;

@Injectable({
    providedIn: 'root'
})
export class PubSubService {
    static username: string;
    static password: string;

    private client: Paho.MQTT.Client;
    connected: boolean = false;

    private subscriptions: {[topic: string]: TopicSubscription} = {};
    private pendingSubscriptions: {[topic: string]: TopicSubscription} = {};
    private pendingPublishes = [];
    private clientId;
    private lastMessage = new Date();
    private connectionStale = false;

    connected$: Subject<boolean> = new ReplaySubject<boolean>(1);

    constructor(private router: Router, private route: ActivatedRoute) {
        setInterval(async () => {
            try {
                await this.checkConnection();
            } catch (e) {
                console.error(e);
            }
        }, 15000);
    }


    async refreshSubscriptions() {
        await Promise.all(Object.keys(this.subscriptions).map(async topic => {
            const sub = this.subscriptions[topic];
            if (sub.created.getTime() < new Date().getTime() - REFRESH_TIME) {
                await this.subscribeInternal(topic, sub.qos);
                sub.created = new Date();
            }
        }));
    }

    private async checkConnection() {
        if (this.client) {
            if (!this.connected) {
                await this.connect();
            } else {
                await this.restoreSubscriptions();
            }
        }
    }

    private async restoreSubscriptions() {
        Object.keys(this.pendingSubscriptions).forEach(topic => {
            delete this.pendingSubscriptions[topic];
            try {
                this.client.subscribe(topic, {
                    qos: this.subscriptions[topic].qos,
                    onSuccess: () => {
                    },
                    onFailure: (err) => {
                        this.pendingSubscriptions[topic] = this.subscriptions[topic];
                        console.error(err);
                    }
                });
            } catch (e) {
                console.error(e);
            }
        });
        const publishes = this.pendingPublishes;
        this.pendingPublishes = [];
        publishes.forEach(message => {
            try {
                this.client.send(message);
            } catch (e) {
                this.pendingPublishes.push(message);
                console.error(e);
            }
        });
        await this.refreshSubscriptions();
    }

    async disconnect() {
        if (this.client) {
            try {
                this.client.disconnect();
            } catch (e) {}
        }
        this.client = null;
        this.connected = false;
    }

    async loggedIn() {
        this.clientId = `mqttjs_${String(Math.random()).substring(2, 8)}`;
        this.pendingSubscriptions = {};
        this.subscriptions = {};

        if (this.client) {
            try {
                this.client.disconnect();
            } catch (e) {}
        }

        const host = environment.pubsubApi;
        this.client = new Paho.MQTT.Client(host, environment.pubsubPort, this.clientId);
        this.client.onConnectionLost = (err) => {
            // eslint-disable-next-line no-console
            console.log(err);
            this.connected = false;
            this.connected$.next(false);
        };
        this.client.onMessageArrived = (message) => {
            this.checkLastMessage();
            // eslint-disable-next-line no-console
            console.info('Received message', message.destinationName);
            Object.keys(this.subscriptions).forEach(async topic => {
                const subs = this.subscriptions[topic];
                if (subs.subject.observers.length === 0) {
                    if (!subs.removed) subs.removed = new Date();
                    if (subs.removed.getTime() < new Date().getTime() - SUB_TIMEOUT) {
                        delete this.subscriptions[topic];
                        if (this.pendingSubscriptions[topic]) {
                            delete this.pendingSubscriptions[topic];
                            return;
                        } else {
                            try {
                                await this.unsubscribeInternal(topic);
                                return;
                            } catch (e) {
                                this.subscriptions[topic] = subs;
                            }
                        }
                    }
                } else {
                    delete subs.removed;
                }
                const parts = message.destinationName.match(subs.regex);
                if (parts) {
                    try {
                        const result = subs.raw ?
                            {
                                date: new Date().toISOString(),
                                get data() {
                                    return message.payloadString;
                                },
                                get bytes() {
                                    return message.payloadBytes;
                                }
                            } : JSON.parse(message.payloadString);
                        if (result.error) {
                            subs.subject.error(result.error);
                        } else {
                            subs.subject.next({
                                topic: parts,
                                payload: result
                            });
                        }
                    } catch (e) {
                        console.error(e);
                        subs.subject.next({
                            topic: parts,
                            payload: message.payloadString
                        });
                    }
                }
            });
        };

        await this.connect();
    }

    private async unsubscribeInternal(topic: string) {
        await new Promise((resolve, reject) => {
            this.client.unsubscribe(topic, {
                onSuccess: () => {
                    resolve(null);
                },
                onFailure: (err) => {
                    reject(err);
                }
            });
        });
    }

    private async connect() {
        await new Promise((resolve, reject) => {
            this.client.connect({
                useSSL: environment.is360Api.includes('https://') ? true : false,
                keepAliveInterval: 90,
                cleanSession: false,
                userName: PubSubService.username || '',
                password: PubSubService.password || '',
                onSuccess: async () => {
                    this.connected = true;
                    this.connected$.next(true);
                    // eslint-disable-next-line no-console
                    console.log('MQTT connection established');
                    this.checkLastMessage();
                    if (this.connectionStale) {
                        const url = location.href.split('/').slice(3);
                        if (url[0] === 'a') this.router.navigate(url.slice(2), { queryParams: this.route.snapshot.queryParams });
                        else location.reload();
                    }
                    await this.restoreSubscriptions();
                    resolve(null);
                },
                onFailure: (err) => {
                    console.error(err);
                    reject(err);
                }
            });
        });
    }

    private checkLastMessage() {
        if (new Date().getTime() - this.lastMessage.getTime() >= REFRESH_TIME) {
            this.connectionStale = true;
        }
        this.lastMessage = new Date();
    }

    async publish(topic: Topic, args: string[], payload: any, qos: Paho.MQTT.Qos = 1, retain: boolean = false) {
        let top = topic.toString();
        args.forEach(a => top = top.replace('+', a));

        const blob = payload instanceof Uint8Array ? payload : payload instanceof Blob ? await (payload as any).arrayBuffer() : JSON.stringify(payload);
        const message = new Paho.MQTT.Message(blob);
        message.destinationName = top;
        message.qos = qos;
        message.retained = retain;

        try {
            // eslint-disable-next-line no-console
            console.info('Publishing message', JSON.stringify(message));

            if (!this.client) {
                await this.loggedIn();
            }

            this.client.send(message);
        } catch (e) {
            console.error(e);
            if (qos === 1) {
                this.pendingPublishes.push(message);
            }
        }
    }

    async subscribe<T>(component: any, topic: Topic, args: string[], retain: boolean | number = false, qos: Paho.MQTT.Qos = 1, raw: boolean = false): Promise<Observable<TopicPayload<T>>> {
        if (!this.client) {
            await this.loggedIn();
        }
        let top = topic.toString();
        args.forEach(a => top = top.replace('+', a));
        const subject = this.subscriptions[top] && this.subscriptions[top].subject ||
            (retain ? new ReplaySubject<TopicPayload<T>>(retain === Infinity ? undefined : retain === true ? 1 : retain) : new Subject<TopicPayload<T>>());
        if (!this.subscriptions[top]) {
            this.subscriptions[top] = {
                subject,
                regex: new RegExp(`^${top.replace(/\./g, '\\.').replace(/\(/g, '\\(').replace(/\)/g, '\\)').replace(/#/g, '(.*)').replace(/\+/g, '([^/]+)').replace(/\*/g, '\\*')}$`),
                qos,
                created: new Date(),
                raw
            };
            if (this.connected) {
                try {
                    await this.subscribeInternal(top, qos);
                } catch (e) {
                    this.pendingSubscriptions[top] = this.subscriptions[top];
                }
            } else {
                this.pendingSubscriptions[top] = this.subscriptions[top];
            }
        }
        return Utils.safeSubscribe(component, subject.asObservable());
    }

    private async subscribeInternal(topic: string, qos: Paho.MQTT.Qos = 1) {
        await new Promise((resolve, reject) => {
            this.client.subscribe(topic, {
                qos,
                onSuccess: () => {
                    // eslint-disable-next-line no-console
                    console.log(`Subscribed to ${topic}`);
                    resolve(null);
                },
                onFailure: (err) => {
                    reject(err);
                }
            });
        });
    }
}
