From faeb7f00cb371ebbdacd3cd4161e55670e89457f Mon Sep 17 00:00:00 2001 From: risk danger olson Date: Wed, 4 Jan 2017 10:11:16 -0700 Subject: [PATCH] tq: use lfsapi.Client to make batch api requests --- test/test-batch-transfer.sh | 1 - tq/api.go | 18 ++++++++++++++++-- tq/manifest.go | 2 ++ tq/transfer_queue.go | 29 ++++++++++++++++++----------- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/test/test-batch-transfer.sh b/test/test-batch-transfer.sh index 52d1455a..1046e5b6 100755 --- a/test/test-batch-transfer.sh +++ b/test/test-batch-transfer.sh @@ -100,4 +100,3 @@ begin_test "batch transfers occur in reverse order by size" [ "$pos_large" -lt "$pos_small" ] ) end_test - diff --git a/tq/api.go b/tq/api.go index 636da1a7..92dc3cec 100644 --- a/tq/api.go +++ b/tq/api.go @@ -2,9 +2,12 @@ package tq import ( "net/http" + "strings" "github.com/git-lfs/git-lfs/api" + "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/lfsapi" + "github.com/rubyist/tracerx" ) type tqClient struct { @@ -35,12 +38,23 @@ func (c *tqClient) Batch(remote string, bReq *batchRequest) (*batchResponse, *ht e := c.Endpoints.Endpoint(bReq.Operation, remote) req, err := c.NewRequest("POST", e, "objects/batch", bReq) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrap(err, "batch request") } + tracerx.Printf("api: batch %d files", len(bReq.Objects)) + res, err := c.DoWithAuth(remote, req) if err != nil { - return nil, nil, err + tracerx.Printf("api error: %s", err) + return nil, nil, errors.Wrap(err, "batch response") + } + c.LogResponse("lfs.batch", res) + + if res.StatusCode != 200 { + return nil, res, errors.Errorf("Invalid status for %s %s: %d", + req.Method, + strings.SplitN(req.URL.String(), "?", 2)[0], + res.StatusCode) } return bRes, res, lfsapi.DecodeJSON(res, bRes) diff --git a/tq/manifest.go b/tq/manifest.go index 8fca4177..2b2544f1 100644 --- a/tq/manifest.go +++ b/tq/manifest.go @@ -19,6 +19,7 @@ type Manifest struct { concurrentTransfers int basicTransfersOnly bool tusTransfersAllowed bool + remote string downloadAdapterFuncs map[string]NewAdapterFunc uploadAdapterFuncs map[string]NewAdapterFunc apiClient *lfsapi.Client @@ -50,6 +51,7 @@ func NewManifest() *Manifest { func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) *Manifest { m := &Manifest{ apiClient: apiClient, + remote: remote, downloadAdapterFuncs: make(map[string]NewAdapterFunc), uploadAdapterFuncs: make(map[string]NewAdapterFunc), } diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index febd9f38..a9567d87 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/git-lfs/git-lfs/api" - "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/progress" "github.com/rubyist/tracerx" @@ -94,6 +93,8 @@ func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] } // adapters, and dealing with progress, errors and retries. type TransferQueue struct { direction Direction + client *tqClient + remote string adapter Adapter adapterInProgress bool adapterInitMutex sync.Mutex @@ -150,6 +151,8 @@ func WithBufferDepth(depth int) Option { func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue { q := &TransferQueue{ direction: dir, + client: &tqClient{Client: manifest.APIClient()}, + remote: manifest.remote, errorc: make(chan error), transfers: make(map[string]*objectTuple), trMutex: &sync.Mutex{}, @@ -281,16 +284,16 @@ func (q *TransferQueue) collectBatches() { // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been // processed. func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) { - cfg := config.Config - next := q.makeBatch() - transferAdapterNames := q.manifest.GetAdapterNames(q.direction) - tracerx.Printf("tq: sending batch of size %d", len(batch)) - objs, adapterName, err := api.Batch( - cfg, batch.ApiObjects(), q.transferKind(), transferAdapterNames, - ) + bReq := &batchRequest{ + Operation: q.transferKind(), + Objects: batch.ApiObjects(), + TransferAdapterNames: q.manifest.GetAdapterNames(q.direction), + } + + bRes, _, err := q.client.Batch(q.remote, bReq) if err != nil { // If there was an error making the batch API call, mark all of // the objects for retry, and return them along with the error @@ -309,12 +312,16 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) return next, err } - q.useAdapter(adapterName) + if len(bRes.Objects) == 0 { + return next, nil + } + + q.useAdapter(bRes.TransferAdapterName) q.startProgress.Do(q.meter.Start) - toTransfer := make([]*Transfer, 0, len(objs)) + toTransfer := make([]*Transfer, 0, len(bRes.Objects)) - for _, o := range objs { + for _, o := range bRes.Objects { if o.Error != nil { q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message) q.Skip(o.Size)