diff --git a/backup/backup.go b/backup/backup.go
index 7de9bbbcd..9457ed83a 100644
--- a/backup/backup.go
+++ b/backup/backup.go
@@ -306,7 +306,7 @@ func backupData(tables []Table) {
 	for _, table := range tables {
 		oidList = append(oidList, fmt.Sprintf("%d", table.Oid))
 	}
-	utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo, "oid")
+	utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo)
 	compressStr := fmt.Sprintf(" --compression-level %d --compression-type %s", MustGetFlagInt(options.COMPRESSION_LEVEL), MustGetFlagString(options.COMPRESSION_TYPE))
 	if MustGetFlagBool(options.NO_COMPRESSION) {
 		compressStr = " --compression-level 0"
diff --git a/backup/data_test.go b/backup/data_test.go
index 3fd8fe038..b210eb779 100644
--- a/backup/data_test.go
+++ b/backup/data_test.go
@@ -176,7 +176,7 @@ var _ = Describe("backup/data tests", func() {
 		It("backs up a single regular table with single data file", func() {
 			_ = cmdFlags.Set(options.SINGLE_DATA_FILE, "true")
-			backupFile := fmt.Sprintf("/gpbackup__20170101010101_pipe_(.*)_%d", testTable.Oid)
+			backupFile := fmt.Sprintf("/gpbackup__20170101010101_\\d+_pipe_%d", testTable.Oid)
 			copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
 			mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
 			err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)
diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go
index 7a45ffe82..226f8a044 100644
--- a/end_to_end/end_to_end_suite_test.go
+++ b/end_to_end/end_to_end_suite_test.go
@@ -289,12 +289,13 @@ func assertArtifactsCleaned(timestamp string) {
 	fpInfo := filepath.NewFilePathInfo(backupCluster, "", timestamp, "", false)
 	description := "Checking if helper files are cleaned up properly"
 	cleanupFunc := func(contentID int) string {
-		errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
 		oidFile := fpInfo.GetSegmentHelperFilePath(contentID, "oid")
 		scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script")
 		pipeFile := fpInfo.GetSegmentPipeFilePath(contentID)
+		errorFile := utils.GetErrorFilename(pipeFile)
+		skipFile := utils.GetSkipFilename(pipeFile)
 
-		return fmt.Sprintf("! ls %s && ! ls %s && ! ls %s && ! ls %s*", errorFile, oidFile, scriptFile, pipeFile)
+		return fmt.Sprintf("! ls %s* && ! ls %s* && ! ls %s* && ! ls %s* && ! ls %s*", errorFile, skipFile, oidFile, scriptFile, pipeFile)
 	}
 	remoteOutput := backupCluster.GenerateAndExecuteCommand(description, cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, cleanupFunc)
 	if remoteOutput.NumErrors != 0 {
@@ -2326,7 +2327,7 @@ LANGUAGE plpgsql NO SQL;`)
 			testhelper.AssertQueryRuns(restoreConn, "DROP ROLE testrole;")
 		})
 		DescribeTable("",
-			func(fullTimestamp string, incrementalTimestamp string, tarBaseName string, isIncrementalRestore bool, isFilteredRestore bool, isSingleDataFileRestore bool, testUsesPlugin bool) {
+			func(fullTimestamp string, incrementalTimestamp string, tarBaseName string, isIncrementalRestore bool, isFilteredRestore bool, isSingleDataFileRestore bool, testUsesPlugin bool, size ...int) {
 				if isSingleDataFileRestore && segmentCount != 3 {
 					Skip("Single data file resize restores currently require a 3-segment cluster to test.")
 				}
@@ -2358,6 +2359,13 @@ LANGUAGE plpgsql NO SQL;`)
 				if isFilteredRestore {
 					gprestoreArgs = append(gprestoreArgs, "--include-schema", "schematwo")
 				}
+				if len(size) == 1 {
+					if isSingleDataFileRestore {
+						gprestoreArgs = append(gprestoreArgs, "--copy-queue-size", fmt.Sprintf("%d", size[0]))
+					} else {
+						gprestoreArgs = append(gprestoreArgs, "--jobs", fmt.Sprintf("%d", size[0]))
+					}
+				}
 				gprestore(gprestorePath, restoreHelperPath, fullTimestamp, gprestoreArgs...)
 
 				// check row counts
@@ -2405,30 +2413,49 @@ LANGUAGE plpgsql NO SQL;`)
 				}
 			},
 			Entry("Can backup a 9-segment cluster and restore to current cluster", "20220909090738", "", "9-segment-db", false, false, false, false),
+			Entry("Can backup a 9-segment cluster and restore to current cluster with jobs", "20220909090738", "", "9-segment-db", false, false, false, false, 3),
 			Entry("Can backup a 9-segment cluster and restore to current cluster with single data file", "20220909090827", "", "9-segment-db-single-data-file", false, false, true, false),
+			Entry("Can backup a 9-segment cluster and restore to current cluster with single data file with copy-queue-size", "20220909090827", "", "9-segment-db-single-data-file", false, false, true, false, 3),
 			Entry("Can backup a 9-segment cluster and restore to current cluster with incremental backups", "20220909150254", "20220909150353", "9-segment-db-incremental", true, false, false, false),
+			Entry("Can backup a 9-segment cluster and restore to current cluster with incremental backups with jobs", "20220909150254", "20220909150353", "9-segment-db-incremental", true, false, false, false, 3),
 			Entry("Can backup a 7-segment cluster and restore to current cluster", "20220908145504", "", "7-segment-db", false, false, false, false),
+			Entry("Can backup a 7-segment cluster and restore to current cluster with jobs", "20220908145504", "", "7-segment-db", false, false, false, false, 3),
 			Entry("Can backup a 7-segment cluster and restore to current cluster single data file", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, false),
+			Entry("Can backup a 7-segment cluster and restore to current cluster single data file with copy-queue-size", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, false, 3),
 			Entry("Can backup a 7-segment cluster and restore to current cluster with a filter", "20220908145645", "", "7-segment-db-filter", false, true, false, false),
+			Entry("Can backup a 7-segment cluster and restore to current cluster with a filter with jobs", "20220908145645", "", "7-segment-db-filter", false, true, false, false, 3),
 			Entry("Can backup a 7-segment cluster and restore to current cluster with single data file and filter", "20220912102413", "", "7-segment-db-single-data-file-filter", false, true, true, false),
+			Entry("Can backup a 7-segment cluster and restore to current cluster with single data file and filter with copy-queue-size", "20220912102413", "", "7-segment-db-single-data-file-filter", false, true, true, false, 3),
 			Entry("Can backup a 2-segment cluster and restore to current cluster single data file and filter", "20220908150223", "", "2-segment-db-single-data-file-filter", false, true, true, false),
+			Entry("Can backup a 2-segment cluster and restore to current cluster single data file and filter with copy-queue-size", "20220908150223", "", "2-segment-db-single-data-file-filter", false, true, true, false, 3),
 			Entry("Can backup a 2-segment cluster and restore to current cluster single data file", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, false),
+			Entry("Can backup a 2-segment cluster and restore to current cluster single data file with copy-queue-size", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, false, 3),
 			Entry("Can backup a 2-segment cluster and restore to current cluster with filter", "20220908150238", "", "2-segment-db-filter", false, true, false, false),
-			Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, false),
+			Entry("Can backup a 2-segment cluster and restore to current cluster with filter with jobs", "20220908150238", "", "2-segment-db-filter", false, true, false, false, 3),
+			Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, true, false),
+			Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file with copy-queue-size", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, true, false, 3),
 			Entry("Can backup a 1-segment cluster and restore to current cluster", "20220908150735", "", "1-segment-db", false, false, false, false),
+			Entry("Can backup a 1-segment cluster and restore to current cluster with jobs", "20220908150735", "", "1-segment-db", false, false, false, false, 3),
 			Entry("Can backup a 1-segment cluster and restore to current cluster with single data file", "20220908150752", "", "1-segment-db-single-data-file", false, false, true, false),
+			Entry("Can backup a 1-segment cluster and restore to current cluster with single data file with copy-queue-size", "20220908150752", "", "1-segment-db-single-data-file", false, false, true, false, 3),
 			Entry("Can backup a 1-segment cluster and restore to current cluster with a filter", "20220908150804", "", "1-segment-db-filter", false, true, false, false),
+			Entry("Can backup a 1-segment cluster and restore to current cluster with a filter with jobs", "20220908150804", "", "1-segment-db-filter", false, true, false, false, 3),
 			Entry("Can backup a 3-segment cluster and restore to current cluster", "20220909094828", "", "3-segment-db", false, false, false, false),
+			Entry("Can backup a 3-segment cluster and restore to current cluster with jobs", "20220909094828", "", "3-segment-db", false, false, false, false, 3),
 			Entry("Can backup a 2-segment using gpbackup 1.26.0 and restore to current cluster", "20230516032007", "", "2-segment-db-1_26_0", false, false, false, false),
+			Entry("Can backup a 2-segment using gpbackup 1.26.0 and restore to current cluster with jobs", "20230516032007", "", "2-segment-db-1_26_0", false, false, false, false, 3),
 
 			// These tests will only run in CI, to avoid requiring developers to configure a plugin locally.
 			// We don't do as many combinatoric tests for resize restores using plugins, partly for storage space reasons and partly because
 			// we assume that if all of the above resize restores work and basic plugin restores work then the intersection should also work.
 			Entry("Can perform a backup and full restore of a 7-segment cluster using a plugin", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, true),
+			Entry("Can perform a backup and full restore of a 7-segment cluster using a plugin with jobs", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, true, 3),
 			Entry("Can perform a backup and full restore of a 2-segment cluster using a plugin", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, true),
+			Entry("Can perform a backup and full restore of a 2-segment cluster using a plugin with jobs", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, true, 3),
 			Entry("Can perform a backup and incremental restore of a 2-segment cluster using a plugin", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, true),
+			Entry("Can perform a backup and incremental restore of a 2-segment cluster using a plugin with jobs", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, true, 3),
 		)
 		It("will not restore a pre-1.26.0 backup that lacks a stored SegmentCount value", func() {
 			extractDirectory := extractSavedTarFile(backupDir, "2-segment-db-1_24_0")
@@ -2581,6 +2608,33 @@ LANGUAGE plpgsql NO SQL;`)
 			Expect(err).To(HaveOccurred())
 			Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
 			Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`))
+			assertDataRestored(restoreConn, map[string]int{
+				"public.t0": 0,
+				"public.t1": 0,
+				"public.t2": 0,
+				"public.t3": 0,
+				"public.t4": 0})
+			assertArtifactsCleaned("20240502095933")
+			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
+		})
+		It("Will continue after error during restore with jobs", func() {
+			command := exec.Command("tar", "-xzf", "resources/2-segment-db-error.tar.gz", "-C", backupDir)
+			mustRunCommand(command)
+			gprestoreCmd := exec.Command(gprestorePath,
+				"--timestamp", "20240502095933",
+				"--redirect-db", "restoredb",
+				"--backup-dir", path.Join(backupDir, "2-segment-db-error"),
+				"--resize-cluster", "--jobs", "3", "--on-error-continue")
+			output, err := gprestoreCmd.CombinedOutput()
+			Expect(err).To(HaveOccurred())
+			Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
+			Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t3: COPY t3, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
+			assertDataRestored(restoreConn, map[string]int{
+				"public.t0": 1000000,
+				"public.t1": 0,
+				"public.t2": 1000000,
+				"public.t3": 0,
+				"public.t4": 1000000})
 			assertArtifactsCleaned("20240502095933")
 			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
 		})
@@ -2666,6 +2720,11 @@ LANGUAGE plpgsql NO SQL;`)
 				"--resize-cluster", "--on-error-continue")
 			output, _ := gprestoreCmd.CombinedOutput()
 			Expect(string(output)).To(ContainSubstring(`[ERROR]:-Encountered errors with 1 helper agent(s)`))
+			assertDataRestored(restoreConn, map[string]int{
+				"public.a": 1000,
+				"public.b": 0,
+				"public.c": 0,
+				"public.d": 0})
 			assertArtifactsCleaned("20240730085053")
 			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d;")
 		})
@@ -2683,6 +2742,11 @@ LANGUAGE plpgsql NO SQL;`)
 				"--resize-cluster", "--on-error-continue", "--jobs", "3")
 			output, _ := gprestoreCmd.CombinedOutput()
 			Expect(string(output)).To(ContainSubstring(`[ERROR]:-Encountered errors with 1 helper agent(s)`))
+			assertDataRestored(restoreConn, map[string]int{
+				"public.a": 1000,
+				"public.b": 0,
+				"public.c": 1000,
+				"public.d": 1000})
 			assertArtifactsCleaned("20240730085053")
 			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d;")
 		})
diff --git a/filepath/filepath.go b/filepath/filepath.go
index af7235b6e..4133d8f3b 100644
--- a/filepath/filepath.go
+++ b/filepath/filepath.go
@@ -7,6 +7,7 @@ package filepath
 */
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"path"
@@ -103,13 +104,13 @@ func (backupFPInfo *FilePathInfo) replaceCopyFormatStringsInPath(templateFilePat
 	return strings.Replace(filePath, "", strconv.Itoa(contentID), -1)
 }
 
-func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int) string {
-	templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand()
+func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, helperIdx ...int) string {
+	templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(helperIdx...)
 	return backupFPInfo.replaceCopyFormatStringsInPath(templateFilePath, contentID)
 }
 
-func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand() string {
-	return fmt.Sprintf("/gpbackup__%s_pipe_%d", backupFPInfo.Timestamp, backupFPInfo.PID)
+func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(helperIdx ...int) string {
+	return fmt.Sprintf("/gpbackup__%s_%d_%s", backupFPInfo.Timestamp, backupFPInfo.PID, FormatSuffix("pipe", helperIdx...))
 }
 
 func (backupFPInfo *FilePathInfo) GetTableBackupFilePath(contentID int, tableOid uint32, extension string, singleDataFile bool) string {
@@ -204,7 +205,7 @@ func (backupFPInfo *FilePathInfo) GetPluginConfigPath() string {
 }
 
 func (backupFPInfo *FilePathInfo) GetSegmentHelperFilePath(contentID int, suffix string) string {
-	return path.Join(backupFPInfo.SegDirMap[contentID], fmt.Sprintf("gpbackup_%d_%s_%s_%d", contentID, backupFPInfo.Timestamp, suffix, backupFPInfo.PID))
+	return path.Join(backupFPInfo.SegDirMap[contentID], fmt.Sprintf("gpbackup_%d_%s_%d_%s", contentID, backupFPInfo.Timestamp, backupFPInfo.PID, suffix))
 }
 
 func (backupFPInfo *FilePathInfo) GetHelperLogPath() string {
@@ -291,3 +292,15 @@ func GetSegPrefix(connectionPool *dbconn.DBConn) string {
 	segPrefix = segPrefix[:len(segPrefix)-2] // Remove "-1" segment ID from string
 	return segPrefix
 }
+
+func FormatSuffix(suffix string, helperIdx ...int) string {
+	switch len(helperIdx) {
+	case 0:
+		break
+	case 1:
+		suffix += fmt.Sprintf("%d", helperIdx[0])
+	default:
+		gplog.Fatal(errors.New("helperIdx slice should have <= 1 elements"), "")
+	}
+	return suffix
+}
diff --git a/helper/helper.go b/helper/helper.go
index 1ca431ebd..04b80c143 100644
--- a/helper/helper.go
+++ b/helper/helper.go
@@ -78,7 +78,7 @@ func DoHelper() {
 	}
 	if err != nil {
 		// error logging handled in doBackupAgent and doRestoreAgent
-		errFile := fmt.Sprintf("%s_error", *pipeFile)
+		errFile := utils.GetErrorFilename(*pipeFile)
 		gplog.Debug("Writing error file %s", errFile)
 		handle, err := utils.OpenFileForWrite(errFile)
 		if err != nil {
@@ -281,7 +281,7 @@ func DoCleanup() {
		 * success, so we create an error file and check for its presence in
		 * gprestore after the COPYs are finished.
		 */
-		errFile := fmt.Sprintf("%s_error", *pipeFile)
+		errFile := utils.GetErrorFilename(*pipeFile)
 		gplog.Debug("Writing error file %s", errFile)
 		handle, err := utils.OpenFileForWrite(errFile)
 		if err != nil {
@@ -317,7 +317,7 @@ func DoCleanup() {
 		}
 	}
 
-	skipFiles, _ := filepath.Glob(fmt.Sprintf("%s_skip_*", *pipeFile))
+	skipFiles, _ := filepath.Glob(utils.GetSkipFilename(*pipeFile) + "*")
 	for _, skipFile := range skipFiles {
 		err = utils.RemoveFileIfExists(skipFile)
 		if err != nil {
diff --git a/helper/restore_helper.go b/helper/restore_helper.go
index e4218cdce..bea0a1df0 100644
--- a/helper/restore_helper.go
+++ b/helper/restore_helper.go
@@ -302,7 +302,7 @@ func doRestoreAgent() error {
 		// might be good to have a GPDB version check here. However, the restore helper should
 		// not contain a database connection so the version should be passed through the helper
 		// invocation from gprestore (e.g. create a --db-version flag option).
-		if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
+		if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_%d", utils.GetSkipFilename(*pipeFile), tableOid)) {
 			logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
 			err = nil
 			skipOid = tableOid
diff --git a/integration/agent_remote_test.go b/integration/agent_remote_test.go
index 7c312a357..8b03bbdd8 100644
--- a/integration/agent_remote_test.go
+++ b/integration/agent_remote_test.go
@@ -25,7 +25,7 @@ var _ = Describe("agent remote", func() {
 	})
 	Describe("WriteOidListToSegments()", func() {
 		It("writes oids to a temp file and copies it to all segments", func() {
-			utils.WriteOidListToSegments(oidList, testCluster, filePath, "oid")
+			utils.WriteOidListToSegments(oidList, testCluster, filePath)
 			remoteOutput := testCluster.GenerateAndExecuteCommand("ensure oid file was written to segments", cluster.ON_SEGMENTS,
 				func(contentID int) string {
 					remoteOidFile := filePath.GetSegmentHelperFilePath(contentID, "oid")
diff --git a/integration/helper_test.go b/integration/helper_test.go
index f856ab458..c8bac5395 100644
--- a/integration/helper_test.go
+++ b/integration/helper_test.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"github.com/greenplum-db/gp-common-go-libs/operating"
+	"github.com/greenplum-db/gpbackup/utils"
 	"github.com/klauspost/compress/zstd"
 
 	"golang.org/x/sys/unix"
@@ -33,7 +34,7 @@ var (
 	restoreOidFile   = fmt.Sprintf("%s/restore_test_oids", testDir)
 	pipeFile         = fmt.Sprintf("%s/test_pipe", testDir)
 	dataFileFullPath = filepath.Join(testDir, "test_data")
-	errorFile        = fmt.Sprintf("%s_error", pipeFile)
+	errorFile        = utils.GetErrorFilename(pipeFile)
 )
 
 const (
@@ -690,7 +691,7 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string {
 	createCustomTOCFile(tocFile, dataLength)
 	ret = append(ret, tocFile)
 
-	skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1)
+	skipFile := utils.GetSkipFilename(pipeFile) + "_1"
 	err = exec.Command("touch", skipFile).Run()
 	Expect(err).ToNot(HaveOccurred())
 
diff --git a/integration/utils_test.go b/integration/utils_test.go
index 5977ecb94..ecfff4484 100644
--- a/integration/utils_test.go
+++ b/integration/utils_test.go
@@ -50,7 +50,7 @@ var _ = Describe("utils integration", func() {
 			query := `SELECT count(*) FROM pg_stat_activity WHERE application_name = 'hangingApplication'`
 			Eventually(func() string { return dbconn.MustSelectString(connectionPool, query) },
 				5*time.Second, 100*time.Millisecond).Should(Equal("1"))
 
-			utils.TerminateHangingCopySessions(fpInfo, "hangingApplication", 30 * time.Second, 1 * time.Second)
+			utils.TerminateHangingCopySessions(fpInfo, "hangingApplication", 30*time.Second, 1*time.Second)
 			Eventually(func() string { return dbconn.MustSelectString(connectionPool, query) },
 				5*time.Second, 100*time.Millisecond).Should(Equal("0"))
diff --git a/restore/data.go b/restore/data.go
index 1fb040faf..6b30d6a14 100644
--- a/restore/data.go
+++ b/restore/data.go
@@ -24,6 +24,7 @@ import (
 
 var (
 	tableDelim = ","
+	maxHelpers int
 )
 
 func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
@@ -39,7 +40,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta
 	if singleDataFile || resizeCluster {
 		//helper.go handles compression, so we don't want to set it here
 		customPipeThroughCommand = utils.DefaultPipeThroughProgram
-		errorFile := fmt.Sprintf("%s_error", globalFPInfo.GetSegmentPipePathForCopyCommand())
+		errorFile := utils.GetErrorFilename(globalFPInfo.GetSegmentPipePathForCopyCommand(HelperIdx(whichConn)...))
 		readFromDestinationCommand = fmt.Sprintf("(timeout --foreground 300 bash -c \"while [[ ! -p \"%s\" && ! -f \"%s\" ]]; do sleep 1; done\" || (echo \"Pipe not found %s\">&2; exit 1)) && %s", destinationToRead, errorFile, destinationToRead, readFromDestinationCommand)
 	} else if MustGetFlagString(options.PLUGIN_CONFIG) != "" {
 		readFromDestinationCommand = fmt.Sprintf("%s restore_data %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath)
@@ -88,7 +89,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP
 	for i := 0; i < batches; i++ {
 		destinationToRead := ""
 		if backupConfig.SingleDataFile || resizeCluster {
-			destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(), entry.Oid, i)
+			destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(HelperIdx(whichConn)...), entry.Oid, i)
 		} else {
 			destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile)
 		}
@@ -106,7 +107,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP
 			// will hang indefinitely waiting to read from pipes that the helper
 			// was expected to set up
 			if backupConfig.SingleDataFile || resizeCluster {
-				agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
+				agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo, HelperIdx(whichConn)...)
 				if agentErr != nil {
 					gplog.Error(agentErr.Error())
 					return agentErr
@@ -120,7 +121,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP
 			if MustGetFlagBool(options.ON_ERROR_CONTINUE) {
 				if connectionPool.Version.AtLeast("6") && (backupConfig.SingleDataFile || resizeCluster) {
 					// inform segment helpers to skip this entry
-					utils.CreateSkipFileOnSegments(fmt.Sprintf("%d", entry.Oid), tableName, globalCluster, globalFPInfo)
+					utils.CreateSkipFileOnSegments(entry.Oid, tableName, globalCluster, globalFPInfo, HelperIdx(whichConn)...)
 				}
 			}
 			return copyErr
@@ -218,34 +219,44 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 		gplog.Verbose("Initializing pipes and gpbackup_helper on segments for %srestore", msg)
 		utils.VerifyHelperVersionOnSegments(version, globalCluster)
 
-		// During a larger-to-smaller restore, we need to do multiple passes of
-		// data loading so we assign the batches here.
-		oidList := make([]string, 0)
-		for _, entry := range dataEntries {
-			if entry.IsReplicated {
-				oidList = append(oidList, fmt.Sprintf("%d,0", entry.Oid))
-				continue
-			}
+		if backupConfig.SingleDataFile {
+			maxHelpers = 1
+		} else if connectionPool.NumConns < totalTables {
+			maxHelpers = connectionPool.NumConns
+		} else {
+			maxHelpers = totalTables
+		}
+
+		for whichConn := 0; whichConn < maxHelpers; whichConn++ {
+			// During a larger-to-smaller restore, we need to do multiple passes of
+			// data loading so we assign the batches here.
+			oidList := make([]string, 0)
+			for entryIdx := whichConn; entryIdx < totalTables; entryIdx += maxHelpers {
+				if dataEntries[entryIdx].IsReplicated {
+					oidList = append(oidList, fmt.Sprintf("%d,0", dataEntries[entryIdx].Oid))
+					continue
+				}
 
-			for b := 0; b < batches; b++ {
-				oidList = append(oidList, fmt.Sprintf("%d,%d", entry.Oid, b))
+				for b := 0; b < batches; b++ {
+					oidList = append(oidList, fmt.Sprintf("%d,%d", dataEntries[entryIdx].Oid, b))
+				}
 			}
-		}
-		utils.WriteOidListToSegments(oidList, globalCluster, fpInfo, "oid")
-		initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, fpInfo)
-		if wasTerminated.Load() {
-			return 0
-		}
-		isFilter := false
-		if len(opts.IncludedRelations) > 0 || len(opts.ExcludedRelations) > 0 || len(opts.IncludedSchemas) > 0 || len(opts.ExcludedSchemas) > 0 {
-			isFilter = true
-		}
-		compressStr := ""
-		if backupConfig.Compressed {
-			compressStr = fmt.Sprintf(" --compression-type %s ", utils.GetPipeThroughProgram().Name)
+			utils.WriteOidListToSegments(oidList, globalCluster, fpInfo, HelperIdx(whichConn)...)
+			initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, fpInfo, HelperIdx(whichConn)...)
+			if wasTerminated.Load() {
+				return 0
+			}
+			isFilter := false
+			if len(opts.IncludedRelations) > 0 || len(opts.ExcludedRelations) > 0 || len(opts.IncludedSchemas) > 0 || len(opts.ExcludedSchemas) > 0 {
+				isFilter = true
+			}
+			compressStr := ""
+			if backupConfig.Compressed {
+				compressStr = fmt.Sprintf(" --compression-type %s ", utils.GetPipeThroughProgram().Name)
+			}
+			utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), compressStr, MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated, initialPipes, backupConfig.SingleDataFile, resizeCluster, origSize, destSize, gplog.GetVerbosity(), HelperIdx(whichConn)...)
 		}
-		utils.StartGpbackupHelpers(globalCluster, fpInfo, "--restore-agent", MustGetFlagString(options.PLUGIN_CONFIG), compressStr, MustGetFlagBool(options.ON_ERROR_CONTINUE), isFilter, &wasTerminated, initialPipes, backupConfig.SingleDataFile, resizeCluster, origSize, destSize, gplog.GetVerbosity())
 	}
 	/*
	 * We break when an interrupt is received and rely on
@@ -253,7 +264,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
	 * statements in progress if they don't finish on their own.
	 */
 	var tableNum int64 = 0
-	tasks := make(chan toc.CoordinatorDataEntry, totalTables)
 	var workerPool sync.WaitGroup
 	var numErrors int32
 	var mutex = &sync.Mutex{}
@@ -273,7 +283,8 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 			defer workerPool.Done()
 			setGUCsForConnection(gucStatements, whichConn)
 
-			for entry := range tasks {
+			for entryIdx := whichConn; entryIdx < totalTables; entryIdx += connectionPool.NumConns {
+				entry := dataEntries[entryIdx]
 				// Check if any error occurred in any other goroutines:
 				select {
 				case <-ctx.Done():
@@ -304,6 +315,10 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 
 				if err != nil {
 					atomic.AddInt32(&numErrors, 1)
+					if errors.Is(err, utils.AgentErr) && MustGetFlagBool(options.ON_ERROR_CONTINUE) && maxHelpers > 1 {
+						dataProgressBar.(*pb.ProgressBar).NotPrint = true
+						return
+					}
 					if errors.Is(err, utils.AgentErr) || !MustGetFlagBool(options.ON_ERROR_CONTINUE) {
 						dataProgressBar.(*pb.ProgressBar).NotPrint = true
 						cancel()
@@ -320,10 +335,6 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 			}
 		}(i)
 	}
-	for _, entry := range dataEntries {
-		tasks <- entry
-	}
-	close(tasks)
 	workerPool.Wait()
 	// Allow panics to crash from the main process, invoking DoCleanup
 	select {
@@ -341,16 +352,25 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 	return numErrors
 }
 
-func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int {
+func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo, helperIdx ...int) int {
 	// Create min(connections, tables) segment pipes on each host
 	var maxPipes int
-	if connectionPool.NumConns < len(oidList) {
+	if !backupConfig.SingleDataFile {
+		maxPipes = 1 // Create single initial pipe for the first oid in the restore oids list for non --single-data-file data file restore.
+	} else if connectionPool.NumConns < len(oidList) {
 		maxPipes = connectionPool.NumConns
 	} else {
 		maxPipes = len(oidList)
 	}
 	for i := 0; i < maxPipes; i++ {
-		utils.CreateSegmentPipeOnAllHostsForRestore(oidList[i], c, fpInfo)
+		utils.CreateSegmentPipeOnAllHostsForRestore(oidList[i], c, fpInfo, helperIdx...)
 	}
 	return maxPipes
 }
+
+func HelperIdx(whichConn int) []int {
+	if maxHelpers > 1 {
+		return []int{whichConn}
+	}
+	return []int{}
+}
diff --git a/restore/data_test.go b/restore/data_test.go
index b1667c8dc..ad03956d5 100644
--- a/restore/data_test.go
+++ b/restore/data_test.go
@@ -57,7 +57,7 @@ var _ = Describe("restore/data tests", func() {
 			Expect(err).ShouldNot(HaveOccurred())
 		})
 		It("will restore a table from a single data file", func() {
-			execStr := regexp.QuoteMeta(fmt.Sprintf("COPY public.foo(i,j) FROM PROGRAM '(timeout --foreground 300 bash -c \"while [[ ! -p \"/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\" && ! -f \"/gpbackup__20170101010101_pipe_%d_error\" ]]; do sleep 1; done\" || (echo \"Pipe not found /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\">&2; exit 1)) && cat /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT", os.Getpid()))
+			execStr := regexp.QuoteMeta(fmt.Sprintf("COPY public.foo(i,j) FROM PROGRAM '(timeout --foreground 300 bash -c \"while [[ ! -p \"/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\" && ! -f \"/gpbackup__20170101010101_%d_error\" ]]; do sleep 1; done\" || (echo \"Pipe not found /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456\">&2; exit 1)) && cat /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT", os.Getpid()))
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456"
 			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0)
diff --git a/restore/restore.go b/restore/restore.go
index 864c2c01e..282a62bb0 100644
--- a/restore/restore.go
+++ b/restore/restore.go
@@ -739,12 +739,10 @@ func DoCleanup(restoreFailed bool) {
 		utils.CleanUpSegmentHelperProcesses(globalCluster, fpInfo, "restore", cleanupTimeout)
 		utils.CleanUpHelperFilesOnAllHosts(globalCluster, fpInfo, cleanupTimeout)
 
-		// Check gpbackup_helper errors here if restore was terminated
-		if wasTerminated.Load() {
-			err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
-			if err != nil {
-				gplog.Error(err.Error())
-			}
+		// Check gpbackup_helper errors here
+		err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
+		if err != nil {
+			gplog.Error(err.Error())
 		}
 	}
 }
diff --git a/utils/agent_remote.go b/utils/agent_remote.go
index 6b9540189..a7b878359 100644
--- a/utils/agent_remote.go
+++ b/utils/agent_remote.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	path "path/filepath"
+	"regexp"
 	"runtime"
 	"strconv"
 	"strings"
@@ -45,10 +46,10 @@ func CreateSegmentPipeOnAllHostsForBackup(oid string, c *cluster.Cluster, fpInfo
 	})
 }
 
-func CreateSegmentPipeOnAllHostsForRestore(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) {
+func CreateSegmentPipeOnAllHostsForRestore(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) {
 	oidWithBatch := strings.Split(oid, ",")
 	remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string {
-		pipeName := fpInfo.GetSegmentPipeFilePath(contentID)
+		pipeName := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)
 		pipeName = fmt.Sprintf("%s_%s_%s", pipeName, oidWithBatch[0], oidWithBatch[1])
 		gplog.Debug("Creating pipe %s", pipeName)
 		return fmt.Sprintf("mkfifo %s", pipeName)
@@ -58,7 +59,8 @@ func CreateSegmentPipeOnAllHostsForRestore(oid string, c *cluster.Cluster, fpInf
 	})
 }
 
-func WriteOidListToSegments(oidList []string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, fileSuffix string) {
+func WriteOidListToSegments(oidList []string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) {
+	fileSuffix := filepath.FormatSuffix("oid", helperIdx...)
 	rsync_exists := CommandExists("rsync")
 	if !rsync_exists {
 		gplog.Fatal(errors.New("Failed to find rsync on PATH. Please ensure rsync is installed."), "")
@@ -145,7 +147,7 @@ func VerifyHelperVersionOnSegments(version string, c *cluster.Cluster) {
 	}
 }
 
-func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *atomic.Bool, copyQueue int, isSingleDataFile bool, resizeCluster bool, origSize int, destSize int, verbosity int) {
+func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, operation string, pluginConfigFile string, compressStr string, onErrorContinue bool, isFilter bool, wasTerminated *atomic.Bool, copyQueue int, isSingleDataFile bool, resizeCluster bool, origSize int, destSize int, verbosity int, helperIdx ...int) {
 	// A mutex lock for cleaning up and starting gpbackup helpers prevents a
 	// race condition that causes gpbackup_helpers to be orphaned if
 	// gpbackup_helper cleanup happens before they are started.
@@ -178,12 +180,11 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper
 	if resizeCluster {
 		resizeStr = fmt.Sprintf(" --resize-cluster --orig-seg-count %d --dest-seg-count %d", origSize, destSize)
 	}
-
 	remoteOutput := c.GenerateAndExecuteCommand("Starting gpbackup_helper agent", cluster.ON_SEGMENTS, func(contentID int) string {
 		tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
-		oidFile := fpInfo.GetSegmentHelperFilePath(contentID, "oid")
-		scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script")
-		pipeFile := fpInfo.GetSegmentPipeFilePath(contentID)
+		oidFile := fpInfo.GetSegmentHelperFilePath(contentID, filepath.FormatSuffix("oid", helperIdx...))
+		scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, filepath.FormatSuffix("script", helperIdx...))
+		pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)
 		backupFile := fpInfo.GetTableBackupFilePath(contentID, 0, GetPipeThroughProgram().Extension, true)
 		helperCmdStr := fmt.Sprintf(`gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file "%s" --content %d%s%s%s%s%s%s --copy-queue-size %d --verbosity %d`,
 			operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr, singleDataFileStr, resizeStr, copyQueue, verbosity)
@@ -205,10 +206,10 @@ HEREDOC
 func findCommandStr(c *cluster.Cluster, fpInfo filepath.FilePathInfo, contentID int) string {
 	var cmdString string
 	if runtime.GOOS == "linux" {
-		cmdString = fmt.Sprintf(`find %s -regextype posix-extended -regex ".*gpbackup_%d_%s_(oid|script|pipe)_%d.*"`,
+		cmdString = fmt.Sprintf(`find %s -regextype posix-extended -regex ".*gpbackup_%d_%s_%d_(oid|script|pipe|skip).*"`,
 			c.GetDirForContent(contentID), contentID, fpInfo.Timestamp, fpInfo.PID)
 	} else if runtime.GOOS == "darwin" {
-		cmdString = fmt.Sprintf(`find -E %s -regex ".*gpbackup_%d_%s_(oid|script|pipe)_%d.*"`,
+		cmdString = fmt.Sprintf(`find -E %s -regex ".*gpbackup_%d_%s_%d_(oid|script|pipe|skip).*"`,
 			c.GetDirForContent(contentID), contentID, fpInfo.Timestamp, fpInfo.PID)
 	}
 	return cmdString
@@ -329,20 +330,25 @@ func CleanUpSegmentHelperProcesses(c *cluster.Cluster, fpInfo filepath.FilePathI
 	}
 }
 
-func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo) error {
+func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) error {
 	remoteOutput := c.GenerateAndExecuteCommand("Checking whether segment agents had errors", cluster.ON_SEGMENTS, func(contentID int) string {
-		errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
+		errorFile := GetErrorFilename(fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...))
+		if len(helperIdx) == 0 {
+			errorFile += "*"
+		}
+
		/*
		 * If an error file exists we want to indicate an error, as that means
		 * the agent errored out. If no file exists, the agent was successful.
		 */
-		return fmt.Sprintf("if [[ -f %s ]]; then echo 'error'; fi; rm -f %s", errorFile, errorFile)
+		return fmt.Sprintf("if ls %[1]s >/dev/null 2>/dev/null; then echo 'error'; fi; rm -f %[1]s", errorFile)
 	})
+	agent := filepath.FormatSuffix("agent", helperIdx...)
 
 	numErrors := 0
 	for contentID, cmd := range remoteOutput.Commands {
 		if strings.TrimSpace(cmd.Stdout) == "error" {
-			gplog.Verbose("Error occurred with helper agent on segment %d on host %s.", contentID, c.GetHostForContent(contentID))
+			gplog.Verbose("Error occurred with helper %s on segment %d on host %s.", agent, contentID, c.GetHostForContent(contentID))
 			numErrors++
 		}
 	}
@@ -354,12 +360,20 @@ func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo
 	return nil
 }
 
-func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) {
-	createSkipFileLogMsg := fmt.Sprintf("Creating skip file on segments for restore entry %s (%s)", oid, tableName)
+func CreateSkipFileOnSegments(oid uint32, tableName string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) {
+	createSkipFileLogMsg := fmt.Sprintf("Creating skip file on segments for restore entry %d (%s)", oid, tableName)
 	remoteOutput := c.GenerateAndExecuteCommand(createSkipFileLogMsg, cluster.ON_SEGMENTS, func(contentID int) string {
-		return fmt.Sprintf("touch %s_skip_%s", fpInfo.GetSegmentPipeFilePath(contentID), oid)
+		return fmt.Sprintf("touch %s_%d", GetSkipFilename(fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)), oid)
 	})
 	c.CheckClusterError(remoteOutput, "Error while creating skip file on segments", func(contentID int) string {
-		return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid)
+		return fmt.Sprintf("Could not create skip file %s_%d on segments", GetSkipFilename(fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)), oid)
 	})
 }
+
+func GetErrorFilename(pipeFile string) string {
+	return regexp.MustCompile(`_pipe(\d*)$`).ReplaceAllString(pipeFile, "_error$1")
+}
+
+func GetSkipFilename(pipeFile string) string {
+	return regexp.MustCompile(`_pipe(\d*)$`).ReplaceAllString(pipeFile, "_skip$1")
+}
diff --git a/utils/agent_remote_test.go b/utils/agent_remote_test.go
index 14cfdbafe..e5ceb9842 100644
--- a/utils/agent_remote_test.go
+++ b/utils/agent_remote_test.go
@@ -50,13 +50,13 @@ var _ = Describe("agent remote", func() {
 	// this file is not used throughout the unit tests below, and it is cleaned up with the method: `operating.System.Remove`
 
 	Describe("WriteOidListToSegments()", func() {
 		It("generates the correct rsync commands to copy oid file to segments", func() {
-			utils.WriteOidListToSegments(oidList, testCluster, fpInfo, "oid")
+			utils.WriteOidListToSegments(oidList, testCluster, fpInfo)
 
 			Expect(testExecutor.NumExecutions).To(Equal(1))
 			cc := testExecutor.ClusterCommands[0]
 			Expect(len(cc)).To(Equal(2))
 
-			Expect(cc[0].CommandString).To(MatchRegexp("rsync -e ssh .*/gpbackup-oids.* localhost:/data/gpseg0/gpbackup_0_11112233445566_oid_.*"))
-			Expect(cc[1].CommandString).To(MatchRegexp("rsync -e ssh .*/gpbackup-oids.* remotehost1:/data/gpseg1/gpbackup_1_11112233445566_oid_.*"))
+			Expect(cc[0].CommandString).To(MatchRegexp("rsync -e ssh .*/gpbackup-oids.* localhost:/data/gpseg0/gpbackup_0_11112233445566_\\d+_oid"))
+			Expect(cc[1].CommandString).To(MatchRegexp("rsync -e ssh .*/gpbackup-oids.* remotehost1:/data/gpseg1/gpbackup_1_11112233445566_\\d+_oid"))
 		})
 		It("panics if any rsync commands fail and outputs correct err messages", func() {
 			testExecutor.ErrorOnExecNum = 1
@@ -73,7 +73,7 @@ var _ = Describe("agent remote", func() {
 				},
 			}
 
-			Expect(func() { utils.WriteOidListToSegments(oidList, testCluster, fpInfo, "oid") }).To(Panic())
+			Expect(func() { utils.WriteOidListToSegments(oidList, testCluster, fpInfo) }).To(Panic())
 			Expect(testExecutor.NumExecutions).To(Equal(1))
 
 			Expect(string(logfile.Contents())).To(ContainSubstring(`[CRITICAL]:-Failed to rsync oid file on 1 segment. See gbytes.Buffer for a complete list of errors.`))
@@ -155,12 +155,12 @@ var _ = Describe("agent remote", func() {
 			Expect(err).ToNot(HaveOccurred())
 
 			cc := testExecutor.ClusterCommands[0]
 
-			errorFile0 := fmt.Sprintf(`/data/gpseg0/gpbackup_0_11112233445566_pipe_%d_error`, fpInfo.PID)
-			expectedCmd0 := fmt.Sprintf(`if [[ -f %[1]s ]]; then echo 'error'; fi; rm -f %[1]s`, errorFile0)
+			errorFile0 := fmt.Sprintf(`/data/gpseg0/gpbackup_0_11112233445566_%d_error`, fpInfo.PID)
+			expectedCmd0 := fmt.Sprintf(`if ls %[1]s* >/dev/null 2>/dev/null; then echo 'error'; fi; rm -f %[1]s*`, errorFile0)
 			Expect(cc[0].CommandString).To(ContainSubstring(expectedCmd0))
 
-			errorFile1 := fmt.Sprintf(`/data/gpseg1/gpbackup_1_11112233445566_pipe_%d_error`, fpInfo.PID)
-			expectedCmd1 := fmt.Sprintf(`if [[ -f %[1]s ]]; then echo 'error'; fi; rm -f %[1]s`, errorFile1)
+			errorFile1 := fmt.Sprintf(`/data/gpseg1/gpbackup_1_11112233445566_%d_error`, fpInfo.PID)
+			expectedCmd1 := fmt.Sprintf(`if ls %[1]s* >/dev/null 2>/dev/null; then echo 'error'; fi; rm -f %[1]s*`, errorFile1)
 			Expect(cc[1].CommandString).To(ContainSubstring(expectedCmd1))
 		})
diff --git a/utils/plugin.go b/utils/plugin.go
index d4a5fa15e..128d75909 100644
--- a/utils/plugin.go
+++ b/utils/plugin.go
@@ -414,7 +414,7 @@ func (plugin *PluginConfig) BackupSegmentTOCs(c *cluster.Cluster, fpInfo filepat
 		cluster.ON_SEGMENTS,
 		func(contentID int) string {
 			tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
-			errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
+			errorFile := GetErrorFilename(fpInfo.GetSegmentPipeFilePath(contentID))
 			command = fmt.Sprintf(`while [[ ! -f "%s" && ! -f "%s" ]]; do sleep 1; done; ls "%s"`, tocFile, errorFile, tocFile)
 			return command
 		})