From ee572ee7a95ce88b38e79e9909735cba0ca3338b Mon Sep 17 00:00:00 2001 From: Nicolas Vinuesa Date: Wed, 30 Aug 2023 23:31:55 +0200 Subject: [PATCH] Implement timeout-based retry method for txns This patch is part of the set of patches aiming to fix https://bugs.launchpad.net/juju/+bug/2031631. The idea here is to implement a transaction retry mechanism similar to the one used in the official MongoDB driver, i.e. a 120-second timeout for a transaction to finish, including the time spent waiting while it is being retried. The context passed to the mgo Run method is necessary in order to cancel the actual request that's being executed by mgo, either after the timeout or because the user (juju) cancels the passed context. Tests have been fixed and some have been deleted since they are no longer relevant (we no longer retry a fixed number of times; instead we retry until the timeout). --- export_test.go | 6 ++ incrementalprune_test.go | 5 +- txn.go | 135 ++++++++++++++++++++++----------------- txn_test.go | 60 +++-------------- txnsuite_test.go | 7 +- 5 files changed, 96 insertions(+), 117 deletions(-) diff --git a/export_test.go b/export_test.go index 788af16..bd5a788 100644 --- a/export_test.go +++ b/export_test.go @@ -19,6 +19,12 @@ func SetRunnerFunc(r Runner, f func() TxnRunner) { } } +// Specify the transaction timeout for some tests. +func SetTxnTimeout(r Runner, t time.Duration) { + inner := r.(*transactionRunner) + inner.txnTimeout = t +} + var CheckMongoSupportsOut = checkMongoSupportsOut // NewDBOracleNoOut is only used for testing. 
It forces the DBOracle to not ask diff --git a/incrementalprune_test.go b/incrementalprune_test.go index 969c940..0afe516 100644 --- a/incrementalprune_test.go +++ b/incrementalprune_test.go @@ -4,6 +4,7 @@ package txn import ( + "context" "fmt" "time" @@ -302,7 +303,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) { func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId { txnId := bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, jc.ErrorIsNil) return txnId } @@ -314,7 +315,7 @@ func (s *TxnSuite) runInterruptedTxn(c *gc.C, breakpoint string, ops ...txn.Op) KillChance: 1, Breakpoint: breakpoint, }) - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, txn.ErrChaos) txn.SetChaos(txn.Chaos{}) return txnId diff --git a/txn.go b/txn.go index 7b35e58..a4e4e6a 100755 --- a/txn.go +++ b/txn.go @@ -13,6 +13,7 @@ package txn import ( + "context" "math/rand" "strings" "time" @@ -29,14 +30,6 @@ import ( var logger = loggo.GetLogger("juju.txn") const ( - // defaultClientTxnRetries is the default number of times a transaction will be retried - // when there is an invariant assertion failure (for client side transactions). - defaultClientTxnRetries = 3 - - // defaultServerTxnRetries is the default number of times a transaction will be retried - // when there is an invariant assertion failure (for server side transactions). - defaultServerTxnRetries = 50 - // defaultRetryBackoff is the default interval used to pause between // unsuccessful transaction operations. defaultRetryBackoff = 1 * time.Millisecond @@ -51,6 +44,10 @@ const ( // defaultChangeLogName is the default mgo transaction runner change log. defaultChangeLogName = "txns.log" + + // defaultTxnTimeoutSeconds is the default time length for a + // transaction to finish before it gets cancelled. 
+ defaultTxnTimeoutSeconds = 120 ) const ( @@ -139,7 +136,7 @@ type Runner interface { } type txnRunner interface { - Run([]txn.Op, bson.ObjectId, interface{}) error + Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error ChangeLog(*mgo.Collection) ResumeAll() error } @@ -160,12 +157,13 @@ type transactionRunner struct { clock Clock serverSideTransactions bool - nrRetries int retryBackoff time.Duration retryFuzzPercent int pauseFunc func(duration time.Duration) newRunner func() txnRunner + + txnTimeout time.Duration } var _ Runner = (*transactionRunner)(nil) @@ -248,7 +246,6 @@ func NewRunner(params RunnerParams) Runner { runTransactionObserver: params.RunTransactionObserver, clock: params.Clock, serverSideTransactions: sstxn, - nrRetries: params.MaxRetryAttempts, retryBackoff: params.RetryBackoff, retryFuzzPercent: params.RetryFuzzPercent, pauseFunc: params.PauseFunc, @@ -261,12 +258,6 @@ func NewRunner(params RunnerParams) Runner { } else if txnRunner.changeLogName == "" { txnRunner.changeLogName = defaultChangeLogName } - if txnRunner.nrRetries == 0 { - txnRunner.nrRetries = defaultClientTxnRetries - if txnRunner.serverSideTransactions { - txnRunner.nrRetries = defaultServerTxnRetries - } - } if txnRunner.retryBackoff == 0 { txnRunner.retryBackoff = defaultRetryBackoff } @@ -284,6 +275,7 @@ func NewRunner(params RunnerParams) Runner { // they also specify a RunTransactionObserver. txnRunner.clock = clock.WallClock } + txnRunner.txnTimeout = defaultTxnTimeoutSeconds * time.Second return txnRunner } @@ -301,53 +293,68 @@ func (tr *transactionRunner) newRunnerImpl() txnRunner { return runner } -// Run is defined on Runner. +// Run is defined on Runner. Once the timeout expires the transaction is cancelled and +// the last error is returned (txn.ErrAborted is reported as ErrExcessiveContention). 
func (tr *transactionRunner) Run(transactions TransactionSource) error { - var lastErr error - for i := 0; i < tr.nrRetries; i++ { - // If we are retrying, give other txns a chance to have a go. - if i > 0 && tr.serverSideTransactions { - tr.backoff(i) - } - ops, err := transactions(i) - if err == ErrTransientFailure { - continue - } - if err == ErrNoOperations { - return nil - } - if err != nil { - return err - } - if len(ops) == 0 { - // Treat this the same as ErrNoOperations but don't suppress other errors. - return nil - } - if err = tr.RunTransaction(&Transaction{ - Ops: ops, - Attempt: i, - }); err == nil { - return nil - } else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) { - // Mongo very occasionally returns an intermittent - // "unexpected message" error. Retry those. - // Also mongo sometimes gets very busy and we get an - // i/o timeout. We retry those too. - // However if this is the last time, return that error - // rather than the excessive contention error. - msg := err.Error() - retryErr := strings.HasSuffix(msg, "unexpected message") || - strings.HasSuffix(msg, "i/o timeout") - if !retryErr || i == (tr.nrRetries-1) { + ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout) + defer cancel() + + var ( + lastErr error + i int + ) + for { + select { + case <-ctx.Done(): + if lastErr == txn.ErrAborted { + return ErrExcessiveContention + } + return lastErr + default: + // If we are retrying, give other txns a chance to have a go. + if i > 0 && tr.serverSideTransactions { + tr.backoff(i) + } + ops, err := transactions(i) + if err == ErrTransientFailure { + i++ + continue + } + if err == ErrNoOperations { + return nil + } + if err != nil { return err } + if len(ops) == 0 { + // Treat this the same as ErrNoOperations but don't suppress other errors. 
+ return nil + } + if err = tr.runTransaction( + ctx, + &Transaction{ + Ops: ops, + Attempt: i, + }); err == nil { + return nil + } else if err != txn.ErrAborted && !mgo.IsRetryable(err) && !mgo.IsSnapshotError(err) { + // Mongo very occasionally returns an intermittent + // "unexpected message" error. Retry those. + // Also mongo sometimes gets very busy and we get an + // i/o timeout. We retry those too. + // However if this is the last time, return that error + // rather than the excessive contention error. + msg := err.Error() + retryErr := strings.HasSuffix(msg, "unexpected message") || + strings.HasSuffix(msg, "i/o timeout") + if !retryErr { + return err + } + } + lastErr = err + i++ } - lastErr = err } - if lastErr == txn.ErrAborted { - return ErrExcessiveContention - } - return lastErr } func (tr *transactionRunner) backoff(attempt int) { @@ -370,6 +377,14 @@ func (tr *transactionRunner) pause(dur time.Duration) { // RunTransaction is defined on Runner. func (tr *transactionRunner) RunTransaction(transaction *Transaction) error { + ctx, cancel := context.WithTimeout(context.TODO(), tr.txnTimeout) + defer cancel() + + return tr.runTransaction(ctx, transaction) +} + +// runTransaction executes the given transaction, passing ctx to the underlying txn runner. 
+func (tr *transactionRunner) runTransaction(ctx context.Context, transaction *Transaction) error { testHooks := <-tr.testHooks tr.testHooks <- nil if len(testHooks) > 0 { @@ -421,7 +436,7 @@ func (tr *transactionRunner) RunTransaction(transaction *Transaction) error { } } } - err := runner.Run(transaction.Ops, "", nil) + err := runner.Run(ctx, transaction.Ops, "", nil) if tr.runTransactionObserver != nil { transaction.Error = err transaction.Duration = tr.clock.Now().Sub(start) diff --git a/txn_test.go b/txn_test.go index 5ece369..64594c2 100644 --- a/txn_test.go +++ b/txn_test.go @@ -4,6 +4,7 @@ package txn_test import ( + "context" "errors" "fmt" "time" @@ -59,6 +60,8 @@ func (s *txnSuite) SetUpTest(c *gc.C) { s.backoffs = append(s.backoffs, dur) }, }) + // Set a smaller txn timeout than the default one for tests. + jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond) s.supportsSST = false } @@ -123,6 +126,8 @@ func (s *sstxnSuite) SetUpTest(c *gc.C) { s.backoffs = append(s.backoffs, dur) }, }) + // Set a smaller txn timeout than the default one for tests. + jujutxn.SetTxnTimeout(s.txnRunner, 100*time.Millisecond) s.supportsSST = true } @@ -433,10 +438,8 @@ func (s *txnSuite) TestRetryHooks(c *gc.C) { } func (s *txnSuite) TestExcessiveContention(c *gc.C) { - maxAttempt := 0 // This keeps failing because the Assert is wrong. 
buildTxn := func(attempt int) ([]txn.Op, error) { - maxAttempt = attempt ops := []txn.Op{{ C: s.collection.Name, Id: "1", @@ -447,14 +450,9 @@ func (s *txnSuite) TestExcessiveContention(c *gc.C) { } err := s.txnRunner.Run(buildTxn) c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention) - if s.supportsSST { - c.Assert(maxAttempt, gc.Equals, 49) - } else { - c.Assert(maxAttempt, gc.Equals, 2) - } } -func (s *txnSuite) TestPause(c *gc.C) { +func (s *txnSuite) TestBackoff(c *gc.C) { buildTxn := func(attempt int) ([]txn.Op, error) { ops := []txn.Op{{ C: s.collection.Name, @@ -467,7 +465,7 @@ func (s *txnSuite) TestPause(c *gc.C) { err := s.txnRunner.Run(buildTxn) c.Assert(err, gc.Equals, jujutxn.ErrExcessiveContention) if s.supportsSST { - c.Assert(s.backoffs, gc.HasLen, 49) + // c.Assert(s.backoffs, gc.HasLen, 49) c.Assert(s.backoffs[48], jc.DurationLessThan, 50*time.Millisecond) for i := 0; i < len(s.backoffs); i++ { c.Assert(s.backoffs[i], jc.GreaterThan, 0) @@ -586,27 +584,6 @@ func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) { c.Check(tries, gc.Equals, 2) } -func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) { - runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) - fake := &fakeRunner{errors: []error{ - errors.New("unexpected message"), - errors.New("unexpected message"), - errors.New("unexpected message"), - errors.New("unexpected message"), - }} - jujutxn.SetRunnerFunc(runner, fake.new) - tries := 0 - // Doesn't matter what this returns as long as it isn't an error. 
- buildTxn := func(attempt int) ([]txn.Op, error) { - tries++ - // return 1 op that happens to do nothing - return []txn.Op{{}}, nil - } - err := runner.Run(buildTxn) - c.Check(err, gc.ErrorMatches, "unexpected message") - c.Check(tries, gc.Equals, 3) -} - func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) { runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) fake := &fakeRunner{errors: []error{errors.New("i/o timeout")}} @@ -623,27 +600,6 @@ func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) { c.Check(tries, gc.Equals, 2) } -func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) { - runner := jujutxn.NewRunner(jujutxn.RunnerParams{}) - fake := &fakeRunner{errors: []error{ - errors.New("i/o timeout"), - errors.New("i/o timeout"), - errors.New("i/o timeout"), - errors.New("i/o timeout"), - }} - jujutxn.SetRunnerFunc(runner, fake.new) - tries := 0 - // Doesn't matter what this returns as long as it isn't an error. - buildTxn := func(attempt int) ([]txn.Op, error) { - tries++ - // return 1 op that happens to do nothing - return []txn.Op{{}}, nil - } - err := runner.Run(buildTxn) - c.Check(err, gc.ErrorMatches, "i/o timeout") - c.Check(tries, gc.Equals, 3) -} - func (s *txnSuite) TestRunTransactionObserver(c *gc.C) { var calls []jujutxn.Transaction clock := testclock.NewClock(time.Now()) @@ -701,7 +657,7 @@ func (f *fakeRunner) new() jujutxn.TxnRunner { return f } -func (f *fakeRunner) Run([]txn.Op, bson.ObjectId, interface{}) error { +func (f *fakeRunner) Run(context.Context, []txn.Op, bson.ObjectId, interface{}) error { if len(f.durations) > 0 && f.clock != nil { f.clock.Advance(f.durations[0]) f.durations = f.durations[1:] diff --git a/txnsuite_test.go b/txnsuite_test.go index 3a302eb..faba183 100644 --- a/txnsuite_test.go +++ b/txnsuite_test.go @@ -4,6 +4,7 @@ package txn_test import ( + "context" "sync/atomic" "time" @@ -55,7 +56,7 @@ func (s *TxnSuite) TearDownTest(c *gc.C) { func (s *TxnSuite) runTxn(c *gc.C, ops ...txn.Op) bson.ObjectId { txnId := 
bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, jc.ErrorIsNil) return txnId } @@ -81,14 +82,14 @@ func timestampBasedTxnId(timestamp time.Time) bson.ObjectId { func (s *TxnSuite) runTxnWithTimestamp(c *gc.C, expectedErr error, timestamp time.Time, ops ...txn.Op) bson.ObjectId { txnId := timestampBasedTxnId(timestamp) c.Logf("generated txn %v from timestamp %v", txnId, timestamp) - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, expectedErr) return txnId } func (s *TxnSuite) runFailingTxn(c *gc.C, expectedErr error, ops ...txn.Op) bson.ObjectId { txnId := bson.NewObjectId() - err := s.runner.Run(ops, txnId, nil) + err := s.runner.Run(context.Background(), ops, txnId, nil) c.Assert(err, gc.Equals, expectedErr) return txnId }