Skip to content

Commit 9cdddc4

Browse files
committed
implement server Ask for redis
1 parent 8a180aa commit 9cdddc4

File tree

2 files changed

+35
-22
lines changed

2 files changed

+35
-22
lines changed

stackexchange/nats/stackexchange_nats.go

+5-11
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,6 @@ func (exc *StackExchange) Publish(msg neffos.Message) bool {
300300

301301
// Ask implements server Ask for nats. It blocks.
302302
func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) {
303-
if !msg.IsWait(false) {
304-
return response, neffos.ErrInvalidPayload
305-
}
306-
307303
// for some reason we can't use the exc.publisher.Subscribe,
308304
// so create a new connection for subscription which will be terminated on message receive or timeout.
309305
subConn, err := exc.opts.Connect()
@@ -328,13 +324,11 @@ func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token str
328324
return response, neffos.ErrWrite
329325
}
330326

331-
for {
332-
select {
333-
case <-ctx.Done():
334-
return response, ctx.Err()
335-
case response = <-ch:
336-
return response, response.Err
337-
}
327+
select {
328+
case <-ctx.Done():
329+
return response, ctx.Err()
330+
case response = <-ch:
331+
return response, response.Err
338332
}
339333
}
340334

stackexchange/redis/stackexchange_redis.go

+30-11
Original file line numberDiff line numberDiff line change
@@ -227,27 +227,46 @@ func (exc *StackExchange) OnConnect(c *neffos.Conn) error {
227227
func (exc *StackExchange) Publish(msg neffos.Message) bool {
228228
// channel := exc.getMessageChannel(c.ID(), msg)
229229
channel := exc.getChannel(msg.Namespace, msg.Room, msg.To)
230-
b := msg.Serialize()
231230
// neffos.Debugf("[%s] publish to channel [%s] the data [%s]\n", msg.FromExplicit, channel, string(b))
232231

232+
err := exc.publish(channel, msg.Serialize())
233+
return err == nil
234+
}
235+
236+
func (exc *StackExchange) publish(channel string, b []byte) error {
233237
cmd := radix.FlatCmd(nil, "PUBLISH", channel, b)
234-
err := exc.pool.Do(cmd)
238+
return exc.pool.Do(cmd)
239+
}
240+
241+
// Ask implements the server Ask feature for redis. It blocks until response.
242+
func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) {
243+
sub := radix.PersistentPubSub("", "", exc.connFunc)
244+
msgCh := make(chan radix.PubSubMessage)
245+
err = sub.Subscribe(msgCh, token)
235246
if err != nil {
236-
return false
247+
return
237248
}
249+
defer sub.Close()
238250

239-
return true
240-
}
251+
if !exc.Publish(msg) {
252+
return response, neffos.ErrWrite
253+
}
254+
255+
select {
256+
case <-ctx.Done():
257+
err = ctx.Err()
258+
case redisMsg := <-msgCh:
259+
response = neffos.DeserializeMessage(nil, redisMsg.Message, false, false)
260+
err = response.Err
261+
}
241262

242-
// Ask TODO.
243-
// Ask will implement the server Ask feature for redis. It will block until response.
244-
func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (neffos.Message, error) {
245-
panic("Not Implemented Yet") // check tomorrow... I am too tired now.
263+
return
246264
}
247265

248-
// NotifyAsk TODO.
266+
// NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result.
249267
func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error {
250-
panic("Not Implemented Yet")
268+
msg.ClearWait()
269+
return exc.publish(token, msg.Serialize())
251270
}
252271

253272
// Subscribe subscribes to a specific namespace,

0 commit comments

Comments
 (0)