diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 23e96155a9..310f3cd4e1 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -72,6 +72,8 @@ type ManagedPool interface { BoostWorkers() int // SetPoolSettings sets the user updatable settings for the pool SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) + // Done returns a channel that will be closed when the Pool's baseCtx is closed + Done() <-chan struct{} } // ManagedQueueList implements the sort.Interface @@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) { delete(m.Queues, qid) m.mutex.Unlock() log.Trace("Queue Manager removed: QID: %d", qid) - } // GetManagedQueue by qid @@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() continue } + + if pool, ok := mq.Managed.(ManagedPool); ok { + // No point into flushing pools when their base's ctx is already done. + select { + case <-pool.Done(): + wg.Done() + continue + default: + } + } + allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) @@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Wait() } return nil - } // ManagedQueues returns the managed queues diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index c3a1c5781e..72f330670a 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { q.internal.(*LevelQueue).Shutdown() GetManager().Remove(q.internal.(*LevelQueue).qid) } - } // Flush flushes the queue and blocks till the queue is empty @@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() + close(q.channelQueue.dataChan) + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0176e2e0b2..dc6ff3b633 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo return pool } +// Done returns when this worker pool's base context has been cancelled +func (p *WorkerPool) Done() <-chan struct{} { + return p.baseCtx.Done() +} + // Push pushes the data to the internal channel func (p *WorkerPool) Push(data Data) { atomic.AddInt64(&p.numInQueue, 1) @@ -90,7 +95,7 @@ func (p *WorkerPool) zeroBoost() { boost = p.maxNumberOfWorkers - p.numberOfWorkers } if mq != nil { - log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) + log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout) start := time.Now() pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) @@ -98,7 +103,7 @@ func (p *WorkerPool) zeroBoost() { mq.RemoveWorkers(pid) } } else { - log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) + log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) } p.lock.Unlock() p.addWorkers(ctx, cancel, boost) @@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { log.Trace("WorkerPool: %d Flush", p.qid) for { select { - case data := <-p.dataChan: + case data, ok := <-p.dataChan: + if !ok { + return nil + } p.handle(data) atomic.AddInt64(&p.numInQueue, -1) case <-p.baseCtx.Done(): @@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error { func (p *WorkerPool) doWork(ctx context.Context) { delay := time.Millisecond * 300 - var data = make([]Data, 0, p.batchLength) + data := make([]Data, 0, p.batchLength) for { select { case <-ctx.Done():