Skip to content

Commit

Permalink
make Ask to work inside handlers themselves without the need of a new…
Browse files Browse the repository at this point in the history
… goroutine
  • Loading branch information
kataras committed Jul 17, 2019
1 parent df0e783 commit c6806f1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 5 deletions.
10 changes: 8 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Conn struct {
// i.e `askConnect: myNamespace` blocks the `tryNamespace: myNamespace` until finish.
processes *processes

isInsideHandler *uint32

// messages that this connection waits for a reply.
waitingMessages map[string]chan Message
waitingMessagesMutex sync.RWMutex
Expand All @@ -98,6 +100,7 @@ func newConn(socket Socket, namespaces Namespaces) *Conn {
acknowledged: new(uint32),
connectedNamespaces: make(map[string]*NSConn),
processes: newProcesses(),
isInsideHandler: new(uint32),
waitingMessages: make(map[string]chan Message),
allowNativeMessages: false,
shouldHandleOnlyNativeMessages: false,
Expand Down Expand Up @@ -327,7 +330,9 @@ func (c *Conn) startReader() {
continue
}

atomic.StoreUint32(c.isInsideHandler, 1)
c.HandlePayload(b)
atomic.StoreUint32(c.isInsideHandler, 0)
}
}

Expand Down Expand Up @@ -916,7 +921,8 @@ func (c *Conn) sendConfirmation(wait string) {

// Ask method sends a message to the remote side and blocks until a response or an error received from the specific `Message.Event`.
func (c *Conn) Ask(ctx context.Context, msg Message) (Message, error) {
return c.ask(ctx, msg, false)
mustWaitOnlyTheNextMessage := atomic.LoadUint32(c.isInsideHandler) == 1
return c.ask(ctx, msg, mustWaitOnlyTheNextMessage)
}

func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage bool) (Message, error) {
Expand All @@ -939,7 +945,7 @@ func (c *Conn) ask(ctx context.Context, msg Message, mustWaitOnlyTheNextMessage
}
}

ch := make(chan Message)
ch := make(chan Message, 1)
msg.wait = genWait(c.IsClient())

if mustWaitOnlyTheNextMessage {
Expand Down
4 changes: 2 additions & 2 deletions conn_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (ns *NSConn) askRoomJoin(ctx context.Context, roomName string) (*Room, erro
IsLocal: true,
}

_, err := ns.Conn.ask(ctx, joinMsg, true)
_, err := ns.Conn.Ask(ctx, joinMsg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (ns *NSConn) askRoomLeave(ctx context.Context, msg Message, lock bool) erro
return ErrBadRoom
}

_, err := ns.Conn.ask(ctx, msg, true)
_, err := ns.Conn.Ask(ctx, msg)
if err != nil {
return err
}
Expand Down
74 changes: 73 additions & 1 deletion conn_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestJoinAndLeaveRoom(t *testing.T) {
t.Fatalf("expected Message's room name to be: %s==%s but it's: %s", roomName, room.Name, msg.Room)
}

go room.Leave(nil)
room.Leave(nil)

wg.Done()
} else {
Expand Down Expand Up @@ -90,3 +90,75 @@ func TestJoinAndLeaveRoom(t *testing.T) {
t.Fatal(err)
}
}

func TestJoinAndLeaveRoomInsideHandler(t *testing.T) {
var (
wg sync.WaitGroup
namespace = "default"
roomName = "room1"
body = []byte("data")
events = neffos.Namespaces{
namespace: neffos.Events{
"event": func(c *neffos.NSConn, msg neffos.Message) error {
if c.Conn.IsClient() {
if !bytes.Equal(msg.Body, body) {
t.Fatalf("expected event's incoming data to be: %s but got: %s", string(body), string(msg.Body))
}

room := c.Room(roomName)
if room == nil {
t.Fatal("expected a non-nil room")
}

if room.Name != msg.Room {
t.Fatalf("expected Message's room name to be: %s==%s but it's: %s", roomName, room.Name, msg.Room)
}

room.Leave(nil)

wg.Done()
} else {
room, err := c.JoinRoom(nil, roomName)
if err != nil {
return err
}

room.Emit(msg.Event, msg.Body)
}

return nil
},
neffos.OnRoomLeft: func(c *neffos.NSConn, msg neffos.Message) error {
if c.Conn.IsClient() {
if msg.Room != roomName {
t.Fatalf("expected left room name to be %s but got %s", roomName, msg.Room)
}
wg.Done()
}

return nil
},
},
}
)

teardownServer := runTestServer("localhost:8080", events)
defer teardownServer()

err := runTestClient("localhost:8080", events,
func(dialer string, client *neffos.Client) {
c, err := client.Connect(nil, namespace)
if err != nil {
t.Fatal(err)
}

// 1 -> to catch its own event and
// 2 -> to notify about room leave inside the event itself for both clients ofc.
wg.Add(2)
c.Emit("event", body)
wg.Wait()
})()
if err != nil {
t.Fatal(err)
}
}

0 comments on commit c6806f1

Please sign in to comment.