From 4658b2f35e8f9058c9d0521910b784243a30b70e Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 16 Nov 2019 18:11:34 +0000 Subject: [PATCH] Task: Move to generic queue and gracefulise --- modules/setting/setting.go | 1 + modules/setting/task.go | 27 +++---- modules/task/queue.go | 14 ---- modules/task/queue_channel.go | 48 ------------- modules/task/queue_redis.go | 130 ---------------------------------- modules/task/task.go | 40 +++++------ 6 files changed, 30 insertions(+), 230 deletions(-) delete mode 100644 modules/task/queue.go delete mode 100644 modules/task/queue_channel.go delete mode 100644 modules/task/queue_redis.go diff --git a/modules/setting/setting.go b/modules/setting/setting.go index dbf43f31ee..a7a916e9c2 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -1090,4 +1090,5 @@ func NewServices() { newMigrationsService() newIndexerService() newTaskService() + newQueueService() } diff --git a/modules/setting/task.go b/modules/setting/task.go index 97704d4a4d..fa63c669c6 100644 --- a/modules/setting/task.go +++ b/modules/setting/task.go @@ -4,22 +4,17 @@ package setting -var ( - // Task settings - Task = struct { - QueueType string - QueueLength int - QueueConnStr string - }{ - QueueType: ChannelQueueType, - QueueLength: 1000, - QueueConnStr: "addrs=127.0.0.1:6379 db=0", - } -) +import "code.gitea.io/gitea/modules/queue" func newTaskService() { - sec := Cfg.Section("task") - Task.QueueType = sec.Key("QUEUE_TYPE").MustString(ChannelQueueType) - Task.QueueLength = sec.Key("QUEUE_LENGTH").MustInt(1000) - Task.QueueConnStr = sec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0") + taskSec := Cfg.Section("task") + queueTaskSec := Cfg.Section("queue.task") + switch taskSec.Key("QUEUE_TYPE").MustString(ChannelQueueType) { + case ChannelQueueType: + queueTaskSec.Key("TYPE").MustString(string(queue.PersistableChannelQueueType)) + case RedisQueueType: + queueTaskSec.Key("TYPE").MustString(string(queue.RedisQueueType)) + } + queueTaskSec.Key("LENGTH").MustInt(taskSec.Key("QUEUE_LENGTH").MustInt(1000)) + queueTaskSec.Key("CONN_STR").MustString(taskSec.Key("QUEUE_CONN_STR").MustString("addrs=127.0.0.1:6379 db=0")) } diff --git a/modules/task/queue.go b/modules/task/queue.go deleted file mode 100644 index ddee0b3d46..0000000000 --- a/modules/task/queue.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2019 Gitea. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import "code.gitea.io/gitea/models" - -// Queue defines an interface to run task queue -type Queue interface { - Run() error - Push(*models.Task) error - Stop() -} diff --git a/modules/task/queue_channel.go b/modules/task/queue_channel.go deleted file mode 100644 index da541f4755..0000000000 --- a/modules/task/queue_channel.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" -) - -var ( - _ Queue = &ChannelQueue{} -) - -// ChannelQueue implements -type ChannelQueue struct { - queue chan *models.Task -} - -// NewChannelQueue create a memory channel queue -func NewChannelQueue(queueLen int) *ChannelQueue { - return &ChannelQueue{ - queue: make(chan *models.Task, queueLen), - } -} - -// Run starts to run the queue -func (c *ChannelQueue) Run() error { - for task := range c.queue { - err := Run(task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - return nil -} - -// Push will push the task ID to queue -func (c *ChannelQueue) Push(task *models.Task) error { - c.queue <- task - return nil -} - -// Stop stop the queue -func (c *ChannelQueue) Stop() { - close(c.queue) -} diff --git a/modules/task/queue_redis.go b/modules/task/queue_redis.go deleted file mode 100644 index 127de0cdbf..0000000000 --- a/modules/task/queue_redis.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2019 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package task - -import ( - "encoding/json" - "errors" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" - - "github.com/go-redis/redis" -) - -var ( - _ Queue = &RedisQueue{} -) - -type redisClient interface { - RPush(key string, args ...interface{}) *redis.IntCmd - LPop(key string) *redis.StringCmd - Ping() *redis.StatusCmd -} - -// RedisQueue redis queue -type RedisQueue struct { - client redisClient - queueName string - closeChan chan bool -} - -func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { - fields := strings.Fields(connStr) - for _, f := range fields { - items := strings.SplitN(f, "=", 2) - if len(items) < 2 { - continue - } - switch strings.ToLower(items[0]) { - case "addrs": - addrs = items[1] - case "password": - password = items[1] - case "db": - dbIdx, err = strconv.Atoi(items[1]) - if err != nil { - return - } - } - } - return -} - -// NewRedisQueue creates single redis or cluster redis queue -func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) { - dbs := strings.Split(addrs, ",") - var queue = RedisQueue{ - queueName: "task_queue", - closeChan: make(chan bool), - } - if len(dbs) == 0 { - return nil, errors.New("no redis host found") - } else if len(dbs) == 1 { - queue.client = redis.NewClient(&redis.Options{ - Addr: strings.TrimSpace(dbs[0]), // use default Addr - Password: password, // no password set - DB: dbIdx, // use default DB - }) - } else { - // cluster will ignore db - queue.client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: dbs, - Password: password, - }) - } - if err := queue.client.Ping().Err(); err != nil { - return nil, err - } - return &queue, nil -} - -// Run starts to run the queue -func (r *RedisQueue) Run() error { - for { - select { - case <-r.closeChan: - return nil - case <-time.After(time.Millisecond * 100): - } - - bs, err := r.client.LPop(r.queueName).Bytes() - if err != nil { - if err != redis.Nil { - log.Error("LPop failed: %v", err) - } - time.Sleep(time.Millisecond * 100) - continue - } - - var task models.Task - err = json.Unmarshal(bs, &task) - if err != nil { - log.Error("Unmarshal task failed: %s", err.Error()) - } else { - err = Run(&task) - if err != nil { - log.Error("Run task failed: %s", err.Error()) - } - } - } -} - -// Push implements Queue -func (r *RedisQueue) Push(task *models.Task) error { - bs, err := json.Marshal(task) - if err != nil { - return err - } - return r.client.RPush(r.queueName, bs).Err() -} - -// Stop stop the queue -func (r *RedisQueue) Stop() { - r.closeChan <- true -} diff --git a/modules/task/task.go b/modules/task/task.go index 64744afe7a..852319d406 100644 --- a/modules/task/task.go +++ b/modules/task/task.go @@ -8,14 +8,16 @@ import ( "fmt" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/migrations/base" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/structs" ) // taskQueue is a global queue of tasks -var taskQueue Queue +var taskQueue queue.Queue // Run a task func Run(t *models.Task) error { @@ -23,38 +25,32 @@ func Run(t *models.Task) error { case structs.TaskTypeMigrateRepo: return runMigrateTask(t) default: - return fmt.Errorf("Unknow task type: %d", t.Type) + return fmt.Errorf("Unknown task type: %d", t.Type) } } // Init will start the service to get all unfinished tasks and run them func Init() error { - switch setting.Task.QueueType { - case setting.ChannelQueueType: - taskQueue = NewChannelQueue(setting.Task.QueueLength) - case setting.RedisQueueType: - var err error - addrs, pass, idx, err := parseConnStr(setting.Task.QueueConnStr) - if err != nil { - return err - } - taskQueue, err = NewRedisQueue(addrs, pass, idx) - if err != nil { - return err - } - default: - return fmt.Errorf("Unsupported task queue type: %v", setting.Task.QueueType) + taskQueue = setting.CreateQueue("task", handle, &models.Task{}) + + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") } - go func() { - if err := taskQueue.Run(); err != nil { - log.Error("taskQueue.Run end failed: %v", err) - } - }() + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) return nil } +func handle(data ...queue.Data) { + for _, datum := range data { + task := datum.(*models.Task) + if err := Run(task); err != nil { + log.Error("Run task failed: %v", err) + } + } +} + // MigrateRepository add migration repository to task func MigrateRepository(doer, u *models.User, opts base.MigrateOptions) error { task, err := models.CreateMigrateTask(doer, u, opts)