Merge pull request #5309 from bk2204/ssh-transfer-manifest

Avoid needlessly spawning SSH connections with `git archive`
This commit is contained in:
brian m. carlson 2023-03-28 16:57:11 +00:00 committed by GitHub
commit 8434ced48e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 248 additions and 70 deletions

@ -34,7 +34,7 @@ var (
ErrorWriter = newMultiWriter(os.Stderr, ErrorBuffer) ErrorWriter = newMultiWriter(os.Stderr, ErrorBuffer)
OutputWriter = newMultiWriter(os.Stdout, ErrorBuffer) OutputWriter = newMultiWriter(os.Stdout, ErrorBuffer)
ManPages = make(map[string]string, 20) ManPages = make(map[string]string, 20)
tqManifest = make(map[string]*tq.Manifest) tqManifest = make(map[string]tq.Manifest)
cfg *config.Configuration cfg *config.Configuration
apiClient *lfsapi.Client apiClient *lfsapi.Client
@ -48,14 +48,14 @@ var (
// getTransferManifest builds a tq.Manifest from the global os and git // getTransferManifest builds a tq.Manifest from the global os and git
// environments. // environments.
func getTransferManifest() *tq.Manifest { func getTransferManifest() tq.Manifest {
return getTransferManifestOperationRemote("", "") return getTransferManifestOperationRemote("", "")
} }
// getTransferManifestOperationRemote builds a tq.Manifest from the global os // getTransferManifestOperationRemote builds a tq.Manifest from the global os
// and git environments and operation-specific and remote-specific settings. // and git environments and operation-specific and remote-specific settings.
// Operation must be "download", "upload", or the empty string. // Operation must be "download", "upload", or the empty string.
func getTransferManifestOperationRemote(operation, remote string) *tq.Manifest { func getTransferManifestOperationRemote(operation, remote string) tq.Manifest {
c := getAPIClient() c := getAPIClient()
global.Lock() global.Lock()
@ -112,14 +112,14 @@ func newLockClient() *locking.Client {
} }
// newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download // newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { func newDownloadCheckQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return newDownloadQueue(manifest, remote, append(options, return newDownloadQueue(manifest, remote, append(options,
tq.DryRun(true), tq.DryRun(true),
)...) )...)
} }
// newDownloadQueue builds a DownloadQueue, allowing concurrent downloads. // newDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue { func newDownloadQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Download, manifest, remote, append(options, return tq.NewTransferQueue(tq.Download, manifest, remote, append(options,
tq.RemoteRef(currentRemoteRef()), tq.RemoteRef(currentRemoteRef()),
)...) )...)

@ -151,7 +151,7 @@ func (lv *lockVerifier) newRefLocks(ref *git.Ref, l locking.Lock) *refLock {
} }
} }
func newLockVerifier(m *tq.Manifest) *lockVerifier { func newLockVerifier(m tq.Manifest) *lockVerifier {
lv := &lockVerifier{ lv := &lockVerifier{
endpoint: getAPIClient().Endpoints.Endpoint("upload", cfg.PushRemote()), endpoint: getAPIClient().Endpoints.Endpoint("upload", cfg.PushRemote()),
verifiedRefs: make(map[string]bool), verifiedRefs: make(map[string]bool),

@ -39,7 +39,7 @@ func newSingleCheckout(gitEnv config.Environment, remote string) abstractCheckou
} }
type abstractCheckout interface { type abstractCheckout interface {
Manifest() *tq.Manifest Manifest() tq.Manifest
Skip() bool Skip() bool
Run(*lfs.WrappedPointer) Run(*lfs.WrappedPointer)
RunToPath(*lfs.WrappedPointer, string) error RunToPath(*lfs.WrappedPointer, string) error
@ -49,11 +49,11 @@ type abstractCheckout interface {
type singleCheckout struct { type singleCheckout struct {
gitIndexer *gitIndexer gitIndexer *gitIndexer
pathConverter lfs.PathConverter pathConverter lfs.PathConverter
manifest *tq.Manifest manifest tq.Manifest
remote string remote string
} }
func (c *singleCheckout) Manifest() *tq.Manifest { func (c *singleCheckout) Manifest() tq.Manifest {
if c.manifest == nil { if c.manifest == nil {
c.manifest = getTransferManifestOperationRemote("download", c.remote) c.manifest = getTransferManifestOperationRemote("download", c.remote)
} }
@ -115,11 +115,11 @@ func (c *singleCheckout) Close() {
} }
type noOpCheckout struct { type noOpCheckout struct {
manifest *tq.Manifest manifest tq.Manifest
remote string remote string
} }
func (c *noOpCheckout) Manifest() *tq.Manifest { func (c *noOpCheckout) Manifest() tq.Manifest {
if c.manifest == nil { if c.manifest == nil {
c.manifest = getTransferManifestOperationRemote("download", c.remote) c.manifest = getTransferManifestOperationRemote("download", c.remote)
} }

@ -72,7 +72,7 @@ func uploadRangeOrAll(g *lfs.GitScanner, ctx *uploadContext, q *tq.TransferQueue
type uploadContext struct { type uploadContext struct {
Remote string Remote string
DryRun bool DryRun bool
Manifest *tq.Manifest Manifest tq.Manifest
uploadedOids tools.StringSet uploadedOids tools.StringSet
gitfilter *lfs.GitFilter gitfilter *lfs.GitFilter

@ -15,7 +15,7 @@ import (
"github.com/rubyist/tracerx" "github.com/rubyist/tracerx"
) )
func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest *tq.Manifest, cb tools.CopyCallback) error { func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest tq.Manifest, cb tools.CopyCallback) error {
tools.MkdirAll(filepath.Dir(filename), f.cfg) tools.MkdirAll(filepath.Dir(filename), f.cfg)
if stat, _ := os.Stat(filename); stat != nil && stat.Mode()&0200 == 0 { if stat, _ := os.Stat(filename); stat != nil && stat.Mode()&0200 == 0 {
@ -52,7 +52,7 @@ func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, m
return nil return nil
} }
func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
mediafile, err := f.ObjectPath(ptr.Oid) mediafile, err := f.ObjectPath(ptr.Oid)
if err != nil { if err != nil {
return 0, err return 0, err
@ -99,7 +99,7 @@ func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, d
return n, nil return n, nil
} }
func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
fmt.Fprintln(os.Stderr, tr.Tr.Get("Downloading %s (%s)", workingfile, humanize.FormatBytes(uint64(ptr.Size)))) fmt.Fprintln(os.Stderr, tr.Tr.Get("Downloading %s (%s)", workingfile, humanize.FormatBytes(uint64(ptr.Size))))
// NOTE: if given, "cb" is a tools.CopyCallback which writes updates // NOTE: if given, "cb" is a tools.CopyCallback which writes updates
@ -131,7 +131,7 @@ func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, me
return f.readLocalFile(writer, ptr, mediafile, workingfile, nil) return f.readLocalFile(writer, ptr, mediafile, workingfile, nil)
} }
func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) { func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
// Attempt to find the LFS objects in all currently registered remotes. // Attempt to find the LFS objects in all currently registered remotes.
// When a valid remote is found, this remote is taken persistent for // When a valid remote is found, this remote is taken persistent for
// future attempts within downloadFile(). In best case, the ordinary // future attempts within downloadFile(). In best case, the ordinary

@ -15,7 +15,7 @@ import (
"github.com/rubyist/tracerx" "github.com/rubyist/tracerx"
) )
func Environ(cfg *config.Configuration, manifest *tq.Manifest, envOverrides map[string]string) []string { func Environ(cfg *config.Configuration, manifest tq.Manifest, envOverrides map[string]string) []string {
osEnviron := os.Environ() osEnviron := os.Environ()
env := make([]string, 0, len(osEnviron)+7) env := make([]string, 0, len(osEnviron)+7)

@ -27,7 +27,7 @@ type TestObject struct {
type ServerTest struct { type ServerTest struct {
Name string Name string
F func(m *tq.Manifest, oidsExist, oidsMissing []TestObject) error F func(m tq.Manifest, oidsExist, oidsMissing []TestObject) error
} }
var ( var (
@ -138,7 +138,7 @@ func (*testDataCallback) Errorf(format string, args ...interface{}) {
fmt.Printf(format, args...) fmt.Printf(format, args...)
} }
func buildManifest(r *t.Repo) (*tq.Manifest, error) { func buildManifest(r *t.Repo) (tq.Manifest, error) {
// Configure the endpoint manually // Configure the endpoint manually
finder := lfsapi.NewEndpointFinder(r) finder := lfsapi.NewEndpointFinder(r)
@ -176,7 +176,7 @@ func (c *constantEndpoint) Endpoint(operation, remote string) lfshttp.Endpoint {
func (c *constantEndpoint) RemoteEndpoint(operation, remote string) lfshttp.Endpoint { return c.e } func (c *constantEndpoint) RemoteEndpoint(operation, remote string) lfshttp.Endpoint { return c.e }
func buildTestData(repo *t.Repo, manifest *tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) { func buildTestData(repo *t.Repo, manifest tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) {
const oidCount = 50 const oidCount = 50
oidsExist = make([]TestObject, 0, oidCount) oidsExist = make([]TestObject, 0, oidCount)
oidsMissing = make([]TestObject, 0, oidCount) oidsMissing = make([]TestObject, 0, oidCount)
@ -242,7 +242,7 @@ func saveTestOids(filename string, objs []TestObject) {
} }
func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool { func runTests(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) bool {
ok := true ok := true
fmt.Printf("Running %d tests...\n", len(tests)) fmt.Printf("Running %d tests...\n", len(tests))
for _, t := range tests { for _, t := range tests {
@ -254,7 +254,7 @@ func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool {
return ok return ok
} }
func runTest(t ServerTest, manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func runTest(t ServerTest, manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
const linelen = 70 const linelen = 70
line := t.Name line := t.Name
if len(line) > linelen { if len(line) > linelen {
@ -280,11 +280,11 @@ func exit(format string, args ...interface{}) {
os.Exit(2) os.Exit(2)
} }
func addTest(name string, f func(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error) { func addTest(name string, f func(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error) {
tests = append(tests, ServerTest{Name: name, F: f}) tests = append(tests, ServerTest{Name: name, F: f})
} }
func callBatchApi(manifest *tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) { func callBatchApi(manifest tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) {
apiobjs := make([]*tq.Transfer, 0, len(objs)) apiobjs := make([]*tq.Transfer, 0, len(objs))
for _, o := range objs { for _, o := range objs {
apiobjs = append(apiobjs, &tq.Transfer{Oid: o.Oid, Size: o.Size}) apiobjs = append(apiobjs, &tq.Transfer{Oid: o.Oid, Size: o.Size})

@ -10,7 +10,7 @@ import (
) )
// "download" - all present // "download" - all present
func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func downloadAllExist(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Download, oidsExist) retobjs, err := callBatchApi(manifest, tq.Download, oidsExist)
if err != nil { if err != nil {
@ -37,7 +37,7 @@ func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject
} }
// "download" - all missing (test includes 404 error entry) // "download" - all missing (test includes 404 error entry)
func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func downloadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Download, oidsMissing) retobjs, err := callBatchApi(manifest, tq.Download, oidsMissing)
if err != nil { if err != nil {
@ -69,7 +69,7 @@ func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObje
} }
// "download" - mixture // "download" - mixture
func downloadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func downloadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
existSet := tools.NewStringSetWithCapacity(len(oidsExist)) existSet := tools.NewStringSetWithCapacity(len(oidsExist))
for _, o := range oidsExist { for _, o := range oidsExist {
existSet.Add(o.Oid) existSet.Add(o.Oid)

@ -10,7 +10,7 @@ import (
) )
// "upload" - all missing // "upload" - all missing
func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func uploadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Upload, oidsMissing) retobjs, err := callBatchApi(manifest, tq.Upload, oidsMissing)
if err != nil { if err != nil {
@ -38,7 +38,7 @@ func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject
} }
// "upload" - all present // "upload" - all present
func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func uploadAllExists(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Upload, oidsExist) retobjs, err := callBatchApi(manifest, tq.Upload, oidsExist)
if err != nil { if err != nil {
@ -65,7 +65,7 @@ func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject)
} }
// "upload" - mix of missing & present // "upload" - mix of missing & present
func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func uploadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
existSet := tools.NewStringSetWithCapacity(len(oidsExist)) existSet := tools.NewStringSetWithCapacity(len(oidsExist))
for _, o := range oidsExist { for _, o := range oidsExist {
existSet.Add(o.Oid) existSet.Add(o.Oid)
@ -109,7 +109,7 @@ func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) err
} }
func uploadEdgeCases(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error { func uploadEdgeCases(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
errorCases := make([]TestObject, 0, 5) errorCases := make([]TestObject, 0, 5)
errorCodeMap := make(map[string]int, 5) errorCodeMap := make(map[string]int, 5)
errorReasonMap := make(map[string]string, 5) errorReasonMap := make(map[string]string, 5)

@ -241,3 +241,33 @@ begin_test "filter process: checking out a branch with --skip-smudge and checkou
assert_pointer "b" "b.dat" "$contents_b_oid" 10 assert_pointer "b" "b.dat" "$contents_b_oid" 10
) )
end_test end_test
begin_test "filter process: git archive does not invoke SSH"
(
set -e
setup_pure_ssh
reponame="filter-process-archive"
setup_remote_repo "$reponame"
clone_repo "$reponame" "$reponame"
sshurl=$(ssh_remote "$reponame")
git config lfs.url "$sshurl"
contents="test"
git lfs track "*.dat"
printf "%s" "$contents" > test.dat
git add .gitattributes test.dat
git commit -m "initial commit"
git push origin main 2>&1
cd ..
GIT_TRACE=1 git clone "$sshurl" "$reponame-2" 2>&1 | tee trace.log
grep "lfs-ssh-echo.*git-lfs-transfer .*$reponame.git download" trace.log
cd "$reponame-2"
GIT_TRACE=1 GIT_TRACE_PACKET=1 git archive -o foo.tar HEAD 2>&1 | tee archive.log
grep 'pure SSH' archive.log && exit 1
true
)
end_test

@ -35,12 +35,14 @@ type BatchResponse struct {
endpoint lfshttp.Endpoint endpoint lfshttp.Endpoint
} }
func Batch(m *Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) { func Batch(m Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) {
if len(objects) == 0 { if len(objects) == 0 {
return &BatchResponse{}, nil return &BatchResponse{}, nil
} }
return m.batchClient().Batch(remote, &batchRequest{ cm := m.Upgrade()
return cm.batchClient().Batch(remote, &batchRequest{
Operation: dir.String(), Operation: dir.String(),
Objects: objects, Objects: objects,
TransferAdapterNames: m.GetAdapterNames(dir), TransferAdapterNames: m.GetAdapterNames(dir),

@ -261,7 +261,7 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
return err return err
} }
func configureBasicDownloadAdapter(m *Manifest) { func configureBasicDownloadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter { m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter {
switch dir { switch dir {
case Download: case Download:

@ -209,7 +209,7 @@ func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCall
} }
} }
func configureBasicUploadAdapter(m *Manifest) { func configureBasicUploadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter { m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter {
switch dir { switch dir {
case Upload: case Upload:

@ -351,7 +351,7 @@ const (
standaloneFileName = "lfs-standalone-file" standaloneFileName = "lfs-standalone-file"
) )
func configureDefaultCustomAdapters(git Env, m *Manifest) { func configureDefaultCustomAdapters(git Env, m *concreteManifest) {
newfunc := func(name string, dir Direction) Adapter { newfunc := func(name string, dir Direction) Adapter {
standalone := m.standaloneTransferAgent != "" standalone := m.standaloneTransferAgent != ""
return newCustomAdapter(m.fs, standaloneFileName, dir, "git-lfs", "standalone-file", false, standalone) return newCustomAdapter(m.fs, standaloneFileName, dir, "git-lfs", "standalone-file", false, standalone)
@ -361,7 +361,7 @@ func configureDefaultCustomAdapters(git Env, m *Manifest) {
} }
// Initialise custom adapters based on current config // Initialise custom adapters based on current config
func configureCustomAdapters(git Env, m *Manifest) { func configureCustomAdapters(git Env, m *concreteManifest) {
configureDefaultCustomAdapters(git, m) configureDefaultCustomAdapters(git, m)
pathRegex := regexp.MustCompile(`lfs.customtransfer.([^.]+).path`) pathRegex := regexp.MustCompile(`lfs.customtransfer.([^.]+).path`)

@ -17,7 +17,116 @@ const (
defaultConcurrentTransfers = 8 defaultConcurrentTransfers = 8
) )
type Manifest struct { type Manifest interface {
APIClient() *lfsapi.Client
MaxRetries() int
MaxRetryDelay() int
ConcurrentTransfers() int
IsStandaloneTransfer() bool
batchClient() BatchClient
GetAdapterNames(dir Direction) []string
GetDownloadAdapterNames() []string
GetUploadAdapterNames() []string
getAdapterNames(adapters map[string]NewAdapterFunc) []string
RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc)
NewAdapterOrDefault(name string, dir Direction) Adapter
NewAdapter(name string, dir Direction) Adapter
NewDownloadAdapter(name string) Adapter
NewUploadAdapter(name string) Adapter
Upgrade() *concreteManifest
Upgraded() bool
}
type lazyManifest struct {
f *fs.Filesystem
apiClient *lfsapi.Client
operation string
remote string
m *concreteManifest
}
func newLazyManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *lazyManifest {
return &lazyManifest{
f: f,
apiClient: apiClient,
operation: operation,
remote: remote,
m: nil,
}
}
func (m *lazyManifest) APIClient() *lfsapi.Client {
return m.Upgrade().APIClient()
}
func (m *lazyManifest) MaxRetries() int {
return m.Upgrade().MaxRetries()
}
func (m *lazyManifest) MaxRetryDelay() int {
return m.Upgrade().MaxRetryDelay()
}
func (m *lazyManifest) ConcurrentTransfers() int {
return m.Upgrade().ConcurrentTransfers()
}
func (m *lazyManifest) IsStandaloneTransfer() bool {
return m.Upgrade().IsStandaloneTransfer()
}
func (m *lazyManifest) batchClient() BatchClient {
return m.Upgrade().batchClient()
}
func (m *lazyManifest) GetAdapterNames(dir Direction) []string {
return m.Upgrade().GetAdapterNames(dir)
}
func (m *lazyManifest) GetDownloadAdapterNames() []string {
return m.Upgrade().GetDownloadAdapterNames()
}
func (m *lazyManifest) GetUploadAdapterNames() []string {
return m.Upgrade().GetUploadAdapterNames()
}
func (m *lazyManifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string {
return m.Upgrade().getAdapterNames(adapters)
}
func (m *lazyManifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) {
m.Upgrade().RegisterNewAdapterFunc(name, dir, f)
}
func (m *lazyManifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
return m.Upgrade().NewAdapterOrDefault(name, dir)
}
func (m *lazyManifest) NewAdapter(name string, dir Direction) Adapter {
return m.Upgrade().NewAdapter(name, dir)
}
func (m *lazyManifest) NewDownloadAdapter(name string) Adapter {
return m.Upgrade().NewDownloadAdapter(name)
}
func (m *lazyManifest) NewUploadAdapter(name string) Adapter {
return m.Upgrade().NewUploadAdapter(name)
}
func (m *lazyManifest) Upgrade() *concreteManifest {
if m.m == nil {
m.m = newConcreteManifest(m.f, m.apiClient, m.operation, m.remote)
}
return m.m
}
func (m *lazyManifest) Upgraded() bool {
return m.m != nil
}
type concreteManifest struct {
// maxRetries is the maximum number of retries a single object can // maxRetries is the maximum number of retries a single object can
// attempt to make before it will be dropped. maxRetryDelay is the maximum // attempt to make before it will be dropped. maxRetryDelay is the maximum
// time in seconds to wait between retry attempts when using backoff. // time in seconds to wait between retry attempts when using backoff.
@ -36,34 +145,46 @@ type Manifest struct {
mu sync.Mutex mu sync.Mutex
} }
func (m *Manifest) APIClient() *lfsapi.Client { func (m *concreteManifest) APIClient() *lfsapi.Client {
return m.apiClient return m.apiClient
} }
func (m *Manifest) MaxRetries() int { func (m *concreteManifest) MaxRetries() int {
return m.maxRetries return m.maxRetries
} }
func (m *Manifest) MaxRetryDelay() int { func (m *concreteManifest) MaxRetryDelay() int {
return m.maxRetryDelay return m.maxRetryDelay
} }
func (m *Manifest) ConcurrentTransfers() int { func (m *concreteManifest) ConcurrentTransfers() int {
return m.concurrentTransfers return m.concurrentTransfers
} }
func (m *Manifest) IsStandaloneTransfer() bool { func (m *concreteManifest) IsStandaloneTransfer() bool {
return m.standaloneTransferAgent != "" return m.standaloneTransferAgent != ""
} }
func (m *Manifest) batchClient() BatchClient { func (m *concreteManifest) batchClient() BatchClient {
if r := m.MaxRetries(); r > 0 { if r := m.MaxRetries(); r > 0 {
m.batchClientAdapter.SetMaxRetries(r) m.batchClientAdapter.SetMaxRetries(r)
} }
return m.batchClientAdapter return m.batchClientAdapter
} }
func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *Manifest { func (m *concreteManifest) Upgrade() *concreteManifest {
return m
}
func (m *concreteManifest) Upgraded() bool {
return true
}
func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) Manifest {
return newLazyManifest(f, apiClient, operation, remote)
}
func newConcreteManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *concreteManifest {
if apiClient == nil { if apiClient == nil {
cli, err := lfsapi.NewClient(nil) cli, err := lfsapi.NewClient(nil)
if err != nil { if err != nil {
@ -79,7 +200,7 @@ func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote s
useSSHMultiplexing = sshTransfer.IsMultiplexingEnabled() useSSHMultiplexing = sshTransfer.IsMultiplexingEnabled()
} }
m := &Manifest{ m := &concreteManifest{
fs: f, fs: f,
apiClient: apiClient, apiClient: apiClient,
batchClientAdapter: &tqClient{Client: apiClient}, batchClientAdapter: &tqClient{Client: apiClient},
@ -164,7 +285,7 @@ func findStandaloneTransfer(client *lfsapi.Client, operation, remote string) str
} }
// GetAdapterNames returns a list of the names of adapters available to be created // GetAdapterNames returns a list of the names of adapters available to be created
func (m *Manifest) GetAdapterNames(dir Direction) []string { func (m *concreteManifest) GetAdapterNames(dir Direction) []string {
switch dir { switch dir {
case Upload: case Upload:
return m.GetUploadAdapterNames() return m.GetUploadAdapterNames()
@ -175,17 +296,17 @@ func (m *Manifest) GetAdapterNames(dir Direction) []string {
} }
// GetDownloadAdapterNames returns a list of the names of download adapters available to be created // GetDownloadAdapterNames returns a list of the names of download adapters available to be created
func (m *Manifest) GetDownloadAdapterNames() []string { func (m *concreteManifest) GetDownloadAdapterNames() []string {
return m.getAdapterNames(m.downloadAdapterFuncs) return m.getAdapterNames(m.downloadAdapterFuncs)
} }
// GetUploadAdapterNames returns a list of the names of upload adapters available to be created // GetUploadAdapterNames returns a list of the names of upload adapters available to be created
func (m *Manifest) GetUploadAdapterNames() []string { func (m *concreteManifest) GetUploadAdapterNames() []string {
return m.getAdapterNames(m.uploadAdapterFuncs) return m.getAdapterNames(m.uploadAdapterFuncs)
} }
// getAdapterNames returns a list of the names of adapters available to be created // getAdapterNames returns a list of the names of adapters available to be created
func (m *Manifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string { func (m *concreteManifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string {
if m.basicTransfersOnly { if m.basicTransfersOnly {
return []string{BasicAdapterName} return []string{BasicAdapterName}
} }
@ -203,7 +324,7 @@ func (m *Manifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string
// RegisterNewTransferAdapterFunc registers a new function for creating upload // RegisterNewTransferAdapterFunc registers a new function for creating upload
// or download adapters. If a function with that name & direction is already // or download adapters. If a function with that name & direction is already
// registered, it is overridden // registered, it is overridden
func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) { func (m *concreteManifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -216,7 +337,7 @@ func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapt
} }
// Create a new adapter by name and direction; default to BasicAdapterName if doesn't exist // Create a new adapter by name and direction; default to BasicAdapterName if doesn't exist
func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter { func (m *concreteManifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
if len(name) == 0 { if len(name) == 0 {
name = BasicAdapterName name = BasicAdapterName
} }
@ -230,7 +351,7 @@ func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
} }
// Create a new adapter by name and direction, or nil if doesn't exist // Create a new adapter by name and direction, or nil if doesn't exist
func (m *Manifest) NewAdapter(name string, dir Direction) Adapter { func (m *concreteManifest) NewAdapter(name string, dir Direction) Adapter {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -248,12 +369,12 @@ func (m *Manifest) NewAdapter(name string, dir Direction) Adapter {
} }
// Create a new download adapter by name, or BasicAdapterName if doesn't exist // Create a new download adapter by name, or BasicAdapterName if doesn't exist
func (m *Manifest) NewDownloadAdapter(name string) Adapter { func (m *concreteManifest) NewDownloadAdapter(name string) Adapter {
return m.NewAdapterOrDefault(name, Download) return m.NewAdapterOrDefault(name, Download)
} }
// Create a new upload adapter by name, or BasicAdapterName if doesn't exist // Create a new upload adapter by name, or BasicAdapterName if doesn't exist
func (m *Manifest) NewUploadAdapter(name string) Adapter { func (m *concreteManifest) NewUploadAdapter(name string) Adapter {
return m.NewAdapterOrDefault(name, Upload) return m.NewAdapterOrDefault(name, Upload)
} }

@ -407,7 +407,7 @@ func (a *SSHAdapter) Trace(format string, args ...interface{}) {
tracerx.Printf(format, args...) tracerx.Printf(format, args...)
} }
func configureSSHAdapter(m *Manifest) { func configureSSHAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc("ssh", Upload, func(name string, dir Direction) Adapter { m.RegisterNewAdapterFunc("ssh", Upload, func(name string, dir Direction) Adapter {
a := &SSHAdapter{newAdapterBase(m.fs, name, dir, nil), nil, m.sshTransfer} a := &SSHAdapter{newAdapterBase(m.fs, name, dir, nil), nil, m.sshTransfer}
a.transferImpl = a a.transferImpl = a

@ -209,7 +209,7 @@ type TransferQueue struct {
// once per unique OID on Add(), and is decremented when that transfer // once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried. // is marked as completed or failed, but not retried.
wait *abortableWaitGroup wait *abortableWaitGroup
manifest *Manifest manifest Manifest
rc *retryCounter rc *retryCounter
// unsupportedContentType indicates whether the transfer queue ever saw // unsupportedContentType indicates whether the transfer queue ever saw
@ -298,10 +298,9 @@ func WithBufferDepth(depth int) Option {
} }
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue { func NewTransferQueue(dir Direction, manifest Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{ q := &TransferQueue{
direction: dir, direction: dir,
client: &tqClient{Client: manifest.APIClient()},
remote: remote, remote: remote,
errorc: make(chan error), errorc: make(chan error),
transfers: make(map[string]*objects), transfers: make(map[string]*objects),
@ -315,10 +314,6 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
opt(q) opt(q)
} }
q.rc.MaxRetries = q.manifest.maxRetries
q.rc.MaxRetryDelay = q.manifest.maxRetryDelay
q.client.SetMaxRetries(q.manifest.maxRetries)
if q.batchSize <= 0 { if q.batchSize <= 0 {
q.batchSize = defaultBatchSize q.batchSize = defaultBatchSize
} }
@ -337,6 +332,18 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
return q return q
} }
// Ensure we have a concrete manifest and that certain delayed variables are set
// properly.
func (q *TransferQueue) Upgrade() {
if q.client == nil {
manifest := q.manifest.Upgrade()
q.client = &tqClient{Client: manifest.APIClient()}
q.rc.MaxRetries = manifest.maxRetries
q.rc.MaxRetryDelay = manifest.maxRetryDelay
q.client.SetMaxRetries(manifest.maxRetries)
}
}
// Add adds a *Transfer to the transfer queue. It only increments the amount // Add adds a *Transfer to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the *Transfer "t" is new. // of waiting the TransferQueue has to do if the *Transfer "t" is new.
// //
@ -347,6 +354,8 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
// Only one file will be transferred to/from the Path element of the first // Only one file will be transferred to/from the Path element of the first
// transfer. // transfer.
func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error) { func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error) {
q.Upgrade()
if err != nil { if err != nil {
q.errorc <- err q.errorc <- err
return return
@ -384,6 +393,8 @@ func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, er
// //
// It returns if the value is new or not. // It returns if the value is new or not.
func (q *TransferQueue) remember(t *objectTuple) objects { func (q *TransferQueue) remember(t *objectTuple) objects {
q.Upgrade()
q.trMutex.Lock() q.trMutex.Lock()
defer q.trMutex.Unlock() defer q.trMutex.Unlock()
@ -498,6 +509,8 @@ func (q *TransferQueue) collectBatches() {
// A "pending" batch is returned, along with whether or not "q.incoming" is // A "pending" batch is returned, along with whether or not "q.incoming" is
// closed. // closed.
func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) { func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
q.Upgrade()
for { for {
select { select {
case t, ok := <-q.incoming: case t, ok := <-q.incoming:
@ -525,6 +538,8 @@ func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
// processed. // processed.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) { func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
q.Upgrade()
next := q.makeBatch() next := q.makeBatch()
tracerx.Printf("tq: sending batch of size %d", len(batch)) tracerx.Printf("tq: sending batch of size %d", len(batch))
@ -548,7 +563,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.meter.Pause() q.meter.Pause()
var bRes *BatchResponse var bRes *BatchResponse
if q.manifest.standaloneTransferAgent != "" { manifest := q.manifest.Upgrade()
if manifest.standaloneTransferAgent != "" {
// Trust the external transfer agent can do everything by itself. // Trust the external transfer agent can do everything by itself.
objects := make([]*Transfer, 0, len(batch)) objects := make([]*Transfer, 0, len(batch))
for _, t := range batch { for _, t := range batch {
@ -556,7 +572,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
} }
bRes = &BatchResponse{ bRes = &BatchResponse{
Objects: objects, Objects: objects,
TransferAdapterName: q.manifest.standaloneTransferAgent, TransferAdapterName: manifest.standaloneTransferAgent,
} }
} else { } else {
// Query the Git LFS server for what transfer method to use and // Query the Git LFS server for what transfer method to use and
@ -651,7 +667,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
} }
} else if a == nil && q.manifest.standaloneTransferAgent == "" { } else if a == nil && manifest.standaloneTransferAgent == "" {
q.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
} else { } else {
@ -680,6 +696,8 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
// //
// addToAdapter returns immediately, and does not block. // addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple { func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple {
q.Upgrade()
retries := make(chan *objectTuple, len(pending)) retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(e); err != nil { if err := q.ensureAdapterBegun(e); err != nil {
@ -718,6 +736,8 @@ func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-
} }
func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) { func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
q.Upgrade()
if q.direction != Upload { if q.direction != Upload {
return transfers, nil return transfers, nil
} }
@ -887,6 +907,8 @@ func (q *TransferQueue) Skip(size int64) {
} }
func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error { func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error {
q.Upgrade()
q.adapterInitMutex.Lock() q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock() defer q.adapterInitMutex.Unlock()
@ -947,9 +969,12 @@ func (q *TransferQueue) Wait() {
q.meter.Flush() q.meter.Flush()
q.errorwait.Wait() q.errorwait.Wait()
if q.manifest.sshTransfer != nil { if q.manifest.Upgraded() {
q.manifest.sshTransfer.Shutdown() manifest := q.manifest.Upgrade()
q.manifest.sshTransfer = nil if manifest.sshTransfer != nil {
manifest.sshTransfer.Shutdown()
manifest.sshTransfer = nil
}
} }
if q.unsupportedContentType { if q.unsupportedContentType {

@ -156,7 +156,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
return verifyUpload(a.apiClient, a.remote, t) return verifyUpload(a.apiClient, a.remote, t)
} }
func configureTusAdapter(m *Manifest) { func configureTusAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(TusAdapterName, Upload, func(name string, dir Direction) Adapter { m.RegisterNewAdapterFunc(TusAdapterName, Upload, func(name string, dir Direction) Adapter {
switch dir { switch dir {
case Upload: case Upload: