Skip to content

Commit 4138c44

Browse files
authored
Merge pull request nsqio#1376 from ploxiln/topic_always_memqueue
nsqd: allow unbuffered memory chan if ephemeral or deferred
2 parents ae2e77a + 4f5d227 commit 4138c44

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

nsqd/channel.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
8282
clients: make(map[int64]Consumer),
8383
deleteCallback: deleteCallback,
8484
nsqd: nsqd,
85+
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
8586
}
86-
// create mem-queue only if size > 0 (do not use unbuffered chan)
87-
if nsqd.getOpts().MemQueueSize > 0 {
87+
// avoid mem-queue if size == 0 for more consistent ordering
88+
if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
8889
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
8990
}
9091
if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
@@ -96,8 +97,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
9697

9798
c.initPQ()
9899

99-
if strings.HasSuffix(channelName, "#ephemeral") {
100-
c.ephemeral = true
100+
if c.ephemeral {
101101
c.backend = newDummyBackendQueue()
102102
} else {
103103
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {

nsqd/topic.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
4646
t := &Topic{
4747
name: topicName,
4848
channelMap: make(map[string]*Channel),
49-
memoryMsgChan: nil,
49+
memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
5050
startChan: make(chan int, 1),
5151
exitChan: make(chan int),
5252
channelUpdateChan: make(chan int),
@@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
5656
deleteCallback: deleteCallback,
5757
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
5858
}
59-
// create mem-queue only if size > 0 (do not use unbuffered chan)
60-
if nsqd.getOpts().MemQueueSize > 0 {
61-
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
62-
}
6359
if strings.HasSuffix(topicName, "#ephemeral") {
6460
t.ephemeral = true
6561
t.backend = newDummyBackendQueue()
@@ -222,18 +218,25 @@ func (t *Topic) PutMessages(msgs []*Message) error {
222218
}
223219

224220
func (t *Topic) put(m *Message) error {
225-
select {
226-
case t.memoryMsgChan <- m:
227-
default:
228-
err := writeMessageToBackend(m, t.backend)
229-
t.nsqd.SetHealth(err)
230-
if err != nil {
231-
t.nsqd.logf(LOG_ERROR,
232-
"TOPIC(%s) ERROR: failed to write message to backend - %s",
233-
t.name, err)
234-
return err
221+
// If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
222+
// but try to use memory chan for deferred messages (they lose deferred timer
223+
// in backend queue) or if topic is ephemeral (there is no backend queue).
224+
if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
225+
select {
226+
case t.memoryMsgChan <- m:
227+
return nil
228+
default:
229+
break // write to backend
235230
}
236231
}
232+
err := writeMessageToBackend(m, t.backend)
233+
t.nsqd.SetHealth(err)
234+
if err != nil {
235+
t.nsqd.logf(LOG_ERROR,
236+
"TOPIC(%s) ERROR: failed to write message to backend - %s",
237+
t.name, err)
238+
return err
239+
}
237240
return nil
238241
}
239242

0 commit comments

Comments
 (0)