From bb5e7a532afdc5b5ea76595ecf1ad2d270fc8e9a Mon Sep 17 00:00:00 2001 From: Zohaib Date: Mon, 24 Jul 2023 10:59:57 +0300 Subject: [PATCH] Adding locking mechanism for snapshots to avoid race conditions This PR adds lease-based locking that will allow only one of the instances to create and upload a snapshot to the configured storage. --- cfg/config.go | 1 + logstream/replicator.go | 27 +++++++ logstream/replicator_meta_store.go | 126 +++++++++++++++++++++++++++++ marmot.go | 6 +- stream/embedded_nats.go | 2 +- 5 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 logstream/replicator_meta_store.go diff --git a/cfg/config.go b/cfg/config.go index a4fc55f..9a1ae5f 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -17,6 +17,7 @@ import ( type SnapshotStoreType string const NodeNamePrefix = "marmot-node" +const EmbeddedClusterName = "e-marmot" const ( Nats SnapshotStoreType = "nats" S3 = "s3" diff --git a/logstream/replicator.go b/logstream/replicator.go index 0a59b86..c671140 100644 --- a/logstream/replicator.go +++ b/logstream/replicator.go @@ -17,6 +17,8 @@ import ( const maxReplicateRetries = 7 const SnapshotShardID = uint64(1) +var SnapshotLeaseTTL = 10 * time.Second + type Replicator struct { nodeID uint64 shards uint64 @@ -25,6 +27,7 @@ type Replicator struct { client *nats.Conn repState *replicationState + metaStore *replicatorMetaStore snapshot snapshot.NatsSnapshot streamMap map[uint64]nats.JetStreamContext } @@ -85,6 +88,11 @@ func NewReplicator( return nil, err } + metaStore, err := newReplicatorMetaStore(cfg.EmbeddedClusterName, nc) + if err != nil { + return nil, err + } + return &Replicator{ client: nc, nodeID: nodeID, @@ -95,6 +103,7 @@ func NewReplicator( streamMap: streamMap, snapshot: snapshot, repState: repState, + metaStore: metaStore, }, nil } @@ -221,6 +230,24 @@ func (r *Replicator) LastSaveSnapshotTime() time.Time { } func (r *Replicator) SaveSnapshot() { + ctx, cancel := context.WithCancel(context.Background()) + defer 
cancel() + + locked, err := r.metaStore.ContextRefreshingLease("snapshot", SnapshotLeaseTTL, ctx) + if err != nil { + log.Warn().Err(err).Msg("Error acquiring snapshot lock") + return + } + + if !locked { + log.Info().Msg("Snapshot saving already locked, skipping") + return + } + + r.ForceSaveSnapshot() +} + +func (r *Replicator) ForceSaveSnapshot() { if r.snapshot == nil { return } diff --git a/logstream/replicator_meta_store.go b/logstream/replicator_meta_store.go new file mode 100644 index 0000000..866720d --- /dev/null +++ b/logstream/replicator_meta_store.go @@ -0,0 +1,126 @@ +package logstream + +import ( + "context" + "time" + + "github.com/fxamacker/cbor/v2" + "github.com/maxpert/marmot/cfg" + "github.com/nats-io/nats.go" + "github.com/rs/zerolog/log" +) + +type replicatorMetaStore struct { + nats.KeyValue +} + +type replicatorLockInfo struct { + NodeID uint64 + Timestamp int64 +} + +func newReplicatorMetaStore(name string, nc *nats.Conn) (*replicatorMetaStore, error) { + jsx, err := nc.JetStream() + if err != nil { + return nil, err + } + + kv, err := jsx.KeyValue(name) + if err == nats.ErrBucketNotFound { + kv, err = jsx.CreateKeyValue(&nats.KeyValueConfig{ + Storage: nats.FileStorage, + Bucket: name, + Replicas: cfg.Config.ReplicationLog.Replicas, + }) + } + + if err != nil { + return nil, err + } + + return &replicatorMetaStore{KeyValue: kv}, nil +} + +func (m *replicatorMetaStore) AcquireLease(name string, duration time.Duration) (bool, error) { + now := time.Now().UnixMilli() + info := &replicatorLockInfo{ + NodeID: cfg.Config.NodeID, + Timestamp: now, + } + payload, err := info.Serialize() + if err != nil { + return false, err + } + + entry, err := m.Get(name) + if err == nats.ErrKeyNotFound { + rev := uint64(0) + rev, err = m.Create(name, payload) + if rev != 0 && err == nil { + return true, nil + } + } + + if err != nil { + return false, err + } + + err = info.DeserializeFrom(entry.Value()) + if err != nil { + return false, err + } + + if 
info.NodeID != cfg.Config.NodeID && info.Timestamp+duration.Milliseconds() > now { + return false, err + } + + _, err = m.Update(name, payload, entry.Revision()) + if err != nil { + return false, err + } + + return true, nil +} + +func (m *replicatorMetaStore) ContextRefreshingLease( + name string, + duration time.Duration, + ctx context.Context, +) (bool, error) { + locked, err := m.AcquireLease(name, duration) + go func(locked bool, err error) { + if !locked || err != nil { + return + } + + refresh := time.NewTicker(duration / 2) + for { + locked, err = m.AcquireLease(name, duration) + if err != nil { + log.Warn().Err(err).Str("name", name).Msg("Error acquiring lease") + return + } else if !locked { + log.Warn().Str("name", name).Msg("Unable to acquire lease") + return + } + + refresh.Reset(duration / 2) + select { + case <-refresh.C: + continue + case <-ctx.Done(): + return + } + } + }(locked, err) + + return locked, err +} + +func (r *replicatorLockInfo) Serialize() ([]byte, error) { + return cbor.Marshal(r) +} + +func (r *replicatorLockInfo) DeserializeFrom(data []byte) error { + return cbor.Unmarshal(data, r) +} diff --git a/marmot.go b/marmot.go index 78e63d7..a3f8ab9 100644 --- a/marmot.go +++ b/marmot.go @@ -77,7 +77,7 @@ func main() { } if *cfg.SaveSnapshotFlag { - replicator.SaveSnapshot() + replicator.ForceSaveSnapshot() return } @@ -144,7 +144,7 @@ func main() { log.Info(). Time("last_snapshot", lastSnapshotTime). Dur("duration", now.Sub(lastSnapshotTime)). 
- Msg("Saving timer based snapshot") + Msg("Triggering timer based snapshot save") replicator.SaveSnapshot() } } @@ -153,7 +153,7 @@ func main() { ctxSt.Cancel() if cfg.Config.Snapshot.Enable && cfg.Config.Publish { log.Info().Msg("Saving snapshot before going to sleep") - replicator.SaveSnapshot() + replicator.ForceSaveSnapshot() } os.Exit(0) diff --git a/stream/embedded_nats.go b/stream/embedded_nats.go index 2a71742..95ddddf 100644 --- a/stream/embedded_nats.go +++ b/stream/embedded_nats.go @@ -54,7 +54,7 @@ func startEmbeddedServer(nodeName string) (*embeddedNats, error) { JetStreamMaxMemory: -1, JetStreamMaxStore: -1, Cluster: server.ClusterOpts{ - Name: "e-marmot", + Name: cfg.EmbeddedClusterName, }, LeafNode: server.LeafNodeOpts{}, }