diff --git a/packages/core/test/router/router-response-controller.spec.ts b/packages/core/test/router/router-response-controller.spec.ts index aac3212628e..7fdc31d13fa 100644 --- a/packages/core/test/router/router-response-controller.spec.ts +++ b/packages/core/test/router/router-response-controller.spec.ts @@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream'; import { HttpStatus, RequestMethod } from '../../../common'; import { RouterResponseController } from '../../router/router-response-controller'; import { NoopHttpAdapter } from '../utils/noop-adapter.spec'; +import { SseStream } from '../../router/sse-stream'; describe('RouterResponseController', () => { let adapter: NoopHttpAdapter; @@ -374,6 +375,73 @@ data: test done(); }); + describe('when writing data too densely', () => { + const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners; + const MAX_LISTENERS = 1; + const sandbox = sinon.createSandbox(); + + beforeEach(() => { + // Can't access to the internal sseStream, + // as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process` + const PROCESS_MAX_LISTENERS = process.getMaxListeners(); + SseStream.defaultMaxListeners = MAX_LISTENERS; + process.setMaxListeners(PROCESS_MAX_LISTENERS); + + const sseStream = sinon.createStubInstance(SseStream); + const originalWrite = SseStream.prototype.write; + // Make `.write()` always return false, so as to listen `drain` event + sseStream.write.callsFake(function (...args: any[]) { + originalWrite.apply(this, args); + return false; + }); + sandbox.replace(SseStream.prototype, 'write', sseStream.write); + }); + + afterEach(() => { + sandbox.restore(); + SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS; + }); + + it('should not cause memory leak', async () => { + let maxDrainListenersExceededWarning = null; + process.on('warning', (warning: any) => { + if ( + warning.name === 'MaxListenersExceededWarning' && + warning.emitter instanceof SseStream && + warning.type === 'drain' && + warning.count === MAX_LISTENERS + 1 + ) { + maxDrainListenersExceededWarning = warning; + } + }); + + const result = new Subject(); + + const response = new Writable(); + response._write = () => {}; + + const request = new Writable(); + request._write = () => {}; + + routerResponseController.sse( + result, + response as unknown as ServerResponse, + request as unknown as IncomingMessage, + ); + + // Send multiple messages simultaneously + Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) => + result.next(String(i)), + ); + + await new Promise(resolve => process.nextTick(resolve)); + + expect(() => { + expect(maxDrainListenersExceededWarning).to.equal(null); + }, 'it will fail as there is an issue here to be addressed').to.throw(); + }); + }); + describe('when there is an error', () => { it('should close the request', done => { const result = new Subject();