diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 67c58bb..0a06dbf 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,5 +1,5 @@ -import {EMPTY, fromEvent, merge, Observable, Observer} from 'rxjs'; -import {take, tap} from 'rxjs/operators'; +import {EMPTY, fromEvent, merge, Observable, Subject} from 'rxjs'; +import {take, takeUntil, tap} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {TypedMessageEvent} from '../types/typed-message-event'; import {WorkerFunction} from '../types/worker-function'; @@ -7,8 +7,7 @@ import {WorkerFunction} from '../types/worker-function'; export class WebWorker extends Observable> { private readonly worker: Worker | undefined; private readonly url: string; - private isStopped: boolean; - private observers: Observer>[]; + private readonly destroy$: Subject; constructor(url: string, options?: WorkerOptions) { let worker: Worker | undefined; @@ -25,7 +24,7 @@ export class WebWorker extends Observable if (error) { subscriber.error(error); - } else if (this.isStopped) { + } else if (this.destroy$.isStopped) { subscriber.complete(); } else if (worker) { eventStream$ = merge( @@ -35,19 +34,15 @@ export class WebWorker extends Observable fromEvent(worker, 'error').pipe( tap(event => subscriber.error(event)), ), - ); - - this.observers.push(subscriber); + ).pipe(takeUntil(this.destroy$)); } - return eventStream$.subscribe(); + eventStream$.subscribe().add(subscriber); }); this.worker = worker; this.url = url; - - this.isStopped = false; - this.observers = []; + this.destroy$ = new Subject(); } static fromFunction( @@ -82,7 +77,7 @@ export class WebWorker extends Observable } terminate() { - if (this.isStopped) { + if (this.destroy$.isStopped) { return; } @@ -92,13 +87,8 @@ export class WebWorker extends Observable URL.revokeObjectURL(this.url); - this.isStopped = true; - this.observers.forEach(observer => { - if (!observer.closed) { - observer.complete(); - } - }); - this.observers = []; + this.destroy$.next(); + this.destroy$.complete(); } postMessage(value: T) {