From 267ae916e9c3235eecbae262ed92a2d5bd7caa7e Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 9 Jan 2025 07:15:25 -0700 Subject: [PATCH 1/2] remove ReadableCursorStream --- src/cursor/abstract_cursor.ts | 114 +----------------- test/integration/crud/misc_cursors.test.js | 42 +++---- .../node-specific/abstract_cursor.test.ts | 9 +- 3 files changed, 24 insertions(+), 141 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 8eccdfcf63..6fe1ccd0a8 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,4 +1,4 @@ -import { Readable, Transform } from 'stream'; +import { Readable } from 'stream'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document'; @@ -496,33 +496,10 @@ export abstract class AbstractCursor< } stream(options?: CursorStreamOptions): Readable & AsyncIterable { - if (options?.transform) { - const transform = options.transform; - const readable = new ReadableCursorStream(this); - - const transformedStream = readable.pipe( - new Transform({ - objectMode: true, - highWaterMark: 1, - transform(chunk, _, callback) { - try { - const transformed = transform(chunk); - callback(undefined, transformed); - } catch (err) { - callback(err); - } - } - }) - ); - - // Bubble errors to transformed stream, because otherwise no way - // to handle this error. - readable.on('error', err => transformedStream.emit('error', err)); - - return transformedStream; - } - - return new ReadableCursorStream(this); + const transform = options?.transform ?? (doc => doc); + return Readable.from(this, { autoDestroy: false, highWaterMark: 1, objectMode: true }).map( + transform + ); } async hasNext(): Promise { @@ -1062,87 +1039,6 @@ export abstract class AbstractCursor< } } -class ReadableCursorStream extends Readable { - private _cursor: AbstractCursor; - private _readInProgress = false; - - constructor(cursor: AbstractCursor) { - super({ - objectMode: true, - autoDestroy: false, - highWaterMark: 1 - }); - this._cursor = cursor; - } - - // eslint-disable-next-line @typescript-eslint/no-unused-vars - override _read(size: number): void { - if (!this._readInProgress) { - this._readInProgress = true; - this._readNext(); - } - } - - override _destroy(error: Error | null, callback: (error?: Error | null) => void): void { - this._cursor.close().then( - () => callback(error), - closeError => callback(closeError) - ); - } - - private _readNext() { - if (this._cursor.id === Long.ZERO) { - this.push(null); - return; - } - - this._cursor.next().then( - result => { - if (result == null) { - this.push(null); - } else if (this.destroyed) { - this._cursor.close().then(undefined, squashError); - } else { - if (this.push(result)) { - return this._readNext(); - } - - this._readInProgress = false; - } - }, - err => { - // NOTE: This is questionable, but we have a test backing the behavior. It seems the - // desired behavior is that a stream ends cleanly when a user explicitly closes - // a client during iteration. Alternatively, we could do the "right" thing and - // propagate the error message by removing this special case. - if (err.message.match(/server is closed/)) { - this._cursor.close().then(undefined, squashError); - return this.push(null); - } - - // NOTE: This is also perhaps questionable. The rationale here is that these errors tend - // to be "operation was interrupted", where a cursor has been closed but there is an - // active getMore in-flight. This used to check if the cursor was killed but once - // that changed to happen in cleanup legitimate errors would not destroy the - // stream. There are change streams test specifically test these cases. - if (err.message.match(/operation was interrupted/)) { - return this.push(null); - } - - // NOTE: The two above checks on the message of the error will cause a null to be pushed - // to the stream, thus closing the stream before the destroy call happens. This means - // that either of those error messages on a change stream will not get a proper - // 'error' event to be emitted (the error passed to destroy). Change stream resumability - // relies on that error event to be emitted to create its new cursor and thus was not - // working on 4.4 servers because the error emitted on failover was "interrupted at - // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". - // See NODE-4475. - return this.destroy(err); - } - ); - } -} - configureResourceManagement(AbstractCursor.prototype); /** diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index b8de060b6b..e225e3df93 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1495,7 +1495,7 @@ describe('Cursor', function () { } }); - it('does not auto destroy streams', function (done) { + it('does not auto destroy streams', async function () { const docs = []; for (var i = 0; i < 10; i++) { @@ -1503,32 +1503,26 @@ describe('Cursor', function () { } const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; + await client.connect(); - const db = client.db(configuration.db); - db.createCollection('does_not_autodestroy_streams', (err, collection) => { - expect(err).to.not.exist; + const db = client.db(configuration.db); + const collection = await db.createCollection('does_not_autodestroy_streams'); - collection.insertMany(docs, configuration.writeConcernMax(), err => { - expect(err).to.not.exist; + await collection.insertMany(docs, configuration.writeConcernMax()); - const cursor = collection.find(); - const stream = cursor.stream(); - stream.on('close', () => { - expect.fail('extra close event must not be called'); - }); - stream.on('end', () => { - client.close(); - done(); - }); - stream.on('data', doc => { - expect(doc).to.exist; - }); - stream.resume(); - }); - }); + const cursor = collection.find(); + const stream = cursor.stream(); + + const end$ = once(stream, 'end'); + const close$ = once(stream, 'close').then(() => { + expect.fail('extra close event must not be called'); }); + + stream.resume(); + + await Promise.race([end$, close$]); + + await client.close(); }); it('should be able to stream documents', { @@ -2321,7 +2315,7 @@ describe('Cursor', function () { .find() .withReadPreference('notsecondary'); test.ok(false); - } catch (err) {} // eslint-disable-line + } catch (err) { } // eslint-disable-line db.collection('shouldFailToSetReadPreferenceOnCursor') .find() diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index ac060c9d45..358f3b9216 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -1,7 +1,6 @@ import { expect } from 'chai'; import { once } from 'events'; import * as sinon from 'sinon'; -import { Transform } from 'stream'; import { inspect } from 'util'; import { @@ -299,14 +298,8 @@ describe('class AbstractCursor', function () { }); it('propagates errors to transform stream', async function () { - const transform = new Transform({ - transform(data, encoding, callback) { - callback(null, data); - } - }); - // MongoServerError: unknown operator: $bar - const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform }); + const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc }); const error: Error | null = await new Promise(resolve => { stream.on('error', error => resolve(error)); From 5abf1114ffe029e58d7a3ac69f04ed4de14909e6 Mon Sep 17 00:00:00 2001 From: bailey Date: Thu, 16 Jan 2025 09:03:15 -0700 Subject: [PATCH 2/2] fix two failing tests --- src/cursor/abstract_cursor.ts | 10 ++- test/integration/crud/misc_cursors.test.js | 74 +++++++------------ .../node-specific/abstract_cursor.test.ts | 7 +- 3 files changed, 37 insertions(+), 54 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 6fe1ccd0a8..67ad3a3852 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -497,9 +497,13 @@ export abstract class AbstractCursor< stream(options?: CursorStreamOptions): Readable & AsyncIterable { const transform = options?.transform ?? (doc => doc); - return Readable.from(this, { autoDestroy: false, highWaterMark: 1, objectMode: true }).map( - transform - ); + const stream = Readable.from(this, { + autoDestroy: false, + highWaterMark: 1, + objectMode: true + }).map(transform); + stream.on('close', () => this.close() as any); + return stream; } async hasNext(): Promise { diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index e225e3df93..f205795cfc 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1608,62 +1608,38 @@ describe('Cursor', function () { } }); - it('immediately destroying a stream prevents the query from executing', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } - }, - - test: function (done) { - var i = 0, - docs = [{ b: 2 }, { b: 3 }], - doneCalled = 0; - - const configuration = this.configuration; - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); - - const db = client.db(configuration.db); - db.createCollection( - 'immediately_destroying_a_stream_prevents_the_query_from_executing', - (err, collection) => { - expect(err).to.not.exist; + it('immediately destroying a stream prevents the query from executing', async function () { + var i = 0, + docs = [{ b: 2 }, { b: 3 }]; - // insert all docs - collection.insertMany(docs, configuration.writeConcernMax(), err => { - expect(err).to.not.exist; + const configuration = this.configuration; + await client.connect(); - const cursor = collection.find(); - const stream = cursor.stream(); + const db = client.db(configuration.db); + const collection = db.collection( + 'immediately_destroying_a_stream_prevents_the_query_from_executing' + ); + // insert all docs + await collection.insertMany(docs, configuration.writeConcernMax()); - stream.on('data', function () { - i++; - }); + const cursor = collection.find(); + const stream = cursor.stream(); - cursor.once('close', testDone('close')); - stream.once('error', testDone('error')); + stream.on('data', function () { + i++; + }); - stream.destroy(); + const close$ = once(stream, 'close').then(async () => { + expect(i).to.equal(0); + expect(cursor.closed).to.be.true; + }); + const error$ = once(stream, 'error').catch(e => { + throw e; + }); - function testDone() { - return err => { - ++doneCalled; + stream.destroy(); - if (doneCalled === 1) { - expect(err).to.not.exist; - test.strictEqual(0, i); - test.strictEqual(true, cursor.closed); - done(); - } - }; - } - }); - } - ); - }); - } + await Promise.race([close$, error$]); }); it('removes session when cloning an find cursor', async function () { diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 358f3b9216..f7518ed49e 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -285,8 +285,7 @@ describe('class AbstractCursor', function () { const docs = [{ count: 0 }]; beforeEach(async function () { - client = this.configuration.newClient(); - + client = this.configuration.newClient({}, { monitorCommands: true }); collection = client.db('abstract_cursor_integration').collection('test'); await collection.insertMany(docs); @@ -301,6 +300,10 @@ describe('class AbstractCursor', function () { // MongoServerError: unknown operator: $bar const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform: doc => doc }); + stream.on('data', () => { + // do nothing + }); + const error: Error | null = await new Promise(resolve => { stream.on('error', error => resolve(error)); stream.on('end', () => resolve(null));