From 7d55fd57c94d1862f91ff7e15b892ab5f4b0af88 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 14 Aug 2022 13:29:00 +0800 Subject: [PATCH] chore(piepline): add runtime package. Signed-off-by: Bo-Yi Wu --- client/client.go | 5 ++ client/http.go | 19 ++++ cmd/damon.go | 214 ++------------------------------------------- go.mod | 2 +- go.sum | 4 +- poller/poller.go | 26 +++++- runtime/runtime.go | 40 +++++++++ 7 files changed, 99 insertions(+), 211 deletions(-) create mode 100644 runtime/runtime.go diff --git a/client/client.go b/client/client.go index 15fd42e..2a299cc 100644 --- a/client/client.go +++ b/client/client.go @@ -2,10 +2,15 @@ package client import ( "context" + + v1 "gitea.com/gitea/proto/gen/proto/v1" ) // A Client manages communication with the runner. type Client interface { // Ping sends a ping message to the server to test connectivity. Ping(ctx context.Context, machine string) error + + // Request requests the next available build stage for execution. + Request(ctx context.Context) (*v1.Stage, error) } diff --git a/client/http.go b/client/http.go index de2bae1..c24ca7b 100644 --- a/client/http.go +++ b/client/http.go @@ -83,3 +83,22 @@ func (p *HTTPClient) Ping(ctx context.Context, machine string) error { _, err := client.Ping(ctx, req) return err } + +// Ping sends a ping message to the server to test connectivity. +func (p *HTTPClient) Request(ctx context.Context) (*v1.Stage, error) { + client := v1connect.NewRunnerServiceClient( + p.Client, + p.Endpoint, + p.opts..., + ) + req := connect.NewRequest(&v1.ConnectRequest{}) + + req.Header().Set("X-Gitea-Token", p.Secret) + + res, err := client.Connect(ctx, req) + if err != nil { + return nil, err + } + + return res.Msg.Stage, err +} diff --git a/cmd/damon.go b/cmd/damon.go index d00c61f..9971373 100644 --- a/cmd/damon.go +++ b/cmd/damon.go @@ -7,126 +7,14 @@ import ( "gitea.com/gitea/act_runner/client" "gitea.com/gitea/act_runner/engine" "gitea.com/gitea/act_runner/poller" - "golang.org/x/sync/errgroup" + "gitea.com/gitea/act_runner/runtime" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) -// type Message struct { -// Version int // -// Type int // message type, 1 register 2 error -// RunnerUUID string // runner uuid -// BuildUUID string // build uuid -// ErrCode int // error code -// ErrContent string // errors message -// EventName string -// EventPayload string -// JobID string // only run the special job, empty means run all the jobs -// } - -// const ( -// MsgTypeRegister = iota + 1 // register -// MsgTypeError // error -// MsgTypeRequestBuild // request build task -// MsgTypeIdle // no task -// MsgTypeBuildResult // build result -// ) - -// func handleVersion1(ctx context.Context, conn *websocket.Conn, message []byte, msg *Message) error { -// switch msg.Type { -// case MsgTypeRegister: -// log.Info().Msgf("received registered success: %s", message) -// return conn.WriteJSON(&Message{ -// Version: 1, -// Type: MsgTypeRequestBuild, -// RunnerUUID: msg.RunnerUUID, -// }) -// case MsgTypeError: -// log.Info().Msgf("received error msessage: %s", message) -// return conn.WriteJSON(&Message{ -// Version: 1, -// Type: MsgTypeRequestBuild, -// RunnerUUID: msg.RunnerUUID, -// }) -// case MsgTypeIdle: -// log.Info().Msgf("received no task") -// return conn.WriteJSON(&Message{ -// Version: 1, -// Type: MsgTypeRequestBuild, -// RunnerUUID: msg.RunnerUUID, -// }) -// case MsgTypeRequestBuild: -// switch msg.EventName { -// case "push": -// input := Input{ -// forgeInstance: "github.com", -// reuseContainers: true, -// } - -// ctx, cancel := context.WithTimeout(ctx, time.Hour) -// defer cancel() - -// done := make(chan error) -// go func(chan error) { -// done <- runTask(ctx, &input, "") -// }(done) - -// c := time.NewTicker(time.Second) -// defer c.Stop() - -// for { -// select { -// case <-ctx.Done(): -// cancel() -// log.Info().Msgf("cancel task") -// return nil -// case err := <-done: -// if err != nil { -// log.Error().Msgf("runTask failed: %v", err) -// return conn.WriteJSON(&Message{ -// Version: 1, -// Type: MsgTypeBuildResult, -// RunnerUUID: msg.RunnerUUID, -// BuildUUID: msg.BuildUUID, -// ErrCode: 1, -// ErrContent: err.Error(), -// }) -// } -// log.Error().Msgf("runTask success") -// return conn.WriteJSON(&Message{ -// Version: 1, -// Type: MsgTypeBuildResult, -// RunnerUUID: msg.RunnerUUID, -// BuildUUID: msg.BuildUUID, -// }) -// case <-c.C: -// } -// } -// default: -// return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload) -// } -// default: -// return fmt.Errorf("received a message with an unsupported type: %#v", msg) -// } -// } - -// // TODO: handle the message -// func handleMessage(ctx context.Context, conn *websocket.Conn, message []byte) error { -// var msg Message -// if err := json.Unmarshal(message, &msg); err != nil { -// return fmt.Errorf("unmarshal received message faild: %v", err) -// } - -// switch msg.Version { -// case 1: -// return handleVersion1(ctx, conn, message, &msg) -// default: -// return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner") -// } -// } - func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { log.Infoln("Starting runner daemon") @@ -202,7 +90,13 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args var g errgroup.Group - poller := poller.New(cli) + runner := &runtime.Runner{ + Client: cli, + Machine: cfg.Runner.Name, + Environ: cfg.Runner.Environ, + } + + poller := poller.New(cli, runner.Run) g.Go(func() error { log.WithField("capacity", cfg.Runner.Capacity). @@ -221,95 +115,5 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args Errorln("shutting down the server") } return err - // var conn *websocket.Conn - // var err error - // ticker := time.NewTicker(time.Second) - // defer ticker.Stop() - // var failedCnt int - // for { - // select { - // case <-ctx.Done(): - // log.Info().Msgf("cancel task") - // if conn != nil { - // err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - // if err != nil { - // log.Error().Msgf("write close: %v", err) - // } - // } - // if errors.Is(ctx.Err(), context.Canceled) { - // return nil - // } - // return ctx.Err() - // case <-ticker.C: - // if conn == nil { - // log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions") - // conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil) - // if err != nil { - // log.Error().Msgf("dial: %v", err) - // break - // } - - // // register the client - // msg := Message{ - // Version: 1, - // Type: MsgTypeRegister, - // RunnerUUID: "111111", - // } - // bs, err := json.Marshal(&msg) - // if err != nil { - // log.Error().Msgf("Marshal: %v", err) - // break - // } - - // if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil { - // log.Error().Msgf("register failed: %v", err) - // conn.Close() - // conn = nil - // break - // } - // } - - // const timeout = time.Second * 10 - - // for { - // select { - // case <-ctx.Done(): - // log.Info().Msg("cancel task") - // return nil - // default: - // } - - // _ = conn.SetReadDeadline(time.Now().Add(timeout)) - // conn.SetPongHandler(func(string) error { - // return conn.SetReadDeadline(time.Now().Add(timeout)) - // }) - - // _, message, err := conn.ReadMessage() - // if err != nil { - // if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || - // websocket.IsCloseError(err, websocket.CloseNormalClosure) { - // log.Trace().Msgf("closed from remote") - // conn.Close() - // conn = nil - // } else if !strings.Contains(err.Error(), "i/o timeout") { - // log.Error().Msgf("read message failed: %#v", err) - // } - // failedCnt++ - // if failedCnt > 60 { - // if conn != nil { - // conn.Close() - // conn = nil - // } - // failedCnt = 0 - // } - // break - // } - - // if err := handleMessage(ctx, conn, message); err != nil { - // log.Error().Msgf(err.Error()) - // } - // } - // } - // } } } diff --git a/go.mod b/go.mod index 15ae640..dc59c3a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner go 1.18 require ( - gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f + gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693 github.com/bufbuild/connect-go v0.3.0 github.com/docker/docker v20.10.17+incompatible github.com/joho/godotenv v1.4.0 diff --git a/go.sum b/go.sum index 430ea39..44b44fe 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee h1:T0wftx4RaYqbTH4t0A7bXGXxemZloCrjReA7xJvIVdY= gitea.com/gitea/act v0.0.0-20220812010840-c467b06265ee/go.mod h1:G37Vfz4J6kJ5NbcPI5xQUkeWPVkUCP5J+MFkaWU9jNY= -gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f h1:o1fHWLbdhicM5Q6EXvpBLrpu/fVqlobYw1sU1YKSreM= -gitea.com/gitea/proto v0.0.0-20220813120843-ce4b5dd68c1f/go.mod h1:LWD9G0VCMxaDY4I+J3vSqJF5OYNum33pQtRpx43516s= +gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693 h1:innZDNfMfvOBnvnfuFqCpk2XesFUKyaeFiEbNSsaxsA= +gitea.com/gitea/proto v0.0.0-20220814042910-32799131d693/go.mod h1:LWD9G0VCMxaDY4I+J3vSqJF5OYNum33pQtRpx43516s= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= diff --git a/poller/poller.go b/poller/poller.go index ae0b621..b816b43 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -2,22 +2,26 @@ package poller import ( "context" + "errors" "time" + v1 "gitea.com/gitea/proto/gen/proto/v1" "gitea.com/gitea/act_runner/client" log "github.com/sirupsen/logrus" ) -func New(cli client.Client) *Poller { +func New(cli client.Client, dispatch func(context.Context, *v1.Stage) error) *Poller { return &Poller{ Client: cli, + Dispatch: dispatch, routineGroup: newRoutineGroup(), } } type Poller struct { - Client client.Client + Client client.Client + Dispatch func(context.Context, *v1.Stage) error routineGroup *routineGroup } @@ -53,5 +57,21 @@ func (p *Poller) poll(ctx context.Context, thread int) error { // TODO: fetch the job from remote server time.Sleep(time.Second) - return nil + // request a new build stage for execution from the central + // build server. + stage, err := p.Client.Request(ctx) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Trace("poller: no stage returned") + return nil + } + if err != nil { + log.WithError(err).Error("poller: cannot request stage") + return err + } + + if stage == nil || stage.BuildUuid == "" { + return nil + } + + return p.Dispatch(ctx, stage) } diff --git a/runtime/runtime.go b/runtime/runtime.go new file mode 100644 index 0000000..a4f9cbb --- /dev/null +++ b/runtime/runtime.go @@ -0,0 +1,40 @@ +package runtime + +import ( + "context" + + v1 "gitea.com/gitea/proto/gen/proto/v1" + "gitea.com/gitea/act_runner/client" + + "github.com/sirupsen/logrus" +) + +// Runner runs the pipeline. +type Runner struct { + Machine string + Environ map[string]string + Client client.Client +} + +// Run runs the pipeline stage. +func (s *Runner) Run(ctx context.Context, stage *v1.Stage) error { + l := logrus. + WithField("stage.build_uuid", stage.BuildUuid). + WithField("stage.runner_uuid", stage.RunnerUuid) + + l.Info("stage received") + // TODO: Update stage structure + + return s.run(ctx, stage) +} + +func (s *Runner) run(ctx context.Context, stage *v1.Stage) error { + l := logrus. + WithField("stage.build_uuid", stage.BuildUuid). + WithField("stage.runner_uuid", stage.RunnerUuid) + + l.Info("start running pipeline") + // TODO: docker runner with stage data + + return nil +}