From f181bbdbf73a264e46547714cb628bfb83851678 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 26 Nov 2020 09:54:06 +0300 Subject: [PATCH 1/3] fix: add termination of unused workers If pipe is destroyed a worker will be terminated --- .../src/worker/pipes/worker.pipe.spec.ts | 14 ++++++++ .../workers/src/worker/pipes/worker.pipe.ts | 36 +++++++++++++------ 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts index 7d31c68..4ec9d97 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.spec.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -32,4 +32,18 @@ describe('WorkerPipe', () => { expect(worker).not.toEqual(differentWorker); }); + + it('should terminate a previous worker', async () => { + const worker = await pipe.transform('a', (data: unknown) => data); + + await pipe.transform('a', (data: unknown) => data); + await expectAsync(worker.toPromise()).toBeResolved(); + }); + + it('should terminate a worker then a pipe is destroyed', async () => { + const worker = await pipe.transform('a', (data: unknown) => data); + + pipe.ngOnDestroy(); + await expectAsync(worker.toPromise()).toBeResolved(); + }); }); diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 1cdd4bb..f42517d 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -1,4 +1,4 @@ -import {Pipe, PipeTransform} from '@angular/core'; +import {OnDestroy, Pipe, PipeTransform} from '@angular/core'; import {Observable} from 'rxjs'; import {WebWorker} from '../classes/web-worker'; import {toData} from '../operators/to-data'; @@ -7,21 +7,35 @@ import {WorkerFunction} from '../types/worker-function'; @Pipe({ name: 'waWorker', }) -export class WorkerPipe implements PipeTransform { - private workers = new WeakMap(); - private observers = new WeakMap>(); +export class WorkerPipe implements PipeTransform, OnDestroy { + private fn!: WorkerFunction; + private worker!: WebWorker; + private observer!: Observable; transform(value: T, fn: WorkerFunction): Observable { - const worker: WebWorker = - this.workers.get(fn) || WebWorker.fromFunction(fn); + if (this.fn !== fn) { + this.terminate(); + this.initNewWorker(fn); - this.workers.set(fn, worker); - worker.postMessage(value); + this.worker.postMessage(value); + } - const observer = this.observers.get(worker) || worker.pipe(toData()); + return this.observer; + } + + ngOnDestroy(): void { + this.terminate(); + } - this.observers.set(worker, observer); + private terminate() { + if (this.worker) { + this.worker.terminate(); + } + } - return observer; + private initNewWorker(fn: WorkerFunction) { + this.fn = fn; + this.worker = WebWorker.fromFunction(fn); + this.observer = this.worker.pipe(toData()); } } From 49dcbd3bd34557eab7696807c9403559efacc5c2 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 26 Nov 2020 09:59:47 +0300 Subject: [PATCH 2/3] chore: fix a small bug --- projects/workers/src/worker/pipes/worker.pipe.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index f42517d..112cea9 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -16,10 +16,10 @@ export class WorkerPipe implements PipeTransform, OnDestroy { if (this.fn !== fn) { this.terminate(); this.initNewWorker(fn); - - this.worker.postMessage(value); } + this.worker.postMessage(value); + return this.observer; } From ac1a2d20c1e7a0bee4a25d073bf9fdd0cbba9779 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 26 Nov 2020 10:01:01 +0300 Subject: [PATCH 3/3] chore: rename terminate -> terminateWorker --- projects/workers/src/worker/pipes/worker.pipe.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 112cea9..36a9fd7 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -14,7 +14,7 @@ export class WorkerPipe implements PipeTransform, OnDestroy { transform(value: T, fn: WorkerFunction): Observable { if (this.fn !== fn) { - this.terminate(); + this.terminateWorker(); this.initNewWorker(fn); } @@ -24,10 +24,10 @@ export class WorkerPipe implements PipeTransform, OnDestroy { } ngOnDestroy(): void { - this.terminate(); + this.terminateWorker(); } - private terminate() { + private terminateWorker() { if (this.worker) { this.worker.terminate(); }