
/**
 * Callback invoked with each value an observable delivers to its subscribers.
 *
 * The argument is typed `V | void` because a value may be absent, e.g. when
 * a value-holding observable that has not emitted yet hands its (undefined)
 * current value to a new subscriber.
 *
 * @typeParam V - Type of the emitted values.
 */
export type ObserverCallback<V = unknown> = (value: V | void) => void;

/**
 * Defines an ultra simple observable interface.
 * Used to subscribe to remote state changes where
 * React Contexts or props can't be used.
 *
 * @typeParam V - Type of the values emitted to subscribers.
 */
export interface BasicObservable<V = unknown> {


    /** Emits the next value to the subscribers. The value may be omitted. */
    next: (value: V | void) => void;

    /**
     * Subscribes the callback to value changes.
     * Returns a zero-argument function; the base implementation in this
     * file returns one that unsubscribes the same callback again.
     */
    subscribe: (cb: ObserverCallback<V>) => () => void;

    /** Unsubscribes the callback from value changes. */
    unsubscribe: (cb: ObserverCallback<V>) => void;

    /** Closes the stream, unsubscribing all listeners and resetting state. */
    close: () => void;

}

/**
 * Extends the basic observable by persisting
 * the last emitted value.
 * New subscribers are invoked immediately
 * with the current value.
 *
 * @typeParam V - Type of the values emitted to subscribers.
 */
export interface BasicPersistedObservable<V = unknown> extends BasicObservable<V> {

    /** Returns the current value, or nothing (`void`) when no value is held. */
    getValue: () => V | void;

}

/**
 * Shared base for observable implementations.
 *
 * Keeps the most recently emitted value and a set of subscriber callbacks.
 * Subscribers are stored in a `Set`, so registering the same callback more
 * than once has no additional effect.
 *
 * @typeParam V - Type of the values emitted to subscribers.
 */
abstract class AbstractObservable<V = unknown> implements BasicObservable<V> {

    /** Last value passed to `next`; `undefined` before the first emission and after `close`. */
    protected _currentValue: V | void = undefined;

    /** Callbacks currently registered for notifications. */
    private readonly _subscriptions: Set<ObserverCallback<V>> = new Set();

    /**
     * Stores the value as the current value and notifies the subscribers.
     *
     * @param value - Value to emit; may be omitted.
     */
    public next(value: V | void): void {
        this._currentValue = value;
        this._emit();
    }

    /**
     * Registers a callback for future emissions.
     *
     * @param cb - Callback to notify.
     * @returns A function that unsubscribes `cb` again.
     */
    public subscribe(cb: ObserverCallback<V>): () => void {
        this._subscriptions.add(cb);
        return () => this.unsubscribe(cb);
    }

    /**
     * Removes a previously registered callback. Unknown callbacks are ignored.
     *
     * @param cb - Callback to remove.
     */
    public unsubscribe(cb: ObserverCallback<V>): void {
        this._subscriptions.delete(cb);
    }

    /** Resets the current value to `undefined` and drops all subscribers. */
    public close(): void {
        this._currentValue = undefined;
        this._subscriptions.clear();
    }

    /**
     * Notifies the subscribers that were registered when the emission started.
     *
     * @param value - Value to deliver; defaults to the current value.
     */
    private _emit(value: V | void = this._currentValue): void {
        // Iterate over a snapshot: iterating the live Set would also visit
        // callbacks added while emitting, which delivers the in-flight value to
        // subscribers that joined mid-emission and loops forever when a
        // callback unsubscribes and re-subscribes itself.
        const snapshot = Array.from(this._subscriptions);
        for (const sub of snapshot) {
            // Skip callbacks that were removed by an earlier subscriber
            // (or by close) during this emission.
            if (this._subscriptions.has(sub)) {
                sub(value);
            }
        }
    }

}

/**
 * Function applied to an observable's held value when that value is read.
 * It receives the held value (possibly absent) and returns the value to hand
 * out; it is intended for returning a copy instead of the held reference.
 *
 * @typeParam V - Type of the held value.
 */
export type CloneFunction<V = unknown> = (currentValue: V | void) => V | void

/**
 * Basic implementation of the BasicPersistedObservable.
 *
 * Remembers the latest emitted value. A new subscriber is called once,
 * synchronously, with the value held at subscription time (`undefined` when
 * nothing has been emitted yet) and is then registered for later emissions.
 *
 * A custom 'clone' function can be supplied so that `getValue` hands out a
 * new value rather than the held reference. Without one, `getValue` returns
 * the held reference itself.
 */
export class SimpleBehaviorSubject<V = unknown> extends AbstractObservable<V> implements BasicPersistedObservable<V>  {

    /** Applied to the held value on every `getValue` call. */
    private _clone: CloneFunction<V>;

    /**
     * @param cloneFn - Optional clone function; anything that is not a
     * function is ignored and the identity function is used instead.
     */
    constructor(cloneFn?: CloneFunction<V>) {
        super();
        this._clone = typeof cloneFn === 'function' ? cloneFn : (held => held);
    }

    /** Returns the held value after passing it through the clone function. */
    public getValue(): V | void {
        const held = this._currentValue;
        return this._clone(held);
    }

    /**
     * Calls `cb` with the held value, then registers it for later emissions.
     * The callback receives the held value as-is; the clone function is not
     * applied here.
     *
     * @param cb - Callback to notify.
     * @returns A function that unsubscribes `cb` again.
     */
    public subscribe(cb: ObserverCallback<V>): () => void {
        // Initial delivery happens before registration.
        cb(this._currentValue);
        const teardown = super.subscribe(cb);
        return teardown;
    }

}


/**
 * Observable that records every emitted value and replays the recorded
 * history, in emission order, to each new subscriber before registering it
 * for later emissions.
 *
 * The history is unbounded: it grows with every `next` call and is only
 * discarded by `close`.
 */
export class SimpleReplaySubject<V = unknown> extends AbstractObservable<V> implements BasicObservable<V> {

    /** Every value emitted since construction or the last `close`, oldest first. */
    private _history: (V | void)[] = [];

    /**
     * Replays the recorded values to `cb`, then registers it for later emissions.
     *
     * @param cb - Callback to notify.
     * @returns A function that unsubscribes `cb` again.
     */
    public subscribe(cb: ObserverCallback<V>): () => void {
        this._history.forEach(v => cb(v));
        return super.subscribe(cb);
    }

    /**
     * Records the value in the history and emits it to the subscribers.
     *
     * @param value - Value to emit; may be omitted.
     */
    public next(value: V | void): void {
        this._history.push(value);
        super.next(value);
    }

    /**
     * Closes the stream: drops the recorded history in addition to the
     * subscribers and current value, so that subscribers joining after a
     * close are not replayed values emitted before it.
     */
    public close(): void {
        this._history = [];
        super.close();
    }

}
