Skip to content

Commit

Permalink
Add infrastructure to lazily parse CXXRTL packets.
Browse files Browse the repository at this point in the history
At the moment this still forces each packet to a deserialized
representation, but it opens the door to avoiding this for packets
going through secondary links.
  • Loading branch information
whitequark committed Dec 3, 2024
1 parent fce5805 commit 7319295
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 57 deletions.
55 changes: 33 additions & 22 deletions src/cxxrtl/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,39 @@ export enum ConnectionState {
// Note that we trust that server returns well-formed JSON. It would take far too much time to
// verify its adherence to the schema here, for little gain.
export class Connection {
private readonly link: link.ILink;

private _state = ConnectionState.Initializing;

private _commands: string[] = [];
private _events: string[] = [];
private _itemValuesEncodings: string[] = [];

private promises: {
resolve: (response: proto.AnyResponse) => void;
resolve: (response: link.Packet<proto.AnyResponse>) => void;
reject: (error: Error) => void;
}[] = [];
private timestamps: Date[] = [];

private sendIndex: number = 0;
private recvIndex: number = 0;

constructor(private readonly link: link.ILink) {
constructor(link_: link.ILink) {
this.link = link_;
this.link.onRecv = this.onLinkRecv.bind(this);
this.link.onDone = this.onLinkDone.bind(this);
this.send({
this.send(link.Packet.fromObject({
type: 'greeting',
version: 0,
});
}));
}

dispose(): void {
this.link.dispose();
}

private traceSend(packet: proto.ClientPacket) {
private traceSend(linkPacket: link.Packet<proto.ClientPacket>) {
const packet = linkPacket.asObject();
if (packet.type === 'greeting') {
console.debug('[CXXRTL] C>S', packet);
} else if (packet.type === 'command') {
Expand All @@ -53,7 +57,8 @@ export class Connection {
}
}

private traceRecv(packet: proto.ServerPacket) {
private traceRecv(linkPacket: link.Packet<proto.ServerPacket>) {
const packet = linkPacket.asObject();
if (packet.type === 'greeting') {
console.debug('[CXXRTL] S>C', packet);
} else if (packet.type === 'response') {
Expand All @@ -67,17 +72,18 @@ export class Connection {
}
}

private async send(packet: proto.ClientPacket): Promise<void> {
this.traceSend(packet);
private async send(linkPacket: link.Packet<proto.ClientPacket>): Promise<void> {
this.traceSend(linkPacket);
if (this._state === ConnectionState.Disconnected) {
throw new Error('unable to send packet after link is shutdown');
} else {
this.link.send(packet);
this.link.send(linkPacket);
}
}

private async onLinkRecv(packet: proto.ServerPacket): Promise<void> {
this.traceRecv(packet);
private async onLinkRecv(linkPacket: link.Packet<proto.ServerPacket>): Promise<void> {
this.traceRecv(linkPacket);
const packet = linkPacket.asObject();
if (this._state === ConnectionState.Initializing && packet.type === 'greeting') {
if (packet.version === 0) {
this._commands = packet.commands;
Expand All @@ -93,15 +99,15 @@ export class Connection {
const nextPromise = this.promises.shift();
if (nextPromise !== undefined) {
if (packet.type === 'response') {
nextPromise.resolve(packet);
nextPromise.resolve(link.Packet.fromObject(packet));
} else {
nextPromise.reject(new CommandError(packet));
}
} else {
this.rejectPromises(new Error(`unexpected '${packet.type}' reply with no commands queued`));
}
} else if (this._state === ConnectionState.Connected && packet.type === 'event') {
await this.onEvent(packet);
await this.onEvent(link.Packet.fromObject(packet));
} else {
this.rejectPromises(new Error(`unexpected ${packet.type} packet received for ${this._state} connection`));
}
Expand All @@ -119,7 +125,7 @@ export class Connection {
}
}

async perform(command: proto.AnyCommand): Promise<proto.AnyResponse> {
async exchange(command: link.Packet<proto.AnyCommand>): Promise<link.Packet<proto.AnyResponse>> {
await this.send(command);
return new Promise((resolve, reject) => {
this.promises.push({ resolve, reject });
Expand All @@ -130,7 +136,7 @@ export class Connection {

async onDisconnected(): Promise<void> {}

async onEvent(_event: proto.AnyEvent): Promise<void> {}
async onEvent(_event: link.Packet<proto.AnyEvent>): Promise<void> {}

get state(): ConnectionState {
return this._state;
Expand All @@ -148,31 +154,36 @@ export class Connection {
return this._itemValuesEncodings.slice();
}

private async command<T extends proto.AnyResponse>(command: proto.AnyCommand): Promise<T> {
const response = await this.exchange(link.Packet.fromObject(command));
return response.cast<T>().asObject();
}

async listScopes(command: proto.CommandListScopes): Promise<proto.ResponseListScopes> {
return await this.perform(command) as proto.ResponseListScopes;
return this.command<proto.ResponseListScopes>(command);
}

async listItems(command: proto.CommandListItems): Promise<proto.ResponseListItems> {
return await this.perform(command) as proto.ResponseListItems;
return this.command<proto.ResponseListItems>(command);
}

async referenceItems(command: proto.CommandReferenceItems): Promise<proto.ResponseReferenceItems> {
return await this.perform(command) as proto.ResponseReferenceItems;
return this.command<proto.ResponseReferenceItems>(command);
}

async queryInterval(command: proto.CommandQueryInterval): Promise<proto.ResponseQueryInterval> {
return await this.perform(command) as proto.ResponseQueryInterval;
return this.command<proto.ResponseQueryInterval>(command);
}

async getSimulationStatus(command: proto.CommandGetSimulationStatus): Promise<proto.ResponseGetSimulationStatus> {
return await this.perform(command) as proto.ResponseGetSimulationStatus;
return this.command<proto.ResponseGetSimulationStatus>(command);
}

async runSimulation(command: proto.CommandRunSimulation): Promise<proto.ResponseRunSimulation> {
return await this.perform(command) as proto.ResponseRunSimulation;
return this.command<proto.ResponseRunSimulation>(command);
}

async pauseSimulation(command: proto.CommandPauseSimulation): Promise<proto.ResponsePauseSimulation> {
return await this.perform(command) as proto.ResponsePauseSimulation;
return this.command<proto.ResponsePauseSimulation>(command);
}
}
78 changes: 59 additions & 19 deletions src/cxxrtl/link.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,52 @@ import * as stream from 'node:stream';

import * as proto from './proto';

// Lazily serialize/deserialize packets in case they only need to be passed along.
export class Packet<T> {
private constructor(
private serialized: string | undefined,
private deserialized: T | undefined,
) { }

static fromString<T>(serialized: string) {
return new Packet<T>(serialized, undefined);
}

static fromObject<T>(deserialized: T) {
return new Packet<T>(undefined, deserialized);
}

asString(): string {
if (this.serialized === undefined) {
this.serialized = JSON.stringify(this.deserialized!);
}
return this.serialized;
}

asObject(): T {
if (this.deserialized === undefined) {
this.deserialized = <T>JSON.parse(this.serialized!);
}
return this.deserialized;
}

cast<U>(): Packet<U> {
return <Packet<U>>(<unknown>(this));
}

// Make sure we don't unintentionally negate the performance advantages of this wrapper.
toJSON(): never {
throw new Error('call Packet.asObject() instead of serializing with JSON.stringify()');
}
}

export interface ILink {
dispose(): void;

onRecv: (packet: proto.ServerPacket) => Promise<void>;
onRecv: (packet: Packet<proto.ServerPacket>) => Promise<void>;
onDone: () => Promise<void>;

send(packet: proto.ClientPacket): Promise<void>;
send(packet: Packet<proto.ClientPacket>): Promise<void>;
}

export class MockLink implements ILink {
Expand All @@ -22,24 +61,24 @@ export class MockLink implements ILink {
}
}

async onRecv(_serverPacket: proto.ServerPacket): Promise<void> {}
async onRecv(_serverPacket: Packet<proto.ServerPacket>): Promise<void> {}

async onDone(): Promise<void> {}

async send(clientPacket: proto.ClientPacket): Promise<void> {
async send(clientPacket: Packet<proto.ClientPacket>): Promise<void> {
if (this.conversation.length === 0) {
throw new Error('premature end of conversation');
}

const [[expectedClient, expectedServer], ...restOfConversation] = this.conversation;

if (JSON.stringify(clientPacket) === JSON.stringify(expectedClient)) {
if (clientPacket.asString() === JSON.stringify(expectedClient)) {
if (expectedServer instanceof Array) {
for (const serverPacket of expectedServer) {
await this.onRecv(serverPacket);
await this.onRecv(Packet.fromObject(serverPacket));
}
} else {
await this.onRecv(expectedServer);
await this.onRecv(Packet.fromObject(expectedServer));
}
} else {
console.error('unexpected client packet', clientPacket, '; expected:', expectedClient);
Expand Down Expand Up @@ -82,28 +121,28 @@ export class NodeStreamLink implements ILink {
// Second, convert the packet text to JSON. This can throw errors e.g. if there is foreign
// data injected between server replies, or the server is malfunctioning. In that case,
// stop processing input.
const packets: proto.ServerPacket[] = [];
const packets: Packet<proto.ServerPacket>[] = [];
for (const packetText of packetTexts) {
try {
packets.push(JSON.parse(packetText) as proto.ServerPacket);
} catch (error) {
console.error('malformed JSON: ', packetText);
this.stream.pause();
return;
}
packets.push(Packet.fromString<proto.ServerPacket>(packetText));
}

// Finally, run the handler for each of the packets. If the handler blocks, don't wait for
// its completion, but run the next handler anyway; this is because a handler can send
// another client packet, causing `onStreamData` to be re-entered, anyway.
for (const packet of packets) {
(async (packet: proto.ServerPacket) => {
const success = (async (packet) => {
try {
await this.onRecv(packet);
return true;
} catch (error) {
console.error('uncaught error in onRecv', error);
this.stream.pause();
return false;
}
})(packet);
if (!success) {
break;
}
}
}

Expand All @@ -119,11 +158,12 @@ export class NodeStreamLink implements ILink {
this.stream.destroy();
}

async onRecv(_serverPacket: proto.ServerPacket): Promise<void> {}
async onRecv(_serverPacket: Packet<proto.ServerPacket>): Promise<void> {}

async onDone(): Promise<void> {}

async send(clientPacket: proto.ClientPacket): Promise<void> {
this.stream.write(JSON.stringify(clientPacket) + '\0');
async send(clientPacket: Packet<proto.ClientPacket>): Promise<void> {
this.stream.write(clientPacket.asString());
this.stream.write('\0');
}
}
35 changes: 19 additions & 16 deletions src/debug/session.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as vscode from 'vscode';

import * as proto from '../cxxrtl/proto';
import { ILink } from '../cxxrtl/link';
import * as link from '../cxxrtl/link';
import { Connection } from '../cxxrtl/client';
import { TimeInterval, TimePoint } from '../model/time';
import { Diagnostic, DiagnosticType, Reference, Sample, UnboundReference } from '../model/sample';
Expand Down Expand Up @@ -40,10 +40,10 @@ export enum SimulationPauseReason {
export class Session {
private connection: Connection;

private secondaryLinks: ILink[] = [];
private secondaryLinks: link.ILink[] = [];
private greetingPacketPromise: Promise<proto.ServerGreeting>;

constructor(link: ILink) {
constructor(link: link.ILink) {
this.connection = new Connection(link);
this.greetingPacketPromise = new Promise((resolve, _reject) => {
this.connection.onConnected = async (greetingPacket) => resolve(greetingPacket);
Expand All @@ -53,10 +53,11 @@ export class Session {
secondaryLink.onDone();
}
};
this.connection.onEvent = async (event) => {
this.connection.onEvent = async (linkEvent) => {
for (const secondaryLink of this.secondaryLinks) {
secondaryLink.onRecv(event);
secondaryLink.onRecv(linkEvent);
}
const event = linkEvent.asObject();
if (event.event === 'simulation_paused') {
await this.handleSimulationPausedEvent(event.cause);
} else if (event.event === 'simulation_finished') {
Expand All @@ -70,33 +71,35 @@ export class Session {
this.connection.dispose();
}

createSecondaryLink(): ILink {
const link: ILink = {
createSecondaryLink(): link.ILink {
const secondaryLink: link.ILink = {
dispose: () => {
this.secondaryLinks.splice(this.secondaryLinks.indexOf(link));
this.secondaryLinks.splice(this.secondaryLinks.indexOf(secondaryLink));
},

send: async (clientPacket) => {
if (clientPacket.type === 'greeting') {
send: async (linkCommandPacket) => {
const packet = linkCommandPacket.asObject();
if (packet.type === 'greeting') {
const serverGreetingPacket = await this.greetingPacketPromise;
if (clientPacket.version === serverGreetingPacket.version) {
await link.onRecv(serverGreetingPacket);
if (packet.version === serverGreetingPacket.version) {
await secondaryLink.onRecv(link.Packet.fromObject(serverGreetingPacket));
} else {
throw new Error(
`Secondary link requested greeting version ${clientPacket.version}, ` +
`Secondary link requested greeting version ${packet.version}, ` +
`but server greeting version is ${serverGreetingPacket.version}`
);
}
} else {
const serverPacket = await this.connection.perform(clientPacket);
await link.onRecv(serverPacket);
const linkResponsePacket = await this.connection.exchange(
linkCommandPacket.cast<proto.AnyCommand>());
await secondaryLink.onRecv(linkResponsePacket);
}
},

onRecv: async (serverPacket) => {},
onDone: async () => {},
};
return link;
return secondaryLink;
}

// ======================================== Inspecting the design
Expand Down

0 comments on commit 7319295

Please sign in to comment.