import {
  Component,
  EventEmitter as NgEventEmitter,
  OnInit,
  Input,
  Output,
} from '@angular/core';
import { Observable, interval } from 'rxjs';

import {
  map,
  switchMap,
  tap,
  delay,
  startWith,
  pairwise,
  shareReplay,
} from 'rxjs/operators';
import { pick, omit } from 'lodash';
import { HttpClient } from '@angular/common/http';
import { LogService } from '@anderson-optimization/ui';
import { environment } from 'src/environments/environment';
import EventEmitter from 'events';

/**
 * Polls the grid-detect process API for one stream and one of its consumers
 * and exposes the results as observables for the template.
 *
 * Inputs:
 *  - `stream`: stream name interpolated into the request URLs.
 *  - `consumer`: consumer name interpolated into the consumer request URL.
 *  - `updateInterval`: polling period in seconds (multiplied by 1000 for `interval`).
 *
 * Output:
 *  - `onPurge`: emits the value passed to `purge()` each time it is called.
 */
@Component({
  selector: 'dg-stream-consumer',
  templateUrl: './stream-consumer.component.html',
  styleUrls: ['./stream-consumer.component.scss'],
})
export class StreamConsumerComponent implements OnInit {
  /** How long (ms) each emission is held back before `updating` is reset to false. */
  private static readonly UPDATING_FLAG_DELAY_MS = 500;

  @Input() stream: string;
  @Input() consumer: string;
  @Input() updateInterval = 10;

  // Must be Angular's EventEmitter: Angular wires `(onPurge)` bindings by calling
  // `.subscribe()` on the output, which Node's `events` EventEmitter does not provide.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload is whatever the template passes to purge()
  @Output() onPurge: NgEventEmitter<any> = new NgEventEmitter<any>();

  constructor(
    protected http: HttpClient,
    protected log: LogService,
  ) {}

  /** Raw response of the stream endpoint, re-fetched every `updateInterval` seconds. */
  stream$: Observable<any>;
  /** `consumerKeys` picked from the consumer endpoint response, plus a `ts` timestamp. */
  consumer$: Observable<any>;
  /** Consecutive `[previous, current]` emissions of `consumer$`. */
  consumerPair$: Observable<any>;

  /** Fields copied from the consumer response into `consumer$` emissions. */
  consumerKeys = ['num_ack_pending', 'num_redelivered', 'num_waiting', 'num_pending'];

  /** True from the moment a poll tick fires until its (delayed) result is delivered. */
  updating = false;

  lastConsumer = {};

  /** Builds the polling observables; nothing is requested until they are subscribed. */
  ngOnInit(): void {
    this.stream$ = this.poll(
      () => `${environment.gridDetect.url}/process/stream/${this.stream}`,
    ).pipe(
      // map(v => pick(v, this.consumerKeys)),
      this.clearUpdating(),
    );

    this.consumer$ = this.poll(
      () =>
        `${environment.gridDetect.url}/process/stream/${this.stream}/consumers/${this.consumer}`,
    ).pipe(
      map((response) => ({
        ...pick(response, this.consumerKeys),
        ts: Date.now(),
      })),
      this.clearUpdating(),
    );

    this.consumerPair$ = this.consumer$.pipe(pairwise());
  }

  /**
   * Requests a purge of the current stream and emits `event` on `onPurge`.
   *
   * The output fires immediately; it does not wait for the HTTP request to finish.
   * NOTE(review): the purge is issued as a GET request — confirm this matches the API.
   *
   * @param event value forwarded unchanged to `onPurge` subscribers.
   */
  purge(event?: unknown): void {
    this.http
      .get(`${environment.gridDetect.url}/process/stream/${this.stream}/purge`)
      .subscribe((result) => {
        console.log('Purged stream!', result);
      });
    this.onPurge.emit(event);
  }

  /**
   * Difference of one field between the two entries of a `[previous, current]` pair.
   *
   * @param pair two-element array, e.g. an emission of `consumerPair$`.
   * @param key field to compare.
   * @returns `pair[1][key] - pair[0][key]`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- entries come from an untyped HTTP response
  diff(pair: ReadonlyArray<Record<string, any>>, key: string): number {
    return pair[1][key] - pair[0][key];
  }

  /**
   * Fires immediately and then every `updateInterval` seconds, sets `updating`
   * to true on each tick and GETs the URL produced by `buildUrl`.
   *
   * The URL is rebuilt on every tick so changes to `stream`/`consumer` are picked up.
   * The latest response is shared between subscribers; `refCount: true` makes the
   * timer (and its HTTP requests) stop once the last subscriber unsubscribes —
   * plain `shareReplay(1)` would keep polling after the component is destroyed.
   */
  private poll(buildUrl: () => string): Observable<any> {
    return interval(this.updateInterval * 1000).pipe(
      startWith(0),
      tap(() => (this.updating = true)),
      switchMap(() => this.http.get(buildUrl())),
      shareReplay({ bufferSize: 1, refCount: true }),
    );
  }

  /** Operator that delays each value briefly and then resets `updating` to false. */
  private clearUpdating<T>(): (source: Observable<T>) => Observable<T> {
    return (source) =>
      source.pipe(
        delay(StreamConsumerComponent.UPDATING_FLAG_DELAY_MS),
        tap(() => (this.updating = false)),
      );
  }
}
