Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update lassie to sync Retriever (after provider rewrite revert) #173

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions autoretrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"github.com/application-research/autoretrieve/paychannelmanager"
lassieclient "github.com/filecoin-project/lassie/pkg/client"
lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder"
"github.com/filecoin-project/lassie/pkg/indexerlookup"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
"github.com/filecoin-project/lotus/chain/wallet"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/ipfs/go-cid"
ipfsdatastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
flatfs "github.com/ipfs/go-ds-flatfs"
Expand Down Expand Up @@ -154,30 +154,25 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
// Initialize Filecoin retriever
var retriever *lassieretriever.Retriever
if !cfg.DisableRetrieval {
var ep lassieretriever.Endpoint
var candidateFinder lassieretriever.CandidateFinder
switch cfg.LookupEndpointType {
case EndpointTypeEstuary:
logger.Infof("Using Estuary endpoint type")
ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
logger.Infof("Using Estuary candidate finder type")
candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter)
case EndpointTypeIndexer:
logger.Infof("Using indexer endpoint type")
ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL)
logger.Infof("Using indexer candidate finder type")
candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL)
default:
return nil, errors.New("unrecognized endpoint type")
return nil, errors.New("unrecognized candidate finder type")
}

retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter)
if err != nil {
return nil, err
}

confirmer := func(c cid.Cid) (bool, error) {
return blockManager.Has(cctx.Context, c)
}

// Instantiate client
retrievalClient, err := lassieclient.NewClient(
blockstore,
datastore,
host,
payChanMgr,
Expand All @@ -190,17 +185,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) {
return nil, err
}

retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer)
retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder)
if err != nil {
return nil, err
}
retriever.Start()
if cfg.EventRecorderEndpointURL != "" {
logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL)
eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx))
if err != nil {
return nil, err
}
retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization))
eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)
retriever.RegisterSubscriber(eventRecorder.RecordEvent)
}
}

Expand Down
123 changes: 74 additions & 49 deletions bitswap/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (

"github.com/application-research/autoretrieve/blocks"
"github.com/application-research/autoretrieve/metrics"
"github.com/dustin/go-humanize"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-bitswap/message"
bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-graphsync/storeutil"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -53,6 +57,7 @@ type Provider struct {
config ProviderConfig
network network.BitSwapNetwork
blockManager *blocks.Manager
linkSystem ipld.LinkSystem
retriever *lassieretriever.Retriever
taskQueue *peertaskqueue.PeerTaskQueue
workReady chan struct{}
Expand Down Expand Up @@ -97,6 +102,7 @@ func NewProvider(
config: config,
network: network.NewFromIpfsHost(host, routing),
blockManager: blockManager,
linkSystem: storeutil.LinkSystemForBlockstore(blockManager.Blockstore),
retriever: retriever,
taskQueue: peertaskqueue.New(),
workReady: make(chan struct{}, config.MaxBitswapWorkers),
Expand Down Expand Up @@ -157,14 +163,14 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in

// If there was a problem (aside from block not found), log and move
// on
if err != nil && !ipld.IsNotFound(err) {
if err != nil && !format.IsNotFound(err) {
logger.Warnf("Failed to get block for bitswap entry: %s", entry.Cid)
continue
}

// As long as no not found error was hit, queue the block and move
// on...
if !ipld.IsNotFound(err) {
if !format.IsNotFound(err) {
stats.Record(ctx, metrics.BlockstoreCacheHitCount.M(1))
provider.queueBlock(ctx, sender, entry, size)
continue
Expand All @@ -188,53 +194,9 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in

switch entry.WantType {
case wantTypeHave:
// TODO: for WANT_HAVE, just check if there's a candidate, we
// probably don't have to actually do the retrieval yet
if err := provider.retriever.Request(entry.Cid); err != nil {
// If no candidates were found, there's nothing that can be done, so
// queue DONT_HAVE and move on
provider.queueDontHave(ctx, sender, entry, "failed_retriever_request")

if !errors.Is(err, lassieretriever.ErrNoCandidates) {
logger.Warnf("Could not get candidates: %s", err.Error())
}

continue
}

provider.blockManager.AwaitBlock(ctx, entry.Cid, func(_ blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
logger.Debugf("Successfully awaited block (want_have): %s", entry.Cid)
provider.queueHave(context.Background(), sender, entry)
}
provider.signalWork()
})
provider.retrieveForPeer(ctx, entry, sender, false)
case wantTypeBlock:
if err := provider.retriever.Request(entry.Cid); err != nil {
// If no candidates were found, there's nothing that can be done, so
// queue DONT_HAVE and move on
provider.queueDontHave(ctx, sender, entry, "failed_retriever_request")

if !errors.Is(err, lassieretriever.ErrNoCandidates) {
logger.Warnf("Could not get candidates: %s", err.Error())
}

continue
}

provider.blockManager.AwaitBlock(ctx, entry.Cid, func(block blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
logger.Debugf("Successfully awaited block (want_block): %s", entry.Cid)
provider.queueBlock(context.Background(), sender, entry, block.Size)
}
provider.signalWork()
})
provider.retrieveForPeer(ctx, entry, sender, true)
}
}

Expand Down Expand Up @@ -355,3 +317,66 @@ func (provider *Provider) queueHave(ctx context.Context, sender peer.ID, entry m
// Record response metric
stats.Record(ctx, metrics.BitswapResponseCount.M(1))
}

func (provider *Provider) retrieveForPeer(ctx context.Context, entry message.Entry, sender peer.ID, sendBlock bool) {
retrievalId, err := types.NewRetrievalID()
if err != nil {
logger.Errorf("Failed to create retrieval ID: %s", err.Error())
return
}

logger.Debugf("Starting retrieval for %s (%s)", entry.Cid, retrievalId)

// Start a background blockstore fetch with a callback to send the block
// to the peer once it's available.
blockCtx, blockCancel := context.WithCancel(ctx)
if provider.blockManager.AwaitBlock(blockCtx, entry.Cid, func(block blocks.Block, err error) {
if err != nil {
logger.Debugf("Failed to load block: %s", err.Error())
provider.queueDontHave(ctx, sender, entry, "failed_block_load")
} else {
if sendBlock {
logger.Debugf("Successfully awaited block (want_block): %s", entry.Cid)
provider.queueBlock(context.Background(), sender, entry, block.Size)
} else {
logger.Debugf("Successfully awaited block (want_have): %s", entry.Cid)
provider.queueHave(context.Background(), sender, entry)
}
}
provider.signalWork()
blockCancel()
}) {
// If the block was already in the blockstore then we don't need to
// start a retrieval.
return
}

// Try to start a new retrieval (if it's already running then no
// need to error, just continue on to await block)
go func() {
result, err := provider.retriever.Retrieve(ctx, provider.linkSystem, retrievalId, entry.Cid)
if err != nil {
if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) {
logger.Debugf("Retrieval already running for %s, no new one will be started", entry.Cid)
return // Don't send dont_have or run blockCancel(), let it async load
} else if errors.Is(err, lassieretriever.ErrNoCandidates) {
// Just do a debug print if there were no candidates because this happens a lot
logger.Debugf("No candidates for %s (%s)", entry.Cid, retrievalId)
provider.queueDontHave(ctx, sender, entry, "no_candidates")
} else {
// Otherwise, there was a real failure, print with more importance
logger.Errorf("Retrieval for %s (%s) failed: %v", entry.Cid, retrievalId, err)
provider.queueDontHave(ctx, sender, entry, "retrieval_failed")
}
} else {
logger.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)",
entry.Cid,
retrievalId,
result.Duration,
humanize.IBytes(result.Size),
result.Blocks,
)
}
blockCancel()
}()
}
32 changes: 22 additions & 10 deletions blocks/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Manager struct {
}

type waitListEntry struct {
ctx context.Context
callback func(Block, error)
registeredAt time.Time
}
Expand All @@ -55,7 +56,13 @@ func NewManager(inner blockstore.Blockstore, getAwaitTimeout time.Duration) *Man
return mgr
}

func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) {
// AwaitBlock will wait for a block to be added to the blockstore and then
// call the callback with the block. If the block is already in the blockstore,
// the callback will be called immediately. If the block is not in the blockstore
// or the context is cancelled, the callback will not be called.
// Returns true if the block was already in the blockstore, allowing the
// callback to be called, or false otherwise.
func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) bool {
// We need to lock the blockstore here to make sure the requested block
// doesn't get added while being added to the waitlist
mgr.waitListLk.Lock()
Expand All @@ -68,22 +75,25 @@ func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(B
if !ipld.IsNotFound(err) {
mgr.waitListLk.Unlock()
callback(Block{}, err)
return
return false
}

mgr.waitList[cid] = append(mgr.waitList[cid], waitListEntry{
ctx: ctx,
callback: callback,
registeredAt: time.Now(),
})

mgr.waitListLk.Unlock()
return
return false
}

mgr.waitListLk.Unlock()

// Otherwise, we can immediately run the callback
// Otherwise, we can immediately run the callback and notify the caller of
// success
callback(Block{cid, size}, nil)
return true
}

func (mgr *Manager) Put(ctx context.Context, block blocks.Block) error {
Expand Down Expand Up @@ -149,20 +159,22 @@ func (mgr *Manager) startPollCleanup() {
for cid := range mgr.waitList {
// For each element in the slice for this CID...
for i := 0; i < len(mgr.waitList[cid]); i++ {
// ...check if it's timed out...
if time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...check whether the waiter context was cancelled or it's been in the
// list too long...
if mgr.waitList[cid][i].ctx.Err() != nil || time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout {
// ...and if so, delete this element by replacing it with
// the last element of the slice and shrinking the length by
// 1, and step the index back
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
// 1, and step the index back.
if mgr.waitList[cid][i].ctx.Err() == nil {
mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout)
}
mgr.waitList[cid][i] = mgr.waitList[cid][len(mgr.waitList[cid])-1]
mgr.waitList[cid] = mgr.waitList[cid][:len(mgr.waitList[cid])-1]
i--
}
}

// If the slice is empty now, remove it entirely from the waitList
// map
// If the slice is empty now, remove it entirely from the waitList map
if len(mgr.waitList[cid]) == 0 {
delete(mgr.waitList, cid)
}
Expand Down
8 changes: 4 additions & 4 deletions endpoint/estuary.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/application-research/autoretrieve/minerpeergetter"
"github.com/filecoin-project/go-address"
lassieretriever "github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
)

Expand Down Expand Up @@ -39,7 +39,7 @@ func NewEstuaryEndpoint(url string, mpg *minerpeergetter.MinerPeerGetter) *Estua
}
}

func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) {
func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
// Create URL with CID
endpointURL, err := url.Parse(ee.url)
if err != nil {
Expand All @@ -63,13 +63,13 @@ func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]l
return nil, ErrEndpointBodyInvalid
}

converted := make([]lassieretriever.RetrievalCandidate, 0, len(unfiltered))
converted := make([]types.RetrievalCandidate, 0, len(unfiltered))
for _, original := range unfiltered {
minerPeer, err := ee.mpg.MinerPeer(ctx, original.Miner)
if err != nil {
return nil, fmt.Errorf("%w: failed to get miner peer: %v", ErrEndpointRequestFailed, err)
}
converted = append(converted, lassieretriever.RetrievalCandidate{
converted = append(converted, types.RetrievalCandidate{
MinerPeer: minerPeer,
RootCid: original.RootCid,
})
Expand Down
Loading