Skip to content

Commit

Permalink
fix: Revert "fix: SDK analytics (#545)" (#549)
Browse files Browse the repository at this point in the history
This reverts commit afad3cc.
  • Loading branch information
chris13524 committed May 8, 2024
1 parent fe22cb7 commit 9ef307a
Show file tree
Hide file tree
Showing 14 changed files with 21 additions and 203 deletions.
4 changes: 0 additions & 4 deletions src/analytics/relay_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use {

pub struct RelayResponseParams {
pub request: Arc<RelayIncomingMessage>,
pub request_sdk: Option<Arc<str>>,

pub response_message_id: Arc<str>,
pub response_topic: Topic,
Expand All @@ -32,8 +31,6 @@ pub struct RelayRequest {
pub request_tag: u32,
/// Time at which the request was received
pub request_received_at: NaiveDateTime,
/// The SDK information of the request
pub request_sdk: Option<Arc<str>>,

/// Relay message ID of response
pub response_message_id: Arc<str>,
Expand All @@ -58,7 +55,6 @@ impl From<RelayResponseParams> for RelayRequest {
request_topic: params.request.topic.value().clone(),
request_tag: params.request.tag,
request_received_at: params.request.received_at.naive_utc(),
request_sdk: params.request_sdk,

response_message_id: params.response_message_id.clone(),
response_topic: params.response_topic.value().clone(),
Expand Down
70 changes: 6 additions & 64 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait GetSharedClaims {
fn get_shared_claims(&self) -> &SharedClaims;
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchSubscriptionsRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -100,16 +100,6 @@ pub struct WatchSubscriptionsRequestAuth {
/// did:web of app domain to watch, or `null` for all domains
#[serde(default)]
pub app: Option<DidWeb>,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl WatchSubscriptionsRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for WatchSubscriptionsRequestAuth {
Expand Down Expand Up @@ -186,7 +176,7 @@ pub struct WatchSubscriptionsChangedResponseAuth {
// Note: MessageAuth is different since it doesn't have `aud`
// pub struct MessageAuth {

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -196,16 +186,6 @@ pub struct MessageResponseAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl MessageResponseAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for MessageResponseAuth {
Expand All @@ -214,7 +194,7 @@ impl GetSharedClaims for MessageResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -226,16 +206,6 @@ pub struct SubscriptionRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionRequestAuth {
Expand Down Expand Up @@ -263,7 +233,7 @@ impl GetSharedClaims for SubscriptionResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionUpdateRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -275,16 +245,6 @@ pub struct SubscriptionUpdateRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionUpdateRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionUpdateRequestAuth {
Expand All @@ -311,7 +271,7 @@ impl GetSharedClaims for SubscriptionUpdateResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionDeleteRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -321,16 +281,6 @@ pub struct SubscriptionDeleteRequestAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionDeleteRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionDeleteRequestAuth {
Expand Down Expand Up @@ -370,9 +320,6 @@ pub struct SubscriptionGetNotificationsRequestAuth {
#[serde(flatten)]
#[validate(nested)]
pub params: GetNotificationsParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionGetNotificationsRequestAuth {
Expand Down Expand Up @@ -406,7 +353,7 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -418,15 +365,10 @@ pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
pub app: DidWeb,
#[serde(flatten)]
pub params: MarkNotificationsAsReadParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))?;
self.params
.validate_with_args(&MarkNotificationsAsReadParamsValidatorContext {
all: self.params.all,
Expand Down
3 changes: 0 additions & 3 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub async fn publish_relay_message(
relay_client: &Client,
publish: &Publish,
relay_request: Option<Arc<RelayIncomingMessage>>,
sdk: Option<Arc<str>>,
metrics: Option<&Metrics>,
analytics: &NotifyAnalytics,
) -> Result<(), Error<PublishError>> {
Expand Down Expand Up @@ -93,7 +92,6 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down Expand Up @@ -124,7 +122,6 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ pub enum RelayMessageServerError {

#[error("Subscription watcher send: {0}")]
SubscriptionWatcherSend(SubscriptionWatcherSendError),

#[error("Error sending sdk info via oneshot channel")]
SdkOneshotSend,
}

#[derive(Debug, thiserror::Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use {
rpc::Publish,
},
std::{collections::HashSet, sync::Arc},
tokio::sync::oneshot,
tracing::{info, warn},
};

Expand Down Expand Up @@ -88,12 +87,10 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<NotifyDelete, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<NotifyDelete>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
project_client_id: DecodedClientId,
Expand All @@ -114,14 +111,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -242,16 +231,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok((ResponseAuth { response_auth }, watchers_with_subscriptions))
}

let result = handle(
state,
&msg,
&req,
sdk_tx,
&subscriber,
&project,
project_client_id,
)
.await;
let result = handle(state, &msg, &req, &subscriber, &project, project_client_id).await;

let (response, watchers_with_subscriptions, result) = match result {
Ok((result, watchers_with_subscriptions)) => (
Expand All @@ -268,13 +248,10 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
),
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let msg = Arc::from(msg);

let response_fut = {
let msg = msg.clone();
let sdk = sdk.clone();
async {
let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;
Expand All @@ -291,7 +268,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(msg),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand All @@ -309,7 +285,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
&state.notify_keys.authentication_client_id,
&state.relay_client,
msg,
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use {
rpc::{msg_id::get_message_id, Publish},
},
std::sync::Arc,
tokio::sync::oneshot,
tracing::info,
};

Expand Down Expand Up @@ -80,12 +79,10 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<AuthMessage, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<AuthMessage>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
) -> Result<AuthMessage, RelayMessageError> {
Expand All @@ -99,14 +96,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -152,6 +141,10 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
(account, Arc::<str>::from(domain))
};

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

let data = get_notifications_for_subscriber(
subscriber.id,
request_auth.params,
Expand Down Expand Up @@ -225,7 +218,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok(AuthMessage { auth })
}

let result = handle(state, &msg, &req, sdk_tx, &subscriber, &project).await;
let result = handle(state, &msg, &req, &subscriber, &project).await;

let response = match &result {
Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result))
Expand All @@ -234,8 +227,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?,
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;

Expand All @@ -251,7 +242,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(Arc::new(msg)),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Loading

0 comments on commit 9ef307a

Please sign in to comment.