From 7ba1b7112f47a2025e8538509a24d8c6d5b1f488 Mon Sep 17 00:00:00 2001
From: zeripath
Date: Sat, 5 Feb 2022 20:51:25 +0000
Subject: [PATCH] Only attempt to flush queue if the underlying worker pool is
 not finished (#18593)

* Only attempt to flush queue if the underlying worker pool is not finished

There is a possible race whereby a worker pool could be cancelled while its
underlying queue is not yet empty. This will lead to flush-all cycling
because it can never empty the pool.

Signed-off-by: Andrew Thornton

* Apply suggestions from code review

Co-authored-by: Gusted
Co-authored-by: Gusted
---
 modules/queue/manager.go    | 11 +++++++++++
 modules/queue/workerpool.go |  5 +++++
 2 files changed, 16 insertions(+)

diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 56298a3e00b5..73c57540be85 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -84,6 +84,8 @@ type ManagedPool interface {
 	BoostWorkers() int
 	// SetPoolSettings sets the user updatable settings for the pool
 	SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+	// Done returns a channel that will be closed when the Pool's baseCtx is closed
+	Done() <-chan struct{}
 }
 
 // ManagedQueueList implements the sort.Interface
@@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
 				continue
 			}
 		}
+		if pool, ok := mq.Managed.(ManagedPool); ok {
+			// No point in flushing pools when their base ctx is already done.
+			select {
+			case <-pool.Done():
+				wg.Done()
+				continue
+			default:
+			}
+		}
 		allEmpty = false
 		if flushable, ok := mq.Managed.(Flushable); ok {
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index fd56f782d4f9..20108d35886c 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
 	return pool
 }
 
+// Done returns a channel that is closed when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+	return p.baseCtx.Done()
+}
+
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
 	atomic.AddInt64(&p.numInQueue, 1)
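
As an illustration of the pattern the FlushAll guard relies on, here is a
minimal, self-contained Go sketch of a non-blocking select on a
context-backed Done() channel. The pool type, tryFlush helper, and main
function below are hypothetical stand-ins for Gitea's WorkerPool and
Manager.FlushAll, not part of the patch itself.

// Minimal sketch of the non-blocking "skip finished pools" check.
// pool, tryFlush and main are hypothetical stand-ins, not Gitea code.
package main

import (
	"context"
	"fmt"
)

// pool mimics a WorkerPool that exposes its base context's Done channel.
type pool struct {
	baseCtx context.Context
}

// Done returns a channel that is closed once the pool's base context is cancelled.
func (p *pool) Done() <-chan struct{} {
	return p.baseCtx.Done()
}

// tryFlush skips the flush when the pool is already finished, mirroring
// the guard added to Manager.FlushAll above.
func tryFlush(p *pool) {
	select {
	case <-p.Done():
		// The pool's base context is cancelled: flushing can never
		// drain it, so skip instead of cycling forever.
		fmt.Println("pool finished; skipping flush")
	default:
		fmt.Println("flushing pool")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{baseCtx: ctx}

	tryFlush(p) // pool still live: flushes
	cancel()
	tryFlush(p) // base context cancelled: flush is skipped
}

The default case is what keeps the check non-blocking: if the base context
has not been cancelled, the select falls through immediately and the flush
proceeds as before.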