Skip to content

Commit

Permalink
Merge pull request #5 from nikksan/master
Browse files Browse the repository at this point in the history
Provide hardcoded list of namespaces to the subscriber
  • Loading branch information
tecs authored Apr 20, 2023
2 parents e4644ae + 8a8ecbf commit 93a11d2
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 33 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@luckbox/notification-client",
"version": "2.0.1",
"version": "2.0.2",
"description": "A two-way Redis-based notification client allowing for pub/sub",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
13 changes: 0 additions & 13 deletions src/NotificationPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export default class NotificationPublisher {
private namespace: string;
private prefix: string;
private redisClient: Redis;
private hasPublishedServerInitCommand = false;

constructor(params: ConstructorParams) {
this.namespace = params.namespace;
Expand All @@ -26,8 +25,6 @@ export default class NotificationPublisher {
rooms: Array<string | number> | null = null,
user: string | number | null = null
): Promise<void> {
await this.publishServerInitCommandIfNotAlready();

const payload: Record<string, unknown> = {
e: event,
m: message,
Expand All @@ -43,14 +40,4 @@ export default class NotificationPublisher {

await this.redisClient.publish(`${this.prefix}/${this.namespace}/tx`, JSON.stringify(payload));
}

private async publishServerInitCommandIfNotAlready() {
if (this.hasPublishedServerInitCommand) {
return;
}

await this.redisClient.publish(`${this.prefix}/tx`, this.namespace);

this.hasPublishedServerInitCommand = true;
}
}
37 changes: 20 additions & 17 deletions src/NotificationSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ export type MessageHandler = (namespace: string, event: string, message: unknown

type ConstructorParams = {
prefix: string,
namespaces: Array<string>,
handler: MessageHandler,
redisConfig: RedisOptions,
loggerFactory: LoggerFactory,
};

export default class NotificationSubscriber {
private namespaces: Record<string, string> = {};
private namespaces: Array<string>;
private logger: Logger;
private redisClient: Redis;
private prefix: string;
Expand All @@ -22,40 +23,42 @@ export default class NotificationSubscriber {
this.logger = params.loggerFactory.create(this.constructor.name);
this.prefix = params.prefix;
this.handler = params.handler;
this.namespaces = params.namespaces;

this.redisClient = new Redis(params.redisConfig);
}

async subscribe(): Promise<void> {
for (const namespace of this.namespaces) {
await this.register(namespace);
}

this.redisClient.on('message', async (channel: string, packet: string) => {
try {
if (channel === `${this.prefix}/tx`) {
await this.register(packet);
return;
}
if (
!channel.startsWith(this.prefix) ||
!channel.endsWith('/tx')
) {
return;
}

if (channel in this.namespaces) {
await this.receivePacket(channel, packet);
const namespace = channel.slice(this.prefix.length + 1, -3);
try {
if (this.namespaces.includes(namespace)) {
await this.receivePacket(namespace, packet);
}
} catch (err) {
this.logger.warn(err);
}
});

await this.redisClient.publish(`${this.prefix}/rx`, 'SERVER_INIT');
await this.redisClient.subscribe(`${this.prefix}/tx`);
}

private async register(namespace: string) {
const channel = `${this.prefix}/${namespace}/tx`;
if (!(channel in this.namespaces)) {
this.namespaces[channel] = namespace;
await this.redisClient.subscribe(channel);
}
await this.redisClient.subscribe(channel);
this.logger.info(`Registered namespace ${namespace} [${channel}]`);
}

private receivePacket(channel: string, packet: string) {
const namespace = this.namespaces[channel];
private receivePacket(namespace: string, packet: string) {
const {
e: event,
m: message,
Expand Down

0 comments on commit 93a11d2

Please sign in to comment.