From 030b6d91c854ddf49d99dd5a9cf9ae5588fd8dc8 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Tue, 31 Dec 2019 18:18:03 +0000 Subject: [PATCH] Remove dependency on queue from setting --- modules/indexer/issues/indexer.go | 2 +- modules/queue/queue.go | 4 +- modules/queue/queue_wrapped.go | 4 +- modules/queue/setting.go | 75 +++++++++++++++++++++++++++++++ modules/setting/queue.go | 63 ++++---------------------- modules/setting/task.go | 6 +-- modules/task/task.go | 3 +- 7 files changed, 91 insertions(+), 66 deletions(-) create mode 100644 modules/queue/setting.go diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 6c89a9708a..894f37a963 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -127,7 +127,7 @@ func InitIssueIndexer(syncReindex bool) { } } - issueIndexerQueue = setting.CreateQueue("issue_indexer", handler, &IndexerData{}) + issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{}) if issueIndexerQueue == nil { log.Fatal("Unable to create issue indexer queue") diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 464e16dab1..d458a7d506 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -123,8 +123,8 @@ func RegisteredTypesAsString() []string { return types } -// CreateQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error -func CreateQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { +// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error +func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) { newFn, ok := queuesMap[queueType] if !ok { return nil, fmt.Errorf("Unsupported queue type: %v", queueType) diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index c218749b65..4578cd7250 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h q.lock.Unlock() log.Fatal("Timedout creating queue %v with cfg %v in %s", q.underlying, q.cfg, q.name) default: - queue, err := CreateQueue(q.underlying, handle, q.cfg, exemplar) + queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar) if err == nil { q.internal = queue q.lock.Unlock() @@ -101,7 +101,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro } config := configInterface.(WrappedQueueConfiguration) - queue, err := CreateQueue(config.Underlying, handle, config.Config, exemplar) + queue, err := NewQueue(config.Underlying, handle, config.Config, exemplar) if err == nil { // Just return the queue there is no need to wrap return queue, nil diff --git a/modules/queue/setting.go b/modules/queue/setting.go new file mode 100644 index 0000000000..d5a6b41882 --- /dev/null +++ b/modules/queue/setting.go @@ -0,0 +1,75 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/json" + "fmt" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +func validType(t string) (Type, error) { + if len(t) == 0 { + return PersistableChannelQueueType, nil + } + for _, typ := range RegisteredTypes() { + if t == string(typ) { + return typ, nil + } + } + return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType)) +} + +// CreateQueue for name with provided handler and exemplar +func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { + q := setting.GetQueueSettings(name) + opts := make(map[string]interface{}) + opts["Name"] = name + opts["QueueLength"] = q.Length + opts["BatchLength"] = q.BatchLength + opts["DataDir"] = q.DataDir + opts["Addresses"] = q.Addresses + opts["Network"] = q.Network + opts["Password"] = q.Password + opts["DBIndex"] = q.DBIndex + opts["QueueName"] = q.QueueName + opts["Workers"] = q.Workers + opts["MaxWorkers"] = q.MaxWorkers + opts["BlockTimeout"] = q.BlockTimeout + opts["BoostTimeout"] = q.BoostTimeout + opts["BoostWorkers"] = q.BoostWorkers + + typ, err := validType(q.Type) + if err != nil { + log.Error("Invalid type %s provided for queue named %s defaulting to %s", q.Type, name, string(typ)) + } + + cfg, err := json.Marshal(opts) + if err != nil { + log.Error("Unable to marshall generic options: %v Error: %v", opts, err) + log.Error("Unable to create queue for %s", name, err) + return nil + } + + returnable, err := NewQueue(typ, handle, cfg, exemplar) + if q.WrapIfNecessary && err != nil { + log.Warn("Unable to create queue for %s: %v", name, err) + log.Warn("Attempting to create wrapped queue") + returnable, err = NewQueue(WrappedQueueType, handle, WrappedQueueConfiguration{ + Underlying: Type(q.Type), + Timeout: q.Timeout, + MaxAttempts: q.MaxAttempts, + Config: cfg, + QueueLength: q.Length, + }, exemplar) + } + if err != nil { + log.Error("Unable to create queue for %s: %v", name, err) + return nil + } + return returnable +} diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 0170834391..bb3c301262 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -5,7 +5,6 @@ package setting import ( - "encoding/json" "fmt" "path" "strconv" @@ -13,10 +12,10 @@ import ( "time" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/queue" ) -type queueSettings struct { +// QueueSettings represent the settings for a queue from the ini +type QueueSettings struct { DataDir string Length int BatchLength int @@ -38,55 +37,11 @@ type queueSettings struct { } // Queue settings -var Queue = queueSettings{} +var Queue = QueueSettings{} -// CreateQueue for name with provided handler and exemplar -func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) queue.Queue { - q := getQueueSettings(name) - opts := make(map[string]interface{}) - opts["Name"] = name - opts["QueueLength"] = q.Length - opts["BatchLength"] = q.BatchLength - opts["DataDir"] = q.DataDir - opts["Addresses"] = q.Addresses - opts["Network"] = q.Network - opts["Password"] = q.Password - opts["DBIndex"] = q.DBIndex - opts["QueueName"] = q.QueueName - opts["Workers"] = q.Workers - opts["MaxWorkers"] = q.MaxWorkers - opts["BlockTimeout"] = q.BlockTimeout - opts["BoostTimeout"] = q.BoostTimeout - opts["BoostWorkers"] = q.BoostWorkers - - cfg, err := json.Marshal(opts) - if err != nil { - log.Error("Unable to marshall generic options: %v Error: %v", opts, err) - log.Error("Unable to create queue for %s", name, err) - return nil - } - - returnable, err := queue.CreateQueue(queue.Type(q.Type), handle, cfg, exemplar) - if q.WrapIfNecessary && err != nil { - log.Warn("Unable to create queue for %s: %v", name, err) - log.Warn("Attempting to create wrapped queue") - returnable, err = queue.CreateQueue(queue.WrappedQueueType, handle, queue.WrappedQueueConfiguration{ - Underlying: queue.Type(q.Type), - Timeout: q.Timeout, - MaxAttempts: q.MaxAttempts, - Config: cfg, - QueueLength: q.Length, - }, exemplar) - } - if err != nil { - log.Error("Unable to create queue for %s: %v", name, err) - return nil - } - return returnable -} - -func getQueueSettings(name string) queueSettings { - q := queueSettings{} +// GetQueueSettings returns the queue settings for the appropriately named queue +func GetQueueSettings(name string) QueueSettings { + q := QueueSettings{} sec := Cfg.Section("queue." + name) // DataDir is not directly inheritable q.DataDir = path.Join(Queue.DataDir, name) @@ -104,8 +59,7 @@ func getQueueSettings(name string) queueSettings { q.Length = sec.Key("LENGTH").MustInt(Queue.Length) q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) - validTypes := queue.RegisteredTypesAsString() - q.Type = sec.Key("TYPE").In(Queue.Type, validTypes) + q.Type = sec.Key("TYPE").MustString(Queue.Type) q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary) q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts) q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout) @@ -131,8 +85,7 @@ func NewQueueService() { Queue.Length = sec.Key("LENGTH").MustInt(20) Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) - validTypes := queue.RegisteredTypesAsString() - Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes) + Queue.Type = sec.Key("TYPE").MustString("") Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) diff --git a/modules/setting/task.go b/modules/setting/task.go index fa63c669c6..81ed39a9fb 100644 --- a/modules/setting/task.go +++ b/modules/setting/task.go @@ -4,16 +4,14 @@ package setting -import "code.gitea.io/gitea/modules/queue" - func newTaskService() { taskSec := Cfg.Section("task") queueTaskSec := Cfg.Section("queue.task") switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) { case ChannelQueueType: - queueTaskSec.Key("TYPE").MustString(string(queue.PersistableChannelQueueType)) + queueTaskSec.Key("TYPE").MustString("persistable-channel") case RedisQueueType: - queueTaskSec.Key("TYPE").MustString(string(queue.RedisQueueType)) + queueTaskSec.Key("TYPE").MustString("redis") } queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000)) queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")) diff --git a/modules/task/task.go b/modules/task/task.go index 852319d406..416f0c696a 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -12,7 +12,6 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations/base" "code.gitea.io/gitea/modules/queue" - "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/structs" ) @@ -31,7 +30,7 @@ func Run(t *models.Task) error { // Init will start the service to get all unfinished tasks and run them func Init() error { - taskQueue = setting.CreateQueue("task", handle, &models.Task{}) + taskQueue = queue.CreateQueue("task", handle, &models.Task{}) if taskQueue == nil { return fmt.Errorf("Unable to create Task Queue")