Skip to content

Commit c0a8614

Browse files
committed
feat: add abort controller for subscribe
1 parent 2556f10 commit c0a8614

File tree

6 files changed

+144
-6
lines changed

6 files changed

+144
-6
lines changed

src/base.ts

+11-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
MessageCategory,
1111
MessageType,
1212
PublishOptions,
13+
SubscribeAbortController,
1314
SubscribeOptions,
1415
SubscribeTopicListener,
1516
WaitCheckOptions,
@@ -493,7 +494,7 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
493494
public subscribe(
494495
listener: SubscribeTopicListener,
495496
options: SubscribeOptions = {}
496-
) {
497+
): SubscribeAbortController {
497498
const topic = options.topic || DEFAULT_LISTENER_KEY;
498499
if (!this.topicListener.has(topic)) {
499500
this.topicListener.set(topic, new Set());
@@ -502,6 +503,11 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
502503
listener['_subscribeOnce'] = true;
503504
}
504505
this.topicListener.get(topic).add(listener);
506+
listener['_abortController'] = {
507+
abort: () => {
508+
this.topicListener.get(topic).delete(listener);
509+
},
510+
};
505511

506512
// if topic has cache
507513
if (
@@ -517,14 +523,16 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
517523
}
518524
this.topicMessageCache.get(topic).clear();
519525
}
526+
527+
return listener['_abortController'];
520528
}
521529

522530
public subscribeOnce(
523531
listener: SubscribeTopicListener,
524532
options: SubscribeOptions = {}
525-
) {
533+
): SubscribeAbortController {
526534
options.subscribeOnce = true;
527-
this.subscribe(listener, options);
535+
return this.subscribe(listener, options);
528536
}
529537

530538
public publish(

src/interface.ts

+9-2
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,21 @@ export type SubscribeTopicListener = (
122122
responder?: IResponder
123123
) => void | Promise<void>;
124124

125+
export type SubscribeAbortController = {
126+
abort: () => void;
127+
};
128+
125129
export interface IEventBus<T> {
126130
addWorker(worker: T);
127131
start(err?: Error): Promise<void>;
128-
subscribe(callback: SubscribeTopicListener, options?: SubscribeOptions): void;
132+
subscribe(
133+
callback: SubscribeTopicListener,
134+
options?: SubscribeOptions
135+
): SubscribeAbortController;
129136
subscribeOnce(
130137
callback: SubscribeTopicListener,
131138
options?: SubscribeOptions
132-
): void;
139+
): SubscribeAbortController;
133140
publishAsync<ResData>(
134141
data: unknown,
135142
publishOptions?: PublishOptions

test/cp.test.ts

+32
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,38 @@ describe('/test/cp.test.ts', function () {
5151
await bus.stop();
5252
});
5353

54+
it('test base subscribe with abort', async () => {
55+
const bus = new ChildProcessEventBus();
56+
const worker = createChildProcessWorker(join(__dirname, 'cp/base.ts'));
57+
bus.addWorker(worker);
58+
await bus.start();
59+
60+
const result = await new Promise(resolve => {
61+
const abortController = bus.subscribe(message => {
62+
resolve(message.body);
63+
});
64+
65+
abortController.abort();
66+
67+
bus.publish({
68+
data: {
69+
name: 'test',
70+
}
71+
}, {
72+
topic: 'target'
73+
});
74+
75+
setTimeout(() => {
76+
resolve(null);
77+
}, 1000);
78+
});
79+
80+
expect(result).toEqual(null);
81+
82+
worker.kill();
83+
await bus.stop();
84+
});
85+
5486
it('test publish with async', async () => {
5587
const bus = new ChildProcessEventBus();
5688
const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async.ts'));

test/local.test.ts

+34-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () {
2222
await bus.stop();
2323
});
2424

25-
it('test base publish and subscribe', async () => {
25+
it.only('test base publish and subscribe', async () => {
2626
const bus = new LocalEventBus({
2727
isWorker: false,
2828
});
@@ -49,6 +49,39 @@ describe('/test/local.test.ts', function () {
4949
await bus.stop();
5050
});
5151

52+
it.only('test base subscribe and abort', async () => {
53+
const bus = new LocalEventBus({
54+
isWorker: false,
55+
});
56+
createLocalWorker(join(__dirname, 'local/base_abort.ts'));
57+
await bus.start();
58+
59+
const result = await new Promise(resolve => {
60+
const abortController = bus.subscribe(message => {
61+
resolve(message.body);
62+
});
63+
64+
abortController.abort();
65+
66+
bus.publish({
67+
data: {
68+
name: 'test',
69+
}
70+
},
71+
{
72+
topic: 'target',
73+
});
74+
75+
setTimeout(() => {
76+
resolve(null);
77+
}, 1000);
78+
});
79+
80+
expect(result).toEqual(null);
81+
82+
await bus.stop();
83+
});
84+
5285
it('test publish with async', async () => {
5386
const bus = new LocalEventBus({
5487
isWorker: false,

test/local/base_abort.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { LocalEventBus } from '../../src/index';
2+
3+
export async function createWorker() {
4+
const bus = new LocalEventBus({
5+
isWorker: true,
6+
});
7+
8+
bus.subscribe(message=>{
9+
console.log(message);
10+
11+
bus.publish({
12+
data: 'hello world'
13+
});
14+
},
15+
{
16+
topic: 'target',
17+
});
18+
19+
await bus.start();
20+
}
21+
22+
createWorker().then(() => {
23+
console.log('ready');
24+
});

test/thread.test.ts

+34
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,40 @@ describe('/test/thread.test.ts', function () {
5252
await bus.stop();
5353
});
5454

55+
it('test base subscribe and abort', async () => {
56+
const bus = new ThreadEventBus();
57+
const worker = createThreadWorker(join(__dirname, 'worker/base.ts'));
58+
bus.addWorker(worker);
59+
await bus.start();
60+
61+
const result = await new Promise(resolve => {
62+
const abortController = bus.subscribe(message => {
63+
resolve(message.body);
64+
});
65+
66+
abortController.abort();
67+
68+
bus.publish({
69+
data: {
70+
name: 'test',
71+
}
72+
},
73+
{
74+
topic: 'target',
75+
});
76+
77+
setTimeout(() => {
78+
resolve(null);
79+
}, 1000);
80+
81+
});
82+
83+
expect(result).toEqual(null);
84+
85+
await worker.terminate();
86+
await bus.stop();
87+
});
88+
5589
it('test publish with async', async () => {
5690
const bus = new ThreadEventBus();
5791
const worker = createThreadWorker(join(__dirname, 'worker/publish_async.ts'));

0 commit comments

Comments
 (0)