From 8592679d400873eeadb3812570192655465c2192 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Tue, 26 Nov 2024 13:10:43 +0800 Subject: [PATCH] feat: support publish async from worker (#14) * feat: support publish async from worker * test: add timeout case * docs: update --- README.md | 35 ++++++++++---- src/base.ts | 18 ++++++-- test/cp.test.ts | 46 +++++++++++++++++++ test/cp/publish_async_from_worker.ts | 24 ++++++++++ test/cp/publish_async_from_worker_timeout.ts | 29 ++++++++++++ test/local.test.ts | 28 ++++++++++- test/local/publish_async_from_worker.ts | 24 ++++++++++ .../publish_async_from_worker_timeout.ts | 29 ++++++++++++ test/thread.test.ts | 46 +++++++++++++++++++ test/worker/publish_async_from_worker.ts | 24 ++++++++++ .../publish_async_from_worker_timeout.ts | 29 ++++++++++++ 11 files changed, 316 insertions(+), 16 deletions(-) create mode 100644 test/cp/publish_async_from_worker.ts create mode 100644 test/cp/publish_async_from_worker_timeout.ts create mode 100644 test/local/publish_async_from_worker.ts create mode 100644 test/local/publish_async_from_worker_timeout.ts create mode 100644 test/worker/publish_async_from_worker.ts create mode 100644 test/worker/publish_async_from_worker_timeout.ts diff --git a/README.md b/README.md index fbc6a03..2167bf5 100644 --- a/README.md +++ b/README.md @@ -162,17 +162,14 @@ bus.publish(err); **publishAsync** -同步的发送消息,会等待订阅方返回,包含超时参数,默认 5s。 - -此 API 仅限于 main 向 worker 发送消息。 +同步的发送消息,会等待订阅方返回,包含超时参数,默认 5s。支持 main <-> worker 双向异步通信。 +从 main 发送到 worker: ```ts - -// subscribe +// in worker bus.subscribe((message, responder) => { // message.body === {data: 'abc'} - - responder && responder.send({ + responder?.send({ data: 'hello world', }); @@ -180,7 +177,7 @@ bus.subscribe((message, responder) => { responder.error(new Error('test')); }); -// invoke +// in main const result = await bus.publishAsync({ data: 'abc' }, { @@ -188,11 +185,29 @@ const result = await bus.publishAsync({ }); // result => {data: 'hello world'} - ``` -使用 try/catch 捕获错误。 +从 worker 发送到 main: +```ts +// in main +bus.subscribe((message, responder) => { + // message.body === {data: 'abc'} + responder?.send({ + data: 'hello world', + }); +}); + +// in worker +const result = await bus.publishAsync({ + data: 'abc' +}, { + timeout: 5000, +}); + +// result => {data: 'hello world'} +``` +使用 try/catch 捕获错误: ```ts try { await bus.publishAsync({ diff --git a/src/base.ts b/src/base.ts index e55d490..2097c3a 100644 --- a/src/base.ts +++ b/src/base.ts @@ -391,9 +391,19 @@ export abstract class AbstractEventBus implements IEventBus { const isChunk = originMessage.messageOptions?.['isChunk'] === true; const responder = new AckResponder(); responder.onData(data => { - this.publish(data, { - relatedMessageId: originMessage.messageId, - isChunk, + this.transit({ + messageCategory: MessageCategory.OUT, + message: { + messageId: this.generateMessageId(), + workerId: this.getWorkerId(), + type: MessageType.Response, + body: data, + messageOptions: { + relatedMessageId: originMessage.messageId, + isChunk, + topic: originMessage.messageOptions?.topic, + }, + }, }); if (!isChunk) { @@ -586,7 +596,7 @@ export abstract class AbstractEventBus implements IEventBus { message: { messageId, workerId: this.getWorkerId(), - type: this.isMain() ? MessageType.Invoke : MessageType.Response, + type: MessageType.Invoke, body: data, messageOptions: { topic: publishOptions.topic, diff --git a/test/cp.test.ts b/test/cp.test.ts index 08909fd..3bf88dc 100644 --- a/test/cp.test.ts +++ b/test/cp.test.ts @@ -545,4 +545,50 @@ describe('/test/cp.test.ts', function () { worker.kill(); await bus.stop(); }); + + it('test publishAsync from child to main', async () => { + const bus = new ChildProcessEventBus(); + const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async_from_worker.ts')); + bus.addWorker(worker); + await bus.start(); + + await new Promise(resolve => { + // 订阅来自子进程的消息,并返回响应 + bus.subscribe((message, responder?) => { + if (message.body.data === 'request from child') { + responder?.send({ + data: 'response from main', + }); + } else if (message.body.data === 'ok') { + resolve(); + } + }); + }); + + await worker.kill(); + await bus.stop(); + }); + + it('test publishAsync from child to main with timeout', async () => { + const bus = new ChildProcessEventBus(); + const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async_from_worker_timeout.ts')); + bus.addWorker(worker); + await bus.start(); + + const result = await new Promise<{error: {message: string}}>(resolve => { + bus.subscribe((message) => { + // 等待子进程发送的超时错误消息 + if (message.body?.error?.message) { + resolve(message.body); + } + }); + + // 主进程不响应子进程的请求,让其超时 + }); + + // 验证超时错误消息 + expect(result.error.message).toMatch('timeout'); + await worker.kill(); + await bus.stop(); + }); }); diff --git a/test/cp/publish_async_from_worker.ts b/test/cp/publish_async_from_worker.ts new file mode 100644 index 0000000..a2475cd --- /dev/null +++ b/test/cp/publish_async_from_worker.ts @@ -0,0 +1,24 @@ +import { ChildProcessEventBus } from '../../src'; + +async function createWorker() { + const bus = new ChildProcessEventBus({ + isWorker: true, + }); + + await bus.start(); + + // 在子进程中发送异步消息到主进程 + const result = await bus.publishAsync<{data: string}>({ + data: 'request from child', + }); + + if (result.data === 'response from main') { + bus.publish({ + data: 'ok', + }); + } +} + +createWorker().then(() => { + console.log('Child process is ready'); +}); diff --git a/test/cp/publish_async_from_worker_timeout.ts b/test/cp/publish_async_from_worker_timeout.ts new file mode 100644 index 0000000..617d9ad --- /dev/null +++ b/test/cp/publish_async_from_worker_timeout.ts @@ -0,0 +1,29 @@ +import { ChildProcessEventBus } from '../../src'; + +async function createWorker() { + const bus = new ChildProcessEventBus({ + isWorker: true, + }); + + await bus.start(); + + try { + // 设置较短的超时时间 + await bus.publishAsync<{data: string}>({ + data: 'request from child', + }, { + timeout: 1000 // 1秒超时 + }); + } catch (err) { + // 发送超时错误给主进程以验证 + bus.publish({ + error: { + message: err.message + } + }); + } +} + +createWorker().then(() => { + console.log('Child process is ready'); +}); diff --git a/test/local.test.ts b/test/local.test.ts index ca58cd3..6e76371 100644 --- a/test/local.test.ts +++ b/test/local.test.ts @@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () { await bus.stop(); }); - it.only('test base publish and subscribe', async () => { + it('test base publish and subscribe', async () => { const bus = new LocalEventBus({ isWorker: false, }); @@ -49,7 +49,7 @@ describe('/test/local.test.ts', function () { await bus.stop(); }); - it.only('test base subscribe and abort', async () => { + it('test base subscribe and abort', async () => { const bus = new LocalEventBus({ isWorker: false, }); @@ -450,4 +450,28 @@ describe('/test/local.test.ts', function () { await bus.stop(); }); + + it('test publishAsync from child to main with timeout', async () => { + const bus = new LocalEventBus({ + isWorker: false, + }); + createLocalWorker(join(__dirname, 'local/publish_async_from_worker_timeout.ts')); + await bus.start(); + + const result = await new Promise<{error: {message: string}}>(resolve => { + bus.subscribe((message) => { + // 等待子进程发送的超时错误消息 + if (message.body?.error?.message) { + resolve(message.body); + } + }); + + // 主进程不响应子进程的请求,让其超时 + }); + + // 验证超时错误消息 + expect(result.error.message).toMatch('timeout'); + + await bus.stop(); + }); }); diff --git a/test/local/publish_async_from_worker.ts b/test/local/publish_async_from_worker.ts new file mode 100644 index 0000000..3942932 --- /dev/null +++ b/test/local/publish_async_from_worker.ts @@ -0,0 +1,24 @@ +import { LocalEventBus } from '../../src'; + +async function createWorker() { + const bus = new LocalEventBus({ + isWorker: true, + }); + + await bus.start(); + + // 在子进程中发送异步消息到主进程 + const result = await bus.publishAsync<{data: string}>({ + data: 'request from child', + }); + + if (result.data === 'response from main') { + bus.publish({ + data: 'ok', + }); + } +} + +createWorker().then(() => { + console.log('Local worker is ready'); +}); \ No newline at end of file diff --git a/test/local/publish_async_from_worker_timeout.ts b/test/local/publish_async_from_worker_timeout.ts new file mode 100644 index 0000000..8d5ad1a --- /dev/null +++ b/test/local/publish_async_from_worker_timeout.ts @@ -0,0 +1,29 @@ +import { LocalEventBus } from '../../src'; + +async function createWorker() { + const bus = new LocalEventBus({ + isWorker: true, + }); + + await bus.start(); + + try { + // 设置较短的超时时间 + await bus.publishAsync<{data: string}>({ + data: 'request from child', + }, { + timeout: 1000 // 1秒超时 + }); + } catch (err) { + // 发送超时错误给主进程以验证 + bus.publish({ + error: { + message: err.message + } + }); + } +} + +createWorker().then(() => { + console.log('Local worker is ready'); +}); diff --git a/test/thread.test.ts b/test/thread.test.ts index 776d5ba..a063c0c 100644 --- a/test/thread.test.ts +++ b/test/thread.test.ts @@ -549,4 +549,50 @@ describe('/test/thread.test.ts', function () { await bus.stop(); }); + it('test publishAsync from child to main', async () => { + const bus = new ThreadEventBus(); + const worker = createThreadWorker(join(__dirname, 'worker/publish_async_from_worker.ts')); + bus.addWorker(worker); + await bus.start(); + + await new Promise(resolve => { + bus.subscribe((message, responder?) => { + if (message.body.data === 'request from child') { + responder?.send({ + data: 'response from main', + }); + } else if (message.body.data === 'ok') { + resolve(); + } + }); + }); + + await worker.terminate(); + await bus.stop(); + }); + + it('test publishAsync from child to main with timeout', async () => { + const bus = new ThreadEventBus(); + const worker = createThreadWorker(join(__dirname, 'worker/publish_async_from_worker_timeout.ts')); + bus.addWorker(worker); + await bus.start(); + + const result = await new Promise<{error: {message: string}}>(resolve => { + bus.subscribe((message) => { + // 等待子进程发送的超时错误消息 + if (message.body?.error?.message) { + resolve(message.body); + } + }); + + // 主进程不响应子进程的请求,让其超时 + }); + + // 验证超时错误消息 + expect(result.error.message).toMatch('timeout'); + + await worker.terminate(); + await bus.stop(); + }); + }); diff --git a/test/worker/publish_async_from_worker.ts b/test/worker/publish_async_from_worker.ts new file mode 100644 index 0000000..76abcb7 --- /dev/null +++ b/test/worker/publish_async_from_worker.ts @@ -0,0 +1,24 @@ +import { ThreadEventBus } from '../../src'; + +async function createWorker() { + const bus = new ThreadEventBus({ + isWorker: true, + }); + + await bus.start(); + + // 在子进程中发送异步消息到主进程 + const result = await bus.publishAsync<{data: string}>({ + data: 'request from child', + }); + + if (result.data === 'response from main') { + bus.publish({ + data: 'ok', + }); + } +} + +createWorker().then(() => { + console.log('Thread worker is ready'); +}); \ No newline at end of file diff --git a/test/worker/publish_async_from_worker_timeout.ts b/test/worker/publish_async_from_worker_timeout.ts new file mode 100644 index 0000000..12af0c7 --- /dev/null +++ b/test/worker/publish_async_from_worker_timeout.ts @@ -0,0 +1,29 @@ +import { ThreadEventBus } from '../../src'; + +async function createWorker() { + const bus = new ThreadEventBus({ + isWorker: true, + }); + + await bus.start(); + + try { + // 设置较短的超时时间 + await bus.publishAsync<{data: string}>({ + data: 'request from child', + }, { + timeout: 1000 // 1秒超时 + }); + } catch (err) { + // 发送超时错误给主进程以验证 + bus.publish({ + error: { + message: err.message + } + }); + } +} + +createWorker().then(() => { + console.log('Thread worker is ready'); +});