tq: make Manifest an interface

Right now, any time we instantiate a Manifest object, we create an API
client, and when we create the API client, if we're using SSH, we try to
make a connection to the server.  However, we often instantiate a
Manifest object when performing various functionality such as smudging
data, which means that when a user creates an archive locally, they can
be prompted for an SSH password, which is undesirable.

Let's take a first step to fixing this by making Manifest an interface.
Right now, it has one concrete version, a concreteManifest, which can be
used to access the internals, and we provide methods to upgrade it from
the interface to the concrete type and determine whether it's upgraded
or not.  We attempt to upgrade it any time we need to access its
internals.  In the future, we'll also offer a lazyManifest, which is
lazy and will only instantiate the concreteManifest inside when we
attempt to upgrade it to the latter.  But for now, only implement the
concreteManifest to make it clearer what's changing.

Similarly, we make our TransferQueue upgradable so that we don't
upgrade its Manifest right away.

In both cases, we'll want to use the lazyManifest to delay the
instantiation of the API client (and hence the starting of the SSH
connection) in a future commit.
This commit is contained in:
brian m. carlson 2023-03-10 15:24:21 +00:00
parent 7a4005c60d
commit 00623425a2
No known key found for this signature in database
GPG Key ID: 2D0C9BC12F82B3A1
17 changed files with 129 additions and 70 deletions

@ -34,7 +34,7 @@ var (
ErrorWriter = newMultiWriter(os.Stderr, ErrorBuffer)
OutputWriter = newMultiWriter(os.Stdout, ErrorBuffer)
ManPages = make(map[string]string, 20)
tqManifest = make(map[string]*tq.Manifest)
tqManifest = make(map[string]tq.Manifest)
cfg *config.Configuration
apiClient *lfsapi.Client
@ -48,14 +48,14 @@ var (
// getTransferManifest builds a tq.Manifest from the global os and git
// environments.
func getTransferManifest() *tq.Manifest {
func getTransferManifest() tq.Manifest {
return getTransferManifestOperationRemote("", "")
}
// getTransferManifestOperationRemote builds a tq.Manifest from the global os
// and git environments and operation-specific and remote-specific settings.
// Operation must be "download", "upload", or the empty string.
func getTransferManifestOperationRemote(operation, remote string) *tq.Manifest {
func getTransferManifestOperationRemote(operation, remote string) tq.Manifest {
c := getAPIClient()
global.Lock()
@ -112,14 +112,14 @@ func newLockClient() *locking.Client {
}
// newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
func newDownloadCheckQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return newDownloadQueue(manifest, remote, append(options,
tq.DryRun(true),
)...)
}
// newDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
func newDownloadQueue(manifest tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Download, manifest, remote, append(options,
tq.RemoteRef(currentRemoteRef()),
)...)

@ -151,7 +151,7 @@ func (lv *lockVerifier) newRefLocks(ref *git.Ref, l locking.Lock) *refLock {
}
}
func newLockVerifier(m *tq.Manifest) *lockVerifier {
func newLockVerifier(m tq.Manifest) *lockVerifier {
lv := &lockVerifier{
endpoint: getAPIClient().Endpoints.Endpoint("upload", cfg.PushRemote()),
verifiedRefs: make(map[string]bool),

@ -39,7 +39,7 @@ func newSingleCheckout(gitEnv config.Environment, remote string) abstractCheckou
}
type abstractCheckout interface {
Manifest() *tq.Manifest
Manifest() tq.Manifest
Skip() bool
Run(*lfs.WrappedPointer)
RunToPath(*lfs.WrappedPointer, string) error
@ -49,11 +49,11 @@ type abstractCheckout interface {
type singleCheckout struct {
gitIndexer *gitIndexer
pathConverter lfs.PathConverter
manifest *tq.Manifest
manifest tq.Manifest
remote string
}
func (c *singleCheckout) Manifest() *tq.Manifest {
func (c *singleCheckout) Manifest() tq.Manifest {
if c.manifest == nil {
c.manifest = getTransferManifestOperationRemote("download", c.remote)
}
@ -115,11 +115,11 @@ func (c *singleCheckout) Close() {
}
type noOpCheckout struct {
manifest *tq.Manifest
manifest tq.Manifest
remote string
}
func (c *noOpCheckout) Manifest() *tq.Manifest {
func (c *noOpCheckout) Manifest() tq.Manifest {
if c.manifest == nil {
c.manifest = getTransferManifestOperationRemote("download", c.remote)
}

@ -72,7 +72,7 @@ func uploadRangeOrAll(g *lfs.GitScanner, ctx *uploadContext, q *tq.TransferQueue
type uploadContext struct {
Remote string
DryRun bool
Manifest *tq.Manifest
Manifest tq.Manifest
uploadedOids tools.StringSet
gitfilter *lfs.GitFilter

@ -15,7 +15,7 @@ import (
"github.com/rubyist/tracerx"
)
func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest *tq.Manifest, cb tools.CopyCallback) error {
func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, manifest tq.Manifest, cb tools.CopyCallback) error {
tools.MkdirAll(filepath.Dir(filename), f.cfg)
if stat, _ := os.Stat(filename); stat != nil && stat.Mode()&0200 == 0 {
@ -52,7 +52,7 @@ func (f *GitFilter) SmudgeToFile(filename string, ptr *Pointer, download bool, m
return nil
}
func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) {
func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, download bool, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
mediafile, err := f.ObjectPath(ptr.Oid)
if err != nil {
return 0, err
@ -99,7 +99,7 @@ func (f *GitFilter) Smudge(writer io.Writer, ptr *Pointer, workingfile string, d
return n, nil
}
func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) {
func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
fmt.Fprintln(os.Stderr, tr.Tr.Get("Downloading %s (%s)", workingfile, humanize.FormatBytes(uint64(ptr.Size))))
// NOTE: if given, "cb" is a tools.CopyCallback which writes updates
@ -131,7 +131,7 @@ func (f *GitFilter) downloadFile(writer io.Writer, ptr *Pointer, workingfile, me
return f.readLocalFile(writer, ptr, mediafile, workingfile, nil)
}
func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb tools.CopyCallback) (int64, error) {
func (f *GitFilter) downloadFileFallBack(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest tq.Manifest, cb tools.CopyCallback) (int64, error) {
// Attempt to find the LFS objects in all currently registered remotes.
// When a valid remote is found, this remote is taken persistent for
// future attempts within downloadFile(). In best case, the ordinary

@ -15,7 +15,7 @@ import (
"github.com/rubyist/tracerx"
)
func Environ(cfg *config.Configuration, manifest *tq.Manifest, envOverrides map[string]string) []string {
func Environ(cfg *config.Configuration, manifest tq.Manifest, envOverrides map[string]string) []string {
osEnviron := os.Environ()
env := make([]string, 0, len(osEnviron)+7)

@ -27,7 +27,7 @@ type TestObject struct {
type ServerTest struct {
Name string
F func(m *tq.Manifest, oidsExist, oidsMissing []TestObject) error
F func(m tq.Manifest, oidsExist, oidsMissing []TestObject) error
}
var (
@ -138,7 +138,7 @@ func (*testDataCallback) Errorf(format string, args ...interface{}) {
fmt.Printf(format, args...)
}
func buildManifest(r *t.Repo) (*tq.Manifest, error) {
func buildManifest(r *t.Repo) (tq.Manifest, error) {
// Configure the endpoint manually
finder := lfsapi.NewEndpointFinder(r)
@ -176,7 +176,7 @@ func (c *constantEndpoint) Endpoint(operation, remote string) lfshttp.Endpoint {
func (c *constantEndpoint) RemoteEndpoint(operation, remote string) lfshttp.Endpoint { return c.e }
func buildTestData(repo *t.Repo, manifest *tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) {
func buildTestData(repo *t.Repo, manifest tq.Manifest) (oidsExist, oidsMissing []TestObject, err error) {
const oidCount = 50
oidsExist = make([]TestObject, 0, oidCount)
oidsMissing = make([]TestObject, 0, oidCount)
@ -242,7 +242,7 @@ func saveTestOids(filename string, objs []TestObject) {
}
func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool {
func runTests(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) bool {
ok := true
fmt.Printf("Running %d tests...\n", len(tests))
for _, t := range tests {
@ -254,7 +254,7 @@ func runTests(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) bool {
return ok
}
func runTest(t ServerTest, manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func runTest(t ServerTest, manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
const linelen = 70
line := t.Name
if len(line) > linelen {
@ -280,11 +280,11 @@ func exit(format string, args ...interface{}) {
os.Exit(2)
}
func addTest(name string, f func(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error) {
func addTest(name string, f func(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error) {
tests = append(tests, ServerTest{Name: name, F: f})
}
func callBatchApi(manifest *tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) {
func callBatchApi(manifest tq.Manifest, dir tq.Direction, objs []TestObject) ([]*tq.Transfer, error) {
apiobjs := make([]*tq.Transfer, 0, len(objs))
for _, o := range objs {
apiobjs = append(apiobjs, &tq.Transfer{Oid: o.Oid, Size: o.Size})

@ -10,7 +10,7 @@ import (
)
// "download" - all present
func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func downloadAllExist(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Download, oidsExist)
if err != nil {
@ -37,7 +37,7 @@ func downloadAllExist(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject
}
// "download" - all missing (test includes 404 error entry)
func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func downloadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Download, oidsMissing)
if err != nil {
@ -69,7 +69,7 @@ func downloadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObje
}
// "download" - mixture
func downloadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func downloadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
existSet := tools.NewStringSetWithCapacity(len(oidsExist))
for _, o := range oidsExist {
existSet.Add(o.Oid)

@ -10,7 +10,7 @@ import (
)
// "upload" - all missing
func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func uploadAllMissing(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Upload, oidsMissing)
if err != nil {
@ -38,7 +38,7 @@ func uploadAllMissing(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject
}
// "upload" - all present
func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func uploadAllExists(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
retobjs, err := callBatchApi(manifest, tq.Upload, oidsExist)
if err != nil {
@ -65,7 +65,7 @@ func uploadAllExists(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject)
}
// "upload" - mix of missing & present
func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func uploadMixed(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
existSet := tools.NewStringSetWithCapacity(len(oidsExist))
for _, o := range oidsExist {
existSet.Add(o.Oid)
@ -109,7 +109,7 @@ func uploadMixed(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) err
}
func uploadEdgeCases(manifest *tq.Manifest, oidsExist, oidsMissing []TestObject) error {
func uploadEdgeCases(manifest tq.Manifest, oidsExist, oidsMissing []TestObject) error {
errorCases := make([]TestObject, 0, 5)
errorCodeMap := make(map[string]int, 5)
errorReasonMap := make(map[string]string, 5)

@ -35,12 +35,14 @@ type BatchResponse struct {
endpoint lfshttp.Endpoint
}
func Batch(m *Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) {
func Batch(m Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error) {
if len(objects) == 0 {
return &BatchResponse{}, nil
}
return m.batchClient().Batch(remote, &batchRequest{
cm := m.Upgrade()
return cm.batchClient().Batch(remote, &batchRequest{
Operation: dir.String(),
Objects: objects,
TransferAdapterNames: m.GetAdapterNames(dir),

@ -261,7 +261,7 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
return err
}
func configureBasicDownloadAdapter(m *Manifest) {
func configureBasicDownloadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter {
switch dir {
case Download:

@ -209,7 +209,7 @@ func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCall
}
}
func configureBasicUploadAdapter(m *Manifest) {
func configureBasicUploadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Upload, func(name string, dir Direction) Adapter {
switch dir {
case Upload:

@ -351,7 +351,7 @@ const (
standaloneFileName = "lfs-standalone-file"
)
func configureDefaultCustomAdapters(git Env, m *Manifest) {
func configureDefaultCustomAdapters(git Env, m *concreteManifest) {
newfunc := func(name string, dir Direction) Adapter {
standalone := m.standaloneTransferAgent != ""
return newCustomAdapter(m.fs, standaloneFileName, dir, "git-lfs", "standalone-file", false, standalone)
@ -361,7 +361,7 @@ func configureDefaultCustomAdapters(git Env, m *Manifest) {
}
// Initialise custom adapters based on current config
func configureCustomAdapters(git Env, m *Manifest) {
func configureCustomAdapters(git Env, m *concreteManifest) {
configureDefaultCustomAdapters(git, m)
pathRegex := regexp.MustCompile(`lfs.customtransfer.([^.]+).path`)

@ -17,7 +17,27 @@ const (
defaultConcurrentTransfers = 8
)
type Manifest struct {
type Manifest interface {
APIClient() *lfsapi.Client
MaxRetries() int
MaxRetryDelay() int
ConcurrentTransfers() int
IsStandaloneTransfer() bool
batchClient() BatchClient
GetAdapterNames(dir Direction) []string
GetDownloadAdapterNames() []string
GetUploadAdapterNames() []string
getAdapterNames(adapters map[string]NewAdapterFunc) []string
RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc)
NewAdapterOrDefault(name string, dir Direction) Adapter
NewAdapter(name string, dir Direction) Adapter
NewDownloadAdapter(name string) Adapter
NewUploadAdapter(name string) Adapter
Upgrade() *concreteManifest
Upgraded() bool
}
type concreteManifest struct {
// maxRetries is the maximum number of retries a single object can
// attempt to make before it will be dropped. maxRetryDelay is the maximum
// time in seconds to wait between retry attempts when using backoff.
@ -36,34 +56,46 @@ type Manifest struct {
mu sync.Mutex
}
func (m *Manifest) APIClient() *lfsapi.Client {
func (m *concreteManifest) APIClient() *lfsapi.Client {
return m.apiClient
}
func (m *Manifest) MaxRetries() int {
func (m *concreteManifest) MaxRetries() int {
return m.maxRetries
}
func (m *Manifest) MaxRetryDelay() int {
func (m *concreteManifest) MaxRetryDelay() int {
return m.maxRetryDelay
}
func (m *Manifest) ConcurrentTransfers() int {
func (m *concreteManifest) ConcurrentTransfers() int {
return m.concurrentTransfers
}
func (m *Manifest) IsStandaloneTransfer() bool {
func (m *concreteManifest) IsStandaloneTransfer() bool {
return m.standaloneTransferAgent != ""
}
func (m *Manifest) batchClient() BatchClient {
func (m *concreteManifest) batchClient() BatchClient {
if r := m.MaxRetries(); r > 0 {
m.batchClientAdapter.SetMaxRetries(r)
}
return m.batchClientAdapter
}
func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *Manifest {
func (m *concreteManifest) Upgrade() *concreteManifest {
return m
}
func (m *concreteManifest) Upgraded() bool {
return true
}
func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) Manifest {
return newConcreteManifest(f, apiClient, operation, remote)
}
func newConcreteManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *concreteManifest {
if apiClient == nil {
cli, err := lfsapi.NewClient(nil)
if err != nil {
@ -79,7 +111,7 @@ func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote s
useSSHMultiplexing = sshTransfer.IsMultiplexingEnabled()
}
m := &Manifest{
m := &concreteManifest{
fs: f,
apiClient: apiClient,
batchClientAdapter: &tqClient{Client: apiClient},
@ -164,7 +196,7 @@ func findStandaloneTransfer(client *lfsapi.Client, operation, remote string) str
}
// GetAdapterNames returns a list of the names of adapters available to be created
func (m *Manifest) GetAdapterNames(dir Direction) []string {
func (m *concreteManifest) GetAdapterNames(dir Direction) []string {
switch dir {
case Upload:
return m.GetUploadAdapterNames()
@ -175,17 +207,17 @@ func (m *Manifest) GetAdapterNames(dir Direction) []string {
}
// GetDownloadAdapterNames returns a list of the names of download adapters available to be created
func (m *Manifest) GetDownloadAdapterNames() []string {
func (m *concreteManifest) GetDownloadAdapterNames() []string {
return m.getAdapterNames(m.downloadAdapterFuncs)
}
// GetUploadAdapterNames returns a list of the names of upload adapters available to be created
func (m *Manifest) GetUploadAdapterNames() []string {
func (m *concreteManifest) GetUploadAdapterNames() []string {
return m.getAdapterNames(m.uploadAdapterFuncs)
}
// getAdapterNames returns a list of the names of adapters available to be created
func (m *Manifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string {
func (m *concreteManifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string {
if m.basicTransfersOnly {
return []string{BasicAdapterName}
}
@ -203,7 +235,7 @@ func (m *Manifest) getAdapterNames(adapters map[string]NewAdapterFunc) []string
// RegisterNewTransferAdapterFunc registers a new function for creating upload
// or download adapters. If a function with that name & direction is already
// registered, it is overridden
func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) {
func (m *concreteManifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc) {
m.mu.Lock()
defer m.mu.Unlock()
@ -216,7 +248,7 @@ func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapt
}
// Create a new adapter by name and direction; default to BasicAdapterName if doesn't exist
func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
func (m *concreteManifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
if len(name) == 0 {
name = BasicAdapterName
}
@ -230,7 +262,7 @@ func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter {
}
// Create a new adapter by name and direction, or nil if doesn't exist
func (m *Manifest) NewAdapter(name string, dir Direction) Adapter {
func (m *concreteManifest) NewAdapter(name string, dir Direction) Adapter {
m.mu.Lock()
defer m.mu.Unlock()
@ -248,12 +280,12 @@ func (m *Manifest) NewAdapter(name string, dir Direction) Adapter {
}
// Create a new download adapter by name, or BasicAdapterName if doesn't exist
func (m *Manifest) NewDownloadAdapter(name string) Adapter {
func (m *concreteManifest) NewDownloadAdapter(name string) Adapter {
return m.NewAdapterOrDefault(name, Download)
}
// Create a new upload adapter by name, or BasicAdapterName if doesn't exist
func (m *Manifest) NewUploadAdapter(name string) Adapter {
func (m *concreteManifest) NewUploadAdapter(name string) Adapter {
return m.NewAdapterOrDefault(name, Upload)
}

@ -407,7 +407,7 @@ func (a *SSHAdapter) Trace(format string, args ...interface{}) {
tracerx.Printf(format, args...)
}
func configureSSHAdapter(m *Manifest) {
func configureSSHAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc("ssh", Upload, func(name string, dir Direction) Adapter {
a := &SSHAdapter{newAdapterBase(m.fs, name, dir, nil), nil, m.sshTransfer}
a.transferImpl = a

@ -209,7 +209,7 @@ type TransferQueue struct {
// once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried.
wait *abortableWaitGroup
manifest *Manifest
manifest Manifest
rc *retryCounter
// unsupportedContentType indicates whether the transfer queue ever saw
@ -298,10 +298,9 @@ func WithBufferDepth(depth int) Option {
}
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
func NewTransferQueue(dir Direction, manifest Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{
direction: dir,
client: &tqClient{Client: manifest.APIClient()},
remote: remote,
errorc: make(chan error),
transfers: make(map[string]*objects),
@ -315,10 +314,6 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
opt(q)
}
q.rc.MaxRetries = q.manifest.maxRetries
q.rc.MaxRetryDelay = q.manifest.maxRetryDelay
q.client.SetMaxRetries(q.manifest.maxRetries)
if q.batchSize <= 0 {
q.batchSize = defaultBatchSize
}
@ -337,6 +332,18 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
return q
}
// Ensure we have a concrete manifest and that certain delayed variables are set
// properly.
func (q *TransferQueue) Upgrade() {
if q.client == nil {
manifest := q.manifest.Upgrade()
q.client = &tqClient{Client: manifest.APIClient()}
q.rc.MaxRetries = manifest.maxRetries
q.rc.MaxRetryDelay = manifest.maxRetryDelay
q.client.SetMaxRetries(manifest.maxRetries)
}
}
// Add adds a *Transfer to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the *Transfer "t" is new.
//
@ -347,6 +354,8 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
// Only one file will be transferred to/from the Path element of the first
// transfer.
func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error) {
q.Upgrade()
if err != nil {
q.errorc <- err
return
@ -384,6 +393,8 @@ func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, er
//
// It returns if the value is new or not.
func (q *TransferQueue) remember(t *objectTuple) objects {
q.Upgrade()
q.trMutex.Lock()
defer q.trMutex.Unlock()
@ -498,6 +509,8 @@ func (q *TransferQueue) collectBatches() {
// A "pending" batch is returned, along with whether or not "q.incoming" is
// closed.
func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
q.Upgrade()
for {
select {
case t, ok := <-q.incoming:
@ -525,6 +538,8 @@ func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
// processed.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
q.Upgrade()
next := q.makeBatch()
tracerx.Printf("tq: sending batch of size %d", len(batch))
@ -548,7 +563,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.meter.Pause()
var bRes *BatchResponse
if q.manifest.standaloneTransferAgent != "" {
manifest := q.manifest.Upgrade()
if manifest.standaloneTransferAgent != "" {
// Trust the external transfer agent can do everything by itself.
objects := make([]*Transfer, 0, len(batch))
for _, t := range batch {
@ -556,7 +572,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
}
bRes = &BatchResponse{
Objects: objects,
TransferAdapterName: q.manifest.standaloneTransferAgent,
TransferAdapterName: manifest.standaloneTransferAgent,
}
} else {
// Query the Git LFS server for what transfer method to use and
@ -651,7 +667,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.Skip(o.Size)
q.wait.Done()
}
} else if a == nil && q.manifest.standaloneTransferAgent == "" {
} else if a == nil && manifest.standaloneTransferAgent == "" {
q.Skip(o.Size)
q.wait.Done()
} else {
@ -680,6 +696,8 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
//
// addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple {
q.Upgrade()
retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(e); err != nil {
@ -718,6 +736,8 @@ func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-
}
func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
q.Upgrade()
if q.direction != Upload {
return transfers, nil
}
@ -887,6 +907,8 @@ func (q *TransferQueue) Skip(size int64) {
}
func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error {
q.Upgrade()
q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock()
@ -947,9 +969,12 @@ func (q *TransferQueue) Wait() {
q.meter.Flush()
q.errorwait.Wait()
if q.manifest.sshTransfer != nil {
q.manifest.sshTransfer.Shutdown()
q.manifest.sshTransfer = nil
if q.manifest.Upgraded() {
manifest := q.manifest.Upgrade()
if manifest.sshTransfer != nil {
manifest.sshTransfer.Shutdown()
manifest.sshTransfer = nil
}
}
if q.unsupportedContentType {

@ -156,7 +156,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
return verifyUpload(a.apiClient, a.remote, t)
}
func configureTusAdapter(m *Manifest) {
func configureTusAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(TusAdapterName, Upload, func(name string, dir Direction) Adapter {
switch dir {
case Upload: