Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kafka Instrumentation events #11616

Closed
1 task done
marinakurtin opened this issue May 14, 2023 · 8 comments
Closed
1 task done

Add support for Kafka Instrumentation events #11616

marinakurtin opened this issue May 14, 2023 · 8 comments

Comments

@marinakurtin
Copy link

marinakurtin commented May 14, 2023

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

I need an ability to monitor the events of my Kafka consumer

As far as I know it's already supported in the Kafka.js

I added the documentations reference in the documentation section below

I succeeded to find a workaround solution.
On receive of the first message of kafka, I can extract the reference of the Consumer object and add listeners on it.

Here is code example

@EventPattern('kafkaTopic')
 handler(@Ctx() context: KafkaContext) {
  if (!this.consumer) {
    this.consumer = context.getConsumer();
    this.consumer.on('consumer.heartbeat', (event) => {
      console.log("HEARTBEAT event", event);
    });
  }
//business logic }

But in my opinion, it's a bad solution, because that way requires to check on each message if it's already handled or not.

Describe the solution you'd like

I would an ability to connect the Instrumentation events.

I would like to have a decorator, which would receive the name of the event and connect to it.

Or

I would like to have an ability to add it into the options object of the NestFactory.createMicroservice method

Teachability, documentation, adoption, migration strategy

https://kafka.js.org/docs/instrumentation-events

What is the motivation / use case for changing the behavior?

The ability to monitor the consumer behaviour

@marinakurtin marinakurtin added needs triage This issue has not been looked into type: enhancement 🐺 labels May 14, 2023
@david-badalov
Copy link

david-badalov commented May 16, 2023

Would like to see this implemented too
I had to hack it:

export class CustomKafkaServer extends ServerKafka {
            async start(callback): Promise<void> {
                const consumerOptions = Object.assign(this.options.consumer || {}, {
                  groupId: this.groupId,
                });
                this.consumer = this.client.consumer(consumerOptions);
                this.producer = this.client.producer(this.options.producer);
                this.registerEvents(this.instrumentationEvents);  <--- add events here
                await this.consumer.connect();
                await this.producer.connect();
                await this.bindEvents(this.consumer);
                callback();
              }
}

@esahin90
Copy link
Contributor

I created a PR to address crashing consumers based on the Instrumentation Events. Maybe it could be handle more generally.

Here is the related PR: #11910

@kamilmysliwiec
Copy link
Member

So far the only way to accomplish this is to use a custom strategy that extends the built-in transporter. Example:

import { ServerKafka } from '@nestjs/microservices';
import { Consumer } from '@nestjs/microservices/external/kafka.interface';

class MyCustomStrategy extends ServerKafka {
  async bindEvents(consumer: Consumer) {
    consumer.on('consumer.heartbeat', () => console.log('...'));
    await super.bindEvents(consumer);
  }
}

// and later
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  strategy: new MyCustomStrategy({ ...kafka options here }),
});

@micalevisk micalevisk removed the needs triage This issue has not been looked into label Jul 9, 2023
@david-badalov
Copy link

david-badalov commented Jul 25, 2023

Hey @micalevisk @johnbiundo @jmcdo29 @BrunnerLivio @MarkPieszak @valorkin,
We're big fans of NestJS at our organization! 🚀 We've submitted a PR that adds kafkajs instrumentation events to NestJS, addressing a current gap. We'd be grateful if you could review it when you have time.
Thanks!

#12012

@esahin90
Copy link
Contributor

esahin90 commented Oct 2, 2023

Any updates regarding the review of the PR? This feature is crucial for a production environment, since you could get into the state, where the consumer dies, but your nest-service is still running without any issues. When a consumer crashes, the service would not be consuming anymore messages from kafka, but the service is still alive. I would not recommend to use the kafka consumer implementation in production until your cable of listening to consumer CRASH events or at least the nest-service carries over the failure.

I tried to address the consumer crash problem in a different PR, where the nestjs kafka consumer is always listening to these event in #11910

@kamilmysliwiec
Copy link
Member

This feature isn't crucial as there's currently a different way to register event listeners. We're still debating if the approach/API proposed in this PR is actually simpler than what you can do now @esahin90
(#11616 (comment))

@micalevisk
Copy link
Member

micalevisk commented Mar 20, 2024

I think that we shouldn't add new features related with kafka.js due to #13223

@kamilmysliwiec
Copy link
Member

#14142

@nestjs nestjs locked and limited conversation to collaborators Nov 20, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants