forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_complete_tx_test.go
82 lines (63 loc) · 2.15 KB
/
job_complete_tx_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package river
import (
"context"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/testfactory"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)
func TestJobCompleteTx(t *testing.T) {
t.Parallel()
ctx := context.Background()
type JobArgs struct {
JobArgsReflectKind[JobArgs]
}
type testBundle struct {
client *Client[pgx.Tx]
exec riverdriver.Executor
tx pgx.Tx
}
setup := func(ctx context.Context, t *testing.T) (context.Context, *testBundle) {
t.Helper()
tx := riverinternaltest.TestTx(ctx, t)
client, err := NewClient(riverpgxv5.New(nil), &Config{
Logger: riversharedtest.Logger(t),
})
require.NoError(t, err)
ctx = context.WithValue(ctx, rivercommon.ContextKeyClient{}, client)
return ctx, &testBundle{
client: client,
exec: riverpgxv5.New(nil).UnwrapExecutor(tx),
tx: tx,
}
}
t.Run("CompletesJob", func(t *testing.T) {
t.Parallel()
ctx, bundle := setup(ctx, t)
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{
State: ptrutil.Ptr(rivertype.JobStateRunning),
})
completedJob, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, completedJob.State)
require.WithinDuration(t, time.Now(), *completedJob.FinalizedAt, 2*time.Second)
updatedJob, err := bundle.exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, updatedJob.State)
})
t.Run("ErrorIfNotRunning", func(t *testing.T) {
t.Parallel()
ctx, bundle := setup(ctx, t)
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
_, err := JobCompleteTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job})
require.EqualError(t, err, "job must be running")
})
}