import mergeEndWithAny from "@lib/util/frp/mergeEndWithAny";
import { Observable, distinctUntilChanged, interval, map, scan } from "rxjs";
import { Pin } from "../pin";

const UPDATE_PERIOD_MS = 50;
const UPDATE_BATCH_SIZE = 20;

type AccumulatorState<P extends Pin> = {
	currentList: P[];
	stillNeed: P[];
};

export default function staggerNewPins<P extends Pin>(pinsStream: Observable<P[]>): Observable<P[]> {
	const newPinsUpdateStream = pinsStream.pipe(
		map((pins) => (state: AccumulatorState<P>) => {
			// first pass and after list resets
			if (state.currentList.length === 0) {
				return {
					currentList: pins.slice(0, UPDATE_BATCH_SIZE),
					stillNeed: pins.slice(UPDATE_BATCH_SIZE),
				};
			}
			// new pin list
			const currentSet = new Set(state.currentList.map((pin) => pin.id));
			const nextSet = new Set(pins.map((pin) => pin.id));
			state.stillNeed = pins.filter((pin) => !currentSet.has(pin.id));
			state.currentList = state.currentList.filter((pin) => nextSet.has(pin.id));
			return state;
		}),
	);

	const periodicUpdateStream = interval(UPDATE_PERIOD_MS).pipe(
		map(() => (state: AccumulatorState<P>) => {
			if (state.stillNeed.length > 0) {
				state.currentList = state.currentList.concat(state.stillNeed.slice(0, UPDATE_BATCH_SIZE));
				state.stillNeed = state.stillNeed.slice(UPDATE_BATCH_SIZE);
			}
			return state;
		}),
	);

	return mergeEndWithAny(newPinsUpdateStream, periodicUpdateStream)
		.pipe(
			scan((acc, updator) => updator(acc), {
				currentList: [] as P[],
				stillNeed: [] as P[],
			}),
		)
		.pipe(map((state) => state.currentList))
		.pipe(distinctUntilChanged());
}
