diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 5ba3d9c..f34a11b 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -10,7 +10,7 @@ assignees: kataras **Describe the bug** A clear and concise description of what the bug is. -> Note, if that bugs is a browser relative please report it at the [neffos.js](https://github.com/kataras/neffos.js) repository instead. +> Note that if a bug is a browser-relative please report it at the [neffos.js](https://github.com/kataras/neffos.js) repository instead. Thanks! **To Reproduce** Steps to reproduce the behavior: diff --git a/conn.go b/conn.go index 1bf8390..99d60c2 100644 --- a/conn.go +++ b/conn.go @@ -588,9 +588,8 @@ func (c *Conn) Namespace(namespace string) *NSConn { } func (c *Conn) tryNamespace(in Message) (*NSConn, bool) { - // for atomic.LoadUint32(c.isConnectingProcess) > 0 { - // } - c.processes.get(in.Namespace).wait() // wait any `askConnect` process (if any) of that "in.Namespace". + c.processes.get(in.Namespace).Wait() // wait any `askConnect` process (if any) of that "in.Namespace". + ns := c.Namespace(in.Namespace) if ns == nil { // if _, canConnect := c.namespaces[msg.Namespace]; !canConnect { @@ -610,8 +609,9 @@ func (c *Conn) tryNamespace(in Message) (*NSConn, bool) { // client#Connect func (c *Conn) askConnect(ctx context.Context, namespace string) (*NSConn, error) { p := c.processes.get(namespace) - p.start() // block any `tryNamespace` with that "namespace". - defer p.stop() // unblock. + p.Start() // block any `tryNamespace` with that "namespace". + + defer p.Done() // unblock. // defer c.processes.get(namespace).run()() // for !atomic.CompareAndSwapUint32(c.isConnectingProcess, 0, 1) { diff --git a/process.go b/process.go index e7acd72..e36b1ed 100644 --- a/process.go +++ b/process.go @@ -25,8 +25,9 @@ func (p *processes) get(name string) *process { if entry == nil { entry = &process{ - v: new(uint32), + finished: make(chan struct{}), } + p.locker.Lock() p.entries[name] = entry p.locker.Unlock() @@ -38,30 +39,50 @@ func (p *processes) get(name string) *process { // process is used on connections on specific actions that needs to wait for an answer from the other side. // Take for example the `Conn#handleMessage.tryNamespace` which waits for `Conn#askConnect` to finish on the specific namespace. type process struct { - v *uint32 + done uint32 + + finished chan struct{} + waiting sync.WaitGroup } -// func (p *process) run() func() { -// p.start() -// return p.stop -// } +// Signal closes the channel. +func (p *process) Signal() { + // if !atomic.CompareAndSwapUint32(&p.running, 1, 0) { + // return // already finished. + // } -func (p *process) start() { - for !atomic.CompareAndSwapUint32(p.v, 0, 1) { - // if already started then wait to finish. - } + close(p.finished) } -func (p *process) stop() { - atomic.StoreUint32(p.v, 0) +// Finished returns the read-only channel of `finished`. +// It gets fired when `Signal` is called. +func (p *process) Finished() <-chan struct{} { + return p.finished } -func (p *process) wait() { - for p.isRunning() { +// Done calls the internal WaitGroup's `Done` method. +func (p *process) Done() { + if !atomic.CompareAndSwapUint32(&p.done, 0, 1) { + return } + + p.waiting.Done() +} + +// Wait waits on the internal `WaitGroup`. See `Done` too. +func (p *process) Wait() { + if atomic.LoadUint32(&p.done) == 1 { + return + } + p.waiting.Wait() +} + +// Start makes future `Wait` calls to hold until `Done`. +func (p *process) Start() { + p.waiting.Add(1) } -// returns true if process didn't start yet or if stopped running. -func (p *process) isRunning() bool { - return atomic.LoadUint32(p.v) > 0 +// isDone reports whether process is finished. +func (p *process) isDone() bool { + return atomic.LoadUint32(&p.done) == 1 } diff --git a/process_test.go b/process_test.go index b0ab751..d0f5a3c 100644 --- a/process_test.go +++ b/process_test.go @@ -2,35 +2,81 @@ package neffos import ( "fmt" + "sync/atomic" "testing" "time" "golang.org/x/sync/errgroup" ) -func TestProcess(t *testing.T) { +func TestProcessWaitDone(t *testing.T) { var testProcessName = "default" procs := newProcesses() + p := procs.get(testProcessName) + p.Start() worker := func() error { - defer procs.get(testProcessName).stop() - if !procs.get(testProcessName).isRunning() { + defer p.Done() + if p.isDone() { return fmt.Errorf("%s process should be running", testProcessName) } time.Sleep(1 * time.Second) return nil } - procs.get(testProcessName).start() g := new(errgroup.Group) g.Go(worker) - procs.get(testProcessName).wait() - if procs.get(testProcessName).isRunning() { + p.Wait() + if !p.isDone() { + t.Fatalf("%s process should be finished", testProcessName) + } + + if err := g.Wait(); err != nil { + t.Fatal(err) + } +} + +func TestProcessSingalFinished(t *testing.T) { + var testProcessName = "default" + procs := newProcesses() + p := procs.get(testProcessName) + + var count uint32 + + worker := func() error { + p.Start() + defer p.Done() + + tc := time.NewTicker(time.Second) + defer tc.Stop() + + for { + select { + case <-tc.C: + atomic.AddUint32(&count, 1) + case <-p.Finished(): + return nil + } + } + } + + g := new(errgroup.Group) + g.Go(worker) + + var sleepSecs uint32 = 2 + time.Sleep(time.Duration(sleepSecs) * time.Second) + p.Signal() + p.Wait() + if !procs.get(testProcessName).isDone() { t.Fatalf("%s process should be stopped", testProcessName) } if err := g.Wait(); err != nil { t.Fatal(err) } + + if counts := atomic.LoadUint32(&count); counts != sleepSecs { + t.Fatalf("%s process should tik-tok for %d seconds but: %d", testProcessName, sleepSecs, counts) + } } diff --git a/server.go b/server.go index 3cfd735..1594c08 100644 --- a/server.go +++ b/server.go @@ -430,7 +430,6 @@ func publishMessages(c *Conn, msgs []Message) bool { for _, msg := range msgs { if msg.from == c.ID() { // if the message is not supposed to return back to any connection with this ID. - return true }