diff --git a/patterns-use-cases/batching/batching-typescript/README.md b/patterns-use-cases/batching/batching-typescript/README.md new file mode 100644 index 00000000..c7b65527 --- /dev/null +++ b/patterns-use-cases/batching/batching-typescript/README.md @@ -0,0 +1,15 @@ +# Priority queue + +An example of implementing a batching stream processing handler. + +Run the example with `npm run app-dev`. + +You can simulate adding work to the queue like this: +```shell +# add one item +curl localhost:8080/batcher/myKey/receive --json '123' +# add lots +for i in $(seq 1 31); do curl localhost:8080/batcher/myKey/receive --json "$i"; done +``` + +As you do so, you can observe the logs; batches of 10 will be sent, with a timeout after a second if batches are not filled. diff --git a/patterns-use-cases/batching/batching-typescript/package.json b/patterns-use-cases/batching/batching-typescript/package.json new file mode 100644 index 00000000..70f50e33 --- /dev/null +++ b/patterns-use-cases/batching/batching-typescript/package.json @@ -0,0 +1,19 @@ +{ + "name": "@restatedev/example-pattern-batching", + "version": "0.1.0", + "description": "A Restate example showing the implementation of event batching", + "type": "commonjs", + "scripts": { + "build": "tsc --noEmitOnError", + "app-dev": "tsx --watch ./src/app.ts", + "app": "tsx ./src/app.ts" + }, + "dependencies": { + "@restatedev/restate-sdk": "^1.3.2" + }, + "devDependencies": { + "@types/node": "^20.12.7", + "tsx": "^4.17.0", + "typescript": "^5.0.2" + } +} diff --git a/patterns-use-cases/batching/batching-typescript/src/app.ts b/patterns-use-cases/batching/batching-typescript/src/app.ts new file mode 100644 index 00000000..5b84e3fe --- /dev/null +++ b/patterns-use-cases/batching/batching-typescript/src/app.ts @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { endpoint } from "@restatedev/restate-sdk"; + +import { batcher, batchReceiver } from "./batcher"; + +endpoint().bind(batcher).bind(batchReceiver).listen(); diff --git a/patterns-use-cases/batching/batching-typescript/src/batcher.ts b/patterns-use-cases/batching/batching-typescript/src/batcher.ts new file mode 100644 index 00000000..3b3367a6 --- /dev/null +++ b/patterns-use-cases/batching/batching-typescript/src/batcher.ts @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { + handlers, + object, + ObjectContext, + ObjectSharedContext, +} from "@restatedev/restate-sdk"; + +type BatcherState = { + index: number; + items: unknown[]; +}; + +const MAX_BATCH = 10; +const MAX_BATCH_WAIT_MS = 1000; + +export const batcher = object({ + name: "batcher", + handlers: { + expire: handlers.object.shared( + async ( + ctx: ObjectSharedContext, + index: number, + ): Promise => { + const currentIndex = (await ctx.get("index")) ?? 0; + + if (index !== currentIndex) { + // the batch was already sent within the expiry + return; + } else { + // slow path: we need to lock the key to send the batch + // we pay the cost of an additional invocation because we expect + // this path to be much rarer. once invocation cancellation is + // available in the SDK, we could instead cancel this expire call + // when the batch is sent, in which case this would be the only path + // and we could merge the handlers. + ctx + .objectSendClient({ name: "batcher" }, ctx.key) + .expireSlow(index); + } + }, + ), + expireSlow: async (ctx: ObjectContext, index: number) => { + const currentIndex = (await ctx.get("index")) ?? 0; + + if (index !== currentIndex) { + // the batch was sent in between the expire and expireSlow call + return; + } + + const items = (await ctx.get("items")) ?? []; + ctx.console.log( + `Sending batch ${index} with ${items.length} items as the timer fired`, + ); + return sendBatch(ctx, index, items); + }, + receive: async ( + ctx: ObjectContext, + item: unknown, + ): Promise => { + const index = (await ctx.get("index")) ?? 0; + const items = (await ctx.get("items")) ?? []; + + items.push(item); + + if (items.length >= MAX_BATCH) { + ctx.console.log( + `Sending batch ${index} as it reached ${MAX_BATCH} items`, + ); + + return sendBatch(ctx, index, items); + } + + if (items.length == 1) { + ctx.console.log( + `Adding item to new batch ${index}, will send in at most ${MAX_BATCH_WAIT_MS} ms`, + ); + + ctx + .objectSendClient({ name: "batcher" }, ctx.key, { + delay: MAX_BATCH_WAIT_MS, + }) + .expire(index); + } else { + ctx.console.log(`Adding item to batch ${index}`); + } + + ctx.set("items", items); + }, + }, +}); + +type Batch = { + items: unknown[]; +}; + +export const batchReceiver = object({ + name: "batchReceiver", + handlers: { + receive: async (ctx: ObjectContext, batch: Batch): Promise => { + ctx.console.log("Received batch:", batch); + // do stuff + }, + }, +}); + +function sendBatch( + ctx: ObjectContext, + index: number, + items: unknown[], +): void { + ctx.set("index", index + 1); + ctx.clear("items"); + + ctx + .objectSendClient({ name: "batchReceiver" }, ctx.key) + .receive({ items }); +} + +export type Batcher = typeof batcher; +export type BatchReceiver = typeof batchReceiver; diff --git a/patterns-use-cases/batching/batching-typescript/tsconfig.json b/patterns-use-cases/batching/batching-typescript/tsconfig.json new file mode 100644 index 00000000..c2946b24 --- /dev/null +++ b/patterns-use-cases/batching/batching-typescript/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "esnext", + "lib": ["esnext"], + "module": "nodenext", + "allowJs": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipDefaultLibCheck": true, + "skipLibCheck": true + } +}