diff --git a/go.mod b/go.mod
index d138e7c12..b4934129f 100644
--- a/go.mod
+++ b/go.mod
@@ -17,3 +17,5 @@ require (
 )
 
 go 1.13
+
+replace github.com/nsqio/go-nsq v1.0.8 => github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90
diff --git a/go.sum b/go.sum
index ee27b8b10..f518717ad 100644
--- a/go.sum
+++ b/go.sum
@@ -21,13 +21,13 @@ github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMo
 github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
 github.com/nsqio/go-diskqueue v1.0.0 h1:XRqpx7zTMu9yNVH+cHvA5jEiPNKoYcyEsCVqXP3eFg4=
 github.com/nsqio/go-diskqueue v1.0.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
-github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
-github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90 h1:js7rqe9IqkTNFcyXjbmuOo0nV3Z2HBM4yNc8SWqByUs=
+github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g=
 golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
diff --git a/internal/clusterinfo/data.go b/internal/clusterinfo/data.go
index ef0b4e15a..3940b27c2 100644
--- a/internal/clusterinfo/data.go
+++ b/internal/clusterinfo/data.go
@@ -119,14 +119,14 @@ func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, err
 
 // GetLookupdTopicChannels returns a []string containing a union of all the channels
 // from all the given lookupd for the given topic
-func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error) {
-	var channels []string
+func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]ChannelState, error) {
 	var lock sync.Mutex
 	var wg sync.WaitGroup
 	var errs []error
 
-	type respType struct {
-		Channels []string `json:"channels"`
+	topicChannelsMeta := &TopicChannelsMeta{
+		Channels:     []string{},
+		ChannelsMeta: map[string]*ChannelMeta{},
 	}
 
 	for _, addr := range lookupdHTTPAddrs {
@@ -137,7 +137,7 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
 			endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
 			c.logf("CI: querying nsqlookupd %s", endpoint)
 
-			var resp respType
+			var resp TopicChannelsMeta
 			err := c.client.GETV1(endpoint, &resp)
 			if err != nil {
 				lock.Lock()
@@ -148,7 +148,15 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
 			lock.Lock()
 			defer lock.Unlock()
-			channels = append(channels, resp.Channels...)
+			topicChannelsMeta.Channels = append(topicChannelsMeta.Channels, resp.Channels...)
+			for channelName, channelMeta := range resp.ChannelsMeta {
+				if curMeta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok {
+					if curMeta != nil && curMeta.Paused == true {
+						continue // one of the lookupds has already returned paused, so just continue
+					}
+				}
+				topicChannelsMeta.ChannelsMeta[channelName] = channelMeta
+			}
 		}(addr)
 	}
 	wg.Wait()
@@ -157,13 +165,73 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
 		return nil, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
 	}
 
-	channels = stringy.Uniq(channels)
-	sort.Strings(channels)
+	topicChannelsMeta.Channels = stringy.Uniq(topicChannelsMeta.Channels)
+	sort.Strings(topicChannelsMeta.Channels)
+
+	var channelStates []ChannelState
+	for _, channelName := range topicChannelsMeta.Channels {
+		channelState := ChannelState{
+			Name:   channelName,
+			Paused: false,
+		}
+		if meta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok && meta != nil {
+			channelState.Paused = meta.Paused
+		}
+		channelStates = append(channelStates, channelState)
+	}
+
+	if len(errs) > 0 {
+		return channelStates, ErrList(errs)
+	}
+	return channelStates, nil
+}
+
+// GetLookupdTopic returns TopicMeta info for the given topic from all the given lookupd
+func (c *ClusterInfo) GetLookupdTopic(topic string, lookupdHTTPAddrs []string) (TopicMeta, error) {
+	var lock sync.Mutex
+	var wg sync.WaitGroup
+	var errs []error
+
+	topicMeta := TopicMeta{}
+	type respType struct {
+		Topics     []string              `json:"topics"`
+		TopicsMeta map[string]*TopicMeta `json:"topics_meta"`
+	}
+	for _, addr := range lookupdHTTPAddrs {
+		wg.Add(1)
+		go func(addr string) {
+			defer wg.Done()
+
+			endpoint := fmt.Sprintf("http://%s/topics?topic=%s", addr, url.QueryEscape(topic))
+			c.logf("CI: querying nsqlookupd %s", endpoint)
+
+			var resp respType
+			err := c.client.GETV1(endpoint, &resp)
+			if err != nil {
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+				return
+			}
+			lock.Lock()
+			defer lock.Unlock()
+			if metaData, ok := resp.TopicsMeta[topic]; ok {
+				// if any one of the lookupd returns paused, the topic should be treated as paused
+				if metaData != nil && metaData.Paused == true && topicMeta.Paused == false {
+					topicMeta.Paused = true
+				}
+			}
+		}(addr)
+	}
+	wg.Wait()
+	if len(errs) == len(lookupdHTTPAddrs) {
+		return topicMeta, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
+	}
 
 	if len(errs) > 0 {
-		return channels, ErrList(errs)
+		return topicMeta, ErrList(errs)
 	}
-	return channels, nil
+	return topicMeta, nil
 }
 
 // GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds
diff --git a/internal/clusterinfo/types.go b/internal/clusterinfo/types.go
index 84c5f23bf..0c915585f 100644
--- a/internal/clusterinfo/types.go
+++ b/internal/clusterinfo/types.go
@@ -304,3 +304,21 @@ type ProducersByHost struct {
 func (c ProducersByHost) Less(i, j int) bool {
 	return c.Producers[i].Hostname < c.Producers[j].Hostname
 }
+
+type ChannelState struct {
+	Name   string
+	Paused bool
+}
+
+type ChannelMeta struct {
+	Paused bool `json:"paused"`
+}
+
+type TopicMeta struct {
+	Paused bool `json:"paused"`
+}
+
+type TopicChannelsMeta struct {
+	Channels     []string                `json:"channels"`
+	ChannelsMeta map[string]*ChannelMeta `json:"channels_meta"`
+}
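Reviewer note on the types above: TopicChannelsMeta mirrors what nsqlookupd's /channels endpoint now returns, and GetLookupdTopicChannels folds the per-lookupd channels_meta maps into a []ChannelState, treating a channel as paused if any lookupd reports it paused. Below is a minimal standalone sketch of decoding that response shape directly; the lookupd address and topic name are placeholders, and the stock net/http client stands in for the internal http_api client used in data.go.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type ChannelMeta struct {
	Paused bool `json:"paused"`
}

type TopicChannelsMeta struct {
	Channels     []string                `json:"channels"`
	ChannelsMeta map[string]*ChannelMeta `json:"channels_meta"`
}

func main() {
	// assumes a local nsqlookupd; address and topic are placeholders
	req, _ := http.NewRequest("GET", "http://127.0.0.1:4161/channels?topic=test", nil)
	// ask for the bare v1 response format, as the tests in this diff do
	req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var tcm TopicChannelsMeta
	if err := json.NewDecoder(resp.Body).Decode(&tcm); err != nil {
		panic(err)
	}
	for _, name := range tcm.Channels {
		paused := false
		if meta, ok := tcm.ChannelsMeta[name]; ok && meta != nil {
			paused = meta.Paused
		}
		fmt.Printf("channel %q paused=%v\n", name, paused)
	}
}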
diff --git a/nsqadmin/http.go b/nsqadmin/http.go
index 9a5e28022..bc2eaf8bd 100644
--- a/nsqadmin/http.go
+++ b/nsqadmin/http.go
@@ -228,9 +228,13 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
 			producers, _ := s.ci.GetLookupdTopicProducers(
 				topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
 			if len(producers) == 0 {
-				topicChannels, _ := s.ci.GetLookupdTopicChannels(
+				var channels []string
+				channelStates, _ := s.ci.GetLookupdTopicChannels(
 					topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
-				topicChannelMap[topicName] = topicChannels
+				for _, channelState := range channelStates {
+					channels = append(channels, channelState.Name)
+				}
+				topicChannelMap[topicName] = channels
 			}
 		}
 
respond:
diff --git a/nsqadmin/http_test.go b/nsqadmin/http_test.go
index 9dc20960c..24711272f 100644
--- a/nsqadmin/http_test.go
+++ b/nsqadmin/http_test.go
@@ -23,6 +23,10 @@ type TopicsDoc struct {
 	Topics []interface{} `json:"topics"`
 }
 
+type TopicsInactiveDoc struct {
+	Topics map[string][]string `json:"topics"`
+}
+
 type TopicStatsDoc struct {
 	*clusterinfo.TopicStats
 	Message string `json:"message"`
@@ -177,6 +181,20 @@ func TestHTTPTopicsGET(t *testing.T) {
 	test.Nil(t, err)
 	test.Equal(t, 1, len(tr.Topics))
 	test.Equal(t, topicName, tr.Topics[0])
+
+	url = fmt.Sprintf("http://%s/api/topics?inactive=true", nsqadmin1.RealHTTPAddr())
+	req, _ = http.NewRequest("GET", url, nil)
+	resp, err = client.Do(req)
+	test.Nil(t, err)
+	test.Equal(t, 200, resp.StatusCode)
+	body, _ = ioutil.ReadAll(resp.Body)
+	resp.Body.Close()
+
+	t.Logf("%s", body)
+	ti := TopicsInactiveDoc{}
+	err = json.Unmarshal(body, &ti)
+	test.Nil(t, err)
+	test.Equal(t, 0, len(ti.Topics))
 }
 
 func TestHTTPTopicGET(t *testing.T) {
diff --git a/nsqd/channel.go b/nsqd/channel.go
index 3fc931c38..c7144237e 100644
--- a/nsqd/channel.go
+++ b/nsqd/channel.go
@@ -117,7 +117,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
 		)
 	}
 
-	c.nsqd.Notify(c)
+	c.nsqd.Notify(notifyContext{NotifyTypeRegistration, c})
 
 	return c
 }
@@ -164,7 +164,7 @@ func (c *Channel) exit(deleted bool) error {
 
 		// since we are explicitly deleting a channel (not just at system exit time)
 		// de-register this from the lookupd
-		c.nsqd.Notify(c)
+		c.nsqd.Notify(notifyContext{NotifyTypeUnRegistration, c})
 	} else {
 		c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
 	}
@@ -256,11 +256,15 @@ func (c *Channel) Depth() int64 {
 }
 
 func (c *Channel) Pause() error {
-	return c.doPause(true)
+	err := c.doPause(true)
+	c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
+	return err
 }
 
 func (c *Channel) UnPause() error {
-	return c.doPause(false)
+	err := c.doPause(false)
+	c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
+	return err
 }
 
 func (c *Channel) doPause(pause bool) error {
diff --git a/nsqd/lookup.go b/nsqd/lookup.go
index 953a450eb..2b2308847 100644
--- a/nsqd/lookup.go
+++ b/nsqd/lookup.go
@@ -3,6 +3,7 @@ package nsqd
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"net"
 	"os"
 	"strconv"
@@ -12,6 +13,17 @@ import (
 	"github.com/nsqio/nsq/internal/version"
 )
 
+const (
+	NotifyTypeRegistration = iota
+	NotifyTypeUnRegistration
+	NotifyTypeStateUpdate
+)
+
+type notifyContext struct {
+	notifyType int
+	v          interface{}
+}
+
 func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
 	return func(lp *lookupPeer) {
 		ci := make(map[string]interface{})
@@ -53,11 +65,26 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
 		n.RLock()
 		for _, topic := range n.topicMap {
 			topic.RLock()
-			if len(topic.channelMap) == 0 {
-				commands = append(commands, nsq.Register(topic.name, ""))
-			} else {
-				for _, channel := range topic.channelMap {
-					commands = append(commands, nsq.Register(channel.topicName, channel.name))
+			commands = append(commands, nsq.Register(topic.name, ""))
+			topicPaused := topic.IsPaused()
+			if topicPaused { // sync state when the topic is paused
+				command, err := nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topicPaused})
+				if err != nil {
+					n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
+				} else {
+					commands = append(commands, command)
+				}
+			}
+			for _, channel := range topic.channelMap {
+				commands = append(commands, nsq.Register(channel.topicName, channel.name))
+				channelPaused := channel.IsPaused()
+				if channelPaused { // sync state when the channel is paused
+					command, err := nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channelPaused})
+					if err != nil {
+						n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
+						continue
+					}
+					commands = append(commands, command)
 				}
 			}
 			topic.RUnlock()
@@ -118,29 +145,61 @@ func (n *NSQD) lookupLoop() {
 			}
 		case val := <-n.notifyChan:
 			var cmd *nsq.Command
+			var err error
 			var branch string
-
-			switch val.(type) {
-			case *Channel:
-				// notify all nsqlookupds that a new channel exists, or that it's removed
-				branch = "channel"
-				channel := val.(*Channel)
-				if channel.Exiting() == true {
-					cmd = nsq.UnRegister(channel.topicName, channel.name)
-				} else {
+			notifyCtx, ok := val.(notifyContext)
+			if !ok {
+				panic("non-notifyContext sent to notifyChan - should never happen")
+			}
+			switch notifyCtx.notifyType {
+			case NotifyTypeRegistration:
+				switch notifyCtx.v.(type) {
+				case *Channel:
+					// notify all nsqlookupds that a new channel exists
+					branch = "channel"
+					channel := notifyCtx.v.(*Channel)
 					cmd = nsq.Register(channel.topicName, channel.name)
+				case *Topic:
+					// notify all nsqlookupds that a new topic exists
+					branch = "topic"
+					topic := notifyCtx.v.(*Topic)
+					cmd = nsq.Register(topic.name, "")
 				}
-			case *Topic:
-				// notify all nsqlookupds that a new topic exists, or that it's removed
-				branch = "topic"
-				topic := val.(*Topic)
-				if topic.Exiting() == true {
+			case NotifyTypeUnRegistration:
+				switch notifyCtx.v.(type) {
+				case *Channel:
+					// notify all nsqlookupds that a channel was removed
+					branch = "channel"
+					channel := notifyCtx.v.(*Channel)
+					cmd = nsq.UnRegister(channel.topicName, channel.name)
+				case *Topic:
+					// notify all nsqlookupds that a topic was removed
+					branch = "topic"
+					topic := notifyCtx.v.(*Topic)
 					cmd = nsq.UnRegister(topic.name, "")
-				} else {
-					cmd = nsq.Register(topic.name, "")
 				}
+			case NotifyTypeStateUpdate:
+				switch notifyCtx.v.(type) {
+				case *Channel:
+					// notify all nsqlookupds that channel state changed
+					branch = "channel"
+					channel := notifyCtx.v.(*Channel)
+					cmd, err = nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channel.IsPaused()})
+					if err != nil {
+						n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
+					}
+				case *Topic:
+					// notify all nsqlookupds that topic state changed
+					branch = "topic"
+					topic := notifyCtx.v.(*Topic)
+					cmd, err = nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topic.IsPaused()})
+					if err != nil {
+						n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
+					}
+				}
+			default:
+				panic(fmt.Sprintf("unknown notifyType in notifyContext: %d, should never happen", notifyCtx.notifyType))
 			}
-
 			for _, lookupPeer := range lookupPeers {
 				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
 				_, err := lookupPeer.Command(cmd)
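The nsq.SyncState helper used above comes from the go-nsq fork pulled in by the replace directive in go.mod, so its implementation is not part of this diff. What it must put on the wire can be inferred from the SYNCSTATE handler added to nsqlookupd's lookup_protocol_v1.go further down: a space-separated command line, then a 4-byte big-endian body length, then a small JSON body. A hand-rolled sketch of that assumed framing (illustrative only, not the fork's actual code):

package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"io"
)

// writeSyncState emits the frame the SYNCSTATE handler in this diff reads back:
// "SYNCSTATE <topic> [channel]\n", then an int32 body length, then the JSON body.
func writeSyncState(w io.Writer, topic, channel string, paused bool) error {
	body, err := json.Marshal(map[string]interface{}{"paused": paused})
	if err != nil {
		return err
	}
	line := "SYNCSTATE " + topic
	if channel != "" {
		line += " " + channel
	}
	if _, err := io.WriteString(w, line+"\n"); err != nil {
		return err
	}
	// the handler reads the body size as a big-endian int32, then the JSON body
	if err := binary.Write(w, binary.BigEndian, int32(len(body))); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

func main() {
	var buf bytes.Buffer
	_ = writeSyncState(&buf, "orders", "archive", true) // placeholder topic/channel names
}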
diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go
index 390e1a654..9435f04d2 100644
--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -497,15 +497,28 @@ func (n *NSQD) GetTopic(topicName string) *Topic {
 	// this makes sure that any message received is buffered to the right channels
 	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
 	if len(lookupdHTTPAddrs) > 0 {
-		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
+		topicMeta, err := n.ci.GetLookupdTopic(t.name, lookupdHTTPAddrs)
 		if err != nil {
-			n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
+			n.logf(LOG_WARN, "failed to query nsqlookupd for metadata to pre-create for topic %s - %s", t.name, err)
 		}
-		for _, channelName := range channelNames {
-			if strings.HasSuffix(channelName, "#ephemeral") {
+		if topicMeta.Paused {
+			err = t.Pause()
+			if err != nil {
+				n.logf(LOG_WARN, "failed to pre-pause topic %s - %s", t.name, err)
+			}
+		}
+		channelStates, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
+		for _, channelState := range channelStates {
+			if strings.HasSuffix(channelState.Name, "#ephemeral") {
 				continue // do not create ephemeral channel with no consumer client
 			}
-			t.GetChannel(channelName)
+			c := t.GetChannel(channelState.Name)
+			if channelState.Paused {
+				err = c.Pause()
+				if err != nil {
+					n.logf(LOG_WARN, "failed to pre-pause channel %s for topic %s - %s", c.name, t.name, err)
+				}
+			}
 		}
 	} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
 		n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go
index b39df44e8..97317fb53 100644
--- a/nsqd/nsqd_test.go
+++ b/nsqd/nsqd_test.go
@@ -330,6 +330,15 @@ func TestCluster(t *testing.T) {
 	defer os.RemoveAll(opts.DataPath)
 	defer nsqd.Exit()
 
+	topic := nsqd.GetTopic("cluster_test_get_topic_by_nsqd")
+	test.NotNil(t, topic)
+	// allow some time for nsqd to push info to nsqlookupd
+	time.Sleep(350 * time.Millisecond)
+	err := nsqd.DeleteExistingTopic("cluster_test_get_topic_by_nsqd")
+	test.Nil(t, err)
+	// allow some time for nsqd to push info to nsqlookupd
+	time.Sleep(350 * time.Millisecond)
+
 	topicName := "cluster_test" + strconv.Itoa(int(time.Now().Unix()))
 
 	hostname, err := os.Hostname()
diff --git a/nsqd/topic.go b/nsqd/topic.go
index 76aad14bb..35fce29e5 100644
--- a/nsqd/topic.go
+++ b/nsqd/topic.go
@@ -82,7 +82,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
 
 	t.waitGroup.Wrap(t.messagePump)
 
-	t.nsqd.Notify(t)
+	t.nsqd.Notify(notifyContext{NotifyTypeRegistration, t})
 
 	return t
 }
@@ -355,7 +355,7 @@ func (t *Topic) exit(deleted bool) error {
 
 		// since we are explicitly deleting a topic (not just at system exit time)
 		// de-register this from the lookupd
-		t.nsqd.Notify(t)
+		t.nsqd.Notify(notifyContext{NotifyTypeUnRegistration, t})
 	} else {
 		t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
 	}
@@ -452,11 +452,15 @@ func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
 }
 
 func (t *Topic) Pause() error {
-	return t.doPause(true)
+	err := t.doPause(true)
+	t.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, t})
+	return err
 }
 
 func (t *Topic) UnPause() error {
-	return t.doPause(false)
+	err := t.doPause(false)
+	t.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, t})
+	return err
 }
 
 func (t *Topic) doPause(pause bool) error {
diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go
index d2806ea6d..fbcbaac00 100644
--- a/nsqlookupd/http.go
+++ b/nsqlookupd/http.go
@@ -78,13 +78,56 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou
 }
 
 func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
-	topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
+	type topicMeta struct {
+		Paused bool `json:"paused"`
+	}
+
+	var topics []string
+	topicsMetaMap := make(map[string]*topicMeta)
+
+	reqParams, err := http_api.NewReqParams(req)
+	if err != nil {
+		return nil, http_api.Err{400, err.Error()}
+	}
+
+	topicName, _ := reqParams.Get("topic")
+	if topicName != "" {
+		topics = append(topics, topicName)
+		var tm topicMeta
+		producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
+		producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout,
+			s.nsqlookupd.opts.TombstoneLifetime)
+		if producers.IsPaused() {
+			tm.Paused = true
+		}
+		topicsMetaMap[topicName] = &tm
+	} else {
+		topics = s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
+		for _, topicName := range topics {
+			if _, ok := topicsMetaMap[topicName]; !ok {
+				topicsMetaMap[topicName] = &topicMeta{Paused: false}
+			}
+			producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
+			producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout,
+				s.nsqlookupd.opts.TombstoneLifetime)
+			if producers.IsPaused() {
+				topicsMetaMap[topicName].Paused = true
+			}
+		}
+	}
+
 	return map[string]interface{}{
-		"topics": topics,
+		"topics":      topics,
+		"topics_meta": topicsMetaMap,
 	}, nil
 }
 
 func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
+
+	type channelMeta struct {
+		Paused bool `json:"paused"`
+	}
+
 	reqParams, err := http_api.NewReqParams(req)
 	if err != nil {
 		return nil, http_api.Err{400, "INVALID_REQUEST"}
@@ -95,9 +138,24 @@ func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps htt
 		return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
 	}
 
+	var producers Producers
+	channelsMetaMap := make(map[string]*channelMeta)
 	channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
 
+	for _, channelName := range channels {
+		if _, ok := channelsMetaMap[channelName]; !ok {
+			channelsMetaMap[channelName] = &channelMeta{false}
+		}
+		producers = s.nsqlookupd.DB.FindProducers("channel", topicName, channelName)
+		producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout,
+			s.nsqlookupd.opts.TombstoneLifetime)
+		if producers.IsPaused() {
+			channelsMetaMap[channelName].Paused = true
+		}
+	}
+
 	return map[string]interface{}{
-		"channels": channels,
+		"channels":      channels,
+		"channels_meta": channelsMetaMap,
 	}, nil
 }
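With these handler changes /topics and /channels keep their existing array fields and add parallel topics_meta / channels_meta maps keyed by name, so existing consumers are unaffected. A small sketch of reading the new topic metadata; the JSON literal below is illustrative and merely shaped like what doTopics now returns, not captured output.

package main

import (
	"encoding/json"
	"fmt"
)

type topicMeta struct {
	Paused bool `json:"paused"`
}

// mirrors the response shape of the new /topics handler
type topicsResp struct {
	Topics     []string              `json:"topics"`
	TopicsMeta map[string]*topicMeta `json:"topics_meta"`
}

func main() {
	// illustrative payload; "orders" is a placeholder topic name
	payload := []byte(`{"topics":["orders"],"topics_meta":{"orders":{"paused":true}}}`)

	var r topicsResp
	if err := json.Unmarshal(payload, &r); err != nil {
		panic(err)
	}
	for _, t := range r.Topics {
		paused := r.TopicsMeta[t] != nil && r.TopicsMeta[t].Paused
		fmt.Printf("topic %q paused=%v\n", t, paused)
	}
}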
diff --git a/nsqlookupd/http_test.go b/nsqlookupd/http_test.go
index ef704a220..21556b750 100644
--- a/nsqlookupd/http_test.go
+++ b/nsqlookupd/http_test.go
@@ -20,7 +20,13 @@ type InfoDoc struct {
 }
 
 type ChannelsDoc struct {
-	Channels []interface{} `json:"channels"`
+	Channels     []interface{}          `json:"channels"`
+	ChannelsMeta map[string]interface{} `json:"channels_meta"`
+}
+
+type TopicMetaDoc struct {
+	Topics     []interface{}          `json:"topics"`
+	TopicsMeta map[string]interface{} `json:"topics_meta"`
 }
 
 type ErrMessage struct {
@@ -77,12 +83,18 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, *NSQLookupd) {
 
 func makeTopic(nsqlookupd *NSQLookupd, topicName string) {
 	key := Registration{"topic", topicName, ""}
-	nsqlookupd.DB.AddRegistration(key)
+	now := time.Now()
+	pi1 := &PeerInfo{now.UnixNano(), "1", "remote_addr:1", "host", "b_addr", 1, 2, "v1"}
+	p1 := &Producer{pi1, false, now, true}
+	nsqlookupd.DB.AddProducer(key, p1)
 }
 
 func makeChannel(nsqlookupd *NSQLookupd, topicName string, channelName string) {
 	key := Registration{"channel", topicName, channelName}
-	nsqlookupd.DB.AddRegistration(key)
+	now := time.Now()
+	pi1 := &PeerInfo{now.UnixNano(), "1", "remote_addr:1", "host", "b_addr", 1, 2, "v1"}
+	p1 := &Producer{pi1, false, now, true}
+	nsqlookupd.DB.AddProducer(key, p1)
 	makeTopic(nsqlookupd, topicName)
 }
 
@@ -276,6 +288,7 @@ func TestGetChannels(t *testing.T) {
 	err = json.Unmarshal(body, &ch)
 	test.Nil(t, err)
 	test.Equal(t, 0, len(ch.Channels))
+	test.Equal(t, 0, len(ch.ChannelsMeta))
 
 	topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix()))
 	channelName := "foobar" + strconv.Itoa(int(time.Now().Unix()))
@@ -296,6 +309,36 @@ func TestGetChannels(t *testing.T) {
 	test.Nil(t, err)
 	test.Equal(t, 1, len(ch.Channels))
 	test.Equal(t, channelName, ch.Channels[0])
+	test.Equal(t, 1, len(ch.ChannelsMeta))
+	test.NotNil(t, ch.ChannelsMeta[channelName])
+}
+
+func TestTopicMeta(t *testing.T) {
+	dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
+	defer os.RemoveAll(dataPath)
+	defer nsqds[0].Exit()
+	defer nsqlookupd1.Exit()
+
+	client := http.Client{}
+
+	tm := TopicMetaDoc{}
+	topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix()))
+	makeTopic(nsqlookupd1, topicName)
+
+	url := fmt.Sprintf("http://%s/topics?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
+
+	req, _ := http.NewRequest("GET", url, nil)
+	req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
+	resp, err := client.Do(req)
+	test.Nil(t, err)
+	test.Equal(t, 200, resp.StatusCode)
+	body, _ := ioutil.ReadAll(resp.Body)
+	resp.Body.Close()
+
+	t.Logf("%s", body)
+	err = json.Unmarshal(body, &tm)
+	test.Nil(t, err)
+	test.NotNil(t, tm.TopicsMeta)
 }
 
 func TestCreateChannel(t *testing.T) {
diff --git a/nsqlookupd/lookup_protocol_v1.go b/nsqlookupd/lookup_protocol_v1.go
index 1236833ce..843e413cc 100644
--- a/nsqlookupd/lookup_protocol_v1.go
+++ b/nsqlookupd/lookup_protocol_v1.go
@@ -90,6 +90,8 @@ func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [
 		return p.REGISTER(client, reader, params[1:])
 	case "UNREGISTER":
 		return p.UNREGISTER(client, reader, params[1:])
+	case "SYNCSTATE":
+		return p.SYNCSTATE(client, reader, params[1:])
 	}
 	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
 }
@@ -191,6 +193,52 @@ func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, pa
 	return []byte("OK"), nil
 }
 
+func (p *LookupProtocolV1) SYNCSTATE(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
+	if client.peerInfo == nil {
+		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
+	}
+
+	topic, channel, err := getTopicChan("SYNCSTATE", params)
+	if err != nil {
+		return nil, err
+	}
+
+	var bodyLen int32
+	err = binary.Read(reader, binary.BigEndian, &bodyLen)
+	if err != nil {
+		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "SYNCSTATE failed to read body size")
+	}
+
+	body := make([]byte, bodyLen)
+	_, err = io.ReadFull(reader, body)
+	if err != nil {
+		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "SYNCSTATE failed to read body")
+	}
+
+	type bodyType struct {
+		Paused bool `json:"paused"`
+	}
+	var state bodyType
+	err = json.Unmarshal(body, &state)
+	if err != nil {
+		return nil, protocol.NewFatalClientErr(err, "E_INVALID", fmt.Sprintf("state body should be json"))
+	}
+
+	if channel != "" {
+		key := Registration{"channel", topic, channel}
+		p.nsqlookupd.DB.UpdateProducer(key, client.peerInfo.id, state.Paused)
+		p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) SYNCSTATE category:%s key:%s subkey:%s paused: %t",
+			client, "channel", topic, channel, state.Paused)
+	} else {
+		key := Registration{"topic", topic, ""}
+		p.nsqlookupd.DB.UpdateProducer(key, client.peerInfo.id, state.Paused)
+		p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) SYNCSTATE category:%s key:%s subkey:%s paused: %t",
+			client, "topic", topic, "", state.Paused)
+	}
+
+	return []byte("OK"), nil
+}
+
 func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
 	var err error
 
diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go
index eb66123e8..b7e7ab617 100644
--- a/nsqlookupd/nsqlookupd_test.go
+++ b/nsqlookupd/nsqlookupd_test.go
@@ -93,6 +93,25 @@ func TestBasicLookupd(t *testing.T) {
 	test.Nil(t, err)
 	test.Equal(t, []byte("OK"), v)
 
+	cmd, err := nsq.SyncState(topicName, "channel1", map[string]interface{}{"paused": true})
+	test.Nil(t, err)
+	cmd.WriteTo(conn)
+	v, err = nsq.ReadResponse(conn)
+	test.Nil(t, err)
+	test.Equal(t, []byte("OK"), v)
+
+	cmd, err = nsq.SyncState(topicName, "", map[string]interface{}{"paused": true})
+	test.Nil(t, err)
+	cmd.WriteTo(conn)
+	v, err = nsq.ReadResponse(conn)
+	test.Nil(t, err)
+	test.Equal(t, []byte("OK"), v)
+
+	nsq.Ping().WriteTo(conn)
+	v, err = nsq.ReadResponse(conn)
+	test.Nil(t, err)
+	test.Equal(t, []byte("OK"), v)
+
 	pr := ProducersDoc{}
 	endpoint := fmt.Sprintf("http://%s/nodes", httpAddr)
 	err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &pr)
@@ -106,6 +125,7 @@ func TestBasicLookupd(t *testing.T) {
 
 	producers := nsqlookupd.DB.FindProducers("topic", topicName, "")
 	test.Equal(t, 1, len(producers))
+	test.Equal(t, true, producers.IsPaused())
 	producer := producers[0]
 	test.Equal(t, HostAddr, producer.peerInfo.BroadcastAddress)
diff --git a/nsqlookupd/registration_db.go b/nsqlookupd/registration_db.go
index ba1d1d09c..ac74c7cc1 100644
--- a/nsqlookupd/registration_db.go
+++ b/nsqlookupd/registration_db.go
@@ -9,7 +9,8 @@ import (
 
 type RegistrationDB struct {
 	sync.RWMutex
-	registrationMap map[Registration]ProducerMap
+	registrationMap        map[Registration]ProducerMap
+	highSpeedRegistrations map[Registration]Registrations
 }
 
 type Registration struct {
@@ -34,6 +35,7 @@ type Producer struct {
 	peerInfo     *PeerInfo
 	tombstoned   bool
 	tombstonedAt time.Time
+	paused       bool
 }
 
 type Producers []*Producer
@@ -54,7 +56,58 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
 
 func NewRegistrationDB() *RegistrationDB {
 	return &RegistrationDB{
-		registrationMap: make(map[Registration]ProducerMap),
+		registrationMap:        make(map[Registration]ProducerMap),
+		highSpeedRegistrations: make(map[Registration]Registrations),
+	}
+}
+
+// update the high speed registrations map
+// should be called whenever a Registration is added to registrationMap
+func (r *RegistrationDB) addHighSpeedRegistration(k Registration) {
+	key := Registration{k.Category, "*", ""}
+	if _, ok := r.highSpeedRegistrations[key]; !ok {
+		r.highSpeedRegistrations[key] = Registrations{}
+	}
+	if k.IsMatch(key.Category, key.Key, key.SubKey) {
+		r.highSpeedRegistrations[key] = append(r.highSpeedRegistrations[key], k)
+	}
+	if k.SubKey != "" {
+		subKey := Registration{k.Category, k.Key, "*"}
+		if _, ok := r.highSpeedRegistrations[subKey]; !ok {
+			r.highSpeedRegistrations[subKey] = Registrations{}
+		}
+		if k.IsMatch(subKey.Category, subKey.Key, subKey.SubKey) {
+			r.highSpeedRegistrations[subKey] = append(r.highSpeedRegistrations[subKey], k)
+		}
+	}
+}
+
+// update the high speed registrations map
+// should be called whenever a Registration is removed from registrationMap
+func (r *RegistrationDB) removeHighSpeedRegistration(k Registration) {
+	key := Registration{k.Category, "*", ""}
+	if registrations, ok := r.highSpeedRegistrations[key]; ok {
+		for i, registration := range registrations {
+			if registration == k {
+				r.highSpeedRegistrations[key] = append(
+					r.highSpeedRegistrations[key][:i], r.highSpeedRegistrations[key][i+1:]...,
+				)
+				break
+			}
+		}
+	}
+	if k.SubKey != "" {
+		subKey := Registration{k.Category, k.Key, "*"}
+		if registrations, ok := r.highSpeedRegistrations[subKey]; ok {
+			for i, registration := range registrations {
+				if registration == k {
+					r.highSpeedRegistrations[subKey] = append(
+						r.highSpeedRegistrations[subKey][:i], r.highSpeedRegistrations[subKey][i+1:]...,
+					)
+					break
+				}
+			}
+		}
 	}
 }
 
@@ -65,6 +118,7 @@ func (r *RegistrationDB) AddRegistration(k Registration) {
 	_, ok := r.registrationMap[k]
 	if !ok {
 		r.registrationMap[k] = make(map[string]*Producer)
+		r.addHighSpeedRegistration(k)
 	}
 }
 
@@ -75,6 +129,7 @@ func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
 	_, ok := r.registrationMap[k]
 	if !ok {
 		r.registrationMap[k] = make(map[string]*Producer)
+		r.addHighSpeedRegistration(k)
 	}
 	producers := r.registrationMap[k]
 	_, found := producers[p.peerInfo.id]
@@ -84,6 +139,22 @@ func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
 	return !found
 }
 
+// update producer info from a registration
+func (r *RegistrationDB) UpdateProducer(k Registration, id string, paused bool) bool {
+	r.Lock()
+	defer r.Unlock()
+	producers, ok := r.registrationMap[k]
+	if !ok {
+		return false
+	}
+	updated := false
+	if p, exists := producers[id]; exists && p.paused != paused {
+		p.paused = paused
+		updated = true
+	}
+	return updated
+}
+
 // remove a producer from a registration
 func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
 	r.Lock()
@@ -107,6 +178,7 @@ func (r *RegistrationDB) RemoveRegistration(k Registration) {
 	r.Lock()
 	defer r.Unlock()
 	delete(r.registrationMap, k)
+	r.removeHighSpeedRegistration(k)
 }
 
 func (r *RegistrationDB) needFilter(key string, subkey string) bool {
@@ -123,6 +195,10 @@ func (r *RegistrationDB) FindRegistrations(category string, key string, subkey s
 		}
 		return Registrations{}
 	}
+	quickKey := Registration{category, key, subkey}
+	if quickResults, ok := r.highSpeedRegistrations[quickKey]; ok {
+		return quickResults
+	}
 	results := Registrations{}
 	for k := range r.registrationMap {
 		if !k.IsMatch(category, key, subkey) {
@@ -230,6 +306,21 @@ func (pp Producers) PeerInfo() []*PeerInfo {
 	return results
 }
 
+func (pp Producers) IsPaused() bool {
+	pausedCount := 0
+	total := 0
+	for _, p := range pp {
+		if p.paused == true {
+			pausedCount++
+		}
+		total++
+	}
+	if total == 0 {
+		return false
+	}
+	return pausedCount*2 >= total
+}
+
 func ProducerMap2Slice(pm ProducerMap) Producers {
 	var producers Producers
 	for _, producer := range pm {
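One behavior worth noting in Producers.IsPaused above: it is a majority-style vote rather than an any/all check. A registration counts as paused when at least half of its producers are paused (pausedCount*2 >= total), with ties rounding toward paused, and an empty producer list reports unpaused. A standalone illustration of just that rule (plain bools stand in for *Producer values):

package main

import "fmt"

// isPaused applies the same rule as Producers.IsPaused: paused when count*2 >= total
func isPaused(paused []bool) bool {
	if len(paused) == 0 {
		return false
	}
	count := 0
	for _, p := range paused {
		if p {
			count++
		}
	}
	return count*2 >= len(paused)
}

func main() {
	fmt.Println(isPaused(nil))                        // false: no producers
	fmt.Println(isPaused([]bool{true, false}))        // true: 1 of 2 is a tie, counts as paused
	fmt.Println(isPaused([]bool{true, false, false})) // false: 1 of 3 is a minority
}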
diff --git a/nsqlookupd/registration_db_test.go b/nsqlookupd/registration_db_test.go
index 91a483f73..e1b4ea964 100644
--- a/nsqlookupd/registration_db_test.go
+++ b/nsqlookupd/registration_db_test.go
@@ -15,10 +15,10 @@ func TestRegistrationDB(t *testing.T) {
 	pi1 := &PeerInfo{beginningOfTime.UnixNano(), "1", "remote_addr:1", "host", "b_addr", 1, 2, "v1"}
 	pi2 := &PeerInfo{beginningOfTime.UnixNano(), "2", "remote_addr:2", "host", "b_addr", 2, 3, "v1"}
 	pi3 := &PeerInfo{beginningOfTime.UnixNano(), "3", "remote_addr:3", "host", "b_addr", 3, 4, "v1"}
-	p1 := &Producer{pi1, false, beginningOfTime}
-	p2 := &Producer{pi2, false, beginningOfTime}
-	p3 := &Producer{pi3, false, beginningOfTime}
-	p4 := &Producer{pi1, false, beginningOfTime}
+	p1 := &Producer{pi1, false, beginningOfTime, false}
+	p2 := &Producer{pi2, false, beginningOfTime, true}
+	p3 := &Producer{pi3, false, beginningOfTime, false}
+	p4 := &Producer{pi1, false, beginningOfTime, false}
 
 	db := NewRegistrationDB()
 
@@ -43,10 +43,12 @@ func TestRegistrationDB(t *testing.T) {
 	p = db.FindProducers("c", "a", "")
 	t.Logf("%s", p)
 	test.Equal(t, 2, len(p))
+	test.Equal(t, true, p.IsPaused())
 	p = db.FindProducers("c", "*", "b")
 	t.Logf("%s", p)
 	test.Equal(t, 1, len(p))
 	test.Equal(t, p2.peerInfo.id, p[0].peerInfo.id)
+	test.Equal(t, true, p.IsPaused())
 
 	// filter by active
 	test.Equal(t, 0, len(p.FilterByActive(sec30, sec30)))
@@ -119,6 +121,33 @@ func fillRegDB(registrations int, producers int) *RegistrationDB {
 	return regDB
 }
 
+func fillRegDBV2(topics int, channels int, producers int) *RegistrationDB {
+	regDB := NewRegistrationDB()
+	for i := 0; i < topics; i++ {
+		regT := Registration{"topic", "t" + strconv.Itoa(i), ""}
+		for j := 0; j < producers; j++ {
+			p := Producer{
+				peerInfo: &PeerInfo{
+					id: "p" + strconv.Itoa(j),
+				},
+			}
+			regDB.AddProducer(regT, &p)
+		}
+		for k := 0; k < channels; k++ {
+			regCa := Registration{"channel", "t" + strconv.Itoa(i), "ca" + strconv.Itoa(k)}
+			for j := 0; j < producers; j++ {
+				p := Producer{
+					peerInfo: &PeerInfo{
+						id: "p" + strconv.Itoa(j),
+					},
+				}
+				regDB.AddProducer(regCa, &p)
+			}
+		}
+	}
+	return regDB
+}
+
 func benchmarkLookupRegistrations(b *testing.B, registrations int, producers int) {
 	regDB := fillRegDB(registrations, producers)
 	b.ResetTimer()
@@ -145,6 +174,38 @@ func benchmarkDoLookup(b *testing.B, registrations int, producers int) {
 	}
 }
 
+func benchmarkLookupSingleTopic(b *testing.B, registrations int, producers int) {
+	regDB := fillRegDB(registrations, producers)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		topic := "t" + strconv.Itoa(rand.Intn(registrations))
+		_ = regDB.FindProducers("topic", topic, "")
+	}
+}
+
+func benchmarkLookupTopics(b *testing.B, registrations int, producers int) {
+	regDB := fillRegDB(registrations, producers)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		topics := regDB.FindRegistrations("topic", "*", "").Keys()
+		for _, topic := range topics {
+			_ = regDB.FindProducers("topic", topic, "")
+		}
+	}
+}
+
+func benchmarkLookupChannels(b *testing.B, topics int, channels int, producers int) {
+	regDB := fillRegDBV2(topics, channels, producers)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		topic := "t" + strconv.Itoa(rand.Intn(topics))
+		channels := regDB.FindRegistrations("channel", topic, "*").SubKeys()
+		for _, channel := range channels {
+			_ = regDB.FindProducers("channel", topic, channel)
+		}
+	}
+}
+
 func BenchmarkLookupRegistrations8x8(b *testing.B) {
 	benchmarkLookupRegistrations(b, 8, 8)
 }
@@ -216,3 +277,76 @@ func BenchmarkDoLookup512x512(b *testing.B) {
 func BenchmarkDoLookup512x2048(b *testing.B) {
 	benchmarkDoLookup(b, 512, 2048)
 }
+
+func BenchmarkLookupSingleTopic8x8(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 8, 8)
+}
+
+func BenchmarkLookupSingleTopic8x64(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 8, 64)
+}
+
+func BenchmarkLookupSingleTopic64x64(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 64, 64)
+}
+
+func BenchmarkLookupSingleTopic64x512(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 64, 512)
+}
+
+func BenchmarkLookupSingleTopic512x512(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 512, 512)
+}
+
+func BenchmarkLookupSingleTopic512x2048(b *testing.B) {
+	benchmarkLookupSingleTopic(b, 512, 2048)
+}
+
+func BenchmarkLookupTopics8x8(b *testing.B) {
+	benchmarkLookupTopics(b, 8, 8)
+}
+
+func BenchmarkLookupTopics8x64(b *testing.B) {
+	benchmarkLookupTopics(b, 8, 64)
+}
+
+func BenchmarkLookupTopics64x64(b *testing.B) {
+	benchmarkLookupTopics(b, 64, 64)
+}
+
+func BenchmarkLookupTopics64x512(b *testing.B) {
+	benchmarkLookupTopics(b, 64, 512)
+}
+
+func BenchmarkLookupTopics512x512(b *testing.B) {
+	benchmarkLookupTopics(b, 512, 512)
+}
+
+func BenchmarkLookupTopics512x2048(b *testing.B) {
+	benchmarkLookupTopics(b, 512, 2048)
+}
+
+func BenchmarkLookupChannels8x8x8(b *testing.B) {
+	benchmarkLookupChannels(b, 8, 8, 8)
+}
+
+func BenchmarkLookupChannels8x64x64(b *testing.B) {
+	benchmarkLookupChannels(b, 8, 64, 64)
+}
+
+func BenchmarkLookupChannels64x64x64(b *testing.B) {
+	benchmarkLookupChannels(b, 64, 64, 64)
+}
+
+//func BenchmarkLookupChannels64x512x512(b *testing.B) {
+//	benchmarkLookupChannels(b, 64, 512, 512)
+//}
+
+//func BenchmarkLookupChannels512x512x512(b *testing.B) {
+//	benchmarkLookupChannels(b, 512, 512, 512)
+//}
+
+//
+//func BenchmarkLookupChannels512x2048x2048(b *testing.B) {
+//	benchmarkLookupChannels(b, 512, 2048, 2048)
+//}