Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filestore): add mmap reader option #665

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

- add `WithMMapReader` option to `FileManager`

### Changed

- updated to go-libp2p to [v0.37.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.37.0)
Expand Down
61 changes: 61 additions & 0 deletions filestore/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filestore

import (
"io"
"os"

"golang.org/x/exp/mmap"
)

type FileReader interface {
io.ReaderAt
io.Closer
}

var _ FileReader = (*stdReader)(nil)

type stdReader struct {
f *os.File
}

// ReadAt implements the FileReader interface.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *stdReader) Close() error {
return r.f.Close()
}

func newStdReader(path string) (FileReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}

Check warning on line 35 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L34-L35

Added lines #L34 - L35 were not covered by tests
return &stdReader{f: f}, nil
}

var _ FileReader = (*mmapReader)(nil)

type mmapReader struct {
m *mmap.ReaderAt
}

// ReadAt implements the FileReader interface.
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.m.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *mmapReader) Close() error {
return r.m.Close()
}

func newMmapReader(path string) (FileReader, error) {
m, err := mmap.Open(path)
if err != nil {
return nil, err
}

Check warning on line 59 in filestore/filereader.go

View check run for this annotation

Codecov / codecov/patch

filestore/filereader.go#L58-L59

Added lines #L58 - L59 were not covered by tests
return &mmapReader{m: m}, nil
}
128 changes: 70 additions & 58 deletions filestore/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (

var bg = context.Background()

func newTestFilestore(t *testing.T) (string, *Filestore) {
func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) {
mds := ds.NewMapDatastore()

testdir, err := os.MkdirTemp("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
fm := NewFileManager(mds, testdir, option...)
fm.AllowFiles = true

bs := blockstore.NewBlockstore(mds)
Expand All @@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) {
}

func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
cases := []struct {
name string
options []Option
}{
{"default", nil},
{"mmap", []Option{WithMMapReader()}},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dir, fs := newTestFilestore(t, c.options...)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
})
}
}

Expand Down
37 changes: 28 additions & 9 deletions filestore/fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")

type Option func(*FileManager)

// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
Expand All @@ -34,6 +36,7 @@ type FileManager struct {
AllowUrls bool
ds ds.Batching
root string
makeReader func(path string) (FileReader, error)
}

// CorruptReferenceError implements the error interface.
Expand All @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}

// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
// On Windows, when reading and writing to a file simultaneously, the system would consume
// a significant amount of memory due to caching. This memory usage is not reflected in
// the application but in the system. Using memory-mapped files (implemented with
// CreateFileMapping on Windows) avoids this issue.
func WithMMapReader() Option {
return func(f *FileManager) {
f.makeReader = newMmapReader
}
}

// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
f := &FileManager{
ds: dsns.Wrap(ds, FilestorePrefix),
root: root,
makeReader: newStdReader,
}

for _, option := range options {
option(f)
}

return f
}

// AllKeysChan returns a channel from which to read the keys stored in
Expand Down Expand Up @@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)

fi, err := os.Open(abspath)
fi, err := f.makeReader(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()

_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}

outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
Expand Down Expand Up @@ -183,7 +184,6 @@ require (
go.uber.org/fx v1.23.0 // indirect
go.uber.org/mock v0.5.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down