Make the wasTerminated variable atomic (#121)
The global variable wasTerminated must be atomic because it is read and
written concurrently by several goroutines.
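
For illustration only (a minimal sketch, not gpbackup code): a plain bool
written by one goroutine and read by others is a data race under the Go
memory model, while atomic.Bool from sync/atomic (Go 1.19+) provides the
Load/Store calls used throughout this change.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Shared between a signal-handling goroutine and worker goroutines,
// so every access goes through atomic Load/Store.
var wasTerminated atomic.Bool

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 1000000; i++ {
		if wasTerminated.Load() { // safe concurrent read
			fmt.Printf("worker %d: stopping early\n", id)
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go worker(i, &wg)
	}
	wasTerminated.Store(true) // safe concurrent write, e.g. from a signal handler
	wg.Wait()
}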

No tests were added because the race condition is rare and nearly
impossible to reproduce reliably.
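
The race is still mechanically detectable even when it never misbehaves:
Go's race detector flags the unsynchronized access regardless of timing.
Below is a hypothetical test (not part of this commit) that go test -race
would report for the old plain-bool variable and that is race-free once the
flag is an atomic.Bool.

package termination

import (
	"sync"
	"testing"
)

// Hypothetical test, for illustration: with a plain bool the race detector
// reports the unsynchronized read/write below; switching the variable to
// atomic.Bool with Load/Store makes the same test race-free.
func TestWasTerminatedConcurrentAccess(t *testing.T) {
	var wasTerminated bool
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		wasTerminated = true // concurrent write from a second goroutine
	}()
	if wasTerminated { // concurrent read from the test goroutine
		t.Log("already terminated")
	}
	wg.Wait()
}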

Ticket: ADBDEV-6797
RekGRpth authored Dec 5, 2024
1 parent e7ca364 commit 1c8b12e
Showing 14 changed files with 53 additions and 48 deletions.
16 changes: 8 additions & 8 deletions backup/backup.go
@@ -235,7 +235,7 @@ func backupGlobals(metadataFile *utils.FileWithByteCount) {
}

func backupPredata(metadataFile *utils.FileWithByteCount, tables []Table, tableOnly bool) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
gplog.Info("Writing pre-data metadata")
@@ -290,7 +290,7 @@ func backupPredata(metadataFile *utils.FileWithByteCount, tables []Table, tableO
}

func backupData(tables []Table) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
if len(tables) == 0 {
@@ -326,7 +326,7 @@ func backupData(tables []Table) {
}

func backupPostdata(metadataFile *utils.FileWithByteCount) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
gplog.Info("Writing post-data metadata")
@@ -349,7 +349,7 @@ func backupPostdata(metadataFile *utils.FileWithByteCount) {
}

func backupStatistics(tables []Table) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
statisticsFilename := globalFPInfo.GetStatisticsFilePath()
@@ -365,7 +365,7 @@ func DoTeardown() {
backupFailed := false
defer func() {
// If the backup was terminated, the signal handler will handle cleanup
- if wasTerminated {
+ if wasTerminated.Load() {
CleanupGroup.Wait()
} else {
DoCleanup(backupFailed)
@@ -389,7 +389,7 @@ func DoTeardown() {
}
backupFailed = true
}
- if wasTerminated {
+ if wasTerminated.Load() {
/*
* Don't print an error or create a report file if the backup was canceled,
* as the signal handler will take care of cleanup and return codes. Just
@@ -491,7 +491,7 @@ func DoCleanup(backupFailed bool) {
utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo, cleanupTimeout)

// Check gpbackup_helper errors here if backup was terminated
- if wasTerminated {
+ if wasTerminated.Load() {
err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if err != nil {
gplog.Error(err.Error())
@@ -586,7 +586,7 @@ func GetVersion() string {
}

func logCompletionMessage(msg string) {
- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("%s incomplete", msg)
} else {
gplog.Info("%s complete", msg)
8 changes: 4 additions & 4 deletions backup/data.go
@@ -64,7 +64,7 @@ type BackupProgressCounters struct {
}

func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
- if wasTerminated {
+ if wasTerminated.Load() {
return -1, nil
}
checkPipeExistsCommand := ""
@@ -223,7 +223,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
- if wasTerminated || isErroredBackup.Load() {
+ if wasTerminated.Load() || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
cancel()
return
@@ -313,7 +313,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
- if wasTerminated || isErroredBackup.Load() {
+ if wasTerminated.Load() || isErroredBackup.Load() {
cancel()
return
}
@@ -354,7 +354,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
if backupSnapshot == "" {
allWorkersTerminatedLogged := false
for _, table := range tables {
- if wasTerminated || isErroredBackup.Load() {
+ if wasTerminated.Load() || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
break
}
5 changes: 3 additions & 2 deletions backup/global_variables.go
@@ -2,6 +2,7 @@ package backup

import (
"sync"
+ "sync/atomic"

"github.com/greenplum-db/gp-common-go-libs/cluster"
"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -27,7 +28,7 @@ const (
Deferred
Complete
PG_LOCK_NOT_AVAILABLE = "55P03"
- ENUM_TYPE_OID = 3500
+ ENUM_TYPE_OID = 3500
)

/*
@@ -43,7 +44,7 @@ var (
objectCounts map[string]int
pluginConfig *utils.PluginConfig
version string
- wasTerminated bool
+ wasTerminated atomic.Bool
backupLockFile lockfile.Lockfile
filterRelationClause string
quotedRoleNames map[string]string
2 changes: 1 addition & 1 deletion backup/queries_relations.go
@@ -560,7 +560,7 @@ func LockTables(connectionPool *dbconn.DBConn, tables []Relation) {
for i, currentBatch := range tableBatches {
_, err := connectionPool.Exec(fmt.Sprintf("LOCK TABLE %s %s", currentBatch, lockMode))
if err != nil {
- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Warn("Interrupt received while acquiring ACCESS SHARE locks on tables")
select {} // wait for cleanup thread to exit gpbackup
} else {
2 changes: 1 addition & 1 deletion helper/backup_helper.go
@@ -42,7 +42,7 @@ func doBackupAgent() error {
*/
for i, oid := range oidList {
currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i])
- if wasTerminated {
+ if wasTerminated.Load() {
logError("Terminated due to user request")
return errors.New("Terminated due to user request")
}
10 changes: 5 additions & 5 deletions helper/helper.go
@@ -26,7 +26,7 @@ import (
var (
CleanupGroup *sync.WaitGroup
version string
- wasTerminated bool
+ wasTerminated atomic.Bool
wasSigpiped atomic.Bool
writeHandle *os.File
writer *bufio.Writer
@@ -60,7 +60,7 @@ var (
func DoHelper() {
var err error
defer func() {
- if wasTerminated {
+ if wasTerminated.Load() {
CleanupGroup.Wait()
return
}
@@ -163,8 +163,8 @@ func InitializeSignalHandler() {
terminatedChan <- true
}
}()
- wasTerminated = <-terminatedChan
- if wasTerminated {
+ wasTerminated.Store(<-terminatedChan)
+ if wasTerminated.Load() {
DoCleanup()
os.Exit(2)
} else {
@@ -275,7 +275,7 @@ func flushAndCloseRestoreWriter(pipeName string, oid int) error {

func DoCleanup() {
defer CleanupGroup.Done()
- if wasTerminated {
+ if wasTerminated.Load() {
/*
* If the agent dies during the last table copy, it can still report
* success, so we create an error file and check for its presence in
2 changes: 1 addition & 1 deletion helper/restore_helper.go
@@ -234,7 +234,7 @@ func doRestoreAgent() error {
batchNum := oidWithBatch.batch

contentToRestore := *content + (*destSize * batchNum)
- if wasTerminated {
+ if wasTerminated.Load() {
logError("Terminated due to user request")
return errors.New("Terminated due to user request")
}
6 changes: 3 additions & 3 deletions restore/data.go
@@ -27,7 +27,7 @@ var (
)

func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
- if wasTerminated {
+ if wasTerminated.Load() {
return -1, nil
}
whichConn = connectionPool.ValidateConnNum(whichConn)
@@ -234,7 +234,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co

utils.WriteOidListToSegments(oidList, globalCluster, fpInfo, "oid")
initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, fpInfo)
- if wasTerminated {
+ if wasTerminated.Load() {
return 0
}
isFilter := false
@@ -280,7 +280,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
- if wasTerminated {
+ if wasTerminated.Load() {
dataProgressBar.(*pb.ProgressBar).NotPrint = true
cancel()
return
3 changes: 2 additions & 1 deletion restore/global_variables.go
@@ -2,6 +2,7 @@ package restore

import (
"sync"
+ "sync/atomic"

"github.com/greenplum-db/gp-common-go-libs/cluster"
"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -36,7 +37,7 @@ var (
pluginConfig *utils.PluginConfig
restoreStartTime string
version string
- wasTerminated bool
+ wasTerminated atomic.Bool
errorTablesMetadata map[string]Empty
errorTablesData map[string]Empty
opts *options.Options
2 changes: 1 addition & 1 deletion restore/parallel.go
@@ -23,7 +23,7 @@ var (

func executeStatementsForConn(statements chan toc.StatementWithType, fatalErr *error, numErrors *int32, progressBar utils.ProgressBar, whichConn int, executeInParallel bool) {
for statement := range statements {
- if wasTerminated || *fatalErr != nil {
+ if wasTerminated.Load() || *fatalErr != nil {
if executeInParallel {
gplog.Error("Error detected on connection %d. Terminating transactions.", whichConn)
txMutex.Lock()
28 changes: 14 additions & 14 deletions restore/restore.go
@@ -289,7 +289,7 @@ func verifyIncrementalState() {
}

func restorePredata(metadataFilename string) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
var numErrors int32
@@ -356,7 +356,7 @@ func restorePredata(metadataFilename string) {
}

progressBar.Finish()
- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("Pre-data metadata restore incomplete")
} else if numErrors > 0 {
gplog.Info("Pre-data metadata restore completed with failures")
@@ -366,7 +366,7 @@ func restorePredata(metadataFilename string) {
}

func restoreSequenceValues(metadataFilename string) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
gplog.Info("Restoring sequence values")
@@ -396,7 +396,7 @@ func restoreSequenceValues(metadataFilename string) {
progressBar.Finish()
}

- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("Sequence values restore incomplete")
} else if numErrors > 0 {
gplog.Info("Sequence values restore completed with failures")
@@ -466,7 +466,7 @@ func editStatementsRedirectSchema(statements []toc.StatementWithType, redirectSc
}

func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
- if wasTerminated {
+ if wasTerminated.Load() {
return -1, nil
}
restorePlan := backupConfig.RestorePlan
@@ -502,7 +502,7 @@ func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
}

dataProgressBar.Finish()
- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("Data restore incomplete")
} else if numErrors > 0 {
gplog.Info("Data restore completed with failures")
@@ -514,7 +514,7 @@ func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
}

func restorePostdata(metadataFilename string) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
gplog.Info("Restoring post-data metadata")
@@ -532,7 +532,7 @@ func restorePostdata(metadataFilename string) {
numErrors += ExecuteRestoreMetadataStatements("postdata", thirdBatch, "", progressBar, utils.PB_VERBOSE, connectionPool.NumConns > 1)
progressBar.Finish()

- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("Post-data metadata restore incomplete")
} else if numErrors > 0 {
gplog.Info("Post-data metadata restore completed with failures")
@@ -542,7 +542,7 @@ func restorePostdata(metadataFilename string) {
}

func restoreStatistics() {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
statisticsFilename := globalFPInfo.GetStatisticsFilePath()
@@ -562,7 +562,7 @@ func restoreStatistics() {
}

func runAnalyze(filteredDataEntries map[string][]toc.CoordinatorDataEntry) {
- if wasTerminated {
+ if wasTerminated.Load() {
return
}
gplog.Info("Running ANALYZE on restored tables")
@@ -591,7 +591,7 @@ func runAnalyze(filteredDataEntries map[string][]toc.CoordinatorDataEntry) {
numErrors := ExecuteStatements(analyzeStatements, progressBar, connectionPool.NumConns > 1)
progressBar.Finish()

- if wasTerminated {
+ if wasTerminated.Load() {
gplog.Info("ANALYZE on restored tables incomplete")
} else if numErrors > 0 {
gplog.Info("ANALYZE on restored tables completed with failures")
@@ -604,7 +604,7 @@ func DoTeardown() {
restoreFailed := false
defer func() {
// If the restore was terminated, the signal handler will handle cleanup
- if wasTerminated {
+ if wasTerminated.Load() {
CleanupGroup.Wait()
} else {
DoCleanup(restoreFailed)
@@ -629,7 +629,7 @@ func DoTeardown() {
}
restoreFailed = true
}
- if wasTerminated {
+ if wasTerminated.Load() {
/*
* Don't print an error if the restore was canceled, as the signal handler
* will take care of cleanup and return codes. Just wait until the signal
@@ -740,7 +740,7 @@ func DoCleanup(restoreFailed bool) {
utils.CleanUpHelperFilesOnAllHosts(globalCluster, fpInfo, cleanupTimeout)

// Check gpbackup_helper errors here if restore was terminated
- if wasTerminated {
+ if wasTerminated.Load() {
err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if err != nil {
gplog.Error(err.Error())