Skip to content

Commit

Permalink
feat(microservices): Allow RegExps in Patterns for Kafka
Browse files Browse the repository at this point in the history
closes #3083
  • Loading branch information
edeesis committed May 1, 2024
1 parent eb892ca commit e89fcda
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 15 deletions.
16 changes: 15 additions & 1 deletion integration/microservices/e2e/sum-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,21 @@ describe.skip('Kafka transport', function () {
.send()
.end(() => {
setTimeout(() => {
expect(KafkaController.IS_NOTIFIED).to.be.true;
expect(KafkaMessagesController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification)`, done => {
request(server)
.post('/notifyRegex')
.send()
.end(() => {
setTimeout(() => {
expect(KafkaMessagesController.IS_REGEX_NOTIFIED).to.be.eq(
'regex.notify.test-0',
);
done();
}, 1000);
});
Expand Down
6 changes: 5 additions & 1 deletion integration/microservices/src/kafka/kafka.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { UserDto } from './dtos/user.dto';
@Controller()
export class KafkaController implements OnModuleInit, OnModuleDestroy {
protected readonly logger = new Logger(KafkaController.name);
static IS_NOTIFIED = false;
static MATH_SUM = 0;

@Client({
Expand Down Expand Up @@ -133,6 +132,11 @@ export class KafkaController implements OnModuleInit, OnModuleDestroy {
return this.client.emit('notify', { notify: true });
}

@Post('notifyRegex')
async sendRegexNotification(): Promise<any> {
return this.client.emit('regex.notify.test-0', { notify: true });
}

// Complex data to send.
@Post('/user')
@HttpCode(200)
Expand Down
15 changes: 13 additions & 2 deletions integration/microservices/src/kafka/kafka.messages.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
import {
Ctx,
EventPattern,
KafkaContext,
MessagePattern,
} from '@nestjs/microservices';
import { BusinessDto } from './dtos/business.dto';
import { UserDto } from './dtos/user.dto';
import { BusinessEntity } from './entities/business.entity';
Expand All @@ -10,6 +15,7 @@ import { KafkaController } from './kafka.controller';
export class KafkaMessagesController {
protected readonly logger = new Logger(KafkaMessagesController.name);
static IS_NOTIFIED = false;
static IS_REGEX_NOTIFIED: any = false;

@MessagePattern('math.sum.sync.kafka.message')
mathSumSyncKafkaMessage(data: any) {
Expand Down Expand Up @@ -53,7 +59,12 @@ export class KafkaMessagesController {

@EventPattern('notify')
eventHandler(data: any) {
KafkaController.IS_NOTIFIED = data.value.notify;
KafkaMessagesController.IS_NOTIFIED = data.value.notify;
}

@EventPattern(/regex\.notify\.test-[0-9]*/)
regexHandler(data: any, @Ctx() context: KafkaContext) {
KafkaMessagesController.IS_REGEX_NOTIFIED = context.getTopic();
}

// Complex data to send.
Expand Down
18 changes: 15 additions & 3 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { KafkaLogger, KafkaParser } from '../helpers';
import {
CustomTransportStrategy,
KafkaOptions,
MessageHandler,
OutgoingResponse,
ReadPacket,
} from '../interfaces';
Expand All @@ -49,6 +50,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected clientId: string;
protected groupId: string;

protected registeredPatterns: any[];

constructor(protected readonly options: KafkaOptions['options']) {
super();

Expand Down Expand Up @@ -120,13 +123,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
}

public async bindEvents(consumer: Consumer) {
const registeredPatterns = [...this.messageHandlers.keys()];
const consumerSubscribeOptions = this.options.subscribe || {};

if (registeredPatterns.length > 0) {
if (this.registeredPatterns.length > 0) {
await this.consumer.subscribe({
...consumerSubscribeOptions,
topics: registeredPatterns,
topics: this.registeredPatterns,
});
}

Expand Down Expand Up @@ -317,4 +319,14 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
protected initializeDeserializer(options: KafkaOptions['options']) {
this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
}

public addHandler(
pattern: any,
callback: MessageHandler,
isEventHandler: boolean = false,
extras: Record<string, any> = {},
) {
this.registeredPatterns.push(pattern);
super.addHandler(pattern, callback, isEventHandler, extras);
}
}
29 changes: 21 additions & 8 deletions packages/microservices/test/server/server-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,26 @@ describe('ServerKafka', () => {
await server.listen(callback);

const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
[pattern]: handler,
});
(server as any).registeredPatterns = [pattern];

await server.bindEvents((server as any).consumer);

expect(subscribe.called).to.be.true;
expect(
subscribe.calledWith({
topics: [pattern],
}),
).to.be.true;

expect(run.called).to.be.true;
expect(connect.called).to.be.true;
});
it('should call subscribe and run on consumer when there are RegExp messageHandlers', async () => {
(server as any).logger = new NoopLogger();
await server.listen(callback);

const pattern = /test/;
(server as any).registeredPatterns = [pattern];

await server.bindEvents((server as any).consumer);

Expand All @@ -204,10 +220,7 @@ describe('ServerKafka', () => {
await server.listen(callback);

const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
[pattern]: handler,
});
(server as any).registeredPatterns = [pattern];

await server.bindEvents((server as any).consumer);

Expand Down

0 comments on commit e89fcda

Please sign in to comment.