tq: remove remote/operation/access from tq.Manifest, make remote an arg of NewTransferQueue()

This commit is contained in:
risk danger olson 2017-01-04 14:46:30 -07:00
parent d874b99f79
commit 2b20d510cc
18 changed files with 71 additions and 58 deletions

@ -17,7 +17,7 @@ func checkoutCommand(cmd *cobra.Command, args []string) {
var totalBytes int64 var totalBytes int64
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
singleCheckout := newSingleCheckout("") singleCheckout := newSingleCheckout()
chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil { if err != nil {
LoggedError(err, "Scanner error") LoggedError(err, "Scanner error")

@ -35,7 +35,7 @@ func envCommand(cmd *cobra.Command, args []string) {
} }
} }
for _, env := range lfs.Environ(cfg, defaultTransferManifest()) { for _, env := range lfs.Environ(cfg, buildTransferManifest()) {
Print(env) Print(env)
} }

@ -279,8 +279,7 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
} }
ready, pointers, meter := readyAndMissingPointers(allpointers, filter) ready, pointers, meter := readyAndMissingPointers(allpointers, filter)
manifest := buildTransferManifest("download", cfg.CurrentRemote) q := newDownloadQueue(buildTransferManifest(), cfg.CurrentRemote, tq.WithProgress(meter))
q := newDownloadQueue(manifest, tq.WithProgress(meter))
if out != nil { if out != nil {
// If we already have it, or it won't be fetched // If we already have it, or it won't be fetched

@ -120,8 +120,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
var verifywait sync.WaitGroup var verifywait sync.WaitGroup
if verifyRemote { if verifyRemote {
manifest := buildTransferManifest("download", fetchPruneConfig.PruneRemoteName) verifyQueue = newDownloadCheckQueue(buildTransferManifest(), fetchPruneConfig.PruneRemoteName)
verifyQueue = newDownloadCheckQueue(manifest)
verifiedObjects = tools.NewStringSetWithCapacity(len(localObjects) / 2) verifiedObjects = tools.NewStringSetWithCapacity(len(localObjects) / 2)
// this channel is filled with oids for which Check() succeeded & Transfer() was called // this channel is filled with oids for which Check() succeeded & Transfer() was called

@ -48,8 +48,8 @@ func pull(remote string, filter *filepathfilter.Filter) {
pointers := newPointerMap() pointers := newPointerMap()
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
singleCheckout := newSingleCheckout(remote) singleCheckout := newSingleCheckout()
q := newDownloadQueue(singleCheckout.manifest, tq.WithProgress(meter)) q := newDownloadQueue(singleCheckout.manifest, remote, tq.WithProgress(meter))
gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil { if err != nil {
LoggedError(err, "Scanner error") LoggedError(err, "Scanner error")

@ -40,8 +40,7 @@ func smudge(to io.Writer, ptr *lfs.Pointer, filename string, skip bool, filter *
download = filter.Allows(filename) download = filter.Allows(filename)
} }
manifest := buildTransferManifest("download", cfg.CurrentRemote) err = ptr.Smudge(to, filename, download, buildTransferManifest(), cb)
err = ptr.Smudge(to, filename, download, manifest, cb)
if file != nil { if file != nil {
file.Close() file.Close()
} }

@ -40,14 +40,8 @@ var (
// buildTransferManifest builds a tq.Manifest from the global os and git // buildTransferManifest builds a tq.Manifest from the global os and git
// environments. // environments.
func buildTransferManifest(operation, remote string) *tq.Manifest { func buildTransferManifest() *tq.Manifest {
return tq.NewManifestWithClient(newAPIClient(), operation, remote) return tq.NewManifestWithClient(newAPIClient())
}
// defaultTransferManifest builds a tq.Manifest from the commands package global
// cfg var.
func defaultTransferManifest() *tq.Manifest {
return buildTransferManifest("download", cfg.CurrentRemote)
} }
func newAPIClient() *lfsapi.Client { func newAPIClient() *lfsapi.Client {
@ -72,21 +66,21 @@ func newLockClient(remote string) *locking.Client {
} }
// newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download // newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func newDownloadCheckQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
allOptions := make([]tq.Option, len(options), len(options)+1) allOptions := make([]tq.Option, len(options), len(options)+1)
allOptions = append(allOptions, options...) allOptions = append(allOptions, options...)
allOptions = append(allOptions, tq.DryRun(true)) allOptions = append(allOptions, tq.DryRun(true))
return newDownloadQueue(manifest, allOptions...) return newDownloadQueue(manifest, remote, allOptions...)
} }
// newDownloadQueue builds a DownloadQueue, allowing concurrent downloads. // newDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func newDownloadQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Download, manifest, options...) return tq.NewTransferQueue(tq.Download, manifest, remote, options...)
} }
// newUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. // newUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func newUploadQueue(manifest *tq.Manifest, options ...tq.Option) *tq.TransferQueue { func newUploadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Upload, manifest, options...) return tq.NewTransferQueue(tq.Upload, manifest, remote, options...)
} }
func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *string) *filepathfilter.Filter { func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *string) *filepathfilter.Filter {
@ -352,7 +346,7 @@ func logPanicToWriter(w io.Writer, loggedError error) {
fmt.Fprintln(w, "\nENV:") fmt.Fprintln(w, "\nENV:")
// log the environment // log the environment
for _, env := range lfs.Environ(cfg, defaultTransferManifest()) { for _, env := range lfs.Environ(cfg, buildTransferManifest()) {
fmt.Fprintln(w, env) fmt.Fprintln(w, env)
} }
} }

@ -15,7 +15,7 @@ import (
// Handles the process of checking out a single file, and updating the git // Handles the process of checking out a single file, and updating the git
// index. // index.
func newSingleCheckout(remote string) *singleCheckout { func newSingleCheckout() *singleCheckout {
// Get a converter from repo-relative to cwd-relative // Get a converter from repo-relative to cwd-relative
// Since writing data & calling git update-index must be relative to cwd // Since writing data & calling git update-index must be relative to cwd
pathConverter, err := lfs.NewRepoToCurrentPathConverter() pathConverter, err := lfs.NewRepoToCurrentPathConverter()
@ -26,7 +26,7 @@ func newSingleCheckout(remote string) *singleCheckout {
return &singleCheckout{ return &singleCheckout{
gitIndexer: &gitIndexer{}, gitIndexer: &gitIndexer{},
pathConverter: pathConverter, pathConverter: pathConverter,
manifest: buildTransferManifest("download", remote), manifest: buildTransferManifest(),
} }
} }

@ -22,7 +22,7 @@ func newUploadContext(remote string, dryRun bool) *uploadContext {
cfg.CurrentRemote = remote cfg.CurrentRemote = remote
return &uploadContext{ return &uploadContext{
Remote: remote, Remote: remote,
Manifest: buildTransferManifest("upload", remote), Manifest: buildTransferManifest(),
DryRun: dryRun, DryRun: dryRun,
uploadedOids: tools.NewStringSet(), uploadedOids: tools.NewStringSet(),
} }
@ -80,7 +80,7 @@ func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*tq.Tra
// build the TransferQueue, automatically skipping any missing objects that // build the TransferQueue, automatically skipping any missing objects that
// the server already has. // the server already has.
uploadQueue := newUploadQueue(c.Manifest, tq.WithProgress(meter), tq.DryRun(c.DryRun)) uploadQueue := newUploadQueue(c.Manifest, c.Remote, tq.WithProgress(meter), tq.DryRun(c.DryRun))
for _, p := range missingLocalObjects { for _, p := range missingLocalObjects {
if c.HasUploaded(p.Oid) { if c.HasUploaded(p.Oid) {
// if the server already has this object, call Skip() on // if the server already has this object, call Skip() on
@ -104,7 +104,7 @@ func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize
return return
} }
checkQueue := newDownloadCheckQueue(c.Manifest) checkQueue := newDownloadCheckQueue(c.Manifest, c.Remote)
transferCh := checkQueue.Watch() transferCh := checkQueue.Watch()
done := make(chan int) done := make(chan int)

@ -15,7 +15,7 @@ func TestManifestIsConfigurable(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := tq.NewManifestWithClient(cli, "", "") m := tq.NewManifestWithClient(cli)
assert.Equal(t, 3, m.MaxRetries()) assert.Equal(t, 3, m.MaxRetries())
} }
@ -27,7 +27,7 @@ func TestManifestChecksNTLM(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := tq.NewManifestWithClient(cli, "", "") m := tq.NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries()) assert.Equal(t, 1, m.MaxRetries())
} }
@ -37,7 +37,7 @@ func TestManifestClampsValidValues(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := tq.NewManifestWithClient(cli, "", "") m := tq.NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries()) assert.Equal(t, 1, m.MaxRetries())
} }
@ -47,6 +47,6 @@ func TestManifestIgnoresNonInts(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := tq.NewManifestWithClient(cli, "", "") m := tq.NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries()) assert.Equal(t, 1, m.MaxRetries())
} }

@ -74,7 +74,7 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, download
func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error { func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error {
fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size)) fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size))
q := tq.NewTransferQueue(tq.Download, manifest) q := tq.NewTransferQueue(tq.Download, manifest, "")
q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size) q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size)
q.Wait() q.Wait()

@ -166,8 +166,8 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
outputs := repo.AddCommits([]*test.CommitInput{&commit}) outputs := repo.AddCommits([]*test.CommitInput{&commit})
// now upload // now upload
manifest := tq.NewManifestWithClient(apiClient, "upload", cfg.CurrentRemote) manifest := tq.NewManifestWithClient(apiClient)
uploadQueue := tq.NewTransferQueue(tq.Upload, manifest, tq.WithProgress(meter)) uploadQueue := tq.NewTransferQueue(tq.Upload, manifest, "", tq.WithProgress(meter))
for _, f := range outputs[0].Files { for _, f := range outputs[0].Files {
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size}) oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})

@ -21,6 +21,7 @@ type batchRequest struct {
} }
type batchResponse struct { type batchResponse struct {
Endpoint lfsapi.Endpoint
TransferAdapterName string `json:"transfer"` TransferAdapterName string `json:"transfer"`
Objects []*api.ObjectResource `json:"objects"` Objects []*api.ObjectResource `json:"objects"`
} }
@ -35,8 +36,8 @@ func (c *tqClient) Batch(remote string, bReq *batchRequest) (*batchResponse, *ht
bReq.TransferAdapterNames = nil bReq.TransferAdapterNames = nil
} }
e := c.Endpoints.Endpoint(bReq.Operation, remote) bRes.Endpoint = c.Endpoints.Endpoint(bReq.Operation, remote)
req, err := c.NewRequest("POST", e, "objects/batch", bReq) req, err := c.NewRequest("POST", bRes.Endpoint, "objects/batch", bReq)
if err != nil { if err != nil {
return nil, nil, errors.Wrap(err, "batch request") return nil, nil, errors.Wrap(err, "batch request")
} }

@ -15,7 +15,7 @@ func TestCustomTransferBasicConfig(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := NewManifestWithClient(cli, "", "") m := NewManifestWithClient(cli)
u := m.NewUploadAdapter("testsimple") u := m.NewUploadAdapter("testsimple")
assert.NotNil(t, u, "Upload adapter should be present") assert.NotNil(t, u, "Upload adapter should be present")
cu, _ := u.(*customAdapter) cu, _ := u.(*customAdapter)
@ -44,7 +44,7 @@ func TestCustomTransferDownloadConfig(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := NewManifestWithClient(cli, "", "") m := NewManifestWithClient(cli)
u := m.NewUploadAdapter("testdownload") u := m.NewUploadAdapter("testdownload")
assert.NotNil(t, u, "Upload adapter should always be created") assert.NotNil(t, u, "Upload adapter should always be created")
cu, _ := u.(*customAdapter) cu, _ := u.(*customAdapter)
@ -70,7 +70,7 @@ func TestCustomTransferUploadConfig(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := NewManifestWithClient(cli, "", "") m := NewManifestWithClient(cli)
d := m.NewDownloadAdapter("testupload") d := m.NewDownloadAdapter("testupload")
assert.NotNil(t, d, "Download adapter should always be created") assert.NotNil(t, d, "Download adapter should always be created")
cd, _ := d.(*customAdapter) cd, _ := d.(*customAdapter)
@ -96,7 +96,7 @@ func TestCustomTransferBothConfig(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := NewManifestWithClient(cli, "", "") m := NewManifestWithClient(cli)
d := m.NewDownloadAdapter("testboth") d := m.NewDownloadAdapter("testboth")
assert.NotNil(t, d, "Download adapter should be present") assert.NotNil(t, d, "Download adapter should be present")
cd, _ := d.(*customAdapter) cd, _ := d.(*customAdapter)

@ -45,13 +45,12 @@ func NewManifest() *Manifest {
return nil return nil
} }
return NewManifestWithClient(cli, "", "") return NewManifestWithClient(cli)
} }
func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) *Manifest { func NewManifestWithClient(apiClient *lfsapi.Client) *Manifest {
m := &Manifest{ m := &Manifest{
apiClient: apiClient, apiClient: apiClient,
remote: remote,
downloadAdapterFuncs: make(map[string]NewAdapterFunc), downloadAdapterFuncs: make(map[string]NewAdapterFunc),
uploadAdapterFuncs: make(map[string]NewAdapterFunc), uploadAdapterFuncs: make(map[string]NewAdapterFunc),
} }
@ -73,10 +72,7 @@ func NewManifestWithClient(apiClient *lfsapi.Client, operation, remote string) *
m.maxRetries = defaultMaxRetries m.maxRetries = defaultMaxRetries
} }
e := apiClient.Endpoints.Endpoint(operation, remote) if m.concurrentTransfers < 1 {
if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
m.concurrentTransfers = 1
} else if m.concurrentTransfers < 1 {
m.concurrentTransfers = defaultConcurrentTransfers m.concurrentTransfers = defaultConcurrentTransfers
} }

@ -8,6 +8,7 @@ import (
"github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
) )
type Direction int type Direction int
@ -157,9 +158,23 @@ type NewAdapterFunc func(name string, dir Direction) Adapter
type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error
type AdapterConfig interface { type AdapterConfig interface {
APIClient() *lfsapi.Client
ConcurrentTransfers() int ConcurrentTransfers() int
} }
type adapterConfig struct {
apiClient *lfsapi.Client
concurrentTransfers int
}
func (c *adapterConfig) ConcurrentTransfers() int {
return c.concurrentTransfers
}
func (c *adapterConfig) APIClient() *lfsapi.Client {
return c.apiClient
}
// Adapter is implemented by types which can upload and/or download LFS // Adapter is implemented by types which can upload and/or download LFS
// file content to a remote store. Each Adapter accepts one or more requests // file content to a remote store. Each Adapter accepts one or more requests
// which it may schedule and parallelise in whatever way it chooses, clients of // which it may schedule and parallelise in whatever way it chooses, clients of

@ -6,6 +6,7 @@ import (
"github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress" "github.com/git-lfs/git-lfs/progress"
"github.com/rubyist/tracerx" "github.com/rubyist/tracerx"
) )
@ -148,11 +149,11 @@ func WithBufferDepth(depth int) Option {
} }
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue { func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{ q := &TransferQueue{
direction: dir, direction: dir,
client: &tqClient{Client: manifest.APIClient()}, client: &tqClient{Client: manifest.APIClient()},
remote: manifest.remote, remote: remote,
errorc: make(chan error), errorc: make(chan error),
transfers: make(map[string]*objectTuple), transfers: make(map[string]*objectTuple),
trMutex: &sync.Mutex{}, trMutex: &sync.Mutex{},
@ -369,7 +370,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
} }
} }
retries := q.addToAdapter(toTransfer) retries := q.addToAdapter(bRes.Endpoint, toTransfer)
for t := range retries { for t := range retries {
q.rc.Increment(t.Oid) q.rc.Increment(t.Oid)
count := q.rc.CountFor(t.Oid) count := q.rc.CountFor(t.Oid)
@ -392,10 +393,10 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
// closed. // closed.
// //
// addToAdapter returns immediately, and does not block. // addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple { func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple {
retries := make(chan *objectTuple, len(pending)) retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(); err != nil { if err := q.ensureAdapterBegun(e); err != nil {
close(retries) close(retries)
q.errorc <- err q.errorc <- err
@ -522,7 +523,7 @@ func (q *TransferQueue) transferKind() string {
} }
} }
func (q *TransferQueue) ensureAdapterBegun() error { func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error {
q.adapterInitMutex.Lock() q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock() defer q.adapterInitMutex.Unlock()
@ -537,7 +538,7 @@ func (q *TransferQueue) ensureAdapterBegun() error {
} }
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name()) tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
err := q.adapter.Begin(q.manifest, cb) err := q.adapter.Begin(toAdapterCfg(q.manifest, e), cb)
if err != nil { if err != nil {
return err return err
} }
@ -546,6 +547,15 @@ func (q *TransferQueue) ensureAdapterBegun() error {
return nil return nil
} }
func toAdapterCfg(m *Manifest, e lfsapi.Endpoint) AdapterConfig {
apiClient := m.APIClient()
concurrency := m.ConcurrentTransfers()
if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
concurrency = 1
}
return &adapterConfig{concurrentTransfers: concurrency, apiClient: apiClient}
}
// Wait waits for the queue to finish processing all transfers. Once Wait is // Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transfers to the queue. Any failed // called, Add will no longer add transfers to the queue. Any failed
// transfers will be automatically retried once. // transfers will be automatically retried once.

@ -123,7 +123,7 @@ func testAdapterRegButBasicOnly(t *testing.T) {
})) }))
require.Nil(t, err) require.Nil(t, err)
m := NewManifestWithClient(cli, "", "") m := NewManifestWithClient(cli)
assert := assert.New(t) assert := assert.New(t)