From 44781f9f5c4ede618660d8cfe42437f0e8dc22a0 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Tue, 25 Jul 2023 11:15:55 +0800 Subject: [PATCH] Implement auto-cancellation of concurrent jobs if the event is push (#25716) - cancel running jobs if the event is push - Add a new function `CancelRunningJobs` to cancel all running jobs of a run - Update `FindRunOptions` struct to include `Ref` field and update its condition in `toConds` function - Implement auto cancellation of running jobs in the same workflow in `notify` function related task: https://github.com/go-gitea/gitea/pull/22751/ --------- Signed-off-by: Bo-Yi Wu Signed-off-by: appleboy Co-authored-by: Jason Song Co-authored-by: delvh --- models/actions/run.go | 69 ++++++++++++++++++++++++++++- models/actions/run_list.go | 24 +++++----- models/migrations/migrations.go | 2 + models/migrations/v1_21/v268.go | 16 +++++++ routers/web/repo/actions/actions.go | 12 +++-- services/actions/notifier_helper.go | 25 ++++++++--- 6 files changed, 128 insertions(+), 20 deletions(-) create mode 100644 models/migrations/v1_21/v268.go diff --git a/models/actions/run.go b/models/actions/run.go index 5396c612f6e3..ab6e319b1cc8 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -34,7 +34,7 @@ type ActionRun struct { Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository TriggerUserID int64 `xorm:"index"` TriggerUser *user_model.User `xorm:"-"` - Ref string + Ref string `xorm:"index"` // the commit/tag/… that caused the run CommitSHA string IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow. NeedApproval bool // may need approval if it's a fork pull request @@ -164,6 +164,73 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err return err } +// CancelRunningJobs cancels all running and waiting jobs associated with a specific workflow. +func CancelRunningJobs(ctx context.Context, repoID int64, ref, workflowID string) error { + // Find all runs in the specified repository, reference, and workflow with statuses 'Running' or 'Waiting'. + runs, total, err := FindRuns(ctx, FindRunOptions{ + RepoID: repoID, + Ref: ref, + WorkflowID: workflowID, + Status: []Status{StatusRunning, StatusWaiting}, + }) + if err != nil { + return err + } + + // If there are no runs found, there's no need to proceed with cancellation, so return nil. + if total == 0 { + return nil + } + + // Iterate over each found run and cancel its associated jobs. + for _, run := range runs { + // Find all jobs associated with the current run. + jobs, _, err := FindRunJobs(ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return err + } + + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue + } + + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() + + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + return err + } + + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + return fmt.Errorf("job has changed, try again") + } + + // Continue with the next job. + continue + } + + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { + return err + } + } + } + + // Return nil to indicate successful cancellation of all running and waiting jobs. + return nil +} + // InsertRun inserts a run func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { ctx, commiter, err := db.TxContext(ctx) diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 29ab193d57f3..db36f6df981d 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -66,12 +66,13 @@ func (runs RunList) LoadRepos() error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowFileName string - TriggerUserID int64 - Approved bool // not util.OptionalBool, it works only when it's true - Status Status + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status } func (opts FindRunOptions) toConds() builder.Cond { @@ -82,8 +83,8 @@ func (opts FindRunOptions) toConds() builder.Cond { if opts.OwnerID > 0 { cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) } - if opts.WorkflowFileName != "" { - cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowFileName}) + if opts.WorkflowID != "" { + cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowID}) } if opts.TriggerUserID > 0 { cond = cond.And(builder.Eq{"trigger_user_id": opts.TriggerUserID}) @@ -91,8 +92,11 @@ func (opts FindRunOptions) toConds() builder.Cond { if opts.Approved { cond = cond.And(builder.Gt{"approved_by": 0}) } - if opts.Status > StatusUnknown { - cond = cond.And(builder.Eq{"status": opts.Status}) + if len(opts.Status) > 0 { + cond = cond.And(builder.In("status", opts.Status)) + } + if opts.Ref != "" { + cond = cond.And(builder.Eq{"ref": opts.Ref}) } return cond } diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index bfe4b56cd1ce..fc11d54071fa 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -517,6 +517,8 @@ var migrations = []Migration{ NewMigration("Reduce commit status", v1_21.ReduceCommitStatus), // v267 -> v268 NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable), + // v268 -> v269 + NewMigration("Update Action Ref", v1_21.UpdateActionsRefIndex), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v1_21/v268.go b/models/migrations/v1_21/v268.go new file mode 100644 index 000000000000..332793ff073b --- /dev/null +++ b/models/migrations/v1_21/v268.go @@ -0,0 +1,16 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_21 //nolint + +import ( + "xorm.io/xorm" +) + +// UpdateActionsRefIndex updates the index of actions ref field +func UpdateActionsRefIndex(x *xorm.Engine) error { + type ActionRun struct { + Ref string `xorm:"index"` // the commit/tag/… causing the run + } + return x.Sync(new(ActionRun)) +} diff --git a/routers/web/repo/actions/actions.go b/routers/web/repo/actions/actions.go index d215201bcdc2..5a12f52dcd14 100644 --- a/routers/web/repo/actions/actions.go +++ b/routers/web/repo/actions/actions.go @@ -150,10 +150,14 @@ func List(ctx *context.Context) { Page: page, PageSize: convert.ToCorrectPageSize(ctx.FormInt("limit")), }, - RepoID: ctx.Repo.Repository.ID, - WorkflowFileName: workflow, - TriggerUserID: actorID, - Status: actions_model.Status(status), + RepoID: ctx.Repo.Repository.ID, + WorkflowID: workflow, + TriggerUserID: actorID, + } + + // if status is not StatusUnknown, it means user has selected a status filter + if actions_model.Status(status) != actions_model.StatusUnknown { + opts.Status = []actions_model.Status{actions_model.Status(status)} } runs, total, err := actions_model.FindRuns(ctx, opts) diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 90ad3001bad2..764d24a7dbc8 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -230,16 +230,31 @@ func notify(ctx context.Context, input *notifyInput) error { log.Error("jobparser.Parse: %v", err) continue } + + // cancel running jobs if the event is push + if run.Event == webhook_module.HookEventPush { + // cancel running jobs of the same workflow + if err := actions_model.CancelRunningJobs( + ctx, + run.RepoID, + run.Ref, + run.WorkflowID, + ); err != nil { + log.Error("CancelRunningJobs: %v", err) + } + } + if err := actions_model.InsertRun(ctx, run, jobs); err != nil { log.Error("InsertRun: %v", err) continue } - if jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}); err != nil { - log.Error("FindRunJobs: %v", err) - } else { - CreateCommitStatus(ctx, jobs...) - } + alljobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + log.Error("FindRunJobs: %v", err) + continue + } + CreateCommitStatus(ctx, alljobs...) } return nil }