Skip to content

Commit

Permalink
Add simple mocking and test for skip files for doRestoreAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
dkovalev1 committed Aug 5, 2024
1 parent 54089ed commit df8e427
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 10 deletions.
13 changes: 12 additions & 1 deletion helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,18 @@ func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCo
}
}

func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
type Helper interface {
getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error)
createPipe(pipe string) error
}

type HelperImpl struct{}

func (h HelperImpl) createPipe(pipe string) error {
return createPipe(pipe)
}

func (h HelperImpl) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
oidStr, err := operating.System.ReadFile(oidFileName)
if err != nil {
logError(fmt.Sprintf("Error encountered reading oid batch list from file: %v", err))
Expand Down
115 changes: 115 additions & 0 deletions helper/helper_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,87 @@
package helper

import (
"bufio"
"os"

"github.com/greenplum-db/gpbackup/utils"
"golang.org/x/sys/unix"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/greenplum-db/gpbackup/toc"
"github.com/pkg/errors"
)

type test_step struct {
getRestorePipeWriter_arg_expect string
getRestorePipeWriter_result bool
check_skip_file_arg_tableoid int
check_skip_file_result bool
}

type MockHelperImpl struct {
step_no int
expected_steps []test_step
}

func (h MockHelperImpl) getCurStep() test_step {
return h.expected_steps[h.step_no]
}

var test_expected_scenario = []test_step{
{}, // placeholder as steps start from 1
{"mock_100_2", true, -1, false}, // Can open pipe for table 100, check_skip_file shall not be called
{"mock_200_3", true, -1, false}, // Can open pipe for table 200, check_skip_file shall not be called
{"mock_200_4", false, 200, false}, // Can not open pipe for table 200, check_skip_file shall be called, skip file not exists
{"mock_200_4", false, 200, true}, // Can not open pipe for table 200, check_skip_file shall called, skip file exists
{"mock_200_5", true, -1, false}, // Went to the next batch, Can open pipe for table 200, check_skip_file shall not be called
}

func NewSkipFileTest() *MockHelperImpl {
var ret MockHelperImpl
ret.expected_steps = test_expected_scenario
return &ret
}

func (h MockHelperImpl) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
ret := []oidWithBatch{
{100, 2},
{200, 3},
{200, 4},
{200, 5},
}
return ret, nil
}

func (h MockHelperImpl) checkForSkipFile(pipeFile string, tableOid int) bool {

step := h.getCurStep()
Expect(tableOid).To(Equal(step.check_skip_file_arg_tableoid))
ret := h.getCurStep().check_skip_file_result
return ret
}

func (h MockHelperImpl) createPipe(pipe string) error {
return nil
}
func (h *MockHelperImpl) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
return nil, errors.New("getRestoreDataReader Not implemented")
}

func (h *MockHelperImpl) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {

h.step_no++
Expect(currentPipe).To(Equal(h.expected_steps[h.step_no].getRestorePipeWriter_arg_expect))

if test_expected_scenario[h.step_no].getRestorePipeWriter_result {
var writer bufio.Writer
return &writer, nil, nil
}
return nil, nil, unix.ENXIO
}

var _ = Describe("helper tests", func() {
var pluginConfig utils.PluginConfig
var isSubset bool
Expand Down Expand Up @@ -98,4 +173,44 @@ var _ = Describe("helper tests", func() {
Expect(isSubset).To(Equal(false))
})
})

Describe("doRestoreAgent", func() {
It("Test skip file in doRestoreAgent", func() {
Expect(1).To(Equal(1))

save_singleDataFile := *singleDataFile
save_content := *content
save_oidFile := *oidFile
save_isResizeRestore := *isResizeRestore
save_origSize := *origSize
save_destSize := *destSize
save_pipeFile := *pipeFile
save_onErrorContinue := *onErrorContinue

*singleDataFile = false
*content = 1
*oidFile = "testoid.dat"
*isResizeRestore = true
*origSize = 1
*destSize = 1
*pipeFile = "mock"
*onErrorContinue = true

helper := NewSkipFileTest()

err := doRestoreAgent_internal(helper, helper)

Expect(err).To(BeNil())

*singleDataFile = save_singleDataFile
*content = save_content
*oidFile = save_oidFile
*isResizeRestore = save_isResizeRestore
*origSize = save_origSize
*destSize = save_destSize
*pipeFile = save_pipeFile
*onErrorContinue = save_onErrorContinue

})
})
})
36 changes: 27 additions & 9 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,19 @@ type oidWithBatch struct {
batch int
}

func doRestoreAgent() error {
type RestoreHelper interface {
getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error)
getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error)
checkForSkipFile(pipeFile string, tableOid int) bool
}

type RestoreHelperImpl struct{}

func (RestoreHelperImpl) checkForSkipFile(pipeFile string, tableOid int) bool {
return utils.FileExists(fmt.Sprintf("%s_skip_%d", pipeFile, tableOid))
}

func doRestoreAgent_internal(h Helper, rh RestoreHelper) error {
// We need to track various values separately per content for resize restore
var segmentTOC map[int]*toc.SegmentTOC
var tocEntries map[int]map[uint]toc.SegmentDataEntry
Expand All @@ -136,7 +148,7 @@ func doRestoreAgent() error {

readers := make(map[int]*RestoreReader)

oidWithBatchList, err := getOidWithBatchListFromFile(*oidFile)
oidWithBatchList, err := h.getOidWithBatchListFromFile(*oidFile)
if err != nil {
return err
}
Expand Down Expand Up @@ -184,7 +196,7 @@ func doRestoreAgent() error {
tocEntries[contentToRestore] = segmentTOC[contentToRestore].DataEntries

filename := replaceContentInFilename(*dataFile, contentToRestore)
readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
readers[contentToRestore], err = rh.getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
if readers[contentToRestore] != nil {
// NOTE: If we reach here with batches > 1, there will be
// *origSize / *destSize (N old segments / N new segments)
Expand Down Expand Up @@ -229,7 +241,7 @@ func doRestoreAgent() error {
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
err := h.createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
Expand All @@ -256,7 +268,7 @@ func doRestoreAgent() error {
// We pre-create readers above for the sake of not re-opening SDF readers. For MDF we can't
// re-use them but still having them in a map simplifies overall code flow. We repeatedly assign
// to a map entry here intentionally.
readers[contentToRestore], err = getRestoreDataReader(filename, nil, nil)
readers[contentToRestore], err = rh.getRestoreDataReader(filename, nil, nil)
if err != nil {
logError(fmt.Sprintf("Oid: %d, Batch %d: Error encountered getting restore data reader: %v", tableOid, batchNum, err))
return err
Expand All @@ -266,7 +278,7 @@ func doRestoreAgent() error {

logInfo(fmt.Sprintf("Oid %d, Batch %d: Opening pipe %s", tableOid, batchNum, currentPipe))
for {
writer, writeHandle, err = getRestorePipeWriter(currentPipe)
writer, writeHandle, err = rh.getRestorePipeWriter(currentPipe)
if err != nil {
if errors.Is(err, unix.ENXIO) {
// COPY (the pipe reader) has not tried to access the pipe yet so our restore_helper
Expand All @@ -277,7 +289,7 @@ func doRestoreAgent() error {
// might be good to have a GPDB version check here. However, the restore helper should
// not contain a database connection so the version should be passed through the helper
// invocation from gprestore (e.g. create a --db-version flag option).
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
if *onErrorContinue && rh.checkForSkipFile(*pipeFile, tableOid) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
goto LoopEnd
Expand Down Expand Up @@ -387,6 +399,12 @@ func doRestoreAgent() error {
return lastError
}

func doRestoreAgent() error {
helper := new(HelperImpl)
restorer := new(RestoreHelperImpl)
return doRestoreAgent_internal(helper, restorer)
}

func constructSingleTableFilename(name string, contentToRestore int, oid int) string {
name = strings.ReplaceAll(name, fmt.Sprintf("gpbackup_%d", *content), fmt.Sprintf("gpbackup_%d", contentToRestore))
nameParts := strings.Split(name, ".")
Expand All @@ -406,7 +424,7 @@ func replaceContentInFilename(filename string, content int) string {
return contentRE.ReplaceAllString(filename, fmt.Sprintf("gpbackup_%d_", content))
}

func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
func (RestoreHelperImpl) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
var readHandle io.Reader
var seekHandle io.ReadSeeker
var isSubset bool
Expand Down Expand Up @@ -470,7 +488,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
return restoreReader, err
}

func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
func (RestoreHelperImpl) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
fileHandle, err := os.OpenFile(currentPipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe)
if err != nil {
// error logging handled by calling functions
Expand Down

0 comments on commit df8e427

Please sign in to comment.