Skip to content

Commit

Permalink
feat: jwt authed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Rakowskiii committed Jul 28, 2023
1 parent 7975702 commit 11c6f92
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 22 deletions.
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ run:

test ENV:
@echo '==> Running integration tests'
ENVIRONMENT="{{ENV}}" cargo test --test integration
ENVIRONMENT="{{ENV}}" cargo test --test integration -- --nocapture

deploy-terraform ENV:
@echo '==> Deploying terraform on env {{ENV}}'
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ pub enum Error {

#[error(transparent)]
ToStrError(#[from] hyper::header::ToStrError),

#[error(transparent)]
EdDalek(#[from] ed25519_dalek::ed25519::Error),
}

impl IntoResponse for Error {
Expand Down
108 changes: 100 additions & 8 deletions src/handlers/notify.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
super::subscribe_topic::ProjectData,
crate::{
analytics::message_info::MessageInfo,
error,
Expand All @@ -14,12 +15,14 @@ use {
Json,
},
base64::Engine,
ed25519_dalek::Signer,
error::Result,
futures::FutureExt,
log::warn,
mongodb::bson::doc,
relay_rpc::{
domain::Topic,
jwt::{JwtHeader, JWT_HEADER_ALG, JWT_HEADER_TYP},
rpc::{msg_id::MsgId, Publish},
},
serde::{Deserialize, Serialize},
Expand All @@ -29,6 +32,8 @@ use {
wc::metrics::otel::{Context, KeyValue},
};

const NOTIFY_MSG_TTL: u64 = 2592000;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NotifyBody {
pub notification: Notification,
Expand Down Expand Up @@ -89,9 +94,24 @@ pub async fn handler(
.find(doc! { "_id": {"$in": &accounts}}, None)
.await?;

let project_data: ProjectData = state
.database
.collection::<ProjectData>("project_data")
.find_one(doc! { "_id": project_id.clone()}, None)
.await?
.ok_or(error::Error::NoProjectDataForTopic(project_id.clone()))?;

let notify_pubkey = bs58::encode(state.keypair.public_key().as_bytes()).into_string();
// Generate publish jobs - this will also remove accounts from not_found
// Prepares the encrypted message and gets the topic for each account
let jobs = generate_publish_jobs(notification, cursor, &mut response).await?;
let jobs = generate_publish_jobs(
notification,
cursor,
&mut response,
&project_data,
&notify_pubkey,
)
.await?;

// Attempts to send to all found accounts, waiting for relay ack for
// NOTIFY_TIMEOUT seconds
Expand Down Expand Up @@ -174,7 +194,7 @@ async fn process_publish_jobs(
job.topic.clone(),
job.message,
4002,
Duration::from_secs(2592000),
Duration::from_secs(NOTIFY_MSG_TTL),
true,
),
)
Expand Down Expand Up @@ -214,17 +234,13 @@ async fn generate_publish_jobs(
notification: Notification,
mut cursor: mongodb::Cursor<ClientData>,
response: &mut Response,
project_data: &ProjectData,
notify_pubkey: &str,
) -> Result<Vec<PublishJob>> {
let mut jobs = vec![];

let id = chrono::Utc::now().timestamp_millis().unsigned_abs();

let message = JsonRpcPayload {
id,
jsonrpc: "2.0".to_string(),
params: JsonRpcParams::Push(notification.clone()),
};

while let Some(client_data) = cursor.try_next().await? {
response.not_found.remove(&client_data.id);

Expand All @@ -236,6 +252,17 @@ async fn generate_publish_jobs(
continue;
}

let message = JsonRpcPayload {
id,
jsonrpc: "2.0".to_string(),
params: JsonRpcParams::Push(sign_message(
&notification,
project_data,
notify_pubkey,
&client_data,
)?),
};

let envelope = Envelope::<EnvelopeType0>::new(&client_data.sym_key, &message)?;

let base64_notification =
Expand Down Expand Up @@ -286,3 +313,68 @@ fn send_metrics(
KeyValue::new("project_id", project_id),
])
}

fn sign_message(
msg: &Notification,
project_data: &ProjectData,
notify_pubkey: &str,
client_data: &ClientData,
) -> Result<String> {
let msg = {
let msg = JwtMessage {
iat: chrono::Utc::now().timestamp(),
exp: (chrono::Utc::now() + chrono::Duration::seconds(NOTIFY_MSG_TTL as i64))
.timestamp(),
iss: format!("did:key:{}", notify_pubkey),
ksu: client_data.ksu.to_string(),
aud: format!("did:pkh:{}", client_data.id),
act: "notify_message".to_string(),
sub: client_data.sub_auth_hash.clone(),
app: project_data.dapp_url.to_string(),
msg: msg.clone(),
};
let serialized = serde_json::to_string(&msg)?;
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(serialized)
};

let header = {
let data = JwtHeader {
typ: JWT_HEADER_TYP,
alg: JWT_HEADER_ALG,
};

let serialized = serde_json::to_string(&data)?;

base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(serialized)
};

let private_key = ed25519_dalek::SecretKey::from_bytes(&hex::decode(
project_data.identity_keypair.private_key.clone(),
)?)?;

let public_key = ed25519_dalek::PublicKey::from(&private_key);

let keypair = ed25519_dalek::Keypair {
secret: private_key,
public: public_key,
};

let message = format!("{}.{}", header, msg);
let signature =
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(keypair.sign(message.as_bytes()));

Ok(format!("{}.{}", message, signature))
}

#[derive(Serialize, Deserialize, Debug)]
pub struct JwtMessage {
pub iat: i64, // issued at
pub exp: i64, // expiry
pub iss: String, // public key of cast server (did:key)
pub ksu: String, // key server url
pub aud: String, // blockchain account (did:pkh)
pub act: String, // action intent (must be "notify_message")
pub sub: String, // subscriptionId (sha256 hash of subscriptionAuth)
pub app: String, // dapp domain url
pub msg: Notification, // message
}
3 changes: 3 additions & 0 deletions src/handlers/subscribe_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct ProjectData {
pub id: String,
pub identity_keypair: Keypair,
pub signing_keypair: Keypair,
pub dapp_url: String,
pub topic: String,
}

Expand Down Expand Up @@ -73,6 +74,8 @@ pub async fn handler(
private_key: hex::encode(identity_secret.to_bytes()),
public_key: identity_public.clone(),
},
// TODO: Proper dapp url
dapp_url: "http://localhost:3000".into(),
topic: topic.clone(),
};

Expand Down
7 changes: 2 additions & 5 deletions src/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use {
crate::types::Notification,
serde::{Deserialize, Serialize},
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpcPayload {
Expand All @@ -15,5 +12,5 @@ pub struct JsonRpcPayload {
#[serde(tag = "method", content = "params")]
pub enum JsonRpcParams {
#[serde(rename = "wc_pushMessage")]
Push(Notification),
Push(String),
}
3 changes: 2 additions & 1 deletion src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AppState {
pub async fn register_client(
&self,
project_id: &str,
client_data: &ClientData,
client_data: ClientData,
url: &Url,
) -> Result<()> {
let key = hex::decode(client_data.sym_key.clone())?;
Expand All @@ -72,6 +72,7 @@ impl AppState {
relay_url: url.to_string().trim_end_matches('/').to_string(),
sym_key: client_data.sym_key.clone(),
scope: client_data.scope.clone(),
..client_data
};

self.database
Expand Down
3 changes: 3 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub struct ClientData {
pub id: String,
pub relay_url: String,
pub sym_key: String,
pub sub_auth_hash: String,
pub expiry: u64,
pub ksu: String,
pub scope: HashSet<String>,
}

Expand Down
6 changes: 5 additions & 1 deletion src/websocket_service/handlers/push_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn handle(
let id = msg.id;

let sub_auth = SubscriptionAuth::from_jwt(&msg.params.subscription_auth)?;
let sub_auth_hash = sha256::digest(msg.params.subscription_auth);

let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng {});
let public = PublicKey::from(&secret);
Expand Down Expand Up @@ -87,6 +88,9 @@ pub async fn handle(
relay_url: state.config.relay_url.clone(),
sym_key: push_key.clone(),
scope: sub_auth.scp.split(' ').map(|s| s.into()).collect(),
expiry: sub_auth.exp,
sub_auth_hash,
ksu: sub_auth.ksu,
};

let push_topic = sha256::digest(&*hex::decode(&push_key)?);
Expand All @@ -106,7 +110,7 @@ pub async fn handle(
state
.register_client(
&project_data.id,
&client_data,
client_data,
&url::Url::parse(&state.config.relay_url)?,
)
.await?;
Expand Down
6 changes: 5 additions & 1 deletion src/websocket_service/handlers/push_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub async fn handle(
let msg: NotifyMessage<NotifySubscribe> = serde_json::from_slice(&msg)?;

let sub_auth = SubscriptionAuth::from_jwt(&msg.params.subscription_auth)?;
let sub_auth_hash = sha256::digest(msg.params.subscription_auth);

let response = NotifyResponse::<bool> {
id: msg.id,
Expand All @@ -87,13 +88,16 @@ pub async fn handle(
relay_url: state.config.relay_url.clone(),
sym_key: client_data.sym_key.clone(),
scope: sub_auth.scp.split(' ').map(|s| s.into()).collect(),
sub_auth_hash,
expiry: sub_auth.exp,
ksu: sub_auth.ksu,
};
info!("[{request_id}] Updating client: {:?}", &client_data);

state
.register_client(
&lookup_data.project_id,
&client_data,
client_data,
&url::Url::parse(&state.config.relay_url)?,
)
.await?;
Expand Down
51 changes: 46 additions & 5 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use {
chrono::Utc,
ed25519_dalek::Signer,
notify_server::{
auth::SubscriptionAuth,
auth::{AuthError, SubscriptionAuth},
handlers::notify::JwtMessage,
types::{Envelope, EnvelopeType0, EnvelopeType1, Notification},
websocket_service::{NotifyMessage, NotifyResponse},
wsclient::{self, RelayClientEvent},
Expand All @@ -25,6 +26,8 @@ use {
x25519_dalek::{PublicKey, StaticSecret},
};

const JWT_LEEWAY: i64 = 30;

fn urls(env: String) -> (String, String) {
match env.as_str() {
"STAGING" => (
Expand Down Expand Up @@ -101,6 +104,12 @@ PROJECT_ID to be set",
.as_str()
.unwrap();

let dapp_identity_pubkey = dapp_pubkey_response
.get("identityPublicKey")
.unwrap()
.as_str()
.unwrap();

// Get subscribe topic for dapp
let subscribe_topic = sha256::digest(&*hex::decode(dapp_pubkey).unwrap());

Expand All @@ -125,6 +134,7 @@ PROJECT_ID to be set",

// Encode the subscription auth
let subscription_auth = encode_subscription_auth(&subscription_auth, &keypair);
let sub_auth_hash = sha256::digest(&*subscription_auth.clone());

let id = chrono::Utc::now().timestamp_millis().unsigned_abs();

Expand Down Expand Up @@ -234,16 +244,20 @@ PROJECT_ID to be set",
)
.unwrap();

let decrypted_notification: NotifyMessage<Notification> = serde_json::from_slice(
let decrypted_notification: NotifyMessage<String> = serde_json::from_slice(
&cipher
.decrypt(&iv.into(), chacha20poly1305::aead::Payload::from(&*sealbox))
.unwrap(),
)
.unwrap();

let received_notification = decrypted_notification.params;

assert_eq!(received_notification, notification);
// let received_notification = decrypted_notification.params;
let claims = verify_jwt(&decrypted_notification.params, dapp_identity_pubkey).unwrap();
assert_eq!(claims.msg, notification);
assert_eq!(claims.sub, sub_auth_hash);
assert!(claims.iat < chrono::Utc::now().timestamp() + JWT_LEEWAY);
assert!(claims.exp > chrono::Utc::now().timestamp() - JWT_LEEWAY);
// TODO: add more asserts

let delete_params = json!({
"code": 400,
Expand Down Expand Up @@ -328,3 +342,30 @@ pub fn encode_subscription_auth(subscription_auth: &SubscriptionAuth, keypair: &

format!("{message}.{signature}")
}

fn verify_jwt(jwt: &str, key: &str) -> notify_server::error::Result<JwtMessage> {
let key = jsonwebtoken::DecodingKey::from_ed_der(&hex::decode(key).unwrap());

let mut parts = jwt.rsplitn(2, '.');

let (Some(signature), Some(message)) = (parts.next(), parts.next()) else {
return Err(AuthError::Format)?;
};

// Finally, verify signature.
let sig_result = jsonwebtoken::crypto::verify(
signature,
message.as_bytes(),
&key,
jsonwebtoken::Algorithm::EdDSA,
);

match sig_result {
Ok(true) => Ok(serde_json::from_slice::<JwtMessage>(
&base64::engine::general_purpose::STANDARD_NO_PAD
.decode(jwt.split('.').nth(1).unwrap())
.unwrap(),
)?),
Ok(false) | Err(_) => Err(AuthError::InvalidSignature)?,
}
}

0 comments on commit 11c6f92

Please sign in to comment.