diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 20108d35886c..39ea59b7b1c2 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -115,6 +115,9 @@ func (p *WorkerPool) hasNoWorkerScaling() bool { return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0) } +// zeroBoost will add a temporary boost worker for a no worker queue +// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function +// (This is because addWorkers has to be called whilst unlocked) func (p *WorkerPool) zeroBoost() { ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) @@ -316,6 +319,17 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, } p.pause() } + select { + case <-p.baseCtx.Done(): + // this worker queue is shut-down don't reboost + default: + if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 { + // OK there are no workers but... there's still work to be done -> Reboost + p.zeroBoost() + // p.lock will be unlocked by zeroBoost + return + } + } p.lock.Unlock() }() } diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 6f285ec467c6..5639a08f9640 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -59,11 +59,13 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { handler := func(idx int, bean interface{}, limit int) error { var item SyncRequest + var repo *repo_model.Repository if m, ok := bean.(*repo_model.Mirror); ok { if m.Repo == nil { log.Error("Disconnected mirror found: %d", m.ID) return nil } + repo = m.Repo item = SyncRequest{ Type: PullMirrorType, RepoID: m.RepoID, @@ -73,6 +75,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { log.Error("Disconnected push-mirror found: %d", m.ID) return nil } + repo = m.Repo item = SyncRequest{ Type: PushMirrorType, RepoID: m.RepoID, @@ -89,17 +92,16 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { default: } - // Check if this request is already in the queue - has, err := mirrorQueue.Has(&item) - if err != nil { - return err - } - if has { - return nil - } - // Push to the Queue if err := mirrorQueue.Push(&item); err != nil { + if err == queue.ErrAlreadyInQueue { + if item.Type == PushMirrorType { + log.Trace("PushMirrors for %-v already queued for sync", repo) + } else { + log.Trace("PullMirrors for %-v already queued for sync", repo) + } + return nil + } return err } @@ -110,23 +112,29 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { return nil } + pullMirrorsRequested := 0 if pullLimit != 0 { + requested = 0 if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error { return handler(idx, bean, pullLimit) }); err != nil && err != errLimit { log.Error("MirrorsIterate: %v", err) return err } + pullMirrorsRequested, requested = requested, 0 } + pushMirrorsRequested := 0 if pushLimit != 0 { + requested = 0 if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error { return handler(idx, bean, pushLimit) }); err != nil && err != errLimit { log.Error("PushMirrorsIterate: %v", err) return err } + pushMirrorsRequested, requested = requested, 0 } - log.Trace("Finished: Update") + log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested) return nil }