Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notification): send notification to admin when job failed #3653

Closed
wants to merge 14 commits into from
Closed
41 changes: 26 additions & 15 deletions ee/tabby-webserver/src/service/background_job/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_db::DbConn;
use tabby_schema::{context::ContextService, notification::NotificationService, CoreError};
use tabby_schema::{context::ContextService, CoreError};
use tracing::warn;

use super::helper::Job;

Expand All @@ -14,23 +15,32 @@ impl Job for DbMaintainanceJob {
const NAME: &'static str = "db_maintainance";
}

macro_rules! append_error {
zwpaper marked this conversation as resolved.
Show resolved Hide resolved
($errors:expr, $e:expr, $($arg:tt)*) => {
{
let msg = format!($($arg)*);
warn!("{}: {}", msg, $e);
$errors.push(msg);
}
};
}

impl DbMaintainanceJob {
pub async fn cron(
now: DateTime<Utc>,
context: Arc<dyn ContextService>,
db: DbConn,
notification_service: Arc<dyn NotificationService>,
) -> tabby_schema::Result<()> {
let mut errors = vec![];

if let Err(e) = db.delete_expired_token().await {
errors.push(format!("Failed to delete expired token: {}", e));
append_error!(errors, e, "Failed to delete expired tokens");
};
if let Err(e) = db.delete_expired_password_resets().await {
errors.push(format!("Failed to delete expired password resets: {}", e));
append_error!(errors, e, "Failed to delete expired password resets");
};
if let Err(e) = db.delete_expired_ephemeral_threads().await {
errors.push(format!("Failed to delete expired ephemeral threads: {}", e));
append_error!(errors, e, "Failed to delete expired ephemeral threads");
};

// Read all active sources
Expand All @@ -45,27 +55,28 @@ impl DbMaintainanceJob {
.delete_unused_source_id_read_access_policy(&active_source_ids)
.await
{
errors.push(format!(
"Failed to delete unused source id read access policy: {}",
e
));
append_error!(
errors,
e,
"Failed to delete unused source id read access policy"
);
};
}
Err(e) => {
errors.push(format!("Failed to read active sources: {}", e));
append_error!(errors, e, "Failed to read active sources");
}
}

if let Err(e) = Self::data_retention(now, &db).await {
errors.push(format!("Failed to run data retention job: {}", e));
append_error!(errors, e, "Failed to run data retention job");
}

if errors.is_empty() {
Ok(())
} else {
Err(CoreError::Other(anyhow::anyhow!(
"Failed to run db maintenance job:\n{}",
errors.join(";\n")
"Failed to run db maintenance job:\n\n{}",
errors.join(";\n\n")
)))
}
}
Expand All @@ -91,8 +102,8 @@ impl DbMaintainanceJob {
Ok(())
} else {
Err(CoreError::Other(anyhow::anyhow!(
"Failed to run data retention job:\n{}",
errors.join(";\n")
"{}",
errors.join(";\n\n")
)))
}
}
Expand Down
48 changes: 37 additions & 11 deletions ee/tabby-webserver/src/service/background_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod license_check;
mod third_party_integration;
mod web_crawler;

use std::{str::FromStr, sync::Arc};
use std::{fmt::Display, str::FromStr, sync::Arc};

use cron::Schedule;
use futures::StreamExt;
Expand Down Expand Up @@ -43,6 +43,24 @@ pub enum BackgroundJobEvent {
IndexGarbageCollection,
}

impl Display for BackgroundJobEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BackgroundJobEvent::SchedulerGitRepository(repository) => {
write!(f, "SyncGitRepository::{}", repository.git_url)
}
BackgroundJobEvent::SchedulerGithubGitlabRepository(integration_id) => {
write!(f, "SyncGithubGitlabRepository::{}", integration_id)
}
BackgroundJobEvent::SyncThirdPartyRepositories(integration_id) => {
write!(f, "SyncThirdPartyRepositories::{}", integration_id)
}
BackgroundJobEvent::WebCrawler(job) => write!(f, "WebCrawler::{}", job.url()),
BackgroundJobEvent::IndexGarbageCollection => write!(f, "IndexGarbageCollection"),
}
}
}

impl BackgroundJobEvent {
pub fn name(&self) -> &'static str {
match self {
Expand Down Expand Up @@ -71,6 +89,7 @@ macro_rules! notify_job_error {
&format!(
r#"Background job failed


{}"#, msg),
).await.unwrap();
}};
Expand Down Expand Up @@ -117,7 +136,7 @@ pub async fn start(
continue;
};

let job_name = format!("{:?}", event);
let job_name = event.to_string();
let result = match event {
BackgroundJobEvent::SchedulerGitRepository(repository_config) => {
let job = SchedulerGitJob::new(repository_config);
Expand Down Expand Up @@ -145,8 +164,9 @@ pub async fn start(
Err(err) => {
logkit::info!(exit_code = 1; "Job failed {}", err);
logger.finalize().await;
notify_job_error!(notification_service, err, r#"Job {:?} failed,
Please visit [Jobs Detail](http://localhost:8080/jobs/detail?id={}) to check the error and retry.
notify_job_error!(notification_service, err, r#"`{}` failed.

Please visit [Jobs Detail](/jobs/detail?id={}) to check the error.
"#,
job_name, job_id.as_id());
},
Expand All @@ -157,29 +177,35 @@ Please visit [Jobs Detail](http://localhost:8080/jobs/detail?id={}) to check the
}
},
Some(now) = hourly.next() => {
if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone(), notification_service.clone()).await {
warn!("Database maintenance failed: {:?}", err);
if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone()).await {
let _ = notification_service.create(NotificationRecipient::Admin,
&format!(r#"Database maintenance failed.

{:?}

Please check stderr logs for details.
"#, err)).await;
}

if let Err(err) = SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone()).await {
notify_job_error!(notification_service, err, "Scheduler job failed");
notify_job_error!(notification_service, err, "Schedule git job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone()).await {
notify_job_error!(notification_service, err, "Sync integration job failed");
notify_job_error!(notification_service, err, "Sync integration job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone()).await {
notify_job_error!(notification_service, err, "Index issues job failed");
notify_job_error!(notification_service, err, "Schedule GitHub/GitLab job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = IndexGarbageCollection.run(repository_service.clone(), context_service.clone()).await {
notify_job_error!(notification_service, err, "Index garbage collection job failed");
notify_job_error!(notification_service, err, "Index garbage collection job failed.\n\nPlease check stderr logs for details.");
}
},
Some(now) = daily.next() => {
if let Err(err) = LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone()).await {
notify_job_error!(notification_service, err, "License check job failed");
notify_job_error!(notification_service, err, "License check job failed.\n\nPlease check stderr logs for details.");
}
}
else => {
Expand Down
4 changes: 4 additions & 0 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl WebCrawlerJob {
}
}

pub fn url(&self) -> &str {
&self.url
}

pub async fn run_impl(self, embedding: Arc<dyn Embedding>) -> tabby_schema::Result<()> {
logkit::info!("Starting doc index pipeline for {}", self.url);
let embedding = embedding.clone();
Expand Down
Loading