From 513eb9050aeb26eac1e28ce734994eb11ab34819 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Sat, 23 Dec 2023 07:55:41 +0900 Subject: [PATCH 1/3] chore: refactor subscribe --- src/services/websocket_server/mod.rs | 4 +- src/spec.rs | 2 + src/types/mod.rs | 6 +- tests/deployment.rs | 11 +- tests/integration.rs | 470 ++++++++++++++------------- 5 files changed, 255 insertions(+), 238 deletions(-) diff --git a/src/services/websocket_server/mod.rs b/src/services/websocket_server/mod.rs index 744db9ef..e7c43396 100644 --- a/src/services/websocket_server/mod.rs +++ b/src/services/websocket_server/mod.rs @@ -249,8 +249,8 @@ pub struct NotifyWatchSubscriptions { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct NotifySubscribe { - subscription_auth: String, +pub struct NotifySubscribe { + pub subscription_auth: String, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/spec.rs b/src/spec.rs index 2e6413f8..fcbf4c73 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -43,6 +43,8 @@ pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TTL: Duration = T300; // https://specs.walletconnect.com/2.0/specs/clients/notify/notify-authentication pub const NOTIFY_WATCH_SUBSCRIPTIONS_ACT: &str = "notify_watch_subscriptions"; pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT: &str = "notify_watch_subscriptions_response"; +pub const NOTIFY_SUBSCRIBE_ACT: &str = "notify_subscription"; +pub const NOTIFY_SUBSCRIBE_RESPONSE_ACT: &str = "notify_subscription_response"; #[cfg(test)] mod tests { diff --git a/src/types/mod.rs b/src/types/mod.rs index 3b996f0c..2313fead 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -67,7 +67,11 @@ impl Envelope { } impl Envelope { - pub fn new(encryption_key: &[u8; 32], data: impl Serialize, pubkey: [u8; 32]) -> Result { + pub fn new( + encryption_key: &[u8; 32], + data: serde_json::Value, + pubkey: [u8; 32], + ) -> Result { let serialized = serde_json::to_vec(&data)?; let iv = generate_nonce(); diff --git a/tests/deployment.rs b/tests/deployment.rs index b048d39d..267f0e6b 100644 --- a/tests/deployment.rs +++ b/tests/deployment.rs @@ -256,10 +256,13 @@ async fn watch_subscriptions( let response_topic_key = derive_key(&x25519_dalek::PublicKey::from(key_agreement_key), &secret).unwrap(); let response_topic = sha256::digest(&response_topic_key); - println!("watch_subscriptions response_topic: {response_topic}"); - let envelope = - Envelope::::new(&response_topic_key, message, *public.as_bytes()).unwrap(); + let envelope = Envelope::::new( + &response_topic_key, + serde_json::to_value(message).unwrap(), + *public.as_bytes(), + ) + .unwrap(); let message = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); let watch_subscriptions_topic = sha256::digest(&key_agreement_key); @@ -514,7 +517,7 @@ async fn run_test(statement: String, watch_subscriptions_all_domains: bool) { let envelope: Envelope = Envelope::::new( &response_topic_key, - message, + serde_json::to_value(message).unwrap(), *subscription_public.as_bytes(), ) .unwrap(); diff --git a/tests/integration.rs b/tests/integration.rs index 985745ec..48f21642 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -3,7 +3,7 @@ use { async_trait::async_trait, base64::{engine::general_purpose::STANDARD as BASE64, Engine}, chacha20poly1305::{aead::Aead, ChaCha20Poly1305, KeyInit}, - chrono::{Duration, TimeZone, Utc}, + chrono::{DateTime, Duration, TimeZone, Utc}, data_encoding::BASE64URL, ed25519_dalek::SigningKey, hyper::StatusCode, @@ -11,7 +11,7 @@ use { auth::{ add_ttl, encode_authentication_private_key, encode_authentication_public_key, encode_subscribe_private_key, encode_subscribe_public_key, from_jwt, CacaoValue, - DidWeb, KeyServerResponse, NotifyServerSubscription, SharedClaims, + DidWeb, GetSharedClaims, KeyServerResponse, NotifyServerSubscription, SharedClaims, SubscriptionDeleteRequestAuth, SubscriptionDeleteResponseAuth, SubscriptionRequestAuth, SubscriptionResponseAuth, SubscriptionUpdateRequestAuth, SubscriptionUpdateResponseAuth, WatchSubscriptionsChangedRequestAuth, @@ -56,18 +56,18 @@ use { }, websocket_server::{ decode_key, derive_key, relay_ws_client::RelayClientEvent, NotifyRequest, - NotifyResponse, NotifyWatchSubscriptions, ResponseAuth, + NotifyResponse, NotifySubscribe, NotifyWatchSubscriptions, ResponseAuth, }, }, spec::{ NOTIFY_DELETE_METHOD, NOTIFY_DELETE_RESPONSE_TAG, NOTIFY_DELETE_TAG, NOTIFY_DELETE_TTL, - NOTIFY_MESSAGE_TAG, NOTIFY_NOOP_TAG, NOTIFY_SUBSCRIBE_METHOD, - NOTIFY_SUBSCRIBE_RESPONSE_TAG, NOTIFY_SUBSCRIBE_TAG, NOTIFY_SUBSCRIBE_TTL, - NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_METHOD, NOTIFY_UPDATE_RESPONSE_TAG, - NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, NOTIFY_WATCH_SUBSCRIPTIONS_ACT, - NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, - NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TAG, - NOTIFY_WATCH_SUBSCRIPTIONS_TTL, + NOTIFY_MESSAGE_TAG, NOTIFY_NOOP_TAG, NOTIFY_SUBSCRIBE_ACT, NOTIFY_SUBSCRIBE_METHOD, + NOTIFY_SUBSCRIBE_RESPONSE_ACT, NOTIFY_SUBSCRIBE_RESPONSE_TAG, NOTIFY_SUBSCRIBE_TAG, + NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_METHOD, + NOTIFY_UPDATE_RESPONSE_TAG, NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, + NOTIFY_WATCH_SUBSCRIPTIONS_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, + NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, + NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TTL, }, types::{Envelope, EnvelopeType0, EnvelopeType1, Notification}, }, @@ -88,6 +88,7 @@ use { domain::{DecodedClientId, ProjectId, Topic}, }, reqwest::Response, + serde::de::DeserializeOwned, serde_json::{json, Value}, sha2::{digest::generic_array::GenericArray, Digest}, sha3::Keccak256, @@ -2526,96 +2527,243 @@ fn topic_from_key(key: &[u8]) -> Topic { sha256::digest(key).into() } -enum MessageMethod { - WatchSubscriptionsRequest, -} - +#[derive(Clone)] struct IdentityKeyDetails<'a> { keys_server_url: &'a Url, signing_key: &'a SigningKey, did_key: &'a str, } -// TODO move to method -enum TopicEncryptionScheme { - // Symetric([u8; 32]), - Asymetric { - client_private: x25519_dalek::StaticSecret, - client_public: x25519_dalek::PublicKey, - server_public: x25519_dalek::PublicKey, - }, +struct TopicEncryptionSchemeAsymetric { + client_private: x25519_dalek::StaticSecret, + client_public: x25519_dalek::PublicKey, + server_public: x25519_dalek::PublicKey, } -async fn publish_jwt_message<'a>( +async fn publish_watch_subscriptions_request<'a>( relay_ws_client: &Client, - app_domain: Option<&str>, did_pkh: String, - aud_authentication_key: &DecodedClientId, + client_id: &DecodedClientId, identity_key_details: IdentityKeyDetails<'a>, - method: MessageMethod, - encryption_details: TopicEncryptionScheme, + encryption_details: TopicEncryptionSchemeAsymetric, + app: Option, ) { - let (method, tag, ttl, act) = match method { - MessageMethod::WatchSubscriptionsRequest => ( - NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, - NOTIFY_WATCH_SUBSCRIPTIONS_TAG, - NOTIFY_WATCH_SUBSCRIPTIONS_TTL, - NOTIFY_WATCH_SUBSCRIPTIONS_ACT, - ), - }; + publish_jwt_message( + relay_ws_client, + client_id, + identity_key_details.clone(), + encryption_details, + NOTIFY_WATCH_SUBSCRIPTIONS_TAG, + NOTIFY_WATCH_SUBSCRIPTIONS_TTL, + NOTIFY_WATCH_SUBSCRIPTIONS_ACT, + |shared_claims| { + serde_json::to_value(NotifyRequest::new( + NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, + NotifyWatchSubscriptions { + watch_subscriptions_auth: encode_auth( + &WatchSubscriptionsRequestAuth { + shared_claims, + ksu: identity_key_details.keys_server_url.to_string(), + sub: did_pkh, + app, + }, + identity_key_details.signing_key, + ), + }, + )) + .unwrap() + }, + ) + .await +} - let now = Utc::now(); - let subscription_auth = WatchSubscriptionsRequestAuth { - shared_claims: SharedClaims { +async fn publish_subscribe_request<'a>( + relay_ws_client: &Client, + did_pkh: String, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'a>, + encryption_details: TopicEncryptionSchemeAsymetric, + app: DidWeb, + notification_types: HashSet, +) { + publish_jwt_message( + relay_ws_client, + client_id, + identity_key_details.clone(), + encryption_details, + NOTIFY_SUBSCRIBE_TAG, + NOTIFY_SUBSCRIBE_TTL, + NOTIFY_SUBSCRIBE_ACT, + |shared_claims| { + serde_json::to_value(NotifyRequest::new( + NOTIFY_SUBSCRIBE_METHOD, + NotifySubscribe { + subscription_auth: encode_auth( + &SubscriptionRequestAuth { + shared_claims, + ksu: identity_key_details.keys_server_url.to_string(), + sub: did_pkh.clone(), + scp: notification_types + .iter() + .map(ToString::to_string) + .collect::>() + .join(" "), + app, + }, + identity_key_details.signing_key, + ), + }, + )) + .unwrap() + }, + ) + .await +} + +async fn accept_message(rx: &mut UnboundedReceiver) -> PublishedMessage { + let event = rx.recv().await.unwrap(); + match event { + RelayClientEvent::Message(msg) => msg, + e => panic!("Expected message, got {e:?}"), + } +} + +#[allow(clippy::too_many_arguments)] +async fn subscribe( + relay_ws_client: &relay_client::websocket::Client, + rx: &mut UnboundedReceiver, + did_pkh: &str, + keys_server_url: Url, + app_key_agreement_key: x25519_dalek::PublicKey, + app_client_id: &DecodedClientId, + identity_signing_key: &SigningKey, + identity_did_key: &str, + app: DidWeb, + notification_types: HashSet, +) { + let secret = StaticSecret::random_from_rng(OsRng); + let public = PublicKey::from(&secret); + let response_topic_key = derive_key(&app_key_agreement_key, &secret).unwrap(); + let response_topic = topic_from_key(&response_topic_key); + + publish_subscribe_request( + relay_ws_client, + did_pkh.to_owned(), + app_client_id, + IdentityKeyDetails { + keys_server_url: &keys_server_url, + signing_key: identity_signing_key, + did_key: identity_did_key, + }, + TopicEncryptionSchemeAsymetric { + client_private: secret, + client_public: public, + server_public: app_key_agreement_key, + }, + app, + notification_types, + ) + .await; + + relay_ws_client + .subscribe(response_topic.clone()) + .await + .unwrap(); + + let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let msg = accept_message(rx).await; + if msg.tag == NOTIFY_SUBSCRIBE_RESPONSE_TAG && msg.topic == response_topic { + return msg; + } + } + }) + .await + .unwrap(); + + let auth = decode_response_message::(msg, &response_topic_key); + assert_eq!(auth.shared_claims.act, NOTIFY_SUBSCRIBE_RESPONSE_ACT); + assert_eq!(auth.shared_claims.iss, app_client_id.to_did_key()); + assert_eq!(auth.shared_claims.aud, identity_did_key); +} + +#[allow(clippy::too_many_arguments)] +async fn publish_jwt_message<'a>( + relay_ws_client: &Client, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'a>, + encryption_details: TopicEncryptionSchemeAsymetric, + tag: u32, + ttl: std::time::Duration, + act: &str, + make_message: impl FnOnce(SharedClaims) -> serde_json::Value, +) { + fn make_shared_claims( + now: DateTime, + ttl: std::time::Duration, + act: &str, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'_>, + ) -> SharedClaims { + SharedClaims { iat: now.timestamp() as u64, exp: add_ttl(now, ttl).timestamp() as u64, iss: identity_key_details.did_key.to_owned(), act: act.to_owned(), - aud: aud_authentication_key.to_did_key(), + aud: client_id.to_did_key(), mjv: "0".to_owned(), - }, - ksu: identity_key_details.keys_server_url.to_string(), - sub: did_pkh, - app: app_domain.map(|domain| DidWeb::from_domain(domain.to_owned())), - }; + } + } - let message = NotifyRequest::new( - method, - NotifyWatchSubscriptions { - watch_subscriptions_auth: encode_auth( - &subscription_auth, - identity_key_details.signing_key, - ), - }, - ); + let now = Utc::now(); - let (envelope, topic_key) = match encryption_details { - TopicEncryptionScheme::Asymetric { - client_private: client_secret, - client_public, - server_public, - } => { - let response_topic_key = derive_key(&server_public, &client_secret).unwrap(); - ( - Envelope::::new( - &response_topic_key, - message, - *client_public.as_bytes(), - ) + let message = make_message(make_shared_claims( + now, + ttl, + act, + client_id, + identity_key_details, + )); + + let TopicEncryptionSchemeAsymetric { + client_private: client_secret, + client_public, + server_public, + } = encryption_details; + let (envelope, topic_key) = { + let response_topic_key = derive_key(&server_public, &client_secret).unwrap(); + ( + Envelope::::new(&response_topic_key, message, *client_public.as_bytes()) .unwrap(), - server_public, - ) - } + server_public, + ) }; - let message = BASE64.encode(envelope.to_bytes()); let topic = topic_from_key(topic_key.as_bytes()); + let message = BASE64.encode(envelope.to_bytes()); + relay_ws_client .publish(topic, message, tag, ttl, false) .await .unwrap(); } +fn decode_response_message(msg: PublishedMessage, response_topic_key: &[u8; 32]) -> T +where + T: GetSharedClaims + DeserializeOwned, +{ + let Envelope:: { sealbox, iv, .. } = + Envelope::::from_bytes(BASE64.decode(msg.message.as_bytes()).unwrap()) + .unwrap(); + let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(response_topic_key)) + .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) + .unwrap(); + let response = + serde_json::from_slice::>(&decrypted_response).unwrap(); + + from_jwt::(&response.result.response_auth).unwrap() +} + #[allow(clippy::too_many_arguments)] async fn watch_subscriptions( notify_server_url: Url, @@ -2627,7 +2775,7 @@ async fn watch_subscriptions( relay_ws_client: &relay_client::websocket::Client, rx: &mut UnboundedReceiver, ) -> (Vec, [u8; 32]) { - let (key_agreement_key, authentication_key) = get_notify_did_json(¬ify_server_url).await; + let (key_agreement_key, client_id) = get_notify_did_json(¬ify_server_url).await; let secret = StaticSecret::random_from_rng(OsRng); let public = PublicKey::from(&secret); @@ -2635,22 +2783,21 @@ async fn watch_subscriptions( let response_topic_key = derive_key(&key_agreement_key, &secret).unwrap(); let response_topic = topic_from_key(&response_topic_key); - publish_jwt_message( + publish_watch_subscriptions_request( relay_ws_client, - app_domain, did_pkh.to_owned(), - &authentication_key, + &client_id, IdentityKeyDetails { keys_server_url: &keys_server_url, signing_key: identity_signing_key, did_key: identity_did_key, }, - MessageMethod::WatchSubscriptionsRequest, - TopicEncryptionScheme::Asymetric { + TopicEncryptionSchemeAsymetric { client_private: secret, client_public: public, server_public: key_agreement_key, }, + app_domain.map(|domain| DidWeb::from_domain(domain.to_owned())), ) .await; @@ -2659,14 +2806,6 @@ async fn watch_subscriptions( .await .unwrap(); - async fn accept_message(rx: &mut UnboundedReceiver) -> PublishedMessage { - let event = rx.recv().await.unwrap(); - match event { - RelayClientEvent::Message(msg) => msg, - e => panic!("Expected message, got {e:?}"), - } - } - let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { loop { let msg = accept_message(rx).await; @@ -2678,21 +2817,13 @@ async fn watch_subscriptions( .await .unwrap(); - let Envelope:: { sealbox, iv, .. } = - Envelope::::from_bytes(BASE64.decode(msg.message.as_bytes()).unwrap()) - .unwrap(); - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(&response_topic_key)) - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - let response = - serde_json::from_slice::>(&decrypted_response).unwrap(); - - let auth = from_jwt::(&response.result.response_auth).unwrap(); + let auth = decode_response_message::(msg, &response_topic_key); assert_eq!( auth.shared_claims.act, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT ); - assert_eq!(auth.shared_claims.iss, authentication_key.to_did_key()); + assert_eq!(auth.shared_claims.iss, client_id.to_did_key()); + assert_eq!(auth.shared_claims.aud, identity_did_key); (auth.sbs, response_topic_key) } @@ -2751,7 +2882,6 @@ async fn subscribe_topic( app_domain: String, notify_server_url: &Url, ) -> ( - Topic, x25519_dalek::PublicKey, ed25519_dalek::VerifyingKey, DecodedClientId, @@ -2778,7 +2908,6 @@ async fn subscribe_topic( let key_agreement = decode_key(&response.subscribe_key).unwrap(); ( - topic_from_key(&key_agreement), x25519_dalek::PublicKey::from(key_agreement), ed25519_dalek::VerifyingKey::from_bytes(&authentication).unwrap(), // Better approach, but dependency versions conflict right now @@ -2880,7 +3009,7 @@ async fn run_test( ) .await; - let (subscribe_topic, key_agreement, authentication, client_id) = + let (key_agreement, authentication, client_id) = subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; let watch_topic_key = { @@ -2905,142 +3034,21 @@ async fn run_test( watch_topic_key }; - // ---------------------------------------------------- - // SUBSCRIBE WALLET CLIENT TO DAPP THROUGHT NOTIFY - // ---------------------------------------------------- - - // Prepare subscription auth for *wallet* client - // https://github.com/WalletConnect/walletconnect-docs/blob/main/docs/specs/clients/notify/notify-authentication.md#notify-subscription let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type, Uuid::new_v4()]); - let now = Utc::now(); - let subscription_auth = SubscriptionRequestAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_SUBSCRIBE_TTL).timestamp() as u64, - iss: identity_did_key.clone(), - act: "notify_subscription".to_owned(), - aud: client_id.to_did_key(), - mjv: "0".to_owned(), - }, - ksu: keys_server_url.to_string(), - sub: did_pkh.clone(), - scp: notification_types - .iter() - .map(ToString::to_string) - .collect::>() - .join(" "), - app: DidWeb::from_domain(app_domain.clone()), - }; - - // Encode the subscription auth - let subscription_auth = encode_auth(&subscription_auth, &identity_signing_key); - - let sub_auth = json!({ "subscriptionAuth": subscription_auth }); - let message = NotifyRequest::new(NOTIFY_SUBSCRIBE_METHOD, sub_auth); - - let subscription_secret = StaticSecret::random_from_rng(OsRng); - let subscription_public = PublicKey::from(&subscription_secret); - let response_topic_key = derive_key(&key_agreement, &subscription_secret).unwrap(); - - let cipher = ChaCha20Poly1305::new(GenericArray::from_slice(&response_topic_key)); - - let envelope: Envelope = Envelope::::new( - &response_topic_key, - message, - *subscription_public.as_bytes(), - ) - .unwrap(); - let message = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - // Get response topic for wallet client and notify communication - let response_topic = sha256::digest(&response_topic_key); - println!("subscription response_topic: {response_topic}"); - - // Subscribe to the topic and listen for response - relay_ws_client - .subscribe(response_topic.clone().into()) - .await - .unwrap(); - - // Send subscription request to notify - relay_ws_client - .publish( - subscribe_topic, - message, - NOTIFY_SUBSCRIBE_TAG, - NOTIFY_SUBSCRIBE_TTL, - false, - ) - .await - .unwrap(); - - let resp = rx.recv().await.unwrap(); - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - let msg = if msg.tag == NOTIFY_SUBSCRIBE_RESPONSE_TAG { - assert_eq!(msg.tag, NOTIFY_SUBSCRIBE_RESPONSE_TAG); - msg - } else { - println!( - "got additional message with unexpected tag {} msg.id {} and message_id {}", - msg.tag, - msg.message_id, - sha256::digest(msg.message.as_ref()), - ); - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), - ) - .unwrap(); - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(&watch_topic_key)) - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - let response: NotifyResponse = - serde_json::from_slice(&decrypted_response).unwrap(); - println!( - "warn: got additional message with unexpected tag {} msg.id {} and message_id {} RPC ID {}", - msg.tag, - msg.message_id, - sha256::digest(msg.message.as_ref()), - response.id, - ); - - let resp = rx.recv().await.unwrap(); - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_SUBSCRIBE_RESPONSE_TAG); - msg - }; - - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), + subscribe( + &relay_ws_client, + &mut rx, + &did_pkh, + keys_server_url.clone(), + key_agreement, + &client_id, + &identity_signing_key, + &identity_did_key, + DidWeb::from_domain(app_domain.clone()), + notification_types.clone(), ) - .unwrap(); - - let decrypted_response = cipher - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - - let response: NotifyResponse = - serde_json::from_slice(&decrypted_response).unwrap(); - - let response_auth = response - .result - .get("responseAuth") // TODO use structure - .unwrap() - .as_str() - .unwrap(); - let subscribe_response_auth = from_jwt::(response_auth).unwrap(); - assert_eq!( - subscribe_response_auth.shared_claims.act, - "notify_subscription_response" - ); + .await; let notify_key = { let resp = rx.recv().await.unwrap(); From 72aba7c9c6b4eb4409231a6d64c31443dae83688 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Sat, 23 Dec 2023 09:25:10 +0900 Subject: [PATCH 2/3] chore: refactor subscriptions changed --- .../handlers/notify_watch_subscriptions.rs | 8 +- src/services/websocket_server/mod.rs | 6 + src/spec.rs | 4 + tests/integration.rs | 410 +++++++++++++----- 4 files changed, 311 insertions(+), 117 deletions(-) diff --git a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs index 61939196..e3f2a3ad 100644 --- a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs +++ b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs @@ -20,7 +20,7 @@ use { registry::storage::redis::Redis, services::websocket_server::{ decode_key, derive_key, handlers::decrypt_message, NotifyRequest, NotifyResponse, - NotifyWatchSubscriptions, + NotifySubscriptionsChanged, NotifyWatchSubscriptions, }, spec::{ NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, @@ -293,8 +293,10 @@ pub async fn update_subscription_watchers( let auth = sign_jwt(response_message, authentication_secret)?; let request = NotifyRequest::new( NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, - json!({ "subscriptionsChangedAuth": auth }), - ); // TODO use structure + NotifySubscriptionsChanged { + subscriptions_changed_auth: auth, + }, + ); let sym_key = decode_key(sym_key)?; let envelope = Envelope::::new(&sym_key, request)?; diff --git a/src/services/websocket_server/mod.rs b/src/services/websocket_server/mod.rs index e7c43396..e95f18e4 100644 --- a/src/services/websocket_server/mod.rs +++ b/src/services/websocket_server/mod.rs @@ -247,6 +247,12 @@ pub struct NotifyWatchSubscriptions { pub watch_subscriptions_auth: String, } +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct NotifySubscriptionsChanged { + pub subscriptions_changed_auth: String, +} + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct NotifySubscribe { diff --git a/src/spec.rs b/src/spec.rs index fcbf4c73..3170603b 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -20,6 +20,7 @@ pub const NOTIFY_UPDATE_RESPONSE_TAG: u32 = 4009; pub const NOTIFY_WATCH_SUBSCRIPTIONS_TAG: u32 = 4010; pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG: u32 = 4011; pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TAG: u32 = 4012; +pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG: u32 = 4013; pub const NOTIFY_NOOP_TAG: u32 = 4050; // TTLs @@ -38,11 +39,14 @@ pub const NOTIFY_UPDATE_RESPONSE_TTL: Duration = T2592000; pub const NOTIFY_WATCH_SUBSCRIPTIONS_TTL: Duration = T300; pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL: Duration = T300; pub const NOTIFY_SUBSCRIPTIONS_CHANGED_TTL: Duration = T300; +pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL: Duration = T300; // acts // https://specs.walletconnect.com/2.0/specs/clients/notify/notify-authentication pub const NOTIFY_WATCH_SUBSCRIPTIONS_ACT: &str = "notify_watch_subscriptions"; pub const NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT: &str = "notify_watch_subscriptions_response"; +pub const NOTIFY_SUBSCRIPTIONS_CHANGED_ACT: &str = "notify_subscriptions_changed"; +pub const NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONE_ACT: &str = "notify_subscriptions_changed_response"; pub const NOTIFY_SUBSCRIBE_ACT: &str = "notify_subscription"; pub const NOTIFY_SUBSCRIBE_RESPONSE_ACT: &str = "notify_subscription_response"; diff --git a/tests/integration.rs b/tests/integration.rs index 48f21642..f300f19c 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -15,9 +15,10 @@ use { SubscriptionDeleteRequestAuth, SubscriptionDeleteResponseAuth, SubscriptionRequestAuth, SubscriptionResponseAuth, SubscriptionUpdateRequestAuth, SubscriptionUpdateResponseAuth, WatchSubscriptionsChangedRequestAuth, - WatchSubscriptionsRequestAuth, WatchSubscriptionsResponseAuth, - KEYS_SERVER_IDENTITY_ENDPOINT, KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY, - KEYS_SERVER_STATUS_SUCCESS, STATEMENT_ALL_DOMAINS, STATEMENT_THIS_DOMAIN, + WatchSubscriptionsChangedResponseAuth, WatchSubscriptionsRequestAuth, + WatchSubscriptionsResponseAuth, KEYS_SERVER_IDENTITY_ENDPOINT, + KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY, KEYS_SERVER_STATUS_SUCCESS, + STATEMENT_ALL_DOMAINS, STATEMENT_THIS_DOMAIN, }, config::Configuration, jsonrpc::NotifyPayload, @@ -56,18 +57,22 @@ use { }, websocket_server::{ decode_key, derive_key, relay_ws_client::RelayClientEvent, NotifyRequest, - NotifyResponse, NotifySubscribe, NotifyWatchSubscriptions, ResponseAuth, + NotifyResponse, NotifySubscribe, NotifySubscriptionsChanged, + NotifyWatchSubscriptions, ResponseAuth, }, }, spec::{ NOTIFY_DELETE_METHOD, NOTIFY_DELETE_RESPONSE_TAG, NOTIFY_DELETE_TAG, NOTIFY_DELETE_TTL, NOTIFY_MESSAGE_TAG, NOTIFY_NOOP_TAG, NOTIFY_SUBSCRIBE_ACT, NOTIFY_SUBSCRIBE_METHOD, NOTIFY_SUBSCRIBE_RESPONSE_ACT, NOTIFY_SUBSCRIBE_RESPONSE_TAG, NOTIFY_SUBSCRIBE_TAG, - NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_METHOD, - NOTIFY_UPDATE_RESPONSE_TAG, NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, - NOTIFY_WATCH_SUBSCRIPTIONS_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, - NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, - NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TTL, + NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIPTIONS_CHANGED_ACT, + NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONE_ACT, + NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG, NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL, + NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, NOTIFY_UPDATE_METHOD, NOTIFY_UPDATE_RESPONSE_TAG, + NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, NOTIFY_WATCH_SUBSCRIPTIONS_ACT, + NOTIFY_WATCH_SUBSCRIPTIONS_METHOD, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT, + NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TAG, + NOTIFY_WATCH_SUBSCRIPTIONS_TTL, }, types::{Envelope, EnvelopeType0, EnvelopeType1, Notification}, }, @@ -2540,6 +2545,11 @@ struct TopicEncryptionSchemeAsymetric { server_public: x25519_dalek::PublicKey, } +enum TopicEncrptionScheme<'a> { + Asymetric(TopicEncryptionSchemeAsymetric), + Symetric(&'a [u8; 32]), +} + async fn publish_watch_subscriptions_request<'a>( relay_ws_client: &Client, did_pkh: String, @@ -2552,7 +2562,7 @@ async fn publish_watch_subscriptions_request<'a>( relay_ws_client, client_id, identity_key_details.clone(), - encryption_details, + TopicEncrptionScheme::Asymetric(encryption_details), NOTIFY_WATCH_SUBSCRIPTIONS_TAG, NOTIFY_WATCH_SUBSCRIPTIONS_TTL, NOTIFY_WATCH_SUBSCRIPTIONS_ACT, @@ -2590,7 +2600,7 @@ async fn publish_subscribe_request<'a>( relay_ws_client, client_id, identity_key_details.clone(), - encryption_details, + TopicEncrptionScheme::Asymetric(encryption_details), NOTIFY_SUBSCRIBE_TAG, NOTIFY_SUBSCRIBE_TTL, NOTIFY_SUBSCRIBE_ACT, @@ -2681,7 +2691,7 @@ async fn subscribe( .await .unwrap(); - let auth = decode_response_message::(msg, &response_topic_key); + let (_id, auth) = decode_response_message::(msg, &response_topic_key); assert_eq!(auth.shared_claims.act, NOTIFY_SUBSCRIBE_RESPONSE_ACT); assert_eq!(auth.shared_claims.iss, app_client_id.to_did_key()); assert_eq!(auth.shared_claims.aud, identity_did_key); @@ -2692,7 +2702,7 @@ async fn publish_jwt_message<'a>( relay_ws_client: &Client, client_id: &DecodedClientId, identity_key_details: IdentityKeyDetails<'a>, - encryption_details: TopicEncryptionSchemeAsymetric, + encryption_details: TopicEncrptionScheme<'a>, tag: u32, ttl: std::time::Duration, act: &str, @@ -2725,22 +2735,33 @@ async fn publish_jwt_message<'a>( identity_key_details, )); - let TopicEncryptionSchemeAsymetric { - client_private: client_secret, - client_public, - server_public, - } = encryption_details; - let (envelope, topic_key) = { - let response_topic_key = derive_key(&server_public, &client_secret).unwrap(); - ( - Envelope::::new(&response_topic_key, message, *client_public.as_bytes()) - .unwrap(), + let (envelope, topic) = match encryption_details { + TopicEncrptionScheme::Asymetric(TopicEncryptionSchemeAsymetric { + client_private: client_secret, + client_public, server_public, - ) + }) => { + let response_topic_key = derive_key(&server_public, &client_secret).unwrap(); + ( + Envelope::::new( + &response_topic_key, + message, + *client_public.as_bytes(), + ) + .unwrap() + .to_bytes(), + topic_from_key(server_public.as_bytes()), + ) + } + TopicEncrptionScheme::Symetric(sym_key) => ( + Envelope::::new(sym_key, message) + .unwrap() + .to_bytes(), + topic_from_key(sym_key), + ), }; - let topic = topic_from_key(topic_key.as_bytes()); - let message = BASE64.encode(envelope.to_bytes()); + let message = BASE64.encode(envelope); relay_ws_client .publish(topic, message, tag, ttl, false) @@ -2748,20 +2769,28 @@ async fn publish_jwt_message<'a>( .unwrap(); } -fn decode_response_message(msg: PublishedMessage, response_topic_key: &[u8; 32]) -> T +fn decode_message(msg: PublishedMessage, key: &[u8; 32]) -> T where - T: GetSharedClaims + DeserializeOwned, + T: DeserializeOwned, { let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes(BASE64.decode(msg.message.as_bytes()).unwrap()) .unwrap(); - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(response_topic_key)) + let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(key)) .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) .unwrap(); - let response = - serde_json::from_slice::>(&decrypted_response).unwrap(); + serde_json::from_slice::(&decrypted_response).unwrap() +} - from_jwt::(&response.result.response_auth).unwrap() +fn decode_response_message(msg: PublishedMessage, key: &[u8; 32]) -> (u64, T) +where + T: GetSharedClaims + DeserializeOwned, +{ + let response = decode_message::>(msg, key); + ( + response.id, + from_jwt::(&response.result.response_auth).unwrap(), + ) } #[allow(clippy::too_many_arguments)] @@ -2774,7 +2803,7 @@ async fn watch_subscriptions( did_pkh: &str, relay_ws_client: &relay_client::websocket::Client, rx: &mut UnboundedReceiver, -) -> (Vec, [u8; 32]) { +) -> (Vec, [u8; 32], DecodedClientId) { let (key_agreement_key, client_id) = get_notify_did_json(¬ify_server_url).await; let secret = StaticSecret::random_from_rng(OsRng); @@ -2817,7 +2846,8 @@ async fn watch_subscriptions( .await .unwrap(); - let auth = decode_response_message::(msg, &response_topic_key); + let (_id, auth) = + decode_response_message::(msg, &response_topic_key); assert_eq!( auth.shared_claims.act, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT @@ -2825,7 +2855,95 @@ async fn watch_subscriptions( assert_eq!(auth.shared_claims.iss, client_id.to_did_key()); assert_eq!(auth.shared_claims.aud, identity_did_key); - (auth.sbs, response_topic_key) + (auth.sbs, response_topic_key, client_id) +} + +async fn publish_subscriptions_changed_response<'a>( + relay_ws_client: &Client, + did_pkh: String, + client_id: &DecodedClientId, + identity_key_details: IdentityKeyDetails<'a>, + sym_key: &[u8; 32], + id: u64, +) { + publish_jwt_message( + relay_ws_client, + client_id, + identity_key_details.clone(), + TopicEncrptionScheme::Symetric(sym_key), + NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TAG, + NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONSE_TTL, + NOTIFY_SUBSCRIPTIONS_CHANGED_RESPONE_ACT, + |shared_claims| { + serde_json::to_value(NotifyResponse::new( + id, + ResponseAuth { + response_auth: encode_auth( + &WatchSubscriptionsChangedResponseAuth { + shared_claims, + ksu: identity_key_details.keys_server_url.to_string(), + sub: did_pkh.clone(), + }, + identity_key_details.signing_key, + ), + }, + )) + .unwrap() + }, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn accept_watch_subscriptions_changed( + keys_server_url: Url, + notify_server_client_id: &DecodedClientId, + identity_signing_key: &SigningKey, + identity_did_key: &str, + did_pkh: &str, + watch_topic_key: &[u8; 32], + relay_ws_client: &relay_client::websocket::Client, + rx: &mut UnboundedReceiver, +) -> Vec { + let msg = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let msg = accept_message(rx).await; + if msg.tag == NOTIFY_SUBSCRIPTIONS_CHANGED_TAG + && msg.topic == topic_from_key(watch_topic_key) + { + return msg; + } + } + }) + .await + .unwrap(); + + let request = decode_message::>(msg, watch_topic_key); + assert_eq!(request.method, NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD); + let auth = from_jwt::( + &request.params.subscriptions_changed_auth, + ) + .unwrap(); + + assert_eq!(auth.shared_claims.act, NOTIFY_SUBSCRIPTIONS_CHANGED_ACT); + assert_eq!(auth.shared_claims.iss, notify_server_client_id.to_did_key()); + assert_eq!(auth.shared_claims.aud, identity_did_key); + + publish_subscriptions_changed_response( + relay_ws_client, + did_pkh.to_owned(), + notify_server_client_id, + IdentityKeyDetails { + keys_server_url: &keys_server_url, + signing_key: identity_signing_key, + did_key: identity_did_key, + }, + watch_topic_key, + request.id, + ) + .await; + + auth.sbs } fn generate_identity_key() -> (SigningKey, DecodedClientId) { @@ -3012,27 +3130,22 @@ async fn run_test( let (key_agreement, authentication, client_id) = subscribe_topic(&project_id, app_domain.clone(), ¬ify_server.url).await; - let watch_topic_key = { - let (subs, watch_topic_key) = watch_subscriptions( - notify_server.url.clone(), - keys_server_url.clone(), - if watch_subscriptions_all_domains { - None - } else { - Some(&app_domain) - }, - &identity_signing_key, - &identity_did_key, - &did_pkh, - &relay_ws_client, - &mut rx, - ) - .await; - - assert!(subs.is_empty()); - - watch_topic_key - }; + let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + notify_server.url.clone(), + keys_server_url.clone(), + if watch_subscriptions_all_domains { + None + } else { + Some(&app_domain) + }, + &identity_signing_key, + &identity_did_key, + &did_pkh, + &relay_ws_client, + &mut rx, + ) + .await; + assert!(subs.is_empty()); let notification_type = Uuid::new_v4(); let notification_types = HashSet::from([notification_type, Uuid::new_v4()]); @@ -3050,56 +3163,35 @@ async fn run_test( ) .await; - let notify_key = { - let resp = rx.recv().await.unwrap(); - - let RelayClientEvent::Message(msg) = resp else { - panic!("Expected message, got {:?}", resp); - }; - assert_eq!(msg.tag, NOTIFY_SUBSCRIPTIONS_CHANGED_TAG); - - let Envelope:: { sealbox, iv, .. } = Envelope::::from_bytes( - base64::engine::general_purpose::STANDARD - .decode(msg.message.as_bytes()) - .unwrap(), - ) - .unwrap(); - - let decrypted_response = ChaCha20Poly1305::new(GenericArray::from_slice(&watch_topic_key)) - .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) - .unwrap(); - - let response: NotifyRequest = - serde_json::from_slice(&decrypted_response).unwrap(); - - let response_auth = response - .params - .get("subscriptionsChangedAuth") // TODO use structure + let subs = accept_watch_subscriptions_changed( + keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ) + .await; + assert_eq!(subs.len(), 1); + let sub = &subs[0]; + assert_eq!(sub.scope, notification_types); + assert_eq!(sub.account, account); + assert_eq!(sub.app_domain, app_domain); + assert_eq!(sub.app_authentication_key, client_id.to_did_key()); + assert_eq!( + &DecodedClientId::try_from_did_key(&sub.app_authentication_key) .unwrap() - .as_str() - .unwrap(); - let auth = from_jwt::(response_auth).unwrap(); - assert_eq!(auth.shared_claims.act, "notify_subscriptions_changed"); - assert_eq!(auth.sbs.len(), 1); - let sub = &auth.sbs[0]; - assert_eq!(sub.scope, notification_types); - assert_eq!(sub.account, account); - assert_eq!(sub.app_domain, app_domain); - assert_eq!(sub.app_authentication_key, client_id.to_did_key()); - assert_eq!( - &DecodedClientId::try_from_did_key(&sub.app_authentication_key) - .unwrap() - .0, - authentication.as_bytes() - ); - assert_eq!(sub.scope, notification_types); - decode_key(&sub.sym_key).unwrap() - }; + .0, + authentication.as_bytes() + ); - let notify_topic = sha256::digest(¬ify_key); + let notify_key = decode_key(&sub.sym_key).unwrap(); + let notify_topic = topic_from_key(¬ify_key); relay_ws_client - .subscribe(notify_topic.clone().into()) + .subscribe(notify_topic.clone()) .await .unwrap(); @@ -3218,7 +3310,7 @@ async fn run_test( relay_ws_client .publish( - notify_topic.clone().into(), + notify_topic.clone(), encoded_message, NOTIFY_UPDATE_TAG, NOTIFY_UPDATE_TTL, @@ -3331,7 +3423,7 @@ async fn run_test( relay_ws_client .publish( - notify_topic.into(), + notify_topic, encoded_message, NOTIFY_DELETE_TAG, NOTIFY_DELETE_TTL, @@ -3512,11 +3604,13 @@ async fn notify_all_domains(notify_server: &NotifyServerContext) { let project_id1 = ProjectId::generate(); let app_domain1 = format!("{project_id1}.example.com"); - subscribe_topic(&project_id1, app_domain1.clone(), ¬ify_server.url).await; + let (key_agreement1, authentication1, client_id1) = + subscribe_topic(&project_id1, app_domain1.clone(), ¬ify_server.url).await; let project_id2 = ProjectId::generate(); let app_domain2 = format!("{project_id2}.example.com"); - subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; + let (key_agreement2, authentication2, client_id2) = + subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; let vars = get_vars(); let (relay_ws_client, mut rx) = create_client( @@ -3526,7 +3620,7 @@ async fn notify_all_domains(notify_server: &NotifyServerContext) { ) .await; - let (_subs, _watch_topic_key) = watch_subscriptions( + let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( notify_server.url.clone(), mock_keys_server_url.clone(), None, @@ -3537,14 +3631,102 @@ async fn notify_all_domains(notify_server: &NotifyServerContext) { &mut rx, ) .await; + assert!(subs.is_empty()); - // TODO subscribe to project 1 - // TODO assert response - // TODO assert project 1 subscription in watch subscriptions changed + let notification_type1 = Uuid::new_v4(); + let notification_types1 = HashSet::from([notification_type1, Uuid::new_v4()]); + subscribe( + &relay_ws_client, + &mut rx, + &did_pkh, + mock_keys_server_url.clone(), + key_agreement1, + &client_id1, + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain1.clone()), + notification_types1.clone(), + ) + .await; + let subs = accept_watch_subscriptions_changed( + mock_keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ) + .await; + assert_eq!(subs.len(), 1); + let sub = &subs[0]; + assert_eq!(sub.scope, notification_types1); + assert_eq!(sub.account, account); + assert_eq!(sub.app_domain, app_domain1); + assert_eq!(sub.app_authentication_key, client_id1.to_did_key()); + assert_eq!( + &DecodedClientId::try_from_did_key(&sub.app_authentication_key) + .unwrap() + .0, + authentication1.as_bytes() + ); - // TODO subscribe to project 2 - // TODO assert response - // TODO assert project 2 subscription in watch subscriptions changed + let notification_type2 = Uuid::new_v4(); + let notification_types2 = HashSet::from([notification_type2, Uuid::new_v4()]); + subscribe( + &relay_ws_client, + &mut rx, + &did_pkh, + mock_keys_server_url.clone(), + key_agreement2, + &client_id2, + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain2.clone()), + notification_types2.clone(), + ) + .await; + let subs = accept_watch_subscriptions_changed( + mock_keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ) + .await; + assert_eq!(subs.len(), 2); + let sub1 = subs + .iter() + .find(|sub| sub.app_domain == app_domain1) + .unwrap(); + assert_eq!(sub1.scope, notification_types1); + assert_eq!(sub1.account, account); + assert_eq!(sub1.app_domain, app_domain1); + assert_eq!(sub1.app_authentication_key, client_id1.to_did_key()); + assert_eq!( + &DecodedClientId::try_from_did_key(&sub1.app_authentication_key) + .unwrap() + .0, + authentication1.as_bytes() + ); + let sub2 = subs + .iter() + .find(|sub| sub.app_domain == app_domain2) + .unwrap(); + assert_eq!(sub2.scope, notification_types2); + assert_eq!(sub2.account, account); + assert_eq!(sub2.app_domain, app_domain2); + assert_eq!(sub2.app_authentication_key, client_id2.to_did_key()); + assert_eq!( + &DecodedClientId::try_from_did_key(&sub2.app_authentication_key) + .unwrap() + .0, + authentication2.as_bytes() + ); } #[test_context(NotifyServerContext)] @@ -3591,7 +3773,7 @@ async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { ) .await; - let (_subs, _watch_topic_key) = watch_subscriptions( + let (_subs, _watch_topic_key, _notify_server_client_id) = watch_subscriptions( notify_server.url.clone(), keys_server_url.clone(), Some(&app_domain), From 37147ae7a4555fbd81bfb3ebf43d6385a4851c00 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Sat, 23 Dec 2023 09:39:06 +0900 Subject: [PATCH 3/3] chore: separate this domain test --- tests/integration.rs | 127 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 124 insertions(+), 3 deletions(-) diff --git a/tests/integration.rs b/tests/integration.rs index f300f19c..f40e456f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -3551,7 +3551,7 @@ async fn notify_all_domains_old(notify_server: &NotifyServerContext) { #[test_context(NotifyServerContext)] #[tokio::test] -async fn notify_this_domain(notify_server: &NotifyServerContext) { +async fn notify_integration(notify_server: &NotifyServerContext) { run_test(STATEMENT_THIS_DOMAIN.to_owned(), false, notify_server).await } @@ -3729,6 +3729,129 @@ async fn notify_all_domains(notify_server: &NotifyServerContext) { ); } +#[test_context(NotifyServerContext)] +#[tokio::test] +async fn notify_this_domain(notify_server: &NotifyServerContext) { + let (identity_signing_key, identity_public_key) = generate_identity_key(); + + let (account_signing_key, account) = generate_account(); + let did_pkh = account.to_did_pkh(); + + let mock_keys_server = MockServer::start().await; + let mock_keys_server_url = mock_keys_server.uri().parse::().unwrap(); + + let project_id1 = ProjectId::generate(); + let app_domain1 = format!("{project_id1}.example.com"); + let (key_agreement1, _authentication1, client_id1) = + subscribe_topic(&project_id1, app_domain1.clone(), ¬ify_server.url).await; + + register_mocked_identity_key( + &mock_keys_server, + identity_public_key.clone(), + sign_cacao( + app_domain1.clone(), + did_pkh.clone(), + STATEMENT_THIS_DOMAIN.to_owned(), + identity_public_key.clone(), + mock_keys_server_url.to_string(), + account_signing_key, + ), + ) + .await; + + let project_id2 = ProjectId::generate(); + let app_domain2 = format!("{project_id2}.example.com"); + let (key_agreement2, _authentication2, client_id2) = + subscribe_topic(&project_id2, app_domain2.clone(), ¬ify_server.url).await; + + let vars = get_vars(); + let (relay_ws_client, mut rx) = create_client( + vars.relay_url.parse().unwrap(), + vars.project_id.into(), + notify_server.url.clone(), + ) + .await; + + let (subs, watch_topic_key, notify_server_client_id) = watch_subscriptions( + notify_server.url.clone(), + mock_keys_server_url.clone(), + Some(&app_domain1), + &identity_signing_key, + &identity_public_key.to_did_key(), + &account.to_did_pkh(), + &relay_ws_client, + &mut rx, + ) + .await; + assert!(subs.is_empty()); + + let notification_type1 = Uuid::new_v4(); + let notification_types1 = HashSet::from([notification_type1, Uuid::new_v4()]); + subscribe( + &relay_ws_client, + &mut rx, + &did_pkh, + mock_keys_server_url.clone(), + key_agreement1, + &client_id1, + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain1.clone()), + notification_types1.clone(), + ) + .await; + let subs = accept_watch_subscriptions_changed( + mock_keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ) + .await; + assert_eq!(subs.len(), 1); + let sub = &subs[0]; + assert_eq!(sub.account, account); + assert_eq!(sub.app_domain, app_domain1); + + let notification_type2 = Uuid::new_v4(); + let notification_types2 = HashSet::from([notification_type2, Uuid::new_v4()]); + let result = tokio::time::timeout( + std::time::Duration::from_secs(1), + subscribe( + &relay_ws_client, + &mut rx, + &did_pkh, + mock_keys_server_url.clone(), + key_agreement2, + &client_id2, + &identity_signing_key, + &identity_public_key.to_did_key(), + DidWeb::from_domain(app_domain2.clone()), + notification_types2.clone(), + ), + ) + .await; + assert!(result.is_err()); + let result = tokio::time::timeout( + std::time::Duration::from_secs(1), + accept_watch_subscriptions_changed( + mock_keys_server_url.clone(), + ¬ify_server_client_id, + &identity_signing_key, + &identity_public_key.to_did_key(), + &did_pkh, + &watch_topic_key, + &relay_ws_client, + &mut rx, + ), + ) + .await; + assert!(result.is_err()); +} + #[test_context(NotifyServerContext)] #[tokio::test] async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { @@ -3796,5 +3919,3 @@ async fn works_with_staging_keys_server(notify_server: &NotifyServerContext) { // TODO test updating from 1, to 0, to 2 scopes // TODO test deleting and re-subscribing -// TODO adapt test THIS domain -// TODO assert failure (no response, and no subscriptions changed) when subscribing to project that SIWE doesn't allow