queue.js 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. 'use strict';
  2. var atoa = require('atoa');
  3. var a = require('./a');
  4. var once = require('./once');
  5. var emitter = require('./emitter');
  6. var debounce = require('./debounce');
  7. module.exports = function queue (worker, concurrency) {
  8. var q = [], load = 0, max = concurrency || 1, paused;
  9. var qq = emitter({
  10. push: manipulate.bind(null, 'push'),
  11. unshift: manipulate.bind(null, 'unshift'),
  12. pause: function pause () { paused = true; },
  13. resume: function resume () { paused = false; debounce(labor); },
  14. pending: q
  15. });
  16. if (Object.defineProperty && !Object.definePropertyPartial) {
  17. Object.defineProperty(qq, 'length', { get: function getter () { return q.length; } });
  18. }
  19. function manipulate (how, task, done) {
  20. var tasks = a(task) ? task : [task];
  21. tasks.forEach(function insert (t) { q[how]({ t: t, done: done }); });
  22. debounce(labor);
  23. }
  24. function labor () {
  25. if (paused || load >= max) { return; }
  26. if (!q.length) { if (load === 0) { qq.emit('drain'); } return; }
  27. load++;
  28. var job = q.pop();
  29. worker(job.t, once(complete.bind(null, job)));
  30. debounce(labor);
  31. }
  32. function complete (job) {
  33. load--;
  34. debounce(job.done, atoa(arguments, 1));
  35. debounce(labor);
  36. }
  37. return qq;
  38. };