Skip to content

Commit

Permalink
Fix: add validation
Browse files Browse the repository at this point in the history
  • Loading branch information
alonsopec89 committed Oct 16, 2024
1 parent b0bda2f commit 34108ee
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions go/genkit/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,20 @@ func (f *Flow[In, Out, Stream]) start(ctx context.Context, input In, cb streamin
return state, nil
}

func isInputMissing(input any) bool {
if input == nil {
return true
}
v := reflect.ValueOf(input)
switch v.Kind() {
case reflect.Ptr, reflect.Slice, reflect.Map, reflect.Interface, reflect.Chan, reflect.Func:
return v.IsNil()
default:
// For other types like structs, zero value might be a valid input.
return false
}
}

// execute performs one flow execution.
// Using its flowState argument as a starting point, it runs the flow function until
// it finishes or is interrupted.
Expand Down Expand Up @@ -511,13 +525,21 @@ func (f *Flow[In, Out, Stream]) execute(ctx context.Context, state *flowState[In
traceID := rootSpanContext.TraceID().String()
exec.TraceIDs = append(exec.TraceIDs, traceID)
// TODO: Save rootSpanContext in the state.
if reflect.ValueOf(input).IsZero() {
if state == nil || reflect.ValueOf(state.Input).IsZero() {
return base.Zero[Out](), fmt.Errorf("input is missing and cannot be retrieved from state")
if isInputMissing(input) {
if state == nil {
return base.Zero[Out](), errors.New("input is missing and state is nil")
}
if isInputMissing(state.Input) {
return base.Zero[Out](), errors.New("input is missing and state.Input is also empty")
}
input = state.Input
// TODO: convert input to string
//tracing.SetCustomMetadataAttr(ctx, "flow:input", string(input))

// Convert input to JSON string for tracing metadata
bytes, err := json.Marshal(input)
if err != nil {
return base.Zero[Out](), fmt.Errorf("failed to marshal input for tracing: %w", err)
}
tracing.SetCustomMetadataAttr(ctx, "input", string(bytes))
}
start := time.Now()
var err error
Expand Down

0 comments on commit 34108ee

Please sign in to comment.