Skip to content

Commit

Permalink
ActionRun ConcurrencyGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
Zettat123 committed Dec 10, 2024
1 parent 03f4471 commit 3551677
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 19 deletions.
14 changes: 11 additions & 3 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
ConcurrencyGroup string
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Expand Down Expand Up @@ -195,13 +196,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
// Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
opts := &FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
TriggerEvent: event,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
}
return CancelPreviousJobsWithOpts(ctx, opts)
}

// CancelPreviousJobs cancels all previous jobs with opts
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
// Find all runs by opts
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,7 +270,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow, blockedByConcurrency bool) error {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
Expand Down
30 changes: 21 additions & 9 deletions models/actions/run_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error {

type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
SortType string
ConcurrencyGroup string
}

func (opts FindRunOptions) ToConds() builder.Cond {
Expand Down Expand Up @@ -99,11 +101,21 @@ func (opts FindRunOptions) ToConds() builder.Cond {
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
}
if len(opts.ConcurrencyGroup) > 0 {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}

func (opts FindRunOptions) ToOrders() string {
return "`id` DESC"
switch opts.SortType {
case "oldest":
return "created_unix ASC"
case "newest":
return "created_unix DESC"
default:
return "`id` DESC"
}
}

type StatusInfo struct {
Expand Down
1 change: 1 addition & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration {
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
// TODO: add AddConcurrencyGroupToActionRun
}
return preparedMigrations
}
Expand Down
16 changes: 16 additions & 0 deletions models/migrations/v1_23/v312.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_23 //nolint

import (
"xorm.io/xorm"
)

func AddConcurrencyGroupToActionRun(x *xorm.Engine) error {
type ActionRun struct {
ConcurrencyGroup string
}

return x.Sync(new(ActionRun))
}
2 changes: 1 addition & 1 deletion routers/web/repo/actions/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func Run(ctx *context_module.Context) {
}

// Insert the action run and its associated jobs into the database
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
ctx.ServerError("workflow", err)
return
}
Expand Down
21 changes: 21 additions & 0 deletions services/actions/concurrency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"context"

actions_model "code.gitea.io/gitea/models/actions"
)

func CancelActionRunByConcurrency(ctx context.Context, run *actions_model.ActionRun) error {
return actions_model.CancelPreviousJobsWithOpts(ctx, &actions_model.FindRunOptions{
ConcurrencyGroup: run.ConcurrencyGroup,
Status: []actions_model.Status{
actions_model.StatusRunning,
actions_model.StatusWaiting,
actions_model.StatusBlocked,
},
})
}
46 changes: 45 additions & 1 deletion services/actions/job_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,57 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
ctx := graceful.GetManager().ShutdownContext()
var ret []*jobUpdate
for _, update := range items {
if err := checkJobsOfRun(ctx, update.RunID); err != nil {
if err := checkJobsByRunID(ctx, update.RunID); err != nil {
ret = append(ret, update)
}
}
return ret
}

func checkJobsByRunID(ctx context.Context, runID int64) error {
run, exist, err := db.GetByID[actions_model.ActionRun](ctx, runID)
if err != nil {
return fmt.Errorf("get action run: %w", err)
}
if !exist {
return fmt.Errorf("action run %d does not exist", runID)
}

return db.WithTx(ctx, func(ctx context.Context) error {
// check jobs of the current run
if err := checkJobsOfRun(ctx, runID); err != nil {
return err
}

// check jobs by the concurrency group of the run
if len(run.ConcurrencyGroup) == 0 {
return nil
}
concurrentActionRuns, err := db.Find[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
ConcurrencyGroup: run.ConcurrencyGroup,
Status: []actions_model.Status{
actions_model.StatusRunning,
actions_model.StatusWaiting,
actions_model.StatusBlocked,
},
SortType: "oldest",
})
if err != nil {
return fmt.Errorf("find action run with concurrency group %s: %w", run.ConcurrencyGroup, err)
}
for _, cRun := range concurrentActionRuns {
if cRun.NeedApproval {
continue
}
if err := checkJobsOfRun(ctx, cRun.ID); err != nil {
return err
}
break // only run one blocked action run with the same concurrency group
}
return nil
})
}

func checkJobsOfRun(ctx context.Context, runID int64) error {
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions services/actions/notifier_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,18 @@ func handleWorkflows(
continue
}

// check workflow concurrency
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
concurrencyGroup, concurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars)
_, _ = concurrencyGroup, concurrencyCancel
// TODO: check concurrencyGroup and concurrencyCancel
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars)
if len(wfConcurrencyGroup) > 0 {
run.ConcurrencyGroup = wfConcurrencyGroup
if wfConcurrencyCancel {
if err := CancelActionRunByConcurrency(ctx, run); err != nil {
log.Error("CancelActionRunByConcurrency: %v", err)
}
}
}

jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
if err != nil {
log.Error("jobparser.Parse: %v", err)
Expand All @@ -356,7 +364,7 @@ func handleWorkflows(
}
}

if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
if err := actions_model.InsertRun(ctx, run, jobs, !wfConcurrencyCancel); err != nil {
log.Error("InsertRun: %v", err)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion services/actions/schedule_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
}

// Insert the action run and its associated jobs into the database
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
return err
}

Expand Down

0 comments on commit 3551677

Please sign in to comment.