import { API_ENDPOINT_HTTP } from "@/config";
import { makeAutoObservableAbstract } from "@/lib/make-auto-observable-abstract";
import type { LocalWrite } from "@/lib/sync/local-write";
import { getTableRecordsRoute } from "@api/fastAPI";
import type {
	Record as ApiRecord,
	DiffEvent,
	SessionUserId,
	TableId,
	WriteId,
} from "@api/schemas";
import type { useAuth } from "@clerk/clerk-react";
import {
	ShapeStream,
	isChangeMessage,
	isControlMessage,
} from "@electric-sql/client";
import { matchBy, matchStream } from "@electric-sql/experimental";
import { EventSource } from "eventsource";
import { runInAction } from "mobx";
import { ResultAsync } from "neverthrow";

/**
 * SyncStream
 *
 * A SyncStream has two purposes:
 * - to manage a map of items that reflects the current server state
 * - to be able to determine when a local write has been persisted into the map
 */
export abstract class SyncStream<
	TItem extends object,
	TIdKey extends keyof TItem & string,
> {
	idKey: TIdKey;
	items: Map<TItem[TIdKey], TItem>;

	constructor(idKey: TIdKey, items: Map<TItem[TIdKey], TItem>) {
		this.idKey = idKey;
		this.items = items;
	}

	/**
	 * Given a LocalWrite, return a Promise that resolves when either we've
	 * received a sync message from the server that persists this write or when
	 * attempting to match the write times out.
	 */
	abstract matchWrite(
		write: LocalWrite<TItem, TIdKey>,
	): ResultAsync<void, Error>;
}

const TIMEOUT_MS = 60_000;

/**
 * ElectricSyncStream
 *
 * Uses ElectricSQL to sync a map. Requires that the item has a write_id field
 * to use to match writes.
 */
export class ElectricSyncStream<
	TItem extends object,
	TIdKey extends keyof TItem & string,
	TWriteIdKey extends keyof TItem & string,
> extends SyncStream<TItem, TIdKey> {
	private writeIdKey: TWriteIdKey;
	private shapeStream: ShapeStream<TItem & Record<string, unknown>>;

	/**
	 * Constructs an instance of ElectricSyncStream.
	 *
	 * @param props - An object containing the following properties:
	 *   - shapeUrl: The URL of the shape to subscribe to.
	 *   - shapeHash: Hash of the shape's schema for invalidating cached requests.
	 *   - idKey: The key used to identify items in the map.
	 *   - writeIdKey: The key used to confirm writes from the server.
	 */
	constructor(props: {
		shapeUrl: string;
		shapeHash: string;
		idKey: TIdKey;
		writeIdKey: TWriteIdKey;
	}) {
		super(props.idKey, new Map());

		this.writeIdKey = props.writeIdKey;
		this.shapeStream = new ShapeStream({
			url: `${props.shapeUrl}?hash=${props.shapeHash}`,
		});
		// Adapted from https://github.com/electric-sql/electric/blob/main/packages/typescript-client/src/shape.ts
		this.shapeStream.subscribe((messages) => {
			runInAction(() => {
				for (const message of messages) {
					if (isChangeMessage(message)) {
						switch (message.headers.operation) {
							case "insert":
								this.items.set(
									message.value[this.idKey] as TItem[TIdKey],
									message.value,
								);
								break;

							case "update": {
								const current = this.items.get(
									message.value[this.idKey] as TItem[TIdKey],
								);
								if (current === undefined) {
									console.error("Tried to update item that doesn't exist");
									return;
								}
								this.items.set(message.value[this.idKey] as TItem[TIdKey], {
									...current,
									...message.value,
								});
								break;
							}

							case "delete":
								this.items.delete(message.value[this.idKey] as TItem[TIdKey]);
								break;
						}
					}
					if (isControlMessage(message)) {
						switch (message.headers.control) {
							case "up-to-date":
								break;

							case "must-refetch":
								this.items.clear();
								break;
						}
					}
				}
			});
		});

		makeAutoObservableAbstract(this);
	}

	matchWrite(write: LocalWrite<TItem, TIdKey>): ResultAsync<void, Error> {
		// A delete sync message only contains the id of the item, but
		// otherwise can match by the write_id.
		const matchFn =
			write.operation === "delete"
				? matchBy<TItem & Record<string, unknown>>(
						this.idKey,
						write.itemId as string,
					)
				: matchBy<TItem & Record<string, unknown>>(
						this.writeIdKey,
						//@ts-expect-error
						//NOTE(John): In our code, we need to make sure that a
						//write_id is always present for updates.
						write.value[this.writeIdKey],
					);

		const result = ResultAsync.fromPromise(
			matchStream(this.shapeStream, [write.operation], matchFn, TIMEOUT_MS),
			() => new Error(`Failed to match write ${write}`),
		).map(() => {
			// Map just to return `void`
			return;
		});
		return result;
	}

	get isUpToDate() {
		return this.shapeStream.isUpToDate;
	}
}

/**
 * RecordSyncStream
 *
 * A sync stream for our table records. We don't use ElectricSQL for this
 * because we can't use it to sync views to the client.
 */
export class RecordSyncStream extends SyncStream<ApiRecord, "link"> {
	tableId: TableId;
	userSessionId: SessionUserId;
	diffEventWaits: Map<WriteId, () => void> = new Map();
	eventSource: EventSource | null = null;
	// Can't just check eventSource because getToken is async, causing a
	// possible race condition
	isWatching = false;
	getToken: ReturnType<typeof useAuth>["getToken"];

	constructor(props: {
		tableId: TableId;
		userSessionId: SessionUserId;
		getToken: ReturnType<typeof useAuth>["getToken"];
	}) {
		super("link", new Map());
		this.tableId = props.tableId;
		this.userSessionId = props.userSessionId;
		this.getToken = props.getToken;
		makeAutoObservableAbstract(this);
	}

	/**
	 * For Records, we don't have write_ids in each row to track, so we just
	 * clear writes whenever we get a diff event. This is a hack, but we're
	 * hoping it's acceptable for now, assuming that local writes are synced
	 * nearly immediately, so any visual data loss is minimal.
	 */
	matchWrite(write: LocalWrite<ApiRecord, "link">): ResultAsync<void, Error> {
		return ResultAsync.fromPromise(
			new Promise((resolve, reject) => {
				const timeout = setTimeout(() => {
					reject(new Error(`Timeout matching write ${write}`));
				}, TIMEOUT_MS);
				this.diffEventWaits.set(write.id, () => {
					clearTimeout(timeout);
					resolve();
				});
			}),
			() => new Error(`Failed to match write ${write}`),
		);
	}

	/**
	 * Refresh the table's records from the server once.
	 *
	 * Useful when you need the table's latest data but don't need to
	 * live-update it.
	 *
	 * For example, when modifying a relationship cell, we need to display the
	 * foreign table's records, but it's okay to not have live updates.
	 *
	 * We don't want the refresh to succeed if we're already watching the
	 * table, because then the watch should manage the records.
	 */
	async refresh() {
		if (this.eventSource !== null) {
			return;
		}
		const response = await getTableRecordsRoute({
			table_id: this.tableId,
		});
		runInAction(() => {
			if (this.eventSource !== null) {
				return;
			}
			this.items = new Map(
				response.data.records.map((record) => [record.link, record]),
			);
		});
	}

	/**
	 * Start watching the table.
	 */
	start() {
		if (this.isWatching) {
			console.warn(
				"Tried to start watching table that's already being watched",
			);
			return;
		}
		this.isWatching = true;
		this.getToken().then((token) => {
			this.eventSource = new EventSource(
				`${API_ENDPOINT_HTTP}/tables/watch?table_id=${this.tableId}&user_session_id=${this.userSessionId}`,
				{
					fetch: (url, init) =>
						fetch(url, {
							...init,
							headers: {
								...init?.headers,
								Authorization: `Bearer ${token}`,
							},
						}),
				},
			);
			this.eventSource.addEventListener("diff", this.diffHandler);
			this.eventSource.addEventListener("error", (event) => {
				console.error("Error watching table", event);
			});
		});
	}

	diffHandler = (event: MessageEvent) => {
		runInAction(() => {
			// TOOD(John): validate?
			const data: DiffEvent = JSON.parse(event.data);
			if (data.table_id !== this.tableId) {
				console.error("Received diff event for table that we're not watching");
				return;
			}
			for (const resolve of this.diffEventWaits.values()) {
				resolve();
			}
			this.diffEventWaits.clear();
			switch (data.op) {
				case "insert": {
					this.items.set(data.record.link, data.record);
					break;
				}

				case "update": {
					const current = this.items.get(data.record_link);
					if (current === undefined) {
						console.error("Tried to update item that doesn't exist");
						return;
					}
					this.items.set(data.record_link, {
						...current,
						cell_values: {
							...current.cell_values,
							...data.updated_cell_values,
						},
						...(data.updated_record_order && {
							order: data.updated_record_order,
						}),
					});
					break;
				}

				case "delete": {
					this.items.delete(data.record_link);
					break;
				}

				case "reset": {
					this.items.clear();
					for (const record of data.new_records) {
						this.items.set(record.link, record);
					}
					break;
				}
			}
		});
	};

	/**
	 * Stop watching the table.
	 */
	stop() {
		this.isWatching = false;
		if (this.eventSource === null) {
			return;
		}
		this.eventSource.close();
		this.eventSource = null;
	}
}
