{{$.i18n.Tr "admin.monitor.queue.pool.cancel_notices" `` | Safe}}
+{{$.i18n.Tr "admin.monitor.queue.pool.cancel_desc"}}
+diff --git a/modules/queue/manager.go b/modules/queue/manager.go new file mode 100644 index 0000000000..100780c706 --- /dev/null +++ b/modules/queue/manager.go @@ -0,0 +1,211 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "sort" + "sync" + "time" +) + +var manager *Manager + +// Manager is a queue manager +type Manager struct { + mutex sync.Mutex + + counter int64 + Queues map[int64]*Description +} + +// Description represents a working queue inheriting from Gitea. +type Description struct { + mutex sync.Mutex + QID int64 + Queue Queue + Type Type + Name string + Configuration interface{} + ExemplarType string + addWorkers func(number int, timeout time.Duration) context.CancelFunc + numberOfWorkers func() int + counter int64 + PoolWorkers map[int64]*PoolWorkers +} + +// DescriptionList implements the sort.Interface +type DescriptionList []*Description + +// PoolWorkers represents a working queue inheriting from Gitea. +type PoolWorkers struct { + PID int64 + Workers int + Start time.Time + Timeout time.Time + HasTimeout bool + Cancel context.CancelFunc +} + +// PoolWorkersList implements the sort.Interface +type PoolWorkersList []*PoolWorkers + +func init() { + _ = GetManager() +} + +// GetManager returns a Manager and initializes one as singleton if there's none yet +func GetManager() *Manager { + if manager == nil { + manager = &Manager{ + Queues: make(map[int64]*Description), + } + } + return manager +} + +// Add adds a queue to this manager +func (m *Manager) Add(queue Queue, + t Type, + configuration, + exemplar interface{}, + addWorkers func(number int, timeout time.Duration) context.CancelFunc, + numberOfWorkers func() int) int64 { + + cfg, _ := json.Marshal(configuration) + desc := &Description{ + Queue: queue, + Type: t, + Configuration: string(cfg), + ExemplarType: reflect.TypeOf(exemplar).String(), + PoolWorkers: make(map[int64]*PoolWorkers), + addWorkers: addWorkers, + numberOfWorkers: numberOfWorkers, + } + m.mutex.Lock() + m.counter++ + desc.QID = m.counter + desc.Name = fmt.Sprintf("queue-%d", desc.QID) + if named, ok := queue.(Named); ok { + desc.Name = named.Name() + } + m.Queues[desc.QID] = desc + m.mutex.Unlock() + return desc.QID +} + +// Remove a queue from the Manager +func (m *Manager) Remove(qid int64) { + m.mutex.Lock() + delete(m.Queues, qid) + m.mutex.Unlock() +} + +// GetDescription by qid +func (m *Manager) GetDescription(qid int64) *Description { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.Queues[qid] +} + +// Descriptions returns the queue descriptions +func (m *Manager) Descriptions() []*Description { + m.mutex.Lock() + descs := make([]*Description, 0, len(m.Queues)) + for _, desc := range m.Queues { + descs = append(descs, desc) + } + m.mutex.Unlock() + sort.Sort(DescriptionList(descs)) + return descs +} + +// Workers returns the poolworkers +func (q *Description) Workers() []*PoolWorkers { + q.mutex.Lock() + workers := make([]*PoolWorkers, 0, len(q.PoolWorkers)) + for _, worker := range q.PoolWorkers { + workers = append(workers, worker) + } + q.mutex.Unlock() + sort.Sort(PoolWorkersList(workers)) + return workers +} + +// RegisterWorkers registers workers to this queue +func (q *Description) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { + q.mutex.Lock() + defer q.mutex.Unlock() + q.counter++ + q.PoolWorkers[q.counter] = &PoolWorkers{ + PID: q.counter, + Workers: number, + Start: start, + Timeout: timeout, + HasTimeout: hasTimeout, + Cancel: cancel, + } + return q.counter +} + +// CancelWorkers cancels pooled workers with pid +func (q *Description) CancelWorkers(pid int64) { + q.mutex.Lock() + pw, ok := q.PoolWorkers[pid] + q.mutex.Unlock() + if !ok { + return + } + pw.Cancel() +} + +// RemoveWorkers deletes pooled workers with pid +func (q *Description) RemoveWorkers(pid int64) { + q.mutex.Lock() + delete(q.PoolWorkers, pid) + q.mutex.Unlock() +} + +// AddWorkers adds workers to the queue if it has registered an add worker function +func (q *Description) AddWorkers(number int, timeout time.Duration) { + if q.addWorkers != nil { + _ = q.addWorkers(number, timeout) + } +} + +// NumberOfWorkers returns the number of workers in the queue +func (q *Description) NumberOfWorkers() int { + if q.numberOfWorkers != nil { + return q.numberOfWorkers() + } + return -1 +} + +func (l DescriptionList) Len() int { + return len(l) +} + +func (l DescriptionList) Less(i, j int) bool { + return l[i].Name < l[j].Name +} + +func (l DescriptionList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + +func (l PoolWorkersList) Len() int { + return len(l) +} + +func (l PoolWorkersList) Less(i, j int) bool { + return l[i].Start.Before(l[j].Start) +} + +func (l PoolWorkersList) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} diff --git a/modules/queue/queue.go b/modules/queue/queue.go index 1220db5c03..464e16dab1 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -48,6 +48,11 @@ type Shutdownable interface { Terminate() } +// Named represents a queue with a name +type Named interface { + Name() string +} + // Queue defines an interface to save an issue indexer queue type Queue interface { Run(atShutdown, atTerminate func(context.Context, func())) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 90ec52347d..265a5c88f1 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -48,7 +48,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro dataChan := make(chan Data, config.QueueLength) ctx, cancel := context.WithCancel(context.Background()) - return &ChannelQueue{ + queue := &ChannelQueue{ pool: &WorkerPool{ baseCtx: ctx, cancel: cancel, @@ -62,7 +62,9 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro exemplar: exemplar, workers: config.Workers, name: config.Name, - }, nil + } + queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + return queue, nil } // Run starts to run the queue @@ -73,7 +75,9 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()) atTerminate(context.Background(), func() { log.Warn("ChannelQueue: %s is not terminatable!", c.name) }) - c.pool.addWorkers(c.pool.baseCtx, c.workers) + go func() { + _ = c.pool.AddWorkers(c.workers, 0) + }() } // Push will push the indexer data to queue @@ -90,6 +94,11 @@ func (c *ChannelQueue) Push(data Data) error { return nil } +// Name returns the name of this queue +func (c *ChannelQueue) Name() string { + return c.name +} + func init() { queuesMap[ChannelQueueType] = NewChannelQueue } diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index cb95b96119..f18f3c5f8e 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -50,7 +50,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) } config := configInterface.(LevelQueueConfiguration) - queue, err := levelqueue.Open(config.DataDir) + internal, err := levelqueue.Open(config.DataDir) if err != nil { return nil, err } @@ -58,7 +58,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) dataChan := make(chan Data, config.QueueLength) ctx, cancel := context.WithCancel(context.Background()) - return &LevelQueue{ + queue := &LevelQueue{ pool: &WorkerPool{ baseCtx: ctx, cancel: cancel, @@ -69,13 +69,15 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) boostTimeout: config.BoostTimeout, boostWorkers: config.BoostWorkers, }, - queue: queue, + queue: internal, exemplar: exemplar, closed: make(chan struct{}), terminated: make(chan struct{}), workers: config.Workers, name: config.Name, - }, nil + } + queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + return queue, nil } // Run starts to run the queue @@ -83,7 +85,9 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) atShutdown(context.Background(), l.Shutdown) atTerminate(context.Background(), l.Terminate) - go l.pool.addWorkers(l.pool.baseCtx, l.workers) + go func() { + _ = l.pool.AddWorkers(l.workers, 0) + }() go l.readToChan() @@ -140,7 +144,7 @@ func (l *LevelQueue) readToChan() { log.Trace("LevelQueue %s: task found: %#v", l.name, data) l.pool.Push(data) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) } } @@ -183,6 +187,11 @@ func (l *LevelQueue) Terminate() { } } +// Name returns the name of this queue +func (l *LevelQueue) Name() string { + return l.name +} + func init() { queuesMap[LevelQueueType] = NewLevelQueue } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index fc186b3bb9..3bf39b9fa5 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -85,7 +85,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( return nil, ErrInvalidConfiguration{cfg: cfg} } - return &PersistableChannelQueue{ + queue := &PersistableChannelQueue{ ChannelQueue: channelQueue.(*ChannelQueue), delayedStarter: delayedStarter{ cfg: levelCfg, @@ -95,7 +95,9 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) ( name: config.Name, }, closed: make(chan struct{}), - }, nil + } + _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil, nil) + return queue, nil } // Name returns the name of this queue @@ -127,7 +129,9 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte // Just run the level queue - we shut it down later go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - go p.ChannelQueue.pool.addWorkers(p.ChannelQueue.pool.baseCtx, p.workers) + go func() { + _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0) + }() <-p.closed p.ChannelQueue.pool.cancel() diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index ebcba683cb..88794428a8 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -67,7 +67,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) dataChan := make(chan Data, config.QueueLength) ctx, cancel := context.WithCancel(context.Background()) - var queue = RedisQueue{ + var queue = &RedisQueue{ pool: &WorkerPool{ baseCtx: ctx, cancel: cancel, @@ -100,7 +100,9 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if err := queue.client.Ping().Err(); err != nil { return nil, err } - return &queue, nil + queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool.AddWorkers, queue.pool.NumberOfWorkers) + + return queue, nil } // Run runs the redis queue @@ -108,7 +110,9 @@ func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) atShutdown(context.Background(), r.Shutdown) atTerminate(context.Background(), r.Terminate) - go r.pool.addWorkers(r.pool.baseCtx, r.workers) + go func() { + _ = r.pool.AddWorkers(r.workers, 0) + }() go r.readToChan() @@ -198,6 +202,11 @@ func (r *RedisQueue) Terminate() { } } +// Name returns the name of this queue +func (r *RedisQueue) Name() string { + return r.name +} + func init() { queuesMap[RedisQueueType] = NewRedisQueue } diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 2293327348..57f19f6312 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -111,7 +111,7 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro return nil, ErrInvalidConfiguration{cfg: cfg} } - return &WrappedQueue{ + queue = &WrappedQueue{ handle: handle, channel: make(chan Data, config.QueueLength), exemplar: exemplar, @@ -122,7 +122,14 @@ func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro maxAttempts: config.MaxAttempts, name: config.Name, }, - }, nil + } + _ = GetManager().Add(queue, WrappedQueueType, config, exemplar, nil, nil) + return queue, nil +} + +// Name returns the name of the queue +func (q *WrappedQueue) Name() string { + return q.name + "-wrapper" } // Push will push the data to the internal channel checking it against the exemplar diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 02e053a427..bf3a15c00e 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -18,6 +18,7 @@ type WorkerPool struct { baseCtx context.Context cancel context.CancelFunc cond *sync.Cond + qid int64 numberOfWorkers int batchLength int handle HandlerFunc @@ -68,8 +69,21 @@ func (p *WorkerPool) pushBoost(data Data) { return } p.blockTimeout *= 2 - log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) ctx, cancel := context.WithCancel(p.baseCtx) + desc := GetManager().GetDescription(p.qid) + if desc != nil { + log.Warn("Worker Channel for %v blocked for %v - adding %d temporary workers for %s, block timeout now %v", desc.Name, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + + start := time.Now() + pid := desc.RegisterWorkers(p.boostWorkers, start, false, start, cancel) + go func() { + <-ctx.Done() + desc.RemoveWorkers(pid) + cancel() + }() + } else { + log.Warn("Worker Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) + } go func() { <-time.After(p.boostTimeout) cancel() @@ -95,12 +109,26 @@ func (p *WorkerPool) NumberOfWorkers() int { func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { var ctx context.Context var cancel context.CancelFunc + start := time.Now() + end := start + hasTimeout := false if timeout > 0 { ctx, cancel = context.WithTimeout(p.baseCtx, timeout) + end = start.Add(timeout) + hasTimeout = true } else { ctx, cancel = context.WithCancel(p.baseCtx) } + desc := GetManager().GetDescription(p.qid) + if desc != nil { + pid := desc.RegisterWorkers(number, start, hasTimeout, end, cancel) + go func() { + <-ctx.Done() + desc.RemoveWorkers(pid) + cancel() + }() + } p.addWorkers(ctx, number) return cancel } diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index 6911904271..d6a96b55a5 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -2022,6 +2022,34 @@ monitor.execute_time = Execution Time monitor.process.cancel = Cancel process monitor.process.cancel_desc = Cancelling a process may cause data loss monitor.process.cancel_notices = Cancel: %s? +monitor.queues = Queues +monitor.queue = Queue: %s +monitor.queue.name = Name +monitor.queue.type = Type +monitor.queue.exemplar = Exemplar Type +monitor.queue.numberworkers = Number of Workers +monitor.queue.review = Review Config +monitor.queue.review_add = Review/Add Workers +monitor.queue.configuration = Initial Configuration +monitor.queue.nopool.title = No Worker Pool +monitor.queue.nopool.desc = This queue wraps other queues and does not itself have a worker pool. +monitor.queue.wrapped.desc = A wrapped queue wraps a slow starting queue, buffering queued requests in a channel. It does not have a worker pool itself. +monitor.queue.persistable-channel.desc = A persistable-channel wraps two queues, a channel queue that has its own worker pool and a level queue for persisted requests from previous shutdowns. It does not have a worker pool itself. +monitor.queue.pool.timeout = Timeout +monitor.queue.pool.addworkers.title = Add Workers +monitor.queue.pool.addworkers.submit = Add Workers +monitor.queue.pool.addworkers.desc = Add Workers to this pool with or without a timeout. If you set a timeout these workers will be removed from the pool after the timeout has lapsed. +monitor.queue.pool.addworkers.numberworkers.placeholder = Number of Workers +monitor.queue.pool.addworkers.timeout.placeholder = Set to 0 for no timeout +monitor.queue.pool.addworkers.mustnumbergreaterzero = Number of Workers to add must be greater than zero +monitor.queue.pool.addworkers.musttimeoutduration = Timeout must be a golang duration eg. 5m or be 0 +monitor.queue.pool.added = Worker Group Added +monitor.queue.pool.workers.title = Active Worker Groups +monitor.queue.pool.workers.none = No worker groups. +monitor.queue.pool.cancel = Shutdown Worker Group +monitor.queue.pool.cancelling = Worker Group shutting down +monitor.queue.pool.cancel_notices = Shutdown this group of %s workers? +monitor.queue.pool.cancel_desc = Leaving a queue without any worker groups may cause requests may block indefinitely. notices.system_notice_list = System Notices notices.view_detail_header = View Notice Details diff --git a/routers/admin/admin.go b/routers/admin/admin.go index ccedcaf8a6..7fc57edf31 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -22,6 +22,7 @@ import ( "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/services/mailer" @@ -35,6 +36,7 @@ const ( tplDashboard base.TplName = "admin/dashboard" tplConfig base.TplName = "admin/config" tplMonitor base.TplName = "admin/monitor" + tplQueue base.TplName = "admin/queue" ) var ( @@ -355,6 +357,7 @@ func Monitor(ctx *context.Context) { ctx.Data["PageIsAdminMonitor"] = true ctx.Data["Processes"] = process.GetManager().Processes() ctx.Data["Entries"] = cron.ListTasks() + ctx.Data["Queues"] = queue.GetManager().Descriptions() ctx.HTML(200, tplMonitor) } @@ -366,3 +369,59 @@ func MonitorCancel(ctx *context.Context) { "redirect": ctx.Repo.RepoLink + "/admin/monitor", }) } + +// Queue shows details for a specific queue +func Queue(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + desc := queue.GetManager().GetDescription(qid) + if desc == nil { + ctx.Status(404) + return + } + ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", desc.Name) + ctx.Data["PageIsAdmin"] = true + ctx.Data["PageIsAdminMonitor"] = true + ctx.Data["Queue"] = desc + ctx.HTML(200, tplQueue) +} + +// WorkerCancel cancels a worker group +func WorkerCancel(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + desc := queue.GetManager().GetDescription(qid) + if desc == nil { + ctx.Status(404) + return + } + pid := ctx.ParamsInt64("pid") + desc.CancelWorkers(pid) + ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling")) + ctx.JSON(200, map[string]interface{}{ + "redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid), + }) +} + +// AddWorkers adds workers to a worker group +func AddWorkers(ctx *context.Context) { + qid := ctx.ParamsInt64("qid") + desc := queue.GetManager().GetDescription(qid) + if desc == nil { + ctx.Status(404) + return + } + number := ctx.QueryInt("number") + if number < 1 { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.mustnumbergreaterzero")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + timeout, err := time.ParseDuration(ctx.Query("timeout")) + if err != nil { + ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.addworkers.musttimeoutduration")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) + return + } + desc.AddWorkers(number, timeout) + ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added")) + ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) +} diff --git a/routers/routes/routes.go b/routers/routes/routes.go index c8351f312b..e97a932692 100644 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -411,8 +411,15 @@ func RegisterRoutes(m *macaron.Macaron) { m.Get("", adminReq, admin.Dashboard) m.Get("/config", admin.Config) m.Post("/config/test_mail", admin.SendTestMail) - m.Get("/monitor", admin.Monitor) - m.Post("/monitor/cancel/:pid", admin.MonitorCancel) + m.Group("/monitor", func() { + m.Get("", admin.Monitor) + m.Post("/cancel/:pid", admin.MonitorCancel) + m.Group("/queue/:qid", func() { + m.Get("", admin.Queue) + m.Post("/add", admin.AddWorkers) + m.Post("/cancel/:pid", admin.WorkerCancel) + }) + }) m.Group("/users", func() { m.Get("", admin.Users) diff --git a/templates/admin/monitor.tmpl b/templates/admin/monitor.tmpl index 38402fece2..0f9c2150b6 100644 --- a/templates/admin/monitor.tmpl +++ b/templates/admin/monitor.tmpl @@ -31,6 +31,34 @@ +
{{.i18n.Tr "admin.monitor.queue.name"}} | +{{.i18n.Tr "admin.monitor.queue.type"}} | +{{.i18n.Tr "admin.monitor.queue.exemplar"}} | +{{.i18n.Tr "admin.monitor.queue.numberworkers"}} | ++ |
---|---|---|---|---|
{{.Name}} | +{{.Type}} | +{{.ExemplarType}} | +{{$sum := .NumberOfWorkers}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | +{{if lt $sum 0}}{{$.i18n.Tr "admin.monitor.queue.review"}}{{else}}{{$.i18n.Tr "admin.monitor.queue.review_add"}}{{end}} + |
{{.i18n.Tr "admin.monitor.queue.name"}} | +{{.i18n.Tr "admin.monitor.queue.type"}} | +{{.i18n.Tr "admin.monitor.queue.exemplar"}} | +{{.i18n.Tr "admin.monitor.queue.numberworkers"}} | +
---|---|---|---|
{{.Queue.Name}} | +{{.Queue.Type}} | +{{.Queue.ExemplarType}} | +{{$sum := .Queue.NumberOfWorkers}}{{if lt $sum 0}}-{{else}}{{$sum}}{{end}} | +
{{.i18n.Tr "admin.monitor.queue.wrapped.desc"}}
+ {{else if eq .Queue.Type "persistable-channel"}} +{{.i18n.Tr "admin.monitor.queue.persistable-channel.desc"}}
+ {{else}} +{{.i18n.Tr "admin.monitor.queue.nopool.desc"}}
+ {{end}} +{{.i18n.Tr "admin.monitor.queue.pool.addworkers.desc"}}
+ +{{.Queue.Configuration | JsonPrettyPrint}} +
{{$.i18n.Tr "admin.monitor.queue.pool.cancel_notices" `` | Safe}}
+{{$.i18n.Tr "admin.monitor.queue.pool.cancel_desc"}}
+