-
Notifications
You must be signed in to change notification settings - Fork 531
/
provider.go
650 lines (604 loc) · 20.6 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
package goose
import (
"context"
"database/sql"
"errors"
"fmt"
"io/fs"
"math"
"strconv"
"strings"
"sync"
"github.com/pressly/goose/v3/database"
"github.com/pressly/goose/v3/internal/controller"
"github.com/pressly/goose/v3/internal/gooseutil"
"github.com/pressly/goose/v3/internal/sqlparser"
"go.uber.org/multierr"
)
// Provider is a goose migration provider.
//
// It bundles the database handle, the store used to track applied versions, the filesystem that
// migrations are collected from, and the resolved configuration. Use [NewProvider] to create one.
type Provider struct {
// mu protects all accesses to the provider and must be held when calling operations on the
// database.
mu sync.Mutex
// db is the database handle supplied to NewProvider. Ping and Close operate directly on it.
db *sql.DB
// store wraps the database.Store and is used to query applied migrations (ListMigrations,
// GetMigration, GetLatestVersion).
store *controller.StoreController
// NOTE(review): presumably guards one-time setup of the version table; it is not referenced in
// this part of the file, confirm against its use site.
versionTableOnce sync.Once
// fsys is the filesystem migration sources are collected from when the provider is constructed.
fsys fs.FS
// cfg holds the options resolved by NewProvider (for example disableVersioning and
// allowMissing).
cfg config
// migrations are ordered by version in ascending order. This list will never be empty and
// contains all migrations known to the provider.
migrations []*Migration
}
// NewProvider constructs a goose migration provider.
//
// It is the caller's job to pair the database dialect with a compatible database/sql driver. With
// the "postgres" dialect, for instance, either github.com/lib/pq or github.com/jackc/pgx would do.
// Every dialect is exposed as a [database.Dialect] constant and comes with a default
// [database.Store] implementation. Advanced setups, such as a custom table name or a custom store
// implementation, are covered by [WithStore].
//
// fsys is where migration files are read from and may be nil. The common choice is [os.DirFS],
// e.g. os.DirFS("path/to/migrations"), which reads from the local filesystem. Other "filesystems"
// work too, such as [embed.FS], and [fs.Sub] can be used to narrow down the set of migrations.
//
// Exactly one of dialect and a custom store must be provided: an empty dialect without a custom
// store is an error, and so is a non-empty dialect combined with a custom store. db must not be
// nil.
//
// See [ProviderOption] for the available configuration options.
//
// Unless otherwise specified, all methods on Provider are safe for concurrent use.
func NewProvider(dialect Dialect, db *sql.DB, fsys fs.FS, opts ...ProviderOption) (*Provider, error) {
	if db == nil {
		return nil, errors.New("db must not be nil")
	}
	if fsys == nil {
		// A nil filesystem is allowed; fall back to noopFS.
		fsys = noopFS{}
	}
	cfg := config{
		registered:      make(map[int64]*Migration),
		excludePaths:    make(map[string]bool),
		excludeVersions: make(map[int64]bool),
		logger:          &stdLogger{},
	}
	for i := range opts {
		if err := opts[i].apply(&cfg); err != nil {
			return nil, err
		}
	}
	// The dialect and a custom store are mutually exclusive, and one of them is required. A
	// dialect selects the default store implementation; otherwise the custom store is used.
	var store database.Store
	switch {
	case dialect == "" && cfg.store == nil:
		return nil, errors.New("dialect must not be empty")
	case dialect != "" && cfg.store != nil:
		return nil, errors.New("dialect must be empty when using a custom store implementation")
	case dialect != "":
		defaultStore, err := database.NewStore(dialect, DefaultTablename)
		if err != nil {
			return nil, err
		}
		store = defaultStore
	default:
		store = cfg.store
	}
	if store.Tablename() == "" {
		return nil, errors.New("invalid store implementation: table name must not be empty")
	}
	return newProvider(db, store, fsys, cfg, registeredGoMigrations /* global */)
}
// newProvider assembles a Provider from an already-resolved store and configuration.
//
// Migration sources found in fsys are merged with the Go migrations registered on the provider
// (cfg.registered) and, unless cfg.disableGlobalRegistry is set, with the supplied global registry.
// A version present in both registries is an error, and ErrNoMigrations is returned when the
// merged set is empty.
func newProvider(
	db *sql.DB,
	store database.Store,
	fsys fs.FS,
	cfg config,
	global map[int64]*Migration,
) (*Provider, error) {
	// Gather the migration sources from the filesystem. SQL migrations are not parsed at this
	// point; parsing happens lazily, when required.
	//
	// feat(mf): an opt-in flag for eager parsing would let us report SQL parsing errors up front.
	// It adds a bit of startup overhead though, which is why it should stay optional.
	sources, err := collectFilesystemSources(fsys, false, cfg.excludePaths, cfg.excludeVersions)
	if err != nil {
		return nil, err
	}
	// Start with the Go migrations registered directly on the provider.
	goMigrations := make(map[int64]*Migration)
	for v, m := range cfg.registered {
		goMigrations[v] = m
	}
	// Global Go migrations are added on top unless the global registry is explicitly disabled.
	//
	// TODO(mf): when the registry is disabled and len(global) > 0, emit a warn-level log to inform
	// users. Would like to add this once we're on go1.21 and can leverage the new slog package.
	if !cfg.disableGlobalRegistry {
		for v, m := range global {
			if _, dup := goMigrations[v]; dup {
				return nil, fmt.Errorf("global go migration conflicts with provider-registered go migration with version %d", v)
			}
			goMigrations[v] = m
		}
	}
	// All registered Go migrations (if any) are unique by now; combine them with the sources
	// collected from the filesystem.
	all, err := merge(sources, goMigrations)
	if err != nil {
		return nil, err
	}
	if len(all) == 0 {
		return nil, ErrNoMigrations
	}
	provider := &Provider{
		db:         db,
		store:      controller.NewStoreController(store),
		fsys:       fsys,
		cfg:        cfg,
		migrations: all,
	}
	return provider, nil
}
// Status returns the status of all migrations, merging the list of migrations from the database and
// filesystem. The returned items are ordered by version, in ascending order.
//
// Every migration known to the provider is reported as either pending or applied; applied entries
// also carry the timestamp recorded in the database. When versioning is disabled the database is
// not consulted and all migrations are reported as pending.
func (p *Provider) Status(ctx context.Context) ([]*MigrationStatus, error) {
return p.status(ctx)
}
// HasPending returns true if there are pending migrations to apply, otherwise, it returns false. If
// out-of-order migrations are disabled, yet some are detected, this method returns an error.
//
// When versioning is disabled this method always reports true.
//
// Note, this method will not use a SessionLocker if one is configured. This allows callers to check
// for pending migrations without blocking or being blocked by other operations.
func (p *Provider) HasPending(ctx context.Context) (bool, error) {
return p.hasPending(ctx)
}
// GetVersions returns the max database version and the target version to migrate to.
//
// The target is the version of the last (highest-versioned) migration known to the provider. When
// versioning is disabled the database is not queried and current is reported as -1. Whenever a
// non-nil error is returned, current is -1.
//
// Note, this method will not use a SessionLocker if one is configured. This allows callers to check
// for versions without blocking or being blocked by other operations.
func (p *Provider) GetVersions(ctx context.Context) (current, target int64, err error) {
return p.getVersions(ctx)
}
// GetDBVersion reports the highest version recorded in the database, irrespective of the order in
// which migrations were applied. For instance, with migrations applied out of order (1,4,2,3) the
// result is 4. With no migrations applied, the result is 0.
//
// This is not supported when versioning is disabled; in that case -1 and an error are returned.
func (p *Provider) GetDBVersion(ctx context.Context) (int64, error) {
	if !p.cfg.disableVersioning {
		// A nil connection asks getDBMaxVersion to set up its own.
		return p.getDBMaxVersion(ctx, nil)
	}
	return -1, errors.New("getting database version not supported when versioning is disabled")
}
// ListSources lists every migration source known to the provider, in ascending order by version.
// For manually registered migrations, such as Go migrations registered through the
// [WithGoMigrations] option, the path field may be empty.
//
// A fresh Source value is allocated for each entry on every call.
func (p *Provider) ListSources() []*Source {
	out := make([]*Source, len(p.migrations))
	for i, migration := range p.migrations {
		out[i] = &Source{
			Type:    migration.Type,
			Path:    migration.Source,
			Version: migration.Version,
		}
	}
	return out
}
// Ping attempts to ping the database to verify a connection is available.
//
// It delegates to PingContext on the *sql.DB the provider was created with and returns its error
// unchanged.
func (p *Provider) Ping(ctx context.Context) error {
return p.db.PingContext(ctx)
}
// Close closes the database connection initially supplied to the provider.
//
// It delegates to Close on the *sql.DB passed to NewProvider and returns its error unchanged.
func (p *Provider) Close() error {
return p.db.Close()
}
// ApplyVersion runs exactly one migration, the one with the given version. When no migration
// exists for that version, [ErrVersionNotFound] is returned. When the migration has already been
// applied, [ErrAlreadyApplied] is returned.
//
// direction selects which way to migrate: true runs the up migration, false runs the down
// migration.
func (p *Provider) ApplyVersion(ctx context.Context, version int64, direction bool) (*MigrationResult, error) {
	results, err := p.apply(ctx, version, direction)
	if err != nil {
		return nil, err
	}
	if len(results) == 1 {
		return results[0], nil
	}
	// Anything other than a single result is unexpected and should never happen. Report the
	// versions that were returned to aid debugging.
	applied := make([]string, len(results))
	for i, r := range results {
		applied[i] = strconv.FormatInt(r.Source.Version, 10)
	}
	return nil, fmt.Errorf(
		"unexpected number of migrations applied running apply, expecting exactly one result: %v",
		strings.Join(applied, ","),
	)
}
// Up runs every pending migration. When nothing is pending, it returns an empty list and a nil
// error.
func (p *Provider) Up(ctx context.Context) ([]*MigrationResult, error) {
	pending, err := p.HasPending(ctx)
	if err != nil || !pending {
		// Either the check failed (err is returned as-is) or there is nothing to do (err is nil).
		return nil, err
	}
	// math.MaxInt64 as the upper bound means "no upper bound".
	return p.up(ctx, false, math.MaxInt64)
}
// UpByOne runs the next pending migration only. When there is nothing left to apply, it returns
// [ErrNoNextVersion].
func (p *Provider) UpByOne(ctx context.Context) (*MigrationResult, error) {
	pending, err := p.HasPending(ctx)
	if err != nil {
		return nil, err
	}
	if !pending {
		return nil, ErrNoNextVersion
	}
	results, err := p.up(ctx, true, math.MaxInt64)
	if err != nil {
		return nil, err
	}
	switch len(results) {
	case 0:
		return nil, ErrNoNextVersion
	case 1:
		return results[0], nil
	}
	// More than one result is unexpected and should never happen. Report the versions that were
	// returned to aid debugging.
	applied := make([]string, len(results))
	for i, r := range results {
		applied[i] = strconv.FormatInt(r.Source.Version, 10)
	}
	return nil, fmt.Errorf(
		"unexpected number of migrations applied running up-by-one, expecting exactly one result: %v",
		strings.Join(applied, ","),
	)
}
// UpTo runs all pending migrations up to, and including, the given version. When nothing is
// pending, it returns an empty list and a nil error.
//
// As an example, given three new migrations (9,10,11), a current database version of 8 and a
// requested version of 10, only versions 9 and 10 are applied.
func (p *Provider) UpTo(ctx context.Context, version int64) ([]*MigrationResult, error) {
	pending, err := p.HasPending(ctx)
	if err != nil || !pending {
		// Either the check failed (err is returned as-is) or there is nothing to do (err is nil).
		return nil, err
	}
	return p.up(ctx, false, version)
}
// Down reverts the most recently applied migration. When there is nothing to roll back, it returns
// [ErrNoNextVersion].
//
// Note that rollbacks follow the order in which migrations were applied, not the reverse order of
// the migration versions. The distinction only matters when migrations are allowed to be applied
// out of order.
func (p *Provider) Down(ctx context.Context) (*MigrationResult, error) {
	results, err := p.down(ctx, true, 0)
	if err != nil {
		return nil, err
	}
	switch len(results) {
	case 0:
		return nil, ErrNoNextVersion
	case 1:
		return results[0], nil
	}
	// More than one result is unexpected and should never happen. Report the versions that were
	// returned to aid debugging.
	reverted := make([]string, len(results))
	for i, r := range results {
		reverted[i] = strconv.FormatInt(r.Source.Version, 10)
	}
	return nil, fmt.Errorf(
		"unexpected number of migrations applied running down, expecting exactly one result: %v",
		strings.Join(reverted, ","),
	)
}
// DownTo reverts all migrations down to, but excluding, the given version. A negative version is
// rejected with an error.
//
// As an example, with a current database version of 11,10,9... and a requested version of 9, only
// migrations 11 and 10 are rolled back.
//
// Note that rollbacks follow the order in which migrations were applied, not the reverse order of
// the migration versions. The distinction only matters when migrations are allowed to be applied
// out of order.
func (p *Provider) DownTo(ctx context.Context, version int64) ([]*MigrationResult, error) {
	if version >= 0 {
		return p.down(ctx, false, version)
	}
	return nil, fmt.Errorf("invalid version: must be a valid number or zero: %d", version)
}
// *** Internal methods ***
// up runs pending migrations in the up direction, considering only versions up to and including
// version. With byOne set, the migrations are handed to runMigrations in by-one mode.
//
// version must be at least 1, otherwise errInvalidVersion is returned. When versioning is disabled,
// all known migrations are run regardless of version, and byOne is not supported. The cleanup
// function obtained from p.initialize always runs before returning; its error, if any, is appended
// to the returned error.
func (p *Provider) up(
	ctx context.Context,
	byOne bool,
	version int64,
) (_ []*MigrationResult, retErr error) {
	if version < 1 {
		return nil, errInvalidVersion
	}
	conn, cleanup, err := p.initialize(ctx, true)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	if len(p.migrations) == 0 {
		return nil, nil
	}
	if p.cfg.disableVersioning {
		if byOne {
			return nil, errors.New("up-by-one not supported when versioning is disabled")
		}
		// Without versioning there is no database state to compare against, so every known
		// migration is run.
		return p.runMigrations(ctx, conn, p.migrations, sqlparser.DirectionUp, byOne)
	}
	// optimize(mf): Fetching every migration from the database is not ideal.
	//
	// Ideally we would ask for the current max version only and apply whatever is greater.
	// Listing everything, however, lets us make stronger guarantees about unapplied migrations.
	//
	// For users that do not allow out-of-order migrations, older unapplied migrations must be
	// surfaced as an error. See https://github.com/pressly/goose/issues/761 for more details.
	//
	// For users that do allow out-of-order migrations, the older migrations that still need to
	// run have to be collected, which requires the complete list anyway.
	applied, err := p.store.ListMigrations(ctx, conn)
	if err != nil {
		return nil, err
	}
	if len(applied) == 0 {
		return nil, errMissingZeroVersion
	}
	pendingVersions, err := gooseutil.UpVersions(
		getVersionsFromMigrations(p.migrations), // fsys versions
		getVersionsFromListMigrations(applied),  // db versions
		version,
		p.cfg.allowMissing,
	)
	if err != nil {
		return nil, err
	}
	var pending []*Migration
	for _, v := range pendingVersions {
		migration, err := p.getMigration(v)
		if err != nil {
			return nil, err
		}
		pending = append(pending, migration)
	}
	return p.runMigrations(ctx, conn, pending, sqlparser.DirectionUp, byOne)
}
// down runs migrations in the down direction. Migrations recorded in the database are walked in
// the order returned by the store, stopping at the first one whose version is less than or equal
// to version. With byOne set, the migrations are handed to runMigrations in by-one mode.
//
// When versioning is disabled the database is not consulted: either the last known migration
// (byOne) or all known migrations are passed to runMigrations. The cleanup function obtained from
// p.initialize always runs before returning; its error, if any, is appended to the returned error.
func (p *Provider) down(
	ctx context.Context,
	byOne bool,
	version int64,
) (_ []*MigrationResult, retErr error) {
	conn, cleanup, err := p.initialize(ctx, true)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	if len(p.migrations) == 0 {
		return nil, nil
	}
	if p.cfg.disableVersioning {
		targets := p.migrations
		if byOne {
			// Only the last (highest-versioned) known migration is a candidate.
			targets = []*Migration{p.migrations[len(p.migrations)-1]}
		}
		return p.runMigrations(ctx, conn, targets, sqlparser.DirectionDown, byOne)
	}
	applied, err := p.store.ListMigrations(ctx, conn)
	if err != nil {
		return nil, err
	}
	if len(applied) == 0 {
		return nil, errMissingZeroVersion
	}
	// The zero version is never migrated down.
	if applied[0].Version == 0 {
		p.printf("no migrations to run, current version: 0")
		return nil, nil
	}
	var rollback []*Migration
	for i := range applied {
		v := applied[i].Version
		if v <= version {
			break
		}
		migration, err := p.getMigration(v)
		if err != nil {
			return nil, err
		}
		rollback = append(rollback, migration)
	}
	return p.runMigrations(ctx, conn, rollback, sqlparser.DirectionDown, byOne)
}
// apply runs the single migration identified by version, up when direction is true and down when
// it is false.
//
// version must be at least 1, otherwise errInvalidVersion is returned. Applying an already-applied
// migration yields ErrAlreadyApplied; rolling back a migration that is not applied yields
// ErrNotApplied. Both are wrapped with the offending version. The cleanup function obtained from
// p.initialize always runs before returning; its error, if any, is appended to the returned error.
func (p *Provider) apply(
	ctx context.Context,
	version int64,
	direction bool,
) (_ []*MigrationResult, retErr error) {
	if version < 1 {
		return nil, errInvalidVersion
	}
	// Resolve the migration before touching the database so unknown versions fail fast.
	migration, err := p.getMigration(version)
	if err != nil {
		return nil, err
	}
	conn, cleanup, err := p.initialize(ctx, true)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	record, err := p.store.GetMigration(ctx, conn, version)
	if err != nil && !errors.Is(err, database.ErrVersionNotFound) {
		return nil, err
	}
	// A non-nil record means the version is already recorded in the database.
	alreadyApplied := record != nil
	switch {
	case direction && alreadyApplied:
		// Up requested, but the migration is already applied.
		return nil, fmt.Errorf("version %d: %w", version, ErrAlreadyApplied)
	case !direction && !alreadyApplied:
		// Down requested, but there is nothing to roll back.
		return nil, fmt.Errorf("version %d: %w", version, ErrNotApplied)
	}
	// Remaining states: up on an unapplied migration, or down on an applied one.
	dir := sqlparser.DirectionDown
	if direction {
		dir = sqlparser.DirectionUp
	}
	return p.runMigrations(ctx, conn, []*Migration{migration}, dir, true)
}
// getVersions reports the latest version recorded in the database (current) together with the
// version of the last known migration (target).
//
// When versioning is disabled the database is not queried and current is -1. On error current is
// -1 as well; target is -1 only when initialization itself fails. If the store reports
// database.ErrVersionNotFound, errMissingZeroVersion is returned instead. The cleanup function
// obtained from p.initialize always runs before returning; its error, if any, is appended to the
// returned error.
func (p *Provider) getVersions(ctx context.Context) (current, target int64, retErr error) {
	conn, cleanup, err := p.initialize(ctx, false)
	if err != nil {
		return -1, -1, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	// p.migrations is sorted ascending, so the last entry carries the highest version.
	target = p.migrations[len(p.migrations)-1].Version
	if p.cfg.disableVersioning {
		// With versioning disabled, migrations are always pending and the target is simply the
		// last migration.
		return -1, target, nil
	}
	latest, err := p.store.GetLatestVersion(ctx, conn)
	switch {
	case err == nil:
		return latest, target, nil
	case errors.Is(err, database.ErrVersionNotFound):
		return -1, target, errMissingZeroVersion
	default:
		return -1, target, err
	}
}
// hasPending reports whether at least one known migration still has to be applied.
//
// When versioning is disabled the answer is always true. Otherwise the full list of migrations
// recorded in the database is compared against the known migrations via gooseutil.UpVersions, and
// any error from that comparison is returned. The cleanup function obtained from p.initialize
// always runs before returning; its error, if any, is appended to the returned error.
func (p *Provider) hasPending(ctx context.Context) (_ bool, retErr error) {
	conn, cleanup, err := p.initialize(ctx, false)
	if err != nil {
		return false, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	if p.cfg.disableVersioning {
		// Nothing is tracked, so everything counts as pending.
		return true, nil
	}
	// Fetch every migration from the database. Be careful, "optimizing" this can introduce subtle
	// bugs. Two cases matter:
	//
	//  1. Out-of-order migrations are enabled. We must detect missing migrations and report them
	//     as pending. This is a valid state, so no error is surfaced.
	//
	//  2. Out-of-order migrations are disabled (default). We must verify that all migrations have
	//     been applied. Comparing against the highest applied version alone is not enough: an
	//     out-of-order migration that was introduced later would be silently ignored, and the
	//     user would never learn about the unapplied migration.
	//
	// A provider flag such as IgnoreMissing could be considered to allow silently ignoring missing
	// migrations, which would help users who already have checks in place that prevent missing
	// migrations from being introduced.
	applied, err := p.store.ListMigrations(ctx, conn)
	if err != nil {
		return false, err
	}
	pendingVersions, err := gooseutil.UpVersions(
		getVersionsFromMigrations(p.migrations), // fsys versions
		getVersionsFromListMigrations(applied),  // db versions
		math.MaxInt64,
		p.cfg.allowMissing,
	)
	if err != nil {
		return false, err
	}
	return len(pendingVersions) != 0, nil
}
// getVersionsFromMigrations extracts the Version of every migration in in, preserving order. The
// result is always a non-nil slice with the same length as in.
func getVersionsFromMigrations(in []*Migration) []int64 {
	versions := make([]int64, len(in))
	for i := range in {
		versions[i] = in[i].Version
	}
	return versions
}
// getVersionsFromListMigrations extracts the Version of every database migration record in in,
// preserving order. The result is always a non-nil slice with the same length as in.
func getVersionsFromListMigrations(in []*database.ListMigrationsResult) []int64 {
	versions := make([]int64, len(in))
	for i := range in {
		versions[i] = in[i].Version
	}
	return versions
}
// status builds one MigrationStatus per known migration, in the order of p.migrations.
//
// Each entry starts out as pending. Unless versioning is disabled, the store is queried for every
// migration, and entries found in the database are marked as applied along with their recorded
// timestamp. The cleanup function obtained from p.initialize always runs before returning; its
// error, if any, is appended to the returned error.
func (p *Provider) status(ctx context.Context) (_ []*MigrationStatus, retErr error) {
	conn, cleanup, err := p.initialize(ctx, true)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize: %w", err)
	}
	defer func() {
		retErr = multierr.Append(retErr, cleanup())
	}()
	statuses := make([]*MigrationStatus, 0, len(p.migrations))
	for _, migration := range p.migrations {
		entry := &MigrationStatus{
			Source: &Source{
				Type:    migration.Type,
				Path:    migration.Source,
				Version: migration.Version,
			},
			State: StatePending,
		}
		// The entry is a pointer, so it can still be updated below after being appended.
		statuses = append(statuses, entry)
		if p.cfg.disableVersioning {
			// The database cannot tell us what has been applied; assume the migration is pending.
			continue
		}
		record, err := p.store.GetMigration(ctx, conn, migration.Version)
		if err != nil && !errors.Is(err, database.ErrVersionNotFound) {
			return nil, err
		}
		if record != nil {
			entry.State = StateApplied
			entry.AppliedAt = record.Timestamp
		}
	}
	return statuses, nil
}
// getDBMaxVersion returns the highest version recorded in the database, regardless of the order in
// which migrations were applied.
//
// conn may be nil, in which case a connection is obtained through p.initialize for the duration of
// the call; the cleanup function returned by p.initialize then runs before returning and its
// error, if any, is appended to the returned error. A non-nil conn is used as-is.
//
// If the store reports database.ErrVersionNotFound, errMissingZeroVersion is returned instead. The
// returned version is meaningless whenever the error is non-nil.
func (p *Provider) getDBMaxVersion(ctx context.Context, conn *sql.Conn) (_ int64, retErr error) {
	if conn == nil {
		var (
			cleanup func() error
			err     error
		)
		conn, cleanup, err = p.initialize(ctx, true)
		if err != nil {
			// Wrap the error the same way every other p.initialize call site in this file does,
			// so callers get a consistent message. The cause stays reachable via errors.Is/As.
			return 0, fmt.Errorf("failed to initialize: %w", err)
		}
		defer func() {
			retErr = multierr.Append(retErr, cleanup())
		}()
	}
	latest, err := p.store.GetLatestVersion(ctx, conn)
	if err != nil {
		if errors.Is(err, database.ErrVersionNotFound) {
			return 0, errMissingZeroVersion
		}
		return -1, err
	}
	return latest, nil
}