diff --git a/.github/workflows/preview-branches.yml b/.github/workflows/preview-branches.yml index 585e8322ca..34a6c9fe99 100644 --- a/.github/workflows/preview-branches.yml +++ b/.github/workflows/preview-branches.yml @@ -54,6 +54,7 @@ jobs: pulumi config set files-subdomain files-pr-${{ env.PR_ID }} pulumi config set docker-image-tag pr-${{ env.PR_ID }} pulumi config set quadratic-api-uri https://quadratic-api-dev-pr-${{ env.PR_ID }}.herokuapp.com + pulumi config set is-preview true pulumi up -y env: diff --git a/Cargo.lock b/Cargo.lock index b7cc963255..af8a5f3b7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,9 +208,9 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -219,9 +219,9 @@ version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -1187,10 +1187,10 @@ checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" dependencies = [ "fnv", "ident_case", - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "strsim", - "syn 2.0.48", + "syn 2.0.60", ] [[package]] @@ -1200,8 +1200,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", - "quote 1.0.35", - "syn 2.0.48", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -1274,9 +1274,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e57e12b69e57fad516e01e2b3960f122696fdb13420e1a88ed8e210316f2876" dependencies = [ "darling", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -1482,9 +1482,9 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -1569,7 +1569,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.11", - "indexmap 2.2.3", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -1588,7 +1588,7 @@ dependencies = [ "futures-sink", "futures-util", "http 1.0.0", - "indexmap 2.2.3", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -1907,9 +1907,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.3" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2326,9 +2326,9 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -2499,9 +2499,9 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -2557,8 +2557,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "syn 1.0.109", "version_check", ] @@ -2569,8 +2569,8 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "version_check", ] @@ -2585,9 +2585,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" dependencies = [ "unicode-ident", ] @@ -2644,7 +2644,7 @@ dependencies = [ "getrandom", "half 2.4.0", "htmlescape", - "indexmap 2.2.3", + "indexmap 2.1.0", "itertools", "js-sys", "lazy_static", @@ -2798,11 +2798,11 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ - "proc-macro2 1.0.78", + "proc-macro2 1.0.81", ] [[package]] @@ -3193,9 +3193,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" dependencies = [ "serde_derive", ] @@ -3213,20 +3213,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" dependencies = [ "itoa", "ryu", @@ -3249,9 +3249,9 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -3286,9 +3286,9 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -3450,8 +3450,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ "heck", - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "rustversion", "syn 1.0.109", ] @@ -3463,10 +3463,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ "heck", - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "rustversion", - "syn 2.0.48", + "syn 2.0.60", ] [[package]] @@ -3492,19 +3492,19 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "unicode-ident", ] [[package]] name = "syn" -version = "2.0.48" +version = "2.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "unicode-ident", ] @@ -3556,8 +3556,8 @@ checksum = "99f688a08b54f4f02f0a3c382aefdb7884d3d69609f785bd253dc033243e3fe4" dependencies = [ "heck", "proc-macro-error", - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", "syn 1.0.109", ] @@ -3604,9 +3604,9 @@ version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -3718,9 +3718,9 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -3876,9 +3876,9 @@ version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -3943,9 +3943,9 @@ version = "7.0.0" source = "git+https://github.com/HactarCE/ts-rs/?rev=812c1a8#812c1a8b5ff3128916426e95e228f51430eb02cc" dependencies = [ "Inflector", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", "termcolor", ] @@ -4137,8 +4137,8 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d257817081c7dffcdbab24b9e62d2def62e2ff7d00b1c20062551e6cccc145ff" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", + "proc-macro2 1.0.81", + "quote 1.0.36", ] [[package]] @@ -4194,17 +4194,17 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" dependencies = [ "cfg-if", "js-sys", @@ -4218,7 +4218,7 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ - "quote 1.0.35", + "quote 1.0.36", "wasm-bindgen-macro-support", ] @@ -4228,9 +4228,9 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4243,9 +4243,9 @@ checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "wasm-bindgen-test" -version = "0.3.41" +version = "0.3.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143ddeb4f833e2ed0d252e618986e18bfc7b0e52f2d28d77d05b2f045dd8eb61" +checksum = "d9bf62a58e0780af3e852044583deee40983e5886da43a271dd772379987667b" dependencies = [ "console_error_panic_hook", "js-sys", @@ -4257,13 +4257,13 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-macro" -version = "0.3.41" +version = "0.3.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5211b7550606857312bba1d978a8ec75692eae187becc5e680444fffc5e6f89" +checksum = "b7f89739351a2e03cb94beb799d47fb2cac01759b40ec441f7de39b00cbf7ef0" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] @@ -4479,9 +4479,9 @@ version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", + "proc-macro2 1.0.81", + "quote 1.0.36", + "syn 2.0.60", ] [[package]] diff --git a/infra/helpers/isPreviewEnvironment.ts b/infra/helpers/isPreviewEnvironment.ts new file mode 100644 index 0000000000..23cc07c604 --- /dev/null +++ b/infra/helpers/isPreviewEnvironment.ts @@ -0,0 +1,6 @@ +import * as pulumi from "@pulumi/pulumi"; + +const config = new pulumi.Config(); + +// Configuration from command line +export const isPreviewEnvironment = config.get("is-preview") === "true"; diff --git a/infra/shared/securityGroups.ts b/infra/shared/securityGroups.ts index 5817efe16d..1da6285de6 100644 --- a/infra/shared/securityGroups.ts +++ b/infra/shared/securityGroups.ts @@ -1,9 +1,9 @@ import * as aws from "@pulumi/aws"; +import { isPreviewEnvironment } from "../helpers/isPreviewEnvironment"; // Create a Security Group for the Files EC2 instance export const filesEc2SecurityGroup = new aws.ec2.SecurityGroup("files-sg", { ingress: [ - // { protocol: "tcp", fromPort: 22, toPort: 22, cidrBlocks: ["0.0.0.0/0"] }, { protocol: "tcp", fromPort: 80, @@ -39,7 +39,6 @@ export const multiplayerEc2SecurityGroup = new aws.ec2.SecurityGroup( "multiplayer-sg", { ingress: [ - // { protocol: "tcp", fromPort: 22, toPort: 22, cidrBlocks: ["0.0.0.0/0"] }, { protocol: "tcp", fromPort: 80, @@ -67,3 +66,23 @@ export const redisSecurityGroup = new aws.ec2.SecurityGroup("redis-sg", { }, ], }); + +// Allow SSH traffic to the Preview Instances +if (isPreviewEnvironment) { + new aws.ec2.SecurityGroupRule(`files-ssh-ingress-rule`, { + type: "ingress", + fromPort: 22, + toPort: 22, + protocol: "tcp", + cidrBlocks: ["0.0.0.0/0"], + securityGroupId: filesEc2SecurityGroup.id, + }); + new aws.ec2.SecurityGroupRule(`multiplayer-ssh-ingress-rule`, { + type: "ingress", + fromPort: 22, + toPort: 22, + protocol: "tcp", + cidrBlocks: ["0.0.0.0/0"], + securityGroupId: multiplayerEc2SecurityGroup.id, + }); +} diff --git a/quadratic-client/src/app/events/events.ts b/quadratic-client/src/app/events/events.ts index e8da6a8e7d..41536135c8 100644 --- a/quadratic-client/src/app/events/events.ts +++ b/quadratic-client/src/app/events/events.ts @@ -21,7 +21,7 @@ import { import EventEmitter from 'eventemitter3'; interface EventTypes { - needRefresh: (state: 'required' | 'recommended') => void; + needRefresh: (state: 'required' | 'recommended' | 'force') => void; search: (found?: SheetPosTS[], current?: number) => void; hoverCell: (cell?: JsRenderCodeCell | EditingCell) => void; diff --git a/quadratic-client/src/app/ui/UpdateAlertVersion.tsx b/quadratic-client/src/app/ui/UpdateAlertVersion.tsx index f51dcc4d01..7901803510 100644 --- a/quadratic-client/src/app/ui/UpdateAlertVersion.tsx +++ b/quadratic-client/src/app/ui/UpdateAlertVersion.tsx @@ -6,9 +6,9 @@ import { events } from '../events/events'; import { FixedBottomAlert } from './components/PermissionOverlay'; export const UpdateAlertVersion = () => { - const [showDialog, setShowDialog] = useState(false); + const [showDialog, setShowDialog] = useState(false); useEffect(() => { - const needRefresh = (refresh: 'required' | 'recommended') => setShowDialog(refresh); + const needRefresh = (refresh: 'required' | 'recommended' | 'force') => setShowDialog(refresh); events.on('needRefresh', needRefresh); return () => { events.off('needRefresh', needRefresh); @@ -17,6 +17,11 @@ export const UpdateAlertVersion = () => { if (showDialog === false) return null; + if (showDialog === 'force') { + window.location.reload(); + return null; + } + return (
diff --git a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayer.ts b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayer.ts index cf81d5eea8..7ca3efef10 100644 --- a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayer.ts +++ b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayer.ts @@ -109,6 +109,10 @@ export class Multiplayer { this.receiveUsersInRoom(e.data.room); break; + case 'multiplayerClientReload': + events.emit('needRefresh', 'force'); + break; + default: console.warn('Unhandled message type', e.data); } diff --git a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerClientMessages.ts b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerClientMessages.ts index 19272b4858..4fbe2affc2 100644 --- a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerClientMessages.ts +++ b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerClientMessages.ts @@ -80,11 +80,16 @@ export interface MultiplayerClientUsersInRoom { room: ReceiveRoom; } +export interface MultiplayerClientReload { + type: 'multiplayerClientReload'; +} + export type MultiplayerClientMessage = | MultiplayerClientState | MultiplayerClientUserUpdate | MultiplayerClientUsersInRoom - | MultiplayerClientUserUpdate; + | MultiplayerClientUserUpdate + | MultiplayerClientReload; export type ClientMultiplayerMessage = | ClientMultiplayerInit diff --git a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerTypes.ts b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerTypes.ts index 7bcfed606d..c7b5473fb8 100644 --- a/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerTypes.ts +++ b/quadratic-client/src/app/web-workers/multiplayerWebWorker/multiplayerTypes.ts @@ -131,7 +131,7 @@ export interface ReceiveCurrentTransaction { export interface ReceiveError { type: 'Error'; - error: string; + error: string | Record; error_level: string; } diff --git a/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerClient.ts b/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerClient.ts index f7a9c10ad6..492d400b71 100644 --- a/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerClient.ts +++ b/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerClient.ts @@ -86,6 +86,12 @@ class MultiplayerClient { state: data, }); } + + reload() { + this.send({ + type: 'multiplayerClientReload', + }); + } } export const multiplayerClient = new MultiplayerClient(); diff --git a/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerServer.ts b/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerServer.ts index cbbabae453..178495c8c1 100644 --- a/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerServer.ts +++ b/quadratic-client/src/app/web-workers/multiplayerWebWorker/worker/multiplayerServer.ts @@ -256,6 +256,15 @@ export class MultiplayerServer { case 'Error': if (data.error_level === 'Error') { + // If the server is missing transactions, reload the page + if (typeof data.error != 'string' && 'MissingTransactions' in data.error) { + console.warn( + `[Multiplayer] Warn: Missing transactions, expected ${data.error.MissingTransactions[0]}, but got ${data.error.MissingTransactions[1]}` + ); + multiplayerClient.reload(); + break; + } + Sentry.captureException({ message: `Error response from the multiplayer server: ${data.error}`, level: 'error', diff --git a/quadratic-files/.env.docker b/quadratic-files/.env.docker index c35105ea3f..7d6e07a662 100644 --- a/quadratic-files/.env.docker +++ b/quadratic-files/.env.docker @@ -1,7 +1,9 @@ HOST=0.0.0.0 PORT=3002 -FILE_CHECK_S=5 +FILE_CHECK_S=3 FILES_PER_CHECK=100 +TRUNCATE_FILE_CHECK_S=3600 # 1 hour +TRUNCATE_TRANSACTION_AGE_DAYS=5 # 5 days ENVIRONMENT=docker QUADRATIC_API_URI=http://quadratic-api:8000 @@ -11,6 +13,7 @@ PUBSUB_HOST=redis PUBSUB_PORT=6379 PUBSUB_PASSWORD= PUBSUB_ACTIVE_CHANNELS=active_channels +PUBSUB_PROCESSED_TRANSACTIONS_CHANNEL=processed_transactions AWS_S3_REGION= AWS_S3_BUCKET_NAME=quadratic-api-docker diff --git a/quadratic-files/.env.example b/quadratic-files/.env.example index 02c40d0e2f..0a3f0ee3f8 100644 --- a/quadratic-files/.env.example +++ b/quadratic-files/.env.example @@ -2,6 +2,8 @@ HOST=127.0.0.1 PORT=3002 FILE_CHECK_S=3 FILES_PER_CHECK=100 +TRUNCATE_FILE_CHECK_S=3600 # 1 hour +TRUNCATE_TRANSACTION_AGE_DAYS=5 # 5 days ENVIRONMENT=docker QUADRATIC_API_URI=http://localhost:8000 @@ -11,6 +13,7 @@ PUBSUB_HOST=0.0.0.0 PUBSUB_PORT=6379 PUBSUB_PASSWORD= PUBSUB_ACTIVE_CHANNELS=active_channels +PUBSUB_PROCESSED_TRANSACTIONS_CHANNEL=processed_transactions AWS_S3_REGION= AWS_S3_BUCKET_NAME= diff --git a/quadratic-files/.env.test b/quadratic-files/.env.test index 5e8b717b25..ca08711da8 100644 --- a/quadratic-files/.env.test +++ b/quadratic-files/.env.test @@ -2,6 +2,8 @@ HOST=127.0.0.1 PORT=3002 FILE_CHECK_S=3 FILES_PER_CHECK=100 +TRUNCATE_FILE_CHECK_S=3600 # 1 hour +TRUNCATE_TRANSACTION_AGE_DAYS=5 # 5 days ENVIRONMENT=test QUADRATIC_API_URI=http://localhost:8000 @@ -10,6 +12,8 @@ M2M_AUTH_TOKEN=M2M_AUTH_TOKEN PUBSUB_HOST=0.0.0.0 PUBSUB_PORT=6379 PUBSUB_PASSWORD= +PUBSUB_ACTIVE_CHANNELS=active_channels +PUBSUB_PROCESSED_TRANSACTIONS_CHANNEL=processed_transactions AWS_S3_REGION= AWS_S3_BUCKET_NAME= diff --git a/quadratic-files/src/config.rs b/quadratic-files/src/config.rs index 1716f70592..8ecaaa362a 100644 --- a/quadratic-files/src/config.rs +++ b/quadratic-files/src/config.rs @@ -17,12 +17,15 @@ pub(crate) struct Config { pub(crate) port: String, pub(crate) file_check_s: i64, pub(crate) files_per_check: i64, + pub(crate) truncate_file_check_s: i64, + pub(crate) truncate_transaction_age_days: i64, pub(crate) environment: Environment, pub(crate) pubsub_host: String, pub(crate) pubsub_port: String, pub(crate) pubsub_password: String, pub(crate) pubsub_active_channels: String, + pub(crate) pubsub_processed_transactions_channel: String, pub(crate) quadratic_api_uri: String, pub(crate) m2m_auth_token: String, diff --git a/quadratic-files/src/error.rs b/quadratic-files/src/error.rs index 7f75d21131..7444ba7e6d 100644 --- a/quadratic-files/src/error.rs +++ b/quadratic-files/src/error.rs @@ -52,6 +52,9 @@ pub(crate) enum FilesError { #[error("Transaction queue error: {0}")] TransactionQueue(String), + #[error("Error truncating files: {0}")] + Truncate(String), + #[error("unknown error: {0}")] Unknown(String), } diff --git a/quadratic-files/src/file.rs b/quadratic-files/src/file.rs index 668e6cbb87..41e2db883f 100644 --- a/quadratic-files/src/file.rs +++ b/quadratic-files/src/file.rs @@ -23,6 +23,7 @@ use quadratic_rust_shared::{ use crate::{ error::{FilesError, Result}, state::{settings::Settings, State}, + truncate::{add_processed_transaction, processed_transaction_key}, }; pub static GROUP_NAME: &str = "quadratic-file-service-1"; @@ -126,7 +127,7 @@ pub(crate) async fn process_queue_for_room( // get all transactions for the room in the queue let transactions = pubsub .connection - .get_messages_from(channel, &checkpoint_sequence_num.to_string()) + .get_messages_from(channel, &checkpoint_sequence_num.to_string(), false) .await? .iter() .flat_map(|(_, message)| serde_json::from_str::(message)) @@ -191,9 +192,11 @@ pub(crate) async fn process_queue_for_room( // confirm that transactions have been processed pubsub .connection - .ack(channel, GROUP_NAME, keys, Some(active_channels)) + .ack(channel, GROUP_NAME, keys, Some(active_channels), false) .await?; + drop(pubsub); + // update the checkpoint in quadratic-api let key = &key(*file_id, last_sequence_num); set_file_checkpoint( @@ -207,7 +210,22 @@ pub(crate) async fn process_queue_for_room( ) .await?; + // add FILE_ID.SEQUENCE_NUM to the processed transactions channel + let message = processed_transaction_key(&file_id.to_string(), &last_sequence_num.to_string()); + let processed_transactions_channel = state + .settings + .pubsub_processed_transactions_channel + .to_owned(); + + add_processed_transaction( + &Arc::clone(state), + &processed_transactions_channel, + &message, + ) + .await?; + state.stats.lock().await.last_processed_file_time = Some(Instant::now()); + state.stats.lock().await.files_to_process_in_pubsub = 0; tracing::info!( "Processed sequence numbers {first_sequence_num} - {last_sequence_num} for room {file_id} in {:?}", start.elapsed() diff --git a/quadratic-files/src/main.rs b/quadratic-files/src/main.rs index c4439eff6c..880d4258c1 100644 --- a/quadratic-files/src/main.rs +++ b/quadratic-files/src/main.rs @@ -10,6 +10,7 @@ mod server; mod state; #[cfg(test)] mod test_util; +mod truncate; use error::Result; diff --git a/quadratic-files/src/server.rs b/quadratic-files/src/server.rs index cca224dfd4..a9ecdeba7b 100644 --- a/quadratic-files/src/server.rs +++ b/quadratic-files/src/server.rs @@ -12,6 +12,7 @@ use tokio::time; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use crate::truncate::truncate_processed_transactions; use crate::{ config::config, error::{FilesError, Result}, @@ -81,8 +82,34 @@ pub(crate) async fn serve() -> Result<()> { } }); + // in a separate thread, truncate streams/channels + tokio::spawn({ + let state = Arc::clone(&state); + + async move { + let mut interval = + time::interval(Duration::from_secs(config.truncate_file_check_s as u64)); + + loop { + interval.tick().await; + + if let Err(error) = truncate_processed_transactions( + &state, + &config.pubsub_processed_transactions_channel, + config.truncate_transaction_age_days as u64, + ) + .await + { + tracing::error!("Error truncating streams: {error}"); + } + } + } + }); + // in a separate thread, log stats tokio::spawn({ + let state = Arc::clone(&state); + async move { let mut interval = time::interval(Duration::from_secs(HEALTHCHECK_INTERVAL_S)); diff --git a/quadratic-files/src/state/settings.rs b/quadratic-files/src/state/settings.rs index 206418f449..952a5eb69f 100644 --- a/quadratic-files/src/state/settings.rs +++ b/quadratic-files/src/state/settings.rs @@ -9,6 +9,7 @@ pub(crate) struct Settings { pub(crate) m2m_auth_token: String, pub(crate) aws_client: Client, pub(crate) aws_s3_bucket_name: String, + pub(crate) pubsub_processed_transactions_channel: String, } impl Settings { @@ -26,6 +27,9 @@ impl Settings { ) .await, aws_s3_bucket_name: config.aws_s3_bucket_name.to_owned(), + pubsub_processed_transactions_channel: config + .pubsub_processed_transactions_channel + .to_owned(), } } } diff --git a/quadratic-files/src/state/stats.rs b/quadratic-files/src/state/stats.rs index fc2fd436f5..539bcb5858 100644 --- a/quadratic-files/src/state/stats.rs +++ b/quadratic-files/src/state/stats.rs @@ -6,24 +6,35 @@ use tokio::time::Instant; pub(crate) struct Stats { pub(crate) last_processed_file_time: Option, pub(crate) files_to_process_in_pubsub: u64, + pub(crate) channels_to_truncate_in_pubsub: u64, + pub(crate) last_truncated_transaction_time: Option, } #[derive(Debug, Default, Serialize)] pub(crate) struct StatsResponse { pub(crate) last_processed_file: String, pub(crate) files_to_process_in_pubsub: u64, + pub(crate) last_processed_transaction: String, + pub(crate) channels_to_truncate_in_pubsub: u64, +} + +fn ago(time: Option, kind: &str) -> String { + match time { + Some(time) => format!("{:?} seconds ago", time.elapsed().as_secs()), + None => format!("No {kind} processed yet"), + } } impl Display for Stats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let last_processed_file = match self.last_processed_file_time { - Some(time) => format!("{:?} seconds ago", time.elapsed().as_secs()), - None => "No files processed yet".to_string(), - }; + let last_processed_file = ago(self.last_processed_file_time, "files"); + let last_processed_transaction = ago(self.last_truncated_transaction_time, "transactions"); let stats = StatsResponse { last_processed_file, files_to_process_in_pubsub: self.files_to_process_in_pubsub, + last_processed_transaction, + channels_to_truncate_in_pubsub: self.channels_to_truncate_in_pubsub, }; write!(f, "{}", serde_json::to_string(&stats).unwrap()) diff --git a/quadratic-files/src/truncate.rs b/quadratic-files/src/truncate.rs new file mode 100644 index 0000000000..383eb1e345 --- /dev/null +++ b/quadratic-files/src/truncate.rs @@ -0,0 +1,264 @@ +use chrono::{Days, Utc}; +use std::sync::Arc; +use tokio::time::Instant; + +use quadratic_rust_shared::pubsub::PubSub as PubSubTrait; + +use crate::{ + error::{FilesError, Result}, + state::State, +}; + +pub(crate) fn processed_transaction_key(file_id: &str, sequence_num: &str) -> String { + format!("{}.{}", file_id, sequence_num) +} + +pub(crate) fn parse_processed_transaction_key(key: &str) -> Result<(String, String)> { + let split = key.split('.').collect::>(); + + if split.len() < 2 { + return Err(FilesError::Unknown(format!("Could not parse key {key}"))); + } + + Ok((split[0].to_string(), split[1].to_string())) +} + +/// Process outstanding transactions in the queue, but only for the given key. +/// This is useful for testing as we need to create timestamps in the past +/// rather than accepting NOW() (the default). +async fn add_processed_transaction_with_key( + state: &Arc, + channel: &str, + message: &str, + key: &str, +) -> Result<()> { + state + .pubsub + .lock() + .await + .connection + .publish(channel, key, message, None) + .await?; + + Ok(()) +} + +/// Process outstanding transactions in the queue +pub(crate) async fn add_processed_transaction( + state: &Arc, + channel: &str, + message: &str, +) -> Result<()> { + add_processed_transaction_with_key(state, channel, message, "*").await +} + +/// Process outstanding transactions in the queue +pub(crate) async fn truncate_processed_transaction( + state: &Arc, + processed_transactions_channel: &str, + key: &str, + file_id: &str, + sequence_num: &str, +) -> Result<()> { + let start = Instant::now(); + + // this is an expensive lock + let mut pubsub = state.pubsub.lock().await; + + tracing::trace!("Attempting to truncate at sequence number {sequence_num} for file {file_id}"); + + // Redis does not trim inclusively, so we need to add a 1 to the sequence number + let inclusive_sequence_num = sequence_num.parse::().unwrap_or(0) + 1; + + // trim the channel at the sequence number + pubsub + .connection + .trim(file_id, &inclusive_sequence_num.to_string()) + .await?; + + // trim the process transactions channel for this checkpoint + pubsub + .connection + .trim(processed_transactions_channel, key) + .await?; + + state.stats.lock().await.last_truncated_transaction_time = Some(Instant::now()); + + tracing::trace!( + "Truncated at sequence number {sequence_num} for file {file_id} in {:?}", + start.elapsed() + ); + + Ok(()) +} + +/// Get all messages from the processed transactions channel at the given timestamp (transaction_age_days) +pub(crate) async fn get_messages( + state: &Arc, + processed_transactions_channel: &str, + transaction_age_days: u64, +) -> Result> { + // milliseconds from TRANSACTION_AGE_DAYS ago + let millis = Utc::now() + .checked_sub_days(Days::new(transaction_age_days)) + .ok_or_else(|| { + FilesError::Truncate(format!( + "Could not create a date {transaction_age_days} days from now" + )) + })? + .timestamp_millis(); + + // get all messages from the processed transactions channel (ignores consumer groups) + let messages = state + .pubsub + .lock() + .await + .connection + .get_messages_before(processed_transactions_channel, &millis.to_string(), true) + .await?; + + Ok(messages) +} + +/// Process outstanding transactions in the queue +pub(crate) async fn truncate_processed_transactions( + state: &Arc, + channel: &str, + transaction_age_days: u64, +) -> Result<()> { + // get messages from the channel + let messages = get_messages(state, channel, transaction_age_days).await?; + + // collect info for stats + state.stats.lock().await.channels_to_truncate_in_pubsub = messages.len() as u64; + + for (key, value) in messages.iter() { + let (file_id, sequence_num) = parse_processed_transaction_key(value)?; + + if let Err(error) = + truncate_processed_transaction(state, channel, key, &file_id, &sequence_num).await + { + tracing::error!("Error truncating channel {file_id}.{sequence_num}: {error}"); + }; + } + + state.stats.lock().await.channels_to_truncate_in_pubsub = 0; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use super::*; + use crate::test_util::new_arc_state; + + async fn assert_file_messages(state: Arc, file_ids: &Vec, expected: usize) { + for file_id in file_ids { + let messages = state + .pubsub + .lock() + .await + .connection + .get_messages_before(&file_id.to_string(), "+", false) + .await + .unwrap(); + + assert_eq!( + messages.len(), + expected, + "expected file {} to have {} messages", + file_id, + expected + ); + } + } + + pub(crate) async fn add_processed_transaction_in_days( + state: &Arc, + channel: &str, + message: &str, + days_old: u64, + ) -> Result<()> { + let millis = Utc::now() + .checked_sub_days(Days::new(days_old)) + .unwrap() + .timestamp_millis(); + + add_processed_transaction_with_key(state, channel, message, &millis.to_string()).await + } + + #[tokio::test] + async fn truncates_files() { + let state = new_arc_state().await; + let channel = format!("processed_transactions_{}", Uuid::new_v4()); + let mut file_ids = vec![]; + const AGE: u64 = 5; + + // add 10 transactions to 10 channels + for i in 1..=10 { + let file_id = Uuid::new_v4(); + file_ids.push(file_id.to_string()); + + for j in 1..=10 { + state + .pubsub + .lock() + .await + .connection + .publish( + &file_id.to_string(), + &j.to_string(), + &format!("message {i}-{j}"), + Some("active_channels"), + ) + .await + .unwrap(); + + let message = processed_transaction_key(&file_id.to_string(), &j.to_string()); + add_processed_transaction_in_days(&state, &channel, &message, AGE) + .await + .unwrap(); + } + } + + // wait + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // verify the messages are in the FILE_ID channel + assert_file_messages(state.clone(), &file_ids, 10).await; + + // verify the messages are NOT ready to be processed (AGE + 1 days old) + let process_transactions_messages = get_messages(&state, &channel, AGE + 1).await.unwrap(); + assert_eq!(process_transactions_messages.len(), 0); + + truncate_processed_transactions(&state, &channel, AGE + 1) + .await + .unwrap(); + + // wait + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // verify the messages are in the FILE_ID channel + assert_file_messages(state.clone(), &file_ids, 10).await; + + // verify the messages are in the process_transactions channel + let process_transactions_messages = get_messages(&state, &channel, AGE).await.unwrap(); + assert_eq!(process_transactions_messages.len(), 100); + + truncate_processed_transactions(&state, &channel, 0) + .await + .unwrap(); + + // wait + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // the most recent message should be remain since redis is range exclusive + let process_transactions_messages = get_messages(&state, &channel, AGE).await.unwrap(); + assert_eq!(process_transactions_messages.len(), 1); + + // all files should be processed + assert_file_messages(state.clone(), &file_ids, 0).await; + } +} diff --git a/quadratic-multiplayer/src/error.rs b/quadratic-multiplayer/src/error.rs index 1c93019f10..5ab5a3bdf3 100644 --- a/quadratic-multiplayer/src/error.rs +++ b/quadratic-multiplayer/src/error.rs @@ -63,6 +63,9 @@ pub(crate) enum MpError { #[error("Error reading MinVersion file: {0}")] MinVersion(String), + #[error("Requested {0} transactions but only found {1}")] + MissingTransactions(String, String), + #[error("PubSub error: {0}")] PubSub(String), diff --git a/quadratic-multiplayer/src/message/handle.rs b/quadratic-multiplayer/src/message/handle.rs index fb4765b6b5..10adccf4d6 100644 --- a/quadratic-multiplayer/src/message/handle.rs +++ b/quadratic-multiplayer/src/message/handle.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use uuid::Uuid; -use crate::error::{MpError, Result}; +use crate::error::{ErrorLevel, MpError, Result}; use crate::get_mut_room; use crate::message::{ broadcast, request::MessageRequest, response::MessageResponse, send_user_message, @@ -216,10 +216,37 @@ pub(crate) async fn handle_message( // update the heartbeat state.update_user_heartbeat(file_id, &session_id).await?; + let sequence_num = state.get_sequence_num(&file_id).await?; + + // calculate the expected number of transactions to get from redis + // add 1 to include the min_sequence_num (inclusive range) + let expected_num_transactions = sequence_num + .checked_sub(min_sequence_num) + .unwrap_or_default() + + 1; + + tracing::warn!("min_sequence_num: {}", min_sequence_num); + tracing::warn!("sequence_num: {}", sequence_num); + tracing::warn!("expected_num_transactions: {}", expected_num_transactions); + let transactions = state .get_messages_from_pubsub(&file_id, min_sequence_num) .await?; + tracing::warn!("got: {}", transactions.len()); + + // we don't have the expected number of transactions + // send an error to the client so they can reload + if transactions.len() < expected_num_transactions as usize { + return Ok(Some(MessageResponse::Error { + error: MpError::MissingTransactions( + expected_num_transactions.to_string(), + transactions.len().to_string(), + ), + error_level: ErrorLevel::Error, + })); + } + let response = MessageResponse::Transactions { transactions: serde_json::to_string(&transactions)?, }; @@ -287,7 +314,7 @@ pub(crate) mod tests { user_1: User, request: MessageRequest, response: Option, - broadcast_response: MessageResponse, + broadcast_response: Option, ) { let stream = state ._get_user_in_room(&file_id, &user_1.session_id) @@ -301,8 +328,10 @@ pub(crate) mod tests { .unwrap(); assert_eq!(handled, response); - let received = integration_test_receive(&socket, 2).await.unwrap(); - assert_eq!(received, broadcast_response); + if let Some(broadcast_response) = broadcast_response { + let received = integration_test_receive(&socket, 2).await.unwrap(); + assert_eq!(received, broadcast_response); + } } #[tokio::test] @@ -334,7 +363,16 @@ pub(crate) mod tests { update, }; - test_handle(socket, state, file_id, user_1, request, None, response).await; + test_handle( + socket, + state, + file_id, + user_1, + request, + None, + Some(response), + ) + .await; } #[tokio::test] @@ -372,7 +410,7 @@ pub(crate) mod tests { user_1, request, None, - response, + Some(response), ) .await; @@ -405,7 +443,7 @@ pub(crate) mod tests { user_1, request, None, - response, + Some(response), ) .await; @@ -414,7 +452,7 @@ pub(crate) mod tests { } #[tokio::test] - async fn handle_transaction() { + async fn handle_set_and_get_transactions() { let (socket, state, _, file_id, user_1, _) = setup().await; let id = Uuid::new_v4(); let session_id = user_1.session_id; @@ -434,10 +472,77 @@ pub(crate) mod tests { let response = MessageResponse::Transaction { id, file_id, - operations, + operations: operations.clone(), sequence_num: 1, }; - test_handle(socket, state, file_id, user_1, request, None, response).await; + test_handle( + socket.clone(), + state.clone(), + file_id, + user_1.clone(), + request, + None, + Some(response.clone()), + ) + .await; + + // now test get_transactions + let request = MessageRequest::GetTransactions { + file_id, + session_id, + min_sequence_num: 1, + }; + + let string_operations = operations.to_string(); + let response = MessageResponse::Transactions { + transactions: format!("[{{\"id\":\"{id}\",\"file_id\":\"{file_id}\",\"operations\":{string_operations},\"sequence_num\":1}}]"), + }; + + // expect an empty array since there are no transactions + test_handle( + socket, + state, + file_id, + user_1, + request, + Some(response), + None, + ) + .await; + } + + #[tokio::test] + async fn handle_missing_transactions() { + let (socket, state, _, file_id, user_1, _) = setup().await; + let session_id = user_1.session_id; + + let request = MessageRequest::GetTransactions { + file_id, + session_id, + min_sequence_num: 1, + }; + + // increment the sequence_num + get_mut_room!(state, file_id) + .unwrap() + .increment_sequence_num(); + + let response = MessageResponse::Error { + error: MpError::MissingTransactions("1".into(), "0".into()), // requested 1, got 0 + error_level: ErrorLevel::Error, + }; + + // expect an error since we're requesting a higher sequence_num + test_handle( + socket, + state, + file_id, + user_1, + request, + Some(response), + None, + ) + .await; } } diff --git a/quadratic-multiplayer/src/server.rs b/quadratic-multiplayer/src/server.rs index 2e844ec71c..4157023ef2 100644 --- a/quadratic-multiplayer/src/server.rs +++ b/quadratic-multiplayer/src/server.rs @@ -213,16 +213,11 @@ async fn handle_socket( } match error { - // kill the ws connection for auth errors - MpError::Authentication(_) => { - break; - } - // kill the ws connection for file permission errors - MpError::FilePermissions(_) => { - break; - } - // kill the ws connection for room not found errors - MpError::RoomNotFound(_) => { + // kill the ws connection for certain errors + MpError::Authentication(_) + | MpError::UserNotFound(_, _) + | MpError::FilePermissions(_) + | MpError::RoomNotFound(_) => { break; } // noop diff --git a/quadratic-multiplayer/src/state/pubsub.rs b/quadratic-multiplayer/src/state/pubsub.rs index 32a4c74746..28d7fa1df9 100644 --- a/quadratic-multiplayer/src/state/pubsub.rs +++ b/quadratic-multiplayer/src/state/pubsub.rs @@ -123,7 +123,7 @@ impl State { .lock() .await .connection - .get_messages_from(&file_id.to_string(), &min_sequence_num.to_string()) + .get_messages_from(&file_id.to_string(), &min_sequence_num.to_string(), false) .await? .iter() .flat_map(|(_, message)| serde_json::from_str::(message)) @@ -138,7 +138,7 @@ impl State { .lock() .await .connection - .last_message(&file_id.to_string()) + .last_message(&file_id.to_string(), false) .await?) } } diff --git a/quadratic-multiplayer/src/state/room.rs b/quadratic-multiplayer/src/state/room.rs index 54b7093328..557e75d832 100644 --- a/quadratic-multiplayer/src/state/room.rs +++ b/quadratic-multiplayer/src/state/room.rs @@ -169,10 +169,18 @@ macro_rules! get_or_create_room { } else { let url = &$self.settings.quadratic_api_uri; let jwt = &$self.settings.m2m_auth_token; - quadratic_rust_shared::quadratic_api::get_file_checkpoint(url, jwt, &$file_id) - .await? - .sequence_number - .max($sequence_num) + let response = quadratic_rust_shared::quadratic_api::get_file_checkpoint( + url, jwt, &$file_id, + ) + .await? + .sequence_number + .max($sequence_num); + tracing::info!( + "Retrieved sequence number {} for room {}", + response, + $file_id + ); + response } } }; diff --git a/quadratic-rust-shared/src/pubsub/mod.rs b/quadratic-rust-shared/src/pubsub/mod.rs index e6c9bd54d5..8753df28b5 100644 --- a/quadratic-rust-shared/src/pubsub/mod.rs +++ b/quadratic-rust-shared/src/pubsub/mod.rs @@ -57,24 +57,38 @@ pub trait PubSub { group: &str, keys: Vec<&str>, active_channel: Option<&str>, + preserve_sequence: bool, ) -> impl Future> + Send; + fn trim(&mut self, channel: &str, key: &str) -> impl Future> + Send; + fn messages( &mut self, channel: &str, group: &str, - keys: Option>, + consumer: &str, + keys: Option<&str>, max_messages: usize, + preserve_sequence: bool, + ) -> impl Future>> + Send; + + fn get_messages_before( + &mut self, + channel: &str, + id: &str, + preserve_sequence: bool, ) -> impl Future>> + Send; fn get_messages_from( &mut self, channel: &str, id: &str, + preserve_sequence: bool, ) -> impl Future>> + Send; fn last_message( &mut self, channel: &str, + preserve_sequence: bool, ) -> impl Future> + Send; } diff --git a/quadratic-rust-shared/src/pubsub/redis.rs b/quadratic-rust-shared/src/pubsub/redis.rs index e7e6ddd46a..cc10375302 100644 --- a/quadratic-rust-shared/src/pubsub/redis.rs +++ b/quadratic-rust-shared/src/pubsub/redis.rs @@ -108,16 +108,32 @@ impl super::PubSub for RedisConnection { _group: &str, _keys: Vec<&str>, _active_channel: Option<&str>, + _preserve_sequence: bool, ) -> Result<()> { unimplemented!() } + async fn trim(&mut self, _channel: &str, _key: &str) -> Result { + unimplemented!() + } + async fn messages( &mut self, _channel: &str, _group: &str, - _keys: Option>, + _consumer: &str, + _keys: Option<&str>, _max_messages: usize, + _preserve_sequence: bool, + ) -> Result> { + unimplemented!() + } + + async fn get_messages_before( + &mut self, + _channel: &str, + _id: &str, + _preserve_sequence: bool, ) -> Result> { unimplemented!() } @@ -126,11 +142,16 @@ impl super::PubSub for RedisConnection { &mut self, _channel: &str, _id: &str, + _preserve_sequence: bool, ) -> Result> { unimplemented!() } - async fn last_message(&mut self, _channel: &str) -> Result<(String, String)> { + async fn last_message( + &mut self, + _channel: &str, + _preserve_sequence: bool, + ) -> Result<(String, String)> { unimplemented!() } diff --git a/quadratic-rust-shared/src/pubsub/redis_streams.rs b/quadratic-rust-shared/src/pubsub/redis_streams.rs index 80fe52d558..7aab7ec6c0 100644 --- a/quadratic-rust-shared/src/pubsub/redis_streams.rs +++ b/quadratic-rust-shared/src/pubsub/redis_streams.rs @@ -53,12 +53,18 @@ fn client(config: Config) -> Result { )) } -fn to_key(key: &str) -> String { - format!("{key}-0") +fn to_key(key: &str, preserve_sequence: bool) -> String { + if preserve_sequence { + key.into() + } else { + format!("{key}-0") + } } -fn to_keys(keys: Vec<&str>) -> Vec { - keys.iter().map(|key| to_key(key)).collect::>() +fn to_keys(keys: Vec<&str>, preserve_sequence: bool) -> Vec { + keys.iter() + .map(|key| to_key(key, preserve_sequence)) + .collect::>() } fn from_key(key: &str) -> String { @@ -73,15 +79,21 @@ fn from_value(value: &Value) -> String { } } -fn parse_message(id: &StreamId) -> Message { - let StreamId { id, map: value } = id; - let parsed_id = from_key(id); +fn parse_message(id: &StreamId, preserve_sequence: bool) -> Message { + let StreamId { mut id, map: value } = id.to_owned(); + + if !preserve_sequence { + id = from_key(&id); + } + let message = from_value(value.iter().next().unwrap().1); - (parsed_id.to_string(), message) + (id.to_string(), message) } -fn stream_ids_to_messages(ids: Vec) -> Vec { - ids.iter().map(parse_message).collect::>() +fn stream_ids_to_messages(ids: Vec, preserve_sequence: bool) -> Vec { + ids.iter() + .map(|id| parse_message(id, preserve_sequence)) + .collect::>() } impl super::PubSub for RedisConnection { @@ -193,6 +205,7 @@ impl super::PubSub for RedisConnection { group: &str, keys: Vec<&str>, active_channel: Option<&str>, + preserve_sequence: bool, ) -> Result<()> { if keys.is_empty() { return Err(SharedError::PubSub( @@ -200,7 +213,7 @@ impl super::PubSub for RedisConnection { )); } - let ids = to_keys(keys); + let ids = to_keys(keys, preserve_sequence); self.multiplex .xack::<&str, &str, String, u128>(channel, group, &ids) @@ -214,6 +227,19 @@ impl super::PubSub for RedisConnection { Ok(()) } + /// Trim messages from a channel + async fn trim(&mut self, channel: &str, key: &str) -> Result { + let xtrim = cmd("XTRIM").arg(channel).arg("MINID").arg(key).to_owned(); + let value = self.multiplex.send_packed_command(&xtrim).await?; + + match value { + Value::Int(num) => Ok(num), + _ => Err(SharedError::PubSub( + "Error trimming messages for channel {channel} key {key}".into(), + )), + } + } + /// Get unread messages from a channel. Specify the keys to get messages for, /// or None to get all new messages. /// @@ -225,22 +251,21 @@ impl super::PubSub for RedisConnection { &mut self, channel: &str, group: &str, - keys: Option>, + consumer: &str, + maybe_id: Option<&str>, max_messages: usize, + preserve_sequence: bool, ) -> Result> { - // convert keys to ids, default to all new messages (">") if None - let ids = keys.map_or_else(|| vec![">".to_string()], |keys| to_keys(keys)); - - // redis requires the number of keys to match the number of ids - let keys = vec![channel; ids.len()]; + // convert id, default to all new messages (">") if None + let id = maybe_id.map_or_else(|| ">".into(), |id| to_key(id, preserve_sequence)); let opts = StreamReadOptions::default() .count(max_messages) - .group(group, channel); + .group(group, consumer); let raw_messages: Result = self .multiplex - .xread_options(&keys, &ids, &opts) + .xread_options(&[channel], &[&id], &opts) .await .map_err(|e| { SharedError::PubSub(format!("Error reading messages for channel {channel}: {e}")) @@ -249,21 +274,40 @@ impl super::PubSub for RedisConnection { let messages = raw_messages? .keys .iter() - .flat_map(|StreamKey { key: _key, ids }| ids.iter().map(parse_message)) + .flat_map(|StreamKey { key: _key, ids }| { + ids.iter().map(|id| parse_message(id, preserve_sequence)) + }) .collect::>(); Ok(messages) } + /// Get messages from the beginning of a channel ending at a specific id + async fn get_messages_before( + &mut self, + channel: &str, + id: &str, + preserve_sequence: bool, + ) -> Result> { + let messages: StreamRangeReply = self.multiplex.xrange(channel, "-", id).await?; + + Ok(stream_ids_to_messages(messages.ids, preserve_sequence)) + } + /// Get messages from a channel starting from a specific id - async fn get_messages_from(&mut self, channel: &str, id: &str) -> Result> { + async fn get_messages_from( + &mut self, + channel: &str, + id: &str, + preserve_sequence: bool, + ) -> Result> { let messages: StreamRangeReply = self.multiplex.xrange(channel, id, "+").await?; - Ok(stream_ids_to_messages(messages.ids)) + Ok(stream_ids_to_messages(messages.ids, preserve_sequence)) } /// Get the last message in a channel - async fn last_message(&mut self, channel: &str) -> Result { + async fn last_message(&mut self, channel: &str, preserve_sequence: bool) -> Result { let message: StreamRangeReply = self.multiplex.xrevrange_count(channel, "+", "-", 1).await?; @@ -271,7 +315,7 @@ impl super::PubSub for RedisConnection { SharedError::PubSub("Error getting last message: no messages found".into()) })?; - Ok(parse_message(id)) + Ok(parse_message(id, preserve_sequence)) } } @@ -300,6 +344,7 @@ pub mod tests { let (config, channel) = setup(); let messages = ["test 1", "test 2"]; let group = "group 1"; + let consumer = "consumer 1"; let mut connection = RedisConnection::new(config).await.unwrap(); connection.subscribe(&channel, group).await.unwrap(); @@ -336,7 +381,7 @@ pub mod tests { // get all new messages let results = connection - .messages(&channel, group, None, 10) + .messages(&channel, group, consumer, None, 10, false) .await .unwrap(); @@ -359,7 +404,7 @@ pub mod tests { // acknowledge all messages connection - .ack(&channel, group, ids.clone(), None) + .ack(&channel, group, ids.clone(), None, false) .await .unwrap(); @@ -371,8 +416,9 @@ pub mod tests { // println!("pending: {:?}", pending); + let max_id = ids.into_iter().max().unwrap(); let results = connection - .messages(&channel, group, Some(ids), 10) + .messages(&channel, group, consumer, Some(max_id), 10, false) .await .unwrap(); @@ -397,7 +443,7 @@ pub mod tests { } // get the last message - let results = connection.last_message(&channel).await.unwrap(); + let results = connection.last_message(&channel, false).await.unwrap(); assert_eq!(results, ("2".into(), messages[1].into())); }