diff --git a/.github/workflows/event_pr.yml b/.github/workflows/event_pr.yml index 7c357893..88262313 100644 --- a/.github/workflows/event_pr.yml +++ b/.github/workflows/event_pr.yml @@ -41,6 +41,8 @@ jobs: - uses: actions/checkout@v3 - uses: WalletConnect/actions/github/paths-filter/@2.2.1 id: filter + with: + path-app: . # run CI when tests are changed outputs: infra: ${{ steps.filter.outputs.infra }} app: ${{ steps.filter.outputs.app }} diff --git a/.github/workflows/sub-ci.yml b/.github/workflows/sub-ci.yml index c2db4ba6..450c42d0 100644 --- a/.github/workflows/sub-ci.yml +++ b/.github/workflows/sub-ci.yml @@ -87,4 +87,4 @@ jobs: uses: WalletConnect/actions-rs/cargo@1.1.0 with: command: test - args: --test integration + args: --test integration -- --test-threads=1 diff --git a/Cargo.lock b/Cargo.lock index d187631f..1c3afdac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4179,9 +4179,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -4198,9 +4198,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", diff --git a/justfile b/justfile index 233d0d05..79ce6ee8 100644 --- a/justfile +++ b/justfile @@ -31,7 +31,7 @@ test-all: test-integration: @echo '==> Testing integration' - RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}} + RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}} --test-threads=1 # Clean build artifacts clean: diff --git a/src/metrics.rs b/src/metrics.rs index 59dc3137..72716463 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -35,6 +35,10 @@ pub struct Metrics { relay_outgoing_message_failures: Counter, relay_outgoing_message_latency: Histogram, relay_outgoing_message_publish_latency: Histogram, + relay_subscribes: Counter, + relay_subscribe_failures: Counter, + relay_subscribe_latency: Histogram, + relay_subscribe_request_latency: Histogram, postgres_queries: Counter, postgres_query_latency: Histogram, keys_server_requests: Counter, @@ -112,6 +116,26 @@ impl Metrics { .with_description("The latency publishing relay messages") .init(); + let relay_subscribes: Counter = meter + .u64_counter("relay_subscribes") + .with_description("The number of subscribes to relay topics (not including retries)") + .init(); + + let relay_subscribe_failures: Counter = meter + .u64_counter("relay_subscribe_failures") + .with_description("The number of failures to subscribe to relay topics") + .init(); + + let relay_subscribe_latency: Histogram = meter + .u64_histogram("relay_subscribe_latency") + .with_description("The latency subscribing to relay topics w/ built-in retry") + .init(); + + let relay_subscribe_request_latency: Histogram = meter + .u64_histogram("relay_subscribe_request_latency") + .with_description("The latency subscribing to relay topics") + .init(); + let postgres_queries: Counter = meter .u64_counter("postgres_queries") .with_description("The number of Postgres queries executed") @@ -194,6 +218,10 @@ impl Metrics { relay_outgoing_message_failures, relay_outgoing_message_latency, relay_outgoing_message_publish_latency, + relay_subscribes, + relay_subscribe_failures, + relay_subscribe_latency, + relay_subscribe_request_latency, postgres_queries, postgres_query_latency, keys_server_requests, @@ -286,6 +314,30 @@ impl Metrics { ); } + pub fn relay_subscribe(&self, success: bool, start: Instant) { + let elapsed = start.elapsed(); + + let ctx = Context::current(); + let attributes = [KeyValue::new("success", success.to_string())]; + self.relay_subscribes.add(&ctx, 1, &attributes); + self.relay_subscribe_latency + .record(&ctx, elapsed.as_millis() as u64, &attributes); + } + + pub fn relay_subscribe_failure(&self, is_permenant: bool) { + let ctx = Context::current(); + let attributes = [KeyValue::new("is_permenant", is_permenant.to_string())]; + self.relay_subscribe_failures.add(&ctx, 1, &attributes); + } + + pub fn relay_subscribe_request(&self, start: Instant) { + let elapsed = start.elapsed(); + + let ctx = Context::current(); + self.relay_subscribe_request_latency + .record(&ctx, elapsed.as_millis() as u64, &[]); + } + pub fn postgres_query(&self, query_name: &'static str, start: Instant) { let elapsed = start.elapsed(); diff --git a/src/model/helpers.rs b/src/model/helpers.rs index 2e89cba0..90a783f9 100644 --- a/src/model/helpers.rs +++ b/src/model/helpers.rs @@ -13,7 +13,7 @@ use { ed25519_dalek::SigningKey, relay_rpc::domain::{ProjectId, Topic}, serde::{Deserialize, Serialize}, - sqlx::{FromRow, PgPool, Postgres}, + sqlx::{FromRow, PgExecutor, PgPool, Postgres, Transaction}, std::{collections::HashSet, time::Instant}, tracing::instrument, uuid::Uuid, @@ -28,13 +28,13 @@ pub struct ProjectWithPublicKeys { pub topic: String, } -pub async fn upsert_project( +pub async fn upsert_project<'e>( project_id: ProjectId, app_domain: &str, topic: Topic, authentication_key: &SigningKey, subscribe_key: &StaticSecret, - postgres: &PgPool, + postgres: impl sqlx::PgExecutor<'e>, metrics: Option<&Metrics>, ) -> Result { let authentication_public_key = encode_authentication_public_key(authentication_key); @@ -58,7 +58,7 @@ pub async fn upsert_project( // TODO test idempotency #[allow(clippy::too_many_arguments)] #[instrument(skip(authentication_private_key, subscribe_private_key, postgres, metrics))] -async fn upsert_project_impl( +async fn upsert_project_impl<'e>( project_id: ProjectId, app_domain: &str, topic: Topic, @@ -66,7 +66,7 @@ async fn upsert_project_impl( authentication_private_key: String, subscribe_public_key: String, subscribe_private_key: String, - postgres: &PgPool, + postgres: impl sqlx::PgExecutor<'e>, metrics: Option<&Metrics>, ) -> Result { let query = " @@ -324,18 +324,16 @@ pub struct SubscribeResponse { } // TODO test idempotency -#[instrument(skip(postgres, metrics))] +#[instrument(skip(txn, metrics))] pub async fn upsert_subscriber( project: Uuid, account: AccountId, scope: HashSet, notify_key: &[u8; 32], notify_topic: Topic, - postgres: &PgPool, + txn: &mut Transaction<'_, Postgres>, metrics: Option<&Metrics>, ) -> Result { - let mut txn = postgres.begin().await?; - // `xmax = 0`: https://stackoverflow.com/a/39204667 let query = " @@ -369,9 +367,7 @@ pub async fn upsert_subscriber( metrics.postgres_query("upsert_subscriber", start); } - update_subscriber_scope(subscriber.id, scope, &mut txn, metrics).await?; - - txn.commit().await?; + update_subscriber_scope(subscriber.id, scope, txn, metrics).await?; Ok(subscriber) } @@ -415,7 +411,7 @@ pub async fn update_subscriber( async fn update_subscriber_scope( subscriber: Uuid, scope: HashSet, - txn: &mut sqlx::Transaction<'_, Postgres>, + txn: &mut Transaction<'_, Postgres>, metrics: Option<&Metrics>, ) -> Result<(), sqlx::error::Error> { let query = " diff --git a/src/publish_relay_message.rs b/src/publish_relay_message.rs index 422f328b..7130f929 100644 --- a/src/publish_relay_message.rs +++ b/src/publish_relay_message.rs @@ -1,7 +1,10 @@ use { crate::metrics::Metrics, relay_client::{error::Error, http::Client}, - relay_rpc::rpc::{msg_id::MsgId, Publish}, + relay_rpc::{ + domain::Topic, + rpc::{msg_id::MsgId, Publish}, + }, std::time::{Duration, Instant}, tokio::time::sleep, tracing::{error, instrument, warn}, @@ -79,3 +82,55 @@ pub async fn publish_relay_message( } Ok(()) } + +#[instrument(skip_all)] +pub async fn subscribe_relay_topic( + relay_ws_client: &relay_client::websocket::Client, + topic: &Topic, + metrics: Option<&Metrics>, +) -> Result<(), Error> { + let start = Instant::now(); + + let client_publish_call = || async { + let start = Instant::now(); + let result = relay_ws_client.subscribe_blocking(topic.clone()).await; + if let Some(metrics) = metrics { + metrics.relay_subscribe_request(start); + } + result + }; + + let mut tries = 0; + while let Err(e) = client_publish_call().await { + tries += 1; + let is_permenant = tries >= 10; + if let Some(metrics) = metrics { + metrics.relay_subscribe_failure(is_permenant); + } + + if is_permenant { + error!("Permenant error subscribing to topic {topic}, took {tries} tries: {e:?}"); + + if let Some(metrics) = metrics { + // TODO make DRY with end-of-function call + metrics.relay_subscribe(false, start); + } + return Err(e); + } + + let retry_in = Duration::from_secs(1); + warn!( + "Temporary error subscribing to topic {topic}, retrying attempt {tries} in {retry_in:?}: {e:?}" + ); + sleep(retry_in).await; + } + + if let Some(metrics) = metrics { + metrics.relay_subscribe(true, start); + } + + // Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes + sleep(Duration::from_millis(250)).await; + + Ok(()) +} diff --git a/src/services/public_http_server/handlers/subscribe_topic.rs b/src/services/public_http_server/handlers/subscribe_topic.rs index 7c6ab573..ea90ad49 100644 --- a/src/services/public_http_server/handlers/subscribe_topic.rs +++ b/src/services/public_http_server/handlers/subscribe_topic.rs @@ -2,8 +2,10 @@ use { crate::{ error::NotifyServerError, model::helpers::upsert_project, + publish_relay_message::{publish_relay_message, subscribe_relay_topic}, rate_limit::{self, Clock, RateLimitError}, registry::{extractor::AuthedProjectId, storage::redis::Redis}, + spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL}, state::AppState, utils::topic_from_key, }, @@ -17,10 +19,10 @@ use { hyper::StatusCode, once_cell::sync::Lazy, regex::Regex, - relay_rpc::domain::ProjectId, + relay_rpc::{domain::ProjectId, rpc::Publish}, serde::{Deserialize, Serialize}, serde_json::json, - std::sync::Arc, + std::sync::{Arc, OnceLock}, tracing::{info, instrument}, x25519_dalek::{PublicKey, StaticSecret}, }; @@ -79,13 +81,14 @@ pub async fn handler( let authentication_key = ed25519_dalek::SigningKey::generate(&mut OsRng); + let mut txn = state.postgres.begin().await?; let project = upsert_project( project_id, &app_domain, topic.clone(), &authentication_key, &subscribe_key, - &state.postgres, + &mut *txn, state.metrics.as_ref(), ) .await @@ -101,9 +104,31 @@ pub async fn handler( // Don't call subscribe if we are already subscribed in a previous request if project.topic == topic.as_ref() { info!("Subscribing to project topic: {topic}"); - state.relay_ws_client.subscribe(topic).await?; + subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?; + + // Send noop to extend ttl of relay's mapping + info!("Timing: Publishing noop to notify_topic"); + publish_relay_message( + &state.relay_http_client, + &Publish { + topic, + message: { + // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| "".into()).clone() + }, + tag: NOTIFY_NOOP_TAG, + ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await?; + info!("Timing: Finished publishing noop to notify_topic"); } + txn.commit().await?; + Ok(Json(SubscribeTopicResponseBody { authentication_key: project.authentication_public_key, subscribe_key: project.subscribe_public_key, diff --git a/src/services/websocket_server/handlers/notify_subscribe.rs b/src/services/websocket_server/handlers/notify_subscribe.rs index 82243c58..2ea3ed39 100644 --- a/src/services/websocket_server/handlers/notify_subscribe.rs +++ b/src/services/websocket_server/handlers/notify_subscribe.rs @@ -7,7 +7,7 @@ use { }, error::NotifyServerError, model::helpers::{get_project_by_topic, get_welcome_notification, upsert_subscriber}, - publish_relay_message::publish_relay_message, + publish_relay_message::{publish_relay_message, subscribe_relay_topic}, rate_limit::{self, Clock, RateLimitError}, registry::storage::redis::Redis, services::{ @@ -153,6 +153,12 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay let scope = parse_scope(&request_auth.scp) .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? + let mut txn = state + .postgres + .begin() + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; + let subscriber = { // Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical // debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from @@ -169,7 +175,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay scope.clone(), ¬ify_key, notify_topic, - &state.postgres, + &mut txn, state.metrics.as_ref(), ) .await @@ -195,13 +201,40 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay .map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error? info!("Timing: Subscribing to notify_topic: {notify_topic}"); - state - .relay_ws_client - .subscribe(notify_topic.clone()) - .await - .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? + subscribe_relay_topic( + &state.relay_ws_client, + ¬ify_topic, + state.metrics.as_ref(), + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; info!("Timing: Finished subscribing to topic"); + // Send noop to extend ttl of relay's mapping + info!("Timing: Publishing noop to notify_topic"); + publish_relay_message( + &state.relay_http_client, + &Publish { + topic: notify_topic.clone(), + message: { + // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| "".into()).clone() + }, + tag: NOTIFY_NOOP_TAG, + ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? + info!("Timing: Finished publishing noop to notify_topic"); + + txn.commit() + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; + info!("Timing: Recording SubscriberUpdateParams"); state.analytics.client(SubscriberUpdateParams { project_pk: project.id, @@ -213,7 +246,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay method: NotifyClientMethod::Subscribe, old_scope: HashSet::new(), new_scope: scope.clone(), - notification_topic: notify_topic.clone(), + notification_topic: notify_topic, topic, }); info!("Timing: Finished recording SubscriberUpdateParams"); @@ -275,27 +308,6 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay info!("Finished publishing subscribe response"); } - // Send noop to extend ttl of relay's mapping - info!("Timing: Publishing noop to notify_topic"); - publish_relay_message( - &state.relay_http_client, - &Publish { - topic: notify_topic, - message: { - // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime - static LOCK: OnceLock> = OnceLock::new(); - LOCK.get_or_init(|| "".into()).clone() - }, - tag: NOTIFY_NOOP_TAG, - ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, - prompt: false, - }, - state.metrics.as_ref(), - ) - .await - .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? - info!("Timing: Finished publishing noop to notify_topic"); - // TODO do in same txn as upsert_subscriber() if subscriber.inserted { let welcome_notification = diff --git a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs index 5c16b5e9..0f62bef1 100644 --- a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs +++ b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs @@ -368,6 +368,7 @@ pub async fn prepare_subscription_watchers( Ok((source_subscriptions, watchers_with_subscriptions)) } +#[instrument(skip_all)] pub async fn send_to_subscription_watchers( watchers_with_subscriptions: Vec<(SubscriptionWatcherQuery, Vec)>, authentication_secret: &ed25519_dalek::SigningKey, diff --git a/terraform/monitoring/dashboard.jsonnet b/terraform/monitoring/dashboard.jsonnet index 97d95aa6..f1daebd5 100644 --- a/terraform/monitoring/dashboard.jsonnet +++ b/terraform/monitoring/dashboard.jsonnet @@ -77,6 +77,10 @@ dashboard.new( panels.app.registry_request_rate(ds, vars) {gridPos: pos._6 }, panels.app.registry_request_latency(ds, vars) {gridPos: pos._6 }, + panels.app.relay_subscribe_rate(ds, vars) {gridPos: pos._6 }, + panels.app.relay_subscribe_latency(ds, vars) {gridPos: pos._6 }, + panels.app.relay_subscribe_failures(ds, vars) {gridPos: pos._6 }, + row.new('Application publisher subservice'), panels.app.publishing_workers_count(ds, vars) {gridPos: pos._5 }, panels.app.publishing_workers_errors(ds, vars) {gridPos: pos._5 }, diff --git a/terraform/monitoring/panels/app/relay_subscribe_failures.libsonnet b/terraform/monitoring/panels/app/relay_subscribe_failures.libsonnet new file mode 100644 index 00000000..1b72b3c4 --- /dev/null +++ b/terraform/monitoring/panels/app/relay_subscribe_failures.libsonnet @@ -0,0 +1,50 @@ +local grafana = import '../../grafonnet-lib/grafana.libsonnet'; +local defaults = import '../../grafonnet-lib/defaults.libsonnet'; + +local panels = grafana.panels; +local targets = grafana.targets; + +{ + new(ds, vars):: + panels.timeseries( + title = 'Relay Subscribe Errors', + datasource = ds.prometheus, + ) + .configure(defaults.configuration.timeseries) + + .setAlert(vars.environment, grafana.alert.new( + namespace = vars.namespace, + name = '%(env)s - Failed to subscribe to relay topic' % { env: vars.environment }, + message = '%(env)s - Failed to subscribe to relay topic' % { env: vars.environment }, + notifications = vars.notifications, + noDataState = 'no_data', + period = '0m', + conditions = [ + grafana.alertCondition.new( + evaluatorParams = [ 0 ], + evaluatorType = 'gt', + operatorType = 'or', + queryRefId = 'RelaySubscribesPermenantFailures', + queryTimeStart = '5m', + queryTimeEnd = 'now', + reducerType = grafana.alert_reducers.Avg + ), + ], + )) + + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum by (aws_ecs_task_revision) (increase(relay_subscribe_failures_total{is_permenant="true"}[$__rate_interval]))', + legendFormat = 'Permenant r{{aws_ecs_task_revision}}', + exemplar = true, + refId = 'RelaySubscribePermenantFailures', + )) + + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum by (aws_ecs_task_revision) (increase(relay_subscribe_failures_total{is_permenant="false"}[$__rate_interval]))', + legendFormat = 'Temporary r{{aws_ecs_task_revision}}', + exemplar = true, + refId = 'RelaySubscribeTemporaryFailures', + )) +} diff --git a/terraform/monitoring/panels/app/relay_subscribe_latency.libsonnet b/terraform/monitoring/panels/app/relay_subscribe_latency.libsonnet new file mode 100644 index 00000000..62d168b0 --- /dev/null +++ b/terraform/monitoring/panels/app/relay_subscribe_latency.libsonnet @@ -0,0 +1,33 @@ +local grafana = import '../../grafonnet-lib/grafana.libsonnet'; +local defaults = import '../../grafonnet-lib/defaults.libsonnet'; + +local panels = grafana.panels; +local targets = grafana.targets; + +{ + new(ds, vars):: + panels.timeseries( + title = 'Relay Subscribe Latency', + datasource = ds.prometheus, + ) + .configure( + defaults.configuration.timeseries + .withUnit('ms') + ) + + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum by (aws_ecs_task_revision) (rate(relay_subscribe_latency_sum[$__rate_interval])) / sum by (aws_ecs_task_revision) (rate(relay_subscribe_latency_count[$__rate_interval]))', + legendFormat = 'Publish w/ retries r{{aws_ecs_task_revision}}', + exemplar = false, + refId = 'RelaySubscribeLatency', + )) + + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum by (aws_ecs_task_revision) (rate(relay_subscribe_request_latency_sum[$__rate_interval])) / sum by (aws_ecs_task_revision) (rate(relay_subscribe_request_latency_count[$__rate_interval]))', + legendFormat = 'Individual RPC r{{aws_ecs_task_revision}}', + exemplar = false, + refId = 'RelaySubscribeRequestLatency', + )) +} diff --git a/terraform/monitoring/panels/app/relay_subscribe_rate.libsonnet b/terraform/monitoring/panels/app/relay_subscribe_rate.libsonnet new file mode 100644 index 00000000..efd9a3f1 --- /dev/null +++ b/terraform/monitoring/panels/app/relay_subscribe_rate.libsonnet @@ -0,0 +1,25 @@ +local grafana = import '../../grafonnet-lib/grafana.libsonnet'; +local defaults = import '../../grafonnet-lib/defaults.libsonnet'; + +local panels = grafana.panels; +local targets = grafana.targets; + +{ + new(ds, vars):: + panels.timeseries( + title = 'Relay Subscribe Rate', + datasource = ds.prometheus, + ) + .configure( + defaults.configuration.timeseries + .withUnit('cps') + ) + + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum by (aws_ecs_task_revision, tag) (rate(relay_subscribes_total[$__rate_interval]))', + legendFormat = '{{tag}} r{{aws_ecs_task_revision}}', + exemplar = true, + refId = 'RelaySubscribesRate', + )) +} diff --git a/terraform/monitoring/panels/panels.libsonnet b/terraform/monitoring/panels/panels.libsonnet index c9c18fb7..5d007224 100644 --- a/terraform/monitoring/panels/panels.libsonnet +++ b/terraform/monitoring/panels/panels.libsonnet @@ -23,6 +23,9 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1); relay_outgoing_message_rate: (import 'app/relay_outgoing_message_rate.libsonnet' ).new, relay_outgoing_message_latency: (import 'app/relay_outgoing_message_latency.libsonnet' ).new, relay_outgoing_message_failures: (import 'app/relay_outgoing_message_failures.libsonnet' ).new, + relay_subscribe_rate: (import 'app/relay_subscribe_rate.libsonnet' ).new, + relay_subscribe_latency: (import 'app/relay_subscribe_latency.libsonnet' ).new, + relay_subscribe_failures: (import 'app/relay_subscribe_failures.libsonnet' ).new, postgres_query_rate: (import 'app/postgres_query_rate.libsonnet' ).new, postgres_query_latency: (import 'app/postgres_query_latency.libsonnet' ).new, keys_server_request_rate: (import 'app/keys_server_request_rate.libsonnet' ).new, diff --git a/tests/deployment.rs b/tests/deployment.rs index 65d600e7..93e8cc8b 100644 --- a/tests/deployment.rs +++ b/tests/deployment.rs @@ -1,7 +1,7 @@ use { crate::utils::{ - create_client, encode_auth, generate_account, topic_subscribe, verify_jwt, - UnregisterIdentityRequestAuth, JWT_LEEWAY, + encode_auth, generate_account, verify_jwt, UnregisterIdentityRequestAuth, JWT_LEEWAY, + RELAY_MESSAGE_DELIVERY_TIMEOUT, }, base64::Engine, chacha20poly1305::{ @@ -21,6 +21,7 @@ use { WatchSubscriptionsRequestAuth, WatchSubscriptionsResponseAuth, STATEMENT_THIS_DOMAIN, }, jsonrpc::NotifyPayload, + publish_relay_message::subscribe_relay_topic, services::{ public_http_server::handlers::{ notify_v0::NotifyBody, @@ -52,13 +53,13 @@ use { cacao::{self, signature::Eip191}, ed25519_dalek::Keypair, }, - domain::DecodedClientId, + domain::{DecodedClientId, ProjectId}, rpc::msg_id::get_message_id, }, serde_json::json, sha2::Digest, sha3::Keccak256, - std::{collections::HashSet, env}, + std::{collections::HashSet, env, sync::Arc}, tokio::sync::broadcast::Receiver, url::Url, uuid::Uuid, @@ -150,6 +151,44 @@ struct Vars { keys_server_url: Url, } +pub async fn create_client( + relay_url: Url, + relay_project_id: ProjectId, + notify_url: Url, +) -> ( + Arc, + Receiver, +) { + let (tx, mut rx) = tokio::sync::broadcast::channel(8); + let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn(async move { + while let Some(event) = mpsc_rx.recv().await { + let _ = tx.send(event); + } + }); + let connection_handler = + notify_server::services::websocket_server::relay_ws_client::RelayConnectionHandler::new( + "notify-client", + mpsc_tx, + ); + let relay_ws_client = Arc::new(relay_client::websocket::Client::new(connection_handler)); + + let keypair = Keypair::generate(&mut StdRng::from_entropy()); + let opts = notify_server::relay_client_helpers::create_ws_connect_options( + &keypair, + relay_url, + notify_url, + relay_project_id, + ) + .unwrap(); + relay_ws_client.connect(&opts).await.unwrap(); + + // Eat up the "connected" message + _ = rx.recv().await.unwrap(); + + (relay_ws_client, rx) +} + #[allow(clippy::too_many_arguments)] async fn watch_subscriptions( vars: &Vars, @@ -282,7 +321,7 @@ async fn watch_subscriptions( .await .unwrap(); - topic_subscribe(relay_ws_client, response_topic.clone()) + subscribe_relay_topic(relay_ws_client, &response_topic, None) .await .unwrap(); @@ -527,8 +566,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { let response_topic = topic_from_key(&response_topic_key); println!("subscription response_topic: {response_topic}"); - // Subscribe to the topic and listen for response - topic_subscribe(relay_ws_client.as_ref(), response_topic.clone()) + subscribe_relay_topic(&relay_ws_client, &response_topic, None) .await .unwrap(); @@ -659,7 +697,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { let notify_topic = topic_from_key(¬ify_key); - topic_subscribe(relay_ws_client.as_ref(), notify_topic.clone()) + subscribe_relay_topic(&relay_ws_client, ¬ify_topic, None) .await .unwrap(); @@ -683,9 +721,6 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { accounts: vec![account], }; - // wait for notify server to register the user - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let _res = reqwest::Client::new() .post(format!( "{}/{}/notify", @@ -971,9 +1006,6 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { assert!(auth.sbs.is_empty()); } - // wait for notify server to unregister the user - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - let resp = reqwest::Client::new() .post(format!( "{}/{}/notify", @@ -1011,7 +1043,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { .await .unwrap(); - if let Ok(resp) = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()).await { + if let Ok(resp) = tokio::time::timeout(RELAY_MESSAGE_DELIVERY_TIMEOUT, rx.recv()).await { let resp = resp.unwrap(); let RelayClientEvent::Message(msg) = resp else { panic!("Expected message, got {:?}", resp); diff --git a/tests/integration.rs b/tests/integration.rs index f9e5e48b..6579b06c 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,7 +1,7 @@ use { crate::utils::{ - encode_auth, format_eip155_account, generate_eoa, topic_subscribe, verify_jwt, - UnregisterIdentityRequestAuth, JWT_LEEWAY, + encode_auth, format_eip155_account, generate_eoa, verify_jwt, RelayClient, + UnregisterIdentityRequestAuth, JWT_LEEWAY, RELAY_MESSAGE_DELIVERY_TIMEOUT, }, async_trait::async_trait, base64::{engine::general_purpose::STANDARD as BASE64, Engine}, @@ -70,9 +70,9 @@ use { types::SubscriberNotificationStatus, }, websocket_server::{ - decode_key, derive_key, relay_ws_client::RelayClientEvent, AuthMessage, - NotifyDelete, NotifyRequest, NotifyResponse, NotifySubscribe, - NotifySubscriptionsChanged, NotifyUpdate, NotifyWatchSubscriptions, ResponseAuth, + decode_key, derive_key, AuthMessage, NotifyDelete, NotifyRequest, NotifyResponse, + NotifySubscribe, NotifySubscriptionsChanged, NotifyUpdate, + NotifyWatchSubscriptions, ResponseAuth, }, }, spec::{ @@ -99,7 +99,6 @@ use { rand::rngs::StdRng, rand_chacha::rand_core::OsRng, rand_core::SeedableRng, - relay_client::websocket::{Client, PublishedMessage}, relay_rpc::{ auth::{ cacao::{ @@ -111,6 +110,7 @@ use { ed25519_dalek::Keypair, }, domain::{DecodedClientId, ProjectId, Topic}, + rpc::SubscriptionData, }, reqwest::Response, serde::de::DeserializeOwned, @@ -133,13 +133,12 @@ use { test_context::{test_context, AsyncTestContext}, tokio::{ net::{TcpListener, ToSocketAddrs}, - sync::{broadcast, broadcast::Receiver}, + sync::broadcast, time::error::Elapsed, }, - tracing::info, tracing_subscriber::fmt::format::FmtSpan, url::Url, - utils::{create_client, generate_account}, + utils::generate_account, uuid::Uuid, wiremock::{ http::Method, @@ -156,7 +155,7 @@ mod utils; // The only variable that's needed is a valid relay project ID because the relay is not mocked. // The registry is mocked out, so any project ID or notify secret is valid and are generated randomly in these tests. -// The staging relay will always be used, to avoid unnecessary load on prod relay. +// The prod relay will always be used, to allow tests to run longer than 1 minute and enabling debugging with data lake // The localhost Postgres will always be used. This is valid in both docker-compose.storage and GitHub CI. // TODO make these DRY with local configuration defaults @@ -165,7 +164,7 @@ fn get_vars() -> Vars { project_id: env::var("PROJECT_ID").unwrap(), // No use-case to modify these currently. - relay_url: "wss://staging.relay.walletconnect.com".to_owned(), + relay_url: "wss://relay.walletconnect.com".to_owned(), } } @@ -1237,16 +1236,14 @@ async fn test_notify_v0(notify_server: &NotifyServerContext) { .unwrap(); let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), ) .await; - topic_subscribe(relay_ws_client.as_ref(), notify_topic) - .await - .unwrap(); + relay_client.subscribe(notify_topic).await; let notification = Notification { r#type: notification_type, @@ -1279,12 +1276,12 @@ async fn test_notify_v0(notify_server: &NotifyServerContext) { .await; let (_, claims) = accept_notify_message( + &mut relay_client, &account, &authentication_key.verifying_key(), &get_client_id(&authentication_key.verifying_key()), &app_domain, ¬ify_key, - &mut rx, ) .await; @@ -1336,16 +1333,14 @@ async fn test_notify_v1(notify_server: &NotifyServerContext) { .unwrap(); let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), ) .await; - topic_subscribe(relay_ws_client.as_ref(), notify_topic) - .await - .unwrap(); + relay_client.subscribe(notify_topic).await; let notification = Notification { r#type: notification_type, @@ -1385,12 +1380,12 @@ async fn test_notify_v1(notify_server: &NotifyServerContext) { assert_eq!(response.sent, HashSet::from([account.clone()])); let (_, claims) = accept_notify_message( + &mut relay_client, &account, &authentication_key.verifying_key(), &get_client_id(&authentication_key.verifying_key()), &app_domain, ¬ify_key, - &mut rx, ) .await; @@ -2601,7 +2596,7 @@ enum TopicEncrptionScheme { } async fn publish_watch_subscriptions_request( - relay_ws_client: &Client, + relay_client: &mut RelayClient, account: &AccountId, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -2609,7 +2604,7 @@ async fn publish_watch_subscriptions_request( app: Option, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Asymetric(encryption_details), @@ -2640,7 +2635,7 @@ async fn publish_watch_subscriptions_request( #[allow(clippy::too_many_arguments)] async fn publish_subscribe_request( - relay_ws_client: &Client, + relay_client: &mut RelayClient, did_pkh: String, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -2650,7 +2645,7 @@ async fn publish_subscribe_request( mjv: String, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Asymetric(encryption_details), @@ -2684,18 +2679,9 @@ async fn publish_subscribe_request( .await } -async fn accept_message(rx: &mut Receiver) -> PublishedMessage { - let event = rx.recv().await.unwrap(); - match event { - RelayClientEvent::Message(msg) => msg, - e => panic!("Expected message, got {e:?}"), - } -} - #[allow(clippy::too_many_arguments)] async fn subscribe( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app_key_agreement_key: x25519_dalek::PublicKey, @@ -2704,8 +2690,7 @@ async fn subscribe( notification_types: HashSet, ) { let _subs = subscribe_with_mjv( - relay_ws_client, - rx, + relay_client, account, identity_key_details, app_key_agreement_key, @@ -2719,8 +2704,7 @@ async fn subscribe( #[allow(clippy::too_many_arguments)] async fn subscribe_v1( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app_key_agreement_key: x25519_dalek::PublicKey, @@ -2729,8 +2713,7 @@ async fn subscribe_v1( notification_types: HashSet, ) -> Vec { subscribe_with_mjv( - relay_ws_client, - rx, + relay_client, account, identity_key_details, app_key_agreement_key, @@ -2744,8 +2727,7 @@ async fn subscribe_v1( #[allow(clippy::too_many_arguments)] async fn subscribe_with_mjv( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app_key_agreement_key: x25519_dalek::PublicKey, @@ -2760,7 +2742,7 @@ async fn subscribe_with_mjv( let response_topic = topic_from_key(&response_topic_key); publish_subscribe_request( - relay_ws_client, + relay_client, account.to_did_pkh(), app_client_id, identity_key_details, @@ -2778,22 +2760,11 @@ async fn subscribe_with_mjv( // https://walletconnect.slack.com/archives/C03SMNKLPU0/p1704449850496039?thread_ts=1703984667.223199&cid=C03SMNKLPU0 tokio::time::sleep(std::time::Duration::from_secs(1)).await; - topic_subscribe(relay_ws_client, response_topic.clone()) - .await - .unwrap(); + relay_client.subscribe(response_topic.clone()).await; - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_SUBSCRIBE_RESPONSE_TAG && msg.topic == response_topic { - return msg; - } else { - info!("subscribe: ignored message with tag: {}", msg.tag); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message(NOTIFY_SUBSCRIBE_RESPONSE_TAG, &response_topic) + .await; let (_id, auth) = decode_response_message::(msg, &response_topic_key); assert_eq!(auth.shared_claims.act, NOTIFY_SUBSCRIBE_RESPONSE_ACT); @@ -2809,7 +2780,7 @@ async fn subscribe_with_mjv( #[allow(clippy::too_many_arguments)] async fn publish_jwt_message( - relay_ws_client: &Client, + relay_client: &mut RelayClient, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, encryption_details: &TopicEncrptionScheme, @@ -2876,13 +2847,10 @@ async fn publish_jwt_message( let message = BASE64.encode(envelope); - relay_ws_client - .publish(topic, message, tag, ttl, false) - .await - .unwrap(); + relay_client.publish(topic, message, tag, ttl).await; } -fn decode_message(msg: PublishedMessage, key: &[u8; 32]) -> T +fn decode_message(msg: SubscriptionData, key: &[u8; 32]) -> T where T: DeserializeOwned, { @@ -2895,7 +2863,7 @@ where serde_json::from_slice::(&decrypted_response).unwrap() } -fn decode_response_message(msg: PublishedMessage, key: &[u8; 32]) -> (u64, T) +fn decode_response_message(msg: SubscriptionData, key: &[u8; 32]) -> (u64, T) where T: GetSharedClaims + DeserializeOwned, { @@ -2906,7 +2874,7 @@ where ) } -fn decode_auth_message(msg: PublishedMessage, key: &[u8; 32]) -> (u64, T) +fn decode_auth_message(msg: SubscriptionData, key: &[u8; 32]) -> (u64, T) where T: GetSharedClaims + DeserializeOwned, { @@ -2916,12 +2884,11 @@ where #[allow(clippy::too_many_arguments)] async fn watch_subscriptions( + relay_client: &mut RelayClient, notify_server_url: Url, identity_key_details: &IdentityKeyDetails, app_domain: Option, account: &AccountId, - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, ) -> (Vec, [u8; 32], DecodedClientId) { let (key_agreement_key, client_id) = get_notify_did_json(¬ify_server_url).await; @@ -2932,7 +2899,7 @@ async fn watch_subscriptions( let response_topic = topic_from_key(&response_topic_key); publish_watch_subscriptions_request( - relay_ws_client, + relay_client, account, &client_id, identity_key_details, @@ -2945,22 +2912,11 @@ async fn watch_subscriptions( ) .await; - topic_subscribe(relay_ws_client, response_topic.clone()) - .await - .unwrap(); + relay_client.subscribe(response_topic.clone()).await; - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG && msg.topic == response_topic { - return msg; - } else { - info!("watch_subscriptions: ignored message with tag: {}", msg.tag); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message(NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, &response_topic) + .await; let (_id, auth) = decode_response_message::(msg, &response_topic_key); @@ -2979,7 +2935,7 @@ async fn watch_subscriptions( } async fn publish_subscriptions_changed_response( - relay_ws_client: &Client, + relay_client: &mut RelayClient, did_pkh: &AccountId, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -2987,7 +2943,7 @@ async fn publish_subscriptions_changed_response( id: u64, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Symetric(sym_key), @@ -3017,30 +2973,18 @@ async fn publish_subscriptions_changed_response( #[allow(clippy::too_many_arguments)] async fn accept_watch_subscriptions_changed( + relay_client: &mut RelayClient, notify_server_client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, account: &AccountId, watch_topic_key: [u8; 32], - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, ) -> Vec { - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_SUBSCRIPTIONS_CHANGED_TAG - && msg.topic == topic_from_key(&watch_topic_key) - { - return msg; - } else { - info!( - "accept_watch_subscriptions_changed: ignored message with tag: {}", - msg.tag - ); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message( + NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, + &topic_from_key(&watch_topic_key), + ) + .await; let request = decode_message::>(msg, &watch_topic_key); @@ -3059,7 +3003,7 @@ async fn accept_watch_subscriptions_changed( assert_eq!(auth.sub, account.to_did_pkh()); publish_subscriptions_changed_response( - relay_ws_client, + relay_client, account, notify_server_client_id, identity_key_details, @@ -3072,7 +3016,7 @@ async fn accept_watch_subscriptions_changed( } async fn publish_notify_message_response( - relay_ws_client: &Client, + relay_client: &mut RelayClient, account: &AccountId, app_client_id: &DecodedClientId, did_web: DidWeb, @@ -3081,7 +3025,7 @@ async fn publish_notify_message_response( id: u64, ) { publish_jwt_message( - relay_ws_client, + relay_client, app_client_id, identity_key_details, &TopicEncrptionScheme::Symetric(sym_key), @@ -3112,28 +3056,16 @@ async fn publish_notify_message_response( #[allow(clippy::too_many_arguments)] async fn accept_notify_message( + client: &mut RelayClient, account: &AccountId, app_authentication: &VerifyingKey, app_client_id: &DecodedClientId, app_domain: &DidWeb, notify_key: &[u8; 32], - rx: &mut Receiver, ) -> (u64, NotifyMessage) { - let msg = tokio::time::timeout(std::time::Duration::from_secs(15), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_MESSAGE_TAG && msg.topic == topic_from_key(notify_key) { - return msg; - } else { - info!( - "accept_notify_message: ignored message with tag: {}", - msg.tag - ); - } - } - }) - .await - .unwrap(); + let msg = client + .accept_message(NOTIFY_MESSAGE_TAG, &topic_from_key(notify_key)) + .await; let request = decode_message::>(msg, notify_key); assert_eq!(request.method, NOTIFY_MESSAGE_METHOD); @@ -3156,27 +3088,26 @@ async fn accept_notify_message( #[allow(clippy::too_many_arguments)] async fn accept_and_respond_to_notify_message( + relay_client: &mut RelayClient, identity_key_details: &IdentityKeyDetails, account: &AccountId, app_authentication: &VerifyingKey, app_client_id: &DecodedClientId, app_domain: DidWeb, notify_key: [u8; 32], - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, ) -> NotifyMessage { let (request_id, claims) = accept_notify_message( + relay_client, account, app_authentication, app_client_id, &app_domain, ¬ify_key, - rx, ) .await; publish_notify_message_response( - relay_ws_client, + relay_client, account, app_client_id, app_domain, @@ -3191,7 +3122,7 @@ async fn accept_and_respond_to_notify_message( #[allow(clippy::too_many_arguments)] async fn publish_update_request( - relay_ws_client: &Client, + relay_client: &mut RelayClient, account: &AccountId, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -3201,7 +3132,7 @@ async fn publish_update_request( mjv: String, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Symetric(sym_key), @@ -3233,8 +3164,7 @@ async fn publish_update_request( #[allow(clippy::too_many_arguments)] async fn update( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app: &DidWeb, @@ -3243,8 +3173,7 @@ async fn update( notification_types: &HashSet, ) { let _subs = update_with_mjv( - relay_ws_client, - rx, + relay_client, account, identity_key_details, app, @@ -3258,8 +3187,7 @@ async fn update( #[allow(clippy::too_many_arguments)] async fn update_v1( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app: &DidWeb, @@ -3268,8 +3196,7 @@ async fn update_v1( notification_types: &HashSet, ) -> Vec { update_with_mjv( - relay_ws_client, - rx, + relay_client, account, identity_key_details, app, @@ -3283,8 +3210,7 @@ async fn update_v1( #[allow(clippy::too_many_arguments)] async fn update_with_mjv( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app: &DidWeb, @@ -3294,7 +3220,7 @@ async fn update_with_mjv( mjv: String, ) -> Vec { publish_update_request( - relay_ws_client, + relay_client, account, app_client_id, identity_key_details, @@ -3305,19 +3231,9 @@ async fn update_with_mjv( ) .await; - let response_topic = topic_from_key(¬ify_key); - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_UPDATE_RESPONSE_TAG && msg.topic == response_topic { - return msg; - } else { - info!("update: ignored message with tag: {}", msg.tag); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message(NOTIFY_UPDATE_RESPONSE_TAG, &topic_from_key(¬ify_key)) + .await; let (_id, auth) = decode_response_message::(msg, ¬ify_key); assert_eq!(auth.shared_claims.act, NOTIFY_UPDATE_RESPONSE_ACT); @@ -3333,7 +3249,7 @@ async fn update_with_mjv( } async fn publish_delete_request( - relay_ws_client: &Client, + relay_client: &mut RelayClient, account: &AccountId, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -3342,7 +3258,7 @@ async fn publish_delete_request( mjv: String, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Symetric(sym_key), @@ -3373,22 +3289,20 @@ async fn publish_delete_request( #[allow(clippy::too_many_arguments)] async fn delete( + relay_client: &mut RelayClient, identity_key_details: &IdentityKeyDetails, app: &DidWeb, app_client_id: &DecodedClientId, account: &AccountId, notify_key: [u8; 32], - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, ) { let _subs = delete_with_mjv( + relay_client, identity_key_details, app, app_client_id, account, notify_key, - relay_ws_client, - rx, "0".to_owned(), ) .await; @@ -3396,22 +3310,20 @@ async fn delete( #[allow(clippy::too_many_arguments)] async fn delete_v1( + relay_client: &mut RelayClient, identity_key_details: &IdentityKeyDetails, app: &DidWeb, app_client_id: &DecodedClientId, account: &AccountId, notify_key: [u8; 32], - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, ) -> Vec { delete_with_mjv( + relay_client, identity_key_details, app, app_client_id, account, notify_key, - relay_ws_client, - rx, "1".to_owned(), ) .await @@ -3419,17 +3331,16 @@ async fn delete_v1( #[allow(clippy::too_many_arguments)] async fn delete_with_mjv( + relay_client: &mut RelayClient, identity_key_details: &IdentityKeyDetails, app: &DidWeb, app_client_id: &DecodedClientId, account: &AccountId, notify_key: [u8; 32], - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, mjv: String, ) -> Vec { publish_delete_request( - relay_ws_client, + relay_client, account, app_client_id, identity_key_details, @@ -3439,19 +3350,9 @@ async fn delete_with_mjv( ) .await; - let response_topic = topic_from_key(¬ify_key); - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_DELETE_RESPONSE_TAG && msg.topic == response_topic { - return msg; - } else { - info!("delete: ignored message with tag: {}", msg.tag); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message(NOTIFY_DELETE_RESPONSE_TAG, &topic_from_key(¬ify_key)) + .await; let (_id, auth) = decode_response_message::(msg, ¬ify_key); assert_eq!(auth.shared_claims.act, NOTIFY_DELETE_RESPONSE_ACT); @@ -3467,7 +3368,7 @@ async fn delete_with_mjv( } async fn publish_get_notifications_request( - relay_ws_client: &Client, + relay_client: &mut RelayClient, account: &AccountId, client_id: &DecodedClientId, identity_key_details: &IdentityKeyDetails, @@ -3476,7 +3377,7 @@ async fn publish_get_notifications_request( params: GetNotificationsParams, ) { publish_jwt_message( - relay_ws_client, + relay_client, client_id, identity_key_details, &TopicEncrptionScheme::Symetric(sym_key), @@ -3508,8 +3409,7 @@ async fn publish_get_notifications_request( #[allow(clippy::too_many_arguments)] async fn get_notifications( - relay_ws_client: &relay_client::websocket::Client, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app: &DidWeb, @@ -3518,7 +3418,7 @@ async fn get_notifications( params: GetNotificationsParams, ) -> GetNotificationsResult { publish_get_notifications_request( - relay_ws_client, + relay_client, account, app_client_id, identity_key_details, @@ -3528,19 +3428,12 @@ async fn get_notifications( ) .await; - let response_topic = topic_from_key(¬ify_key); - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(rx).await; - if msg.tag == NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG && msg.topic == response_topic { - return msg; - } else { - info!("get_notifications: ignored message with tag: {}", msg.tag); - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message( + NOTIFY_GET_NOTIFICATIONS_RESPONSE_TAG, + &topic_from_key(¬ify_key), + ) + .await; let (_id, auth) = decode_auth_message::(msg, ¬ify_key); @@ -3712,7 +3605,7 @@ async fn update_subscription(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -3723,22 +3616,20 @@ async fn update_subscription(notify_server: &NotifyServerContext) { subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); // Subscribe with 1 type let notification_types = HashSet::from([Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement, @@ -3748,12 +3639,11 @@ async fn update_subscription(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -3762,16 +3652,13 @@ async fn update_subscription(notify_server: &NotifyServerContext) { let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; // Update to 0 types let notification_types = HashSet::from([]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); update( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -3781,12 +3668,11 @@ async fn update_subscription(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -3794,10 +3680,9 @@ async fn update_subscription(notify_server: &NotifyServerContext) { // Update to 2 types let notification_types = HashSet::from([Uuid::new_v4(), Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); update( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -3807,12 +3692,11 @@ async fn update_subscription(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -3852,7 +3736,7 @@ async fn sends_noop(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -3863,22 +3747,20 @@ async fn sends_noop(notify_server: &NotifyServerContext) { subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement, @@ -3889,12 +3771,11 @@ async fn sends_noop(notify_server: &NotifyServerContext) { .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -3904,20 +3785,11 @@ async fn sends_noop(notify_server: &NotifyServerContext) { let notify_key = decode_key(&sub.sym_key).unwrap(); let notify_topic = topic_from_key(¬ify_key); - topic_subscribe(relay_ws_client.as_ref(), notify_topic.clone()) - .await - .unwrap(); + relay_client.subscribe(notify_topic.clone()).await; - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let msg = accept_message(&mut rx).await; - if msg.tag == NOTIFY_NOOP_TAG && msg.topic == notify_topic { - return msg; - } - } - }) - .await - .unwrap(); + let msg = relay_client + .accept_message(NOTIFY_NOOP_TAG, ¬ify_topic) + .await; assert_eq!(msg.message.as_ref(), ""); } @@ -3954,7 +3826,7 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -3965,22 +3837,20 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement, @@ -3991,12 +3861,11 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -4005,9 +3874,7 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; let notification = Notification { r#type: notification_type, @@ -4040,14 +3907,13 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { .await; let claims = accept_and_respond_to_notify_message( + &mut relay_client, &identity_key_details, &account, &authentication, &client_id, app_domain.clone(), notify_key, - &relay_ws_client, - &mut rx, ) .await; @@ -4057,25 +3923,23 @@ async fn delete_subscription(notify_server: &NotifyServerContext) { assert_eq!(claims.msg.icon, "icon"); assert_eq!(claims.msg.url, "url"); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); delete( + &mut relay_client, &identity_key_details, &app_domain, &client_id, &account, notify_key, - &relay_ws_client, - &mut rx, ) .await; let sbs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert!(sbs.is_empty()); @@ -4164,7 +4028,7 @@ async fn all_domains_works(notify_server: &NotifyServerContext) { subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -4172,22 +4036,20 @@ async fn all_domains_works(notify_server: &NotifyServerContext) { .await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, None, &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); let notification_type1 = Uuid::new_v4(); let notification_types1 = HashSet::from([notification_type1, Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement1, @@ -4197,12 +4059,11 @@ async fn all_domains_works(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -4220,10 +4081,9 @@ async fn all_domains_works(notify_server: &NotifyServerContext) { let notification_type2 = Uuid::new_v4(); let notification_types2 = HashSet::from([notification_type2, Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement2, @@ -4233,12 +4093,11 @@ async fn all_domains_works(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 2); @@ -4312,7 +4171,7 @@ async fn this_domain_only(notify_server: &NotifyServerContext) { subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -4320,22 +4179,20 @@ async fn this_domain_only(notify_server: &NotifyServerContext) { .await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain1.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); let notification_type1 = Uuid::new_v4(); let notification_types1 = HashSet::from([notification_type1, Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement1, @@ -4345,12 +4202,11 @@ async fn this_domain_only(notify_server: &NotifyServerContext) { ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -4360,12 +4216,11 @@ async fn this_domain_only(notify_server: &NotifyServerContext) { let notification_type2 = Uuid::new_v4(); let notification_types2 = HashSet::from([notification_type2, Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); let result = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement2, @@ -4377,14 +4232,13 @@ async fn this_domain_only(notify_server: &NotifyServerContext) { .await; assert!(result.is_err()); let result = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ), ) .await; @@ -4438,7 +4292,7 @@ async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -4446,12 +4300,11 @@ async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { .await; let (_subs, _watch_topic_key, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain), &account, - &relay_ws_client, - &mut rx, ) .await; @@ -4467,8 +4320,7 @@ async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { async fn setup_project_and_watch( notify_server_url: Url, ) -> ( - Arc, - Receiver, + RelayClient, AccountId, IdentityKeyDetails, ProjectId, @@ -4509,7 +4361,7 @@ async fn setup_project_and_watch( .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server_url.clone(), @@ -4520,19 +4372,17 @@ async fn setup_project_and_watch( subscribe_topic(&project_id, app_domain.clone(), ¬ify_server_url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server_url, &identity_key_details, Some(app_domain.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); ( - relay_ws_client, - rx, + relay_client, account, identity_key_details, project_id, @@ -4547,8 +4397,7 @@ async fn setup_project_and_watch( #[allow(clippy::too_many_arguments)] async fn subscribe_to_notifications( - relay_ws_client: &Arc, - rx: &mut Receiver, + relay_client: &mut RelayClient, account: &AccountId, identity_key_details: &IdentityKeyDetails, app_domain: DidWeb, @@ -4558,10 +4407,9 @@ async fn subscribe_to_notifications( watch_topic_key: [u8; 32], notification_types: HashSet, ) -> [u8; 32] { - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - relay_ws_client, - rx, + relay_client, account, identity_key_details, app_key_agreement_key, @@ -4571,12 +4419,11 @@ async fn subscribe_to_notifications( ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, notify_server_client_id, identity_key_details, account, watch_topic_key, - relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -4584,9 +4431,7 @@ async fn subscribe_to_notifications( let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; notify_key } @@ -4595,8 +4440,7 @@ async fn setup_subscription( notify_server_url: Url, notification_types: HashSet, ) -> ( - Arc, - Receiver, + RelayClient, AccountId, IdentityKeyDetails, ProjectId, @@ -4605,8 +4449,7 @@ async fn setup_subscription( [u8; 32], ) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, project_id, @@ -4619,8 +4462,7 @@ async fn setup_subscription( ) = setup_project_and_watch(notify_server_url).await; let notify_key = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -4633,8 +4475,7 @@ async fn setup_subscription( .await; ( - relay_ws_client, - rx, + relay_client, account, identity_key_details, project_id, @@ -4648,8 +4489,7 @@ async fn setup_subscription( #[tokio::test] async fn e2e_get_notifications_has_none(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, _project_id, @@ -4659,8 +4499,7 @@ async fn e2e_get_notifications_has_none(notify_server: &NotifyServerContext) { ) = setup_subscription(notify_server.url.clone(), HashSet::from([Uuid::new_v4()])).await; let result = get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -4676,10 +4515,9 @@ async fn e2e_get_notifications_has_none(notify_server: &NotifyServerContext) { assert!(!result.has_more); let failed_result = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -4700,8 +4538,7 @@ async fn e2e_get_notifications_has_none(notify_server: &NotifyServerContext) { async fn e2e_get_notifications_has_one(notify_server: &NotifyServerContext) { let notification_type = Uuid::new_v4(); let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, project_id, @@ -4747,8 +4584,7 @@ async fn e2e_get_notifications_has_one(notify_server: &NotifyServerContext) { let after_notification_sent = Utc::now() + chrono::Duration::seconds(1); // Postgres time could be slightly out-of-sync with this process it seems let result = get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -6007,8 +5843,7 @@ async fn e2e_set_a_welcome_notification(notify_server: &NotifyServerContext) { #[tokio::test] async fn e2e_send_welcome_notification(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, project_id, @@ -6045,11 +5880,10 @@ async fn e2e_send_welcome_notification(notify_server: &NotifyServerContext) { ) .await; - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); let notify_key = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6065,14 +5899,13 @@ async fn e2e_send_welcome_notification(notify_server: &NotifyServerContext) { msg: notify_message, .. } = accept_and_respond_to_notify_message( + &mut relay_client2, &identity_key_details, &account, &app_authentication_key, &app_client_id, app_domain.clone(), notify_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(welcome_notification.r#type, notify_message.r#type); @@ -6081,8 +5914,7 @@ async fn e2e_send_welcome_notification(notify_server: &NotifyServerContext) { assert_eq!(welcome_notification.url.as_ref(), Some(¬ify_message.url)); let result = get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -6108,8 +5940,7 @@ async fn e2e_send_welcome_notification(notify_server: &NotifyServerContext) { #[tokio::test] async fn e2e_send_single_welcome_notification(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, project_id, @@ -6141,8 +5972,7 @@ async fn e2e_send_single_welcome_notification(notify_server: &NotifyServerContex .unwrap(); let _notify_key = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6155,8 +5985,7 @@ async fn e2e_send_single_welcome_notification(notify_server: &NotifyServerContex .await; let notify_key = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6169,8 +5998,7 @@ async fn e2e_send_single_welcome_notification(notify_server: &NotifyServerContex .await; let result = get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -6189,8 +6017,7 @@ async fn e2e_send_single_welcome_notification(notify_server: &NotifyServerContex #[tokio::test] async fn subscribe_idempotent_keeps_symkey(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, _project_id, @@ -6203,8 +6030,7 @@ async fn subscribe_idempotent_keeps_symkey(notify_server: &NotifyServerContext) ) = setup_project_and_watch(notify_server.url.clone()).await; let notify_key1 = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6217,8 +6043,7 @@ async fn subscribe_idempotent_keeps_symkey(notify_server: &NotifyServerContext) .await; let notify_key2 = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6237,8 +6062,7 @@ async fn subscribe_idempotent_keeps_symkey(notify_server: &NotifyServerContext) #[tokio::test] async fn subscribe_idempotent_updates_notification_types(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, _project_id, @@ -6252,8 +6076,7 @@ async fn subscribe_idempotent_updates_notification_types(notify_server: &NotifyS let notification_types = HashSet::from([Uuid::new_v4()]); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_key_agreement_key, @@ -6266,8 +6089,7 @@ async fn subscribe_idempotent_updates_notification_types(notify_server: &NotifyS let notification_types = HashSet::from([Uuid::new_v4()]); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_key_agreement_key, @@ -6283,8 +6105,7 @@ async fn subscribe_idempotent_updates_notification_types(notify_server: &NotifyS #[tokio::test] async fn e2e_doesnt_send_welcome_notification(notify_server: &NotifyServerContext) { let ( - relay_ws_client, - mut rx, + mut relay_client, account, identity_key_details, project_id, @@ -6321,11 +6142,10 @@ async fn e2e_doesnt_send_welcome_notification(notify_server: &NotifyServerContex ) .await; - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); let notify_key = subscribe_to_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, app_domain.clone(), @@ -6338,24 +6158,22 @@ async fn e2e_doesnt_send_welcome_notification(notify_server: &NotifyServerContex .await; let result = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, accept_and_respond_to_notify_message( + &mut relay_client2, &identity_key_details, &account, &app_authentication_key, &app_client_id, app_domain.clone(), notify_key, - &relay_ws_client, - &mut rx2, ), ) .await; assert!(result.is_err()); let result = get_notifications( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, &app_domain, @@ -6404,7 +6222,7 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -6415,22 +6233,20 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details, Some(app_domain.clone()), &account, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement, @@ -6441,12 +6257,11 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -6455,9 +6270,7 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; let notification = Notification { r#type: notification_type, @@ -6490,14 +6303,13 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { .await; let claims = accept_and_respond_to_notify_message( + &mut relay_client, &identity_key_details, &account, &authentication, &client_id, app_domain.clone(), notify_key, - &relay_ws_client, - &mut rx, ) .await; @@ -6507,25 +6319,23 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { assert_eq!(claims.msg.icon, "icon"); assert_eq!(claims.msg.url, "url"); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); delete( + &mut relay_client, &identity_key_details, &app_domain, &client_id, &account, notify_key, - &relay_ws_client, - &mut rx, ) .await; let sbs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert!(sbs.is_empty()); @@ -6553,10 +6363,9 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement, @@ -6567,12 +6376,11 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details, &account, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -6581,9 +6389,7 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; let notification = Notification { r#type: notification_type, @@ -6616,14 +6422,13 @@ async fn delete_and_resubscribe(notify_server: &NotifyServerContext) { .await; let claims = accept_and_respond_to_notify_message( + &mut relay_client, &identity_key_details, &account, &authentication, &client_id, app_domain.clone(), notify_key, - &relay_ws_client, - &mut rx, ) .await; @@ -6667,8 +6472,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe ), ) .await; + let vars = get_vars(); - let (relay_ws_client1, mut rx1) = create_client( + let mut relay_client1 = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -6694,8 +6500,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe ), ) .await; + let vars = get_vars(); - let (relay_ws_client2, mut rx2) = create_client( + let mut relay_client2 = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -6703,23 +6510,21 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe .await; let (subs1, watch_topic_key1, notify_server_client_id) = watch_subscriptions( + &mut relay_client1, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account, - &relay_ws_client1, - &mut rx1, ) .await; assert!(subs1.is_empty()); let (subs2, watch_topic_key2, notify_server_client_id2) = watch_subscriptions( + &mut relay_client2, notify_server.url.clone(), &identity_key_details2, Some(app_domain.clone()), &account, - &relay_ws_client2, - &mut rx2, ) .await; assert!(subs2.is_empty()); @@ -6727,10 +6532,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); subscribe( - &relay_ws_client1, - &mut rx1, + &mut relay_client1, &account, &identity_key_details1, key_agreement, @@ -6741,12 +6545,11 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe .await; let subs1 = accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ) .await; assert_eq!(subs1.len(), 1); @@ -6754,12 +6557,11 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe assert_eq!(sub1.scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); @@ -6769,16 +6571,13 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe assert_eq!(sub1.sym_key, sub2.sym_key); let notify_key = decode_key(&sub2.sym_key).unwrap(); - topic_subscribe(relay_ws_client1.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client1.subscribe(topic_from_key(¬ify_key)).await; // Update to 2 types let notification_types = HashSet::from([Uuid::new_v4(), Uuid::new_v4()]); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); update( - &relay_ws_client1, - &mut rx1, + &mut relay_client1, &account, &identity_key_details1, &app_domain, @@ -6788,56 +6587,51 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe ) .await; let subs1 = accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ) .await; assert_eq!(subs1.len(), 1); assert_eq!(subs1[0].scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); assert_eq!(subs2[0].scope, notification_types); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); delete( + &mut relay_client1, &identity_key_details1, &app_domain, &client_id, &account, notify_key, - &relay_ws_client1, - &mut rx1, ) .await; let subs1 = accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ) .await; assert!(subs1.is_empty()); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert!(subs2.is_empty()); @@ -6876,8 +6670,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe ), ) .await; + let vars = get_vars(); - let (relay_ws_client1, mut rx1) = create_client( + let mut relay_client1 = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -6903,8 +6698,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe ), ) .await; + let vars = get_vars(); - let (relay_ws_client2, mut rx2) = create_client( + let mut relay_client2 = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -6912,23 +6708,21 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe .await; let (subs1, watch_topic_key1, notify_server_client_id) = watch_subscriptions( + &mut relay_client1, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account, - &relay_ws_client1, - &mut rx1, ) .await; assert!(subs1.is_empty()); let (subs2, watch_topic_key2, notify_server_client_id2) = watch_subscriptions( + &mut relay_client2, notify_server.url.clone(), &identity_key_details2, Some(app_domain.clone()), &account, - &relay_ws_client2, - &mut rx2, ) .await; assert!(subs2.is_empty()); @@ -6936,10 +6730,9 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type]); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); let subs1 = subscribe_v1( - &relay_ws_client1, - &mut rx1, + &mut relay_client1, &account, &identity_key_details1, key_agreement, @@ -6953,12 +6746,11 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe assert_eq!(sub1.scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); @@ -6966,14 +6758,13 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe assert_eq!(sub2.scope, notification_types); let result1 = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ), ) .await; @@ -6982,16 +6773,13 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe assert_eq!(sub1.sym_key, sub2.sym_key); let notify_key = decode_key(&sub2.sym_key).unwrap(); - topic_subscribe(relay_ws_client1.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client1.subscribe(topic_from_key(¬ify_key)).await; // Update to 2 types let notification_types = HashSet::from([Uuid::new_v4(), Uuid::new_v4()]); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); let subs1 = update_v1( - &relay_ws_client1, - &mut rx1, + &mut relay_client1, &account, &identity_key_details1, &app_domain, @@ -7004,64 +6792,59 @@ async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServe assert_eq!(subs1[0].scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); assert_eq!(subs2[0].scope, notification_types); let result1 = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ), ) .await; assert!(result1.is_err()); - let mut rx1_2 = rx1.resubscribe(); + let mut relay_client1_2 = relay_client1.clone(); let subs1 = delete_v1( + &mut relay_client1, &identity_key_details1, &app_domain, &client_id, &account, notify_key, - &relay_ws_client1, - &mut rx1, ) .await; assert!(subs1.is_empty()); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account, watch_topic_key2, - &relay_ws_client2, - &mut rx2, ) .await; assert!(subs2.is_empty()); let result1 = tokio::time::timeout( - std::time::Duration::from_secs(1), + RELAY_MESSAGE_DELIVERY_TIMEOUT / 2, accept_watch_subscriptions_changed( + &mut relay_client1_2, ¬ify_server_client_id, &identity_key_details1, &account, watch_topic_key1, - &relay_ws_client1, - &mut rx1_2, ), ) .await; @@ -7228,7 +7011,7 @@ async fn same_address_different_chain_modify_subscription(notify_server: &Notify .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7239,22 +7022,20 @@ async fn same_address_different_chain_modify_subscription(notify_server: &Notify subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account1, - &relay_ws_client, - &mut rx, ) .await; assert!(subs.is_empty()); // Subscribe with 1 type let notification_types = HashSet::from([Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement, @@ -7264,12 +7045,11 @@ async fn same_address_different_chain_modify_subscription(notify_server: &Notify ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -7278,15 +7058,12 @@ async fn same_address_different_chain_modify_subscription(notify_server: &Notify let notify_key = decode_key(&sub.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; let notification_types = HashSet::from([]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); update( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, &app_domain, @@ -7296,12 +7073,11 @@ async fn same_address_different_chain_modify_subscription(notify_server: &Notify ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -7362,7 +7138,7 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7373,33 +7149,30 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs1, watch_topic_key1, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account1, - &relay_ws_client, - &mut rx, ) .await; assert!(subs1.is_empty()); let (subs2, watch_topic_key2, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, Some(app_domain.clone()), &account2, - &relay_ws_client, - &mut rx, ) .await; assert!(subs2.is_empty()); // Subscribe with 1 type let notification_types = HashSet::from([Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement, @@ -7409,12 +7182,11 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key1, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -7422,12 +7194,11 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify assert_eq!(sub1.scope, notification_types); assert_eq!(sub1.account, account1); let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account2, watch_topic_key2, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -7438,15 +7209,12 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify assert_eq!(sub1.sym_key, sub2.sym_key); let notify_key = decode_key(&sub2.sym_key).unwrap(); - topic_subscribe(relay_ws_client.as_ref(), topic_from_key(¬ify_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(¬ify_key)).await; let notification_types = HashSet::from([]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); update( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, &app_domain, @@ -7456,24 +7224,22 @@ async fn same_address_different_chain_watch_subscriptions(notify_server: &Notify ) .await; let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key1, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); assert_eq!(subs[0].scope, notification_types); assert_eq!(subs[0].account, account1); let subs = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account2, watch_topic_key2, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs.len(), 1); @@ -7535,7 +7301,7 @@ async fn watch_subscriptions_response_chain_agnostic(notify_server: &NotifyServe .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7547,8 +7313,7 @@ async fn watch_subscriptions_response_chain_agnostic(notify_server: &NotifyServe let notification_types = HashSet::from([Uuid::new_v4()]); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement, @@ -7559,12 +7324,11 @@ async fn watch_subscriptions_response_chain_agnostic(notify_server: &NotifyServe .await; let (subs1, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account1, - &relay_ws_client, - &mut rx, ) .await; assert_eq!(subs1.len(), 1); @@ -7572,12 +7336,11 @@ async fn watch_subscriptions_response_chain_agnostic(notify_server: &NotifyServe assert_eq!(subs1[0].account, account1); let (subs2, _watch_topic_key2, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, Some(app_domain.clone()), &account2, - &relay_ws_client, - &mut rx, ) .await; assert_eq!(subs2.len(), 1); @@ -7642,7 +7405,7 @@ async fn no_watcher_gives_only_chains_for_subscription(notify_server: &NotifySer .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7655,8 +7418,7 @@ async fn no_watcher_gives_only_chains_for_subscription(notify_server: &NotifySer subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement1, @@ -7669,8 +7431,7 @@ async fn no_watcher_gives_only_chains_for_subscription(notify_server: &NotifySer assert_eq!(subs[0].account, account1); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, key_agreement2, @@ -7740,7 +7501,7 @@ async fn subscribe_response_chain_agnostic(notify_server: &NotifyServerContext) .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7753,27 +7514,24 @@ async fn subscribe_response_chain_agnostic(notify_server: &NotifyServerContext) subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, None, &account1, - &relay_ws_client, - &mut rx, ) .await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, None, &account2, - &relay_ws_client, - &mut rx, ) .await; let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement1, @@ -7786,8 +7544,7 @@ async fn subscribe_response_chain_agnostic(notify_server: &NotifyServerContext) assert_eq!(subs[0].account, account1); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, key_agreement2, @@ -7870,7 +7627,7 @@ async fn update_response_chain_agnostic(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -7883,27 +7640,24 @@ async fn update_response_chain_agnostic(notify_server: &NotifyServerContext) { subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, None, &account1, - &relay_ws_client, - &mut rx, ) .await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, None, &account2, - &relay_ws_client, - &mut rx, ) .await; let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement1, @@ -7916,8 +7670,7 @@ async fn update_response_chain_agnostic(notify_server: &NotifyServerContext) { assert_eq!(subs[0].account, account1); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, key_agreement2, @@ -7950,13 +7703,10 @@ async fn update_response_chain_agnostic(notify_server: &NotifyServerContext) { .sym_key, ) .unwrap(); - topic_subscribe(&relay_ws_client, topic_from_key(&sub2_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(&sub2_key)).await; let subs = update_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, &app_domain2, @@ -8039,7 +7789,7 @@ async fn delete_response_chain_agnostic(notify_server: &NotifyServerContext) { .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -8052,27 +7802,24 @@ async fn delete_response_chain_agnostic(notify_server: &NotifyServerContext) { subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, None, &account1, - &relay_ws_client, - &mut rx, ) .await; let (_subs, _watch_topic_key1, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, None, &account2, - &relay_ws_client, - &mut rx, ) .await; let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement1, @@ -8085,8 +7832,7 @@ async fn delete_response_chain_agnostic(notify_server: &NotifyServerContext) { assert_eq!(subs[0].account, account1); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, key_agreement2, @@ -8119,18 +7865,15 @@ async fn delete_response_chain_agnostic(notify_server: &NotifyServerContext) { .sym_key, ) .unwrap(); - topic_subscribe(&relay_ws_client, topic_from_key(&sub2_key)) - .await - .unwrap(); + relay_client.subscribe(topic_from_key(&sub2_key)).await; let subs = delete_v1( + &mut relay_client, &identity_key_details2, &app_domain2, &app_client_id2, &account2, sub2_key, - &relay_ws_client, - &mut rx, ) .await; assert_eq!(subs.len(), 1); @@ -8186,16 +7929,14 @@ async fn same_address_different_chain_notify(notify_server: &NotifyServerContext .unwrap(); let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), ) .await; - topic_subscribe(relay_ws_client.as_ref(), notify_topic) - .await - .unwrap(); + relay_client.subscribe(notify_topic.clone()).await; let notification = Notification { r#type: notification_type, @@ -8235,12 +7976,12 @@ async fn same_address_different_chain_notify(notify_server: &NotifyServerContext assert!(response.failed.is_empty()); let (_, claims) = accept_notify_message( + &mut relay_client, &account1, &authentication_key.verifying_key(), &get_client_id(&authentication_key.verifying_key()), &app_domain, ¬ify_key, - &mut rx, ) .await; @@ -8290,7 +8031,7 @@ async fn no_watcher_returns_only_app_subscriptions(notify_server: &NotifyServerC subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -8299,8 +8040,7 @@ async fn no_watcher_returns_only_app_subscriptions(notify_server: &NotifyServerC let notification_types = HashSet::from([Uuid::new_v4()]); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement1, @@ -8315,8 +8055,7 @@ async fn no_watcher_returns_only_app_subscriptions(notify_server: &NotifyServerC let notification_types = HashSet::from([Uuid::new_v4()]); let subs = subscribe_v1( - &relay_ws_client, - &mut rx, + &mut relay_client, &account, &identity_key_details, key_agreement2, @@ -8384,7 +8123,7 @@ async fn different_account_subscribe_results_one_subscription(notify_server: &No .await; let vars = get_vars(); - let (relay_ws_client, mut rx) = create_client( + let mut relay_client = RelayClient::new( vars.relay_url.parse().unwrap(), vars.project_id.into(), notify_server.url.clone(), @@ -8395,31 +8134,28 @@ async fn different_account_subscribe_results_one_subscription(notify_server: &No subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let (subs1, watch_topic_key1, notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details1, Some(app_domain.clone()), &account1, - &relay_ws_client, - &mut rx, ) .await; assert!(subs1.is_empty()); let (subs2, watch_topic_key2, _notify_server_client_id) = watch_subscriptions( + &mut relay_client, notify_server.url.clone(), &identity_key_details2, Some(app_domain.clone()), &account2, - &relay_ws_client, - &mut rx, ) .await; assert!(subs2.is_empty()); let notification_types = HashSet::from([Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account1, &identity_key_details1, key_agreement, @@ -8429,24 +8165,22 @@ async fn different_account_subscribe_results_one_subscription(notify_server: &No ) .await; let subs1 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key1, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs1.len(), 1); let sub1 = &subs1[0]; assert_eq!(sub1.scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account2, watch_topic_key2, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); @@ -8455,10 +8189,9 @@ async fn different_account_subscribe_results_one_subscription(notify_server: &No assert_eq!(sub1.sym_key, sub2.sym_key); let notification_types = HashSet::from([Uuid::new_v4()]); - let mut rx2 = rx.resubscribe(); + let mut relay_client2 = relay_client.clone(); subscribe( - &relay_ws_client, - &mut rx, + &mut relay_client, &account2, &identity_key_details2, key_agreement, @@ -8468,24 +8201,22 @@ async fn different_account_subscribe_results_one_subscription(notify_server: &No ) .await; let subs1 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details1, &account1, watch_topic_key1, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs1.len(), 1); let sub1 = &subs1[0]; assert_eq!(sub1.scope, notification_types); let subs2 = accept_watch_subscriptions_changed( + &mut relay_client2, ¬ify_server_client_id, &identity_key_details2, &account2, watch_topic_key2, - &relay_ws_client, - &mut rx2, ) .await; assert_eq!(subs2.len(), 1); diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 57b0f5d2..bfa6f28c 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -7,55 +7,149 @@ use { error::NotifyServerError, model::types::AccountId, notify_message::NotifyMessage, - relay_client_helpers::create_ws_connect_options, - services::websocket_server::relay_ws_client::{RelayClientEvent, RelayConnectionHandler}, + relay_client_helpers::create_http_client, }, rand::rngs::StdRng, rand_chacha::rand_core::OsRng, rand_core::SeedableRng, - relay_client::websocket, + relay_client::http::Client, relay_rpc::{ auth::ed25519_dalek::Keypair, - domain::ProjectId, + domain::{ProjectId, Topic}, jwt::{JwtHeader, JWT_HEADER_ALG, JWT_HEADER_TYP}, + rpc::SubscriptionData, }, serde::Serialize, sha2::Digest, sha3::Keccak256, - std::sync::Arc, - tokio::sync::broadcast::Receiver, + std::{sync::Arc, time::Duration}, + tokio::sync::{broadcast::Receiver, RwLock}, + tracing::info, url::Url, }; +pub const RELAY_MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30); + pub const JWT_LEEWAY: i64 = 30; -pub async fn create_client( - relay_url: Url, - relay_project_id: ProjectId, - notify_url: Url, -) -> ( - Arc, - Receiver, -) { - let (tx, mut rx) = tokio::sync::broadcast::channel(8); - let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::unbounded_channel(); - tokio::task::spawn(async move { - while let Some(event) = mpsc_rx.recv().await { - let _ = tx.send(event); +pub struct RelayClient { + pub client: Arc, + pub receiver: Receiver, + pub topics: Arc>>, +} + +impl Clone for RelayClient { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + receiver: self.receiver.resubscribe(), + topics: self.topics.clone(), + } + } +} + +const RETRIES: usize = 5; + +#[allow(dead_code)] +impl RelayClient { + pub async fn new(relay_url: Url, relay_project_id: ProjectId, notify_url: Url) -> Self { + let client = create_http_client( + &Keypair::generate(&mut StdRng::from_entropy()), + relay_url, + notify_url, + relay_project_id, + ) + .unwrap(); + + let (tx, rx) = tokio::sync::broadcast::channel(8); + let topics = Arc::new(RwLock::new(vec![])); + tokio::task::spawn({ + let relay_client = client.clone(); + let topics = topics.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let topics = topics.read().await.clone(); + if topics.is_empty() { + continue; + } + + let result = relay_client.batch_fetch(topics.clone()).await; + if let Ok(res) = result { + for msg in res.messages { + if tx.send(msg).is_err() { + break; + } + } + } + } + } + }); + + Self { + client: Arc::new(client), + receiver: rx, + topics, } - }); - let connection_handler = RelayConnectionHandler::new("notify-client", mpsc_tx); - let relay_ws_client = Arc::new(websocket::Client::new(connection_handler)); + } - let keypair = Keypair::generate(&mut StdRng::from_entropy()); - let opts = - create_ws_connect_options(&keypair, relay_url, notify_url, relay_project_id).unwrap(); - relay_ws_client.connect(&opts).await.unwrap(); + pub async fn subscribe(&self, topic: Topic) { + self.topics.write().await.push(topic.clone()); + let mut tries = 0; + loop { + tries += 1; + let result = self.client.subscribe_blocking(topic.clone()).await; + match result { + Ok(_) => return, + e if tries > RETRIES => { + let _ = e.unwrap(); + } + _ => {} + } + } + } - // Eat up the "connected" message - _ = rx.recv().await.unwrap(); + pub async fn publish( + &self, + topic: Topic, + message: impl Into>, + tag: u32, + ttl: Duration, + ) { + let message = message.into(); + let mut tries = 0; + loop { + tries += 1; + let result = self + .client + .publish(topic.clone(), message.clone(), tag, ttl, false) + .await; + match result { + Ok(_) => return, + e if tries > RETRIES => e.unwrap(), + _ => {} + } + } + } - (relay_ws_client, rx) + pub async fn accept_message(&mut self, tag: u32, topic: &Topic) -> SubscriptionData { + let result = tokio::time::timeout(RELAY_MESSAGE_DELIVERY_TIMEOUT, async { + loop { + let msg = self.receiver.recv().await.unwrap(); + if msg.tag == tag && &msg.topic == topic { + return msg; + } else { + info!("expected {tag}, ignored message with tag: {}", msg.tag); + } + } + }) + .await; + + match result { + Ok(msg) => msg, + Err(_) => panic!("Timeout waiting for {tag} message on topic {topic}"), + } + } } // Workaround https://github.com/rust-lang/rust-clippy/issues/11613 @@ -150,16 +244,3 @@ impl GetSharedClaims for UnregisterIdentityRequestAuth { &self.shared_claims } } - -pub async fn topic_subscribe( - relay_ws_client: &relay_client::websocket::Client, - topic: relay_rpc::domain::Topic, -) -> Result<(), relay_client::error::Error> { - relay_ws_client.subscribe(topic).await?; - - // Sleep after subscribing to a topic to give relay time to process the subscription - // This shouldn't be required, but seems to be a race condition currently - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - Ok(()) -}