Skip to content

Commit

Permalink
fix: implement NodeStore migration (#4029)
Browse files Browse the repository at this point in the history
- fixes #4024

---------

Co-authored-by: frrist <[email protected]>
  • Loading branch information
frrist and frrist authored May 23, 2024
1 parent fa27190 commit 6917674
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func NewNode(
return nil, pkgerrors.Wrap(err, "failed to create NATS client for node info store")
}
nodeInfoStore, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
BucketName: kvstore.DefaultBucketName,
BucketName: kvstore.BucketNameCurrent,
Client: natsClient.Client,
})
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion pkg/routing/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
)

const (
DefaultBucketName = "nodes"
// BucketNameCurrent is the bucket name for bacalhau version v1.3.1 and beyond.
BucketNameCurrent = "node_v1"
// BucketNameV0 is the bucket name for bacalhau version v1.3.0 and below.
BucketNameV0 = "nodes"
)

type NodeStoreParams struct {
Expand All @@ -40,6 +43,11 @@ func NewNodeStore(ctx context.Context, params NodeStoreParams) (*NodeStore, erro
if bucketName == "" {
return nil, pkgerrors.New("bucket name is required")
}

if err := migrateJetStreamBucket(ctx, js, BucketNameV0, bucketName, migrateNodeInfoToNodeState); err != nil {
return nil, err
}

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: bucketName,
})
Expand Down
144 changes: 144 additions & 0 deletions pkg/routing/kvstore/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//go:build unit || !integration

package kvstore_test

import (
"context"
"encoding/json"
"testing"

"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/suite"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/routing/kvstore"
)

type KVMigrationSuite struct {
suite.Suite
nats *server.Server
client *nats.Conn
js jetstream.JetStream
}

func (s *KVMigrationSuite) SetupTest() {
opts := &natsserver.DefaultTestOptions
opts.Port = TEST_PORT
opts.JetStream = true
opts.StoreDir = s.T().TempDir()

s.nats = natsserver.RunServer(opts)
var err error
s.client, err = nats.Connect(s.nats.Addr().String())
s.Require().NoError(err)

s.js, err = jetstream.New(s.client)
s.Require().NoError(err)
}

func (s *KVMigrationSuite) TearDownTest() {
s.nats.Shutdown()
s.client.Close()
}

func TestKVMigrationSuite(t *testing.T) {
suite.Run(t, new(KVMigrationSuite))
}

func (s *KVMigrationSuite) TestMigrationFromNodeInfoToNodeState() {
ctx := context.Background()

// Create 'from' bucket and populate it, simulating a requester on v130 with state to migrate.
fromKV, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: kvstore.BucketNameV0})
s.Require().NoError(err)

nodeInfos := []models.NodeInfo{
generateNodeInfo("node1", models.EngineDocker),
generateNodeInfo("node2", models.EngineWasm),
generateNodeInfo("node3", models.EngineDocker, models.EngineWasm),
}

// populate bucket with models.NodeInfo, these will be migrated to models.NodeState
for _, n := range nodeInfos {
data, err := json.Marshal(n)
s.Require().NoError(err)
_, err = fromKV.Put(ctx, n.ID(), data)
s.Require().NoError(err)
}

fromBucket := kvstore.BucketNameV0
toBucket := kvstore.BucketNameCurrent

// Open a NodeStore to trigger migration
ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
BucketName: toBucket,
Client: s.client,
})
s.Require().NoError(err)

// Assert the migrated data is correct
for _, ni := range nodeInfos {
ns, err := ns.Get(ctx, ni.ID())
s.Require().NoError(err)
s.Equal(models.NodeStates.DISCONNECTED, ns.Connection)
s.Equal(models.NodeMembership.PENDING, ns.Membership)
s.Equal(ni, ns.Info)
}

// Assert the from bucket has been cleaned up
_, err = s.js.KeyValue(ctx, fromBucket)
s.Require().Equal(jetstream.ErrBucketNotFound, err)
}

func (s *KVMigrationSuite) TestMigrationStoreEmpty() {
ctx := context.Background()

// Create an empty 'from' bucket
_, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: kvstore.BucketNameV0})
s.Require().NoError(err)

fromBucket := kvstore.BucketNameV0
toBucket := kvstore.BucketNameCurrent

// Open a NodeStore to trigger migration, in this case there is a from bucket, but it's empty.
ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
BucketName: toBucket,
Client: s.client,
})
s.Require().NoError(err)

// Assert the from bucket has been cleaned up
_, err = s.js.KeyValue(ctx, fromBucket)
s.Require().Contains(err.Error(), "bucket not found")

// Assert that no data was migrated since the from bucket was empty
resp, err := ns.List(ctx)
s.Require().NoError(err)
s.Require().Len(resp, 0)
}

func (s *KVMigrationSuite) TestMigrationStoreDNE() {
ctx := context.Background()

fromBucket := kvstore.BucketNameV0
toBucket := kvstore.BucketNameCurrent

// Open a NodeStore to trigger migration, in this case there isn't a from bucket to migrate from.
ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{
BucketName: toBucket,
Client: s.client,
})
s.Require().NoError(err)

// Assert the from bucket has been cleaned up
_, err = s.js.KeyValue(ctx, fromBucket)
s.Require().Contains(err.Error(), "bucket not found")

// Assert that no data was migrated since the from bucket DNE (does not exist)
resp, err := ns.List(ctx)
s.Require().NoError(err)
s.Require().Len(resp, 0)
}
114 changes: 114 additions & 0 deletions pkg/routing/kvstore/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package kvstore

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

func migrateNodeInfoToNodeState(entry jetstream.KeyValueEntry) ([]byte, error) {
var nodeinfo models.NodeInfo
if err := json.Unmarshal(entry.Value(), &nodeinfo); err != nil {
return nil, fmt.Errorf("failed to unmarshal node info: %w", err)
}

nodestate := models.NodeState{
Info: nodeinfo,
Membership: models.NodeMembership.PENDING,
Connection: models.NodeStates.DISCONNECTED,
}
migratedData, err := json.Marshal(nodestate)
if err != nil {
return nil, fmt.Errorf("failed to marshal node state: %w", err)
}
return migratedData, nil
}

func migrateJetStreamBucket(
ctx context.Context,
js jetstream.JetStream,
from string,
to string,
migrateFunc func(entry jetstream.KeyValueEntry) ([]byte, error),
) (retErr error) {
defer func() {
if retErr == nil {
if err := js.DeleteKeyValue(ctx, from); err != nil {
if errors.Is(err, jetstream.ErrBucketNotFound) {
// migration is successful since there isn't previous state to migrate from
retErr = nil
} else {
retErr = fmt.Errorf("NodeStore migration succeeded, but failed to remove old bucket: %w", err)
}
}
}
}()

fromKV, err := js.KeyValue(ctx, from)
if err != nil {
if errors.Is(err, jetstream.ErrBucketNotFound) {
// migration is successful since there isn't previous state to migrate from
return nil
}
return fmt.Errorf("NodeStore migration failed: failed to open 'from' bucket: %w", err)
}

keys, err := fromKV.Keys(ctx)
if err != nil {
if errors.Is(err, jetstream.ErrNoKeysFound) {
// if the store is empty the migration is successful as there isn't anything to migrate
return nil
}
return fmt.Errorf("NodeStore migration failed: failed to list store: %w", err)
}

start := time.Now()
log.Info().Str("from_bucket", from).Str("to_bucket", to).Msgf("Begin NodeStore migration")
toKV, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: to,
})
if err != nil {
return fmt.Errorf("NodeStore migration failed: failed to open to bucket: %w", err)
}

for _, key := range keys {
// Check if the key exists in the 'to' bucket
_, err := toKV.Get(ctx, key)
if err == nil {
// Key already exists in the 'to' bucket, skip to the next key
continue
}
if !errors.Is(err, jetstream.ErrKeyNotFound) {
// An unexpected error occurred while checking the key in the 'to' bucket
return fmt.Errorf("NodeStore migration failed: failed to check key in 'to' bucket: %w", err)
}

// Read the entry from the 'from' bucket
entry, err := fromKV.Get(ctx, key)
if err != nil {
return fmt.Errorf("NodeStore migration failed: failed to read entry with key: %s: %w", key, err)
}

// Apply the migration function
migratedData, err := migrateFunc(entry)
if err != nil {
return fmt.Errorf("NodeStore migration failed: %w", err)
}

// Write the migrated data to the 'to' bucket
if _, err := toKV.Put(ctx, key, migratedData); err != nil {
return fmt.Errorf("NodeStore migration failed: failed to write migrated data to store: %w", err)
}
}

log.Info().Str("from_bucket", from).Str("to_bucket", to).Str("duration", time.Since(start).String()).
Msgf("Completed NodeStore migration")
return nil
}

0 comments on commit 6917674

Please sign in to comment.