From 1f0050d085bb90be8c9490e450e72a87b1a0281f Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Thu, 18 Jul 2024 12:42:57 +0200 Subject: [PATCH 1/4] couchbase: Add CAS support --- .../pages/processors/couchbase.adoc | 2 +- internal/impl/couchbase/couchbase.go | 44 +++++++++++++------ internal/impl/couchbase/processor.go | 24 +++++++--- internal/impl/couchbase/processor_test.go | 25 +++++++++++ 4 files changed, 75 insertions(+), 20 deletions(-) diff --git a/docs/modules/components/pages/processors/couchbase.adoc b/docs/modules/components/pages/processors/couchbase.adoc index 649e87b659..703f5bac64 100644 --- a/docs/modules/components/pages/processors/couchbase.adoc +++ b/docs/modules/components/pages/processors/couchbase.adoc @@ -69,7 +69,7 @@ couchbase: -- ====== -When inserting, replacing or upserting documents, each must have the `content` property set. +When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check. == Fields diff --git a/internal/impl/couchbase/couchbase.go b/internal/impl/couchbase/couchbase.go index 52e4b9373b..ab35c6136d 100644 --- a/internal/impl/couchbase/couchbase.go +++ b/internal/impl/couchbase/couchbase.go @@ -20,56 +20,72 @@ import ( "github.com/couchbase/gocb/v2" ) -func valueFromOp(op gocb.BulkOp) (out any, err error) { +func valueFromOp(op gocb.BulkOp) (out any, cas gocb.Cas, err error) { switch o := op.(type) { case *gocb.GetOp: if o.Err != nil { - return nil, o.Err + return nil, gocb.Cas(0), o.Err } err := o.Result.Content(&out) - return out, err + + return out, o.Result.Cas(), err case *gocb.InsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.RemoveOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.ReplaceOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.UpsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err } - return nil, errors.New("type not supported") + return nil, gocb.Cas(0), errors.New("type not supported") } -func get(key string, _ []byte) gocb.BulkOp { +func get(key string, _ []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.GetOp{ ID: key, } } -func insert(key string, data []byte) gocb.BulkOp { +func insert(key string, data []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.InsertOp{ ID: key, Value: data, } } -func remove(key string, _ []byte) gocb.BulkOp { +func remove(key string, _ []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.RemoveOp{ - ID: key, + ID: key, + Cas: cas, } } -func replace(key string, data []byte) gocb.BulkOp { +func replace(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.ReplaceOp{ ID: key, Value: data, + Cas: cas, } } -func upsert(key string, data []byte) gocb.BulkOp { +func upsert(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.UpsertOp{ ID: key, Value: data, + Cas: cas, } } diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 2e4e508e94..141f68296a 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -27,6 +27,11 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/couchbase/client" ) +const ( + // MetaCASKey hold CAS of entry. + MetaCASKey = "couchbase_cas" +) + var ( // ErrInvalidOperation specified operation is not supported. ErrInvalidOperation = errors.New("invalid operation") @@ -41,7 +46,7 @@ func ProcessorConfig() *service.ConfigSpec { Version("4.11.0"). Categories("Integration"). Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads."). - Description("When inserting, replacing or upserting documents, each must have the `content` property set."). + Description("When inserting, replacing or upserting documents, each must have the `content` property set. CAS value is stored in meta `couchbase_cas`. It prevent read/write conflict by only allowing write if not modified by other. You can clear the value with `meta couchbase_cas = deleted()` to disable this check."). Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)). Field(service.NewBloblangField("content").Description("Document content.").Optional()). Field(service.NewStringAnnotatedEnumField("operation", map[string]string{ @@ -73,7 +78,7 @@ type Processor struct { *couchbaseClient id *service.InterpolatedString content *bloblang.Executor - op func(key string, data []byte) gocb.BulkOp + op func(key string, data []byte, cas gocb.Cas) gocb.BulkOp } // NewProcessor returns a Couchbase processor. @@ -139,7 +144,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } // generate query - for index := range newMsg { + for index, msg := range newMsg { // generate id k, err := inBatch.TryInterpolatedString(index, p.id) if err != nil { @@ -159,7 +164,14 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } } - ops[index] = p.op(k, content) + var cas gocb.Cas // retrieve cas if set + if val, ok := msg.MetaGetMut(MetaCASKey); ok { + if v, ok := val.(gocb.Cas); ok { + cas = v + } + } + + ops[index] = p.op(k, content, cas) } // execute @@ -170,7 +182,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat // set results for index, part := range newMsg { - out, err := valueFromOp(ops[index]) + out, cas, err := valueFromOp(ops[index]) if err != nil { part.SetError(fmt.Errorf("couchbase operator failed: %w", err)) } @@ -180,6 +192,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } else if out != nil { part.SetStructured(out) } + + part.MetaSetMut(MetaCASKey, cas) } return []service.MessageBatch{newMsg}, nil diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index 988bd14890..bfffe58be0 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -196,6 +196,11 @@ operation: 'insert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -222,6 +227,11 @@ operation: 'upsert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -248,6 +258,11 @@ operation: 'replace' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -273,6 +288,11 @@ operation: 'get' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message should contain expected payload. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -298,6 +318,11 @@ operation: 'remove' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) From a4848b4e046128900f28d6f2c421c85f731d8436 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Thu, 18 Jul 2024 12:42:57 +0200 Subject: [PATCH 2/4] add stream test for CAS --- internal/impl/couchbase/processor_test.go | 111 +++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index bfffe58be0..0565147352 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -17,6 +17,7 @@ package couchbase_test import ( "context" "fmt" + "sync" "testing" "time" @@ -28,6 +29,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/service/integration" "github.com/redpanda-data/connect/v4/internal/impl/couchbase" + + _ "github.com/redpanda-data/benthos/v4/public/components/pure" ) func TestProcessorConfigLinting(t *testing.T) { @@ -349,10 +352,116 @@ operation: 'get' assert.Len(t, msgOut[0], 1) // message should contain an error. - assert.Error(t, msgOut[0][0].GetError(), "TODO") + assert.Error(t, msgOut[0][0].GetError()) // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) assert.Equal(t, uid, string(dataOut)) } + +func TestIntegrationCouchbaseStream(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + for _, clearCAS := range []bool{true, false} { + t.Run(fmt.Sprintf("%t", clearCAS), func(t *testing.T) { + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: 'root = this' + operation: 'insert' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + + // upsert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: 'root = this' + operation: 'upsert' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + // remove + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + operation: 'remove' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.NoError(t, outBatches[0][0].GetError()) + }) + } +} From 921013b215cb5f3ac5e080195e5cfca2d864e453 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Thu, 18 Jul 2024 17:17:25 +0200 Subject: [PATCH 3/4] add failing test --- internal/impl/couchbase/cache_test.go | 15 ++-- internal/impl/couchbase/processor_test.go | 101 ++++++++++++++++++++++ 2 files changed, 108 insertions(+), 8 deletions(-) diff --git a/internal/impl/couchbase/cache_test.go b/internal/impl/couchbase/cache_test.go index 8a8d6cbeff..ed47d0c9c8 100644 --- a/internal/impl/couchbase/cache_test.go +++ b/internal/impl/couchbase/cache_test.go @@ -62,13 +62,17 @@ cache_resources: ) } -func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ +func getCluster(ctx context.Context, tb testing.TB, port string) (*gocb.Cluster, error) { + return gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ Authenticator: gocb.PasswordAuthenticator{ Username: username, Password: password, }, }) +} + +func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } @@ -79,12 +83,7 @@ func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error } func createBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, - }, - }) + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index 0565147352..da15fda7be 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -465,3 +465,104 @@ couchbase: }) } } + +func TestIntegrationCouchbaseStreamError(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-error-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'insert' +`, servicePort, bucket, username, password))) + + // upsert adn remove in parallel + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +workflow: + meta_path: "" + branches: + write: + processors: + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'upsert' + remove: + processors: + - sleep: + duration: "1s" + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'remove' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.Error(t, outBatches[0][0].GetError()) +} From a39bb0b3097e322a2529490214866fb184c378a7 Mon Sep 17 00:00:00 2001 From: Antoine GIRARD Date: Thu, 18 Jul 2024 18:25:05 +0200 Subject: [PATCH 4/4] more explicit with replace --- internal/impl/couchbase/processor_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index da15fda7be..b19c7a0e17 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -408,7 +408,7 @@ mapping: | `)) } - // upsert + // replace require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` couchbase: url: 'couchbase://localhost:%s' @@ -417,7 +417,7 @@ couchbase: password: %s id: '${! json("key") }' content: 'root = this' - operation: 'upsert' + operation: 'replace' `, servicePort, bucket, username, password))) if clearCAS { // ignore cas check @@ -523,7 +523,7 @@ workflow: content: | root = this root.at = timestamp_unix_micro() - operation: 'upsert' + operation: 'replace' remove: processors: - sleep: @@ -537,7 +537,7 @@ workflow: content: | root = this root.at = timestamp_unix_micro() - operation: 'remove' + operation: 'replace' `, servicePort, bucket, username, password))) streamOut, err := streamOutBuilder.Build()