Skip to content

Commit 90ae6b1

Browse files
DX-1019: Read Your Writes Support (#1175)
* add: readYourWrites option interface * send local sync token on requests * fmt * add promise.all tests * add: lua script test * format tests * fmt * change upstashSyncToken convention * add public redis client test * add: fastly and cloudflare clients ryw support * fmt * add default test * add: comments * add: http comment * sync token docs * remove comment * fix readYourWrites arg comment * add: ryw operation comments * revert requester * revert requester interface
1 parent 26a3a66 commit 90ae6b1

9 files changed

+192
-19
lines changed

pkg/commands/command.ts

+2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ export class Command<TResult, TData> {
8383
public async exec(client: Requester): Promise<TData> {
8484
const { result, error } = await client.request<TResult>({
8585
body: this.command,
86+
upstashSyncToken: client.upstashSyncToken,
8687
});
88+
8789
if (error) {
8890
throw new UpstashError(error);
8991
}

pkg/http.ts

+45-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,21 @@ export type UpstashRequest = {
1515
* Request body will be serialized to json
1616
*/
1717
body?: unknown;
18+
19+
upstashSyncToken?: string;
1820
};
1921
export type UpstashResponse<TResult> = { result?: TResult; error?: string };
2022

21-
export type Requester = {
23+
export interface Requester {
24+
/**
25+
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
26+
*/
27+
readYourWrites?: boolean;
28+
29+
/**
30+
* This token is used to ensure that the client is in sync with the server. On each request, we send this token in the header, and the server will return a new token.
31+
*/
32+
upstashSyncToken?: string;
2233
request: <TResult = unknown>(req: UpstashRequest) => Promise<UpstashResponse<TResult>>;
2334
};
2435

@@ -95,11 +106,17 @@ export type HttpClientConfig = {
95106
agent?: any;
96107
signal?: AbortSignal;
97108
keepAlive?: boolean;
109+
110+
/**
111+
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
112+
*/
113+
readYourWrites?: boolean;
98114
} & RequesterConfig;
99115

100116
export class HttpClient implements Requester {
101117
public baseUrl: string;
102118
public headers: Record<string, string>;
119+
103120
public readonly options: {
104121
backend?: string;
105122
agent: any;
@@ -108,6 +125,8 @@ export class HttpClient implements Requester {
108125
cache?: CacheSetting;
109126
keepAlive: boolean;
110127
};
128+
public readYourWrites: boolean;
129+
public upstashSyncToken = "";
111130

112131
public readonly retry: {
113132
attempts: number;
@@ -123,6 +142,8 @@ export class HttpClient implements Requester {
123142
signal: config.signal,
124143
keepAlive: config.keepAlive ?? true,
125144
};
145+
this.upstashSyncToken = "";
146+
this.readYourWrites = config.readYourWrites ?? true;
126147

127148
this.baseUrl = config.baseUrl.replace(/\/$/, "");
128149

@@ -185,6 +206,14 @@ export class HttpClient implements Requester {
185206
backend: this.options.backend,
186207
};
187208

209+
/**
210+
* We've recieved a new `upstash-sync-token` in the previous response. We use it in the next request to observe the effects of previous requests.
211+
*/
212+
if (this.readYourWrites) {
213+
const newHeader = this.upstashSyncToken;
214+
this.headers["upstash-sync-token"] = newHeader;
215+
}
216+
188217
let res: Response | null = null;
189218
let error: Error | null = null;
190219
for (let i = 0; i <= this.retry.attempts; i++) {
@@ -216,6 +245,20 @@ export class HttpClient implements Requester {
216245
throw new UpstashError(`${body.error}, command was: ${JSON.stringify(req.body)}`);
217246
}
218247

248+
if (this.readYourWrites) {
249+
const headers = res.headers;
250+
this.upstashSyncToken = headers.get("upstash-sync-token") ?? "";
251+
}
252+
253+
254+
/**
255+
* We save the new `upstash-sync-token` in the response header to use it in the next request.
256+
*/
257+
if (this.readYourWrites) {
258+
const headers = res.headers;
259+
this.upstashSyncToken = headers.get("upstash-sync-token") ?? "";
260+
}
261+
219262
if (this.options.responseEncoding === "base64") {
220263
if (Array.isArray(body)) {
221264
return body.map(({ result, error }) => ({
@@ -226,6 +269,7 @@ export class HttpClient implements Requester {
226269
const result = decode(body.result) as any;
227270
return { result, error: body.error };
228271
}
272+
229273
return body as UpstashResponse<TResult>;
230274
}
231275
}

pkg/pipeline.ts

+1
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
283283
throw new Error("Pipeline is empty");
284284
}
285285
const path = this.multiExec ? ["multi-exec"] : ["pipeline"];
286+
286287
const res = (await this.client.request({
287288
path,
288289
body: Object.values(this.commands).map((c) => c.command),

pkg/read-your-writes.test.ts

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { keygen, newHttpClient } from "./test-utils";
2+
3+
import { afterAll, describe, expect, test } from "bun:test";
4+
5+
import { Redis as PublicRedis } from "../platforms/nodejs";
6+
import { SetCommand } from "./commands/set";
7+
import { Redis } from "./redis";
8+
9+
const client = newHttpClient();
10+
const { cleanup } = keygen();
11+
afterAll(cleanup);
12+
describe("Read Your Writes Feature", () => {
13+
test("successfully retrieves Upstash-Sync-Token in the response header and updates local state", async () => {
14+
const initialSync = client.upstashSyncToken;
15+
await new SetCommand(["key", "value"]).exec(client);
16+
const updatedSync = client.upstashSyncToken;
17+
await new SetCommand(["key", "value"]).exec(client);
18+
19+
expect(updatedSync).not.toEqual(initialSync);
20+
});
21+
22+
test("succesfully updates sync state with pipeline", async () => {
23+
const initialSync = client.upstashSyncToken;
24+
25+
const { pipeline } = new Redis(client);
26+
const p = pipeline();
27+
28+
p.set("key1", "value1");
29+
p.set("key2", "value2");
30+
p.set("key3", "value3");
31+
32+
await p.exec();
33+
34+
const updatedSync = client.upstashSyncToken;
35+
36+
expect(initialSync).not.toEqual(updatedSync);
37+
});
38+
39+
test("updates after each element of promise.all", async () => {
40+
let currentSync = client.upstashSyncToken;
41+
42+
const promises = Array.from({ length: 3 }, (_, i) =>
43+
new SetCommand([`key${i}`, `value${i}`]).exec(client).then(() => {
44+
expect(client.upstashSyncToken).not.toEqual(currentSync);
45+
currentSync = client.upstashSyncToken;
46+
}),
47+
);
48+
49+
await Promise.all(promises);
50+
});
51+
52+
test("updates after successful lua script call", async () => {
53+
const s = `redis.call('SET', 'mykey', 'myvalue')
54+
return 1
55+
`;
56+
57+
const initialSync = client.upstashSyncToken;
58+
59+
const redis = new Redis(client);
60+
const script = redis.createScript(s);
61+
62+
await script.exec([], []);
63+
64+
const updatedSync = client.upstashSyncToken;
65+
66+
expect(updatedSync).not.toEqual(initialSync);
67+
});
68+
69+
test("should not update the sync state in case of Redis client with manuel HTTP client and opt-out ryw", async () => {
70+
const optOutClient = newHttpClient();
71+
const redis = new Redis(optOutClient, { readYourWrites: false });
72+
73+
const initialSync = optOutClient.upstashSyncToken;
74+
75+
await redis.set("key", "value");
76+
77+
const updatedSync = optOutClient.upstashSyncToken;
78+
79+
expect(updatedSync).toEqual(initialSync);
80+
});
81+
82+
test("should not update the sync state when public Redis interface is provided with opt-out", async () => {
83+
const redis = new PublicRedis({
84+
url: process.env.UPSTASH_REDIS_REST_URL,
85+
token: process.env.UPSTASH_REDIS_REST_TOKEN,
86+
readYourWrites: false,
87+
});
88+
89+
// @ts-expect-error - We need the sync token for this test, which resides on the client
90+
const initialSync = redis.client.upstashSyncToken;
91+
92+
await redis.set("key", "value");
93+
94+
// @ts-expect-error - We need the sync token for this test, which resides on the client
95+
const updatedSync = redis.client.upstashSyncToken;
96+
97+
expect(updatedSync).toEqual(initialSync);
98+
});
99+
100+
test("should update the sync state when public Redis interface is provided with default behaviour", async () => {
101+
const redis = new PublicRedis({
102+
url: process.env.UPSTASH_REDIS_REST_URL,
103+
token: process.env.UPSTASH_REDIS_REST_TOKEN,
104+
});
105+
106+
// @ts-expect-error - We need the sync token for this test, which resides on the client
107+
const initialSync = redis.client.upstashSyncToken;
108+
109+
await redis.set("key", "value");
110+
111+
// @ts-expect-error - We need the sync token for this test, which resides on the client
112+
const updatedSync = redis.client.upstashSyncToken;
113+
expect(updatedSync).not.toEqual(initialSync);
114+
});
115+
});

pkg/redis.ts

+4
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ export class Redis {
210210
this.client = client;
211211
this.opts = opts;
212212
this.enableTelemetry = opts?.enableTelemetry ?? true;
213+
214+
if (opts?.readYourWrites === false) {
215+
this.client.readYourWrites = false;
216+
}
213217
this.enableAutoPipelining = opts?.enableAutoPipelining ?? true;
214218
}
215219

pkg/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ export type RedisOptions = {
3131
latencyLogging?: boolean;
3232
enableTelemetry?: boolean;
3333
enableAutoPipelining?: boolean;
34+
readYourWrites?: boolean;
3435
};

platforms/cloudflare.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ export type RedisConfigCloudflare = {
3030
*/
3131
signal?: AbortSignal;
3232
keepAlive?: boolean;
33+
34+
/**
35+
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
36+
*/
37+
readYourWrites?: boolean;
3338
} & core.RedisOptions &
3439
RequesterConfig &
3540
Env;
@@ -51,15 +56,11 @@ export class Redis extends core.Redis {
5156
*/
5257
constructor(config: RedisConfigCloudflare, env?: Env) {
5358
if (!config.url) {
54-
throw new Error(
55-
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
56-
);
59+
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
5760
}
5861

5962
if (!config.token) {
60-
throw new Error(
61-
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
62-
);
63+
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
6364
}
6465

6566
if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) {
@@ -76,6 +77,7 @@ export class Redis extends core.Redis {
7677
responseEncoding: config.responseEncoding,
7778
signal: config.signal,
7879
keepAlive: config.keepAlive,
80+
readYourWrites: config.readYourWrites,
7981
});
8082

8183
super(client, {

platforms/fastly.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ export type RedisConfigFastly = {
2727
*/
2828
backend: string;
2929
keepAlive?: boolean;
30+
31+
/**
32+
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
33+
*/
34+
readYourWrites?: boolean;
3035
} & core.RedisOptions &
3136
RequesterConfig;
3237

@@ -48,15 +53,11 @@ export class Redis extends core.Redis {
4853
*/
4954
constructor(config: RedisConfigFastly) {
5055
if (!config.url) {
51-
throw new Error(
52-
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
53-
);
56+
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
5457
}
5558

5659
if (!config.token) {
57-
throw new Error(
58-
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
59-
);
60+
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
6061
}
6162

6263
if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) {
@@ -73,6 +74,7 @@ export class Redis extends core.Redis {
7374
options: { backend: config.backend },
7475
responseEncoding: config.responseEncoding,
7576
keepAlive: config.keepAlive,
77+
readYourWrites: config.readYourWrites,
7678
});
7779

7880
super(client, {

platforms/nodejs.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ export type RedisConfigNodejs = {
5353
latencyLogging?: boolean;
5454
agent?: unknown;
5555
keepAlive?: boolean;
56+
57+
/**
58+
* When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client.
59+
*/
60+
readYourWrites?: boolean;
5661
} & core.RedisOptions &
5762
RequesterConfig;
5863

@@ -97,15 +102,11 @@ export class Redis extends core.Redis {
97102
}
98103

99104
if (!configOrRequester.url) {
100-
throw new Error(
101-
`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`
102-
);
105+
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
103106
}
104107

105108
if (!configOrRequester.token) {
106-
throw new Error(
107-
`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`
108-
);
109+
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
109110
}
110111

111112
if (
@@ -133,6 +134,7 @@ export class Redis extends core.Redis {
133134
cache: configOrRequester.cache ?? "no-store",
134135
signal: configOrRequester.signal,
135136
keepAlive: configOrRequester.keepAlive,
137+
readYourWrites: configOrRequester.readYourWrites,
136138
});
137139

138140
super(client, {

0 commit comments

Comments
 (0)