Skip to content

Commit fcf4517

Browse files
committed
ActionRun ConcurrencyGroup
1 parent ab75c3f commit fcf4517

File tree

9 files changed

+129
-19
lines changed

9 files changed

+129
-19
lines changed

models/actions/run.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type ActionRun struct {
4646
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4747
Status Status `xorm:"index"`
4848
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
49+
ConcurrencyGroup string
4950
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5051
Started timeutil.TimeStamp
5152
Stopped timeutil.TimeStamp
@@ -195,13 +196,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
195196
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
196197
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
197198
// Find all runs in the specified repository, reference, and workflow with non-final status
198-
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
199+
opts := &FindRunOptions{
199200
RepoID: repoID,
200201
Ref: ref,
201202
WorkflowID: workflowID,
202203
TriggerEvent: event,
203204
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
204-
})
205+
}
206+
return CancelPreviousJobsWithOpts(ctx, opts)
207+
}
208+
209+
// CancelPreviousJobs cancels all previous jobs with opts
210+
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
211+
// Find all runs by opts
212+
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
205213
if err != nil {
206214
return err
207215
}
@@ -262,7 +270,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
262270

263271
// InsertRun inserts a run
264272
// The title will be cut off at 255 characters if it's longer than 255 characters.
265-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
273+
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow, blockedByConcurrency bool) error {
266274
ctx, committer, err := db.TxContext(ctx)
267275
if err != nil {
268276
return err

models/actions/run_list.go

+21-9
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
6363

6464
type FindRunOptions struct {
6565
db.ListOptions
66-
RepoID int64
67-
OwnerID int64
68-
WorkflowID string
69-
Ref string // the commit/tag/… that caused this workflow
70-
TriggerUserID int64
71-
TriggerEvent webhook_module.HookEventType
72-
Approved bool // not util.OptionalBool, it works only when it's true
73-
Status []Status
66+
RepoID int64
67+
OwnerID int64
68+
WorkflowID string
69+
Ref string // the commit/tag/… that caused this workflow
70+
TriggerUserID int64
71+
TriggerEvent webhook_module.HookEventType
72+
Approved bool // not util.OptionalBool, it works only when it's true
73+
Status []Status
74+
SortType string
75+
ConcurrencyGroup string
7476
}
7577

7678
func (opts FindRunOptions) ToConds() builder.Cond {
@@ -99,11 +101,21 @@ func (opts FindRunOptions) ToConds() builder.Cond {
99101
if opts.TriggerEvent != "" {
100102
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
101103
}
104+
if len(opts.ConcurrencyGroup) > 0 {
105+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
106+
}
102107
return cond
103108
}
104109

105110
func (opts FindRunOptions) ToOrders() string {
106-
return "`id` DESC"
111+
switch opts.SortType {
112+
case "oldest":
113+
return "created_unix ASC"
114+
case "newest":
115+
return "created_unix DESC"
116+
default:
117+
return "`id` DESC"
118+
}
107119
}
108120

109121
type StatusInfo struct {

models/migrations/migrations.go

+1
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration {
369369
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
370370
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
371371
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
372+
// TODO: add AddConcurrencyGroupToActionRun
372373
}
373374
return preparedMigrations
374375
}

models/migrations/v1_23/v312.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package v1_23 //nolint
5+
6+
import (
7+
"xorm.io/xorm"
8+
)
9+
10+
func AddConcurrencyGroupToActionRun(x *xorm.Engine) error {
11+
type ActionRun struct {
12+
ConcurrencyGroup string
13+
}
14+
15+
return x.Sync(new(ActionRun))
16+
}

routers/web/repo/actions/view.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -893,7 +893,7 @@ func Run(ctx *context_module.Context) {
893893
}
894894

895895
// Insert the action run and its associated jobs into the database
896-
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
896+
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
897897
ctx.ServerError("workflow", err)
898898
return
899899
}

services/actions/concurrency.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package actions
5+
6+
import (
7+
"context"
8+
9+
actions_model "code.gitea.io/gitea/models/actions"
10+
)
11+
12+
func CancelActionRunByConcurrency(ctx context.Context, run *actions_model.ActionRun) error {
13+
return actions_model.CancelPreviousJobsWithOpts(ctx, &actions_model.FindRunOptions{
14+
ConcurrencyGroup: run.ConcurrencyGroup,
15+
Status: []actions_model.Status{
16+
actions_model.StatusRunning,
17+
actions_model.StatusWaiting,
18+
actions_model.StatusBlocked,
19+
},
20+
})
21+
}

services/actions/job_emitter.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,57 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
3737
ctx := graceful.GetManager().ShutdownContext()
3838
var ret []*jobUpdate
3939
for _, update := range items {
40-
if err := checkJobsOfRun(ctx, update.RunID); err != nil {
40+
if err := checkJobsByRunID(ctx, update.RunID); err != nil {
4141
ret = append(ret, update)
4242
}
4343
}
4444
return ret
4545
}
4646

47+
func checkJobsByRunID(ctx context.Context, runID int64) error {
48+
run, exist, err := db.GetByID[actions_model.ActionRun](ctx, runID)
49+
if err != nil {
50+
return fmt.Errorf("get action run: %w", err)
51+
}
52+
if !exist {
53+
return fmt.Errorf("action run %d does not exist", runID)
54+
}
55+
56+
return db.WithTx(ctx, func(ctx context.Context) error {
57+
// check jobs of the current run
58+
if err := checkJobsOfRun(ctx, runID); err != nil {
59+
return err
60+
}
61+
62+
// check jobs by the concurrency group of the run
63+
if len(run.ConcurrencyGroup) == 0 {
64+
return nil
65+
}
66+
concurrentActionRuns, err := db.Find[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
67+
ConcurrencyGroup: run.ConcurrencyGroup,
68+
Status: []actions_model.Status{
69+
actions_model.StatusRunning,
70+
actions_model.StatusWaiting,
71+
actions_model.StatusBlocked,
72+
},
73+
SortType: "oldest",
74+
})
75+
if err != nil {
76+
return fmt.Errorf("find action run with concurrency group %s: %w", run.ConcurrencyGroup, err)
77+
}
78+
for _, cRun := range concurrentActionRuns {
79+
if cRun.NeedApproval {
80+
continue
81+
}
82+
if err := checkJobsOfRun(ctx, cRun.ID); err != nil {
83+
return err
84+
}
85+
break // only run one blocked action run with the same concurrency group
86+
}
87+
return nil
88+
})
89+
}
90+
4791
func checkJobsOfRun(ctx context.Context, runID int64) error {
4892
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
4993
if err != nil {

services/actions/notifier_helper.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,18 @@ func handleWorkflows(
332332
continue
333333
}
334334

335+
// check workflow concurrency
335336
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
336-
concurrencyGroup, concurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars)
337-
_, _ = concurrencyGroup, concurrencyCancel
338-
// TODO: check concurrencyGroup and concurrencyCancel
337+
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars)
338+
if len(wfConcurrencyGroup) > 0 {
339+
run.ConcurrencyGroup = wfConcurrencyGroup
340+
if wfConcurrencyCancel {
341+
if err := CancelActionRunByConcurrency(ctx, run); err != nil {
342+
log.Error("CancelActionRunByConcurrency: %v", err)
343+
}
344+
}
345+
}
346+
339347
jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
340348
if err != nil {
341349
log.Error("jobparser.Parse: %v", err)
@@ -356,7 +364,7 @@ func handleWorkflows(
356364
}
357365
}
358366

359-
if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
367+
if err := actions_model.InsertRun(ctx, run, jobs, !wfConcurrencyCancel); err != nil {
360368
log.Error("InsertRun: %v", err)
361369
continue
362370
}

services/actions/schedule_tasks.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
145145
}
146146

147147
// Insert the action run and its associated jobs into the database
148-
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
148+
if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil {
149149
return err
150150
}
151151

0 commit comments

Comments
 (0)