Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: discover topic/channel paused state on new topic discovery #1274

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ require (
)

go 1.13

replace github.com/nsqio/go-nsq v1.0.8 => github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMo
github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
github.com/nsqio/go-diskqueue v1.0.0 h1:XRqpx7zTMu9yNVH+cHvA5jEiPNKoYcyEsCVqXP3eFg4=
github.com/nsqio/go-diskqueue v1.0.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90 h1:js7rqe9IqkTNFcyXjbmuOo0nV3Z2HBM4yNc8SWqByUs=
github.com/thekingofworld/go-nsq v1.0.9-0.20200815080834-015554cb0b90/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76 h1:Dho5nD6R3PcW2SH1or8vS0dszDaXRxIw55lBX7XiE5g=
golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
88 changes: 78 additions & 10 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, err

// GetLookupdTopicChannels returns a []string containing a union of all the channels
// from all the given lookupd for the given topic
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error) {
var channels []string
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]ChannelState, error) {
var lock sync.Mutex
var wg sync.WaitGroup
var errs []error

type respType struct {
Channels []string `json:"channels"`
topicChannelsMeta := &TopicChannelsMeta{
Channels: []string{},
ChannelsMeta: map[string]*ChannelMeta{},
}

for _, addr := range lookupdHTTPAddrs {
Expand All @@ -137,7 +137,7 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic))
c.logf("CI: querying nsqlookupd %s", endpoint)

var resp respType
var resp TopicChannelsMeta
err := c.client.GETV1(endpoint, &resp)
if err != nil {
lock.Lock()
Expand All @@ -148,7 +148,15 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s

lock.Lock()
defer lock.Unlock()
channels = append(channels, resp.Channels...)
topicChannelsMeta.Channels = append(topicChannelsMeta.Channels, resp.Channels...)
for channelName, channelMeta := range resp.ChannelsMeta {
if curMeta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok {
if curMeta != nil && curMeta.Paused == true {
continue //one of the lookupd has returned paused,so just continue
}
}
topicChannelsMeta.ChannelsMeta[channelName] = channelMeta
}
}(addr)
}
wg.Wait()
Expand All @@ -157,13 +165,73 @@ func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []s
return nil, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
}

channels = stringy.Uniq(channels)
sort.Strings(channels)
topicChannelsMeta.Channels = stringy.Uniq(topicChannelsMeta.Channels)
sort.Strings(topicChannelsMeta.Channels)

var channelStates []ChannelState
for _, channelName := range topicChannelsMeta.Channels {
channelState := ChannelState{
Name: channelName,
Paused: false,
}
if meta, ok := topicChannelsMeta.ChannelsMeta[channelName]; ok && meta != nil {
channelState.Paused = meta.Paused
}
channelStates = append(channelStates, channelState)
}

if len(errs) > 0 {
return channelStates, ErrList(errs)
}
return channelStates, nil
}

// GetLookupdTopic returns the TopicMeta for the given topic, merged across all the given lookupds:
// the topic is reported as paused if any lookupd reports it as paused.
func (c *ClusterInfo) GetLookupdTopic(topic string, lookupdHTTPAddrs []string) (TopicMeta, error) {
var lock sync.Mutex
var wg sync.WaitGroup
var errs []error

topicMeta := TopicMeta{}
type respType struct {
Topics []string `json:"topics"`
TopicsMeta map[string]*TopicMeta `json:"topics_meta"`
}
for _, addr := range lookupdHTTPAddrs {
wg.Add(1)
go func(addr string) {
defer wg.Done()

endpoint := fmt.Sprintf("http://%s/topics?topic=%s", addr, url.QueryEscape(topic))
c.logf("CI: querying nsqlookupd %s", endpoint)

var resp respType
err := c.client.GETV1(endpoint, &resp)
if err != nil {
lock.Lock()
errs = append(errs, err)
lock.Unlock()
return
}

lock.Lock()
defer lock.Unlock()
if metaData, ok := resp.TopicsMeta[topic]; ok {
// if any one of the lookupds reports the topic as paused, treat it as paused
if metaData != nil && metaData.Paused == true && topicMeta.Paused == false {
topicMeta.Paused = true
}
}
}(addr)
}
wg.Wait()
if len(errs) == len(lookupdHTTPAddrs) {
return topicMeta, fmt.Errorf("Failed to query any nsqlookupd: %s", ErrList(errs))
}
if len(errs) > 0 {
return channels, ErrList(errs)
return topicMeta, ErrList(errs)
}
return channels, nil
return topicMeta, nil
}

// GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds
Expand Down
18 changes: 18 additions & 0 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,21 @@ type ProducersByHost struct {
func (c ProducersByHost) Less(i, j int) bool {
return c.Producers[i].Hostname < c.Producers[j].Hostname
}

// ChannelState is the per-channel result returned by GetLookupdTopicChannels:
// a channel name together with its paused flag.
type ChannelState struct {
// Name is the channel name.
Name string
// Paused is copied from the channel's metadata when a lookupd supplied any;
// otherwise it stays false.
Paused bool
}

// ChannelMeta is the per-channel metadata found in the "channels_meta" map
// of a TopicChannelsMeta response.
type ChannelMeta struct {
// Paused is decoded from the "paused" JSON field.
Paused bool `json:"paused"`
}

// TopicMeta is the per-topic metadata returned by GetLookupdTopic; it is also
// the value type of the "topics_meta" map decoded from a lookupd /topics response.
type TopicMeta struct {
// Paused is decoded from the "paused" JSON field.
Paused bool `json:"paused"`
}

// TopicChannelsMeta is the shape decoded from a lookupd /channels?topic=... response.
// GetLookupdTopicChannels also uses one instance to accumulate the responses
// from every queried lookupd.
type TopicChannelsMeta struct {
// Channels lists the channel names ("channels" JSON field).
Channels []string `json:"channels"`
// ChannelsMeta maps a channel name to its metadata ("channels_meta" JSON field);
// an entry may be absent or nil, so callers check both before dereferencing.
ChannelsMeta map[string]*ChannelMeta `json:"channels_meta"`
}
8 changes: 6 additions & 2 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
producers, _ := s.ci.GetLookupdTopicProducers(
topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if len(producers) == 0 {
topicChannels, _ := s.ci.GetLookupdTopicChannels(
var channels []string
channelStates, _ := s.ci.GetLookupdTopicChannels(
topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
topicChannelMap[topicName] = topicChannels
for _, channelState := range channelStates {
channels = append(channels, channelState.Name)
}
topicChannelMap[topicName] = channels
}
}
respond:
Expand Down
18 changes: 18 additions & 0 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type TopicsDoc struct {
Topics []interface{} `json:"topics"`
}

// TopicsInactiveDoc is the test-side decoding target for the body of
// GET /api/topics?inactive=true, where "topics" is an object rather than a list.
type TopicsInactiveDoc struct {
// Topics maps a topic name to a list of strings.
// NOTE(review): presumably the topic's channel names, as built by topicsHandler — confirm.
Topics map[string][]string `json:"topics"`
}

type TopicStatsDoc struct {
*clusterinfo.TopicStats
Message string `json:"message"`
Expand Down Expand Up @@ -177,6 +181,20 @@ func TestHTTPTopicsGET(t *testing.T) {
test.Nil(t, err)
test.Equal(t, 1, len(tr.Topics))
test.Equal(t, topicName, tr.Topics[0])

url = fmt.Sprintf("http://%s/api/topics?inactive=true", nsqadmin1.RealHTTPAddr())
req, _ = http.NewRequest("GET", url, nil)
resp, err = client.Do(req)
test.Nil(t, err)
test.Equal(t, 200, resp.StatusCode)
body, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()

t.Logf("%s", body)
ti := TopicsInactiveDoc{}
err = json.Unmarshal(body, &ti)
test.Nil(t, err)
test.Equal(t, 0, len(ti.Topics))
}

func TestHTTPTopicGET(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
)
}

c.nsqd.Notify(c)
c.nsqd.Notify(notifyContext{NotifyTypeRegistration, c})

return c
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (c *Channel) exit(deleted bool) error {

// since we are explicitly deleting a channel (not just at system exit time)
// de-register this from the lookupd
c.nsqd.Notify(c)
c.nsqd.Notify(notifyContext{NotifyTypeUnRegistration, c})
} else {
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
}
Expand Down Expand Up @@ -256,11 +256,15 @@ func (c *Channel) Depth() int64 {
}

func (c *Channel) Pause() error {
return c.doPause(true)
err := c.doPause(true)
c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
return err
}

func (c *Channel) UnPause() error {
return c.doPause(false)
err := c.doPause(false)
c.nsqd.Notify(notifyContext{NotifyTypeStateUpdate, c})
return err
}

func (c *Channel) doPause(pause bool) error {
Expand Down
103 changes: 81 additions & 22 deletions nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"bytes"
"encoding/json"
"fmt"
"net"
"os"
"strconv"
Expand All @@ -12,6 +13,17 @@ import (
"github.com/nsqio/nsq/internal/version"
)

// Notification kinds carried in notifyContext.notifyType. lookupLoop switches on
// them to pick the command it sends to every nsqlookupd peer.
const (
// NotifyTypeRegistration: a topic or channel was created; lookupLoop sends Register.
NotifyTypeRegistration = iota
// NotifyTypeUnRegistration: a topic or channel was deleted; lookupLoop sends UnRegister.
NotifyTypeUnRegistration
// NotifyTypeStateUpdate: a topic's or channel's state changed; lookupLoop sends
// SyncState carrying the current "paused" value.
NotifyTypeStateUpdate
)

// notifyContext is the value passed to NSQD.Notify and received from notifyChan by
// lookupLoop. It says what happened (notifyType) and to which object (v).
type notifyContext struct {
// notifyType is one of the NotifyType* constants; lookupLoop panics on any other value.
notifyType int
// v is the subject of the notification; lookupLoop handles *Channel and *Topic.
v interface{}
}

func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
return func(lp *lookupPeer) {
ci := make(map[string]interface{})
Expand Down Expand Up @@ -53,11 +65,26 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
n.RLock()
for _, topic := range n.topicMap {
topic.RLock()
if len(topic.channelMap) == 0 {
commands = append(commands, nsq.Register(topic.name, ""))
} else {
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
commands = append(commands, nsq.Register(topic.name, ""))
topicPaused := topic.IsPaused()
if topicPaused { //sync state when topic paused
command, err := nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topicPaused})
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
} else {
commands = append(commands, command)
}
}
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
channelPaused := channel.IsPaused()
if channelPaused { //sync state when channel paused
command, err := nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channelPaused})
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): SyncState - %s", lp, err)
continue
}
commands = append(commands, command)
}
}
topic.RUnlock()
Expand Down Expand Up @@ -118,29 +145,61 @@ func (n *NSQD) lookupLoop() {
}
case val := <-n.notifyChan:
var cmd *nsq.Command
var err error
var branch string

switch val.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists, or that it's removed
branch = "channel"
channel := val.(*Channel)
if channel.Exiting() == true {
cmd = nsq.UnRegister(channel.topicName, channel.name)
} else {
notifyCtx, ok := val.(notifyContext)
if !ok {
panic("non-notifyContext sent to notifyChan - should never happen")
}
switch notifyCtx.notifyType {
case NotifyTypeRegistration:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd = nsq.Register(channel.topicName, channel.name)
case *Topic:
// notify all nsqlookupds that a new topic exists
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd = nsq.Register(topic.name, "")
}
case *Topic:
// notify all nsqlookupds that a new topic exists, or that it's removed
branch = "topic"
topic := val.(*Topic)
if topic.Exiting() == true {
case NotifyTypeUnRegistration:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that a channel has been removed
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd = nsq.UnRegister(channel.topicName, channel.name)
case *Topic:
// notify all nsqlookupds that a topic has been removed
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd = nsq.UnRegister(topic.name, "")
} else {
cmd = nsq.Register(topic.name, "")
}
case NotifyTypeStateUpdate:
switch notifyCtx.v.(type) {
case *Channel:
// notify all nsqlookupds that channel state changed
branch = "channel"
channel := notifyCtx.v.(*Channel)
cmd, err = nsq.SyncState(channel.topicName, channel.name, map[string]interface{}{"paused": channel.IsPaused()})
if err != nil {
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
}
case *Topic:
// notify all nsqlookupds that topic state changed
branch = "topic"
topic := notifyCtx.v.(*Topic)
cmd, err = nsq.SyncState(topic.name, "", map[string]interface{}{"paused": topic.IsPaused()})
if err != nil {
n.logf(LOG_ERROR, "NSQD: build cmd err: %s", err)
}
}
default:
panic(fmt.Sprintf("unknown notifyType in notifyContext: %d, should never happen", notifyCtx.notifyType))
}

for _, lookupPeer := range lookupPeers {
n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
_, err := lookupPeer.Command(cmd)
Expand Down
Loading