Commit c0eed90
feat: adding dead letter processing and publishing timeout (#208)
* feat: adding dead letter processing and publishing timeout
* chore: adding give up check, removing unnecessary error
* chore(tests): adding test_dead_letters
* chore(tests): adding listening for the pg_notify to the test
* chore: changing timeouts to the Duration
* chore: resolving merge conflict on new updates
* chore: fixing comments, removing unnecessary match
1 parent 864c6e9 commit c0eed90
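At a high level, the commit wraps each publish attempt in a timeout and, when an attempt fails or stalls, chooses between requeueing the message and giving up on it. Below is a minimal, self-contained sketch of that decision flow; the names (`Status`, `publish`, `attempt`) and the stubbed publish call are illustrative stand-ins, not the service's actual API.

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Illustrative stand-ins for the service's real status and publish logic.
#[derive(Debug, PartialEq)]
enum Status {
    Queued,
    Published,
    Failed,
}

// Stubbed publish call; the real service talks to the relay over HTTP.
async fn publish() -> Result<(), &'static str> {
    sleep(Duration::from_millis(50)).await;
    Ok(())
}

// Decide the next status for a message, given a per-attempt timeout and a
// total give-up threshold measured from the message's creation time.
async fn attempt(publish_timeout: Duration, age: Duration, give_up: Duration) -> Status {
    match timeout(publish_timeout, publish()).await {
        Ok(Ok(())) => Status::Published,
        // Attempt failed or timed out: requeue unless the message is too old.
        _ if age > give_up => Status::Failed,
        _ => Status::Queued,
    }
}

#[tokio::main]
async fn main() {
    let status = attempt(
        Duration::from_secs(300),    // per-attempt publishing timeout
        Duration::from_secs(60),     // message age
        Duration::from_secs(86_400), // give-up threshold (one day)
    )
    .await;
    assert_eq!(status, Status::Published);
}
```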

4 files changed: +324, −16


src/services/publisher_service/helpers.rs (+41)
@@ -3,6 +3,7 @@ use {
     crate::{metrics::Metrics, model::types::AccountId, types::Notification},
     relay_rpc::domain::{ProjectId, Topic},
     sqlx::{FromRow, PgPool, Postgres},
+    std::time::Duration,
     tracing::{error, instrument},
     uuid::Uuid,
     wc::metrics::otel::Context,
@@ -226,3 +227,43 @@ pub async fn update_metrics_on_queue_stats(metrics: &Metrics, postgres: &PgPool)
         }
     }
 }
+
+/// Checks for messages that have been in the `processing` state for longer
+/// than the threshold and puts them back into the `queued` state for reprocessing
+#[instrument(skip(postgres))]
+pub async fn dead_letters_check(
+    threshold: Duration,
+    postgres: &PgPool,
+) -> std::result::Result<(), sqlx::error::Error> {
+    let update_status_query = "
+        UPDATE subscriber_notification
+        SET status = 'queued'
+        WHERE status = 'processing'
+          AND EXTRACT(EPOCH FROM (NOW() - updated_at)) > $1::INTEGER
+    ";
+    sqlx::query::<Postgres>(update_status_query)
+        .bind(threshold.as_secs() as i64)
+        .execute(postgres)
+        .await?;
+    Ok(())
+}
+
+/// Checks whether the message was created more than the threshold ago
+#[instrument(skip(postgres))]
+pub async fn dead_letter_give_up_check(
+    notification: Uuid,
+    threshold: Duration,
+    postgres: &PgPool,
+) -> std::result::Result<bool, sqlx::error::Error> {
+    let query_to_check = "
+        SELECT now() - created_at > $1 * INTERVAL '1 second'
+        FROM subscriber_notification
+        WHERE id = $2
+    ";
+    let row: (bool,) = sqlx::query_as(query_to_check)
+        .bind(threshold.as_secs() as i64)
+        .bind(notification)
+        .fetch_one(postgres)
+        .await?;
+    Ok(row.0)
+}
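One detail worth noting in these helpers: Postgres does not substitute bind parameters inside quoted string literals, so a threshold in seconds is compared either via `EXTRACT(EPOCH FROM ...)` as in `dead_letters_check`, or by multiplying the parameter by a one-second interval as in `dead_letter_give_up_check`. A minimal sketch of the latter pattern, assuming a reachable pool and a `subscriber_notification` table with `id` and `created_at` columns (`is_older_than` is a hypothetical helper, not part of this commit):

```rust
use sqlx::PgPool;
use std::time::Duration;
use uuid::Uuid;

// Returns true when the row identified by `id` is older than `threshold`.
// `$1 * INTERVAL '1 second'` keeps the threshold a proper bind parameter;
// writing INTERVAL '$1 seconds' would not work, since parameters are not
// substituted inside quoted literals.
async fn is_older_than(pool: &PgPool, id: Uuid, threshold: Duration) -> sqlx::Result<bool> {
    let (older,): (bool,) = sqlx::query_as(
        "SELECT now() - created_at > $1 * INTERVAL '1 second'
         FROM subscriber_notification
         WHERE id = $2",
    )
    .bind(threshold.as_secs() as i64)
    .bind(id)
    .fetch_one(pool)
    .await?;
    Ok(older)
}
```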

src/services/publisher_service/mod.rs (+116, −16)
@@ -11,18 +11,22 @@ use {
         types::{Envelope, EnvelopeType0},
     },
     base64::Engine,
-    helpers::update_message_processing_status,
+    helpers::{dead_letter_give_up_check, update_message_processing_status},
     relay_client::http::Client,
     relay_rpc::{
         domain::{ClientId, DecodedClientId, Topic},
         rpc::{msg_id::MsgId, Publish, JSON_RPC_VERSION_STR},
     },
     sqlx::{postgres::PgListener, PgPool},
-    std::sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
+    std::{
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc,
+        },
+        time::Duration,
     },
-    tracing::{info, instrument, warn},
+    tokio::time::{interval, timeout},
+    tracing::{error, info, instrument, warn},
     types::SubscriberNotificationStatus,
     wc::metrics::otel::Context,
 };
@@ -31,12 +35,19 @@ pub mod helpers;
 pub mod types;
 
 // TODO: These should be configurable, add to the config
-// Maximum of the parallel messages processing workers
+/// Maximum number of parallel message-processing workers
 const MAX_WORKERS: usize = 10;
-// Number of workers to be spawned on the service start to clean the queue
+/// Number of workers spawned at service start to clean the queue
 const START_WORKERS: usize = 10;
 // Messages queue stats observing database polling interval
 const QUEUE_STATS_POLLING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
+/// Maximum publishing time before the publish is considered failed
+/// and messages stuck in the `processing` state are returned to the queue
+const PUBLISHING_TIMEOUT: Duration = Duration::from_secs(60 * 5); // 5 minutes
+/// Interval to check for dead letters
+const DEAD_LETTER_POLL_INTERVAL: Duration = Duration::from_secs(60);
+/// Total maximum time to process the message before it is considered failed
+const PUBLISHING_GIVE_UP_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24); // One day
 
 #[instrument(skip_all)]
 pub async fn start(
@@ -59,7 +70,20 @@ pub async fn start(
         });
     }
 
-    // TODO: Spawned tasks counter should be exported to metrics
+    // Spawning dead letters check task
+    tokio::spawn({
+        let postgres = postgres.clone();
+        async move {
+            let mut poll_interval = interval(DEAD_LETTER_POLL_INTERVAL);
+            loop {
+                poll_interval.tick().await;
+                if let Err(e) = helpers::dead_letters_check(PUBLISHING_TIMEOUT, &postgres).await {
+                    warn!("Error on dead letters check: {:?}", e);
+                }
+            }
+        }
+    });
+
     let spawned_tasks_counter = Arc::new(AtomicUsize::new(0));
 
     // Spawning initial workers to process messages from the queue in case
@@ -176,16 +200,36 @@ async fn process_queued_messages(
         if let Some(notification) = result {
             let notification_id = notification.subscriber_notification;
             info!("Got a notification with id: {}", notification_id);
-            process_notification(notification, relay_http_client.clone(), metrics, analytics)
-                .await?;
 
-            update_message_processing_status(
-                notification_id,
-                SubscriberNotificationStatus::Published,
-                postgres,
+            let process_result = process_with_timeout(
+                PUBLISHING_TIMEOUT,
+                notification,
+                relay_http_client.clone(),
                 metrics,
-            )
-            .await?;
+                analytics,
+            );
+
+            match process_result.await {
+                Ok(_) => {
+                    update_message_processing_status(
+                        notification_id,
+                        SubscriberNotificationStatus::Published,
+                        postgres,
+                        metrics,
+                    )
+                    .await?;
+                }
+                Err(e) => {
+                    warn!("Error on `process_notification`: {:?}", e);
+                    update_message_status_queued_or_failed(
+                        notification_id,
+                        PUBLISHING_GIVE_UP_TIMEOUT,
+                        postgres,
+                        metrics,
+                    )
+                    .await?;
+                }
+            }
         } else {
             info!("No more notifications to process, stopping the loop");
             break;
@@ -194,6 +238,31 @@ async fn process_queued_messages(
     Ok(())
 }
 
+/// Process publishing with the threshold timeout
+#[instrument(skip(relay_http_client, metrics, analytics, notification))]
+async fn process_with_timeout(
+    execution_threshold: Duration,
+    notification: NotificationToProcess,
+    relay_http_client: Arc<Client>,
+    metrics: Option<&Metrics>,
+    analytics: &NotifyAnalytics,
+) -> crate::error::Result<()> {
+    match timeout(
+        execution_threshold,
+        process_notification(notification, relay_http_client.clone(), metrics, analytics),
+    )
+    .await
+    {
+        Ok(result) => {
+            result?;
+        }
+        Err(e) => {
+            return Err(crate::error::Error::TokioTimeElapsed(e));
+        }
+    };
+    Ok(())
+}
+
 #[instrument(skip_all, fields(notification = ?notification))]
 async fn process_notification(
     notification: NotificationToProcess,
@@ -263,3 +332,34 @@ async fn process_notification(
 
     Ok(())
 }
+
+/// Updates the message status back to `Queued` or marks it as `Failed`,
+/// depending on the `giveup_threshold` and the message creation time
+#[instrument(skip(postgres, metrics))]
+async fn update_message_status_queued_or_failed(
+    notification_id: uuid::Uuid,
+    giveup_threshold: Duration,
+    postgres: &PgPool,
+    metrics: Option<&Metrics>,
+) -> crate::error::Result<()> {
+    if dead_letter_give_up_check(notification_id, giveup_threshold, postgres).await? {
+        error!("Message was not processed within the give-up threshold, marking it as failed");
+        update_message_processing_status(
+            notification_id,
+            SubscriberNotificationStatus::Failed,
+            postgres,
+            metrics,
+        )
+        .await?;
+    } else {
+        update_message_processing_status(
+            notification_id,
+            SubscriberNotificationStatus::Queued,
+            postgres,
+            metrics,
+        )
+        .await?;
+    }
+
+    Ok(())
+}
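`tokio::time::timeout` resolves to `Ok(inner)` when the wrapped future finishes in time and to `Err(Elapsed)` when it does not, which is why `process_with_timeout` needs both the outer `match` and the inner `result?`. A small self-contained illustration of the two cases:

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    // Completes within the deadline: `timeout` yields Ok with the inner value.
    let fast = timeout(Duration::from_millis(100), async { 42 }).await;
    assert_eq!(fast.unwrap(), 42);

    // Misses the deadline: the inner future is dropped and Err(Elapsed) comes back.
    let slow = timeout(Duration::from_millis(10), sleep(Duration::from_secs(1))).await;
    assert!(slow.is_err());
}
```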

src/services/publisher_service/types.rs (+3)
@@ -8,6 +8,7 @@ pub enum SubscriberNotificationStatus {
     Queued,
     Processing,
     Published,
+    Failed,
     NotSubscribed,
     WrongScope,
     RateLimited,
@@ -19,6 +20,7 @@ impl fmt::Display for SubscriberNotificationStatus {
             SubscriberNotificationStatus::Queued => write!(f, "queued"),
             SubscriberNotificationStatus::Processing => write!(f, "processing"),
             SubscriberNotificationStatus::Published => write!(f, "published"),
+            SubscriberNotificationStatus::Failed => write!(f, "failed"),
             SubscriberNotificationStatus::NotSubscribed => write!(f, "not-subscribed"),
             SubscriberNotificationStatus::WrongScope => write!(f, "wrong-scope"),
             SubscriberNotificationStatus::RateLimited => write!(f, "rate-limited"),
@@ -33,6 +35,7 @@ impl FromStr for SubscriberNotificationStatus {
             "queued" => Ok(SubscriberNotificationStatus::Queued),
             "processing" => Ok(SubscriberNotificationStatus::Processing),
             "published" => Ok(SubscriberNotificationStatus::Published),
+            "failed" => Ok(SubscriberNotificationStatus::Failed),
             "not-subscribed" => Ok(SubscriberNotificationStatus::NotSubscribed),
             "wrong-scope" => Ok(SubscriberNotificationStatus::WrongScope),
             "rate-limited" => Ok(SubscriberNotificationStatus::RateLimited),
