Skip to content

Commit

Permalink
fix(framework): add lock when creating pipeline (#7733) (#7736)
Browse files Browse the repository at this point in the history
Co-authored-by: Lynwee <[email protected]>
  • Loading branch information
github-actions[bot] and d4x1 authored Jul 15, 2024
1 parent d15f7b4 commit 5fa4200
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
8 changes: 4 additions & 4 deletions backend/server/services/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy *models.SyncPo
if err != nil {
return nil, err
}
return SequencializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil
return SequentializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil
}

// ParallelizePipelinePlans merges multiple pipelines into one unified plan
Expand All @@ -388,9 +388,9 @@ func ParallelizePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan
return merged
}

// SequencializePipelinePlans merges multiple pipelines into one unified plan
// by assuming they must be executed in sequencial order
func SequencializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan {
// SequentializePipelinePlans merges multiple pipelines into one unified plan
// by assuming they must be executed in sequential order
func SequentializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan {
merged := make(models.PipelinePlan, 0)
// iterate all pipelineTasks and try to merge them into `merged`
for _, plan := range plans {
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/blueprint_makeplan_v200.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func GeneratePlanJsonV200(
}
}
}
plan := SequencializePipelinePlans(
plan := SequentializePipelinePlans(
planForProjectMapping,
ParallelizePipelinePlans(sourcePlans...),
ParallelizePipelinePlans(metricPlans...),
Expand Down
7 changes: 6 additions & 1 deletion backend/server/services/pipeline_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package services

import (
"fmt"
"sync"
"time"

"github.com/apache/incubator-devlake/core/dal"
Expand All @@ -27,8 +28,12 @@ import (
"github.com/apache/incubator-devlake/helpers/dbhelper"
)

var createDbPipelineLock sync.Mutex

// CreateDbPipeline returns a NewPipeline
func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipeline, err errors.Error) {
createDbPipelineLock.Lock()
defer createDbPipelineLock.Unlock()
pipeline = &models.Pipeline{}
txHelper := dbhelper.NewTxHelper(basicRes, &err)
defer txHelper.End()
Expand All @@ -49,7 +54,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
dal.From(&models.Pipeline{}),
dal.Where("blueprint_id = ? AND status IN ?", newPipeline.BlueprintId, models.PendingTaskStatus),
))
// some pipeline is ruunning , get the detail and output them.
// some pipeline is running, get the detail and output them.
if count > 0 {
return nil, errors.BadInput.New("there are pending pipelines of current blueprint already")
}
Expand Down

0 comments on commit 5fa4200

Please sign in to comment.