import { Injectable } from "@angular/core";
import { IMessage, RxStomp, RxStompState } from "@stomp/rx-stomp";
import { Observable, pipe, share } from "rxjs";
import { filter, map } from "rxjs/operators";

/** Message type used by the websocket streams in this file; an alias of rx-stomp's `IMessage`. */
export type WebsocketMessage = IMessage;

/**
 * Injectable subclass of rx-stomp's `RxStomp`.
 *
 * The class adds no behavior of its own: it only extends `RxStomp` and is
 * registered with `providedIn: "root"` so it can be requested through
 * Angular dependency injection.
 *
 * NOTE(review): no connection configuration or activation happens here;
 * presumably that is done by whoever injects this service — confirm.
 */
@Injectable({
    providedIn: "root",
})
export class RxStompService extends RxStomp {
    constructor() {
        // No extra setup; just run the RxStomp constructor.
        super();
    }
}

/**
 * Connection status values published by this module.
 *
 * Each member is a string enum whose runtime value equals its name.
 */
export enum WebsocketConnectionStatus {
    Connected = "Connected",
    Disconnected = "Disconnected",
    Connecting = "Connecting",
    Disconnecting = "Disconnecting",
}

/**
 * Facade over {@link RxStompService} that exposes the connection state as
 * {@link WebsocketConnectionStatus} values and hands out per-topic message
 * streams, optionally filtered by the `event-type` message header.
 */
@Injectable()
export class WebsocketService {
    /**
     * One shared stream per topic name, created lazily by {@link watchTopic}.
     * Entries are never removed from this map.
     */
    private readonly sharedWatchers = new Map<string, Observable<WebsocketMessage>>();

    /** Connection status, translated from the `RxStompState` values of the underlying client. */
    public readonly connectionStatus$: Observable<WebsocketConnectionStatus>;

    /**
     * @param rxStompService STOMP client used for the connection state and topic subscriptions.
     */
    constructor(private readonly rxStompService: RxStompService) {
        // Built here instead of in a field initializer: an initializer that reads
        // a constructor parameter property runs before that property is assigned
        // when class fields use standard (ES2022 / useDefineForClassFields)
        // semantics, which would make `this.rxStompService` undefined.
        this.connectionStatus$ = this.rxStompService.connectionState$.pipe(
            map((state) => {
                switch (state) {
                    case RxStompState.OPEN:
                        return WebsocketConnectionStatus.Connected;
                    case RxStompState.CLOSED:
                        return WebsocketConnectionStatus.Disconnected;
                    case RxStompState.CONNECTING:
                        return WebsocketConnectionStatus.Connecting;
                    case RxStompState.CLOSING:
                        return WebsocketConnectionStatus.Disconnecting;
                }
            })
        );
    }

    /**
     * Returns the message stream for a topic.
     *
     * The underlying `watch` observable is created once per topic name and
     * wrapped in `share()`, so later calls for the same topic reuse the cached
     * stream instead of calling `watch` again.
     *
     * @param topicName Destination passed to `RxStompService.watch`.
     * @param eventTypes When provided, only messages whose `event-type` header
     *     is one of these values are emitted. When omitted, every message is
     *     emitted. An empty array matches no message.
     * @returns The (optionally filtered) stream of messages for the topic.
     */
    public watchTopic(topicName: string, eventTypes?: string[]): Observable<WebsocketMessage> {
        let watcher = this.sharedWatchers.get(topicName);
        if (!watcher) {
            watcher = this.rxStompService.watch(topicName).pipe(share());
            this.sharedWatchers.set(topicName, watcher);
        }

        // The filter is applied per caller, on top of the shared stream, so
        // different callers can select different event types for one topic.
        return watcher.pipe(this.filterByEvents(eventTypes));
    }

    /**
     * Builds an operator that keeps only messages whose `event-type` header
     * is listed in `eventTypes`; without `eventTypes` it lets everything through.
     */
    private filterByEvents(eventTypes?: string[]) {
        return pipe(
            filter((message: WebsocketMessage) => {
                if (!eventTypes) {
                    return true;
                }

                // A message without the header never matches.
                const eventType: string | undefined = message.headers["event-type"];
                return eventType !== undefined && eventTypes.includes(eventType);
            })
        );
    }
}
