Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to go-data-transfer v2 #103

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions channelid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"strconv"
"strings"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/libp2p/go-libp2p-core/peer"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/libp2p/go-libp2p/core/peer"
)

func ChannelIDFromString(id string) (*datatransfer.ChannelID, error) {
Expand Down
8 changes: 4 additions & 4 deletions filc/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/filecoin-project/boost/transport/httptransport"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -37,10 +37,10 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
cli "github.com/urfave/cli/v2"
"golang.org/x/xerrors"
Expand Down
8 changes: 4 additions & 4 deletions filc/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
levelds "github.com/ipfs/go-ds-leveldb"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
metrics "github.com/libp2p/go-libp2p-core/metrics"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
metrics "github.com/libp2p/go-libp2p/core/metrics"
cli "github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -190,7 +190,7 @@ func setup(ctx context.Context, cfgdir string) (*Node, error) {
Blockstore: bstore,
DHT: dht,
Datastore: ds,
Bitswap: bswap.(*bitswap.Bitswap),
Bitswap: bswap,
Wallet: wallet,
}, nil
}
Expand Down
172 changes: 86 additions & 86 deletions filclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-commp-utils/writer"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
gst "github.com/filecoin-project/go-data-transfer/transport/graphsync"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channelmonitor"
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
gst "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand Down Expand Up @@ -55,10 +55,10 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
car "github.com/ipld/go-car"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -226,28 +226,23 @@ func NewClientWithConfig(cfg *Config) (*FilClient, error) {
return nil, err
}

err = mgr.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, nil)
err = mgr.RegisterVoucherType(requestvalidation.StorageDataTransferVoucherType, nil)
if err != nil {
return nil, err
}

err = mgr.RegisterVoucherType(&retrievalmarket.DealProposal{}, nil)
err = mgr.RegisterVoucherType(retrievalmarket.DealProposalType, nil)
if err != nil {
return nil, err
}

err = mgr.RegisterVoucherType(&retrievalmarket.DealPayment{}, nil)
if err != nil {
return nil, err
}

err = mgr.RegisterVoucherResultType(&retrievalmarket.DealResponse{})
err = mgr.RegisterVoucherType(retrievalmarket.DealPaymentType, nil)
if err != nil {
return nil, err
}

if cfg.RetrievalConfigurer != nil {
if err := mgr.RegisterTransportConfigurer(&retrievalmarket.DealProposal{}, cfg.RetrievalConfigurer); err != nil {
if err := mgr.RegisterTransportConfigurer(retrievalmarket.DealProposalType, cfg.RetrievalConfigurer); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1213,10 +1208,11 @@ func (fc *FilClient) StartDataTransfer(ctx context.Context, miner address.Addres
}

voucher := &requestvalidation.StorageDataTransferVoucher{Proposal: propCid}
node := requestvalidation.BindnodeRegistry.TypeToNode(voucher)

fc.host.ConnManager().Protect(mpid, "transferring")

chanid, err := fc.dataTransfer.OpenPushDataChannel(ctx, mpid, voucher, dataCid, shared.AllSelector())
chanid, err := fc.dataTransfer.OpenPushDataChannel(ctx, mpid, datatransfer.TypedVoucher{Voucher: node, Type: requestvalidation.StorageDataTransferVoucherType}, dataCid, shared.AllSelector())
if err != nil {
return nil, fmt.Errorf("opening push data channel: %w", err)
}
Expand Down Expand Up @@ -1322,7 +1318,7 @@ func (fc *FilClient) CheckChainDeal(ctx context.Context, dealid abi.DealID) (boo
func (fc *FilClient) CheckOngoingTransfer(ctx context.Context, miner address.Address, st *ChannelState) (outerr error) {
defer func() {
// TODO: this is only here because for some reason restarting a data transfer can just panic
// https://github.com/filecoin-project/go-data-transfer/issues/150
// https://github.com/filecoin-project/go-data-transfer/v2/issues/150
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// https://github.com/filecoin-project/go-data-transfer/v2/issues/150
// https://github.com/filecoin-project/go-data-transfer/issues/150

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the https url is unchanged. yay go modules being weird.

if e := recover(); e != nil {
outerr = fmt.Errorf("panic while checking transfer: %s", e)
}
Expand Down Expand Up @@ -1611,78 +1607,81 @@ func (fc *FilClient) RetrieveContentFromPeerWithProgressCallback(
return
case datatransfer.NewVoucher:
case datatransfer.NewVoucherResult:
voucher := state.LastVoucherResult()
resType, err := retrievalmarket.DealResponseFromNode(voucher.Voucher)
if err != nil {
log.Errorf("unexpected voucher result received: %s", err.Error())
return
}
if len(resType.Message) != 0 {
log.Debugf("Received deal response voucher result %s (%v): %s\n\t%+v", resType.Status, resType.Status, resType.Message, resType)
} else {
log.Debugf("Received deal response voucher result %s (%v)\n\t%+v", resType.Status, resType.Status, resType)
}

switch resType := state.LastVoucherResult().(type) {
case *retrievalmarket.DealResponse:
if len(resType.Message) != 0 {
log.Debugf("Received deal response voucher result %s (%v): %s\n\t%+v", resType.Status, resType.Status, resType.Message, resType)
} else {
log.Debugf("Received deal response voucher result %s (%v)\n\t%+v", resType.Status, resType.Status, resType)
}
switch resType.Status {
case retrievalmarket.DealStatusAccepted:
log.Info("Deal accepted")

// publish deal accepted event
fc.retrievalEventPublisher.Publish(rep.NewRetrievalEventAccepted(rep.RetrievalPhase, rootCid, peerID, address.Undef))

// Respond with a payment voucher when funds are requested
case retrievalmarket.DealStatusFundsNeeded, retrievalmarket.DealStatusFundsNeededLastPayment:
if pchRequired {
log.Infof("Sending payment voucher (nonce: %v, amount: %v)", nonce, resType.PaymentOwed)

totalPayment = types.BigAdd(totalPayment, resType.PaymentOwed)

switch resType.Status {
case retrievalmarket.DealStatusAccepted:
log.Info("Deal accepted")

// publish deal accepted event
fc.retrievalEventPublisher.Publish(rep.NewRetrievalEventAccepted(rep.RetrievalPhase, rootCid, peerID, address.Undef))

// Respond with a payment voucher when funds are requested
case retrievalmarket.DealStatusFundsNeeded, retrievalmarket.DealStatusFundsNeededLastPayment:
if pchRequired {
log.Infof("Sending payment voucher (nonce: %v, amount: %v)", nonce, resType.PaymentOwed)

totalPayment = types.BigAdd(totalPayment, resType.PaymentOwed)

vres, err := fc.pchmgr.CreateVoucher(ctx, pchAddr, paych.SignedVoucher{
ChannelAddr: pchAddr,
Lane: pchLane,
Nonce: nonce,
Amount: totalPayment,
})
if err != nil {
finish(err)
return
}

if types.BigCmp(vres.Shortfall, big.NewInt(0)) > 0 {
finish(fmt.Errorf("not enough funds remaining in payment channel (shortfall = %s)", vres.Shortfall))
return
}

if err := fc.dataTransfer.SendVoucher(ctx, chanidCopy, &retrievalmarket.DealPayment{
ID: proposal.ID,
PaymentChannel: pchAddr,
PaymentVoucher: vres.Voucher,
}); err != nil {
finish(fmt.Errorf("failed to send payment voucher: %w", err))
return
}

nonce++
} else {
finish(fmt.Errorf("the miner requested payment even though this transaction was determined to be zero cost"))
vres, err := fc.pchmgr.CreateVoucher(ctx, pchAddr, paych.SignedVoucher{
ChannelAddr: pchAddr,
Lane: pchLane,
Nonce: nonce,
Amount: totalPayment,
})
if err != nil {
finish(err)
return
}
case retrievalmarket.DealStatusRejected:
finish(fmt.Errorf("deal rejected: %s", resType.Message))
return
case retrievalmarket.DealStatusFundsNeededUnseal, retrievalmarket.DealStatusUnsealing:
finish(fmt.Errorf("data is sealed"))
return
case retrievalmarket.DealStatusCancelled:
finish(fmt.Errorf("deal cancelled: %s", resType.Message))
return
case retrievalmarket.DealStatusErrored:
finish(fmt.Errorf("deal errored: %s", resType.Message))
return
case retrievalmarket.DealStatusCompleted:
if allBytesReceived {
finish(nil)

if types.BigCmp(vres.Shortfall, big.NewInt(0)) > 0 {
finish(fmt.Errorf("not enough funds remaining in payment channel (shortfall = %s)", vres.Shortfall))
return
}
dealComplete = true

vouch := retrievalmarket.BindnodeRegistry.TypeToNode(&retrievalmarket.DealPayment{
ID: proposal.ID,
PaymentChannel: pchAddr,
PaymentVoucher: vres.Voucher,
})
if err := fc.dataTransfer.SendVoucher(ctx, chanidCopy, datatransfer.TypedVoucher{Type: retrievalmarket.DealPaymentType, Voucher: vouch}); err != nil {
finish(fmt.Errorf("failed to send payment voucher: %w", err))
return
}

nonce++
} else {
finish(fmt.Errorf("the miner requested payment even though this transaction was determined to be zero cost"))
return
}
case retrievalmarket.DealStatusRejected:
finish(fmt.Errorf("deal rejected: %s", resType.Message))
return
case retrievalmarket.DealStatusFundsNeededUnseal, retrievalmarket.DealStatusUnsealing:
finish(fmt.Errorf("data is sealed"))
return
case retrievalmarket.DealStatusCancelled:
finish(fmt.Errorf("deal cancelled: %s", resType.Message))
return
case retrievalmarket.DealStatusErrored:
finish(fmt.Errorf("deal errored: %s", resType.Message))
return
case retrievalmarket.DealStatusCompleted:
if allBytesReceived {
finish(nil)
return
}
dealComplete = true
}
case datatransfer.PauseInitiator:
case datatransfer.ResumeInitiator:
Expand Down Expand Up @@ -1740,7 +1739,8 @@ func (fc *FilClient) RetrieveContentFromPeerWithProgressCallback(
defer unsubscribe()

// Submit the retrieval deal proposal to the miner
newchid, err := fc.dataTransfer.OpenPullDataChannel(ctx, peerID, proposal, proposal.PayloadCID, shared.AllSelector())
vouch := retrievalmarket.BindnodeRegistry.TypeToNode(proposal)
newchid, err := fc.dataTransfer.OpenPullDataChannel(ctx, peerID, datatransfer.TypedVoucher{Type: retrievalmarket.DealProposalType, Voucher: vouch}, proposal.PayloadCID, shared.AllSelector())
if err != nil {
// We could fail before a successful proposal
// publish event failure
Expand Down
4 changes: 2 additions & 2 deletions filclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
lotusactors "github.com/filecoin-project/lotus/chain/actors"
Expand All @@ -30,7 +30,7 @@ import (
chunk "github.com/ipfs/go-ipfs-chunker"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
Expand Down
Loading