tq: use lfsapi.Client to make batch api requests
This commit is contained in:
parent
f52de2fd89
commit
faeb7f00cb
@ -100,4 +100,3 @@ begin_test "batch transfers occur in reverse order by size"
|
||||
[ "$pos_large" -lt "$pos_small" ]
|
||||
)
|
||||
end_test
|
||||
|
||||
|
18
tq/api.go
18
tq/api.go
@ -2,9 +2,12 @@ package tq
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/git-lfs/git-lfs/api"
|
||||
"github.com/git-lfs/git-lfs/errors"
|
||||
"github.com/git-lfs/git-lfs/lfsapi"
|
||||
"github.com/rubyist/tracerx"
|
||||
)
|
||||
|
||||
type tqClient struct {
|
||||
@ -35,12 +38,23 @@ func (c *tqClient) Batch(remote string, bReq *batchRequest) (*batchResponse, *ht
|
||||
e := c.Endpoints.Endpoint(bReq.Operation, remote)
|
||||
req, err := c.NewRequest("POST", e, "objects/batch", bReq)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, errors.Wrap(err, "batch request")
|
||||
}
|
||||
|
||||
tracerx.Printf("api: batch %d files", len(bReq.Objects))
|
||||
|
||||
res, err := c.DoWithAuth(remote, req)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
tracerx.Printf("api error: %s", err)
|
||||
return nil, nil, errors.Wrap(err, "batch response")
|
||||
}
|
||||
c.LogResponse("lfs.batch", res)
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return nil, res, errors.Errorf("Invalid status for %s %s: %d",
|
||||
req.Method,
|
||||
strings.SplitN(req.URL.String(), "?", 2)[0],
|
||||
res.StatusCode)
|
||||
}
|
||||
|
||||
return bRes, res, lfsapi.DecodeJSON(res, bRes)
|
||||
|
@ -19,6 +19,7 @@ type Manifest struct {
|
||||
concurrentTransfers int
|
||||
basicTransfersOnly bool
|
||||
tusTransfersAllowed bool
|
||||
remote string
|
||||
downloadAdapterFuncs map[string]NewAdapterFunc
|
||||
uploadAdapterFuncs map[string]NewAdapterFunc
|
||||
apiClient *lfsapi.Client
|
||||
@ -50,6 +51,7 @@ func NewManifest() *Manifest {
|
||||
func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) *Manifest {
|
||||
m := &Manifest{
|
||||
apiClient: apiClient,
|
||||
remote: remote,
|
||||
downloadAdapterFuncs: make(map[string]NewAdapterFunc),
|
||||
uploadAdapterFuncs: make(map[string]NewAdapterFunc),
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/git-lfs/git-lfs/api"
|
||||
"github.com/git-lfs/git-lfs/config"
|
||||
"github.com/git-lfs/git-lfs/errors"
|
||||
"github.com/git-lfs/git-lfs/progress"
|
||||
"github.com/rubyist/tracerx"
|
||||
@ -94,6 +93,8 @@ func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
||||
// adapters, and dealing with progress, errors and retries.
|
||||
type TransferQueue struct {
|
||||
direction Direction
|
||||
client *tqClient
|
||||
remote string
|
||||
adapter Adapter
|
||||
adapterInProgress bool
|
||||
adapterInitMutex sync.Mutex
|
||||
@ -150,6 +151,8 @@ func WithBufferDepth(depth int) Option {
|
||||
func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue {
|
||||
q := &TransferQueue{
|
||||
direction: dir,
|
||||
client: &tqClient{Client: manifest.APIClient()},
|
||||
remote: manifest.remote,
|
||||
errorc: make(chan error),
|
||||
transfers: make(map[string]*objectTuple),
|
||||
trMutex: &sync.Mutex{},
|
||||
@ -281,16 +284,16 @@ func (q *TransferQueue) collectBatches() {
|
||||
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
|
||||
// processed.
|
||||
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
|
||||
cfg := config.Config
|
||||
|
||||
next := q.makeBatch()
|
||||
transferAdapterNames := q.manifest.GetAdapterNames(q.direction)
|
||||
|
||||
tracerx.Printf("tq: sending batch of size %d", len(batch))
|
||||
|
||||
objs, adapterName, err := api.Batch(
|
||||
cfg, batch.ApiObjects(), q.transferKind(), transferAdapterNames,
|
||||
)
|
||||
bReq := &batchRequest{
|
||||
Operation: q.transferKind(),
|
||||
Objects: batch.ApiObjects(),
|
||||
TransferAdapterNames: q.manifest.GetAdapterNames(q.direction),
|
||||
}
|
||||
|
||||
bRes, _, err := q.client.Batch(q.remote, bReq)
|
||||
if err != nil {
|
||||
// If there was an error making the batch API call, mark all of
|
||||
// the objects for retry, and return them along with the error
|
||||
@ -309,12 +312,16 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
|
||||
return next, err
|
||||
}
|
||||
|
||||
q.useAdapter(adapterName)
|
||||
if len(bRes.Objects) == 0 {
|
||||
return next, nil
|
||||
}
|
||||
|
||||
q.useAdapter(bRes.TransferAdapterName)
|
||||
q.startProgress.Do(q.meter.Start)
|
||||
|
||||
toTransfer := make([]*Transfer, 0, len(objs))
|
||||
toTransfer := make([]*Transfer, 0, len(bRes.Objects))
|
||||
|
||||
for _, o := range objs {
|
||||
for _, o := range bRes.Objects {
|
||||
if o.Error != nil {
|
||||
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
|
||||
q.Skip(o.Size)
|
||||
|
Loading…
Reference in New Issue
Block a user