fix(analytics): fix content parsing
philwinder committed Apr 26, 2023
1 parent 95aaea4 commit 8340f48
Showing 12 changed files with 242 additions and 88 deletions.
9 changes: 6 additions & 3 deletions cmd/run/run.go
@@ -18,7 +18,7 @@ import (
 )
 
 const (
-	defaultNumWorkers = 10
+	defaultNumWorkers = 10
 	defaultMaxQueueSize = 1024
 )

@@ -59,7 +59,6 @@ func createRunCommand(appContext cli.AppContext) runEFunc {
 	jobQueue.Start()
 	defer jobQueue.Stop()
 
-	// Task Factory
 	nodeFactory, err := dag.NewNodeStore(ctx, db.NewInMemDB(), dag.NewInMemWorkRepository[dag.IOSpec]())
 	if err != nil {
 		return err
@@ -68,13 +67,17 @@ func createRunCommand(appContext cli.AppContext) runEFunc {
 	if err != nil {
 		return err
 	}
+	nodeExecutor, err := dag.NewNodeExecutor[dag.IOSpec](ctx, nil)
+	if err != nil {
+		return err
+	}
 
 	rootNodes, err := taskFactory.CreateTask(ctx, uuid.New(), args[0])
 	if err != nil {
 		return err
 	}
 	for _, rootNode := range rootNodes {
-		dag.Execute(ctx, rootNode)
+		nodeExecutor.Execute(ctx, uuid.New(), rootNode)
 	}
 	cancelFunc()
 	results := util.Dedup(api.GetLeafOutputs(ctx, rootNodes))
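Note the nil analytics argument above: the CLI path runs without analytics, and the executor only parses stdout when a repository is present (see the e.analytics != nil guard in pkg/dag/node.go below). A minimal, self-contained sketch of that optional-dependency pattern, using illustrative stand-in names (Sink, executor) rather than the repo's types:

package main

import (
	"context"
	"fmt"
)

// Sink is an illustrative stand-in for analytics.AnalyticsRepository.
type Sink interface {
	ParseAndStore(ctx context.Context, id string, stdout string) error
}

type executor struct {
	analytics Sink // nil means analytics is disabled
}

func (e *executor) execute(ctx context.Context, id, stdout string) error {
	// ... run the node's work and capture stdout ...
	if e.analytics != nil { // the CLI passes nil; the server passes a real repository
		return e.analytics.ParseAndStore(ctx, id, stdout)
	}
	return nil
}

func main() {
	e := &executor{} // CLI-style wiring: no analytics sink
	err := e.execute(context.Background(), "exec-1", `{"content-type": "image/png"}`)
	fmt.Println(err) // <nil>
}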
14 changes: 10 additions & 4 deletions cmd/serve.go
@@ -113,14 +113,20 @@ func executeServeCommand(appContext cli.AppContext) runEFunc {
 		return err
 	}
 
-	// QueueRepository interacts with the queue
-	queueRepository, err := item.NewQueueRepository(itemStore, dagQueue, taskFactory)
+	// AnalyticsRepository manages amplify analytics
+	analyticsRepository := analytics.NewAnalyticsRepository(persistenceImpl.(db.Analytics))
+
+	// NodeExecutor is responsible for executing nodes
+	nodeExecutor, err := dag.NewNodeExecutor[dag.IOSpec](ctx, analyticsRepository)
 	if err != nil {
 		return err
 	}
 
-	// AnalyticsRepository manages amplify analytics
-	analyticsRepository := analytics.NewAnalyticsRepository(persistenceImpl.(db.Analytics))
+	// QueueRepository interacts with the queue
+	queueRepository, err := item.NewQueueRepository(itemStore, dagQueue, taskFactory, nodeExecutor)
+	if err != nil {
+		return err
+	}
 
 	// AmplifyAPI provides the REST API
 	amplifyAPI, err := api.NewAmplifyAPI(queueRepository, taskFactory, analyticsRepository)
41 changes: 41 additions & 0 deletions pkg/analytics/analytics.go
@@ -1,11 +1,17 @@
 package analytics
 
 import (
+	"bufio"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
+	"strings"
 
 	"github.com/bacalhau-project/amplify/pkg/db"
+	"github.com/google/uuid"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
 )
 
 var (
@@ -17,8 +23,10 @@ var (
 type analyticsRepository struct {
 	database db.Analytics
 }
+
 type AnalyticsRepository interface {
 	QueryTopResultsByKey(ctx context.Context, params QueryTopResultsByKeyParams) (map[string]interface{}, error)
+	ParseAndStore(context.Context, uuid.UUID, string) error
 }
 
 func NewAnalyticsRepository(d db.Analytics) AnalyticsRepository {
@@ -52,3 +60,36 @@ func (r *analyticsRepository) QueryTopResultsByKey(ctx context.Context, params QueryTopResultsByKeyParams) (map[string]interface{}, error) {
 	}
 	return results, nil
 }
+
+func (r *analyticsRepository) ParseAndStore(ctx context.Context, id uuid.UUID, result string) error {
+	log.Ctx(ctx).Trace().Msgf("execution %s parsing and storing results: %s", id.String(), result)
+
+	f := strings.NewReader(result)
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		line := scanner.Text()
+		dec := json.NewDecoder(strings.NewReader(line))
+		for {
+			var resultMap map[string]string
+			if err := dec.Decode(&resultMap); err != nil {
+				if err == io.EOF {
+					break
+				}
+				break
+			}
+			for k, v := range resultMap {
+				log.Ctx(ctx).Trace().Msgf("execution %s storing result metadata: %s=%s", id.String(), k, v)
+				err := r.database.CreateResultMetadata(ctx, db.CreateResultMetadataParams{
+					QueueItemID: id,
+					Type:        k,
+					Value:       v,
+				})
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
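ParseAndStore leans on two standard-library behaviors: bufio.Scanner splits the job's stdout into lines, and json.Decoder pulls any number of concatenated JSON objects off a single line, stopping at the first byte that is not valid JSON. A self-contained sketch of the same loop over illustrative input (not taken from the repo):

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// Illustrative tool output: noise, one object, then two objects on one line.
	input := "using subdir: /25\n" +
		"{\"content-type\": \"image/png\"}\n" +
		"{\"a\": \"1\"} {\"b\": \"2\"}"

	scanner := bufio.NewScanner(strings.NewReader(input))
	for scanner.Scan() {
		dec := json.NewDecoder(strings.NewReader(scanner.Text()))
		for {
			var m map[string]string
			if err := dec.Decode(&m); err != nil {
				break // io.EOF or non-JSON noise: move on to the next line
			}
			fmt.Println(m) // each decoded object would be stored as metadata
		}
	}
}

Decoding a line stops at the first error, so a JSON object buried after noise on the same line is skipped, while an object on its own line is still recovered; the random_crap test cases below exercise exactly this.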
105 changes: 105 additions & 0 deletions pkg/analytics/analytics_test.go
@@ -0,0 +1,105 @@
+package analytics
+
+import (
+	"context"
+	"os"
+	"testing"
+
+	"github.com/bacalhau-project/amplify/pkg/db"
+	"github.com/google/uuid"
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+	"gotest.tools/assert"
+)
+
+func Test_analyticsRepository_ParseAndStore(t *testing.T) {
+	log.Logger = zerolog.New(zerolog.ConsoleWriter{
+		Out:     os.Stderr,
+		NoColor: true,
+		PartsExclude: []string{
+			zerolog.TimestampFieldName,
+		},
+	})
+	zerolog.SetGlobalLevel(zerolog.TraceLevel)
+
+	type args struct {
+		ctx    context.Context
+		id     uuid.UUID
+		result string
+	}
+	tests := []struct {
+		name           string
+		args           args
+		queryKey       string
+		typeKey        string
+		expectedResult int
+		wantErr        bool
+	}{
+		{
+			"simple",
+			args{ctx: context.Background(), id: uuid.New(), result: `{"content-type": "image/png"}`},
+			"content-type",
+			"image/png",
+			1,
+			false,
+		},
+		{
+			"multiline",
+			args{ctx: context.Background(), id: uuid.New(), result: `{"content-type": "image/png"}
+{"content-type": "image/png"}`},
+			"content-type",
+			"image/png",
+			2,
+			false,
+		},
+		{
+			"sameline",
+			args{ctx: context.Background(), id: uuid.New(), result: `{"content-type": "image/png"} {"content-type": "image/png"}`},
+			"content-type",
+			"image/png",
+			2,
+			false,
+		},
+		{
+			"big",
+			args{ctx: context.Background(), id: uuid.New(), result: `{"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/svg+xml"} {"Content-Type":"image/jpeg"} {"Content-Type":"image/svg+xml"} {"Content-Type":"image/svg+xml"}`},
+			"content-type",
+			"image/svg+xml",
+			3,
+			false,
+		},
+		{
+			"random_crap",
+			args{ctx: context.Background(), id: uuid.New(), result: `sing subdir: /25\nprocessing input_file: /inputs/image\nprocessing input_file: /inputs/image/big\nprocessing input_file: /inputs/image/big/image1.jpg\n-e :\\n\\tdir = \"\"\\n\\tbase = \"image1\"\\n\\text = \"jpg\"\ninput_file: /inputs/image/big/image1.jpg\n`},
+			"",
+			"",
+			0,
+			false,
+		},
+		{
+			"random_crap_with_json_in_middle",
+			args{ctx: context.Background(), id: uuid.New(), result: `using subdir: /25\nprocessing input_file: /inputs/image\nprocessing
+{"Content-Type":"image/jpeg"}
+input_file: /inputs/image/big\nprocessing input_file: /inputs/image/big/image1.jpg\n-e :\\n\\tdir = \"\"\\n\\tbase = \"image1\"\\n\\text = \"jpg\"\ninput_file: /inputs/image/big/image1.jpg\n`},
+			"Content-Type",
+			"image/jpeg",
+			1,
+			false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			r := &analyticsRepository{
+				database: db.NewInMemDB(),
+			}
+			if err := r.ParseAndStore(tt.args.ctx, tt.args.id, tt.args.result); (err != nil) != tt.wantErr {
+				t.Errorf("analyticsRepository.ParseAndStore() error = %v, wantErr %v", err, tt.wantErr)
+			}
+			if tt.expectedResult > 0 {
+				res, err := r.QueryTopResultsByKey(tt.args.ctx, QueryTopResultsByKeyParams{Key: tt.queryKey, PageSize: 1})
+				assert.NilError(t, err)
+				assert.Equal(t, res[tt.typeKey], int64(tt.expectedResult))
+			}
+		})
+	}
+}
12 changes: 6 additions & 6 deletions pkg/cli/config.go
@@ -10,12 +10,6 @@ import (
 )
 
 func InitializeConfig(cmd *cobra.Command) *config.AppConfig {
-	// Initialize viper
-	_, err := config.InitViper(cmd)
-	if err != nil {
-		log.Fatal().Err(err).Msg("Failed to initialize config")
-	}
-
 	log.Logger = zerolog.New(zerolog.ConsoleWriter{
 		Out:     os.Stderr,
 		NoColor: true,
@@ -24,6 +18,12 @@ func InitializeConfig(cmd *cobra.Command) *config.AppConfig {
 		},
 	})
 
+	// Initialize viper
+	_, err := config.InitViper(cmd)
+	if err != nil {
+		log.Fatal().Err(err).Msg("Failed to initialize config")
+	}
+
 	// Parse final config for log level settings
 	finalConfig := config.ParseAppConfig(cmd)
46 changes: 34 additions & 12 deletions pkg/dag/node.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/bacalhau-project/amplify/pkg/analytics"
 	"github.com/bacalhau-project/amplify/pkg/db"
 	"github.com/bacalhau-project/amplify/pkg/util"
 	"github.com/google/uuid"
@@ -78,16 +79,30 @@ type NodeSpec[T any] struct {
 	Work Work[T]
 }
 
+type NodeExecutor[T any] interface {
+	Execute(ctx context.Context, executionId uuid.UUID, node Node[T])
+}
+
+func NewNodeExecutor[T any](ctx context.Context, analytics analytics.AnalyticsRepository) (NodeExecutor[T], error) {
+	return &nodeExecutor[T]{
+		analytics: analytics,
+	}, nil
+}
+
+type nodeExecutor[T any] struct {
+	analytics analytics.AnalyticsRepository
+}
+
 // Execute a given node
-func Execute[T any](ctx context.Context, node Node[T]) {
+func (e *nodeExecutor[T]) Execute(ctx context.Context, executionId uuid.UUID, node Node[T]) {
 	if node == nil {
-		log.Ctx(ctx).Error().Msg("node is nil")
+		log.Ctx(ctx).Error().Str("executionId", executionId.String()).Msg("node is nil")
 		return
 	}
 	// Get a copy of the node representation
 	n, err := node.Get(ctx)
 	if err != nil {
-		log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error getting node")
+		log.Ctx(ctx).Error().Str("executionId", executionId.String()).Err(err).Int32("id", n.Id).Msg("error getting node")
 		return
 	}
 
@@ -96,7 +111,7 @@ func Execute[T any](ctx context.Context, node Node[T]) {
 	for _, parent := range n.Parents {
 		p, err := parent.Get(ctx)
 		if err != nil {
-			log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error getting parent")
+			log.Ctx(ctx).Error().Str("executionId", executionId.String()).Err(err).Int32("id", n.Id).Msg("error getting parent")
 			return
 		}
 		if p.Metadata.EndedAt.IsZero() {
@@ -105,32 +120,39 @@ func Execute[T any](ctx context.Context, node Node[T]) {
 		}
 	}
 	if !ready {
-		log.Ctx(ctx).Debug().Int32("id", n.Id).Msg("parent not ready, waiting")
+		log.Ctx(ctx).Debug().Str("executionId", executionId.String()).Int32("id", n.Id).Msg("parent not ready, waiting")
 		return
 	}
 	// Check if this node is/has already been executed
 	if !n.Metadata.StartedAt.IsZero() {
-		log.Ctx(ctx).Debug().Int32("id", n.Id).Msg("already started, skipping")
+		log.Ctx(ctx).Debug().Str("executionId", executionId.String()).Int32("id", n.Id).Msg("already started, skipping")
 		return
 	}
 	n, err = updateNodeStartTime(ctx, node) // Set the start time
 	if err != nil {
-		log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error updating node start time")
+		log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error updating node start time")
 		return
 	}
 	resultChan := make(chan NodeResult, 10)      // Channel to receive status updates, must close once complete
 	outputs := n.Work(ctx, n.Inputs, resultChan) // Do the work
 	for status := range resultChan {             // Block waiting for the status
 		err = node.SetResults(ctx, status)
 		if err != nil {
-			log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error setting node results")
+			log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error setting node results")
 			return
 		}
+		if e.analytics != nil {
+			err = e.analytics.ParseAndStore(ctx, executionId, status.StdOut)
+			if err != nil {
+				log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error parsing and storing analytics")
+				return
+			}
+		}
 	}
 	for _, output := range outputs { // Add the outputs to the node
 		err = node.AddOutput(ctx, output)
 		if err != nil {
-			log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error adding output")
+			log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error adding output")
 			return
 		}
 	}
@@ -141,20 +163,20 @@ func Execute[T any](ctx context.Context, node Node[T]) {
 		for _, output := range outputs { // Add the outputs to the node
 			err = child.AddInput(ctx, output)
 			if err != nil {
-				log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error adding inputs to child")
+				log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error adding inputs to child")
 				return
 			}
 		}
 	}
 	n, err = updateNodeEndTime(ctx, node) // Set the end time
 	if err != nil {
-		log.Ctx(ctx).Error().Err(err).Int32("id", n.Id).Msg("error updating node end time")
+		log.Ctx(ctx).Error().Err(err).Str("executionId", executionId.String()).Int32("id", n.Id).Msg("error updating node end time")
 		return
 	}
 
 	// Execute all children
 	for _, child := range n.Children {
-		Execute(ctx, child)
+		e.Execute(ctx, executionId, child)
 	}
 }

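The refactor keeps the control flow of Execute intact: a node waits until every parent has finished, runs at most once, and then recursively tries its children. A stripped-down, stdlib-only sketch of that traversal (no generics, channels, logging, or persistence; illustrative only):

package main

import "fmt"

type node struct {
	id       int
	parents  []*node
	children []*node
	started  bool
	done     bool
}

func execute(n *node) {
	for _, p := range n.parents {
		if !p.done {
			return // a parent is not ready; a later parent completion retries this node
		}
	}
	if n.started {
		return // already started, skipping (mirrors the StartedAt check)
	}
	n.started = true
	fmt.Printf("running node %d\n", n.id)
	n.done = true // the real executor records start/end times in the node store
	for _, c := range n.children {
		execute(c)
	}
}

func main() {
	a := &node{id: 1}
	b := &node{id: 2, parents: []*node{a}}
	c := &node{id: 3, parents: []*node{a, b}}
	a.children = []*node{b, c}
	b.children = []*node{c}
	execute(a) // prints nodes 1, 2, 3; node 3 runs only after both parents finish
}

In the real executor the started and done state lives in the node store, and each parent re-invokes Execute on its children as it completes, which is how a node with several parents eventually passes the readiness check.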