diff --git a/justfile b/justfile index 37f43bc7..008089f2 100644 --- a/justfile +++ b/justfile @@ -9,7 +9,7 @@ run: test ENV: @echo '==> Running integration tests' - ENVIRONMENT="{{ENV}}" cargo test --test integration + ENVIRONMENT="{{ENV}}" cargo test --test integration -- --nocapture deploy-terraform ENV: @echo '==> Deploying terraform on env {{ENV}}' diff --git a/src/error.rs b/src/error.rs index 5b1b6bdb..a133ea8b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -134,6 +134,9 @@ pub enum Error { #[error(transparent)] ToStrError(#[from] hyper::header::ToStrError), + + #[error(transparent)] + EdDalek(#[from] ed25519_dalek::ed25519::Error), } impl IntoResponse for Error { diff --git a/src/handlers/notify.rs b/src/handlers/notify.rs index da0cd3e5..d9eb70f7 100644 --- a/src/handlers/notify.rs +++ b/src/handlers/notify.rs @@ -1,4 +1,5 @@ use { + super::subscribe_topic::ProjectData, crate::{ analytics::message_info::MessageInfo, error, @@ -14,12 +15,14 @@ use { Json, }, base64::Engine, + ed25519_dalek::Signer, error::Result, futures::FutureExt, log::warn, mongodb::bson::doc, relay_rpc::{ domain::Topic, + jwt::{JwtHeader, JWT_HEADER_ALG, JWT_HEADER_TYP}, rpc::{msg_id::MsgId, Publish}, }, serde::{Deserialize, Serialize}, @@ -29,6 +32,8 @@ use { wc::metrics::otel::{Context, KeyValue}, }; +const NOTIFY_MSG_TTL: u64 = 2592000; + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct NotifyBody { pub notification: Notification, @@ -89,9 +94,24 @@ pub async fn handler( .find(doc! { "_id": {"$in": &accounts}}, None) .await?; + let project_data: ProjectData = state + .database + .collection::("project_data") + .find_one(doc! { "_id": project_id.clone()}, None) + .await? + .ok_or(error::Error::NoProjectDataForTopic(project_id.clone()))?; + + let notify_pubkey = bs58::encode(state.keypair.public_key().as_bytes()).into_string(); // Generate publish jobs - this will also remove accounts from not_found // Prepares the encrypted message and gets the topic for each account - let jobs = generate_publish_jobs(notification, cursor, &mut response).await?; + let jobs = generate_publish_jobs( + notification, + cursor, + &mut response, + &project_data, + ¬ify_pubkey, + ) + .await?; // Attempts to send to all found accounts, waiting for relay ack for // NOTIFY_TIMEOUT seconds @@ -174,7 +194,7 @@ async fn process_publish_jobs( job.topic.clone(), job.message, 4002, - Duration::from_secs(2592000), + Duration::from_secs(NOTIFY_MSG_TTL), true, ), ) @@ -214,17 +234,13 @@ async fn generate_publish_jobs( notification: Notification, mut cursor: mongodb::Cursor, response: &mut Response, + project_data: &ProjectData, + notify_pubkey: &str, ) -> Result> { let mut jobs = vec![]; let id = chrono::Utc::now().timestamp_millis().unsigned_abs(); - let message = JsonRpcPayload { - id, - jsonrpc: "2.0".to_string(), - params: JsonRpcParams::Push(notification.clone()), - }; - while let Some(client_data) = cursor.try_next().await? { response.not_found.remove(&client_data.id); @@ -236,6 +252,17 @@ async fn generate_publish_jobs( continue; } + let message = JsonRpcPayload { + id, + jsonrpc: "2.0".to_string(), + params: JsonRpcParams::Push(sign_message( + ¬ification, + project_data, + notify_pubkey, + &client_data, + )?), + }; + let envelope = Envelope::::new(&client_data.sym_key, &message)?; let base64_notification = @@ -286,3 +313,68 @@ fn send_metrics( KeyValue::new("project_id", project_id), ]) } + +fn sign_message( + msg: &Notification, + project_data: &ProjectData, + notify_pubkey: &str, + client_data: &ClientData, +) -> Result { + let msg = { + let msg = JwtMessage { + iat: chrono::Utc::now().timestamp(), + exp: (chrono::Utc::now() + chrono::Duration::seconds(NOTIFY_MSG_TTL as i64)) + .timestamp(), + iss: format!("did:key:{}", notify_pubkey), + ksu: client_data.ksu.to_string(), + aud: format!("did:pkh:{}", client_data.id), + act: "notify_message".to_string(), + sub: client_data.sub_auth_hash.clone(), + app: project_data.dapp_url.to_string(), + msg: msg.clone(), + }; + let serialized = serde_json::to_string(&msg)?; + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(serialized) + }; + + let header = { + let data = JwtHeader { + typ: JWT_HEADER_TYP, + alg: JWT_HEADER_ALG, + }; + + let serialized = serde_json::to_string(&data)?; + + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(serialized) + }; + + let private_key = ed25519_dalek::SecretKey::from_bytes(&hex::decode( + project_data.identity_keypair.private_key.clone(), + )?)?; + + let public_key = ed25519_dalek::PublicKey::from(&private_key); + + let keypair = ed25519_dalek::Keypair { + secret: private_key, + public: public_key, + }; + + let message = format!("{}.{}", header, msg); + let signature = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(keypair.sign(message.as_bytes())); + + Ok(format!("{}.{}", message, signature)) +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct JwtMessage { + pub iat: i64, // issued at + pub exp: i64, // expiry + pub iss: String, // public key of cast server (did:key) + pub ksu: String, // key server url + pub aud: String, // blockchain account (did:pkh) + pub act: String, // action intent (must be "notify_message") + pub sub: String, // subscriptionId (sha256 hash of subscriptionAuth) + pub app: String, // dapp domain url + pub msg: Notification, // message +} diff --git a/src/handlers/subscribe_topic.rs b/src/handlers/subscribe_topic.rs index 4febe4f9..7633e0cc 100644 --- a/src/handlers/subscribe_topic.rs +++ b/src/handlers/subscribe_topic.rs @@ -17,6 +17,7 @@ pub struct ProjectData { pub id: String, pub identity_keypair: Keypair, pub signing_keypair: Keypair, + pub dapp_url: String, pub topic: String, } @@ -73,6 +74,8 @@ pub async fn handler( private_key: hex::encode(identity_secret.to_bytes()), public_key: identity_public.clone(), }, + // TODO: Proper dapp url + dapp_url: "http://localhost:3000".into(), topic: topic.clone(), }; diff --git a/src/jsonrpc.rs b/src/jsonrpc.rs index d2469a57..c7d68e9d 100644 --- a/src/jsonrpc.rs +++ b/src/jsonrpc.rs @@ -1,7 +1,4 @@ -use { - crate::types::Notification, - serde::{Deserialize, Serialize}, -}; +use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct JsonRpcPayload { @@ -15,5 +12,5 @@ pub struct JsonRpcPayload { #[serde(tag = "method", content = "params")] pub enum JsonRpcParams { #[serde(rename = "wc_pushMessage")] - Push(Notification), + Push(String), } diff --git a/src/state.rs b/src/state.rs index 9a7f0730..10511111 100644 --- a/src/state.rs +++ b/src/state.rs @@ -61,7 +61,7 @@ impl AppState { pub async fn register_client( &self, project_id: &str, - client_data: &ClientData, + client_data: ClientData, url: &Url, ) -> Result<()> { let key = hex::decode(client_data.sym_key.clone())?; @@ -72,6 +72,7 @@ impl AppState { relay_url: url.to_string().trim_end_matches('/').to_string(), sym_key: client_data.sym_key.clone(), scope: client_data.scope.clone(), + ..client_data }; self.database diff --git a/src/types.rs b/src/types.rs index eb4ed292..dca74898 100644 --- a/src/types.rs +++ b/src/types.rs @@ -31,6 +31,9 @@ pub struct ClientData { pub id: String, pub relay_url: String, pub sym_key: String, + pub sub_auth_hash: String, + pub expiry: u64, + pub ksu: String, pub scope: HashSet, } diff --git a/src/websocket_service/handlers/push_subscribe.rs b/src/websocket_service/handlers/push_subscribe.rs index 155d62e7..0002af89 100644 --- a/src/websocket_service/handlers/push_subscribe.rs +++ b/src/websocket_service/handlers/push_subscribe.rs @@ -57,6 +57,7 @@ pub async fn handle( let id = msg.id; let sub_auth = SubscriptionAuth::from_jwt(&msg.params.subscription_auth)?; + let sub_auth_hash = sha256::digest(msg.params.subscription_auth); let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng {}); let public = PublicKey::from(&secret); @@ -87,6 +88,9 @@ pub async fn handle( relay_url: state.config.relay_url.clone(), sym_key: push_key.clone(), scope: sub_auth.scp.split(' ').map(|s| s.into()).collect(), + expiry: sub_auth.exp, + sub_auth_hash, + ksu: sub_auth.ksu, }; let push_topic = sha256::digest(&*hex::decode(&push_key)?); @@ -106,7 +110,7 @@ pub async fn handle( state .register_client( &project_data.id, - &client_data, + client_data, &url::Url::parse(&state.config.relay_url)?, ) .await?; diff --git a/src/websocket_service/handlers/push_update.rs b/src/websocket_service/handlers/push_update.rs index ef4e7ead..b994fc5b 100644 --- a/src/websocket_service/handlers/push_update.rs +++ b/src/websocket_service/handlers/push_update.rs @@ -61,6 +61,7 @@ pub async fn handle( let msg: NotifyMessage = serde_json::from_slice(&msg)?; let sub_auth = SubscriptionAuth::from_jwt(&msg.params.subscription_auth)?; + let sub_auth_hash = sha256::digest(msg.params.subscription_auth); let response = NotifyResponse:: { id: msg.id, @@ -87,13 +88,16 @@ pub async fn handle( relay_url: state.config.relay_url.clone(), sym_key: client_data.sym_key.clone(), scope: sub_auth.scp.split(' ').map(|s| s.into()).collect(), + sub_auth_hash, + expiry: sub_auth.exp, + ksu: sub_auth.ksu, }; info!("[{request_id}] Updating client: {:?}", &client_data); state .register_client( &lookup_data.project_id, - &client_data, + client_data, &url::Url::parse(&state.config.relay_url)?, ) .await?; diff --git a/tests/integration.rs b/tests/integration.rs index 8322b200..9adc37ba 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -8,7 +8,8 @@ use { chrono::Utc, ed25519_dalek::Signer, notify_server::{ - auth::SubscriptionAuth, + auth::{AuthError, SubscriptionAuth}, + handlers::notify::JwtMessage, types::{Envelope, EnvelopeType0, EnvelopeType1, Notification}, websocket_service::{NotifyMessage, NotifyResponse}, wsclient::{self, RelayClientEvent}, @@ -25,6 +26,8 @@ use { x25519_dalek::{PublicKey, StaticSecret}, }; +const JWT_LEEWAY: i64 = 30; + fn urls(env: String) -> (String, String) { match env.as_str() { "STAGING" => ( @@ -101,6 +104,12 @@ PROJECT_ID to be set", .as_str() .unwrap(); + let dapp_identity_pubkey = dapp_pubkey_response + .get("identityPublicKey") + .unwrap() + .as_str() + .unwrap(); + // Get subscribe topic for dapp let subscribe_topic = sha256::digest(&*hex::decode(dapp_pubkey).unwrap()); @@ -125,6 +134,7 @@ PROJECT_ID to be set", // Encode the subscription auth let subscription_auth = encode_subscription_auth(&subscription_auth, &keypair); + let sub_auth_hash = sha256::digest(&*subscription_auth.clone()); let id = chrono::Utc::now().timestamp_millis().unsigned_abs(); @@ -234,16 +244,20 @@ PROJECT_ID to be set", ) .unwrap(); - let decrypted_notification: NotifyMessage = serde_json::from_slice( + let decrypted_notification: NotifyMessage = serde_json::from_slice( &cipher .decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox)) .unwrap(), ) .unwrap(); - let received_notification = decrypted_notification.params; - - assert_eq!(received_notification, notification); + // let received_notification = decrypted_notification.params; + let claims = verify_jwt(&decrypted_notification.params, dapp_identity_pubkey).unwrap(); + assert_eq!(claims.msg, notification); + assert_eq!(claims.sub, sub_auth_hash); + assert!(claims.iat < chrono::Utc::now().timestamp() + JWT_LEEWAY); + assert!(claims.exp > chrono::Utc::now().timestamp() - JWT_LEEWAY); + // TODO: add more asserts let delete_params = json!({ "code": 400, @@ -328,3 +342,30 @@ pub fn encode_subscription_auth(subscription_auth: &SubscriptionAuth, keypair: & format!("{message}.{signature}") } + +fn verify_jwt(jwt: &str, key: &str) -> notify_server::error::Result { + let key = jsonwebtoken::DecodingKey::from_ed_der(&hex::decode(key).unwrap()); + + let mut parts = jwt.rsplitn(2, '.'); + + let (Some(signature), Some(message)) = (parts.next(), parts.next()) else { + return Err(AuthError::Format)?; + }; + + // Finally, verify signature. + let sig_result = jsonwebtoken::crypto::verify( + signature, + message.as_bytes(), + &key, + jsonwebtoken::Algorithm::EdDSA, + ); + + match sig_result { + Ok(true) => Ok(serde_json::from_slice::( + &base64::engine::general_purpose::STANDARD_NO_PAD + .decode(jwt.split('.').nth(1).unwrap()) + .unwrap(), + )?), + Ok(false) | Err(_) => Err(AuthError::InvalidSignature)?, + } +}