package cmd import ( "context" "time" "gitea.com/gitea/act_runner/client" "gitea.com/gitea/act_runner/engine" "gitea.com/gitea/act_runner/poller" "golang.org/x/sync/errgroup" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) // type Message struct { // Version int // // Type int // message type, 1 register 2 error // RunnerUUID string // runner uuid // BuildUUID string // build uuid // ErrCode int // error code // ErrContent string // errors message // EventName string // EventPayload string // JobID string // only run the special job, empty means run all the jobs // } // const ( // MsgTypeRegister = iota + 1 // register // MsgTypeError // error // MsgTypeRequestBuild // request build task // MsgTypeIdle // no task // MsgTypeBuildResult // build result // ) // func handleVersion1(ctx context.Context, conn *websocket.Conn, message []byte, msg *Message) error { // switch msg.Type { // case MsgTypeRegister: // log.Info().Msgf("received registered success: %s", message) // return conn.WriteJSON(&Message{ // Version: 1, // Type: MsgTypeRequestBuild, // RunnerUUID: msg.RunnerUUID, // }) // case MsgTypeError: // log.Info().Msgf("received error msessage: %s", message) // return conn.WriteJSON(&Message{ // Version: 1, // Type: MsgTypeRequestBuild, // RunnerUUID: msg.RunnerUUID, // }) // case MsgTypeIdle: // log.Info().Msgf("received no task") // return conn.WriteJSON(&Message{ // Version: 1, // Type: MsgTypeRequestBuild, // RunnerUUID: msg.RunnerUUID, // }) // case MsgTypeRequestBuild: // switch msg.EventName { // case "push": // input := Input{ // forgeInstance: "github.com", // reuseContainers: true, // } // ctx, cancel := context.WithTimeout(ctx, time.Hour) // defer cancel() // done := make(chan error) // go func(chan error) { // done <- runTask(ctx, &input, "") // }(done) // c := time.NewTicker(time.Second) // defer c.Stop() // for { // select { // case <-ctx.Done(): // cancel() // log.Info().Msgf("cancel task") // return nil // case err := <-done: // if err != nil { // log.Error().Msgf("runTask failed: %v", err) // return conn.WriteJSON(&Message{ // Version: 1, // Type: MsgTypeBuildResult, // RunnerUUID: msg.RunnerUUID, // BuildUUID: msg.BuildUUID, // ErrCode: 1, // ErrContent: err.Error(), // }) // } // log.Error().Msgf("runTask success") // return conn.WriteJSON(&Message{ // Version: 1, // Type: MsgTypeBuildResult, // RunnerUUID: msg.RunnerUUID, // BuildUUID: msg.BuildUUID, // }) // case <-c.C: // } // } // default: // return fmt.Errorf("unknow event %s with payload %s", msg.EventName, msg.EventPayload) // } // default: // return fmt.Errorf("received a message with an unsupported type: %#v", msg) // } // } // // TODO: handle the message // func handleMessage(ctx context.Context, conn *websocket.Conn, message []byte) error { // var msg Message // if err := json.Unmarshal(message, &msg); err != nil { // return fmt.Errorf("unmarshal received message faild: %v", err) // } // switch msg.Version { // case 1: // return handleVersion1(ctx, conn, message, &msg) // default: // return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner") // } // } func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { log.Infoln("Starting runner daemon") _ = godotenv.Load(input.envFile) cfg, err := fromEnviron() if err != nil { log.WithError(err). Fatalln("invalid configuration") } initLogging(cfg) engine, err := engine.New() if err != nil { log.WithError(err). Fatalln("cannot load the docker engine") } count := 0 for { err := engine.Ping(ctx) if err == context.Canceled { break } select { case <-ctx.Done(): return ctx.Err() default: } if err != nil { log.WithError(err). Errorln("cannot ping the docker daemon") count++ if count == 5 { log.WithError(err). Fatalf("retry count reached: %d", count) } time.Sleep(time.Second) } else { log.Infoln("successfully pinged the docker daemon") break } } cli := client.New( cfg.Client.Address, cfg.Client.Secret, cfg.Client.SkipVerify, ) for { err := cli.Ping(ctx, cfg.Runner.Name) select { case <-ctx.Done(): return nil default: } if ctx.Err() != nil { break } if err != nil { log.WithError(err). Errorln("cannot ping the remote server") time.Sleep(time.Second) } else { log.Infoln("successfully pinged the remote server") break } } var g errgroup.Group poller := poller.New(cli) g.Go(func() error { log.WithField("capacity", cfg.Runner.Capacity). WithField("endpoint", cfg.Client.Address). WithField("os", cfg.Platform.OS). WithField("arch", cfg.Platform.Arch). Infoln("polling the remote server") poller.Poll(ctx, cfg.Runner.Capacity) return nil }) err = g.Wait() if err != nil { log.WithError(err). Errorln("shutting down the server") } return err // var conn *websocket.Conn // var err error // ticker := time.NewTicker(time.Second) // defer ticker.Stop() // var failedCnt int // for { // select { // case <-ctx.Done(): // log.Info().Msgf("cancel task") // if conn != nil { // err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) // if err != nil { // log.Error().Msgf("write close: %v", err) // } // } // if errors.Is(ctx.Err(), context.Canceled) { // return nil // } // return ctx.Err() // case <-ticker.C: // if conn == nil { // log.Trace().Msgf("trying connect %v", "ws://localhost:3000/api/actions") // conn, _, err = websocket.DefaultDialer.DialContext(ctx, "ws://localhost:3000/api/actions", nil) // if err != nil { // log.Error().Msgf("dial: %v", err) // break // } // // register the client // msg := Message{ // Version: 1, // Type: MsgTypeRegister, // RunnerUUID: "111111", // } // bs, err := json.Marshal(&msg) // if err != nil { // log.Error().Msgf("Marshal: %v", err) // break // } // if err = conn.WriteMessage(websocket.TextMessage, bs); err != nil { // log.Error().Msgf("register failed: %v", err) // conn.Close() // conn = nil // break // } // } // const timeout = time.Second * 10 // for { // select { // case <-ctx.Done(): // log.Info().Msg("cancel task") // return nil // default: // } // _ = conn.SetReadDeadline(time.Now().Add(timeout)) // conn.SetPongHandler(func(string) error { // return conn.SetReadDeadline(time.Now().Add(timeout)) // }) // _, message, err := conn.ReadMessage() // if err != nil { // if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || // websocket.IsCloseError(err, websocket.CloseNormalClosure) { // log.Trace().Msgf("closed from remote") // conn.Close() // conn = nil // } else if !strings.Contains(err.Error(), "i/o timeout") { // log.Error().Msgf("read message failed: %#v", err) // } // failedCnt++ // if failedCnt > 60 { // if conn != nil { // conn.Close() // conn = nil // } // failedCnt = 0 // } // break // } // if err := handleMessage(ctx, conn, message); err != nil { // log.Error().Msgf(err.Error()) // } // } // } // } } }