Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Dec 4, 2024
1 parent 11a1244 commit a45a360
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 28 deletions.
2 changes: 1 addition & 1 deletion backup/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func BackupSingleTableData(queryContext context.Context, table Table, rowsCopied

destinationToWrite := ""
if MustGetFlagBool(options.SINGLE_DATA_FILE) {
destinationToWrite = fmt.Sprintf("%s_%d", globalFPInfo.GetSegmentPipePathForCopyCommand("pipe"), table.Oid)
destinationToWrite = fmt.Sprintf("%s_%d", globalFPInfo.GetSegmentPipePathForCopyCommand(), table.Oid)
} else {
destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false)
}
Expand Down
4 changes: 2 additions & 2 deletions end_to_end/end_to_end_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ func assertArtifactsCleaned(timestamp string) {
fpInfo := filepath.NewFilePathInfo(backupCluster, "", timestamp, "", false)
description := "Checking if helper files are cleaned up properly"
cleanupFunc := func(contentID int) string {
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, "pipe"))
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
oidFile := fpInfo.GetSegmentHelperFilePath(contentID, "oid")
scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, "script")
pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, "pipe")
pipeFile := fpInfo.GetSegmentPipeFilePath(contentID)

return fmt.Sprintf("! ls %s* && ! ls %s* && ! ls %s* && ! ls %s*", errorFile, oidFile, scriptFile, pipeFile)
}
Expand Down
12 changes: 8 additions & 4 deletions filepath/filepath.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,17 @@ func (backupFPInfo *FilePathInfo) replaceCopyFormatStringsInPath(templateFilePat
return strings.Replace(filePath, "<SEGID>", strconv.Itoa(contentID), -1)
}

func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, suffix string) string {
templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(suffix)
func (backupFPInfo *FilePathInfo) GetSegmentPipeFilePath(contentID int, helperIdx ...int) string {
templateFilePath := backupFPInfo.GetSegmentPipePathForCopyCommand(helperIdx...)
return backupFPInfo.replaceCopyFormatStringsInPath(templateFilePath, contentID)
}

func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(suffix string) string {
return fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_%s_%s_%d", backupFPInfo.Timestamp, suffix, backupFPInfo.PID)
func (backupFPInfo *FilePathInfo) GetSegmentPipePathForCopyCommand(helperIdx ...int) string {
pipe := "pipe"
if len(helperIdx) == 1 {
pipe = fmt.Sprintf("pipe_%d", helperIdx[0])
}
return fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_%s_%s_%d", backupFPInfo.Timestamp, pipe, backupFPInfo.PID)
}

func (backupFPInfo *FilePathInfo) GetTableBackupFilePath(contentID int, tableOid uint32, extension string, singleDataFile bool) string {
Expand Down
2 changes: 1 addition & 1 deletion integration/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = Describe("utils integration", func() {
Expect(err).To(Not(HaveOccurred()))
defer os.Remove(testPipe)
go func() {
copyFileName := fpInfo.GetSegmentPipePathForCopyCommand("pipe")
copyFileName := fpInfo.GetSegmentPipePathForCopyCommand()
// COPY will blcok because there is no reader for the testPipe
_, _ = conn.Exec(fmt.Sprintf("COPY public.foo TO PROGRAM 'echo %s > /dev/null; cat - > %s' WITH CSV DELIMITER ','", copyFileName, testPipe))
}()
Expand Down
6 changes: 2 additions & 4 deletions restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta
if singleDataFile || resizeCluster {
//helper.go handles compression, so we don't want to set it here
customPipeThroughCommand = utils.DefaultPipeThroughProgram
pipe := fmt.Sprintf("pipe_%d", whichConn)
errorFile := fmt.Sprintf("%s_error", globalFPInfo.GetSegmentPipePathForCopyCommand(pipe))
errorFile := fmt.Sprintf("%s_error", globalFPInfo.GetSegmentPipePathForCopyCommand(whichConn))
readFromDestinationCommand = fmt.Sprintf("(timeout --foreground 300 bash -c \"while [[ ! -p \"%s\" && ! -f \"%s\" ]]; do sleep 1; done\" || (echo \"Pipe not found %s\">&2; exit 1)) && %s", destinationToRead, errorFile, destinationToRead, readFromDestinationCommand)
} else if MustGetFlagString(options.PLUGIN_CONFIG) != "" {
readFromDestinationCommand = fmt.Sprintf("%s restore_data %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath)
Expand Down Expand Up @@ -89,8 +88,7 @@ func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FileP
for i := 0; i < batches; i++ {
destinationToRead := ""
if backupConfig.SingleDataFile || resizeCluster {
pipe := fmt.Sprintf("pipe_%d", whichConn)
destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(pipe), entry.Oid, i)
destinationToRead = fmt.Sprintf("%s_%d_%d", fpInfo.GetSegmentPipePathForCopyCommand(whichConn), entry.Oid, i)
} else {
destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile)
}
Expand Down
20 changes: 6 additions & 14 deletions utils/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var AgentErr = errors.New("agent error")
*/
func CreateSegmentPipeOnAllHostsForBackup(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo) {
remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string {
pipeName := fpInfo.GetSegmentPipeFilePath(contentID, "pipe")
pipeName := fpInfo.GetSegmentPipeFilePath(contentID)
pipeName = fmt.Sprintf("%s_%s", pipeName, oid)
gplog.Debug("Creating pipe %s", pipeName)
return fmt.Sprintf("mkfifo -m 0700 %s", pipeName)
Expand All @@ -46,9 +46,8 @@ func CreateSegmentPipeOnAllHostsForBackup(oid string, c *cluster.Cluster, fpInfo

func CreateSegmentPipeOnAllHostsForRestore(oid string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx int) {
oidWithBatch := strings.Split(oid, ",")
suffix := fmt.Sprintf("pipe_%d", helperIdx)
remoteOutput := c.GenerateAndExecuteCommand("Creating segment data pipes", cluster.ON_SEGMENTS, func(contentID int) string {
pipeName := fpInfo.GetSegmentPipeFilePath(contentID, suffix)
pipeName := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx)
pipeName = fmt.Sprintf("%s_%s_%s", pipeName, oidWithBatch[0], oidWithBatch[1])
gplog.Debug("Creating pipe %s", pipeName)
return fmt.Sprintf("mkfifo %s", pipeName)
Expand Down Expand Up @@ -179,18 +178,16 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper
resizeStr = fmt.Sprintf(" --resize-cluster --orig-seg-count %d --dest-seg-count %d", origSize, destSize)
}
oid := "oid"
pipe := "pipe"
script := "script"
if len(helperIdx) == 1 {
oid = fmt.Sprintf("oid_%d", helperIdx[0])
pipe = fmt.Sprintf("pipe_%d", helperIdx[0])
script = fmt.Sprintf("script_%d", helperIdx[0])
}
remoteOutput := c.GenerateAndExecuteCommand("Starting gpbackup_helper agent", cluster.ON_SEGMENTS, func(contentID int) string {
tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
oidFile := fpInfo.GetSegmentHelperFilePath(contentID, oid)
scriptFile := fpInfo.GetSegmentHelperFilePath(contentID, script)
pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, pipe)
pipeFile := fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...)
backupFile := fpInfo.GetTableBackupFilePath(contentID, 0, GetPipeThroughProgram().Extension, true)
helperCmdStr := fmt.Sprintf(`gpbackup_helper %s --toc-file %s --oid-file %s --pipe-file %s --data-file "%s" --content %d%s%s%s%s%s%s --copy-queue-size %d --verbosity %d`,
operation, tocFile, oidFile, pipeFile, backupFile, contentID, pluginStr, compressStr, onErrorContinueStr, filterStr, singleDataFileStr, resizeStr, copyQueue, verbosity)
Expand Down Expand Up @@ -337,12 +334,8 @@ func CleanUpSegmentHelperProcesses(c *cluster.Cluster, fpInfo filepath.FilePathI
}

func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx ...int) error {
pipe := "pipe"
if len(helperIdx) == 1 {
pipe = fmt.Sprintf("pipe_%d", helperIdx[0])
}
remoteOutput := c.GenerateAndExecuteCommand("Checking whether segment agents had errors", cluster.ON_SEGMENTS, func(contentID int) string {
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, pipe))
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx...))
/*
* If an error file exists we want to indicate an error, as that means
* the agent errored out. If no file exists, the agent was successful.
Expand All @@ -367,11 +360,10 @@ func CheckAgentErrorsOnSegments(c *cluster.Cluster, fpInfo filepath.FilePathInfo

func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, fpInfo filepath.FilePathInfo, helperIdx int) {
createSkipFileLogMsg := fmt.Sprintf("Creating skip file on segments for restore entry %s (%s)", oid, tableName)
pipe := fmt.Sprintf("pipe_%d", helperIdx)
remoteOutput := c.GenerateAndExecuteCommand(createSkipFileLogMsg, cluster.ON_SEGMENTS, func(contentID int) string {
return fmt.Sprintf("touch %s_skip_%s", fpInfo.GetSegmentPipeFilePath(contentID, pipe), oid)
return fmt.Sprintf("touch %s_skip_%s", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx), oid)
})
c.CheckClusterError(remoteOutput, "Error while creating skip file on segments", func(contentID int) string {
return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID, pipe), oid)
return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID, helperIdx), oid)
})
}
2 changes: 1 addition & 1 deletion utils/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (plugin *PluginConfig) BackupSegmentTOCs(c *cluster.Cluster, fpInfo filepat
cluster.ON_SEGMENTS,
func(contentID int) string {
tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID, "pipe"))
errorFile := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
command = fmt.Sprintf(`while [[ ! -f "%s" && ! -f "%s" ]]; do sleep 1; done; ls "%s"`, tocFile, errorFile, tocFile)
return command
})
Expand Down
2 changes: 1 addition & 1 deletion utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TerminateHangingCopySessions(fpInfo filepath.FilePathInfo, appName string,
queryCol = "query"
}

copyFileName := fpInfo.GetSegmentPipePathForCopyCommand("pipe")
copyFileName := fpInfo.GetSegmentPipePathForCopyCommand()
fromClause := fmt.Sprintf(`FROM pg_stat_activity
WHERE application_name = '%s'
AND %s LIKE '%%%s%%'
Expand Down

0 comments on commit a45a360

Please sign in to comment.