import { HttpClient } from '@angular/common/http'
import { Injectable } from '@angular/core'

import { BehaviorSubject, Subject } from 'rxjs'
import { filter, take, map } from 'rxjs/operators'

import { AuthService } from './auth.service'
import { ConfigService } from './config.service'

interface Spark {
  on(action: string, cb: (data?: any) => void): void
  write(message: any): void
}

@Injectable()
export class PubSubService {
  private readonly API_BASE_URL: string
  private readonly SOCKET_BASE_URL: string
  private primusInstance = new BehaviorSubject<any>(null)
  private socket = this.primusInstance.pipe(filter<Spark>(Boolean))

  public events = new Subject<any>()
  public connected = new BehaviorSubject(false)

  constructor(
    public http: HttpClient,
    public authService: AuthService,
    public configService: ConfigService
  ) {
    const apiBaseUrl = this.configService.API_BASE_URL
    if (!apiBaseUrl) {
      throw new Error('API_BASE_URL is not provided')
    }
    this.API_BASE_URL = apiBaseUrl
    this.SOCKET_BASE_URL = this.API_BASE_URL.replace('http', 'ws') + '/ws'
  }

  /**
   * This method is eeded to not forget to
   * import socket library as soon as we can
   */
  public init() {
    this.loadPrimus()
    this.socket.subscribe(socket => {
      // console.log('Primus loaded')
      socket.on('open', () => {
        this.connected.next(true)
      })
      socket.on('data', message => {
        this.events.next(message)
      })
    })
  }

  public send(message) {
    this.socket.pipe(take(1)).subscribe(spark => {
      spark.write(message)
    })
  }

  /**
   * Wait for successful authorization and connect to WSS
   * @private
   */
  private initPrimus() {
    if (window['_socketInstance']) {
      // This allows to not duplicate events when in HMR-mode
      window['_socketInstance'].end()
    }
    this.authService.accessToken$
      .pipe(
        filter(Boolean),
        take(1),
        map(accessToken => {
          window['_socketInstance'] = window['Primus'].connect(
            this.SOCKET_BASE_URL + '?managerToken=' + accessToken
          )
          return window['_socketInstance']
        })
      )
      .subscribe(primus => {
        this.primusInstance.next(primus)
      })
  }

  private loadPrimus() {
    // console.log('Loading primus...')

    if (window['Primus']) {
      return this.initPrimus()
    }

    const script = document.createElement('script')
    script.type = 'text/javascript'
    script.async = true
    script.onload = () => this.initPrimus()
    script.src = `${this.API_BASE_URL}/ws/primus.js`
    document.getElementsByTagName('head')[0].appendChild(script)
  }
}
