Skip to content

Commit

Permalink
dg WaitingCache is now inserted in the correct place (#73)
Browse files Browse the repository at this point in the history
* dg WaitingCache is now inserted in the correct place

Signed-off-by: Jimmy Moore <[email protected]>

* lint fixes

Signed-off-by: Jimmy Moore <[email protected]>

---------

Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod authored Jan 31, 2025
1 parent dedc200 commit 710eae4
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func runServe(_ *cobra.Command, _ []string) {
panic(err)
}

dg, err := devicegroup.NewFromSchema(siloConf.Device, log, siloMetrics)
dg, err := devicegroup.NewFromSchema(siloConf.Device, false, log, siloMetrics)
if err != nil {
panic(err)
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/devicegroup/device_group_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/loopholelabs/silo/pkg/storage/metrics"
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
)

func NewFromProtocol(ctx context.Context,
Expand Down Expand Up @@ -85,7 +84,7 @@ func NewFromProtocol(ctx context.Context,
devices[index-1] = ds
}

dg, err := NewFromSchema(devices, log, met)
dg, err := NewFromSchema(devices, true, log, met)
if err != nil {
return nil, err
}
Expand All @@ -101,13 +100,14 @@ func NewFromProtocol(ctx context.Context,
d := dg.devices[dev]
d.EventHandler = eventHandler

destStorageFactory := func(di *packets.DevInfo) storage.Provider {
d.WaitingCacheLocal, d.WaitingCacheRemote = waitingcache.NewWaitingCacheWithLogger(d.Prov, int(di.BlockSize), dg.log)

if d.Exp != nil {
d.Exp.SetProvider(d.WaitingCacheLocal)
}
destStorageFactory := func(_ *packets.DevInfo) storage.Provider {
/*
d.WaitingCacheLocal, d.WaitingCacheRemote = waitingcache.NewWaitingCacheWithLogger(d.Prov, int(di.BlockSize), dg.log)
if d.Exp != nil {
d.Exp.SetProvider(d.WaitingCacheLocal)
}
*/
return d.WaitingCacheRemote
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/devicegroup/device_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func setupDeviceGroup(t *testing.T) *DeviceGroup {
return nil
}
*/
dg, err := NewFromSchema(testDeviceSchema, nil, nil)
dg, err := NewFromSchema(testDeviceSchema, false, nil, nil)
assert.NoError(t, err)

t.Cleanup(func() {
Expand Down
34 changes: 22 additions & 12 deletions pkg/storage/devicegroup/device_group_to.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
"github.com/loopholelabs/silo/pkg/storage/protocol"
"github.com/loopholelabs/silo/pkg/storage/protocol/packets"
"github.com/loopholelabs/silo/pkg/storage/volatilitymonitor"
"github.com/loopholelabs/silo/pkg/storage/waitingcache"
)

func NewFromSchema(ds []*config.DeviceSchema, log types.Logger, met metrics.SiloMetrics) (*DeviceGroup, error) {
func NewFromSchema(ds []*config.DeviceSchema, createWC bool, log types.Logger, met metrics.SiloMetrics) (*DeviceGroup, error) {
dg := &DeviceGroup{
log: log,
met: met,
Expand All @@ -44,6 +45,13 @@ func NewFromSchema(ds []*config.DeviceSchema, log types.Logger, met metrics.Silo
blockSize = defaultBlockSize
}

var waitingCacheLocal *waitingcache.Local
var waitingCacheRemote *waitingcache.Remote
if createWC {
waitingCacheLocal, waitingCacheRemote = waitingcache.NewWaitingCacheWithLogger(prov, blockSize, dg.log)
prov = waitingCacheLocal
}

local := modules.NewLockable(prov)
mlocal := modules.NewMetrics(local)
dirtyLocal, dirtyRemote := dirtytracker.NewDirtyTracker(mlocal, blockSize)
Expand All @@ -68,17 +76,19 @@ func NewFromSchema(ds []*config.DeviceSchema, log types.Logger, met metrics.Silo
}

dg.devices = append(dg.devices, &DeviceInformation{
Size: local.Size(),
BlockSize: uint64(blockSize),
NumBlocks: totalBlocks,
Schema: s,
Prov: prov,
Storage: local,
Exp: exp,
Volatility: vmonitor,
DirtyLocal: dirtyLocal,
DirtyRemote: dirtyRemote,
Orderer: orderer,
Size: local.Size(),
BlockSize: uint64(blockSize),
NumBlocks: totalBlocks,
Schema: s,
Prov: prov,
Storage: local,
Exp: exp,
Volatility: vmonitor,
DirtyLocal: dirtyLocal,
DirtyRemote: dirtyRemote,
Orderer: orderer,
WaitingCacheLocal: waitingCacheLocal,
WaitingCacheRemote: waitingCacheRemote,
})

// Set these two at least, so we know *something* about every device in progress handler.
Expand Down

0 comments on commit 710eae4

Please sign in to comment.