Skip to content

Commit

Permalink
refactor: prepend a header to each WebTransport chunk
Browse files Browse the repository at this point in the history
This commit updates the format of the header added in [1], in order to
match the format used for a WebSocket frame ([2]).

Two advantages:

- small payloads only need 1 byte instead of 4
- payloads larger than 2^31 bytes are supported

[1]: 6142324
[2]: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#decoding_payload_length
  • Loading branch information
darrachequesne committed Aug 1, 2023
1 parent aea321c commit 0b5e985
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 62 deletions.
89 changes: 71 additions & 18 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,27 @@ const decodePayload = (
return packets;
};

const HEADER_LENGTH = 4;

export function createPacketEncoderStream() {
return new TransformStream({
transform(packet: Packet, controller) {
encodePacketToBinary(packet, encodedPacket => {
const header = new Uint8Array(HEADER_LENGTH);
// last 31 bits indicate the length of the payload
new DataView(header.buffer).setUint32(0, encodedPacket.length);
const payloadLength = encodedPacket.length;
let header;
// inspired by the WebSocket format: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#decoding_payload_length
if (payloadLength < 126) {
header = new Uint8Array(1);
new DataView(header.buffer).setUint8(0, payloadLength);
} else if (payloadLength < 65536) {
header = new Uint8Array(3);
const view = new DataView(header.buffer);
view.setUint8(0, 126);
view.setUint16(1, payloadLength);
} else {
header = new Uint8Array(9);
const view = new DataView(header.buffer);
view.setUint8(0, 127);
view.setBigUint64(1, BigInt(payloadLength));
}
// first bit indicates whether the payload is plain text (0) or binary (1)
if (packet.data && typeof packet.data !== "string") {
header[0] |= 0x80;
Expand Down Expand Up @@ -91,6 +103,13 @@ function concatChunks(chunks: Uint8Array[], size: number) {
return buffer;
}

const enum State {
READ_HEADER,
READ_EXTENDED_LENGTH_16,
READ_EXTENDED_LENGTH_64,
READ_PAYLOAD
}

export function createPacketDecoderStream(
maxPayload: number,
binaryType: BinaryType
Expand All @@ -99,44 +118,78 @@ export function createPacketDecoderStream(
TEXT_DECODER = new TextDecoder();
}
const chunks: Uint8Array[] = [];
let expectedSize = -1;
let state = State.READ_HEADER;
let expectedLength = -1;
let isBinary = false;

return new TransformStream({
transform(chunk: Uint8Array, controller) {
chunks.push(chunk);
while (true) {
const expectHeader = expectedSize === -1;
if (expectHeader) {
if (totalLength(chunks) < HEADER_LENGTH) {
if (state === State.READ_HEADER) {
if (totalLength(chunks) < 1) {
break;
}
const header = concatChunks(chunks, 1);
isBinary = (header[0] & 0x80) === 0x80;
expectedLength = header[0] & 0x7f;
if (expectedLength < 126) {
state = State.READ_PAYLOAD;
} else if (expectedLength === 126) {
state = State.READ_EXTENDED_LENGTH_16;
} else {
state = State.READ_EXTENDED_LENGTH_64;
}
} else if (state === State.READ_EXTENDED_LENGTH_16) {
if (totalLength(chunks) < 2) {
break;
}
const headerArray = concatChunks(chunks, 2);
expectedLength = new DataView(
headerArray.buffer,
headerArray.byteOffset,
headerArray.length
).getUint16(0);
state = State.READ_PAYLOAD;
} else if (state === State.READ_EXTENDED_LENGTH_64) {
if (totalLength(chunks) < 8) {
break;
}
const headerArray = concatChunks(chunks, HEADER_LENGTH);
const header = new DataView(
const headerArray = concatChunks(chunks, 8);

const view = new DataView(
headerArray.buffer,
headerArray.byteOffset,
headerArray.length
).getUint32(0);
);

isBinary = header >> 31 === -1;
expectedSize = header & 0x7fffffff;
const n = view.getUint32(0);

if (expectedSize === 0 || expectedSize > maxPayload) {
if (n > Math.pow(2, 53 - 32) - 1) {
// the maximum safe integer in JavaScript is 2^53 - 1
controller.enqueue(ERROR_PACKET);
break;
}

expectedLength = n * Math.pow(2, 32) + view.getUint32(4);
state = State.READ_PAYLOAD;
} else {
if (totalLength(chunks) < expectedSize) {
if (totalLength(chunks) < expectedLength) {
break;
}
const data = concatChunks(chunks, expectedSize);
const data = concatChunks(chunks, expectedLength);
controller.enqueue(
decodePacket(
isBinary ? data : TEXT_DECODER.decode(data),
binaryType
)
);
expectedSize = -1;
state = State.READ_HEADER;
}

if (expectedLength === 0 || expectedLength > maxPayload) {
controller.enqueue(ERROR_PACKET);
break;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ describe("engine.io-parser (browser only)", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3));
expect(header.value).to.eql(Uint8Array.of(131));

const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(1, 2, 3));
Expand All @@ -142,7 +142,7 @@ describe("engine.io-parser (browser only)", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3));
writer.write(Uint8Array.of(131, 1, 2, 3));

const { value } = await reader.read();

Expand Down
91 changes: 51 additions & 40 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe("engine.io-parser", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(0, 0, 0, 5));
expect(header.value).to.eql(Uint8Array.of(5));

const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(52, 49, 226, 130, 172));
Expand All @@ -99,7 +99,7 @@ describe("engine.io-parser", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3));
expect(header.value).to.eql(Uint8Array.of(131));

const payload = await reader.read();
expect(payload.value === data).to.be(true);
Expand All @@ -117,7 +117,7 @@ describe("engine.io-parser", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3));
expect(header.value).to.eql(Uint8Array.of(131));

const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(1, 2, 3));
Expand All @@ -135,11 +135,53 @@ describe("engine.io-parser", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 6));
expect(header.value).to.eql(Uint8Array.of(134));

const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(1, 0, 2, 0, 1, 1));
});

it("should encode a binary packet (Uint8Array - medium)", async () => {
const stream = createPacketEncoderStream();

const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

const data = new Uint8Array(12345);

writer.write({
type: "message",
data
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(254, 48, 57));

const payload = await reader.read();
expect(payload.value === data).to.be(true);
});

it("should encode a binary packet (Uint8Array - big)", async () => {
const stream = createPacketEncoderStream();

const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

const data = new Uint8Array(123456789);

writer.write({
type: "message",
data
});

const header = await reader.read();
expect(header.value).to.eql(
Uint8Array.of(255, 0, 0, 0, 0, 7, 91, 205, 21)
);

const payload = await reader.read();
expect(payload.value === data).to.be(true);
});
});

describe("createPacketDecoderStream", () => {
Expand All @@ -149,7 +191,7 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(0, 0, 0, 5));
writer.write(Uint8Array.of(5));
writer.write(Uint8Array.of(52, 49, 226, 130, 172));

const packet = await reader.read();
Expand All @@ -165,25 +207,16 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(5));
writer.write(Uint8Array.of(52));
writer.write(Uint8Array.of(49));
writer.write(Uint8Array.of(226));
writer.write(Uint8Array.of(130));
writer.write(Uint8Array.of(172));

writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(1));
writer.write(Uint8Array.of(50));

writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(0));
writer.write(Uint8Array.of(1));
writer.write(Uint8Array.of(51));

Expand All @@ -203,29 +236,7 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(
Uint8Array.of(
0,
0,
0,
5,
52,
49,
226,
130,
172,
0,
0,
0,
1,
50,
0,
0,
0,
1,
51
)
);
writer.write(Uint8Array.of(5, 52, 49, 226, 130, 172, 1, 50, 1, 51));

const { value } = await reader.read();
expect(value).to.eql({ type: "message", data: "1€" });
Expand All @@ -243,7 +254,7 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3));
writer.write(Uint8Array.of(131, 1, 2, 3));

const { value } = await reader.read();

Expand All @@ -258,7 +269,7 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(0, 0, 1, 0));
writer.write(Uint8Array.of(11));

const packet = await reader.read();
expect(packet.value).to.eql({ type: "error", data: "parser error" });
Expand All @@ -270,7 +281,7 @@ describe("engine.io-parser", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(0, 0, 0, 0));
writer.write(Uint8Array.of(0));

const packet = await reader.read();
expect(packet.value).to.eql({ type: "error", data: "parser error" });
Expand Down
4 changes: 2 additions & 2 deletions test/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ describe("engine.io-parser (node.js only)", () => {
});

const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3));
expect(header.value).to.eql(Uint8Array.of(131));

const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(1, 2, 3));
Expand All @@ -137,7 +137,7 @@ describe("engine.io-parser (node.js only)", () => {
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3));
writer.write(Uint8Array.of(131, 1, 2, 3));

const { value } = await reader.read();

Expand Down

0 comments on commit 0b5e985

Please sign in to comment.