Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arch-162-silo-storage-timeout-module #57

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/expose/nbd_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func BenchmarkDevReadNLLatency(mb *testing.B) {
mb.Run(name, func(b *testing.B) {

store := sources.NewMemoryStorage(diskSize)
lstore := modules.NewArtificialLatency(store, lts, 0, lts, 0)
lstore := modules.NewArtificialLatency(store, lts, 0, lts, 0, 0, 0)
// logstore := modules.NewLogger(lstore)
driver := modules.NewMetrics(lstore)

Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/modules/artificial_latency.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package modules

import (
"sync"
"time"

"github.com/loopholelabs/silo/pkg/storage"
)

/**
* Simple artificial latency for tests etc
* Adds a RWMutex for this, so that the added latency is within a lock
*
*/
type ArtificialLatency struct {
storage.ProviderWithEvents
lock sync.RWMutex
prov storage.Provider
latencyRead time.Duration
latencyWrite time.Duration
latencyReadPerByte time.Duration
latencyWritePerByte time.Duration
latencyFlush time.Duration
latencyClose time.Duration
}

// Relay events to embedded StorageProvider
Expand All @@ -28,19 +27,22 @@ func (i *ArtificialLatency) SendSiloEvent(eventType storage.EventType, eventData
return append(data, storage.SendSiloEvent(i.prov, eventType, eventData)...)
}

func NewArtificialLatency(prov storage.Provider, latencyRead time.Duration, latencyReadPerByte time.Duration, latencyWrite time.Duration, latencyWritePerByte time.Duration) *ArtificialLatency {
func NewArtificialLatency(prov storage.Provider,
latencyRead time.Duration, latencyReadPerByte time.Duration,
latencyWrite time.Duration, latencyWritePerByte time.Duration,
latencyFlush time.Duration, latencyClose time.Duration) *ArtificialLatency {
return &ArtificialLatency{
prov: prov,
latencyRead: latencyRead,
latencyWrite: latencyWrite,
latencyReadPerByte: latencyReadPerByte,
latencyWritePerByte: latencyWritePerByte,
latencyFlush: latencyFlush,
latencyClose: latencyClose,
}
}

func (i *ArtificialLatency) ReadAt(buffer []byte, offset int64) (int, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if i.latencyRead != 0 {
time.Sleep(i.latencyRead)
}
Expand All @@ -51,8 +53,6 @@ func (i *ArtificialLatency) ReadAt(buffer []byte, offset int64) (int, error) {
}

func (i *ArtificialLatency) WriteAt(buffer []byte, offset int64) (int, error) {
i.lock.Lock()
defer i.lock.Unlock()
if i.latencyWrite != 0 {
time.Sleep(i.latencyWrite)
}
Expand All @@ -63,6 +63,9 @@ func (i *ArtificialLatency) WriteAt(buffer []byte, offset int64) (int, error) {
}

func (i *ArtificialLatency) Flush() error {
if i.latencyFlush != 0 {
time.Sleep(i.latencyFlush)
}
return i.prov.Flush()
}

Expand All @@ -71,6 +74,9 @@ func (i *ArtificialLatency) Size() uint64 {
}

func (i *ArtificialLatency) Close() error {
if i.latencyClose != 0 {
time.Sleep(i.latencyClose)
}
return i.prov.Close()
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/storage/modules/timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package modules

import (
"errors"
"time"

"github.com/loopholelabs/silo/pkg/storage"
)

var (
ErrTimeout = errors.New("request timeout")
)

/**
* Add timeout functionality
*
*/
type Timeout struct {
storage.ProviderWithEvents
prov storage.Provider
timeoutRead time.Duration
timeoutWrite time.Duration
timeoutClose time.Duration
timeoutFlush time.Duration
}

func NewTimeout(prov storage.Provider, timeoutRead time.Duration, timeoutWrite time.Duration, timeoutClose time.Duration, timeoutFlush time.Duration) *Timeout {
return &Timeout{
prov: prov,
timeoutRead: timeoutRead,
timeoutWrite: timeoutWrite,
timeoutClose: timeoutClose,
timeoutFlush: timeoutFlush,
}
}

// Relay events to embedded StorageProvider
func (i *Timeout) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData {
data := i.ProviderWithEvents.SendSiloEvent(eventType, eventData)
return append(data, storage.SendSiloEvent(i.prov, eventType, eventData)...)
}

type readWriteResult struct {
n int
err error
}

func (i *Timeout) ReadAt(buffer []byte, offset int64) (int, error) {
result := make(chan readWriteResult, 1)
go func() {
n, err := i.prov.ReadAt(buffer, offset)
result <- readWriteResult{n: n, err: err}
}()

select {
case <-time.After(i.timeoutRead):
return 0, ErrTimeout
case result := <-result:
return result.n, result.err
}
}

func (i *Timeout) WriteAt(buffer []byte, offset int64) (int, error) {
result := make(chan readWriteResult, 1)
go func() {
n, err := i.prov.WriteAt(buffer, offset)
result <- readWriteResult{n: n, err: err}
}()

select {
case <-time.After(i.timeoutWrite):
return 0, ErrTimeout
case result := <-result:
return result.n, result.err
}
}

func (i *Timeout) Flush() error {
result := make(chan error, 1)
go func() {
result <- i.prov.Flush()
}()

select {
case <-time.After(i.timeoutFlush):
return ErrTimeout
case result := <-result:
return result
}
}

func (i *Timeout) Size() uint64 {
return i.prov.Size()
}

func (i *Timeout) Close() error {
result := make(chan error, 1)
go func() {
result <- i.prov.Close()
}()

select {
case <-time.After(i.timeoutClose):
return ErrTimeout
case result := <-result:
return result
}
}

func (i *Timeout) CancelWrites(offset int64, length int64) {
i.prov.CancelWrites(offset, length)
}
46 changes: 46 additions & 0 deletions pkg/storage/modules/timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package modules_test

import (
"testing"
"time"

"github.com/loopholelabs/silo/pkg/storage/modules"
"github.com/loopholelabs/silo/pkg/storage/sources"
"github.com/stretchr/testify/assert"
)

func TestTimeout(t *testing.T) {
mem := sources.NewMemoryStorage(1024 * 1024)
timeout := modules.NewTimeout(mem, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond)

buffer := make([]byte, 1024)
_, err := timeout.ReadAt(buffer, 0)
assert.NoError(t, err)

_, err = timeout.WriteAt(buffer, 0)
assert.NoError(t, err)

err = timeout.Flush()
assert.NoError(t, err)

err = timeout.Close()
assert.NoError(t, err)

// Now with artificial latency, we expect these calls to fail with ErrTimeout

latency := modules.NewArtificialLatency(mem, 5*time.Second, 0, 5*time.Second, 0, 5*time.Second, 5*time.Second)
timeoutLatency := modules.NewTimeout(latency, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond, 100*time.Millisecond)

_, err = timeoutLatency.ReadAt(buffer, 0)
assert.ErrorIs(t, modules.ErrTimeout, err)

_, err = timeoutLatency.WriteAt(buffer, 0)
assert.ErrorIs(t, modules.ErrTimeout, err)

err = timeoutLatency.Flush()
assert.ErrorIs(t, modules.ErrTimeout, err)

err = timeoutLatency.Close()
assert.ErrorIs(t, modules.ErrTimeout, err)

}
2 changes: 1 addition & 1 deletion pkg/storage/storage_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestStorageEventsForModules(tt *testing.T) {
addModule(sourceDirtyLocal)
addModule(sourceDirtyRemote)

mod1 := modules.NewArtificialLatency(sourceDirtyLocal, 0, 0, 0, 0)
mod1 := modules.NewArtificialLatency(sourceDirtyLocal, 0, 0, 0, 0, 0, 0)
addModule(mod1)
mod2, err := modules.NewBinLog(mod1, "binlog_file")
assert.NoError(t, err)
Expand Down
Loading