diff --git a/src/transports/ipc.js b/src/transports/ipc.js index f050a01..073afcd 100644 --- a/src/transports/ipc.js +++ b/src/transports/ipc.js @@ -66,9 +66,10 @@ function encode(op, data) { return packet; } -const working = { - full: '', +const accumulatedData = { + payload: Buffer.alloc(0), op: undefined, + expectedLength: 0, }; function decode(socket, callback) { @@ -77,23 +78,46 @@ function decode(socket, callback) { return; } - let { op } = working; - let raw; - if (working.full === '') { - op = working.op = packet.readInt32LE(0); - const len = packet.readInt32LE(4); - raw = packet.slice(8, len + 8); - } else { - raw = packet.toString(); - } + accumulatedData.payload = Buffer.concat([accumulatedData.payload, packet]); - try { - const data = JSON.parse(working.full + raw); - callback({ op, data }); // eslint-disable-line callback-return - working.full = ''; - working.op = undefined; - } catch (err) { - working.full += raw; + while (accumulatedData.payload.length > 0) { + if (accumulatedData.expectedLength === 0) { + // We are at the start of a new payload + accumulatedData.op = accumulatedData.payload.readInt32LE(0); + accumulatedData.expectedLength = accumulatedData.payload.readInt32LE(4); + accumulatedData.payload = accumulatedData.payload.subarray(8); // Remove opcode and length + } + + if (accumulatedData.payload.length < accumulatedData.expectedLength) { + // Full payload hasn't been received yet, wait for more data + break; + } + + // Accumulated data has the full payload and possibly the beginning of the next payload + const currentPayload = accumulatedData.payload.subarray(0, accumulatedData.expectedLength); + const nextPayload = accumulatedData.payload.subarray(accumulatedData.expectedLength); + + accumulatedData.payload = nextPayload; // Keep remainder for next payload + + try { + callback({ + op: accumulatedData.op, + data: JSON.parse(currentPayload.toString('utf8')), + }); + + // Reset for next payload + accumulatedData.op = undefined; + accumulatedData.expectedLength = 0; + } catch (err) { + // Full payload has been received, but is not valid JSON + callback({ error: new Error('Received payload with malformed JSON', { cause: err }) }); + + // Reset for next payload + accumulatedData.op = undefined; + accumulatedData.expectedLength = 0; + + break; + } } decode(socket, callback); @@ -117,7 +141,11 @@ class IPCTransport extends EventEmitter { })); socket.pause(); socket.on('readable', () => { - decode(socket, ({ op, data }) => { + decode(socket, ({ error, op, data }) => { + if (error) { + this.client.emit('error', error); + return; + } switch (op) { case OPCodes.PING: this.send(data, OPCodes.PONG);