forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscription_manager_test.go
141 lines (113 loc) · 4.89 KB
/
subscription_manager_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package river
import (
"context"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstoptest"
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)
func Test_SubscriptionManager(t *testing.T) {
t.Parallel()
ctx := context.Background()
type testBundle struct {
exec riverdriver.Executor
subscribeCh chan []jobcompleter.CompleterJobUpdated
tx pgx.Tx
}
setup := func(t *testing.T) (*subscriptionManager, *testBundle) {
t.Helper()
tx := riverinternaltest.TestTx(ctx, t)
exec := riverpgxv5.New(nil).UnwrapExecutor(tx)
subscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 1)
manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), subscribeCh)
require.NoError(t, manager.Start(ctx))
t.Cleanup(manager.Stop)
return manager, &testBundle{
exec: exec,
subscribeCh: subscribeCh,
tx: tx,
}
}
t.Run("DistributesRequestedEventsToSubscribers", func(t *testing.T) {
t.Parallel()
manager, bundle := setup(t)
t.Cleanup(func() { close(bundle.subscribeCh) })
sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 10, Kinds: []EventKind{EventKindJobCompleted, EventKindJobSnoozed}})
t.Cleanup(cancelSub)
// Send some events
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(time.Now())})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(time.Now())})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable)})
job4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})
makeStats := func(complete, wait, run time.Duration) *jobstats.JobStatistics {
return &jobstats.JobStatistics{
CompleteDuration: complete,
QueueWaitDuration: wait,
RunDuration: run,
}
}
bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{
{Job: job1, JobStats: makeStats(101, 102, 103)}, // completed, should be sent
{Job: job2, JobStats: makeStats(201, 202, 203)}, // cancelled, should be skipped
}
bundle.subscribeCh <- []jobcompleter.CompleterJobUpdated{
{Job: job3, JobStats: makeStats(301, 302, 303)}, // retryable, should be skipped
{Job: job4, JobStats: makeStats(401, 402, 403)}, // snoozed/scheduled, should be sent
}
received := riversharedtest.WaitOrTimeoutN(t, sub, 2)
require.Equal(t, job1.ID, received[0].Job.ID)
require.Equal(t, rivertype.JobStateCompleted, received[0].Job.State)
require.Equal(t, time.Duration(101), received[0].JobStats.CompleteDuration)
require.Equal(t, time.Duration(102), received[0].JobStats.QueueWaitDuration)
require.Equal(t, time.Duration(103), received[0].JobStats.RunDuration)
require.Equal(t, job4.ID, received[1].Job.ID)
require.Equal(t, rivertype.JobStateScheduled, received[1].Job.State)
require.Equal(t, time.Duration(401), received[1].JobStats.CompleteDuration)
require.Equal(t, time.Duration(402), received[1].JobStats.QueueWaitDuration)
require.Equal(t, time.Duration(403), received[1].JobStats.RunDuration)
cancelSub()
select {
case value, stillOpen := <-sub:
require.False(t, stillOpen, "subscription channel should be closed")
require.Nil(t, value, "subscription channel should be closed")
default:
require.Fail(t, "subscription channel should have been closed")
}
})
t.Run("StartStopRepeatedly", func(t *testing.T) {
// This service does not use the typical `startstoptest.Stress()` test
// because there are some additional steps required after a `Stop` for the
// subsequent `Start` to succeed. It's also not friendly for multiple
// concurrent calls to `Start` and `Stop`, but this is fine because the only
// usage within `Client` is already protected by a mutex.
t.Parallel()
manager, bundle := setup(t)
subscribeCh := bundle.subscribeCh
for i := 0; i < 100; i++ {
close(subscribeCh)
manager.Stop()
subscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 1)
manager.ResetSubscribeChan(subscribeCh)
require.NoError(t, manager.Start(ctx))
}
close(subscribeCh)
})
t.Run("StartStopStress", func(t *testing.T) {
t.Parallel()
svc, bundle := setup(t)
// Close the subscription channel in advance so that stops can leave
// successfully.
close(bundle.subscribeCh)
startstoptest.Stress(ctx, t, svc)
})
}