4
4
"context"
5
5
"database/sql"
6
6
"database/sql/driver"
7
- "flag"
8
7
"fmt"
9
8
"io"
10
9
"net/url"
@@ -34,18 +33,14 @@ import (
34
33
// store to use the raw DialectPostgres dialect and setup a one-use database.
35
34
// See heavyweight.FullTestDB() as a convenience function to help you do this,
36
35
// but please use sparingly because as it's name implies, it is expensive.
37
- func RegisterTxDb (ctx context. Context , dbURL string ) error {
36
+ func RegisterTxDb (dbURL string ) error {
38
37
drivers := sql .Drivers ()
39
38
for _ , driver := range drivers {
40
39
if driver == string (TransactionWrappedPostgres ) {
41
40
// TxDB driver already registered
42
41
return nil
43
42
}
44
43
}
45
- testing .Init ()
46
- if ! flag .Parsed () {
47
- flag .Parse ()
48
- }
49
44
if testing .Short () {
50
45
// -short tests don't need a DB
51
46
return nil
@@ -60,15 +55,10 @@ func RegisterTxDb(ctx context.Context, dbURL string) error {
60
55
if ! strings .HasSuffix (parsed .Path , "_test" ) {
61
56
return fmt .Errorf ("cannot run tests against database named `%s`. Note that the test database MUST end in `_test` to differentiate from a possible production DB. HINT: Try postgresql://postgres@localhost:5432/chainlink_test?sslmode=disable" , parsed .Path [1 :])
62
57
}
63
- abort := make (chan struct {})
64
- go func () {
65
- <- ctx .Done ()
66
- abort <- struct {}{} // abort all queries when context is cancelled
67
- }()
68
58
69
59
name := string (TransactionWrappedPostgres )
70
60
sql .Register (name , & txDriver {
71
- abort : abort ,
61
+ abort : make ( chan struct {}) ,
72
62
dbURL : dbURL ,
73
63
conns : make (map [string ]* conn ),
74
64
})
@@ -85,7 +75,7 @@ var _ driver.SessionResetter = &conn{}
85
75
// When `Close` is called, transaction is rolled back.
86
76
type txDriver struct {
87
77
sync.Mutex
88
- abort <- chan struct {}
78
+ abort chan struct {}
89
79
db * sql.DB
90
80
conns map [string ]* conn
91
81
@@ -130,6 +120,7 @@ func (d *txDriver) deleteConn(c *conn) error {
130
120
}
131
121
delete (d .conns , c .dsn )
132
122
if len (d .conns ) == 0 && d .db != nil {
123
+ close (d .abort )
133
124
if err := d .db .Close (); err != nil {
134
125
return err
135
126
}
@@ -152,7 +143,7 @@ func (c *conn) Begin() (driver.Tx, error) {
152
143
c .Lock ()
153
144
defer c .Unlock ()
154
145
if c .closed {
155
- panic ("conn is closed" )
146
+ return nil , fmt . Errorf ("conn is closed" )
156
147
}
157
148
// Begin is a noop because the transaction was already opened
158
149
return tx {c .tx }, nil
@@ -177,7 +168,7 @@ func (c *conn) PrepareContext(_ context.Context, query string) (driver.Stmt, err
177
168
c .Lock ()
178
169
defer c .Unlock ()
179
170
if c .closed {
180
- panic ("conn is closed" )
171
+ return nil , fmt . Errorf ("conn is closed" )
181
172
}
182
173
183
174
// It is not safe to give the passed in context to the tx directly
@@ -226,7 +217,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
226
217
c .Lock ()
227
218
defer c .Unlock ()
228
219
if c .closed {
229
- panic ("conn is closed" )
220
+ return nil , fmt . Errorf ("conn is closed" )
230
221
}
231
222
232
223
ctx , cancel := utils .ContextFromChan (c .abort )
@@ -277,38 +268,43 @@ func (c *conn) tryOpen() bool {
277
268
// Drivers must ensure all network calls made by Close
278
269
// do not block indefinitely (e.g. apply a timeout).
279
270
func (c * conn ) Close () (err error ) {
280
- if ! c .close () {
281
- return
271
+ newlyClosed , err := c .close ()
272
+ if err != nil {
273
+ return err
274
+ }
275
+ if ! newlyClosed {
276
+ return nil
282
277
}
278
+
283
279
// Wait to remove self to avoid nesting locks.
284
- if err : = c .removeSelf (); err != nil {
285
- panic ( err )
280
+ if err = c .removeSelf (); err != nil {
281
+ return err
286
282
}
287
283
return
288
284
}
289
285
290
- func (c * conn ) close () bool {
286
+ func (c * conn ) close () ( bool , error ) {
291
287
c .Lock ()
292
288
defer c .Unlock ()
293
289
if c .closed {
294
290
// Double close, should be a safe to make this a noop
295
291
// PGX allows double close
296
292
// See: https://github.com/jackc/pgx/blob/a457da8bffa4f90ad672fa093ee87f20cf06687b/conn.go#L249
297
- return false
293
+ return false , nil
298
294
}
299
295
300
296
c .opened --
301
297
if c .opened > 0 {
302
- return false
298
+ return false , nil
303
299
}
304
300
if c .tx != nil {
305
301
if err := c .tx .Rollback (); err != nil {
306
- panic ( err )
302
+ return false , err
307
303
}
308
304
c .tx = nil
309
305
}
310
306
c .closed = true
311
- return true
307
+ return true , nil
312
308
}
313
309
314
310
type tx struct {
@@ -335,7 +331,7 @@ func (s stmt) Exec(args []driver.Value) (driver.Result, error) {
335
331
s .conn .Lock ()
336
332
defer s .conn .Unlock ()
337
333
if s .conn .closed {
338
- panic ("conn is closed" )
334
+ return nil , fmt . Errorf ("conn is closed" )
339
335
}
340
336
return s .st .Exec (mapArgs (args )... )
341
337
}
@@ -345,7 +341,7 @@ func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (drive
345
341
s .conn .Lock ()
346
342
defer s .conn .Unlock ()
347
343
if s .conn .closed {
348
- panic ("conn is closed" )
344
+ return nil , fmt . Errorf ("conn is closed" )
349
345
}
350
346
351
347
ctx , cancel := utils .ContextFromChan (s .abort )
@@ -370,7 +366,7 @@ func (s stmt) Query(args []driver.Value) (driver.Rows, error) {
370
366
s .conn .Lock ()
371
367
defer s .conn .Unlock ()
372
368
if s .conn .closed {
373
- panic ("conn is closed" )
369
+ return nil , fmt . Errorf ("conn is closed" )
374
370
}
375
371
rows , err := s .st .Query (mapArgs (args )... )
376
372
defer func () {
@@ -387,7 +383,7 @@ func (s *stmt) QueryContext(_ context.Context, args []driver.NamedValue) (driver
387
383
s .conn .Lock ()
388
384
defer s .conn .Unlock ()
389
385
if s .conn .closed {
390
- panic ("conn is closed" )
386
+ return nil , fmt . Errorf ("conn is closed" )
391
387
}
392
388
393
389
ctx , cancel := utils .ContextFromChan (s .abort )
0 commit comments