diff --git a/backup/backup.go b/backup/backup.go
index 80edede48..7de9bbbcd 100644
--- a/backup/backup.go
+++ b/backup/backup.go
@@ -235,7 +235,7 @@ func backupGlobals(metadataFile *utils.FileWithByteCount) {
 }
 
 func backupPredata(metadataFile *utils.FileWithByteCount, tables []Table, tableOnly bool) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	gplog.Info("Writing pre-data metadata")
@@ -290,7 +290,7 @@ func backupPredata(metadataFile *utils.FileWithByteCount, tables []Table, tableO
 }
 
 func backupData(tables []Table) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	if len(tables) == 0 {
@@ -326,7 +326,7 @@ func backupData(tables []Table) {
 }
 
 func backupPostdata(metadataFile *utils.FileWithByteCount) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	gplog.Info("Writing post-data metadata")
@@ -349,7 +349,7 @@ func backupPostdata(metadataFile *utils.FileWithByteCount) {
 }
 
 func backupStatistics(tables []Table) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	statisticsFilename := globalFPInfo.GetStatisticsFilePath()
@@ -365,7 +365,7 @@ func DoTeardown() {
 	backupFailed := false
 	defer func() {
 		// If the backup was terminated, the signal handler will handle cleanup
-		if wasTerminated {
+		if wasTerminated.Load() {
 			CleanupGroup.Wait()
 		} else {
 			DoCleanup(backupFailed)
@@ -389,7 +389,7 @@ func DoTeardown() {
 		}
 		backupFailed = true
 	}
-	if wasTerminated {
+	if wasTerminated.Load() {
 		/*
 		 * Don't print an error or create a report file if the backup was canceled,
 		 * as the signal handler will take care of cleanup and return codes. Just
@@ -491,7 +491,7 @@ func DoCleanup(backupFailed bool) {
 		utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo, cleanupTimeout)
 
 		// Check gpbackup_helper errors here if backup was terminated
-		if wasTerminated {
+		if wasTerminated.Load() {
 			err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
 			if err != nil {
 				gplog.Error(err.Error())
@@ -586,7 +586,7 @@ func GetVersion() string {
 }
 
 func logCompletionMessage(msg string) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("%s incomplete", msg)
 	} else {
 		gplog.Info("%s complete", msg)
diff --git a/backup/data.go b/backup/data.go
index 5d7606f8e..09ce298e1 100644
--- a/backup/data.go
+++ b/backup/data.go
@@ -64,7 +64,7 @@ type BackupProgressCounters struct {
 }
 
 func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return -1, nil
 	}
 	checkPipeExistsCommand := ""
@@ -223,7 +223,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 				return // Error somewhere, terminate
 			default: // Default is must to avoid blocking
 			}
-			if wasTerminated || isErroredBackup.Load() {
+			if wasTerminated.Load() || isErroredBackup.Load() {
 				counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
 				cancel()
 				return
@@ -313,7 +313,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 				return // Error somewhere, terminate
 			default: // Default is must to avoid blocking
 			}
-			if wasTerminated || isErroredBackup.Load() {
+			if wasTerminated.Load() || isErroredBackup.Load() {
 				cancel()
 				return
 			}
@@ -354,7 +354,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 	if backupSnapshot == "" {
 		allWorkersTerminatedLogged := false
 		for _, table := range tables {
-			if wasTerminated || isErroredBackup.Load() {
+			if wasTerminated.Load() || isErroredBackup.Load() {
 				counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
 				break
 			}
diff --git a/backup/global_variables.go b/backup/global_variables.go
index b96c7d8df..3baa8b3f0 100644
--- a/backup/global_variables.go
+++ b/backup/global_variables.go
@@ -2,6 +2,7 @@ package backup
 
 import (
 	"sync"
+	"sync/atomic"
 
 	"github.com/greenplum-db/gp-common-go-libs/cluster"
 	"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -27,7 +28,7 @@ const (
 	Deferred
 	Complete
 	PG_LOCK_NOT_AVAILABLE = "55P03"
-	ENUM_TYPE_OID = 3500
+	ENUM_TYPE_OID = 3500
 )
 
 /*
@@ -43,7 +44,7 @@ var (
 	objectCounts map[string]int
 	pluginConfig *utils.PluginConfig
 	version string
-	wasTerminated bool
+	wasTerminated atomic.Bool
 	backupLockFile lockfile.Lockfile
 	filterRelationClause string
 	quotedRoleNames map[string]string
diff --git a/backup/queries_relations.go b/backup/queries_relations.go
index 232ca275e..e0d293496 100644
--- a/backup/queries_relations.go
+++ b/backup/queries_relations.go
@@ -560,7 +560,7 @@ func LockTables(connectionPool *dbconn.DBConn, tables []Relation) {
 	for i, currentBatch := range tableBatches {
 		_, err := connectionPool.Exec(fmt.Sprintf("LOCK TABLE %s %s", currentBatch, lockMode))
 		if err != nil {
-			if wasTerminated {
+			if wasTerminated.Load() {
 				gplog.Warn("Interrupt received while acquiring ACCESS SHARE locks on tables")
 				select {} // wait for cleanup thread to exit gpbackup
 			} else {
diff --git a/helper/backup_helper.go b/helper/backup_helper.go
index b72f7252d..1b0ebbb68 100644
--- a/helper/backup_helper.go
+++ b/helper/backup_helper.go
@@ -42,7 +42,7 @@ func doBackupAgent() error {
 	 */
 	for i, oid := range oidList {
 		currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oidList[i])
-		if wasTerminated {
+		if wasTerminated.Load() {
 			logError("Terminated due to user request")
 			return errors.New("Terminated due to user request")
 		}
diff --git a/helper/helper.go b/helper/helper.go
index 5c0a30675..1ca431ebd 100644
--- a/helper/helper.go
+++ b/helper/helper.go
@@ -26,7 +26,7 @@ import (
 var (
 	CleanupGroup *sync.WaitGroup
 	version string
-	wasTerminated bool
+	wasTerminated atomic.Bool
 	wasSigpiped atomic.Bool
 	writeHandle *os.File
 	writer *bufio.Writer
@@ -60,7 +60,7 @@ var (
 func DoHelper() {
 	var err error
 	defer func() {
-		if wasTerminated {
+		if wasTerminated.Load() {
 			CleanupGroup.Wait()
 			return
 		}
@@ -163,8 +163,8 @@ func InitializeSignalHandler() {
 			terminatedChan <- true
 		}
 	}()
-	wasTerminated = <-terminatedChan
-	if wasTerminated {
+	wasTerminated.Store(<-terminatedChan)
+	if wasTerminated.Load() {
 		DoCleanup()
 		os.Exit(2)
 	} else {
@@ -275,7 +275,7 @@ func flushAndCloseRestoreWriter(pipeName string, oid int) error {
 
 func DoCleanup() {
 	defer CleanupGroup.Done()
-	if wasTerminated {
+	if wasTerminated.Load() {
 		/*
 		 * If the agent dies during the last table copy, it can still report
 		 * success, so we create an error file and check for its presence in
diff --git a/helper/restore_helper.go b/helper/restore_helper.go
index 331cf72d0..e4218cdce 100644
--- a/helper/restore_helper.go
+++ b/helper/restore_helper.go
@@ -234,7 +234,7 @@ func doRestoreAgent() error {
 		batchNum := oidWithBatch.batch
 		contentToRestore := *content + (*destSize * batchNum)
 
-		if wasTerminated {
+		if wasTerminated.Load() {
 			logError("Terminated due to user request")
 			return errors.New("Terminated due to user request")
 		}
diff --git a/restore/data.go b/restore/data.go
index 0f58f4122..1fb040faf 100644
--- a/restore/data.go
+++ b/restore/data.go
@@ -27,7 +27,7 @@ var (
 )
 
 func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return -1, nil
 	}
 	whichConn = connectionPool.ValidateConnNum(whichConn)
@@ -234,7 +234,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 		utils.WriteOidListToSegments(oidList, globalCluster, fpInfo, "oid")
 		initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, fpInfo)
 
-		if wasTerminated {
+		if wasTerminated.Load() {
 			return 0
 		}
 		isFilter := false
@@ -280,7 +280,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 				return // Error somewhere, terminate
 			default: // Default is must to avoid blocking
 			}
-			if wasTerminated {
+			if wasTerminated.Load() {
 				dataProgressBar.(*pb.ProgressBar).NotPrint = true
 				cancel()
 				return
diff --git a/restore/global_variables.go b/restore/global_variables.go
index 92196867d..dedd33158 100644
--- a/restore/global_variables.go
+++ b/restore/global_variables.go
@@ -2,6 +2,7 @@ package restore
 
 import (
 	"sync"
+	"sync/atomic"
 
 	"github.com/greenplum-db/gp-common-go-libs/cluster"
 	"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -36,7 +37,7 @@ var (
 	pluginConfig *utils.PluginConfig
 	restoreStartTime string
 	version string
-	wasTerminated bool
+	wasTerminated atomic.Bool
 	errorTablesMetadata map[string]Empty
 	errorTablesData map[string]Empty
 	opts *options.Options
diff --git a/restore/parallel.go b/restore/parallel.go
index 8a1c11d8e..2ad1bc054 100644
--- a/restore/parallel.go
+++ b/restore/parallel.go
@@ -23,7 +23,7 @@ var (
 
 func executeStatementsForConn(statements chan toc.StatementWithType, fatalErr *error, numErrors *int32, progressBar utils.ProgressBar, whichConn int, executeInParallel bool) {
 	for statement := range statements {
-		if wasTerminated || *fatalErr != nil {
+		if wasTerminated.Load() || *fatalErr != nil {
 			if executeInParallel {
 				gplog.Error("Error detected on connection %d. Terminating transactions.", whichConn)
 				txMutex.Lock()
diff --git a/restore/restore.go b/restore/restore.go
index b5b46c093..864c2c01e 100644
--- a/restore/restore.go
+++ b/restore/restore.go
@@ -289,7 +289,7 @@ func verifyIncrementalState() {
 }
 
 func restorePredata(metadataFilename string) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	var numErrors int32
@@ -356,7 +356,7 @@ func restorePredata(metadataFilename string) {
 	}
 	progressBar.Finish()
 
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("Pre-data metadata restore incomplete")
 	} else if numErrors > 0 {
 		gplog.Info("Pre-data metadata restore completed with failures")
@@ -366,7 +366,7 @@ func restorePredata(metadataFilename string) {
 }
 
 func restoreSequenceValues(metadataFilename string) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	gplog.Info("Restoring sequence values")
@@ -396,7 +396,7 @@ func restoreSequenceValues(metadataFilename string) {
 		progressBar.Finish()
 	}
 
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("Sequence values restore incomplete")
 	} else if numErrors > 0 {
 		gplog.Info("Sequence values restore completed with failures")
@@ -466,7 +466,7 @@ func editStatementsRedirectSchema(statements []toc.StatementWithType, redirectSc
 }
 
 func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return -1, nil
 	}
 	restorePlan := backupConfig.RestorePlan
@@ -502,7 +502,7 @@ func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
 	}
 	dataProgressBar.Finish()
 
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("Data restore incomplete")
 	} else if numErrors > 0 {
 		gplog.Info("Data restore completed with failures")
@@ -514,7 +514,7 @@ func restoreData() (int, map[string][]toc.CoordinatorDataEntry) {
 }
 
 func restorePostdata(metadataFilename string) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	gplog.Info("Restoring post-data metadata")
@@ -532,7 +532,7 @@ func restorePostdata(metadataFilename string) {
 
 	numErrors += ExecuteRestoreMetadataStatements("postdata", thirdBatch, "", progressBar, utils.PB_VERBOSE, connectionPool.NumConns > 1)
 	progressBar.Finish()
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("Post-data metadata restore incomplete")
 	} else if numErrors > 0 {
 		gplog.Info("Post-data metadata restore completed with failures")
@@ -542,7 +542,7 @@ func restorePostdata(metadataFilename string) {
 }
 
 func restoreStatistics() {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	statisticsFilename := globalFPInfo.GetStatisticsFilePath()
@@ -562,7 +562,7 @@ func restoreStatistics() {
 }
 
 func runAnalyze(filteredDataEntries map[string][]toc.CoordinatorDataEntry) {
-	if wasTerminated {
+	if wasTerminated.Load() {
 		return
 	}
 	gplog.Info("Running ANALYZE on restored tables")
@@ -591,7 +591,7 @@ func runAnalyze(filteredDataEntries map[string][]toc.CoordinatorDataEntry) {
 	numErrors := ExecuteStatements(analyzeStatements, progressBar, connectionPool.NumConns > 1)
 	progressBar.Finish()
 
-	if wasTerminated {
+	if wasTerminated.Load() {
 		gplog.Info("ANALYZE on restored tables incomplete")
 	} else if numErrors > 0 {
 		gplog.Info("ANALYZE on restored tables completed with failures")
@@ -604,7 +604,7 @@ func DoTeardown() {
 	restoreFailed := false
 	defer func() {
 		// If the restore was terminated, the signal handler will handle cleanup
-		if wasTerminated {
+		if wasTerminated.Load() {
 			CleanupGroup.Wait()
 		} else {
 			DoCleanup(restoreFailed)
@@ -629,7 +629,7 @@ func DoTeardown() {
 		}
 		restoreFailed = true
 	}
-	if wasTerminated {
+	if wasTerminated.Load() {
 		/*
 		 * Don't print an error if the restore was canceled, as the signal handler
 		 * will take care of cleanup and return codes. Just wait until the signal
@@ -740,7 +740,7 @@ func DoCleanup(restoreFailed bool) {
 		utils.CleanUpHelperFilesOnAllHosts(globalCluster, fpInfo, cleanupTimeout)
 
 		// Check gpbackup_helper errors here if restore was terminated
-		if wasTerminated {
+		if wasTerminated.Load() {
 			err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
 			if err != nil {
 				gplog.Error(err.Error())
diff --git a/utils/agent_remote.go b/utils/agent_remote.go
index 436553652..6b9540189 100644
--- a/utils/agent_remote.go
+++ b/utils/agent_remote.go
@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/greenplum-db/gp-common-go-libs/cluster"
@@ -144,12 +145,12 @@ func VerifyHelperVersionOnSegments(version string, c *cluster.Cluster) {
 	}
 }
 
-func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *bool, copyQueue int, isSingleDataFile bool, resizeCluster bool, origSize int, destSize int, verbosity int) {
+func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *atomic.Bool, copyQueue int, isSingleDataFile bool, resizeCluster bool, origSize int, destSize int, verbosity int) {
 	// A mutex lock for cleaning up and starting gpbackup helpers prevents a
 	// race condition that causes gpbackup_helpers to be orphaned if
 	// gpbackup_helper cleanup happens before they are started.
 	helperMutex.Lock()
-	if *wasTerminated {
+	if wasTerminated.Load() {
 		helperMutex.Unlock()
 		select {} // Pause forever and wait for cleanup to exit program.
 	}
diff --git a/utils/agent_remote_test.go b/utils/agent_remote_test.go
index 8b1c6b56b..14cfdbafe 100644
--- a/utils/agent_remote_test.go
+++ b/utils/agent_remote_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync/atomic"
 
 	"github.com/greenplum-db/gp-common-go-libs/cluster"
 	"github.com/greenplum-db/gp-common-go-libs/gplog"
@@ -126,21 +127,21 @@ var _ = Describe("agent remote", func() {
 	})
 	Describe("StartGpbackupHelpers()", func() {
 		It("Correctly propagates --on-error-continue flag to gpbackup_helper", func() {
-			wasTerminated := false
+			var wasTerminated atomic.Bool
 			utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", true, false, &wasTerminated, 1, true, false, 0, 0, gplog.LOGINFO)
 
 			cc := testExecutor.ClusterCommands[0]
 			Expect(cc[1].CommandString).To(ContainSubstring(" --on-error-continue"))
 		})
 		It("Correctly propagates --copy-queue-size value to gpbackup_helper", func() {
-			wasTerminated := false
+			var wasTerminated atomic.Bool
 			utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", false, false, &wasTerminated, 4, true, false, 0, 0, gplog.LOGINFO)
 
 			cc := testExecutor.ClusterCommands[0]
 			Expect(cc[1].CommandString).To(ContainSubstring(" --copy-queue-size 4"))
 		})
 		It("Correctly propagates verbosity", func() {
-			wasTerminated := false
+			var wasTerminated atomic.Bool
 			verbosity := gplog.LOGDEBUG
 			utils.StartGpbackupHelpers(testCluster, fpInfo, "operation", "/tmp/pluginConfigFile.yml", " compressStr", false, false, &wasTerminated, 4, true, false, 0, 0, verbosity)
 
diff --git a/utils/util.go b/utils/util.go
index 89b9a961a..1dd99255d 100644
--- a/utils/util.go
+++ b/utils/util.go
@@ -14,6 +14,7 @@ import (
 	path "path/filepath"
 	"regexp"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -156,7 +157,7 @@ func ValidateCompressionTypeAndLevel(compressionType string, compressionLevel in
 	return nil
 }
 
-func InitializeSignalHandler(cleanupFunc func(bool), procDesc string, termFlag *bool) {
+func InitializeSignalHandler(cleanupFunc func(bool), procDesc string, termFlag *atomic.Bool) {
 	signalChan := make(chan os.Signal, 1)
 	signal.Notify(signalChan, unix.SIGINT, unix.SIGTERM)
 	go func() {
@@ -168,7 +169,7 @@ func InitializeSignalHandler(cleanupFunc func(bool), procDesc string, termFlag *
 		case unix.SIGTERM:
 			gplog.Warn("Received a termination signal, aborting %s", procDesc)
 		}
-		*termFlag = true
+		termFlag.Store(true)
 		cleanupFunc(true)
 		os.Exit(2)
 	}()
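
Note: the diff above replaces the plain bool wasTerminated flag with sync/atomic's atomic.Bool (Go 1.19+), so the signal-handler goroutine that sets the flag and the worker goroutines that poll it no longer form a data race. The following is a minimal, self-contained sketch of that pattern only, not gpbackup source; the worker loop, counts, and sleep are hypothetical stand-ins for real work.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

// Shared termination flag; atomic.Bool replaces a plain bool so concurrent
// Store (signal handler) and Load (workers) are race-free.
var wasTerminated atomic.Bool

func main() {
	// Signal handler goroutine sets the flag, mirroring InitializeSignalHandler.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-signalChan
		wasTerminated.Store(true) // safe concurrent write
	}()

	// Worker goroutines poll the flag and bail out early, mirroring the
	// "if wasTerminated.Load() { return }" guards in the diff.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for step := 0; step < 10; step++ {
				if wasTerminated.Load() { // safe concurrent read
					fmt.Printf("worker %d: terminated early\n", id)
					return
				}
				time.Sleep(100 * time.Millisecond) // stand-in for real work
			}
		}(i)
	}
	wg.Wait()
}

Passing the flag as *atomic.Bool (as StartGpbackupHelpers and InitializeSignalHandler now do) rather than by value matters because an atomic.Bool must not be copied after first use; Load and Store are cheap and sufficient here since the flag only ever transitions from false to true.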