Kafka microservice can process only one message by topic at the moment #12703
Comments
Same here :( Any solutions for the latest release? |
I think this issue is missing some more detail to clarify whether this is expected behavior or not.
Kafka is intended to process messages in each partition in order, so it doesn't make sense to process messages later until you finish processing the ones in front. |
We've updated the example repository and added a README. The problem is that the consumer doesn't process messages even on another topic while a message sent to one topic is still being handled (when we send a message to the test1 topic, the consumer doesn't process any messages on the test2 topic). |
That's just how eachMessage for KafkaJS works. It processes each topic-partition sequentially one at a time. You need to set that partitionsConsumedConcurrently to get what you want. You're saying this only started happening at an upgrade to 10.x, can you reproduce in another example repository with a lower NestJs version? |
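For reference, a minimal sketch of where that option goes, assuming the KafkaJS `consumer.run` option `partitionsConsumedConcurrently` mentioned above. The `MessagePayload` interface and the `runConfig` name are local stand-ins for illustration, not KafkaJS exports, and the value 2 is arbitrary:

```typescript
// Local stand-in for the payload KafkaJS passes to eachMessage (assumption:
// only the fields used here are declared).
interface MessagePayload {
  topic: string;
  partition: number;
  message: { value: Uint8Array | null };
}

// Options object intended for consumer.run(...).
const runConfig = {
  // Allow up to two topic-partitions to be handled at the same time.
  partitionsConsumedConcurrently: 2,
  eachMessage: async ({ topic, partition }: MessagePayload): Promise<void> => {
    console.log(`processing ${topic}[${partition}]`);
  },
};
```

With a real consumer this object would be passed as `await consumer.run(runConfig)`. Messages within a single topic-partition are still handled one at a time; only different topic-partitions may overlap.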
So, I made another branch |
AFAIR there was a bug in v7 (or v8) that instead of awaiting the |
Nothing is mentioned about different topics in the KafkaJS documentation. For sure, messages in the same partition should be processed in order, and Kafka only implies this about partitions, not topics. Imagine that we have two users:
If two different topics are processed independently, then while one user waits for a response about their profile, the second one can still get the main page of the site.
|
Who said you can't process two topics in parallel though? |
I guess there is a misunderstanding, because we're not talking about Guys provided a valid test case in the In the case of CPU-bound tasks and single-threaded Node I certainly cannot argue, but in their case they have
return new Promise<string>((resolve) => {
  setTimeout(() => {
    resolve("pong1");
    const time2 = new Date()
    console.log(`test1 took ${getTimeDifference(time1, time2)} secs`)
  }, 5000);
});
If we have two different topics with the same behaviour, they should be processed concurrently, but they aren't. and according to |
I'll investigate this as soon as I can, thank you @weissmall |
Update after debugging
File:
public async handleEvent(
  pattern: string,
  packet: ReadPacket,
  context: KafkaContext,
): Promise<any> {
  const handler = this.getHandlerByPattern(pattern);
  if (!handler) {
    return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
  }
  const resultOrStream = await handler(packet.data, context);
  if (isObservable(resultOrStream)) {
    await lastValueFrom(resultOrStream); // This is the cause of the issue
  }
}
It seems a little strange, but while we're awaiting I guess for this situation we need something like a message queue for each topic, so that different topics can be processed asynchronously. If we don't await the response, we'll end up processing a single partition asynchronously, which is bad, so we need some workaround, or maybe the core issue lies somewhere deeper than what I've discovered. |
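To make the "queue per topic" idea above concrete, here is a rough sketch in plain TypeScript with no Kafka client involved. `KeyedSerialQueue`, `sleep`, and `demo` are hypothetical names invented for this illustration, not part of NestJS or KafkaJS. Tasks that share a key (for example `topic:partition`) run strictly in order, while tasks with different keys can overlap:

```typescript
// Hypothetical helper: serializes tasks that share a key and lets tasks
// with different keys run concurrently.
class KeyedSerialQueue {
  private readonly tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start this task only after the previous task for the same key settles.
    const result = previous.then(task);
    // Swallow errors on the chain so one failure does not block later tasks.
    const tail: Promise<void> = result.then(
      () => undefined,
      () => undefined,
    );
    this.tails.set(key, tail);
    // Remove the entry once the chain drains so the map does not grow forever.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Simulates two messages on test1 (partition 0) and one on test2 (partition 0).
async function demo(): Promise<string[]> {
  const queue = new KeyedSerialQueue();
  const log: string[] = [];
  const handle = (topic: string, partition: number, value: string, ms: number) =>
    queue.run(`${topic}:${partition}`, async () => {
      log.push(`start ${topic} ${value}`);
      await sleep(ms);
      log.push(`end ${topic} ${value}`);
    });
  await Promise.all([
    handle("test1", 0, "a", 50),
    handle("test1", 0, "b", 10),
    handle("test2", 0, "c", 10),
  ]);
  return log;
}
```

In this simulation the test2 message finishes while the first test1 message is still in flight, and the second test1 message starts only after the first one ends. In a real consumer, returning from the message handler before the work finishes would also affect when offsets are committed, so this is only a sketch of the ordering logic, not a drop-in fix.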
Hi, any updates? |
I can reproduce this with raw KafkaJS:
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "test-group" });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "test1" });
  await consumer.subscribe({ topic: "test2" });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log("Received message", {
        topic,
        partition,
        // message.value can be null, so guard before converting it
        value: message.value?.toString(),
      });
      // Simulate 5 seconds of asynchronous work per message
      await new Promise((resolve) => setTimeout(resolve, 5000));
      console.log("Processed message");
    },
  });
};

run().catch(console.error);
Result:
|
I'm running into the same situation... any updates? |
Have you tried using an observable? #3954 (comment) |
|
With Blizzard's node-rdkafka everything works perfectly. Code:
const { randomInt } = require('crypto');
const Kafka = require('node-rdkafka');

// Stream-based consumer subscribed to both topics
const stream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test-group',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
  "offset.store.method": 'broker',
}, {
  'auto.offset.reset': "end",
  "offset.store.method": 'broker',
  "enable.auto.commit": true,
  "auto.commit.interval.ms": 5000,
  "offset.store.sync.interval.ms": 5000,
}, {
  topics: ['test1', 'test2'],
  waitInterval: 0,
  objectMode: false
});

// Exit on stream errors
stream.on('error', function(err) {
  console.log(err);
  process.exit(1);
});

stream.consumer.on('event.error', function(err) {
  console.log(err);
})

stream.consumer.on('ready', (_) => {
  console.log('Ready');
})

stream.consumer.on('subscribed', (_) => {
  console.log('Subscribed')
})

// Each message waits 5 seconds; a random id ties the log lines together
stream.consumer.on('data', async ({ topic, value, partition }) => {
  const id = randomInt(500);
  console.log(`Got message with [id ${id}] at [${new Date()}]\n`, {
    topic, value: value.toString(), partition,
  })
  await new Promise((resolve) => setTimeout(resolve, 5000));
  console.log(`Processed message with [id ${id}] at [${new Date()}]`);
})
Received logs:
|
Would you like to create a PR replacing kafkajs with node-rdkafka? |
@kamilmysliwiec This will be a breaking change but I can take a swing at it, it has been a while. |
Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation? |
It can be implemented as a new transport which is what I will work towards first. @kamilmysliwiec let me know which direction you would want to take. I am opening a new issue here; #13535 |
Sounds good @mkaufmaner |
I think waiting for "combineStreamsAndThrowIfRetriable()" in the handleMessage method of the ServerKafka class might be an issue. Another problem is that the HeartBeat operation cannot be performed, resulting in a problem of rebalancing because it is blocked while processing one message. Do you have any updates planned to address these issues? |
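On the heartbeat point, one possible mitigation is to keep sending heartbeats while a long handler runs. The sketch below assumes a `heartbeat` function is available to the handler (KafkaJS passes one in the `eachMessage` payload); `withHeartbeat` is a hypothetical helper written for this illustration, not a NestJS or KafkaJS API:

```typescript
// Hypothetical helper: runs `work` while calling `heartbeat` every
// `intervalMs` milliseconds, and stops the timer when the work settles.
async function withHeartbeat<T>(
  work: () => Promise<T>,
  heartbeat: () => Promise<void>,
  intervalMs: number,
): Promise<T> {
  const timer = setInterval(() => {
    // Heartbeat failures are ignored here; real code may want to log them.
    heartbeat().catch(() => undefined);
  }, intervalMs);
  try {
    return await work();
  } finally {
    clearInterval(timer);
  }
}
```

This only addresses the rebalancing concern for slow asynchronous handlers. It does not make different topics process concurrently, and it cannot help if the handler blocks the event loop with CPU-bound work, since the timer would not fire.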
|
Let's track this here #13538 |
Is there an existing issue for this?
Current behavior
When any message is sent to a topic via Kafka, the application gets stuck and doesn't process other messages until processing of the previous one has finished.
Minimum reproduction code
https://github.com/Klutrem/kafka-issue
Steps to reproduce
npm i
localhost:9092
npm run start
test
Expected behavior
It was working on previous versions:
With that package.json, the application can process any number of messages at the same time and doesn't get stuck.
Package
@nestjs/common
@nestjs/core
@nestjs/microservices
@nestjs/platform-express
@nestjs/platform-fastify
@nestjs/platform-socket.io
@nestjs/platform-ws
@nestjs/testing
@nestjs/websockets
Other package
No response
NestJS version
10.2.7
Packages versions
Node.js version
18.16.0
In which operating systems have you tested?
Other
No response