diff --git a/src/network/connection.ts b/src/network/connection.ts index 06e062fb..20f97cbc 100644 --- a/src/network/connection.ts +++ b/src/network/connection.ts @@ -604,8 +604,7 @@ export class Connection extends EventEmitter { const request = this.#inflightRequests.get(correlationId) if (!request) { - this.emit( - 'error', + this.#socket.destroy( new UnexpectedCorrelationIdError(`Received unexpected response with correlation_id=${correlationId}`, { raw: this.#responseReader.buffer.slice(0, this.#nextMessage + INT32_SIZE) }) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 139aa309..b6543185 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -32,6 +32,18 @@ import { mockedErrorMessage, mockedOperationId } from '../helpers.ts' +import { scheduler } from 'node:timers/promises' + +process + .on('unhandledRejection', (reason, p) => { + console.error('Unhandled Rejection at:', p) + console.error(reason) + }) + .on('uncaughtException', err => { + console.error('Uncaught Exception thrown') + console.error(err) + process.exit(1) + }) // Create passwords as Confluent Kafka images don't support it via environment const saslBroker = parseBroker(kafkaSaslBootstrapServers[0]) @@ -569,7 +581,20 @@ test('Connection should handle unexpected correlation IDs', async t => { response.writeInt32BE(8, 0) // size (8 bytes payload) response.writeInt32BE(99999, 4) // Unexpected correlationId response.writeInt32BE(123, 8) // mock result - socket.end(response) + socket.write(response) + + // Put next write on event loop to ensure onData is first processing only the first response + setTimeout(() => { + if (socket.writable) { + // Send another response with another unexpected correlation ID + // If this is received and processed by the connection, it will lead to an unhandled exception + const response2 = Buffer.alloc(4 + 4 + 4) // size + correlationId + result + response2.writeInt32BE(8, 0) // size (8 bytes payload) + response2.writeInt32BE(100000, 4) // Unexpected correlationId + response2.writeInt32BE(124, 8) // mock result + socket.write(response2) + } + }, 0) }) }) @@ -604,8 +629,12 @@ test('Connection should handle unexpected correlation IDs', async t => { // Wait for error const error = await errorPromise - ok(error instanceof UnexpectedCorrelationIdError) - strictEqual(error.message, 'Received unexpected response with correlation_id=99999') + ok(error instanceof NetworkError) + ok(error.cause instanceof UnexpectedCorrelationIdError) + strictEqual(error.cause.message, 'Received unexpected response with correlation_id=99999') + + // Put test finish on event loop to allow server to send the second unexpected correlation ID + await scheduler.wait(0) }) test('Connection should handle socket errors', async t => {