Skip to content

Commit

Permalink
Merge pull request #1345 from tulios/remove-polyfills
Browse files Browse the repository at this point in the history
Remove unneeded utilities
  • Loading branch information
Nevon authored May 6, 2022
2 parents 89c5beb + 36a272b commit fe9bd30
Show file tree
Hide file tree
Showing 23 changed files with 33 additions and 98 deletions.
11 changes: 5 additions & 6 deletions src/admin/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const createRetry = require('../retry')
const flatten = require('../utils/flatten')
const waitFor = require('../utils/waitFor')
const groupBy = require('../utils/groupBy')
const createConsumer = require('../consumer')
Expand Down Expand Up @@ -961,19 +960,19 @@ module.exports = ({
)
)

const errors = flatten(
res.map(({ results }) =>
const errors = res
.flatMap(({ results }) =>
results.map(({ groupId, errorCode, error }) => {
return { groupId, errorCode, error }
})
)
).filter(({ errorCode }) => errorCode !== 0)
.filter(({ errorCode }) => errorCode !== 0)

clonedGroupIds = errors.map(({ groupId }) => groupId)

if (errors.length > 0) throw new KafkaJSDeleteGroupsError('Error in DeleteGroups', errors)

results = flatten(res.map(({ results }) => results))
results = res.flatMap(({ results }) => results)

return results
} catch (e) {
Expand Down Expand Up @@ -1012,7 +1011,7 @@ module.exports = ({
partitions.map(p => p.partition)
)

const partitionsFound = flatten(values(partitionsByBroker))
const partitionsFound = values(partitionsByBroker).flat()
const topicOffsets = await fetchTopicOffsets(topic)

const leaderNotFoundErrors = []
Expand Down
3 changes: 1 addition & 2 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const Lock = require('../utils/lock')
const sharedPromiseTo = require('../utils/sharedPromiseTo')
const createRetry = require('../retry')
const connectionPoolBuilder = require('./connectionPoolBuilder')
const flatten = require('../utils/flatten')
const { EARLIEST_OFFSET, LATEST_OFFSET } = require('../constants')
const {
KafkaJSError,
Expand Down Expand Up @@ -493,7 +492,7 @@ module.exports = class Cluster {

// Execute all requests, merge and normalize the responses
const responses = await Promise.all(requests)
const partitionsPerTopic = flatten(responses).reduce(mergeTopics, {})
const partitionsPerTopic = responses.flat().reduce(mergeTopics, {})

return keys(partitionsPerTopic).map(topic => ({
topic,
Expand Down
4 changes: 1 addition & 3 deletions src/consumer/assigners/roundRobinAssigner/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol')
const flatten = require('../../../utils/flatten')

/**
* RoundRobinAssigner
Expand Down Expand Up @@ -42,11 +41,10 @@ module.exports = ({ cluster }) => ({
const sortedMembers = members.map(({ memberId }) => memberId).sort()
const assignment = {}

const topicsPartionArrays = topics.map(topic => {
const topicsPartitions = topics.flatMap(topic => {
const partitionMetadata = cluster.findTopicPartitionMetadata(topic)
return partitionMetadata.map(m => ({ topic: topic, partitionId: m.partitionId }))
})
const topicsPartitions = flatten(topicsPartionArrays)

topicsPartitions.forEach((topicPartition, i) => {
const assignee = sortedMembers[i % membersCount]
Expand Down
3 changes: 1 addition & 2 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const sleep = require('../utils/sleep')
const websiteUrl = require('../utils/websiteUrl')
const arrayDiff = require('../utils/arrayDiff')
const flatMap = require('../utils/flatMap')
const createRetry = require('../retry')
const sharedPromiseTo = require('../utils/sharedPromiseTo')

Expand Down Expand Up @@ -495,7 +494,7 @@ module.exports = class ConsumerGroup {
rackId: this.rackId,
})

return flatMap(responses, ({ topicName, partitions }) => {
return responses.flatMap(({ topicName, partitions }) => {
const topicRequestData = requests.find(({ topic }) => topic === topicName)

let preferredReadReplicas = this.preferredReadReplicasPerTopicPartition[topicName]
Expand Down
5 changes: 2 additions & 3 deletions src/consumer/offsetManager/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const Long = require('../../utils/long')
const flatten = require('../../utils/flatten')
const isInvalidOffset = require('./isInvalidOffset')
const initializeConsumerOffsets = require('./initializeConsumerOffsets')
const {
Expand Down Expand Up @@ -131,8 +130,8 @@ module.exports = class OffsetManager {
const subtractTopicOffsets = topic =>
subtractPartitionOffsets(this.resolvedOffsets[topic], committedOffsets[topic])

const offsetsDiff = this.topics.map(subtractTopicOffsets)
return flatten(offsetsDiff).reduce((sum, offset) => sum.add(offset), Long.fromValue(0))
const offsetsDiff = this.topics.flatMap(subtractTopicOffsets)
return offsetsDiff.reduce((sum, offset) => sum.add(offset), Long.fromValue(0))
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/consumer/workerQueue.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
const allSettled = require('../utils/promiseAllSettled')

/**
* @typedef {ReturnType<typeof createWorkerQueue>} WorkerQueue
*/
Expand Down Expand Up @@ -28,7 +26,7 @@ const createWorkerQueue = ({ workers }) => {

workers.forEach(worker => worker.run({ next: () => queue.shift() }))

const results = await allSettled(promises)
const results = await Promise.allSettled(promises)
const rejected = results.find(result => result.status === 'rejected')
if (rejected) {
// @ts-ignore
Expand Down
6 changes: 2 additions & 4 deletions src/producer/__tests__/idempotentProduceMessages.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
jest.setTimeout(20000)

const PromiseAllSettled = require('../../utils/promiseAllSettled')

const {
secureRandom,
newLogger,
Expand Down Expand Up @@ -148,7 +146,7 @@ describe('Producer > Idempotent producer', () => {
})
}

await PromiseAllSettled(
await Promise.allSettled(
messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
)

Expand All @@ -173,7 +171,7 @@ describe('Producer > Idempotent producer', () => {
})
}

await PromiseAllSettled(
await Promise.allSettled(
messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
)

Expand Down
9 changes: 2 additions & 7 deletions src/producer/responseSerializer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
const flatten = require('../utils/flatten')

module.exports = ({ topics }) => {
const partitions = topics.map(({ topicName, partitions }) =>
module.exports = ({ topics }) =>
topics.flatMap(({ topicName, partitions }) =>
partitions.map(partition => ({ topicName, ...partition }))
)

return flatten(partitions)
}
4 changes: 1 addition & 3 deletions src/producer/sendMessages.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const flatten = require('../utils/flatten')
const { KafkaJSMetadataNotLoaded } = require('../errors')
const { staleMetadata } = require('../protocol/error')
const groupMessagesPerPartition = require('./groupMessagesPerPartition')
Expand Down Expand Up @@ -132,8 +131,7 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
try {
const requests = await createProducerRequests(responsePerBroker)
await Promise.all(requests)
const responses = Array.from(responsePerBroker.values())
return flatten(responses)
return Array.from(responsePerBroker.values()).flat()
} catch (e) {
if (e.name === 'KafkaJSConnectionClosedError') {
cluster.removeBroker({ host: e.host, port: e.port })
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/requests/fetch/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const Decoder = require('../../../decoder')
const { KafkaJSOffsetOutOfRange } = require('../../../../errors')
const { failure, createErrorFromCode, errorCodes } = require('../../../error')
const flatten = require('../../../../utils/flatten')
const MessageSetDecoder = require('../../../messageSet/decoder')

/**
Expand Down Expand Up @@ -42,13 +41,12 @@ const { code: OFFSET_OUT_OF_RANGE_ERROR_CODE } = errorCodes.find(
)

const parse = async data => {
const partitionsWithError = data.responses.map(({ topicName, partitions }) => {
const errors = data.responses.flatMap(({ topicName, partitions }) => {
return partitions
.filter(partition => failure(partition.errorCode))
.map(partition => Object.assign({}, partition, { topic: topicName }))
})

const errors = flatten(partitionsWithError)
if (errors.length > 0) {
const { errorCode, topic, partition } = errors[0]
if (errorCode === OFFSET_OUT_OF_RANGE_ERROR_CODE) {
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/listOffsets/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* Offsets Response (Version: 0) => [responses]
Expand Down Expand Up @@ -33,10 +32,10 @@ const decodePartitions = decoder => ({
const decodeOffsets = decoder => decoder.readInt64().toString()

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/listOffsets/v1/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* ListOffsets Response (Version: 1) => [responses]
Expand Down Expand Up @@ -33,10 +32,10 @@ const decodePartitions = decoder => ({
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/listOffsets/v2/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* ListOffsets Response (Version: 2) => throttle_time_ms [responses]
Expand Down Expand Up @@ -35,10 +34,10 @@ const decodePartitions = decoder => ({
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/requests/metadata/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* Metadata Response (Version: 0) => [brokers] [topic_metadata]
Expand Down Expand Up @@ -56,11 +55,10 @@ const parse = async data => {
throw createErrorFromCode(topicErrorCode)
}

const partitionsWithErrors = data.topicMetadata.map(topic => {
const errors = data.topicMetadata.flatMap(topic => {
return topic.partitionMetadata.filter(partition => failure(partition.partitionErrorCode))
})

const errors = flatten(partitionsWithErrors)
if (errors.length > 0) {
const { partitionErrorCode } = errors[0]
throw createErrorFromCode(partitionErrorCode)
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/offsetCommit/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* OffsetCommit Response (Version: 0) => [responses]
Expand Down Expand Up @@ -29,10 +28,10 @@ const decodePartitions = decoder => ({
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/offsetFetch/v1/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* OffsetFetch Response (Version: 1) => [responses]
Expand Down Expand Up @@ -33,10 +32,10 @@ const decodePartitions = decoder => ({
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/requests/offsetFetch/v2/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* OffsetFetch Response (Version: 2) => [responses] error_code
Expand Down Expand Up @@ -39,10 +38,10 @@ const parse = async data => {
throw createErrorFromCode(data.errorCode)
}

const partitionsWithError = data.responses.map(response =>
const partitionsWithError = data.responses.flatMap(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
const partitionWithError = partitionsWithError[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/requests/produce/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* v0
Expand Down Expand Up @@ -30,11 +29,10 @@ const decode = async rawData => {
}

const parse = async data => {
const partitionsWithError = data.topics.map(topic => {
const errors = data.topics.flatMap(topic => {
return topic.partitions.filter(partition => failure(partition.errorCode))
})

const errors = flatten(partitionsWithError)
if (errors.length > 0) {
const { errorCode } = errors[0]
throw createErrorFromCode(errorCode)
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/requests/produce/v3/response.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* Produce Response (Version: 3) => [responses] throttle_time_ms
Expand Down Expand Up @@ -37,11 +36,10 @@ const decode = async rawData => {
}

const parse = async data => {
const partitionsWithError = data.topics.map(response => {
const errors = data.topics.flatMap(response => {
return response.partitions.filter(partition => failure(partition.errorCode))
})

const errors = flatten(partitionsWithError)
if (errors.length > 0) {
const { errorCode } = errors[0]
throw createErrorFromCode(errorCode)
Expand Down
3 changes: 0 additions & 3 deletions src/utils/flatMap.js

This file was deleted.

12 changes: 0 additions & 12 deletions src/utils/flatten.js

This file was deleted.

Loading

0 comments on commit fe9bd30

Please sign in to comment.