Skip to content

Commit

Permalink
Merge pull request #173 from application-research/rvagg/lassie-update…
Browse files Browse the repository at this point in the history
…-provider-revert

feat: update lassie to sync Retriever (after provider rewrite revert)
  • Loading branch information
hannahhoward authored Feb 1, 2023
2 parents c0de64f + 03956e7 commit 89e0899
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 189 deletions.
25 changes: 11 additions & 14 deletions autoretrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"github.com/application-research/autoretrieve/paychannelmanager"
lassieclient "github.com/filecoin-project/lassie/pkg/client"
lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder"
"github.com/filecoin-project/lassie/pkg/indexerlookup"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
"github.com/filecoin-project/lotus/chain/wallet"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/ipfs/go-cid"
ipfsdatastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
flatfs "github.com/ipfs/go-ds-flatfs"
Expand Down Expand Up @@ -154,30 +154,25 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
// Initialize Filecoin retriever
var retriever *lassieretriever.Retriever
if !cfg.DisableRetrieval {
var ep lassieretriever.Endpoint
var candidateFinder lassieretriever.CandidateFinder
switch cfg.LookupEndpointType {
case EndpointTypeEstuary:
logger.Infof("Using Estuary endpoint type")
ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
logger.Infof("Using Estuary candidate finder type")
candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
case EndpointTypeIndexer:
logger.Infof("Using indexer endpoint type")
ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL)
logger.Infof("Using indexer candidate finder type")
candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)
default:
return nil, errors.New("unrecognized endpoint type")
return nil, errors.New("unrecognized candidate finder type")
}

retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter)
if err != nil {
return nil, err
}

confirmer := func(c cid.Cid) (bool, error) {
return blockManager.Has(cctx.Context, c)
}

// Instantiate client
retrievalClient, err := lassieclient.NewClient(
blockstore,
datastore,
host,
payChanMgr,
Expand All @@ -190,17 +185,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
return nil, err
}

retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer)
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)
if err != nil {
return nil, err
}
retriever.Start()
if cfg.EventRecorderEndpointURL != "" {
logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL)
eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx))
if err != nil {
return nil, err
}
retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization))
eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)
retriever.RegisterSubscriber(eventRecorder.RecordEvent)
}
}

Expand Down
123 changes: 74 additions & 49 deletions bitswap/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (

"github.com/application-research/autoretrieve/blocks"
"github.com/application-research/autoretrieve/metrics"
"github.com/dustin/go-humanize"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-bitswap/message"
bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-graphsync/storeutil"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -53,6 +57,7 @@ type Provider struct {
config ProviderConfig
network network.BitSwapNetwork
blockManager *blocks.Manager
linkSystem ipld.LinkSystem
retriever *lassieretriever.Retriever
taskQueue *peertaskqueue.PeerTaskQueue
workReady chan struct{}
Expand Down Expand Up @@ -97,6 +102,7 @@ func NewProvider(
config: config,
network: network.NewFromIpfsHost(host, routing),
blockManager: blockManager,
linkSystem: storeutil.LinkSystemForBlockstore(blockManager.Blockstore),
retriever: retriever,
taskQueue: peertaskqueue.New(),
workReady: make(chan struct{}, config.MaxBitswapWorkers),
Expand Down Expand Up @@ -157,14 +163,14 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in

// If there was a problem (aside from block not found), log and move
// on
if err != nil && !ipld.IsNotFound(err) {
if err != nil && !format.IsNotFound(err) {
logger.Warnf("Failed to get block for bitswap entry: %s", entry.Cid)
continue
}

// As long as no not found error was hit, queue the block and move
// on...
if !ipld.IsNotFound(err) {
if !format.IsNotFound(err) {
stats.Record(ctx, metrics.BlockstoreCacheHitCount.M(1))
provider.queueBlock(ctx, sender, entry, size)
continue
Expand All @@ -188,53 +194,9 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in

switch entry.WantType {
case wantTypeHave:
// TODO: for WANT_HAVE, just check if there's a candidate, we
// probably don't have to actually do the retrieval yet
if err := provider.retriever.Request(entry.Cid); err != nil {
// If no candidates were found, there's nothing that can be done, so
// queue DONT_HAVE and move on
provider.queueDontHave(ctx, sender, entry, "failed_retriever_request")

if !errors.Is(err, lassieretriever.ErrNoCandidates) {
logger.Warnf("Could not get candidates: %s", err.Error())
}

continue
}

provider.blockManager.AwaitBlock(ctx, entry.Cid, func(_ blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
logger.Debugf("Successfully awaited block (want_have): %s", entry.Cid)
provider.queueHave(context.Background(), sender, entry)
}
provider.signalWork()
})
provider.retrieveForPeer(ctx, entry, sender, false)
case wantTypeBlock:
if err := provider.retriever.Request(entry.Cid); err != nil {
// If no candidates were found, there's nothing that can be done, so
// queue DONT_HAVE and move on
provider.queueDontHave(ctx, sender, entry, "failed_retriever_request")

if !errors.Is(err, lassieretriever.ErrNoCandidates) {
logger.Warnf("Could not get candidates: %s", err.Error())
}

continue
}

provider.blockManager.AwaitBlock(ctx, entry.Cid, func(block blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
logger.Debugf("Successfully awaited block (want_block): %s", entry.Cid)
provider.queueBlock(context.Background(), sender, entry, block.Size)
}
provider.signalWork()
})
provider.retrieveForPeer(ctx, entry, sender, true)
}
}

Expand Down Expand Up @@ -355,3 +317,66 @@ func (provider *Provider) queueHave(ctx context.Context, sender peer.ID, entry m
// Record response metric
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
}

func (provider *Provider) retrieveForPeer(ctx context.Context, entry message.Entry, sender peer.ID, sendBlock bool) {
retrievalId, err := types.NewRetrievalID()
if err != nil {
logger.Errorf("Failed to create retrieval ID: %s", err.Error())
return
}

logger.Debugf("Starting retrieval for %s (%s)", entry.Cid, retrievalId)

// Start a background blockstore fetch with a callback to send the block
// to the peer once it's available.
blockCtx, blockCancel := context.WithCancel(ctx)
if provider.blockManager.AwaitBlock(blockCtx, entry.Cid, func(block blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
if sendBlock {
logger.Debugf("Successfully awaited block (want_block): %s", entry.Cid)
provider.queueBlock(context.Background(), sender, entry, block.Size)
} else {
logger.Debugf("Successfully awaited block (want_have): %s", entry.Cid)
provider.queueHave(context.Background(), sender, entry)
}
}
provider.signalWork()
blockCancel()
}) {
// If the block was already in the blockstore then we don't need to
// start a retrieval.
return
}

// Try to start a new retrieval (if it's already running then no
// need to error, just continue on to await block)
go func() {
result, err := provider.retriever.Retrieve(ctx, provider.linkSystem, retrievalId, entry.Cid)
if err != nil {
if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) {
logger.Debugf("Retrieval already running for %s, no new one will be started", entry.Cid)
return // Don't send dont_have or run blockCancel(), let it async load
} else if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
logger.Debugf("No candidates for %s (%s)", entry.Cid, retrievalId)
provider.queueDontHave(ctx, sender, entry, "no_candidates")
} else {
// Otherwise, there was a real failure, print with more importance
logger.Errorf("Retrieval for %s (%s) failed: %v", entry.Cid, retrievalId, err)
provider.queueDontHave(ctx, sender, entry, "retrieval_failed")
}
} else {
logger.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)",
entry.Cid,
retrievalId,
result.Duration,
humanize.IBytes(result.Size),
result.Blocks,
)
}
blockCancel()
}()
}
32 changes: 22 additions & 10 deletions blocks/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Manager struct {
}

type waitListEntry struct {
ctx context.Context
callback func(Block, error)
registeredAt time.Time
}
Expand All @@ -55,7 +56,13 @@ func NewManager(inner blockstore.Blockstore, getAwaitTimeout time.Duration) *Man
return mgr
}

func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) {
// AwaitBlock will wait for a block to be added to the blockstore and then
// call the callback with the block. If the block is already in the blockstore,
// the callback will be called immediately. If the block is not in the blockstore
// or the context is cancelled, the callback will not be called.
// Returns true if the block was already in the blockstore, allowing the
// callback to be called, or false otherwise.
func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) bool {
// We need to lock the blockstore here to make sure the requested block
// doesn't get added while being added to the waitlist
mgr.waitListLk.Lock()
Expand All @@ -68,22 +75,25 @@ func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(B
if !ipld.IsNotFound(err) {
mgr.waitListLk.Unlock()
callback(Block{}, err)
return
return false
}

mgr.waitList[cid] = append(mgr.waitList[cid], waitListEntry{
ctx: ctx,
callback: callback,
registeredAt: time.Now(),
})

mgr.waitListLk.Unlock()
return
return false
}

mgr.waitListLk.Unlock()

// Otherwise, we can immediately run the callback
// Otherwise, we can immediately run the callback and notify the caller of
// success
callback(Block{cid, size}, nil)
return true
}

func (mgr *Manager) Put(ctx context.Context, block blocks.Block) error {
Expand Down Expand Up @@ -149,20 +159,22 @@ func (mgr *Manager) startPollCleanup() {
for cid := range mgr.waitList {
// For each element in the slice for this CID...
for i := 0; i < len(mgr.waitList[cid]); i++ {
// ...check if it's timed out...
if time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...check whether the waiter context was cancelled or it's been in the
// list too long...
if mgr.waitList[cid][i].ctx.Err() != nil || time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...and if so, delete this element by replacing it with
// the last element of the slice and shrinking the length by
// 1, and step the index back
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
// 1, and step the index back.
if mgr.waitList[cid][i].ctx.Err() == nil {
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
}
mgr.waitList[cid][i] = mgr.waitList[cid][len(mgr.waitList[cid])-1]
mgr.waitList[cid] = mgr.waitList[cid][:len(mgr.waitList[cid])-1]
i--
}
}

// If the slice is empty now, remove it entirely from the waitList
// map
// If the slice is empty now, remove it entirely from the waitList map
if len(mgr.waitList[cid]) == 0 {
delete(mgr.waitList, cid)
}
Expand Down
8 changes: 4 additions & 4 deletions endpoint/estuary.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/application-research/autoretrieve/minerpeergetter"
"github.com/filecoin-project/go-address"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
)

Expand Down Expand Up @@ -39,7 +39,7 @@ func NewEstuaryEndpoint(url string, mpg *minerpeergetter.MinerPeerGetter) *Estua
}
}

func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) {
func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
// Create URL with CID
endpointURL, err := url.Parse(ee.url)
if err != nil {
Expand All @@ -63,13 +63,13 @@ func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]l
return nil, ErrEndpointBodyInvalid
}

converted := make([]lassieretriever.RetrievalCandidate, 0, len(unfiltered))
converted := make([]types.RetrievalCandidate, 0, len(unfiltered))
for _, original := range unfiltered {
minerPeer, err := ee.mpg.MinerPeer(ctx, original.Miner)
if err != nil {
return nil, fmt.Errorf("%w: failed to get miner peer: %v", ErrEndpointRequestFailed, err)
}
converted = append(converted, lassieretriever.RetrievalCandidate{
converted = append(converted, types.RetrievalCandidate{
MinerPeer: minerPeer,
RootCid: original.RootCid,
})
Expand Down
Loading

0 comments on commit 89e0899

Please sign in to comment.