wip: guarantee subscribe #317

Draft · wants to merge 13 commits into base: main
2 changes: 2 additions & 0 deletions .github/workflows/event_pr.yml
@@ -41,6 +41,8 @@ jobs:
- uses: actions/checkout@v3
- uses: WalletConnect/actions/github/paths-filter/@2.2.1
id: filter
with:
path-app: . # run CI when tests are changed
outputs:
infra: ${{ steps.filter.outputs.infra }}
app: ${{ steps.filter.outputs.app }}
2 changes: 1 addition & 1 deletion .github/workflows/sub-ci.yml
@@ -87,4 +87,4 @@ jobs:
uses: WalletConnect/actions-rs/[email protected]
with:
command: test
args: --test integration
args: --test integration -- --test-threads=1
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion justfile
@@ -31,7 +31,7 @@ test-all:

test-integration:
@echo '==> Testing integration'
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}}
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}} --test-threads=1

# Clean build artifacts
clean:
52 changes: 52 additions & 0 deletions src/metrics.rs
@@ -35,6 +35,10 @@ pub struct Metrics {
relay_outgoing_message_failures: Counter<u64>,
relay_outgoing_message_latency: Histogram<u64>,
relay_outgoing_message_publish_latency: Histogram<u64>,
relay_subscribes: Counter<u64>,
relay_subscribe_failures: Counter<u64>,
relay_subscribe_latency: Histogram<u64>,
relay_subscribe_request_latency: Histogram<u64>,
postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
keys_server_requests: Counter<u64>,
@@ -112,6 +116,26 @@ impl Metrics {
.with_description("The latency publishing relay messages")
.init();

let relay_subscribes: Counter<u64> = meter
.u64_counter("relay_subscribes")
.with_description("The number of subscribes to relay topics (not including retries)")
.init();

let relay_subscribe_failures: Counter<u64> = meter
.u64_counter("relay_subscribe_failures")
.with_description("The number of failures to subscribe to relay topics")
.init();

let relay_subscribe_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_latency")
.with_description("The latency subscribing to relay topics w/ built-in retry")
.init();

let relay_subscribe_request_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_request_latency")
.with_description("The latency subscribing to relay topics")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
@@ -194,6 +218,10 @@ impl Metrics {
relay_outgoing_message_failures,
relay_outgoing_message_latency,
relay_outgoing_message_publish_latency,
relay_subscribes,
relay_subscribe_failures,
relay_subscribe_latency,
relay_subscribe_request_latency,
postgres_queries,
postgres_query_latency,
keys_server_requests,
@@ -286,6 +314,30 @@ impl Metrics {
);
}

pub fn relay_subscribe(&self, success: bool, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
let attributes = [KeyValue::new("success", success.to_string())];
self.relay_subscribes.add(&ctx, 1, &attributes);
self.relay_subscribe_latency
.record(&ctx, elapsed.as_millis() as u64, &attributes);
}

pub fn relay_subscribe_failure(&self, is_permanent: bool) {
let ctx = Context::current();
let attributes = [KeyValue::new("is_permanent", is_permanent.to_string())];
self.relay_subscribe_failures.add(&ctx, 1, &attributes);
}

pub fn relay_subscribe_request(&self, start: Instant) {
let elapsed = start.elapsed();

let ctx = Context::current();
self.relay_subscribe_request_latency
.record(&ctx, elapsed.as_millis() as u64, &[]);
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

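Taken together, the four new instruments are recorded at different points of a single subscribe operation: relay_subscribe_request_latency per request, relay_subscribe_failures per failed attempt, and relay_subscribes / relay_subscribe_latency once per operation including retries. A minimal sketch of that recording pattern (it mirrors subscribe_relay_topic added later in this PR; try_subscribe is an illustrative placeholder for the real relay call, not part of the diff):

use std::time::Instant;

// Sketch only: assumes a `Metrics` as defined above and a `try_subscribe()`
// placeholder standing in for relay_ws_client.subscribe_blocking(topic).
async fn subscribe_with_metrics(metrics: Option<&Metrics>) -> Result<(), relay_client::error::Error> {
    let start = Instant::now(); // whole operation, including retries
    let mut tries = 0;
    loop {
        let request_start = Instant::now(); // single request, excluding retries
        let result = try_subscribe().await;
        if let Some(metrics) = metrics {
            metrics.relay_subscribe_request(request_start);
        }
        match result {
            Ok(()) => break,
            Err(e) => {
                tries += 1;
                let is_permanent = tries >= 10;
                if let Some(metrics) = metrics {
                    metrics.relay_subscribe_failure(is_permanent); // one count per failed attempt
                }
                if is_permanent {
                    if let Some(metrics) = metrics {
                        metrics.relay_subscribe(false, start); // success = "false"
                    }
                    return Err(e);
                }
            }
        }
    }
    if let Some(metrics) = metrics {
        metrics.relay_subscribe(true, start); // success = "true"
    }
    Ok(())
}
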
22 changes: 9 additions & 13 deletions src/model/helpers.rs
@@ -13,7 +13,7 @@
ed25519_dalek::SigningKey,
relay_rpc::domain::{ProjectId, Topic},
serde::{Deserialize, Serialize},
sqlx::{FromRow, PgPool, Postgres},
sqlx::{FromRow, PgExecutor, PgPool, Postgres, Transaction},

[GitHub Actions annotations on line 16 — unused import: `PgExecutor` (failure in Check App / Clippy; warnings in Check App / Unit Tests and Integration Tests)]
std::{collections::HashSet, time::Instant},
tracing::instrument,
uuid::Uuid,
@@ -28,13 +28,13 @@
pub topic: String,
}

pub async fn upsert_project(
pub async fn upsert_project<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_key: &SigningKey,
subscribe_key: &StaticSecret,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let authentication_public_key = encode_authentication_public_key(authentication_key);
@@ -58,15 +58,15 @@
// TODO test idempotency
#[allow(clippy::too_many_arguments)]
#[instrument(skip(authentication_private_key, subscribe_private_key, postgres, metrics))]
async fn upsert_project_impl(
async fn upsert_project_impl<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_public_key: String,
authentication_private_key: String,
subscribe_public_key: String,
subscribe_private_key: String,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let query = "
@@ -324,18 +324,16 @@
}

// TODO test idempotency
#[instrument(skip(postgres, metrics))]
#[instrument(skip(txn, metrics))]
pub async fn upsert_subscriber(
project: Uuid,
account: AccountId,
scope: HashSet<Uuid>,
notify_key: &[u8; 32],
notify_topic: Topic,
postgres: &PgPool,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<SubscribeResponse, sqlx::error::Error> {
let mut txn = postgres.begin().await?;

// `xmax = 0`: https://stackoverflow.com/a/39204667

let query = "
@@ -363,15 +361,13 @@
.bind(hex::encode(notify_key))
.bind(notify_topic.as_ref())
.bind(Utc::now() + chrono::Duration::days(30))
.fetch_one(&mut *txn)

[GitHub Actions annotations on line 364 — failures in Check App / Clippy, Check App / Unit Tests, and Integration Tests: the trait bound `&mut Transaction<'_, Postgres>: sqlx::Executor<'_>` is not satisfied]
.await?;
if let Some(metrics) = metrics {
metrics.postgres_query("upsert_subscriber", start);
}

update_subscriber_scope(subscriber.id, scope, &mut txn, metrics).await?;

txn.commit().await?;
update_subscriber_scope(subscriber.id, scope, txn, metrics).await?;

Ok(subscriber)
}
@@ -415,7 +411,7 @@
async fn update_subscriber_scope(
subscriber: Uuid,
scope: HashSet<Uuid>,
txn: &mut sqlx::Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<(), sqlx::error::Error> {
let query = "
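With upsert_project now taking an impl PgExecutor and upsert_subscriber taking a &mut Transaction (and no longer beginning or committing one internally), callers own the transaction and decide when to commit. A hypothetical caller sketch for upsert_subscriber (variable names are illustrative; only the upsert_project call site in the subscribe_topic handler appears later in this PR; AccountId, SubscribeResponse, Metrics, and upsert_subscriber are assumed to be in scope from this crate):

use {
    relay_rpc::domain::Topic,
    sqlx::PgPool,
    std::collections::HashSet,
    uuid::Uuid,
};

// Illustrative caller: begin the transaction, upsert, then commit only once all
// related work inside the transaction has succeeded.
async fn store_subscriber(
    postgres: &PgPool,
    project: Uuid,
    account: AccountId,
    scope: HashSet<Uuid>,
    notify_key: [u8; 32],
    notify_topic: Topic,
    metrics: Option<&Metrics>,
) -> Result<SubscribeResponse, sqlx::error::Error> {
    let mut txn = postgres.begin().await?;
    let subscriber = upsert_subscriber(
        project,
        account,
        scope,
        &notify_key,
        notify_topic,
        &mut txn,
        metrics,
    )
    .await?;
    txn.commit().await?;
    Ok(subscriber)
}
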
57 changes: 56 additions & 1 deletion src/publish_relay_message.rs
@@ -1,7 +1,10 @@
use {
crate::metrics::Metrics,
relay_client::{error::Error, http::Client},
relay_rpc::rpc::{msg_id::MsgId, Publish},
relay_rpc::{
domain::Topic,
rpc::{msg_id::MsgId, Publish},
},
std::time::{Duration, Instant},
tokio::time::sleep,
tracing::{error, instrument, warn},
@@ -79,3 +82,55 @@ pub async fn publish_relay_message(
}
Ok(())
}

#[instrument(skip_all)]
pub async fn subscribe_relay_topic(
relay_ws_client: &relay_client::websocket::Client,
topic: &Topic,
metrics: Option<&Metrics>,
) -> Result<(), Error> {
let start = Instant::now();

let client_subscribe_call = || async {
let start = Instant::now();
let result = relay_ws_client.subscribe_blocking(topic.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_subscribe_request(start);
}
result
};

let mut tries = 0;
while let Err(e) = client_subscribe_call().await {
tries += 1;
let is_permanent = tries >= 10;
if let Some(metrics) = metrics {
metrics.relay_subscribe_failure(is_permanent);
}

if is_permanent {
error!("Permanent error subscribing to topic {topic}, took {tries} tries: {e:?}");

if let Some(metrics) = metrics {
// TODO make DRY with end-of-function call
metrics.relay_subscribe(false, start);
}
return Err(e);
}

let retry_in = Duration::from_secs(1);
warn!(
"Temporary error subscribing to topic {topic}, retrying attempt {tries} in {retry_in:?}: {e:?}"
);
sleep(retry_in).await;
}

if let Some(metrics) = metrics {
metrics.relay_subscribe(true, start);
}

// Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes
sleep(Duration::from_millis(250)).await;

Ok(())
}
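Callers await subscribe_relay_topic the same way as the existing publish_relay_message helper; it returns Err only after the retry budget is exhausted. A short usage sketch (it mirrors the subscribe_topic handler change below; ensure_subscribed is an illustrative wrapper, not part of the diff):

// Illustrative wrapper: subscribe with retries and metrics before relying on the topic.
async fn ensure_subscribed(
    relay_ws_client: &relay_client::websocket::Client,
    topic: &relay_rpc::domain::Topic,
    metrics: Option<&Metrics>,
) -> Result<(), relay_client::error::Error> {
    // Retries up to 10 times, 1 second apart; Err is returned only on a permanent failure.
    subscribe_relay_topic(relay_ws_client, topic, metrics).await
}
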
33 changes: 29 additions & 4 deletions src/services/public_http_server/handlers/subscribe_topic.rs
@@ -2,8 +2,10 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL},
state::AppState,
utils::topic_from_key,
},
@@ -17,10 +19,10 @@ use {
hyper::StatusCode,
once_cell::sync::Lazy,
regex::Regex,
relay_rpc::domain::ProjectId,
relay_rpc::{domain::ProjectId, rpc::Publish},
serde::{Deserialize, Serialize},
serde_json::json,
std::sync::Arc,
std::sync::{Arc, OnceLock},
tracing::{info, instrument},
x25519_dalek::{PublicKey, StaticSecret},
};
@@ -79,13 +81,14 @@ pub async fn handler(

let authentication_key = ed25519_dalek::SigningKey::generate(&mut OsRng);

let mut txn = state.postgres.begin().await?;
let project = upsert_project(
project_id,
&app_domain,
topic.clone(),
&authentication_key,
&subscribe_key,
&state.postgres,
&mut *txn,
state.metrics.as_ref(),
)
.await
@@ -101,9 +104,31 @@
// Don't call subscribe if we are already subscribed in a previous request
if project.topic == topic.as_ref() {
info!("Subscribing to project topic: {topic}");
state.relay_ws_client.subscribe(topic).await?;
subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?;

// Send noop to extend ttl of relay's mapping
info!("Timing: Publishing noop to notify_topic");
publish_relay_message(
&state.relay_http_client,
&Publish {
topic,
message: {
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string every time
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
LOCK.get_or_init(|| "".into()).clone()
},
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await?;
info!("Timing: Finished publishing noop to notify_topic");
}

txn.commit().await?;

Ok(Json(SubscribeTopicResponseBody {
authentication_key: project.authentication_public_key,
subscribe_key: project.subscribe_public_key,