From 83dc63d29131621c27323928981a9a416c6e87a5 Mon Sep 17 00:00:00 2001 From: James Norton Date: Fri, 17 Jan 2025 09:35:03 -0500 Subject: [PATCH] HARMONY-1974: Implement work reaper and work failer (started) in go --- core-services/cmd/main.go | 13 ++--- .../cronjobs/workfailer/env-defaults.go | 15 +++++ .../internal/cronjobs/workfailer/env.go | 57 +++++++++++++++++++ .../cronjobs/workfailer/workfailer.go | 16 ++++++ .../cronjobs/workreaper/env-default.go | 10 ++++ .../internal/cronjobs/workreaper/env.go | 51 +++++++++++++++++ .../cronjobs/workreaper/workreaper.go | 27 ++------- core-services/internal/env/env.go | 7 ++- 8 files changed, 165 insertions(+), 31 deletions(-) create mode 100644 core-services/internal/cronjobs/workfailer/env-defaults.go create mode 100644 core-services/internal/cronjobs/workfailer/env.go create mode 100644 core-services/internal/cronjobs/workfailer/workfailer.go create mode 100644 core-services/internal/cronjobs/workreaper/env-default.go create mode 100644 core-services/internal/cronjobs/workreaper/env.go diff --git a/core-services/cmd/main.go b/core-services/cmd/main.go index 372f933bf..12734e491 100644 --- a/core-services/cmd/main.go +++ b/core-services/cmd/main.go @@ -21,18 +21,13 @@ import ( "github.com/robfig/cron/v3" _ "github.com/mattn/go-sqlite3" - - "github.com/go-playground/validator/v10" ) const PLUGIN_RESTART_DELAY = 5 * time.Second -// use a single instance of Validate, it caches struct info -var validate *validator.Validate - func main() { - // TODO is this better than using an implicitly called `init` function? - env.InitEnvVars() + baseEnv := env.InitEnvVars() + workReaperEnv := workreaper.InitEnv(baseEnv) logger := logs.NewLogger() @@ -52,7 +47,9 @@ func main() { cron.Recover(lgr), cron.SkipIfStillRunning(cron.DefaultLogger), )) - crn.AddFunc("@every 30s", func() { workreaper.DeleteOldWork(ctx) }) + crn.AddFunc(workReaperEnv.WorkReaperCron, func() { + workreaper.DeleteOldWork(ctx, workReaperEnv) + }) crn.AddFunc("* * * * *", func() { logger.Info("Every minute") time.Sleep(10 * time.Second) diff --git a/core-services/internal/cronjobs/workfailer/env-defaults.go b/core-services/internal/cronjobs/workfailer/env-defaults.go new file mode 100644 index 000000000..1b7b70db7 --- /dev/null +++ b/core-services/internal/cronjobs/workfailer/env-defaults.go @@ -0,0 +1,15 @@ +package workfailer + +// The cron entry for scheduling the work reaper +const WORK_FAILER_CRON = "@every 300s" + +// WorkItems that have not been updated for more than this many minutes are +// updated by the work failer (resulting either in job and work item failure or a retry) +const FAILABLE_WORK_AGE_MINUTES = 5 + +// The batch size used by work-failer. Set it to 0 will effectively disable work-failer. +const WORK_FAILER_BATCH_SIZE = 1000 + +// Maximum number of work items allowed on the work item update queue before halting failing work items +// Set the value to -1 to always fail work items +const MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER = 1000 diff --git a/core-services/internal/cronjobs/workfailer/env.go b/core-services/internal/cronjobs/workfailer/env.go new file mode 100644 index 000000000..9a368ac31 --- /dev/null +++ b/core-services/internal/cronjobs/workfailer/env.go @@ -0,0 +1,57 @@ +// Environment variables for the work failer +package workfailer + +import ( + "fmt" + "os" + "strconv" + + "github.com/go-playground/validator/v10" + "github.com/nasa/harmony/core-services/internal/env" +) + +type WorkFailerEnv struct { + env.HarmonyEnv + WorkFailerCron string `validate:"required,cron"` + WorkFailerBatchSize int `validate:"gte=1"` + FailableWorkAgeMinutes int `validate:"gte=1"` + MaxWorkItemsOnUpdateQueueFailer int `validate:"gte1"` +} + +func InitEnv(baseEnv env.HarmonyEnv) WorkFailerEnv { + cronEntry := os.Getenv("WORK_FAILER_CRON") + if cronEntry == "" { + cronEntry = WORK_FAILER_CRON + } + + batchSize, err := strconv.Atoi(os.Getenv("WORK_FAILER_BATCH_SIZE")) + if err != nil { + batchSize = WORK_FAILER_BATCH_SIZE + } + failableWorkAgeMinutes, err := strconv.Atoi(os.Getenv("FAILABLE_WORK_AGE_MINUTES")) + if err != nil { + failableWorkAgeMinutes = FAILABLE_WORK_AGE_MINUTES + } + maxWorkItemsOnUpdateQueueFailer, err := strconv.Atoi(os.Getenv("MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER")) + if err != nil { + maxWorkItemsOnUpdateQueueFailer = MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER + } + + env := WorkFailerEnv{ + baseEnv, + cronEntry, + batchSize, + failableWorkAgeMinutes, + maxWorkItemsOnUpdateQueueFailer, + } + + validate := validator.New(validator.WithRequiredStructEnabled()) + err = validate.Struct(env) + if err != nil { + fmt.Println("Invalid work failer env vars") + panic(err) + } + + return env + +} diff --git a/core-services/internal/cronjobs/workfailer/workfailer.go b/core-services/internal/cronjobs/workfailer/workfailer.go new file mode 100644 index 000000000..ee4514a49 --- /dev/null +++ b/core-services/internal/cronjobs/workfailer/workfailer.go @@ -0,0 +1,16 @@ +package workfailer + +// import ( +// "context" +// _ "fmt" +// _ "os" +// _ "strconv" + +// "github.com/nasa/harmony/core-services/internal/db" +// logs "github.com/nasa/harmony/core-services/internal/log" +// "github.com/nasa/harmony/core-services/internal/models/job" +// ) + +// func getExpiredWorkItems(ctx context.Context) { + +// } diff --git a/core-services/internal/cronjobs/workreaper/env-default.go b/core-services/internal/cronjobs/workreaper/env-default.go new file mode 100644 index 000000000..5f45678b0 --- /dev/null +++ b/core-services/internal/cronjobs/workreaper/env-default.go @@ -0,0 +1,10 @@ +package workreaper + +// The cron entry for scheduling the work reaper +const WORK_REAPER_CRON = "@every 30s" + +// WorkItems and WorkflowSteps (in a terminal state) older than this many minutes are checked by the work reaper +const REAPABLE_WORK_AGE_MINUTES = 20160 + +// The batch size of the work reaper service +const WORK_REAPER_BATCH_SIZE = 2000 diff --git a/core-services/internal/cronjobs/workreaper/env.go b/core-services/internal/cronjobs/workreaper/env.go new file mode 100644 index 000000000..04de105ec --- /dev/null +++ b/core-services/internal/cronjobs/workreaper/env.go @@ -0,0 +1,51 @@ +// Environment variables for the work reaper +package workreaper + +import ( + "fmt" + "os" + "strconv" + + "github.com/go-playground/validator/v10" + "github.com/nasa/harmony/core-services/internal/env" +) + +type WorkReaperEnv struct { + env.HarmonyEnv + WorkReaperCron string `validate:"required,cron"` + WorkReaperBatchSize int `validate:"gte=1"` + ReapableWorkAgeMinutes int `validate:"gte=1"` +} + +func InitEnv(baseEnv env.HarmonyEnv) WorkReaperEnv { + cronEntry := os.Getenv("WORK_REAPER_CRON") + if cronEntry == "" { + cronEntry = WORK_REAPER_CRON + } + + batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE")) + if err != nil { + batchSize = WORK_REAPER_BATCH_SIZE + } + reapableWorkAgeMinutes, err := strconv.Atoi(os.Getenv("REAPABLE_WORK_AGE_MINUTES")) + if err != nil { + reapableWorkAgeMinutes = REAPABLE_WORK_AGE_MINUTES + } + + env := WorkReaperEnv{ + baseEnv, + cronEntry, + batchSize, + reapableWorkAgeMinutes, + } + + validate := validator.New(validator.WithRequiredStructEnabled()) + err = validate.Struct(env) + if err != nil { + fmt.Println("Invalid work reaper env vars") + panic(err) + } + + return env + +} diff --git a/core-services/internal/cronjobs/workreaper/workreaper.go b/core-services/internal/cronjobs/workreaper/workreaper.go index ab79c4e0a..3069c4c93 100644 --- a/core-services/internal/cronjobs/workreaper/workreaper.go +++ b/core-services/internal/cronjobs/workreaper/workreaper.go @@ -4,23 +4,16 @@ package workreaper import ( "context" "fmt" - "os" - "strconv" "github.com/nasa/harmony/core-services/internal/db" logs "github.com/nasa/harmony/core-services/internal/log" "github.com/nasa/harmony/core-services/internal/models/job" ) -func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus) { +func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus, batchSize int) { var done = false var startingId = 0 var totalDeleted int64 = 0 - batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE")) - if err != nil { - // use default batch size - batchSize = 100 - } logger := logs.GetLoggerForContext(ctx) @@ -48,15 +41,10 @@ func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobS logger.Info(fmt.Sprintf("Done deleting work-items. Total work-items deleted: %d", totalDeleted)) } -func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus) { +func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus, batchSize int) { var done = false var startingId = 0 var totalDeleted int64 = 0 - batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE")) - if err != nil { - // use default batch size - batchSize = 100 - } logger := logs.GetLoggerForContext(ctx) @@ -84,13 +72,8 @@ func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int, logger.Info(fmt.Sprintf("Done deleting workflow steps. Total workflow steps deleted: %d", totalDeleted)) } -func DeleteOldWork(ctx context.Context) { - reapableWOrkAgeMinutes, err := strconv.Atoi(os.Getenv("REAPABLE_WORK_AGE_MINUTES")) - if err != nil { - // TODO this is just set to 1 for dev purposes - it should have a better default - reapableWOrkAgeMinutes = 1 - } - deleteTerminalWorkItems(ctx, reapableWOrkAgeMinutes, job.TerminalStatuses) - deleteTerminalWorkflowSteps(ctx, reapableWOrkAgeMinutes, job.TerminalStatuses) +func DeleteOldWork(ctx context.Context, env WorkReaperEnv) { + deleteTerminalWorkItems(ctx, env.ReapableWorkAgeMinutes, job.TerminalStatuses, env.WorkReaperBatchSize) + deleteTerminalWorkflowSteps(ctx, env.ReapableWorkAgeMinutes, job.TerminalStatuses, env.WorkReaperBatchSize) } diff --git a/core-services/internal/env/env.go b/core-services/internal/env/env.go index 7e5062547..06bda29aa 100644 --- a/core-services/internal/env/env.go +++ b/core-services/internal/env/env.go @@ -7,7 +7,10 @@ import ( "github.com/joho/godotenv" ) -func InitEnvVars() { +type HarmonyEnv struct { +} + +func InitEnvVars() HarmonyEnv { // read the env from .env err := godotenv.Load() if err != nil { @@ -17,4 +20,6 @@ func InitEnvVars() { log.Fatal("Error loading .env file") } } + + return HarmonyEnv{} }