import {
  RealtimeChannel,
  RealtimePostgresChangesFilter,
  RealtimePostgresChangesPayload,
  SupabaseClient,
} from "@supabase/supabase-js";
import Debug from "debug";
import { isBrowser, keys } from "lib/util";
import { Filter } from "./filter";
import PQueue from "p-queue";

const debug = Debug("oom:SubscriptionManager");

export type SubscriptionSource = "server" | "client";
type SubscriptionRemove = () => void;

export type SubscriptionCallback = <T extends { [index: string]: any }>(
  data: RealtimePostgresChangesPayload<T>,
  source: SubscriptionSource,
) => void;

let lastId = 1;
function nextId() {
  return String(lastId++);
}

/**
 * Manages subscriptions to the realtime changes feed. This is a singleton class
 */
export class SubscriptionManager {
  static supabase: SupabaseClient;
  static instance = new SubscriptionManager();
  static add(tableName: string, filter: any, callback: SubscriptionCallback) {
    return this.instance.add(tableName, filter, callback);
  }

  private channelId: number = 0;
  private channel: RealtimeChannel | undefined;
  private keyToFilter: Map<string, RealtimePostgresChangesFilter<any>> = new Map();
  private keyToIds: Map<string, Set<string>> = new Map();
  private idToKey: Map<string, string> = new Map();
  private callbacksById: Map<string, SubscriptionCallback> = new Map();

  constructor(private name: string = "subscription_manager") {
    if (isBrowser()) {
      window.addEventListener("beforeunload", () => {
        this.unsubscribe();
      });
    }
  }

  private callbackName(tableName: string, filters: string) {
    return `${tableName}:${filters}`;
  }

  private remove(id: string) {
    const key = this.idToKey.get(id);
    if (!key) return;

    this.idToKey.delete(id);
    this.callbacksById.delete(id);

    const ids = this.keyToIds.get(key);
    if (ids) {
      ids.delete(id);
      if (ids.size === 0) {
        // If there are no more callbacks for this filter, unsubscribe
        this.keyToIds.delete(key);
        this.keyToFilter.delete(key);
      }
    }

    // always call subscribe because we have to set up the channel from scratch
    // to remove
    this.subscribe();
  }

  /**
   * This is the only method external users of this class really need to call
   *
   * @param tableName
   * @param filter
   * @param callback
   * @returns
   */
  async add(tableName: string, filter: Filter, callback: SubscriptionCallback): Promise<SubscriptionRemove> {
    const id = nextId();
    const remove = () => this.remove(id);
    this.callbacksById.set(id, callback);

    if (Object.keys(filter).length > 1) {
      throw new Error("Only one filter is supported at this time");
    }

    const channelFilter = keys(filter).reduce((acc, k) => {
      acc = acc ? `${acc},` : "";
      const value = filter[k];
      if (value instanceof Array) {
        return `${acc}${k}=in.(${value.map((v) => `'${v}'`).join(",")})`;
      } else {
        return `${acc}${k}=eq.${value}`;
      }
    }, "");
    const key = this.callbackName(tableName, channelFilter);
    this.idToKey.set(id, key);

    // If we already have a subscription for this filter, just add the callback
    if (this.keyToFilter.has(key)) {
      this.keyToIds.get(key)?.add(id);
      return remove;
    }

    const realtimeFilter: RealtimePostgresChangesFilter<any> = {
      event: "*",
      schema: "public",
      table: tableName,
    };
    if (channelFilter !== "") {
      realtimeFilter["filter"] = channelFilter;
    }

    this.keyToFilter.set(key, realtimeFilter);
    this.keyToIds.set(key, new Set([id]));
    // Otherwise, subscribe to the channel
    await this.subscribe();
    return remove;
  }

  _subscribeQueue = new PQueue({ concurrency: 1 });
  async subscribe() {
    if (!SubscriptionManager.supabase) {
      console.log("No supabase client");
      return;
    }

    this._subscribeQueue.clear();
    await this._subscribeQueue.add(async () => {
      try {
        const lastChannel = this.channel;
        const nextChannelId = this.channelId + 1;
        const nextChannel = SubscriptionManager.supabase.channel(`${this.name}:${nextChannelId}`);

        const filters = Array.from(this.keyToFilter.values());
        if (filters.length === 0) return;

        for (const [key, filter] of this.keyToFilter) {
          debug("subscribe", key, filter);
          nextChannel.on("postgres_changes", filter, (payload) => {
            this.receive(key, payload);
          });
        }

        await new Promise((resolve, reject) => {
          nextChannel.subscribe((cb, err) => {
            debug("subscribed", cb, nextChannelId);
            resolve(void 0);
          });
        });

        lastChannel?.unsubscribe();

        this.channelId = nextChannelId;
        this.channel = nextChannel;
      } catch (e) {
        console.error("Error subscribing", e);
      }
    });
  }

  async unsubscribe() {
    if (!this.channel) return;

    await this.channel.unsubscribe();
    this.channel = undefined;
    await SubscriptionManager.supabase.removeAllChannels();
  }

  /**
   * Emit a fake change event to all subscribers of a table. This is useful for testing
   *
   * @param tableName
   * @param type
   * @param newData
   */
  emit(tableName: string, type: "INSERT" | "UPDATE" | "DELETE", newData: object, oldData: object = {}) {
    for (const key of this.keyToIds.keys()) {
      if (key.startsWith(tableName + ":")) {
        this.receive(
          key,
          {
            eventType: type,
            new: newData,
            schema: "public",
            table: tableName,
            commit_timestamp: new Date().toISOString(),
            errors: [],
            old: oldData,
          },
          "client",
        );
      }
    }
  }

  private receive(
    key: string,
    payload: RealtimePostgresChangesPayload<any>,
    source: SubscriptionSource = "server",
  ) {
    debug("receive", key, payload, source);

    const ids = this.keyToIds.get(key);
    if (!ids) return;

    for (const id of ids) {
      debug("receive id", id, payload);

      const callback = this.callbacksById.get(id);
      if (callback) callback(payload, source);
    }
  }
}

// @ts-ignore
globalThis.SubscriptionManager = SubscriptionManager;
