Skip to content
This repository has been archived by the owner on May 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1 from ng-web-apis/feature/webworker
Browse files Browse the repository at this point in the history
feat: added base classes
  • Loading branch information
IKatsuba authored Nov 20, 2020
2 parents bd0b18e + 9992fc0 commit 84cb368
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 8 deletions.
58 changes: 56 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,63 @@ Now install the package:
npm i @ng-web-apis/workers
```

## Usage
## How it use

TBD
You can create worker and use it in a template with `AsyncPipe`:

```typescript
import {WebWorker} from '@ng-web-apis/workers';

function compute(data: number): number {
return data ** 2;
}

@Component({
template: `
Computed Result: {{ worker | async }}
<form (ngSubmit)="worker.postMessage(input.value)">
<input #input />
<button type="submit">Send to worker</button>
</form>
`,
})
class SomeComponent {
readonly worker = WebWorker.fromFunction<number, number>(compute);
}
```

It's the same with `WorkerPipe` only:

```typescript
import {WorkerModule} from '@ng-web-apis/workers';
import {NgModule} from '@angular/core';

@NgModule({
imports: [WorkerModule],
declarations: [SomeComponent],
})
class SomeModule {}
```

```typescript
import {WorkerExecutor, WebWorker} from '@ng-web-apis/workers';
import {FormControl} from '@angular/forms';

@Component({
template: `
Computed Result: {{ value | waWorker: changeData | async }}
<input [(ngModel)]="value" />
`,
})
class SomeComponent {
value: string;

changeData(data: string): string {
return `${data} (changed)`;
}
}
```

## See also

Expand Down
5 changes: 4 additions & 1 deletion projects/demo/src/app/app.browser.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import {
import {NgModule} from '@angular/core';
import {FormsModule} from '@angular/forms';
import {BrowserModule} from '@angular/platform-browser';
import {WorkerModule} from '@ng-web-apis/workers';
import {AppComponent} from './app.component';
import {AppRoutingModule} from './app.routes';
import {ClockComponent} from './clock.component';

@NgModule({
bootstrap: [AppComponent],
Expand All @@ -17,8 +19,9 @@ import {AppRoutingModule} from './app.routes';
FormsModule,
BrowserModule.withServerTransition({appId: 'demo'}),
AppRoutingModule,
WorkerModule,
],
declarations: [AppComponent],
declarations: [AppComponent, ClockComponent],
providers: [
{
provide: LocationStrategy,
Expand Down
10 changes: 10 additions & 0 deletions projects/demo/src/app/app.component.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<app-clock></app-clock>

<div class="example">
<button (click)="workerThread.postMessage()">Run in worker process</button>
<p>Execution time: {{ workerData$ | async }}</p>
</div>
<div class="example">
<button (click)="emitter.next()">Run in main process</button>
<p>Execution time: {{ result$ | async }}</p>
</div>
9 changes: 5 additions & 4 deletions projects/demo/src/app/app.component.less
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
align-items: center;
}

.observer {
background: skyblue;
border-radius: 16px;
padding: 40px;
.example {
min-width: 360px;
border-top: 1px solid gainsboro;
margin-top: 16px;
padding-top: 16px;
}
20 changes: 19 additions & 1 deletion projects/demo/src/app/app.component.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import {ChangeDetectionStrategy, Component} from '@angular/core';
import {toData, WebWorker} from '@ng-web-apis/workers';
import {Subject} from 'rxjs';
import {map} from 'rxjs/operators';

@Component({
selector: 'main',
templateUrl: './app.component.html',
styleUrls: ['./app.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class AppComponent {}
export class AppComponent {
readonly workerThread = WebWorker.fromFunction<void, number>(this.startCompute);
readonly workerData$ = this.workerThread.pipe(toData());
readonly emitter = new Subject<void>();
readonly result$ = this.emitter.pipe(map(this.startCompute));

startCompute(): number {
const start = performance.now();

Array.from({length: 16000}).forEach((_, index) =>
Array.from({length: index}).reduce<number>((sum: number) => sum + 1, 0),
);

return performance.now() - start;
}
}
14 changes: 14 additions & 0 deletions projects/demo/src/app/clock.component.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {ChangeDetectionStrategy, Component} from '@angular/core';
import {Observable, timer} from 'rxjs';
import {map} from 'rxjs/operators';

@Component({
selector: 'app-clock',
template: `
{{ date$ | async | date: 'mediumTime' }}
`,
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ClockComponent {
readonly date$: Observable<number> = timer(0, 1000).pipe(map(() => Date.now()));
}
9 changes: 9 additions & 0 deletions projects/workers/src/public-api.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Public API Surface of @ng-web-apis/workers
*/
export * from './worker/classes/web-worker';
export * from './worker/operators/to-data';

export * from './worker/pipes/worker.pipe';

export * from './worker/types/worker-function';
export * from './worker/types/typed-message-event';

export * from './worker/worker.module';
92 changes: 92 additions & 0 deletions projects/workers/src/worker/classes/web-worker.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import {Observable} from 'rxjs';
import {take} from 'rxjs/operators';
import {TypedMessageEvent} from '../types/typed-message-event';
import {WebWorker} from './web-worker';

// it is needed to ignore web worker errors
window.onerror = () => {};

describe('WebWorker', () => {
it('should fail if a worker is not available', async () => {
const OriginalWorker = Worker;

delete (window as any).Worker;

const worker = WebWorker.fromFunction<void, void>(d => d);

expect(() => worker.terminate()).not.toThrow();
expect(() => worker.postMessage()).not.toThrow();

await expectAsync(worker.toPromise()).toBeRejected();

(window as any).Worker = OriginalWorker;
});

it('should create worker from a function', () => {
const worker = WebWorker.fromFunction(d => d);

expect(worker instanceof WebWorker).toEqual(true);
expect((worker as any).worker instanceof Worker).toEqual(true);
});

it('should trigger an error if URL was not found', async () => {
const worker = new WebWorker('some/wrong/url');

await expectAsync(worker.toPromise()).toBeRejected();
});

it('should resolve the last value before completing', async () => {
const worker = WebWorker.fromFunction((data: string) => Promise.resolve(data));

const promise = worker
.pipe(source => {
return new Observable(subscriber => {
source.subscribe({
next({data}: TypedMessageEvent<string>) {
(source as WebWorker).terminate();
subscriber.next(data);
subscriber.complete();
},
});
});
})
.toPromise();

worker.postMessage('a');
worker.postMessage('b');
expect(await promise).toEqual('a');
});

it('should run a worker and return a correct data', async () => {
const workerPromise: Promise<TypedMessageEvent<string>> = WebWorker.execute<
string,
string
>(data => Promise.resolve().then(() => data), 'some data');

expect((await workerPromise).data).toEqual('some data');
}, 10000);

it('should create worker', async () => {
const thread = WebWorker.fromFunction<string, string>(data =>
Promise.resolve(data),
);

const workerPromise = thread.pipe(take(1)).toPromise();

thread.postMessage('some data');

expect((await workerPromise).data).toEqual('some data');
}, 10000);

it('should fail if an inner promise is rejected', async () => {
const worker = WebWorker.fromFunction<void, string>(() =>
Promise.reject('reason'),
);

worker.postMessage();

expect(await worker.toPromise().catch(err => err.message)).toEqual(
'Uncaught reason',
);
});
});
84 changes: 84 additions & 0 deletions projects/workers/src/worker/classes/web-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {EMPTY, fromEvent, merge, Observable} from 'rxjs';
import {take, tap} from 'rxjs/operators';
import {WORKER_BLANK_FN} from '../consts/worker-fn-template';
import {TypedMessageEvent} from '../types/typed-message-event';
import {WorkerFunction} from '../types/worker-function';

export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>> {
private readonly worker: Worker | undefined;
private readonly url: string;

constructor(url: string, options?: WorkerOptions) {
let worker: Worker | undefined;
let error: any;

try {
worker = new Worker(url, options);
} catch (e) {
error = e;
}

super(subscriber => {
if (error) {
subscriber.error(error);
}

const eventStream$ = worker
? merge(
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
tap(event => subscriber.next(event)),
),
fromEvent<ErrorEvent>(worker, 'error').pipe(
tap(event => subscriber.error(event)),
),
)
: EMPTY;

return eventStream$.subscribe();
});

this.worker = worker;
this.url = url;
}

static fromFunction<T, R>(
fn: WorkerFunction<T, R>,
options?: WorkerOptions,
): WebWorker<T, R> {
return new WebWorker<T, R>(WebWorker.createFnUrl(fn), options);
}

static execute<T, R>(
fn: WorkerFunction<T, R>,
data: T,
): Promise<TypedMessageEvent<R>> {
const worker = WebWorker.fromFunction(fn);
const promise = worker.pipe(take(1)).toPromise();

worker.postMessage(data);

return promise;
}

private static createFnUrl(fn: WorkerFunction): string {
const script = `(${WORKER_BLANK_FN})(${fn});`;

const blob = new Blob([script], {type: 'text/javascript'});

return URL.createObjectURL(blob);
}

terminate() {
if (this.worker) {
this.worker.terminate();
}

URL.revokeObjectURL(this.url);
}

postMessage(value: T) {
if (this.worker) {
this.worker.postMessage(value);
}
}
}
22 changes: 22 additions & 0 deletions projects/workers/src/worker/consts/worker-fn-template.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// throw an error using the `setTimeout` function
// because web worker doesn't emit ErrorEvent from promises
export const WORKER_BLANK_FN = `
function(fn){
function isFunction(type){
return type === 'function';
}
self.addEventListener('message', function(e) {
var result = fn.call(null, e.data);
if (result && [typeof result.then, typeof result.catch].every(isFunction)){
result
.then(postMessage)
.catch(function(error) {
setTimeout(function(){throw error}, 0)
})
} else {
postMessage(result);
}
})
}
`;
7 changes: 7 additions & 0 deletions projects/workers/src/worker/operators/to-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {OperatorFunction} from 'rxjs';
import {map} from 'rxjs/operators';
import {TypedMessageEvent} from '../types/typed-message-event';

export function toData<T>(): OperatorFunction<TypedMessageEvent<T>, T> {
return map<TypedMessageEvent<T>, T>(({data}) => data);
}
Loading

0 comments on commit 84cb368

Please sign in to comment.