mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-01-09 15:28:22 -05:00
As per @guillep2k add mutex locks on shutdown/terminate
This commit is contained in:
parent
9941ae2966
commit
e852cb620f
2 changed files with 29 additions and 9 deletions
|
@ -9,6 +9,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
@ -38,6 +39,7 @@ type LevelQueue struct {
|
||||||
queue *levelqueue.Queue
|
queue *levelqueue.Queue
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
terminated chan struct{}
|
terminated chan struct{}
|
||||||
|
lock sync.Mutex
|
||||||
exemplar interface{}
|
exemplar interface{}
|
||||||
workers int
|
workers int
|
||||||
name string
|
name string
|
||||||
|
@ -173,6 +175,8 @@ func (l *LevelQueue) Push(data Data) error {
|
||||||
|
|
||||||
// Shutdown this queue and stop processing
|
// Shutdown this queue and stop processing
|
||||||
func (l *LevelQueue) Shutdown() {
|
func (l *LevelQueue) Shutdown() {
|
||||||
|
l.lock.Lock()
|
||||||
|
defer l.lock.Unlock()
|
||||||
log.Trace("LevelQueue: %s Shutdown", l.name)
|
log.Trace("LevelQueue: %s Shutdown", l.name)
|
||||||
select {
|
select {
|
||||||
case <-l.closed:
|
case <-l.closed:
|
||||||
|
@ -185,10 +189,13 @@ func (l *LevelQueue) Shutdown() {
|
||||||
func (l *LevelQueue) Terminate() {
|
func (l *LevelQueue) Terminate() {
|
||||||
log.Trace("LevelQueue: %s Terminating", l.name)
|
log.Trace("LevelQueue: %s Terminating", l.name)
|
||||||
l.Shutdown()
|
l.Shutdown()
|
||||||
|
l.lock.Lock()
|
||||||
select {
|
select {
|
||||||
case <-l.terminated:
|
case <-l.terminated:
|
||||||
|
l.lock.Unlock()
|
||||||
default:
|
default:
|
||||||
close(l.terminated)
|
close(l.terminated)
|
||||||
|
l.lock.Unlock()
|
||||||
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
|
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
|
||||||
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
|
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
@ -30,13 +31,15 @@ type redisClient interface {
|
||||||
|
|
||||||
// RedisQueue redis queue
|
// RedisQueue redis queue
|
||||||
type RedisQueue struct {
|
type RedisQueue struct {
|
||||||
pool *WorkerPool
|
pool *WorkerPool
|
||||||
client redisClient
|
client redisClient
|
||||||
queueName string
|
queueName string
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
exemplar interface{}
|
terminated chan struct{}
|
||||||
workers int
|
exemplar interface{}
|
||||||
name string
|
workers int
|
||||||
|
name string
|
||||||
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// RedisQueueConfiguration is the configuration for the redis queue
|
// RedisQueueConfiguration is the configuration for the redis queue
|
||||||
|
@ -195,19 +198,29 @@ func (r *RedisQueue) Push(data Data) error {
|
||||||
// Shutdown processing from this queue
|
// Shutdown processing from this queue
|
||||||
func (r *RedisQueue) Shutdown() {
|
func (r *RedisQueue) Shutdown() {
|
||||||
log.Trace("Shutdown: %s", r.name)
|
log.Trace("Shutdown: %s", r.name)
|
||||||
|
r.lock.Lock()
|
||||||
select {
|
select {
|
||||||
case <-r.closed:
|
case <-r.closed:
|
||||||
default:
|
default:
|
||||||
close(r.closed)
|
close(r.closed)
|
||||||
}
|
}
|
||||||
|
r.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Terminate this queue and close the queue
|
// Terminate this queue and close the queue
|
||||||
func (r *RedisQueue) Terminate() {
|
func (r *RedisQueue) Terminate() {
|
||||||
log.Trace("Terminating: %s", r.name)
|
log.Trace("Terminating: %s", r.name)
|
||||||
r.Shutdown()
|
r.Shutdown()
|
||||||
if err := r.client.Close(); err != nil {
|
r.lock.Lock()
|
||||||
log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
|
select {
|
||||||
|
case <-r.terminated:
|
||||||
|
r.lock.Unlock()
|
||||||
|
default:
|
||||||
|
close(r.terminated)
|
||||||
|
r.lock.Unlock()
|
||||||
|
if err := r.client.Close(); err != nil {
|
||||||
|
log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue