diff --git a/src/broker/__tests__/produce.spec.js b/src/broker/__tests__/produce.spec.js index 70c960a8e..ddfc6b362 100644 --- a/src/broker/__tests__/produce.spec.js +++ b/src/broker/__tests__/produce.spec.js @@ -300,5 +300,61 @@ describe('Broker > Produce', () => { throttleTime: 0, }) }) + + testIfKafka011('request to a topic with max timestamp difference configured', async () => { + topicName = `test-max-timestamp-difference-${secureRandom()}` + + await createTopic({ + topic: topicName, + config: [ + { + name: 'message.timestamp.difference.max.ms', + value: '604800000', // 7 days + }, + ], + }) + + const metadata = await retryProtocol( + 'LEADER_NOT_AVAILABLE', + async () => await broker.metadata([topicName]) + ) + + // Find leader of partition + const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader + const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker) + + // Connect to the correct broker to produce message + broker2 = new Broker({ + connection: createConnection(newBrokerData), + logger: newLogger(), + allowExperimentalV011: true, + }) + await broker2.connect() + + const partitionData = { + topic: topicName, + partitions: [ + { + partition: 0, + messages: [{ key: `key-${secureRandom()}`, value: `some-value-${secureRandom()}` }], + }, + ], + } + + const response1 = await retryProtocol( + 'LEADER_NOT_AVAILABLE', + async () => await broker2.produce({ topicData: [partitionData] }) + ) + + expect(response1).toEqual({ + topics: [ + { + topicName, + partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }], + }, + ], + throttleTime: 0, + }) + }) }) }) diff --git a/src/protocol/requests/produce/v3/request.js b/src/protocol/requests/produce/v3/request.js index 79075b972..a2895181d 100644 --- a/src/protocol/requests/produce/v3/request.js +++ b/src/protocol/requests/produce/v3/request.js @@ -63,11 +63,15 @@ const topicEncoder = compression => async ({ topic, partitions }) => { const partitionsEncoder = compression => async ({ partition, messages }) => { const dateNow = Date.now() - let timestamps = messages.map(m => m.timestamp) - timestamps = timestamps.length === 0 ? [dateNow] : timestamps + const messageTimestamps = messages + .map(m => m.timestamp) + .filter(timestamp => timestamp != null) + .sort() + + const timestamps = messageTimestamps.length === 0 ? [dateNow] : messageTimestamps + const firstTimestamp = timestamps[0] + const maxTimestamp = timestamps[timestamps.length - 1] - const firstTimestamp = Math.min(...timestamps) - const maxTimestamp = Math.max(...timestamps) const records = messages.map((message, i) => Record({ ...message,