diff --git a/commands/command_checkout.go b/commands/command_checkout.go index 26170288..9b9a31bb 100644 --- a/commands/command_checkout.go +++ b/commands/command_checkout.go @@ -17,7 +17,7 @@ func checkoutCommand(cmd *cobra.Command, args []string) { var totalBytes int64 meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) - singleCheckout := newSingleCheckout("") + singleCheckout := newSingleCheckout() chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { if err != nil { LoggedError(err, "Scanner error") diff --git a/commands/command_env.go b/commands/command_env.go index 52f495d6..070a4dc4 100644 --- a/commands/command_env.go +++ b/commands/command_env.go @@ -35,7 +35,7 @@ func envCommand(cmd *cobra.Command, args []string) { } } - for _, env := range lfs.Environ(cfg, defaultTransferManifest()) { + for _, env := range lfs.Environ(cfg, buildTransferManifest()) { Print(env) } diff --git a/commands/command_fetch.go b/commands/command_fetch.go index 82a0c885..580229a9 100644 --- a/commands/command_fetch.go +++ b/commands/command_fetch.go @@ -279,8 +279,7 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil } ready, pointers, meter := readyAndMissingPointers(allpointers, filter) - manifest := buildTransferManifest("download", cfg.CurrentRemote) - q := newDownloadQueue(manifest, tq.WithProgress(meter)) + q := newDownloadQueue(buildTransferManifest(), cfg.CurrentRemote, tq.WithProgress(meter)) if out != nil { // If we already have it, or it won't be fetched diff --git a/commands/command_prune.go b/commands/command_prune.go index f0c61dee..41b8df1a 100644 --- a/commands/command_prune.go +++ b/commands/command_prune.go @@ -120,8 +120,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo var verifywait sync.WaitGroup if verifyRemote { - manifest := buildTransferManifest("download", fetchPruneConfig.PruneRemoteName) - verifyQueue = newDownloadCheckQueue(manifest) + verifyQueue = newDownloadCheckQueue(buildTransferManifest(), fetchPruneConfig.PruneRemoteName) verifiedObjects = tools.NewStringSetWithCapacity(len(localObjects) / 2) // this channel is filled with oids for which Check() succeeded & Transfer() was called diff --git a/commands/command_pull.go b/commands/command_pull.go index 0506e588..b4a212ea 100644 --- a/commands/command_pull.go +++ b/commands/command_pull.go @@ -48,8 +48,8 @@ func pull(remote string, filter *filepathfilter.Filter) { pointers := newPointerMap() meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) - singleCheckout := newSingleCheckout(remote) - q := newDownloadQueue(singleCheckout.manifest, tq.WithProgress(meter)) + singleCheckout := newSingleCheckout() + q := newDownloadQueue(singleCheckout.manifest, remote, tq.WithProgress(meter)) gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { if err != nil { LoggedError(err, "Scanner error") diff --git a/commands/command_smudge.go b/commands/command_smudge.go index 74d5adfc..d1248ed4 100644 --- a/commands/command_smudge.go +++ b/commands/command_smudge.go @@ -40,8 +40,7 @@ func smudge(to io.Writer, ptr *lfs.Pointer, filename string, skip bool, filter * download = filter.Allows(filename) } - manifest := buildTransferManifest("download", cfg.CurrentRemote) - err = ptr.Smudge(to, filename, download, manifest, cb) + err = ptr.Smudge(to, filename, download, buildTransferManifest(), cb) if file != nil { file.Close() } diff --git a/commands/commands.go b/commands/commands.go index 8f9d45af..bdc1e382 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -40,14 +40,8 @@ var ( // buildTransferManifest builds a tq.Manifest from the global os and git // environments. -func buildTransferManifest(operation, remote string) *tq.Manifest { - return tq.NewManifestWithClient(newAPIClient(), operation, remote) -} - -// defaultTransferManifest builds a tq.Manifest from the commands package global -// cfg var. -func defaultTransferManifest() *tq.Manifest { - return buildTransferManifest("download", cfg.CurrentRemote) +func buildTransferManifest() *tq.Manifest { + return tq.NewManifestWithClient(newAPIClient()) } func newAPIClient() *lfsapi.Client { @@ -72,21 +66,21 @@ func newLockClient(remote string) *locking.Client { } // newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download -func newDownloadCheckQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { +func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { allOptions := make([]tq.Option, len(options), len(options)+1) allOptions = append(allOptions, options...) allOptions = append(allOptions, tq.DryRun(true)) - return newDownloadQueue(manifest, allOptions...) + return newDownloadQueue(manifest, remote, allOptions...) } // newDownloadQueue builds a DownloadQueue, allowing concurrent downloads. -func newDownloadQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { - return tq.NewTransferQueue(tq.Download, manifest, options...) +func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { + return tq.NewTransferQueue(tq.Download, manifest, remote, options...) } // newUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. -func newUploadQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { - return tq.NewTransferQueue(tq.Upload, manifest, options...) +func newUploadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { + return tq.NewTransferQueue(tq.Upload, manifest, remote, options...) } func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *string) *filepathfilter.Filter { @@ -352,7 +346,7 @@ func logPanicToWriter(w io.Writer, loggedError error) { fmt.Fprintln(w, "\nENV:") // log the environment - for _, env := range lfs.Environ(cfg, defaultTransferManifest()) { + for _, env := range lfs.Environ(cfg, buildTransferManifest()) { fmt.Fprintln(w, env) } } diff --git a/commands/pull.go b/commands/pull.go index 9c545691..42ab0ec2 100644 --- a/commands/pull.go +++ b/commands/pull.go @@ -15,7 +15,7 @@ import ( // Handles the process of checking out a single file, and updating the git // index. -func newSingleCheckout(remote string) *singleCheckout { +func newSingleCheckout() *singleCheckout { // Get a converter from repo-relative to cwd-relative // Since writing data & calling git update-index must be relative to cwd pathConverter, err := lfs.NewRepoToCurrentPathConverter() @@ -26,7 +26,7 @@ func newSingleCheckout(remote string) *singleCheckout { return &singleCheckout{ gitIndexer: &gitIndexer{}, pathConverter: pathConverter, - manifest: buildTransferManifest("download", remote), + manifest: buildTransferManifest(), } } diff --git a/commands/uploader.go b/commands/uploader.go index 947d9649..0ac22d51 100644 --- a/commands/uploader.go +++ b/commands/uploader.go @@ -22,7 +22,7 @@ func newUploadContext(remote string, dryRun bool) *uploadContext { cfg.CurrentRemote = remote return &uploadContext{ Remote: remote, - Manifest: buildTransferManifest("upload", remote), + Manifest: buildTransferManifest(), DryRun: dryRun, uploadedOids: tools.NewStringSet(), } @@ -80,7 +80,7 @@ func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*tq.Tra // build the TransferQueue, automatically skipping any missing objects that // the server already has. - uploadQueue := newUploadQueue(c.Manifest, tq.WithProgress(meter), tq.DryRun(c.DryRun)) + uploadQueue := newUploadQueue(c.Manifest, c.Remote, tq.WithProgress(meter), tq.DryRun(c.DryRun)) for _, p := range missingLocalObjects { if c.HasUploaded(p.Oid) { // if the server already has this object, call Skip() on @@ -104,7 +104,7 @@ func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize return } - checkQueue := newDownloadCheckQueue(c.Manifest) + checkQueue := newDownloadCheckQueue(c.Manifest, c.Remote) transferCh := checkQueue.Watch() done := make(chan int) diff --git a/lfs/manifest_test.go b/lfs/manifest_test.go index 7a60dd47..ad93dfdb 100644 --- a/lfs/manifest_test.go +++ b/lfs/manifest_test.go @@ -15,7 +15,7 @@ func TestManifestIsConfigurable(t *testing.T) { })) require.Nil(t, err) - m := tq.NewManifestWithClient(cli, "", "") + m := tq.NewManifestWithClient(cli) assert.Equal(t, 3, m.MaxRetries()) } @@ -27,7 +27,7 @@ func TestManifestChecksNTLM(t *testing.T) { })) require.Nil(t, err) - m := tq.NewManifestWithClient(cli, "", "") + m := tq.NewManifestWithClient(cli) assert.Equal(t, 1, m.MaxRetries()) } @@ -37,7 +37,7 @@ func TestManifestClampsValidValues(t *testing.T) { })) require.Nil(t, err) - m := tq.NewManifestWithClient(cli, "", "") + m := tq.NewManifestWithClient(cli) assert.Equal(t, 1, m.MaxRetries()) } @@ -47,6 +47,6 @@ func TestManifestIgnoresNonInts(t *testing.T) { })) require.Nil(t, err) - m := tq.NewManifestWithClient(cli, "", "") + m := tq.NewManifestWithClient(cli) assert.Equal(t, 1, m.MaxRetries()) } diff --git a/lfs/pointer_smudge.go b/lfs/pointer_smudge.go index df556396..1173383d 100644 --- a/lfs/pointer_smudge.go +++ b/lfs/pointer_smudge.go @@ -74,7 +74,7 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, download func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error { fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size)) - q := tq.NewTransferQueue(tq.Download, manifest) + q := tq.NewTransferQueue(tq.Download, manifest, "") q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size) q.Wait() diff --git a/test/git-lfs-test-server-api/main.go b/test/git-lfs-test-server-api/main.go index d8162d58..b232d73b 100644 --- a/test/git-lfs-test-server-api/main.go +++ b/test/git-lfs-test-server-api/main.go @@ -166,8 +166,8 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) { outputs := repo.AddCommits([]*test.CommitInput{&commit}) // now upload - manifest := tq.NewManifestWithClient(apiClient, "upload", cfg.CurrentRemote) - uploadQueue := tq.NewTransferQueue(tq.Upload, manifest, tq.WithProgress(meter)) + manifest := tq.NewManifestWithClient(apiClient) + uploadQueue := tq.NewTransferQueue(tq.Upload, manifest, "", tq.WithProgress(meter)) for _, f := range outputs[0].Files { oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size}) diff --git a/tq/api.go b/tq/api.go index 772c0e49..c94ab250 100644 --- a/tq/api.go +++ b/tq/api.go @@ -21,6 +21,7 @@ type batchRequest struct { } type batchResponse struct { + Endpoint lfsapi.Endpoint TransferAdapterName string `json:"transfer"` Objects []*api.ObjectResource `json:"objects"` } @@ -35,8 +36,8 @@ func (c *tqClient) Batch(remote string, bReq *batchRequest) (*batchResponse, *ht bReq.TransferAdapterNames = nil } - e := c.Endpoints.Endpoint(bReq.Operation, remote) - req, err := c.NewRequest("POST", e, "objects/batch", bReq) + bRes.Endpoint = c.Endpoints.Endpoint(bReq.Operation, remote) + req, err := c.NewRequest("POST", bRes.Endpoint, "objects/batch", bReq) if err != nil { return nil, nil, errors.Wrap(err, "batch request") } diff --git a/tq/custom_test.go b/tq/custom_test.go index 9b35b407..8c90febf 100644 --- a/tq/custom_test.go +++ b/tq/custom_test.go @@ -15,7 +15,7 @@ func TestCustomTransferBasicConfig(t *testing.T) { })) require.Nil(t, err) - m := NewManifestWithClient(cli, "", "") + m := NewManifestWithClient(cli) u := m.NewUploadAdapter("testsimple") assert.NotNil(t, u, "Upload adapter should be present") cu, _ := u.(*customAdapter) @@ -44,7 +44,7 @@ func TestCustomTransferDownloadConfig(t *testing.T) { })) require.Nil(t, err) - m := NewManifestWithClient(cli, "", "") + m := NewManifestWithClient(cli) u := m.NewUploadAdapter("testdownload") assert.NotNil(t, u, "Upload adapter should always be created") cu, _ := u.(*customAdapter) @@ -70,7 +70,7 @@ func TestCustomTransferUploadConfig(t *testing.T) { })) require.Nil(t, err) - m := NewManifestWithClient(cli, "", "") + m := NewManifestWithClient(cli) d := m.NewDownloadAdapter("testupload") assert.NotNil(t, d, "Download adapter should always be created") cd, _ := d.(*customAdapter) @@ -96,7 +96,7 @@ func TestCustomTransferBothConfig(t *testing.T) { })) require.Nil(t, err) - m := NewManifestWithClient(cli, "", "") + m := NewManifestWithClient(cli) d := m.NewDownloadAdapter("testboth") assert.NotNil(t, d, "Download adapter should be present") cd, _ := d.(*customAdapter) diff --git a/tq/manifest.go b/tq/manifest.go index 2b2544f1..3f99903d 100644 --- a/tq/manifest.go +++ b/tq/manifest.go @@ -45,13 +45,12 @@ func NewManifest() *Manifest { return nil } - return NewManifestWithClient(cli, "", "") + return NewManifestWithClient(cli) } -func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) *Manifest { +func NewManifestWithClient(apiClient *lfsapi.Client) *Manifest { m := &Manifest{ apiClient: apiClient, - remote: remote, downloadAdapterFuncs: make(map[string]NewAdapterFunc), uploadAdapterFuncs: make(map[string]NewAdapterFunc), } @@ -73,10 +72,7 @@ func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) * m.maxRetries = defaultMaxRetries } - e := apiClient.Endpoints.Endpoint(operation, remote) - if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess { - m.concurrentTransfers = 1 - } else if m.concurrentTransfers < 1 { + if m.concurrentTransfers < 1 { m.concurrentTransfers = defaultConcurrentTransfers } diff --git a/tq/transfer.go b/tq/transfer.go index 84135f43..d381dbf6 100644 --- a/tq/transfer.go +++ b/tq/transfer.go @@ -8,6 +8,7 @@ import ( "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/errors" + "github.com/git-lfs/git-lfs/lfsapi" ) type Direction int @@ -157,9 +158,23 @@ type NewAdapterFunc func(name string, dir Direction) Adapter type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error type AdapterConfig interface { + APIClient() *lfsapi.Client ConcurrentTransfers() int } +type adapterConfig struct { + apiClient *lfsapi.Client + concurrentTransfers int +} + +func (c *adapterConfig) ConcurrentTransfers() int { + return c.concurrentTransfers +} + +func (c *adapterConfig) APIClient() *lfsapi.Client { + return c.apiClient +} + // Adapter is implemented by types which can upload and/or download LFS // file content to a remote store. Each Adapter accepts one or more requests // which it may schedule and parallelise in whatever way it chooses, clients of diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index a9567d87..85254052 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -6,6 +6,7 @@ import ( "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/errors" + "github.com/git-lfs/git-lfs/lfsapi" "github.com/git-lfs/git-lfs/progress" "github.com/rubyist/tracerx" ) @@ -148,11 +149,11 @@ func WithBufferDepth(depth int) Option { } // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter -func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue { +func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue { q := &TransferQueue{ direction: dir, client: &tqClient{Client: manifest.APIClient()}, - remote: manifest.remote, + remote: remote, errorc: make(chan error), transfers: make(map[string]*objectTuple), trMutex: &sync.Mutex{}, @@ -369,7 +370,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) } } - retries := q.addToAdapter(toTransfer) + retries := q.addToAdapter(bRes.Endpoint, toTransfer) for t := range retries { q.rc.Increment(t.Oid) count := q.rc.CountFor(t.Oid) @@ -392,10 +393,10 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) } // closed. // // addToAdapter returns immediately, and does not block. -func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple { +func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple { retries := make(chan *objectTuple, len(pending)) - if err := q.ensureAdapterBegun(); err != nil { + if err := q.ensureAdapterBegun(e); err != nil { close(retries) q.errorc <- err @@ -522,7 +523,7 @@ func (q *TransferQueue) transferKind() string { } } -func (q *TransferQueue) ensureAdapterBegun() error { +func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error { q.adapterInitMutex.Lock() defer q.adapterInitMutex.Unlock() @@ -537,7 +538,7 @@ func (q *TransferQueue) ensureAdapterBegun() error { } tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name()) - err := q.adapter.Begin(q.manifest, cb) + err := q.adapter.Begin(toAdapterCfg(q.manifest, e), cb) if err != nil { return err } @@ -546,6 +547,15 @@ func (q *TransferQueue) ensureAdapterBegun() error { return nil } +func toAdapterCfg(m *Manifest, e lfsapi.Endpoint) AdapterConfig { + apiClient := m.APIClient() + concurrency := m.ConcurrentTransfers() + if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess { + concurrency = 1 + } + return &adapterConfig{concurrentTransfers: concurrency, apiClient: apiClient} +} + // Wait waits for the queue to finish processing all transfers. Once Wait is // called, Add will no longer add transfers to the queue. Any failed // transfers will be automatically retried once. diff --git a/tq/transfer_test.go b/tq/transfer_test.go index 764b0016..6a8c4641 100644 --- a/tq/transfer_test.go +++ b/tq/transfer_test.go @@ -123,7 +123,7 @@ func testAdapterRegButBasicOnly(t *testing.T) { })) require.Nil(t, err) - m := NewManifestWithClient(cli, "", "") + m := NewManifestWithClient(cli) assert := assert.New(t)