Skip to content

Commit a11ddaf

Browse files
jkowalskiJulio López
andauthored
restore: added support for incremental restore and ignoring copy errors (kopia#794)
* restore: added support for incremental restore and ignoring copy errors Co-authored-by: Julio López <[email protected]>
1 parent 2a58166 commit a11ddaf

File tree

7 files changed

+192
-15
lines changed

7 files changed

+192
-15
lines changed

cli/command_restore.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ var (
7777
restoreSkipTimes = false
7878
restoreSkipOwners = false
7979
restoreSkipPermissions = false
80+
restoreIncremental = false
81+
restoreIgnoreErrors = false
8082
)
8183

8284
const (
@@ -101,6 +103,8 @@ func addRestoreFlags(cmd *kingpin.CmdClause) {
101103
cmd.Flag("skip-permissions", "Skip permissions during restore").BoolVar(&restoreSkipPermissions)
102104
cmd.Flag("skip-times", "Skip times during restore").BoolVar(&restoreSkipTimes)
103105
cmd.Flag("ignore-permission-errors", "Ignore permission errors").BoolVar(&restoreIgnorePermissionErrors)
106+
cmd.Flag("ignore-errors", "Ignore all errors").BoolVar(&restoreIgnoreErrors)
107+
cmd.Flag("skip-existing", "Skip files and symlinks that exist in the output").BoolVar(&restoreIncremental)
104108
}
105109

106110
func restoreOutput(ctx context.Context) (restore.Output, error) {
@@ -182,7 +186,22 @@ func detectRestoreMode(ctx context.Context, m string) string {
182186
}
183187

184188
func printRestoreStats(ctx context.Context, st restore.Stats) {
185-
log(ctx).Infof("Restored %v files, %v directories and %v symbolic links (%v)\n", st.RestoredFileCount, st.RestoredDirCount, st.RestoredSymlinkCount, units.BytesStringBase10(st.RestoredTotalFileSize))
189+
var maybeSkipped, maybeErrors string
190+
191+
if st.SkippedCount > 0 {
192+
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", st.SkippedCount, units.BytesStringBase10(st.SkippedTotalFileSize))
193+
}
194+
195+
if st.IgnoredErrorCount > 0 {
196+
maybeErrors = fmt.Sprintf(", ignored %v errors", st.IgnoredErrorCount)
197+
}
198+
199+
log(ctx).Infof("Restored %v files, %v directories and %v symbolic links (%v)%v%v.\n",
200+
st.RestoredFileCount,
201+
st.RestoredDirCount,
202+
st.RestoredSymlinkCount,
203+
units.BytesStringBase10(st.RestoredTotalFileSize),
204+
maybeSkipped, maybeErrors)
186205
}
187206

188207
func runRestoreCommand(ctx context.Context, rep repo.Repository) error {
@@ -199,16 +218,18 @@ func runRestoreCommand(ctx context.Context, rep repo.Repository) error {
199218
t0 := clock.Now()
200219

201220
st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
202-
Parallel: restoreParallel,
221+
Parallel: restoreParallel,
222+
Incremental: restoreIncremental,
223+
IgnoreErrors: restoreIgnoreErrors,
203224
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
204-
restoredCount := stats.RestoredFileCount + stats.RestoredDirCount + stats.RestoredSymlinkCount
225+
restoredCount := stats.RestoredFileCount + stats.RestoredDirCount + stats.RestoredSymlinkCount + stats.SkippedCount
205226
enqueuedCount := stats.EnqueuedFileCount + stats.EnqueuedDirCount + stats.EnqueuedSymlinkCount
206227

207228
if restoredCount == 0 {
208229
return
209230
}
210231

211-
var maybeRemaining string
232+
var maybeRemaining, maybeSkipped, maybeErrors string
212233

213234
if stats.EnqueuedTotalFileSize > 0 {
214235
progress := float64(stats.RestoredTotalFileSize) / float64(stats.EnqueuedTotalFileSize)
@@ -223,9 +244,19 @@ func runRestoreCommand(ctx context.Context, rep repo.Repository) error {
223244
}
224245
}
225246

226-
log(ctx).Infof("Processed %v (%v) of %v (%v)%v.",
247+
if stats.SkippedCount > 0 {
248+
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", stats.SkippedCount, units.BytesStringBase10(stats.SkippedTotalFileSize))
249+
}
250+
251+
if stats.IgnoredErrorCount > 0 {
252+
maybeErrors = fmt.Sprintf(", ignored %v errors", stats.IgnoredErrorCount)
253+
}
254+
255+
log(ctx).Infof("Processed %v (%v) of %v (%v)%v%v.",
227256
restoredCount, units.BytesStringBase10(stats.RestoredTotalFileSize),
228257
enqueuedCount, units.BytesStringBase10(stats.EnqueuedTotalFileSize),
258+
maybeSkipped,
259+
maybeErrors,
229260
maybeRemaining)
230261
},
231262
})

snapshot/restore/local_fs_output.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"path/filepath"
99
"runtime"
10+
"time"
1011

1112
"github.com/natefinch/atomic"
1213
"github.com/pkg/errors"
@@ -17,6 +18,8 @@ import (
1718

1819
const modBits = os.ModePerm | os.ModeSetgid | os.ModeSetuid | os.ModeSticky
1920

21+
const maxTimeDeltaToConsiderFileTheSame = 2 * time.Second
22+
2023
// FilesystemOutput contains the options for outputting a file system tree.
2124
type FilesystemOutput struct {
2225
// TargetPath for restore.
@@ -95,6 +98,31 @@ func (o *FilesystemOutput) WriteFile(ctx context.Context, relativePath string, f
9598
return nil
9699
}
97100

101+
// FileExists implements restore.Output interface.
102+
func (o *FilesystemOutput) FileExists(ctx context.Context, relativePath string, e fs.File) bool {
103+
st, err := os.Lstat(filepath.Join(o.TargetPath, relativePath))
104+
if err != nil {
105+
return false
106+
}
107+
108+
if (st.Mode() & os.ModeType) != 0 {
109+
// not a file
110+
return false
111+
}
112+
113+
if st.Size() != e.Size() {
114+
// wrong size
115+
return false
116+
}
117+
118+
timeDelta := st.ModTime().Sub(e.ModTime())
119+
if timeDelta < 0 {
120+
timeDelta = -timeDelta
121+
}
122+
123+
return timeDelta < maxTimeDeltaToConsiderFileTheSame
124+
}
125+
98126
// CreateSymlink implements restore.Output interface.
99127
func (o *FilesystemOutput) CreateSymlink(ctx context.Context, relativePath string, e fs.Symlink) error {
100128
targetPath, err := e.Readlink(ctx)
@@ -139,6 +167,16 @@ func fileIsSymlink(stat os.FileInfo) bool {
139167
return stat.Mode()&os.ModeSymlink != 0
140168
}
141169

170+
// SymlinkExists implements restore.Output interface.
171+
func (o *FilesystemOutput) SymlinkExists(ctx context.Context, relativePath string, e fs.Symlink) bool {
172+
st, err := os.Lstat(filepath.Join(o.TargetPath, relativePath))
173+
if err != nil {
174+
return false
175+
}
176+
177+
return (st.Mode() & os.ModeType) == os.ModeSymlink
178+
}
179+
142180
// set permission, modification time and user/group ids on targetPath.
143181
func (o *FilesystemOutput) setAttributes(targetPath string, e fs.Entry) error {
144182
le, err := localfs.NewEntry(targetPath)

snapshot/restore/restore.go

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,45 +22,61 @@ type Output interface {
2222
BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error
2323
FinishDirectory(ctx context.Context, relativePath string, e fs.Directory) error
2424
WriteFile(ctx context.Context, relativePath string, e fs.File) error
25+
FileExists(ctx context.Context, relativePath string, e fs.File) bool
2526
CreateSymlink(ctx context.Context, relativePath string, e fs.Symlink) error
27+
SymlinkExists(ctx context.Context, relativePath string, e fs.Symlink) bool
2628
Close(ctx context.Context) error
2729
}
2830

2931
// Stats represents restore statistics.
3032
type Stats struct {
3133
RestoredTotalFileSize int64
3234
EnqueuedTotalFileSize int64
35+
SkippedTotalFileSize int64
3336

3437
RestoredFileCount int32
3538
RestoredDirCount int32
3639
RestoredSymlinkCount int32
3740
EnqueuedFileCount int32
3841
EnqueuedDirCount int32
3942
EnqueuedSymlinkCount int32
43+
SkippedCount int32
44+
IgnoredErrorCount int32
4045
}
4146

4247
func (s *Stats) clone() Stats {
4348
return Stats{
4449
RestoredTotalFileSize: atomic.LoadInt64(&s.RestoredTotalFileSize),
4550
EnqueuedTotalFileSize: atomic.LoadInt64(&s.EnqueuedTotalFileSize),
46-
RestoredFileCount: atomic.LoadInt32(&s.RestoredFileCount),
47-
RestoredDirCount: atomic.LoadInt32(&s.RestoredDirCount),
48-
RestoredSymlinkCount: atomic.LoadInt32(&s.RestoredSymlinkCount),
49-
EnqueuedFileCount: atomic.LoadInt32(&s.EnqueuedFileCount),
50-
EnqueuedDirCount: atomic.LoadInt32(&s.EnqueuedDirCount),
51-
EnqueuedSymlinkCount: atomic.LoadInt32(&s.EnqueuedSymlinkCount),
51+
SkippedTotalFileSize: atomic.LoadInt64(&s.SkippedTotalFileSize),
52+
53+
RestoredFileCount: atomic.LoadInt32(&s.RestoredFileCount),
54+
RestoredDirCount: atomic.LoadInt32(&s.RestoredDirCount),
55+
RestoredSymlinkCount: atomic.LoadInt32(&s.RestoredSymlinkCount),
56+
EnqueuedFileCount: atomic.LoadInt32(&s.EnqueuedFileCount),
57+
EnqueuedDirCount: atomic.LoadInt32(&s.EnqueuedDirCount),
58+
EnqueuedSymlinkCount: atomic.LoadInt32(&s.EnqueuedSymlinkCount),
59+
SkippedCount: atomic.LoadInt32(&s.SkippedCount),
60+
IgnoredErrorCount: atomic.LoadInt32(&s.IgnoredErrorCount),
5261
}
5362
}
5463

5564
// Options provides optional restore parameters.
5665
type Options struct {
5766
Parallel int
5867
ProgressCallback func(ctx context.Context, s Stats)
68+
Incremental bool
69+
IgnoreErrors bool
5970
}
6071

6172
// Entry walks a snapshot root with given root entry and restores it to the provided output.
6273
func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs.Entry, options Options) (Stats, error) {
63-
c := copier{output: output, q: parallelwork.NewQueue()}
74+
c := copier{
75+
output: output,
76+
q: parallelwork.NewQueue(),
77+
incremental: options.Incremental,
78+
ignoreErrors: options.IgnoreErrors,
79+
}
6480

6581
c.q.ProgressCallback = func(ctx context.Context, enqueued, active, completed int64) {
6682
options.ProgressCallback(ctx, c.stats.clone())
@@ -91,12 +107,52 @@ func Entry(ctx context.Context, rep repo.Repository, output Output, rootEntry fs
91107
}
92108

93109
type copier struct {
94-
stats Stats
95-
output Output
96-
q *parallelwork.Queue
110+
stats Stats
111+
output Output
112+
q *parallelwork.Queue
113+
incremental bool
114+
ignoreErrors bool
97115
}
98116

99117
func (c *copier) copyEntry(ctx context.Context, e fs.Entry, targetPath string, onCompletion func() error) error {
118+
if c.incremental {
119+
// in incremental mode, do not copy if the output already exists
120+
switch e := e.(type) {
121+
case fs.File:
122+
if c.output.FileExists(ctx, targetPath, e) {
123+
log(ctx).Debugf("skipping file %v because it already exists and metadata matches", targetPath)
124+
atomic.AddInt32(&c.stats.SkippedCount, 1)
125+
atomic.AddInt64(&c.stats.SkippedTotalFileSize, e.Size())
126+
127+
return onCompletion()
128+
}
129+
130+
case fs.Symlink:
131+
if c.output.SymlinkExists(ctx, targetPath, e) {
132+
atomic.AddInt32(&c.stats.SkippedCount, 1)
133+
log(ctx).Debugf("skipping symlink %v because it already exists", targetPath)
134+
135+
return onCompletion()
136+
}
137+
}
138+
}
139+
140+
err := c.copyEntryInternal(ctx, e, targetPath, onCompletion)
141+
if err == nil {
142+
return nil
143+
}
144+
145+
if c.ignoreErrors {
146+
atomic.AddInt32(&c.stats.IgnoredErrorCount, 1)
147+
log(ctx).Warningf("ignored error %v on %v", err, targetPath)
148+
149+
return nil
150+
}
151+
152+
return err
153+
}
154+
155+
func (c *copier) copyEntryInternal(ctx context.Context, e fs.Entry, targetPath string, onCompletion func() error) error {
100156
switch e := e.(type) {
101157
case fs.Directory:
102158
log(ctx).Debugf("dir: '%v'", targetPath)

snapshot/restore/tar_output.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func (o *TarOutput) WriteFile(ctx context.Context, relativePath string, f fs.Fil
8686
return nil
8787
}
8888

89+
// FileExists implements restore.Output interface.
90+
func (o *TarOutput) FileExists(ctx context.Context, relativePath string, f fs.File) bool {
91+
return false
92+
}
93+
8994
// CreateSymlink implements restore.Output interface.
9095
func (o *TarOutput) CreateSymlink(ctx context.Context, relativePath string, l fs.Symlink) error {
9196
target, err := l.Readlink(ctx)
@@ -110,6 +115,11 @@ func (o *TarOutput) CreateSymlink(ctx context.Context, relativePath string, l fs
110115
return nil
111116
}
112117

118+
// SymlinkExists implements restore.Output interface.
119+
func (o *TarOutput) SymlinkExists(ctx context.Context, relativePath string, l fs.Symlink) bool {
120+
return false
121+
}
122+
113123
// NewTarOutput creates new tar writer output.
114124
func NewTarOutput(w io.WriteCloser) *TarOutput {
115125
return &TarOutput{w, tar.NewWriter(w)}

snapshot/restore/zip_output.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,22 @@ func (o *ZipOutput) WriteFile(ctx context.Context, relativePath string, f fs.Fil
6969
return nil
7070
}
7171

72+
// FileExists implements restore.Output interface.
73+
func (o *ZipOutput) FileExists(ctx context.Context, relativePath string, l fs.File) bool {
74+
return false
75+
}
76+
7277
// CreateSymlink implements restore.Output interface.
7378
func (o *ZipOutput) CreateSymlink(ctx context.Context, relativePath string, e fs.Symlink) error {
7479
log(ctx).Debugf("create symlink not implemented yet")
7580
return nil
7681
}
7782

83+
// SymlinkExists implements restore.Output interface.
84+
func (o *ZipOutput) SymlinkExists(ctx context.Context, relativePath string, l fs.Symlink) bool {
85+
return false
86+
}
87+
7888
// NewZipOutput creates new zip writer output.
7989
func NewZipOutput(w io.WriteCloser, method uint16) *ZipOutput {
8090
return &ZipOutput{w, zip.NewWriter(w), method}

tests/end_to_end_test/restore_fail_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ func TestRestoreFail(t *testing.T) {
6464

6565
// Expect a subsequent restore to fail
6666
e.RunAndExpectFailure(t, "snapshot", "restore", snapID, targetDir)
67+
68+
// --ignore-errors allows the snapshot to succeed despite missing blob.
69+
e.RunAndExpectSuccess(t, "snapshot", "restore", "--ignore-errors", snapID, targetDir)
6770
}
6871

6972
func findPackBlob(blobIDs []string) string {

tests/end_to_end_test/restore_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
"io"
1010
"os"
1111
"path/filepath"
12+
"regexp"
1213
"runtime"
14+
"strconv"
1315
"testing"
1416
"time"
1517

@@ -88,6 +90,33 @@ func TestRestoreCommand(t *testing.T) {
8890
// Attempt to restore into a target directory that already exists
8991
e.RunAndExpectFailure(t, "restore", rootID, restoreDir, "--no-overwrite-directories")
9092

93+
// Very quick incremental restore where all files already exist.
94+
// Look for status output that indicates files were skipped.
95+
re := regexp.MustCompile(`Restored (\d+) files.*skipped (\d+) `)
96+
foundStatus := false
97+
lastFileCount := 0
98+
_, stderr := e.RunAndExpectSuccessWithErrOut(t, "restore", rootID, restoreDir, "--skip-existing")
99+
100+
for _, l := range stderr {
101+
if m := re.FindStringSubmatch(l); m != nil {
102+
fileCount, _ := strconv.Atoi(m[1])
103+
skippedCount, _ := strconv.Atoi(m[2])
104+
lastFileCount = fileCount
105+
106+
if fileCount == 0 && skippedCount > 0 {
107+
foundStatus = true
108+
}
109+
}
110+
}
111+
112+
if !foundStatus {
113+
t.Fatalf("expected status line indicating files were skipped, none found: %v", stderr)
114+
}
115+
116+
if lastFileCount != 0 {
117+
t.Fatalf("not all files were skipped: %v", stderr)
118+
}
119+
91120
// Attempt to restore into a target directory that already exists
92121
e.RunAndExpectFailure(t, "restore", rootID, restoreDir, "--no-overwrite-files")
93122
}

0 commit comments

Comments
 (0)