diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 39ea59b7b1c2..100197c5e1f1 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, p.cond.Broadcast() cancel() } - if p.hasNoWorkerScaling() { - select { - case <-p.baseCtx.Done(): - // Don't warn if the baseCtx is shutdown - default: + select { + case <-p.baseCtx.Done(): + // Don't warn or check for ongoing work if the baseCtx is shutdown + case <-p.paused: + // Don't warn or check for ongoing work if the pool is paused + default: + if p.hasNoWorkerScaling() { log.Warn( "Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+ "The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid) - } - p.pause() - } - select { - case <-p.baseCtx.Done(): - // this worker queue is shut-down don't reboost - default: - if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { + p.pause() + } else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { // OK there are no workers but... there's still work to be done -> Reboost p.zeroBoost() // p.lock will be unlocked by zeroBoost @@ -385,14 +381,37 @@ func (p *WorkerPool) pause() { // Resume resumes the WorkerPool func (p *WorkerPool) Resume() { - p.lock.Lock() - defer p.lock.Unlock() + p.lock.Lock() // can't defer unlock because of the zeroBoost at the end select { case <-p.resumed: + // already resumed - there's nothing to do + p.lock.Unlock() + return default: - p.paused = make(chan struct{}) - close(p.resumed) } + + p.paused = make(chan struct{}) + close(p.resumed) + + // OK now we need to check if we need to add some workers... + if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 { + // We either have workers, can't scale or there's no work to be done -> so just resume + p.lock.Unlock() + return + } + + // OK we got some work but no workers we need to think about boosting + select { + case <-p.baseCtx.Done(): + // don't bother boosting if the baseCtx is done + p.lock.Unlock() + return + default: + } + + // OK we'd better add some boost workers! + p.zeroBoost() + // p.zeroBoost will unlock the lock } // CleanUp will drain the remaining contents of the channel