From b6db210bb96baf5264a018c20ca057a52b6bfe34 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 24 Jan 2023 20:41:44 +1100 Subject: [PATCH 1/2] feat: update lassie to sync Retriever * Retriever#Retrieve() calls are now synchronous, so we get to wait for the direct return value and error synchronously * Change the AwaitGet call order and make it cancellable * Make the provider context-cancel aware for cleaner shutdown * Other minor fixes and adaptions to the new Lassie code --- autoretrieve.go | 24 +++++------ bitswap/provider.go | 100 ++++++++++++++++++++++++-------------------- blocks/manager.go | 32 +++++++++----- endpoint/estuary.go | 8 ++-- endpoint/indexer.go | 90 --------------------------------------- go.mod | 15 +++---- go.sum | 29 +++++++------ 7 files changed, 116 insertions(+), 182 deletions(-) delete mode 100644 endpoint/indexer.go diff --git a/autoretrieve.go b/autoretrieve.go index 6dc7ea4..7ca9d43 100644 --- a/autoretrieve.go +++ b/autoretrieve.go @@ -24,12 +24,12 @@ import ( "github.com/application-research/autoretrieve/paychannelmanager" lassieclient "github.com/filecoin-project/lassie/pkg/client" lassieeventrecorder "github.com/filecoin-project/lassie/pkg/eventrecorder" + "github.com/filecoin-project/lassie/pkg/indexerlookup" lassieretriever "github.com/filecoin-project/lassie/pkg/retriever" rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc" "github.com/filecoin-project/lotus/chain/wallet" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/paychmgr" - "github.com/ipfs/go-cid" ipfsdatastore "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" flatfs "github.com/ipfs/go-ds-flatfs" @@ -154,16 +154,16 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) { // Initialize Filecoin retriever var retriever *lassieretriever.Retriever if !cfg.DisableRetrieval { - var ep lassieretriever.Endpoint + var candidateFinder lassieretriever.CandidateFinder switch cfg.LookupEndpointType { case EndpointTypeEstuary: - logger.Infof("Using Estuary endpoint type") - ep = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter) + logger.Infof("Using Estuary candidate finder type") + candidateFinder = endpoint.NewEstuaryEndpoint(cfg.LookupEndpointURL, minerPeerGetter) case EndpointTypeIndexer: - logger.Infof("Using indexer endpoint type") - ep = endpoint.NewIndexerEndpoint(cfg.LookupEndpointURL) + logger.Infof("Using indexer candidate finder type") + candidateFinder = indexerlookup.NewCandidateFinder(cfg.LookupEndpointURL) default: - return nil, errors.New("unrecognized endpoint type") + return nil, errors.New("unrecognized candidate finder type") } retrieverCfg, err := cfg.ExtractFilecoinRetrieverConfig(cctx.Context, minerPeerGetter) @@ -171,10 +171,6 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) { return nil, err } - confirmer := func(c cid.Cid) (bool, error) { - return blockManager.Has(cctx.Context, c) - } - // Instantiate client retrievalClient, err := lassieclient.NewClient( blockstore, @@ -190,17 +186,19 @@ func New(cctx *cli.Context, dataDir string, cfg Config) (*Autoretrieve, error) { return nil, err } - retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, ep, confirmer) + retriever, err = lassieretriever.NewRetriever(cctx.Context, retrieverCfg, retrievalClient, candidateFinder) if err != nil { return nil, err } + <-retriever.Start() if cfg.EventRecorderEndpointURL != "" { logger.Infof("Reporting retrieval events to %v", cfg.EventRecorderEndpointURL) eventRecorderEndpointAuthorization, err := loadEventRecorderAuth(dataDirPath(cctx)) if err != nil { return nil, err } - retriever.RegisterListener(lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization)) + eventRecorder := lassieeventrecorder.NewEventRecorder(cctx.Context, cfg.InstanceId, cfg.EventRecorderEndpointURL, eventRecorderEndpointAuthorization) + retriever.RegisterSubscriber(eventRecorder.RecordEvent) } } diff --git a/bitswap/provider.go b/bitswap/provider.go index ae6dfd9..38699e9 100644 --- a/bitswap/provider.go +++ b/bitswap/provider.go @@ -7,7 +7,9 @@ import ( "github.com/application-research/autoretrieve/blocks" "github.com/application-research/autoretrieve/metrics" + "github.com/dustin/go-humanize" lassieretriever "github.com/filecoin-project/lassie/pkg/retriever" + "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-bitswap/message" bitswap_message_pb "github.com/ipfs/go-bitswap/message/pb" "github.com/ipfs/go-bitswap/network" @@ -133,15 +135,15 @@ func NewProvider( provider.network.Start(provider) for i := 0; i < int(config.RequestWorkers); i++ { - go provider.handleRequests() + go provider.handleRequests(ctx) } for i := 0; i < int(config.ResponseWorkers); i++ { - go provider.handleResponses() + go provider.handleResponses(ctx) } for i := 0; i < int(config.RetrievalWorkers); i++ { - go provider.handleRetrievals() + go provider.handleRetrievals(ctx) } return provider, nil @@ -160,10 +162,8 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in provider.requestQueue.PushTasks(sender, tasks...) } -func (provider *Provider) handleRequests() { - ctx := context.Background() - - for { +func (provider *Provider) handleRequests(ctx context.Context) { + for ctx.Err() == nil { peerID, tasks, _ := provider.requestQueue.PopTasks(100) if len(tasks) == 0 { time.Sleep(time.Millisecond * 250) @@ -256,10 +256,8 @@ func (provider *Provider) handleRequest( return nil } -func (provider *Provider) handleResponses() { - ctx := context.Background() - - for { +func (provider *Provider) handleResponses(ctx context.Context) { + for ctx.Err() == nil { peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize) if len(tasks) == 0 { time.Sleep(time.Millisecond * 250) @@ -291,15 +289,15 @@ func (provider *Provider) handleResponses() { log.Debugf("Sending have for %s", cid) // Response metric - ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE")) - stats.Record(ctx, metrics.BitswapResponseCount.M(1)) + taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "HAVE")) + stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1)) case actionSendDontHave: msg.AddDontHave(cid) log.Debugf("Sending dont have for %s", cid) // Response metric - ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason)) - stats.Record(ctx, metrics.BitswapResponseCount.M(1)) + taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "DONT_HAVE"), tag.Insert(metrics.BitswapDontHaveReason, data.reason)) + stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1)) case actionSendBlock: block, err := provider.blockManager.Get(ctx, cid) if err != nil { @@ -310,8 +308,8 @@ func (provider *Provider) handleResponses() { log.Debugf("Sending block for %s", cid) // Response metric - ctx, _ = tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK")) - stats.Record(ctx, metrics.BitswapResponseCount.M(1)) + taggedCtx, _ := tag.New(ctx, tag.Insert(metrics.BitswapTopic, "BLOCK")) + stats.Record(taggedCtx, metrics.BitswapResponseCount.M(1)) } } @@ -325,10 +323,8 @@ func (provider *Provider) handleResponses() { } } -func (provider *Provider) handleRetrievals() { - ctx := context.Background() - - for { +func (provider *Provider) handleRetrievals(ctx context.Context) { + for ctx.Err() == nil { peerID, tasks, _ := provider.retrievalQueue.PopTasks(1) if len(tasks) == 0 { time.Sleep(time.Millisecond * 250) @@ -344,38 +340,52 @@ func (provider *Provider) handleRetrievals() { continue } - log.Debugf("Requesting retrieval for %s", cid) + retrievalId, err := types.NewRetrievalID() + if err != nil { + log.Errorf("Failed to create retrieval ID: %s", err.Error()) + } + + log.Debugf("Starting retrieval for %s (%s)", cid, retrievalId) + + // Start a background blockstore fetch with a callback to send the block + // to the peer once it's available. + blockCtx, blockCancel := context.WithCancel(ctx) + if provider.blockManager.AwaitBlock(blockCtx, cid, func(block blocks.Block, err error) { + if err != nil { + log.Debugf("Async block load failed: %s", err) + provider.queueSendDontHave(peerID, task.Priority, cid, "failed_block_load") + } else { + log.Debugf("Async block load completed: %s", cid) + provider.queueSendBlock(peerID, task.Priority, cid, block.Size) + } + blockCancel() + }) { + // If the block was already in the blockstore then we don't need to + // start a retrieval. + continue + } // Try to start a new retrieval (if it's already running then no // need to error, just continue on to await block) - if err := provider.retriever.Request(cid); err != nil { - if !errors.As(err, &lassieretriever.ErrRetrievalAlreadyRunning{}) { - if errors.Is(err, lassieretriever.ErrNoCandidates) { - // Just do a debug print if there were no candidates because this happens a lot - log.Debugf("No candidates for %s", cid) - } else { - // Otherwise, there was a real failure, print with more importance - log.Errorf("Request for %s failed: %v", cid, err) - } - } else { + result, err := provider.retriever.Retrieve(ctx, retrievalId, cid) + if err != nil { + if errors.Is(err, lassieretriever.ErrRetrievalAlreadyRunning) { log.Debugf("Retrieval already running for %s, no new one will be started", cid) + continue // Don't send dont_have or run blockCancel(), let it async load + } else if errors.Is(err, lassieretriever.ErrNoCandidates) { + // Just do a debug print if there were no candidates because this happens a lot + log.Debugf("No candidates for %s (%s)", cid, retrievalId) + provider.queueSendDontHave(peerID, task.Priority, cid, "no_candidates") + } else { + // Otherwise, there was a real failure, print with more importance + log.Errorf("Retrieval for %s (%s) failed: %v", cid, retrievalId, err) + provider.queueSendDontHave(peerID, task.Priority, cid, "retrieval_failed") } } else { - log.Infof("Started retrieval for %s", cid) + log.Infof("Retrieval for %s (%s) completed (duration: %s, bytes: %s, blocks: %d)", cid, retrievalId, result.Duration, humanize.IBytes(result.Size), result.Blocks) } - // TODO: if retriever.Request() is changed to be blocking, make - // blockManager.AwaitBlock() cancellable and cancel it after the - // request finishes if there's an error - provider.blockManager.AwaitBlock(ctx, cid, func(block blocks.Block, err error) { - if err != nil { - log.Debugf("Async block load failed: %s", err) - provider.queueSendDontHave(peerID, task.Priority, block.Cid, "failed_block_load") - } else { - log.Debugf("Async block load completed: %s", block.Cid) - provider.queueSendBlock(peerID, task.Priority, block.Cid, block.Size) - } - }) + blockCancel() } provider.retrievalQueue.TasksDone(peerID, tasks...) diff --git a/blocks/manager.go b/blocks/manager.go index 1daa45f..37f160d 100644 --- a/blocks/manager.go +++ b/blocks/manager.go @@ -33,6 +33,7 @@ type Manager struct { } type waitListEntry struct { + ctx context.Context callback func(Block, error) registeredAt time.Time } @@ -55,7 +56,13 @@ func NewManager(inner blockstore.Blockstore, getAwaitTimeout time.Duration) *Man return mgr } -func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) { +// AwaitBlock will wait for a block to be added to the blockstore and then +// call the callback with the block. If the block is already in the blockstore, +// the callback will be called immediately. If the block is not in the blockstore +// or the context is cancelled, the callback will not be called. +// Returns true if the block was already in the blockstore, allowing the +// callback to be called, or false otherwise. +func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(Block, error)) bool { // We need to lock the blockstore here to make sure the requested block // doesn't get added while being added to the waitlist mgr.waitListLk.Lock() @@ -68,22 +75,25 @@ func (mgr *Manager) AwaitBlock(ctx context.Context, cid cid.Cid, callback func(B if !ipld.IsNotFound(err) { mgr.waitListLk.Unlock() callback(Block{}, err) - return + return false } mgr.waitList[cid] = append(mgr.waitList[cid], waitListEntry{ + ctx: ctx, callback: callback, registeredAt: time.Now(), }) mgr.waitListLk.Unlock() - return + return false } mgr.waitListLk.Unlock() - // Otherwise, we can immediately run the callback + // Otherwise, we can immediately run the callback and notify the caller of + // success callback(Block{cid, size}, nil) + return true } func (mgr *Manager) Put(ctx context.Context, block blocks.Block) error { @@ -149,20 +159,22 @@ func (mgr *Manager) startPollCleanup() { for cid := range mgr.waitList { // For each element in the slice for this CID... for i := 0; i < len(mgr.waitList[cid]); i++ { - // ...check if it's timed out... - if time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout { + // ...check whether the waiter context was cancelled or it's been in the + // list too long... + if mgr.waitList[cid][i].ctx.Err() != nil || time.Since(mgr.waitList[cid][i].registeredAt) > mgr.getAwaitTimeout { // ...and if so, delete this element by replacing it with // the last element of the slice and shrinking the length by - // 1, and step the index back - mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout) + // 1, and step the index back. + if mgr.waitList[cid][i].ctx.Err() == nil { + mgr.waitList[cid][i].callback(Block{}, ErrWaitTimeout) + } mgr.waitList[cid][i] = mgr.waitList[cid][len(mgr.waitList[cid])-1] mgr.waitList[cid] = mgr.waitList[cid][:len(mgr.waitList[cid])-1] i-- } } - // If the slice is empty now, remove it entirely from the waitList - // map + // If the slice is empty now, remove it entirely from the waitList map if len(mgr.waitList[cid]) == 0 { delete(mgr.waitList, cid) } diff --git a/endpoint/estuary.go b/endpoint/estuary.go index 5a9b3f2..a120a42 100644 --- a/endpoint/estuary.go +++ b/endpoint/estuary.go @@ -11,7 +11,7 @@ import ( "github.com/application-research/autoretrieve/minerpeergetter" "github.com/filecoin-project/go-address" - lassieretriever "github.com/filecoin-project/lassie/pkg/retriever" + "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" ) @@ -39,7 +39,7 @@ func NewEstuaryEndpoint(url string, mpg *minerpeergetter.MinerPeerGetter) *Estua } } -func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) { +func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { // Create URL with CID endpointURL, err := url.Parse(ee.url) if err != nil { @@ -63,13 +63,13 @@ func (ee *EstuaryEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]l return nil, ErrEndpointBodyInvalid } - converted := make([]lassieretriever.RetrievalCandidate, 0, len(unfiltered)) + converted := make([]types.RetrievalCandidate, 0, len(unfiltered)) for _, original := range unfiltered { minerPeer, err := ee.mpg.MinerPeer(ctx, original.Miner) if err != nil { return nil, fmt.Errorf("%w: failed to get miner peer: %v", ErrEndpointRequestFailed, err) } - converted = append(converted, lassieretriever.RetrievalCandidate{ + converted = append(converted, types.RetrievalCandidate{ MinerPeer: minerPeer, RootCid: original.RootCid, }) diff --git a/endpoint/indexer.go b/endpoint/indexer.go deleted file mode 100644 index f514527..0000000 --- a/endpoint/indexer.go +++ /dev/null @@ -1,90 +0,0 @@ -package endpoint - -import ( - "context" - "fmt" - "io" - "math/rand" - "net/http" - "time" - - "github.com/filecoin-project/index-provider/metadata" - lassieretriever "github.com/filecoin-project/lassie/pkg/retriever" - "github.com/filecoin-project/storetheindex/api/v0/finder/model" - "github.com/ipfs/go-cid" -) - -type IndexerEndpoint struct { - c *http.Client - baseUrl string -} - -func NewIndexerEndpoint(url string) *IndexerEndpoint { - return &IndexerEndpoint{ - c: &http.Client{ - Timeout: time.Minute, - }, - baseUrl: url, - } -} - -func (idxf *IndexerEndpoint) sendRequest(req *http.Request) (*model.FindResponse, error) { - req.Header.Set("Content-Type", "application/json") - resp, err := idxf.c.Do(req) - if err != nil { - return nil, err - } - // Handle failed requests - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - return &model.FindResponse{}, nil - } - return nil, fmt.Errorf("batch find query failed: %v", http.StatusText(resp.StatusCode)) - } - - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - return model.UnmarshalFindResponse(b) -} - -func (idxf *IndexerEndpoint) FindCandidates(ctx context.Context, cid cid.Cid) ([]lassieretriever.RetrievalCandidate, error) { - u := fmt.Sprint(idxf.baseUrl, "/multihash/", cid.Hash().B58String()) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) - if err != nil { - return nil, err - } - - parsedResp, err := idxf.sendRequest(req) - if err != nil { - return nil, err - } - hash := string(cid.Hash()) - // turn parsedResp into records. - var matches []lassieretriever.RetrievalCandidate - - indices := rand.Perm(len(parsedResp.MultihashResults)) - for _, i := range indices { - multihashResult := parsedResp.MultihashResults[i] - - if !(string(multihashResult.Multihash) == hash) { - continue - } - for _, val := range multihashResult.ProviderResults { - // filter out any results that aren't filecoin graphsync - var dtm metadata.GraphsyncFilecoinV1 - if err := dtm.UnmarshalBinary(val.Metadata); err != nil { - continue - } - - matches = append(matches, lassieretriever.RetrievalCandidate{ - RootCid: cid, - MinerPeer: val.Provider, - }) - } - } - return matches, nil -} diff --git a/go.mod b/go.mod index 61ecbd6..fabe2ae 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,8 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/filecoin-project/go-address v1.1.0 github.com/filecoin-project/go-state-types v0.9.9 - github.com/filecoin-project/index-provider v0.9.1 - github.com/filecoin-project/lassie v0.1.2 + github.com/filecoin-project/lassie v0.2.1-0.20230124092052-87b0369110fa github.com/filecoin-project/lotus v1.18.0 - github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 github.com/ipfs/go-bitswap v0.10.2 github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-blockservice v0.4.0 @@ -29,7 +27,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.18.0 github.com/multiformats/go-multiaddr v0.7.0 github.com/prometheus/client_golang v1.14.0 - github.com/urfave/cli/v2 v2.16.3 + github.com/urfave/cli/v2 v2.23.7 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc go.opencensus.io v0.24.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 @@ -38,7 +36,7 @@ require ( ) require ( - github.com/BurntSushi/toml v1.1.0 // indirect + github.com/BurntSushi/toml v1.2.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/GeertJohan/go.incremental v1.0.0 // indirect github.com/GeertJohan/go.rice v1.0.2 // indirect @@ -85,7 +83,7 @@ require ( github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082a837 // indirect github.com/filecoin-project/go-crypto v0.0.1 // indirect github.com/filecoin-project/go-data-transfer v1.15.2 // indirect - github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1 // indirect + github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1.0.20230111032640-1da99777ed8b // indirect github.com/filecoin-project/go-ds-versioning v0.1.2 // indirect github.com/filecoin-project/go-fil-commcid v0.1.0 // indirect github.com/filecoin-project/go-fil-markets v1.25.3-0.20230107010325-143abaddd0f3 // indirect @@ -98,6 +96,7 @@ require ( github.com/filecoin-project/go-paramfetch v0.0.4 // indirect github.com/filecoin-project/go-statemachine v1.0.3 // indirect github.com/filecoin-project/go-statestore v0.2.0 // indirect + github.com/filecoin-project/index-provider v0.9.2 // indirect github.com/filecoin-project/pubsub v1.0.0 // indirect github.com/filecoin-project/specs-actors v0.9.15 // indirect github.com/filecoin-project/specs-actors/v2 v2.3.6 // indirect @@ -162,10 +161,11 @@ require ( github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/ipfs/interface-go-ipfs-core v0.7.0 // indirect github.com/ipld/go-car v0.5.0 // indirect - github.com/ipld/go-car/v2 v2.5.0 // indirect + github.com/ipld/go-car/v2 v2.5.1 // indirect github.com/ipld/go-codec-dagpb v1.5.0 // indirect github.com/ipld/go-ipld-prime v0.19.0 // indirect github.com/ipld/go-ipld-selector-text-lite v0.0.1 // indirect + github.com/ipni/storetheindex v0.5.4 // indirect github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -238,6 +238,7 @@ require ( github.com/rivo/uniseg v0.1.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/rvagg/go-prioritywaitqueue v1.0.3 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect diff --git a/go.sum b/go.sum index fdae39e..ca70192 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGy github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= -github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= +github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= @@ -275,8 +275,8 @@ github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2 github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v1.15.2 h1:PzqsFr2Q/onMGKrGh7TtRT0dKsJcVJrioJJnjnKmxlk= github.com/filecoin-project/go-data-transfer v1.15.2/go.mod h1:qXOJ3IF5dEJQHykXXTwcaRxu17bXAxr+LglXzkL6bZQ= -github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1 h1:nQqLXSHhLTdCTceGqgRm/PmZxL5pc60INLBWxWAJ+IE= -github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1/go.mod h1:R+IlFJkqjUOWS/3eqDaKGdPkD4JULosN7YdIN22+BuI= +github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1.0.20230111032640-1da99777ed8b h1:Tk0nxIJOcpJc4pQa/uC5ykt9AmzaykcSUdyDlUT9+Dk= +github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc1.0.20230111032640-1da99777ed8b/go.mod h1:zcT4aJHBC+BnaF9tZrrevG30eaPezi5aGokP6YAW2PY= github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w= github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= @@ -317,10 +317,10 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO github.com/filecoin-project/go-statestore v0.2.0 h1:cRRO0aPLrxKQCZ2UOQbzFGn4WDNdofHZoGPjfNaAo5Q= github.com/filecoin-project/go-statestore v0.2.0/go.mod h1:8sjBYbS35HwPzct7iT4lIXjLlYyPor80aU7t7a/Kspo= github.com/filecoin-project/go-storedcounter v0.1.0 h1:Mui6wSUBC+cQGHbDUBcO7rfh5zQkWJM/CpAZa/uOuus= -github.com/filecoin-project/index-provider v0.9.1 h1:Jnh9dviIHvQxZ2baNoYu3n8z6F9O62ksnVlyREgPyyM= -github.com/filecoin-project/index-provider v0.9.1/go.mod h1:NlHxQcy2iMGfUoUGUzrRxntcpiC50QSnvp68u2VTT40= -github.com/filecoin-project/lassie v0.1.2 h1:zhy0SyKMqu7aiGHMyQPZ9XXBVzRCsVWuRp2JodOQkUk= -github.com/filecoin-project/lassie v0.1.2/go.mod h1:otmKXejRu4QXFMfv+mVaNT5n7cMefDQ6tuGwNOClfQA= +github.com/filecoin-project/index-provider v0.9.2 h1:mOVyoRgymFcDc0XFew1WgdJ0GdKe0yVz9s+lrRfLa68= +github.com/filecoin-project/index-provider v0.9.2/go.mod h1:NlHxQcy2iMGfUoUGUzrRxntcpiC50QSnvp68u2VTT40= +github.com/filecoin-project/lassie v0.2.1-0.20230124092052-87b0369110fa h1:s8BhPW2JN8Yd5G62TkzoYRu61fEo5BRJ3eniabeEFUM= +github.com/filecoin-project/lassie v0.2.1-0.20230124092052-87b0369110fa/go.mod h1:VQeoI5nNJ81ZZxAJVg0pA5r2yTFUXaI4chJk7mQVn04= github.com/filecoin-project/lotus v1.18.0 h1:HxdShHMEZT703n9KlQTgPVoUF/ocidMC/d3TzwxzTP8= github.com/filecoin-project/lotus v1.18.0/go.mod h1:jJih5ApnJZssc/wWsLJm+IWnfy8YaCyaDbvs/wTIVDk= github.com/filecoin-project/pubsub v1.0.0 h1:ZTmT27U07e54qV1mMiQo4HDr0buo8I1LDHBYLXlsNXM= @@ -344,7 +344,6 @@ github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN+bJ5Ixj6h49Q7uE2Y= github.com/filecoin-project/specs-actors/v8 v8.0.1/go.mod h1:UYIPg65iPWoFw5NEftREdJwv9b/5yaLKdCgTvNI/2FA= github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893 h1:6GCuzxLVHBzlz7y+FkbHh6n0UyoEGWqDwJKQPJoz7bE= -github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= @@ -760,8 +759,8 @@ github.com/ipld/go-car v0.5.0 h1:kcCEa3CvYMs0iE5BzD5sV7O2EwMiCIp3uF8tA6APQT8= github.com/ipld/go-car v0.5.0/go.mod h1:ppiN5GWpjOZU9PgpAZ9HbZd9ZgSpwPMr48fGRJOWmvE= github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI= github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0= -github.com/ipld/go-car/v2 v2.5.0 h1:S9h7A6qBAJ+B1M1jIKtau+HPDe30UbM71vsyBzwvRIE= -github.com/ipld/go-car/v2 v2.5.0/go.mod h1:jKjGOqoCj5zn6KjnabD6JbnCsMntqU2hLiU6baZVO3E= +github.com/ipld/go-car/v2 v2.5.1 h1:U2ux9JS23upEgrJScW8VQuxmE94560kYxj9CQUpcfmk= +github.com/ipld/go-car/v2 v2.5.1/go.mod h1:jKjGOqoCj5zn6KjnabD6JbnCsMntqU2hLiU6baZVO3E= github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s= github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA= github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY= @@ -782,6 +781,8 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y= github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= +github.com/ipni/storetheindex v0.5.4 h1:K6kL8zy5LCf+JI8HRKrrdcxjk5xPmroYxWpQttJMC7U= +github.com/ipni/storetheindex v0.5.4/go.mod h1:c/NS640Iu2NrCCIErnUhsUM5KVEyeXymgtNnx6eDwMU= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -1366,6 +1367,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/rvagg/go-prioritywaitqueue v1.0.3 h1:+YQwo80uQcUCDs1zkuLt0Frt+97ljghYjWq4WoZPdHs= +github.com/rvagg/go-prioritywaitqueue v1.0.3/go.mod h1:fvzih7jz43jygRQKhaITNW3wLfUiWeA3qIH0rUTBhZI= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= @@ -1461,8 +1464,8 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= -github.com/urfave/cli/v2 v2.16.3 h1:gHoFIwpPjoyIMbJp/VFd+/vuD0dAgFK4B6DpEMFJfQk= -github.com/urfave/cli/v2 v2.16.3/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= +github.com/urfave/cli/v2 v2.23.7 h1:YHDQ46s3VghFHFf1DdF+Sh7H4RqhcM+t0TmZRJx4oJY= +github.com/urfave/cli/v2 v2.23.7/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= From ebf5c4ee77e70e7732cd7dea1e0eee6bfab343f4 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 25 Jan 2023 20:18:13 +1100 Subject: [PATCH 2/2] fix: add signal channels to speed up processing --- bitswap/provider.go | 53 ++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/bitswap/provider.go b/bitswap/provider.go index 38699e9..20e5b36 100644 --- a/bitswap/provider.go +++ b/bitswap/provider.go @@ -68,13 +68,16 @@ type Provider struct { retriever *lassieretriever.Retriever // Incoming messages to be processed - work is 1 per message - requestQueue *peertaskqueue.PeerTaskQueue + requestQueue *peertaskqueue.PeerTaskQueue + requestQueueSignalChan chan struct{} // Outgoing messages to be sent - work is size of messages in bytes - responseQueue *peertaskqueue.PeerTaskQueue + responseQueue *peertaskqueue.PeerTaskQueue + responseQueueSignalChan chan struct{} // CIDs that need to be retrieved - work is 1 per CID queued - retrievalQueue *peertaskqueue.PeerTaskQueue + retrievalQueue *peertaskqueue.PeerTaskQueue + retrievalQueueSignalChan chan struct{} } type overwriteTaskMerger struct{} @@ -123,13 +126,16 @@ func NewProvider( } provider := &Provider{ - config: config, - network: network.NewFromIpfsHost(host, routing), - blockManager: blockManager, - retriever: retriever, - requestQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), - responseQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), - retrievalQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), + config: config, + network: network.NewFromIpfsHost(host, routing), + blockManager: blockManager, + retriever: retriever, + requestQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), + requestQueueSignalChan: make(chan struct{}, 10), + responseQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), + responseQueueSignalChan: make(chan struct{}, 10), + retrievalQueue: peertaskqueue.New(peertaskqueue.TaskMerger(&overwriteTaskMerger{}), peertaskqueue.IgnoreFreezing(true)), + retrievalQueueSignalChan: make(chan struct{}, 10), } provider.network.Start(provider) @@ -160,13 +166,18 @@ func (provider *Provider) ReceiveMessage(ctx context.Context, sender peer.ID, in }) } provider.requestQueue.PushTasks(sender, tasks...) + provider.requestQueueSignalChan <- struct{}{} } func (provider *Provider) handleRequests(ctx context.Context) { for ctx.Err() == nil { peerID, tasks, _ := provider.requestQueue.PopTasks(100) if len(tasks) == 0 { - time.Sleep(time.Millisecond * 250) + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond * 250): + case <-provider.requestQueueSignalChan: + } continue } @@ -252,6 +263,7 @@ func (provider *Provider) handleRequest( Priority: int(entry.Priority), Work: 1, }) + provider.retrievalQueueSignalChan <- struct{}{} return nil } @@ -260,7 +272,11 @@ func (provider *Provider) handleResponses(ctx context.Context) { for ctx.Err() == nil { peerID, tasks, _ := provider.responseQueue.PopTasks(targetMessageSize) if len(tasks) == 0 { - time.Sleep(time.Millisecond * 250) + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond * 250): + case <-provider.responseQueueSignalChan: + } continue } @@ -327,7 +343,11 @@ func (provider *Provider) handleRetrievals(ctx context.Context) { for ctx.Err() == nil { peerID, tasks, _ := provider.retrievalQueue.PopTasks(1) if len(tasks) == 0 { - time.Sleep(time.Millisecond * 250) + select { + case <-ctx.Done(): + case <-time.After(time.Millisecond * 250): + case <-provider.retrievalQueueSignalChan: + } continue } @@ -397,11 +417,11 @@ func (provider *Provider) ReceiveError(err error) { } func (provider *Provider) PeerConnected(peerID peer.ID) { - log.Debugf("Peer %s connected", peerID) + // TODO: too noisy: log.Debugf("Peer %s connected", peerID) } func (provider *Provider) PeerDisconnected(peerID peer.ID) { - log.Debugf("Peer %s disconnected", peerID) + // TODO: too noisy: log.Debugf("Peer %s disconnected", peerID) } func (provider *Provider) queueSendHave(peerID peer.ID, priority int, cid cid.Cid) { @@ -414,6 +434,7 @@ func (provider *Provider) queueSendHave(peerID peer.ID, priority int, cid cid.Ci action: actionSendHave, }, }) + provider.responseQueueSignalChan <- struct{}{} } func (provider *Provider) queueSendDontHave(peerID peer.ID, priority int, cid cid.Cid, reason string) { @@ -427,6 +448,7 @@ func (provider *Provider) queueSendDontHave(peerID peer.ID, priority int, cid ci reason: reason, }, }) + provider.responseQueueSignalChan <- struct{}{} } func (provider *Provider) queueSendBlock(peerID peer.ID, priority int, cid cid.Cid, size int) { @@ -439,4 +461,5 @@ func (provider *Provider) queueSendBlock(peerID peer.ID, priority int, cid cid.C action: actionSendBlock, // TODO: maybe check retrieval task for this }, }) + provider.responseQueueSignalChan <- struct{}{} }