import { BehaviorSubject, combineLatestWith, finalize, from, Observable, of, Subject, switchMap, tap } from 'rxjs';
import { distinctUntilChanged, take, throttleTime } from 'rxjs/operators';
import { equals } from 'remeda';

/**
 * Sentinel meaning "no change". `ReloadableSubject` compares every loaded
 * output against this exact object with `!==` and skips its `next` call when
 * they are the same reference.
 *
 * It is an object literal so that the check is a memory-reference (identity)
 * comparison, and it is typed `any` so it can stand in for a value of any `V`.
 */
export const RS_NO_CHANGE: any = {};

/**
 * Shape of the loader handed to `ReloadableSubject`: it receives a trigger
 * value of type `T` and produces the loaded value (`V` or `null`), either as
 * an `Observable` or as a `Promise`.
 */
export type loadFn<T, V> = (trigger: T) => Observable<V | null> | Promise<V | null>;

/**
 * A `BehaviorSubject` whose value is (re)loaded through `loadFn` whenever the
 * trigger value changes or a reload is requested.
 *
 * Reload requests are produced by subscribing through {@link asObservable} or
 * by calling {@link refresh}; non-forced requests are throttled to one per
 * 1000 ms. A load is only started while the subject is observed and no other
 * load is in flight; otherwise the current value is re-emitted.
 *
 * NOTE(review): neither the trigger pipeline nor a forced refresh attaches an
 * error handler, so an error from `loadFn` is reported as an unhandled RxJS
 * error and, on the trigger pipeline, ends that pipeline. The subscription to
 * `trigger$` is also never torn down by this class.
 *
 * @typeParam T - type of the trigger value passed to `loadFn`
 * @typeParam V - type of the value held and emitted by the subject
 * @deprecated ReloadableSubject is deprecated, please use RouterFinn
 */
export class ReloadableSubject<T, V> extends BehaviorSubject<V> {
  /** Receives `null` each time a non-forced reload is requested. */
  private readonly throttle$$ = new Subject<null>();
  /**
   * Latest trigger seen by the pipeline. Boxed so that "no trigger yet" can be
   * told apart from valid falsy triggers such as `0`, `''` or `false`.
   */
  private lastTrigger: { readonly value: T } | undefined;
  /** `true` while a load started by `refreshData$` is in flight. */
  private readonly networkActive$$ = new BehaviorSubject<boolean>(false);
  public networkActive$ = this.networkActive$$.asObservable();
  /** Mirror of the current value; updated on every `next`. */
  private readonly nonTriggering$$: BehaviorSubject<V>;
  /** Subscription of a forced load that is still in flight, if any. */
  private forcedLoad: { readonly closed: boolean; unsubscribe(): void } | undefined;

  /**
   * @param init - initial value; also re-emitted whenever the trigger changes
   * @param loadFn - loader invoked with the current trigger value
   * @param trigger$ - source of trigger values (deduplicated with `equals`)
   */
  constructor(init: V, private loadFn: loadFn<T, V>, trigger$: Observable<T>) {
    super(init);
    this.nonTriggering$$ = new BehaviorSubject<V>(init);

    trigger$
      .pipe(
        distinctUntilChanged(equals),
        // Nothing flows until the first reload request arrives on throttle$$.
        combineLatestWith(this.throttle$$.pipe(throttleTime(1000))),
        tap(([trigger]) => {
          if (!equals(this.lastTrigger?.value, trigger)) {
            // A forced load for the previous trigger would otherwise block the
            // new load (networkActive) and later emit stale data; cancel it.
            this.forcedLoad?.unsubscribe();
            this.forcedLoad = undefined;
            // Reset to the initial value while the new trigger's data loads.
            this.next(init);
          }
          this.lastTrigger = { value: trigger };
        }),
        // switchMap drops a still-running load when a newer request arrives.
        switchMap(([trigger]) => this.refreshData$(trigger)),
      )
      .subscribe({
        next: (output: V) => this.emitUnlessNoChange(output),
      });
  }

  /**
   * Returns an observable that subscribes to this subject and then requests a
   * (throttled) reload, so each new subscriber can cause fresh data to load.
   */
  // eslint-disable-next-line rxjs/finnish
  public override asObservable(): Observable<V> {
    return new Observable<V>((subscriber) => {
      const sub = this.subscribe(subscriber);
      this.throttle$$.next(null);
      return () => {
        sub.unsubscribe();
      };
    });
  }

  /** Emits `v` to subscribers and keeps the internal mirror in sync. */
  public override next(v: V): void {
    super.next(v);
    this.nonTriggering$$.next(v);
  }

  /**
   * Requests a reload.
   *
   * @param force - when `true`, bypasses the throttle and loads immediately
   *   for the latest trigger (no-op if no trigger has been seen yet); when
   *   `false`, queues a throttled reload request.
   */
  public refresh(force = false): void {
    if (!force) {
      this.throttle$$.next(null);
      return;
    }

    const last = this.lastTrigger;
    if (!last) {
      return;
    }

    const sub = this.refreshData$(last.value).subscribe({
      next: (output: V) => this.emitUnlessNoChange(output),
    });
    // Only remember loads that are still running, so a synchronously completed
    // (short-circuited) call does not hide an earlier in-flight forced load.
    if (!sub.closed) {
      this.forcedLoad = sub;
    }
  }

  /** Emits `output` unless it is the `RS_NO_CHANGE` sentinel. */
  private emitUnlessNoChange(output: V): void {
    if (output !== RS_NO_CHANGE) {
      this.next(output);
    }
  }

  /**
   * Starts a load for `input` when the subject is observed and no load is in
   * flight; otherwise returns the current value unchanged.
   *
   * @param input - trigger value handed to `loadFn`
   * @returns an observable of the first loaded value, or of the current value
   * @throws rethrows anything `loadFn`/`from` throws synchronously, after
   *   clearing the in-flight flag so later reloads are not blocked
   */
  private refreshData$(input: T): Observable<V> {
    if (this.observed && !this.networkActive$$.value) {
      this.networkActive$$.next(true);
      try {
        return from(this.loadFn(input)).pipe(
          take(1), // shouldn't need the take, but don't want to risk a leak
          finalize(() => this.networkActive$$.next(false)),
        );
      } catch (err: unknown) {
        // finalize was never attached, so reset the flag by hand.
        this.networkActive$$.next(false);
        throw err;
      }
    }
    return of(this.value);
  }
}
