diff --git a/README.md b/README.md
index 406cc48..53743fc 100644
--- a/README.md
+++ b/README.md
@@ -26,9 +26,63 @@ Now install the package:
npm i @ng-web-apis/workers
```
-## Usage
+## How it use
-TBD
+You can create worker and use it in a template with `AsyncPipe`:
+
+```typescript
+import {WebWorker} from '@ng-web-apis/workers';
+
+function compute(data: number): number {
+ return data ** 2;
+}
+
+@Component({
+ template: `
+ Computed Result: {{ worker | async }}
+
+ `,
+})
+class SomeComponent {
+ readonly worker = WebWorker.fromFunction(compute);
+}
+```
+
+It's the same with `WorkerPipe` only:
+
+```typescript
+import {WorkerModule} from '@ng-web-apis/workers';
+import {NgModule} from '@angular/core';
+
+@NgModule({
+ imports: [WorkerModule],
+ declarations: [SomeComponent],
+})
+class SomeModule {}
+```
+
+```typescript
+import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers';
+import {FormControl} from '@angular/forms';
+
+@Component({
+ template: `
+ Computed Result: {{ value | waWorker: changeData | async }}
+
+
+ `,
+})
+class SomeComponent {
+ value: string;
+
+ changeData(data: string): string {
+ return `${data} (changed)`;
+ }
+}
+```
## See also
diff --git a/projects/demo/src/app/app.browser.module.ts b/projects/demo/src/app/app.browser.module.ts
index 4d353f0..5efaf73 100644
--- a/projects/demo/src/app/app.browser.module.ts
+++ b/projects/demo/src/app/app.browser.module.ts
@@ -7,8 +7,10 @@ import {
import {NgModule} from '@angular/core';
import {FormsModule} from '@angular/forms';
import {BrowserModule} from '@angular/platform-browser';
+import {WorkerModule} from '@ng-web-apis/workers';
import {AppComponent} from './app.component';
import {AppRoutingModule} from './app.routes';
+import {ClockComponent} from './clock.component';
@NgModule({
bootstrap: [AppComponent],
@@ -17,8 +19,9 @@ import {AppRoutingModule} from './app.routes';
FormsModule,
BrowserModule.withServerTransition({appId: 'demo'}),
AppRoutingModule,
+ WorkerModule,
],
- declarations: [AppComponent],
+ declarations: [AppComponent, ClockComponent],
providers: [
{
provide: LocationStrategy,
diff --git a/projects/demo/src/app/app.component.html b/projects/demo/src/app/app.component.html
index e69de29..4355c48 100644
--- a/projects/demo/src/app/app.component.html
+++ b/projects/demo/src/app/app.component.html
@@ -0,0 +1,10 @@
+
+
+
+
+
Execution time: {{ workerData$ | async }}
+
+
+
+
Execution time: {{ result$ | async }}
+
diff --git a/projects/demo/src/app/app.component.less b/projects/demo/src/app/app.component.less
index 9fc56c2..1e8a83c 100644
--- a/projects/demo/src/app/app.component.less
+++ b/projects/demo/src/app/app.component.less
@@ -5,8 +5,9 @@
align-items: center;
}
-.observer {
- background: skyblue;
- border-radius: 16px;
- padding: 40px;
+.example {
+ min-width: 360px;
+ border-top: 1px solid gainsboro;
+ margin-top: 16px;
+ padding-top: 16px;
}
diff --git a/projects/demo/src/app/app.component.ts b/projects/demo/src/app/app.component.ts
index 67d9132..1d3bf31 100644
--- a/projects/demo/src/app/app.component.ts
+++ b/projects/demo/src/app/app.component.ts
@@ -1,4 +1,7 @@
import {ChangeDetectionStrategy, Component} from '@angular/core';
+import {toData, WebWorker} from '@ng-web-apis/workers';
+import {Subject} from 'rxjs';
+import {map} from 'rxjs/operators';
@Component({
selector: 'main',
@@ -6,4 +9,19 @@ import {ChangeDetectionStrategy, Component} from '@angular/core';
styleUrls: ['./app.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush,
})
-export class AppComponent {}
+export class AppComponent {
+ readonly workerThread = WebWorker.fromFunction(this.startCompute);
+ readonly workerData$ = this.workerThread.pipe(toData());
+ readonly emitter = new Subject();
+ readonly result$ = this.emitter.pipe(map(this.startCompute));
+
+ startCompute(): number {
+ const start = performance.now();
+
+ Array.from({length: 16000}).forEach((_, index) =>
+ Array.from({length: index}).reduce((sum: number) => sum + 1, 0),
+ );
+
+ return performance.now() - start;
+ }
+}
diff --git a/projects/demo/src/app/clock.component.ts b/projects/demo/src/app/clock.component.ts
new file mode 100644
index 0000000..10915a5
--- /dev/null
+++ b/projects/demo/src/app/clock.component.ts
@@ -0,0 +1,14 @@
+import {ChangeDetectionStrategy, Component} from '@angular/core';
+import {Observable, timer} from 'rxjs';
+import {map} from 'rxjs/operators';
+
+@Component({
+ selector: 'app-clock',
+ template: `
+ {{ date$ | async | date: 'mediumTime' }}
+ `,
+ changeDetection: ChangeDetectionStrategy.OnPush,
+})
+export class ClockComponent {
+ readonly date$: Observable = timer(0, 1000).pipe(map(() => Date.now()));
+}
diff --git a/projects/workers/src/public-api.ts b/projects/workers/src/public-api.ts
index 8e9f6fc..a7a39ba 100644
--- a/projects/workers/src/public-api.ts
+++ b/projects/workers/src/public-api.ts
@@ -1,3 +1,12 @@
/**
* Public API Surface of @ng-web-apis/workers
*/
+export * from './worker/classes/web-worker';
+export * from './worker/operators/to-data';
+
+export * from './worker/pipes/worker.pipe';
+
+export * from './worker/types/worker-function';
+export * from './worker/types/typed-message-event';
+
+export * from './worker/worker.module';
diff --git a/projects/workers/src/worker/classes/web-worker.spec.ts b/projects/workers/src/worker/classes/web-worker.spec.ts
new file mode 100644
index 0000000..04dee8b
--- /dev/null
+++ b/projects/workers/src/worker/classes/web-worker.spec.ts
@@ -0,0 +1,92 @@
+import {Observable} from 'rxjs';
+import {take} from 'rxjs/operators';
+import {TypedMessageEvent} from '../types/typed-message-event';
+import {WebWorker} from './web-worker';
+
+// it is needed to ignore web worker errors
+window.onerror = () => {};
+
+describe('WebWorker', () => {
+ it('should fail if a worker is not available', async () => {
+ const OriginalWorker = Worker;
+
+ delete (window as any).Worker;
+
+ const worker = WebWorker.fromFunction(d => d);
+
+ expect(() => worker.terminate()).not.toThrow();
+ expect(() => worker.postMessage()).not.toThrow();
+
+ await expectAsync(worker.toPromise()).toBeRejected();
+
+ (window as any).Worker = OriginalWorker;
+ });
+
+ it('should create worker from a function', () => {
+ const worker = WebWorker.fromFunction(d => d);
+
+ expect(worker instanceof WebWorker).toEqual(true);
+ expect((worker as any).worker instanceof Worker).toEqual(true);
+ });
+
+ it('should trigger an error if URL was not found', async () => {
+ const worker = new WebWorker('some/wrong/url');
+
+ await expectAsync(worker.toPromise()).toBeRejected();
+ });
+
+ it('should resolve the last value before completing', async () => {
+ const worker = WebWorker.fromFunction((data: string) => Promise.resolve(data));
+
+ const promise = worker
+ .pipe(source => {
+ return new Observable(subscriber => {
+ source.subscribe({
+ next({data}: TypedMessageEvent) {
+ (source as WebWorker).terminate();
+ subscriber.next(data);
+ subscriber.complete();
+ },
+ });
+ });
+ })
+ .toPromise();
+
+ worker.postMessage('a');
+ worker.postMessage('b');
+ expect(await promise).toEqual('a');
+ });
+
+ it('should run a worker and return a correct data', async () => {
+ const workerPromise: Promise> = WebWorker.execute<
+ string,
+ string
+ >(data => Promise.resolve().then(() => data), 'some data');
+
+ expect((await workerPromise).data).toEqual('some data');
+ }, 10000);
+
+ it('should create worker', async () => {
+ const thread = WebWorker.fromFunction(data =>
+ Promise.resolve(data),
+ );
+
+ const workerPromise = thread.pipe(take(1)).toPromise();
+
+ thread.postMessage('some data');
+
+ expect((await workerPromise).data).toEqual('some data');
+ }, 10000);
+
+ it('should fail if an inner promise is rejected', async () => {
+ const worker = WebWorker.fromFunction(() =>
+ Promise.reject('reason'),
+ );
+
+ worker.postMessage();
+
+ expect(await worker.toPromise().catch(err => err.message)).toEqual(
+ 'Uncaught reason',
+ );
+ });
+});
diff --git a/projects/workers/src/worker/classes/web-worker.ts b/projects/workers/src/worker/classes/web-worker.ts
new file mode 100644
index 0000000..b3d032c
--- /dev/null
+++ b/projects/workers/src/worker/classes/web-worker.ts
@@ -0,0 +1,84 @@
+import {EMPTY, fromEvent, merge, Observable} from 'rxjs';
+import {take, tap} from 'rxjs/operators';
+import {WORKER_BLANK_FN} from '../consts/worker-fn-template';
+import {TypedMessageEvent} from '../types/typed-message-event';
+import {WorkerFunction} from '../types/worker-function';
+
+export class WebWorker extends Observable> {
+ private readonly worker: Worker | undefined;
+ private readonly url: string;
+
+ constructor(url: string, options?: WorkerOptions) {
+ let worker: Worker | undefined;
+ let error: any;
+
+ try {
+ worker = new Worker(url, options);
+ } catch (e) {
+ error = e;
+ }
+
+ super(subscriber => {
+ if (error) {
+ subscriber.error(error);
+ }
+
+ const eventStream$ = worker
+ ? merge(
+ fromEvent>(worker, 'message').pipe(
+ tap(event => subscriber.next(event)),
+ ),
+ fromEvent(worker, 'error').pipe(
+ tap(event => subscriber.error(event)),
+ ),
+ )
+ : EMPTY;
+
+ return eventStream$.subscribe();
+ });
+
+ this.worker = worker;
+ this.url = url;
+ }
+
+ static fromFunction(
+ fn: WorkerFunction,
+ options?: WorkerOptions,
+ ): WebWorker {
+ return new WebWorker(WebWorker.createFnUrl(fn), options);
+ }
+
+ static execute(
+ fn: WorkerFunction,
+ data: T,
+ ): Promise> {
+ const worker = WebWorker.fromFunction(fn);
+ const promise = worker.pipe(take(1)).toPromise();
+
+ worker.postMessage(data);
+
+ return promise;
+ }
+
+ private static createFnUrl(fn: WorkerFunction): string {
+ const script = `(${WORKER_BLANK_FN})(${fn});`;
+
+ const blob = new Blob([script], {type: 'text/javascript'});
+
+ return URL.createObjectURL(blob);
+ }
+
+ terminate() {
+ if (this.worker) {
+ this.worker.terminate();
+ }
+
+ URL.revokeObjectURL(this.url);
+ }
+
+ postMessage(value: T) {
+ if (this.worker) {
+ this.worker.postMessage(value);
+ }
+ }
+}
diff --git a/projects/workers/src/worker/consts/worker-fn-template.ts b/projects/workers/src/worker/consts/worker-fn-template.ts
new file mode 100644
index 0000000..0d01e13
--- /dev/null
+++ b/projects/workers/src/worker/consts/worker-fn-template.ts
@@ -0,0 +1,22 @@
+// throw an error using the `setTimeout` function
+// because web worker doesn't emit ErrorEvent from promises
+export const WORKER_BLANK_FN = `
+function(fn){
+ function isFunction(type){
+ return type === 'function';
+ }
+
+ self.addEventListener('message', function(e) {
+ var result = fn.call(null, e.data);
+ if (result && [typeof result.then, typeof result.catch].every(isFunction)){
+ result
+ .then(postMessage)
+ .catch(function(error) {
+ setTimeout(function(){throw error}, 0)
+ })
+ } else {
+ postMessage(result);
+ }
+ })
+}
+`;
diff --git a/projects/workers/src/worker/operators/to-data.ts b/projects/workers/src/worker/operators/to-data.ts
new file mode 100644
index 0000000..0db7e1a
--- /dev/null
+++ b/projects/workers/src/worker/operators/to-data.ts
@@ -0,0 +1,7 @@
+import {OperatorFunction} from 'rxjs';
+import {map} from 'rxjs/operators';
+import {TypedMessageEvent} from '../types/typed-message-event';
+
+export function toData(): OperatorFunction, T> {
+ return map, T>(({data}) => data);
+}
diff --git a/projects/workers/src/worker/pipes/worker.pipe.spec.ts b/projects/workers/src/worker/pipes/worker.pipe.spec.ts
new file mode 100644
index 0000000..7d31c68
--- /dev/null
+++ b/projects/workers/src/worker/pipes/worker.pipe.spec.ts
@@ -0,0 +1,35 @@
+import {take} from 'rxjs/operators';
+import {WorkerPipe} from './worker.pipe';
+
+describe('WorkerPipe', () => {
+ let pipe: WorkerPipe;
+
+ beforeEach(() => {
+ pipe = new WorkerPipe();
+ });
+
+ it('should emit the first value', async () => {
+ const result = await pipe
+ .transform('a', data => data)
+ .pipe(take(1))
+ .toPromise();
+
+ expect(await result).toEqual('a');
+ });
+
+ it('should return the same worker for the same function', async () => {
+ const workerFn = (data: unknown) => data;
+
+ const worker = await pipe.transform('a', workerFn);
+ const theSameWorker = await pipe.transform('a', workerFn);
+
+ expect(worker).toEqual(theSameWorker);
+ });
+
+ it('should return a different worker for a different function', async () => {
+ const worker = await pipe.transform('a', (data: unknown) => data);
+ const differentWorker = await pipe.transform('a', (data: unknown) => data);
+
+ expect(worker).not.toEqual(differentWorker);
+ });
+});
diff --git a/projects/workers/src/worker/pipes/worker.pipe.ts b/projects/workers/src/worker/pipes/worker.pipe.ts
new file mode 100644
index 0000000..4f63e1b
--- /dev/null
+++ b/projects/workers/src/worker/pipes/worker.pipe.ts
@@ -0,0 +1,26 @@
+import {Pipe, PipeTransform} from '@angular/core';
+import {Observable} from 'rxjs';
+import {WebWorker} from '../classes/web-worker';
+import {toData} from '../operators/to-data';
+import {WorkerFunction} from '../types/worker-function';
+
+@Pipe({
+ name: 'waWorker',
+})
+export class WorkerPipe implements PipeTransform {
+ private workers = new WeakMap();
+ private observers = new WeakMap>();
+
+ transform(value: T, fn: WorkerFunction): Observable {
+ const worker = this.workers.get(fn) || WebWorker.fromFunction(fn);
+
+ this.workers.set(fn, worker);
+ worker.postMessage(value);
+
+ const observer = this.observers.get(worker) || worker.pipe(toData());
+
+ this.observers.set(worker, observer);
+
+ return observer;
+ }
+}
diff --git a/projects/workers/src/worker/types/typed-message-event.ts b/projects/workers/src/worker/types/typed-message-event.ts
new file mode 100644
index 0000000..492e03b
--- /dev/null
+++ b/projects/workers/src/worker/types/typed-message-event.ts
@@ -0,0 +1,3 @@
+export interface TypedMessageEvent extends MessageEvent {
+ data: T;
+}
diff --git a/projects/workers/src/worker/types/worker-function.ts b/projects/workers/src/worker/types/worker-function.ts
new file mode 100644
index 0000000..854e555
--- /dev/null
+++ b/projects/workers/src/worker/types/worker-function.ts
@@ -0,0 +1 @@
+export type WorkerFunction = (data: T) => R | Promise;
diff --git a/projects/workers/src/worker/worker.module.ts b/projects/workers/src/worker/worker.module.ts
new file mode 100644
index 0000000..e188f51
--- /dev/null
+++ b/projects/workers/src/worker/worker.module.ts
@@ -0,0 +1,8 @@
+import {NgModule} from '@angular/core';
+import {WorkerPipe} from './pipes/worker.pipe';
+
+@NgModule({
+ declarations: [WorkerPipe],
+ exports: [WorkerPipe],
+})
+export class WorkerModule {}