Skip to content

Commit

Permalink
fix: SDK analytics (#545)
Browse files Browse the repository at this point in the history
* fix: SDK analytics

* fix: oneshot receive error
  • Loading branch information
chris13524 authored May 8, 2024
1 parent 66e8a11 commit afad3cc
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/analytics/relay_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {

pub struct RelayResponseParams {
pub request: Arc<RelayIncomingMessage>,
pub request_sdk: Option<Arc<str>>,

pub response_message_id: Arc<str>,
pub response_topic: Topic,
Expand All @@ -31,6 +32,8 @@ pub struct RelayRequest {
pub request_tag: u32,
/// Time at which the request was received
pub request_received_at: NaiveDateTime,
/// The SDK information of the request
pub request_sdk: Option<Arc<str>>,

/// Relay message ID of response
pub response_message_id: Arc<str>,
Expand All @@ -55,6 +58,7 @@ impl From<RelayResponseParams> for RelayRequest {
request_topic: params.request.topic.value().clone(),
request_tag: params.request.tag,
request_received_at: params.request.received_at.naive_utc(),
request_sdk: params.request_sdk,

response_message_id: params.response_message_id.clone(),
response_topic: params.response_topic.value().clone(),
Expand Down
70 changes: 64 additions & 6 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait GetSharedClaims {
fn get_shared_claims(&self) -> &SharedClaims;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct WatchSubscriptionsRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -100,6 +100,16 @@ pub struct WatchSubscriptionsRequestAuth {
/// did:web of app domain to watch, or `null` for all domains
#[serde(default)]
pub app: Option<DidWeb>,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl WatchSubscriptionsRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for WatchSubscriptionsRequestAuth {
Expand Down Expand Up @@ -176,7 +186,7 @@ pub struct WatchSubscriptionsChangedResponseAuth {
// Note: MessageAuth is different since it doesn't have `aud`
// pub struct MessageAuth {

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct MessageResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -186,6 +196,16 @@ pub struct MessageResponseAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl MessageResponseAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for MessageResponseAuth {
Expand All @@ -194,7 +214,7 @@ impl GetSharedClaims for MessageResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -206,6 +226,16 @@ pub struct SubscriptionRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionRequestAuth {
Expand Down Expand Up @@ -233,7 +263,7 @@ impl GetSharedClaims for SubscriptionResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionUpdateRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -245,6 +275,16 @@ pub struct SubscriptionUpdateRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionUpdateRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionUpdateRequestAuth {
Expand All @@ -271,7 +311,7 @@ impl GetSharedClaims for SubscriptionUpdateResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionDeleteRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -281,6 +321,16 @@ pub struct SubscriptionDeleteRequestAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionDeleteRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionDeleteRequestAuth {
Expand Down Expand Up @@ -320,6 +370,9 @@ pub struct SubscriptionGetNotificationsRequestAuth {
#[serde(flatten)]
#[validate(nested)]
pub params: GetNotificationsParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionGetNotificationsRequestAuth {
Expand Down Expand Up @@ -353,7 +406,7 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -365,10 +418,15 @@ pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
pub app: DidWeb,
#[serde(flatten)]
pub params: MarkNotificationsAsReadParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))?;
self.params
.validate_with_args(&MarkNotificationsAsReadParamsValidatorContext {
all: self.params.all,
Expand Down
3 changes: 3 additions & 0 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub async fn publish_relay_message(
relay_client: &Client,
publish: &Publish,
relay_request: Option<Arc<RelayIncomingMessage>>,
sdk: Option<Arc<str>>,
metrics: Option<&Metrics>,
analytics: &NotifyAnalytics,
) -> Result<(), Error<PublishError>> {
Expand Down Expand Up @@ -92,6 +93,7 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down Expand Up @@ -122,6 +124,7 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ pub enum RelayMessageServerError {

#[error("Subscription watcher send: {0}")]
SubscriptionWatcherSend(SubscriptionWatcherSendError),

#[error("Error sending sdk info via oneshot channel")]
SdkOneshotSend,
}

#[derive(Debug, thiserror::Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use {
rpc::Publish,
},
std::{collections::HashSet, sync::Arc},
tokio::sync::oneshot,
tracing::{info, warn},
};

Expand Down Expand Up @@ -87,10 +88,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<NotifyDelete, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<NotifyDelete>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
project_client_id: DecodedClientId,
Expand All @@ -111,6 +114,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -231,7 +242,16 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok((ResponseAuth { response_auth }, watchers_with_subscriptions))
}

let result = handle(state, &msg, &req, &subscriber, &project, project_client_id).await;
let result = handle(
state,
&msg,
&req,
sdk_tx,
&subscriber,
&project,
project_client_id,
)
.await;

let (response, watchers_with_subscriptions, result) = match result {
Ok((result, watchers_with_subscriptions)) => (
Expand All @@ -248,10 +268,13 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
),
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let msg = Arc::from(msg);

let response_fut = {
let msg = msg.clone();
let sdk = sdk.clone();
async {
let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;
Expand All @@ -268,6 +291,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(msg),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand All @@ -285,6 +309,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
&state.notify_keys.authentication_client_id,
&state.relay_client,
msg,
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use {
rpc::{msg_id::get_message_id, Publish},
},
std::sync::Arc,
tokio::sync::oneshot,
tracing::info,
};

Expand Down Expand Up @@ -79,10 +80,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<AuthMessage, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<AuthMessage>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
) -> Result<AuthMessage, RelayMessageError> {
Expand All @@ -96,6 +99,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -141,10 +152,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
(account, Arc::<str>::from(domain))
};

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

let data = get_notifications_for_subscriber(
subscriber.id,
request_auth.params,
Expand Down Expand Up @@ -218,7 +225,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok(AuthMessage { auth })
}

let result = handle(state, &msg, &req, &subscriber, &project).await;
let result = handle(state, &msg, &req, sdk_tx, &subscriber, &project).await;

let response = match &result {
Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result))
Expand All @@ -227,6 +234,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?,
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;

Expand All @@ -242,6 +251,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(Arc::new(msg)),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Loading

0 comments on commit afad3cc

Please sign in to comment.