diff --git a/.travis.yml b/.travis.yml
index bbac2d4b905..e3c049581d6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,7 +55,7 @@ script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
 deploy:
   provider: script
   cleanup: true
-  script: curl -sL http://git.io/goreleaser | bash
+  script: curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 bash
   on:
     tags: true
     condition: ($TRAVIS_GO_VERSION =~ 1.22) && ($TEST_SUITE = "compile")
diff --git a/server/client.go b/server/client.go
index 7171375e448..5d4b57131d7 100644
--- a/server/client.go
+++ b/server/client.go
@@ -2950,8 +2950,10 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
         return nil, fmt.Errorf(errs)
     }
 
-    // Update our route map here.
-    c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
+    // Update our route map here. But only if we are not a leaf node or a hub leafnode.
+    if c.kind != LEAF || c.isHubLeafNode() {
+        c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
+    }
 
     return &nsub, nil
 }
diff --git a/server/consumer.go b/server/consumer.go
index afb8715488c..a61fd542585 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -711,7 +711,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
     }
 
     mset.mu.RLock()
-    s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
+    s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
     retention := cfg.Retention
     mset.mu.RUnlock()
 
@@ -726,10 +726,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
         return nil, NewJSConsumerConfigRequiredError()
     }
 
-    jsa.usageMu.RLock()
-    selectedLimits, limitsFound := jsa.limits[tierName]
-    jsa.usageMu.RUnlock()
-    if !limitsFound {
+    selectedLimits, _, _, _ := acc.selectLimits(config.replicas(&cfg))
+    if selectedLimits == nil {
         return nil, NewJSNoLimitsError()
     }
 
@@ -737,10 +735,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
     // Make sure we have sane defaults. Do so with the JS lock, otherwise a
     // badly timed meta snapshot can result in a race condition.
     mset.js.mu.Lock()
-    setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
+    setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
     mset.js.mu.Unlock()
 
-    if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
+    if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
         return nil, err
     }
     sampleFreq := 0
diff --git a/server/jetstream.go b/server/jetstream.go
index 2b877ad8625..4ace6731ce2 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -1440,7 +1440,11 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) {
         return false, 0
     }
     jsa.usageMu.RLock()
-    selectedLimits, _, ok := jsa.selectLimits(cfg)
+    var replicas int
+    if cfg != nil {
+        replicas = cfg.Replicas
+    }
+    selectedLimits, _, ok := jsa.selectLimits(replicas)
     jsa.usageMu.RUnlock()
     if !ok {
         return false, 0
@@ -1590,7 +1594,7 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
 func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
     for _, mset := range jsa.streams {
         cfg := &mset.cfg
-        if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
+        if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
             switch cfg.Storage {
             case FileStorage:
                 store += uint64(cfg.MaxBytes)
@@ -1607,7 +1611,7 @@ func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
 func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
     for _, sa := range sas {
         cfg := sa.Config
-        if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
+        if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
             switch cfg.Storage {
             case FileStorage:
                 store += uint64(cfg.MaxBytes)
@@ -1695,17 +1699,29 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
                 stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_)
             }
             for _, sa := range sas {
-                stats.Consumers += len(sa.consumers)
-                if !defaultTier {
-                    tier := tierName(sa.Config)
-                    u, ok := stats.Tiers[tier]
+                if defaultTier {
+                    stats.Consumers += len(sa.consumers)
+                } else {
+                    stats.Streams++
+                    streamTier := tierName(sa.Config.Replicas)
+                    su, ok := stats.Tiers[streamTier]
                     if !ok {
-                        u = JetStreamTier{}
+                        su = JetStreamTier{}
+                    }
+                    su.Streams++
+                    stats.Tiers[streamTier] = su
+
+                    // Now consumers, check each since could be different tiers.
+                    for _, ca := range sa.consumers {
+                        stats.Consumers++
+                        consumerTier := tierName(ca.Config.replicas(sa.Config))
+                        cu, ok := stats.Tiers[consumerTier]
+                        if !ok {
+                            cu = JetStreamTier{}
+                        }
+                        cu.Consumers++
+                        stats.Tiers[consumerTier] = cu
                     }
-                    u.Streams++
-                    stats.Streams++
-                    u.Consumers += len(sa.consumers)
-                    stats.Tiers[tier] = u
                 }
             }
         } else {
@@ -2089,9 +2105,8 @@ func (js *jetStream) limitsExceeded(storeType StorageType) bool {
     return js.wouldExceedLimits(storeType, 0)
 }
 
-func tierName(cfg *StreamConfig) string {
+func tierName(replicas int) string {
     // TODO (mh) this is where we could select based off a placement tag as well "qos:tier"
-    replicas := cfg.Replicas
     if replicas == 0 {
         replicas = 1
     }
@@ -2111,11 +2126,11 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) {
 }
 
 // jsa.usageMu read lock should be held.
-func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) {
+func (jsa *jsAccount) selectLimits(replicas int) (JetStreamAccountLimits, string, bool) {
     if selectedLimits, ok := jsa.limits[_EMPTY_]; ok {
         return selectedLimits, _EMPTY_, true
     }
-    tier := tierName(cfg)
+    tier := tierName(replicas)
     if selectedLimits, ok := jsa.limits[tier]; ok {
         return selectedLimits, tier, true
     }
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 99dd719fdd7..c675bd1d1c7 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -3267,7 +3267,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
 }
 
 func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
-    selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg)
+    var replicas int
+    if cfg != nil {
+        replicas = cfg.Replicas
+    }
+    selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
     if apiErr != nil {
         return apiErr
     }
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 344fa91e9d6..56e472762de 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -5986,7 +5986,7 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
     return nil, errs
 }
 
-func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
+func (acc *Account) selectLimits(replicas int) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
     // Grab our jetstream account info.
     acc.mu.RLock()
     jsa := acc.js
@@ -5997,7 +5997,7 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st
     }
 
     jsa.usageMu.RLock()
-    selectedLimits, tierName, ok := jsa.selectLimits(cfg)
+    selectedLimits, tierName, ok := jsa.selectLimits(replicas)
     jsa.usageMu.RUnlock()
 
     if !ok {
@@ -6008,7 +6008,11 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st
 
 // Read lock needs to be held
 func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError {
-    selectedLimits, tier, _, apiErr := acc.selectLimits(cfg)
+    var replicas int
+    if cfg != nil {
+        replicas = cfg.Replicas
+    }
+    selectedLimits, tier, _, apiErr := acc.selectLimits(replicas)
     if apiErr != nil {
         return apiErr
     }
@@ -7145,7 +7149,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
         s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
         return
     }
-    selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg)
+    selectedLimits, _, _, apiErr := acc.selectLimits(cfg.replicas(&streamCfg))
     if apiErr != nil {
         resp.Error = apiErr
         s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -7202,7 +7206,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
             // If the consumer name is specified and we think it already exists, then
             // we're likely updating an existing consumer, so don't count it. Otherwise
             // we will incorrectly return NewJSMaximumConsumersLimitError for an update.
-            if oname != "" && cn == oname && sa.consumers[oname] != nil {
+            if oname != _EMPTY_ && cn == oname && sa.consumers[oname] != nil {
                 continue
             }
         }
diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go
index 4e261f41e45..f8f6466f69e 100644
--- a/server/jetstream_jwt_test.go
+++ b/server/jetstream_jwt_test.go
@@ -1460,3 +1460,75 @@ func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) {
     require_Equal(t, r3.ReservedMemory, 22*1024)    // TEST9
     require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6
 }
+
+func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testing.T) {
+    sysKp, syspub := createKey(t)
+    sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
+    newUser(t, sysKp)
+
+    accKp, aExpPub := createKey(t)
+    accClaim := jwt.NewAccountClaims(aExpPub)
+    accClaim.Name = "acc"
+    accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
+        DiskStorage: 1100, Consumer: 10, Streams: 1}
+    accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
+        DiskStorage: 1100, Consumer: 1, Streams: 1}
+    accJwt := encodeClaim(t, accClaim, aExpPub)
+    accCreds := newUser(t, accKp)
+    tmlp := `
+        listen: 127.0.0.1:-1
+        server_name: %s
+        jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+        leaf {
+            listen: 127.0.0.1:-1
+        }
+        cluster {
+            name: %s
+            listen: 127.0.0.1:%d
+            routes = [%s]
+        }
+    ` + fmt.Sprintf(`
+        operator: %s
+        system_account: %s
+        resolver = MEMORY
+        resolver_preload = {
+            %s : %s
+            %s : %s
+        }
+    `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
+
+    c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
+    defer c.shutdown()
+
+    nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo.*"},
+        Replicas: 3,
+    })
+    require_NoError(t, err)
+
+    // Now make sure we can add in 10 R1 consumers.
+    for i := 1; i <= 10; i++ {
+        _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+            Name:      fmt.Sprintf("C-%d", i),
+            AckPolicy: nats.AckExplicitPolicy,
+            Replicas:  1,
+        })
+        require_NoError(t, err)
+    }
+
+    info, err := js.AccountInfo()
+    require_NoError(t, err)
+
+    // Make sure we account for these properly.
+    r1 := info.Tiers["R1"]
+    r3 := info.Tiers["R3"]
+
+    require_Equal(t, r1.Streams, 0)
+    require_Equal(t, r1.Consumers, 10)
+    require_Equal(t, r3.Streams, 1)
+    require_Equal(t, r3.Consumers, 0)
+}
diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go
index bb2337d7f47..f0fa78881f7 100644
--- a/server/jetstream_leafnode_test.go
+++ b/server/jetstream_leafnode_test.go
@@ -1242,3 +1242,85 @@ func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) {
     _, err = js.Publish("foo", []byte("msg"))
     require_NoError(t, err)
 }
+
+func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
+    tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
+    c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
+    defer c.shutdown()
+
+    tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
+    lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
+    defer lnc.shutdown()
+
+    lnc.waitOnClusterReady()
+    for _, s := range lnc.servers {
+        s.setJetStreamMigrateOnRemoteLeaf()
+    }
+
+    nc, _ := jsClientConnect(t, lnc.randomServer())
+    defer nc.Close()
+
+    ljs, err := nc.JetStream(nats.Domain("leaf"))
+    require_NoError(t, err)
+
+    // Create an asset in the leafnode cluster.
+    si, err := ljs.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+        Replicas: 3,
+    })
+    require_NoError(t, err)
+    require_Equal(t, si.Cluster.Name, "leaf")
+    require_NotEqual(t, si.Cluster.Leader, noLeader)
+    require_Equal(t, len(si.Cluster.Replicas), 2)
+
+    // Count how many remotes each server in the leafnode cluster is
+    // supposed to have and then take them down.
+    remotes := map[*Server]int{}
+    for _, s := range lnc.servers {
+        remotes[s] += len(s.leafRemoteCfgs)
+        s.closeAndDisableLeafnodes()
+        checkLeafNodeConnectedCount(t, s, 0)
+    }
+
+    // The Raft nodes in the leafnode cluster now need some time to
+    // notice that they're no longer receiving AEs from a leader, as
+    // they should have been forced into observer mode. Check that
+    // this is the case.
+    time.Sleep(maxElectionTimeout)
+    for _, s := range lnc.servers {
+        s.rnMu.RLock()
+        for name, n := range s.raftNodes {
+            // We don't expect the metagroup to have turned into an
+            // observer but all other assets should have done.
+            if name == defaultMetaGroupName {
+                require_False(t, n.IsObserver())
+            } else {
+                require_True(t, n.IsObserver())
+            }
+        }
+        s.rnMu.RUnlock()
+    }
+
+    // Bring the leafnode connections back up.
+    for _, s := range lnc.servers {
+        s.reEnableLeafnodes()
+        checkLeafNodeConnectedCount(t, s, remotes[s])
+    }
+
+    // Wait for nodes to notice they are no longer in observer mode
+    // and to leave observer mode.
+    time.Sleep(maxElectionTimeout)
+    for _, s := range lnc.servers {
+        s.rnMu.RLock()
+        for _, n := range s.raftNodes {
+            require_False(t, n.IsObserver())
+        }
+        s.rnMu.RUnlock()
+    }
+
+    // Previously nodes would have left observer mode but then would
+    // have failed to elect a stream leader as they were stuck on a
+    // long election timer. Now this should work reliably.
+    lnc.waitOnStreamLeader(globalAccountName, "TEST")
+}
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index aaabb988ef4..59563c2a7f1 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -7607,3 +7607,111 @@ func TestLeafNodeLoopDetectionOnActualLoop(t *testing.T) {
         t.Fatalf("Did not get any error regarding loop")
     }
 }
+
+// https://github.com/nats-io/nats-server/issues/5473
+func TestLeafNodeDupeDeliveryQueueSubAndPlainSub(t *testing.T) {
+    clusterCommonConf := `
+        accounts: {
+            tenant: {
+                users: [ { user:t, password: t } ]
+                exports: [{stream: system-a.events.>}]
+            }
+            system-a: {
+                users: [ { user:sa, password: sa } ]
+                imports: [
+                    {stream: {subject: system-a.events.>, account: tenant}, prefix: tenant}
+                ]
+            }
+            $SYS: { users = [ {user: "s", password: "s"} ] }
+        }
+        leafnodes {
+            remotes: [{
+                urls: [ "nats-leaf://sa:sa@127.0.0.1:17422" ]
+                account: system-a
+            }]
+        }`
+
+    confCluster0 := createConfFile(t, []byte(fmt.Sprintf(`
+        server_name: a-0
+        port: -1
+        cluster: {
+            name: cluster-a
+            listen: 127.0.0.1:16122
+            routes = [ nats://127.0.0.1:16123 ]
+            pool_size: -1
+        }
+        %s`, clusterCommonConf)))
+
+    confCluster1 := createConfFile(t, []byte(fmt.Sprintf(`
+        server_name: a-1
+        port: -1
+        cluster: {
+            name: cluster-a
+            listen: 127.0.0.1:16123
+            routes = [ nats://127.0.0.1:16122 ]
+            pool_size: -1
+        }
+        %s`, clusterCommonConf)))
+
+    serverB := createConfFile(t, []byte(`
+        server_name: b
+        port: -1
+        leafnodes: { port: 17422 }
+        accounts: {
+            system-a: {
+                users: [ { user: sa, password: sa } ]
+                exports: [{stream: *.system-a.>}]
+            }
+            system-b: {
+                users: [ { user: sb, password: sb } ]
+                imports: [ {stream: {subject: *.system-a.>, account: system-a }}]
+            }
+            $SYS: { users = [ {user: "s", password: "s"} ] }
+        }`))
+
+    // Start server B
+    srvB, _ := RunServerWithConfig(serverB)
+    defer srvB.Shutdown()
+
+    // Start the cluster servers.
+    srvA0, _ := RunServerWithConfig(confCluster0)
+    defer srvA0.Shutdown()
+    // Make sure this is connected first before starting the second server in cluster A.
+    checkLeafNodeConnectedCount(t, srvB, 1)
+    // Start second A server.
+    srvA1, _ := RunServerWithConfig(confCluster1)
+    defer srvA1.Shutdown()
+    // Make sure they are routed together.
+    checkNumRoutes(t, srvA0, 1)
+    checkNumRoutes(t, srvA1, 1)
+    // Make sure each cluster server is connected to server B.
+    checkLeafNodeConnectedCount(t, srvB, 2)
+
+    // Create plain subscriber on server B attached to system-b account.
+    ncB := natsConnect(t, srvB.ClientURL(), nats.UserInfo("sb", "sb"))
+    defer ncB.Close()
+    sub, err := ncB.SubscribeSync("*.system-a.events.>")
+    require_NoError(t, err)
+    // Create a new sub that has a queue group as well.
+    subq, err := ncB.QueueSubscribeSync("*.system-a.events.objectnotfound", "SYSB")
+    require_NoError(t, err)
+    ncB.Flush()
+    time.Sleep(250 * time.Millisecond)
+
+    // Connect to cluster A
+    ncA := natsConnect(t, srvA0.ClientURL(), nats.UserInfo("t", "t"))
+    defer ncA.Close()
+
+    err = ncA.Publish("system-a.events.objectnotfound", []byte("EventA"))
+    require_NoError(t, err)
+    ncA.Flush()
+    // Wait for them to be received.
+    time.Sleep(250 * time.Millisecond)
+
+    n, _, err := sub.Pending()
+    require_NoError(t, err)
+    require_Equal(t, n, 1)
+    n, _, err = subq.Pending()
+    require_NoError(t, err)
+    require_Equal(t, n, 1)
+}
diff --git a/server/raft.go b/server/raft.go
index 76653949912..68ab6a22443 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -156,25 +156,27 @@ type raft struct {
     llqrt    time.Time // Last quorum lost time
     lsut     time.Time // Last scale-up time
 
-    term     uint64 // The current vote term
-    pterm    uint64 // Previous term from the last snapshot
-    pindex   uint64 // Previous index from the last snapshot
-    commit   uint64 // Sequence number of the most recent commit
-    applied  uint64 // Sequence number of the most recently applied commit
-    hcbehind bool   // Were we falling behind at the last health check? (see: isCurrent)
+    term    uint64 // The current vote term
+    pterm   uint64 // Previous term from the last snapshot
+    pindex  uint64 // Previous index from the last snapshot
+    commit  uint64 // Index of the most recent commit
+    applied uint64 // Index of the most recently applied commit
 
     leader string // The ID of the leader
     vote   string // Our current vote state
     lxfer  bool   // Are we doing a leadership transfer?
+    hcbehind bool // Were we falling behind at the last health check? (see: isCurrent)
+
     s  *Server    // Reference to top-level server
     c  *client    // Internal client for subscriptions
     js *jetStream // JetStream, if running, to see if we are out of resources
 
-    dflag     bool           // Debug flag
-    pleader   bool           // Has the group ever had a leader?
-    observer  bool           // The node is observing, i.e. not participating in voting
-    extSt     extensionState // Extension state
+    dflag    bool // Debug flag
+    pleader  bool // Has the group ever had a leader?
+    observer bool // The node is observing, i.e. not participating in voting
+
+    extSt extensionState // Extension state
 
     psubj  string // Proposals subject
     rpsubj string // Remove peers subject
@@ -233,16 +235,18 @@ const (
     hbIntervalDefault              = 1 * time.Second
     lostQuorumIntervalDefault      = hbIntervalDefault * 10 // 10 seconds
     lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
+    observerModeIntervalDefault    = 48 * time.Hour
 )
 
 var (
-    minElectionTimeout = minElectionTimeoutDefault
-    maxElectionTimeout = maxElectionTimeoutDefault
-    minCampaignTimeout = minCampaignTimeoutDefault
-    maxCampaignTimeout = maxCampaignTimeoutDefault
-    hbInterval         = hbIntervalDefault
-    lostQuorumInterval = lostQuorumIntervalDefault
-    lostQuorumCheck    = lostQuorumCheckIntervalDefault
+    minElectionTimeout   = minElectionTimeoutDefault
+    maxElectionTimeout   = maxElectionTimeoutDefault
+    minCampaignTimeout   = minCampaignTimeoutDefault
+    maxCampaignTimeout   = maxCampaignTimeoutDefault
+    hbInterval           = hbIntervalDefault
+    lostQuorumInterval   = lostQuorumIntervalDefault
+    lostQuorumCheck      = lostQuorumCheckIntervalDefault
+    observerModeInterval = observerModeIntervalDefault
 )
 
 type RaftConfig struct {
@@ -873,7 +877,7 @@ func (n *raft) PauseApply() error {
     n.hcommit = n.commit
     // Also prevent us from trying to become a leader while paused and catching up.
     n.pobserver, n.observer = n.observer, true
-    n.resetElect(48 * time.Hour)
+    n.resetElect(observerModeInterval)
 
     return nil
 }
@@ -1893,8 +1897,16 @@ func (n *raft) SetObserver(isObserver bool) {
 func (n *raft) setObserver(isObserver bool, extSt extensionState) {
     n.Lock()
     defer n.Unlock()
+
+    wasObserver := n.observer
     n.observer = isObserver
     n.extSt = extSt
+
+    // If we're leaving observer state then reset the election timer or
+    // we might end up waiting for up to the observerModeInterval.
+    if wasObserver && !isObserver {
+        n.resetElect(randCampaignTimeout())
+    }
 }
 
 // processAppendEntries is called by the Raft state machine when there are
@@ -1944,7 +1956,7 @@ func (n *raft) runAsFollower() {
             n.resetElectionTimeoutWithLock()
             n.debug("Not switching to candidate, no resources")
         } else if n.IsObserver() {
-            n.resetElectWithLock(48 * time.Hour)
+            n.resetElectWithLock(observerModeInterval)
             n.debug("Not switching to candidate, observer only")
         } else if n.isCatchingUp() {
             n.debug("Not switching to candidate, catching up")
diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go
index f8f12eef602..127a3642c45 100644
--- a/server/raft_helpers_test.go
+++ b/server/raft_helpers_test.go
@@ -98,17 +98,26 @@ func (sg smGroup) unlockAll() {
 }
 
 // Create a raft group and place on numMembers servers at random.
+// Filestore based.
 func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
+    return c.createRaftGroupEx(name, numMembers, smf, FileStorage)
+}
+
+func (c *cluster) createMemRaftGroup(name string, numMembers int, smf smFactory) smGroup {
+    return c.createRaftGroupEx(name, numMembers, smf, MemoryStorage)
+}
+
+func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, st StorageType) smGroup {
     c.t.Helper()
     if numMembers > len(c.servers) {
         c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
     }
     servers := append([]*Server{}, c.servers...)
     rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
-    return c.createRaftGroupWithPeers(name, servers[:numMembers], smf)
+    return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
 }
 
-func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory) smGroup {
+func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
     c.t.Helper()
 
     var sg smGroup
@@ -122,12 +131,19 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
     }
 
     for _, s := range servers {
-        fs, err := newFileStore(
-            FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
-            StreamConfig{Name: name, Storage: FileStorage},
-        )
-        require_NoError(c.t, err)
-        cfg := &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
+        var cfg *RaftConfig
+        if st == FileStorage {
+            fs, err := newFileStore(
+                FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
+                StreamConfig{Name: name, Storage: FileStorage},
+            )
+            require_NoError(c.t, err)
+            cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
+        } else {
+            ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
+            require_NoError(c.t, err)
+            cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: ms}
+        }
         s.bootstrapRaftNode(cfg, peers, true)
         n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
         require_NoError(c.t, err)
@@ -243,13 +259,20 @@ func (a *stateAdder) restart() {
 
     // The filestore is stopped as well, so need to extract the parts to recreate it.
     rn := a.n.(*raft)
-    fs := rn.wal.(*fileStore)
-    var err error
-    a.cfg.Log, err = newFileStore(fs.fcfg, fs.cfg.StreamConfig)
+
+    switch rn.wal.(type) {
+    case *fileStore:
+        fs := rn.wal.(*fileStore)
+        a.cfg.Log, err = newFileStore(fs.fcfg, fs.cfg.StreamConfig)
+    case *memStore:
+        ms := rn.wal.(*memStore)
+        a.cfg.Log, err = newMemStore(&ms.cfg)
+    }
     if err != nil {
         panic(err)
     }
+
     a.n, err = a.s.startRaftNode(globalAccountName, a.cfg, pprofLabels{})
     if err != nil {
         panic(err)
     }
diff --git a/server/raft_test.go b/server/raft_test.go
index b12af87b790..5e5ab4b630c 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -1,4 +1,4 @@
-// Copyright 2021-2023 The NATS Authors
+// Copyright 2021-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -168,7 +168,7 @@ func TestNRGRecoverFromFollowingNoLeader(t *testing.T) {
     for _, n := range rg {
         rn := n.node().(*raft)
         rn.ApplyQ().drain()
-        rn.switchToFollower("")
+        rn.switchToFollower(noLeader)
     }
 
     // Resume the nodes.
@@ -262,7 +262,7 @@ func TestNRGSimpleElection(t *testing.T) {
     msg := require_ChanRead(t, voteReqs, time.Second)
     vr := decodeVoteRequest(msg.Data, msg.Reply)
     require_True(t, vr != nil)
-    require_NotEqual(t, vr.candidate, "")
+    require_NotEqual(t, vr.candidate, _EMPTY_)
 
     // The leader should have bumped their term in order to start
     // an election.
@@ -468,3 +468,53 @@ func TestNRGCandidateStepsDownAfterAE(t *testing.T) {
         return nil
     })
 }
+
+// Test to make sure this does not cause us to truncate our wal or enter catchup state.
+func TestNRGHeartbeatOnLeaderChange(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
+    rg.waitOnLeader()
+
+    for i := 0; i < 10; i++ {
+        // Restart the leader.
+        leader := rg.leader().(*stateAdder)
+        leader.proposeDelta(22)
+        leader.proposeDelta(-11)
+        leader.proposeDelta(-11)
+        rg.waitOnTotal(t, 0)
+        leader.stop()
+        leader.restart()
+        rg.waitOnLeader()
+    }
+}
+
+func TestNRGElectionTimerAfterObserver(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
+    rg.waitOnLeader()
+
+    for _, n := range rg {
+        n.node().SetObserver(true)
+    }
+
+    time.Sleep(maxElectionTimeout)
+    before := time.Now()
+
+    for _, n := range rg {
+        n.node().SetObserver(false)
+    }
+
+    time.Sleep(maxCampaignTimeout)
+
+    for _, n := range rg {
+        rn := n.node().(*raft)
+        rn.RLock()
+        etlr := rn.etlr
+        rn.RUnlock()
+        require_True(t, etlr.After(before))
+    }
+}
diff --git a/server/stream.go b/server/stream.go
index e56926fc3fd..3f35f608e81 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -462,7 +462,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
         }
     }
     jsa.usageMu.RLock()
-    selected, tier, hasTier := jsa.selectLimits(&cfg)
+    selected, tier, hasTier := jsa.selectLimits(cfg.Replicas)
     jsa.usageMu.RUnlock()
     reserved := int64(0)
     if !isClustered {
@@ -1666,9 +1666,9 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
     jsa.mu.RLock()
     acc := jsa.account
     jsa.usageMu.RLock()
-    selected, tier, hasTier := jsa.selectLimits(&cfg)
+    selected, tier, hasTier := jsa.selectLimits(cfg.Replicas)
     if !hasTier && old.Replicas != cfg.Replicas {
-        selected, tier, hasTier = jsa.selectLimits(old)
+        selected, tier, hasTier = jsa.selectLimits(old.Replicas)
     }
     jsa.usageMu.RUnlock()
     reserved := int64(0)
@@ -1903,7 +1903,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
 
     js := mset.js
 
-    if targetTier := tierName(cfg); mset.tier != targetTier {
+    if targetTier := tierName(cfg.Replicas); mset.tier != targetTier {
         // In cases such as R1->R3, only one update is needed
         jsa.usageMu.RLock()
         _, ok := jsa.limits[targetTier]
@@ -2191,9 +2191,11 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup)
             msgs.recycle(&ims)
         case <-t.C:
             mset.mu.RLock()
+            var stalled bool
+            if mset.mirror != nil {
+                stalled = time.Since(time.Unix(0, mset.mirror.last.Load())) > sourceHealthCheckInterval
+            }
             isLeader := mset.isLeader()
-            last := time.Unix(0, mset.mirror.last.Load())
-            stalled := mset.mirror != nil && time.Since(last) > sourceHealthCheckInterval
             mset.mu.RUnlock()
             // No longer leader.
             if !isLeader {
diff --git a/server/stree/leaf.go b/server/stree/leaf.go
index 839450f2e40..119837ec260 100644
--- a/server/stree/leaf.go
+++ b/server/stree/leaf.go
@@ -18,16 +18,17 @@ import (
 )
 
 // Leaf node
+// Order of struct fields for best memory alignment (as per govet/fieldalignment)
 type leaf[T any] struct {
+    value T
     // This could be the whole subject, but most likely just the suffix portion.
     // We will only store the suffix here and assume all prior prefix paths have
     // been checked once we arrive at this leafnode.
     suffix []byte
-    value T
 }
 
 func newLeaf[T any](suffix []byte, value T) *leaf[T] {
-    return &leaf[T]{copyBytes(suffix), value}
+    return &leaf[T]{value, copyBytes(suffix)}
 }
 
 func (n *leaf[T]) isLeaf() bool { return true }
diff --git a/server/stree/node16.go b/server/stree/node16.go
index 2d206afda79..7da5df89d99 100644
--- a/server/stree/node16.go
+++ b/server/stree/node16.go
@@ -14,10 +14,11 @@ package stree
 
 // Node with 16 children
+// Order of struct fields for best memory alignment (as per govet/fieldalignment)
 type node16 struct {
-    meta
     child [16]node
-    key   [16]byte
+    meta
+    key [16]byte
 }
 
 func newNode16(prefix []byte) *node16 {
diff --git a/server/stree/node256.go b/server/stree/node256.go
index fdadde0bc01..f5bf69bc93c 100644
--- a/server/stree/node256.go
+++ b/server/stree/node256.go
@@ -14,9 +14,10 @@ package stree
 
 // Node with 256 children
+// Order of struct fields for best memory alignment (as per govet/fieldalignment)
 type node256 struct {
-    meta
     child [256]node
+    meta
 }
 
 func newNode256(prefix []byte) *node256 {
diff --git a/server/stree/node4.go b/server/stree/node4.go
index 2d48962545a..6aeb024abff 100644
--- a/server/stree/node4.go
+++ b/server/stree/node4.go
@@ -14,10 +14,11 @@ package stree
 
 // Node with 4 children
+// Order of struct fields for best memory alignment (as per govet/fieldalignment)
 type node4 struct {
-    meta
     child [4]node
-    key   [4]byte
+    meta
+    key [4]byte
 }
 
 func newNode4(prefix []byte) *node4 {