Skip to content

Commit ad66830

Browse files
committed
feat: support publish async from worker
1 parent 18d00dc commit ad66830

6 files changed

+111
-6
lines changed

src/base.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,19 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
391391
const isChunk = originMessage.messageOptions?.['isChunk'] === true;
392392
const responder = new AckResponder();
393393
responder.onData(data => {
394-
this.publish(data, {
395-
relatedMessageId: originMessage.messageId,
396-
isChunk,
394+
this.transit({
395+
messageCategory: MessageCategory.OUT,
396+
message: {
397+
messageId: this.generateMessageId(),
398+
workerId: this.getWorkerId(),
399+
type: MessageType.Response,
400+
body: data,
401+
messageOptions: {
402+
relatedMessageId: originMessage.messageId,
403+
isChunk,
404+
topic: originMessage.messageOptions?.topic,
405+
},
406+
},
397407
});
398408

399409
if (!isChunk) {
@@ -586,7 +596,7 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
586596
message: {
587597
messageId,
588598
workerId: this.getWorkerId(),
589-
type: this.isMain() ? MessageType.Invoke : MessageType.Response,
599+
type: MessageType.Invoke,
590600
body: data,
591601
messageOptions: {
592602
topic: publishOptions.topic,

test/cp.test.ts

+23
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,27 @@ describe('/test/cp.test.ts', function () {
545545
worker.kill();
546546
await bus.stop();
547547
});
548+
549+
it('test publishAsync from child to main', async () => {
550+
const bus = new ChildProcessEventBus();
551+
const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async_from_worker.ts'));
552+
bus.addWorker(worker);
553+
await bus.start();
554+
555+
await new Promise<void>(resolve => {
556+
// 订阅来自子进程的消息,并返回响应
557+
bus.subscribe((message, responder?) => {
558+
if (message.body.data === 'request from child') {
559+
responder?.send({
560+
data: 'response from main',
561+
});
562+
} else if (message.body.data === 'ok') {
563+
resolve();
564+
}
565+
});
566+
});
567+
568+
await worker.kill();
569+
await bus.stop();
570+
});
548571
});

test/cp/publish_async_from_worker.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ChildProcessEventBus } from '../../src';
2+
3+
async function createWorker() {
4+
const bus = new ChildProcessEventBus({
5+
isWorker: true,
6+
});
7+
8+
await bus.start();
9+
10+
// 在子进程中发送异步消息到主进程
11+
const result = await bus.publishAsync<{data: string}>({
12+
data: 'request from child',
13+
});
14+
15+
if (result.data === 'response from main') {
16+
bus.publish({
17+
data: 'ok',
18+
});
19+
}
20+
}
21+
22+
createWorker().then(() => {
23+
console.log('Child process is ready');
24+
});

test/local.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () {
2222
await bus.stop();
2323
});
2424

25-
it.only('test base publish and subscribe', async () => {
25+
it('test base publish and subscribe', async () => {
2626
const bus = new LocalEventBus({
2727
isWorker: false,
2828
});
@@ -49,7 +49,7 @@ describe('/test/local.test.ts', function () {
4949
await bus.stop();
5050
});
5151

52-
it.only('test base subscribe and abort', async () => {
52+
it('test base subscribe and abort', async () => {
5353
const bus = new LocalEventBus({
5454
isWorker: false,
5555
});
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { LocalEventBus } from '../../src';
2+
3+
async function createWorker() {
4+
const bus = new LocalEventBus({
5+
isWorker: true,
6+
});
7+
8+
await bus.start();
9+
10+
// 在子进程中发送异步消息到主进程
11+
const result = await bus.publishAsync<{data: string}>({
12+
data: 'request from child',
13+
});
14+
15+
if (result.data === 'response from main') {
16+
bus.publish({
17+
data: 'ok',
18+
});
19+
}
20+
}
21+
22+
createWorker().then(() => {
23+
console.log('Local worker is ready');
24+
});
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ThreadEventBus } from '../../src';
2+
3+
async function createWorker() {
4+
const bus = new ThreadEventBus({
5+
isWorker: true,
6+
});
7+
8+
await bus.start();
9+
10+
// 在子进程中发送异步消息到主进程
11+
const result = await bus.publishAsync<{data: string}>({
12+
data: 'request from child',
13+
});
14+
15+
if (result.data === 'response from main') {
16+
bus.publish({
17+
data: 'ok',
18+
});
19+
}
20+
}
21+
22+
createWorker().then(() => {
23+
console.log('Thread worker is ready');
24+
});

0 commit comments

Comments
 (0)