diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts index 7d31c68..4ec9d97 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.spec.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -32,4 +32,18 @@ describe('WorkerPipe', () => { expect(worker).not.toEqual(differentWorker); }); + + it('should terminate a previous worker', async () => { + const worker = await pipe.transform('a', (data: unknown) => data); + + await pipe.transform('a', (data: unknown) => data); + await expectAsync(worker.toPromise()).toBeResolved(); + }); + + it('should terminate a worker then a pipe is destroyed', async () => { + const worker = await pipe.transform('a', (data: unknown) => data); + + pipe.ngOnDestroy(); + await expectAsync(worker.toPromise()).toBeResolved(); + }); }); diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 1cdd4bb..36a9fd7 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -1,4 +1,4 @@ -import {Pipe, PipeTransform} from '@angular/core'; +import {OnDestroy, Pipe, PipeTransform} from '@angular/core'; import {Observable} from 'rxjs'; import {WebWorker} from '../classes/web-worker'; import {toData} from '../operators/to-data'; @@ -7,21 +7,35 @@ import {WorkerFunction} from '../types/worker-function'; @Pipe({ name: 'waWorker', }) -export class WorkerPipe implements PipeTransform { - private workers = new WeakMap(); - private observers = new WeakMap>(); +export class WorkerPipe implements PipeTransform, OnDestroy { + private fn!: WorkerFunction; + private worker!: WebWorker; + private observer!: Observable; transform(value: T, fn: WorkerFunction): Observable { - const worker: WebWorker = - this.workers.get(fn) || WebWorker.fromFunction(fn); + if (this.fn !== fn) { + this.terminateWorker(); + this.initNewWorker(fn); + } - this.workers.set(fn, worker); - worker.postMessage(value); + this.worker.postMessage(value); - const observer = this.observers.get(worker) || worker.pipe(toData()); + return this.observer; + } + + ngOnDestroy(): void { + this.terminateWorker(); + } - this.observers.set(worker, observer); + private terminateWorker() { + if (this.worker) { + this.worker.terminate(); + } + } - return observer; + private initNewWorker(fn: WorkerFunction) { + this.fn = fn; + this.worker = WebWorker.fromFunction(fn); + this.observer = this.worker.pipe(toData()); } }