diff --git a/helper/helper.go b/helper/helper.go
index 04b80c143..ee6fd2458 100644
--- a/helper/helper.go
+++ b/helper/helper.go
@@ -1,7 +1,6 @@
 package helper
 
 import (
-	"bufio"
 	"flag"
 	"fmt"
 	"os"
@@ -26,10 +25,9 @@ import (
 var (
 	CleanupGroup  *sync.WaitGroup
 	version       string
+	pipesMap      map[string]bool
 	wasTerminated atomic.Bool
 	wasSigpiped   atomic.Bool
-	writeHandle   *os.File
-	writer        *bufio.Writer
 )
 
 /*
@@ -213,24 +211,6 @@ func openClosePipe(filename string) error {
 	return nil
 }
 
-func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
-	oidStr, err := operating.System.ReadFile(oidFileName)
-	if err != nil {
-		logError(fmt.Sprintf("Error encountered reading oid batch list from file: %v", err))
-		return nil, err
-	}
-	oidStrList := strings.Split(strings.TrimSpace(fmt.Sprintf("%s", oidStr)), "\n")
-	oidList := make([]oidWithBatch, len(oidStrList))
-	for i, entry := range oidStrList {
-		oidWithBatchEntry := strings.Split(entry, ",")
-		oidNum, _ := strconv.Atoi(oidWithBatchEntry[0])
-		batchNum, _ := strconv.Atoi(oidWithBatchEntry[1])
-
-		oidList[i] = oidWithBatch{oid: oidNum, batch: batchNum}
-	}
-	return oidList, nil
-}
-
 func getOidListFromFile(oidFileName string) ([]int, error) {
 	oidStr, err := operating.System.ReadFile(oidFileName)
 	if err != nil {
@@ -246,29 +226,6 @@ func getOidListFromFile(oidFileName string) ([]int, error) {
 	return oidList, nil
 }
 
-func flushAndCloseRestoreWriter(pipeName string, oid int) error {
-	if writer != nil {
-		writer.Write([]byte{}) // simulate writer connected in case of error
-		err := writer.Flush()
-		if err != nil {
-			logError("Oid %d: Failed to flush pipe %s", oid, pipeName)
-			return err
-		}
-		writer = nil
-		logVerbose("Oid %d: Successfully flushed pipe %s", oid, pipeName)
-	}
-	if writeHandle != nil {
-		err := writeHandle.Close()
-		if err != nil {
-			logError("Oid %d: Failed to close pipe handle", oid)
-			return err
-		}
-		writeHandle = nil
-		logVerbose("Oid %d: Successfully closed pipe handle", oid)
-	}
-	return nil
-}
-
 /*
  * Shared helper functions
  */
@@ -292,10 +249,6 @@ func DoCleanup() {
 			logVerbose("Encountered error closing error file: %v", err)
 		}
 	}
-	err := flushAndCloseRestoreWriter("Current writer pipe on cleanup", 0)
-	if err != nil {
-		logVerbose("Encountered error during cleanup: %v", err)
-	}
 
 	pipeFiles, _ := filepath.Glob(fmt.Sprintf("%s_[0-9]*", *pipeFile))
 	for _, pipeName := range pipeFiles {
@@ -305,13 +258,13 @@ func DoCleanup() {
 			 * open/close pipes so that the COPY commands hanging on them can complete.
 			 */
 			logVerbose("Opening/closing pipe %s", pipeName)
-			err = openClosePipe(pipeName)
+			err := openClosePipe(pipeName)
 			if err != nil {
 				logVerbose("Encountered error opening/closing pipe %s: %v", pipeName, err)
 			}
 		}
 		logVerbose("Removing pipe %s", pipeName)
-		err = deletePipe(pipeName)
+		err := deletePipe(pipeName)
 		if err != nil {
 			logVerbose("Encountered error removing pipe %s: %v", pipeName, err)
 		}
@@ -319,7 +272,7 @@ func DoCleanup() {
 	skipFiles, _ := filepath.Glob(utils.GetSkipFilename(*pipeFile) + "*")
 	for _, skipFile := range skipFiles {
-		err = utils.RemoveFileIfExists(skipFile)
+		err := utils.RemoveFileIfExists(skipFile)
 		if err != nil {
 			logVerbose("Encountered error during cleanup skip files: %v", err)
 		}
diff --git a/helper/helper_test.go b/helper/helper_test.go
index b0bbf7160..8ff8948d6 100644
--- a/helper/helper_test.go
+++ b/helper/helper_test.go
@@ -1,12 +1,204 @@
 package helper
 
 import (
+	"bufio"
+	"fmt"
+	"os"
+
 	"github.com/greenplum-db/gpbackup/utils"
+	"golang.org/x/sys/unix"
 
 	. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" + + "github.com/greenplum-db/gpbackup/toc" + "github.com/pkg/errors" +) + +var ( + testDir = "/tmp/helper_test/20180101/20180101010101" + testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) ) +type restoreReaderTestImpl struct { + waitCount int +} + +func (r *restoreReaderTestImpl) waitForPlugin() error { + r.waitCount++ + return nil +} + +func (r *restoreReaderTestImpl) positionReader(pos uint64, oid int) error { + return nil +} + +func (r *restoreReaderTestImpl) copyData(num int64) (int64, error) { + return 1, nil +} + +func (r *restoreReaderTestImpl) copyAllData() (int64, error) { + return 1, nil +} + +func (r *restoreReaderTestImpl) closeFileHandle() { +} + +func (r *restoreReaderTestImpl) getReaderType() ReaderType { + return "nil" +} + +type helperTestStep struct { + restorePipeWriterArgExpect string + restorePipeWriterResult bool + skipFileArgTableOid int + skipFileResult bool + comment string +} + +type restoreMockHelperImpl struct { + currentStep int + started bool + expectedOidBatch []oidWithBatch + expectedSteps []helperTestStep + + pipesCreated map[string]bool + pipesOpened map[string]bool + + restoreData *restoreReaderTestImpl +} + +func (h *restoreMockHelperImpl) isPipeOpened(pipe string) bool { + Expect(h.pipesOpened).ToNot(BeNil()) + + return h.pipesOpened[pipe] +} + +func (h *restoreMockHelperImpl) isPipeCreated(pipe string) bool { + Expect(h.pipesCreated).ToNot(BeNil()) + + return h.pipesCreated[pipe] +} + +func (h *restoreMockHelperImpl) makeStep() helperTestStep { + if !h.started { + h.started = true + } else { + h.currentStep++ + } + + Expect(h.currentStep).To(BeNumerically("<", len(h.expectedSteps))) + ret := h.expectedSteps[h.currentStep] + fmt.Printf("Step: %s", ret.comment) + return ret +} + +func (h *restoreMockHelperImpl) getCurStep() helperTestStep { + Expect(h.currentStep).To(BeNumerically("<", len(h.expectedSteps))) + return h.expectedSteps[h.currentStep] +} + +func (h *restoreMockHelperImpl) closeAndDeletePipe(tableOid int, batchNum int) { + pipename := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum) + Expect(h.isPipeCreated(pipename)).To(BeTrue()) + + delete(h.pipesOpened, pipename) + delete(h.pipesCreated, pipename) +} + +func newHelperTest(batches []oidWithBatch, steps []helperTestStep) *restoreMockHelperImpl { + var ret = new(restoreMockHelperImpl) + ret.expectedOidBatch = batches + ret.expectedSteps = steps + ret.pipesCreated = make(map[string]bool) + ret.pipesOpened = make(map[string]bool) + ret.restoreData = &restoreReaderTestImpl{} + + // pre-create pipes as starter does + for i := 0; i < *copyQueue; i++ { + oidBatch := ret.expectedOidBatch[i] + pipename := fmt.Sprintf("%s_%d_%d", *pipeFile, oidBatch.oid, oidBatch.batch) + ret.createPipe(pipename) + } + return ret +} + +func (h *restoreMockHelperImpl) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) { + return h.expectedOidBatch, nil +} + +func (h *restoreMockHelperImpl) checkForSkipFile(pipeFile string, tableOid int) bool { + step := h.getCurStep() + Expect(tableOid).To(Equal(step.skipFileArgTableOid)) + ret := step.skipFileResult + return ret +} + +func (h *restoreMockHelperImpl) createPipe(pipe string) error { + // Check that pipe was not opened yet + Expect(h.isPipeCreated(pipe)).To(Equal(false)) + Expect(h.isPipeOpened(pipe)).To(Equal(false)) + + h.pipesCreated[pipe] = true + return nil +} + +func (h *restoreMockHelperImpl) flushAndCloseRestoreWriter(pipeName string, oid int) error { + // Check that we are closing pipe which is opened + 
+	Expect(h.isPipeOpened(pipeName)).To(Equal(true))
+	delete(h.pipesOpened, pipeName)
+	return nil
+}
+
+func (*restoreMockHelperImpl) doRestoreAgentCleanup() {
+	// This was intentionally left blank to satisfy the IRestoreHelper interface
+}
+
+func (h *restoreMockHelperImpl) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (IRestoreReader, error) {
+	Expect(h.restoreData).ToNot(BeNil())
+	return h.restoreData, nil
+}
+
+func (h *restoreMockHelperImpl) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
+	step := h.makeStep()
+	Expect(currentPipe).To(Equal(step.restorePipeWriterArgExpect))
+
+	// The pipe is opened in getRestorePipeWriter and should not already be open
+	Expect(h.isPipeOpened(currentPipe)).To(Equal(false))
+
+	if step.restorePipeWriterResult {
+		h.pipesOpened[currentPipe] = true
+		var writer bufio.Writer
+		return &writer, nil, nil
+	}
+	return nil, nil, unix.ENXIO
+}
+
+type testPluginCmd struct {
+	hasProcess_ bool
+	error       *string
+	waitCount   int
+}
+
+func (tp *testPluginCmd) hasProcess() bool {
+	return tp.hasProcess_
+}
+
+func (pt *testPluginCmd) pid() int {
+	return 42
+}
+
+func (pt *testPluginCmd) Wait() error {
+	pt.waitCount += 1
+	if pt.error == nil {
+		return nil
+	}
+	return errors.New(*pt.error)
+}
+
+func (pt *testPluginCmd) errLog() {
+}
+
 var _ = Describe("helper tests", func() {
 	var pluginConfig utils.PluginConfig
 	var isSubset bool
@@ -24,6 +216,11 @@ var _ = Describe("helper tests", func() {
 		Options: make(map[string]string),
 	}
 
+	BeforeEach(func() {
+		err := os.MkdirAll(testDir, 0777)
+		Expect(err).ShouldNot(HaveOccurred())
+	})
+
 	Describe("Check subset flag", func() {
 		It("when restore_subset is off, --on-error-continue is false, compression is not used", func() {
 			pluginConfig.Options["restore_subset"] = "off"
@@ -98,4 +295,234 @@ var _ = Describe("helper tests", func() {
 			Expect(isSubset).To(Equal(false))
 		})
 	})
+
+	Describe("doRestoreAgent Mocked unit tests", func() {
+		BeforeEach(func() {
+			// Set up the environment for the mocked tests
+			*singleDataFile = false
+			*content = 1
+			*oidFile = "testoid.dat"
+			*isResizeRestore = true
+			*origSize = 5
+			*destSize = 3
+			*pipeFile = "mock"
+			*onErrorContinue = true
+		})
+		It("successfully restores using a single data file when inputs are valid and no errors occur", func() {
+			*singleDataFile = true
+
+			oidBatch := []oidWithBatch{{oid: 1, batch: 1}}
+			steps := []helperTestStep{
+				{"mock_1_1", true, 1, false, "Can open single data file"},
+			}
+
+			mockHelper := newHelperTest(oidBatch, steps)
+
+			// Prepare and write the TOC file
+			testDir := "" // Use the local directory for the TOC file instead of the default
+			*tocFile = fmt.Sprintf("%stest_toc.yaml", testDir)
+			writeTestTOC(*tocFile)
+			defer func() {
+				_ = os.Remove(*tocFile)
+			}()
+
+			*dataFile = "test_data.dat"
+			// Call the function under test
+			err := doRestoreAgentInternal(mockHelper)
+
+			Expect(err).ToNot(HaveOccurred())
+		})
+		It("successfully restores using multiple data files when inputs are valid and no errors occur", func() {
+			// Call the function under test
+			oidBatch := []oidWithBatch{{oid: 1, batch: 1}}
+			steps := []helperTestStep{
+				{"mock_1_1", true, 1, false, "restores using multiple data files"},
+			}
+
+			mockHelper := newHelperTest(oidBatch, steps)
+
+			err := doRestoreAgentInternal(mockHelper)
+
+			Expect(err).ToNot(HaveOccurred())
+		})
+		It("skips batches with a corresponding skip file in doRestoreAgent", func() {
+			// Test scenario 1: simulate one pass of the doRestoreAgent() function with the specified oids, batches, and expected calls
+			oidBatch := []oidWithBatch{
+				{100, 0},
+				{200, 0},
+				{200, 1},
+				{200, 2},
+			}
+
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Can open pipe for table 100, checkForSkipFile shall not be called"},
+				{"mock_200_0", true, -1, false, "Can open pipe for table 200, checkForSkipFile shall not be called"},
+				{"mock_200_1", false, 200, true, "Cannot open pipe for table 200, checkForSkipFile shall be called, skip file exists"},
+				{"mock_200_2", true, -1, false, "Moved on to the next batch, can open pipe for table 200, checkForSkipFile shall not be called"},
+			}
+
+			helper := newHelperTest(oidBatch, expectedScenario)
+
+			err := doRestoreAgentInternal(helper)
+			Expect(err).To(BeNil())
+		})
+		It("skips batches if skip file is discovered with resize restore", func() {
+			*isResizeRestore = true
+			*origSize = 3
+			*destSize = 5
+
+			oidBatch := []oidWithBatch{
+				{100, 0},
+				{200, 0},
+				{200, 1},
+				{200, 2},
+			}
+
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Can open pipe for table 100, checkForSkipFile shall not be called"},
+				{"mock_200_0", true, -1, false, "Can open pipe for table 200, checkForSkipFile shall not be called"},
+				{"mock_200_1", false, 200, true, "Cannot open pipe for table 200, checkForSkipFile shall be called, skip file exists"},
+				{"mock_200_2", true, -1, false, "Moved on to the next batch, can open pipe for table 200, checkForSkipFile shall not be called"},
+			}
+
+			helper := newHelperTest(oidBatch, expectedScenario)
+			err := doRestoreAgentInternal(helper)
+			Expect(err).To(BeNil())
+		})
+		It("skips batches if skip file is discovered with a single data file", func() {
+			*singleDataFile = true
+			*isResizeRestore = false
+			*tocFile = testTocFile
+
+			// Although it would be purer to mock the TOC file as well,
+			// let's use a real TOC file here to keep things simpler
+			writeTestTOC(testTocFile)
+			defer func() {
+				_ = os.Remove(*tocFile)
+			}()
+
+			oidBatch := []oidWithBatch{
+				{100, 0},
+				{200, 0},
+				{200, 1},
+				{200, 2},
+			}
+
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Can open pipe for table 100, checkForSkipFile shall not be called"},
+				{"mock_200_0", true, -1, false, "Can open pipe for table 200, checkForSkipFile shall not be called"},
+				{"mock_200_1", false, 200, true, "Cannot open pipe for table 200, checkForSkipFile shall be called, skip file exists"},
+				{"mock_200_2", true, -1, false, "Moved on to the next batch, can open pipe for table 200, checkForSkipFile shall not be called"},
+			}
+
+			helper := newHelperTest(oidBatch, expectedScenario)
+			err := doRestoreAgentInternal(helper)
+			Expect(err).To(BeNil())
+		})
+		It("calls Wait in waitForPlugin from doRestoreAgent for a single data file", func() {
+			*singleDataFile = true
+			*isResizeRestore = false
+			*tocFile = testTocFile
+
+			// Although it would be purer to mock the TOC file as well,
+			// let's use a real TOC file here to keep things simpler
+			writeTestTOC(testTocFile)
+			defer func() {
+				_ = os.Remove(*tocFile)
+			}()
+
+			oidBatch := []oidWithBatch{{100, 0}}
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Some pipe shall be created, although it is of no interest to this test"},
+			}
+			helper := newHelperTest(oidBatch, expectedScenario)
+
+			err := doRestoreAgentInternal(helper)
+			Expect(err).ToNot(HaveOccurred())
+
+			// Check that the reader's waitForPlugin was actually called, and only once
+			Expect(helper.restoreData.waitCount).To(Equal(1))
+		})
+		It("calls waitForPlugin in doRestoreAgent for resize restore and no single data file", func() {
+			Expect(*singleDataFile).To(Equal(false))
+
+			oidBatch := []oidWithBatch{{100, 0}}
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Some pipe shall be created, although it is of no interest to this test"},
+			}
+
+			helper := newHelperTest(oidBatch, expectedScenario)
+
+			err := doRestoreAgentInternal(helper)
+			Expect(err).ToNot(HaveOccurred())
+
+			// Check that the reader's waitForPlugin was actually called, and only once
+			Expect(helper.restoreData.waitCount).To(Equal(1))
+		})
+		It("calls waitForPlugin in doRestoreAgent for a reduced cluster and no single data file", func() {
+			Expect(*singleDataFile).To(Equal(false))
+			*destSize, *origSize = *origSize, *destSize
+
+			oidBatch := []oidWithBatch{{100, 0}}
+			expectedScenario := []helperTestStep{
+				{"mock_100_0", true, -1, false, "Some pipe shall be created, although it is of no interest to this test"},
+			}
+
+			helper := newHelperTest(oidBatch, expectedScenario)
+
+			err := doRestoreAgentInternal(helper)
+			Expect(err).ToNot(HaveOccurred())
+
+			// Check that the reader's waitForPlugin was actually called, and only once
+			Expect(helper.restoreData.waitCount).To(Equal(1))
+		})
+	})
+	Describe("RestoreReader tests", func() {
+		It("waitForPlugin normal completion", func() {
+			test_cmd1 := testPluginCmd{hasProcess_: true}
+			test_reader := new(RestoreReader)
+			test_reader.pluginCmd = &test_cmd1

+			err := test_reader.waitForPlugin()
+			Expect(err).ToNot(HaveOccurred())
+			Expect(test_cmd1.waitCount).To(Equal(1))
+
+			// Check that waitForPlugin does nothing when there is no cmd and/or no process
+			test_cmd2 := testPluginCmd{hasProcess_: false}
+			test_reader.pluginCmd = &test_cmd2
+			err = test_reader.waitForPlugin()
+			Expect(err).ToNot(HaveOccurred())
+			Expect(test_cmd2.waitCount).To(Equal(0))
+		})
+		It("waitForPlugin returns the error from Wait", func() {
+			msg := "Expected test error"
+			test_cmd1 := testPluginCmd{hasProcess_: true, error: &msg}
+			test_reader := new(RestoreReader)
+			test_reader.pluginCmd = &test_cmd1
+
+			err := test_reader.waitForPlugin()
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(Equal(msg))
+		})
+	})
 })
+
+func writeTestTOC(tocFile string) {
+	// Write a test TOC. We are not going to read data using it, so dataLength is an arbitrary number
+	dataLength := 100
+	customTOC := fmt.Sprintf(`dataentries:
+1:
+  startbyte: 0
+  endbyte: 18
+2:
+  startbyte: 18
+  endbyte: %[1]d
+3:
+  startbyte: %[1]d
+  endbyte: %d
+`, dataLength+18, dataLength+18+18)
+	fToc, err := os.Create(tocFile)
+	Expect(err).ShouldNot(HaveOccurred())
+	defer fToc.Close()
+	fToc.WriteString(customTOC)
+}
diff --git a/helper/restore_helper.go b/helper/restore_helper.go
index bea0a1df0..5d4294194 100644
--- a/helper/restore_helper.go
+++ b/helper/restore_helper.go
@@ -11,9 +11,11 @@ import (
 	"os/exec"
 	"path"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
+	"github.com/greenplum-db/gp-common-go-libs/operating"
 	"github.com/greenplum-db/gpbackup/toc"
 	"github.com/greenplum-db/gpbackup/utils"
 	"github.com/klauspost/compress/zstd"
@@ -33,21 +35,32 @@ const (
 )
 
 var (
-	contentRE *regexp.Regexp
+	writeHandle *os.File
+	writer      *bufio.Writer
+	contentRE   *regexp.Regexp
 )
 
-/* RestoreReader structure to wrap the underlying reader.
- * readerType identifies how the reader can be used
+/* IRestoreReader interface to wrap the underlying reader.
+ * getReaderType() identifies how the reader can be used
  * SEEKABLE uses seekReader. Used when restoring from uncompressed data with filters from local filesystem
  * NONSEEKABLE and SUBSET types uses bufReader.
  * SUBSET type applies when restoring using plugin(if compatible) from uncompressed data with filters
  * NONSEEKABLE type applies for every other restore scenario
 */
+type IRestoreReader interface {
+	waitForPlugin() error
+	positionReader(pos uint64, oid int) error
+	copyData(num int64) (int64, error)
+	copyAllData() (int64, error)
+	closeFileHandle()
+	getReaderType() ReaderType
+}
+
 type RestoreReader struct {
 	fileHandle *os.File
 	bufReader  *bufio.Reader
 	seekReader io.ReadSeeker
-	pluginCmd  *PluginCmd
+	pluginCmd  IPluginCmd
 	readerType ReaderType
 }
 
@@ -55,19 +68,14 @@ type RestoreReader struct {
 // called on every reader that used a plugin as to not leave any zombies behind.
 func (r *RestoreReader) waitForPlugin() error {
 	var err error
-	if r.pluginCmd != nil && r.pluginCmd.Process != nil {
-		logVerbose(fmt.Sprintf("Waiting for the plugin process (%d)", r.pluginCmd.Process.Pid))
+	if r.pluginCmd != nil && r.pluginCmd.hasProcess() {
+		logVerbose(fmt.Sprintf("Waiting for the plugin process (%d)", r.pluginCmd.pid()))
 		err = r.pluginCmd.Wait()
 		if err != nil {
 			logError(fmt.Sprintf("Plugin process exited with an error: %s", err))
 		}
 		// Log plugin's stderr as warnings.
-		errLog := strings.Trim(r.pluginCmd.errBuf.String(), "\x00")
-		if len(errLog) != 0 {
-			logWarn(fmt.Sprintf("Plugin log: %s", errLog))
-			// Consume the entire buffer.
-			r.pluginCmd.errBuf.Next(r.pluginCmd.errBuf.Len())
-		}
+		r.pluginCmd.errLog()
 	}
 	return err
 }
@@ -118,10 +126,22 @@ func (r *RestoreReader) copyAllData() (int64, error) {
 	return bytesRead, err
 }
 
-func closeAndDeletePipe(tableOid int, batchNum int) {
+func (r *RestoreReader) getReaderType() ReaderType {
+	return r.readerType
+}
+
+func (r *RestoreReader) closeFileHandle() {
+	r.fileHandle.Close()
+}
+
+func (RestoreHelper) createPipe(pipe string) error {
+	return createPipe(pipe)
+}
+
+func (rh RestoreHelper) closeAndDeletePipe(tableOid int, batchNum int) {
 	pipe := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum)
 	logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, pipe))
-	err := flushAndCloseRestoreWriter(pipe, tableOid)
+	err := rh.flushAndCloseRestoreWriter(pipe, tableOid)
 	if err != nil {
 		logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err))
 	}
@@ -138,7 +158,30 @@ type oidWithBatch struct {
 	batch int
 }
 
+type IRestoreHelper interface {
+	getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error)
+	flushAndCloseRestoreWriter(pipeName string, oid int) error
+	doRestoreAgentCleanup()
+
+	createPipe(pipe string) error
+	closeAndDeletePipe(tableOid int, batchNum int)
+
+	getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (IRestoreReader, error)
+	getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error)
+	checkForSkipFile(pipeFile string, tableOid int) bool
+}
+
+type RestoreHelper struct{}
+
+func (RestoreHelper) checkForSkipFile(pipeFile string, tableOid int) bool {
+	return utils.FileExists(fmt.Sprintf("%s_%d", utils.GetSkipFilename(pipeFile), tableOid))
+}
+
 func doRestoreAgent() error {
+	return doRestoreAgentInternal(new(RestoreHelper))
+}
+
+func doRestoreAgentInternal(restoreHelper IRestoreHelper) error {
 	// We need to track various values separately per content for resize restore
 	var segmentTOC map[int]*toc.SegmentTOC
 	var tocEntries map[int]map[uint]toc.SegmentDataEntry
@@ -149,13 +192,15 @@ func doRestoreAgent() error {
 	var bytesRead int64
 	var lastError error
 
-	readers := make(map[int]*RestoreReader)
+	readers := make(map[int]IRestoreReader)
 
-	oidWithBatchList, err := getOidWithBatchListFromFile(*oidFile)
+	oidWithBatchList, err := restoreHelper.getOidWithBatchListFromFile(*oidFile)
 	if err != nil {
 		return err
 	}
 
+	defer restoreHelper.doRestoreAgentCleanup()
+
 	// During a larger-to-smaller restore, we need to do multiple passes for each oid, so the table
 	// restore goes into another nested for loop below. In the normal or smaller-to-larger cases,
 	// this is equivalent to doing a single loop per table.
@@ -199,7 +244,7 @@ func doRestoreAgent() error {
 			tocEntries[contentToRestore] = segmentTOC[contentToRestore].DataEntries
 			filename := replaceContentInFilename(*dataFile, contentToRestore)
-			readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
+			readers[contentToRestore], err = restoreHelper.getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
 			if readers[contentToRestore] != nil {
 				// NOTE: If we reach here with batches > 1, there will be
 				// *origSize / *destSize (N old segments / N new segments)
@@ -218,7 +263,7 @@ func doRestoreAgent() error {
 				logError(fmt.Sprintf("Error encountered getting restore data reader for single data file: %v", err))
 				return err
 			}
-			logVerbose(fmt.Sprintf("Using reader type: %s", readers[contentToRestore].readerType))
+			logVerbose(fmt.Sprintf("Using reader type: %s", readers[contentToRestore].getReaderType()))
 			contentToRestore += *destSize
 		}
@@ -248,7 +293,7 @@ func doRestoreAgent() error {
 				nextBatchNum := nextOidWithBatch.batch
 				nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
 				logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
-				err := createPipe(nextPipeToCreate)
+				err := restoreHelper.createPipe(nextPipeToCreate)
 				if err != nil {
 					logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
 					// In the case this error is hit it means we have lost the
@@ -276,12 +321,12 @@ func doRestoreAgent() error {
 			// Close file before it gets overwritten. Free up these
 			// resources when the reader is not needed anymore.
 			if reader, ok := readers[contentToRestore]; ok {
-				reader.fileHandle.Close()
+				reader.closeFileHandle()
 			}
 			// We pre-create readers above for the sake of not re-opening SDF readers. For MDF we can't
 			// re-use them but still having them in a map simplifies overall code flow. We repeatedly assign
 			// to a map entry here intentionally.
-			readers[contentToRestore], err = getRestoreDataReader(filename, nil, nil)
+			readers[contentToRestore], err = restoreHelper.getRestoreDataReader(filename, nil, nil)
 			if err != nil {
 				logError(fmt.Sprintf("Oid: %d, Batch %d: Error encountered getting restore data reader: %v", tableOid, batchNum, err))
 				return err
@@ -291,7 +336,7 @@ func doRestoreAgent() error {
 		logInfo(fmt.Sprintf("Oid %d, Batch %d: Opening pipe %s", tableOid, batchNum, currentPipe))
 		for {
-			writer, writeHandle, err = getRestorePipeWriter(currentPipe)
+			writer, writeHandle, err = restoreHelper.getRestorePipeWriter(currentPipe)
 			if err != nil {
 				if errors.Is(err, unix.ENXIO) {
 					// COPY (the pipe reader) has not tried to access the pipe yet so our restore_helper
 					// might be good to have a GPDB version check here. However, the restore helper should
 					// not contain a database connection so the version should be passed through the helper
 					// invocation from gprestore (e.g. create a --db-version flag option).
-					if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_%d", utils.GetSkipFilename(*pipeFile), tableOid)) {
+					if *onErrorContinue && restoreHelper.checkForSkipFile(*pipeFile, tableOid) {
 						logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
 						err = nil
 						skipOid = tableOid
@@ -310,7 +355,7 @@ func doRestoreAgent() error {
 						for idx := 0; idx < *copyQueue; idx++ {
 							batchToDelete := batchNum + idx
 							if batchToDelete < batches {
-								closeAndDeletePipe(tableOid, batchToDelete)
+								restoreHelper.closeAndDeletePipe(tableOid, batchToDelete)
 							}
 						}
 						goto LoopEnd
@@ -378,10 +423,9 @@ func doRestoreAgent() error {
 			lastByte[contentToRestore] = end[contentToRestore]
 		}
 		logInfo(fmt.Sprintf("Oid %d, Batch %d: Copied %d bytes into the pipe", tableOid, batchNum, bytesRead))
-
 	LoopEnd:
 		if tableOid != skipOid {
-			closeAndDeletePipe(tableOid, batchNum)
+			restoreHelper.closeAndDeletePipe(tableOid, batchNum)
 		}
 
 		logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))
@@ -414,6 +458,36 @@ func doRestoreAgent() error {
 	return lastError
 }
 
+func (rh *RestoreHelper) doRestoreAgentCleanup() {
+	err := rh.flushAndCloseRestoreWriter("Current writer pipe on cleanup", 0)
+	if err != nil {
+		logVerbose("Encountered error during cleanup: %v", err)
+	}
+}
+
+func (RestoreHelper) flushAndCloseRestoreWriter(pipeName string, oid int) error {
+	if writer != nil {
+		writer.Write([]byte{}) // simulate writer connected in case of error
+		err := writer.Flush()
+		if err != nil {
+			logError("Oid %d: Failed to flush pipe %s", oid, pipeName)
+			return err
+		}
+		writer = nil
+		logVerbose("Oid %d: Successfully flushed pipe %s", oid, pipeName)
+	}
+	if writeHandle != nil {
+		err := writeHandle.Close()
+		if err != nil {
+			logError("Oid %d: Failed to close pipe handle", oid)
+			return err
+		}
+		writeHandle = nil
+		logVerbose("Oid %d: Successfully closed pipe handle", oid)
+	}
+	return nil
+}
+
 func constructSingleTableFilename(name string, contentToRestore int, oid int) string {
 	name = strings.ReplaceAll(name, fmt.Sprintf("gpbackup_%d", *content), fmt.Sprintf("gpbackup_%d", contentToRestore))
 	nameParts := strings.Split(name, ".")
@@ -433,12 +507,30 @@ func replaceContentInFilename(filename string, content int) string {
 	return contentRE.ReplaceAllString(filename, fmt.Sprintf("gpbackup_%d_", content))
 }
 
-func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
+func (RestoreHelper) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
+	oidStr, err := operating.System.ReadFile(oidFileName)
+	if err != nil {
+		logError(fmt.Sprintf("Error encountered reading oid batch list from file: %v", err))
+		return nil, err
+	}
+	oidStrList := strings.Split(strings.TrimSpace(fmt.Sprintf("%s", oidStr)), "\n")
+	oidList := make([]oidWithBatch, len(oidStrList))
+	for i, entry := range oidStrList {
+		oidWithBatchEntry := strings.Split(entry, ",")
+		oidNum, _ := strconv.Atoi(oidWithBatchEntry[0])
+		batchNum, _ := strconv.Atoi(oidWithBatchEntry[1])
+
+		oidList[i] = oidWithBatch{oid: oidNum, batch: batchNum}
+	}
+	return oidList, nil
+}
+
+func (RestoreHelper) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (IRestoreReader, error) {
 	var readHandle io.Reader
 	var seekHandle io.ReadSeeker
 	var isSubset bool
 	var err error = nil
-	var pluginCmd *PluginCmd = nil
+	var pluginCmd IPluginCmd
 	restoreReader := new(RestoreReader)
 
 	if *pluginConfigFile != "" {
@@ -470,7 +562,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
 		return nil, err
 	}
 	if pluginCmd != nil {
-		logVerbose(fmt.Sprintf("Started plugin process (%d)", pluginCmd.Process.Pid))
+		logVerbose(fmt.Sprintf("Started plugin process (%d)", pluginCmd.pid()))
 	}
 
 	// Set the underlying stream reader in restoreReader
@@ -497,7 +589,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
 	return restoreReader, err
 }
 
-func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
+func (RestoreHelper) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
 	fileHandle, err := os.OpenFile(currentPipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe)
 	if err != nil {
 		// error logging handled by calling functions
@@ -532,14 +624,38 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
 	return true
 }
 
-// pluginCmd is needed to keep track of readable stderr and whether the command
+// IPluginCmd is needed to keep track of readable stderr and whether the command
 // has already been ended.
+type IPluginCmd interface {
+	hasProcess() bool
+	pid() int
+	Wait() error
+	errLog()
+}
+
 type PluginCmd struct {
 	*exec.Cmd
 	errBuf bytes.Buffer
 }
 
-func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*PluginCmd, io.Reader, bool, error) {
+func (p PluginCmd) hasProcess() bool {
+	return p.Process != nil
+}
+
+func (p PluginCmd) pid() int {
+	return p.Process.Pid
+}
+
+func (p PluginCmd) errLog() {
+	errLog := strings.Trim(p.errBuf.String(), "\x00")
+	if len(errLog) != 0 {
+		logWarn(fmt.Sprintf("Plugin log: %s", errLog))
+		// Consume the entire buffer.
+		p.errBuf.Next(p.errBuf.Len())
+	}
+}
+
+func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (IPluginCmd, io.Reader, bool, error) {
 	isSubset := false
 	pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile)
 	if err != nil {