From 83ec0ba9094b18255c9614e31800d0034776ef2e Mon Sep 17 00:00:00 2001 From: Jason Song Date: Thu, 20 Apr 2023 23:27:46 +0800 Subject: [PATCH] Support upload `outputs` and use `needs` context (#133) See [Example usage of the needs context](https://docs.github.com/en/actions/learn-github-actions/contexts#example-usage-of-the-needs-context). Related to: - [actions-proto-def #5](https://gitea.com/gitea/actions-proto-def/pulls/5) - [gitea #24230](https://github.com/go-gitea/gitea/pull/24230) Reviewed-on: https://gitea.com/gitea/act_runner/pulls/133 Reviewed-by: Lunny Xiao Reviewed-by: Zettat123 Co-authored-by: Jason Song Co-committed-by: Jason Song --- internal/app/run/runner.go | 12 ++--- internal/app/run/workflow.go | 54 ++++++++++++++++++++ internal/app/run/workflow_test.go | 74 +++++++++++++++++++++++++++ internal/pkg/report/reporter.go | 84 ++++++++++++++++++++++++------- 4 files changed, 199 insertions(+), 25 deletions(-) create mode 100644 internal/app/run/workflow.go create mode 100644 internal/app/run/workflow_test.go diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 076418c..6a98de7 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -4,7 +4,6 @@ package run import ( - "bytes" "context" "encoding/json" "fmt" @@ -112,16 +111,11 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue()) - workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) + workflow, jobID, err := generateWorkflow(task) if err != nil { return err } - jobIDs := workflow.GetJobIDs() - if len(jobIDs) != 1 { - return fmt.Errorf("multiple jobs found: %v", jobIDs) - } - jobID := jobIDs[0] plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID) if err != nil { return err @@ -209,5 +203,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report. // add logger recorders ctx = common.WithLoggerHook(ctx, reporter) - return executor(ctx) + execErr := executor(ctx) + reporter.SetOutputs(job.Outputs) + return execErr } diff --git a/internal/app/run/workflow.go b/internal/app/run/workflow.go new file mode 100644 index 0000000..a6fbb71 --- /dev/null +++ b/internal/app/run/workflow.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "bytes" + "fmt" + "sort" + "strings" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "gopkg.in/yaml.v3" +) + +func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) { + workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) + if err != nil { + return nil, "", err + } + + jobIDs := workflow.GetJobIDs() + if len(jobIDs) != 1 { + return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs) + } + jobID := jobIDs[0] + + needJobIDs := make([]string, 0, len(task.Needs)) + for id, need := range task.Needs { + needJobIDs = append(needJobIDs, id) + needJob := &model.Job{ + Outputs: need.Outputs, + Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")), + } + workflow.Jobs[id] = needJob + } + sort.Strings(needJobIDs) + + rawNeeds := yaml.Node{ + Kind: yaml.SequenceNode, + Content: make([]*yaml.Node, 0, len(needJobIDs)), + } + for _, id := range needJobIDs { + rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{ + Kind: yaml.ScalarNode, + Value: id, + }) + } + + workflow.Jobs[jobID].RawNeeds = rawNeeds + + return workflow, jobID, nil +} diff --git a/internal/app/run/workflow_test.go b/internal/app/run/workflow_test.go new file mode 100644 index 0000000..c7598db --- /dev/null +++ b/internal/app/run/workflow_test.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "testing" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/model" + "github.com/stretchr/testify/require" + "gotest.tools/v3/assert" +) + +func Test_generateWorkflow(t *testing.T) { + type args struct { + task *runnerv1.Task + } + tests := []struct { + name string + args args + assert func(t *testing.T, wf *model.Workflow) + want1 string + wantErr bool + }{ + { + name: "has needs", + args: args{ + task: &runnerv1.Task{ + WorkflowPayload: []byte(` +name: Build and deploy +on: push + +jobs: + job9: + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: ./deploy --build ${{ needs.job1.outputs.output1 }} + - run: ./deploy --build ${{ needs.job2.outputs.output2 }} +`), + Needs: map[string]*runnerv1.TaskNeed{ + "job1": { + Outputs: map[string]string{ + "output1": "output1 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + "job2": { + Outputs: map[string]string{ + "output2": "output2 value", + }, + Result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + }, + }, + assert: func(t *testing.T, wf *model.Workflow) { + assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"}) + }, + want1: "job9", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, err := generateWorkflow(tt.args.task) + require.NoError(t, err) + tt.assert(t, got) + assert.Equal(t, got1, tt.want1) + }) + } +} diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 72b0e1d..3a1a75c 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -31,8 +31,10 @@ type Reporter struct { logOffset int logRows []*runnerv1.LogRow logReplacer *strings.Replacer - state *runnerv1.TaskState - stateM sync.RWMutex + + state *runnerv1.TaskState + stateMu sync.RWMutex + outputs sync.Map } func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter { @@ -56,8 +58,8 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C } func (r *Reporter) ResetSteps(l int) { - r.stateM.Lock() - defer r.stateM.Unlock() + r.stateMu.Lock() + defer r.stateMu.Unlock() for i := 0; i < l; i++ { r.state.Steps = append(r.state.Steps, &runnerv1.StepState{ Id: int64(i), @@ -70,8 +72,8 @@ func (r *Reporter) Levels() []log.Level { } func (r *Reporter) Fire(entry *log.Entry) error { - r.stateM.Lock() - defer r.stateM.Unlock() + r.stateMu.Lock() + defer r.stateMu.Unlock() log.WithFields(entry.Data).Trace(entry.Message) @@ -155,9 +157,13 @@ func (r *Reporter) RunDaemon() { } func (r *Reporter) Logf(format string, a ...interface{}) { - r.stateM.Lock() - defer r.stateM.Unlock() + r.stateMu.Lock() + defer r.stateMu.Unlock() + r.logf(format, a...) +} + +func (r *Reporter) logf(format string, a ...interface{}) { if !r.duringSteps() { r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), @@ -166,10 +172,30 @@ func (r *Reporter) Logf(format string, a ...interface{}) { } } +func (r *Reporter) SetOutputs(outputs map[string]string) { + r.stateMu.Lock() + defer r.stateMu.Unlock() + + for k, v := range outputs { + if len(k) > 255 { + r.logf("ignore output because the key is too long: %q", k) + continue + } + if l := len(v); l > 1024*1024 { + log.Println("ignore output because the value is too long:", k, l) + r.logf("ignore output because the value %q is too long: %d", k, l) + } + if _, ok := r.outputs.Load(k); ok { + continue + } + r.outputs.Store(k, v) + } +} + func (r *Reporter) Close(lastWords string) error { r.closed = true - r.stateM.Lock() + r.stateMu.Lock() if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { if lastWords == "" { lastWords = "Early termination" @@ -191,7 +217,7 @@ func (r *Reporter) Close(lastWords string) error { Content: lastWords, }) } - r.stateM.Unlock() + r.stateMu.Unlock() return retry.Do(func() error { if err := r.ReportLog(true); err != nil { @@ -205,9 +231,9 @@ func (r *Reporter) ReportLog(noMore bool) error { r.clientM.Lock() defer r.clientM.Unlock() - r.stateM.RLock() + r.stateMu.RLock() rows := r.logRows - r.stateM.RUnlock() + r.stateMu.RUnlock() resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ TaskId: r.state.Id, @@ -224,10 +250,10 @@ func (r *Reporter) ReportLog(noMore bool) error { return fmt.Errorf("submitted logs are lost") } - r.stateM.Lock() + r.stateMu.Lock() r.logRows = r.logRows[ack-r.logOffset:] r.logOffset = ack - r.stateM.Unlock() + r.stateMu.Unlock() if noMore && ack < r.logOffset+len(rows) { return fmt.Errorf("not all logs are submitted") @@ -240,21 +266,45 @@ func (r *Reporter) ReportState() error { r.clientM.Lock() defer r.clientM.Unlock() - r.stateM.RLock() + r.stateMu.RLock() state := proto.Clone(r.state).(*runnerv1.TaskState) - r.stateM.RUnlock() + r.stateMu.RUnlock() + + outputs := make(map[string]string) + r.outputs.Range(func(k, v interface{}) bool { + if val, ok := v.(string); ok { + outputs[k.(string)] = val + } + return true + }) resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ - State: state, + State: state, + Outputs: outputs, })) if err != nil { return err } + for _, k := range resp.Msg.SentOutputs { + r.outputs.Store(k, struct{}{}) + } + if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED { r.cancel() } + var noSent []string + r.outputs.Range(func(k, v interface{}) bool { + if _, ok := v.(string); ok { + noSent = append(noSent, k.(string)) + } + return true + }) + if len(noSent) > 0 { + return fmt.Errorf("there are still outputs that have not been sent: %v", noSent) + } + return nil }