From c7cb7506161d6bac47c3eda6370b7995e3a1e02f Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 20 Jun 2022 17:23:34 +0800 Subject: [PATCH] refactor functions --- cmd/damon.go | 203 ++++++++++++++++++++++++++++----------------------- cmd/root.go | 6 +- 2 files changed, 111 insertions(+), 98 deletions(-) diff --git a/cmd/damon.go b/cmd/damon.go index 301b5fb..74457da 100644 --- a/cmd/damon.go +++ b/cmd/damon.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "os" "os/signal" "strings" @@ -35,6 +36,99 @@ const ( MsgTypeBuildResult // build result ) +func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, msg *Message) error { + switch msg.Type { + case MsgTypeRegister: + log.Info().Msgf("received registered success: %s", message) + return conn.WriteJSON(&Message{ + Version: 1, + Type: MsgTypeRequestBuild, + RunnerUUID: msg.RunnerUUID, + }) + case MsgTypeError: + log.Info().Msgf("received error msessage: %s", message) + return conn.WriteJSON(&Message{ + Version: 1, + Type: MsgTypeRequestBuild, + RunnerUUID: msg.RunnerUUID, + }) + case MsgTypeIdle: + log.Info().Msgf("received no task") + return conn.WriteJSON(&Message{ + Version: 1, + Type: MsgTypeRequestBuild, + RunnerUUID: msg.RunnerUUID, + }) + case MsgTypeRequestBuild: + switch msg.EventName { + case "push": + input := Input{ + forgeInstance: "github.com", + reuseContainers: true, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + defer cancel() + + done := make(chan error) + go func(chan error) { + done <- runTask(ctx, &input, "") + }(done) + + c := time.NewTicker(time.Second) + defer c.Stop() + + for { + select { + case <-sigs: + cancel() + log.Info().Msgf("cancel task") + return nil + case err := <-done: + if err != nil { + log.Error().Msgf("runTask failed: %v", err) + return conn.WriteJSON(&Message{ + Version: 1, + Type: MsgTypeBuildResult, + RunnerUUID: msg.RunnerUUID, + BuildUUID: msg.BuildUUID, + ErrCode: 1, + ErrContent: err.Error(), + }) + } + log.Error().Msgf("runTask success") + return conn.WriteJSON(&Message{ + Version: 1, + Type: MsgTypeBuildResult, + RunnerUUID: msg.RunnerUUID, + BuildUUID: msg.BuildUUID, + }) + case <-c.C: + } + } + default: + return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload) + } + default: + return fmt.Errorf("received a message with an unsupported type: %#v", msg) + } +} + +// TODO: handle the message +func handleMessage(conn *websocket.Conn, sigs chan os.Signal, message []byte) error { + var msg Message + if err := json.Unmarshal(message, &msg); err != nil { + return fmt.Errorf("unmarshal received message faild: %v", err) + } + + switch msg.Version { + case 1: + return handleVersion1(conn, sigs, message, &msg) + default: + return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner") + } +} + func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { log.Info().Msgf("Starting runner daemon") @@ -44,8 +138,15 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args ticker := time.NewTicker(time.Second) defer ticker.Stop() var failedCnt int + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + for { select { + case <-sigs: + log.Info().Msgf("cancel task") + return nil + case <-ctx.Done(): if conn != nil { err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) @@ -89,6 +190,13 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args const timeout = time.Second * 10 for { + select { + case <-sigs: + log.Info().Msgf("cancel task") + return nil + default: + } + conn.SetReadDeadline(time.Now().Add(timeout)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(timeout)); return nil }) @@ -110,101 +218,10 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args } failedCnt = 0 } - break + return nil } - // TODO: handle the message - var msg Message - if err = json.Unmarshal(message, &msg); err != nil { - log.Error().Msgf("unmarshal received message faild: %v", err) - continue - } - switch msg.Version { - case 1: - switch msg.Type { - case MsgTypeRegister: - log.Info().Msgf("received registered success: %s", message) - conn.WriteJSON(&Message{ - Version: 1, - Type: MsgTypeRequestBuild, - RunnerUUID: msg.RunnerUUID, - }) - case MsgTypeError: - log.Info().Msgf("received error msessage: %s", message) - conn.WriteJSON(&Message{ - Version: 1, - Type: MsgTypeRequestBuild, - RunnerUUID: msg.RunnerUUID, - }) - case MsgTypeIdle: - log.Info().Msgf("received no task") - conn.WriteJSON(&Message{ - Version: 1, - Type: MsgTypeRequestBuild, - RunnerUUID: msg.RunnerUUID, - }) - case MsgTypeRequestBuild: - switch msg.EventName { - case "push": - input := Input{ - forgeInstance: "github.com", - reuseContainers: true, - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Hour) - defer cancel() - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - done := make(chan error) - go func(chan error) { - done <- runTask(ctx, &input, "") - }(done) - - c := time.NewTicker(time.Second) - defer c.Stop() - - END: - for { - select { - case <-sigs: - cancel() - log.Info().Msgf("cancel task") - break END - case err := <-done: - if err != nil { - log.Error().Msgf("runTask failed: %v", err) - conn.WriteJSON(&Message{ - Version: 1, - Type: MsgTypeBuildResult, - RunnerUUID: msg.RunnerUUID, - BuildUUID: msg.BuildUUID, - ErrCode: 1, - ErrContent: err.Error(), - }) - } else { - log.Error().Msgf("runTask success") - conn.WriteJSON(&Message{ - Version: 1, - Type: MsgTypeBuildResult, - RunnerUUID: msg.RunnerUUID, - BuildUUID: msg.BuildUUID, - }) - } - - break END - case <-c.C: - } - } - - default: - log.Warn().Msgf("unknow event %s with payload %s", msg.EventName, msg.EventPayload) - } - - default: - log.Error().Msgf("received a message with an unsupported type: %#v", msg) - } - default: - log.Error().Msgf("recevied a message with an unsupported version, consider upgrade your runner") + if err := handleMessage(conn, sigs, message); err != nil { } } } diff --git a/cmd/root.go b/cmd/root.go index b4a8eb0..49acbd7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -7,7 +7,6 @@ import ( "path/filepath" "github.com/nektos/act/pkg/artifacts" - "github.com/nektos/act/pkg/common" "github.com/nektos/act/pkg/model" "github.com/nektos/act/pkg/runner" "github.com/rs/zerolog/log" @@ -193,10 +192,7 @@ func runTask(ctx context.Context, input *Input, jobID string) error { return fmt.Errorf("New config failed: %v", err) } - log := logrus.StandardLogger() - log.AddHook(&StepHook{}) - - ctx = common.WithLogger(ctx, log) + logrus.AddHook(&StepHook{}) cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)