Skip to content

Commit

Permalink
feat: upgrade to node 8 (#28)
Browse files Browse the repository at this point in the history
- Use native async/await and don't transpile
- Use `setImmediate` instead of `process.nextTick` as that's the correct way to wait for IO to finish. The only reason it worked before was a side effect of the generator based async/await that typescript gave us.

BREAKING CHANGE: We now require node 8 and use native async/await
  • Loading branch information
reconbot authored Apr 17, 2018
1 parent 05e4bca commit 76938be
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 38 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ notifications:
node_js:
- '9'
- '8'
- '6'
after_success:
- npm run semantic-release
branches:
Expand Down
2 changes: 1 addition & 1 deletion lib/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class TransformStream extends Transform implements IBluestream {
return this.end(cb)
}
if ((this as any)._writableState.pendingcb > 0) {
process.nextTick(() => this.end(cb))
setImmediate(() => this.end(cb))
return
}
Transform.prototype.end.call(this, cb)
Expand Down
2 changes: 1 addition & 1 deletion lib/write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class WriteStream extends Writable implements IBluestream {
return this.end(cb)
}
if ((this as any)._writableState.pendingcb > 0) {
return process.nextTick(() => this.end(cb))
return setImmediate(() => this.end(cb))
}
Writable.prototype.end.call(this, cb)
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"typescript": "^2.7.2"
},
"engines": {
"node": ">=6"
"node": ">=8"
},
"eslintConfig": {
"env": {
Expand Down
20 changes: 10 additions & 10 deletions test/read-test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { assert } from 'chai'
import { read, ReadStream, wait } from '../lib'

function nextTick (data?) {
return new Promise(resolve => process.nextTick(() => resolve(data)))
function promiseImmediate (data?) {
return new Promise(resolve => setImmediate(() => resolve(data)))
}

describe('ReadStream', () => {
Expand Down Expand Up @@ -118,8 +118,8 @@ describe('ReadStream', () => {
let callCount = 0
const stream = read(function () {
callCount++
this.push(nextTick(1))
this.push(nextTick(2))
this.push(promiseImmediate(1))
this.push(promiseImmediate(2))
return null
})
let sum = 0
Expand All @@ -135,8 +135,8 @@ describe('ReadStream', () => {
let callCount = 0
const stream = read(function () {
callCount++
this.push(nextTick(1))
this.push(nextTick(2))
this.push(promiseImmediate(1))
this.push(promiseImmediate(2))
this.push(null)
})
let sum = 0
Expand All @@ -153,17 +153,17 @@ describe('ReadStream', () => {
let pushCount = 0
const stream = read(async function () {
callCount++
this.push(await nextTick(1))
this.push(await promiseImmediate(1))
pushCount++
this.push(await nextTick(2))
this.push(await promiseImmediate(2))
pushCount++
})
let sum = 0
stream.on('data', data => {
sum += data
})
await nextTick()
await nextTick()
await promiseImmediate()
await promiseImmediate()
assert.equal(1, pushCount)
stream.push(null)
await wait(stream)
Expand Down
13 changes: 7 additions & 6 deletions test/readAsync-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { createReadStream } from 'fs'
import { join } from 'path'
import { collect, read, readAsync, write } from '../lib'

function nextTick (data) {
return new Promise(resolve => process.nextTick(() => resolve(data)))
function promiseImmediate (data?) {
return new Promise(resolve => setImmediate(() => resolve(data)))
}

function bufferStream () {
Expand Down Expand Up @@ -79,20 +79,21 @@ describe('#readAsync', () => {
assert.isNull(await readAsync(stream, 5))
assert.equal(stream._eventsCount, 1)
})
it('rejects if the stream errors', async () => {
it('rejects if the stream errors', () => {
const stream = read(() => 1)
const error = new Error('Foo!')
nextTick().then(() => stream.emit('error', error))
await readAsync(stream, 5).then(() => {
const assertion = readAsync(stream, 5).then(() => {
assert.isTrue(false, 'The promise should have rejected')
}, err => {
assert.isNotNull(err)
assert.deepEqual(err, error)
assert.equal(stream._eventsCount, 1)
})
stream.emit('error', error)
return assertion
})
it('rejects if the stream is already in flowing mode', async () => {
const stream = read(() => nextTick(1))
const stream = read(() => promiseImmediate(1))
stream.resume()
await readAsync(stream, 1).then(() => {
assert.isTrue(false, 'The promise should have rejected')
Expand Down
38 changes: 29 additions & 9 deletions test/transform-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,26 @@ function numbers () {
}})
}

function manyNumbers () {
let count = 0
return bstream.read(function () {
const arr = []
for (let index = 0; index < 2000; index++) {
arr.push(count++)
}
this.push(arr)
if (count > 100000) {
this.push(null)
}
})
}

function delay (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}

function nextTick () {
return new Promise(resolve => process.nextTick(resolve))
function promiseImmediate (data?) {
return new Promise(resolve => setImmediate(() => resolve(data)))
}

describe('TransformStream', () => {
Expand Down Expand Up @@ -122,7 +136,7 @@ describe('TransformStream', () => {
})

it('handles sync errors', async () => {
let transform = bstream.transform(() => { throw new Error("I'm an Error") })
const transform = bstream.transform(() => { throw new Error("I'm an Error") })
const transformPromise = transform.promise()
transform.write(4)
await transformPromise.then(() => {
Expand Down Expand Up @@ -166,8 +180,8 @@ describe('TransformStream', () => {

it('supports writable objects and readable buffers', async () => {
let transform = new bstream.TransformStream({
writableObjectMode: true,
readableObjectMode: false,
writableObjectMode: true,
transform ({ value }) {
const data = value.toString()
this.push(data)
Expand All @@ -181,7 +195,7 @@ describe('TransformStream', () => {
it('allows for concurrent operations', async () => {
// resolve the promise from the deferred on the 2nd data event
const defered = defer()
let transform = bstream.transform({ concurrent: 2 }, async data => {
const transform = bstream.transform({ concurrent: 2 }, async data => {
if (data === 1) {
return defered.promise
}
Expand Down Expand Up @@ -219,11 +233,11 @@ describe('TransformStream', () => {

it('ensures all concurrent operations finish before ending', async () => {
let finished = 0
const numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, null]
const nums = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, null]
const source = bstream.read(async function () {
await nextTick()
this.push(numbers.shift())
this.push(numbers.shift())
await promiseImmediate()
this.push(nums.shift())
this.push(nums.shift())
})
const sink = bstream.transform({ concurrent: 6 }, async num => {
await delay(num)
Expand All @@ -232,4 +246,10 @@ describe('TransformStream', () => {
await bstream.pipe(source, sink)
assert.equal(finished, 11)
})

it('handles pushing more than the buffer in a single read', async () => {
await bstream.pipe(manyNumbers(), bstream.transform(function (nums) {
nums.forEach(num => this.push(num))
}), bstream.write(i => i))
})
})
6 changes: 3 additions & 3 deletions test/utilites-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ function objects () {
}})
}

function nextTick () {
return new Promise(resolve => process.nextTick(resolve))
function promiseImmediate (data?) {
return new Promise(resolve => setImmediate(() => resolve(data)))
}

describe('#wait', () => {
it('waits until the stream ends', async () => {
let last = '0'
await wait(lines().pipe(map(async el => {
await nextTick()
await promiseImmediate()
if (el) { last = el }
return el
})))
Expand Down
10 changes: 5 additions & 5 deletions test/write-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Readable } from 'stream'
import { defer } from '../lib/utils'
import * as bstream from '../lib'
import { defer } from '../lib/utils'

function numbers (num = 6) {
const arr = [...new Array(num)].map((val, i) => i + 1)
Expand All @@ -12,7 +12,7 @@ function numbers (num = 6) {
if (value % 2 === 0) {
this.push(value)
} else {
process.nextTick(() => this.push(value))
setImmediate(() => this.push(value))
}
}})
}
Expand All @@ -21,8 +21,8 @@ function delay (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}

function nextTick () {
return new Promise(resolve => process.nextTick(resolve))
function promiseImmediate (data?) {
return new Promise(resolve => setImmediate(() => resolve(data)))
}

describe('WriteStream', () => {
Expand Down Expand Up @@ -105,7 +105,7 @@ describe('WriteStream', () => {
let finished = 0
const numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, null]
const source = bstream.read(async function () {
await nextTick()
await promiseImmediate()
this.push(numbers.shift())
this.push(numbers.shift())
})
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"compilerOptions": {
"outDir": "dist",
"allowJs": false,
"target": "es2015",
"target": "es2017",
"moduleResolution": "node",
"module": "commonjs",
"declaration": true
Expand Down

0 comments on commit 76938be

Please sign in to comment.