Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixup: simplify #12

Open
wants to merge 6 commits into
base: transform-by
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions benchmark/streams/transform-by.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const Transform = require('stream').Transform;

const bench = common.createBenchmark(main, {
n: [1e6]
});

function main({ n }) {
const s = Transform.by(async function*(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
}
});
s.resume();

bench.start();

let k = 0;
function run() {
while (k++ < n && s.write(b));
if (k >= n)
s.end();
}
s.on('drain', run);
s.on('finish', () => bench.end(n));
run();
}
6 changes: 0 additions & 6 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,6 @@ display if `block` does not throw.
An iterable argument (i.e. a value that works with `for...of` loops) was
required, but not provided to a Node.js API.

<a id="ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE"></a>
### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE

A function argument that returns an async iterable (i.e. a value that works
with `for await...of` loops) was required, but not provided to a Node.js API.

<a id="ERR_ASSERTION"></a>
### ERR_ASSERTION

Expand Down
8 changes: 4 additions & 4 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1646,14 +1646,14 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.

### stream.Transform.by(asyncGeneratorFunction[, options])
### stream.Transform.by(transform[, options])
<!-- YAML
added: REPLACEME
-->

* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which
accepts a `source` async iterable which can be used to read incoming data, while
transformed data is pushed to the stream with the `yield` keyword.
* `transform` {AsyncGeneratorFunction} A mapping function which accepts a `source`
async iterable which can be used to read incoming data, while transformed data is
pushed to the stream with the `yield` keyword.
* `options` {Object} Options provided to `new stream.Transform([options])`.
By default, `Transform.by()` will set `options.objectMode` to `true`,
unless this is explicitly opted out by setting `options.objectMode` to `false`.
Expand Down
131 changes: 20 additions & 111 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,12 @@ const {

module.exports = Transform;
const {
ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

const kSourceIteratorPull = Symbol('kSourceIteratorPull');
const kSourceIteratorResolve = Symbol('kSourceIteratorResolve');
const kSourceIteratorChunk = Symbol('kSourceIteratorChunk');
const kSourceIteratorStream = Symbol('kSourceIteratorStream');
const kSourceIteratorPump = Symbol('kSourceIteratorPump');
const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve');

ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);
Expand Down Expand Up @@ -232,110 +222,29 @@ function done(stream, er, data) {
return stream.push(null);
}

function SourceIterator(asyncGeneratorFn, opts) {
const source = this;
const result = asyncGeneratorFn(this);
if (typeof result[Symbol.asyncIterator] !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}
const iter = result[Symbol.asyncIterator]();
if (typeof iter.next !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}

this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = new Transform({
objectMode: true,
...opts,
transform(chunk, encoding, cb) {
source.encoding = encoding;
if (source[kSourceIteratorResolve] === null) {
source[kSourceIteratorChunk] = chunk;
source[kSourceIteratorPull] = cb;
return;
}
source[kSourceIteratorResolve]({ value: chunk, done: false });
source[kSourceIteratorResolve] = null;
cb(null);
}
});
this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
this[kSourceIteratorGrabResolve] = (resolve) => {
this[kSourceIteratorResolve] = resolve;
};
const first = iter.next();
this[kSourceIteratorPump](iter, first);
}

SourceIterator.prototype[Symbol.asyncIterator] = function() {
return this;
};

ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);
const from = require('internal/streams/from');

SourceIterator.prototype.next = function next() {
if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null)
return new Promise(this[kSourceIteratorGrabResolve]);
Transform.by = function by(transform, opts) {
let _resolve;
let _promise = new Promise((resolve) => _resolve = resolve);

this[kSourceIteratorPull](null);
const result = Promise.resolve({
value: this[kSourceIteratorChunk],
done: false
});
this[kSourceIteratorChunk] = null;
this[kSourceIteratorPull] = null;
return result;
};
if (typeof transform !== 'function') {
throw new ERR_INVALID_ARG_TYPE('transform', ['function'], iterable);
}

SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
const stream = this[kSourceIteratorStream];
try {
stream.removeListener('prefinish', prefinish);
stream.on('prefinish', () => {
if (this[kSourceIteratorResolve] !== null) {
this[kSourceIteratorResolve]({ value: undefined, done: true });
}
});
let next = await p;
return from(Duplex, transform(async function*() {
while (true) {
const { done, value } = next;
if (done) {
if (value !== undefined) stream.push(value);

// In the event of an early return we explicitly
// discard any buffered state
if (stream._writableState.length > 0) {
const { length } = stream._writableState;
const { transforming } = stream._transformState;
stream._writableState.length = 0;
stream._transformState.transforming = false;
prefinish.call(stream);
stream._writableState.length = length;
stream._transformState.transforming = transforming;
} else {
prefinish.call(stream);
}
break;
}
stream.push(value);
next = await iter.next();
const { chunk, done, cb } = await _promise;
if (done) return cb();
yield chunk;
_promise = new Promise((resolve) => _resolve = resolve);
cb();
}
} catch (err) {
process.nextTick(() => stream.destroy(err));
} finally {
this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = null;
}
};


Transform.by = function by(asyncGeneratorFn, opts) {
const source = new SourceIterator(asyncGeneratorFn, opts);
const stream = source[kSourceIteratorStream];

return stream;
}()), {
objectMode: true,
autoDestroy: true,
...opts,
write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }),
final: (cb) => _resolve({ done: true, cb })
});
};
2 changes: 0 additions & 2 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,6 @@ module.exports = {
// Note: Node.js specific errors must begin with the prefix ERR_
E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable',
TypeError);
E('ERR_ASSERTION', '%s', Error);
E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
Expand Down
7 changes: 4 additions & 3 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
if (iterable && typeof iterable[SymbolAsyncIterator] === 'function')
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
else if (iterable && typeof iterable[SymbolIterator] === 'function')
iterator = iterable[SymbolIterator]();
else

if (!iterator || typeof iterator.next !== 'function')
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidmarkclements Moved invalid arg check to Readable.from.


const readable = new Readable({
Expand Down
Loading