feat: cleanup task
jorgehermo9 committed Oct 27, 2024
1 parent 167868e commit a630c30
Showing 6 changed files with 72 additions and 13 deletions.


1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/server/.env.example
@@ -2,4 +2,4 @@ PORT=3000
DATABASE_URL=postgres://postgres:password@localhost:5432/db
DATABASE_CONNECTIONS=5
MAX_SHARE_EXPIRATION_TIME_SECS=604800
-CLEANUP_TASK_CRON_EXPRESSION="0 0 0 * * * *"
+CLEANUP_TASK_CRON_EXPRESSION="0 0 */4 * * * *"
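
Note (not part of the commit): the cron crate used here takes a seven-field expression (seconds, minutes, hours, day of month, month, day of week, year), so the new default "0 0 */4 * * * *" fires at minute zero of every fourth hour, whereas the previous "0 0 0 * * * *" ran once a day at midnight. A minimal sketch that checks the cadence with the cron and chrono crates already in the project:

use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

fn main() {
    // Seven fields: sec min hour day-of-month month day-of-week year.
    // "0 0 */4 * * * *" => second 0, minute 0, every 4th hour, every day.
    let schedule = Schedule::from_str("0 0 */4 * * * *").expect("valid cron expression");

    // Print the next three firings to confirm the 4-hour cadence.
    for next in schedule.upcoming(Utc).take(3) {
        println!("{next}");
    }
}

Running it prints the next three firing times, four hours apart.
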
1 change: 1 addition & 0 deletions crates/server/Cargo.toml
@@ -31,3 +31,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
apalis = { version = "0.5.5", features = ["cron", "layers"] }
tower = { version = "0.5.1" }
cron = "0.12.1"
+lazy_static = "1.5.0"
8 changes: 4 additions & 4 deletions crates/server/src/main.rs
@@ -52,7 +52,7 @@ async fn main() {
    );

    let cleanup_task_cron_expression =
-        env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 0 * * * *".to_string());
+        env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */4 * * * *".to_string());

    sqlx::migrate!("./migrations")
        .run(&db_connection)
@@ -105,10 +105,10 @@ async fn start_background_tasks(
        .parse()
        .map_err(|e| SchedulerError::CronExpressionError(cron_expression.to_string(), e))?;

-    tracing::info!(schedule = %schedule, "Starting cleanup task worker");
+    tracing::info!(schedule = %schedule, "Started cleanup worker");

-    let worker = WorkerBuilder::new("cleanup-task-worker")
-        .layer(RetryLayer::new(RetryPolicy::retries(5)))
+    let worker = WorkerBuilder::new("cleanup-worker")
+        .layer(RetryLayer::new(RetryPolicy::retries(3)))
        .stream(CronStream::new(schedule).into_stream())
        .data(db_connection)
        .build_fn(cleanup::execute_cleanup);
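
Note (not part of the commit): the two main.rs hunks touch the same flow. main reads CLEANUP_TASK_CRON_EXPRESSION with the new every-4-hours default, and start_background_tasks parses it into a cron::Schedule before building the worker. A self-contained sketch of that read-and-parse step; the helper name cleanup_schedule is illustrative, and SchedulerError here is a simplified stand-in for the repo's error type, holding the parse error as a String:

use std::{env, str::FromStr};

use cron::Schedule;

#[derive(Debug, thiserror::Error)]
enum SchedulerError {
    #[error("invalid cron expression {0:?}: {1}")]
    CronExpression(String, String),
}

fn cleanup_schedule() -> Result<Schedule, SchedulerError> {
    // Same fallback as main.rs: use the env var when set, otherwise every 4 hours.
    let expression =
        env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */4 * * * *".to_string());

    // cron::Schedule implements FromStr, which is what the `.parse()` in the diff relies on.
    Schedule::from_str(&expression)
        .map_err(|e| SchedulerError::CronExpression(expression.clone(), e.to_string()))
}

The worker chain in the hunk above then consumes the returned Schedule via CronStream::new(schedule).
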
59 changes: 51 additions & 8 deletions crates/server/src/tasks/cleanup.rs
@@ -1,6 +1,12 @@
use apalis::prelude::{Data, Job};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, SecondsFormat, Utc};
+use lazy_static::lazy_static;
use sqlx::PgPool;
+use tokio::sync::Mutex;

+lazy_static! {
+    static ref CLEANUP_TASK_LOCK: Mutex<()> = Mutex::new(());
+}
+
#[derive(Default, Debug, Clone)]
pub struct CleanupTask {
@@ -19,14 +25,51 @@ impl Job for CleanupTask {

#[derive(Debug, thiserror::Error)]
pub enum Error {
-    // TODO
-    #[error("Cleanup task error")]
-    CleanupTaskError,
+    #[error("Database error: {0}")]
+    Database(#[from] sqlx::Error),
+    #[error("Cleanup task was already running. Skipped execution with timestamp {timestamp}")]
+    AlreadyRunning { timestamp: String },
}

-pub async fn execute_cleanup(task: CleanupTask, db_connection: Data<PgPool>) -> Result<(), Error> {
-    let rfc3339_start_time = task.start_time.to_rfc3339();
-    tracing::info!(start_time = rfc3339_start_time, "Executing cleanup task");
+impl CleanupTask {
+    async fn cleanup(&self, db_connection: Data<PgPool>) -> Result<u64, Error> {
+        let rfc3339_start_time = self.start_time.to_rfc3339_opts(SecondsFormat::Millis, true);

+        let Ok(_lock) = CLEANUP_TASK_LOCK.try_lock() else {
+            return Err(Error::AlreadyRunning {
+                timestamp: rfc3339_start_time,
+            });
+        };
+
+        tracing::info!(timestamp = rfc3339_start_time, "Executing cleanup task");
+
+        let result = sqlx::query!(
+            r#"
+            DELETE FROM share
+            WHERE expires_at <= $1
+            "#,
+            self.start_time
+        )
+        .execute(&*db_connection)
+        .await?;
+
-    Err(Error::CleanupTaskError)
+        Ok(result.rows_affected())
+    }
+}
+
+pub async fn execute_cleanup(task: CleanupTask, db_connection: Data<PgPool>) -> Result<(), Error> {
+    match task.cleanup(db_connection).await {
+        Ok(rows_affected) => {
+            tracing::info!(deleted_shares = rows_affected, "Cleanup task completed",);
+            Ok(())
+        }
+        Err(e) => {
+            tracing::error!(reason = %e,"Error executing cleanup task");
+            match e {
+                // Do not retry if the task is already running
+                Error::AlreadyRunning { .. } => Ok(()),
+                _ => Err(e),
+            }
+        }
+    }
}
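
Note (not part of the commit): the task hinges on two choices visible above. CLEANUP_TASK_LOCK is only try_locked, so an execution that finds a previous run still in progress returns AlreadyRunning instead of waiting, and execute_cleanup maps that variant to Ok(()) so the worker's RetryPolicy::retries(3) does not retry a deliberately skipped run. A dependency-light sketch of the skip pattern, using tokio's Mutex::const_new in place of the lazy_static block:

use std::time::Duration;

use tokio::sync::Mutex;

// Process-wide lock, standing in for CLEANUP_TASK_LOCK.
static TASK_LOCK: Mutex<()> = Mutex::const_new(());

async fn run_once(id: u32) {
    // try_lock never awaits: if another run holds the guard, skip this tick.
    let Ok(_guard) = TASK_LOCK.try_lock() else {
        println!("run {id}: already running, skipping");
        return;
    };

    println!("run {id}: doing cleanup");
    tokio::time::sleep(Duration::from_millis(100)).await;
    // _guard drops here, releasing the lock for the next scheduled run.
}

#[tokio::main]
async fn main() {
    // Two overlapping invocations: the second is skipped.
    tokio::join!(run_once(1), run_once(2));
}

With tokio::join! both calls overlap, so the second prints the skip message while the first finishes its work.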
