diff --git a/backup/data.go b/backup/data.go index a65527bfa..5d7606f8e 100644 --- a/backup/data.go +++ b/backup/data.go @@ -134,7 +134,7 @@ func BackupSingleTableData(queryContext context.Context, table Table, rowsCopied destinationToWrite := "" if MustGetFlagBool(options.SINGLE_DATA_FILE) { - destinationToWrite = fmt.Sprintf("%s_%d", globalFPInfo.GetSegmentPipePathForCopyCommand("pipe"), table.Oid) + destinationToWrite = fmt.Sprintf("%s_%d", globalFPInfo.GetSegmentPipePathForCopyCommand(), table.Oid) } else { destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false) } diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index f932b2b2a..8ff3d0074 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -289,10 +289,10 @@ func assertArtifactsCleaned(timestamp string) { fpInfo := filepath.NewFilePathInfo(backupCluster, "", timestamp, "", false) description := "Checking if helper files are cleaned up properly" cleanupFunc := func(contentID int) string { - errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, "pipe")) + errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID)) oidFile := fpInfo.GetSegmentHelperFilePath(contentID, "oid") scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script") - pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, "pipe") + pipeFile := fpInfo.GetSegmentPipeFilePath(contentID) return fmt.Sprintf("! ls %s* && ! ls %s* && ! ls %s* && ! ls %s*", errorFile, oidFile, scriptFile, pipeFile) } diff --git a/filepath/filepath.go b/filepath/filepath.go index e18c10b9f..7ad507238 100644 --- a/filepath/filepath.go +++ b/filepath/filepath.go @@ -103,13 +103,17 @@ func (backupFPInfo *FilePathInfo) replaceCopyFormatStringsInPath(templateFilePat return strings.Replace(filePath, "", strconv.Itoa(contentID), -1) } -func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, suffix string) string { - templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(suffix) +func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, helperIdx ...int) string { + templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(helperIdx...) return backupFPInfo.replaceCopyFormatStringsInPath(templateFilePath, contentID) } -func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(suffix string) string { - return fmt.Sprintf("/gpbackup__%s_%s_%d", backupFPInfo.Timestamp, suffix, backupFPInfo.PID) +func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(helperIdx ...int) string { + pipe := "pipe" + if len(helperIdx) == 1 { + pipe = fmt.Sprintf("pipe_%d", helperIdx[0]) + } + return fmt.Sprintf("/gpbackup__%s_%s_%d", backupFPInfo.Timestamp, pipe, backupFPInfo.PID) } func (backupFPInfo *FilePathInfo) GetTableBackupFilePath(contentID int, tableOid uint32, extension string, singleDataFile bool) string { diff --git a/integration/utils_test.go b/integration/utils_test.go index 1f1b11dc0..ecfff4484 100644 --- a/integration/utils_test.go +++ b/integration/utils_test.go @@ -42,7 +42,7 @@ var _ = Describe("utils integration", func() { Expect(err).To(Not(HaveOccurred())) defer os.Remove(testPipe) go func() { - copyFileName := fpInfo.GetSegmentPipePathForCopyCommand("pipe") + copyFileName := fpInfo.GetSegmentPipePathForCopyCommand() // COPY will blcok because there is no reader for the testPipe _, _ = conn.Exec(fmt.Sprintf("COPY public.foo TO PROGRAM 'echo %s > /dev/null; cat - > %s' WITH CSV DELIMITER ','", copyFileName, testPipe)) }() diff --git a/restore/data.go b/restore/data.go index 87acd676d..b9d8c91cb 100644 --- a/restore/data.go +++ b/restore/data.go @@ -39,8 +39,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta if singleDataFile || resizeCluster { //helper.go handles compression, so we don't want to set it here customPipeThroughCommand = utils.DefaultPipeThroughProgram - pipe := fmt.Sprintf("pipe_%d", whichConn) - errorFile := fmt.Sprintf("%s_error", globalFPInfo.GetSegmentPipePathForCopyCommand(pipe)) + errorFile := fmt.Sprintf("%s_error", globalFPInfo.GetSegmentPipePathForCopyCommand(whichConn)) readFromDestinationCommand = fmt.Sprintf("(timeout --foreground 300 bash -c \"while [[ ! -p \"%s\" && ! -f \"%s\" ]]; do sleep 1; done\" || (echo \"Pipe not found %s\">&2; exit 1)) && %s", destinationToRead, errorFile, destinationToRead, readFromDestinationCommand) } else if MustGetFlagString(options.PLUGIN_CONFIG) != "" { readFromDestinationCommand = fmt.Sprintf("%s restore_data %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath) @@ -89,8 +88,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP for i := 0; i < batches; i++ { destinationToRead := "" if backupConfig.SingleDataFile || resizeCluster { - pipe := fmt.Sprintf("pipe_%d", whichConn) - destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(pipe), entry.Oid, i) + destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(whichConn), entry.Oid, i) } else { destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile) } diff --git a/utils/agent_remote.go b/utils/agent_remote.go index a847abec7..64afe43fd 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -34,7 +34,7 @@ var AgentErr = errors.New("agent error") */ func CreateSegmentPipeOnAllHostsForBackup(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) { remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string { - pipeName := fpInfo.GetSegmentPipeFilePath(contentID, "pipe") + pipeName := fpInfo.GetSegmentPipeFilePath(contentID) pipeName = fmt.Sprintf("%s_%s", pipeName, oid) gplog.Debug("Creating pipe %s", pipeName) return fmt.Sprintf("mkfifo -m 0700 %s", pipeName) @@ -46,9 +46,8 @@ func CreateSegmentPipeOnAllHostsForBackup(oid string, c *cluster.Cluster, fpInfo func CreateSegmentPipeOnAllHostsForRestore(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx int) { oidWithBatch := strings.Split(oid, ",") - suffix := fmt.Sprintf("pipe_%d", helperIdx) remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string { - pipeName := fpInfo.GetSegmentPipeFilePath(contentID, suffix) + pipeName := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx) pipeName = fmt.Sprintf("%s_%s_%s", pipeName, oidWithBatch[0], oidWithBatch[1]) gplog.Debug("Creating pipe %s", pipeName) return fmt.Sprintf("mkfifo %s", pipeName) @@ -179,18 +178,16 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper resizeStr = fmt.Sprintf(" --resize-cluster --orig-seg-count %d --dest-seg-count %d", origSize, destSize) } oid := "oid" - pipe := "pipe" script := "script" if len(helperIdx) == 1 { oid = fmt.Sprintf("oid_%d", helperIdx[0]) - pipe = fmt.Sprintf("pipe_%d", helperIdx[0]) script = fmt.Sprintf("script_%d", helperIdx[0]) } remoteOutput := c.GenerateAndExecuteCommand("Starting gpbackup_helper agent", cluster.ON_SEGMENTS, func(contentID int) string { tocFile := fpInfo.GetSegmentTOCFilePath(contentID) oidFile := fpInfo.GetSegmentHelperFilePath(contentID, oid) scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, script) - pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, pipe) + pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...) backupFile := fpInfo.GetTableBackupFilePath(contentID, 0, GetPipeThroughProgram().Extension, true) helperCmdStr := fmt.Sprintf(`gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file "%s" --content %d%s%s%s%s%s%s --copy-queue-size %d --verbosity %d`, operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr, singleDataFileStr, resizeStr, copyQueue, verbosity) @@ -337,12 +334,8 @@ func CleanUpSegmentHelperProcesses(c *cluster.Cluster, fpInfo filepath.FilePathI } func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) error { - pipe := "pipe" - if len(helperIdx) == 1 { - pipe = fmt.Sprintf("pipe_%d", helperIdx[0]) - } remoteOutput := c.GenerateAndExecuteCommand("Checking whether segment agents had errors", cluster.ON_SEGMENTS, func(contentID int) string { - errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, pipe)) + errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)) /* * If an error file exists we want to indicate an error, as that means * the agent errored out. If no file exists, the agent was successful. @@ -367,11 +360,10 @@ func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx int) { createSkipFileLogMsg := fmt.Sprintf("Creating skip file on segments for restore entry %s (%s)", oid, tableName) - pipe := fmt.Sprintf("pipe_%d", helperIdx) remoteOutput := c.GenerateAndExecuteCommand(createSkipFileLogMsg, cluster.ON_SEGMENTS, func(contentID int) string { - return fmt.Sprintf("touch %s_skip_%s", fpInfo.GetSegmentPipeFilePath(contentID, pipe), oid) + return fmt.Sprintf("touch %s_skip_%s", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx), oid) }) c.CheckClusterError(remoteOutput, "Error while creating skip file on segments", func(contentID int) string { - return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID, pipe), oid) + return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx), oid) }) } diff --git a/utils/plugin.go b/utils/plugin.go index 6cb86c9bd..d4a5fa15e 100644 --- a/utils/plugin.go +++ b/utils/plugin.go @@ -414,7 +414,7 @@ func (plugin *PluginConfig) BackupSegmentTOCs(c *cluster.Cluster, fpInfo filepat cluster.ON_SEGMENTS, func(contentID int) string { tocFile := fpInfo.GetSegmentTOCFilePath(contentID) - errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, "pipe")) + errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID)) command = fmt.Sprintf(`while [[ ! -f "%s" && ! -f "%s" ]]; do sleep 1; done; ls "%s"`, tocFile, errorFile, tocFile) return command }) diff --git a/utils/util.go b/utils/util.go index 4297ef5a8..89b9a961a 100644 --- a/utils/util.go +++ b/utils/util.go @@ -191,7 +191,7 @@ func TerminateHangingCopySessions(fpInfo filepath.FilePathInfo, appName string, queryCol = "query" } - copyFileName := fpInfo.GetSegmentPipePathForCopyCommand("pipe") + copyFileName := fpInfo.GetSegmentPipePathForCopyCommand() fromClause := fmt.Sprintf(`FROM pg_stat_activity WHERE application_name = '%s' AND %s LIKE '%%%s%%'