let mitt = require('mitt');

const wait = async function(ms=0) {
  return new Promise(resolve => setTimeout(resolve, ms));
};

const chain = async promisesFns => {
  const r = [];
  for(var i=0; i<promisesFns.length; i++){
    let v = await promisesFns[i]();
    r.push(v);
  }
  return r;
}

const throttledParallelTasks = async function(tasks, options){
  var { batchSize=1, batchDelay=0, onStepComplete, onBatchComplete } = options || {};
  let results = [];
  batchSize = parseInt(batchSize);
  if(isNaN(batchSize) || batchSize < 1){
    throw new Error('batchSize must be a positive integer');
  }
  let i = 0;
  while(i < tasks.length){
    if(i){
      await wait(batchDelay);
    }
    let batchResults = await Promise.all(
      tasks
        .slice(i, i + batchSize)
        .map(
          async task => {
            const result = await task();
            if(typeof onStepComplete === 'function'){
              onStepComplete(result);
            }
            return result;
          }
        )
    );
    results.push(batchResults);
    if(typeof onBatchComplete === 'function'){
      onBatchComplete(batchResults);
    }
    i += batchSize;
  }
  return results;
}

// AN OBJECT TO KEEP TRACK OF ALL QUEUES IN MEMORY
const QUEUES = {};
function createAsyncQueue({namespace="global", MAX_NUM_PARALLEL_TASKS=200}){
  if(mitt && typeof mitt !== 'function'){
    mitt = mitt.default;
  }
  const eventEmitter = mitt();
  let tasks = [];

  function evaluate(){
    // remove non-function members which may have been passed in by mistake
    tasks = tasks.filter(t => typeof t === 'function');

    tasks
      .filter((t, i) => !t._pending && i<MAX_NUM_PARALLEL_TASKS)
      .forEach(
        async t => {
          t._pending = true;
          let result;
          function finish(){
            eventEmitter.emit('taskFinished', { t, result });
          };
          try {
            result = await t();
          }
          catch(err){
            finish();
            throw err;
          }
          finish();
        }
      );

    // for backwards compatibility, also save tasks to the `requests` key
    eventEmitter.tasks = eventEmitter.requests = tasks;
    if(!tasks.length && !eventEmitter._completed){
      eventEmitter._completed = true;
      eventEmitter.emit('completed');
    }
    else if(tasks.length) {
      eventEmitter._completed = false;
    }
  }

  // add one or more tasks
  eventEmitter.on(
    'add',
    t => {
      // coerce to array
      if(!Array.isArray(t)){
        t = [t];
      }
      tasks = tasks.concat(t);
    }
  );

  eventEmitter.on(
    'prepend',
    t => tasks.unshift(t)
  );

  eventEmitter.on(
    'insert',
    ({t, i=tasks.length-1}) => tasks.splice(i, 0, t)
  );

  eventEmitter.on(
    'taskFinished',
    ({ t }) => tasks = tasks.filter(_t => _t !== t)
  );

  eventEmitter.on(
    'clear',
    () => {
      tasks.forEach(cancelTask);
      tasks = []
    }
  );

  function cancelTask(t){
    if(typeof t.reject === 'function'){
      t.reject({
        cancelled: true
      });
      tasks = tasks.filter(_t => _t !== t);
    }
  }

  eventEmitter.on('*', evaluate);

  QUEUES[namespace] = eventEmitter;
  // for backwards compatibility, also save tasks to the `requests` key
  eventEmitter.tasks = eventEmitter.requests = tasks;
  eventEmitter.cancelTask = cancelTask;
  return eventEmitter;
}

function getAsyncQueue(params){
  const { namespace } = params;
  const existing = QUEUES[namespace];
  if(!existing){
    let q = createAsyncQueue(params);
    return q;
  }
  return existing;
}

function clearAsyncQueues({namespace}={}){  
  let queuesToClear = namespace
    ? [QUEUES[namespace]]
    : Object.keys(QUEUES).map(k => QUEUES[k]);
  return queuesToClear.map(
    q => {
      q.emit('clear');
    }
  );
}

module.exports = {
  wait,
  chain,
  throttledParallelTasks,
  createAsyncQueue,
  getAsyncQueue,
  clearAsyncQueues
};