Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notification): send notification to admin when job failed #3653

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ee/tabby-ui/app/globals.css
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@
.dialog-without-close-btn > button {
display: none;
}

.unread-notification::before {
@apply content-[''] float-left w-2 h-2 mr-1.5 mt-2 rounded-full bg-red-400;
}
39 changes: 17 additions & 22 deletions ee/tabby-ui/components/notification-box.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import { useMutation } from '@/lib/tabby/gql'
import { notificationsQuery } from '@/lib/tabby/query'
import { ArrayElementType } from '@/lib/types'
import { cn } from '@/lib/utils'

import LoadingWrapper from './loading-wrapper'
import { ListSkeleton } from './skeleton'
import { Button } from './ui/button'
import { Button } from '@/components/ui/button'
import {
DropdownMenu,
DropdownMenuContent,
DropdownMenuTrigger
} from './ui/dropdown-menu'
import { IconBell, IconCheck } from './ui/icons'
import { Separator } from './ui/separator'
import { Tabs, TabsList, TabsTrigger } from './ui/tabs'
} from '@/components/ui/dropdown-menu'
import { IconBell, IconCheck } from '@/components/ui/icons'
import { Separator } from '@/components/ui/separator'
import { Tabs, TabsList, TabsTrigger } from '@/components/ui/tabs'
import LoadingWrapper from '@/components/loading-wrapper'
import { MemoizedReactMarkdown } from '@/components/markdown'
import { ListSkeleton } from '@/components/skeleton'

interface Props extends HTMLAttributes<HTMLDivElement> {}

Expand Down Expand Up @@ -89,7 +89,7 @@ export function NotificationBox({ className, ...rest }: Props) {
</div>
<Separator />
<Tabs
className="relative my-2 flex-1 overflow-y-auto px-4"
className="relative my-2 flex-1 overflow-y-auto px-5"
defaultValue="unread"
>
<TabsList className="sticky top-0 z-10 grid w-full grid-cols-2">
Expand Down Expand Up @@ -156,8 +156,6 @@ interface NotificationItemProps extends HTMLAttributes<HTMLDivElement> {
}

function NotificationItem({ data }: NotificationItemProps) {
const { title, content } = resolveNotification(data.content)

const markNotificationsRead = useMutation(markNotificationsReadMutation)

const onClickMarkRead = () => {
Expand All @@ -168,17 +166,14 @@ function NotificationItem({ data }: NotificationItemProps) {

return (
<div className="space-y-1.5">
<div className="space-y-1.5">
<div className="flex items-center gap-1.5 overflow-hidden text-sm font-medium">
{!data.read && (
<span className="h-2 w-2 shrink-0 rounded-full bg-red-400"></span>
)}
<span className="flex-1 truncate">{title}</span>
</div>
<div className="whitespace-pre-wrap break-words text-sm text-muted-foreground">
{content}
</div>
</div>
<MemoizedReactMarkdown
className={cn(
'prose max-w-none break-words dark:prose-invert prose-p:leading-relaxed prose-p:my-1 text-sm',
{ 'unread-notification': !data.read }
)}
>
{data.content}
</MemoizedReactMarkdown>
<div className="flex items-center justify-between text-xs text-muted-foreground">
<span className="text-muted-foreground">
{formatNotificationTime(data.createdAt)}
Expand Down
38 changes: 25 additions & 13 deletions ee/tabby-webserver/src/service/background_job/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_db::DbConn;
use tabby_schema::{context::ContextService, CoreError};
use tracing::warn;

use super::helper::Job;

Expand All @@ -14,6 +15,16 @@ impl Job for DbMaintainanceJob {
const NAME: &'static str = "db_maintainance";
}

macro_rules! append_error {
zwpaper marked this conversation as resolved.
Show resolved Hide resolved
($errors:expr, $e:expr, $($arg:tt)*) => {
{
let msg = format!($($arg)*);
warn!("{}: {}", msg, $e);
$errors.push(msg);
}
};
}

impl DbMaintainanceJob {
pub async fn cron(
now: DateTime<Utc>,
Expand All @@ -23,13 +34,13 @@ impl DbMaintainanceJob {
let mut errors = vec![];

if let Err(e) = db.delete_expired_token().await {
errors.push(format!("Failed to delete expired token: {}", e));
append_error!(errors, e, "Failed to delete expired tokens");
};
if let Err(e) = db.delete_expired_password_resets().await {
errors.push(format!("Failed to delete expired password resets: {}", e));
append_error!(errors, e, "Failed to delete expired password resets");
};
if let Err(e) = db.delete_expired_ephemeral_threads().await {
errors.push(format!("Failed to delete expired ephemeral threads: {}", e));
append_error!(errors, e, "Failed to delete expired ephemeral threads");
};

// Read all active sources
Expand All @@ -44,27 +55,28 @@ impl DbMaintainanceJob {
.delete_unused_source_id_read_access_policy(&active_source_ids)
.await
{
errors.push(format!(
"Failed to delete unused source id read access policy: {}",
e
));
append_error!(
errors,
e,
"Failed to delete unused source id read access policy"
);
};
}
Err(e) => {
errors.push(format!("Failed to read active sources: {}", e));
append_error!(errors, e, "Failed to read active sources");
}
}

if let Err(e) = Self::data_retention(now, &db).await {
errors.push(format!("Failed to run data retention job: {}", e));
append_error!(errors, e, "Failed to run data retention job");
}

if errors.is_empty() {
Ok(())
} else {
Err(CoreError::Other(anyhow::anyhow!(
"Failed to run db maintenance job:\n{}",
errors.join(";\n")
"Failed to run db maintenance job:\n\n{}",
errors.join(";\n\n")
)))
}
}
Expand All @@ -90,8 +102,8 @@ impl DbMaintainanceJob {
Ok(())
} else {
Err(CoreError::Other(anyhow::anyhow!(
"Failed to run data retention job:\n{}",
errors.join(";\n")
"{}",
errors.join(";\n\n")
)))
}
}
Expand Down
91 changes: 72 additions & 19 deletions ee/tabby-webserver/src/service/background_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod license_check;
mod third_party_integration;
mod web_crawler;

use std::{str::FromStr, sync::Arc};
use std::{fmt::Display, str::FromStr, sync::Arc};

use cron::Schedule;
use futures::StreamExt;
Expand All @@ -24,8 +24,9 @@ use tabby_schema::{
integration::IntegrationService,
job::JobService,
license::LicenseService,
notification::NotificationService,
notification::{NotificationRecipient, NotificationService},
repository::{GitRepositoryService, RepositoryService, ThirdPartyRepositoryService},
AsID,
};
use third_party_integration::SchedulerGithubGitlabJob;
use tracing::{debug, warn};
Expand All @@ -42,6 +43,24 @@ pub enum BackgroundJobEvent {
IndexGarbageCollection,
}

impl Display for BackgroundJobEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BackgroundJobEvent::SchedulerGitRepository(repository) => {
write!(f, "SyncGitRepository::{}", repository.git_url)
}
BackgroundJobEvent::SchedulerGithubGitlabRepository(integration_id) => {
write!(f, "SyncGithubGitlabRepository::{}", integration_id)
}
BackgroundJobEvent::SyncThirdPartyRepositories(integration_id) => {
write!(f, "SyncThirdPartyRepositories::{}", integration_id)
}
BackgroundJobEvent::WebCrawler(job) => write!(f, "WebCrawler::{}", job.url()),
BackgroundJobEvent::IndexGarbageCollection => write!(f, "IndexGarbageCollection"),
}
}
}

impl BackgroundJobEvent {
pub fn name(&self) -> &'static str {
match self {
Expand All @@ -60,6 +79,22 @@ impl BackgroundJobEvent {
}
}

#[macro_export]
macro_rules! notify_job_error {
($notification_service:expr, $err:expr, $($arg:tt)*) => {{
let msg = format!($($arg)*);
warn!("{}: {:?}", msg, $err);
$notification_service.create(
NotificationRecipient::Admin,
&format!(
r#"Background job failed


{}"#, msg),
).await.unwrap();
}};
}

pub async fn start(
db: DbConn,
job_service: Arc<dyn JobService>,
Expand Down Expand Up @@ -93,14 +128,16 @@ pub async fn start(
continue;
}

let logger = JobLogger::new(db.clone(), job.id);
debug!("Background job {} started, command: {}", job.id, job.command);
let job_id = job.id;
let logger = JobLogger::new(db.clone(), job_id);
debug!("Background job {} started, command: {}", job_id, job.command);
let Ok(event) = serde_json::from_str::<BackgroundJobEvent>(&job.command) else {
logkit::info!(exit_code = -1; "Failed to parse background job event, marking it as failed");
continue;
};

if let Err(err) = match event {
let job_name = event.to_string();
let result = match event {
BackgroundJobEvent::SchedulerGitRepository(repository_config) => {
let job = SchedulerGitJob::new(repository_config);
job.run(embedding.clone()).await
Expand All @@ -120,44 +157,60 @@ pub async fn start(
let job = IndexGarbageCollection;
job.run(repository_service.clone(), context_service.clone()).await
}
} {
logkit::info!(exit_code = 1; "Job failed {}", err);
} else {
logkit::info!(exit_code = 0; "Job completed successfully");
}
logger.finalize().await;
};
debug!("Background job {} completed", job.id);

match &result {
Err(err) => {
logkit::info!(exit_code = 1; "Job failed {}", err);
logger.finalize().await;
notify_job_error!(notification_service, err, r#"`{}` failed.

Please visit [Jobs Detail](/jobs/detail?id={}) to check the error.
"#,
job_name, job_id.as_id());
},
_ => {
logkit::info!(exit_code = 0; "Job completed successfully");
logger.finalize().await;
}
}
},
Some(now) = hourly.next() => {
if let Err(err) = DbMaintainanceJob::cron(now, context_service.clone(), db.clone()).await {
warn!("Database maintainance failed: {:?}", err);
let _ = notification_service.create(NotificationRecipient::Admin,
&format!(r#"Database maintenance failed.

{:?}

Please check stderr logs for details.
"#, err)).await;
}

if let Err(err) = SchedulerGitJob::cron(now, git_repository_service.clone(), job_service.clone()).await {
warn!("Scheduler job failed: {:?}", err);
notify_job_error!(notification_service, err, "Schedule git job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = SyncIntegrationJob::cron(now, integration_service.clone(), job_service.clone()).await {
warn!("Sync integration job failed: {:?}", err);
notify_job_error!(notification_service, err, "Sync integration job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = SchedulerGithubGitlabJob::cron(now, third_party_repository_service.clone(), job_service.clone()).await {
warn!("Index issues job failed: {err:?}");
notify_job_error!(notification_service, err, "Schedule GitHub/GitLab job failed.\n\nPlease check stderr logs for details.");
}

if let Err(err) = IndexGarbageCollection.run(repository_service.clone(), context_service.clone()).await {
warn!("Index garbage collection job failed: {err:?}");
notify_job_error!(notification_service, err, "Index garbage collection job failed.\n\nPlease check stderr logs for details.");
}

},
Some(now) = daily.next() => {
if let Err(err) = LicenseCheckJob::cron(now, license_service.clone(), notification_service.clone()).await {
warn!("License check job failed: {err:?}");
notify_job_error!(notification_service, err, "License check job failed.\n\nPlease check stderr logs for details.");
}
}
else => {
warn!("Background job channel closed");
break;
return;
}
};
}
Expand Down
4 changes: 4 additions & 0 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl WebCrawlerJob {
}
}

pub fn url(&self) -> &str {
&self.url
}

pub async fn run_impl(self, embedding: Arc<dyn Embedding>) -> tabby_schema::Result<()> {
logkit::info!("Starting doc index pipeline for {}", self.url);
let embedding = embedding.clone();
Expand Down
Loading