Skip to content

Commit

Permalink
Merge pull request #188 from tulios/fix-message-timestamp-on-produce-v3
Browse files Browse the repository at this point in the history
Fix message timestamp on produce v3
  • Loading branch information
tulios authored Oct 29, 2018
2 parents d35a152 + 392d640 commit e452afd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
56 changes: 56 additions & 0 deletions src/broker/__tests__/produce.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,5 +300,61 @@ describe('Broker > Produce', () => {
throttleTime: 0,
})
})

testIfKafka011('request to a topic with max timestamp difference configured', async () => {
topicName = `test-max-timestamp-difference-${secureRandom()}`

await createTopic({
topic: topicName,
config: [
{
name: 'message.timestamp.difference.max.ms',
value: '604800000', // 7 days
},
],
})

const metadata = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () => await broker.metadata([topicName])
)

// Find leader of partition
const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader
const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)

// Connect to the correct broker to produce message
broker2 = new Broker({
connection: createConnection(newBrokerData),
logger: newLogger(),
allowExperimentalV011: true,
})
await broker2.connect()

const partitionData = {
topic: topicName,
partitions: [
{
partition: 0,
messages: [{ key: `key-${secureRandom()}`, value: `some-value-${secureRandom()}` }],
},
],
}

const response1 = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () => await broker2.produce({ topicData: [partitionData] })
)

expect(response1).toEqual({
topics: [
{
topicName,
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
},
],
throttleTime: 0,
})
})
})
})
12 changes: 8 additions & 4 deletions src/protocol/requests/produce/v3/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ const topicEncoder = compression => async ({ topic, partitions }) => {

const partitionsEncoder = compression => async ({ partition, messages }) => {
const dateNow = Date.now()
let timestamps = messages.map(m => m.timestamp)
timestamps = timestamps.length === 0 ? [dateNow] : timestamps
const messageTimestamps = messages
.map(m => m.timestamp)
.filter(timestamp => timestamp != null)
.sort()

const timestamps = messageTimestamps.length === 0 ? [dateNow] : messageTimestamps
const firstTimestamp = timestamps[0]
const maxTimestamp = timestamps[timestamps.length - 1]

const firstTimestamp = Math.min(...timestamps)
const maxTimestamp = Math.max(...timestamps)
const records = messages.map((message, i) =>
Record({
...message,
Expand Down
8 changes: 6 additions & 2 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,16 @@ const addPartitions = async ({ topic, partitions }) => {
})
}

const testIfKafka011 = (description, callback) => {
const testIfKafka011 = (description, callback, testFn = test) => {
return process.env.KAFKA_VERSION === '0.11'
? test(description, callback)
? testFn(description, callback)
: test.skip(description, callback)
}

testIfKafka011.only = (description, callback) => {
return testIfKafka011(description, callback, test.only)
}

const unsupportedVersionResponse = () => Buffer.from({ type: 'Buffer', data: [0, 35, 0, 0, 0, 0] })

module.exports = {
Expand Down

0 comments on commit e452afd

Please sign in to comment.