Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update boxo for bitswap providing refactor #10270

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ var bitswapStatCmd = &cmds.Command{
human, _ := req.Options[bitswapHumanOptionName].(bool)

fmt.Fprintln(w, "bitswap status")
fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", s.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(w, "\tblocks received: %d\n", s.BlocksReceived)
fmt.Fprintf(w, "\tblocks sent: %d\n", s.BlocksSent)
if human {
Expand Down
8 changes: 1 addition & 7 deletions core/commands/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/ipfs/kubo/core"
"github.com/ipfs/kubo/core/commands/cmdenv"

bservice "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
ft "github.com/ipfs/boxo/ipld/unixfs"
mfs "github.com/ipfs/boxo/mfs"
Expand Down Expand Up @@ -162,11 +160,7 @@ var filesStatCmd = &cmds.Command{

var dagserv ipld.DAGService
if withLocal {
// an offline DAGService will not fetch from the network
dagserv = dag.NewDAGService(bservice.New(
node.Blockstore,
offline.Exchange(node.Blockstore),
))
dagserv = node.OfflineDAG
} else {
dagserv = node.DAG
}
Expand Down
5 changes: 1 addition & 4 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"os"
"time"

bserv "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
verifcid "github.com/ipfs/boxo/verifcid"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -738,8 +736,7 @@ type pinVerifyOpts struct {
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
DAG := n.OfflineDAG
getLinks := dag.GetLinksWithDAG(DAG)

var checkPin func(root cid.Cid) PinStatus
Expand Down
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ type IpfsNode struct {
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
OfflineBlocks bserv.BlockService `name:"offlineBlockService"` // blockservice which doesn't try to fetch from the network
DAG ipld.DAGService // the merkle dag service, get/add objects.
OfflineDAG ipld.DAGService `name:"offlineDagService"` // merkle dag service which doesn't try to fetch from the network
IPLDFetcherFactory fetcher.Factory `name:"ipldFetcher"` // fetcher that paths over the IPLD data model
UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data
OfflineIPLDFetcherFactory fetcher.Factory `name:"offlineIpldFetcher"` // fetcher that paths over the IPLD data model without fetching new blocks
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
subAPI.blocks = bserv.New(subAPI.blockstore, nil, bserv.WithProvider(subAPI.provider))
subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

bserv "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
pin "github.com/ipfs/boxo/pinning/pinner"
Expand Down Expand Up @@ -195,7 +194,8 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro

visited := make(map[cid.Cid]*pinStatus)
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
// FIXME: we are recreating a dag and blockservice, maybe offline varients should be shared ?
DAG := merkledag.NewDAGService(bserv.New(bs, nil, bserv.WithProvider(api.provider)))
getLinks := merkledag.GetLinksWithDAG(DAG)

var checkPin func(root cid.Cid) *pinStatus
Expand Down
4 changes: 3 additions & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
}
exch := api.exchange
pinning := api.pinning
prov := api.provider

if settings.OnlyHash {
node, err := getOrCreateNilNode()
Expand All @@ -115,9 +116,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
addblockstore = node.Blockstore
exch = node.Exchange
pinning = node.Pinning
prov = nil
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
bserv := blockservice.New(addblockstore, exch, blockservice.WithProvider(prov)) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
Expand Down
6 changes: 2 additions & 4 deletions core/corehttp/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net/http"
"time"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/gateway"
"github.com/ipfs/boxo/namesys"
Expand Down Expand Up @@ -86,7 +84,7 @@ func VersionOption() ServeOption {

func Libp2pGatewayOption() ServeOption {
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
bserv := blockservice.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
bserv := n.OfflineBlocks

backend, err := gateway.NewBlocksBackend(bserv,
// GatewayOverLibp2p only returns things that are in local blockstore
Expand Down Expand Up @@ -125,7 +123,7 @@ func newGatewayBackend(n *core.IpfsNode) (gateway.IPFSBackend, error) {
pathResolver := n.UnixFSPathResolver

if cfg.Gateway.NoFetch {
bserv = blockservice.New(bserv.Blockstore(), offline.Exchange(bserv.Blockstore()))
bserv = n.OfflineBlocks

cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
Expand Down
20 changes: 9 additions & 11 deletions core/node/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@ type bitswapOptionsOut struct {
BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
}

// BitswapOptions creates configuration options for Bitswap from the config file
// and whether to provide data.
func BitswapOptions(cfg *config.Config, provide bool) interface{} {
return func() bitswapOptionsOut {
// BitswapOptions creates configuration options for Bitswap from the config file.
func BitswapOptions(cfg *config.Config) fx.Option {
return fx.Provide(func(routing irouting.ProvideManyRouter) bitswapOptionsOut {
var internalBsCfg config.InternalBitswap
if cfg.Internal.Bitswap != nil {
internalBsCfg = *cfg.Internal.Bitswap
}

opts := []bitswap.Option{
bitswap.ProvideEnabled(provide),
bitswap.WithContentSearch(routing),
bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
Expand All @@ -50,25 +49,24 @@ func BitswapOptions(cfg *config.Config, provide bool) interface{} {
}

return bitswapOptionsOut{BitswapOpts: opts}
}
})
}

type onlineExchangeIn struct {
fx.In

Mctx helpers.MetricsCtx
Host host.Host
Rt irouting.ProvideManyRouter
Bs blockstore.GCBlockstore
BitswapOpts []bitswap.Option `group:"bitswap-options"`
}

// OnlineExchange creates new LibP2P backed block exchange (BitSwap).
// Additional options to bitswap.New can be provided via the "bitswap-options"
// group.
func OnlineExchange() interface{} {
return func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(in.Host, in.Rt)
func OnlineExchange() fx.Option {
return fx.Provide(func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(in.Host)

exch := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetwork, in.Bs, in.BitswapOpts...)
lc.Append(fx.Hook{
Expand All @@ -77,5 +75,5 @@ func OnlineExchange() interface{} {
},
})
return exch
}
})
}
82 changes: 65 additions & 17 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
exchange "github.com/ipfs/boxo/exchange"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/fetcher"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/boxo/filestore"
Expand All @@ -17,6 +16,7 @@ import (
pathresolver "github.com/ipfs/boxo/path/resolver"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dspinner"
"github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
format "github.com/ipfs/go-ipld-format"
Expand All @@ -29,8 +29,8 @@ import (
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface, prov provider.System) blockservice.BlockService {
bsvc := blockservice.New(bs, rem, blockservice.WithProvider(prov))

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
Expand All @@ -41,6 +41,32 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
return bsvc
}

type offlineIn struct {
fx.In

Bs blockstore.Blockstore
Prov provider.System `optional:"true"`
}

type offlineOut struct {
fx.Out

Bs blockservice.BlockService `name:"offlineBlockService"`
}

// OfflineBlockservice is like [BlockService] but it makes an offline version.
func OfflineBlockservice(lc fx.Lifecycle, in offlineIn) offlineOut {
bsvc := blockservice.New(in.Bs, nil, blockservice.WithProvider(in.Prov))

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return offlineOut{Bs: bsvc}
}

// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
rootDS := repo.Datastore()
Expand Down Expand Up @@ -82,38 +108,34 @@ func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
return merkledag.NewSession(ctx, s.DAGService)
}

// FetchersOut allows injection of fetchers.
type FetchersOut struct {
// fetchersOut allows injection of fetchers.
type fetchersOut struct {
fx.Out
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// FetchersIn allows using fetchers for other dependencies.
type FetchersIn struct {
type fetcherIn struct {
fx.In
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
Online blockservice.BlockService
Offline blockservice.BlockService `name:"offlineBlockService"`
}

// FetcherConfig returns a fetcher config that can build new fetcher instances
func FetcherConfig(bs blockservice.BlockService) FetchersOut {
ipldFetcher := bsfetcher.NewFetcherConfig(bs)
func FetcherConfig(in fetcherIn) fetchersOut {
ipldFetcher := bsfetcher.NewFetcherConfig(in.Online)
ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)

// Construct offline versions which we can safely use in contexts where
// path resolution should not fetch new blocks via exchange.
offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore()))
offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs)
offlineIpldFetcher := bsfetcher.NewFetcherConfig(in.Offline)
offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify)

return FetchersOut{
return fetchersOut{
IPLDFetcher: ipldFetcher,
UnixfsFetcher: unixFSFetcher,
OfflineIPLDFetcher: offlineIpldFetcher,
Expand All @@ -130,8 +152,17 @@ type PathResolversOut struct {
OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"`
}

// PathResolverIn allows using fetchers for other dependencies.
type PathResolverIn struct {
fx.In
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// PathResolverConfig creates path resolvers with the given fetchers.
func PathResolverConfig(fetchers FetchersIn) PathResolversOut {
func PathResolverConfig(fetchers PathResolverIn) PathResolversOut {
return PathResolversOut{
IPLDPathResolver: pathresolver.NewBasicResolver(fetchers.IPLDFetcher),
UnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.UnixfsFetcher),
Expand All @@ -145,6 +176,23 @@ func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs)
}

type offlineDagIn struct {
fx.In

Bs blockservice.BlockService `name:"offlineBlockService"`
}

type offlineDagOut struct {
fx.Out

DAG format.DAGService `name:"offlineDagService"`
}

// OfflineDag is like [Dag] but it makes an offline version.
func OfflineDag(lc fx.Lifecycle, in offlineDagIn) offlineDagOut {
return offlineDagOut{DAG: merkledag.NewDAGService(in.Bs)}
}

// Files loads persisted MFS root
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot")
Expand Down
9 changes: 4 additions & 5 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,9 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
recordLifetime = d
}

/* don't provide from bitswap when the strategic provider service is active */
shouldBitswapProvide := !cfg.Experimental.StrategicProviding

return fx.Options(
fx.Provide(BitswapOptions(cfg, shouldBitswapProvide)),
fx.Provide(OnlineExchange()),
BitswapOptions(cfg),
OnlineExchange(),
fx.Provide(DNSResolver),
fx.Provide(Namesys(ipnsCacheSize, cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL))),
fx.Provide(Peering),
Expand Down Expand Up @@ -307,7 +304,9 @@ func Offline(cfg *config.Config) fx.Option {
// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(BlockService),
fx.Provide(OfflineBlockservice),
fx.Provide(Dag),
fx.Provide(OfflineDag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
fx.Provide(Pinning),
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.20
replace github.com/ipfs/kubo => ./../../..

require (
github.com/ipfs/boxo v0.17.1-0.20240206084652-79cb4e2886d7
github.com/ipfs/boxo v0.17.1-0.20240216140830-2e813d83999c
github.com/ipfs/kubo v0.0.0-00010101000000-000000000000
github.com/libp2p/go-libp2p v0.32.2
github.com/multiformats/go-multiaddr v0.12.2
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/kubo-as-a-library/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c h1:7Uy
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c/go.mod h1:6EekK/jo+TynwSE/ZOiOJd4eEvRXoavEC3vquKtv4yI=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.17.1-0.20240206084652-79cb4e2886d7 h1:1xhvfhNpPSJZ6GavPT6MuR15HhN4azBQvu7wsziJph4=
github.com/ipfs/boxo v0.17.1-0.20240206084652-79cb4e2886d7/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80=
github.com/ipfs/boxo v0.17.1-0.20240216140830-2e813d83999c h1:4wRWKU3JeuX90CrKA2u6/VuN/eptDLJJZ6RgOwwGkBs=
github.com/ipfs/boxo v0.17.1-0.20240216140830-2e813d83999c/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
Expand Down
Loading
Loading