From 3dfd75e9651e517576b7a5ed144841dade221f3d Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Fri, 25 Sep 2020 10:10:48 +0300 Subject: [PATCH 01/22] feat: added service and base classes --- projects/demo/src/app/app.browser.module.ts | 2 + projects/demo/src/app/app.component.html | 7 ++ projects/demo/src/app/app.component.ts | 11 ++- projects/workers/src/public-api.ts | 4 + .../web-worker/classes/any-next-subject.ts | 7 ++ .../src/web-worker/classes/web-worker.ts | 78 +++++++++++++++++++ .../web-worker-executor.service.spec.ts | 13 ++++ .../services/web-worker-executor.service.ts | 18 +++++ .../web-worker/types/web-worker-function.ts | 1 + .../src/web-worker/web-worker.module.ts | 7 ++ 10 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 projects/workers/src/web-worker/classes/any-next-subject.ts create mode 100644 projects/workers/src/web-worker/classes/web-worker.ts create mode 100644 projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts create mode 100644 projects/workers/src/web-worker/services/web-worker-executor.service.ts create mode 100644 projects/workers/src/web-worker/types/web-worker-function.ts create mode 100644 projects/workers/src/web-worker/web-worker.module.ts diff --git a/projects/demo/src/app/app.browser.module.ts b/projects/demo/src/app/app.browser.module.ts index 4d353f0..70bf580 100644 --- a/projects/demo/src/app/app.browser.module.ts +++ b/projects/demo/src/app/app.browser.module.ts @@ -7,6 +7,7 @@ import { import {NgModule} from '@angular/core'; import {FormsModule} from '@angular/forms'; import {BrowserModule} from '@angular/platform-browser'; +import {WebWorkerModule} from '@ng-web-apis/workers'; import {AppComponent} from './app.component'; import {AppRoutingModule} from './app.routes'; @@ -17,6 +18,7 @@ import {AppRoutingModule} from './app.routes'; FormsModule, BrowserModule.withServerTransition({appId: 'demo'}), AppRoutingModule, + WebWorkerModule, ], declarations: [AppComponent], providers: [ diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html index e69de29..c2ed58c 100644 --- a/projects/demo/src/app/app.component.html +++ b/projects/demo/src/app/app.component.html @@ -0,0 +1,7 @@ +
+ + + +
+ +{{ workerThread | async | json }} diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index 67d9132..fe76690 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -1,4 +1,5 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; +import {WebWorker, WebWorkerExecutor} from '@ng-web-apis/workers'; @Component({ selector: 'main', @@ -6,4 +7,12 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; styleUrls: ['./app.component.less'], changeDetection: ChangeDetectionStrategy.OnPush, }) -export class AppComponent {} +export class AppComponent { + public workerThread: WebWorker; + + constructor(webWorkerExecutor: WebWorkerExecutor) { + this.workerThread = webWorkerExecutor.execute((result: string) => + Promise.resolve(`Message from worker: ${result}`), + ); + } +} diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts index 8e9f6fc..08e522d 100644 --- a/projects/workers/src/public-api.ts +++ b/projects/workers/src/public-api.ts @@ -1,3 +1,7 @@ /** * Public API Surface of @ng-web-apis/workers */ +export * from './web-worker/web-worker.module'; +export * from './web-worker/types/web-worker-function'; +export * from './web-worker/services/web-worker-executor.service'; +export * from './web-worker/classes/web-worker'; diff --git a/projects/workers/src/web-worker/classes/any-next-subject.ts b/projects/workers/src/web-worker/classes/any-next-subject.ts new file mode 100644 index 0000000..209bb81 --- /dev/null +++ b/projects/workers/src/web-worker/classes/any-next-subject.ts @@ -0,0 +1,7 @@ +import {Subject} from 'rxjs'; + +export class AnyNextSubject extends Subject { + next(value?: any) { + super.next(value); + } +} diff --git a/projects/workers/src/web-worker/classes/web-worker.ts b/projects/workers/src/web-worker/classes/web-worker.ts new file mode 100644 index 0000000..c354765 --- /dev/null +++ b/projects/workers/src/web-worker/classes/web-worker.ts @@ -0,0 +1,78 @@ +import {fromEvent} from 'rxjs'; +import {takeWhile} from 'rxjs/operators'; +import {WebWorkerFunction} from '../types/web-worker-function'; +import {AnyNextSubject} from './any-next-subject'; + +export class WebWorker extends AnyNextSubject { + private worker!: Worker; + + private static createFnUrl(fn: WebWorkerFunction): string { + const script = ` +self.addEventListener('message', function(e) { + var result = ((${fn.toString()}).call(null, e.data)); + if(result && [typeof result.then, typeof result.catch].every(function (type) {return type === 'function'})){ + result.then(function(res){ + postMessage({result: res}); + }).catch(function(error){ + postMessage({error: error}); + }) + } else { + postMessage(result); + } +}); + `; + + const blob = new Blob([script], {type: 'text/javascript'}); + + return URL.createObjectURL(blob); + } + + public static fromFunction( + fn: WebWorkerFunction, + options?: WorkerOptions, + ): WebWorker { + return new WebWorker(WebWorker.createFnUrl(fn), options); + } + + constructor(private url: string, options?: WorkerOptions) { + super(); + + try { + this.worker = new Worker(url, options); + } catch (e) { + this.error(e); + } + + fromEvent(this.worker, 'message') + .pipe(takeWhile(() => !this.isStopped)) + .subscribe(event => { + if (event.data) { + if (event.data.hasOwnProperty('error')) { + this.error(event.data.error); + } else if (event.data.hasOwnProperty('result')) { + super.next(event.data.result); + } else { + super.next(); + } + } + }); + + fromEvent(this.worker, 'error') + .pipe(takeWhile(() => !this.isStopped)) + .subscribe(event => { + this.error(event.error); + }); + } + + complete() { + this.worker.terminate(); + URL.revokeObjectURL(this.url); + super.complete(); + } + + next(value?: T) { + if (!this.isStopped) { + this.worker.postMessage(value); + } + } +} diff --git a/projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts b/projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts new file mode 100644 index 0000000..9e32fb6 --- /dev/null +++ b/projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts @@ -0,0 +1,13 @@ +import {TestBed} from '@angular/core/testing'; + +import {WebWorkerExecutor} from './web-worker-executor.service'; + +describe('WebWorkerExecutorService', () => { + beforeEach(() => TestBed.configureTestingModule({})); + + it('should be created', () => { + const service: WebWorkerExecutor = TestBed.get(WebWorkerExecutor); + + expect(service).toBeTruthy(); + }); +}); diff --git a/projects/workers/src/web-worker/services/web-worker-executor.service.ts b/projects/workers/src/web-worker/services/web-worker-executor.service.ts new file mode 100644 index 0000000..5f8f822 --- /dev/null +++ b/projects/workers/src/web-worker/services/web-worker-executor.service.ts @@ -0,0 +1,18 @@ +import {inject, Injectable, InjectFlags} from '@angular/core'; +import {WebWorker} from '../classes/web-worker'; +import {WebWorkerFunction} from '../types/web-worker-function'; +import {WebWorkerModule} from '../web-worker.module'; + +@Injectable({ + providedIn: WebWorkerModule, + useFactory(): WebWorkerExecutor { + const instance = inject(WebWorkerExecutor, InjectFlags.Optional); + + return instance || new WebWorkerExecutor(); + }, +}) +export class WebWorkerExecutor { + execute(fn: WebWorkerFunction, options?: WorkerOptions): WebWorker { + return WebWorker.fromFunction(fn, options); + } +} diff --git a/projects/workers/src/web-worker/types/web-worker-function.ts b/projects/workers/src/web-worker/types/web-worker-function.ts new file mode 100644 index 0000000..5bdb3c2 --- /dev/null +++ b/projects/workers/src/web-worker/types/web-worker-function.ts @@ -0,0 +1 @@ +export type WebWorkerFunction = (data: T) => R | PromiseLike; diff --git a/projects/workers/src/web-worker/web-worker.module.ts b/projects/workers/src/web-worker/web-worker.module.ts new file mode 100644 index 0000000..4616bf6 --- /dev/null +++ b/projects/workers/src/web-worker/web-worker.module.ts @@ -0,0 +1,7 @@ +import {CommonModule} from '@angular/common'; +import {NgModule} from '@angular/core'; + +@NgModule({ + imports: [CommonModule], +}) +export class WebWorkerModule {} From fb2c592bcd94ab7f332d1624cdf9bf8a2c282eed Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 30 Sep 2020 19:10:47 +0300 Subject: [PATCH 02/22] chore: refactor service --- projects/demo/src/app/app.component.ts | 2 +- .../services/web-worker-executor.service.ts | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index fe76690..c513700 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -11,7 +11,7 @@ export class AppComponent { public workerThread: WebWorker; constructor(webWorkerExecutor: WebWorkerExecutor) { - this.workerThread = webWorkerExecutor.execute((result: string) => + this.workerThread = webWorkerExecutor.createWorker((result: string) => Promise.resolve(`Message from worker: ${result}`), ); } diff --git a/projects/workers/src/web-worker/services/web-worker-executor.service.ts b/projects/workers/src/web-worker/services/web-worker-executor.service.ts index 5f8f822..3f12d68 100644 --- a/projects/workers/src/web-worker/services/web-worker-executor.service.ts +++ b/projects/workers/src/web-worker/services/web-worker-executor.service.ts @@ -12,7 +12,19 @@ import {WebWorkerModule} from '../web-worker.module'; }, }) export class WebWorkerExecutor { - execute(fn: WebWorkerFunction, options?: WorkerOptions): WebWorker { + execute(fn: WebWorkerFunction, data: T): Promise { + const worker = this.createWorker(fn); + const promise = worker.toPromise(); + + worker.next(data); + + return promise; + } + + createWorker( + fn: WebWorkerFunction, + options?: WorkerOptions, + ): WebWorker { return WebWorker.fromFunction(fn, options); } } From a7c1218e53aa20a83f1da95a5daa74fdffaf5b97 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 7 Oct 2020 22:30:53 +0300 Subject: [PATCH 03/22] feat: added WorkerPipe --- projects/demo/src/app/app.browser.module.ts | 4 +-- projects/demo/src/app/app.component.html | 2 +- projects/demo/src/app/app.component.ts | 8 +++-- projects/workers/src/public-api.ts | 8 ++--- .../services/web-worker-executor.service.ts | 30 ------------------- .../web-worker/types/web-worker-function.ts | 1 - .../src/web-worker/web-worker.module.ts | 7 ----- .../classes/any-next-subject.ts | 0 .../classes/web-worker.ts | 8 ++--- .../src/worker/pipes/worker.pipe.spec.ts | 10 +++++++ .../workers/src/worker/pipes/worker.pipe.ts | 25 ++++++++++++++++ .../services/worker-executor.service.spec.ts} | 4 +-- .../services/worker-executor.service.ts | 23 ++++++++++++++ .../src/worker/types/worker-function.ts | 1 + projects/workers/src/worker/worker.module.ts | 21 +++++++++++++ 15 files changed, 99 insertions(+), 53 deletions(-) delete mode 100644 projects/workers/src/web-worker/services/web-worker-executor.service.ts delete mode 100644 projects/workers/src/web-worker/types/web-worker-function.ts delete mode 100644 projects/workers/src/web-worker/web-worker.module.ts rename projects/workers/src/{web-worker => worker}/classes/any-next-subject.ts (100%) rename projects/workers/src/{web-worker => worker}/classes/web-worker.ts (90%) create mode 100644 projects/workers/src/worker/pipes/worker.pipe.spec.ts create mode 100644 projects/workers/src/worker/pipes/worker.pipe.ts rename projects/workers/src/{web-worker/services/web-worker-executor.service.spec.ts => worker/services/worker-executor.service.spec.ts} (63%) create mode 100644 projects/workers/src/worker/services/worker-executor.service.ts create mode 100644 projects/workers/src/worker/types/worker-function.ts create mode 100644 projects/workers/src/worker/worker.module.ts diff --git a/projects/demo/src/app/app.browser.module.ts b/projects/demo/src/app/app.browser.module.ts index 70bf580..ebd5f58 100644 --- a/projects/demo/src/app/app.browser.module.ts +++ b/projects/demo/src/app/app.browser.module.ts @@ -7,7 +7,7 @@ import { import {NgModule} from '@angular/core'; import {FormsModule} from '@angular/forms'; import {BrowserModule} from '@angular/platform-browser'; -import {WebWorkerModule} from '@ng-web-apis/workers'; +import {WorkerModule} from '@ng-web-apis/workers'; import {AppComponent} from './app.component'; import {AppRoutingModule} from './app.routes'; @@ -18,7 +18,7 @@ import {AppRoutingModule} from './app.routes'; FormsModule, BrowserModule.withServerTransition({appId: 'demo'}), AppRoutingModule, - WebWorkerModule, + WorkerModule, ], declarations: [AppComponent], providers: [ diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html index c2ed58c..6c3593f 100644 --- a/projects/demo/src/app/app.component.html +++ b/projects/demo/src/app/app.component.html @@ -4,4 +4,4 @@ -{{ workerThread | async | json }} +{{ workerThread | async | waWorker: oneMoreFn | async | json }} diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index c513700..6b9a8bc 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -1,5 +1,5 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; -import {WebWorker, WebWorkerExecutor} from '@ng-web-apis/workers'; +import {WebWorker, WorkerExecutor} from '@ng-web-apis/workers'; @Component({ selector: 'main', @@ -10,9 +10,13 @@ import {WebWorker, WebWorkerExecutor} from '@ng-web-apis/workers'; export class AppComponent { public workerThread: WebWorker; - constructor(webWorkerExecutor: WebWorkerExecutor) { + constructor(webWorkerExecutor: WorkerExecutor) { this.workerThread = webWorkerExecutor.createWorker((result: string) => Promise.resolve(`Message from worker: ${result}`), ); } + + oneMoreFn(data: any): Promise { + return Promise.resolve().then(() => data); + } } diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts index 08e522d..03e57af 100644 --- a/projects/workers/src/public-api.ts +++ b/projects/workers/src/public-api.ts @@ -1,7 +1,7 @@ /** * Public API Surface of @ng-web-apis/workers */ -export * from './web-worker/web-worker.module'; -export * from './web-worker/types/web-worker-function'; -export * from './web-worker/services/web-worker-executor.service'; -export * from './web-worker/classes/web-worker'; +export * from './worker/worker.module'; +export * from './worker/types/worker-function'; +export * from './worker/services/worker-executor.service'; +export * from './worker/classes/web-worker'; diff --git a/projects/workers/src/web-worker/services/web-worker-executor.service.ts b/projects/workers/src/web-worker/services/web-worker-executor.service.ts deleted file mode 100644 index 3f12d68..0000000 --- a/projects/workers/src/web-worker/services/web-worker-executor.service.ts +++ /dev/null @@ -1,30 +0,0 @@ -import {inject, Injectable, InjectFlags} from '@angular/core'; -import {WebWorker} from '../classes/web-worker'; -import {WebWorkerFunction} from '../types/web-worker-function'; -import {WebWorkerModule} from '../web-worker.module'; - -@Injectable({ - providedIn: WebWorkerModule, - useFactory(): WebWorkerExecutor { - const instance = inject(WebWorkerExecutor, InjectFlags.Optional); - - return instance || new WebWorkerExecutor(); - }, -}) -export class WebWorkerExecutor { - execute(fn: WebWorkerFunction, data: T): Promise { - const worker = this.createWorker(fn); - const promise = worker.toPromise(); - - worker.next(data); - - return promise; - } - - createWorker( - fn: WebWorkerFunction, - options?: WorkerOptions, - ): WebWorker { - return WebWorker.fromFunction(fn, options); - } -} diff --git a/projects/workers/src/web-worker/types/web-worker-function.ts b/projects/workers/src/web-worker/types/web-worker-function.ts deleted file mode 100644 index 5bdb3c2..0000000 --- a/projects/workers/src/web-worker/types/web-worker-function.ts +++ /dev/null @@ -1 +0,0 @@ -export type WebWorkerFunction = (data: T) => R | PromiseLike; diff --git a/projects/workers/src/web-worker/web-worker.module.ts b/projects/workers/src/web-worker/web-worker.module.ts deleted file mode 100644 index 4616bf6..0000000 --- a/projects/workers/src/web-worker/web-worker.module.ts +++ /dev/null @@ -1,7 +0,0 @@ -import {CommonModule} from '@angular/common'; -import {NgModule} from '@angular/core'; - -@NgModule({ - imports: [CommonModule], -}) -export class WebWorkerModule {} diff --git a/projects/workers/src/web-worker/classes/any-next-subject.ts b/projects/workers/src/worker/classes/any-next-subject.ts similarity index 100% rename from projects/workers/src/web-worker/classes/any-next-subject.ts rename to projects/workers/src/worker/classes/any-next-subject.ts diff --git a/projects/workers/src/web-worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts similarity index 90% rename from projects/workers/src/web-worker/classes/web-worker.ts rename to projects/workers/src/worker/classes/web-worker.ts index c354765..10f2dd6 100644 --- a/projects/workers/src/web-worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,12 +1,12 @@ import {fromEvent} from 'rxjs'; import {takeWhile} from 'rxjs/operators'; -import {WebWorkerFunction} from '../types/web-worker-function'; +import {WorkerFunction} from '../types/worker-function'; import {AnyNextSubject} from './any-next-subject'; -export class WebWorker extends AnyNextSubject { +export class WebWorker extends AnyNextSubject { private worker!: Worker; - private static createFnUrl(fn: WebWorkerFunction): string { + private static createFnUrl(fn: WorkerFunction): string { const script = ` self.addEventListener('message', function(e) { var result = ((${fn.toString()}).call(null, e.data)); @@ -28,7 +28,7 @@ self.addEventListener('message', function(e) { } public static fromFunction( - fn: WebWorkerFunction, + fn: WorkerFunction, options?: WorkerOptions, ): WebWorker { return new WebWorker(WebWorker.createFnUrl(fn), options); diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts new file mode 100644 index 0000000..eccf8f7 --- /dev/null +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -0,0 +1,10 @@ +import {WorkerExecutor} from '../services/worker-executor.service'; +import {WorkerPipe} from './worker.pipe'; + +describe('WorkerPipe', () => { + it('create an instance', () => { + const pipe = new WorkerPipe(new WorkerExecutor()); + + expect(pipe).toBeTruthy(); + }); +}); diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts new file mode 100644 index 0000000..c9f5c8e --- /dev/null +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -0,0 +1,25 @@ +import {Pipe, PipeTransform} from '@angular/core'; +import {WebWorker} from '../classes/web-worker'; +import {WorkerExecutor} from '../services/worker-executor.service'; +import {WorkerFunction} from '../types/worker-function'; + +@Pipe({ + name: 'waWorker', +}) +export class WorkerPipe implements PipeTransform { + private workers = new WeakMap(); + + constructor(private workerExecutor: WorkerExecutor) {} + + transform(value: any, fn: WorkerFunction): any { + const worker = this.workers.has(fn) + ? (this.workers.get(fn) as WebWorker) + : this.workerExecutor.createWorker(fn); + + this.workers.set(fn, worker); + + worker.next(value); + + return worker; + } +} diff --git a/projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts b/projects/workers/src/worker/services/worker-executor.service.spec.ts similarity index 63% rename from projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts rename to projects/workers/src/worker/services/worker-executor.service.spec.ts index 9e32fb6..0ebb0d6 100644 --- a/projects/workers/src/web-worker/services/web-worker-executor.service.spec.ts +++ b/projects/workers/src/worker/services/worker-executor.service.spec.ts @@ -1,12 +1,12 @@ import {TestBed} from '@angular/core/testing'; -import {WebWorkerExecutor} from './web-worker-executor.service'; +import {WorkerExecutor} from './worker-executor.service'; describe('WebWorkerExecutorService', () => { beforeEach(() => TestBed.configureTestingModule({})); it('should be created', () => { - const service: WebWorkerExecutor = TestBed.get(WebWorkerExecutor); + const service: WorkerExecutor = TestBed.get(WorkerExecutor); expect(service).toBeTruthy(); }); diff --git a/projects/workers/src/worker/services/worker-executor.service.ts b/projects/workers/src/worker/services/worker-executor.service.ts new file mode 100644 index 0000000..1214a3f --- /dev/null +++ b/projects/workers/src/worker/services/worker-executor.service.ts @@ -0,0 +1,23 @@ +import {Injectable} from '@angular/core'; +import {WebWorker} from '../classes/web-worker'; +import {WorkerFunction} from '../types/worker-function'; + +@Injectable() +export class WorkerExecutor { + execute(fn: WorkerFunction, data?: T): Promise { + const worker = this.createWorker(fn); + const promise = worker.toPromise(); + + worker.next(data); + worker.complete(); + + return promise; + } + + createWorker( + fn: WorkerFunction, + options?: WorkerOptions, + ): WebWorker { + return WebWorker.fromFunction(fn, options); + } +} diff --git a/projects/workers/src/worker/types/worker-function.ts b/projects/workers/src/worker/types/worker-function.ts new file mode 100644 index 0000000..db3eb10 --- /dev/null +++ b/projects/workers/src/worker/types/worker-function.ts @@ -0,0 +1 @@ +export type WorkerFunction = (data: T) => R | PromiseLike; diff --git a/projects/workers/src/worker/worker.module.ts b/projects/workers/src/worker/worker.module.ts new file mode 100644 index 0000000..a021e75 --- /dev/null +++ b/projects/workers/src/worker/worker.module.ts @@ -0,0 +1,21 @@ +import {CommonModule} from '@angular/common'; +import {inject, InjectFlags, NgModule} from '@angular/core'; +import {WorkerPipe} from './pipes/worker.pipe'; +import {WorkerExecutor} from './services/worker-executor.service'; + +@NgModule({ + imports: [CommonModule], + declarations: [WorkerPipe], + exports: [WorkerPipe], + providers: [ + { + provide: WorkerExecutor, + useFactory(): WorkerExecutor { + const instance = inject(WorkerExecutor, InjectFlags.Optional); + + return instance || new WorkerExecutor(); + }, + }, + ], +}) +export class WorkerModule {} From e257eb56815930443976714fba9823f6747c8622 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 8 Oct 2020 10:13:33 +0300 Subject: [PATCH 04/22] test: started writing tests --- .../src/worker/classes/web-worker.spec.ts | 16 ++++++ .../workers/src/worker/classes/web-worker.ts | 56 +++++++++---------- .../services/worker-executor.service.spec.ts | 36 ++++++++++-- .../services/worker-executor.service.ts | 4 +- projects/workers/src/worker/worker.module.ts | 13 +---- 5 files changed, 78 insertions(+), 47 deletions(-) create mode 100644 projects/workers/src/worker/classes/web-worker.spec.ts diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts new file mode 100644 index 0000000..88eb566 --- /dev/null +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -0,0 +1,16 @@ +import {WebWorker} from './web-worker'; + +describe('WebWorker', () => { + it('should create worker from a function', () => { + const worker = WebWorker.fromFunction(d => d); + + expect(worker instanceof WebWorker).toEqual(true); + expect((worker as any).worker instanceof Worker).toEqual(true); + }); + + it("shouldn't create worker", async () => { + const worker = new WebWorker('some/wrong/url'); + + await expectAsync(worker.toPromise()).toBeRejected(); + }); +}); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 10f2dd6..f3e8b4d 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -6,34 +6,6 @@ import {AnyNextSubject} from './any-next-subject'; export class WebWorker extends AnyNextSubject { private worker!: Worker; - private static createFnUrl(fn: WorkerFunction): string { - const script = ` -self.addEventListener('message', function(e) { - var result = ((${fn.toString()}).call(null, e.data)); - if(result && [typeof result.then, typeof result.catch].every(function (type) {return type === 'function'})){ - result.then(function(res){ - postMessage({result: res}); - }).catch(function(error){ - postMessage({error: error}); - }) - } else { - postMessage(result); - } -}); - `; - - const blob = new Blob([script], {type: 'text/javascript'}); - - return URL.createObjectURL(blob); - } - - public static fromFunction( - fn: WorkerFunction, - options?: WorkerOptions, - ): WebWorker { - return new WebWorker(WebWorker.createFnUrl(fn), options); - } - constructor(private url: string, options?: WorkerOptions) { super(); @@ -64,6 +36,34 @@ self.addEventListener('message', function(e) { }); } + public static fromFunction( + fn: WorkerFunction, + options?: WorkerOptions, + ): WebWorker { + return new WebWorker(WebWorker.createFnUrl(fn), options); + } + + private static createFnUrl(fn: WorkerFunction): string { + const script = ` +self.addEventListener('message', function(e) { + var result = ((${fn.toString()}).call(null, e.data)); + if(result && [typeof result.then, typeof result.catch].every(function (type) {return type === 'function'})){ + result.then(function(res){ + postMessage({result: res}); + }).catch(function(error){ + postMessage({error: error}); + }) + } else { + postMessage({result: result}); + } +}); + `; + + const blob = new Blob([script], {type: 'text/javascript'}); + + return URL.createObjectURL(blob); + } + complete() { this.worker.terminate(); URL.revokeObjectURL(this.url); diff --git a/projects/workers/src/worker/services/worker-executor.service.spec.ts b/projects/workers/src/worker/services/worker-executor.service.spec.ts index 0ebb0d6..7752ebb 100644 --- a/projects/workers/src/worker/services/worker-executor.service.spec.ts +++ b/projects/workers/src/worker/services/worker-executor.service.spec.ts @@ -1,13 +1,37 @@ import {TestBed} from '@angular/core/testing'; - +import {WorkerModule} from '@ng-web-apis/workers'; +import {take} from 'rxjs/operators'; import {WorkerExecutor} from './worker-executor.service'; -describe('WebWorkerExecutorService', () => { - beforeEach(() => TestBed.configureTestingModule({})); +describe('WorkerExecutorService', () => { + let service: WorkerExecutor; - it('should be created', () => { - const service: WorkerExecutor = TestBed.get(WorkerExecutor); + beforeEach(() => { + TestBed.configureTestingModule({ + imports: [WorkerModule], + }); - expect(service).toBeTruthy(); + service = TestBed.get(WorkerExecutor); }); + + it('should run worker and return correct data', async () => { + const workerPromise: Promise = service.execute( + data => Promise.resolve().then(() => data), + 'some data', + ); + + expect(await workerPromise).toEqual('some data'); + }, 10000); + + it('should create worker', async () => { + const thread = service.createWorker(data => + Promise.resolve(data), + ); + + const workerPromise = thread.pipe(take(1)).toPromise(); + + thread.next('some data'); + + expect(await workerPromise).toEqual('some data'); + }, 10000); }); diff --git a/projects/workers/src/worker/services/worker-executor.service.ts b/projects/workers/src/worker/services/worker-executor.service.ts index 1214a3f..4c6da29 100644 --- a/projects/workers/src/worker/services/worker-executor.service.ts +++ b/projects/workers/src/worker/services/worker-executor.service.ts @@ -1,4 +1,5 @@ import {Injectable} from '@angular/core'; +import {take} from 'rxjs/operators'; import {WebWorker} from '../classes/web-worker'; import {WorkerFunction} from '../types/worker-function'; @@ -6,10 +7,9 @@ import {WorkerFunction} from '../types/worker-function'; export class WorkerExecutor { execute(fn: WorkerFunction, data?: T): Promise { const worker = this.createWorker(fn); - const promise = worker.toPromise(); + const promise = worker.pipe(take(1)).toPromise(); worker.next(data); - worker.complete(); return promise; } diff --git a/projects/workers/src/worker/worker.module.ts b/projects/workers/src/worker/worker.module.ts index a021e75..035bfba 100644 --- a/projects/workers/src/worker/worker.module.ts +++ b/projects/workers/src/worker/worker.module.ts @@ -1,5 +1,5 @@ import {CommonModule} from '@angular/common'; -import {inject, InjectFlags, NgModule} from '@angular/core'; +import {NgModule} from '@angular/core'; import {WorkerPipe} from './pipes/worker.pipe'; import {WorkerExecutor} from './services/worker-executor.service'; @@ -7,15 +7,6 @@ import {WorkerExecutor} from './services/worker-executor.service'; imports: [CommonModule], declarations: [WorkerPipe], exports: [WorkerPipe], - providers: [ - { - provide: WorkerExecutor, - useFactory(): WorkerExecutor { - const instance = inject(WorkerExecutor, InjectFlags.Optional); - - return instance || new WorkerExecutor(); - }, - }, - ], + providers: [WorkerExecutor], }) export class WorkerModule {} From 86e17b5ce9642fe1daf94bf95f29cb784e0d2d4e Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 29 Oct 2020 22:07:05 +0300 Subject: [PATCH 05/22] test: more tests --- projects/demo/src/app/app.component.ts | 4 +-- .../workers/src/worker/classes/web-worker.ts | 28 +++++++++------- .../src/worker/pipes/worker.pipe.spec.ts | 32 +++++++++++++++++-- .../workers/src/worker/pipes/worker.pipe.ts | 3 +- .../src/worker/types/worker-function.ts | 2 +- 5 files changed, 51 insertions(+), 18 deletions(-) diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index 6b9a8bc..ba51c28 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -16,7 +16,7 @@ export class AppComponent { ); } - oneMoreFn(data: any): Promise { - return Promise.resolve().then(() => data); + oneMoreFn(data: string): Promise { + return Promise.resolve(data); } } diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index f3e8b4d..fb29926 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -45,18 +45,24 @@ export class WebWorker extends AnyNextSubject { private static createFnUrl(fn: WorkerFunction): string { const script = ` -self.addEventListener('message', function(e) { - var result = ((${fn.toString()}).call(null, e.data)); - if(result && [typeof result.then, typeof result.catch].every(function (type) {return type === 'function'})){ - result.then(function(res){ - postMessage({result: res}); - }).catch(function(error){ - postMessage({error: error}); - }) - } else { - postMessage({result: result}); +(function(fn){ + function isFunction(type){ + return type === 'function'; } -}); + + self.addEventListener('message', function(e) { + var result = fn.call(null, e.data); + if(result && [typeof result.then, typeof result.catch].every(isFunction)){ + result.then(function(res){ + postMessage({result: res}); + }).catch(function(error){ + postMessage({error: error}); + }) + } else { + postMessage({result: result}); + } + }) +})(${fn.toString()}); `; const blob = new Blob([script], {type: 'text/javascript'}); diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts index eccf8f7..e4c0fbc 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.spec.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -1,10 +1,36 @@ +import {take} from 'rxjs/operators'; import {WorkerExecutor} from '../services/worker-executor.service'; import {WorkerPipe} from './worker.pipe'; describe('WorkerPipe', () => { - it('create an instance', () => { - const pipe = new WorkerPipe(new WorkerExecutor()); + let pipe: WorkerPipe; - expect(pipe).toBeTruthy(); + beforeEach(() => { + pipe = new WorkerPipe(new WorkerExecutor()); + }); + + it('should emit the first value', async () => { + const result = await pipe + .transform('a', data => data) + .pipe(take(1)) + .toPromise(); + + expect(result).toEqual('a'); + }); + + it('should return the same worker for the same function', async () => { + const workerFn = (data: unknown) => data; + + const worker = await pipe.transform('a', workerFn); + const theSameWorker = await pipe.transform('a', workerFn); + + expect(worker).toEqual(theSameWorker); + }); + + it('should return a different worker for a different function', async () => { + const worker = await pipe.transform('a', (data: unknown) => data); + const differentWorker = await pipe.transform('a', (data: unknown) => data); + + expect(worker).not.toEqual(differentWorker); }); }); diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index c9f5c8e..5ae1702 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -1,4 +1,5 @@ import {Pipe, PipeTransform} from '@angular/core'; +import {Observable} from 'rxjs'; import {WebWorker} from '../classes/web-worker'; import {WorkerExecutor} from '../services/worker-executor.service'; import {WorkerFunction} from '../types/worker-function'; @@ -11,7 +12,7 @@ export class WorkerPipe implements PipeTransform { constructor(private workerExecutor: WorkerExecutor) {} - transform(value: any, fn: WorkerFunction): any { + transform(value: T, fn: WorkerFunction): Observable { const worker = this.workers.has(fn) ? (this.workers.get(fn) as WebWorker) : this.workerExecutor.createWorker(fn); diff --git a/projects/workers/src/worker/types/worker-function.ts b/projects/workers/src/worker/types/worker-function.ts index db3eb10..854e555 100644 --- a/projects/workers/src/worker/types/worker-function.ts +++ b/projects/workers/src/worker/types/worker-function.ts @@ -1 +1 @@ -export type WorkerFunction = (data: T) => R | PromiseLike; +export type WorkerFunction = (data: T) => R | Promise; From c9be2fb55976d03524c07328508302ffdd66713b Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 29 Oct 2020 22:09:35 +0300 Subject: [PATCH 06/22] chore: remove public keyword --- projects/demo/src/app/app.component.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index ba51c28..23a4880 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -8,7 +8,7 @@ import {WebWorker, WorkerExecutor} from '@ng-web-apis/workers'; changeDetection: ChangeDetectionStrategy.OnPush, }) export class AppComponent { - public workerThread: WebWorker; + workerThread: WebWorker; constructor(webWorkerExecutor: WorkerExecutor) { this.workerThread = webWorkerExecutor.createWorker((result: string) => From c14afa64e1e169186f4033c88706ee5ff84830f2 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Fri, 30 Oct 2020 09:48:13 +0300 Subject: [PATCH 07/22] test: added more tests for WebWorker --- .../src/worker/classes/web-worker.spec.ts | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index 88eb566..c07eb86 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -1,6 +1,19 @@ +import {Observable} from 'rxjs'; import {WebWorker} from './web-worker'; describe('WebWorker', () => { + it('should fail if a worker is not available', async () => { + const OriginalWorker = Worker; + + delete (window as any).Worker; + + const worker = WebWorker.fromFunction(d => d); + + await expectAsync(worker.toPromise()).toBeRejected(); + + (window as any).Worker = OriginalWorker; + }); + it('should create worker from a function', () => { const worker = WebWorker.fromFunction(d => d); @@ -8,9 +21,39 @@ describe('WebWorker', () => { expect((worker as any).worker instanceof Worker).toEqual(true); }); - it("shouldn't create worker", async () => { + it('should trigger an error if URL not found', async () => { const worker = new WebWorker('some/wrong/url'); await expectAsync(worker.toPromise()).toBeRejected(); }); + + it('should fail if an inner promise is rejected', async () => { + const worker = WebWorker.fromFunction(() => Promise.reject('reason')); + + worker.next(); + + await expect(await worker.toPromise().catch(err => err)).toEqual('reason'); + }); + + it('should resolve the last value before completing', async () => { + const worker = WebWorker.fromFunction((data: string) => Promise.resolve(data)); + + const promise = worker + .pipe(source => { + return new Observable(subscriber => { + source.subscribe({ + next(value: string) { + (source as WebWorker).complete(); + subscriber.next(value); + subscriber.complete(); + }, + }); + }); + }) + .toPromise(); + + worker.next('a'); + worker.next('b'); + expect(await promise).toEqual('a'); + }); }); From ca8f5bc65dba12dd0a4e0b22977114e452e384a4 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Fri, 30 Oct 2020 09:56:46 +0300 Subject: [PATCH 08/22] chore: removed dead code --- projects/workers/src/worker/classes/web-worker.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index fb29926..95f2fc2 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -23,8 +23,6 @@ export class WebWorker extends AnyNextSubject { this.error(event.data.error); } else if (event.data.hasOwnProperty('result')) { super.next(event.data.result); - } else { - super.next(); } } }); @@ -77,8 +75,6 @@ export class WebWorker extends AnyNextSubject { } next(value?: T) { - if (!this.isStopped) { - this.worker.postMessage(value); - } + this.worker.postMessage(value); } } From 3d078d2c6dca7a05557039945136c4904cd97678 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 4 Nov 2020 15:19:58 +0300 Subject: [PATCH 09/22] chore: update a demo app --- projects/demo/src/app/app.browser.module.ts | 3 ++- projects/demo/src/app/app.component.html | 15 +++++++------ projects/demo/src/app/app.component.less | 9 ++++---- projects/demo/src/app/app.component.ts | 24 +++++++++++++++------ projects/demo/src/app/clock.component.ts | 18 ++++++++++++++++ 5 files changed, 52 insertions(+), 17 deletions(-) create mode 100644 projects/demo/src/app/clock.component.ts diff --git a/projects/demo/src/app/app.browser.module.ts b/projects/demo/src/app/app.browser.module.ts index ebd5f58..5efaf73 100644 --- a/projects/demo/src/app/app.browser.module.ts +++ b/projects/demo/src/app/app.browser.module.ts @@ -10,6 +10,7 @@ import {BrowserModule} from '@angular/platform-browser'; import {WorkerModule} from '@ng-web-apis/workers'; import {AppComponent} from './app.component'; import {AppRoutingModule} from './app.routes'; +import {ClockComponent} from './clock.component'; @NgModule({ bootstrap: [AppComponent], @@ -20,7 +21,7 @@ import {AppRoutingModule} from './app.routes'; AppRoutingModule, WorkerModule, ], - declarations: [AppComponent], + declarations: [AppComponent, ClockComponent], providers: [ { provide: LocationStrategy, diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html index 6c3593f..44f5973 100644 --- a/projects/demo/src/app/app.component.html +++ b/projects/demo/src/app/app.component.html @@ -1,7 +1,10 @@ -
- - - -
+ -{{ workerThread | async | waWorker: oneMoreFn | async | json }} +
+ +

Execution time: {{ workerThread | async }}

+
+
+ +

Execution time: {{ result$ | async }}

+
diff --git a/projects/demo/src/app/app.component.less b/projects/demo/src/app/app.component.less index 9fc56c2..1e8a83c 100644 --- a/projects/demo/src/app/app.component.less +++ b/projects/demo/src/app/app.component.less @@ -5,8 +5,9 @@ align-items: center; } -.observer { - background: skyblue; - border-radius: 16px; - padding: 40px; +.example { + min-width: 360px; + border-top: 1px solid gainsboro; + margin-top: 16px; + padding-top: 16px; } diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index 23a4880..fc93b7c 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -1,5 +1,7 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; import {WebWorker, WorkerExecutor} from '@ng-web-apis/workers'; +import {Observable, Subject} from 'rxjs'; +import {map} from 'rxjs/operators'; @Component({ selector: 'main', @@ -8,15 +10,25 @@ import {WebWorker, WorkerExecutor} from '@ng-web-apis/workers'; changeDetection: ChangeDetectionStrategy.OnPush, }) export class AppComponent { - workerThread: WebWorker; + workerThread: WebWorker; + emitter: Subject; + result$: Observable; constructor(webWorkerExecutor: WorkerExecutor) { - this.workerThread = webWorkerExecutor.createWorker((result: string) => - Promise.resolve(`Message from worker: ${result}`), - ); + this.workerThread = webWorkerExecutor.createWorker(this.startCompute); + this.emitter = new Subject(); + this.result$ = this.emitter.pipe(map(this.startCompute)); } - oneMoreFn(data: string): Promise { - return Promise.resolve(data); + startCompute(): number { + function compute(num: number): number { + return Array.from({length: num}).reduce((sum: number) => sum + 1, 0); + } + + const start = performance.now(); + + Array.from({length: 16000}).forEach((_, index) => compute(index)); + + return performance.now() - start; } } diff --git a/projects/demo/src/app/clock.component.ts b/projects/demo/src/app/clock.component.ts new file mode 100644 index 0000000..6f40a87 --- /dev/null +++ b/projects/demo/src/app/clock.component.ts @@ -0,0 +1,18 @@ +import {ChangeDetectionStrategy, Component} from '@angular/core'; +import {Observable, timer} from 'rxjs'; +import {map} from 'rxjs/operators'; + +@Component({ + selector: 'app-clock', + template: ` + {{ date | async | date: 'mediumTime' }} + `, + changeDetection: ChangeDetectionStrategy.OnPush, +}) +export class ClockComponent { + date: Observable; + + constructor() { + this.date = timer(0, 1000).pipe(map(() => Date.now())); + } +} From 97a96421d0065797fe9d24a90e6fbdca43f1a0a1 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 9 Nov 2020 10:31:29 +0300 Subject: [PATCH 10/22] docs: small updates --- README.md | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 406cc48..af5db6d 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,67 @@ Now install the package: npm i @ng-web-apis/workers ``` -## Usage +## How it use -TBD +You can create worker with service and use it in a template with `AsyncPipe`: + +```typescript +import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers'; + +@Component({ + template: ` + Computed Result: {{ worker | async }} +
+ + +
+ `, +}) +class SomeComponent { + worker: WebWorker; + + construcor(workerExecutor: WorkerExecutor) { + this.worker = workerExecutor.createWorker(this.compute); + } + + compute(data: number): number { + return data ** 2; + } +} +``` + +It's the same with `WorkerPipe` only: + +```typescript +import {WorkerModule} from '@ng-web-apis/workers'; +import {NgModule} from '@angular/core'; + +@NgModule({ + imports: [WorkerModule], + declarations: [SomeComponent], +}) +class SomeModule {} +``` + +```typescript +import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers'; +import {FormControl} from '@angular/forms'; + +@Component({ + template: ` + Computed Result: {{ control.value | waWorker: changeData | async }} + + + `, +}) +class SomeComponent { + control = new FormControl(''); + + changeData(data: number): number { + return `${data} (changed)`; + } +} +``` ## See also From 736f6e5464ce95fd2e0a8a0244d873802adae3b4 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 9 Nov 2020 19:22:39 +0300 Subject: [PATCH 11/22] chore: changes after review --- README.md | 4 ++-- projects/workers/src/worker/classes/web-worker.spec.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index af5db6d..98125ed 100644 --- a/README.md +++ b/README.md @@ -43,9 +43,9 @@ import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers'; `, }) class SomeComponent { - worker: WebWorker; + readonly worker: WebWorker; - construcor(workerExecutor: WorkerExecutor) { + constructor(workerExecutor: WorkerExecutor) { this.worker = workerExecutor.createWorker(this.compute); } diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index c07eb86..c20a59e 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -21,7 +21,7 @@ describe('WebWorker', () => { expect((worker as any).worker instanceof Worker).toEqual(true); }); - it('should trigger an error if URL not found', async () => { + it('should trigger an error if URL was not found', async () => { const worker = new WebWorker('some/wrong/url'); await expectAsync(worker.toPromise()).toBeRejected(); From dd6fca7b553fddd86ed5adb8c361a910a9393559 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 11 Nov 2020 09:31:53 +0300 Subject: [PATCH 12/22] chore: moved the worker function --- .../workers/src/worker/classes/web-worker.ts | 22 ++----------------- .../src/worker/consts/worker-fn-template.ts | 20 +++++++++++++++++ 2 files changed, 22 insertions(+), 20 deletions(-) create mode 100644 projects/workers/src/worker/consts/worker-fn-template.ts diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 95f2fc2..dc922b4 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,5 +1,6 @@ import {fromEvent} from 'rxjs'; import {takeWhile} from 'rxjs/operators'; +import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {WorkerFunction} from '../types/worker-function'; import {AnyNextSubject} from './any-next-subject'; @@ -42,26 +43,7 @@ export class WebWorker extends AnyNextSubject { } private static createFnUrl(fn: WorkerFunction): string { - const script = ` -(function(fn){ - function isFunction(type){ - return type === 'function'; - } - - self.addEventListener('message', function(e) { - var result = fn.call(null, e.data); - if(result && [typeof result.then, typeof result.catch].every(isFunction)){ - result.then(function(res){ - postMessage({result: res}); - }).catch(function(error){ - postMessage({error: error}); - }) - } else { - postMessage({result: result}); - } - }) -})(${fn.toString()}); - `; + const script = `(${WORKER_BLANK_FN})(${fn.toString()});`; const blob = new Blob([script], {type: 'text/javascript'}); diff --git a/projects/workers/src/worker/consts/worker-fn-template.ts b/projects/workers/src/worker/consts/worker-fn-template.ts new file mode 100644 index 0000000..a6b92f8 --- /dev/null +++ b/projects/workers/src/worker/consts/worker-fn-template.ts @@ -0,0 +1,20 @@ +export const WORKER_BLANK_FN = ` +function(fn){ + function isFunction(type){ + return type === 'function'; + } + + self.addEventListener('message', function(e) { + var result = fn.call(null, e.data); + if(result && [typeof result.then, typeof result.catch].every(isFunction)){ + result.then(function(res){ + postMessage({result: res}); + }).catch(function(error){ + postMessage({error: error}); + }) + } else { + postMessage({result: result}); + } + }) +} +`; From 5b84449e445b2d90ea2bfd66f991e1425e5f9ccf Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 11 Nov 2020 09:33:32 +0300 Subject: [PATCH 13/22] chore: added export of missing pipe --- projects/workers/src/public-api.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts index 03e57af..747162b 100644 --- a/projects/workers/src/public-api.ts +++ b/projects/workers/src/public-api.ts @@ -5,3 +5,4 @@ export * from './worker/worker.module'; export * from './worker/types/worker-function'; export * from './worker/services/worker-executor.service'; export * from './worker/classes/web-worker'; +export * from './worker/pipes/worker.pipe'; From ba0fb3080e0fabf6957fc97c62850e5931989379 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 11 Nov 2020 09:37:40 +0300 Subject: [PATCH 14/22] chore: added the 'readonly' modifier --- projects/demo/src/app/app.component.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index fc93b7c..e889ccf 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -10,9 +10,9 @@ import {map} from 'rxjs/operators'; changeDetection: ChangeDetectionStrategy.OnPush, }) export class AppComponent { - workerThread: WebWorker; - emitter: Subject; - result$: Observable; + readonly workerThread: WebWorker; + readonly emitter: Subject; + readonly result$: Observable; constructor(webWorkerExecutor: WorkerExecutor) { this.workerThread = webWorkerExecutor.createWorker(this.startCompute); From 8420911a4f3002165c0449d9bddc9ade5721aebf Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 16 Nov 2020 09:59:46 +0300 Subject: [PATCH 15/22] chore: refactoring after review --- README.md | 22 +++++++--------- projects/demo/src/app/app.component.html | 2 +- projects/demo/src/app/app.component.ts | 8 +++--- projects/demo/src/app/clock.component.ts | 8 ++---- .../src/worker/classes/any-next-subject.ts | 7 ----- .../src/worker/classes/web-worker.spec.ts | 10 ++++--- .../workers/src/worker/classes/web-worker.ts | 26 +++++++++---------- .../workers/src/worker/pipes/worker.pipe.ts | 6 ++--- .../services/worker-executor.service.spec.ts | 2 +- .../services/worker-executor.service.ts | 4 +-- 10 files changed, 39 insertions(+), 56 deletions(-) delete mode 100644 projects/workers/src/worker/classes/any-next-subject.ts diff --git a/README.md b/README.md index 98125ed..e353a35 100644 --- a/README.md +++ b/README.md @@ -33,25 +33,21 @@ You can create worker with service and use it in a template with `AsyncPipe`: ```typescript import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers'; +function compute(data: number): number { + return data ** 2; +} + @Component({ template: ` Computed Result: {{ worker | async }} -
+
`, }) class SomeComponent { - readonly worker: WebWorker; - - constructor(workerExecutor: WorkerExecutor) { - this.worker = workerExecutor.createWorker(this.compute); - } - - compute(data: number): number { - return data ** 2; - } + readonly worker: WebWorker = workerExecutor.createWorker(compute); } ``` @@ -74,13 +70,13 @@ import {FormControl} from '@angular/forms'; @Component({ template: ` - Computed Result: {{ control.value | waWorker: changeData | async }} + Computed Result: {{ value | waWorker: changeData | async }} - + `, }) class SomeComponent { - control = new FormControl(''); + value: string; changeData(data: number): number { return `${data} (changed)`; diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html index 44f5973..128742d 100644 --- a/projects/demo/src/app/app.component.html +++ b/projects/demo/src/app/app.component.html @@ -1,7 +1,7 @@
- +

Execution time: {{ workerThread | async }}

diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index e889ccf..c10e6ca 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -21,13 +21,11 @@ export class AppComponent { } startCompute(): number { - function compute(num: number): number { - return Array.from({length: num}).reduce((sum: number) => sum + 1, 0); - } - const start = performance.now(); - Array.from({length: 16000}).forEach((_, index) => compute(index)); + Array.from({length: 16000}).forEach((_, index) => + Array.from({length: index}).reduce((sum: number) => sum + 1, 0), + ); return performance.now() - start; } diff --git a/projects/demo/src/app/clock.component.ts b/projects/demo/src/app/clock.component.ts index 6f40a87..10915a5 100644 --- a/projects/demo/src/app/clock.component.ts +++ b/projects/demo/src/app/clock.component.ts @@ -5,14 +5,10 @@ import {map} from 'rxjs/operators'; @Component({ selector: 'app-clock', template: ` - {{ date | async | date: 'mediumTime' }} + {{ date$ | async | date: 'mediumTime' }} `, changeDetection: ChangeDetectionStrategy.OnPush, }) export class ClockComponent { - date: Observable; - - constructor() { - this.date = timer(0, 1000).pipe(map(() => Date.now())); - } + readonly date$: Observable = timer(0, 1000).pipe(map(() => Date.now())); } diff --git a/projects/workers/src/worker/classes/any-next-subject.ts b/projects/workers/src/worker/classes/any-next-subject.ts deleted file mode 100644 index 209bb81..0000000 --- a/projects/workers/src/worker/classes/any-next-subject.ts +++ /dev/null @@ -1,7 +0,0 @@ -import {Subject} from 'rxjs'; - -export class AnyNextSubject extends Subject { - next(value?: any) { - super.next(value); - } -} diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index c20a59e..c915d54 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -28,9 +28,11 @@ describe('WebWorker', () => { }); it('should fail if an inner promise is rejected', async () => { - const worker = WebWorker.fromFunction(() => Promise.reject('reason')); + const worker = WebWorker.fromFunction(() => + Promise.reject('reason'), + ); - worker.next(); + worker.postMessage(); await expect(await worker.toPromise().catch(err => err)).toEqual('reason'); }); @@ -52,8 +54,8 @@ describe('WebWorker', () => { }) .toPromise(); - worker.next('a'); - worker.next('b'); + worker.postMessage('a'); + worker.postMessage('b'); expect(await promise).toEqual('a'); }); }); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index dc922b4..e3fb8d6 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,10 +1,9 @@ -import {fromEvent} from 'rxjs'; -import {takeWhile} from 'rxjs/operators'; +import {fromEvent, Subject} from 'rxjs'; +import {filter, takeWhile} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {WorkerFunction} from '../types/worker-function'; -import {AnyNextSubject} from './any-next-subject'; -export class WebWorker extends AnyNextSubject { +export class WebWorker extends Subject { private worker!: Worker; constructor(private url: string, options?: WorkerOptions) { @@ -17,14 +16,15 @@ export class WebWorker extends AnyNextSubject { } fromEvent(this.worker, 'message') - .pipe(takeWhile(() => !this.isStopped)) + .pipe( + takeWhile(() => !this.isStopped), + filter(event => !!event.data), + ) .subscribe(event => { - if (event.data) { - if (event.data.hasOwnProperty('error')) { - this.error(event.data.error); - } else if (event.data.hasOwnProperty('result')) { - super.next(event.data.result); - } + if (event.data.hasOwnProperty('error')) { + this.error(event.data.error); + } else if (event.data.hasOwnProperty('result')) { + super.next(event.data.result); } }); @@ -35,7 +35,7 @@ export class WebWorker extends AnyNextSubject { }); } - public static fromFunction( + static fromFunction( fn: WorkerFunction, options?: WorkerOptions, ): WebWorker { @@ -56,7 +56,7 @@ export class WebWorker extends AnyNextSubject { super.complete(); } - next(value?: T) { + postMessage(value: T) { this.worker.postMessage(value); } } diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 5ae1702..38379fc 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -13,13 +13,11 @@ export class WorkerPipe implements PipeTransform { constructor(private workerExecutor: WorkerExecutor) {} transform(value: T, fn: WorkerFunction): Observable { - const worker = this.workers.has(fn) - ? (this.workers.get(fn) as WebWorker) - : this.workerExecutor.createWorker(fn); + const worker = this.workers.get(fn) || this.workerExecutor.createWorker(fn); this.workers.set(fn, worker); - worker.next(value); + worker.postMessage(value); return worker; } diff --git a/projects/workers/src/worker/services/worker-executor.service.spec.ts b/projects/workers/src/worker/services/worker-executor.service.spec.ts index 7752ebb..30d5639 100644 --- a/projects/workers/src/worker/services/worker-executor.service.spec.ts +++ b/projects/workers/src/worker/services/worker-executor.service.spec.ts @@ -30,7 +30,7 @@ describe('WorkerExecutorService', () => { const workerPromise = thread.pipe(take(1)).toPromise(); - thread.next('some data'); + thread.postMessage('some data'); expect(await workerPromise).toEqual('some data'); }, 10000); diff --git a/projects/workers/src/worker/services/worker-executor.service.ts b/projects/workers/src/worker/services/worker-executor.service.ts index 4c6da29..d2a0351 100644 --- a/projects/workers/src/worker/services/worker-executor.service.ts +++ b/projects/workers/src/worker/services/worker-executor.service.ts @@ -5,11 +5,11 @@ import {WorkerFunction} from '../types/worker-function'; @Injectable() export class WorkerExecutor { - execute(fn: WorkerFunction, data?: T): Promise { + execute(fn: WorkerFunction, data: T): Promise { const worker = this.createWorker(fn); const promise = worker.pipe(take(1)).toPromise(); - worker.next(data); + worker.postMessage(data); return promise; } From 6d10d1ad84a84c485c2805bf006f3c14d66e6b54 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 16 Nov 2020 21:24:01 +0300 Subject: [PATCH 16/22] chore(workers): remove unreachable code --- README.md | 8 ++-- projects/demo/src/app/app.component.ts | 16 +++----- .../src/worker/classes/web-worker.spec.ts | 9 +++++ .../workers/src/worker/classes/web-worker.ts | 11 +++++- .../src/worker/pipes/worker.pipe.spec.ts | 3 +- .../workers/src/worker/pipes/worker.pipe.ts | 5 +-- .../services/worker-executor.service.spec.ts | 37 ------------------- .../services/worker-executor.service.ts | 23 ------------ projects/workers/src/worker/worker.module.ts | 4 -- 9 files changed, 30 insertions(+), 86 deletions(-) delete mode 100644 projects/workers/src/worker/services/worker-executor.service.spec.ts delete mode 100644 projects/workers/src/worker/services/worker-executor.service.ts diff --git a/README.md b/README.md index e353a35..c172ca9 100644 --- a/README.md +++ b/README.md @@ -28,10 +28,10 @@ npm i @ng-web-apis/workers ## How it use -You can create worker with service and use it in a template with `AsyncPipe`: +You can create worker and use it in a template with `AsyncPipe`: ```typescript -import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers'; +import {WebWorker} from '@ng-web-apis/workers'; function compute(data: number): number { return data ** 2; @@ -47,7 +47,7 @@ function compute(data: number): number { `, }) class SomeComponent { - readonly worker: WebWorker = workerExecutor.createWorker(compute); + readonly worker = WebWorker.fromFunction(compute); } ``` @@ -78,7 +78,7 @@ import {FormControl} from '@angular/forms'; class SomeComponent { value: string; - changeData(data: number): number { + changeData(data: number): string { return `${data} (changed)`; } } diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index c10e6ca..5ff7eba 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -1,6 +1,6 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; -import {WebWorker, WorkerExecutor} from '@ng-web-apis/workers'; -import {Observable, Subject} from 'rxjs'; +import {WebWorker} from '@ng-web-apis/workers'; +import {Subject} from 'rxjs'; import {map} from 'rxjs/operators'; @Component({ @@ -10,15 +10,9 @@ import {map} from 'rxjs/operators'; changeDetection: ChangeDetectionStrategy.OnPush, }) export class AppComponent { - readonly workerThread: WebWorker; - readonly emitter: Subject; - readonly result$: Observable; - - constructor(webWorkerExecutor: WorkerExecutor) { - this.workerThread = webWorkerExecutor.createWorker(this.startCompute); - this.emitter = new Subject(); - this.result$ = this.emitter.pipe(map(this.startCompute)); - } + readonly workerThread = WebWorker.fromFunction(this.startCompute); + readonly emitter = new Subject(); + readonly result$ = this.emitter.pipe(map(this.startCompute)); startCompute(): number { const start = performance.now(); diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index c915d54..ca82375 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -58,4 +58,13 @@ describe('WebWorker', () => { worker.postMessage('b'); expect(await promise).toEqual('a'); }); + + it('should run a worker and return a correct data', async () => { + const workerPromise: Promise = WebWorker.execute( + data => Promise.resolve().then(() => data), + 'some data', + ); + + expect(await workerPromise).toEqual('some data'); + }, 10000); }); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index e3fb8d6..5751087 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,5 +1,5 @@ import {fromEvent, Subject} from 'rxjs'; -import {filter, takeWhile} from 'rxjs/operators'; +import {filter, take, takeWhile} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {WorkerFunction} from '../types/worker-function'; @@ -42,6 +42,15 @@ export class WebWorker extends Subject { return new WebWorker(WebWorker.createFnUrl(fn), options); } + static execute(fn: WorkerFunction, data: T): Promise { + const worker = WebWorker.fromFunction(fn); + const promise = worker.pipe(take(1)).toPromise(); + + worker.postMessage(data); + + return promise; + } + private static createFnUrl(fn: WorkerFunction): string { const script = `(${WORKER_BLANK_FN})(${fn.toString()});`; diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts index e4c0fbc..5042038 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.spec.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -1,12 +1,11 @@ import {take} from 'rxjs/operators'; -import {WorkerExecutor} from '../services/worker-executor.service'; import {WorkerPipe} from './worker.pipe'; describe('WorkerPipe', () => { let pipe: WorkerPipe; beforeEach(() => { - pipe = new WorkerPipe(new WorkerExecutor()); + pipe = new WorkerPipe(); }); it('should emit the first value', async () => { diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index 38379fc..b10b157 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -1,7 +1,6 @@ import {Pipe, PipeTransform} from '@angular/core'; import {Observable} from 'rxjs'; import {WebWorker} from '../classes/web-worker'; -import {WorkerExecutor} from '../services/worker-executor.service'; import {WorkerFunction} from '../types/worker-function'; @Pipe({ @@ -10,10 +9,8 @@ import {WorkerFunction} from '../types/worker-function'; export class WorkerPipe implements PipeTransform { private workers = new WeakMap(); - constructor(private workerExecutor: WorkerExecutor) {} - transform(value: T, fn: WorkerFunction): Observable { - const worker = this.workers.get(fn) || this.workerExecutor.createWorker(fn); + const worker = this.workers.get(fn) || WebWorker.fromFunction(fn); this.workers.set(fn, worker); diff --git a/projects/workers/src/worker/services/worker-executor.service.spec.ts b/projects/workers/src/worker/services/worker-executor.service.spec.ts deleted file mode 100644 index 30d5639..0000000 --- a/projects/workers/src/worker/services/worker-executor.service.spec.ts +++ /dev/null @@ -1,37 +0,0 @@ -import {TestBed} from '@angular/core/testing'; -import {WorkerModule} from '@ng-web-apis/workers'; -import {take} from 'rxjs/operators'; -import {WorkerExecutor} from './worker-executor.service'; - -describe('WorkerExecutorService', () => { - let service: WorkerExecutor; - - beforeEach(() => { - TestBed.configureTestingModule({ - imports: [WorkerModule], - }); - - service = TestBed.get(WorkerExecutor); - }); - - it('should run worker and return correct data', async () => { - const workerPromise: Promise = service.execute( - data => Promise.resolve().then(() => data), - 'some data', - ); - - expect(await workerPromise).toEqual('some data'); - }, 10000); - - it('should create worker', async () => { - const thread = service.createWorker(data => - Promise.resolve(data), - ); - - const workerPromise = thread.pipe(take(1)).toPromise(); - - thread.postMessage('some data'); - - expect(await workerPromise).toEqual('some data'); - }, 10000); -}); diff --git a/projects/workers/src/worker/services/worker-executor.service.ts b/projects/workers/src/worker/services/worker-executor.service.ts deleted file mode 100644 index d2a0351..0000000 --- a/projects/workers/src/worker/services/worker-executor.service.ts +++ /dev/null @@ -1,23 +0,0 @@ -import {Injectable} from '@angular/core'; -import {take} from 'rxjs/operators'; -import {WebWorker} from '../classes/web-worker'; -import {WorkerFunction} from '../types/worker-function'; - -@Injectable() -export class WorkerExecutor { - execute(fn: WorkerFunction, data: T): Promise { - const worker = this.createWorker(fn); - const promise = worker.pipe(take(1)).toPromise(); - - worker.postMessage(data); - - return promise; - } - - createWorker( - fn: WorkerFunction, - options?: WorkerOptions, - ): WebWorker { - return WebWorker.fromFunction(fn, options); - } -} diff --git a/projects/workers/src/worker/worker.module.ts b/projects/workers/src/worker/worker.module.ts index 035bfba..e188f51 100644 --- a/projects/workers/src/worker/worker.module.ts +++ b/projects/workers/src/worker/worker.module.ts @@ -1,12 +1,8 @@ -import {CommonModule} from '@angular/common'; import {NgModule} from '@angular/core'; import {WorkerPipe} from './pipes/worker.pipe'; -import {WorkerExecutor} from './services/worker-executor.service'; @NgModule({ - imports: [CommonModule], declarations: [WorkerPipe], exports: [WorkerPipe], - providers: [WorkerExecutor], }) export class WorkerModule {} From 79df753c660ac945460a6e2ec2fad7c7184530e2 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 16 Nov 2020 21:31:27 +0300 Subject: [PATCH 17/22] chore(workers): don't know how it is possible --- projects/workers/src/public-api.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts index 747162b..de5ff5a 100644 --- a/projects/workers/src/public-api.ts +++ b/projects/workers/src/public-api.ts @@ -3,6 +3,5 @@ */ export * from './worker/worker.module'; export * from './worker/types/worker-function'; -export * from './worker/services/worker-executor.service'; export * from './worker/classes/web-worker'; export * from './worker/pipes/worker.pipe'; From 3888a0ba7f3ebc4ebf9ec9087391127b417762fe Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Mon, 16 Nov 2020 21:44:18 +0300 Subject: [PATCH 18/22] chore: add missed test --- .../workers/src/worker/classes/web-worker.spec.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index ca82375..b6b3aa6 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -1,4 +1,5 @@ import {Observable} from 'rxjs'; +import {take} from 'rxjs/operators'; import {WebWorker} from './web-worker'; describe('WebWorker', () => { @@ -67,4 +68,16 @@ describe('WebWorker', () => { expect(await workerPromise).toEqual('some data'); }, 10000); + + it('should create worker', async () => { + const thread = WebWorker.fromFunction(data => + Promise.resolve(data), + ); + + const workerPromise = thread.pipe(take(1)).toPromise(); + + thread.postMessage('some data'); + + expect(await workerPromise).toEqual('some data'); + }, 10000); }); From 1369f19da295658c21cb4b97a19ceb0386651843 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Wed, 18 Nov 2020 23:12:12 +0300 Subject: [PATCH 19/22] chore: refactoring after review --- projects/demo/src/app/app.component.html | 2 +- projects/demo/src/app/app.component.ts | 3 +- projects/workers/src/public-api.ts | 9 ++- .../src/worker/classes/web-worker.spec.ts | 44 ++++++------ .../workers/src/worker/classes/web-worker.ts | 67 ++++++++++--------- .../src/worker/consts/worker-fn-template.ts | 16 +++-- .../workers/src/worker/operators/to-data.ts | 7 ++ .../src/worker/pipes/worker.pipe.spec.ts | 2 +- .../workers/src/worker/pipes/worker.pipe.ts | 9 ++- .../src/worker/types/typed-message-event.ts | 3 + 10 files changed, 99 insertions(+), 63 deletions(-) create mode 100644 projects/workers/src/worker/operators/to-data.ts create mode 100644 projects/workers/src/worker/types/typed-message-event.ts diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html index 128742d..4355c48 100644 --- a/projects/demo/src/app/app.component.html +++ b/projects/demo/src/app/app.component.html @@ -2,7 +2,7 @@
-

Execution time: {{ workerThread | async }}

+

Execution time: {{ workerData$ | async }}

diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts index 5ff7eba..1d3bf31 100644 --- a/projects/demo/src/app/app.component.ts +++ b/projects/demo/src/app/app.component.ts @@ -1,5 +1,5 @@ import {ChangeDetectionStrategy, Component} from '@angular/core'; -import {WebWorker} from '@ng-web-apis/workers'; +import {toData, WebWorker} from '@ng-web-apis/workers'; import {Subject} from 'rxjs'; import {map} from 'rxjs/operators'; @@ -11,6 +11,7 @@ import {map} from 'rxjs/operators'; }) export class AppComponent { readonly workerThread = WebWorker.fromFunction(this.startCompute); + readonly workerData$ = this.workerThread.pipe(toData()); readonly emitter = new Subject(); readonly result$ = this.emitter.pipe(map(this.startCompute)); diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts index de5ff5a..a7a39ba 100644 --- a/projects/workers/src/public-api.ts +++ b/projects/workers/src/public-api.ts @@ -1,7 +1,12 @@ /** * Public API Surface of @ng-web-apis/workers */ -export * from './worker/worker.module'; -export * from './worker/types/worker-function'; export * from './worker/classes/web-worker'; +export * from './worker/operators/to-data'; + export * from './worker/pipes/worker.pipe'; + +export * from './worker/types/worker-function'; +export * from './worker/types/typed-message-event'; + +export * from './worker/worker.module'; diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index b6b3aa6..fa684de 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -1,7 +1,11 @@ import {Observable} from 'rxjs'; import {take} from 'rxjs/operators'; +import {TypedMessageEvent} from '../types/typed-message-event'; import {WebWorker} from './web-worker'; +// it is needed to ignore web worker errors +window.onerror = () => {}; + describe('WebWorker', () => { it('should fail if a worker is not available', async () => { const OriginalWorker = Worker; @@ -28,16 +32,6 @@ describe('WebWorker', () => { await expectAsync(worker.toPromise()).toBeRejected(); }); - it('should fail if an inner promise is rejected', async () => { - const worker = WebWorker.fromFunction(() => - Promise.reject('reason'), - ); - - worker.postMessage(); - - await expect(await worker.toPromise().catch(err => err)).toEqual('reason'); - }); - it('should resolve the last value before completing', async () => { const worker = WebWorker.fromFunction((data: string) => Promise.resolve(data)); @@ -45,9 +39,9 @@ describe('WebWorker', () => { .pipe(source => { return new Observable(subscriber => { source.subscribe({ - next(value: string) { - (source as WebWorker).complete(); - subscriber.next(value); + next({data}: TypedMessageEvent) { + (source as WebWorker).terminate(); + subscriber.next(data); subscriber.complete(); }, }); @@ -61,12 +55,12 @@ describe('WebWorker', () => { }); it('should run a worker and return a correct data', async () => { - const workerPromise: Promise = WebWorker.execute( - data => Promise.resolve().then(() => data), - 'some data', - ); + const workerPromise: Promise> = WebWorker.execute< + string, + string + >(data => Promise.resolve().then(() => data), 'some data'); - expect(await workerPromise).toEqual('some data'); + expect((await workerPromise).data).toEqual('some data'); }, 10000); it('should create worker', async () => { @@ -78,6 +72,18 @@ describe('WebWorker', () => { thread.postMessage('some data'); - expect(await workerPromise).toEqual('some data'); + expect((await workerPromise).data).toEqual('some data'); }, 10000); + + it('should fail if an inner promise is rejected', async () => { + const worker = WebWorker.fromFunction(() => + Promise.reject('reason'), + ); + + worker.postMessage(); + + expect(await worker.toPromise().catch(err => err.message)).toEqual( + 'Uncaught reason', + ); + }); }); diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 5751087..3128534 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,38 +1,43 @@ -import {fromEvent, Subject} from 'rxjs'; -import {filter, take, takeWhile} from 'rxjs/operators'; +import {fromEvent, Observable} from 'rxjs'; +import {take, takeWhile} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; +import {TypedMessageEvent} from '../types/typed-message-event'; import {WorkerFunction} from '../types/worker-function'; -export class WebWorker extends Subject { - private worker!: Worker; +export class WebWorker extends Observable> { + private worker: Worker; + private url: string; - constructor(private url: string, options?: WorkerOptions) { - super(); + constructor(url: string, options?: WorkerOptions) { + let worker!: Worker; + let error: any; try { - this.worker = new Worker(url, options); + worker = new Worker(url, options); } catch (e) { - this.error(e); + error = e; } - fromEvent(this.worker, 'message') - .pipe( - takeWhile(() => !this.isStopped), - filter(event => !!event.data), - ) - .subscribe(event => { - if (event.data.hasOwnProperty('error')) { - this.error(event.data.error); - } else if (event.data.hasOwnProperty('result')) { - super.next(event.data.result); - } - }); - - fromEvent(this.worker, 'error') - .pipe(takeWhile(() => !this.isStopped)) - .subscribe(event => { - this.error(event.error); - }); + super(subscriber => { + if (error) { + subscriber.error(error); + } + + fromEvent>(this.worker, 'message') + .pipe(takeWhile(() => !subscriber.closed)) + .subscribe(event => { + subscriber.next(event); + }); + + fromEvent(this.worker, 'error') + .pipe(takeWhile(() => !subscriber.closed)) + .subscribe(event => { + subscriber.error(event); + }); + }); + + this.worker = worker; + this.url = url; } static fromFunction( @@ -42,7 +47,10 @@ export class WebWorker extends Subject { return new WebWorker(WebWorker.createFnUrl(fn), options); } - static execute(fn: WorkerFunction, data: T): Promise { + static execute( + fn: WorkerFunction, + data: T, + ): Promise> { const worker = WebWorker.fromFunction(fn); const promise = worker.pipe(take(1)).toPromise(); @@ -52,17 +60,16 @@ export class WebWorker extends Subject { } private static createFnUrl(fn: WorkerFunction): string { - const script = `(${WORKER_BLANK_FN})(${fn.toString()});`; + const script = `(${WORKER_BLANK_FN})(${fn});`; const blob = new Blob([script], {type: 'text/javascript'}); return URL.createObjectURL(blob); } - complete() { + terminate() { this.worker.terminate(); URL.revokeObjectURL(this.url); - super.complete(); } postMessage(value: T) { diff --git a/projects/workers/src/worker/consts/worker-fn-template.ts b/projects/workers/src/worker/consts/worker-fn-template.ts index a6b92f8..0d01e13 100644 --- a/projects/workers/src/worker/consts/worker-fn-template.ts +++ b/projects/workers/src/worker/consts/worker-fn-template.ts @@ -1,3 +1,5 @@ +// throw an error using the `setTimeout` function +// because web worker doesn't emit ErrorEvent from promises export const WORKER_BLANK_FN = ` function(fn){ function isFunction(type){ @@ -6,14 +8,14 @@ function(fn){ self.addEventListener('message', function(e) { var result = fn.call(null, e.data); - if(result && [typeof result.then, typeof result.catch].every(isFunction)){ - result.then(function(res){ - postMessage({result: res}); - }).catch(function(error){ - postMessage({error: error}); - }) + if (result && [typeof result.then, typeof result.catch].every(isFunction)){ + result + .then(postMessage) + .catch(function(error) { + setTimeout(function(){throw error}, 0) + }) } else { - postMessage({result: result}); + postMessage(result); } }) } diff --git a/projects/workers/src/worker/operators/to-data.ts b/projects/workers/src/worker/operators/to-data.ts new file mode 100644 index 0000000..0db7e1a --- /dev/null +++ b/projects/workers/src/worker/operators/to-data.ts @@ -0,0 +1,7 @@ +import {OperatorFunction} from 'rxjs'; +import {map} from 'rxjs/operators'; +import {TypedMessageEvent} from '../types/typed-message-event'; + +export function toData(): OperatorFunction, T> { + return map, T>(({data}) => data); +} diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts index 5042038..7d31c68 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.spec.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts @@ -14,7 +14,7 @@ describe('WorkerPipe', () => { .pipe(take(1)) .toPromise(); - expect(result).toEqual('a'); + expect(await result).toEqual('a'); }); it('should return the same worker for the same function', async () => { diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts index b10b157..4f63e1b 100644 --- a/projects/workers/src/worker/pipes/worker.pipe.ts +++ b/projects/workers/src/worker/pipes/worker.pipe.ts @@ -1,6 +1,7 @@ import {Pipe, PipeTransform} from '@angular/core'; import {Observable} from 'rxjs'; import {WebWorker} from '../classes/web-worker'; +import {toData} from '../operators/to-data'; import {WorkerFunction} from '../types/worker-function'; @Pipe({ @@ -8,14 +9,18 @@ import {WorkerFunction} from '../types/worker-function'; }) export class WorkerPipe implements PipeTransform { private workers = new WeakMap(); + private observers = new WeakMap>(); transform(value: T, fn: WorkerFunction): Observable { const worker = this.workers.get(fn) || WebWorker.fromFunction(fn); this.workers.set(fn, worker); - worker.postMessage(value); - return worker; + const observer = this.observers.get(worker) || worker.pipe(toData()); + + this.observers.set(worker, observer); + + return observer; } } diff --git a/projects/workers/src/worker/types/typed-message-event.ts b/projects/workers/src/worker/types/typed-message-event.ts new file mode 100644 index 0000000..492e03b --- /dev/null +++ b/projects/workers/src/worker/types/typed-message-event.ts @@ -0,0 +1,3 @@ +export interface TypedMessageEvent extends MessageEvent { + data: T; +} From d971e868042b902abbfe0546d9a5721758e6cf57 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 19 Nov 2020 11:03:48 +0300 Subject: [PATCH 20/22] chore: refactoring after review --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c172ca9..53743fc 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ import {FormControl} from '@angular/forms'; class SomeComponent { value: string; - changeData(data: number): string { + changeData(data: string): string { return `${data} (changed)`; } } From 7e9d7cd2a9fdfce7256a22fd62826974b4e52d56 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 19 Nov 2020 22:08:05 +0300 Subject: [PATCH 21/22] chore: review review review --- .../workers/src/worker/classes/web-worker.ts | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts index 3128534..b3d032c 100644 --- a/projects/workers/src/worker/classes/web-worker.ts +++ b/projects/workers/src/worker/classes/web-worker.ts @@ -1,15 +1,15 @@ -import {fromEvent, Observable} from 'rxjs'; -import {take, takeWhile} from 'rxjs/operators'; +import {EMPTY, fromEvent, merge, Observable} from 'rxjs'; +import {take, tap} from 'rxjs/operators'; import {WORKER_BLANK_FN} from '../consts/worker-fn-template'; import {TypedMessageEvent} from '../types/typed-message-event'; import {WorkerFunction} from '../types/worker-function'; export class WebWorker extends Observable> { - private worker: Worker; - private url: string; + private readonly worker: Worker | undefined; + private readonly url: string; constructor(url: string, options?: WorkerOptions) { - let worker!: Worker; + let worker: Worker | undefined; let error: any; try { @@ -23,17 +23,18 @@ export class WebWorker extends Observable subscriber.error(error); } - fromEvent>(this.worker, 'message') - .pipe(takeWhile(() => !subscriber.closed)) - .subscribe(event => { - subscriber.next(event); - }); - - fromEvent(this.worker, 'error') - .pipe(takeWhile(() => !subscriber.closed)) - .subscribe(event => { - subscriber.error(event); - }); + const eventStream$ = worker + ? merge( + fromEvent>(worker, 'message').pipe( + tap(event => subscriber.next(event)), + ), + fromEvent(worker, 'error').pipe( + tap(event => subscriber.error(event)), + ), + ) + : EMPTY; + + return eventStream$.subscribe(); }); this.worker = worker; @@ -68,11 +69,16 @@ export class WebWorker extends Observable } terminate() { - this.worker.terminate(); + if (this.worker) { + this.worker.terminate(); + } + URL.revokeObjectURL(this.url); } postMessage(value: T) { - this.worker.postMessage(value); + if (this.worker) { + this.worker.postMessage(value); + } } } From 9992fc090ba3059ee4c552e0bb5cece5a3570b17 Mon Sep 17 00:00:00 2001 From: IKatsuba Date: Thu, 19 Nov 2020 22:15:07 +0300 Subject: [PATCH 22/22] chore: fix coverage --- projects/workers/src/worker/classes/web-worker.spec.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts index fa684de..04dee8b 100644 --- a/projects/workers/src/worker/classes/web-worker.spec.ts +++ b/projects/workers/src/worker/classes/web-worker.spec.ts @@ -12,7 +12,10 @@ describe('WebWorker', () => { delete (window as any).Worker; - const worker = WebWorker.fromFunction(d => d); + const worker = WebWorker.fromFunction(d => d); + + expect(() => worker.terminate()).not.toThrow(); + expect(() => worker.postMessage()).not.toThrow(); await expectAsync(worker.toPromise()).toBeRejected();