ADBDEV-6599: Make gprestore --resize-cluster use --jobs for parallel restoration #110
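With this change, a resize-cluster restore can be parallelized by passing --jobs, e.g. gprestore --timestamp <timestamp> --backup-dir <dir> --redirect-db restoredb --resize-cluster --jobs 3 (the flag combination exercised by the end-to-end tests below); single-data-file backups continue to use --copy-queue-size for the same purpose.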

Merged (78 commits) on Dec 6, 2024

Commits (all by RekGRpth; changes shown below are from all commits):
ca1c1db  Implement MVP to paralellize gprestore (Oct 1, 2024)
e8b6c1a  fix test (Oct 1, 2024)
fffe562  fix backup and tests (Oct 1, 2024)
3c0a096  optimize (Oct 1, 2024)
ee14bef  test (Oct 1, 2024)
733e6a6  fix (Oct 1, 2024)
4f8a3ac  fix (Oct 1, 2024)
971d5ea  simplify (Oct 1, 2024)
873b43b  simplify (Oct 3, 2024)
f1e5832  rename (Oct 3, 2024)
6b56776  rename (Oct 3, 2024)
70940ca  rename (Oct 3, 2024)
6f575f6  format (Oct 3, 2024)
b164a27  Merge branch 'master' into ADBDEV-6338 (Oct 4, 2024)
b6b1705  Merge branch 'master' into ADBDEV-6599 (Oct 29, 2024)
2476483  test (Oct 29, 2024)
39cf460  comment (Oct 29, 2024)
defe56a  optimize (Oct 30, 2024)
163d2b4  stabelize tests (Oct 30, 2024)
1a5aacc  fix (Oct 30, 2024)
24af8f7  fix (Oct 31, 2024)
6053237  revert (Oct 31, 2024)
a1d7df5  Merge branch 'master' into ADBDEV-6599 (Nov 11, 2024)
649935a  fix test (Nov 12, 2024)
54dbb1f  stabellize test (Nov 12, 2024)
5265dd3  Merge branch 'master' into ADBDEV-6599 (Dec 3, 2024)
7e47f64  add helper index (Dec 3, 2024)
867c9b8  -step (Dec 3, 2024)
015c646  fix test (Dec 4, 2024)
a654b57  optimize (Dec 4, 2024)
2644704  revert backup (Dec 4, 2024)
4526c83  simplify and optimize (Dec 4, 2024)
7203cec  fix (Dec 4, 2024)
c352e26  simplify (Dec 4, 2024)
62044ca  skip (Dec 4, 2024)
c9e3949  error (Dec 4, 2024)
0a45237  rename (Dec 4, 2024)
5a486a3  mv (Dec 4, 2024)
61ffe30  rename (Dec 4, 2024)
6d47183  simplify (Dec 4, 2024)
fdb18a4  simplify (Dec 4, 2024)
11a1244  fix (Dec 4, 2024)
a45a360  simplify (Dec 4, 2024)
932f382  simplify (Dec 4, 2024)
846efcb  simplify (Dec 4, 2024)
473f790  simplify (Dec 4, 2024)
e822f56  pipe (Dec 4, 2024)
1c5a445  fix test (Dec 4, 2024)
a6dc131  simplify (Dec 4, 2024)
26c93c3  fix (Dec 4, 2024)
a257cef  error (Dec 4, 2024)
600a752  fix test (Dec 4, 2024)
bcba70a  fix test (Dec 4, 2024)
d0fedbb  fix tests (Dec 4, 2024)
9c13069  fix test (Dec 4, 2024)
a7b37c1  error (Dec 4, 2024)
0ac6864  fix (Dec 4, 2024)
3988ff5  rename (Dec 4, 2024)
0a18bad  fix tests (Dec 4, 2024)
ba750c4  error (Dec 4, 2024)
1caa858  fix (Dec 4, 2024)
d8a1ebe  fix test (Dec 4, 2024)
8de1a68  fix test (Dec 4, 2024)
39a7a3a  fix test (Dec 4, 2024)
c91309b  optimize (Dec 4, 2024)
91484d2  simplify (Dec 4, 2024)
f519803  fix test (Dec 4, 2024)
4a11b18  skip (Dec 4, 2024)
f590f04  skip (Dec 4, 2024)
62a0bd6  error (Dec 5, 2024)
b9543d8  test (Dec 5, 2024)
5e4a44c  Merge branch 'master' into ADBDEV-6599 (Dec 5, 2024)
8dd19ca  max helpers (Dec 6, 2024)
f94440f  fix (Dec 6, 2024)
6956a93  fatal when ivalid (Dec 6, 2024)
fa6222e  optimize (Dec 6, 2024)
04d77da  replace last only (Dec 6, 2024)
9a4a45c  fix (Dec 6, 2024)
2 changes: 1 addition & 1 deletion backup/backup.go
@@ -306,7 +306,7 @@ func backupData(tables []Table) {
for _, table := range tables {
oidList = append(oidList, fmt.Sprintf("%d", table.Oid))
}
utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo, "oid")
utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo)
compressStr := fmt.Sprintf(" --compression-level %d --compression-type %s", MustGetFlagInt(options.COMPRESSION_LEVEL), MustGetFlagString(options.COMPRESSION_TYPE))
if MustGetFlagBool(options.NO_COMPRESSION) {
compressStr = " --compression-level 0"
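Note: the body of utils.WriteOidListToSegments is not part of this excerpt; judging from the call sites here and in integration/agent_remote_test.go below, the explicit "oid" suffix argument was simply dropped, presumably because the suffix no longer varies between callers and is now applied inside the utility itself.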
2 changes: 1 addition & 1 deletion backup/data_test.go
@@ -176,7 +176,7 @@ var _ = Describe("backup/data tests", func() {
It("backs up a single regular table with single data file", func() {
_ = cmdFlags.Set(options.SINGLE_DATA_FILE, "true")

backupFile := fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_20170101010101_pipe_(.*)_%d", testTable.Oid)
backupFile := fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_20170101010101_\\d+_pipe_%d", testTable.Oid)
copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)
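The updated expectation reflects the new segment file naming introduced in filepath/filepath.go below: the backup PID now precedes the fixed suffix, giving gpbackup_<SEGID>_<timestamp>_<PID>_pipe_<oid> instead of gpbackup_<SEGID>_<timestamp>_pipe_<PID>_<oid>.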
72 changes: 68 additions & 4 deletions end_to_end/end_to_end_suite_test.go
@@ -289,12 +289,13 @@ func assertArtifactsCleaned(timestamp string) {
fpInfo := filepath.NewFilePathInfo(backupCluster, "", timestamp, "", false)
description := "Checking if helper files are cleaned up properly"
cleanupFunc := func(contentID int) string {
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
oidFile := fpInfo.GetSegmentHelperFilePath(contentID, "oid")
scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script")
pipeFile := fpInfo.GetSegmentPipeFilePath(contentID)
errorFile := utils.GetErrorFilename(pipeFile)
skipFile := utils.GetSkipFilename(pipeFile)

return fmt.Sprintf("! ls %s && ! ls %s && ! ls %s && ! ls %s*", errorFile, oidFile, scriptFile, pipeFile)
return fmt.Sprintf("! ls %s* && ! ls %s* && ! ls %s* && ! ls %s* && ! ls %s*", errorFile, skipFile, oidFile, scriptFile, pipeFile)
}
remoteOutput := backupCluster.GenerateAndExecuteCommand(description, cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, cleanupFunc)
if remoteOutput.NumErrors != 0 {
@@ -2326,7 +2327,7 @@ LANGUAGE plpgsql NO SQL;`)
testhelper.AssertQueryRuns(restoreConn, "DROP ROLE testrole;")
})
DescribeTable("",
func(fullTimestamp string, incrementalTimestamp string, tarBaseName string, isIncrementalRestore bool, isFilteredRestore bool, isSingleDataFileRestore bool, testUsesPlugin bool) {
func(fullTimestamp string, incrementalTimestamp string, tarBaseName string, isIncrementalRestore bool, isFilteredRestore bool, isSingleDataFileRestore bool, testUsesPlugin bool, size ...int) {
if isSingleDataFileRestore && segmentCount != 3 {
Skip("Single data file resize restores currently require a 3-segment cluster to test.")
}
@@ -2358,6 +2359,13 @@ LANGUAGE plpgsql NO SQL;`)
if isFilteredRestore {
gprestoreArgs = append(gprestoreArgs, "--include-schema", "schematwo")
}
if len(size) == 1 {
if isSingleDataFileRestore {
gprestoreArgs = append(gprestoreArgs, "--copy-queue-size", fmt.Sprintf("%d", size[0]))
} else {
gprestoreArgs = append(gprestoreArgs, "--jobs", fmt.Sprintf("%d", size[0]))
}
}
gprestore(gprestorePath, restoreHelperPath, fullTimestamp, gprestoreArgs...)

// check row counts
@@ -2405,30 +2413,49 @@ LANGUAGE plpgsql NO SQL;`)
}
},
Entry("Can backup a 9-segment cluster and restore to current cluster", "20220909090738", "", "9-segment-db", false, false, false, false),
Entry("Can backup a 9-segment cluster and restore to current cluster with jobs", "20220909090738", "", "9-segment-db", false, false, false, false, 3),
Entry("Can backup a 9-segment cluster and restore to current cluster with single data file", "20220909090827", "", "9-segment-db-single-data-file", false, false, true, false),
Entry("Can backup a 9-segment cluster and restore to current cluster with single data file with copy-queue-size", "20220909090827", "", "9-segment-db-single-data-file", false, false, true, false, 3),
Entry("Can backup a 9-segment cluster and restore to current cluster with incremental backups", "20220909150254", "20220909150353", "9-segment-db-incremental", true, false, false, false),
Entry("Can backup a 9-segment cluster and restore to current cluster with incremental backups with jobs", "20220909150254", "20220909150353", "9-segment-db-incremental", true, false, false, false, 3),

Entry("Can backup a 7-segment cluster and restore to current cluster", "20220908145504", "", "7-segment-db", false, false, false, false),
Entry("Can backup a 7-segment cluster and restore to current cluster with jobs", "20220908145504", "", "7-segment-db", false, false, false, false, 3),
Entry("Can backup a 7-segment cluster and restore to current cluster single data file", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, false),
Entry("Can backup a 7-segment cluster and restore to current cluster single data file with copy-queue-size", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, false, 3),
Entry("Can backup a 7-segment cluster and restore to current cluster with a filter", "20220908145645", "", "7-segment-db-filter", false, true, false, false),
Entry("Can backup a 7-segment cluster and restore to current cluster with a filter with jobs", "20220908145645", "", "7-segment-db-filter", false, true, false, false, 3),
Entry("Can backup a 7-segment cluster and restore to current cluster with single data file and filter", "20220912102413", "", "7-segment-db-single-data-file-filter", false, true, true, false),
Entry("Can backup a 7-segment cluster and restore to current cluster with single data file and filter with copy-queue-size", "20220912102413", "", "7-segment-db-single-data-file-filter", false, true, true, false, 3),
Entry("Can backup a 2-segment cluster and restore to current cluster single data file and filter", "20220908150223", "", "2-segment-db-single-data-file-filter", false, true, true, false),
Entry("Can backup a 2-segment cluster and restore to current cluster single data file and filter with copy-queue-size", "20220908150223", "", "2-segment-db-single-data-file-filter", false, true, true, false, 3),
Entry("Can backup a 2-segment cluster and restore to current cluster single data file", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, false),
Entry("Can backup a 2-segment cluster and restore to current cluster single data file with copy-queue-size", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, false, 3),
Entry("Can backup a 2-segment cluster and restore to current cluster with filter", "20220908150238", "", "2-segment-db-filter", false, true, false, false),
Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, false),
Entry("Can backup a 2-segment cluster and restore to current cluster with filter with jobs", "20220908150238", "", "2-segment-db-filter", false, true, false, false, 3),
Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, true, false),
Entry("Can backup a 2-segment cluster and restore to current cluster with incremental backups and a single data file with copy-queue-size", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, true, false, 3),
Entry("Can backup a 1-segment cluster and restore to current cluster", "20220908150735", "", "1-segment-db", false, false, false, false),
Entry("Can backup a 1-segment cluster and restore to current cluster with jobs", "20220908150735", "", "1-segment-db", false, false, false, false, 3),
Entry("Can backup a 1-segment cluster and restore to current cluster with single data file", "20220908150752", "", "1-segment-db-single-data-file", false, false, true, false),
Entry("Can backup a 1-segment cluster and restore to current cluster with single data file with copy-queue-size", "20220908150752", "", "1-segment-db-single-data-file", false, false, true, false, 3),
Entry("Can backup a 1-segment cluster and restore to current cluster with a filter", "20220908150804", "", "1-segment-db-filter", false, true, false, false),
Entry("Can backup a 1-segment cluster and restore to current cluster with a filter with jobs", "20220908150804", "", "1-segment-db-filter", false, true, false, false, 3),
Entry("Can backup a 3-segment cluster and restore to current cluster", "20220909094828", "", "3-segment-db", false, false, false, false),
Entry("Can backup a 3-segment cluster and restore to current cluster with jobs", "20220909094828", "", "3-segment-db", false, false, false, false, 3),

Entry("Can backup a 2-segment using gpbackup 1.26.0 and restore to current cluster", "20230516032007", "", "2-segment-db-1_26_0", false, false, false, false),
Entry("Can backup a 2-segment using gpbackup 1.26.0 and restore to current cluster with jobs", "20230516032007", "", "2-segment-db-1_26_0", false, false, false, false, 3),

// These tests will only run in CI, to avoid requiring developers to configure a plugin locally.
// We don't do as many combinatoric tests for resize restores using plugins, partly for storage space reasons and partly because
// we assume that if all of the above resize restores work and basic plugin restores work then the intersection should also work.
Entry("Can perform a backup and full restore of a 7-segment cluster using a plugin", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, true),
Entry("Can perform a backup and full restore of a 7-segment cluster using a plugin with jobs", "20220912101931", "", "7-segment-db-single-data-file", false, false, true, true, 3),
Entry("Can perform a backup and full restore of a 2-segment cluster using a plugin", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, true),
Entry("Can perform a backup and full restore of a 2-segment cluster using a plugin with jobs", "20220908150159", "", "2-segment-db-single-data-file", false, false, true, true, 3),
Entry("Can perform a backup and incremental restore of a 2-segment cluster using a plugin", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, true),
Entry("Can perform a backup and incremental restore of a 2-segment cluster using a plugin with jobs", "20220909150612", "20220909150622", "2-segment-db-incremental", true, false, false, true, 3),
)
It("will not restore a pre-1.26.0 backup that lacks a stored SegmentCount value", func() {
extractDirectory := extractSavedTarFile(backupDir, "2-segment-db-1_24_0")
@@ -2581,6 +2608,33 @@ LANGUAGE plpgsql NO SQL;`)
Expect(err).To(HaveOccurred())
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2: timeout: context canceled`))
assertDataRestored(restoreConn, map[string]int{
"public.t0": 0,
"public.t1": 0,
"public.t2": 0,
"public.t3": 0,
"public.t4": 0})
assertArtifactsCleaned("20240502095933")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
})
It("Will continue after error during restore with jobs", func() {
command := exec.Command("tar", "-xzf", "resources/2-segment-db-error.tar.gz", "-C", backupDir)
mustRunCommand(command)
gprestoreCmd := exec.Command(gprestorePath,
"--timestamp", "20240502095933",
"--redirect-db", "restoredb",
"--backup-dir", path.Join(backupDir, "2-segment-db-error"),
"--resize-cluster", "--jobs", "3", "--on-error-continue")
output, err := gprestoreCmd.CombinedOutput()
Expect(err).To(HaveOccurred())
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t1: COPY t1, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.t3: COPY t3, line \d+, column i: "\d+": ERROR: value "\d+" is out of range for type smallint`))
assertDataRestored(restoreConn, map[string]int{
"public.t0": 1000000,
"public.t1": 0,
"public.t2": 1000000,
"public.t3": 0,
"public.t4": 1000000})
assertArtifactsCleaned("20240502095933")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
})
@@ -2666,6 +2720,11 @@ LANGUAGE plpgsql NO SQL;`)
"--resize-cluster", "--on-error-continue")
output, _ := gprestoreCmd.CombinedOutput()
Expect(string(output)).To(ContainSubstring(`[ERROR]:-Encountered errors with 1 helper agent(s)`))
assertDataRestored(restoreConn, map[string]int{
"public.a": 1000,
"public.b": 0,
"public.c": 0,
"public.d": 0})
assertArtifactsCleaned("20240730085053")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d;")
})
@@ -2683,6 +2742,11 @@ LANGUAGE plpgsql NO SQL;`)
"--resize-cluster", "--on-error-continue", "--jobs", "3")
output, _ := gprestoreCmd.CombinedOutput()
Expect(string(output)).To(ContainSubstring(`[ERROR]:-Encountered errors with 1 helper agent(s)`))
assertDataRestored(restoreConn, map[string]int{
"public.a": 1000,
"public.b": 0,
"public.c": 1000,
"public.d": 1000})
assertArtifactsCleaned("20240730085053")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d;")
})
23 changes: 18 additions & 5 deletions filepath/filepath.go
@@ -7,6 +7,7 @@ package filepath
*/

import (
"errors"
"fmt"
"os"
"path"
@@ -103,13 +104,13 @@ func (backupFPInfo *FilePathInfo) replaceCopyFormatStringsInPath(templateFilePat
return strings.Replace(filePath, "<SEGID>", strconv.Itoa(contentID), -1)
}

func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int) string {
templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand()
func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, helperIdx ...int) string {
templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(helperIdx...)
return backupFPInfo.replaceCopyFormatStringsInPath(templateFilePath, contentID)
}

func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand() string {
return fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_%s_pipe_%d", backupFPInfo.Timestamp, backupFPInfo.PID)
func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(helperIdx ...int) string {
return fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_%s_%d_%s", backupFPInfo.Timestamp, backupFPInfo.PID, FormatSuffix("pipe", helperIdx...))
}

func (backupFPInfo *FilePathInfo) GetTableBackupFilePath(contentID int, tableOid uint32, extension string, singleDataFile bool) string {
@@ -204,7 +205,7 @@ func (backupFPInfo *FilePathInfo) GetPluginConfigPath() string {
}

func (backupFPInfo *FilePathInfo) GetSegmentHelperFilePath(contentID int, suffix string) string {
return path.Join(backupFPInfo.SegDirMap[contentID], fmt.Sprintf("gpbackup_%d_%s_%s_%d", contentID, backupFPInfo.Timestamp, suffix, backupFPInfo.PID))
return path.Join(backupFPInfo.SegDirMap[contentID], fmt.Sprintf("gpbackup_%d_%s_%d_%s", contentID, backupFPInfo.Timestamp, backupFPInfo.PID, suffix))
}

func (backupFPInfo *FilePathInfo) GetHelperLogPath() string {
@@ -291,3 +292,15 @@ func GetSegPrefix(connectionPool *dbconn.DBConn) string {
segPrefix = segPrefix[:len(segPrefix)-2] // Remove "-1" segment ID from string
return segPrefix
}

func FormatSuffix(suffix string, helperIdx ...int) string {
switch len(helperIdx) {
case 0:
break
case 1:
suffix += fmt.Sprintf("%d", helperIdx[0])
default:
gplog.Fatal(errors.New("helperIdx slice should have <= 1 elements"), "")
}
return suffix
}
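To make the new naming concrete, here is a small self-contained sketch. Only the format strings come from this diff; the timestamp, PID, and the assumption that helper indexes run from 1 to --jobs are illustrative. formatSuffix mirrors FormatSuffix above, minus the fatal-error branch.

```go
package main

import "fmt"

// formatSuffix mirrors filepath.FormatSuffix above (error handling for more
// than one index omitted): no index leaves the suffix alone, one index is
// appended directly, e.g. "pipe" -> "pipe2".
func formatSuffix(suffix string, helperIdx ...int) string {
	if len(helperIdx) == 1 {
		suffix += fmt.Sprintf("%d", helperIdx[0])
	}
	return suffix
}

func main() {
	// Illustrative values only; not taken from the PR.
	timestamp, pid := "20241206093000", 12345

	// Single helper (no --jobs): one pipe prefix per segment.
	fmt.Printf("gpbackup_<SEGID>_%s_%d_%s\n", timestamp, pid, formatSuffix("pipe"))
	// gpbackup_<SEGID>_20241206093000_12345_pipe

	// Assumed behavior with --jobs 3: one numbered pipe prefix per helper.
	for idx := 1; idx <= 3; idx++ {
		fmt.Printf("gpbackup_<SEGID>_%s_%d_%s\n", timestamp, pid, formatSuffix("pipe", idx))
	}
	// gpbackup_<SEGID>_20241206093000_12345_pipe1, ..._pipe2, ..._pipe3
}
```

GetSegmentHelperFilePath applies the same reordering to the oid and script helper files, gpbackup_<contentID>_<timestamp>_<PID>_<suffix>, which is presumably why the cleanup check in the end-to-end suite now globs each prefix with a trailing *.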
6 changes: 3 additions & 3 deletions helper/helper.go
@@ -78,7 +78,7 @@ func DoHelper() {
}
if err != nil {
// error logging handled in doBackupAgent and doRestoreAgent
errFile := fmt.Sprintf("%s_error", *pipeFile)
errFile := utils.GetErrorFilename(*pipeFile)
gplog.Debug("Writing error file %s", errFile)
handle, err := utils.OpenFileForWrite(errFile)
if err != nil {
@@ -281,7 +281,7 @@ func DoCleanup() {
* success, so we create an error file and check for its presence in
* gprestore after the COPYs are finished.
*/
errFile := fmt.Sprintf("%s_error", *pipeFile)
errFile := utils.GetErrorFilename(*pipeFile)
gplog.Debug("Writing error file %s", errFile)
handle, err := utils.OpenFileForWrite(errFile)
if err != nil {
@@ -317,7 +317,7 @@
}
}

skipFiles, _ := filepath.Glob(fmt.Sprintf("%s_skip_*", *pipeFile))
skipFiles, _ := filepath.Glob(utils.GetSkipFilename(*pipeFile) + "*")
for _, skipFile := range skipFiles {
err = utils.RemoveFileIfExists(skipFile)
if err != nil {
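utils.GetErrorFilename and utils.GetSkipFilename are new helpers whose definitions sit outside this excerpt. Given the call sites (they replace the literal "%s_error" and "%s_skip_..." formats, and callers still append "_<oid>" or a trailing glob themselves), they presumably reduce to something like:

```go
package utils

// Presumed implementations (assumption; not shown in this diff): both derive a
// sibling file name from the segment pipe path.
func GetErrorFilename(pipeFile string) string {
	return pipeFile + "_error"
}

func GetSkipFilename(pipeFile string) string {
	return pipeFile + "_skip"
}
```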
2 changes: 1 addition & 1 deletion helper/restore_helper.go
@@ -302,7 +302,7 @@ func doRestoreAgent() error {
// might be good to have a GPDB version check here. However, the restore helper should
// not contain a database connection so the version should be passed through the helper
// invocation from gprestore (e.g. create a --db-version flag option).
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_%d", utils.GetSkipFilename(*pipeFile), tableOid)) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
skipOid = tableOid
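Given the presumed GetSkipFilename above, the per-table skip file checked here keeps its old shape, <pipe file>_skip_<table oid>; only its construction now goes through the shared helper.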
2 changes: 1 addition & 1 deletion integration/agent_remote_test.go
@@ -25,7 +25,7 @@ var _ = Describe("agent remote", func() {
})
Describe("WriteOidListToSegments()", func() {
It("writes oids to a temp file and copies it to all segments", func() {
utils.WriteOidListToSegments(oidList, testCluster, filePath, "oid")
utils.WriteOidListToSegments(oidList, testCluster, filePath)

remoteOutput := testCluster.GenerateAndExecuteCommand("ensure oid file was written to segments", cluster.ON_SEGMENTS, func(contentID int) string {
remoteOidFile := filePath.GetSegmentHelperFilePath(contentID, "oid")
5 changes: 3 additions & 2 deletions integration/helper_test.go
@@ -14,6 +14,7 @@ import (
"time"

"github.com/greenplum-db/gp-common-go-libs/operating"
"github.com/greenplum-db/gpbackup/utils"
"github.com/klauspost/compress/zstd"
"golang.org/x/sys/unix"

@@ -33,7 +34,7 @@
restoreOidFile = fmt.Sprintf("%s/restore_test_oids", testDir)
pipeFile = fmt.Sprintf("%s/test_pipe", testDir)
dataFileFullPath = filepath.Join(testDir, "test_data")
errorFile = fmt.Sprintf("%s_error", pipeFile)
errorFile = utils.GetErrorFilename(pipeFile)
)

const (
@@ -690,7 +691,7 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string {
createCustomTOCFile(tocFile, dataLength)
ret = append(ret, tocFile)

skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1)
skipFile := utils.GetSkipFilename(pipeFile) + "_1"
err = exec.Command("touch", skipFile).Run()
Expect(err).ToNot(HaveOccurred())

2 changes: 1 addition & 1 deletion integration/utils_test.go
@@ -50,7 +50,7 @@ var _ = Describe("utils integration", func() {
query := `SELECT count(*) FROM pg_stat_activity WHERE application_name = 'hangingApplication'`
Eventually(func() string { return dbconn.MustSelectString(connectionPool, query) }, 5*time.Second, 100*time.Millisecond).Should(Equal("1"))

utils.TerminateHangingCopySessions(fpInfo, "hangingApplication", 30 * time.Second, 1 * time.Second)
utils.TerminateHangingCopySessions(fpInfo, "hangingApplication", 30*time.Second, 1*time.Second)

Eventually(func() string { return dbconn.MustSelectString(connectionPool, query) }, 5*time.Second, 100*time.Millisecond).Should(Equal("0"))
