// From https://github.com/KyleAMathews/electric-notes/blob/main/src/y-electric/index.ts

import {
	byteAToUint8Array,
	byteaToUint8ArrayLazy,
	timestamptzToDate,
} from "@/lib/y-electric/utils";
import {
	type Message,
	type Offset,
	ShapeStream,
	isChangeMessage,
} from "@electric-sql/client";
import { toBase64 } from "lib0/buffer";
import * as env from "lib0/environment";
import { ObservableV2 } from "lib0/observable";
import throttle from "lodash.throttle";
import * as awarenessProtocol from "y-protocols/awareness";
import * as Y from "yjs";

// These are supposed to be subsets of the schemas for their respective tables
// Probably want to validate messages to avoid annoying bugs
type OperationMessage = {
	op: Uint8Array;
};

type AwarenessMessage = {
	op: () => Uint8Array;
	client_id: string;
	updated_at: Date;
};

type ObservableProvider = {
	// Providing both sync and synced is a yjs convention
	sync: (state: boolean) => void;
	synced: (state: boolean) => void;
	status: (status: {
		status: `connecting` | `connected` | `disconnected`;
	}) => void;

	"connection-close": () => void;
};

// Values that exist when the provider is connected
type ConnectionState =
	| {
			status: "disconnected";
	  }
	| {
			status: "connecting";
			operationsStream: ShapeStream<OperationMessage>;
			// Only if awareness is configured
			awarenessStream?: ShapeStream<AwarenessMessage>;
	  }
	| {
			status: "connected";
			operationsStream: ShapeStream<OperationMessage>;
			awarenessStream?: ShapeStream<AwarenessMessage>;
			// Looping task that processes pending operations
			sendOperationsTimer: Timer;
	  };

export class ElectricProvider extends ObservableV2<ObservableProvider> {
	operationsStreamUrl: string;
	awarenessStreamUrl: string;
	getBearerToken: () => Promise<string>;
	postOperation: (op: string) => Promise<unknown>;
	postAwareness: (clientId: string, op: string) => Promise<unknown>;
	doc: Y.Doc;
	public awareness?: awarenessProtocol.Awareness;

	// Not null when connected
	private connectionState: ConnectionState = {
		status: "disconnected",
	};
	private _synced = false;

	// pendingOperations serves both as a way to throttle operations and also
	// to persist operations if we perform updates while offline.
	// Later, we can save these to indexeddb to make the doc fully local.
	private pendingOperations: Uint8Array[] = [];
	private minSendOperationIntervalMs = 1000;
	private minAwarenessIntervalMs = 1000;

	// Buffer for synced updates because we don't want to apply updates until
	// we are up-to-date
	private updatesBuffer: Uint8Array[] = [];

	// Only for Node environments
	private exitHandler?: () => void;

	// Last offset and handle for the shape streams
	// Useful even without persistence
	private streamState: {
		operations?: { offset: Offset; handle: string };
		awareness?: { offset: Offset; handle: string };
	} = {};

	private awarenessState: Record<string, number | string> | null = null;

	constructor({
		operationsStreamUrl,
		awarenessStreamUrl,
		postOperation,
		postAwareness,
		doc,
		awareness,
		getBearerToken,
	}: {
		operationsStreamUrl: string;
		awarenessStreamUrl: string;
		getBearerToken: () => Promise<string>;
		postOperation: (op: string) => Promise<unknown>;
		postAwareness: (clientId: string, op: string) => Promise<unknown>;
		doc: Y.Doc;
		awareness?: awarenessProtocol.Awareness;
	}) {
		super();

		this.operationsStreamUrl = operationsStreamUrl;
		this.awarenessStreamUrl = awarenessStreamUrl;
		this.getBearerToken = getBearerToken;
		this.postOperation = postOperation;
		this.postAwareness = postAwareness;
		this.doc = doc;
		this.doc.on("update", this.updateHandler);

		this.awareness = awareness;
		if (this.awareness) {
			this.awareness.on("update", this.awarenessUpdateHandler);
		}

		if (env.isNode && typeof process !== "undefined") {
			this.exitHandler = () => {
				(process as NodeJS.Process).on("exit", () => this.destroy());
			};
		}
		this.connect();
	}

	get synced() {
		return this._synced;
	}

	set synced(state) {
		if (this._synced !== state) {
			this._synced = state;
			this.emit("synced", [state]);
			this.emit("sync", [state]);
		}
	}

	destroy() {
		this.disconnect();
		this.doc.off("update", this.updateHandler);
		if (this.awareness && this.awarenessUpdateHandler) {
			this.awareness.off("update", this.awarenessUpdateHandler);
		}
		if (env.isNode && typeof process !== "undefined" && this.exitHandler) {
			(process as NodeJS.Process).off("exit", this.exitHandler);
		}
		super.destroy();
	}

	disconnect() {
		// TODO(John): i think there's a race condition if you try to destroy
		// too quickly? in that connectionState may not be set
		if (this.connectionState.status === "disconnected") {
			console.warn("Tried to disconnect ElectricProvider while not connected");
			return;
		}
		if (this.awareness) {
			this.awarenessState = this.awareness.getLocalState();

			awarenessProtocol.removeAwarenessStates(
				this.awareness,
				Array.from(this.awareness.getStates().keys()).filter(
					(client) => client !== this.doc.clientID,
				),
				this,
			);

			// try to notify other clients that we are disconnected
			awarenessProtocol.removeAwarenessStates(
				this.awareness,
				[this.doc.clientID],
				"local",
			);
		}

		this.connectionState.operationsStream.unsubscribeAll();
		this.connectionState.awarenessStream?.unsubscribeAll();
		this.connectionState = {
			status: "disconnected",
		};
		this.synced = false;
		this.emit("status", [{ status: "disconnected" }]);
		this.emit("connection-close", []);
	}

	connect() {
		if (this.connectionState.status !== "disconnected") {
			console.warn(
				"Tried to connect ElectricProvider while already connecting",
			);
			return;
		}

		this.setupShapeStream();

		// TODO(John): return to after understanding
		if (this.awareness && this.awarenessState !== null) {
			this.awareness.setLocalState(this.awarenessState);
			this.awarenessState = null;
		}
	}

	private updateHandler = (update: Uint8Array, origin: unknown) => {
		if (origin === "remote") {
			return;
		}
		this.pendingOperations.push(update);
	};

	private sendPendingOperations = async () => {
		const numOperations = this.pendingOperations.length;
		if (numOperations > 0) {
			const operations = this.pendingOperations.slice(0, numOperations);
			const mergedUpdate = Y.mergeUpdates(operations);
			const op = toBase64(mergedUpdate);
			try {
				await this.postOperation(op);
				this.pendingOperations = this.pendingOperations.slice(numOperations);
			} catch (e) {
				// TODO(John): should probably retry a max number of times
				// before disconnecting the provider? Or something else if we
				// want local persistence
				console.error("Failed to send operations. Will continue to retry.", e);
			}
		}

		if (this.connectionState.status === "connected") {
			this.connectionState.sendOperationsTimer = setTimeout(async () => {
				await this.sendPendingOperations();
			}, this.minSendOperationIntervalMs);
		}
	};

	private awarenessUpdateHandler = (
		{
			added,
			updated,
			removed,
		}: { added: number[]; updated: number[]; removed: number[] },
		origin: string,
	) => {
		// origin is local when awareness.setLocalState is called
		if (origin === "local") {
			const changedClients = added.concat(updated).concat(removed);
			this.throttledSendAwareness(changedClients);
		}
	};

	private throttledSendAwareness = throttle(
		(changedClients: number[]) => {
			if (!this.awareness) {
				throw new Error(
					"Tried to send awareness update while awareness is not configured",
				);
			}
			if (this.connectionState.status !== "connected") {
				return Promise.resolve();
			}
			const op = toBase64(
				awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients),
			);
			return this.postAwareness(this.doc.clientID.toString(), op);
		},
		this.minAwarenessIntervalMs,
		{ leading: false, trailing: true },
	);

	private setupShapeStream() {
		// Set up operations stream
		const operationsStream = new ShapeStream<OperationMessage>({
			url: this.operationsStreamUrl,
			parser: byteAToUint8Array,
			headers: {
				Authorization: async () => `Bearer ${await this.getBearerToken()}`,
			},
			...this.streamState.operations,
		});

		const saveStreamState = (
			name: "operations" | "awareness",
			offset: Offset,
			handle: string,
		) => {
			this.streamState[name] = { offset, handle };
		};

		operationsStream.subscribe((messages: Message<OperationMessage>[]) => {
			for (const message of messages) {
				if (isChangeMessage(message)) {
					switch (message.headers.operation) {
						case "insert": {
							this.updatesBuffer.push(message.value.op);
							break;
						}

						case "update":
						case "delete": {
							console.warn(
								`Received unexpected operation: ${message.headers.operation}`,
							);
							break;
						}
					}
				} else {
					switch (message.headers.control) {
						case "up-to-date": {
							const mergedUpdate = Y.mergeUpdates(this.updatesBuffer);
							Y.applyUpdate(this.doc, mergedUpdate, "remote");
							if (this.doc.store.pendingStructs?.update) {
								console.warn(
									"Applied updates but structs are pending! Updates might be corrupted",
								);
							}
							this.updatesBuffer = [];
							this.synced = true;
							break;
						}
						case "must-refetch": {
							// TODO(John): Not sure what I want to do here...
							// should the ydoc be reset?
							// Operations are idempotent so maybe we just allow
							// the old messages to reapply to our ydoc.
							console.warn("Received must-refetch control message.");
							break;
						}
					}
				}
				const offset = operationsStream.lastOffset;
				const handle = operationsStream.shapeHandle;
				if (offset && handle) {
					saveStreamState("operations", offset, handle);
				}
			}
		});

		let awarenessStream: ShapeStream<AwarenessMessage> | undefined;
		if (this.awareness) {
			awarenessStream = new ShapeStream<AwarenessMessage>({
				url: this.awarenessStreamUrl,
				parser: { ...byteaToUint8ArrayLazy, ...timestamptzToDate },
				headers: {
					Authorization: async () => `Bearer ${await this.getBearerToken()}`,
				},
				...this.streamState.awareness,
			});

			awarenessStream.subscribe((messages: Message<AwarenessMessage>[]) => {
				// Ignore messages that are too old and would be removed in the
				// next local update
				const minTime = new Date(
					Date.now() - awarenessProtocol.outdatedTimeout,
				);
				for (const message of messages) {
					if (isChangeMessage(message)) {
						if (
							message.value.updated_at < minTime ||
							message.value.client_id === this.doc.clientID.toString()
						) {
							continue;
						}
						// TODO(John): might want to buffer and merge all
						// updates at once like we do above
						awarenessProtocol.applyAwarenessUpdate(
							// biome-ignore lint/style/noNonNullAssertion: already checked
							this.awareness!,
							message.value.op(),
							"remote",
						);
					}
				}
				// biome-ignore lint/style/noNonNullAssertion: set above
				const offset = awarenessStream!.lastOffset;
				// biome-ignore lint/style/noNonNullAssertion: set above
				const handle = awarenessStream!.shapeHandle;
				if (offset && handle) {
					saveStreamState("awareness", offset, handle);
				}
			});
		}

		const onConnectUnsubscribe = operationsStream.subscribe(() => {
			this.connectionState = {
				status: "connected",
				operationsStream,
				awarenessStream,
				sendOperationsTimer: setTimeout(async () => {
					await this.sendPendingOperations();
				}, this.minSendOperationIntervalMs),
			};
			this.emit("status", [{ status: "connected" }]);
			onConnectUnsubscribe();
		});

		this.connectionState = {
			status: "connecting",
			operationsStream,
			awarenessStream,
		};
		this.emit("status", [{ status: "connecting" }]);
	}
}
