Skip to content

Commit

Permalink
feat: support publish async from worker (#14)
Browse files Browse the repository at this point in the history
* feat: support publish async from worker

* test: add timeout case

* docs: update
  • Loading branch information
czy88840616 authored Nov 26, 2024
1 parent 18d00dc commit 8592679
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 16 deletions.
35 changes: 25 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,37 +162,52 @@ bus.publish(err);

**publishAsync**

同步的发送消息,会等待订阅方返回,包含超时参数,默认 5s。

此 API 仅限于 main 向 worker 发送消息。
同步的发送消息,会等待订阅方返回,包含超时参数,默认 5s。支持 main <-> worker 双向异步通信。

从 main 发送到 worker:
```ts

// subscribe
// in worker
bus.subscribe((message, responder) => {
// message.body === {data: 'abc'}

responder && responder.send({
responder?.send({
data: 'hello world',
});

// send error
responder.error(new Error('test'));
});

// invoke
// in main
const result = await bus.publishAsync({
data: 'abc'
}, {
timeout: 5000,
});

// result => {data: 'hello world'}

```

使用 try/catch 捕获错误。
从 worker 发送到 main:
```ts
// in main
bus.subscribe((message, responder) => {
// message.body === {data: 'abc'}
responder?.send({
data: 'hello world',
});
});

// in worker
const result = await bus.publishAsync({
data: 'abc'
}, {
timeout: 5000,
});

// result => {data: 'hello world'}
```

使用 try/catch 捕获错误:
```ts
try {
await bus.publishAsync({
Expand Down
18 changes: 14 additions & 4 deletions src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,19 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
const isChunk = originMessage.messageOptions?.['isChunk'] === true;
const responder = new AckResponder();
responder.onData(data => {
this.publish(data, {
relatedMessageId: originMessage.messageId,
isChunk,
this.transit({
messageCategory: MessageCategory.OUT,
message: {
messageId: this.generateMessageId(),
workerId: this.getWorkerId(),
type: MessageType.Response,
body: data,
messageOptions: {
relatedMessageId: originMessage.messageId,
isChunk,
topic: originMessage.messageOptions?.topic,
},
},
});

if (!isChunk) {
Expand Down Expand Up @@ -586,7 +596,7 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
message: {
messageId,
workerId: this.getWorkerId(),
type: this.isMain() ? MessageType.Invoke : MessageType.Response,
type: MessageType.Invoke,
body: data,
messageOptions: {
topic: publishOptions.topic,
Expand Down
46 changes: 46 additions & 0 deletions test/cp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -545,4 +545,50 @@ describe('/test/cp.test.ts', function () {
worker.kill();
await bus.stop();
});

it('test publishAsync from child to main', async () => {
const bus = new ChildProcessEventBus();
const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async_from_worker.ts'));
bus.addWorker(worker);
await bus.start();

await new Promise<void>(resolve => {
// 订阅来自子进程的消息,并返回响应
bus.subscribe((message, responder?) => {
if (message.body.data === 'request from child') {
responder?.send({
data: 'response from main',
});
} else if (message.body.data === 'ok') {
resolve();
}
});
});

await worker.kill();
await bus.stop();
});

it('test publishAsync from child to main with timeout', async () => {
const bus = new ChildProcessEventBus();
const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async_from_worker_timeout.ts'));
bus.addWorker(worker);
await bus.start();

const result = await new Promise<{error: {message: string}}>(resolve => {
bus.subscribe((message) => {
// 等待子进程发送的超时错误消息
if (message.body?.error?.message) {
resolve(message.body);
}
});

// 主进程不响应子进程的请求,让其超时
});

// 验证超时错误消息
expect(result.error.message).toMatch('timeout');
await worker.kill();
await bus.stop();
});
});
24 changes: 24 additions & 0 deletions test/cp/publish_async_from_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ChildProcessEventBus } from '../../src';

async function createWorker() {
const bus = new ChildProcessEventBus({
isWorker: true,
});

await bus.start();

// 在子进程中发送异步消息到主进程
const result = await bus.publishAsync<{data: string}>({
data: 'request from child',
});

if (result.data === 'response from main') {
bus.publish({
data: 'ok',
});
}
}

createWorker().then(() => {
console.log('Child process is ready');
});
29 changes: 29 additions & 0 deletions test/cp/publish_async_from_worker_timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { ChildProcessEventBus } from '../../src';

async function createWorker() {
const bus = new ChildProcessEventBus({
isWorker: true,
});

await bus.start();

try {
// 设置较短的超时时间
await bus.publishAsync<{data: string}>({
data: 'request from child',
}, {
timeout: 1000 // 1秒超时
});
} catch (err) {
// 发送超时错误给主进程以验证
bus.publish({
error: {
message: err.message
}
});
}
}

createWorker().then(() => {
console.log('Child process is ready');
});
28 changes: 26 additions & 2 deletions test/local.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () {
await bus.stop();
});

it.only('test base publish and subscribe', async () => {
it('test base publish and subscribe', async () => {
const bus = new LocalEventBus({
isWorker: false,
});
Expand All @@ -49,7 +49,7 @@ describe('/test/local.test.ts', function () {
await bus.stop();
});

it.only('test base subscribe and abort', async () => {
it('test base subscribe and abort', async () => {
const bus = new LocalEventBus({
isWorker: false,
});
Expand Down Expand Up @@ -450,4 +450,28 @@ describe('/test/local.test.ts', function () {

await bus.stop();
});

it('test publishAsync from child to main with timeout', async () => {
const bus = new LocalEventBus({
isWorker: false,
});
createLocalWorker(join(__dirname, 'local/publish_async_from_worker_timeout.ts'));
await bus.start();

const result = await new Promise<{error: {message: string}}>(resolve => {
bus.subscribe((message) => {
// 等待子进程发送的超时错误消息
if (message.body?.error?.message) {
resolve(message.body);
}
});

// 主进程不响应子进程的请求,让其超时
});

// 验证超时错误消息
expect(result.error.message).toMatch('timeout');

await bus.stop();
});
});
24 changes: 24 additions & 0 deletions test/local/publish_async_from_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { LocalEventBus } from '../../src';

async function createWorker() {
const bus = new LocalEventBus({
isWorker: true,
});

await bus.start();

// 在子进程中发送异步消息到主进程
const result = await bus.publishAsync<{data: string}>({
data: 'request from child',
});

if (result.data === 'response from main') {
bus.publish({
data: 'ok',
});
}
}

createWorker().then(() => {
console.log('Local worker is ready');
});
29 changes: 29 additions & 0 deletions test/local/publish_async_from_worker_timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { LocalEventBus } from '../../src';

async function createWorker() {
const bus = new LocalEventBus({
isWorker: true,
});

await bus.start();

try {
// 设置较短的超时时间
await bus.publishAsync<{data: string}>({
data: 'request from child',
}, {
timeout: 1000 // 1秒超时
});
} catch (err) {
// 发送超时错误给主进程以验证
bus.publish({
error: {
message: err.message
}
});
}
}

createWorker().then(() => {
console.log('Local worker is ready');
});
46 changes: 46 additions & 0 deletions test/thread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,50 @@ describe('/test/thread.test.ts', function () {
await bus.stop();
});

it('test publishAsync from child to main', async () => {
const bus = new ThreadEventBus();
const worker = createThreadWorker(join(__dirname, 'worker/publish_async_from_worker.ts'));
bus.addWorker(worker);
await bus.start();

await new Promise<void>(resolve => {
bus.subscribe((message, responder?) => {
if (message.body.data === 'request from child') {
responder?.send({
data: 'response from main',
});
} else if (message.body.data === 'ok') {
resolve();
}
});
});

await worker.terminate();
await bus.stop();
});

it('test publishAsync from child to main with timeout', async () => {
const bus = new ThreadEventBus();
const worker = createThreadWorker(join(__dirname, 'worker/publish_async_from_worker_timeout.ts'));
bus.addWorker(worker);
await bus.start();

const result = await new Promise<{error: {message: string}}>(resolve => {
bus.subscribe((message) => {
// 等待子进程发送的超时错误消息
if (message.body?.error?.message) {
resolve(message.body);
}
});

// 主进程不响应子进程的请求,让其超时
});

// 验证超时错误消息
expect(result.error.message).toMatch('timeout');

await worker.terminate();
await bus.stop();
});

});
24 changes: 24 additions & 0 deletions test/worker/publish_async_from_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ThreadEventBus } from '../../src';

async function createWorker() {
const bus = new ThreadEventBus({
isWorker: true,
});

await bus.start();

// 在子进程中发送异步消息到主进程
const result = await bus.publishAsync<{data: string}>({
data: 'request from child',
});

if (result.data === 'response from main') {
bus.publish({
data: 'ok',
});
}
}

createWorker().then(() => {
console.log('Thread worker is ready');
});
Loading

0 comments on commit 8592679

Please sign in to comment.