Skip to content

Commit

Permalink
chore(queue): add second, higher priority queue for user requests
Browse files Browse the repository at this point in the history
* too difficult to create a priority queue that was thread safe so decided to ignore it
* align trigger timings so we don't re-process the same cids
  • Loading branch information
philwinder committed May 3, 2023
1 parent b6cb278 commit b921954
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 51 deletions.
22 changes: 1 addition & 21 deletions cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func createRunCommand(appContext cli.AppContext) runEFunc {
if err != nil {
return err
}
finished, err = allChildNodesFinished(ctx, rep.Children)
finished, err = dag.AllNodesFinished(ctx, rep.Children)
if err != nil {
return err
}
Expand All @@ -105,23 +105,3 @@ func createRunCommand(appContext cli.AppContext) runEFunc {
return nil
}
}

// allChildNodesFinished reports whether every node in children, and every
// node reachable through their Children, has a non-zero Metadata.EndedAt.
//
// The walk is depth-first: a node's descendants are checked before its next
// sibling. It stops at the first node whose EndedAt is zero (returning
// false, nil) or at the first error returned by Get (returning false and
// that error unchanged). An empty slice yields true, nil.
func allChildNodesFinished(ctx context.Context, children []dag.Node[dag.IOSpec]) (bool, error) {
	for _, n := range children {
		snapshot, err := n.Get(ctx)
		switch {
		case err != nil:
			return false, err
		case snapshot.Metadata.EndedAt.IsZero():
			// This node has no end time recorded yet.
			return false, nil
		}
		// Descend into this node's own children; a failure or an unfinished
		// descendant short-circuits the whole walk.
		if ok, err := allChildNodesFinished(ctx, snapshot.Children); err != nil || !ok {
			return false, err
		}
	}
	return true, nil
}
23 changes: 16 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,23 @@ func executeServeCommand(appContext cli.AppContext) runEFunc {
},
}

// DAG Queue
log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting DAG queue")
dagQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows)
// Priority User Queue
log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting priority queue")
priorityQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows)
if err != nil {
return err
}
dagQueue.Start()
defer dagQueue.Stop()
priorityQueue.Start()
defer priorityQueue.Stop()

// Trigger Queue
log.Ctx(ctx).Info().Int("concurrency", appContext.Config.WorkflowConcurrency).Int("max-waiting", appContext.Config.MaxWaitingWorkflows).Msg("Starting Trigger queue")
triggerQueue, err := queue.NewGenericQueue(ctx, appContext.Config.WorkflowConcurrency, appContext.Config.MaxWaitingWorkflows)
if err != nil {
return err
}
triggerQueue.Start()
defer triggerQueue.Stop()

// Node Queue
log.Ctx(ctx).Info().Int("concurrency", appContext.Config.NodeConcurrency).Msg("Starting node queue")
Expand Down Expand Up @@ -123,7 +132,7 @@ func executeServeCommand(appContext cli.AppContext) runEFunc {
}

// QueueRepository interacts with the queue
queueRepository, err := item.NewQueueRepository(itemStore, dagQueue, taskFactory, nodeExecutor)
queueRepository, err := item.NewQueueRepository(itemStore, priorityQueue, triggerQueue, taskFactory, nodeExecutor)
if err != nil {
return err
}
Expand All @@ -139,7 +148,7 @@ func executeServeCommand(appContext cli.AppContext) runEFunc {
if appContext.Config.Trigger.IPFSSearch.Enabled {
t := triggers.IPFSSearchTrigger{
URL: appContext.Config.Trigger.IPFSSearch.QueryURL,
Period: 30 * time.Second,
Period: appContext.Config.Trigger.IPFSSearch.Period,
}
go func() {
if err := t.Start(ctx, cidChan); err != nil {
Expand Down
11 changes: 8 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,11 @@ func (a *amplifyAPI) PostV0Queue(w http.ResponseWriter, r *http.Request) {
}

func (a *amplifyAPI) createExecution(ctx context.Context, w http.ResponseWriter, executionID uuid.UUID, cid string) {
err := a.CreateExecution(ctx, executionID, cid)
err := a.er.Create(ctx, item.ItemParams{
ID: executionID,
CID: cid,
Priority: true,
})
if err != nil {
if err == queue.ErrQueueFull {
sendError(ctx, w, http.StatusTooManyRequests, "Queue full", err.Error())
Expand All @@ -333,8 +337,9 @@ func (a *amplifyAPI) createExecution(ctx context.Context, w http.ResponseWriter,

func (a *amplifyAPI) CreateExecution(ctx context.Context, executionID uuid.UUID, cid string) error {
return a.er.Create(ctx, item.ItemParams{
ID: executionID,
CID: cid,
ID: executionID,
CID: cid,
Priority: false,
})
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/config/appConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -24,6 +25,7 @@ const (
PortFlag = "port"
IPFSSearchEnabledFlag = "trigger.ipfs-search.enabled"
IPFSSearchQueryURLFlag = "trigger.ipfs-search.query-url"
IPFSSearchPeriodFlag = "trigger.ipfs-search.period"
DBURIFlag = "db.uri"
NumConcurrentNodesFlag = "num-concurrent-nodes"
NumConcurrentWorkflowsFlag = "num-concurrent-workflows"
Expand Down Expand Up @@ -52,8 +54,9 @@ type DB struct {
}

type IPFSSearch struct {
Enabled bool `yaml:"enabled"`
QueryURL string `yaml:"query-url"`
Enabled bool `yaml:"enabled"`
QueryURL string `yaml:"query-url"`
Period time.Duration `yaml:"period"`
}

func ParseAppConfig(cmd *cobra.Command) *AppConfig {
Expand Down Expand Up @@ -90,6 +93,10 @@ func ParseAppConfig(cmd *cobra.Command) *AppConfig {
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Str("flag", IPFSSearchEnabledFlag).Msg("Failed to parse")
}
ipfsSearchPeriod, err := cmd.Flags().GetDuration(IPFSSearchPeriodFlag)
if err != nil {
log.Ctx(ctx).Fatal().Err(err).Str("flag", IPFSSearchPeriodFlag).Msg("Failed to parse")
}
return &AppConfig{
LogLevel: logLevel,
ConfigPath: cmd.Flag(ConfigPathFlag).Value.String(),
Expand All @@ -98,6 +105,7 @@ func ParseAppConfig(cmd *cobra.Command) *AppConfig {
IPFSSearch: IPFSSearch{
Enabled: ipfsSearchEnabled,
QueryURL: cmd.Flag(IPFSSearchQueryURLFlag).Value.String(),
Period: ipfsSearchPeriod,
},
},
DB: DB{
Expand All @@ -117,6 +125,7 @@ func AddGlobalFlags(cmd *cobra.Command) {
cmd.PersistentFlags().Int(PortFlag, 8080, "Port to listen on")
cmd.PersistentFlags().Bool(IPFSSearchEnabledFlag, false, "Enable IPFS-Search trigger")
cmd.PersistentFlags().String(IPFSSearchQueryURLFlag, "https://api.ipfs-search.com/v1/search?q=first-seen%3A%3Enow-5m&page=0", "Query URL for IPFS-Search")
cmd.PersistentFlags().Duration(IPFSSearchPeriodFlag, 5*time.Minute, "Query URL for IPFS-Search")
cmd.PersistentFlags().String(DBURIFlag, "", "Database URI (blank for in-memory)")
cmd.PersistentFlags().Int(NumConcurrentNodesFlag, 10, "Number of concurrent nodes to run at one time")
cmd.PersistentFlags().Int(NumConcurrentWorkflowsFlag, 10, "Number of concurrent workflows to run at one time")
Expand Down
20 changes: 20 additions & 0 deletions pkg/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,23 @@ func NodeMapToList[T any](dags map[T]Node[IOSpec]) (nodes []Node[IOSpec]) {
}
return
}

// AllNodesFinished reports whether each of the given nodes and all of their
// descendants (followed through the Children of each node's Get result) have
// a non-zero Metadata.EndedAt.
//
// Nodes are visited depth-first in slice order. The first node with a zero
// EndedAt ends the walk with (false, nil); the first error from Get ends it
// with (false, err). An empty or nil slice returns (true, nil).
func AllNodesFinished(ctx context.Context, nodes []Node[IOSpec]) (bool, error) {
	for i := range nodes {
		rep, err := nodes[i].Get(ctx)
		if err != nil {
			return false, err
		}
		if rep.Metadata.EndedAt.IsZero() {
			return false, nil
		}
		// Recurse before moving on to the next sibling.
		done, err := AllNodesFinished(ctx, rep.Children)
		if err != nil || !done {
			// err is nil here when a descendant is merely unfinished.
			return false, err
		}
	}
	return true, nil
}
5 changes: 3 additions & 2 deletions pkg/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ItemMetadata struct {
}

type ItemParams struct {
ID uuid.UUID
CID string
ID uuid.UUID
CID string
Priority bool
}
49 changes: 37 additions & 12 deletions pkg/item/queue_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package item
import (
"context"
"fmt"
"time"

"github.com/bacalhau-project/amplify/pkg/dag"
"github.com/bacalhau-project/amplify/pkg/queue"
"github.com/bacalhau-project/amplify/pkg/task"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

var (
Expand All @@ -24,21 +26,23 @@ type QueueRepository interface {
}

type queueRepository struct {
repo ItemStore
tf task.TaskFactory
queue queue.Queue
nodeExecutor dag.NodeExecutor[dag.IOSpec]
repo ItemStore
tf task.TaskFactory
priorityQueue queue.Queue
secondaryQueue queue.Queue
nodeExecutor dag.NodeExecutor[dag.IOSpec]
}

func NewQueueRepository(repo ItemStore, queue queue.Queue, taskFactory task.TaskFactory, nodeExecutor dag.NodeExecutor[dag.IOSpec]) (QueueRepository, error) {
if repo == nil || taskFactory == nil || queue == nil || nodeExecutor == nil {
func NewQueueRepository(repo ItemStore, priorityQueue queue.Queue, secondaryQueue queue.Queue, taskFactory task.TaskFactory, nodeExecutor dag.NodeExecutor[dag.IOSpec]) (QueueRepository, error) {
if repo == nil || taskFactory == nil || secondaryQueue == nil || priorityQueue == nil || nodeExecutor == nil {
return nil, fmt.Errorf("missing dependencies")
}
return &queueRepository{
repo: repo,
tf: taskFactory,
queue: queue,
nodeExecutor: nodeExecutor,
repo: repo,
tf: taskFactory,
priorityQueue: priorityQueue,
secondaryQueue: secondaryQueue,
nodeExecutor: nodeExecutor,
}, nil
}

Expand All @@ -49,7 +53,11 @@ func (r *queueRepository) Create(ctx context.Context, req ItemParams) error {
if req.CID == "" {
return ErrInvalidRequestCID
}
if r.queue.IsFull() {
q := r.secondaryQueue
if req.Priority {
q = r.priorityQueue
}
if q.IsFull() {
return queue.ErrQueueFull
}
err := r.repo.NewItem(ctx, req)
Expand All @@ -61,9 +69,26 @@ func (r *queueRepository) Create(ctx context.Context, req ItemParams) error {
return err
}
for _, node := range dags {
err := r.queue.Enqueue(func(ctx context.Context) {
err := q.Enqueue(func(ctx context.Context) {
// Execute the node
r.nodeExecutor.Execute(ctx, req.ID, node)
// Wait until all the nodes in the dag are completed
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
finished, err := dag.AllNodesFinished(ctx, []dag.Node[dag.IOSpec]{node})
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("error checking if all nodes are finished")
return
}
if finished {
log.Ctx(ctx).Info().Msg("all nodes are finished")
return
}
}
}
})
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/item/queue_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Test_QueueRepository_Create(t *testing.T) {
persistence := db.NewInMemDB()
itemStore := newMockItemStore(persistence)
executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
assert.NilError(t, err)
tests := []struct {
name string
Expand All @@ -41,7 +41,7 @@ func Test_QueueRepository_Get(t *testing.T) {
persistence := db.NewInMemDB()
itemStore := newMockItemStore(persistence)
executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
assert.NilError(t, err)
id := uuid.New()
err = repo.Create(context.Background(), ItemParams{ID: id, CID: "cid"})
Expand Down Expand Up @@ -69,7 +69,7 @@ func Test_QueueRepository_List(t *testing.T) {
persistence := db.NewInMemDB()
itemStore := newMockItemStore(persistence)
executor, _ := dag.NewNodeExecutor[dag.IOSpec](context.Background(), nil)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
repo, err := NewQueueRepository(itemStore, queue.NewMockQueue(), queue.NewMockQueue(), task.NewMockTaskFactory(persistence), executor)
assert.NilError(t, err)
id1 := uuid.New()
err = repo.Create(context.Background(), ItemParams{ID: id1, CID: "cid"})
Expand Down
2 changes: 1 addition & 1 deletion pkg/triggers/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (t *IPFSSearchTrigger) Start(ctx context.Context, cidChan chan cid.Cid) err
for _, c := range cids {
cidChan <- c
}
log.Ctx(ctx).Debug().Msg("Sleeping for 1 minute before fetching IPFS Search again")
log.Ctx(ctx).Debug().Msg("Sleeping before fetching IPFS Search again")
}
}
}
Expand Down

0 comments on commit b921954

Please sign in to comment.