import { makeAutoObservableAbstract } from "@/lib/make-auto-observable-abstract";
import type { Transaction } from "@/lib/sync/action-executor";
import type { LocalUpdate, LocalWrite, PKey } from "@/lib/sync/local-write";
import {
	ItemMap,
	MATCH_WRITE_TIMEOUT_MS,
	OptimisticMap,
} from "@/lib/sync/optimistic-map";
import type { WriteId } from "@api/schemas";
import {
	type ChangeMessage,
	ShapeStream,
	isChangeMessage,
} from "@electric-sql/client";
import { makeAutoObservable } from "mobx";
import type { Result } from "neverthrow";

/**
 * ElectricSyncedMap
 *
 * Uses ElectricSQL to sync a map. Requires that the item has a write_id field
 * to use to match writes.
 *
 * We implement this ourselves because it's difficult to get Electric's Shape
 * to be MobX reactive.
 *
 * The buffer is used because the documentation indicates that updates that may
 * be reactive should not be processed until the "up-to-date" message is
 * received. It may take multiple requests for this.
 *
 */
export class ElectricSyncedMap<
	TItem extends object,
	TPKeyFields extends (keyof TItem)[],
> extends ItemMap<TItem, TPKeyFields> {
	shapeStream: ShapeStream<TItem & Record<string, unknown>>;
	private bufferedChangeMessages: ChangeMessage<
		TItem & Record<string, unknown>
	>[] = [];
	private onProcessMessage?: (
		message: ChangeMessage<TItem & Record<string, unknown>>,
	) => void;
	/**
	 * Constructs an instance of ElectricSyncStream.
	 *
	 * @param props - An object containing the following properties:
	 *   - shapeUrl: The URL of the shape to subscribe to.
	 *   - shapeHash: Hash of the shape's schema for invalidating cached requests.
	 *   - pKeyFields: The fields used to identify items in the map.
	 */
	constructor(props: {
		shapeUrl: string;
		shapeHash: string;
		pKeyFields: TPKeyFields;
		onProcessMessage?: (
			message: ChangeMessage<TItem & Record<string, unknown>>,
		) => void;
		getBearerToken: () => Promise<string>;
	}) {
		super({ pKeyFields: props.pKeyFields });
		makeAutoObservableAbstract(this);
		this.onProcessMessage = props.onProcessMessage;
		this.shapeStream = new ShapeStream({
			url: `${props.shapeUrl}?hash=${props.shapeHash}`,
			headers: {
				Authorization: async () => `Bearer ${await props.getBearerToken()}`,
			},
		});
		// Adapted from https://github.com/electric-sql/electric/blob/main/packages/typescript-client/src/shape.ts
		this.shapeStream.subscribe((messages) => {
			for (const message of messages) {
				if (isChangeMessage(message)) {
					this.bufferedChangeMessages.push(message);
				} else {
					switch (message.headers.control) {
						case "up-to-date":
							this.processBuffer();
							break;

						case "must-refetch":
							this.bufferedChangeMessages = [];
							this.items.clear();
							break;
					}
				}
			}
		});
	}

	processBuffer() {
		const messagesToProcess = [...this.bufferedChangeMessages];
		this.bufferedChangeMessages = [];

		for (const message of messagesToProcess) {
			this.onProcessMessage?.(message);
			switch (message.headers.operation) {
				case "insert": {
					const item = message.value as TItem;
					this.items.set(this.extractPkeyStr(item), item);
					break;
				}

				case "update": {
					const pKeyStr = this.extractPkeyStr(message.value);
					const current = this.items.get(pKeyStr);
					if (current === undefined) {
						console.error("Tried to update item that doesn't exist");
						continue;
					}
					this.items.set(pKeyStr, {
						...current,
						...message.value,
					});
					break;
				}

				case "delete":
					this.items.delete(this.extractPkeyStr(message.value));
					break;
			}
		}
	}

	get isUpToDate() {
		return this.shapeStream.isUpToDate;
	}
}

/**
 * Composes the ElectricSyncStream and OptimisticMap.
 *
 * Useful so we can enforce that update values include a new write_id field.
 */
export class ElectricOptimisticMap<
	TItem extends object,
	TPKeyFields extends (keyof TItem)[],
	TWriteIdField extends Exclude<keyof TItem, TPKeyFields[number]> & string,
> {
	/**
	 * The server-authoritative map responsible for syncing a shape from our
	 * database. Not modified by client-side writes.
	 */
	syncedMap: ElectricSyncedMap<TItem, TPKeyFields>;
	/**
	 * Optimistic map responsible for coalescing the `syncedMap` with pending
	 * client-side writes.
	 */
	private optimisticMap: OptimisticMap<TItem, TPKeyFields>;

	/**
	 * Field in each item containing the write ID used for confirmation.
	 */
	private writeIdField: TWriteIdField;

	/**
	 * Stores pending match functions for writes. Keyed by the write index.
	 *
	 * When a change message arrives, we loop through all pending match functions
	 * and see if any match. If a match is found, we clear the timeout and remove
	 * the match function.
	 *
	 * (Otherwise, if a timeout occurs, we log an error and remove the match
	 * function.)
	 */
	private matchingWrites: Map<
		number,
		(message: ChangeMessage<TItem & Record<string, unknown>>) => void
	> = new Map();

	constructor(props: {
		shapeUrl: string;
		pKeyFields: TPKeyFields;
		writeIdField: TWriteIdField;
		shapeHash: string;
		getBearerToken: () => Promise<string>;
	}) {
		makeAutoObservable(this);
		this.syncedMap = new ElectricSyncedMap({
			shapeUrl: props.shapeUrl,
			pKeyFields: props.pKeyFields,
			shapeHash: props.shapeHash,
			onProcessMessage: (message) => {
				for (const tryMatchFn of this.matchingWrites.values()) {
					tryMatchFn(message);
				}
			},
			getBearerToken: props.getBearerToken,
		});
		this.writeIdField = props.writeIdField;
		this.optimisticMap = new OptimisticMap({
			itemMap: this.syncedMap,
			matchWrite: this.matchWrite,
		});
	}

	/**
	 * Callback for registering a watch function for confirmation of server-side
	 * persistence.
	 */
	matchWrite = (
		write: LocalWrite<TItem, TPKeyFields>,
		removeWrite: () => void,
	) => {
		let matchFn: (
			message: ChangeMessage<TItem & Record<string, unknown>>,
		) => boolean;

		// Delete messages don't contain write_ids, so we match by the pkey
		switch (write.operation) {
			case "delete": {
				matchFn = (message) => {
					return (
						this.syncedMap.pKeyToStr(write.pKey) ===
						this.syncedMap.extractPkeyStr(message.value)
					);
				};
				break;
			}
			case "update":
			case "insert": {
				matchFn = (message) => {
					return (
						write.value[this.writeIdField] === message.value[this.writeIdField]
					);
				};
				break;
			}
		}

		const timeout = setTimeout(() => {
			removeWrite();
			this.matchingWrites.delete(write.index);
			console.error("Write matching timed out");
		}, MATCH_WRITE_TIMEOUT_MS);

		this.matchingWrites.set(write.index, (message) => {
			if (matchFn(message)) {
				clearTimeout(timeout);
				this.matchingWrites.delete(write.index);
				removeWrite();
			}
		});
	};

	insert(tx: Transaction, item: TItem): void {
		this.optimisticMap.insert(tx, item);
	}

	update(
		tx: Transaction,
		pKey: PKey<TItem, TPKeyFields>,
		value: LocalUpdate<TItem, TPKeyFields>["value"] &
			Record<TWriteIdField, WriteId>,
	): void {
		this.optimisticMap.update(tx, pKey, value);
	}

	delete(tx: Transaction, pKey: PKey<TItem, TPKeyFields>): void {
		this.optimisticMap.delete(tx, pKey);
	}

	get(pKey: PKey<TItem, TPKeyFields>): Result<TItem, Error> {
		return this.optimisticMap.get(pKey);
	}

	get keys(): PKey<TItem, TPKeyFields>[] {
		return this.optimisticMap.keys;
	}

	values(): TItem[] {
		return this.optimisticMap.values();
	}

	has(pKey: PKey<TItem, TPKeyFields>): boolean {
		return this.optimisticMap.has(pKey);
	}
}
