mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-12-30 14:09:42 -05:00
58dfaf3a75
Although some features are mixed together in this PR, this PR is not that large, and these features are all related. Actually there are more than 70 lines are for a toy "test queue", so this PR is quite simple. Major features: 1. Allow site admin to clear a queue (remove all items in a queue) * Because there is no transaction, the "unique queue" could be corrupted in rare cases, that's unfixable. * eg: the item is in the "set" but not in the "list", so the item would never be able to be pushed into the queue. * Now site admin could simply clear the queue, then everything becomes correct, the lost items could be re-pushed into queue by future operations. 3. Split the "admin/monitor" to separate pages 4. Allow to download diagnosis report * In history, there were many users reporting that Gitea queue gets stuck, or Gitea's CPU is 100% * With diagnosis report, maintainers could know what happens clearly The diagnosis report sample: [gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip) , use "go tool pprof profile.dat" to view the report. Screenshots: ![image](https://github.com/go-gitea/gitea/assets/2114189/320659b4-2eda-4def-8dc0-5ea08d578063) ![image](https://github.com/go-gitea/gitea/assets/2114189/c5c46fae-9dc0-44ca-8cd3-57beedc5035e) ![image](https://github.com/go-gitea/gitea/assets/2114189/6168a811-42a1-4e64-a263-0617a6c8c4fe) --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: Giteabot <teabot@gitea.io>
138 lines
3.4 KiB
Go
138 lines
3.4 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/graceful"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/nosql"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type baseRedis struct {
|
|
client redis.UniversalClient
|
|
isUnique bool
|
|
cfg *BaseConfig
|
|
|
|
mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
|
|
}
|
|
|
|
var _ baseQueue = (*baseRedis)(nil)
|
|
|
|
func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
|
client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
|
|
|
var err error
|
|
for i := 0; i < 10; i++ {
|
|
err = client.Ping(graceful.GetManager().ShutdownContext()).Err()
|
|
if err == nil {
|
|
break
|
|
}
|
|
log.Warn("Redis is not ready, waiting for 1 second to retry: %v", err)
|
|
time.Sleep(time.Second)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil
|
|
}
|
|
|
|
func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
|
|
return newBaseRedisGeneric(cfg, false)
|
|
}
|
|
|
|
func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
|
|
return newBaseRedisGeneric(cfg, true)
|
|
}
|
|
|
|
func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
|
return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if int(cnt) >= q.cfg.Length {
|
|
return true, nil
|
|
}
|
|
|
|
if q.isUnique {
|
|
added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if added == 0 {
|
|
return false, ErrAlreadyInQueue
|
|
}
|
|
}
|
|
return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err()
|
|
})
|
|
}
|
|
|
|
func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
|
|
return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes()
|
|
if err == redis.Nil {
|
|
return true, nil, nil
|
|
}
|
|
if err != nil {
|
|
return true, nil, nil
|
|
}
|
|
if q.isUnique {
|
|
// the data has been popped, even if there is any error we can't do anything
|
|
_ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err()
|
|
}
|
|
return false, data, err
|
|
})
|
|
}
|
|
|
|
func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if !q.isUnique {
|
|
return false, nil
|
|
}
|
|
return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result()
|
|
}
|
|
|
|
func (q *baseRedis) Len(ctx context.Context) (int, error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
|
return int(cnt), err
|
|
}
|
|
|
|
func (q *baseRedis) Close() error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
return q.client.Close()
|
|
}
|
|
|
|
func (q *baseRedis) RemoveAll(ctx context.Context) error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
c1 := q.client.Del(ctx, q.cfg.QueueFullName)
|
|
// the "set" must be cleared after the "list" because there is no transaction.
|
|
// it's better to have duplicate items than losing items.
|
|
c2 := q.client.Del(ctx, q.cfg.SetFullName)
|
|
if c1.Err() != nil {
|
|
return c1.Err()
|
|
}
|
|
if c2.Err() != nil {
|
|
return c2.Err()
|
|
}
|
|
return nil // actually, checking errors doesn't make sense here because the state could be out-of-sync
|
|
}
|