From e852cb620f8c12f9bd2f5976ce53126215626b1b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 4 Jan 2020 14:59:39 +0000 Subject: [PATCH] As per @guillep2k add mutex locks on shutdown/terminate --- modules/queue/queue_disk.go | 7 +++++++ modules/queue/queue_redis.go | 31 ++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 550e78b97e..98e7b24e42 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "reflect" + "sync" "time" "code.gitea.io/gitea/modules/log" @@ -38,6 +39,7 @@ type LevelQueue struct { queue *levelqueue.Queue closed chan struct{} terminated chan struct{} + lock sync.Mutex exemplar interface{} workers int name string @@ -173,6 +175,8 @@ func (l *LevelQueue) Push(data Data) error { // Shutdown this queue and stop processing func (l *LevelQueue) Shutdown() { + l.lock.Lock() + defer l.lock.Unlock() log.Trace("LevelQueue: %s Shutdown", l.name) select { case <-l.closed: @@ -185,10 +189,13 @@ func (l *LevelQueue) Shutdown() { func (l *LevelQueue) Terminate() { log.Trace("LevelQueue: %s Terminating", l.name) l.Shutdown() + l.lock.Lock() select { case <-l.terminated: + l.lock.Unlock() default: close(l.terminated) + l.lock.Unlock() if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { log.Error("Error whilst closing internal queue in %s: %v", l.name, err) } diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index de2ceca5e2..87a0ccd932 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -11,6 +11,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" "code.gitea.io/gitea/modules/log" @@ -30,13 +31,15 @@ type redisClient interface { // RedisQueue redis queue type RedisQueue struct { - pool *WorkerPool - client redisClient - queueName string - closed chan struct{} - exemplar interface{} - workers int - name string + pool *WorkerPool + client redisClient + queueName string + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex } // RedisQueueConfiguration is the configuration for the redis queue @@ -195,19 +198,29 @@ func (r *RedisQueue) Push(data Data) error { // Shutdown processing from this queue func (r *RedisQueue) Shutdown() { log.Trace("Shutdown: %s", r.name) + r.lock.Lock() select { case <-r.closed: default: close(r.closed) } + r.lock.Unlock() } // Terminate this queue and close the queue func (r *RedisQueue) Terminate() { log.Trace("Terminating: %s", r.name) r.Shutdown() - if err := r.client.Close(); err != nil { - log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) + r.lock.Lock() + select { + case <-r.terminated: + r.lock.Unlock() + default: + close(r.terminated) + r.lock.Unlock() + if err := r.client.Close(); err != nil { + log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) + } } }