From 9ad9070555868e472c80ff4b5c27041ab49892c7 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 11 Dec 2019 20:34:05 +0000 Subject: [PATCH] Queue: Improve logging --- modules/queue/manager.go | 5 ++ modules/queue/queue_disk.go | 38 +++++++++------ modules/queue/queue_disk_channel.go | 10 +++- modules/queue/queue_disk_channel_test.go | 28 +++++++---- modules/queue/queue_disk_test.go | 59 +++++++++++++++--------- modules/queue/queue_redis.go | 16 +++++-- modules/queue/queue_wrapped.go | 7 +-- modules/queue/workerpool.go | 14 ++++-- 8 files changed, 115 insertions(+), 62 deletions(-) diff --git a/modules/queue/manager.go b/modules/queue/manager.go index 100780c706..81478019e5 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -12,6 +12,8 @@ import ( "sort" "sync" "time" + + "code.gitea.io/gitea/modules/log" ) var manager *Manager @@ -96,6 +98,7 @@ func (m *Manager) Add(queue Queue, } m.Queues[desc.QID] = desc m.mutex.Unlock() + log.Trace("Queue Manager registered: %s (QID: %d)", desc.Name, desc.QID) return desc.QID } @@ -104,6 +107,8 @@ func (m *Manager) Remove(qid int64) { m.mutex.Lock() delete(m.Queues, qid) m.mutex.Unlock() + log.Trace("Queue Manager removed: QID: %d", qid) + } // GetDescription by qid diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index f18f3c5f8e..41e8a9e7c0 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -91,16 +91,18 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) go l.readToChan() - log.Trace("%s Waiting til closed", l.name) + log.Trace("LevelQueue: %s Waiting til closed", l.name) <-l.closed - log.Trace("%s Waiting til done", l.name) + log.Trace("LevelQueue: %s Waiting til done", l.name) l.pool.Wait() - // FIXME: graceful: Needs HammerContext - log.Trace("%s Waiting til cleaned", l.name) - l.pool.CleanUp(context.TODO()) - log.Trace("%s cleaned", l.name) + log.Trace("LevelQueue: %s Waiting til cleaned", l.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + l.pool.CleanUp(ctx) + cancel() + log.Trace("LevelQueue: %s Cleaned", l.name) } @@ -115,7 +117,7 @@ func (l *LevelQueue) readToChan() { bs, err := l.queue.RPop() if err != nil { if err != levelqueue.ErrNotFound { - log.Error("%s RPop: %v", l.name, err) + log.Error("LevelQueue: %s Error on RPop: %v", l.name, err) } time.Sleep(time.Millisecond * 100) continue @@ -137,14 +139,14 @@ func (l *LevelQueue) readToChan() { err = json.Unmarshal(bs, &data) } if err != nil { - log.Error("LevelQueue: %s failed to unmarshal: %v", l.name, err) - time.Sleep(time.Millisecond * 10) + log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err) + time.Sleep(time.Millisecond * 100) continue } - log.Trace("LevelQueue %s: task found: %#v", l.name, data) + log.Trace("LevelQueue %s: Task found: %#v", l.name, data) l.pool.Push(data) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 10) } } @@ -170,7 +172,7 @@ func (l *LevelQueue) Push(data Data) error { // Shutdown this queue and stop processing func (l *LevelQueue) Shutdown() { - log.Trace("Shutdown: %s", l.name) + log.Trace("LevelQueue: %s Shutdown", l.name) select { case <-l.closed: default: @@ -180,10 +182,16 @@ func (l *LevelQueue) Shutdown() { // Terminate this queue and close the queue func (l *LevelQueue) Terminate() { - log.Trace("Terminating: %s", l.name) + log.Trace("LevelQueue: %s Terminating", l.name) l.Shutdown() - if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { - log.Error("Error whilst closing internal queue in %s: %v", l.name, err) + select { + case <-l.terminated: + default: + close(l.terminated) + if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" { + log.Error("Error whilst closing internal queue in %s: %v", l.name, err) + } + } } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 3bf39b9fa5..884fc410df 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,22 +133,28 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0) }() + log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name) <-p.closed + log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name) p.ChannelQueue.pool.cancel() p.internal.(*LevelQueue).pool.cancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name) p.ChannelQueue.pool.Wait() p.internal.(*LevelQueue).pool.Wait() // Redirect all remaining data in the chan to the internal channel go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name) for data := range p.ChannelQueue.pool.dataChan { _ = p.internal.Push(data) } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name) }() + log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name) } // Shutdown processing this queue func (p *PersistableChannelQueue) Shutdown() { - log.Trace("Shutdown: %s", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name) select { case <-p.closed: default: @@ -163,7 +169,7 @@ func (p *PersistableChannelQueue) Shutdown() { // Terminate this queue and close the queue func (p *PersistableChannelQueue) Terminate() { - log.Trace("Terminating: %s", p.delayedStarter.name) + log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name) p.Shutdown() p.lock.Lock() defer p.lock.Unlock() diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 5f6f614bd8..01a90ebcfb 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -24,8 +24,8 @@ func TestPersistableChannelQueue(t *testing.T) { } } - var queueShutdown func() - var queueTerminate func() + queueShutdown := []func(){} + queueTerminate := []func(){} tmpDir, err := ioutil.TempDir("", "persistable-channel-queue-test-data") assert.NoError(t, err) @@ -40,9 +40,9 @@ func TestPersistableChannelQueue(t *testing.T) { assert.NoError(t, err) go queue.Run(func(_ context.Context, shutdown func()) { - queueShutdown = shutdown + queueShutdown = append(queueShutdown, shutdown) }, func(_ context.Context, terminate func()) { - queueTerminate = terminate + queueTerminate = append(queueTerminate, terminate) }) test1 := testData{"A", 1} @@ -66,7 +66,9 @@ func TestPersistableChannelQueue(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) - queueShutdown() + for _, callback := range queueShutdown { + callback() + } time.Sleep(200 * time.Millisecond) err = queue.Push(&test1) assert.NoError(t, err) @@ -77,7 +79,9 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Fail(t, "Handler processing should have stopped") default: } - queueTerminate() + for _, callback := range queueTerminate { + callback() + } // Reopen queue queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ @@ -89,9 +93,9 @@ func TestPersistableChannelQueue(t *testing.T) { assert.NoError(t, err) go queue.Run(func(_ context.Context, shutdown func()) { - queueShutdown = shutdown + queueShutdown = append(queueShutdown, shutdown) }, func(_ context.Context, terminate func()) { - queueTerminate = terminate + queueTerminate = append(queueTerminate, terminate) }) result3 := <-handleChan @@ -101,7 +105,11 @@ func TestPersistableChannelQueue(t *testing.T) { result4 := <-handleChan assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestInt, result4.TestInt) - queueShutdown() - queueTerminate() + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } } diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index b9c6f278ef..03de451760 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -6,6 +6,7 @@ package queue import ( "context" + "io/ioutil" "os" "testing" "time" @@ -23,11 +24,15 @@ func TestLevelQueue(t *testing.T) { } } - var queueShutdown func() - var queueTerminate func() + queueShutdown := []func(){} + queueTerminate := []func(){} + + tmpDir, err := ioutil.TempDir("", "level-queue-test-data") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) queue, err := NewLevelQueue(handle, LevelQueueConfiguration{ - DataDir: "level-queue-test-data", + DataDir: tmpDir, BatchLength: 2, Workers: 1, QueueLength: 20, @@ -38,9 +43,9 @@ func TestLevelQueue(t *testing.T) { assert.NoError(t, err) go queue.Run(func(_ context.Context, shutdown func()) { - queueShutdown = shutdown + queueShutdown = append(queueShutdown, shutdown) }, func(_ context.Context, terminate func()) { - queueTerminate = terminate + queueTerminate = append(queueTerminate, terminate) }) test1 := testData{"A", 1} @@ -64,7 +69,9 @@ func TestLevelQueue(t *testing.T) { err = queue.Push(test1) assert.Error(t, err) - queueShutdown() + for _, callback := range queueShutdown { + callback() + } time.Sleep(200 * time.Millisecond) err = queue.Push(&test1) assert.NoError(t, err) @@ -75,24 +82,30 @@ func TestLevelQueue(t *testing.T) { assert.Fail(t, "Handler processing should have stopped") default: } - queueTerminate() + for _, callback := range queueTerminate { + callback() + } // Reopen queue - queue, err = NewLevelQueue(handle, LevelQueueConfiguration{ - DataDir: "level-queue-test-data", - BatchLength: 2, - Workers: 1, - QueueLength: 20, - BlockTimeout: 1 * time.Second, - BoostTimeout: 5 * time.Minute, - BoostWorkers: 5, - }, &testData{}) + queue, err = NewWrappedQueue(handle, + WrappedQueueConfiguration{ + Underlying: LevelQueueType, + Config: LevelQueueConfiguration{ + DataDir: tmpDir, + BatchLength: 2, + Workers: 1, + QueueLength: 20, + BlockTimeout: 1 * time.Second, + BoostTimeout: 5 * time.Minute, + BoostWorkers: 5, + }, + }, &testData{}) assert.NoError(t, err) go queue.Run(func(_ context.Context, shutdown func()) { - queueShutdown = shutdown + queueShutdown = append(queueShutdown, shutdown) }, func(_ context.Context, terminate func()) { - queueTerminate = terminate + queueTerminate = append(queueTerminate, terminate) }) result3 := <-handleChan @@ -102,8 +115,10 @@ func TestLevelQueue(t *testing.T) { result4 := <-handleChan assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestInt, result4.TestInt) - queueShutdown() - queueTerminate() - - os.RemoveAll("level-queue-test-data") + for _, callback := range queueShutdown { + callback() + } + for _, callback := range queueTerminate { + callback() + } } diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index 88794428a8..4f2ceec029 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -116,10 +116,16 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) go r.readToChan() + log.Trace("RedisQueue: %s Waiting til closed", r.name) <-r.closed + log.Trace("RedisQueue: %s Waiting til done", r.name) r.pool.Wait() - // FIXME: graceful: Needs HammerContext - r.pool.CleanUp(context.TODO()) + + log.Trace("RedisQueue: %s Waiting til cleaned", r.name) + ctx, cancel := context.WithCancel(context.Background()) + atTerminate(ctx, cancel) + r.pool.CleanUp(ctx) + cancel() } func (r *RedisQueue) readToChan() { @@ -132,7 +138,7 @@ func (r *RedisQueue) readToChan() { default: bs, err := r.client.LPop(r.queueName).Bytes() if err != nil && err != redis.Nil { - log.Error("RedisQueue: %s LPop failed: %v", r.name, err) + log.Error("RedisQueue: %s Error on LPop: %v", r.name, err) time.Sleep(time.Millisecond * 100) continue } @@ -153,12 +159,12 @@ func (r *RedisQueue) readToChan() { err = json.Unmarshal(bs, &data) } if err != nil { - log.Error("RedisQueue: %s Unmarshal: %v", r.name, err) + log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err) time.Sleep(time.Millisecond * 100) continue } - log.Trace("RedisQueue: %s task found: %#v", r.name, data) + log.Trace("RedisQueue: %s Task found: %#v", r.name, data) r.pool.Push(data) time.Sleep(time.Millisecond * 10) } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 57f19f6312..46557ea318 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -92,7 +92,7 @@ type WrappedQueue struct { // NewWrappedQueue will attempt to create a queue of the provided type, // but if there is a problem creating this queue it will instead create -// a WrappedQueue with delayed the startup of the queue instead and a +// a WrappedQueue with delayed startup of the queue instead and a // channel which will be redirected to the queue func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) { configInterface, err := toConfig(WrappedQueueConfiguration{}, cfg) @@ -162,11 +162,12 @@ func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()) } q.internal.Run(atShutdown, atTerminate) + log.Trace("WrappedQueue: %s Done", q.name) } // Shutdown this queue and stop processing func (q *WrappedQueue) Shutdown() { - log.Trace("Shutdown: %s", q.name) + log.Trace("WrappedQueue: %s Shutdown", q.name) q.lock.Lock() defer q.lock.Unlock() if q.internal == nil { @@ -179,7 +180,7 @@ func (q *WrappedQueue) Shutdown() { // Terminate this queue and close the queue func (q *WrappedQueue) Terminate() { - log.Trace("Terminating: %s", q.name) + log.Trace("WrappedQueue: %s Terminating", q.name) q.lock.Lock() defer q.lock.Unlock() if q.internal == nil { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index bf3a15c00e..fe05e7fe6e 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -72,7 +72,7 @@ func (p *WorkerPool) pushBoost(data Data) { ctx, cancel := context.WithCancel(p.baseCtx) desc := GetManager().GetDescription(p.qid) if desc != nil { - log.Warn("Worker Channel for %v blocked for %v - adding %d temporary workers for %s, block timeout now %v", desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) start := time.Now() pid := desc.RegisterWorkers(p.boostWorkers, start, false, start, cancel) @@ -82,7 +82,7 @@ func (p *WorkerPool) pushBoost(data Data) { cancel() }() } else { - log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) } go func() { <-time.After(p.boostTimeout) @@ -128,6 +128,10 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance desc.RemoveWorkers(pid) cancel() }() + log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, desc.Name, number, pid) + } else { + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + } p.addWorkers(ctx, number) return cancel @@ -173,18 +177,18 @@ func (p *WorkerPool) Wait() { // CleanUp will drain the remaining contents of the channel // This should be called after AddWorkers context is closed func (p *WorkerPool) CleanUp(ctx context.Context) { - log.Trace("CleanUp") + log.Trace("WorkerPool: %d CleanUp", p.qid) close(p.dataChan) for data := range p.dataChan { p.handle(data) select { case <-ctx.Done(): - log.Warn("Cleanup context closed before finishing clean-up") + log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid) return default: } } - log.Trace("CleanUp done") + log.Trace("WorkerPool: %d CleanUp Done", p.qid) } func (p *WorkerPool) doWork(ctx context.Context) {