From 9830f34d364aa289e98b03f6d501a5afbb8379eb Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Fri, 11 Nov 2022 16:57:41 +0800 Subject: [PATCH] fix(poller): graceful shutdown --- poller/poller.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/poller/poller.go b/poller/poller.go index 0ba3684..9e0d52d 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -12,11 +12,6 @@ import ( log "github.com/sirupsen/logrus" ) -const ( - errorRetryCounterLimit = 3 - errorRetryTimeSleepSecs = 30 -) - var ErrDataLock = errors.New("Data Lock Error") func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error) *Poller { @@ -33,9 +28,8 @@ type Poller struct { Filter *client.Filter Dispatch func(context.Context, *runnerv1.Task) error - routineGroup *routineGroup - metric *metric - errorRetryCounter int + routineGroup *routineGroup + metric *metric } func (p *Poller) Wait() { @@ -59,10 +53,10 @@ func (p *Poller) Poll(ctx context.Context, n int) error { if err := p.poll(ctx, i+1); err != nil { log.WithField("thread", i+1). WithError(err).Error("poll error") - if p.errorRetryCounter > errorRetryCounterLimit { - log.WithField("thread", i+1).Error("poller: too many errors, sleeping for 30 seconds") - // FIXME: it makes ctrl+c hang up - time.Sleep(time.Second * errorRetryTimeSleepSecs) + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): } } } @@ -86,19 +80,16 @@ func (p *Poller) poll(ctx context.Context, thread int) error { resp, err := p.Client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) if err == context.Canceled || err == context.DeadlineExceeded { l.WithError(err).Trace("poller: no stage returned") - p.errorRetryCounter++ return nil } if err != nil && err == ErrDataLock { l.WithError(err).Info("task accepted by another runner") - p.errorRetryCounter++ return nil } if err != nil { l.WithError(err).Error("cannot accept task") - p.errorRetryCounter++ return err } @@ -108,8 +99,6 @@ func (p *Poller) poll(ctx context.Context, thread int) error { return nil } - p.errorRetryCounter = 0 - runCtx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel()