diff --git a/jsonrpc/src/common/disposable.ts b/jsonrpc/src/common/disposable.ts index 97139d34..4c7b776f 100644 --- a/jsonrpc/src/common/disposable.ts +++ b/jsonrpc/src/common/disposable.ts @@ -17,3 +17,40 @@ export namespace Disposable { }; } } + +export class DisposableStore implements Disposable { + + private isDisposed: boolean; + private readonly disposables: Set; + + constructor() { + this.isDisposed = false; + this.disposables = new Set(); + } + + /** + * Dispose of all registered disposables and mark this object as disposed. + * + * Any future disposables added to this object will be disposed of on `add`. + */ + public dispose(): void { + if (this.isDisposed || this.disposables.size === 0) { + return; + } + try { + this.disposables.forEach(item => item.dispose()); + } finally { + this.isDisposed = true; + this.disposables.clear(); + } + } + + public add(t: T): T { + if (this.isDisposed) { + t.dispose(); + } else { + this.disposables.add(t); + } + return t; + } +} \ No newline at end of file diff --git a/jsonrpc/src/common/messageReader.ts b/jsonrpc/src/common/messageReader.ts index 3e917551..ca57ef19 100644 --- a/jsonrpc/src/common/messageReader.ts +++ b/jsonrpc/src/common/messageReader.ts @@ -11,6 +11,7 @@ import { Message } from './messages'; import { ContentDecoder, ContentTypeDecoder } from './encoding'; import { Disposable } from './api'; import { Semaphore } from './semaphore'; +import { DisposableStore } from './disposable'; /** * A callback that receives each incoming JSON-RPC message. @@ -170,7 +171,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { private readable: RAL.ReadableStream; private options: ResolvedMessageReaderOptions; - private callback!: DataCallback; + private callback: DataCallback | undefined; private nextMessageLength: number; private messageToken: number; @@ -179,15 +180,29 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { private _partialMessageTimeout: number; private readSemaphore: Semaphore; + private disposables = new DisposableStore(); public constructor(readable: RAL.ReadableStream, options?: RAL.MessageBufferEncoding | MessageReaderOptions) { super(); this.readable = readable; this.options = ResolvedMessageReaderOptions.fromOptions(options); this.buffer = RAL().messageBuffer.create(this.options.charset); this._partialMessageTimeout = 10000; + this.partialMessageTimer = undefined; this.nextMessageLength = -1; this.messageToken = 0; this.readSemaphore = new Semaphore(1); + + this.disposables.add(this.readable.onData((data: Uint8Array) => { + if(this.callback){this.onData(data);} + })); + this.disposables.add(this.readable.onError((error: any) => this.fireError(error))); + this.disposables.add(this.readable.onClose(() => this.fireClose())); + } + + public dispose(): void { + super.dispose(); + this.disposables.dispose(); + this.callback = undefined; } public set partialMessageTimeout(timeout: number) { @@ -199,16 +214,11 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { } public listen(callback: DataCallback): Disposable { - this.nextMessageLength = -1; - this.messageToken = 0; - this.partialMessageTimer = undefined; + if (this.callback !== undefined) { + throw new Error('Reader can only listen once.'); + } this.callback = callback; - const result = this.readable.onData((data: Uint8Array) => { - this.onData(data); - }); - this.readable.onError((error: any) => this.fireError(error)); - this.readable.onClose(() => this.fireClose()); - return result; + return Disposable.create(() => this.callback = undefined); } private onData(data: Uint8Array): void { @@ -250,7 +260,7 @@ export class ReadableStreamMessageReader extends AbstractMessageReader { ? await this.options.contentDecoder.decode(body) : body; const message = await this.options.contentTypeDecoder.decode(bytes, this.options); - this.callback(message); + this.callback!(message); }).catch((error) => { this.fireError(error); });