diff --git a/src/consumer/fetcher.js b/src/consumer/fetcher.js index fcea2ba4c..878e52515 100644 --- a/src/consumer/fetcher.js +++ b/src/consumer/fetcher.js @@ -49,10 +49,11 @@ const createFetcher = ({ const start = async () => { if (isRunning) return isRunning = true + const stopPromise = new Promise(resolve => emitter.once('stop', resolve)) while (isRunning) { try { - const batches = await fetch(nodeId) + const batches = await Promise.any([fetch(nodeId), stopPromise]) if (isRunning) { const availableBatches = filterUnassignedBatches(batches) @@ -77,7 +78,9 @@ const createFetcher = ({ const stop = async () => { if (!isRunning) return isRunning = false - await new Promise(resolve => emitter.once('end', () => resolve())) + const endPromise = new Promise(resolve => emitter.once('end', resolve)) + emitter.emit('stop'); + await endPromise; } return { start, stop, getWorkerQueue }