Skip to content

Commit

Permalink
Use separate error buffers for different processes in gprestore helper (
Browse files Browse the repository at this point in the history
#87)

Problem description:
gprestore helper reused one error buffer for stderr of multiple processes when
it worked with a plugin. It resulted in the following problem: if the plugin was
launched several times, the error could be detected only on the 2nd run, while
the error happens (i.e. plugin outputs something to the stderr) on each plugin
run.

Fix:
Add a separate error buffer for each RestoreReader.

As now there is no need for global 'errBuf' variable, it was moved to
'doBackupAgent' as a local variable (keeping it global without obvious need is a
bad practice). For the backup helper case, it is ok to have only one buffer as
only one backup plugin instance is started for each backup helper.

Tests are not added because the problem has no stable reproduction scenario - it
depends on the timing of writing to the stderr.

(cherry picked from commit ab19bff)
  • Loading branch information
whitehawk authored and RekGRpth committed Aug 1, 2024
1 parent 207212f commit d1eb0bb
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
12 changes: 7 additions & 5 deletions helper/backup_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helper

import (
"bufio"
"bytes"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -34,6 +35,7 @@ func doBackupAgent() error {

preloadCreatedPipesForBackup(oidList, *copyQueue)
var currentPipe string
var errBuf bytes.Buffer
/*
* It is important that we create the reader before creating the writer
* so that we establish a connection to the first pipe (created by gpbackup)
Expand Down Expand Up @@ -62,7 +64,7 @@ func doBackupAgent() error {
return err
}
if i == 0 {
pipeWriter, writeCmd, err = getBackupPipeWriter()
pipeWriter, writeCmd, err = getBackupPipeWriter(&errBuf)
if err != nil {
logError(fmt.Sprintf("Oid %d: Error encountered getting backup pipe writer: %v", oid, err))
return err
Expand Down Expand Up @@ -124,10 +126,10 @@ func getBackupPipeReader(currentPipe string) (io.Reader, io.ReadCloser, error) {
return reader, readHandle, nil
}

func getBackupPipeWriter() (pipe BackupPipeWriterCloser, writeCmd *exec.Cmd, err error) {
func getBackupPipeWriter(errBuf *bytes.Buffer) (pipe BackupPipeWriterCloser, writeCmd *exec.Cmd, err error) {
var writeHandle io.WriteCloser
if *pluginConfigFile != "" {
writeCmd, writeHandle, err = startBackupPluginCommand()
writeCmd, writeHandle, err = startBackupPluginCommand(errBuf)
} else {
writeHandle, err = os.Create(*dataFile)
}
Expand Down Expand Up @@ -155,7 +157,7 @@ func getBackupPipeWriter() (pipe BackupPipeWriterCloser, writeCmd *exec.Cmd, err
return nil, nil, fmt.Errorf("unknown compression type '%s' (compression level %d)", *compressionType, *compressionLevel)
}

func startBackupPluginCommand() (*exec.Cmd, io.WriteCloser, error) {
func startBackupPluginCommand(errBuf *bytes.Buffer) (*exec.Cmd, io.WriteCloser, error) {
pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile)
if err != nil {
// error logging handled by calling functions
Expand All @@ -169,7 +171,7 @@ func startBackupPluginCommand() (*exec.Cmd, io.WriteCloser, error) {
// error logging handled by calling functions
return nil, nil, err
}
writeCmd.Stderr = &errBuf
writeCmd.Stderr = errBuf
err = writeCmd.Start()
if err != nil {
// error logging handled by calling functions
Expand Down
2 changes: 0 additions & 2 deletions helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package helper

import (
"bufio"
"bytes"
"flag"
"fmt"
"os"
Expand All @@ -25,7 +24,6 @@ import (

var (
CleanupGroup *sync.WaitGroup
errBuf bytes.Buffer
version string
wasTerminated bool
writeHandle *os.File
Expand Down
13 changes: 8 additions & 5 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helper

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
Expand Down Expand Up @@ -47,6 +48,7 @@ type RestoreReader struct {
bufReader *bufio.Reader
seekReader io.ReadSeeker
readerType ReaderType
errBuf bytes.Buffer
}

func (r *RestoreReader) positionReader(pos uint64, oid int) error {
Expand Down Expand Up @@ -301,6 +303,7 @@ func doRestoreAgent() error {
if *singleDataFile {
lastByte[contentToRestore] = start[contentToRestore] + uint64(bytesRead)
}
errBuf := readers[contentToRestore].errBuf
if errBuf.Len() > 0 {
err = errors.Wrap(err, strings.Trim(errBuf.String(), "\x00"))
} else {
Expand Down Expand Up @@ -371,7 +374,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
restoreReader := new(RestoreReader)

if *pluginConfigFile != "" {
readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList)
readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList, &restoreReader.errBuf)
if isSubset {
// Reader that operates on subset data
restoreReader.readerType = SUBSET
Expand Down Expand Up @@ -420,7 +423,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
}

// Check that no error has occurred in plugin command
errMsg := strings.Trim(errBuf.String(), "\x00")
errMsg := strings.Trim(restoreReader.errBuf.String(), "\x00")
if len(errMsg) != 0 {
return nil, errors.New(errMsg)
}
Expand Down Expand Up @@ -452,7 +455,7 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
return false
}
// Helper's option does not allow to use subset
if !*isFiltered || *onErrorContinue {
if !*isFiltered || *onErrorContinue {
return false
}
// Restore subset and compression does not allow together
Expand All @@ -463,7 +466,7 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
return true
}

func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (io.Reader, bool, error) {
func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int, errBuffer *bytes.Buffer) (io.Reader, bool, error) {
isSubset := false
pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile)
if err != nil {
Expand Down Expand Up @@ -495,7 +498,7 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis
if err != nil {
return nil, false, err
}
cmd.Stderr = &errBuf
cmd.Stderr = errBuffer

err = cmd.Start()
return readHandle, isSubset, err
Expand Down

0 comments on commit d1eb0bb

Please sign in to comment.