feat: update lassie to sync Retriever (before provider rewrite revert) #167

Draft · wants to merge 2 commits into master
24 changes: 11 additions & 13 deletions autoretrieve.go
@@ -24,12 +24,12 @@ import (
"github.com/application-research/autoretrieve/paychannelmanager"
lassieclient "github.com/filecoin-project/lassie/pkg/client"
lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder"
"github.com/filecoin-project/lassie/pkg/indexerlookup"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
"github.com/filecoin-project/lotus/chain/wallet"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/ipfs/go-cid"
ipfsdatastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
flatfs "github.com/ipfs/go-ds-flatfs"
@@ -154,27 +154,23 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
// Initialize Filecoin retriever
var retriever *lassieretriever.Retriever
if !cfg.DisableRetrieval {
var ep lassieretriever.Endpoint
var candidateFinder lassieretriever.CandidateFinder
switch cfg.LookupEndpointType {
case EndpointTypeEstuary:
logger.Infof("Using Estuary endpoint type")
ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
logger.Infof("Using Estuary candidate finder type")
candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
case EndpointTypeIndexer:
logger.Infof("Using indexer endpoint type")
ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL)
logger.Infof("Using indexer candidate finder type")
candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)
default:
return nil, errors.New("unrecognized endpoint type")
return nil, errors.New("unrecognized candidate finder type")
}

retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter)
if err != nil {
return nil, err
}

confirmer := func(c cid.Cid) (bool, error) {
return blockManager.Has(cctx.Context, c)
}

// Instantiate client
retrievalClient, err := lassieclient.NewClient(
blockstore,
@@ -190,17 +186,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
return nil, err
}

retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer)
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)
if err != nil {
return nil, err
}
<-retriever.Start()
if cfg.EventRecorderEndpointURL != "" {
logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL)
eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx))
if err != nil {
return nil, err
}
retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization))
eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)
retriever.RegisterSubscriber(eventRecorder.RecordEvent)
}
}
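
Taken together, the lassie surface this file migrates to looks roughly like the following (a sketch inferred from the call sites in this diff, not from lassie's documented API):

// Candidate finders replace the old Endpoint interface:
var candidateFinder lassieretriever.CandidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)

// NewRetriever drops the endpoint and confirmer parameters:
retriever, err := lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)

// Event recording is now a plain function subscription rather than a listener interface:
retriever.RegisterSubscriber(eventRecorder.RecordEvent)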

153 changes: 93 additions & 60 deletions bitswap/provider.go
@@ -7,7 +7,9 @@ import (

"github.com/application-research/autoretrieve/blocks"
"github.com/application-research/autoretrieve/metrics"
"github.com/dustin/go-humanize"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-bitswap/message"
bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/network"
@@ -66,13 +68,16 @@ type Provider struct {
retriever *lassieretriever.Retriever

// Incoming messages to be processed - work is 1 per message
requestQueue *peertaskqueue.PeerTaskQueue
requestQueue *peertaskqueue.PeerTaskQueue
requestQueueSignalChan chan struct{}

// Outgoing messages to be sent - work is size of messages in bytes
responseQueue *peertaskqueue.PeerTaskQueue
responseQueue *peertaskqueue.PeerTaskQueue
responseQueueSignalChan chan struct{}

// CIDs that need to be retrieved - work is 1 per CID queued
retrievalQueue *peertaskqueue.PeerTaskQueue
retrievalQueue *peertaskqueue.PeerTaskQueue
retrievalQueueSignalChan chan struct{}
}

type overwriteTaskMerger struct{}
@@ -121,27 +126,30 @@ func NewProvider(
}

provider := &Provider{
config: config,
network: network.NewFromIpfsHost(host, routing),
blockManager: blockManager,
retriever: retriever,
requestQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
responseQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
retrievalQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
config: config,
network: network.NewFromIpfsHost(host, routing),
blockManager: blockManager,
retriever: retriever,
requestQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
requestQueueSignalChan: make(chan struct{}, 10),
Collaborator:

So here's my recommendation for these signals:

1. Make them buffer 1.
2. When writing, call:

select {
case provider.requestQueueSignalChan <- struct{}{}:
default:
}

Collaborator (Author):

Oh, nice, so if it blocks then bail. I didn't think of that!
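
A minimal sketch of that suggestion as a standalone helper (illustrative only; neither the helper nor its name exists in this PR):

// signal delivers a wakeup without ever blocking the sender. With the
// channel buffered at 1, a pending wakeup already guarantees the worker
// will re-check its queue, so any extra signal can be dropped safely.
func signal(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// usage (hypothetical):
//	requestQueueSignalChan := make(chan struct{}, 1)
//	signal(requestQueueSignalChan)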

responseQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
responseQueueSignalChan: make(chan struct{}, 10),
retrievalQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)),
retrievalQueueSignalChan: make(chan struct{}, 10),
}

provider.network.Start(provider)

for i := 0; i < int(config.RequestWorkers); i++ {
go provider.handleRequests()
go provider.handleRequests(ctx)
}

for i := 0; i < int(config.ResponseWorkers); i++ {
go provider.handleResponses()
go provider.handleResponses(ctx)
}

for i := 0; i < int(config.RetrievalWorkers); i++ {
go provider.handleRetrievals()
go provider.handleRetrievals(ctx)
}

return provider, nil
@@ -158,15 +166,18 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in
})
}
provider.requestQueue.PushTasks(sender, tasks...)
provider.requestQueueSignalChan <- struct{}{}
}

func (provider *Provider) handleRequests() {
ctx := context.Background()

for {
func (provider *Provider) handleRequests(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.requestQueue.PopTasks(100)
if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.requestQueueSignalChan:
}
continue
}

@@ -252,17 +263,20 @@ func (provider *Provider) handleRequest(
Priority: int(entry.Priority),
Work: 1,
})
provider.retrievalQueueSignalChan <- struct{}{}

return nil
}

func (provider *Provider) handleResponses() {
ctx := context.Background()

for {
func (provider *Provider) handleResponses(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize)
if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.responseQueueSignalChan:
Collaborator:

When len(tasks) != 0, you had better still optionally drain the signal chan, i.e.:

if len(tasks) == 0 {
	// ...
	continue
}

select {
case <-provider.responseQueueSignalChan:
default:
}
// ...

Collaborator:

I wouldn't be surprised if you're getting stuck because of this.
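
A sketch of the response worker loop with that drain applied (illustrative; it combines the loop from this diff with the suggestion above):

for ctx.Err() == nil {
	peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize)
	if len(tasks) == 0 {
		select {
		case <-ctx.Done():
		case <-time.After(time.Millisecond * 250):
		case <-provider.responseQueueSignalChan:
		}
		continue
	}

	// Tasks were available without waiting, so clear any stale wakeup now;
	// otherwise the next empty poll would spin through one needless pass.
	select {
	case <-provider.responseQueueSignalChan:
	default:
	}

	// ... build the message from tasks, send it, then mark TasksDone ...
}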

}
continue
}

@@ -291,15 +305,15 @@ func (provider *Provider) handleResponses() {
log.Debugf("Sending have for %s", cid)

// Response metric
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE"))
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE"))
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
case actionSendDontHave:
msg.AddDontHave(cid)
log.Debugf("Sending dont have for %s", cid)

// Response metric
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason))
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason))
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
case actionSendBlock:
block, err := provider.blockManager.Get(ctx, cid)
if err != nil {
@@ -310,8 +324,8 @@
log.Debugf("Sending block for %s", cid)

// Response metric
ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK"))
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK"))
stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1))
}
}

@@ -325,13 +339,15 @@
}
Collaborator:

Why are we calling TasksDone twice when an error occurs sending a message?

}

func (provider *Provider) handleRetrievals() {
ctx := context.Background()

for {
func (provider *Provider) handleRetrievals(ctx context.Context) {
for ctx.Err() == nil {
peerID, tasks, _ := provider.retrievalQueue.PopTasks(1)
Collaborator:

Now that retrieval is synchronous, this appears to limit things to one retrieval per worker queue, no?

Collaborator (Author):

Yes, the default is 8 workers, but I haven't seen a reason to increase that (yet) because the pipe of incoming requests appears to be so small; maybe I'm not seeing it right, though.
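
For reference, total retrieval parallelism under the synchronous API equals the worker count, since each worker now blocks inside Retrieve. A sketch using the startup code from this diff (the default of 8 is from the comment above):

// Raising config.RetrievalWorkers is the knob for more parallel retrievals;
// each goroutine handles exactly one synchronous retrieval at a time.
for i := 0; i < int(config.RetrievalWorkers); i++ {
	go provider.handleRetrievals(ctx)
}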

if len(tasks) == 0 {
time.Sleep(time.Millisecond * 250)
select {
case <-ctx.Done():
case <-time.After(time.Millisecond * 250):
case <-provider.retrievalQueueSignalChan:
}
continue
}

Collaborator:

Same here: better drain the signal queue (see the sketch in the responses thread above).

@@ -344,38 +360,52 @@
continue
}

log.Debugf("Requesting retrieval for %s", cid)
retrievalId, err := types.NewRetrievalID()
if err != nil {
log.Errorf("Failed to create retrieval ID: %s", err.Error())
}

log.Debugf("Starting retrieval for %s (%s)", cid, retrievalId)

// Start a background blockstore fetch with a callback to send the block
// to the peer once it's available.
blockCtx, blockCancel := context.WithCancel(ctx)
if provider.blockManager.AwaitBlock(blockCtx, cid, func(block blocks.Block, err error) {
if err != nil {
log.Debugf("Async block load failed: %s", err)
provider.queueSendDontHave(peerID, task.Priority, cid, "failed_block_load")
} else {
log.Debugf("Async block load completed: %s", cid)
provider.queueSendBlock(peerID, task.Priority, cid, block.Size)
}
blockCancel()
}) {
// If the block was already in the blockstore then we don't need to
// start a retrieval.
continue
}

// Try to start a new retrieval (if it's already running then no
// need to error, just continue on to await block)
if err := provider.retriever.Request(cid); err != nil {
if !errors.As(err, &lassieretriever.ErrRetrievalAlreadyRunning{}) {
if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
log.Debugf("No candidates for %s", cid)
} else {
// Otherwise, there was a real failure, print with more importance
log.Errorf("Request for %s failed: %v", cid, err)
}
} else {
result, err := provider.retriever.Retrieve(ctx, retrievalId, cid)
if err != nil {
if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) {
log.Debugf("Retrieval already running for %s, no new one will be started", cid)
continue // Don't send dont_have or run blockCancel(), let it async load
} else if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
log.Debugf("No candidates for %s (%s)", cid, retrievalId)
provider.queueSendDontHave(peerID, task.Priority, cid, "no_candidates")
} else {
// Otherwise, there was a real failure, print with more importance
log.Errorf("Retrieval for %s (%s) failed: %v", cid, retrievalId, err)
provider.queueSendDontHave(peerID, task.Priority, cid, "retrieval_failed")
}
} else {
log.Infof("Started retrieval for %s", cid)
log.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)", cid, retrievalId, result.Duration, humanize.IBytes(result.Size), result.Blocks)
}

// TODO: if retriever.Request() is changed to be blocking, make
// blockManager.AwaitBlock() cancellable and cancel it after the
// request finishes if there's an error
provider.blockManager.AwaitBlock(ctx, cid, func(block blocks.Block, err error) {
if err != nil {
log.Debugf("Async block load failed: %s", err)
provider.queueSendDontHave(peerID, task.Priority, block.Cid, "failed_block_load")
} else {
log.Debugf("Async block load completed: %s", block.Cid)
provider.queueSendBlock(peerID, task.Priority, block.Cid, block.Size)
}
})
blockCancel()
}

provider.retrievalQueue.TasksDone(peerID, tasks...)
@@ -387,11 +417,11 @@ func (provider *Provider) ReceiveError(err error) {
}

func (provider *Provider) PeerConnected(peerID peer.ID) {
log.Debugf("Peer %s connected", peerID)
// TODO: too noisy: log.Debugf("Peer %s connected", peerID)
}

func (provider *Provider) PeerDisconnected(peerID peer.ID) {
log.Debugf("Peer %s disconnected", peerID)
// TODO: too noisy: log.Debugf("Peer %s disconnected", peerID)
}

@@ -404,6 +434,7 @@ func (provider *Provider) queueSendHave(peerID peer.ID, priority int, cid cid.Ci
action: actionSendHave,
},
})
provider.responseQueueSignalChan <- struct{}{}
}

@@ -417,6 +448,7 @@ func (provider *Provider) queueSendDontHave(peerID peer.ID, priority int, cid ci
reason: reason,
},
})
provider.responseQueueSignalChan <- struct{}{}
}

@@ -429,4 +461,5 @@ func (provider *Provider) queueSendBlock(peerID peer.ID, priority int, cid cid.C
action: actionSendBlock, // TODO: maybe check retrieval task for this
},
})
provider.responseQueueSignalChan <- struct{}{}
}