Skip to content

Commit

Permalink
Batching example
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Oct 11, 2024
1 parent dddebd3 commit 55c42f2
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 0 deletions.
15 changes: 15 additions & 0 deletions patterns-use-cases/batching/batching-typescript/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Priority queue

An example of implementing a batching stream processing handler.

Run the example with `npm run app-dev`.

You can simulate adding work to the queue like this:
```shell
# add one item
curl localhost:8080/batcher/myKey/receive --json '123'
# add lots
for i in $(seq 1 31); do curl localhost:8080/batcher/myKey/receive --json "$i"; done
```

As you do so, you can observe the logs; batches of 10 will be sent, with a timeout after a second if batches are not filled.
19 changes: 19 additions & 0 deletions patterns-use-cases/batching/batching-typescript/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@restatedev/example-pattern-batching",
"version": "0.1.0",
"description": "A Restate example showing the implementation of event batching",
"type": "commonjs",
"scripts": {
"build": "tsc --noEmitOnError",
"app-dev": "tsx --watch ./src/app.ts",
"app": "tsx ./src/app.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.3.2"
},
"devDependencies": {
"@types/node": "^20.12.7",
"tsx": "^4.17.0",
"typescript": "^5.0.2"
}
}
16 changes: 16 additions & 0 deletions patterns-use-cases/batching/batching-typescript/src/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import { endpoint } from "@restatedev/restate-sdk";

import { batcher, batchReceiver } from "./batcher";

endpoint().bind(batcher).bind(batchReceiver).listen();
131 changes: 131 additions & 0 deletions patterns-use-cases/batching/batching-typescript/src/batcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/blob/main/LICENSE
*/

import {
handlers,
object,
ObjectContext,
ObjectSharedContext,
} from "@restatedev/restate-sdk";

type BatcherState = {
index: number;
items: unknown[];
};

const MAX_BATCH = 10;
const MAX_BATCH_WAIT_MS = 1000;

export const batcher = object({
name: "batcher",
handlers: {
expire: handlers.object.shared(
async (
ctx: ObjectSharedContext<BatcherState>,
index: number,
): Promise<void> => {
const currentIndex = (await ctx.get("index")) ?? 0;

if (index !== currentIndex) {
// the batch was already sent within the expiry
return;
} else {
// slow path: we need to lock the key to send the batch
// we pay the cost of an additional invocation because we expect
// this path to be much rarer. once invocation cancellation is
// available in the SDK, we could instead cancel this expire call
// when the batch is sent, in which case this would be the only path
// and we could merge the handlers.
ctx
.objectSendClient<Batcher>({ name: "batcher" }, ctx.key)
.expireSlow(index);
}
},
),
expireSlow: async (ctx: ObjectContext<BatcherState>, index: number) => {
const currentIndex = (await ctx.get("index")) ?? 0;

if (index !== currentIndex) {
// the batch was sent in between the expire and expireSlow call
return;
}

const items = (await ctx.get("items")) ?? [];
ctx.console.log(
`Sending batch ${index} with ${items.length} items as the timer fired`,
);
return sendBatch(ctx, index, items);
},
receive: async (
ctx: ObjectContext<BatcherState>,
item: unknown,
): Promise<void> => {
const index = (await ctx.get("index")) ?? 0;
const items = (await ctx.get("items")) ?? [];

items.push(item);

if (items.length >= MAX_BATCH) {
ctx.console.log(
`Sending batch ${index} as it reached ${MAX_BATCH} items`,
);

return sendBatch(ctx, index, items);
}

if (items.length == 1) {
ctx.console.log(
`Adding item to new batch ${index}, will send in at most ${MAX_BATCH_WAIT_MS} ms`,
);

ctx
.objectSendClient<Batcher>({ name: "batcher" }, ctx.key, {
delay: MAX_BATCH_WAIT_MS,
})
.expire(index);
} else {
ctx.console.log(`Adding item to batch ${index}`);
}

ctx.set("items", items);
},
},
});

type Batch = {
items: unknown[];
};

export const batchReceiver = object({
name: "batchReceiver",
handlers: {
receive: async (ctx: ObjectContext, batch: Batch): Promise<void> => {
ctx.console.log("Received batch:", batch);
// do stuff
},
},
});

function sendBatch(
ctx: ObjectContext<BatcherState>,
index: number,
items: unknown[],
): void {
ctx.set("index", index + 1);
ctx.clear("items");

ctx
.objectSendClient<BatchReceiver>({ name: "batchReceiver" }, ctx.key)
.receive({ items });
}

export type Batcher = typeof batcher;
export type BatchReceiver = typeof batchReceiver;
18 changes: 18 additions & 0 deletions patterns-use-cases/batching/batching-typescript/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"compilerOptions": {
"target": "esnext",
"lib": ["esnext"],
"module": "nodenext",
"allowJs": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "./dist",
"allowSyntheticDefaultImports": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipDefaultLibCheck": true,
"skipLibCheck": true
}
}

0 comments on commit 55c42f2

Please sign in to comment.