diff --git a/apps/dbagent/migrations/0006_schedules_last_run.sql b/apps/dbagent/migrations/0006_schedules_last_run.sql new file mode 100644 index 00000000..ce03f4d2 --- /dev/null +++ b/apps/dbagent/migrations/0006_schedules_last_run.sql @@ -0,0 +1,6 @@ +CREATE TYPE schedule_status AS ENUM ('disabled', 'scheduled', 'running'); + +ALTER TABLE schedules ADD COLUMN last_run TIMESTAMP; +ALTER TABLE schedules ADD COLUMN next_run TIMESTAMP; +ALTER TABLE schedules ADD COLUMN status schedule_status DEFAULT 'disabled'; +ALTER TABLE schedules ADD COLUMN failures INTEGER DEFAULT 0; diff --git a/apps/dbagent/package.json b/apps/dbagent/package.json index 117b7686..d9c58075 100644 --- a/apps/dbagent/package.json +++ b/apps/dbagent/package.json @@ -11,7 +11,8 @@ "tsc": "tsc --noEmit", "test": "vitest --passWithNoTests", "dbmigrate": "tsx scripts/dbmigrate.ts", - "load-test": "tsx scripts/load-testing/musicbrainz-loader.ts" + "load-test": "tsx scripts/load-testing/musicbrainz-loader.ts", + "dev-scheduler": "tsx scripts/scheduler.ts" }, "dependencies": { "@ai-sdk/anthropic": "^1.1.9", @@ -29,6 +30,7 @@ "bytes": "^3.1.2", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", + "cron-parser": "^5.0.4", "framer-motion": "^12.4.7", "kysely": "^0.27.5", "lucide-react": "^0.475.0", diff --git a/apps/dbagent/scripts/scheduler.ts b/apps/dbagent/scripts/scheduler.ts new file mode 100644 index 00000000..f8cd68ac --- /dev/null +++ b/apps/dbagent/scripts/scheduler.ts @@ -0,0 +1,24 @@ +async function tick() { + try { + const response = await fetch('http://localhost:4001/api/priv/schedule-tick', { + method: 'POST' + }); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + console.log('Scheduler tick completed successfully'); + } catch (error) { + console.error('Error in scheduler tick:', error); + } +} + +// Run tick every 10 seconds +const INTERVAL_MS = 10000; +console.log(`Starting scheduler with ${INTERVAL_MS}ms interval`); + +setInterval(tick, INTERVAL_MS); + +// Run immediately on start +void tick(); diff --git a/apps/dbagent/src/app/api/priv/schedule-tick/route.ts b/apps/dbagent/src/app/api/priv/schedule-tick/route.ts new file mode 100644 index 00000000..7efca0c2 --- /dev/null +++ b/apps/dbagent/src/app/api/priv/schedule-tick/route.ts @@ -0,0 +1,6 @@ +import { checkAndRunJobs } from '~/lib/services/scheduler'; + +export async function POST() { + await checkAndRunJobs(); + return new Response('OK', { status: 200 }); +} diff --git a/apps/dbagent/src/components/monitoring/actions.ts b/apps/dbagent/src/components/monitoring/actions.ts index 8dc43600..309927db 100644 --- a/apps/dbagent/src/components/monitoring/actions.ts +++ b/apps/dbagent/src/components/monitoring/actions.ts @@ -7,25 +7,13 @@ import { getSchedule, getSchedules, insertSchedule, + Schedule, + scheduleGetNextRun, updateSchedule, - updateScheduleEnabled -} from '~/lib/db/monitoring'; + updateScheduleRunData +} from '~/lib/db/schedules'; import { listPlaybooks } from '~/lib/tools/playbooks'; -export type Schedule = { - id: string; - connectionId: string; - playbook: string; - scheduleType: string; - cronExpression?: string; - additionalInstructions?: string; - minInterval?: number; - maxInterval?: number; - lastRun?: string; - failures?: number; - enabled: boolean; -}; - export async function generateCronExpression(description: string): Promise { const prompt = `Generate a cron expression for the following schedule description: "${description}". Return strictly the cron expression, no quotes or anything else.`; @@ -39,6 +27,10 @@ export async function generateCronExpression(description: string): Promise { + if (schedule.enabled) { + schedule.status = 'scheduled'; + schedule.nextRun = scheduleGetNextRun(schedule, new Date()).toISOString(); + } return insertSchedule(schedule); } @@ -48,7 +40,15 @@ export async function actionUpdateSchedule(schedule: Schedule): Promise { const schedules = await getSchedules(); - console.log(schedules); + // Ensure last_run is serialized as string + schedules.forEach((schedule) => { + if (schedule.lastRun) { + schedule.lastRun = schedule.lastRun.toString(); + } + if (schedule.nextRun) { + schedule.nextRun = schedule.nextRun.toString(); + } + }); return schedules; } @@ -65,5 +65,18 @@ export async function actionListPlaybooks(): Promise { } export async function actionUpdateScheduleEnabled(scheduleId: string, enabled: boolean) { - return updateScheduleEnabled(scheduleId, enabled); + if (enabled) { + const schedule = await getSchedule(scheduleId); + schedule.enabled = true; + schedule.status = 'scheduled'; + schedule.nextRun = scheduleGetNextRun(schedule, new Date()).toUTCString(); + console.log('nextRun', schedule.nextRun); + await updateScheduleRunData(schedule); + } else { + const schedule = await getSchedule(scheduleId); + schedule.enabled = false; + schedule.status = 'disabled'; + schedule.nextRun = undefined; + await updateScheduleRunData(schedule); + } } diff --git a/apps/dbagent/src/components/monitoring/schedule-form.tsx b/apps/dbagent/src/components/monitoring/schedule-form.tsx index 75c8d4e1..4bc84893 100644 --- a/apps/dbagent/src/components/monitoring/schedule-form.tsx +++ b/apps/dbagent/src/components/monitoring/schedule-form.tsx @@ -30,13 +30,8 @@ import { useEffect, useState } from 'react'; import { useForm } from 'react-hook-form'; import * as z from 'zod'; import { DbConnection } from '~/lib/db/connections'; -import { - actionCreateSchedule, - actionDeleteSchedule, - actionGetSchedule, - actionUpdateSchedule, - Schedule -} from './actions'; +import { Schedule } from '~/lib/db/schedules'; +import { actionCreateSchedule, actionDeleteSchedule, actionGetSchedule, actionUpdateSchedule } from './actions'; import { CronExpressionModal } from './cron-expression-modal'; const formSchema = z.object({ @@ -107,7 +102,8 @@ export function ScheduleForm({ additionalInstructions: data.additionalInstructions, minInterval: Number(data.minInterval), maxInterval: Number(data.maxInterval), - enabled: data.enabled + enabled: data.enabled, + status: data.enabled ? 'scheduled' : 'disabled' }; if (isEditMode) { await actionUpdateSchedule(schedule); diff --git a/apps/dbagent/src/components/monitoring/schedule.ts b/apps/dbagent/src/components/monitoring/schedule.ts new file mode 100644 index 00000000..91c7a8be --- /dev/null +++ b/apps/dbagent/src/components/monitoring/schedule.ts @@ -0,0 +1,35 @@ +import { CronExpressionParser } from 'cron-parser'; + +export type Schedule = { + id: string; + connectionId: string; + playbook: string; + scheduleType: string; + cronExpression?: string; + additionalInstructions?: string; + minInterval?: number; + maxInterval?: number; + lastRun?: string; + nextRun?: string; + failures?: number; + status: 'disabled' | 'scheduled' | 'running'; + enabled: boolean; +}; + +export function shouldRunSchedule(schedule: Schedule, now: Date): boolean { + if (schedule.enabled === false || schedule.nextRun === undefined) return false; + return now >= new Date(schedule.nextRun); +} + +export function scheduleGetNextRun(schedule: Schedule, now: Date): Date { + if (schedule.scheduleType === 'cron' && schedule.cronExpression) { + const interval = CronExpressionParser.parse(schedule.cronExpression); + return interval.next().toDate(); + } + if (schedule.scheduleType === 'automatic' && schedule.minInterval) { + // TODO ask the model to get the interval + const nextRun = new Date(now.getTime() + schedule.minInterval * 1000); + return nextRun; + } + return now; +} diff --git a/apps/dbagent/src/components/monitoring/schedules-table.tsx b/apps/dbagent/src/components/monitoring/schedules-table.tsx index af227a01..6304eab0 100644 --- a/apps/dbagent/src/components/monitoring/schedules-table.tsx +++ b/apps/dbagent/src/components/monitoring/schedules-table.tsx @@ -1,25 +1,50 @@ 'use client'; import { Button, Switch, Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '@internal/components'; -import { PencilIcon, PlusIcon } from 'lucide-react'; +import { PencilIcon, PlusIcon, RefreshCwIcon } from 'lucide-react'; import Link from 'next/link'; import { useEffect, useState } from 'react'; import { DbConnection } from '~/lib/db/connections'; -import { actionGetSchedules, actionUpdateScheduleEnabled, Schedule } from './actions'; +import { Schedule } from '~/lib/db/schedules'; +import { actionGetSchedules, actionUpdateScheduleEnabled } from './actions'; + +function displayRelativeTime(date1: Date, date2: Date): string { + const diff = date2.getTime() - date1.getTime(); + const diffSeconds = Math.floor(diff / 1000); + if (diffSeconds < 60) { + return `${diffSeconds}s`; + } + const diffMinutes = Math.floor(diff / (1000 * 60)); + if (diffMinutes < 60) { + return `${diffMinutes}m`; + } + if (diffMinutes < 1440) { + return `${Math.floor(diffMinutes / 60)}h`; + } + return `${Math.floor(diffMinutes / 1440)}d`; +} export function MonitoringScheduleTable({ connections }: { connections: DbConnection[] }) { const [schedules, setSchedules] = useState([]); const [isLoading, setIsLoading] = useState(true); + const loadSchedules = async () => { + setIsLoading(true); + try { + const schedules = await actionGetSchedules(); + // Sort schedules by ID to maintain stable order + setSchedules(schedules.sort((a, b) => String(a.id || '').localeCompare(String(b.id || '')))); + } finally { + setIsLoading(false); + } + }; + + const refreshSchedules = async () => { + const schedules = await actionGetSchedules(); + setSchedules(schedules.sort((a, b) => String(a.id || '').localeCompare(String(b.id || '')))); + }; + useEffect(() => { - const loadSchedules = async () => { - try { - const schedules = await actionGetSchedules(); - setSchedules(schedules); - } finally { - setIsLoading(false); - } - }; void loadSchedules(); }, []); @@ -27,7 +52,8 @@ export function MonitoringScheduleTable({ connections }: { connections: DbConnec await actionUpdateScheduleEnabled(scheduleId, enabled); // Refresh the schedules list const updatedSchedules = await actionGetSchedules(); - setSchedules(updatedSchedules); + // Sort schedules by ID to maintain stable order + setSchedules(updatedSchedules.sort((a, b) => String(a.id || '').localeCompare(String(b.id || '')))); }; const SkeletonRow = () => ( @@ -60,7 +86,10 @@ export function MonitoringScheduleTable({ connections }: { connections: DbConnec

Monitoring Schedules

-
+
+