test/promise.js

'use strict'

const test = require('tape')
const buildQueue = require('../').promise
const { promisify } = require('util')
const sleep = promisify(setTimeout)
const immediate = promisify(setImmediate)

test('concurrency', function (t) {
  t.plan(2)
  t.throws(buildQueue.bind(null, worker, 0))
  t.doesNotThrow(buildQueue.bind(null, worker, 1))

  async function worker (arg) {
    return true
  }
})

test('worker execution', async function (t) {
  const queue = buildQueue(worker, 1)

  const result = await queue.push(42)
  t.equal(result, true, 'result matches')

  async function worker (arg) {
    t.equal(arg, 42)
    return true
  }
})

test('limit', async function (t) {
  const queue = buildQueue(worker, 1)

  const [res1, res2] = await Promise.all([queue.push(10), queue.push(0)])
  t.equal(res1, 10, 'the result matches')
  t.equal(res2, 0, 'the result matches')

  async function worker (arg) {
    await sleep(arg)
    return arg
  }
})

test('multiple executions', async function (t) {
  const queue = buildQueue(worker, 1)
  const toExec = [1, 2, 3, 4, 5]
  const expected = ['a', 'b', 'c', 'd', 'e']
  let count = 0

  await Promise.all(toExec.map(async function (task, i) {
    const result = await queue.push(task)
    t.equal(result, expected[i], 'the result matches')
  }))

  async function worker (arg) {
    t.equal(arg, toExec[count], 'arg matches')
    return expected[count++]
  }
})

test('drained', async function (t) {
  const queue = buildQueue(worker, 2)

  const toExec = new Array(10).fill(10)
  let count = 0

  async function worker (arg) {
    await sleep(arg)
    count++
  }

  toExec.forEach(function (i) {
    queue.push(i)
  })

  await queue.drained()

  t.equal(count, toExec.length)

  toExec.forEach(function (i) {
    queue.push(i)
  })

  await queue.drained()

  t.equal(count, toExec.length * 2)
})

test('drained with exception should not throw', async function (t) {
  const queue = buildQueue(worker, 2)

  const toExec = new Array(10).fill(10)

  async function worker () {
    throw new Error('foo')
  }

  toExec.forEach(function (i) {
    queue.push(i)
  })

  await queue.drained()
})

test('drained with drain function', async function (t) {
  let drainCalled = false
  const queue = buildQueue(worker, 2)

  queue.drain = function () {
    drainCalled = true
  }

  const toExec = new Array(10).fill(10)
  let count = 0

  async function worker (arg) {
    await sleep(arg)
    count++
  }

  toExec.forEach(function () {
    queue.push()
  })

  await queue.drained()

  t.equal(count, toExec.length)
  t.equal(drainCalled, true)
})

test('drained while idle should resolve', async function (t) {
  const queue = buildQueue(worker, 2)

  async function worker (arg) {
    await sleep(arg)
  }

  await queue.drained()
})

test('drained while idle should not call the drain function', async function (t) {
  let drainCalled = false
  const queue = buildQueue(worker, 2)

  queue.drain = function () {
    drainCalled = true
  }

  async function worker (arg) {
    await sleep(arg)
  }

  await queue.drained()

  t.equal(drainCalled, false)
})

test('set this', async function (t) {
  t.plan(1)
  const that = {}
  const queue = buildQueue(that, worker, 1)

  await queue.push(42)

  async function worker (arg) {
    t.equal(this, that, 'this matches')
  }
})

test('unshift', async function (t) {
  const queue = buildQueue(worker, 1)
  const expected = [1, 2, 3, 4]

  await Promise.all([
    queue.push(1),
    queue.push(4),
    queue.unshift(3),
    queue.unshift(2)
  ])

  t.is(expected.length, 0)

  async function worker (arg) {
    t.equal(expected.shift(), arg, 'tasks come in order')
  }
})

test('push with worker throwing error', async function (t) {
  t.plan(5)
  const q = buildQueue(async function (task, cb) {
    throw new Error('test error')
  }, 1)
  q.error(function (err, task) {
    t.ok(err instanceof Error, 'global error handler should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
    t.equal(task, 42, 'The task executed should be passed')
  })
  try {
    await q.push(42)
  } catch (err) {
    t.ok(err instanceof Error, 'push callback should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
  }
})

test('unshift with worker throwing error', async function (t) {
  t.plan(2)
  const q = buildQueue(async function (task, cb) {
    throw new Error('test error')
  }, 1)
  try {
    await q.unshift(42)
  } catch (err) {
    t.ok(err instanceof Error, 'unshift should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
  }
})

test('no unhandledRejection (push)', async function (t) {
  function handleRejection () {
    t.fail('unhandledRejection')
  }
  process.once('unhandledRejection', handleRejection)
  const q = buildQueue(async function (task, cb) {
    throw new Error('test error')
  }, 1)

  q.push(42)

  await immediate()
  process.removeListener('unhandledRejection', handleRejection)
})

test('no unhandledRejection (unshift)', async function (t) {
  function handleRejection () {
    t.fail('unhandledRejection')
  }
  process.once('unhandledRejection', handleRejection)
  const q = buildQueue(async function (task, cb) {
    throw new Error('test error')
  }, 1)

  q.unshift(42)

  await immediate()
  process.removeListener('unhandledRejection', handleRejection)
})

test/tsconfig.json

{
  "compilerOptions": {
    "target": "es6",
    "module": "commonjs",
    "noEmit": true,
    "strict": true
  },
  "files": [
    "./example.ts"
  ]
}

test/test.js

'use strict'

/* eslint-disable no-var */

var test = require('tape')
var buildQueue = require('../')

test('concurrency', function (t) {
  t.plan(2)
  t.throws(buildQueue.bind(null, worker, 0))
  t.doesNotThrow(buildQueue.bind(null, worker, 1))

  function worker (arg, cb) {
    cb(null, true)
  }
})

test('worker execution', function (t) {
  t.plan(3)

  var queue = buildQueue(worker, 1)

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb(null, true)
  }
})

test('limit', function (t) {
  t.plan(4)

  var expected = [10, 0]
  var queue = buildQueue(worker, 1)

  queue.push(10, result)
  queue.push(0, result)

  function result (err, arg) {
    t.error(err, 'no error')
    t.equal(arg, expected.shift(), 'the result matches')
  }

  function worker (arg, cb) {
    setTimeout(cb, arg, null, arg)
  }
})

test('multiple executions', function (t) {
  t.plan(15)

  var queue = buildQueue(worker, 1)
  var toExec = [1, 2, 3, 4, 5]
  var count = 0

  toExec.forEach(function (task) {
    queue.push(task, done)
  })

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, toExec[count - 1], 'the result matches')
  }

  function worker (arg, cb) {
    t.equal(arg, toExec[count], 'arg matches')
    count++
    setImmediate(cb, null, arg)
  }
})

test('multiple executions, one after another', function (t) {
  t.plan(15)

  var queue = buildQueue(worker, 1)
  var toExec = [1, 2, 3, 4, 5]
  var count = 0

  queue.push(toExec[0], done)

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, toExec[count - 1], 'the result matches')
    if (count < toExec.length) {
      queue.push(toExec[count], done)
    }
  }

  function worker (arg, cb) {
    t.equal(arg, toExec[count], 'arg matches')
    count++
    setImmediate(cb, null, arg)
  }
})

test('set this', function (t) {
  t.plan(3)

  var that = {}
  var queue = buildQueue(that, worker, 1)

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(this, that, 'this matches')
  })

  function worker (arg, cb) {
    t.equal(this, that, 'this matches')
    cb(null, true)
  }
})

test('drain', function (t) {
  t.plan(4)

  var queue = buildQueue(worker, 1)
  var worked = false

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  queue.drain = function () {
    t.equal(true, worked, 'drained')
  }

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    setImmediate(cb, null, true)
  }
})

test('pause && resume', function (t) {
  t.plan(7)

  var queue = buildQueue(worker, 1)
  var worked = false

  t.notOk(queue.paused, 'it should not be paused')

  queue.pause()

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  t.notOk(worked, 'it should be paused')
  t.ok(queue.paused, 'it should be paused')

  queue.resume()
  queue.resume() // second resume is a no-op

  t.notOk(queue.paused, 'it should not be paused')

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    cb(null, true)
  }
})

test('pause in flight && resume', function (t) {
  t.plan(9)

  var queue = buildQueue(worker, 1)
  var expected = [42, 24]

  t.notOk(queue.paused, 'it should not be paused')

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.ok(queue.paused, 'it should be paused')
    process.nextTick(function () {
      queue.resume()
    })
  })

  queue.push(24, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.notOk(queue.paused, 'it should not be paused')
  })

  queue.pause()

  function worker (arg, cb) {
    t.equal(arg, expected.shift())
    process.nextTick(function () {
      cb(null, true)
    })
  }
})

test('altering concurrency', function (t) {
  t.plan(7)

  var queue = buildQueue(worker, 1)
  var count = 0

  queue.pause()

  queue.push(24, workDone)
  queue.push(24, workDone)

  queue.concurrency = 2

  queue.resume()

  t.equal(queue.running(), 2, '2 jobs running')

  function workDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  function worker (arg, cb) {
    t.equal(0, count, 'works in parallel')
    setImmediate(function () {
      count++
      cb(null, true)
    })
  }
})

test('idle()', function (t) {
  t.plan(12)

  var queue = buildQueue(worker, 1)

  t.ok(queue.idle(), 'queue is idle')

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.notOk(queue.idle(), 'queue is not idle')
  })

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    // it will go idle after executing this function
    setImmediate(function () {
      t.ok(queue.idle(), 'queue is now idle')
    })
  })

  t.notOk(queue.idle(), 'queue is not idle')

  function worker (arg, cb) {
    t.notOk(queue.idle(), 'queue is not idle')
    t.equal(arg, 42)
    setImmediate(cb, null, true)
  }
})

test('saturated', function (t) {
  t.plan(9)

  var queue = buildQueue(worker, 1)
  var preworked = 0
  var worked = 0

  queue.saturated = function () {
    t.pass('saturated')
    t.equal(preworked, 1, 'started 1 task')
    t.equal(worked, 0, 'worked zero task')
  }

  queue.push(42, done)
  queue.push(42, done)

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  function worker (arg, cb) {
    t.equal(arg, 42)
    preworked++
    setImmediate(function () {
      worked++
      cb(null, true)
    })
  }
})

test('length', function (t) {
  t.plan(7)

  var queue = buildQueue(worker, 1)

  t.equal(queue.length(), 0, 'nothing waiting')
  queue.push(42, done)
  t.equal(queue.length(), 0, 'nothing waiting')
  queue.push(42, done)
  t.equal(queue.length(), 1, 'one task waiting')
  queue.push(42, done)
  t.equal(queue.length(), 2, 'two tasks waiting')

  function done (err, result) {
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('getQueue', function (t) {
  t.plan(10)

  var queue = buildQueue(worker, 1)

  t.equal(queue.getQueue().length, 0, 'nothing waiting')
  queue.push(42, done)
  t.equal(queue.getQueue().length, 0, 'nothing waiting')
  queue.push(42, done)
  t.equal(queue.getQueue().length, 1, 'one task waiting')
  t.equal(queue.getQueue()[0], 42, 'should be equal')
  queue.push(43, done)
  t.equal(queue.getQueue().length, 2, 'two tasks waiting')
  t.equal(queue.getQueue()[0], 42, 'should be equal')
  t.equal(queue.getQueue()[1], 43, 'should be equal')

  function done (err, result) {
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('unshift', function (t) {
  t.plan(8)

  var queue = buildQueue(worker, 1)
  var expected = [1, 2, 3, 4]

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)

  function done (err, result) {
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    t.equal(expected.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('unshift && empty', function (t) {
  t.plan(2)

  var queue = buildQueue(worker, 1)
  var completed = false

  queue.pause()

  queue.empty = function () {
    t.notOk(completed, 'the task has not completed yet')
  }

  queue.unshift(1, done)
  queue.resume()

  function done (err, result) {
    completed = true
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('push && empty', function (t) {
  t.plan(2)

  var queue = buildQueue(worker, 1)
  var completed = false

  queue.pause()

  queue.empty = function () {
    t.notOk(completed, 'the task has not completed yet')
  }

  queue.push(1, done)
  queue.resume()

  function done (err, result) {
    completed = true
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('kill', function (t) {
  t.plan(5)

  var queue = buildQueue(worker, 1)
  var expected = [1]

  var predrain = queue.drain

  queue.drain = function drain () {
    t.fail('drain should never be called')
  }

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)
  queue.kill()

  function done (err, result) {
    t.error(err, 'no error')
    setImmediate(function () {
      t.equal(queue.length(), 0, 'no queued tasks')
      t.equal(queue.running(), 0, 'no running tasks')
      t.equal(queue.drain, predrain, 'drain is back to default')
    })
  }

  function worker (arg, cb) {
    t.equal(expected.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('killAndDrain', function (t) {
  t.plan(6)

  var queue = buildQueue(worker, 1)
  var expected = [1]

  var predrain = queue.drain

  queue.drain = function drain () {
    t.pass('drain has been called')
  }

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)
  queue.killAndDrain()

  function done (err, result) {
    t.error(err, 'no error')
    setImmediate(function () {
      t.equal(queue.length(), 0, 'no queued tasks')
      t.equal(queue.running(), 0, 'no running tasks')
      t.equal(queue.drain, predrain, 'drain is back to default')
    })
  }

  function worker (arg, cb) {
    t.equal(expected.shift(), arg, 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})

test('pause && idle', function (t) {
  t.plan(11)

  var queue = buildQueue(worker, 1)
  var worked = false

  t.notOk(queue.paused, 'it should not be paused')
  t.ok(queue.idle(), 'should be idle')

  queue.pause()

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  t.notOk(worked, 'it should be paused')
  t.ok(queue.paused, 'it should be paused')
  t.notOk(queue.idle(), 'should not be idle')

  queue.resume()

  t.notOk(queue.paused, 'it should not be paused')
  t.notOk(queue.idle(), 'it should not be idle')

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    process.nextTick(cb.bind(null, null, true))
    process.nextTick(function () {
      t.ok(queue.idle(), 'it should be idle')
    })
  }
})

test('push without cb', function (t) {
  t.plan(1)

  var queue = buildQueue(worker, 1)

  queue.push(42)

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb()
  }
})

test('unshift without cb', function (t) {
  t.plan(1)

  var queue = buildQueue(worker, 1)

  queue.unshift(42)

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb()
  }
})

test('push with worker throwing error', function (t) {
  t.plan(5)
  var q = buildQueue(function (task, cb) {
    cb(new Error('test error'), null)
  }, 1)
  q.error(function (err, task) {
    t.ok(err instanceof Error, 'global error handler should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
    t.equal(task, 42, 'The task executed should be passed')
  })
  q.push(42, function (err) {
    t.ok(err instanceof Error, 'push callback should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
  })
})

test/example.ts

import * as fastq from '../'
import { promise as queueAsPromised } from '../'

// Basic example

const queue = fastq(worker, 1)

queue.push('world', (err, result) => {
  if (err) throw err
  console.log('the result is', result)
})

queue.push('push without cb')

queue.concurrency

queue.drain()

queue.empty = () => undefined

console.log('the queue tasks are', queue.getQueue())

queue.idle()

queue.kill()

queue.killAndDrain()

queue.length

queue.pause()

queue.resume()

queue.saturated = () => undefined

queue.unshift('world', (err, result) => {
  if (err) throw err
  console.log('the result is', result)
})

queue.unshift('unshift without cb')

function worker (task: any, cb: fastq.done) {
  cb(null, 'hello ' + task)
}

// Generics example

interface GenericsContext {
  base: number;
}

const genericsQueue = fastq({ base: 6 }, genericsWorker, 1)

genericsQueue.push(7, (err, done) => {
  if (err) throw err
  console.log('the result is', done)
})

genericsQueue.unshift(7, (err, done) => {
  if (err) throw err
  console.log('the result is', done)
})

function genericsWorker (this: GenericsContext, task: number, cb: fastq.done) {
  cb(null, 'the meaning of life is ' + (this.base * task))
}

const queue2 = queueAsPromised(asyncWorker, 1)

async function asyncWorker (task: any) {
  return 'hello ' + task
}

async function run () {
  await queue.push(42)
  await queue.unshift(42)
}

run()

example.js

'use strict'

/* eslint-disable no-var */

var queue = require('./')(worker, 1)

queue.push(42, function (err, result) {
  if (err) { throw err }
  console.log('the result is', result)
})

function worker (arg, cb) {
  cb(null, 42 * 2)
}

README.md

# fastq

![ci][ci-url]
[![npm version][npm-badge]][npm-url]
[![Dependency Status][david-badge]][david-url]

Fast, in memory work queue.

Benchmarks (1 million tasks):

* setImmediate: 812ms
* fastq: 854ms
* async.queue: 1298ms
* neoAsync.queue: 1249ms

Obtained on node 12.16.1, on a dedicated server.
If you need zero-overhead series function calls, check out
[fastseries](http://npm.im/fastseries). For zero-overhead parallel
function calls, check out [fastparallel](http://npm.im/fastparallel).

[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard)

* Installation
* Usage
* API
* Licence & copyright

## Install

`npm i fastq --save`

## Usage (callback API)

```js
'use strict'

const queue = require('fastq')(worker, 1)

queue.push(42, function (err, result) {
  if (err) { throw err }
  console.log('the result is', result)
})

function worker (arg, cb) {
  cb(null, arg * 2)
}
```

## Usage (promise API)

```js
const queue = require('fastq').promise(worker, 1)

async function worker (arg) {
  return arg * 2
}

async function run () {
  const result = await queue.push(42)
  console.log('the result is', result)
}

run()
```

### Setting "this"

```js
'use strict'

const that = { hello: 'world' }
const queue = require('fastq')(that, worker, 1)

queue.push(42, function (err, result) {
  if (err) { throw err }
  console.log(this)
  console.log('the result is', result)
})

function worker (arg, cb) {
  console.log(this)
  cb(null, arg * 2)
}
```

### Using with TypeScript (callback API)

```ts
'use strict'

import * as fastq from "fastq";
import type { queue, done } from "fastq";

type Task = {
  id: number
}

const q: queue<Task> = fastq(worker, 1)

q.push({ id: 42 })

function worker (arg: Task, cb: done) {
  console.log(arg.id)
  cb(null)
}
```

### Using with TypeScript (promise API)

```ts
'use strict'

import * as fastq from "fastq";
import type { queueAsPromised } from "fastq";

type Task = {
  id: number
}

const q: queueAsPromised<Task> = fastq.promise(asyncWorker, 1)

q.push({ id: 42 }).catch((err) => console.error(err))

async function asyncWorker (arg: Task): Promise<void> {
  // No need for a try-catch block, fastq handles errors automatically
  console.log(arg.id)
}
```

## API

* fastqueue()
* queue#push()
* queue#unshift()
* queue#pause()
* queue#resume()
* queue#idle()
* queue#length()
* queue#getQueue()
* queue#kill()
* queue#killAndDrain()
* queue#error()
* queue#concurrency
* queue#drain
* queue#empty
* queue#saturated
* fastqueue.promise()

-------------------------------------------------------
### fastqueue([that], worker, concurrency)

Creates a new queue.

Arguments:

* `that`, optional context of the `worker` function.
* `worker`, the worker function; it will be called with `that` as `this` if `that` is specified.
* `concurrency`, the number of tasks that can be processed in parallel.

-------------------------------------------------------
### queue.push(task, done)

Add a task at the end of the queue. `done(err, result)` will be called
when the task has been processed.

-------------------------------------------------------
### queue.unshift(task, done)

Add a task at the beginning of the queue. `done(err, result)` will be called
when the task has been processed.

-------------------------------------------------------
### queue.pause()

Pause the processing of tasks. Tasks that are already being processed are not stopped.

-------------------------------------------------------
### queue.resume()

Resume the processing of tasks.

-------------------------------------------------------
### queue.idle()

Returns `false` if there are tasks being processed or waiting to be processed.
`true` otherwise.

-------------------------------------------------------
### queue.length()

Returns the number of tasks waiting to be processed (in the queue).
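As a minimal sketch (the worker below is purely illustrative), a task that has already been handed to the worker no longer counts as waiting:

```js
'use strict'

const queue = require('fastq')(worker, 1)

queue.push(1) // picked up immediately, so it is not "waiting"
queue.push(2) // stays in the queue

console.log(queue.length()) // 1

function worker (arg, cb) {
  setImmediate(cb, null, arg)
}
```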
-------------------------------------------------------
### queue.getQueue()

Returns all the tasks waiting to be processed (in the queue). Returns an empty array when there are no tasks.

-------------------------------------------------------
### queue.kill()

Removes all tasks waiting to be processed, and resets `drain` to an empty function.

-------------------------------------------------------
### queue.killAndDrain()

Same as `kill`, but the `drain` function will be called before it is reset to an empty function.

-------------------------------------------------------
### queue.error(handler)

Set a global error handler. `handler(err, task)` will be called each time a task is completed; `err` will not be null if the task has thrown an error.

-------------------------------------------------------
### queue.concurrency

Property that returns the number of concurrent tasks that could be executed in
parallel. It can be altered at runtime.

-------------------------------------------------------
### queue.drain

Function that will be called when the last item from the queue has been processed by a worker.
It can be altered at runtime.

-------------------------------------------------------
### queue.empty

Function that will be called when the last item from the queue has been assigned to a worker.
It can be altered at runtime.

-------------------------------------------------------
### queue.saturated

Function that will be called when the queue hits the concurrency limit.
It can be altered at runtime.

-------------------------------------------------------
### fastqueue.promise([that], worker(arg), concurrency)

Creates a new queue with `Promise` apis. It also offers all the methods
and properties of the object returned by [`fastqueue`](#fastqueue) with the modified
[`push`](#pushPromise) and [`unshift`](#unshiftPromise) methods.

Node v10+ is required to use the promisified version.

Arguments:

* `that`, optional context of the `worker` function.
* `worker`, the worker function; it will be called with `that` as `this` if `that` is specified. It MUST return a `Promise`.
* `concurrency`, the number of tasks that can be processed in parallel.

#### queue.push(task) => Promise

Add a task at the end of the queue. The returned `Promise` will be fulfilled (rejected)
when the task is completed successfully (unsuccessfully).

This promise could be ignored as it will not lead to an `'unhandledRejection'`.

#### queue.unshift(task) => Promise

Add a task at the beginning of the queue. The returned `Promise` will be fulfilled (rejected)
when the task is completed successfully (unsuccessfully).

This promise could be ignored as it will not lead to an `'unhandledRejection'`.

#### queue.drained() => Promise

Wait for the queue to be drained. The returned `Promise` will be resolved when all
tasks in the queue have been processed by a worker.

This promise could be ignored as it will not lead to an `'unhandledRejection'`.
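As a small sketch (the worker and task values here are made up), waiting for every pushed task to finish with the promise API might look like this:

```js
const queue = require('fastq').promise(worker, 2)

async function worker (arg) {
  // pretend to do some asynchronous work
  await new Promise((resolve) => setTimeout(resolve, arg))
}

async function run () {
  for (const task of [10, 20, 30]) {
    queue.push(task) // safe to ignore: rejections will not become unhandled
  }
  await queue.drained() // resolves once every task has been processed
}

run()
```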
## License

ISC

[ci-url]: https://github.com/mcollina/fastq/workflows/ci/badge.svg
[npm-badge]: https://badge.fury.io/js/fastq.svg
[npm-url]: https://badge.fury.io/js/fastq
[david-badge]: https://david-dm.org/mcollina/fastq.svg
[david-url]: https://david-dm.org/mcollina/fastq

bench.js

'use strict'

const max = 1000000
const fastqueue = require('./')(worker, 1)
const { promisify } = require('util')
const immediate = promisify(setImmediate)
const qPromise = require('./').promise(immediate, 1)
const async = require('async')
const neo = require('neo-async')
const asyncqueue = async.queue(worker, 1)
const neoqueue = neo.queue(worker, 1)

function bench (func, done) {
  const key = max + '*' + func.name
  let count = -1

  console.time(key)
  end()

  function end () {
    if (++count < max) {
      func(end)
    } else {
      console.timeEnd(key)
      if (done) {
        done()
      }
    }
  }
}

function benchFastQ (done) {
  fastqueue.push(42, done)
}

function benchAsyncQueue (done) {
  asyncqueue.push(42, done)
}

function benchNeoQueue (done) {
  neoqueue.push(42, done)
}

function worker (arg, cb) {
  setImmediate(cb)
}

function benchSetImmediate (cb) {
  worker(42, cb)
}

function benchFastQPromise (done) {
  qPromise.push(42).then(function () { done() }, done)
}

function runBench (done) {
  async.eachSeries([
    benchSetImmediate,
    benchFastQ,
    benchNeoQueue,
    benchAsyncQueue,
    benchFastQPromise
  ], bench, done)
}

runBench(runBench)

.github/dependabot.yml

version: 2
updates:
- package-ecosystem: npm
  directory: "/"
  schedule:
    interval: daily
  open-pull-requests-limit: 10
  ignore:
  - dependency-name: standard
    versions:
    - 16.0.3

.github/workflows/ci.yml

name: ci

on: [push, pull_request]

jobs:
  legacy:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        node-version: ['0.10', '0.12', 4.x, 6.x, 8.x]
    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js
        uses: actions/setup-node@v1
        with:
          node-version: ${{ matrix.node-version }}
      - name: Install
        run: |
          npm install --production && npm install tape
      - name: Run tests
        run: |
          npm run legacy

  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        node-version: [10.x, 12.x, 13.x, 14.x, 15.x, 16.x]
    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js
        uses: actions/setup-node@v1
        with:
          node-version: ${{ matrix.node-version }}
      - name: Install
        run: |
          npm install
      - name: Run tests
        run: |
          npm run test

queue.js

'use strict'

/* eslint-disable no-var */

var reusify = require('reusify')

function fastqueue (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  var queueHead = null
  var queueTail = null
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  function running () {
    return _running
  }

  function pause () {
    self.paused = true
  }

  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  function resume () {
    if (!self.paused) return

    self.paused = false

    for (var i = 0; i < self.concurrency; i++) {
      _running++
      release()
    }
  }

  function idle () {
    return _running === 0 && self.length() === 0
  }

  function push (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  function unshift (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  function error (handler) {
    errorHandler = handler
  }
}

function noop () {}

function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  this.worked = function worked (err, result) {
    var callback = self.callback
    var errorHandler = self.errorHandler
    var val = self.value

    self.value = null
    self.callback = noop

    if (self.errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    self.release(self)
  }
}

function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  function push (value) {
    var p = new Promise(function (resolve, reject) {
      pushCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  function unshift (value) {
    var p = new Promise(function (resolve, reject) {
      unshiftCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  function drained () {
    if (queue.idle()) {
      return new Promise(function (resolve) {
        resolve()
      })
    }

    var previousDrain = queue.drain

    var p = new Promise(function (resolve) {
      queue.drain = function () {
        previousDrain()
        resolve()
      }
    })

    return p
  }
}

module.exports = fastqueue
module.exports.promise = queueAsPromised

index.d.ts

declare function fastq<C, T = any, R = any> (context: C, worker: fastq.worker<C, T, R>, concurrency: number): fastq.queue<T, R>
declare function fastq<C, T = any, R = any> (worker: fastq.worker<C, T, R>, concurrency: number): fastq.queue<T, R>

declare namespace fastq {
  type worker<C, T = any, R = any> = (this: C, task: T, cb: fastq.done<R>) => void
  type asyncWorker<C, T = any, R = any> = (this: C, task: T) => Promise<R>
  type done<R = any> = (err: Error | null, result?: R) => void
  type errorHandler<T = any> = (err: Error, task: T) => void

  interface queue<T = any, R = any> {
    push(task: T, done?: done<R>): void
    unshift(task: T, done?: done<R>): void
    pause(): any
    resume(): any
    idle(): boolean
    length(): number
    getQueue(): T[]
    kill(): any
    killAndDrain(): any
    error(handler: errorHandler<T>): void
    concurrency: number
    drain(): any
    empty: () => void
    saturated: () => void
  }

  interface queueAsPromised<T = any, R = any> extends queue<T, R> {
    push(task: T): Promise<R>
    unshift(task: T): Promise<R>
    drained(): Promise<void>
  }

  function promise<C, T = any, R = any> (context: C, worker: fastq.asyncWorker<C, T, R>, concurrency: number): fastq.queueAsPromised<T, R>
  function promise<C, T = any, R = any> (worker: fastq.asyncWorker<C, T, R>, concurrency: number): fastq.queueAsPromised<T, R>
}

export = fastq

LICENSE

Copyright (c) 2015-2020, Matteo Collina

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

example.mjs

import { promise as queueAsPromised } from './queue.js'

/* eslint-disable */

const queue = queueAsPromised(worker, 1)

console.log('the result is', await queue.push(42))

async function worker (arg) {
  return 42 * 2
}

package.json

{
  "name": "fastq",
  "version": "1.15.0",
  "description": "Fast, in memory work queue",
  "main": "queue.js",
  "scripts": {
    "lint": "standard --verbose | snazzy",
    "unit": "nyc --lines 100 --branches 100 --functions 100 --check-coverage --reporter=text tape test/test.js test/promise.js",
    "coverage": "nyc --reporter=html --reporter=cobertura --reporter=text tape test/test.js test/promise.js",
    "test:report": "npm run lint && npm run unit:report",
    "test": "npm run lint && npm run unit && npm run typescript",
    "typescript": "tsc --project ./test/tsconfig.json",
    "legacy": "tape test/test.js"
  },
  "pre-commit": [
    "test"
  ],
  "repository": {
    "type": "git",
    "url": "git+https://github.com/mcollina/fastq.git"
  },
  "keywords": [
    "fast",
    "queue",
    "async",
    "worker"
  ],
  "author": "Matteo Collina",
  "license": "ISC",
  "bugs": {
    "url": "https://github.com/mcollina/fastq/issues"
  },
  "homepage": "https://github.com/mcollina/fastq#readme",
  "devDependencies": {
    "async": "^3.1.0",
    "neo-async": "^2.6.1",
    "nyc": "^15.0.0",
    "pre-commit": "^1.2.2",
    "snazzy": "^9.0.0",
    "standard": "^16.0.0",
    "tape": "^5.0.0",
    "typescript": "^4.0.2"
  },
  "dependencies": {
    "reusify": "^1.0.4"
  },
  "standard": {
    "ignore": [
      "example.mjs"
    ]
  }
}