Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support typed search attributes #599

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregatedpool

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -304,6 +305,138 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

case *internal.UpsertWorkflowTypedSearchAttributes:
wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID))
var sau []temporal.SearchAttributeUpdate

for k, v := range command.SearchAttributes {
switch v.Type {
case internal.BoolType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueUnset())
continue
}
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(bool); ok {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.FloatType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(float64); ok {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.IntType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(int); ok {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt)))
} else {
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordListType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.([]string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.StringType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.DatetimeType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueUnset())
continue
}

if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
tm, err := time.Parse(time.RFC3339, tt)
if err != nil {
return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err))
}

sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm))
} else {
wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}
}
}

err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...))
if err != nil {
return errors.E(op, err)
}

case *internal.SignalExternalWorkflow:
wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID))
wp.env.SignalExternalWorkflow(
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/temporalio/roadrunner-temporal/v5

go 1.23
go 1.23.2

toolchain go1.23.2
toolchain go1.23.4

require (
github.com/goccy/go-json v0.10.4
Expand All @@ -17,9 +17,9 @@ require (
go.temporal.io/api v1.43.0
go.temporal.io/sdk v1.31.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.25.2
go.temporal.io/server v1.26.2
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.36.0
google.golang.org/protobuf v1.36.1
)

require (
Expand All @@ -39,7 +39,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.7.0-rc.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nexus-rpc/sdk-go v0.1.0 // indirect
Expand Down Expand Up @@ -67,8 +67,8 @@ require (
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241219192143-6b3ec007d9bb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241223144023-3abc09e42ca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
google.golang.org/grpc v1.69.2
gopkg.in/yaml.v3 v3.0.1 // indirect
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 h1:TmHmbvxPmaegwhDubVz0lICL0J5Ka2vwTzhoePEXsGE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0/go.mod h1:qztMSjm835F2bXf+5HKAPIS5qsmQDqZna/PgVt4rWtI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -247,8 +247,8 @@ go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
go.temporal.io/sdk/contrib/tally v0.2.0 h1:XnTJIQcjOv+WuCJ1u8Ve2nq+s2H4i/fys34MnWDRrOo=
go.temporal.io/sdk/contrib/tally v0.2.0/go.mod h1:1kpSuCms/tHeJQDPuuKkaBsMqfHnIIRnCtUYlPNXxuE=
go.temporal.io/server v1.25.2 h1:AVlFfNStYVvgyIG/QlfPsp1RgiP+IBuMnEX/kIYAHJ4=
go.temporal.io/server v1.25.2/go.mod h1:3BzDvfiMwi9ETiv4UoE1LWhuIctR6Ep+G1RVomH9YZE=
go.temporal.io/server v1.26.2 h1:vDW11lxslYPlGDbQklWi/tqbkVZ2ExtRO1jNjvZmUUI=
go.temporal.io/server v1.26.2/go.mod h1:tgY+4z/PuIdqs6ouV1bT90RWSWfEioWkzmrNrLYLUrk=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -377,10 +377,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto/googleapis/api v0.0.0-20241219192143-6b3ec007d9bb h1:B7GIB7sr443wZ/EAEl7VZjmh1V6qzkt5V+RYcUYtS1U=
google.golang.org/genproto/googleapis/api v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:E5//3O5ZIG2l71Xnt+P/CYUY8Bxs8E7WMoZ9tlcMbAY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb h1:3oy2tynMOP1QbTC0MsNNAV+Se8M2Bd0A5+x1QHyw+pI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/genproto/googleapis/api v0.0.0-20241223144023-3abc09e42ca8 h1:st3LcW/BPi75W4q1jJTEor/QWwbNlPlDG0JTn6XhZu0=
google.golang.org/genproto/googleapis/api v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:klhJGKFyG8Tn50enBn7gizg4nXGXJ+jqEREdCWaPcV4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -404,8 +404,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions go.work
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
go 1.23
go 1.23.2

toolchain go1.23.0
toolchain go1.23.4

use (
.
Expand Down
Loading
Loading