diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 2c7a5e8..2ee2b8a 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -46,28 +46,28 @@ func (p *Poller) Poll(ctx context.Context) { func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { - p.pollTaskWithRateLimit(ctx, limiter) + if err := limiter.Wait(ctx); err != nil { + if ctx.Err() != nil { + log.WithError(err).Debug("limiter wait failed") + } + return + } + task, ok := p.fetchTask(ctx) + if !ok { + continue + } + p.runTaskWithRecover(ctx, task) } } -func (p *Poller) pollTaskWithRateLimit(ctx context.Context, limiter *rate.Limiter) { +func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) { defer func() { if r := recover(); r != nil { err := fmt.Errorf("panic: %v", r) - log.WithError(err).Error("panic in pollTaskWithRateLimit") + log.WithError(err).Error("panic in runTaskWithRecover") } }() - if err := limiter.Wait(ctx); err != nil { - if ctx.Err() != nil { - log.WithError(err).Debug("limiter wait failed") - } - return - } - task, ok := p.fetchTask(ctx) - if !ok { - return - } if err := p.runner.Run(ctx, task); err != nil { log.WithError(err).Error("failed to run task") }