Skip to content

Commit

Permalink
HARMONY-1974: Implement work reaper and work failer (started) in go
Browse files Browse the repository at this point in the history
  • Loading branch information
indiejames committed Jan 17, 2025
1 parent 9ce2b1e commit 83dc63d
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 31 deletions.
13 changes: 5 additions & 8 deletions core-services/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ import (
"github.com/robfig/cron/v3"

_ "github.com/mattn/go-sqlite3"

"github.com/go-playground/validator/v10"
)

const PLUGIN_RESTART_DELAY = 5 * time.Second

// use a single instance of Validate, it caches struct info
var validate *validator.Validate

func main() {
// TODO is this better than using an implicitly called `init` function?
env.InitEnvVars()
baseEnv := env.InitEnvVars()
workReaperEnv := workreaper.InitEnv(baseEnv)

logger := logs.NewLogger()

Expand All @@ -52,7 +47,9 @@ func main() {
cron.Recover(lgr),
cron.SkipIfStillRunning(cron.DefaultLogger),
))
crn.AddFunc("@every 30s", func() { workreaper.DeleteOldWork(ctx) })
crn.AddFunc(workReaperEnv.WorkReaperCron, func() {
workreaper.DeleteOldWork(ctx, workReaperEnv)
})
crn.AddFunc("* * * * *", func() {
logger.Info("Every minute")
time.Sleep(10 * time.Second)
Expand Down
15 changes: 15 additions & 0 deletions core-services/internal/cronjobs/workfailer/env-defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package workfailer

// The cron entry for scheduling the work reaper
const WORK_FAILER_CRON = "@every 300s"

// WorkItems that have not been updated for more than this many minutes are
// updated by the work failer (resulting either in job and work item failure or a retry)
const FAILABLE_WORK_AGE_MINUTES = 5

// The batch size used by work-failer. Set it to 0 will effectively disable work-failer.
const WORK_FAILER_BATCH_SIZE = 1000

// Maximum number of work items allowed on the work item update queue before halting failing work items
// Set the value to -1 to always fail work items
const MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER = 1000
57 changes: 57 additions & 0 deletions core-services/internal/cronjobs/workfailer/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Environment variables for the work failer
package workfailer

import (
"fmt"
"os"
"strconv"

"github.com/go-playground/validator/v10"
"github.com/nasa/harmony/core-services/internal/env"
)

type WorkFailerEnv struct {
env.HarmonyEnv
WorkFailerCron string `validate:"required,cron"`
WorkFailerBatchSize int `validate:"gte=1"`
FailableWorkAgeMinutes int `validate:"gte=1"`
MaxWorkItemsOnUpdateQueueFailer int `validate:"gte1"`
}

func InitEnv(baseEnv env.HarmonyEnv) WorkFailerEnv {
cronEntry := os.Getenv("WORK_FAILER_CRON")
if cronEntry == "" {
cronEntry = WORK_FAILER_CRON
}

batchSize, err := strconv.Atoi(os.Getenv("WORK_FAILER_BATCH_SIZE"))
if err != nil {
batchSize = WORK_FAILER_BATCH_SIZE
}
failableWorkAgeMinutes, err := strconv.Atoi(os.Getenv("FAILABLE_WORK_AGE_MINUTES"))
if err != nil {
failableWorkAgeMinutes = FAILABLE_WORK_AGE_MINUTES
}
maxWorkItemsOnUpdateQueueFailer, err := strconv.Atoi(os.Getenv("MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER"))
if err != nil {
maxWorkItemsOnUpdateQueueFailer = MAX_WORK_ITEMS_ON_UPDATE_QUEUE_FAILER
}

env := WorkFailerEnv{
baseEnv,
cronEntry,
batchSize,
failableWorkAgeMinutes,
maxWorkItemsOnUpdateQueueFailer,
}

validate := validator.New(validator.WithRequiredStructEnabled())
err = validate.Struct(env)
if err != nil {
fmt.Println("Invalid work failer env vars")
panic(err)
}

return env

}
16 changes: 16 additions & 0 deletions core-services/internal/cronjobs/workfailer/workfailer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package workfailer

// import (
// "context"
// _ "fmt"
// _ "os"
// _ "strconv"

// "github.com/nasa/harmony/core-services/internal/db"
// logs "github.com/nasa/harmony/core-services/internal/log"
// "github.com/nasa/harmony/core-services/internal/models/job"
// )

// func getExpiredWorkItems(ctx context.Context) {

// }
10 changes: 10 additions & 0 deletions core-services/internal/cronjobs/workreaper/env-default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package workreaper

// The cron entry for scheduling the work reaper
const WORK_REAPER_CRON = "@every 30s"

// WorkItems and WorkflowSteps (in a terminal state) older than this many minutes are checked by the work reaper
const REAPABLE_WORK_AGE_MINUTES = 20160

// The batch size of the work reaper service
const WORK_REAPER_BATCH_SIZE = 2000
51 changes: 51 additions & 0 deletions core-services/internal/cronjobs/workreaper/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Environment variables for the work reaper
package workreaper

import (
"fmt"
"os"
"strconv"

"github.com/go-playground/validator/v10"
"github.com/nasa/harmony/core-services/internal/env"
)

type WorkReaperEnv struct {
env.HarmonyEnv
WorkReaperCron string `validate:"required,cron"`
WorkReaperBatchSize int `validate:"gte=1"`
ReapableWorkAgeMinutes int `validate:"gte=1"`
}

func InitEnv(baseEnv env.HarmonyEnv) WorkReaperEnv {
cronEntry := os.Getenv("WORK_REAPER_CRON")
if cronEntry == "" {
cronEntry = WORK_REAPER_CRON
}

batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE"))
if err != nil {
batchSize = WORK_REAPER_BATCH_SIZE
}
reapableWorkAgeMinutes, err := strconv.Atoi(os.Getenv("REAPABLE_WORK_AGE_MINUTES"))
if err != nil {
reapableWorkAgeMinutes = REAPABLE_WORK_AGE_MINUTES
}

env := WorkReaperEnv{
baseEnv,
cronEntry,
batchSize,
reapableWorkAgeMinutes,
}

validate := validator.New(validator.WithRequiredStructEnabled())
err = validate.Struct(env)
if err != nil {
fmt.Println("Invalid work reaper env vars")
panic(err)
}

return env

}
27 changes: 5 additions & 22 deletions core-services/internal/cronjobs/workreaper/workreaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,16 @@ package workreaper
import (
"context"
"fmt"
"os"
"strconv"

"github.com/nasa/harmony/core-services/internal/db"
logs "github.com/nasa/harmony/core-services/internal/log"
"github.com/nasa/harmony/core-services/internal/models/job"
)

func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus) {
func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus, batchSize int) {
var done = false
var startingId = 0
var totalDeleted int64 = 0
batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE"))
if err != nil {
// use default batch size
batchSize = 100
}

logger := logs.GetLoggerForContext(ctx)

Expand Down Expand Up @@ -48,15 +41,10 @@ func deleteTerminalWorkItems(ctx context.Context, notUpdatedForMinutes int, jobS
logger.Info(fmt.Sprintf("Done deleting work-items. Total work-items deleted: %d", totalDeleted))
}

func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus) {
func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int, jobStatus []job.JobStatus, batchSize int) {
var done = false
var startingId = 0
var totalDeleted int64 = 0
batchSize, err := strconv.Atoi(os.Getenv("WORK_REAPER_BATCH_SIZE"))
if err != nil {
// use default batch size
batchSize = 100
}

logger := logs.GetLoggerForContext(ctx)

Expand Down Expand Up @@ -84,13 +72,8 @@ func deleteTerminalWorkflowSteps(ctx context.Context, notUpdatedForMinutes int,
logger.Info(fmt.Sprintf("Done deleting workflow steps. Total workflow steps deleted: %d", totalDeleted))
}

func DeleteOldWork(ctx context.Context) {
reapableWOrkAgeMinutes, err := strconv.Atoi(os.Getenv("REAPABLE_WORK_AGE_MINUTES"))
if err != nil {
// TODO this is just set to 1 for dev purposes - it should have a better default
reapableWOrkAgeMinutes = 1
}
deleteTerminalWorkItems(ctx, reapableWOrkAgeMinutes, job.TerminalStatuses)
deleteTerminalWorkflowSteps(ctx, reapableWOrkAgeMinutes, job.TerminalStatuses)
func DeleteOldWork(ctx context.Context, env WorkReaperEnv) {
deleteTerminalWorkItems(ctx, env.ReapableWorkAgeMinutes, job.TerminalStatuses, env.WorkReaperBatchSize)
deleteTerminalWorkflowSteps(ctx, env.ReapableWorkAgeMinutes, job.TerminalStatuses, env.WorkReaperBatchSize)

}
7 changes: 6 additions & 1 deletion core-services/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"github.com/joho/godotenv"
)

func InitEnvVars() {
type HarmonyEnv struct {
}

func InitEnvVars() HarmonyEnv {
// read the env from .env
err := godotenv.Load()
if err != nil {
Expand All @@ -17,4 +20,6 @@ func InitEnvVars() {
log.Fatal("Error loading .env file")
}
}

return HarmonyEnv{}
}

0 comments on commit 83dc63d

Please sign in to comment.