Skip to content

Commit

Permalink
feat: replication factor for e2e deals (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alvin Reyes authored Apr 5, 2023
2 parents edb652e + 9b10562 commit ef6ea94
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 70 deletions.
163 changes: 143 additions & 20 deletions api/deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type DealRequest struct {
Size int64 `json:"size,omitempty"`
StartEpoch int64 `json:"start_epoch,omitempty"`
StartEpochInDays int64 `json:"start_epoch_in_days,omitempty"`
Replication int64 `json:"replication,omitempty"`
Replication int `json:"replication,omitempty"`
RemoveUnsealedCopy bool `json:"remove_unsealed_copy"`
SkipIPNIAnnounce bool `json:"skip_ipni_announce"`
AutoRetry bool `json:"auto_retry"`
Expand All @@ -61,20 +61,32 @@ type DealRequest struct {

// DealResponse Creating a new struct called DealResponse and then returning it.
type DealResponse struct {
Status string `json:"status"`
Message string `json:"message"`
ContentId int64 `json:"content_id,omitempty"`
DealRequest interface{} `json:"deal_request_meta,omitempty"`
DealProposalParameterRequest interface{} `json:"deal_proposal_parameter_request_meta,omitempty"`
Status string `json:"status"`
Message string `json:"message"`
ContentId int64 `json:"content_id,omitempty"`
DealRequest interface{} `json:"deal_request_meta,omitempty"`
DealProposalParameterRequest interface{} `json:"deal_proposal_parameter_request_meta,omitempty"`
ReplicatedContents []DealResponse `json:"replicated_contents,omitempty"`
}

// DealReplication bundles a source content row with its deal proposal
// parameters and the originating deal request, so the content can be
// cloned once per requested replica (see ReplicateContent).
type DealReplication struct {
	Content                      model.Content                       `json:"content"`
	ContentDealProposalParameter model.ContentDealProposalParameters `json:"deal_proposal_parameter"`
	DealRequest                  DealRequest                         `json:"deal_request"`
}

var statsService *core.StatsService

//var replicationService *core.ReplicationService

// ConfigureDealRouter It's a function that takes a pointer to an echo.Group and a pointer to a DeltaNode, and then it adds a bunch of routes
// to the echo.Group
// `ConfigureDealRouter` is a function that takes a `Group` and a `DeltaNode` and configures the `Group` to handle the
// `DeltaNode`'s deal-making functionality
func ConfigureDealRouter(e *echo.Group, node *core.DeltaNode) {

statsService := core.NewStatsStatsService(node)
statsService = core.NewStatsStatsService(node)
//replicationService = core.NewReplicationService(node)
dealMake := e.Group("/deal")

// upload limiter middleware
Expand Down Expand Up @@ -635,7 +647,7 @@ func handleExistingContentAdd(c echo.Context, node *core.DeltaNode) error {

func handleEndToEndDeal(c echo.Context, node *core.DeltaNode) error {
var dealRequest DealRequest
// lets record this.

authorizationString := c.Request().Header.Get("Authorization")
authParts := strings.Split(authorizationString, " ")
file, err := c.FormFile("data") // file
Expand Down Expand Up @@ -809,23 +821,58 @@ func handleEndToEndDeal(c echo.Context, node *core.DeltaNode) error {

// deal proposal parameters
tx.Create(&dealProposalParam)
if dealRequest.Replication == 0 {
var dispatchJobs core.IProcessor
if pieceCommp.ID != 0 {
dispatchJobs = jobs.NewStorageDealMakerProcessor(node, content, pieceCommp) // straight to storage deal making
} else {
dispatchJobs = jobs.NewPieceCommpProcessor(node, content) // straight to pieceCommp
}

node.Dispatcher.AddJobAndDispatch(dispatchJobs, 1)

err = c.JSON(200, DealResponse{
Status: "success",
Message: "Deal request received. Please take note of the content_id. You can use the content_id to check the status of the deal.",
ContentId: content.ID,
DealRequest: dealRequest,
DealProposalParameterRequest: dealProposalParam,
})

var dispatchJobs core.IProcessor
if pieceCommp.ID != 0 {
dispatchJobs = jobs.NewStorageDealMakerProcessor(node, content, pieceCommp) // straight to storage deal making
} else {
dealReplication := DealReplication{
Content: content,
ContentDealProposalParameter: dealProposalParam,
DealRequest: dealRequest,
}

// TODO: Improve this, this is a hack to make sure the replication is done before the deal is made
contents := ReplicateContent(dealReplication, dealRequest, tx)
var dispatchJobs core.IProcessor
for _, contentRep := range contents {
dispatchJobs = jobs.NewPieceCommpProcessor(node, contentRep.Content) // straight to pieceCommp
node.Dispatcher.AddJob(dispatchJobs)
}
dispatchJobs = jobs.NewPieceCommpProcessor(node, content) // straight to pieceCommp
}
node.Dispatcher.AddJob(dispatchJobs)

node.Dispatcher.AddJobAndDispatch(dispatchJobs, 1)
node.Dispatcher.Start(len(contents) + 1)
err = c.JSON(200, DealResponse{
Status: "success",
Message: "Deal request received. Please take note of the content_id. You can use the content_id to check the status of the deal.",
ContentId: content.ID,
DealRequest: dealRequest,
DealProposalParameterRequest: dealProposalParam,
ReplicatedContents: func() []DealResponse {
var dealResponses []DealResponse
for _, contentRep := range contents {
dealResponses = append(dealResponses, contentRep.DealResponse)
}
return dealResponses
}(),
})
}

err = c.JSON(200, DealResponse{
Status: "success",
Message: "Deal request received. Please take note of the content_id. You can use the content_id to check the status of the deal.",
ContentId: content.ID,
DealRequest: dealRequest,
DealProposalParameterRequest: dealProposalParam,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1324,10 +1371,18 @@ func ValidateMeta(dealRequest DealRequest) error {
// return errors.New("miner is required")
//}

if (DealRequest{} != dealRequest && dealRequest.Replication > 0 && dealRequest.ConnectionMode == utils.CONNECTION_MODE_IMPORT) {
return errors.New("replication factor is not supported for import mode")
}

if (DealRequest{} != dealRequest && dealRequest.DurationInDays > 0 && dealRequest.StartEpochInDays == 0) {
return errors.New("start_epoch_in_days is required when duration_in_days is set")
}

if (DealRequest{} != dealRequest && dealRequest.Replication > 6) {
return errors.New("replication factor can only be up to 6")
}

if (DealRequest{} != dealRequest && dealRequest.StartEpochInDays > 0 && dealRequest.DurationInDays == 0) {
return errors.New("duration_in_days is required when start_epoch_in_days is set")
}
Expand Down Expand Up @@ -1396,6 +1451,74 @@ func ValidateMeta(dealRequest DealRequest) error {
return nil
}

// ReplicatedContent pairs one cloned content row with the per-replica
// deal request (its Miner field reassigned to the replica's provider)
// and the DealResponse reported back to the API caller.
type ReplicatedContent struct {
	Content      model.Content
	DealRequest  DealRequest
	DealResponse DealResponse
}

// ReplicateContent clones contentSource dealRequest.Replication times.
// For each replica it inserts a fresh Content row, a fresh
// ContentDealProposalParameters row pointing at it, assigns a storage
// provider sized for the content, and records the assignment as a
// ContentMiner row — all on the caller-supplied transaction txn.
//
// On any database or miner-assignment error the error is printed and nil
// is returned. NOTE(review): rows created in earlier iterations are not
// rolled back here — the caller owns txn; confirm that is intended.
func ReplicateContent(contentSource DealReplication, dealRequest DealRequest, txn *gorm.DB) []ReplicatedContent {
	var replicas []ReplicatedContent

	for n := 0; n < dealRequest.Replication; n++ {
		// Clone the content row; zeroing the primary key makes the
		// ORM insert a new record instead of updating the source.
		contentCopy := contentSource.Content
		contentCopy.ID = 0
		if err := txn.Create(&contentCopy).Error; err != nil {
			fmt.Println(err)
			return nil
		}

		// Clone the proposal parameters and point them at the new row.
		paramCopy := contentSource.ContentDealProposalParameter
		paramCopy.ID = 0
		paramCopy.Content = contentCopy.ID
		if err := txn.Create(&paramCopy).Error; err != nil {
			//tx.Rollback()
			fmt.Println(err)
			return nil
		}

		// Pick a storage provider that can take this content's size.
		provider, errOnPv := core.NewMinerAssignmentService().GetSPWithGivenBytes(contentCopy.Size)
		if errOnPv != nil {
			fmt.Println(errOnPv)
			return nil
		}

		// Persist the miner assignment for the replica.
		assignment := model.ContentMiner{
			Miner:     provider.Address,
			Content:   contentCopy.ID,
			CreatedAt: time.Now(),
			UpdatedAt: time.Now(),
		}
		if err := txn.Create(&assignment).Error; err != nil {
			//tx.Rollback()
			fmt.Println(err)
			return nil
		}

		// dealRequest is a value copy, so mutating Miner here only
		// affects this function's record of the per-replica request.
		dealRequest.Miner = provider.Address

		replicas = append(replicas, ReplicatedContent{
			Content:     contentCopy,
			DealRequest: dealRequest,
			DealResponse: DealResponse{
				Status:                       utils.CONTENT_PINNED,
				Message:                      "Content replication request successful",
				ContentId:                    contentCopy.ID,
				DealRequest:                  dealRequest,
				DealProposalParameterRequest: paramCopy,
			},
		})
	}

	return replicas
}

// It takes a request, and returns a response
func handlePrepareContent(c echo.Context, node *core.DeltaNode, statsService core.StatsService) {
// > This function is called when a node receives a `PrepareCommitmentPiece` message
Expand Down
76 changes: 76 additions & 0 deletions core/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package core

// NOTE: The ReplicationService below is intentionally commented out —
// replication currently happens in api/deal.go (ReplicateContent).
// Remove this file once that placement is final.
//
//import (
// "fmt"
// model "github.com/application-research/delta-db/db_models"
// "gorm.io/gorm"
// "time"
//)
//
//type ReplicationService struct {
// // `NewReplicationService` is a function that returns a `ReplicationService` struct
// LightNode *DeltaNode
//}
//
//// NewReplicationService Creating a new `ReplicationService` struct.
//func NewReplicationService(ln *DeltaNode) *ReplicationService {
// return &ReplicationService{
// LightNode: ln,
// }
//}
//
//type DealReplication struct {
// Content model.Content `json:"content"`
// ContentDealProposalParameter model.ContentDealProposalParameters `json:"deal_proposal_parameter"`
//}
//
//func (r ReplicationService) ReplicateContent(contentSource DealReplication, numberOfReplication int, txn *gorm.DB) []model.Content {
// var replicatedContents []model.Content
// for i := 0; i < numberOfReplication; i++ {
// var newContent model.Content
// var newContentDealProposalParameter model.ContentDealProposalParameters
// newContent = contentSource.Content
// newContentDealProposalParameter = contentSource.ContentDealProposalParameter
// newContent.ID = 0
//
// err := txn.Create(&newContent).Error
// if err != nil {
// fmt.Println(err)
// return nil
// }
//
// newContentDealProposalParameter.ID = 0
// newContentDealProposalParameter.Content = newContent.ID
// err = txn.Create(&newContentDealProposalParameter).Error
// if err != nil {
// //tx.Rollback()
// fmt.Println(err)
// return nil
// }
// // assign a miner
// minerAssignService := NewMinerAssignmentService()
// provider, errOnPv := minerAssignService.GetSPWithGivenBytes(newContent.Size)
// if errOnPv != nil {
// fmt.Println(errOnPv)
// return nil
// }
//
// contentMinerAssignment := model.ContentMiner{
// Miner: provider.Address,
// Content: newContent.ID,
// CreatedAt: time.Now(),
// UpdatedAt: time.Now(),
// }
// err = txn.Create(&contentMinerAssignment).Error
// if err != nil {
// //tx.Rollback()
// fmt.Println(err)
// return nil
// }
//
// replicatedContents = append(replicatedContents, newContent)
//
// }
// return replicatedContents
//}
49 changes: 35 additions & 14 deletions jobs/piece_commp_compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,38 @@ func NewPieceCommpProcessor(ln *core.DeltaNode, content model.Content) IProcesso

// Run The process of generating the commp.
func (i PieceCommpProcessor) Run() error {
i.LightNode.DB.Model(&i.Content).Where("id = ?", i.Content.ID).Updates(model.Content{
Status: utils.CONTENT_PIECE_COMPUTING,
UpdatedAt: time.Now(),
})

// if you already have the piece entry for the CID, let's just create a new record with the same commp

var content model.Content
i.LightNode.DB.Model(&i.Content).Where("id = ?", i.Content.ID).Find(&content)

var existingCommp model.PieceCommitment
i.LightNode.DB.Model(&model.PieceCommitment{}).Where("cid = ?", i.Content.Cid).Find(&existingCommp)
if existingCommp.ID != 0 {

// just assign it if it's already there.
i.Content.Status = utils.CONTENT_PIECE_ASSIGNED
i.Content.PieceCommitmentId = existingCommp.ID
i.Content.UpdatedAt = time.Now()
i.LightNode.DB.Save(&i.Content)

// then launch the deal maker with the content and the existing commp
item := NewStorageDealMakerProcessor(i.LightNode, i.Content, existingCommp)
i.LightNode.Dispatcher.AddJobAndDispatch(item, 1)
return nil
}

content.Status = utils.CONTENT_PIECE_COMPUTING
content.UpdatedAt = time.Now()
i.LightNode.DB.Save(&content)

payloadCid, err := cid.Decode(i.Content.Cid)
if err != nil {
i.LightNode.DB.Model(&i.Content).Where("id = ?", i.Content.ID).Updates(model.Content{
Status: utils.CONTENT_PIECE_COMPUTING_FAILED,
LastMessage: err.Error(),
UpdatedAt: time.Now(),
})
content.Status = utils.CONTENT_PIECE_COMPUTING_FAILED
content.LastMessage = err.Error()
content.UpdatedAt = time.Now()
i.LightNode.DB.Save(&content)
}

// prepare the commp
Expand Down Expand Up @@ -135,11 +155,12 @@ func (i PieceCommpProcessor) Run() error {
}

i.LightNode.DB.Create(commpRec)
i.LightNode.DB.Model(&i.Content).Where("id = ?", i.Content.ID).Updates(model.Content{
Status: utils.CONTENT_PIECE_ASSIGNED,
PieceCommitmentId: commpRec.ID,
UpdatedAt: time.Now(),
})

// update the content record
i.Content.Status = utils.CONTENT_PIECE_ASSIGNED
i.Content.PieceCommitmentId = commpRec.ID
i.Content.UpdatedAt = time.Now()
i.LightNode.DB.Save(&i.Content)

// add this to the job queue
item := NewStorageDealMakerProcessor(i.LightNode, i.Content, *commpRec)
Expand Down
Loading

0 comments on commit ef6ea94

Please sign in to comment.