diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index 04dee8b..52c747a 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -89,4 +89,25 @@ describe('WebWorker', () => { 'Uncaught reason', ); }); + + it('should close all subscriptions, if the worker was terminated', async () => { + const worker = WebWorker.fromFunction(() => 'some data'); + + const subscriptions = [ + worker.subscribe(), + worker.subscribe(), + worker.subscribe(), + ]; + + worker.terminate(); + expect(subscriptions.map(s => s.closed)).toEqual([true, true, true]); + }); + + it("shouldn't throw any errors, if the worker was terminated twice", async () => { + const worker = WebWorker.fromFunction(() => 'some data'); + + worker.terminate(); + worker.terminate(); + expect(await worker.toPromise()).toBeUndefined(); + }); }); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index b3d032c..0a06dbf 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,5 +1,5 @@ -import {EMPTY, fromEvent, merge, Observable} from 'rxjs'; -import {take, tap} from 'rxjs/operators'; +import {EMPTY, fromEvent, merge, Observable, Subject} from 'rxjs'; +import {take, takeUntil, tap} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {TypedMessageEvent} from '../types/typed-message-event'; import {WorkerFunction} from '../types/worker-function'; @@ -7,6 +7,7 @@ import {WorkerFunction} from '../types/worker-function'; export class WebWorker extends Observable> { private readonly worker: Worker | undefined; private readonly url: string; + private readonly destroy$: Subject; constructor(url: string, options?: WorkerOptions) { let worker: Worker | undefined; @@ -19,26 +20,29 @@ export class WebWorker extends Observable } super(subscriber => { + let eventStream$: Observable | ErrorEvent> = EMPTY; + if (error) { subscriber.error(error); + } else if (this.destroy$.isStopped) { + subscriber.complete(); + } else if (worker) { + eventStream$ = merge( + fromEvent>(worker, 'message').pipe( + tap(event => subscriber.next(event)), + ), + fromEvent(worker, 'error').pipe( + tap(event => subscriber.error(event)), + ), + ).pipe(takeUntil(this.destroy$)); } - const eventStream$ = worker - ? merge( - fromEvent>(worker, 'message').pipe( - tap(event => subscriber.next(event)), - ), - fromEvent(worker, 'error').pipe( - tap(event => subscriber.error(event)), - ), - ) - : EMPTY; - - return eventStream$.subscribe(); + eventStream$.subscribe().add(subscriber); }); this.worker = worker; this.url = url; + this.destroy$ = new Subject(); } static fromFunction( @@ -57,7 +61,11 @@ export class WebWorker extends Observable worker.postMessage(data); - return promise; + return promise.then(result => { + worker.terminate(); + + return result; + }); } private static createFnUrl(fn: WorkerFunction): string { @@ -69,11 +77,18 @@ export class WebWorker extends Observable } terminate() { + if (this.destroy$.isStopped) { + return; + } + if (this.worker) { this.worker.terminate(); } URL.revokeObjectURL(this.url); + + this.destroy$.next(); + this.destroy$.complete(); } postMessage(value: T) { diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 4f63e1b..1cdd4bb 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -12,7 +12,8 @@ export class WorkerPipe implements PipeTransform { private observers = new WeakMap>(); transform(value: T, fn: WorkerFunction): Observable { - const worker = this.workers.get(fn) || WebWorker.fromFunction(fn); + const worker: WebWorker = + this.workers.get(fn) || WebWorker.fromFunction(fn); this.workers.set(fn, worker); worker.postMessage(value);