From 00623425a2822b041fd1bf97803ff834a8ebb0a2 Mon Sep 17 00:00:00 2001 From: "brian m. carlson" Date: Fri, 10 Mar 2023 15:24:21 +0000 Subject: [PATCH 1/2] tq: make Manifest an interface Right now, any time we instantiate a Manifest object, we create an API client, and when we create the API client, if we're using SSH, we try to make a connection to the server. However, we often instantiate a Manifest object when performing various operations such as smudging data, which means that when a user creates an archive locally, they can be prompted for an SSH password, which is undesirable. Let's take a first step toward fixing this by making Manifest an interface. Right now, it has one concrete implementation, a concreteManifest, which can be used to access the internals, and we provide methods to upgrade it from the interface to the concrete type and determine whether it's upgraded or not. We attempt to upgrade it any time we need to access its internals. In the future, we'll also offer a lazyManifest, which will instantiate its inner concreteManifest only when we attempt to upgrade it. But for now, only implement the concreteManifest to make it clearer what's changing. Similarly, we make our TransferQueue upgradable so that we don't upgrade its Manifest right away. In both cases, we'll want to use the lazyManifest to delay the instantiation of the API client (and hence the starting of the SSH connection) in a future commit. 
--- commands/commands.go | 10 ++-- commands/lockverifier.go | 2 +- commands/pull.go | 10 ++-- commands/uploader.go | 2 +- lfs/gitfilter_smudge.go | 8 +-- lfs/lfs.go | 2 +- t/git-lfs-test-server-api/main.go | 14 ++--- t/git-lfs-test-server-api/testdownload.go | 6 +- t/git-lfs-test-server-api/testupload.go | 8 +-- tq/api.go | 6 +- tq/basic_download.go | 2 +- tq/basic_upload.go | 2 +- tq/custom.go | 4 +- tq/manifest.go | 68 +++++++++++++++++------ tq/ssh.go | 2 +- tq/transfer_queue.go | 51 ++++++++++++----- tq/tus_upload.go | 2 +- 17 files changed, 129 insertions(+), 70 deletions(-) diff --git a/commands/commands.go b/commands/commands.go index d86652bb..1b5cdf56 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -34,7 +34,7 @@ var ( ErrorWriter = newMultiWriter(os.Stderr, ErrorBuffer) OutputWriter = newMultiWriter(os.Stdout, ErrorBuffer) ManPages = make(map[string]string, 20) - tqManifest = make(map[string]*tq.Manifest) + tqManifest = make(map[string]tq.Manifest) cfg *config.Configuration apiClient *lfsapi.Client @@ -48,14 +48,14 @@ var ( // getTransferManifest builds a tq.Manifest from the global os and git // environments. -func getTransferManifest() *tq.Manifest { +func getTransferManifest() tq.Manifest { return getTransferManifestOperationRemote("", "") } // getTransferManifestOperationRemote builds a tq.Manifest from the global os // and git environments and operation-specific and remote-specific settings. // Operation must be "download", "upload", or the empty string. 
-func getTransferManifestOperationRemote(operation, remote string) *tq.Manifest { +func getTransferManifestOperationRemote(operation, remote string) tq.Manifest { c := getAPIClient() global.Lock() @@ -112,14 +112,14 @@ func newLockClient() *locking.Client { } // newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download -func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { +func newDownloadCheckQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { return newDownloadQueue(manifest, remote, append(options, tq.DryRun(true), )...) } // newDownloadQueue builds a DownloadQueue, allowing concurrent downloads. -func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { +func newDownloadQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { return tq.NewTransferQueue(tq.Download, manifest, remote, append(options, tq.RemoteRef(currentRemoteRef()), )...) 
diff --git a/commands/lockverifier.go b/commands/lockverifier.go index 360e4c0f..ee0cf0ad 100644 --- a/commands/lockverifier.go +++ b/commands/lockverifier.go @@ -151,7 +151,7 @@ func (lv *lockVerifier) newRefLocks(ref *git.Ref, l locking.Lock) *refLock { } } -func newLockVerifier(m *tq.Manifest) *lockVerifier { +func newLockVerifier(m tq.Manifest) *lockVerifier { lv := &lockVerifier{ endpoint: getAPIClient().Endpoints.Endpoint("upload", cfg.PushRemote()), verifiedRefs: make(map[string]bool), diff --git a/commands/pull.go b/commands/pull.go index 8e94cc8a..75f8ee92 100644 --- a/commands/pull.go +++ b/commands/pull.go @@ -39,7 +39,7 @@ func newSingleCheckout(gitEnv config.Environment, remote string) abstractCheckou } type abstractCheckout interface { - Manifest() *tq.Manifest + Manifest() tq.Manifest Skip() bool Run(*lfs.WrappedPointer) RunToPath(*lfs.WrappedPointer, string) error @@ -49,11 +49,11 @@ type abstractCheckout interface { type singleCheckout struct { gitIndexer *gitIndexer pathConverter lfs.PathConverter - manifest *tq.Manifest + manifest tq.Manifest remote string } -func (c *singleCheckout) Manifest() *tq.Manifest { +func (c *singleCheckout) Manifest() tq.Manifest { if c.manifest == nil { c.manifest = getTransferManifestOperationRemote("download", c.remote) } @@ -115,11 +115,11 @@ func (c *singleCheckout) Close() { } type noOpCheckout struct { - manifest *tq.Manifest + manifest tq.Manifest remote string } -func (c *noOpCheckout) Manifest() *tq.Manifest { +func (c *noOpCheckout) Manifest() tq.Manifest { if c.manifest == nil { c.manifest = getTransferManifestOperationRemote("download", c.remote) } diff --git a/commands/uploader.go b/commands/uploader.go index 09393ca8..f0e21372 100644 --- a/commands/uploader.go +++ b/commands/uploader.go @@ -72,7 +72,7 @@ func uploadRangeOrAll(g *lfs.GitScanner, ctx *uploadContext, q *tq.TransferQueue type uploadContext struct { Remote string DryRun bool - Manifest *tq.Manifest + Manifest tq.Manifest uploadedOids 
tools.StringSet gitfilter *lfs.GitFilter diff --git a/lfs/gitfilter_smudge.go b/lfs/gitfilter_smudge.go index 90cbbc61..e150b907 100644 --- a/lfs/gitfilter_smudge.go +++ b/lfs/gitfilter_smudge.go @@ -15,7 +15,7 @@ import ( "github.com/rubyist/tracerx" ) -func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest *tq.Manifest, cb tools.CopyCallback) error { +func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest tq.Manifest, cb tools.CopyCallback) error { tools.MkdirAll(filepath.Dir(filename), f.cfg) if stat, _ := os.Stat(filename); stat != nil && stat.Mode()&0200 == 0 { @@ -52,7 +52,7 @@ func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, m return nil } -func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { +func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) { mediafile, err := f.ObjectPath(ptr.Oid) if err != nil { return 0, err @@ -99,7 +99,7 @@ func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, d return n, nil } -func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { +func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) { fmt.Fprintln(os.Stderr, tr.Tr.Get("Downloading %s (%s)", workingfile, humanize.FormatBytes(uint64(ptr.Size)))) // NOTE: if given, "cb" is a tools.CopyCallback which writes updates @@ -131,7 +131,7 @@ func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, me return f.readLocalFile(writer, ptr, mediafile, workingfile, nil) } -func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, 
workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { +func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) { // Attempt to find the LFS objects in all currently registered remotes. // When a valid remote is found, this remote is taken persistent for // future attempts within downloadFile(). In best case, the ordinary diff --git a/lfs/lfs.go b/lfs/lfs.go index ec3850ba..37895e9d 100644 --- a/lfs/lfs.go +++ b/lfs/lfs.go @@ -15,7 +15,7 @@ import ( "github.com/rubyist/tracerx" ) -func Environ(cfg *config.Configuration, manifest *tq.Manifest, envOverrides map[string]string) []string { +func Environ(cfg *config.Configuration, manifest tq.Manifest, envOverrides map[string]string) []string { osEnviron := os.Environ() env := make([]string, 0, len(osEnviron)+7) diff --git a/t/git-lfs-test-server-api/main.go b/t/git-lfs-test-server-api/main.go index 58ec2d1a..f897fd6e 100644 --- a/t/git-lfs-test-server-api/main.go +++ b/t/git-lfs-test-server-api/main.go @@ -27,7 +27,7 @@ type TestObject struct { type ServerTest struct { Name string - F func(m *tq.Manifest, oidsExist, oidsMissing []TestObject) error + F func(m tq.Manifest, oidsExist, oidsMissing []TestObject) error } var ( @@ -138,7 +138,7 @@ func (*testDataCallback) Errorf(format string, args ...interface{}) { fmt.Printf(format, args...) 
} -func buildManifest(r *t.Repo) (*tq.Manifest, error) { +func buildManifest(r *t.Repo) (tq.Manifest, error) { // Configure the endpoint manually finder := lfsapi.NewEndpointFinder(r) @@ -176,7 +176,7 @@ func (c *constantEndpoint) Endpoint(operation, remote string) lfshttp.Endpoint { func (c *constantEndpoint) RemoteEndpoint(operation, remote string) lfshttp.Endpoint { return c.e } -func buildTestData(repo *t.Repo, manifest *tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) { +func buildTestData(repo *t.Repo, manifest tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) { const oidCount = 50 oidsExist = make([]TestObject, 0, oidCount) oidsMissing = make([]TestObject, 0, oidCount) @@ -242,7 +242,7 @@ func saveTestOids(filename string, objs []TestObject) { } -func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool { +func runTests(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) bool { ok := true fmt.Printf("Running %d tests...\n", len(tests)) for _, t := range tests { @@ -254,7 +254,7 @@ func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool { return ok } -func runTest(t ServerTest, manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func runTest(t ServerTest, manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { const linelen = 70 line := t.Name if len(line) > linelen { @@ -280,11 +280,11 @@ func exit(format string, args ...interface{}) { os.Exit(2) } -func addTest(name string, f func(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error) { +func addTest(name string, f func(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error) { tests = append(tests, ServerTest{Name: name, F: f}) } -func callBatchApi(manifest *tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) { +func callBatchApi(manifest tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) { apiobjs := make([]*tq.Transfer, 0, len(objs)) for 
_, o := range objs { apiobjs = append(apiobjs, &tq.Transfer{Oid: o.Oid, Size: o.Size}) diff --git a/t/git-lfs-test-server-api/testdownload.go b/t/git-lfs-test-server-api/testdownload.go index 1e9e5eaf..ba861c33 100644 --- a/t/git-lfs-test-server-api/testdownload.go +++ b/t/git-lfs-test-server-api/testdownload.go @@ -10,7 +10,7 @@ import ( ) // "download" - all present -func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func downloadAllExist(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { retobjs, err := callBatchApi(manifest, tq.Download, oidsExist) if err != nil { @@ -37,7 +37,7 @@ func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject } // "download" - all missing (test includes 404 error entry) -func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func downloadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { retobjs, err := callBatchApi(manifest, tq.Download, oidsMissing) if err != nil { @@ -69,7 +69,7 @@ func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObje } // "download" - mixture -func downloadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func downloadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { existSet := tools.NewStringSetWithCapacity(len(oidsExist)) for _, o := range oidsExist { existSet.Add(o.Oid) diff --git a/t/git-lfs-test-server-api/testupload.go b/t/git-lfs-test-server-api/testupload.go index e3553913..26c90628 100644 --- a/t/git-lfs-test-server-api/testupload.go +++ b/t/git-lfs-test-server-api/testupload.go @@ -10,7 +10,7 @@ import ( ) // "upload" - all missing -func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func uploadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { retobjs, err := callBatchApi(manifest, tq.Upload, oidsMissing) if err != nil 
{ @@ -38,7 +38,7 @@ func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject } // "upload" - all present -func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func uploadAllExists(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { retobjs, err := callBatchApi(manifest, tq.Upload, oidsExist) if err != nil { @@ -65,7 +65,7 @@ func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) } // "upload" - mix of missing & present -func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func uploadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { existSet := tools.NewStringSetWithCapacity(len(oidsExist)) for _, o := range oidsExist { existSet.Add(o.Oid) @@ -109,7 +109,7 @@ func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) err } -func uploadEdgeCases(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { +func uploadEdgeCases(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error { errorCases := make([]TestObject, 0, 5) errorCodeMap := make(map[string]int, 5) errorReasonMap := make(map[string]string, 5) diff --git a/tq/api.go b/tq/api.go index cd79fbcf..42717c8d 100644 --- a/tq/api.go +++ b/tq/api.go @@ -35,12 +35,14 @@ type BatchResponse struct { endpoint lfshttp.Endpoint } -func Batch(m *Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) { +func Batch(m Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) { if len(objects) == 0 { return &BatchResponse{}, nil } - return m.batchClient().Batch(remote, &batchRequest{ + cm := m.Upgrade() + + return cm.batchClient().Batch(remote, &batchRequest{ Operation: dir.String(), Objects: objects, TransferAdapterNames: m.GetAdapterNames(dir), diff --git a/tq/basic_download.go b/tq/basic_download.go index 67489d4e..9250a99d 100644 --- 
a/tq/basic_download.go +++ b/tq/basic_download.go @@ -261,7 +261,7 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk return err } -func configureBasicDownloadAdapter(m *Manifest) { +func configureBasicDownloadAdapter(m *concreteManifest) { m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter { switch dir { case Download: diff --git a/tq/basic_upload.go b/tq/basic_upload.go index d0ca0307..cabc8c4d 100644 --- a/tq/basic_upload.go +++ b/tq/basic_upload.go @@ -209,7 +209,7 @@ func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCall } } -func configureBasicUploadAdapter(m *Manifest) { +func configureBasicUploadAdapter(m *concreteManifest) { m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter { switch dir { case Upload: diff --git a/tq/custom.go b/tq/custom.go index 773a6aa4..060e77db 100644 --- a/tq/custom.go +++ b/tq/custom.go @@ -351,7 +351,7 @@ const ( standaloneFileName = "lfs-standalone-file" ) -func configureDefaultCustomAdapters(git Env, m *Manifest) { +func configureDefaultCustomAdapters(git Env, m *concreteManifest) { newfunc := func(name string, dir Direction) Adapter { standalone := m.standaloneTransferAgent != "" return newCustomAdapter(m.fs, standaloneFileName, dir, "git-lfs", "standalone-file", false, standalone) @@ -361,7 +361,7 @@ func configureDefaultCustomAdapters(git Env, m *Manifest) { } // Initialise custom adapters based on current config -func configureCustomAdapters(git Env, m *Manifest) { +func configureCustomAdapters(git Env, m *concreteManifest) { configureDefaultCustomAdapters(git, m) pathRegex := regexp.MustCompile(`lfs.customtransfer.([^.]+).path`) diff --git a/tq/manifest.go b/tq/manifest.go index 9f8c845c..ea080ea8 100644 --- a/tq/manifest.go +++ b/tq/manifest.go @@ -17,7 +17,27 @@ const ( defaultConcurrentTransfers = 8 ) -type Manifest struct { +type Manifest interface { + APIClient() *lfsapi.Client 
+ MaxRetries() int + MaxRetryDelay() int + ConcurrentTransfers() int + IsStandaloneTransfer() bool + batchClient() BatchClient + GetAdapterNames(dir Direction) []string + GetDownloadAdapterNames() []string + GetUploadAdapterNames() []string + getAdapterNames(adapters map[string]NewAdapterFunc) []string + RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) + NewAdapterOrDefault(name string, dir Direction) Adapter + NewAdapter(name string, dir Direction) Adapter + NewDownloadAdapter(name string) Adapter + NewUploadAdapter(name string) Adapter + Upgrade() *concreteManifest + Upgraded() bool +} + +type concreteManifest struct { // maxRetries is the maximum number of retries a single object can // attempt to make before it will be dropped. maxRetryDelay is the maximum // time in seconds to wait between retry attempts when using backoff. @@ -36,34 +56,46 @@ type Manifest struct { mu sync.Mutex } -func (m *Manifest) APIClient() *lfsapi.Client { +func (m *concreteManifest) APIClient() *lfsapi.Client { return m.apiClient } -func (m *Manifest) MaxRetries() int { +func (m *concreteManifest) MaxRetries() int { return m.maxRetries } -func (m *Manifest) MaxRetryDelay() int { +func (m *concreteManifest) MaxRetryDelay() int { return m.maxRetryDelay } -func (m *Manifest) ConcurrentTransfers() int { +func (m *concreteManifest) ConcurrentTransfers() int { return m.concurrentTransfers } -func (m *Manifest) IsStandaloneTransfer() bool { +func (m *concreteManifest) IsStandaloneTransfer() bool { return m.standaloneTransferAgent != "" } -func (m *Manifest) batchClient() BatchClient { +func (m *concreteManifest) batchClient() BatchClient { if r := m.MaxRetries(); r > 0 { m.batchClientAdapter.SetMaxRetries(r) } return m.batchClientAdapter } -func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *Manifest { +func (m *concreteManifest) Upgrade() *concreteManifest { + return m +} + +func (m *concreteManifest) Upgraded() bool { + return true +} 
+ +func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) Manifest { + return newConcreteManifest(f, apiClient, operation, remote) +} + +func newConcreteManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *concreteManifest { if apiClient == nil { cli, err := lfsapi.NewClient(nil) if err != nil { @@ -79,7 +111,7 @@ func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote s useSSHMultiplexing = sshTransfer.IsMultiplexingEnabled() } - m := &Manifest{ + m := &concreteManifest{ fs: f, apiClient: apiClient, batchClientAdapter: &tqClient{Client: apiClient}, @@ -164,7 +196,7 @@ func findStandaloneTransfer(client *lfsapi.Client, operation, remote string) str } // GetAdapterNames returns a list of the names of adapters available to be created -func (m *Manifest) GetAdapterNames(dir Direction) []string { +func (m *concreteManifest) GetAdapterNames(dir Direction) []string { switch dir { case Upload: return m.GetUploadAdapterNames() @@ -175,17 +207,17 @@ func (m *Manifest) GetAdapterNames(dir Direction) []string { } // GetDownloadAdapterNames returns a list of the names of download adapters available to be created -func (m *Manifest) GetDownloadAdapterNames() []string { +func (m *concreteManifest) GetDownloadAdapterNames() []string { return m.getAdapterNames(m.downloadAdapterFuncs) } // GetUploadAdapterNames returns a list of the names of upload adapters available to be created -func (m *Manifest) GetUploadAdapterNames() []string { +func (m *concreteManifest) GetUploadAdapterNames() []string { return m.getAdapterNames(m.uploadAdapterFuncs) } // getAdapterNames returns a list of the names of adapters available to be created -func (m *Manifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string { +func (m *concreteManifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string { if m.basicTransfersOnly { return []string{BasicAdapterName} } @@ -203,7 +235,7 @@ func (m *Manifest) 
getAdapterNames(adapters map[string]NewAdapterFunc) []string // RegisterNewTransferAdapterFunc registers a new function for creating upload // or download adapters. If a function with that name & direction is already // registered, it is overridden -func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) { +func (m *concreteManifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) { m.mu.Lock() defer m.mu.Unlock() @@ -216,7 +248,7 @@ func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapt } // Create a new adapter by name and direction; default to BasicAdapterName if doesn't exist -func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter { +func (m *concreteManifest) NewAdapterOrDefault(name string, dir Direction) Adapter { if len(name) == 0 { name = BasicAdapterName } @@ -230,7 +262,7 @@ func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter { } // Create a new adapter by name and direction, or nil if doesn't exist -func (m *Manifest) NewAdapter(name string, dir Direction) Adapter { +func (m *concreteManifest) NewAdapter(name string, dir Direction) Adapter { m.mu.Lock() defer m.mu.Unlock() @@ -248,12 +280,12 @@ func (m *Manifest) NewAdapter(name string, dir Direction) Adapter { } // Create a new download adapter by name, or BasicAdapterName if doesn't exist -func (m *Manifest) NewDownloadAdapter(name string) Adapter { +func (m *concreteManifest) NewDownloadAdapter(name string) Adapter { return m.NewAdapterOrDefault(name, Download) } // Create a new upload adapter by name, or BasicAdapterName if doesn't exist -func (m *Manifest) NewUploadAdapter(name string) Adapter { +func (m *concreteManifest) NewUploadAdapter(name string) Adapter { return m.NewAdapterOrDefault(name, Upload) } diff --git a/tq/ssh.go b/tq/ssh.go index 312bf910..fc8c7ca3 100644 --- a/tq/ssh.go +++ b/tq/ssh.go @@ -407,7 +407,7 @@ func (a *SSHAdapter) Trace(format string, args 
...interface{}) { tracerx.Printf(format, args...) } -func configureSSHAdapter(m *Manifest) { +func configureSSHAdapter(m *concreteManifest) { m.RegisterNewAdapterFunc("ssh", Upload, func(name string, dir Direction) Adapter { a := &SSHAdapter{newAdapterBase(m.fs, name, dir, nil), nil, m.sshTransfer} a.transferImpl = a diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 9ecb9a8e..a63552f3 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -209,7 +209,7 @@ type TransferQueue struct { // once per unique OID on Add(), and is decremented when that transfer // is marked as completed or failed, but not retried. wait *abortableWaitGroup - manifest *Manifest + manifest Manifest rc *retryCounter // unsupportedContentType indicates whether the transfer queue ever saw @@ -298,10 +298,9 @@ func WithBufferDepth(depth int) Option { } // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter -func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue { +func NewTransferQueue(dir Direction, manifest Manifest, remote string, options ...Option) *TransferQueue { q := &TransferQueue{ direction: dir, - client: &tqClient{Client: manifest.APIClient()}, remote: remote, errorc: make(chan error), transfers: make(map[string]*objects), @@ -315,10 +314,6 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options opt(q) } - q.rc.MaxRetries = q.manifest.maxRetries - q.rc.MaxRetryDelay = q.manifest.maxRetryDelay - q.client.SetMaxRetries(q.manifest.maxRetries) - if q.batchSize <= 0 { q.batchSize = defaultBatchSize } @@ -337,6 +332,18 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options return q } +// Ensure we have a concrete manifest and that certain delayed variables are set +// properly. 
+func (q *TransferQueue) Upgrade() { + if q.client == nil { + manifest := q.manifest.Upgrade() + q.client = &tqClient{Client: manifest.APIClient()} + q.rc.MaxRetries = manifest.maxRetries + q.rc.MaxRetryDelay = manifest.maxRetryDelay + q.client.SetMaxRetries(manifest.maxRetries) + } +} + // Add adds a *Transfer to the transfer queue. It only increments the amount // of waiting the TransferQueue has to do if the *Transfer "t" is new. // @@ -347,6 +354,8 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options // Only one file will be transferred to/from the Path element of the first // transfer. func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error) { + q.Upgrade() + if err != nil { q.errorc <- err return @@ -384,6 +393,8 @@ func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, er // // It returns if the value is new or not. func (q *TransferQueue) remember(t *objectTuple) objects { + q.Upgrade() + q.trMutex.Lock() defer q.trMutex.Unlock() @@ -498,6 +509,8 @@ func (q *TransferQueue) collectBatches() { // A "pending" batch is returned, along with whether or not "q.incoming" is // closed. func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) { + q.Upgrade() + for { select { case t, ok := <-q.incoming: @@ -525,6 +538,8 @@ func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been // processed. 
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) { + q.Upgrade() + next := q.makeBatch() tracerx.Printf("tq: sending batch of size %d", len(batch)) @@ -548,7 +563,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) q.meter.Pause() var bRes *BatchResponse - if q.manifest.standaloneTransferAgent != "" { + manifest := q.manifest.Upgrade() + if manifest.standaloneTransferAgent != "" { // Trust the external transfer agent can do everything by itself. objects := make([]*Transfer, 0, len(batch)) for _, t := range batch { @@ -556,7 +572,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) } bRes = &BatchResponse{ Objects: objects, - TransferAdapterName: q.manifest.standaloneTransferAgent, + TransferAdapterName: manifest.standaloneTransferAgent, } } else { // Query the Git LFS server for what transfer method to use and @@ -651,7 +667,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) q.Skip(o.Size) q.wait.Done() } - } else if a == nil && q.manifest.standaloneTransferAgent == "" { + } else if a == nil && manifest.standaloneTransferAgent == "" { q.Skip(o.Size) q.wait.Done() } else { @@ -680,6 +696,8 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) } // // addToAdapter returns immediately, and does not block. 
func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple { + q.Upgrade() + retries := make(chan *objectTuple, len(pending)) if err := q.ensureAdapterBegun(e); err != nil { @@ -718,6 +736,8 @@ func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <- } func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) { + q.Upgrade() + if q.direction != Upload { return transfers, nil } @@ -887,6 +907,8 @@ func (q *TransferQueue) Skip(size int64) { } func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error { + q.Upgrade() + q.adapterInitMutex.Lock() defer q.adapterInitMutex.Unlock() @@ -947,9 +969,12 @@ func (q *TransferQueue) Wait() { q.meter.Flush() q.errorwait.Wait() - if q.manifest.sshTransfer != nil { - q.manifest.sshTransfer.Shutdown() - q.manifest.sshTransfer = nil + if q.manifest.Upgraded() { + manifest := q.manifest.Upgrade() + if manifest.sshTransfer != nil { + manifest.sshTransfer.Shutdown() + manifest.sshTransfer = nil + } } if q.unsupportedContentType { diff --git a/tq/tus_upload.go b/tq/tus_upload.go index 2de491d7..75f96435 100644 --- a/tq/tus_upload.go +++ b/tq/tus_upload.go @@ -156,7 +156,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC return verifyUpload(a.apiClient, a.remote, t) } -func configureTusAdapter(m *Manifest) { +func configureTusAdapter(m *concreteManifest) { m.RegisterNewAdapterFunc(TusAdapterName, Upload, func(name string, dir Direction) Adapter { switch dir { case Upload: From a0065c0a484cd69fc2c20fc16512488f6a7e6e3e Mon Sep 17 00:00:00 2001 From: "brian m. carlson" Date: Fri, 10 Mar 2023 16:18:00 +0000 Subject: [PATCH 2/2] tq: avoid spawning SSH process needlessly When a user invokes `git archive` with LFS files, `git lfs filter-process` is invoked to smudge the LFS files. 
However, currently when we instantiate the manifest object as part of that, an attempt is made to connect to the remote using SSH, which we don't want to do unless necessary. For example, if the user already has all the files locally, the network connection is needless and serves only to waste resources. In the previous commit, we made our manifest an abstract interface with a single implementing type: a concrete manifest. Now, introduce a lazy manifest, which can upgrade to a concrete manifest but doesn't instantiate one until that happens. This allows us to instantiate a manifest without making the SSH connection, and we can delay the SSH connection until it's really needed, if at all. Add a test for this case as well. --- t/t-filter-process.sh | 30 ++++++++++++++ tq/manifest.go | 91 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/t/t-filter-process.sh b/t/t-filter-process.sh index 2ff4bf17..c8cf3059 100755 --- a/t/t-filter-process.sh +++ b/t/t-filter-process.sh @@ -241,3 +241,33 @@ begin_test "filter process: checking out a branch with --skip-smudge and checkou assert_pointer "b" "b.dat" "$contents_b_oid" 10 ) end_test + +begin_test "filter process: git archive does not invoke SSH" +( + set -e + + setup_pure_ssh + + reponame="filter-process-archive" + setup_remote_repo "$reponame" + clone_repo "$reponame" "$reponame" + + sshurl=$(ssh_remote "$reponame") + git config lfs.url "$sshurl" + + contents="test" + git lfs track "*.dat" + printf "%s" "$contents" > test.dat + git add .gitattributes test.dat + git commit -m "initial commit" + + git push origin main 2>&1 + cd .. 
+ GIT_TRACE=1 git clone "$sshurl" "$reponame-2" 2>&1 | tee trace.log + grep "lfs-ssh-echo.*git-lfs-transfer .*$reponame.git download" trace.log + cd "$reponame-2" + GIT_TRACE=1 GIT_TRACE_PACKET=1 git archive -o foo.tar HEAD 2>&1 | tee archive.log + grep 'pure SSH' archive.log && exit 1 + true +) +end_test diff --git a/tq/manifest.go b/tq/manifest.go index ea080ea8..8928f674 100644 --- a/tq/manifest.go +++ b/tq/manifest.go @@ -37,6 +37,95 @@ type Manifest interface { Upgraded() bool } +type lazyManifest struct { + f *fs.Filesystem + apiClient *lfsapi.Client + operation string + remote string + m *concreteManifest +} + +func newLazyManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *lazyManifest { + return &lazyManifest{ + f: f, + apiClient: apiClient, + operation: operation, + remote: remote, + m: nil, + } +} + +func (m *lazyManifest) APIClient() *lfsapi.Client { + return m.Upgrade().APIClient() +} + +func (m *lazyManifest) MaxRetries() int { + return m.Upgrade().MaxRetries() +} + +func (m *lazyManifest) MaxRetryDelay() int { + return m.Upgrade().MaxRetryDelay() +} + +func (m *lazyManifest) ConcurrentTransfers() int { + return m.Upgrade().ConcurrentTransfers() +} + +func (m *lazyManifest) IsStandaloneTransfer() bool { + return m.Upgrade().IsStandaloneTransfer() +} + +func (m *lazyManifest) batchClient() BatchClient { + return m.Upgrade().batchClient() +} + +func (m *lazyManifest) GetAdapterNames(dir Direction) []string { + return m.Upgrade().GetAdapterNames(dir) +} + +func (m *lazyManifest) GetDownloadAdapterNames() []string { + return m.Upgrade().GetDownloadAdapterNames() +} + +func (m *lazyManifest) GetUploadAdapterNames() []string { + return m.Upgrade().GetUploadAdapterNames() +} + +func (m *lazyManifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string { + return m.Upgrade().getAdapterNames(adapters) +} + +func (m *lazyManifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) { + 
m.Upgrade().RegisterNewAdapterFunc(name, dir, f) +} + +func (m *lazyManifest) NewAdapterOrDefault(name string, dir Direction) Adapter { + return m.Upgrade().NewAdapterOrDefault(name, dir) +} + +func (m *lazyManifest) NewAdapter(name string, dir Direction) Adapter { + return m.Upgrade().NewAdapter(name, dir) +} + +func (m *lazyManifest) NewDownloadAdapter(name string) Adapter { + return m.Upgrade().NewDownloadAdapter(name) +} + +func (m *lazyManifest) NewUploadAdapter(name string) Adapter { + return m.Upgrade().NewUploadAdapter(name) +} + +func (m *lazyManifest) Upgrade() *concreteManifest { + if m.m == nil { + m.m = newConcreteManifest(m.f, m.apiClient, m.operation, m.remote) + } + return m.m +} + +func (m *lazyManifest) Upgraded() bool { + return m.m != nil +} + type concreteManifest struct { // maxRetries is the maximum number of retries a single object can // attempt to make before it will be dropped. maxRetryDelay is the maximum @@ -92,7 +181,7 @@ func (m *concreteManifest) Upgraded() bool { } func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) Manifest { - return newConcreteManifest(f, apiClient, operation, remote) + return newLazyManifest(f, apiClient, operation, remote) } func newConcreteManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *concreteManifest {