From abdb547b1b5f5f3e5addc3c9c07b8eb0d9d1fc22 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Fri, 11 Nov 2022 15:00:38 +0800 Subject: [PATCH] chore(poller): add metric to track the worker number Add metric to track multiple task. --- poller/metric.go | 33 +++++++++++++++++++++++++++++++++ poller/poller.go | 35 +++++++++++++++++++++++++++++++++++ runtime/runtime.go | 32 -------------------------------- 3 files changed, 68 insertions(+), 32 deletions(-) create mode 100644 poller/metric.go diff --git a/poller/metric.go b/poller/metric.go new file mode 100644 index 0000000..16343aa --- /dev/null +++ b/poller/metric.go @@ -0,0 +1,33 @@ +package poller + +import "sync/atomic" + +// Metric interface +type Metric interface { + IncBusyWorker() uint64 + DecBusyWorker() uint64 + BusyWorkers() uint64 +} + +var _ Metric = (*metric)(nil) + +type metric struct { + busyWorkers uint64 +} + +// NewMetric for default metric structure +func NewMetric() Metric { + return &metric{} +} + +func (m *metric) IncBusyWorker() uint64 { + return atomic.AddUint64(&m.busyWorkers, 1) +} + +func (m *metric) DecBusyWorker() uint64 { + return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) +} + +func (m *metric) BusyWorkers() uint64 { + return atomic.LoadUint64(&m.busyWorkers) +} diff --git a/poller/poller.go b/poller/poller.go index b719418..0ba3684 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -24,6 +24,7 @@ func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error Client: cli, Dispatch: dispatch, routineGroup: newRoutineGroup(), + metric: &metric{}, } } @@ -33,6 +34,7 @@ type Poller struct { Dispatch func(context.Context, *runnerv1.Task) error routineGroup *routineGroup + metric *metric errorRetryCounter int } @@ -111,5 +113,38 @@ func (p *Poller) poll(ctx context.Context, thread int) error { runCtx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel() + // update runner status + // running: idle -> active + // stopped: active -> idle + if val := p.metric.IncBusyWorker(); val == 1 { + if _, err := p.Client.UpdateRunner( + ctx, + connect.NewRequest(&runnerv1.UpdateRunnerRequest{ + Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE, + }), + ); err != nil { + return err + } + l.Info("update runner status to active") + } + + defer func() { + if val := p.metric.DecBusyWorker(); val != 0 { + return + } + + defer func() { + if _, err := p.Client.UpdateRunner( + ctx, + connect.NewRequest(&runnerv1.UpdateRunnerRequest{ + Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE, + }), + ); err != nil { + log.Errorln("update status error:", err.Error()) + } + l.Info("update runner status to idle") + }() + }() + return p.Dispatch(runCtx, resp.Msg.Task) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 644a77c..4b304d3 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -5,9 +5,6 @@ import ( "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" - - "github.com/bufbuild/connect-go" - log "github.com/sirupsen/logrus" ) // Runner runs the pipeline. @@ -20,34 +17,5 @@ type Runner struct { // Run runs the pipeline stage. func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error { - l := log. - WithField("task.id", task.Id) - l.Info("start running pipeline") - - // update runner status - // running: idle -> active - // stopped: active -> idle - if _, err := s.Client.UpdateRunner( - ctx, - connect.NewRequest(&runnerv1.UpdateRunnerRequest{ - Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE, - }), - ); err != nil { - return err - } - - l.Info("update runner status to active") - defer func() { - if _, err := s.Client.UpdateRunner( - ctx, - connect.NewRequest(&runnerv1.UpdateRunnerRequest{ - Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE, - }), - ); err != nil { - log.Errorln("update status error:", err.Error()) - } - l.Info("update runner status to idle") - }() - return NewTask(s.ForgeInstance, task.Id, s.Client, s.Environ).Run(ctx, task) }