diff --git a/client/client.go b/client/client.go index 6716c9e..b314e93 100644 --- a/client/client.go +++ b/client/client.go @@ -25,29 +25,12 @@ type Client interface { // Request requests the next available build stage for execution. Request(ctx context.Context, args *runnerv1.RequestRequest) (*runnerv1.Stage, error) + // Detail fetches build details + Detail(ctx context.Context, args *runnerv1.DetailRequest) (*runnerv1.DetailResponse, error) + // Update updates the build stage. Update(ctxt context.Context, args *runnerv1.UpdateRequest) error // UpdateStep updates the build step. UpdateStep(ctx context.Context, args *runnerv1.UpdateStepRequest) error } - -type contextKey string - -const clientContextKey = contextKey("gitea.rpc.client") - -// FromContext returns the client from the context. -func FromContext(ctx context.Context) Client { - val := ctx.Value(clientContextKey) - if val != nil { - if c, ok := val.(Client); ok { - return c - } - } - return nil -} - -// WithClient returns a new context with the given client. -func WithClient(ctx context.Context, c Client) context.Context { - return context.WithValue(ctx, clientContextKey, c) -} diff --git a/client/http.go b/client/http.go index 0bc1bd2..da88b9d 100644 --- a/client/http.go +++ b/client/http.go @@ -153,3 +153,22 @@ func (p *HTTPClient) UpdateStep(ctx context.Context, arg *runnerv1.UpdateStepReq return err } + +// Detail fetches build details +func (p *HTTPClient) Detail(ctx context.Context, arg *runnerv1.DetailRequest) (*runnerv1.DetailResponse, error) { + client := runnerv1connect.NewRunnerServiceClient( + p.Client, + p.Endpoint, + p.opts..., + ) + req := connect.NewRequest(arg) + req.Header().Set("X-Runner-Token", p.Secret) + + resp, err := client.Detail(ctx, req) + + return &runnerv1.DetailResponse{ + Repo: resp.Msg.Repo, + Build: resp.Msg.Build, + Stage: resp.Msg.Stage, + }, err +} diff --git a/cmd/root.go b/cmd/root.go index d56c5d8..4d3579e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -31,7 +31,7 @@ func initLogging(cfg Config) { } func Execute(ctx context.Context) { - task := runtime.NewTask(0) + task := runtime.NewTask(0, nil) // ./act_runner rootCmd := &cobra.Command{ diff --git a/go.mod b/go.mod index e0d75f5..5002e23 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner go 1.18 require ( - gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134 + gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e github.com/bufbuild/connect-go v0.3.0 github.com/docker/docker v20.10.17+incompatible github.com/joho/godotenv v1.4.0 diff --git a/go.sum b/go.sum index 7f7ec6f..7190a45 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY= gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY= -gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134 h1:5ofH0FGEkIj/P9a6oFDgkdmGSWow1yD1uubiftMA2Kw= -gitea.com/gitea/proto-go v0.0.0-20220901135226-82a982903134/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e h1:xlNITjAs+Ce6QR4mo0BvHTzTdkpqgS7zwfLpEi31mIM= +gitea.com/gitea/proto-go v0.0.0-20220903092234-20f71c2df67e/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= diff --git a/poller/poller.go b/poller/poller.go index 15f5b2d..90e1add 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -94,12 +94,5 @@ func (p *Poller) poll(ctx context.Context, thread int) error { return nil } - // FIXME: for testing task - // stage.Id = 111 - // stage.BuildId = 1222 - - // set client to context, so that the stage can use it to - ctx = client.WithClient(ctx, p.Client) - return p.Dispatch(ctx, stage) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 1f463af..41f620c 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -2,11 +2,12 @@ package runtime import ( "context" + "fmt" "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" ) // Defines the Resource Kind and Type. @@ -24,12 +25,45 @@ type Runner struct { // Run runs the pipeline stage. func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error { - l := logrus. - WithField("runner.ID", stage.Id). - WithField("runner.BuildID", stage.BuildId) + l := log. + WithField("stage.id", stage.Id). + WithField("stage.name", stage.Name) l.Info("start running pipeline") - // TODO: use ctx to transfer usage information - return startTask(stage.BuildId, ctx) + // update machine in stage + stage.Machine = s.Machine + data, err := s.Client.Detail(ctx, &runnerv1.DetailRequest{ + Stage: stage, + }) + if err != nil { + l.Debug("stage accepted by another runner") + return nil + } + + l = log.WithField("repo.id", data.Repo.Id). + WithField("repo.name", data.Repo.Name). + WithField("build.id", data.Build.Id). + WithField("build.name", data.Build.Name) + + l.Info("stage details fetched") + + return s.run(ctx, data) +} + +func (s *Runner) run(ctx context.Context, data *runnerv1.DetailResponse) error { + _, exist := globalTaskMap.Load(data.Build.Id) + if exist { + return fmt.Errorf("task %d already exists", data.Build.Id) + } + + task := NewTask(data.Build.Id, s.Client) + + // set task ve to global map + // when task is done or canceled, it will be removed from the map + globalTaskMap.Store(data.Build.Id, task) + + go task.Run(ctx) + + return nil } diff --git a/runtime/task.go b/runtime/task.go index 5b4ef74..4a5a98d 100644 --- a/runtime/task.go +++ b/runtime/task.go @@ -9,14 +9,13 @@ import ( "time" "gitea.com/gitea/act_runner/client" + runnerv1 "gitea.com/gitea/proto-go/runner/v1" + "github.com/nektos/act/pkg/artifacts" "github.com/nektos/act/pkg/common" "github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/runner" - "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" - - runnerv1 "gitea.com/gitea/proto-go/runner/v1" ) type TaskInput struct { @@ -116,11 +115,11 @@ type Task struct { logHook *taskLogHook state TaskState client client.Client - log *logrus.Entry + log *log.Entry } // newTask creates a new task -func NewTask(buildID int64) *Task { +func NewTask(buildID int64, client client.Client) *Task { task := &Task{ Input: &TaskInput{ reuseContainers: true, @@ -129,8 +128,8 @@ func NewTask(buildID int64) *Task { BuildID: buildID, state: TaskStatePending, - client: nil, - log: logrus.WithField("buildID", buildID), + client: client, + log: log.WithField("buildID", buildID), logHook: &taskLogHook{}, } task.Input.repoDirectory, _ = os.Getwd() @@ -168,13 +167,12 @@ func (t *Task) reportFailure(ctx context.Context, err error) { if t.client == nil { // TODO: fill the step request stepRequest := &runnerv1.UpdateStepRequest{} - t.client.UpdateStep(ctx, stepRequest) + _ = t.client.UpdateStep(ctx, stepRequest) return } - } -func (t *Task) startReporting(interval int64, ctx context.Context) { +func (t *Task) startReporting(ctx context.Context, interval int64) { for { time.Sleep(time.Duration(interval) * time.Second) if t.state == TaskStateSuccess || t.state == TaskStateFailure { @@ -199,7 +197,7 @@ func (t *Task) reportStep(ctx context.Context) { // TODO: fill the step request stepRequest := &runnerv1.UpdateStepRequest{} - t.client.UpdateStep(ctx, stepRequest) + _ = t.client.UpdateStep(ctx, stepRequest) } // reportSuccess reports the success of the task @@ -215,18 +213,10 @@ func (t *Task) reportSuccess(ctx context.Context) { // TODO: fill the step request stepRequest := &runnerv1.UpdateStepRequest{} - t.client.UpdateStep(ctx, stepRequest) + _ = t.client.UpdateStep(ctx, stepRequest) } func (t *Task) Run(ctx context.Context) { - // get client for context, use for reporting - t.client = client.FromContext(ctx) - if t.client == nil { - t.log.Warnf("no client found in context") - } else { - t.log.Infof("client found in context") - } - workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory) if err != nil { t.reportFailure(ctx, err) @@ -324,7 +314,7 @@ func (t *Task) Run(ctx context.Context) { // add logger recorders ctx = common.WithLoggerHook(ctx, t.logHook) - go t.startReporting(1, ctx) + go t.startReporting(ctx, 1) if err := executor(ctx); err != nil { t.reportFailure(ctx, err) diff --git a/runtime/taskmap.go b/runtime/taskmap.go index 37369a1..ca79aa5 100644 --- a/runtime/taskmap.go +++ b/runtime/taskmap.go @@ -1,31 +1,11 @@ package runtime import ( - "context" - "fmt" "sync" ) var globalTaskMap sync.Map -// startTask adds the task to global map -func startTask(buildID int64, ctx context.Context) error { - _, exist := globalTaskMap.Load(buildID) - if exist { - return fmt.Errorf("task %d already exists", buildID) - } - - task := NewTask(buildID) - - // set task ve to global map - // when task is done or canceled, it will be removed from the map - globalTaskMap.Store(buildID, task) - - go task.Run(ctx) - - return nil -} - // finishTask removes the task from global map func finishTask(buildID int64) { globalTaskMap.Delete(buildID)