diff --git a/helper/restore_helper.go b/helper/restore_helper.go
index e045ea344..c188b6494 100644
--- a/helper/restore_helper.go
+++ b/helper/restore_helper.go
@@ -47,8 +47,29 @@ type RestoreReader struct {
 	fileHandle *os.File
 	bufReader  *bufio.Reader
 	seekReader io.ReadSeeker
+	pluginCmd  *PluginCmd
 	readerType ReaderType
-	errBuf     bytes.Buffer
+}
+
+// waitForPlugin reaps the plugin process started for this reader and logs the
+// plugin's stderr as a warning. It returns the error from Wait, if any, and is
+// a no-op for a nil reader, a reader without a plugin, or a repeated call.
+func (r *RestoreReader) waitForPlugin() error {
+	// exec.Cmd.Wait fails if called twice; a non-nil ProcessState means it already ran.
+	if r == nil || r.pluginCmd == nil || r.pluginCmd.Process == nil || r.pluginCmd.ProcessState != nil {
+		return nil
+	}
+	logVerbose(fmt.Sprintf("Waiting for the plugin process (%d)", r.pluginCmd.Process.Pid))
+	err := r.pluginCmd.Wait()
+	if err != nil {
+		logError(fmt.Sprintf("Plugin process exited with an error: %s", err))
+	}
+	// Log plugin's stderr as warnings.
+	if errLog := strings.Trim(r.pluginCmd.errBuf.String(), "\x00"); len(errLog) != 0 {
+		logWarn(fmt.Sprintf("Plugin log: %s", errLog))
+	}
+	r.pluginCmd.errBuf.Reset()
+	return err
 }
 
 func (r *RestoreReader) positionReader(pos uint64, oid int) error {
@@ -164,7 +185,20 @@ func doRestoreAgent() error {
 
 			filename := replaceContentInFilename(*dataFile, contentToRestore)
 			readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
-
+			if readers[contentToRestore] != nil {
+				// NOTE: If we reach here with batches > 1, there will be
+				// *origSize / *destSize (N old segments / N new segments)
+				// readers + 1, which is presumably a small number, so we just
+				// defer the cleanup.
+ // + // The loops under are constructed in a way that needs to keep + // all readers open for the entire duration of restore (oid is + // in outer loop -- batches in inner loop, we'll need all + // readers for every outer loop iteration), so we can't properly + // close any of the readers until we restore every oid yet, + // unless The Big Refactoring will arrive. + defer readers[contentToRestore].waitForPlugin() + } if err != nil { logError(fmt.Sprintf("Error encountered getting restore data reader for single data file: %v", err)) return err @@ -303,12 +337,7 @@ func doRestoreAgent() error { if *singleDataFile { lastByte[contentToRestore] = start[contentToRestore] + uint64(bytesRead) } - errBuf := readers[contentToRestore].errBuf - if errBuf.Len() > 0 { - err = errors.Wrap(err, strings.Trim(errBuf.String(), "\x00")) - } else { - err = errors.Wrap(err, "Error copying data") - } + err = errors.Wrap(err, "Error copying data") goto LoopEnd } @@ -326,6 +355,17 @@ func doRestoreAgent() error { logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum)) + // On resize restore reader might be nil. 
+ if !*singleDataFile && !(*isResizeRestore && contentToRestore >= *origSize) { + if errPlugin := readers[contentToRestore].waitForPlugin(); errPlugin != nil { + if err != nil { + err = errors.Wrap(err, errPlugin.Error()) + } else { + err = errPlugin + } + } + } + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe)) errPipe := deletePipe(currentPipe) if errPipe != nil { @@ -371,10 +411,12 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i var seekHandle io.ReadSeeker var isSubset bool var err error = nil + var pluginCmd *PluginCmd = nil restoreReader := new(RestoreReader) if *pluginConfigFile != "" { - readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList, &restoreReader.errBuf) + pluginCmd, readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList) + restoreReader.pluginCmd = pluginCmd if isSubset { // Reader that operates on subset data restoreReader.readerType = SUBSET @@ -400,6 +442,9 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i // error logging handled by calling functions return nil, err } + if pluginCmd != nil { + logVerbose(fmt.Sprintf("Started plugin process (%d)", pluginCmd.Process.Pid)) + } // Set the underlying stream reader in restoreReader if restoreReader.readerType == SEEKABLE { @@ -422,12 +467,6 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i restoreReader.bufReader = bufio.NewReader(readHandle) } - // Check that no error has occurred in plugin command - errMsg := strings.Trim(restoreReader.errBuf.String(), "\x00") - if len(errMsg) != 0 { - return nil, errors.New(errMsg) - } - return restoreReader, err } @@ -466,12 +505,19 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool { return true } -func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int, errBuffer *bytes.Buffer) (io.Reader, 
bool, error) { +// pluginCmd is needed to keep track of readable stderr and whether the command +// has already been ended. +type PluginCmd struct { + *exec.Cmd + errBuf bytes.Buffer +} + +func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*PluginCmd, io.Reader, bool, error) { isSubset := false pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile) if err != nil { logError(fmt.Sprintf("Error encountered when reading plugin config: %v", err)) - return nil, false, err + return nil, nil, false, err } cmdStr := "" if objToc != nil && getSubsetFlag(fileToRead, pluginConfig) { @@ -492,14 +538,17 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis cmdStr = fmt.Sprintf("%s restore_data %s %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath, fileToRead) } logVerbose(cmdStr) - cmd := exec.Command("bash", "-c", cmdStr) - readHandle, err := cmd.StdoutPipe() + pluginCmd := &PluginCmd{} + pluginCmd.Cmd = exec.Command("bash", "-c", cmdStr) + pluginCmd.Stderr = &pluginCmd.errBuf + + readHandle, err := pluginCmd.StdoutPipe() if err != nil { - return nil, false, err + return nil, nil, false, err } - cmd.Stderr = errBuffer - err = cmd.Start() - return readHandle, isSubset, err + err = pluginCmd.Start() + + return pluginCmd, readHandle, isSubset, err } diff --git a/integration/helper_test.go b/integration/helper_test.go index a91dd807a..75914f3de 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -263,6 +263,168 @@ options: Expect(err).ToNot(HaveOccurred()) assertNoErrors() }) + It("gpbackup_helper will not error out when plugin writes something to stderr", func() { + setupRestoreFiles("", true) + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", 
pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--data-file", dataFileFullPath, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) + + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + Expect(string(contents)).To(Equal("here is some data\n")) + } + + err = helperCmd.Wait() + printHelperLogOnError(err) + Expect(err).ToNot(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Some plugin warning")) + + assertNoErrors() + }) + It("gpbackup_helper will not error out when plugin writes something to stderr with cluster resize", func() { + setupRestoreFiles("", true) + for _, i := range []int{1, 3} { + f, _ := os.Create(fmt.Sprintf("%s_%d", examplePluginTestDataFile, i)) + f.WriteString("here is some data\n") + } + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--data-file", examplePluginTestDataFile, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) 
+ + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + Expect(string(contents)).To(Equal("here is some data\n")) + } + + err = helperCmd.Wait() + printHelperLogOnError(err) + Expect(err).ToNot(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Some plugin warning")) + + assertNoErrors() + }) + It("gpbackup_helper will error out if plugin exits early", func(ctx SpecContext) { + setupRestoreFiles("", true) + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--data-file", dataFileFullPath, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) 
+ + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + // Empty output + Expect(contents).To(Equal([]byte{})) + } + + err = helperCmd.Wait() + Expect(err).To(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Plugin process exited with an error")) + + assertErrorsHandled() + }, SpecTimeout(time.Second*10)) + It("gpbackup_helper will error out if plugin exits early with cluster resize", func(ctx SpecContext) { + setupRestoreFiles("", true) + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--data-file", examplePluginTestDataFile, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) 
+ + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, 1)) + // Empty output + Expect(contents).To(Equal([]byte{})) + + err = helperCmd.Wait() + Expect(err).To(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Plugin process exited with an error")) + + assertErrorsHandled() + }, SpecTimeout(time.Second*10)) It("Generates error file when restore agent interrupted", FlakeAttempts(5), func() { setupRestoreFiles("gzip", false) helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--single-data-file") diff --git a/plugins/README.md b/plugins/README.md index ca3657d15..0708e1f9a 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -43,7 +43,7 @@ gpbackup and gprestore will call the plugin executable in the format [plugin_executable_name] [command] arg1 arg2 ``` -If an error occurs during plugin execution, plugins should write an error message to stderr and return a non-zero error code. +If an error occurs during plugin execution, the plugin should exit with a non-zero code. Plugins may write log messages to stderr without affecting the execution. diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash index c13ac334a..b2e193d3f 100755 --- a/plugins/example_plugin.bash +++ b/plugins/example_plugin.bash @@ -77,6 +77,11 @@ restore_data() { filename=`basename "$2"` timestamp_dir=`basename $(dirname "$2")` timestamp_day_dir=${timestamp_dir%??????} + if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then + echo 'Some plugin warning' >&2 + elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then + exit 1 + fi cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename }