From b92195427abffda02de94fab8869eba409ce3701 Mon Sep 17 00:00:00 2001 From: Phil Winder Date: Tue, 2 May 2023 22:13:55 +0100 Subject: [PATCH] chore(queue): add second, higher priority queue for user requests * too difficult to create a priority queue that was thread safe so decided to ignore it * align trigger timings so we don't re-process the same cids --- cmd/run/run.go | 22 +------------- cmd/serve.go | 23 ++++++++++----- pkg/api/api.go | 11 +++++-- pkg/config/appConfig.go | 13 ++++++-- pkg/dag/dag.go | 20 +++++++++++++ pkg/item/item.go | 5 ++-- pkg/item/queue_repository.go | 49 +++++++++++++++++++++++-------- pkg/item/queue_repository_test.go | 6 ++-- pkg/triggers/triggers.go | 2 +- 9 files changed, 100 insertions(+), 51 deletions(-) diff --git a/cmd/run/run.go b/cmd/run/run.go index c51e1b5..a926a6b 100644 --- a/cmd/run/run.go +++ b/cmd/run/run.go @@ -88,7 +88,7 @@ func createRunCommand(appContext cli.AppContext) runEFunc { if err != nil { return err } - finished, err = allChildNodesFinished(ctx, rep.Children) + finished, err = dag.AllNodesFinished(ctx, rep.Children) if err != nil { return err } @@ -105,23 +105,3 @@ func createRunCommand(appContext cli.AppContext) runEFunc { return nil } } - -func allChildNodesFinished(ctx context.Context, children []dag.Node[dag.IOSpec]) (bool, error) { - for _, child := range children { - rep, err := child.Get(ctx) - if err != nil { - return false, err - } - if rep.Metadata.EndedAt.IsZero() { - return false, nil - } - finished, err := allChildNodesFinished(ctx, rep.Children) - if err != nil { - return false, err - } - if !finished { - return false, nil - } - } - return true, nil -} diff --git a/cmd/serve.go b/cmd/serve.go index 4315fa6..0caa1df 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -58,14 +58,23 @@ func executeServeCommand(appContext cli.AppContext) runEFunc { }, } - // DAG Queue - log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting DAG queue") - dagQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows) + // Priority User Queue + log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting priority queue") + priorityQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows) if err != nil { return err } - dagQueue.Start() - defer dagQueue.Stop() + priorityQueue.Start() + defer priorityQueue.Stop() + + // Trigger Queue + log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting Trigger queue") + triggerQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows) + if err != nil { + return err + } + triggerQueue.Start() + defer triggerQueue.Stop() // Node Queue log.Ctx(ctx).Info().Int("concurrency", appContext.Config.NodeConcurrency).Msg("Starting node queue") @@ -123,7 +132,7 @@ func executeServeCommand(appContext cli.AppContext) runEFunc { } // QueueRepository interacts with the queue - queueRepository, err := item.NewQueueRepository(itemStore, dagQueue, taskFactory, nodeExecutor) + queueRepository, err := item.NewQueueRepository(itemStore, priorityQueue, triggerQueue, taskFactory, nodeExecutor) if err != nil { return err } @@ -139,7 +148,7 @@ func executeServeCommand(appContext cli.AppContext) runEFunc { if appContext.Config.Trigger.IPFSSearch.Enabled { t := triggers.IPFSSearchTrigger{ URL: appContext.Config.Trigger.IPFSSearch.QueryURL, - Period: 30 * time.Second, + Period: appContext.Config.Trigger.IPFSSearch.Period, } go func() { if err := t.Start(ctx, cidChan); err != nil { diff --git a/pkg/api/api.go b/pkg/api/api.go index 5722516..21ae49e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -318,7 +318,11 @@ func (a *amplifyAPI) PostV0Queue(w http.ResponseWriter, r *http.Request) { } func (a *amplifyAPI) createExecution(ctx context.Context, w http.ResponseWriter, executionID uuid.UUID, cid string) { - err := a.CreateExecution(ctx, executionID, cid) + err := a.er.Create(ctx, item.ItemParams{ + ID: executionID, + CID: cid, + Priority: true, + }) if err != nil { if err == queue.ErrQueueFull { sendError(ctx, w, http.StatusTooManyRequests, "Queue full", err.Error()) @@ -333,8 +337,9 @@ func (a *amplifyAPI) createExecution(ctx context.Context, w http.ResponseWriter, func (a *amplifyAPI) CreateExecution(ctx context.Context, executionID uuid.UUID, cid string) error { return a.er.Create(ctx, item.ItemParams{ - ID: executionID, - CID: cid, + ID: executionID, + CID: cid, + Priority: false, }) } diff --git a/pkg/config/appConfig.go b/pkg/config/appConfig.go index 9ce777b..6fe569c 100644 --- a/pkg/config/appConfig.go +++ b/pkg/config/appConfig.go @@ -4,6 +4,7 @@ import ( "fmt" "path/filepath" "strings" + "time" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -24,6 +25,7 @@ const ( PortFlag = "port" IPFSSearchEnabledFlag = "trigger.ipfs-search.enabled" IPFSSearchQueryURLFlag = "trigger.ipfs-search.query-url" + IPFSSearchPeriodFlag = "trigger.ipfs-search.period" DBURIFlag = "db.uri" NumConcurrentNodesFlag = "num-concurrent-nodes" NumConcurrentWorkflowsFlag = "num-concurrent-workflows" @@ -52,8 +54,9 @@ type DB struct { } type IPFSSearch struct { - Enabled bool `yaml:"enabled"` - QueryURL string `yaml:"query-url"` + Enabled bool `yaml:"enabled"` + QueryURL string `yaml:"query-url"` + Period time.Duration `yaml:"period"` } func ParseAppConfig(cmd *cobra.Command) *AppConfig { @@ -90,6 +93,10 @@ func ParseAppConfig(cmd *cobra.Command) *AppConfig { if err != nil { log.Ctx(ctx).Fatal().Err(err).Str("flag", IPFSSearchEnabledFlag).Msg("Failed to parse") } + ipfsSearchPeriod, err := cmd.Flags().GetDuration(IPFSSearchPeriodFlag) + if err != nil { + log.Ctx(ctx).Fatal().Err(err).Str("flag", IPFSSearchPeriodFlag).Msg("Failed to parse") + } return &AppConfig{ LogLevel: logLevel, ConfigPath: cmd.Flag(ConfigPathFlag).Value.String(), @@ -98,6 +105,7 @@ func ParseAppConfig(cmd *cobra.Command) *AppConfig { IPFSSearch: IPFSSearch{ Enabled: ipfsSearchEnabled, QueryURL: cmd.Flag(IPFSSearchQueryURLFlag).Value.String(), + Period: ipfsSearchPeriod, }, }, DB: DB{ @@ -117,6 +125,7 @@ func AddGlobalFlags(cmd *cobra.Command) { cmd.PersistentFlags().Int(PortFlag, 8080, "Port to listen on") cmd.PersistentFlags().Bool(IPFSSearchEnabledFlag, false, "Enable IPFS-Search trigger") cmd.PersistentFlags().String(IPFSSearchQueryURLFlag, "https://api.ipfs-search.com/v1/search?q=first-seen%3A%3Enow-5m&page=0", "Query URL for IPFS-Search") + cmd.PersistentFlags().Duration(IPFSSearchPeriodFlag, 5*time.Minute, "Query URL for IPFS-Search") cmd.PersistentFlags().String(DBURIFlag, "", "Database URI (blank for in-memory)") cmd.PersistentFlags().Int(NumConcurrentNodesFlag, 10, "Number of concurrent nodes to run at one time") cmd.PersistentFlags().Int(NumConcurrentWorkflowsFlag, 10, "Number of concurrent workflows to run at one time") diff --git a/pkg/dag/dag.go b/pkg/dag/dag.go index 879b3c1..66c7e1a 100644 --- a/pkg/dag/dag.go +++ b/pkg/dag/dag.go @@ -32,3 +32,23 @@ func NodeMapToList[T any](dags map[T]Node[IOSpec]) (nodes []Node[IOSpec]) { } return } + +func AllNodesFinished(ctx context.Context, nodes []Node[IOSpec]) (bool, error) { + for _, child := range nodes { + rep, err := child.Get(ctx) + if err != nil { + return false, err + } + if rep.Metadata.EndedAt.IsZero() { + return false, nil + } + finished, err := AllNodesFinished(ctx, rep.Children) + if err != nil { + return false, err + } + if !finished { + return false, nil + } + } + return true, nil +} diff --git a/pkg/item/item.go b/pkg/item/item.go index 167beaf..0016cc8 100644 --- a/pkg/item/item.go +++ b/pkg/item/item.go @@ -21,6 +21,7 @@ type ItemMetadata struct { } type ItemParams struct { - ID uuid.UUID - CID string + ID uuid.UUID + CID string + Priority bool } diff --git a/pkg/item/queue_repository.go b/pkg/item/queue_repository.go index f8514bd..4ffa2c0 100644 --- a/pkg/item/queue_repository.go +++ b/pkg/item/queue_repository.go @@ -3,11 +3,13 @@ package item import ( "context" "fmt" + "time" "github.com/bacalhau-project/amplify/pkg/dag" "github.com/bacalhau-project/amplify/pkg/queue" "github.com/bacalhau-project/amplify/pkg/task" "github.com/google/uuid" + "github.com/rs/zerolog/log" ) var ( @@ -24,21 +26,23 @@ type QueueRepository interface { } type queueRepository struct { - repo ItemStore - tf task.TaskFactory - queue queue.Queue - nodeExecutor dag.NodeExecutor[dag.IOSpec] + repo ItemStore + tf task.TaskFactory + priorityQueue queue.Queue + secondaryQueue queue.Queue + nodeExecutor dag.NodeExecutor[dag.IOSpec] } -func NewQueueRepository(repo ItemStore, queue queue.Queue, taskFactory task.TaskFactory, nodeExecutor dag.NodeExecutor[dag.IOSpec]) (QueueRepository, error) { - if repo == nil || taskFactory == nil || queue == nil || nodeExecutor == nil { +func NewQueueRepository(repo ItemStore, priorityQueue queue.Queue, secondaryQueue queue.Queue, taskFactory task.TaskFactory, nodeExecutor dag.NodeExecutor[dag.IOSpec]) (QueueRepository, error) { + if repo == nil || taskFactory == nil || secondaryQueue == nil || priorityQueue == nil || nodeExecutor == nil { return nil, fmt.Errorf("missing dependencies") } return &queueRepository{ - repo: repo, - tf: taskFactory, - queue: queue, - nodeExecutor: nodeExecutor, + repo: repo, + tf: taskFactory, + priorityQueue: priorityQueue, + secondaryQueue: secondaryQueue, + nodeExecutor: nodeExecutor, }, nil } @@ -49,7 +53,11 @@ func (r *queueRepository) Create(ctx context.Context, req ItemParams) error { if req.CID == "" { return ErrInvalidRequestCID } - if r.queue.IsFull() { + q := r.secondaryQueue + if req.Priority { + q = r.priorityQueue + } + if q.IsFull() { return queue.ErrQueueFull } err := r.repo.NewItem(ctx, req) @@ -61,9 +69,26 @@ func (r *queueRepository) Create(ctx context.Context, req ItemParams) error { return err } for _, node := range dags { - err := r.queue.Enqueue(func(ctx context.Context) { + err := q.Enqueue(func(ctx context.Context) { // Execute the node r.nodeExecutor.Execute(ctx, req.ID, node) + // Wait until all the nodes in the dag are completed + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + finished, err := dag.AllNodesFinished(ctx, []dag.Node[dag.IOSpec]{node}) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error checking if all nodes are finished") + return + } + if finished { + log.Ctx(ctx).Info().Msg("all nodes are finished") + return + } + } + } }) if err != nil { return err diff --git a/pkg/item/queue_repository_test.go b/pkg/item/queue_repository_test.go index beabb95..f9a25c4 100644 --- a/pkg/item/queue_repository_test.go +++ b/pkg/item/queue_repository_test.go @@ -17,7 +17,7 @@ func Test_QueueRepository_Create(t *testing.T) { persistence := db.NewInMemDB() itemStore := newMockItemStore(persistence) executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil) - repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) + repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) assert.NilError(t, err) tests := []struct { name string @@ -41,7 +41,7 @@ func Test_QueueRepository_Get(t *testing.T) { persistence := db.NewInMemDB() itemStore := newMockItemStore(persistence) executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil) - repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) + repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) assert.NilError(t, err) id := uuid.New() err = repo.Create(context.Background(), ItemParams{ID: id, CID: "cid"}) @@ -69,7 +69,7 @@ func Test_QueueRepository_List(t *testing.T) { persistence := db.NewInMemDB() itemStore := newMockItemStore(persistence) executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil) - repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) + repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor) assert.NilError(t, err) id1 := uuid.New() err = repo.Create(context.Background(), ItemParams{ID: id1, CID: "cid"}) diff --git a/pkg/triggers/triggers.go b/pkg/triggers/triggers.go index d33a485..0bdd5f0 100644 --- a/pkg/triggers/triggers.go +++ b/pkg/triggers/triggers.go @@ -52,7 +52,7 @@ func (t *IPFSSearchTrigger) Start(ctx context.Context, cidChan chan cid.Cid) err for _, c := range cids { cidChan <- c } - log.Ctx(ctx).Debug().Msg("Sleeping for 1 minute before fetching IPFS Search again") + log.Ctx(ctx).Debug().Msg("Sleeping before fetching IPFS Search again") } } }