diff --git a/pkg/node/node.go b/pkg/node/node.go index 388e874acc..7faf659d1f 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -230,7 +230,7 @@ func NewNode( return nil, pkgerrors.Wrap(err, "failed to create NATS client for node info store") } nodeInfoStore, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{ - BucketName: kvstore.DefaultBucketName, + BucketName: kvstore.BucketNameCurrent, Client: natsClient.Client, }) if err != nil { diff --git a/pkg/routing/kvstore/kvstore.go b/pkg/routing/kvstore/kvstore.go index f288adbb3c..0fc5b3a6da 100644 --- a/pkg/routing/kvstore/kvstore.go +++ b/pkg/routing/kvstore/kvstore.go @@ -17,7 +17,10 @@ import ( ) const ( - DefaultBucketName = "nodes" + // BucketNameCurrent is the bucket name for bacalhau version v1.3.1 and beyond. + BucketNameCurrent = "node_v1" + // BucketNameV0 is the bucket name for bacalhau version v1.3.0 and below. + BucketNameV0 = "nodes" ) type NodeStoreParams struct { @@ -40,6 +43,11 @@ func NewNodeStore(ctx context.Context, params NodeStoreParams) (*NodeStore, erro if bucketName == "" { return nil, pkgerrors.New("bucket name is required") } + + if err := migrateJetStreamBucket(ctx, js, BucketNameV0, bucketName, migrateNodeInfoToNodeState); err != nil { + return nil, err + } + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ Bucket: bucketName, }) diff --git a/pkg/routing/kvstore/migration_test.go b/pkg/routing/kvstore/migration_test.go new file mode 100644 index 0000000000..176f432699 --- /dev/null +++ b/pkg/routing/kvstore/migration_test.go @@ -0,0 +1,144 @@ +//go:build unit || !integration + +package kvstore_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/routing/kvstore" +) + +type KVMigrationSuite struct { + suite.Suite + nats *server.Server + client *nats.Conn + js jetstream.JetStream +} + +func (s *KVMigrationSuite) SetupTest() { + opts := &natsserver.DefaultTestOptions + opts.Port = TEST_PORT + opts.JetStream = true + opts.StoreDir = s.T().TempDir() + + s.nats = natsserver.RunServer(opts) + var err error + s.client, err = nats.Connect(s.nats.Addr().String()) + s.Require().NoError(err) + + s.js, err = jetstream.New(s.client) + s.Require().NoError(err) +} + +func (s *KVMigrationSuite) TearDownTest() { + s.nats.Shutdown() + s.client.Close() +} + +func TestKVMigrationSuite(t *testing.T) { + suite.Run(t, new(KVMigrationSuite)) +} + +func (s *KVMigrationSuite) TestMigrationFromNodeInfoToNodeState() { + ctx := context.Background() + + // Create 'from' bucket and populate it, simulating a requester on v130 with state to migrate. + fromKV, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: kvstore.BucketNameV0}) + s.Require().NoError(err) + + nodeInfos := []models.NodeInfo{ + generateNodeInfo("node1", models.EngineDocker), + generateNodeInfo("node2", models.EngineWasm), + generateNodeInfo("node3", models.EngineDocker, models.EngineWasm), + } + + // populate bucket with models.NodeInfo, these will be migrated to models.NodeState + for _, n := range nodeInfos { + data, err := json.Marshal(n) + s.Require().NoError(err) + _, err = fromKV.Put(ctx, n.ID(), data) + s.Require().NoError(err) + } + + fromBucket := kvstore.BucketNameV0 + toBucket := kvstore.BucketNameCurrent + + // Open a NodeStore to trigger migration + ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{ + BucketName: toBucket, + Client: s.client, + }) + s.Require().NoError(err) + + // Assert the migrated data is correct + for _, ni := range nodeInfos { + ns, err := ns.Get(ctx, ni.ID()) + s.Require().NoError(err) + s.Equal(models.NodeStates.DISCONNECTED, ns.Connection) + s.Equal(models.NodeMembership.PENDING, ns.Membership) + s.Equal(ni, ns.Info) + } + + // Assert the from bucket has been cleaned up + _, err = s.js.KeyValue(ctx, fromBucket) + s.Require().Equal(jetstream.ErrBucketNotFound, err) +} + +func (s *KVMigrationSuite) TestMigrationStoreEmpty() { + ctx := context.Background() + + // Create an empty 'from' bucket + _, err := s.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: kvstore.BucketNameV0}) + s.Require().NoError(err) + + fromBucket := kvstore.BucketNameV0 + toBucket := kvstore.BucketNameCurrent + + // Open a NodeStore to trigger migration, in this case there is a from bucket, but it's empty. + ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{ + BucketName: toBucket, + Client: s.client, + }) + s.Require().NoError(err) + + // Assert the from bucket has been cleaned up + _, err = s.js.KeyValue(ctx, fromBucket) + s.Require().Contains(err.Error(), "bucket not found") + + // Assert that no data was migrated since the from bucket was empty + resp, err := ns.List(ctx) + s.Require().NoError(err) + s.Require().Len(resp, 0) +} + +func (s *KVMigrationSuite) TestMigrationStoreDNE() { + ctx := context.Background() + + fromBucket := kvstore.BucketNameV0 + toBucket := kvstore.BucketNameCurrent + + // Open a NodeStore to trigger migration, in this case there isn't a from bucket to migrate from. + ns, err := kvstore.NewNodeStore(ctx, kvstore.NodeStoreParams{ + BucketName: toBucket, + Client: s.client, + }) + s.Require().NoError(err) + + // Assert the from bucket has been cleaned up + _, err = s.js.KeyValue(ctx, fromBucket) + s.Require().Contains(err.Error(), "bucket not found") + + // Assert that no data was migrated since the from bucket DNE (does not exist) + resp, err := ns.List(ctx) + s.Require().NoError(err) + s.Require().Len(resp, 0) +} diff --git a/pkg/routing/kvstore/migrations.go b/pkg/routing/kvstore/migrations.go new file mode 100644 index 0000000000..ffee0819d9 --- /dev/null +++ b/pkg/routing/kvstore/migrations.go @@ -0,0 +1,114 @@ +package kvstore + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/rs/zerolog/log" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +func migrateNodeInfoToNodeState(entry jetstream.KeyValueEntry) ([]byte, error) { + var nodeinfo models.NodeInfo + if err := json.Unmarshal(entry.Value(), &nodeinfo); err != nil { + return nil, fmt.Errorf("failed to unmarshal node info: %w", err) + } + + nodestate := models.NodeState{ + Info: nodeinfo, + Membership: models.NodeMembership.PENDING, + Connection: models.NodeStates.DISCONNECTED, + } + migratedData, err := json.Marshal(nodestate) + if err != nil { + return nil, fmt.Errorf("failed to marshal node state: %w", err) + } + return migratedData, nil +} + +func migrateJetStreamBucket( + ctx context.Context, + js jetstream.JetStream, + from string, + to string, + migrateFunc func(entry jetstream.KeyValueEntry) ([]byte, error), +) (retErr error) { + defer func() { + if retErr == nil { + if err := js.DeleteKeyValue(ctx, from); err != nil { + if errors.Is(err, jetstream.ErrBucketNotFound) { + // migration is successful since there isn't previous state to migrate from + retErr = nil + } else { + retErr = fmt.Errorf("NodeStore migration succeeded, but failed to remove old bucket: %w", err) + } + } + } + }() + + fromKV, err := js.KeyValue(ctx, from) + if err != nil { + if errors.Is(err, jetstream.ErrBucketNotFound) { + // migration is successful since there isn't previous state to migrate from + return nil + } + return fmt.Errorf("NodeStore migration failed: failed to open 'from' bucket: %w", err) + } + + keys, err := fromKV.Keys(ctx) + if err != nil { + if errors.Is(err, jetstream.ErrNoKeysFound) { + // if the store is empty the migration is successful as there isn't anything to migrate + return nil + } + return fmt.Errorf("NodeStore migration failed: failed to list store: %w", err) + } + + start := time.Now() + log.Info().Str("from_bucket", from).Str("to_bucket", to).Msgf("Begin NodeStore migration") + toKV, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: to, + }) + if err != nil { + return fmt.Errorf("NodeStore migration failed: failed to open to bucket: %w", err) + } + + for _, key := range keys { + // Check if the key exists in the 'to' bucket + _, err := toKV.Get(ctx, key) + if err == nil { + // Key already exists in the 'to' bucket, skip to the next key + continue + } + if !errors.Is(err, jetstream.ErrKeyNotFound) { + // An unexpected error occurred while checking the key in the 'to' bucket + return fmt.Errorf("NodeStore migration failed: failed to check key in 'to' bucket: %w", err) + } + + // Read the entry from the 'from' bucket + entry, err := fromKV.Get(ctx, key) + if err != nil { + return fmt.Errorf("NodeStore migration failed: failed to read entry with key: %s: %w", key, err) + } + + // Apply the migration function + migratedData, err := migrateFunc(entry) + if err != nil { + return fmt.Errorf("NodeStore migration failed: %w", err) + } + + // Write the migrated data to the 'to' bucket + if _, err := toKV.Put(ctx, key, migratedData); err != nil { + return fmt.Errorf("NodeStore migration failed: failed to write migrated data to store: %w", err) + } + } + + log.Info().Str("from_bucket", from).Str("to_bucket", to).Str("duration", time.Since(start).String()). + Msgf("Completed NodeStore migration") + return nil +}