diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 1fcef59f34..8f0593acff 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -6,7 +6,6 @@ package issues import ( "context" - "encoding/json" "os" "sync" "time" @@ -113,64 +112,10 @@ func InitIssueIndexer(syncReindex bool) { } } - queueType := queue.PersistableChannelQueueType - switch setting.Indexer.IssueQueueType { - case setting.LevelQueueType: - queueType = queue.LevelQueueType - case setting.ChannelQueueType: - queueType = queue.PersistableChannelQueueType - case setting.RedisQueueType: - queueType = queue.RedisQueueType - default: - log.Fatal("Unsupported indexer queue type: %v", - setting.Indexer.IssueQueueType) - } + issueIndexerQueue = setting.CreateQueue("issue_indexer", handler, &IndexerData{}) - name := "issue_indexer_queue" - opts := make(map[string]interface{}) - opts["QueueLength"] = setting.Indexer.UpdateQueueLength - opts["BatchLength"] = setting.Indexer.IssueQueueBatchNumber - opts["DataDir"] = setting.Indexer.IssueQueueDir - - addrs, password, dbIdx, err := setting.ParseQueueConnStr(setting.Indexer.IssueQueueConnStr) - if queueType == queue.RedisQueueType && err != nil { - log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", - setting.Indexer.IssueQueueConnStr, - err) - } - opts["Addresses"] = addrs - opts["Password"] = password - opts["DBIndex"] = dbIdx - opts["QueueName"] = name - opts["Name"] = name - opts["Workers"] = 1 - opts["BlockTimeout"] = 1 * time.Second - opts["BoostTimeout"] = 5 * time.Minute - opts["BoostWorkers"] = 5 - cfg, err := json.Marshal(opts) - if err != nil { - log.Error("Unable to marshall generic options: %v Error: %v", opts, err) - log.Fatal("Unable to create issue indexer queue with type %s: %v", - queueType, - err) - } - log.Debug("Creating issue indexer queue with type %s: configuration: %s", queueType, string(cfg)) - issueIndexerQueue, err = queue.CreateQueue(queueType, handler, cfg, &IndexerData{}) - if err != nil { - issueIndexerQueue, err = queue.CreateQueue(queue.WrappedQueueType, handler, queue.WrappedQueueConfiguration{ - Underlying: queueType, - Timeout: setting.GracefulHammerTime + 30*time.Second, - MaxAttempts: 10, - Config: cfg, - QueueLength: setting.Indexer.UpdateQueueLength, - Name: name, - }, &IndexerData{}) - } - if err != nil { - log.Fatal("Unable to create issue indexer queue with type %s: %v : %v", - queueType, - string(cfg), - err) + if issueIndexerQueue == nil { + log.Fatal("Unable to create issue indexer queue") } default: issueIndexerQueue = &queue.DummyQueue{} diff --git a/modules/setting/queue.go b/modules/setting/queue.go index 0066d5a946..08f6eaf3ee 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -6,6 +6,7 @@ package setting import ( "encoding/json" + "fmt" "path" "strconv" "strings" @@ -145,6 +146,38 @@ func newQueueService() { if !hasWorkers { Cfg.Section("queue.notification").Key("WORKERS").SetValue("5") } + + // Now handle the old issue_indexer configuration + section := Cfg.Section("queue.issue_indexer") + issueIndexerSectionMap := map[string]string{} + for _, key := range section.Keys() { + issueIndexerSectionMap[key.Name()] = key.Value() + } + if _, ok := issueIndexerSectionMap["TYPE"]; !ok { + switch Indexer.IssueQueueType { + case LevelQueueType: + section.Key("TYPE").SetValue("level") + case ChannelQueueType: + section.Key("TYPE").SetValue("persistable-channel") + case RedisQueueType: + section.Key("TYPE").SetValue("redis") + default: + log.Fatal("Unsupported indexer queue type: %v", + Indexer.IssueQueueType) + } + } + if _, ok := issueIndexerSectionMap["LENGTH"]; !ok { + section.Key("LENGTH").SetValue(fmt.Sprintf("%d", Indexer.UpdateQueueLength)) + } + if _, ok := issueIndexerSectionMap["BATCH_LENGTH"]; !ok { + section.Key("BATCH_LENGTH").SetValue(fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber)) + } + if _, ok := issueIndexerSectionMap["DATADIR"]; !ok { + section.Key("DATADIR").SetValue(Indexer.IssueQueueDir) + } + if _, ok := issueIndexerSectionMap["CONN_STR"]; !ok { + section.Key("CONN_STR").SetValue(Indexer.IssueQueueConnStr) + } } // ParseQueueConnStr parses a queue connection string