import { Queue } from '@bugseq-site/app/src/lib/queue'
import { BugSeqApiError } from '@bugseq-site/shared/src/lib/api/errors'

/**
 * A unit of deferred asynchronous work.
 *
 * @typeParam T - the value the work eventually produces
 */
interface AsyncProcess<T> {
  /** Starts the work and returns a promise for its result. */
  execute: () => Promise<T>
}

/**
 * A job bundled with the callbacks used to settle the promise that represents
 * its outcome.
 */
interface WrappedProcess<Type> {
  /** The job to run. */
  item: AsyncProcess<Type>
  /**
   * Fulfils the associated promise with the job's result.
   * Typed as returning `void` (not `any`) because the return value is never used.
   */
  resolve: (t: Type) => void
  /**
   * Rejects the associated promise with the given error.
   * Typed as returning `void` (not `any`) because the return value is never used.
   */
  reject: (e: Error) => void
}

/**
 * Runs submitted asynchronous jobs with bounded concurrency, per-job retries
 * with exponential backoff plus jitter, and a processor-wide error budget.
 *
 * Jobs are queued by {@link AsyncProcessor.execute}; at most `concurrency` of
 * them run at once. Every failed attempt (of any job) counts against
 * `maxTotalErrors`. Once that budget is spent, a failing job stops retrying
 * and every job still waiting in the queue is rejected with
 * `'Too many errors. Aborting.'`.
 *
 * The promise returned by `execute` is always settled: it resolves with the
 * job's result or rejects with an `Error`.
 */
export class AsyncProcessor<Type> extends Queue<WrappedProcess<Type>> {
  private readonly concurrency: number
  private readonly retries: number
  private readonly minBackoffMs: number
  private readonly maxBackoffMs: number
  private readonly backoffCoefficient: number
  private readonly backoffJitterMs: number
  private readonly maxTotalErrors: number

  // Jobs that have been dequeued and are currently running (or backing off).
  private readonly pendingPromises: Array<WrappedProcess<Type>>
  // Failed attempts observed so far across all jobs.
  private totalErrorsSeen: number

  /**
   * @param concurrency - maximum number of jobs in flight at once; values
   *   below 1 (or NaN) are treated as 1
   * @param retries - maximum number of attempts per job, including the first
   *   one (so the default of 1 means "no retry"); fractional values are
   *   rounded up and values below 1 (or NaN) are treated as 1
   * @param minBackoffMs - backoff before the second attempt, in milliseconds
   * @param maxBackoffMs - upper bound for the exponential part of the backoff
   * @param backoffCoefficient - multiplier applied to the backoff per attempt
   * @param backoffJitterMs - exclusive upper bound of the random delay added
   *   on top of the (capped) exponential backoff
   * @param maxTotalErrors - number of failed attempts, across all jobs, after
   *   which the processor stops retrying and rejects the queued jobs
   */
  constructor (
    concurrency: number = 1,
    retries: number = 1,
    minBackoffMs: number = 10,
    maxBackoffMs: number = 5000,
    backoffCoefficient: number = 2,
    backoffJitterMs: number = 200,
    maxTotalErrors: number = 100
  ) {
    super()

    // A non-positive concurrency would leave every submitted job queued
    // forever, and a non-positive or fractional attempt count would let the
    // retry loop finish without settling the caller's promise.
    this.concurrency = AsyncProcessor.atLeastOne(concurrency)
    this.retries = AsyncProcessor.atLeastOne(Math.ceil(retries))
    this.minBackoffMs = minBackoffMs
    this.maxBackoffMs = maxBackoffMs
    this.backoffCoefficient = backoffCoefficient
    this.backoffJitterMs = backoffJitterMs
    this.maxTotalErrors = maxTotalErrors

    this.pendingPromises = []
    this.totalErrorsSeen = 0
  }

  /**
   * Queues a job and starts it as soon as a concurrency slot is free.
   *
   * @param item - the job to run
   * @returns a promise that resolves with the job's result, or rejects with
   *   the job's final error or with `'Too many errors. Aborting.'`
   */
  public async execute (item: AsyncProcess<Type>): Promise<Type> {
    return await new Promise<Type>((resolve, reject) => {
      super.enqueue({ item, resolve, reject })
      // Fire-and-forget: the worker loop reports through resolve/reject.
      void this.executeIfPossible()
    })
  }

  /**
   * Reports whether the underlying queue is empty.
   *
   * NOTE: jobs are removed from the queue when they start, so this can return
   * `true` while jobs are still in flight. Await the promises returned by
   * {@link AsyncProcessor.execute} to know when the work has finished.
   */
  public done (): boolean {
    return super.length === 0
  }

  /**
   * Worker loop: while a concurrency slot is free, takes the next queued job
   * and runs it to completion. Returns when the queue is empty or all slots
   * are taken. Never rejects.
   */
  private async executeIfPossible (): Promise<void> {
    // A loop rather than self-recursion, so a long queue does not build up a
    // chain of suspended async frames.
    while (this.pendingPromises.length < this.concurrency) {
      const wrapped = super.dequeue()

      // queue is already empty
      if (wrapped === undefined) {
        return
      }

      this.pendingPromises.push(wrapped)

      let succeeded = false
      try {
        succeeded = await this.executeWithRetries(wrapped)
      } catch (e: unknown) {
        // Defensive: make sure the caller's promise is settled even if
        // something unexpected escapes. Settling twice is a no-op.
        wrapped.reject(AsyncProcessor.toError(e))
      } finally {
        const index = this.pendingPromises.indexOf(wrapped)
        if (index !== -1) {
          this.pendingPromises.splice(index, 1)
        }
      }

      if (!succeeded && this.totalErrorsSeen >= this.maxTotalErrors) {
        this.rejectAllQueued()
      }
    }
  }

  /**
   * Runs one job, retrying failed attempts with backoff, and settles the
   * job's promise exactly once.
   *
   * @returns `true` if the job resolved, `false` if it was rejected
   */
  private async executeWithRetries (wrapped: WrappedProcess<Type>): Promise<boolean> {
    for (let attempt = 0; attempt < this.retries; attempt++) {
      try {
        wrapped.resolve(await wrapped.item.execute())
        return true
      } catch (e: unknown) {
        this.totalErrorsSeen += 1

        // assume BugSeqApiError is non-retryable
        const isLastAttempt = attempt >= this.retries - 1
        if (isLastAttempt || e instanceof BugSeqApiError) {
          // Always reject, including for thrown values that are not Errors;
          // otherwise the caller would wait forever.
          wrapped.reject(AsyncProcessor.toError(e))
          return false
        }

        if (this.totalErrorsSeen >= this.maxTotalErrors) {
          wrapped.reject(new Error('Too many errors. Aborting.'))
          return false
        }
      }

      await this.sleepBeforeRetry(attempt)
    }

    // Not reachable while `retries >= 1` (enforced by the constructor); kept
    // so that the promise can never be left unsettled.
    wrapped.reject(new Error('Retries exhausted.'))
    return false
  }

  /**
   * Waits `min(minBackoffMs * backoffCoefficient^attempt, maxBackoffMs)` plus
   * a random jitter in `[0, backoffJitterMs)` milliseconds.
   */
  private async sleepBeforeRetry (attempt: number): Promise<void> {
    const exponential = this.minBackoffMs * Math.pow(this.backoffCoefficient, attempt)
    const capped = Math.min(exponential, this.maxBackoffMs)
    const jitter = Math.floor(Math.random() * this.backoffJitterMs)
    await new Promise((resolve) => setTimeout(resolve, capped + jitter))
  }

  /** Rejects every job still waiting in the queue with the abort error. */
  private rejectAllQueued (): void {
    for (let w = super.dequeue(); w !== undefined; w = super.dequeue()) {
      w.reject(new Error('Too many errors. Aborting.'))
    }
  }

  /** Returns `n` when it is at least 1, otherwise 1 (this also covers NaN). */
  private static atLeastOne (n: number): number {
    return n >= 1 ? n : 1
  }

  /**
   * Converts an arbitrary thrown value into an `Error`: Errors pass through
   * unchanged, strings become the message, anything else is described in the
   * message of a new Error.
   */
  private static toError (thrown: unknown): Error {
    if (thrown instanceof Error) {
      return thrown
    }
    if (typeof thrown === 'string') {
      return new Error(thrown)
    }

    let description: string
    try {
      description = JSON.stringify(thrown)
    } catch {
      // e.g. circular structures or BigInt values
      description = String(thrown)
    }
    return new Error(`Non-Error value thrown: ${description}`)
  }
}
