import * as signalR from "@microsoft/signalr";
import { Subject, Observable, from, Observer, defer } from "rxjs";
import { switchMap, retryWhen, delay, take } from "rxjs/operators";

/** The set of event names an `IEvent` may carry in its `eventType` field. */
type EventType = "OrderUpdated" | "FixturesUpdated";

/**
 * Shape of an event received from the SignalR hub and re-emitted to consumers
 * of `connection()`.
 *
 * NOTE(review): the meaning of the individual fields is not established by
 * this file — confirm against the server-side message contract.
 */
interface IEvent {
  eventType: EventType;
  memberships: string[];
  version: number;
  messageVersion: number; // should be 3
  id: string;
}

// Cache of event Subjects keyed by API base URL, so that repeated calls for
// the same endpoint share one stream (normally this holds a single entry).
const connections: Record<string, Subject<IEvent>> = {};

/**
 * Get the shared stream of notification events for an API endpoint, creating
 * the underlying SignalR connection on first use.
 *
 * The first call for a given `apiUrl` POSTs to `${apiUrl}/notifications` to
 * obtain the hub URL and access token, builds a SignalR connection from them
 * and pipes every inbound event into a cached Subject. Later calls for the
 * same `apiUrl` return that same Subject, so all consumers share a single
 * connection.
 *
 * If the negotiate request fails it is re-issued every 5 seconds, up to 10
 * times. If the connection cannot be established, or later fails, the error
 * is forwarded to subscribers and the cache entry is dropped so that a
 * subsequent call starts a fresh connection attempt.
 *
 * @param apiUrl base API endpoint
 * @returns an observable of events pushed by the hub for this endpoint
 */
export function connection(apiUrl: string): Observable<IEvent> {
  if (Object.prototype.hasOwnProperty.call(connections, apiUrl)) {
    return connections[apiUrl];
  }

  // use a subject to ensure all consumers subscribe to the same
  // observable instance, and thus the same connection
  const subject = new Subject<IEvent>();
  connections[apiUrl] = subject;

  // Propagate a terminal failure to consumers and evict the dead subject so
  // the next call to connection() can try again rather than hang forever.
  const fail = (err: unknown): void => {
    if (connections[apiUrl] === subject) {
      delete connections[apiUrl];
    }
    subject.error(err);
  };

  // defer() builds a new request on every (re)subscription. Wrapping an
  // already-started promise directly would make each retry replay the same
  // rejection instead of re-issuing the POST.
  defer(() =>
    from(
      fetch(`${apiUrl}/notifications`, { method: "POST" })
        .then((response) => {
          // fetch() only rejects on network failure; treat HTTP errors as
          // failures too so they are retried rather than parsed as JSON.
          if (!response.ok) {
            throw new Error(`Notification negotiation failed with HTTP status ${response.status}`);
          }
          return response.json();
        })
        .then((data) =>
          new signalR.HubConnectionBuilder()
            .withUrl(data.url, { accessTokenFactory: () => data.accessToken })
            .configureLogging(signalR.LogLevel.Debug)
            .build()
        )
    )
  )
    .pipe(
      // on failure, retry every 5 seconds up to 10 times
      retryWhen((errors) => errors.pipe(delay(5000), take(10))),
      // map to updates from this connection
      switchMap((hub) => updates(hub))
    )
    .subscribe({
      next: (event) => subject.next(event),
      error: fail,
      // updates() never completes on its own, so completion here means the
      // retry budget ran out before a connection could be built.
      complete: () => fail(new Error("Unable to establish SignalR connection: retries exhausted")),
    });

  return subject;
}

/**
 * Handle inbound events from a signalr connection.
 *
 * On subscription this registers a handler for each hub method listed in
 * `sources`, starts the connection and forwards every received event to the
 * subscriber. The observable never completes; it errors if the connection
 * fails to start or is closed. On unsubscribe the handlers are removed and
 * the connection is stopped.
 *
 * @param signalrConnection SignalR connection (built but not yet started)
 * @returns an observable of events received on the connection
 */
function updates(signalrConnection: signalR.HubConnection): Observable<IEvent> {
  const sources: string[] = ["seatrade"];
  return new Observable<IEvent>((observer: Observer<IEvent>) => {
    // Set by the teardown below so that a close we caused ourselves is not
    // mistaken for an expired token.
    let disposed = false;

    // A single handler reference so it can be removed again with off().
    const forward = (event: IEvent): void => observer.next(event);
    sources.forEach((source) => signalrConnection.on(source, forward));

    signalrConnection.onclose(() => {
      if (disposed) {
        return;
      }
      observer.error(new Error("SignalR connection closed"));
      window.location.reload(); // token has expired, refresh to force login
    });

    signalrConnection
      .start()
      .catch((err) => observer.error(new Error(`SignalR connection failed to start. Err: ${err}`)));

    // Teardown: runs on unsubscribe and after an error has been delivered.
    return () => {
      disposed = true;
      sources.forEach((source) => signalrConnection.off(source, forward));
      // Best-effort stop; there is no subscriber left to report a failure to.
      void signalrConnection.stop().catch(() => undefined);
    };
  });
}
