-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_test.go
69 lines (57 loc) · 1.34 KB
/
pubsub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package pubsub_test
import (
"context"
"reflect"
"sync"
"testing"
"time"
"github.com/twipi/pubsub"
)
func TestSubscriber(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// This test is in a separate package to ensure that the API is usable from
// other packages.
s := pubsub.NewSubscriber[int]()
src := make(chan int)
wg.Add(1)
go func() {
s.Listen(ctx, src)
wg.Done()
}()
// Create a slow subscriber.
slowCh := make(chan int)
s.Subscribe(slowCh, nil)
defer s.Unsubscribe(slowCh)
// Test broadcast. This should not block.
broadcast := []int{1, 2, 3, 4, 5}
(func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
for _, n := range broadcast {
t.Logf("broadcasting: %v", n)
select {
case <-ctx.Done():
t.Fatalf("broadcast canceled: %v", ctx.Err())
case src <- n:
}
}
})()
// Try to receive all broadcasts.
var slowReceives []int
for len(slowReceives) < len(broadcast) {
select {
case <-ctx.Done():
return
case n := <-slowCh:
t.Logf("slow subscriber received: %v", n)
slowReceives = append(slowReceives, n)
time.Sleep(10 * time.Millisecond)
}
}
if !reflect.DeepEqual(broadcast, slowReceives) {
t.Errorf("slow subscriber did not receive all broadcasts")
}
}