Skip to content

Commit

Permalink
chore(worker rpc): improve error throwing (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnk authored Nov 29, 2024
1 parent 67c9205 commit c15809e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 98 deletions.
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
},
"dependencies": {
"mina-signer": "3.0.7",
"serialize-error": "^11.0.3",
"superjson": "2.2.1"
},
"devDependencies": {
Expand Down
10 changes: 5 additions & 5 deletions packages/utils/src/test/worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { createRpcHandler } from "../worker-rpc";

const { messageHandler } = createRpcHandler({
methods: {
ping: async () => 'pong',
}
})
methods: {
ping: async () => "pong",
},
});

self.onmessage = messageHandler
self.onmessage = messageHandler;
54 changes: 28 additions & 26 deletions packages/utils/src/worker-rpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
import { describe, it, expect, mock, beforeAll } from 'bun:test'
import { createRpc, createRpcHandler } from './worker-rpc'
import { beforeAll, describe, expect, it, mock } from "bun:test";
import { createRpc, createRpcHandler } from "./worker-rpc";

describe("Worker RPC", () => {
let worker: Worker
let worker: Worker;

beforeAll(() => {
worker = new Worker(new URL('./test/worker.ts', import.meta.url))
})
beforeAll(() => {
worker = new Worker(new URL("./test/worker.ts", import.meta.url));
});

it("creates RPC handler", async () => {
const mockedHandler = mock(async () => "pong")
const { messageHandler } = createRpcHandler({
methods: {
ping: mockedHandler,
}
})
await messageHandler(new MessageEvent('message', { data: { method: 'ping', params: [] } }))
expect(mockedHandler).toHaveBeenCalled()
})
it("creates RPC handler", async () => {
const mockedHandler = mock(async () => "pong");
const { messageHandler } = createRpcHandler({
methods: {
ping: mockedHandler,
},
});
await messageHandler(
new MessageEvent("message", { data: { method: "ping", params: [] } }),
);
expect(mockedHandler).toHaveBeenCalled();
});

it("exchanges messages with Web Worker", async () => {
const rpc = createRpc({ worker })
const response = await rpc.request({ method: 'ping', params: [] })
expect(response.result).toBe('pong')
})
it("exchanges messages with Web Worker", async () => {
const rpc = createRpc({ worker });
const response = await rpc.request({ method: "ping", params: [] });
expect(response.result).toBe("pong");
});

it("calls non-existing method", async () => {
const rpc = createRpc({ worker })
expect(rpc.request({ method: 'pang', params: [] })).rejects.toThrow()
})
})
it("calls non-existing method", async () => {
const rpc = createRpc({ worker });
expect(rpc.request({ method: "pang", params: [] })).rejects.toThrow();
});
});
140 changes: 73 additions & 67 deletions packages/utils/src/worker-rpc.ts
Original file line number Diff line number Diff line change
@@ -1,93 +1,99 @@
import { z } from "zod";
import { deserializeError, serializeError } from "serialize-error";
import superjson from "superjson";
import { z } from "zod";

const DEFAULT_TIMEOUT = 60000;

export const RequestSchema = z.object({
method: z.string(),
params: z.array(z.string()).optional(),
method: z.string(),
params: z.array(z.string()).optional(),
});

type RequestParams = z.infer<typeof RequestSchema>;

export const ResponseSchema = z
.object({
id: z.string(),
result: z.any().optional(),
error: z.string().optional(),
})
.strict();
.object({
id: z.string(),
result: z.any().optional(),
error: z.string().optional(),
})
.strict();

type Response = z.infer<typeof ResponseSchema>;

export type RequestFn = (params: RequestParams) => Promise<Response>;

export const createRpc = ({
worker,
timeout,
worker,
timeout,
}: {
worker: Worker;
timeout?: number;
worker: Worker;
timeout?: number;
}) => {
const request: RequestFn = async ({ method, params }) => {
let resolved = false;
return new Promise((resolve, reject) => {
console.log('>>>M', method, params)
setTimeout(() => {
if (resolved) return;
return reject(new Error("[WorkerRPC] Timeout reached."));
}, timeout ?? DEFAULT_TIMEOUT);
const responseListener = (event: MessageEvent) => {
resolved = true;
worker.removeEventListener("message", responseListener);
const data = superjson.parse(event.data);
const response = ResponseSchema.parse(data);
if (response.error)
return reject(new Error(`[WorkerRPC] ${response.error}`));
return resolve(response);
};
worker.addEventListener("message", responseListener);
worker.postMessage({ method, params });
});
};
return {
request,
};
const request: RequestFn = async ({ method, params }) => {
let resolved = false;
return new Promise((resolve, reject) => {
setTimeout(() => {
if (resolved) return;
return reject(new Error("[WorkerRPC] Timeout reached."));
}, timeout ?? DEFAULT_TIMEOUT);
const responseListener = (event: MessageEvent) => {
resolved = true;
worker.removeEventListener("message", responseListener);
const data = superjson.parse(event.data);
const response = ResponseSchema.parse(data);
if (response.error) {
const errorObject = superjson.parse(response.error);
const deserializedError = deserializeError(errorObject);
return reject(deserializedError);
}
return resolve(response);
};
worker.addEventListener("message", responseListener);
worker.postMessage({ method, params });
});
};
return {
request,
};
};

type Method = (params: string[]) => Promise<unknown>;
type MethodsMap = Record<string, Method>;

const respond = (data: unknown) => postMessage(superjson.stringify(data))
const respond = (data: unknown) => postMessage(superjson.stringify(data));

export const createRpcHandler = ({ methods }: { methods: MethodsMap }) => {
const methodKeys = Object.keys(methods);
if (methodKeys.length === 0) throw new Error("No methods provided.");
const MethodEnum = z.enum(['error', ...methodKeys]);
const ExtendedRequestSchema = RequestSchema.extend({
method: MethodEnum,
}).strict();
const ExtendedResponseSchema = ResponseSchema.extend({
id: MethodEnum,
}).strict();
const messageHandler = async (event: MessageEvent) => {
try {
const action = ExtendedRequestSchema.parse(event.data)
const callable = methods[action.method]
if (!callable) throw new Error(`Method "${action.method}" not found.`);
const result = await callable(action.params ?? []);
const parsedResult = ExtendedResponseSchema.parse({
id: action.method,
result,
});
return respond(parsedResult);
// biome-ignore lint/suspicious/noExplicitAny: Error handling
} catch (error: any) {
return respond(ExtendedResponseSchema.parse({
id: 'error',
error: `[WorkerRPC] ${error.message}`,
}));
}
};
return { messageHandler };
const methodKeys = Object.keys(methods);
if (methodKeys.length === 0) throw new Error("No methods provided.");
const MethodEnum = z.enum(["error", ...methodKeys]);
const ExtendedRequestSchema = RequestSchema.extend({
method: MethodEnum,
}).strict();
const ExtendedResponseSchema = ResponseSchema.extend({
id: MethodEnum,
}).strict();
const messageHandler = async (event: MessageEvent) => {
try {
const action = ExtendedRequestSchema.parse(event.data);
const callable = methods[action.method];
if (!callable) throw new Error(`Method "${action.method}" not found.`);
const result = await callable(action.params ?? []);
const parsedResult = ExtendedResponseSchema.parse({
id: action.method,
result,
});
return respond(parsedResult);
// biome-ignore lint/suspicious/noExplicitAny: Error handling
} catch (error: any) {
const serializedError = superjson.stringify(serializeError(error));
return respond(
ExtendedResponseSchema.parse({
id: "error",
error: serializedError,
}),
);
}
};
return { messageHandler };
};

0 comments on commit c15809e

Please sign in to comment.