mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-01-02 14:28:52 -05:00
Make LFS http_client parallel within a batch. (#32369)
Signed-off-by: Royce Remer <royceremer@gmail.com> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
parent
3c9b3ddf5c
commit
f6273e2250
7 changed files with 183 additions and 153 deletions
|
@ -2668,9 +2668,15 @@ LEVEL = Info
|
||||||
;; override the minio base path if storage type is minio
|
;; override the minio base path if storage type is minio
|
||||||
;MINIO_BASE_PATH = lfs/
|
;MINIO_BASE_PATH = lfs/
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint)
|
||||||
|
;;
|
||||||
;[lfs_client]
|
;[lfs_client]
|
||||||
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
|
;; Limit the number of pointers in each batch request to this number
|
||||||
;BATCH_SIZE = 20
|
;BATCH_SIZE = 20
|
||||||
|
;; Limit the number of concurrent upload/download operations within a batch
|
||||||
|
;BATCH_OPERATION_CONCURRENCY = 3
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -108,6 +108,7 @@ require (
|
||||||
golang.org/x/image v0.23.0
|
golang.org/x/image v0.23.0
|
||||||
golang.org/x/net v0.31.0
|
golang.org/x/net v0.31.0
|
||||||
golang.org/x/oauth2 v0.23.0
|
golang.org/x/oauth2 v0.23.0
|
||||||
|
golang.org/x/sync v0.10.0
|
||||||
golang.org/x/sys v0.28.0
|
golang.org/x/sys v0.28.0
|
||||||
golang.org/x/text v0.21.0
|
golang.org/x/text v0.21.0
|
||||||
golang.org/x/tools v0.26.0
|
golang.org/x/tools v0.26.0
|
||||||
|
@ -284,7 +285,6 @@ require (
|
||||||
go.uber.org/zap v1.27.0 // indirect
|
go.uber.org/zap v1.27.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
|
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
|
||||||
golang.org/x/mod v0.21.0 // indirect
|
golang.org/x/mod v0.21.0 // indirect
|
||||||
golang.org/x/sync v0.10.0 // indirect
|
|
||||||
golang.org/x/time v0.5.0 // indirect
|
golang.org/x/time v0.5.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||||
|
|
|
@ -17,6 +17,8 @@ import (
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/proxy"
|
"code.gitea.io/gitea/modules/proxy"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTPClient is used to communicate with the LFS server
|
// HTTPClient is used to communicate with the LFS server
|
||||||
|
@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
|
||||||
return c.performOperation(ctx, objects, nil, callback)
|
return c.performOperation(ctx, objects, nil, callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
|
||||||
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
|
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
|
||||||
if len(objects) == 0 {
|
if len(objects) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -133,30 +136,48 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
|
||||||
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
|
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errGroup, groupCtx := errgroup.WithContext(ctx)
|
||||||
|
errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
|
||||||
for _, object := range result.Objects {
|
for _, object := range result.Objects {
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// only the first error is returned, preserving legacy behavior before concurrency
|
||||||
|
return errGroup.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// performSingleOperation performs an LFS upload or download operation on a single object
|
||||||
|
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
|
||||||
|
// the response from a lfs batch api request for this specific object id contained an error
|
||||||
if object.Error != nil {
|
if object.Error != nil {
|
||||||
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
|
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
|
||||||
|
|
||||||
|
// this was an 'upload' request inside the batch request
|
||||||
if uc != nil {
|
if uc != nil {
|
||||||
if _, err := uc(object.Pointer, object.Error); err != nil {
|
if _, err := uc(object.Pointer, object.Error); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
|
||||||
if err := dc(object.Pointer, nil, object.Error); err != nil {
|
if err := dc(object.Pointer, nil, object.Error); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue
|
// if the callback returns no err, then the error could be ignored, and the operations should continue
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the response from an lfs batch api request contained necessary upload/download fields to act upon
|
||||||
if uc != nil {
|
if uc != nil {
|
||||||
if len(object.Actions) == 0 {
|
if len(object.Actions) == 0 {
|
||||||
log.Trace("%v already present on server", object.Pointer)
|
log.Trace("%v already present on server", object.Pointer)
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
link, ok := object.Actions["upload"]
|
link, ok := object.Actions["upload"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Debug("%+v", object)
|
|
||||||
return errors.New("missing action 'upload'")
|
return errors.New("missing action 'upload'")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,8 +217,6 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/json"
|
"code.gitea.io/gitea/modules/json"
|
||||||
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
"code.gitea.io/gitea/modules/test"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -184,71 +186,61 @@ func TestHTTPClientDownload(t *testing.T) {
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
expectederror string
|
expectedError string
|
||||||
}{
|
}{
|
||||||
// case 0
|
|
||||||
{
|
{
|
||||||
endpoint: "https://status-not-ok.io",
|
endpoint: "https://status-not-ok.io",
|
||||||
expectederror: io.ErrUnexpectedEOF.Error(),
|
expectedError: io.ErrUnexpectedEOF.Error(),
|
||||||
},
|
},
|
||||||
// case 1
|
|
||||||
{
|
{
|
||||||
endpoint: "https://invalid-json-response.io",
|
endpoint: "https://invalid-json-response.io",
|
||||||
expectederror: "invalid json",
|
expectedError: "invalid json",
|
||||||
},
|
},
|
||||||
// case 2
|
|
||||||
{
|
{
|
||||||
endpoint: "https://valid-batch-request-download.io",
|
endpoint: "https://valid-batch-request-download.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 3
|
|
||||||
{
|
{
|
||||||
endpoint: "https://response-no-objects.io",
|
endpoint: "https://response-no-objects.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 4
|
|
||||||
{
|
{
|
||||||
endpoint: "https://unknown-transfer-adapter.io",
|
endpoint: "https://unknown-transfer-adapter.io",
|
||||||
expectederror: "TransferAdapter not found: ",
|
expectedError: "TransferAdapter not found: ",
|
||||||
},
|
},
|
||||||
// case 5
|
|
||||||
{
|
{
|
||||||
endpoint: "https://error-in-response-objects.io",
|
endpoint: "https://error-in-response-objects.io",
|
||||||
expectederror: "Object not found",
|
expectedError: "Object not found",
|
||||||
},
|
},
|
||||||
// case 6
|
|
||||||
{
|
{
|
||||||
endpoint: "https://empty-actions-map.io",
|
endpoint: "https://empty-actions-map.io",
|
||||||
expectederror: "missing action 'download'",
|
expectedError: "missing action 'download'",
|
||||||
},
|
},
|
||||||
// case 7
|
|
||||||
{
|
{
|
||||||
endpoint: "https://download-actions-map.io",
|
endpoint: "https://download-actions-map.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 8
|
|
||||||
{
|
{
|
||||||
endpoint: "https://upload-actions-map.io",
|
endpoint: "https://upload-actions-map.io",
|
||||||
expectederror: "missing action 'download'",
|
expectedError: "missing action 'download'",
|
||||||
},
|
},
|
||||||
// case 9
|
|
||||||
{
|
{
|
||||||
endpoint: "https://verify-actions-map.io",
|
endpoint: "https://verify-actions-map.io",
|
||||||
expectederror: "missing action 'download'",
|
expectedError: "missing action 'download'",
|
||||||
},
|
},
|
||||||
// case 10
|
|
||||||
{
|
{
|
||||||
endpoint: "https://unknown-actions-map.io",
|
endpoint: "https://unknown-actions-map.io",
|
||||||
expectederror: "missing action 'download'",
|
expectedError: "missing action 'download'",
|
||||||
},
|
},
|
||||||
// case 11
|
|
||||||
{
|
{
|
||||||
endpoint: "https://legacy-batch-request-download.io",
|
endpoint: "https://legacy-batch-request-download.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for n, c := range cases {
|
defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.endpoint, func(t *testing.T) {
|
||||||
client := &HTTPClient{
|
client := &HTTPClient{
|
||||||
client: hc,
|
client: hc,
|
||||||
endpoint: c.endpoint,
|
endpoint: c.endpoint,
|
||||||
|
@ -266,11 +258,12 @@ func TestHTTPClientDownload(t *testing.T) {
|
||||||
assert.Equal(t, []byte("dummy"), b)
|
assert.Equal(t, []byte("dummy"), b)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if len(c.expectederror) > 0 {
|
if c.expectedError != "" {
|
||||||
assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
|
assert.ErrorContains(t, err, c.expectedError)
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err, "case %d", n)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,66 +290,57 @@ func TestHTTPClientUpload(t *testing.T) {
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
expectederror string
|
expectedError string
|
||||||
}{
|
}{
|
||||||
// case 0
|
|
||||||
{
|
{
|
||||||
endpoint: "https://status-not-ok.io",
|
endpoint: "https://status-not-ok.io",
|
||||||
expectederror: io.ErrUnexpectedEOF.Error(),
|
expectedError: io.ErrUnexpectedEOF.Error(),
|
||||||
},
|
},
|
||||||
// case 1
|
|
||||||
{
|
{
|
||||||
endpoint: "https://invalid-json-response.io",
|
endpoint: "https://invalid-json-response.io",
|
||||||
expectederror: "invalid json",
|
expectedError: "invalid json",
|
||||||
},
|
},
|
||||||
// case 2
|
|
||||||
{
|
{
|
||||||
endpoint: "https://valid-batch-request-upload.io",
|
endpoint: "https://valid-batch-request-upload.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 3
|
|
||||||
{
|
{
|
||||||
endpoint: "https://response-no-objects.io",
|
endpoint: "https://response-no-objects.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 4
|
|
||||||
{
|
{
|
||||||
endpoint: "https://unknown-transfer-adapter.io",
|
endpoint: "https://unknown-transfer-adapter.io",
|
||||||
expectederror: "TransferAdapter not found: ",
|
expectedError: "TransferAdapter not found: ",
|
||||||
},
|
},
|
||||||
// case 5
|
|
||||||
{
|
{
|
||||||
endpoint: "https://error-in-response-objects.io",
|
endpoint: "https://error-in-response-objects.io",
|
||||||
expectederror: "Object not found",
|
expectedError: "Object not found",
|
||||||
},
|
},
|
||||||
// case 6
|
|
||||||
{
|
{
|
||||||
endpoint: "https://empty-actions-map.io",
|
endpoint: "https://empty-actions-map.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 7
|
|
||||||
{
|
{
|
||||||
endpoint: "https://download-actions-map.io",
|
endpoint: "https://download-actions-map.io",
|
||||||
expectederror: "missing action 'upload'",
|
expectedError: "missing action 'upload'",
|
||||||
},
|
},
|
||||||
// case 8
|
|
||||||
{
|
{
|
||||||
endpoint: "https://upload-actions-map.io",
|
endpoint: "https://upload-actions-map.io",
|
||||||
expectederror: "",
|
expectedError: "",
|
||||||
},
|
},
|
||||||
// case 9
|
|
||||||
{
|
{
|
||||||
endpoint: "https://verify-actions-map.io",
|
endpoint: "https://verify-actions-map.io",
|
||||||
expectederror: "missing action 'upload'",
|
expectedError: "missing action 'upload'",
|
||||||
},
|
},
|
||||||
// case 10
|
|
||||||
{
|
{
|
||||||
endpoint: "https://unknown-actions-map.io",
|
endpoint: "https://unknown-actions-map.io",
|
||||||
expectederror: "missing action 'upload'",
|
expectedError: "missing action 'upload'",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for n, c := range cases {
|
defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)()
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.endpoint, func(t *testing.T) {
|
||||||
client := &HTTPClient{
|
client := &HTTPClient{
|
||||||
client: hc,
|
client: hc,
|
||||||
endpoint: c.endpoint,
|
endpoint: c.endpoint,
|
||||||
|
@ -368,10 +352,11 @@ func TestHTTPClientUpload(t *testing.T) {
|
||||||
err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
|
err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
|
||||||
return io.NopCloser(new(bytes.Buffer)), objectError
|
return io.NopCloser(new(bytes.Buffer)), objectError
|
||||||
})
|
})
|
||||||
if len(c.expectederror) > 0 {
|
if c.expectedError != "" {
|
||||||
assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
|
assert.ErrorContains(t, err, c.expectedError)
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err, "case %d", n)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,11 +182,12 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
|
||||||
|
|
||||||
downloadObjects := func(pointers []lfs.Pointer) error {
|
downloadObjects := func(pointers []lfs.Pointer) error {
|
||||||
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
|
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
|
||||||
if objectError != nil {
|
|
||||||
if errors.Is(objectError, lfs.ErrObjectNotExist) {
|
if errors.Is(objectError, lfs.ErrObjectNotExist) {
|
||||||
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
|
log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if objectError != nil {
|
||||||
return objectError
|
return objectError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ var LFS = struct {
|
||||||
// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
|
// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
|
||||||
var LFSClient = struct {
|
var LFSClient = struct {
|
||||||
BatchSize int `ini:"BATCH_SIZE"`
|
BatchSize int `ini:"BATCH_SIZE"`
|
||||||
|
BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
|
||||||
}{}
|
}{}
|
||||||
|
|
||||||
func loadLFSFrom(rootCfg ConfigProvider) error {
|
func loadLFSFrom(rootCfg ConfigProvider) error {
|
||||||
|
@ -65,6 +66,11 @@ func loadLFSFrom(rootCfg ConfigProvider) error {
|
||||||
LFSClient.BatchSize = 20
|
LFSClient.BatchSize = 20
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if LFSClient.BatchOperationConcurrency < 1 {
|
||||||
|
// match the default git-lfs's `lfs.concurrenttransfers`
|
||||||
|
LFSClient.BatchOperationConcurrency = 3
|
||||||
|
}
|
||||||
|
|
||||||
LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
|
LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
|
||||||
|
|
||||||
if !LFS.StartServer || !InstallLock {
|
if !LFS.StartServer || !InstallLock {
|
||||||
|
|
|
@ -115,4 +115,17 @@ BATCH_SIZE = 0
|
||||||
assert.NoError(t, loadLFSFrom(cfg))
|
assert.NoError(t, loadLFSFrom(cfg))
|
||||||
assert.EqualValues(t, 100, LFS.MaxBatchSize)
|
assert.EqualValues(t, 100, LFS.MaxBatchSize)
|
||||||
assert.EqualValues(t, 20, LFSClient.BatchSize)
|
assert.EqualValues(t, 20, LFSClient.BatchSize)
|
||||||
|
assert.EqualValues(t, 3, LFSClient.BatchOperationConcurrency)
|
||||||
|
|
||||||
|
iniStr = `
|
||||||
|
[lfs_client]
|
||||||
|
BATCH_SIZE = 50
|
||||||
|
BATCH_OPERATION_CONCURRENCY = 10
|
||||||
|
`
|
||||||
|
cfg, err = NewConfigProviderFromData(iniStr)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NoError(t, loadLFSFrom(cfg))
|
||||||
|
assert.EqualValues(t, 50, LFSClient.BatchSize)
|
||||||
|
assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue