From 85d1a7f7d22f21ed341d538388b3894b54947be0 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 7 Dec 2019 16:46:36 +0000 Subject: [PATCH] Queue: Add name variable to queues --- modules/queue/queue_channel.go | 7 +++++-- modules/queue/queue_disk.go | 23 +++++++++++++---------- modules/queue/queue_disk_channel.go | 13 ++++++++++++- modules/queue/queue_redis.go | 13 +++++++++---- modules/queue/queue_wrapped.go | 11 ++++++++--- modules/setting/queue.go | 6 +++++- 6 files changed, 52 insertions(+), 21 deletions(-) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index ebcf22ef79..90ec52347d 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -24,6 +24,7 @@ type ChannelQueueConfiguration struct { BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int + Name string } // ChannelQueue implements @@ -31,6 +32,7 @@ type ChannelQueue struct { pool *WorkerPool exemplar interface{} workers int + name string } // NewChannelQueue create a memory channel queue @@ -59,16 +61,17 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro }, exemplar: exemplar, workers: config.Workers, + name: config.Name, }, nil } // Run starts to run the queue func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { atShutdown(context.Background(), func() { - log.Warn("ChannelQueue is not shutdownable!") + log.Warn("ChannelQueue: %s is not shutdownable!", c.name) }) atTerminate(context.Background(), func() { - log.Warn("ChannelQueue is not terminatable!") + log.Warn("ChannelQueue: %s is not terminatable!", c.name) }) c.pool.addWorkers(c.pool.baseCtx, c.workers) } diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 50e49f3a29..cb95b96119 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -28,6 +28,7 @@ type LevelQueueConfiguration struct { BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int + Name string } // LevelQueue implements a disk library queue @@ -38,6 +39,7 @@ type LevelQueue struct { terminated chan struct{} exemplar interface{} workers int + name string } // NewLevelQueue creates a ledis local queue @@ -72,6 +74,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) closed: make(chan struct{}), terminated: make(chan struct{}), workers: config.Workers, + name: config.Name, }, nil } @@ -84,16 +87,16 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) go l.readToChan() - log.Trace("Waiting til closed") + log.Trace("%s Waiting til closed", l.name) <-l.closed - log.Trace("Waiting til done") + log.Trace("%s Waiting til done", l.name) l.pool.Wait() // FIXME: graceful: Needs HammerContext - log.Trace("Waiting til cleaned") + log.Trace("%s Waiting til cleaned", l.name) l.pool.CleanUp(context.TODO()) - log.Trace("cleaned") + log.Trace("%s cleaned", l.name) } @@ -108,7 +111,7 @@ func (l *LevelQueue) readToChan() { bs, err := l.queue.RPop() if err != nil { if err != levelqueue.ErrNotFound { - log.Error("RPop: %v", err) + log.Error("%s RPop: %v", l.name, err) } time.Sleep(time.Millisecond * 100) continue @@ -130,12 +133,12 @@ func (l *LevelQueue) readToChan() { err = json.Unmarshal(bs, &data) } if err != nil { - log.Error("LevelQueue failed to unmarshal: %v", err) + log.Error("LevelQueue: %s failed to unmarshal: %v", l.name, err) time.Sleep(time.Millisecond * 10) continue } - log.Trace("LevelQueue: task found: %#v", data) + log.Trace("LevelQueue %s: task found: %#v", l.name, data) l.pool.Push(data) time.Sleep(time.Millisecond * 10) @@ -163,6 +166,7 @@ func (l *LevelQueue) Push(data Data) error { // Shutdown this queue and stop processing func (l *LevelQueue) Shutdown() { + log.Trace("Shutdown: %s", l.name) select { case <-l.closed: default: @@ -172,12 +176,11 @@ func (l *LevelQueue) Shutdown() { // Terminate this queue and close the queue func (l *LevelQueue) Terminate() { - log.Trace("Terminating") + log.Trace("Terminating: %s", l.name) l.Shutdown() if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { - log.Error("Error whilst closing internal queue: %v", err) + log.Error("Error whilst closing internal queue in %s: %v", l.name, err) } - } func init() { diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index f327827152..fc186b3bb9 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -16,6 +16,7 @@ const PersistableChannelQueueType Type = "persistable-channel" // PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue type PersistableChannelQueueConfiguration struct { + Name string DataDir string BatchLength int QueueLength int @@ -50,6 +51,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BlockTimeout: config.BlockTimeout, BoostTimeout: config.BoostTimeout, BoostWorkers: config.BoostWorkers, + Name: config.Name + "-channel", }, exemplar) if err != nil { return nil, err @@ -64,6 +66,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 5, + Name: config.Name + "-level", } levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar) @@ -72,6 +75,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( ChannelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ internal: levelQueue.(*LevelQueue), + name: config.Name, }, closed: make(chan struct{}), }, nil @@ -88,11 +92,17 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( underlying: LevelQueueType, timeout: config.Timeout, maxAttempts: config.MaxAttempts, + name: config.Name, }, closed: make(chan struct{}), }, nil } +// Name returns the name of this queue +func (p *PersistableChannelQueue) Name() string { + return p.delayedStarter.name +} + // Push will push the indexer data to queue func (p *PersistableChannelQueue) Push(data Data) error { select { @@ -134,6 +144,7 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte // Shutdown processing this queue func (p *PersistableChannelQueue) Shutdown() { + log.Trace("Shutdown: %s", p.delayedStarter.name) select { case <-p.closed: default: @@ -148,7 +159,7 @@ func (p *PersistableChannelQueue) Shutdown() { // Terminate this queue and close the queue func (p *PersistableChannelQueue) Terminate() { - log.Trace("Terminating") + log.Trace("Terminating: %s", p.delayedStarter.name) p.Shutdown() p.lock.Lock() defer p.lock.Unlock() diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index acc6feeb95..ebcba683cb 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -36,6 +36,7 @@ type RedisQueue struct { closed chan struct{} exemplar interface{} workers int + name string } // RedisQueueConfiguration is the configuration for the redis queue @@ -50,6 +51,7 @@ type RedisQueueConfiguration struct { BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int + Name string } // NewRedisQueue creates single redis or cluster redis queue @@ -80,6 +82,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) exemplar: exemplar, closed: make(chan struct{}), workers: config.Workers, + name: config.Name, } if len(dbs) == 0 { return nil, errors.New("no redis host found") @@ -125,7 +128,7 @@ func (r *RedisQueue) readToChan() { default: bs, err := r.client.LPop(r.queueName).Bytes() if err != nil && err != redis.Nil { - log.Error("LPop failed: %v", err) + log.Error("RedisQueue: %s LPop failed: %v", r.name, err) time.Sleep(time.Millisecond * 100) continue } @@ -146,12 +149,12 @@ func (r *RedisQueue) readToChan() { err = json.Unmarshal(bs, &data) } if err != nil { - log.Error("Unmarshal: %v", err) + log.Error("RedisQueue: %s Unmarshal: %v", r.name, err) time.Sleep(time.Millisecond * 100) continue } - log.Trace("RedisQueue: task found: %#v", data) + log.Trace("RedisQueue: %s task found: %#v", r.name, data) r.pool.Push(data) time.Sleep(time.Millisecond * 10) } @@ -178,6 +181,7 @@ func (r *RedisQueue) Push(data Data) error { // Shutdown processing from this queue func (r *RedisQueue) Shutdown() { + log.Trace("Shutdown: %s", r.name) select { case <-r.closed: default: @@ -187,9 +191,10 @@ func (r *RedisQueue) Shutdown() { // Terminate this queue and close the queue func (r *RedisQueue) Terminate() { + log.Trace("Terminating: %s", r.name) r.Shutdown() if err := r.client.Close(); err != nil { - log.Error("Error whilst closing internal redis client: %v", err) + log.Error("Error whilst closing internal redis client in %s: %v", r.name, err) } } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index f99675a9f9..2293327348 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -24,6 +24,7 @@ type WrappedQueueConfiguration struct { MaxAttempts int Config interface{} QueueLength int + Name string } type delayedStarter struct { @@ -33,6 +34,7 @@ type delayedStarter struct { cfg interface{} timeout time.Duration maxAttempts int + name string } func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) { @@ -55,7 +57,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h select { case <-ctx.Done(): q.lock.Unlock() - log.Fatal("Timedout creating queue %v with cfg %v ", q.underlying, q.cfg) + log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) default: queue, err := CreateQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { @@ -64,12 +66,12 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h break } if err.Error() != "resource temporarily unavailable" { - log.Warn("[Attempt: %d] Failed to create queue: %v cfg: %v error: %v", i, q.underlying, q.cfg, err) + log.Warn("[Attempt: %d] Failed to create queue: %v for %s cfg: %v error: %v", i, q.underlying, q.name, q.cfg, err) } i++ if q.maxAttempts > 0 && i > q.maxAttempts { q.lock.Unlock() - log.Fatal("Unable to create queue %v with cfg %v by max attempts: error: %v", q.underlying, q.cfg, err) + log.Fatal("Unable to create queue %v for %s with cfg %v by max attempts: error: %v", q.underlying, q.name, q.cfg, err) } sleepTime := 100 * time.Millisecond if q.timeout > 0 && q.maxAttempts > 0 { @@ -118,6 +120,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro underlying: config.Underlying, timeout: config.Timeout, maxAttempts: config.MaxAttempts, + name: config.Name, }, }, nil } @@ -156,6 +159,7 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()) // Shutdown this queue and stop processing func (q *WrappedQueue) Shutdown() { + log.Trace("Shutdown: %s", q.name) q.lock.Lock() defer q.lock.Unlock() if q.internal == nil { @@ -168,6 +172,7 @@ func (q *WrappedQueue) Shutdown() { // Terminate this queue and close the queue func (q *WrappedQueue) Terminate() { + log.Trace("Terminating: %s", q.name) q.lock.Lock() defer q.lock.Unlock() if q.internal == nil { diff --git a/modules/setting/queue.go b/modules/setting/queue.go index b619c9855a..0066d5a946 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -23,6 +23,7 @@ type queueSettings struct { Type string Addresses string Password string + QueueName string DBIndex int WrapIfNecessary bool MaxAttempts int @@ -40,13 +41,14 @@ var Queue = queueSettings{} func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) queue.Queue { q := getQueueSettings(name) opts := make(map[string]interface{}) + opts["Name"] = name opts["QueueLength"] = q.Length opts["BatchLength"] = q.BatchLength opts["DataDir"] = q.DataDir opts["Addresses"] = q.Addresses opts["Password"] = q.Password opts["DBIndex"] = q.DBIndex - opts["QueueName"] = name + opts["QueueName"] = q.QueueName opts["Workers"] = q.Workers opts["BlockTimeout"] = q.BlockTimeout opts["BoostTimeout"] = q.BoostTimeout @@ -106,6 +108,7 @@ func getQueueSettings(name string) queueSettings { q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout) q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout) q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) + q.QueueName = sec.Key("QUEUE_NAME").MustString(Queue.QueueName) q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) return q @@ -130,6 +133,7 @@ func newQueueService() { Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second) Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute) Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5) + Queue.QueueName = sec.Key("QUEUE_NAME").MustString(Queue.QueueName) hasWorkers := false for _, key := range Cfg.Section("queue.notification").Keys() {