From e6a38d4bac5f5c0697a4081d938780399b374c96 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 24 Oct 2024 12:16:51 +0100 Subject: [PATCH 1/2] Initial implementation Signed-off-by: Jimmy Moore --- pkg/storage/expose/nbd_dev_test.go | 2 +- pkg/storage/modules/artificial_latency.go | 22 +++-- pkg/storage/modules/timeout.go | 112 ++++++++++++++++++++++ pkg/storage/modules/timeout_test.go | 46 +++++++++ pkg/storage/storage_events_test.go | 2 +- 5 files changed, 174 insertions(+), 10 deletions(-) create mode 100644 pkg/storage/modules/timeout.go create mode 100644 pkg/storage/modules/timeout_test.go diff --git a/pkg/storage/expose/nbd_dev_test.go b/pkg/storage/expose/nbd_dev_test.go index 1240e611..26992f65 100644 --- a/pkg/storage/expose/nbd_dev_test.go +++ b/pkg/storage/expose/nbd_dev_test.go @@ -146,7 +146,7 @@ func BenchmarkDevReadNLLatency(mb *testing.B) { mb.Run(name, func(b *testing.B) { store := sources.NewMemoryStorage(diskSize) - lstore := modules.NewArtificialLatency(store, lts, 0, lts, 0) + lstore := modules.NewArtificialLatency(store, lts, 0, lts, 0, 0, 0) // logstore := modules.NewLogger(lstore) driver := modules.NewMetrics(lstore) diff --git a/pkg/storage/modules/artificial_latency.go b/pkg/storage/modules/artificial_latency.go index 0f14f73a..74371b7b 100644 --- a/pkg/storage/modules/artificial_latency.go +++ b/pkg/storage/modules/artificial_latency.go @@ -1,7 +1,6 @@ package modules import ( - "sync" "time" "github.com/loopholelabs/silo/pkg/storage" @@ -9,17 +8,17 @@ import ( /** * Simple artificial latency for tests etc - * Adds a RWMutex for this, so that the added latency is within a lock * */ type ArtificialLatency struct { storage.ProviderWithEvents - lock sync.RWMutex prov storage.Provider latencyRead time.Duration latencyWrite time.Duration latencyReadPerByte time.Duration latencyWritePerByte time.Duration + latencyFlush time.Duration + latencyClose time.Duration } // Relay events to embedded StorageProvider @@ -28,19 +27,22 @@ func (i *ArtificialLatency) SendSiloEvent(eventType storage.EventType, eventData return append(data, storage.SendSiloEvent(i.prov, eventType, eventData)...) } -func NewArtificialLatency(prov storage.Provider, latencyRead time.Duration, latencyReadPerByte time.Duration, latencyWrite time.Duration, latencyWritePerByte time.Duration) *ArtificialLatency { +func NewArtificialLatency(prov storage.Provider, + latencyRead time.Duration, latencyReadPerByte time.Duration, + latencyWrite time.Duration, latencyWritePerByte time.Duration, + latencyFlush time.Duration, latencyClose time.Duration) *ArtificialLatency { return &ArtificialLatency{ prov: prov, latencyRead: latencyRead, latencyWrite: latencyWrite, latencyReadPerByte: latencyReadPerByte, latencyWritePerByte: latencyWritePerByte, + latencyFlush: latencyFlush, + latencyClose: latencyClose, } } func (i *ArtificialLatency) ReadAt(buffer []byte, offset int64) (int, error) { - i.lock.RLock() - defer i.lock.RUnlock() if i.latencyRead != 0 { time.Sleep(i.latencyRead) } @@ -51,8 +53,6 @@ func (i *ArtificialLatency) ReadAt(buffer []byte, offset int64) (int, error) { } func (i *ArtificialLatency) WriteAt(buffer []byte, offset int64) (int, error) { - i.lock.Lock() - defer i.lock.Unlock() if i.latencyWrite != 0 { time.Sleep(i.latencyWrite) } @@ -63,6 +63,9 @@ func (i *ArtificialLatency) WriteAt(buffer []byte, offset int64) (int, error) { } func (i *ArtificialLatency) Flush() error { + if i.latencyFlush != 0 { + time.Sleep(i.latencyFlush) + } return i.prov.Flush() } @@ -71,6 +74,9 @@ func (i *ArtificialLatency) Size() uint64 { } func (i *ArtificialLatency) Close() error { + if i.latencyClose != 0 { + time.Sleep(i.latencyClose) + } return i.prov.Close() } diff --git a/pkg/storage/modules/timeout.go b/pkg/storage/modules/timeout.go new file mode 100644 index 00000000..85a2b142 --- /dev/null +++ b/pkg/storage/modules/timeout.go @@ -0,0 +1,112 @@ +package modules + +import ( + "errors" + "time" + + "github.com/loopholelabs/silo/pkg/storage" +) + +var ( + ErrTimeout = errors.New("request timeout") +) + +/** + * Add timeout functionality + * + */ +type Timeout struct { + storage.ProviderWithEvents + prov storage.Provider + timeoutRead time.Duration + timeoutWrite time.Duration + timeoutClose time.Duration + timeoutFlush time.Duration +} + +func NewTimeout(prov storage.Provider, timeoutRead time.Duration, timeoutWrite time.Duration, timeoutClose time.Duration, timeoutFlush time.Duration) *Timeout { + return &Timeout{ + prov: prov, + timeoutRead: timeoutRead, + timeoutWrite: timeoutWrite, + timeoutClose: timeoutClose, + timeoutFlush: timeoutFlush, + } +} + +// Relay events to embedded StorageProvider +func (i *Timeout) SendSiloEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.ProviderWithEvents.SendSiloEvent(event_type, event_data) + return append(data, storage.SendSiloEvent(i.prov, event_type, event_data)...) +} + +type readWriteResult struct { + n int + err error +} + +func (i *Timeout) ReadAt(buffer []byte, offset int64) (int, error) { + result := make(chan readWriteResult, 1) + go func() { + n, err := i.prov.ReadAt(buffer, offset) + result <- readWriteResult{n: n, err: err} + }() + + select { + case <-time.After(i.timeoutRead): + return 0, ErrTimeout + case result := <-result: + return result.n, result.err + } +} + +func (i *Timeout) WriteAt(buffer []byte, offset int64) (int, error) { + result := make(chan readWriteResult, 1) + go func() { + n, err := i.prov.WriteAt(buffer, offset) + result <- readWriteResult{n: n, err: err} + }() + + select { + case <-time.After(i.timeoutWrite): + return 0, ErrTimeout + case result := <-result: + return result.n, result.err + } +} + +func (i *Timeout) Flush() error { + result := make(chan error, 1) + go func() { + result <- i.prov.Flush() + }() + + select { + case <-time.After(i.timeoutFlush): + return ErrTimeout + case result := <-result: + return result + } +} + +func (i *Timeout) Size() uint64 { + return i.prov.Size() +} + +func (i *Timeout) Close() error { + result := make(chan error, 1) + go func() { + result <- i.prov.Close() + }() + + select { + case <-time.After(i.timeoutClose): + return ErrTimeout + case result := <-result: + return result + } +} + +func (i *Timeout) CancelWrites(offset int64, length int64) { + i.prov.CancelWrites(offset, length) +} diff --git a/pkg/storage/modules/timeout_test.go b/pkg/storage/modules/timeout_test.go new file mode 100644 index 00000000..b5b1d736 --- /dev/null +++ b/pkg/storage/modules/timeout_test.go @@ -0,0 +1,46 @@ +package modules_test + +import ( + "testing" + "time" + + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/stretchr/testify/assert" +) + +func TestTimeout(t *testing.T) { + mem := sources.NewMemoryStorage(1024 * 1024) + timeout := modules.NewTimeout(mem, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond) + + buffer := make([]byte, 1024) + _, err := timeout.ReadAt(buffer, 0) + assert.NoError(t, err) + + _, err = timeout.WriteAt(buffer, 0) + assert.NoError(t, err) + + err = timeout.Flush() + assert.NoError(t, err) + + err = timeout.Close() + assert.NoError(t, err) + + // Now with artificial latency, we expect these calls to fail with ErrTimeout + + latency := modules.NewArtificialLatency(mem, 5*time.Second, 0, 5*time.Second, 0, 5*time.Second, 5*time.Second) + timeoutLatency := modules.NewTimeout(latency, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond) + + _, err = timeoutLatency.ReadAt(buffer, 0) + assert.ErrorIs(t, modules.ErrTimeout, err) + + _, err = timeoutLatency.WriteAt(buffer, 0) + assert.ErrorIs(t, modules.ErrTimeout, err) + + err = timeoutLatency.Flush() + assert.ErrorIs(t, modules.ErrTimeout, err) + + err = timeoutLatency.Close() + assert.ErrorIs(t, modules.ErrTimeout, err) + +} diff --git a/pkg/storage/storage_events_test.go b/pkg/storage/storage_events_test.go index 4bb16958..9e3ffbe6 100644 --- a/pkg/storage/storage_events_test.go +++ b/pkg/storage/storage_events_test.go @@ -208,7 +208,7 @@ func TestStorageEventsForModules(tt *testing.T) { addModule(sourceDirtyLocal) addModule(sourceDirtyRemote) - mod1 := modules.NewArtificialLatency(sourceDirtyLocal, 0, 0, 0, 0) + mod1 := modules.NewArtificialLatency(sourceDirtyLocal, 0, 0, 0, 0, 0, 0) addModule(mod1) mod2, err := modules.NewBinLog(mod1, "binlog_file") assert.NoError(t, err) From 12307646cdc9def177d7d217f13ec203551724b8 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Wed, 20 Nov 2024 17:32:07 +0000 Subject: [PATCH 2/2] Lint Signed-off-by: Jimmy Moore --- pkg/storage/modules/timeout.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/modules/timeout.go b/pkg/storage/modules/timeout.go index 85a2b142..9479cbdb 100644 --- a/pkg/storage/modules/timeout.go +++ b/pkg/storage/modules/timeout.go @@ -35,9 +35,9 @@ func NewTimeout(prov storage.Provider, timeoutRead time.Duration, timeoutWrite t } // Relay events to embedded StorageProvider -func (i *Timeout) SendSiloEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { - data := i.ProviderWithEvents.SendSiloEvent(event_type, event_data) - return append(data, storage.SendSiloEvent(i.prov, event_type, event_data)...) +func (i *Timeout) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { + data := i.ProviderWithEvents.SendSiloEvent(eventType, eventData) + return append(data, storage.SendSiloEvent(i.prov, eventType, eventData)...) } type readWriteResult struct {