Skip to content

Commit

Permalink
chore: move wasi-kv example to folder
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Aug 9, 2024
1 parent f079bf1 commit 7c5090b
Show file tree
Hide file tree
Showing 23 changed files with 1,013 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Alternatively, you can use [Quick Install](https://docs.couchbase.com/server/cur
### Running

```shell
cd examples/wasi-keyvalue
WASMCLOUD_SECRETS_TOPIC=wasmcloud.secrets \
wash up -d

Expand Down
File renamed without changes.
44 changes: 44 additions & 0 deletions examples/wasi-keyvalue/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"errors"

"github.com/wasmCloud/provider-sdk-go"
)

type CouchbaseConnectionArgs struct {
Username string
Password string
BucketName string
ConnectionString string
ScopeName string
CollectionName string
}

// Construct Couchbase connection args from config and secrets
func validateCouchbaseConfig(config map[string]string, secrets map[string]provider.SecretValue) (CouchbaseConnectionArgs, error) {
connectionArgs := CouchbaseConnectionArgs{}
if username, ok := config["username"]; !ok || username == "" {
return connectionArgs, errors.New("username config is required")
} else {
connectionArgs.Username = username
}
if bucketName, ok := config["bucketName"]; !ok || bucketName == "" {
return connectionArgs, errors.New("bucketName config is required")
} else {
connectionArgs.BucketName = bucketName
}
if connectionString, ok := config["connectionString"]; !ok || connectionString == "" {
return connectionArgs, errors.New("connectionString config is required")
} else {
connectionArgs.ConnectionString = connectionString
}

password := secrets["password"].String.Reveal()
if password == "" {
return connectionArgs, errors.New("password secret is required")
} else {
connectionArgs.Password = password
}
return connectionArgs, nil
}
47 changes: 47 additions & 0 deletions examples/wasi-keyvalue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module github.com/couchbase-examples/wasmcloud-provider-couchbase

go 1.22.3

require (
github.com/couchbase/gocb/v2 v2.8.1
github.com/wasmCloud/provider-sdk-go v0.0.0-20240724131928-654ff747dffc
github.com/wrpc/wrpc/go v0.0.0-20240619071643-b830439e40d6
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0
)

require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect
)

require (
github.com/couchbase/gocbcore/v10 v10.4.1 // indirect
github.com/couchbase/gocbcoreps v0.1.2 // indirect
github.com/couchbase/goprotostellar v1.0.2 // indirect
github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/nats-io/nats.go v1.36.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.64.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
198 changes: 198 additions & 0 deletions examples/wasi-keyvalue/go.sum

Large diffs are not rendered by default.

153 changes: 153 additions & 0 deletions examples/wasi-keyvalue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//go:generate wit-bindgen-wrpc go --out-dir bindings --package github.com/couchbase-examples/wasmcloud-provider-couchbase/bindings wit

package main

import (
"context"
"errors"
"log"
"os"
"os/signal"
"syscall"
"time"

server "github.com/couchbase-examples/wasmcloud-provider-couchbase/bindings"
"github.com/couchbase/gocb/v2"
"github.com/wasmCloud/provider-sdk-go"
)

func main() {
if err := run(); err != nil {
log.Fatal(err)
}
}

func run() error {
// Handle SIGINT (CTRL+C) gracefully.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

// Set up OpenTelemetry.
otelShutdown, err := setupOTelSDK(ctx)
if err != nil {
return err
}
// Handle shutdown properly so nothing leaks.
defer func() {
err = errors.Join(err, otelShutdown(context.Background()))
}()

// Initialize the provider with callbacks to track linked components
providerHandler := Handler{
linkedFrom: make(map[string]map[string]string),
clusterConnections: make(map[string]*gocb.Collection),
}

p, err := provider.New(
provider.TargetLinkPut(func(link provider.InterfaceLinkDefinition) error {
return handleNewTargetLink(&providerHandler, link)
}),
provider.TargetLinkDel(func(link provider.InterfaceLinkDefinition) error {
return handleDelTargetLink(&providerHandler, link)
}),
provider.HealthCheck(func() string {
return handleHealthCheck(&providerHandler)
}),
provider.Shutdown(func() error {
return handleShutdown(&providerHandler)
}),
)
if err != nil {
return err
}

// Store the provider for use in the handlers
providerHandler.WasmcloudProvider = p

// Setup two channels to await RPC and control interface operations
providerCh := make(chan error, 1)
signalCh := make(chan os.Signal, 1)

// Handle RPC operations
stopFunc, err := server.Serve(p.RPCClient, &providerHandler, &providerHandler)
if err != nil {
p.Shutdown()
return err
}

// Handle control interface operations
go func() {
err := p.Start()
providerCh <- err
}()

// Shutdown on SIGINT
signal.Notify(signalCh, syscall.SIGINT)

// Run provider until either a shutdown is requested or a SIGINT is received
select {
case err = <-providerCh:
stopFunc()
return err
case <-signalCh:
p.Shutdown()
stopFunc()
}
return nil
}

// Provider handler functions
func handleNewTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error {
handler.Logger.Info("Handling new target link", "link", link)
handler.linkedFrom[link.SourceID] = link.TargetConfig
couchbaseConnectionArgs, err := validateCouchbaseConfig(link.TargetConfig, link.TargetSecrets)
if err != nil {
handler.Logger.Error("Invalid couchbase target config", "error", err)
return err
}
handler.updateCouchbaseCluster(handler, link.SourceID, couchbaseConnectionArgs)
return nil
}

func (h *Handler) updateCouchbaseCluster(handler *Handler, sourceId string, connectionArgs CouchbaseConnectionArgs) {
// Connect to the cluster
cluster, err := gocb.Connect(connectionArgs.ConnectionString, gocb.ClusterOptions{
Username: connectionArgs.Username,
Password: connectionArgs.Password,
})
if err != nil {
handler.Logger.Error("unable to connect to couchbase cluster", "error", err)
return
}
var collection *gocb.Collection
if connectionArgs.CollectionName != "" && connectionArgs.ScopeName != "" {
collection = cluster.Bucket(connectionArgs.BucketName).Scope(connectionArgs.ScopeName).Collection(connectionArgs.CollectionName)
} else {
collection = cluster.Bucket(connectionArgs.BucketName).DefaultCollection()
}

bucket := cluster.Bucket(connectionArgs.BucketName)
if err = bucket.WaitUntilReady(5*time.Second, nil); err != nil {
handler.Logger.Error("unable to connect to couchbase bucket", "error", err)
}

// Store the connection
handler.clusterConnections[sourceId] = collection
}

func handleDelTargetLink(handler *Handler, link provider.InterfaceLinkDefinition) error {
handler.Logger.Info("Handling del target link", "link", link)
delete(handler.linkedFrom, link.Target)
return nil
}

func handleHealthCheck(handler *Handler) string {
handler.Logger.Debug("Handling health check")
return "provider healthy"
}

func handleShutdown(handler *Handler) error {
handler.Logger.Info("Handling shutdown")
// clear(handler.linkedFrom)
return nil
}
Loading

0 comments on commit 7c5090b

Please sign in to comment.