diff --git a/src/apis/callbacks.ts b/src/apis/callbacks.ts index 513930e1..d1bf7169 100644 --- a/src/apis/callbacks.ts +++ b/src/apis/callbacks.ts @@ -1,4 +1,4 @@ -import { MultipleErrors, TimeoutError } from '../errors.ts' +import { MultipleErrors } from '../errors.ts' import { PromiseWithResolvers } from '../utils.ts' import { type Callback } from './definitions.ts' @@ -65,25 +65,3 @@ export function runConcurrentCallbacks ( operation(item, operationCallback.bind(null, i++)) } } - -export function createTimeoutCallback ( - callback: Callback, - timeout: number, - errorMessage: string -): Callback { - let timeoutFired = false - const timeoutHandle = setTimeout(() => { - timeoutFired = true - callback(new TimeoutError(errorMessage), undefined as unknown as ReturnType) - }, timeout) - - return (error: Error | null, result: ReturnType) => { - /* c8 ignore next 3 - Hard to test */ - if (timeoutFired) { - return - } - - clearTimeout(timeoutHandle) - callback(error, result) - } -} diff --git a/src/clients/base/base.ts b/src/clients/base/base.ts index ba2e4acb..f45204a5 100644 --- a/src/clients/base/base.ts +++ b/src/clients/base/base.ts @@ -24,7 +24,7 @@ import { import type { GenericError } from '../../errors.ts' import { MultipleErrors, NetworkError, UnsupportedApiError, UserError } from '../../errors.ts' import { ConnectionPool } from '../../network/connection-pool.ts' -import { type Broker, type Connection, type ConnectionOptions } from '../../network/connection.ts' +import { type Broker, type Connection } from '../../network/connection.ts' import { parseBroker } from '../../network/utils.ts' import { kInstance } from '../../symbols.ts' import { ajv, debugDump, loggers } from '../../utils.ts' @@ -269,7 +269,7 @@ export class Base extends EventEm [kCreateConnectionPool] (): ConnectionPool { const pool = new ConnectionPool(this[kClientId], { ownerId: this[kInstance], - ...(this[kOptions] as ConnectionOptions) + ...this[kOptions] }) this.#forwardEvents(pool, [ @@ -356,7 +356,8 @@ export class Base extends EventEm operation((error, result) => { if (error) { const genericError = error as GenericError - const retriable = genericError.findBy?.('code', NetworkError.code) || genericError.findBy?.('canRetry', true) + // Only retry if all the errors in the chain are retriable + const retriable = !genericError.findBy?.('canRetry', false) errors.push(error) if (attempt < retries && retriable && !shouldSkipRetry?.(error)) { diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index af8b90db..ceeb701f 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -2,7 +2,6 @@ import { type ValidateFunction } from 'ajv' import { type CallbackWithPromise, createPromisifiedCallback, - createTimeoutCallback, kCallbackPromise, runConcurrentCallbacks } from '../../apis/callbacks.ts' @@ -1141,8 +1140,6 @@ export class Consumer callback: Callback diagnostic: Record + timeoutHandle: NodeJS.Timeout | null + timedOut: boolean } export const ConnectionStatuses = { @@ -78,11 +81,11 @@ export const ConnectionStatuses = { ERROR: 'error' } as const -export type ConnectionStatus = keyof typeof ConnectionStatuses -export type ConnectionStatusValue = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] +export type ConnectionStatus = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] -export const defaultOptions: ConnectionOptions = { +export const defaultOptions = { connectTimeout: 5000, + requestTimeout: 30000, maxInflights: 5 } @@ -92,7 +95,7 @@ export class Connection extends EventEmitter { #host: string | undefined #port: number | undefined #options: ConnectionOptions - #status: ConnectionStatusValue + #status: ConnectionStatus #instanceId: number #clientId: string | undefined // @ts-ignore This is used just for debugging @@ -143,7 +146,7 @@ export class Connection extends EventEmitter { return this.#instanceId } - get status (): ConnectionStatusValue { + get status (): ConnectionStatus { return this.#status } @@ -187,7 +190,7 @@ export class Connection extends EventEmitter { typeof this.#options.tlsServerName === 'string' ? this.#options.tlsServerName : host } - const connectionTimeoutHandler = () => { + const connectingSocketTimeoutHandler = () => { const error = new TimeoutError(`Connection to ${host}:${port} timed out.`) diagnosticContext.error = error this.#socket.destroy() @@ -201,7 +204,7 @@ export class Connection extends EventEmitter { connectionsConnectsChannel.asyncEnd.publish(diagnosticContext) } - const connectionErrorHandler = (error: Error) => { + const connectingSocketErrorHandler = (error: Error) => { this.#onConnectionError(host, port, diagnosticContext, error) } @@ -216,10 +219,10 @@ export class Connection extends EventEmitter { this.#socket.setNoDelay(true) this.#socket.once(this.#options.tls ? 'secureConnect' : 'connect', () => { - this.#socket.removeListener('timeout', connectionTimeoutHandler) - this.#socket.removeListener('error', connectionErrorHandler) + this.#socket.removeListener('timeout', connectingSocketTimeoutHandler) + this.#socket.removeListener('error', connectingSocketErrorHandler) - this.#socket.on('error', this.#onError.bind(this)) + this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this)) this.#socket.on('data', this.#onData.bind(this)) if (this.#handleBackPressure) { this.#socket.on('drain', this.#onDrain.bind(this)) @@ -235,8 +238,8 @@ export class Connection extends EventEmitter { } }) - this.#socket.once('timeout', connectionTimeoutHandler) - this.#socket.once('error', connectionErrorHandler) + this.#socket.once('timeout', connectingSocketTimeoutHandler) + this.#socket.once('error', connectingSocketErrorHandler) } catch (error) { this.#status = ConnectionStatuses.ERROR @@ -364,11 +367,23 @@ export class Connection extends EventEmitter { callback: null as unknown as Callback, // Will be set later hasResponseHeaderTaggedFields, noResponse: payload.context.noResponse ?? false, - diagnostic + diagnostic, + timeoutHandle: null, + timedOut: false } this.#requestsQueue.push(fastQueueCallback => { - request.callback = fastQueueCallback + request.callback = (error: Error | null, payload: any) => { + clearTimeout(request.timeoutHandle!) + request.timeoutHandle = null + fastQueueCallback(error, payload) + } + if (!request.noResponse) { + request.timeoutHandle = setTimeout(() => { + request.timedOut = true + request.callback(new TimeoutError('Request timed out'), null) + }, this.#options.requestTimeout) + } if (this.#socketMustBeDrained) { this.#afterDrainRequests.push(request) @@ -616,7 +631,11 @@ export class Connection extends EventEmitter { this.#inflightRequests.delete(correlationId) - const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback } = request + const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback, timedOut } = request + + if (timedOut) { + return + } let deserialized: any let responseError: Error | null = null @@ -704,7 +723,7 @@ export class Connection extends EventEmitter { } } - #onError (error: Error): void { + #connectedSocketErrorHandler (error: Error): void { clearTimeout(this.#reauthenticationTimeout) this.emit('error', new NetworkError('Connection error', { cause: error })) } diff --git a/test/clients/base/base.test.ts b/test/clients/base/base.test.ts index 30f9da26..4b8d460f 100644 --- a/test/clients/base/base.test.ts +++ b/test/clients/base/base.test.ts @@ -11,6 +11,7 @@ import { Connection, MultipleErrors, sleep, + TimeoutError, UnsupportedApiError, type ClientDiagnosticEvent, type ClusterMetadata @@ -541,22 +542,13 @@ test('metadata should handle connection failures to non-existent broker', async strictEqual(['MultipleErrors', 'AggregateError'].includes(error.name), true) // Error message should indicate failure - strictEqual(error.message.includes('failed'), true) + strictEqual(error.message, 'Cannot connect to any broker.') // Should contain nested errors strictEqual(Array.isArray(error.errors), true) - strictEqual(error.errors.length > 0, true) - - // At least one error should be a network error - const hasNetworkError = error.errors.some( - (err: any) => - err.message.includes('ECONNREFUSED') || - err.message.includes('ETIMEDOUT') || - err.message.includes('getaddrinfo') || - err.message.includes('connect') - ) - - strictEqual(hasNetworkError, true) + strictEqual(error.errors.length, 1) + strictEqual(error.errors[0] instanceof TimeoutError, true) + strictEqual(error.errors[0].message, 'Connection to 192.0.2.1:9092 timed out.') } }) diff --git a/test/clients/consumer/consumer-consumer-group-protocol.test.ts b/test/clients/consumer/consumer-consumer-group-protocol.test.ts index 770c250c..8fe45e95 100644 --- a/test/clients/consumer/consumer-consumer-group-protocol.test.ts +++ b/test/clients/consumer/consumer-consumer-group-protocol.test.ts @@ -8,6 +8,7 @@ import { ProduceAcks, ResponseError, sleep, + TimeoutError, UnsupportedApiError, type MessageToProduce, type ProducerOptions @@ -165,11 +166,22 @@ test('#consumerGroupHeartbeat should ignore response when closed ', skipConsumer }) test('#consumerGroupHeartbeat should timeout and schedule another heartbeat', skipConsumerGroupProtocol, async t => { - const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, timeout: 200 }) + const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, requestTimeout: 200 }) const topic = await createTopic(t, true, 1) const stream = await consumer.consume({ topics: [topic] }) - let mockCount = 1 - mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0) + mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, ( + __originalSend, + __apiKey, + __apiVersion, + __payload, + __responseParser, + __hasRequestHeaderTaggedFields, + __hasResponseHeaderTaggedFields, + callback + ) => { + callback(new TimeoutError('Request timed out'), null) + return false + }) await once(consumer, 'consumer:heartbeat:error') await once(consumer, 'consumer:heartbeat:end') await stream.close() diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index d71bca37..0efedf4e 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -929,7 +929,7 @@ test('fetch should support both promise and callback API', async t => { const topicInfo = metadata.topics.get(topic)! await new Promise((resolve, reject) => { - const consumer = createConsumer(t) + const consumer = createConsumer(t, { requestTimeout: 100000 }) // First test callback API consumer.fetch( diff --git a/test/clients/producer/producer.test.ts b/test/clients/producer/producer.test.ts index e11af3df..f6cf97f0 100644 --- a/test/clients/producer/producer.test.ts +++ b/test/clients/producer/producer.test.ts @@ -715,7 +715,7 @@ test('send should handle synchronuous error during payload creation', async t => const testTopic = await createTopic(t) const compression = 'lz4' - const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD') + const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD', { canRetry: false }) t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => { throw expectedError }) diff --git a/test/helpers.ts b/test/helpers.ts index eb97d981..5c34bd8d 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -130,7 +130,9 @@ export function mockMethod ( fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void ) { if (typeof errorToMock === 'undefined') { - errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')]) + errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], { + canRetry: false + }) } const original = target[method].bind(target) @@ -259,7 +261,9 @@ export function mockAPI ( fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void ) { if (typeof errorToMock === 'undefined') { - errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')]) + errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], { + canRetry: false + }) } const originalGet = pool.get.bind(pool) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 139aa309..4a442ff2 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -411,6 +411,100 @@ test('Connection.send should enqueue request and process response', async t => { verifyTracingChannel() }) +test('Connection.send should handle requests with response', async t => { + const { server, port } = await createServer(t) + const connection = new Connection('test-client') + t.after(() => connection.close()) + + // Setup a simple echo server + server.on('connection', socket => { + socket.on('data', () => { + setTimeout(() => socket.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 1])), 1000) + }) + }) + + await connection.connect('localhost', port) + + // Create payload function that indicates no response is expected + function payloadFn () { + const writer = Writer.create() + writer.appendInt32(42) + return writer + } + + await new Promise((resolve, reject) => { + connection.send( + 0, // apiKey + 0, // apiVersion + payloadFn, + function () { + return 'Success' + }, // Dummy parser + false, // hasRequestHeaderTaggedFields + false, // hasResponseHeaderTaggedFields + (err, returnValue) => { + if (err) { + reject(err) + } else { + resolve(returnValue) + } + } + ) + }) +}) + +test('Connection.send should time out eventually (custom timeout)', async t => { + const { server, port } = await createServer(t) + const customTimeout = 2000 + const connection = new Connection('test-client', { requestTimeout: customTimeout }) + t.after(() => connection.close()) + + // Setup a simple echo server + server.on('connection', socket => { + socket.on('data', () => { + setTimeout(() => socket.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 1])), 10000) + }) + }) + + await connection.connect('localhost', port) + + // Create payload function that indicates no response is expected + function payloadFn () { + const writer = Writer.create() + writer.appendInt32(42) + return writer + } + + const startTime = performance.now() + try { + await new Promise((resolve, reject) => { + connection.send( + 0, // apiKey + 0, // apiVersion + payloadFn, + function () { + return 'Success' + }, // Dummy parser + false, // hasRequestHeaderTaggedFields + false, // hasResponseHeaderTaggedFields + (err, returnValue) => { + if (err) { + reject(err) + } else { + resolve(returnValue) + } + } + ) + }) + throw new Error('Expected request to time out') + } catch (error) { + deepStrictEqual((error as Error).message, 'Request timed out') + const timeoutMargin = customTimeout * 0.01 // Allow 1% margin + const timeoutDiff = Math.abs(performance.now() - startTime - customTimeout) + strictEqual(timeoutDiff <= timeoutMargin, true, 'Should time out with custom timeout') + } +}) + test('Connection.send should handle requests with no response', async t => { const { server, port } = await createServer(t) const connection = new Connection('test-client')