Skip to content

Commit

Permalink
fix: publish response and subscription watcher updates in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Mar 5, 2024
1 parent 4b77eff commit 6efc52a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?

{
let response_fut = async {
let now = Utc::now();
let response_message = SubscriptionDeleteResponseAuth {
shared_claims: SharedClaims {
Expand Down Expand Up @@ -231,19 +231,22 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
}
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into())) // TODO change to client error?
};

send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?
let watcher_fut = async {
send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError) // TODO change to client error?
};

tokio::try_join!(response_fut, watcher_fut)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?

{
let response_fut = async {
let now = Utc::now();
let response_message = SubscriptionResponseAuth {
shared_claims: SharedClaims {
Expand Down Expand Up @@ -316,18 +316,22 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
info!("Finished publishing subscribe response");
}
Ok(())
};

send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?
let watcher_fut = async {
send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError) // TODO change to client error?
};

tokio::try_join!(response_fut, watcher_fut)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?

{
let response_fut = async {
let now = Utc::now();
let response_message = SubscriptionUpdateResponseAuth {
shared_claims: SharedClaims {
Expand Down Expand Up @@ -226,19 +226,22 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
}
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into())) // TODO change to client error?
};

send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError)?; // TODO change to client error?
let watcher_fut = async {
send_to_subscription_watchers(
watchers_with_subscriptions,
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_client,
state.metrics.as_ref(),
)
.await
.map_err(RelayMessageServerError::NotifyServerError) // TODO change to client error?
};

tokio::try_join!(response_fut, watcher_fut)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use {
},
base64::Engine,
chrono::{Duration, Utc},
futures_util::StreamExt,
relay_rpc::{auth::ed25519_dalek::SigningKey, domain::DecodedClientId, rpc::Publish},
sqlx::PgPool,
std::sync::Arc,
Expand Down Expand Up @@ -380,26 +381,34 @@ pub async fn send_to_subscription_watchers(
http_client: &relay_client::http::Client,
metrics: Option<&Metrics>,
) -> Result<(), NotifyServerError> {
for (watcher, subscriptions) in watchers_with_subscriptions {
info!(
"Timing: Sending watchSubscriptionsChanged to watcher.did_key: {}",
watcher.did_key
);
send(
subscriptions,
&watcher.account,
watcher.did_key.clone(),
&watcher.sym_key,
authentication_secret,
authentication_client_id,
http_client,
metrics,
)
.await?;
info!(
"Timing: Sent watchSubscriptionsChanged to watcher.did_key: {}",
watcher.did_key
);
let results = futures_util::stream::iter(watchers_with_subscriptions)
.map(|(watcher, subscriptions)| async move {
info!(
"Timing: Sending watchSubscriptionsChanged to watcher.did_key: {}",
watcher.did_key
);
send(
subscriptions,
&watcher.account,
watcher.did_key.clone(),
&watcher.sym_key,
authentication_secret,
authentication_client_id,
http_client,
metrics,
)
.await?;
info!(
"Timing: Sent watchSubscriptionsChanged to watcher.did_key: {}",
watcher.did_key
);
Ok(())
})
.buffer_unordered(10)
.collect::<Vec<Result<(), NotifyServerError>>>()
.await;
for result in results {
result?;
}
Ok(())
}
Expand Down

0 comments on commit 6efc52a

Please sign in to comment.