Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV 6061 Add test for doRestoreAgent in gpbackup #102

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
df8e427
Add simple mocking and test for skip files for doRestoreAgent
dkovalev1 Aug 5, 2024
9106dc8
more clear test definition, add checks for pipes existance
dkovalev1 Aug 14, 2024
8d547f7
Add two more tests to mocking sample
dkovalev1 Aug 20, 2024
a738d7f
Merge branch 'master' into ADBDEV-6061
dkovalev1 Aug 21, 2024
2681013
Fix after merge
dkovalev1 Aug 21, 2024
68b4fcd
implement mocked methods
dkovalev1 Aug 22, 2024
189a078
Change variable case.
dkovalev1 Aug 26, 2024
35e7741
Rename symbols to follow code style
dkovalev1 Aug 27, 2024
381cf2b
Save all global variables on upper level for tests
dkovalev1 Aug 28, 2024
a2ad824
Add recent changes from ADBDEV-6012 and a few more tests
dkovalev1 Sep 4, 2024
aa5d445
Rename interfaces and structs
dkovalev1 Sep 4, 2024
64c5d30
use closeFileHandle instead of getting handle
dkovalev1 Sep 4, 2024
fc23f5a
Add closeAndDeletePipe to the Helper interface
dkovalev1 Sep 4, 2024
7fd9df3
Merge branch 'master' into ADBDEV-6061
dkovalev1 Sep 9, 2024
0171cee
Rename RestoreHelperImpl to RestoreHelper
dkovalev1 Sep 9, 2024
b1fab12
Do not save global variables for test and other cosmetic changes
dkovalev1 Sep 16, 2024
16e5f92
Merge branch 'master' into ADBDEV-6061
dkovalev1 Sep 24, 2024
b34c37f
Add test for plugin error conditions
dkovalev1 Sep 24, 2024
e45e70a
Refactoring suggested by the review
dkovalev1 Oct 2, 2024
247d924
Refactoring inspired by the code review
dkovalev1 Oct 3, 2024
6bba266
Changes according to review
dkovalev1 Oct 10, 2024
eaf99c6
revert some changes to helper.go, make refactroring smaller
dkovalev1 Oct 11, 2024
173fbba
Refactoring inspired by the code review
dkovalev1 Oct 15, 2024
696c2b2
Regactoring for the code review.
dkovalev1 Oct 21, 2024
3586685
Merge branch 'master' into ADBDEV-6061
dkovalev1 Oct 21, 2024
dd8a4da
Move comment into test description, simplify pipes list
dkovalev1 Oct 22, 2024
fb011fa
Use map search for opened pipes, use mocked struct to store preloaded…
dkovalev1 Oct 23, 2024
915c959
Use bool zero value as pipesMap lookup result
dkovalev1 Oct 24, 2024
898f5fa
Make code more readable
dkovalev1 Oct 24, 2024
08edf94
Simplify test after code review
dkovalev1 Oct 24, 2024
53402c7
Changes inspired by code review
dkovalev1 Oct 28, 2024
cae0e76
Fix build
dkovalev1 Oct 28, 2024
ac15111
move closing pipe to increase locality
dkovalev1 Nov 14, 2024
85e8b4f
Merge branch 'master' into ADBDEV-6061
dkovalev1 Nov 29, 2024
d9895dd
Merge branch 'master' into ADBDEV-6061
dkovalev1 Dec 6, 2024
adc6d29
Update tests to the current SUT state and pipe lifetime checks
dkovalev1 Dec 9, 2024
133454e
Merge branch 'master' into ADBDEV-6061
dkovalev1 Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,19 @@ func preloadCreatedPipesForRestore(oidWithBatchList []oidWithBatch, queuedPipeCo
}
}

func getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
type Helper interface {
getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error)
createPipe(pipe string) error
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
flushAndCloseRestoreWriter(pipeName string, oid int) error
}

type HelperImpl struct{}

func (h HelperImpl) createPipe(pipe string) error {
return createPipe(pipe)
}

func (h HelperImpl) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {
oidStr, err := operating.System.ReadFile(oidFileName)
if err != nil {
logError(fmt.Sprintf("Error encountered reading oid batch list from file: %v", err))
Expand Down Expand Up @@ -245,7 +257,7 @@ func getOidListFromFile(oidFileName string) ([]int, error) {
return oidList, nil
}

func flushAndCloseRestoreWriter(pipeName string, oid int) error {
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
func (h *HelperImpl) flushAndCloseRestoreWriter(pipeName string, oid int) error {
if writer != nil {
writer.Write([]byte{}) // simulate writer connected in case of error
err := writer.Flush()
Expand Down Expand Up @@ -291,7 +303,7 @@ func DoCleanup() {
logVerbose("Encountered error closing error file: %v", err)
}
}
err := flushAndCloseRestoreWriter("Current writer pipe on cleanup", 0)
err := new(HelperImpl).flushAndCloseRestoreWriter("Current writer pipe on cleanup", 0)
if err != nil {
logVerbose("Encountered error during cleanup: %v", err)
}
Expand Down
147 changes: 147 additions & 0 deletions helper/helper_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,103 @@
package helper

import (
"bufio"
"os"

"github.com/greenplum-db/gpbackup/utils"
"golang.org/x/sys/unix"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/greenplum-db/gpbackup/toc"
"github.com/pkg/errors"
)

type SkipFile_test_step struct {
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
getRestorePipeWriter_arg_expect string
getRestorePipeWriter_result bool
check_skip_file_arg_tableoid int
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
check_skip_file_result bool
}

type SkipFileMockHelperImpl struct {
step_no int
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
expected_oidbatch []oidWithBatch
expected_steps []SkipFile_test_step

opened_pipes map[string]string // Ginkgo matcher works over map value, will diplicate key here
}

func (h *SkipFileMockHelperImpl) getCurStep() SkipFile_test_step {
return h.expected_steps[h.step_no]
}

func NewSkipFileTest(batches []oidWithBatch, steps []SkipFile_test_step) *SkipFileMockHelperImpl {
var ret = new(SkipFileMockHelperImpl)
ret.expected_oidbatch = batches
ret.expected_steps = steps
ret.opened_pipes = nil
return ret
}

func (h *SkipFileMockHelperImpl) getOidWithBatchListFromFile(oidFileName string) ([]oidWithBatch, error) {

whitehawk marked this conversation as resolved.
Show resolved Hide resolved
return h.expected_oidbatch, nil
}

func (h *SkipFileMockHelperImpl) checkForSkipFile(pipeFile string, tableOid int) bool {

step := h.getCurStep()
Expect(tableOid).To(Equal(step.check_skip_file_arg_tableoid))
ret := h.getCurStep().check_skip_file_result
return ret
}

func (h *SkipFileMockHelperImpl) createPipe(pipe string) error {

// Some pipes are preopened before calling doRestoreAgent. Add them to pipe tracking list
if h.opened_pipes == nil {
h.opened_pipes = map[string]string{}

for k, _ := range pipesMap {
h.opened_pipes[k] = k
}
}
// Check that pipe was not opened yet
Expect(h.opened_pipes).ShouldNot(ContainElement(pipe))

h.opened_pipes[pipe] = pipe
return nil
}

func (h *SkipFileMockHelperImpl) flushAndCloseRestoreWriter(pipeName string, oid int) error {

whitehawk marked this conversation as resolved.
Show resolved Hide resolved
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
// Check that we are closing pipe which is opened
Expect(h.opened_pipes).To(ContainElement(pipeName))
delete(h.opened_pipes, pipeName)
return nil
}

func (h *SkipFileMockHelperImpl) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
return nil, errors.New("getRestoreDataReader Not implemented")
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
}

func (h *SkipFileMockHelperImpl) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {

h.step_no++
Expect(currentPipe).To(Equal(h.getCurStep().getRestorePipeWriter_arg_expect))

// The pipe should be created before
Expect(h.opened_pipes).Should(ContainElement(currentPipe))

if h.getCurStep().getRestorePipeWriter_result {
var writer bufio.Writer
return &writer, nil, nil
}
return nil, nil, unix.ENXIO
}

var _ = Describe("helper tests", func() {
var pluginConfig utils.PluginConfig
var isSubset bool
Expand Down Expand Up @@ -98,4 +189,60 @@ var _ = Describe("helper tests", func() {
Expect(isSubset).To(Equal(false))
})
})

Describe("doRestoreAgent Mocked unit tests", func() {
It("Test skip file in doRestoreAgent", func() {
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
Expect(1).To(Equal(1))
whitehawk marked this conversation as resolved.
Show resolved Hide resolved

save_singleDataFile := *singleDataFile
save_content := *content
save_oidFile := *oidFile
save_isResizeRestore := *isResizeRestore
save_origSize := *origSize
save_destSize := *destSize
save_pipeFile := *pipeFile
save_onErrorContinue := *onErrorContinue

*singleDataFile = false
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
*content = 1
*oidFile = "testoid.dat"
*isResizeRestore = true
*origSize = 5
*destSize = 3
*pipeFile = "mock"
*onErrorContinue = true
whitehawk marked this conversation as resolved.
Show resolved Hide resolved

// Test Scenario 1. Simulate 1 pass for the doRestoreAgent() function with the specified oids, bathces and expected calls
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
oid_batch := []oidWithBatch{
{100, 2},
{200, 3},
{200, 4},
{200, 5},
}

expected_scenario := []SkipFile_test_step{
{}, // placeholder as steps start from 1
{"mock_100_2", true, -1, false}, // Can open pipe for table 100, check_skip_file shall not be called
{"mock_200_3", true, -1, false}, // Can open pipe for table 200, check_skip_file shall not be called
{"mock_200_4", false, 200, false}, // Can not open pipe for table 200, check_skip_file shall be called, skip file not exists
{"mock_200_4", false, 200, true}, // Can not open pipe for table 200, check_skip_file shall called, skip file exists
{"mock_200_5", true, -1, false}, // Went to the next batch, Can open pipe for table 200, check_skip_file shall not be called
}

helper := NewSkipFileTest(oid_batch, expected_scenario)

err := doRestoreAgent_internal(helper, helper)
Expect(err).To(BeNil())

*singleDataFile = save_singleDataFile
*content = save_content
*oidFile = save_oidFile
*isResizeRestore = save_isResizeRestore
*origSize = save_origSize
*destSize = save_destSize
*pipeFile = save_pipeFile
*onErrorContinue = save_onErrorContinue

})
})
})
38 changes: 28 additions & 10 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,19 @@ type oidWithBatch struct {
batch int
}

func doRestoreAgent() error {
type RestoreHelper interface {
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error)
getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error)
checkForSkipFile(pipeFile string, tableOid int) bool
}

type RestoreHelperImpl struct{}
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved

func (RestoreHelperImpl) checkForSkipFile(pipeFile string, tableOid int) bool {
return utils.FileExists(fmt.Sprintf("%s_skip_%d", pipeFile, tableOid))
}

func doRestoreAgent_internal(h Helper, rh RestoreHelper) error {
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
// We need to track various values separately per content for resize restore
var segmentTOC map[int]*toc.SegmentTOC
var tocEntries map[int]map[uint]toc.SegmentDataEntry
Expand All @@ -136,7 +148,7 @@ func doRestoreAgent() error {

readers := make(map[int]*RestoreReader)

oidWithBatchList, err := getOidWithBatchListFromFile(*oidFile)
oidWithBatchList, err := h.getOidWithBatchListFromFile(*oidFile)
if err != nil {
return err
}
Expand Down Expand Up @@ -184,7 +196,7 @@ func doRestoreAgent() error {
tocEntries[contentToRestore] = segmentTOC[contentToRestore].DataEntries

filename := replaceContentInFilename(*dataFile, contentToRestore)
readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
readers[contentToRestore], err = rh.getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)
if readers[contentToRestore] != nil {
// NOTE: If we reach here with batches > 1, there will be
// *origSize / *destSize (N old segments / N new segments)
Expand Down Expand Up @@ -229,7 +241,7 @@ func doRestoreAgent() error {
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
err := h.createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
Expand All @@ -256,7 +268,7 @@ func doRestoreAgent() error {
// We pre-create readers above for the sake of not re-opening SDF readers. For MDF we can't
// re-use them but still having them in a map simplifies overall code flow. We repeatedly assign
// to a map entry here intentionally.
readers[contentToRestore], err = getRestoreDataReader(filename, nil, nil)
readers[contentToRestore], err = rh.getRestoreDataReader(filename, nil, nil)
if err != nil {
logError(fmt.Sprintf("Oid: %d, Batch %d: Error encountered getting restore data reader: %v", tableOid, batchNum, err))
return err
Expand All @@ -266,7 +278,7 @@ func doRestoreAgent() error {

logInfo(fmt.Sprintf("Oid %d, Batch %d: Opening pipe %s", tableOid, batchNum, currentPipe))
for {
writer, writeHandle, err = getRestorePipeWriter(currentPipe)
writer, writeHandle, err = rh.getRestorePipeWriter(currentPipe)
if err != nil {
if errors.Is(err, unix.ENXIO) {
// COPY (the pipe reader) has not tried to access the pipe yet so our restore_helper
Expand All @@ -277,7 +289,7 @@ func doRestoreAgent() error {
// might be good to have a GPDB version check here. However, the restore helper should
// not contain a database connection so the version should be passed through the helper
// invocation from gprestore (e.g. create a --db-version flag option).
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
if *onErrorContinue && rh.checkForSkipFile(*pipeFile, tableOid) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
goto LoopEnd
Expand Down Expand Up @@ -348,7 +360,7 @@ func doRestoreAgent() error {

LoopEnd:
logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe))
err = flushAndCloseRestoreWriter(currentPipe, tableOid)
err = h.flushAndCloseRestoreWriter(currentPipe, tableOid)
if err != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err))
}
Expand Down Expand Up @@ -387,6 +399,12 @@ func doRestoreAgent() error {
return lastError
}

func doRestoreAgent() error {
helper := new(HelperImpl)
restorer := new(RestoreHelperImpl)
return doRestoreAgent_internal(helper, restorer)
}

func constructSingleTableFilename(name string, contentToRestore int, oid int) string {
name = strings.ReplaceAll(name, fmt.Sprintf("gpbackup_%d", *content), fmt.Sprintf("gpbackup_%d", contentToRestore))
nameParts := strings.Split(name, ".")
Expand All @@ -406,7 +424,7 @@ func replaceContentInFilename(filename string, content int) string {
return contentRE.ReplaceAllString(filename, fmt.Sprintf("gpbackup_%d_", content))
}

func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
func (RestoreHelperImpl) getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*RestoreReader, error) {
var readHandle io.Reader
var seekHandle io.ReadSeeker
var isSubset bool
Expand Down Expand Up @@ -470,7 +488,7 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
return restoreReader, err
}

func getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
func (RestoreHelperImpl) getRestorePipeWriter(currentPipe string) (*bufio.Writer, *os.File, error) {
fileHandle, err := os.OpenFile(currentPipe, os.O_WRONLY|unix.O_NONBLOCK, os.ModeNamedPipe)
if err != nil {
// error logging handled by calling functions
Expand Down