From b20aad5b356d8a4f43b1ca8fcafcb3bcc509ddd9 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 23 Nov 2020 12:21:21 +0300 Subject: [PATCH 1/2] fix: close all subscriptions, if the worker was terminated --- .../src/worker/classes/web-worker.spec.ts | 21 ++++++++ .../workers/src/worker/classes/web-worker.ts | 51 ++++++++++++++----- .../workers/src/worker/pipes/worker.pipe.ts | 3 +- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index 04dee8b..52c747a 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -89,4 +89,25 @@ describe('WebWorker', () => { 'Uncaught reason', ); }); + + it('should close all subscriptions, if the worker was terminated', async () => { + const worker = WebWorker.fromFunction(() => 'some data'); + + const subscriptions = [ + worker.subscribe(), + worker.subscribe(), + worker.subscribe(), + ]; + + worker.terminate(); + expect(subscriptions.map(s => s.closed)).toEqual([true, true, true]); + }); + + it("shouldn't throw any errors, if the worker was terminated twice", async () => { + const worker = WebWorker.fromFunction(() => 'some data'); + + worker.terminate(); + worker.terminate(); + expect(await worker.toPromise()).toBeUndefined(); + }); }); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index b3d032c..67c58bb 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,4 +1,4 @@ -import {EMPTY, fromEvent, merge, Observable} from 'rxjs'; +import {EMPTY, fromEvent, merge, Observable, Observer} from 'rxjs'; import {take, tap} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {TypedMessageEvent} from '../types/typed-message-event'; @@ -7,6 +7,8 @@ import {WorkerFunction} from '../types/worker-function'; export class WebWorker extends Observable> { private readonly worker: Worker | undefined; private readonly url: string; + private isStopped: boolean; + private observers: Observer>[]; constructor(url: string, options?: WorkerOptions) { let worker: Worker | undefined; @@ -19,26 +21,33 @@ export class WebWorker extends Observable } super(subscriber => { + let eventStream$: Observable | ErrorEvent> = EMPTY; + if (error) { subscriber.error(error); + } else if (this.isStopped) { + subscriber.complete(); + } else if (worker) { + eventStream$ = merge( + fromEvent>(worker, 'message').pipe( + tap(event => subscriber.next(event)), + ), + fromEvent(worker, 'error').pipe( + tap(event => subscriber.error(event)), + ), + ); + + this.observers.push(subscriber); } - const eventStream$ = worker - ? merge( - fromEvent>(worker, 'message').pipe( - tap(event => subscriber.next(event)), - ), - fromEvent(worker, 'error').pipe( - tap(event => subscriber.error(event)), - ), - ) - : EMPTY; - return eventStream$.subscribe(); }); this.worker = worker; this.url = url; + + this.isStopped = false; + this.observers = []; } static fromFunction( @@ -57,7 +66,11 @@ export class WebWorker extends Observable worker.postMessage(data); - return promise; + return promise.then(result => { + worker.terminate(); + + return result; + }); } private static createFnUrl(fn: WorkerFunction): string { @@ -69,11 +82,23 @@ export class WebWorker extends Observable } terminate() { + if (this.isStopped) { + return; + } + if (this.worker) { this.worker.terminate(); } URL.revokeObjectURL(this.url); + + this.isStopped = true; + this.observers.forEach(observer => { + if (!observer.closed) { + observer.complete(); + } + }); + this.observers = []; } postMessage(value: T) { diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 4f63e1b..1cdd4bb 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -12,7 +12,8 @@ export class WorkerPipe implements PipeTransform { private observers = new WeakMap>(); transform(value: T, fn: WorkerFunction): Observable { - const worker = this.workers.get(fn) || WebWorker.fromFunction(fn); + const worker: WebWorker = + this.workers.get(fn) || WebWorker.fromFunction(fn); this.workers.set(fn, worker); worker.postMessage(value); From e1e7d0ace4d8c24d6ebd36ce0ab2b08fa1330e14 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 23 Nov 2020 13:52:28 +0300 Subject: [PATCH 2/2] chore: use Subject instead of an array of observers --- .../workers/src/worker/classes/web-worker.ts | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 67c58bb..0a06dbf 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,5 +1,5 @@ -import {EMPTY, fromEvent, merge, Observable, Observer} from 'rxjs'; -import {take, tap} from 'rxjs/operators'; +import {EMPTY, fromEvent, merge, Observable, Subject} from 'rxjs'; +import {take, takeUntil, tap} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {TypedMessageEvent} from '../types/typed-message-event'; import {WorkerFunction} from '../types/worker-function'; @@ -7,8 +7,7 @@ import {WorkerFunction} from '../types/worker-function'; export class WebWorker extends Observable> { private readonly worker: Worker | undefined; private readonly url: string; - private isStopped: boolean; - private observers: Observer>[]; + private readonly destroy$: Subject; constructor(url: string, options?: WorkerOptions) { let worker: Worker | undefined; @@ -25,7 +24,7 @@ export class WebWorker extends Observable if (error) { subscriber.error(error); - } else if (this.isStopped) { + } else if (this.destroy$.isStopped) { subscriber.complete(); } else if (worker) { eventStream$ = merge( @@ -35,19 +34,15 @@ export class WebWorker extends Observable fromEvent(worker, 'error').pipe( tap(event => subscriber.error(event)), ), - ); - - this.observers.push(subscriber); + ).pipe(takeUntil(this.destroy$)); } - return eventStream$.subscribe(); + eventStream$.subscribe().add(subscriber); }); this.worker = worker; this.url = url; - - this.isStopped = false; - this.observers = []; + this.destroy$ = new Subject(); } static fromFunction( @@ -82,7 +77,7 @@ export class WebWorker extends Observable } terminate() { - if (this.isStopped) { + if (this.destroy$.isStopped) { return; } @@ -92,13 +87,8 @@ export class WebWorker extends Observable URL.revokeObjectURL(this.url); - this.isStopped = true; - this.observers.forEach(observer => { - if (!observer.closed) { - observer.complete(); - } - }); - this.observers = []; + this.destroy$.next(); + this.destroy$.complete(); } postMessage(value: T) {