diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 2098353879..46a097e84a 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -122,7 +122,12 @@ func (p *PersistableChannelQueue) Push(data Data) error { func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { p.lock.Lock() if p.internal == nil { - p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) + err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar) + p.lock.Unlock() + if err != nil { + log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err) + return + } } else { p.lock.Unlock() } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 4578cd7250..d0b93b54d0 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -37,7 +37,8 @@ type delayedStarter struct { name string } -func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) { +// setInternal must be called with the lock locked. +func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { var ctx context.Context var cancel context.CancelFunc if q.timeout > 0 { @@ -56,8 +57,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h for q.internal == nil { select { case <-ctx.Done(): - q.lock.Unlock() - log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) + return fmt.Errorf("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) default: queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { @@ -70,16 +70,21 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h } i++ if q.maxAttempts > 0 && i > q.maxAttempts { - q.lock.Unlock() - log.Fatal("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) + return fmt.Errorf("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) } sleepTime := 100 * time.Millisecond if q.timeout > 0 && q.maxAttempts > 0 { sleepTime = (q.timeout - 200*time.Millisecond) / time.Duration(q.maxAttempts) } - time.Sleep(sleepTime) + t := time.NewTimer(sleepTime) + select { + case <-ctx.Done(): + t.Stop() + case <-t.C: + } } } + return nil } // WrappedQueue wraps a delayed starting queue @@ -151,7 +156,12 @@ func (q *WrappedQueue) Push(data Data) error { func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { q.lock.Lock() if q.internal == nil { - q.setInternal(atShutdown, q.handle, q.exemplar) + err := q.setInternal(atShutdown, q.handle, q.exemplar) + q.lock.Unlock() + if err != nil { + log.Fatal("Unable to set the internal queue for %s Error: %v", q.Name(), err) + return + } go func() { for data := range q.channel { _ = q.internal.Push(data)