Skip to content

Commit 3e76169

Browse files
authored
Support for stdin streams (kopia#862)
* Add StreamingFile interface * unit test for virtualfs * CLI: Snapshot create support for stdin sources * Uploader support for fs.StreamingFile * End to end test for stdin source snapshot * upload test to improve coverage
1 parent 9b3cae7 commit 3e76169

File tree

9 files changed

+464
-6
lines changed

9 files changed

+464
-6
lines changed

cli/command_snapshot_create.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package cli
22

33
import (
44
"context"
5+
"os"
56
"path/filepath"
67
"strings"
78
"time"
89

910
"github.com/pkg/errors"
1011

12+
"github.com/kopia/kopia/fs"
13+
"github.com/kopia/kopia/fs/virtualfs"
1114
"github.com/kopia/kopia/repo"
1215
"github.com/kopia/kopia/snapshot"
1316
"github.com/kopia/kopia/snapshot/policy"
@@ -22,7 +25,7 @@ const (
2225
var (
2326
snapshotCreateCommand = snapshotCommands.Command("create", "Creates a snapshot of local directory or file.").Default()
2427

25-
snapshotCreateSources = snapshotCreateCommand.Arg("source", "Files or directories to create snapshot(s) of.").ExistingFilesOrDirs()
28+
snapshotCreateSources = snapshotCreateCommand.Arg("source", "Files or directories to create snapshot(s) of.").Strings()
2629
snapshotCreateAll = snapshotCreateCommand.Flag("all", "Create snapshots for files or directories previously backed up by this user on this computer").Bool()
2730
snapshotCreateCheckpointUploadLimitMB = snapshotCreateCommand.Flag("upload-limit-mb", "Stop the backup process after the specified amount of data (in MB) has been uploaded.").PlaceHolder("MB").Default("0").Int64()
2831
snapshotCreateCheckpointInterval = snapshotCreateCommand.Flag("checkpoint-interval", "Frequency for creating periodic checkpoint.").Duration()
@@ -34,6 +37,7 @@ var (
3437
snapshotCreateEndTime = snapshotCreateCommand.Flag("end-time", "Override snapshot end timestamp.").String()
3538
snapshotCreateForceEnableActions = snapshotCreateCommand.Flag("force-enable-actions", "Enable snapshot actions even if globally disabled on this client").Hidden().Bool()
3639
snapshotCreateForceDisableActions = snapshotCreateCommand.Flag("force-disable-actions", "Disable snapshot actions even if globally enabled on this client").Hidden().Bool()
40+
snapshotCreateStdinFileName = snapshotCreateCommand.Flag("stdin-file", "File path to be used for stdin data snapshot.").String()
3741
)
3842

3943
func runSnapshotCommand(ctx context.Context, rep repo.RepositoryWriter) error {
@@ -163,9 +167,24 @@ func startTimeAfterEndTime(startTime, endTime time.Time) bool {
163167
func snapshotSingleSource(ctx context.Context, rep repo.RepositoryWriter, u *snapshotfs.Uploader, sourceInfo snapshot.SourceInfo) error {
164168
log(ctx).Infof("Snapshotting %v ...", sourceInfo)
165169

166-
localEntry, err := getLocalFSEntry(ctx, sourceInfo.Path)
167-
if err != nil {
168-
return errors.Wrap(err, "unable to get local filesystem entry")
170+
var (
171+
err error
172+
fsEntry fs.Entry
173+
setManual bool
174+
)
175+
176+
if *snapshotCreateStdinFileName != "" {
177+
// stdin source will be snapshotted using a virtual static root directory with a single streaming file entry
178+
// Create a new static directory with the given name and add a streaming file entry with os.Stdin reader
179+
fsEntry = virtualfs.NewStaticDirectory(sourceInfo.Path, fs.Entries{
180+
virtualfs.StreamingFileFromReader(*snapshotCreateStdinFileName, os.Stdin),
181+
})
182+
setManual = true
183+
} else {
184+
fsEntry, err = getLocalFSEntry(ctx, sourceInfo.Path)
185+
if err != nil {
186+
return errors.Wrap(err, "unable to get local filesystem entry")
187+
}
169188
}
170189

171190
previous, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil)
@@ -180,7 +199,7 @@ func snapshotSingleSource(ctx context.Context, rep repo.RepositoryWriter, u *sna
180199

181200
log(ctx).Debugf("uploading %v using %v previous manifests", sourceInfo, len(previous))
182201

183-
manifest, err := u.Upload(ctx, localEntry, policyTree, sourceInfo, previous...)
202+
manifest, err := u.Upload(ctx, fsEntry, policyTree, sourceInfo, previous...)
184203
if err != nil {
185204
// fail-fast uploads will fail here without recording a manifest, other uploads will
186205
// possibly fail later.
@@ -218,6 +237,12 @@ func snapshotSingleSource(ctx context.Context, rep repo.RepositoryWriter, u *sna
218237
return errors.Wrap(err, "unable to apply retention policy")
219238
}
220239

240+
if setManual {
241+
if err = policy.SetManual(ctx, rep, sourceInfo); err != nil {
242+
return errors.Wrap(err, "unable to set manual field in scheduling policy for source")
243+
}
244+
}
245+
221246
if ferr := rep.Flush(ctx); ferr != nil {
222247
return errors.Wrap(ferr, "flush error")
223248
}

fs/entry.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ type File interface {
5050
Open(ctx context.Context) (Reader, error)
5151
}
5252

53+
// StreamingFile represents an entry that is a stream.
54+
type StreamingFile interface {
55+
Entry
56+
GetReader(ctx context.Context) (io.Reader, error)
57+
}
58+
5359
// Directory represents contents of a directory.
5460
type Directory interface {
5561
Entry

fs/virtualfs/virtualfs.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Package virtualfs implements an in-memory abstraction of fs.Directory and fs.StreamingFile.
2+
package virtualfs
3+
4+
import (
5+
"context"
6+
"errors"
7+
"io"
8+
"os"
9+
"time"
10+
11+
"github.com/kopia/kopia/fs"
12+
)
13+
14+
const (
15+
defaultPermissions os.FileMode = 0777
16+
)
17+
18+
// virtualEntry is an in-memory implementation of a directory entry.
19+
type virtualEntry struct {
20+
name string
21+
mode os.FileMode
22+
size int64
23+
modTime time.Time
24+
owner fs.OwnerInfo
25+
device fs.DeviceInfo
26+
}
27+
28+
func (e *virtualEntry) Name() string {
29+
return e.name
30+
}
31+
32+
func (e *virtualEntry) IsDir() bool {
33+
return e.mode.IsDir()
34+
}
35+
36+
func (e *virtualEntry) Mode() os.FileMode {
37+
return e.mode
38+
}
39+
40+
func (e *virtualEntry) ModTime() time.Time {
41+
return e.modTime
42+
}
43+
44+
func (e *virtualEntry) Size() int64 {
45+
return e.size
46+
}
47+
48+
func (e *virtualEntry) Sys() interface{} {
49+
return nil
50+
}
51+
52+
func (e *virtualEntry) Owner() fs.OwnerInfo {
53+
return e.owner
54+
}
55+
56+
func (e *virtualEntry) Device() fs.DeviceInfo {
57+
return e.device
58+
}
59+
60+
func (e *virtualEntry) LocalFilesystemPath() string {
61+
return ""
62+
}
63+
64+
// staticDirectory is an in-memory implementation of fs.Directory.
65+
type staticDirectory struct {
66+
virtualEntry
67+
entries fs.Entries
68+
}
69+
70+
// Child gets the named child of a directory.
71+
func (sd *staticDirectory) Child(ctx context.Context, name string) (fs.Entry, error) {
72+
return fs.ReadDirAndFindChild(ctx, sd, name)
73+
}
74+
75+
// Readdir gets the contents of a directory.
76+
func (sd *staticDirectory) Readdir(ctx context.Context) (fs.Entries, error) {
77+
return append(fs.Entries(nil), sd.entries...), nil
78+
}
79+
80+
// NewStaticDirectory returns a virtual static directory.
81+
func NewStaticDirectory(name string, entries fs.Entries) fs.Directory {
82+
return &staticDirectory{
83+
virtualEntry: virtualEntry{
84+
name: name,
85+
mode: defaultPermissions | os.ModeDir,
86+
},
87+
entries: entries,
88+
}
89+
}
90+
91+
// virtualFile is an implementation of fs.StreamingFile with an io.Reader.
92+
type virtualFile struct {
93+
virtualEntry
94+
reader io.Reader
95+
}
96+
97+
var errReaderAlreadyUsed = errors.New("cannot use streaming file reader more than once")
98+
99+
// GetReader returns the streaming file's reader.
100+
// Note: Caller of this function has to ensure concurrency safety.
101+
// The file's reader is set to nil after the first call.
102+
func (vf *virtualFile) GetReader(ctx context.Context) (io.Reader, error) {
103+
if vf.reader == nil {
104+
return nil, errReaderAlreadyUsed
105+
}
106+
107+
// reader must be fetched only once
108+
ret := vf.reader
109+
vf.reader = nil
110+
111+
return ret, nil
112+
}
113+
114+
// StreamingFileFromReader returns a streaming file with given name and reader.
115+
func StreamingFileFromReader(name string, reader io.Reader) fs.StreamingFile {
116+
return &virtualFile{
117+
virtualEntry: virtualEntry{
118+
name: name,
119+
mode: defaultPermissions,
120+
},
121+
reader: reader,
122+
}
123+
}
124+
125+
var (
126+
_ fs.Directory = &staticDirectory{}
127+
_ fs.StreamingFile = &virtualFile{}
128+
_ fs.Entry = &virtualEntry{}
129+
)

fs/virtualfs/virtualfs_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package virtualfs
2+
3+
import (
4+
"context"
5+
"errors"
6+
"os"
7+
"reflect"
8+
"testing"
9+
10+
"github.com/kopia/kopia/fs"
11+
)
12+
13+
func TestStreamingFile(t *testing.T) {
14+
// Create a temporary file with test data
15+
content := []byte("Temporary file content")
16+
17+
r, w, err := os.Pipe()
18+
if err != nil {
19+
t.Fatalf("error creating pipe file: %v", err)
20+
}
21+
22+
if _, err = w.Write(content); err != nil {
23+
t.Fatalf("error writing to pipe file: %v", err)
24+
}
25+
26+
w.Close()
27+
28+
filename := "stream-file"
29+
f := StreamingFileFromReader(filename, r)
30+
31+
rootDir := NewStaticDirectory("root", fs.Entries{f})
32+
33+
e, err := rootDir.Child(context.TODO(), filename)
34+
if err != nil {
35+
t.Fatalf("error getting child entry: %v", err)
36+
}
37+
38+
if e.Name() != filename {
39+
t.Fatalf("did not get expected filename: (actual) %v != %v (expected)", e.Name(), filename)
40+
}
41+
42+
entries, err := rootDir.Readdir(context.TODO())
43+
if err != nil {
44+
t.Fatalf("error getting dir entries %v", err)
45+
}
46+
47+
if len(entries) == 0 {
48+
t.Errorf("expected directory with 1 entry, got %v", rootDir)
49+
}
50+
51+
// Read and compare data
52+
reader, err := f.GetReader(context.TODO())
53+
if err != nil {
54+
t.Fatalf("error getting streaming file reader: %v", err)
55+
}
56+
57+
result := make([]byte, len(content))
58+
59+
if _, err = reader.Read(result); err != nil {
60+
t.Fatalf("error reading streaming file: %v", err)
61+
}
62+
63+
if !reflect.DeepEqual(result, content) {
64+
t.Fatalf("did not get expected file content: (actual) %v != %v (expected)", result, content)
65+
}
66+
}
67+
68+
func TestStreamingFileGetReader(t *testing.T) {
69+
// Create a temporary file with test data
70+
content := []byte("Temporary file content")
71+
72+
r, w, err := os.Pipe()
73+
if err != nil {
74+
t.Fatalf("error creating pipe file: %v", err)
75+
}
76+
77+
if _, err = w.Write(content); err != nil {
78+
t.Fatalf("error writing to pipe file: %v", err)
79+
}
80+
81+
w.Close()
82+
83+
filename := "stream-file"
84+
f := StreamingFileFromReader(filename, r)
85+
86+
// Read and compare data
87+
reader, err := f.GetReader(context.TODO())
88+
if err != nil {
89+
t.Fatalf("error getting streaming file reader: %v", err)
90+
}
91+
92+
result := make([]byte, len(content))
93+
94+
if _, err = reader.Read(result); err != nil {
95+
t.Fatalf("error reading streaming file: %v", err)
96+
}
97+
98+
if !reflect.DeepEqual(result, content) {
99+
t.Fatalf("did not get expected file content: (actual) %v != %v (expected)", result, content)
100+
}
101+
102+
// Second call to GetReader must fail
103+
_, err = f.GetReader(context.TODO())
104+
if err == nil {
105+
t.Fatal("expected error, got none")
106+
}
107+
108+
if !errors.Is(err, errReaderAlreadyUsed) {
109+
t.Fatalf("did not get expected error: (actual) %v != %v (expected)", err, errReaderAlreadyUsed)
110+
}
111+
}

0 commit comments

Comments
 (0)