mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-27 09:11:53 -05:00
62e6c9bc6c
* Add a storage layer for attachments * Fix some bug * fix test * Fix copyright head and lint * Fix bug * Add setting for minio and flags for migrate-storage * Add documents * fix lint * Add test for minio store type on attachments * fix test * fix test * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Add warning when storage migrated successfully * Fix drone * fix test * rebase * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * refactor the codes * add trace * Fix test * remove log on xorm * Fi download bug * Add a storage layer for attachments * Add setting for minio and flags for migrate-storage * fix lint * Add test for minio store type on attachments * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Fix drone * fix test * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * refactor the codes * add trace * Fix test * Add URL function to serve attachments directly from S3/Minio * Add ability to enable/disable redirection in attachment configuration * Fix typo * Add a storage layer for attachments * Add setting for minio and flags for migrate-storage * fix lint * Add test for minio store type on attachments * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Fix drone * fix test * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * don't change unrelated files * Fix lint * Fix build * update go.mod and go.sum * Use github.com/minio/minio-go/v6 * Remove unused function * Upgrade minio to v7 and some other improvements * fix lint * Fix go mod Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Tyler <tystuyfzand@gmail.com>
119 lines
3.6 KiB
Go
Vendored
119 lines
3.6 KiB
Go
Vendored
package concurrent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
"reflect"
|
|
)
|
|
|
|
// HandlePanic logs goroutine panic by default
|
|
var HandlePanic = func(recovered interface{}, funcName string) {
|
|
ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
|
|
ErrorLogger.Println(string(debug.Stack()))
|
|
}
|
|
|
|
// UnboundedExecutor is an executor that places no limit on the number of
// goroutines alive at once. It keeps track of the goroutines it started and
// can cancel them on shutdown.
type UnboundedExecutor struct {
	// ctx is the context handed to started handlers; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc

	// activeGoroutines maps a start-site key to a count of goroutines;
	// activeGoroutinesMutex is the lock used around accesses to that map.
	activeGoroutinesMutex *sync.Mutex
	activeGoroutines      map[string]int

	// HandlePanic is an optional per-executor panic callback receiving the
	// recovered value and the name of the panicking function.
	HandlePanic func(recovered interface{}, funcName string)
}
|
|
|
|
// GlobalUnboundedExecutor has the life cycle of the program itself
|
|
// any goroutine want to be shutdown before main exit can be started from this executor
|
|
// GlobalUnboundedExecutor expects the main function to call stop
|
|
// it does not magically knows the main function exits
|
|
var GlobalUnboundedExecutor = NewUnboundedExecutor()
|
|
|
|
// NewUnboundedExecutor creates a new UnboundedExecutor,
|
|
// UnboundedExecutor can not be created by &UnboundedExecutor{}
|
|
// HandlePanic can be set with a callback to override global HandlePanic
|
|
func NewUnboundedExecutor() *UnboundedExecutor {
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
return &UnboundedExecutor{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
activeGoroutinesMutex: &sync.Mutex{},
|
|
activeGoroutines: map[string]int{},
|
|
}
|
|
}
|
|
|
|
// Go starts a new goroutine and tracks its lifecycle.
|
|
// Panic will be recovered and logged automatically, except for StopSignal
|
|
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
pc := reflect.ValueOf(handler).Pointer()
|
|
f := runtime.FuncForPC(pc)
|
|
funcName := f.Name()
|
|
file, line := f.FileLine(pc)
|
|
executor.activeGoroutinesMutex.Lock()
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
startFrom := fmt.Sprintf("%s:%d", file, line)
|
|
executor.activeGoroutines[startFrom] += 1
|
|
go func() {
|
|
defer func() {
|
|
recovered := recover()
|
|
// if you want to quit a goroutine without trigger HandlePanic
|
|
// use runtime.Goexit() to quit
|
|
if recovered != nil {
|
|
if executor.HandlePanic == nil {
|
|
HandlePanic(recovered, funcName)
|
|
} else {
|
|
executor.HandlePanic(recovered, funcName)
|
|
}
|
|
}
|
|
executor.activeGoroutinesMutex.Lock()
|
|
executor.activeGoroutines[startFrom] -= 1
|
|
executor.activeGoroutinesMutex.Unlock()
|
|
}()
|
|
handler(executor.ctx)
|
|
}()
|
|
}
|
|
|
|
// Stop cancel all goroutines started by this executor without wait
|
|
func (executor *UnboundedExecutor) Stop() {
|
|
executor.cancel()
|
|
}
|
|
|
|
// StopAndWaitForever cancel all goroutines started by this executor and
|
|
// wait until all goroutines exited
|
|
func (executor *UnboundedExecutor) StopAndWaitForever() {
|
|
executor.StopAndWait(context.Background())
|
|
}
|
|
|
|
// StopAndWait cancel all goroutines started by this executor and wait.
|
|
// Wait can be cancelled by the context passed in.
|
|
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
|
|
executor.cancel()
|
|
for {
|
|
oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
|
|
select {
|
|
case <-oneHundredMilliseconds.C:
|
|
if executor.checkNoActiveGoroutines() {
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
|
|
executor.activeGoroutinesMutex.Lock()
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
for startFrom, count := range executor.activeGoroutines {
|
|
if count > 0 {
|
|
InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
|
|
"startFrom", startFrom,
|
|
"count", count)
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|