From 449388f3ab1dae8944dd955288c77373036a96ac Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sun, 28 Aug 2022 14:05:56 +0800 Subject: [PATCH] chore(gRPC): register new runner Signed-off-by: Bo-Yi.Wu --- client/client.go | 12 ++++++++++-- client/http.go | 15 ++++++++------- cmd/daemon.go | 15 ++++++++++++--- go.mod | 2 +- go.sum | 4 ++++ poller/poller.go | 43 ++++++++++++++++++++----------------------- runtime/runtime.go | 25 +++++++++++++++---------- 7 files changed, 70 insertions(+), 46 deletions(-) diff --git a/client/client.go b/client/client.go index 27aaab9..fb1de0c 100644 --- a/client/client.go +++ b/client/client.go @@ -6,11 +6,19 @@ import ( runnerv1 "gitea.com/gitea/proto-go/runner/v1" ) +type Filter struct { + Kind string `json:"kind"` + Type string `json:"type"` + OS string `json:"os"` + Arch string `json:"arch"` + Capacity int `json:"capacity"` +} + // A Client manages communication with the runner. type Client interface { // Ping sends a ping message to the server to test connectivity. Ping(ctx context.Context, machine string) error - // Request requests the next available build stage for execution. - Request(ctx context.Context) (*runnerv1.Stage, error) + // Register for new runner. + Register(ctx context.Context, args *runnerv1.RegisterRequest) (*runnerv1.Runner, error) } diff --git a/client/http.go b/client/http.go index 4cf17d4..c202fbd 100644 --- a/client/http.go +++ b/client/http.go @@ -59,6 +59,8 @@ func New(endpoint, secret string, skipverify bool, opts ...Option) *HTTPClient { return client } +var _ Client = (*HTTPClient)(nil) + // An HTTPClient manages communication with the runner API. type HTTPClient struct { Client *http.Client @@ -80,27 +82,26 @@ func (p *HTTPClient) Ping(ctx context.Context, machine string) error { Data: machine, }) - req.Header().Set("X-Gitea-Token", p.Secret) + req.Header().Set("X-Runner-Token", p.Secret) _, err := client.Ping(ctx, req) return err } // Ping sends a ping message to the server to test connectivity. -func (p *HTTPClient) Request(ctx context.Context) (*runnerv1.Stage, error) { +func (p *HTTPClient) Register(ctx context.Context, arg *runnerv1.RegisterRequest) (*runnerv1.Runner, error) { client := runnerv1connect.NewRunnerServiceClient( p.Client, p.Endpoint, p.opts..., ) - req := connect.NewRequest(&runnerv1.ConnectRequest{}) + req := connect.NewRequest(arg) + req.Header().Set("X-Runner-Token", p.Secret) - req.Header().Set("X-Gitea-Token", p.Secret) - - res, err := client.Connect(ctx, req) + res, err := client.Register(ctx, req) if err != nil { return nil, err } - return res.Msg.Stage, err + return res.Msg.Runner, err } diff --git a/cmd/daemon.go b/cmd/daemon.go index 8c0fbfd..f7b6198 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -71,7 +71,17 @@ func runDaemon(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, Environ: cfg.Runner.Environ, } - poller := poller.New(cli, runner.Run) + poller := poller.New( + cli, + runner.Run, + &client.Filter{ + Kind: runtime.Kind, + Type: runtime.Type, + OS: cfg.Platform.OS, + Arch: cfg.Platform.Arch, + Capacity: cfg.Runner.Capacity, + }, + ) g.Go(func() error { log.WithField("capacity", cfg.Runner.Capacity). @@ -80,8 +90,7 @@ func runDaemon(ctx context.Context, task *runtime.Task) func(cmd *cobra.Command, WithField("arch", cfg.Platform.Arch). Infoln("polling the remote server") - poller.Poll(ctx, cfg.Runner.Capacity) - return nil + return poller.Poll(ctx, cfg.Runner.Capacity) }) err = g.Wait() diff --git a/go.mod b/go.mod index aa70af2..e767c3b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner go 1.18 require ( - gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41 + gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57 github.com/bufbuild/connect-go v0.3.0 github.com/docker/docker v20.10.17+incompatible github.com/joho/godotenv v1.4.0 diff --git a/go.sum b/go.sum index 4f624cc..bf949c3 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,10 @@ gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXG gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY= gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41 h1:FIGF6szYd3lBIwvbeedfU5Lc7uG1Xpzi7bkktS6Vdvg= gitea.com/gitea/proto-go v0.0.0-20220817054638-17fb0016dd41/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095 h1:Ng3GDJLYpsG3lYdaqDzeZFkRm5ShA2V+LWJSHRD0IQ0= +gitea.com/gitea/proto-go v0.0.0-20220828011358-d0a015a5b095/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57 h1:eVM6m3h5KpmJM2+LEqroENFaMs2kAo8QNIPyMgho9jg= +gitea.com/gitea/proto-go v0.0.0-20220828031749-616e40329b57/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= diff --git a/poller/poller.go b/poller/poller.go index 2b1f52b..e900f8d 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -2,7 +2,6 @@ package poller import ( "context" - "errors" "time" "gitea.com/gitea/act_runner/client" @@ -11,9 +10,10 @@ import ( log "github.com/sirupsen/logrus" ) -func New(cli client.Client, dispatch func(context.Context, *runnerv1.Stage) error) *Poller { +func New(cli client.Client, dispatch func(context.Context, *runnerv1.Runner) error, filter *client.Filter) *Poller { return &Poller{ Client: cli, + Filter: filter, Dispatch: dispatch, routineGroup: newRoutineGroup(), } @@ -21,12 +21,24 @@ func New(cli client.Client, dispatch func(context.Context, *runnerv1.Stage) erro type Poller struct { Client client.Client - Dispatch func(context.Context, *runnerv1.Stage) error + Filter *client.Filter + Dispatch func(context.Context, *runnerv1.Runner) error routineGroup *routineGroup } -func (p *Poller) Poll(ctx context.Context, n int) { +func (p *Poller) Poll(ctx context.Context, n int) error { + // register new runner. + runner, err := p.Client.Register(ctx, &runnerv1.RegisterRequest{ + Os: p.Filter.OS, + Arch: p.Filter.Arch, + Capacity: int64(p.Filter.Capacity), + }) + if err != nil { + log.WithError(err).Error("poller: cannot register new runner") + return err + } + for i := 0; i < n; i++ { func(i int) { p.routineGroup.Run(func() { @@ -40,7 +52,7 @@ func (p *Poller) Poll(ctx context.Context, n int) { log.Infof("stopping the runner: %d", i+1) return } - if err := p.poll(ctx, i+1); err != nil { + if err := p.poll(ctx, runner, i+1); err != nil { log.WithError(err).Error("poll error") } } @@ -49,29 +61,14 @@ func (p *Poller) Poll(ctx context.Context, n int) { }(i) } p.routineGroup.Wait() + return nil } -func (p *Poller) poll(ctx context.Context, thread int) error { +func (p *Poller) poll(ctx context.Context, runner *runnerv1.Runner, thread int) error { log.WithField("thread", thread).Info("poller: request stage from remote server") // TODO: fetch the job from remote server time.Sleep(time.Second) - // request a new build stage for execution from the central - // build server. - stage, err := p.Client.Request(ctx) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - log.WithError(err).Trace("poller: no stage returned") - return nil - } - if err != nil { - log.WithError(err).Error("poller: cannot request stage") - return err - } - - if stage == nil || stage.BuildUuid == "" { - return nil - } - - return p.Dispatch(ctx, stage) + return p.Dispatch(ctx, runner) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 9d0b5f7..bc3e783 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -9,6 +9,12 @@ import ( "github.com/sirupsen/logrus" ) +// Defines the Resource Kind and Type. +const ( + Kind = "pipeline" + Type = "docker" +) + // Runner runs the pipeline. type Runner struct { Machine string @@ -17,25 +23,24 @@ type Runner struct { } // Run runs the pipeline stage. -func (s *Runner) Run(ctx context.Context, stage *runnerv1.Stage) error { +func (s *Runner) Run(ctx context.Context, runner *runnerv1.Runner) error { l := logrus. - WithField("stage.build_uuid", stage.BuildUuid). - WithField("stage.runner_uuid", stage.RunnerUuid) + WithField("runner.UUID", runner.Uuid). + WithField("runner.token", runner.Token) - l.Info("stage received") - // TODO: Update stage structure + l.Info("request a new task") + // TODO: get new task - return s.run(ctx, stage) + return s.run(ctx, runner) } -func (s *Runner) run(ctx context.Context, stage *runnerv1.Stage) error { +func (s *Runner) run(ctx context.Context, runner *runnerv1.Runner) error { l := logrus. - WithField("stage.build_uuid", stage.BuildUuid). - WithField("stage.runner_uuid", stage.RunnerUuid) + WithField("runner.Uuid", runner.Uuid) l.Info("start running pipeline") // TODO: docker runner with stage data - // task.Run is blocking, so we need to use goroutine to run it in backgroud + // task.Run is blocking, so we need to use goroutine to run it in background // return task metadata and status to the server task := NewTask() return task.Run(ctx)