Skip to content

Commit

Permalink
fix #37 - waiting for confirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
kataras committed Apr 21, 2020
1 parent a6bf2ab commit 5aa2daf
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ assignees: kataras
**Describe the bug**
A clear and concise description of what the bug is.

> Note, if that bugs is a browser relative please report it at the [neffos.js](https://github.com/kataras/neffos.js) repository instead.
> Note that if a bug is a browser-relative please report it at the [neffos.js](https://github.com/kataras/neffos.js) repository instead. Thanks!
**To Reproduce**
Steps to reproduce the behavior:
Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,8 @@ func (c *Conn) Namespace(namespace string) *NSConn {
}

func (c *Conn) tryNamespace(in Message) (*NSConn, bool) {
// for atomic.LoadUint32(c.isConnectingProcess) > 0 {
// }
c.processes.get(in.Namespace).wait() // wait any `askConnect` process (if any) of that "in.Namespace".
c.processes.get(in.Namespace).Wait() // wait any `askConnect` process (if any) of that "in.Namespace".

ns := c.Namespace(in.Namespace)
if ns == nil {
// if _, canConnect := c.namespaces[msg.Namespace]; !canConnect {
Expand All @@ -610,8 +609,9 @@ func (c *Conn) tryNamespace(in Message) (*NSConn, bool) {
// client#Connect
func (c *Conn) askConnect(ctx context.Context, namespace string) (*NSConn, error) {
p := c.processes.get(namespace)
p.start() // block any `tryNamespace` with that "namespace".
defer p.stop() // unblock.
p.Start() // block any `tryNamespace` with that "namespace".

defer p.Done() // unblock.

// defer c.processes.get(namespace).run()()
// for !atomic.CompareAndSwapUint32(c.isConnectingProcess, 0, 1) {
Expand Down
55 changes: 38 additions & 17 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func (p *processes) get(name string) *process {

if entry == nil {
entry = &process{
v: new(uint32),
finished: make(chan struct{}),
}

p.locker.Lock()
p.entries[name] = entry
p.locker.Unlock()
Expand All @@ -38,30 +39,50 @@ func (p *processes) get(name string) *process {
// process is used on connections on specific actions that needs to wait for an answer from the other side.
// Take for example the `Conn#handleMessage.tryNamespace` which waits for `Conn#askConnect` to finish on the specific namespace.
type process struct {
v *uint32
done uint32

finished chan struct{}
waiting sync.WaitGroup
}

// func (p *process) run() func() {
// p.start()
// return p.stop
// }
// Signal closes the channel.
func (p *process) Signal() {
// if !atomic.CompareAndSwapUint32(&p.running, 1, 0) {
// return // already finished.
// }

func (p *process) start() {
for !atomic.CompareAndSwapUint32(p.v, 0, 1) {
// if already started then wait to finish.
}
close(p.finished)
}

func (p *process) stop() {
atomic.StoreUint32(p.v, 0)
// Finished returns the read-only channel of `finished`.
// It gets fired when `Signal` is called.
func (p *process) Finished() <-chan struct{} {
return p.finished
}

func (p *process) wait() {
for p.isRunning() {
// Done calls the internal WaitGroup's `Done` method.
func (p *process) Done() {
if !atomic.CompareAndSwapUint32(&p.done, 0, 1) {
return
}

p.waiting.Done()
}

// Wait waits on the internal `WaitGroup`. See `Done` too.
func (p *process) Wait() {
if atomic.LoadUint32(&p.done) == 1 {
return
}
p.waiting.Wait()
}

// Start makes future `Wait` calls to hold until `Done`.
func (p *process) Start() {
p.waiting.Add(1)
}

// returns true if process didn't start yet or if stopped running.
func (p *process) isRunning() bool {
return atomic.LoadUint32(p.v) > 0
// isDone reports whether process is finished.
func (p *process) isDone() bool {
return atomic.LoadUint32(&p.done) == 1
}
58 changes: 52 additions & 6 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,81 @@ package neffos

import (
"fmt"
"sync/atomic"
"testing"
"time"

"golang.org/x/sync/errgroup"
)

func TestProcess(t *testing.T) {
func TestProcessWaitDone(t *testing.T) {
var testProcessName = "default"
procs := newProcesses()
p := procs.get(testProcessName)
p.Start()

worker := func() error {
defer procs.get(testProcessName).stop()
if !procs.get(testProcessName).isRunning() {
defer p.Done()
if p.isDone() {
return fmt.Errorf("%s process should be running", testProcessName)
}
time.Sleep(1 * time.Second)
return nil
}

procs.get(testProcessName).start()
g := new(errgroup.Group)
g.Go(worker)

procs.get(testProcessName).wait()
if procs.get(testProcessName).isRunning() {
p.Wait()
if !p.isDone() {
t.Fatalf("%s process should be finished", testProcessName)
}

if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

func TestProcessSingalFinished(t *testing.T) {
var testProcessName = "default"
procs := newProcesses()
p := procs.get(testProcessName)

var count uint32

worker := func() error {
p.Start()
defer p.Done()

tc := time.NewTicker(time.Second)
defer tc.Stop()

for {
select {
case <-tc.C:
atomic.AddUint32(&count, 1)
case <-p.Finished():
return nil
}
}
}

g := new(errgroup.Group)
g.Go(worker)

var sleepSecs uint32 = 2
time.Sleep(time.Duration(sleepSecs) * time.Second)
p.Signal()
p.Wait()
if !procs.get(testProcessName).isDone() {
t.Fatalf("%s process should be stopped", testProcessName)
}

if err := g.Wait(); err != nil {
t.Fatal(err)
}

if counts := atomic.LoadUint32(&count); counts != sleepSecs {
t.Fatalf("%s process should tik-tok for %d seconds but: %d", testProcessName, sleepSecs, counts)
}
}
1 change: 0 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ func publishMessages(c *Conn, msgs []Message) bool {
for _, msg := range msgs {
if msg.from == c.ID() {
// if the message is not supposed to return back to any connection with this ID.

return true
}

Expand Down

0 comments on commit 5aa2daf

Please sign in to comment.