Merge pull request #1827 from git-lfs/api/batch

Api/batch
This commit is contained in:
risk danger olson 2017-01-06 11:48:07 -07:00 committed by GitHub
commit 2e43f34885
48 changed files with 620 additions and 306 deletions

@ -62,19 +62,21 @@ func cloneCommand(cmd *cobra.Command, args []string) {
// Now just call pull with default args
// Support --origin option to clone
var remote string
if len(cloneFlags.Origin) > 0 {
cfg.CurrentRemote = cloneFlags.Origin
remote = cloneFlags.Origin
} else {
cfg.CurrentRemote = "origin"
remote = "origin"
}
includeArg, excludeArg := getIncludeExcludeArgs(cmd)
filter := buildFilepathFilter(cfg, includeArg, excludeArg)
if cloneFlags.NoCheckout || cloneFlags.Bare {
// If --no-checkout or --bare then we shouldn't check out, just fetch instead
cfg.CurrentRemote = remote
fetchRef("HEAD", filter)
} else {
pull(filter)
pull(remote, filter)
err := postCloneSubmodules(args)
if err != nil {
Exit("Error performing 'git lfs pull' for submodules: %v", err)

@ -35,7 +35,7 @@ func envCommand(cmd *cobra.Command, args []string) {
}
}
for _, env := range lfs.Environ(cfg, TransferManifest()) {
for _, env := range lfs.Environ(cfg, getTransferManifest()) {
Print(env)
}

@ -279,7 +279,7 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
}
ready, pointers, meter := readyAndMissingPointers(allpointers, filter)
q := newDownloadQueue(tq.WithProgress(meter))
q := newDownloadQueue(getTransferManifest(), cfg.CurrentRemote, tq.WithProgress(meter))
if out != nil {
// If we already have it, or it won't be fetched

@ -52,11 +52,10 @@ func prePushCommand(cmd *cobra.Command, args []string) {
Exit("Invalid remote name %q", args[0])
}
cfg.CurrentRemote = args[0]
ctx := newUploadContext(prePushDryRun)
ctx := newUploadContext(args[0], prePushDryRun)
gitscanner := lfs.NewGitScanner(nil)
if err := gitscanner.RemoteForPush(cfg.CurrentRemote); err != nil {
if err := gitscanner.RemoteForPush(ctx.Remote); err != nil {
ExitWithError(err)
}

@ -120,9 +120,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
var verifywait sync.WaitGroup
if verifyRemote {
cfg.CurrentRemote = fetchPruneConfig.PruneRemoteName
// build queue now, no estimates or progress output
verifyQueue = newDownloadCheckQueue()
verifyQueue = newDownloadCheckQueue(getTransferManifest(), fetchPruneConfig.PruneRemoteName)
verifiedObjects = tools.NewStringSetWithCapacity(len(localObjects) / 2)
// this channel is filled with oids for which Check() succeeded & Transfer() was called

@ -18,27 +18,29 @@ func pullCommand(cmd *cobra.Command, args []string) {
requireGitVersion()
requireInRepo()
var remote string
if len(args) > 0 {
// Remote is first arg
if err := git.ValidateRemote(args[0]); err != nil {
Panic(err, fmt.Sprintf("Invalid remote name '%v'", args[0]))
}
cfg.CurrentRemote = args[0]
remote = args[0]
} else {
// Actively find the default remote, don't just assume origin
defaultRemote, err := git.DefaultRemote()
if err != nil {
Panic(err, "No default remote")
}
cfg.CurrentRemote = defaultRemote
remote = defaultRemote
}
includeArg, excludeArg := getIncludeExcludeArgs(cmd)
filter := buildFilepathFilter(cfg, includeArg, excludeArg)
pull(filter)
pull(remote, filter)
}
func pull(filter *filepathfilter.Filter) {
func pull(remote string, filter *filepathfilter.Filter) {
cfg.CurrentRemote = remote
ref, err := git.CurrentRef()
if err != nil {
Panic(err, "Could not pull")
@ -47,7 +49,7 @@ func pull(filter *filepathfilter.Filter) {
pointers := newPointerMap()
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
singleCheckout := newSingleCheckout()
q := newDownloadQueue(tq.WithProgress(meter))
q := newDownloadQueue(singleCheckout.manifest, remote, tq.WithProgress(meter))
gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil {
LoggedError(err, "Scanner error")

@ -20,10 +20,10 @@ var (
)
func uploadsBetweenRefAndRemote(ctx *uploadContext, refnames []string) {
tracerx.Printf("Upload refs %v to remote %v", refnames, cfg.CurrentRemote)
tracerx.Printf("Upload refs %v to remote %v", refnames, ctx.Remote)
gitscanner := lfs.NewGitScanner(nil)
if err := gitscanner.RemoteForPush(cfg.CurrentRemote); err != nil {
if err := gitscanner.RemoteForPush(ctx.Remote); err != nil {
ExitWithError(err)
}
defer gitscanner.Close()
@ -128,8 +128,7 @@ func pushCommand(cmd *cobra.Command, args []string) {
Exit("Invalid remote name %q", args[0])
}
cfg.CurrentRemote = args[0]
ctx := newUploadContext(pushDryRun)
ctx := newUploadContext(args[0], pushDryRun)
if pushObjectIDs {
if len(args) < 2 {

@ -40,7 +40,7 @@ func smudge(to io.Writer, ptr *lfs.Pointer, filename string, skip bool, filter *
download = filter.Allows(filename)
}
err = ptr.Smudge(to, filename, download, TransferManifest(), cb)
err = ptr.Smudge(to, filename, download, getTransferManifest(), cb)
if file != nil {
file.Close()
}

@ -1,7 +1,7 @@
package commands
import (
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/spf13/cobra"
)
@ -10,7 +10,7 @@ var (
)
func versionCommand(cmd *cobra.Command, args []string) {
Print(httputil.UserAgent)
Print(lfsapi.UserAgent)
if lovesComics {
Print("Nothing may see Gah Lak Tus and survive!")

@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/git-lfs/git-lfs/config"
@ -34,20 +35,45 @@ var (
ManPages = make(map[string]string, 20)
cfg = config.Config
tqManifest *tq.Manifest
apiClient *lfsapi.Client
global sync.Mutex
includeArg string
excludeArg string
)
func newAPIClient() *lfsapi.Client {
// getTransferManifest builds a tq.Manifest from the global os and git
// environments.
func getTransferManifest() *tq.Manifest {
c := getAPIClient()
global.Lock()
defer global.Unlock()
if tqManifest == nil {
tqManifest = tq.NewManifestWithClient(c)
}
return tqManifest
}
func getAPIClient() *lfsapi.Client {
global.Lock()
defer global.Unlock()
if apiClient == nil {
c, err := lfsapi.NewClient(cfg.Os, cfg.Git)
if err != nil {
ExitWithError(err)
}
return c
apiClient = c
}
return apiClient
}
func newLockClient(remote string) *locking.Client {
lockClient, err := locking.NewClient(remote, newAPIClient())
lockClient, err := locking.NewClient(remote, getAPIClient())
if err == nil {
err = lockClient.SetupFileCache(filepath.Join(config.LocalGitStorageDir, "lfs"))
}
@ -59,25 +85,22 @@ func newLockClient(remote string) *locking.Client {
return lockClient
}
// TransferManifest builds a tq.Manifest from the commands package global
// cfg var.
func TransferManifest() *tq.Manifest {
return lfs.TransferManifest(cfg)
}
// newDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func newDownloadCheckQueue(options ...tq.Option) *tq.TransferQueue {
return lfs.NewDownloadCheckQueue(cfg, options...)
func newDownloadCheckQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
allOptions := make([]tq.Option, 0, len(options)+1)
allOptions = append(allOptions, options...)
allOptions = append(allOptions, tq.DryRun(true))
return newDownloadQueue(manifest, remote, allOptions...)
}
// newDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func newDownloadQueue(options ...tq.Option) *tq.TransferQueue {
return lfs.NewDownloadQueue(cfg, options...)
func newDownloadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Download, manifest, remote, options...)
}
// newUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func newUploadQueue(options ...tq.Option) *tq.TransferQueue {
return lfs.NewUploadQueue(cfg, options...)
func newUploadQueue(manifest *tq.Manifest, remote string, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Upload, manifest, remote, options...)
}
func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *string) *filepathfilter.Filter {
@ -343,7 +366,7 @@ func logPanicToWriter(w io.Writer, loggedError error) {
fmt.Fprintln(w, "\nENV:")
// log the environment
for _, env := range lfs.Environ(cfg, TransferManifest()) {
for _, env := range lfs.Environ(cfg, getTransferManifest()) {
fmt.Fprintln(w, env)
}
}

@ -26,7 +26,7 @@ func newSingleCheckout() *singleCheckout {
return &singleCheckout{
gitIndexer: &gitIndexer{},
pathConverter: pathConverter,
manifest: TransferManifest(),
manifest: getTransferManifest(),
}
}

@ -3,11 +3,13 @@ package commands
import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/localstorage"
"github.com/spf13/cobra"
)
@ -64,7 +66,7 @@ func Run() {
}
root.Execute()
httputil.LogHttpStats(cfg)
logHTTPStats(getAPIClient())
}
func gitlfsCommand(cmd *cobra.Command, args []string) {
@ -103,3 +105,28 @@ func printHelp(commandName string) {
fmt.Fprintf(os.Stderr, "Sorry, no usage text found for %q\n", commandName)
}
}
func logHTTPStats(c *lfsapi.Client) {
if !c.LoggingStats {
return
}
file, err := statsLogFile()
if err != nil {
fmt.Fprintf(os.Stderr, "Error logging http stats: %s\n", err)
return
}
defer file.Close()
c.LogStats(file)
}
func statsLogFile() (*os.File, error) {
logBase := filepath.Join(config.LocalLogDir, "http")
if err := os.MkdirAll(logBase, 0755); err != nil {
return nil, err
}
logFile := fmt.Sprintf("http-%d.log", time.Now().Unix())
return os.Create(filepath.Join(logBase, logFile))
}

@ -12,12 +12,17 @@ import (
var uploadMissingErr = "%s does not exist in .git/lfs/objects. Tried %s, which matches %s."
type uploadContext struct {
Remote string
DryRun bool
Manifest *tq.Manifest
uploadedOids tools.StringSet
}
func newUploadContext(dryRun bool) *uploadContext {
func newUploadContext(remote string, dryRun bool) *uploadContext {
cfg.CurrentRemote = remote
return &uploadContext{
Remote: remote,
Manifest: getTransferManifest(),
DryRun: dryRun,
uploadedOids: tools.NewStringSet(),
}
@ -75,7 +80,7 @@ func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*tq.Tra
// build the TransferQueue, automatically skipping any missing objects that
// the server already has.
uploadQueue := newUploadQueue(tq.WithProgress(meter), tq.DryRun(c.DryRun))
uploadQueue := newUploadQueue(c.Manifest, c.Remote, tq.WithProgress(meter), tq.DryRun(c.DryRun))
for _, p := range missingLocalObjects {
if c.HasUploaded(p.Oid) {
// if the server already has this object, call Skip() on
@ -99,7 +104,7 @@ func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize
return
}
checkQueue := newDownloadCheckQueue()
checkQueue := newDownloadCheckQueue(c.Manifest, c.Remote)
transferCh := checkQueue.Watch()
done := make(chan int)

@ -1,19 +0,0 @@
package lfs
import (
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/tq"
)
// NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func NewDownloadCheckQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue {
allOptions := make([]tq.Option, len(options), len(options)+1)
allOptions = append(allOptions, options...)
allOptions = append(allOptions, tq.DryRun(true))
return NewDownloadQueue(cfg, allOptions...)
}
// NewDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func NewDownloadQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Download, TransferManifest(cfg), options...)
}

@ -118,11 +118,6 @@ func Environ(cfg *config.Configuration, manifest *tq.Manifest) []string {
return env
}
// TransferManifest builds a tq.Manifest using the given cfg.
func TransferManifest(cfg *config.Configuration) *tq.Manifest {
return tq.NewManifestWithGitEnv(string(cfg.Access("download")), cfg.Git)
}
func InRepo() bool {
return config.LocalGitDir != ""
}

@ -1,50 +0,0 @@
package lfs
import (
"testing"
"github.com/git-lfs/git-lfs/config"
"github.com/stretchr/testify/assert"
)
func TestManifestIsConfigurable(t *testing.T) {
cfg := config.NewFrom(config.Values{
Git: map[string]string{
"lfs.transfer.maxretries": "3",
},
})
m := TransferManifest(cfg)
assert.Equal(t, 3, m.MaxRetries())
}
func TestManifestChecksNTLM(t *testing.T) {
cfg := config.NewFrom(config.Values{
Git: map[string]string{
"lfs.url": "http://foo",
"lfs.http://foo.access": "ntlm",
"lfs.concurrenttransfers": "3",
},
})
m := TransferManifest(cfg)
assert.Equal(t, 1, m.MaxRetries())
}
func TestManifestClampsValidValues(t *testing.T) {
cfg := config.NewFrom(config.Values{
Git: map[string]string{
"lfs.transfer.maxretries": "-1",
},
})
m := TransferManifest(cfg)
assert.Equal(t, 1, m.MaxRetries())
}
func TestManifestIgnoresNonInts(t *testing.T) {
cfg := config.NewFrom(config.Values{
Git: map[string]string{
"lfs.transfer.maxretries": "not_an_int",
},
})
m := TransferManifest(cfg)
assert.Equal(t, 1, m.MaxRetries())
}

@ -74,7 +74,7 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, download
func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error {
fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size))
q := tq.NewTransferQueue(tq.Download, manifest)
q := tq.NewTransferQueue(tq.Download, manifest, "")
q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size)
q.Wait()

@ -1,11 +0,0 @@
package lfs
import (
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/tq"
)
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Upload, TransferManifest(cfg), options...)
}

@ -74,7 +74,7 @@ func TestDoWithAuthApprove(t *testing.T) {
creds := newMockCredentialHelper()
c := &Client{
Credentials: creds,
Endpoints: NewEndpointFinder(Env(map[string]string{
Endpoints: NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": srv.URL + "/repo/lfs",
})),
}
@ -144,7 +144,7 @@ func TestDoWithAuthReject(t *testing.T) {
c := &Client{
Credentials: creds,
Endpoints: NewEndpointFinder(Env(map[string]string{
Endpoints: NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": srv.URL,
})),
}
@ -467,7 +467,7 @@ func TestGetCreds(t *testing.T) {
req.Header.Set(key, value)
}
ef := NewEndpointFinder(Env(test.Config))
ef := NewEndpointFinder(TestEnv(test.Config))
endpoint, access, creds, credsURL, err := getCreds(credHelper, netrcFinder, ef, test.Remote, req)
if !assert.Nil(t, err) {
continue

@ -35,7 +35,7 @@ func getRootCAsForHost(c *Client, host string) *x509.CertPool {
return appendRootCAsForHostFromPlatform(pool, host)
}
func appendRootCAsForHostFromGitconfig(osEnv env, gitEnv env, pool *x509.CertPool, host string) *x509.CertPool {
func appendRootCAsForHostFromGitconfig(osEnv Env, gitEnv Env, pool *x509.CertPool, host string) *x509.CertPool {
// Accumulate certs from all these locations:
// GIT_SSL_CAINFO first

@ -60,7 +60,7 @@ func TestCertFromSSLCAInfoConfig(t *testing.T) {
// Test http.<url>.sslcainfo
for _, hostName := range sslCAInfoConfigHostNames {
hostKey := fmt.Sprintf("http.https://%v.sslcainfo", hostName)
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
hostKey: tempfile.Name(),
}))
assert.Nil(t, err)
@ -82,7 +82,7 @@ func TestCertFromSSLCAInfoConfig(t *testing.T) {
}
// Test http.sslcainfo
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
"http.sslcainfo": tempfile.Name(),
}))
assert.Nil(t, err)
@ -103,7 +103,7 @@ func TestCertFromSSLCAInfoEnv(t *testing.T) {
assert.Nil(t, err, "Error writing temp cert file")
tempfile.Close()
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"GIT_SSL_CAINFO": tempfile.Name(),
}), nil)
assert.Nil(t, err)
@ -123,7 +123,7 @@ func TestCertFromSSLCAPathConfig(t *testing.T) {
err = ioutil.WriteFile(filepath.Join(tempdir, "cert1.pem"), []byte(testCert), 0644)
assert.Nil(t, err, "Error creating cert file")
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
"http.sslcapath": tempdir,
}))
@ -144,7 +144,7 @@ func TestCertFromSSLCAPathEnv(t *testing.T) {
err = ioutil.WriteFile(filepath.Join(tempdir, "cert1.pem"), []byte(testCert), 0644)
assert.Nil(t, err, "Error creating cert file")
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"GIT_SSL_CAPATH": tempdir,
}), nil)
assert.Nil(t, err)
@ -164,7 +164,7 @@ func TestCertVerifyDisabledGlobalEnv(t *testing.T) {
assert.False(t, tr.TLSClientConfig.InsecureSkipVerify)
}
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"GIT_SSL_NO_VERIFY": "1",
}), nil)
@ -185,7 +185,7 @@ func TestCertVerifyDisabledGlobalConfig(t *testing.T) {
assert.False(t, tr.TLSClientConfig.InsecureSkipVerify)
}
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
"http.sslverify": "false",
}))
assert.Nil(t, err)
@ -211,7 +211,7 @@ func TestCertVerifyDisabledHostConfig(t *testing.T) {
assert.False(t, tr.TLSClientConfig.InsecureSkipVerify)
}
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
"http.https://specifichost.com/.sslverify": "false",
}))
assert.Nil(t, err)

@ -99,11 +99,11 @@ func (c *Client) httpClient(host string) *http.Client {
defer c.clientMu.Unlock()
if c.gitEnv == nil {
c.gitEnv = make(Env)
c.gitEnv = make(TestEnv)
}
if c.osEnv == nil {
c.osEnv = make(Env)
c.osEnv = make(TestEnv)
}
if c.hostClients == nil {

@ -99,7 +99,7 @@ func TestClientRedirect(t *testing.T) {
}
func TestNewClient(t *testing.T) {
c, err := NewClient(Env(map[string]string{}), Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{}), TestEnv(map[string]string{
"lfs.dialtimeout": "151",
"lfs.keepalive": "152",
"lfs.tlstimeout": "153",
@ -119,7 +119,7 @@ func TestNewClientWithGitSSLVerify(t *testing.T) {
assert.False(t, c.SkipSSLVerify)
for _, value := range []string{"true", "1", "t"} {
c, err = NewClient(Env(map[string]string{}), Env(map[string]string{
c, err = NewClient(TestEnv(map[string]string{}), TestEnv(map[string]string{
"http.sslverify": value,
}))
t.Logf("http.sslverify: %q", value)
@ -128,7 +128,7 @@ func TestNewClientWithGitSSLVerify(t *testing.T) {
}
for _, value := range []string{"false", "0", "f"} {
c, err = NewClient(Env(map[string]string{}), Env(map[string]string{
c, err = NewClient(TestEnv(map[string]string{}), TestEnv(map[string]string{
"http.sslverify": value,
}))
t.Logf("http.sslverify: %q", value)
@ -143,18 +143,18 @@ func TestNewClientWithOSSSLVerify(t *testing.T) {
assert.False(t, c.SkipSSLVerify)
for _, value := range []string{"false", "0", "f"} {
c, err = NewClient(Env(map[string]string{
c, err = NewClient(TestEnv(map[string]string{
"GIT_SSL_NO_VERIFY": value,
}), Env(map[string]string{}))
}), TestEnv(map[string]string{}))
t.Logf("GIT_SSL_NO_VERIFY: %q", value)
assert.Nil(t, err)
assert.False(t, c.SkipSSLVerify)
}
for _, value := range []string{"true", "1", "t"} {
c, err = NewClient(Env(map[string]string{
c, err = NewClient(TestEnv(map[string]string{
"GIT_SSL_NO_VERIFY": value,
}), Env(map[string]string{}))
}), TestEnv(map[string]string{}))
t.Logf("GIT_SSL_NO_VERIFY: %q", value)
assert.Nil(t, err)
assert.True(t, c.SkipSSLVerify)
@ -170,7 +170,7 @@ func TestNewRequest(t *testing.T) {
}
for _, test := range tests {
c, err := NewClient(nil, Env(map[string]string{
c, err := NewClient(nil, TestEnv(map[string]string{
"lfs.url": test[0],
}))
require.Nil(t, err)

@ -36,7 +36,7 @@ type EndpointFinder interface {
}
type endpointGitFinder struct {
git env
git Env
gitProtocol string
aliasMu sync.Mutex
@ -46,7 +46,7 @@ type endpointGitFinder struct {
urlAccess map[string]Access
}
func NewEndpointFinder(git env) EndpointFinder {
func NewEndpointFinder(git Env) EndpointFinder {
e := &endpointGitFinder{
gitProtocol: "https",
aliases: make(map[string]string),
@ -229,7 +229,7 @@ func urlWithoutAuth(rawurl string) string {
return u.String()
}
func fetchGitAccess(git env, key string) Access {
func fetchGitAccess(git Env, key string) Access {
if v, _ := git.Get(key); len(v) > 0 {
access := Access(strings.ToLower(v))
if access == PrivateAccess {
@ -269,7 +269,7 @@ func (e *endpointGitFinder) ReplaceUrlAlias(rawurl string) string {
return rawurl
}
func initAliases(e *endpointGitFinder, git env) {
func initAliases(e *endpointGitFinder, git Env) {
prefix := "url."
suffix := ".insteadof"
for gitkey, gitval := range git.All() {

@ -7,7 +7,7 @@ import (
)
func TestEndpointDefaultsToOrigin(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.lfsurl": "abc",
}))
@ -18,7 +18,7 @@ func TestEndpointDefaultsToOrigin(t *testing.T) {
}
func TestEndpointOverridesOrigin(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": "abc",
"remote.origin.lfsurl": "def",
}))
@ -30,7 +30,7 @@ func TestEndpointOverridesOrigin(t *testing.T) {
}
func TestEndpointNoOverrideDefaultRemote(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.lfsurl": "abc",
"remote.other.lfsurl": "def",
}))
@ -42,7 +42,7 @@ func TestEndpointNoOverrideDefaultRemote(t *testing.T) {
}
func TestEndpointUseAlternateRemote(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.lfsurl": "abc",
"remote.other.lfsurl": "def",
}))
@ -54,7 +54,7 @@ func TestEndpointUseAlternateRemote(t *testing.T) {
}
func TestEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "https://example.com/foo/bar",
}))
@ -65,7 +65,7 @@ func TestEndpointAddsLfsSuffix(t *testing.T) {
}
func TestBareEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "https://example.com/foo/bar.git",
}))
@ -76,7 +76,7 @@ func TestBareEndpointAddsLfsSuffix(t *testing.T) {
}
func TestEndpointSeparateClonePushUrl(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "https://example.com/foo/bar.git",
"remote.origin.pushurl": "https://readwrite.com/foo/bar.git",
}))
@ -93,7 +93,7 @@ func TestEndpointSeparateClonePushUrl(t *testing.T) {
}
func TestEndpointOverriddenSeparateClonePushLfsUrl(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "https://example.com/foo/bar.git",
"remote.origin.pushurl": "https://readwrite.com/foo/bar.git",
"remote.origin.lfsurl": "https://examplelfs.com/foo/bar",
@ -112,7 +112,7 @@ func TestEndpointOverriddenSeparateClonePushLfsUrl(t *testing.T) {
}
func TestEndpointGlobalSeparateLfsPush(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": "https://readonly.com/foo/bar",
"lfs.pushurl": "https://write.com/foo/bar",
}))
@ -129,7 +129,7 @@ func TestEndpointGlobalSeparateLfsPush(t *testing.T) {
}
func TestSSHEndpointOverridden(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "git@example.com:foo/bar",
"remote.origin.lfsurl": "lfs",
}))
@ -142,7 +142,7 @@ func TestSSHEndpointOverridden(t *testing.T) {
}
func TestSSHEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "ssh://git@example.com/foo/bar",
}))
@ -154,7 +154,7 @@ func TestSSHEndpointAddsLfsSuffix(t *testing.T) {
}
func TestSSHCustomPortEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "ssh://git@example.com:9000/foo/bar",
}))
@ -166,7 +166,7 @@ func TestSSHCustomPortEndpointAddsLfsSuffix(t *testing.T) {
}
func TestBareSSHEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "git@example.com:foo/bar.git",
}))
@ -178,7 +178,7 @@ func TestBareSSHEndpointAddsLfsSuffix(t *testing.T) {
}
func TestSSHEndpointFromGlobalLfsUrl(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": "git@example.com:foo/bar.git",
}))
@ -190,7 +190,7 @@ func TestSSHEndpointFromGlobalLfsUrl(t *testing.T) {
}
func TestHTTPEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "http://example.com/foo/bar",
}))
@ -202,7 +202,7 @@ func TestHTTPEndpointAddsLfsSuffix(t *testing.T) {
}
func TestBareHTTPEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "http://example.com/foo/bar.git",
}))
@ -214,7 +214,7 @@ func TestBareHTTPEndpointAddsLfsSuffix(t *testing.T) {
}
func TestGitEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "git://example.com/foo/bar",
}))
@ -226,7 +226,7 @@ func TestGitEndpointAddsLfsSuffix(t *testing.T) {
}
func TestGitEndpointAddsLfsSuffixWithCustomProtocol(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "git://example.com/foo/bar",
"lfs.gitprotocol": "http",
}))
@ -239,7 +239,7 @@ func TestGitEndpointAddsLfsSuffixWithCustomProtocol(t *testing.T) {
}
func TestBareGitEndpointAddsLfsSuffix(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"remote.origin.url": "git://example.com/foo/bar.git",
}))
@ -266,7 +266,7 @@ func TestAccessConfig(t *testing.T) {
}
for value, expected := range tests {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": "http://example.com",
"lfs.http://example.com.access": value,
"lfs.https://example.com.access": "bad",
@ -285,7 +285,7 @@ func TestAccessConfig(t *testing.T) {
// Test again but with separate push url
for value, expected := range tests {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.url": "http://example.com",
"lfs.pushurl": "http://examplepush.com",
"lfs.http://example.com.access": value,
@ -312,7 +312,7 @@ func TestAccessAbsentConfig(t *testing.T) {
}
func TestSetAccess(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{}))
finder := NewEndpointFinder(TestEnv(map[string]string{}))
assert.Equal(t, NoneAccess, finder.AccessFor("http://example.com"))
finder.SetAccess("http://example.com", NTLMAccess)
@ -320,7 +320,7 @@ func TestSetAccess(t *testing.T) {
}
func TestChangeAccess(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.http://example.com.access": "basic",
}))
@ -330,7 +330,7 @@ func TestChangeAccess(t *testing.T) {
}
func TestDeleteAccessWithNone(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.http://example.com.access": "basic",
}))
@ -340,7 +340,7 @@ func TestDeleteAccessWithNone(t *testing.T) {
}
func TestDeleteAccessWithEmptyString(t *testing.T) {
finder := NewEndpointFinder(Env(map[string]string{
finder := NewEndpointFinder(TestEnv(map[string]string{
"lfs.http://example.com.access": "basic",
}))

@ -46,17 +46,17 @@ type Client struct {
transferMu sync.Mutex
// only used for per-host ssl certs
gitEnv env
osEnv env
gitEnv Env
osEnv Env
}
func NewClient(osEnv env, gitEnv env) (*Client, error) {
func NewClient(osEnv Env, gitEnv Env) (*Client, error) {
if osEnv == nil {
osEnv = make(Env)
osEnv = make(TestEnv)
}
if gitEnv == nil {
gitEnv = make(Env)
gitEnv = make(TestEnv)
}
netrc, err := ParseNetrc(osEnv)
@ -90,6 +90,14 @@ func NewClient(osEnv env, gitEnv env) (*Client, error) {
return c, nil
}
func (c *Client) GitEnv() Env {
return c.gitEnv
}
func (c *Client) OSEnv() Env {
return c.osEnv
}
func IsDecodeTypeError(err error) bool {
_, ok := err.(*decodeTypeError)
return ok
@ -121,23 +129,25 @@ func DecodeJSON(res *http.Response, obj interface{}) error {
return nil
}
type env interface {
// Env is an interface for the config.Environment methods that this package
// relies on.
type Env interface {
Get(string) (string, bool)
Int(string, int) int
Bool(string, bool) bool
All() map[string]string
}
// basic config.Environment implementation. Only used in tests, or as a zero
// value to NewClient().
type Env map[string]string
// TestEnv is a basic config.Environment implementation. Only used in tests, or
// as a zero value to NewClient().
type TestEnv map[string]string
func (e Env) Get(key string) (string, bool) {
func (e TestEnv) Get(key string) (string, bool) {
v, ok := e[key]
return v, ok
}
func (e Env) Int(key string, def int) (val int) {
func (e TestEnv) Int(key string, def int) (val int) {
s, _ := e.Get(key)
if len(s) == 0 {
return def
@ -151,7 +161,7 @@ func (e Env) Int(key string, def int) (val int) {
return i
}
func (e Env) Bool(key string, def bool) (val bool) {
func (e TestEnv) Bool(key string, def bool) (val bool) {
s, _ := e.Get(key)
if len(s) == 0 {
return def
@ -167,6 +177,6 @@ func (e Env) Bool(key string, def bool) (val bool) {
}
}
func (e Env) All() map[string]string {
func (e TestEnv) All() map[string]string {
return e
}

@ -11,7 +11,7 @@ type NetrcFinder interface {
FindMachine(string) *netrc.Machine
}
func ParseNetrc(osEnv env) (NetrcFinder, error) {
func ParseNetrc(osEnv Env) (NetrcFinder, error) {
home, _ := osEnv.Get("HOME")
if len(home) == 0 {
return &noFinder{}, nil

@ -48,7 +48,7 @@ func ProxyFromClient(c *Client) func(req *http.Request) (*url.URL, error) {
}
}
func getProxyServers(osEnv env, gitEnv env) (string, string, string) {
func getProxyServers(osEnv Env, gitEnv Env) (string, string, string) {
var httpsProxy string
httpProxy, _ := gitEnv.Get("http.proxy")
if strings.HasPrefix(httpProxy, "https://") {

@ -9,9 +9,9 @@ import (
)
func TestProxyFromGitConfig(t *testing.T) {
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"HTTPS_PROXY": "https://proxy-from-env:8080",
}), Env(map[string]string{
}), TestEnv(map[string]string{
"http.proxy": "https://proxy-from-git-config:8080",
}))
require.Nil(t, err)
@ -25,9 +25,9 @@ func TestProxyFromGitConfig(t *testing.T) {
}
func TestHttpProxyFromGitConfig(t *testing.T) {
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"HTTPS_PROXY": "https://proxy-from-env:8080",
}), Env(map[string]string{
}), TestEnv(map[string]string{
"http.proxy": "http://proxy-from-git-config:8080",
}))
require.Nil(t, err)
@ -41,7 +41,7 @@ func TestHttpProxyFromGitConfig(t *testing.T) {
}
func TestProxyFromEnvironment(t *testing.T) {
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"HTTPS_PROXY": "https://proxy-from-env:8080",
}), nil)
require.Nil(t, err)
@ -66,9 +66,9 @@ func TestProxyIsNil(t *testing.T) {
}
func TestProxyNoProxy(t *testing.T) {
c, err := NewClient(Env(map[string]string{
c, err := NewClient(TestEnv(map[string]string{
"NO_PROXY": "some-host",
}), Env(map[string]string{
}), TestEnv(map[string]string{
"http.proxy": "https://proxy-from-git-config:8080",
}))
require.Nil(t, err)

@ -62,14 +62,14 @@ func (c *lockClient) Lock(remote string, lockReq *lockRequest) (*lockResponse, *
}
lockRes := &lockResponse{}
err = lfsapi.DecodeJSON(res, lockRes)
return lockRes, res, err
return lockRes, res, lfsapi.DecodeJSON(res, lockRes)
}
// UnlockRequest encapsulates the data sent in an API request to remove a lock.
type unlockRequest struct {
// Id is the Id of the lock that the user wishes to unlock.
Id string `json:"id"`
// Force determines whether or not the lock should be "forcibly"
// unlocked; that is to say whether or not a given individual should be
// able to break a different individual's lock.

@ -39,7 +39,7 @@ func TestAPILock(t *testing.T) {
}))
defer srv.Close()
c, err := lfsapi.NewClient(nil, lfsapi.Env(map[string]string{
c, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
}))
require.Nil(t, err)
@ -81,7 +81,7 @@ func TestAPIUnlock(t *testing.T) {
}))
defer srv.Close()
c, err := lfsapi.NewClient(nil, lfsapi.Env(map[string]string{
c, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
}))
require.Nil(t, err)
@ -121,7 +121,7 @@ func TestAPISearch(t *testing.T) {
}))
defer srv.Close()
c, err := lfsapi.NewClient(nil, lfsapi.Env(map[string]string{
c, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
}))
require.Nil(t, err)

@ -46,7 +46,7 @@ func TestRefreshCache(t *testing.T) {
srv.Close()
}()
lfsclient, err := lfsapi.NewClient(nil, lfsapi.Env(map[string]string{
lfsclient, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
"user.name": "Fred",
"user.email": "fred@bloggs.com",

@ -49,7 +49,6 @@ func main() {
}
func testServerApi(cmd *cobra.Command, args []string) {
if (len(apiUrl) == 0 && len(cloneUrl) == 0) ||
(len(apiUrl) != 0 && len(cloneUrl) != 0) {
exit("Must supply either --url or --clone (and not both)")
@ -138,6 +137,12 @@ func (*testDataCallback) Errorf(format string, args ...interface{}) {
}
func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
cfg := config.Config
apiClient, err := lfsapi.NewClient(cfg.Os, cfg.Git)
if err != nil {
return nil, nil, err
}
const oidCount = 50
oidsExist = make([]TestObject, 0, oidCount)
oidsMissing = make([]TestObject, 0, oidCount)
@ -161,7 +166,8 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
outputs := repo.AddCommits([]*test.CommitInput{&commit})
// now upload
uploadQueue := lfs.NewUploadQueue(config.Config, tq.WithProgress(meter))
manifest := tq.NewManifestWithClient(apiClient)
uploadQueue := tq.NewTransferQueue(tq.Upload, manifest, "", tq.WithProgress(meter))
for _, f := range outputs[0].Files {
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})

@ -100,4 +100,3 @@ begin_test "batch transfers occur in reverse order by size"
[ "$pos_large" -lt "$pos_small" ]
)
end_test

@ -102,7 +102,8 @@ begin_test "custom-transfer-upload-download"
grep "xfer: started custom adapter process" fetchcustom.log
grep "xfer\[lfstest-customadapter\]:" fetchcustom.log
grep "11 of 11 files" fetchcustom.log
[ `find .git/lfs/objects -type f | wc -l` = 11 ]
objectlist=`find .git/lfs/objects -type f`
[ "$(echo "$objectlist" | wc -l)" -eq 11 ]
)
end_test

@ -2,8 +2,10 @@ package tq
import (
"fmt"
"net/http"
"sync"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/rubyist/tracerx"
)
@ -14,6 +16,8 @@ type adapterBase struct {
name string
direction Direction
transferImpl transferImplementation
apiClient *lfsapi.Client
remote string
jobChan chan *job
cb ProgressCallback
// WaitGroup to sync the completion of all workers
@ -47,7 +51,6 @@ func newAdapterBase(name string, dir Direction, ti transferImplementation) *adap
name: name,
direction: dir,
transferImpl: ti,
jobWait: new(sync.WaitGroup),
}
}
@ -61,6 +64,8 @@ func (a *adapterBase) Direction() Direction {
}
func (a *adapterBase) Begin(cfg AdapterConfig, cb ProgressCallback) error {
a.apiClient = cfg.APIClient()
a.remote = cfg.Remote()
a.cb = cb
a.jobChan = make(chan *job, 100)
maxConcurrency := cfg.ConcurrentTransfers()
@ -123,7 +128,6 @@ func (a *adapterBase) End() {
// worker function, many of these run per adapter
func (a *adapterBase) worker(workerNum int, ctx interface{}) {
tracerx.Printf("xfer: adapter %q worker %d starting", a.Name(), workerNum)
waitForAuth := workerNum > 0
signalAuthOnResponse := workerNum == 0
@ -172,6 +176,26 @@ func (a *adapterBase) worker(workerNum int, ctx interface{}) {
a.workerWait.Done()
}
func (a *adapterBase) newHTTPRequest(method string, rel *Action) (*http.Request, error) {
req, err := http.NewRequest(method, rel.Href, nil)
if err != nil {
return nil, err
}
for key, value := range rel.Header {
req.Header.Set(key, value)
}
return req, nil
}
func (a *adapterBase) doHTTP(t *Transfer, req *http.Request) (*http.Response, error) {
if t.Authenticated {
return a.apiClient.Do(req)
}
return a.apiClient.DoWithAuth(a.remote, req)
}
func advanceCallbackProgress(cb ProgressCallback, t *Transfer, numBytes int64) {
if cb != nil {
// Must split into max int sizes since read count is int

66
tq/api.go Normal file

@ -0,0 +1,66 @@
package tq
import (
"net/http"
"strings"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/rubyist/tracerx"
)
type tqClient struct {
*lfsapi.Client
}
type batchRequest struct {
Operation string `json:"operation"`
Objects []*api.ObjectResource `json:"objects"`
TransferAdapterNames []string `json:"transfers,omitempty"`
}
type batchResponse struct {
Endpoint lfsapi.Endpoint
TransferAdapterName string `json:"transfer"`
Objects []*api.ObjectResource `json:"objects"`
}
func (c *tqClient) Batch(remote string, bReq *batchRequest) (*batchResponse, *http.Response, error) {
bRes := &batchResponse{}
if len(bReq.Objects) == 0 {
return bRes, nil, nil
}
if len(bReq.TransferAdapterNames) == 1 && bReq.TransferAdapterNames[0] == "basic" {
bReq.TransferAdapterNames = nil
}
bRes.Endpoint = c.Endpoints.Endpoint(bReq.Operation, remote)
req, err := c.NewRequest("POST", bRes.Endpoint, "objects/batch", bReq)
if err != nil {
return nil, nil, errors.Wrap(err, "batch request")
}
tracerx.Printf("api: batch %d files", len(bReq.Objects))
res, err := c.DoWithAuth(remote, req)
if err != nil {
tracerx.Printf("api error: %s", err)
return nil, nil, errors.Wrap(err, "batch response")
}
c.LogResponse("lfs.batch", res)
if err := lfsapi.DecodeJSON(res, bRes); err != nil {
return bRes, res, errors.Wrap(err, "batch response")
}
if res.StatusCode != 200 {
return nil, res, errors.Errorf("Invalid status for %s %s: %d",
req.Method,
strings.SplitN(req.URL.String(), "?", 2)[0],
res.StatusCode)
}
return bRes, res, nil
}

118
tq/api_test.go Normal file

@ -0,0 +1,118 @@
package tq
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAPIBatch(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/objects/batch" {
w.WriteHeader(404)
return
}
assert.Equal(t, "POST", r.Method)
bReq := &batchRequest{}
err := json.NewDecoder(r.Body).Decode(bReq)
r.Body.Close()
assert.Nil(t, err)
assert.EqualValues(t, []string{"basic", "whatev"}, bReq.TransferAdapterNames)
if assert.Equal(t, 1, len(bReq.Objects)) {
assert.Equal(t, "a", bReq.Objects[0].Oid)
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&batchResponse{
TransferAdapterName: "basic",
Objects: bReq.Objects,
})
}))
defer srv.Close()
c, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
}))
require.Nil(t, err)
tqc := &tqClient{Client: c}
bReq := &batchRequest{
TransferAdapterNames: []string{"basic", "whatev"},
Objects: []*api.ObjectResource{
&api.ObjectResource{Oid: "a", Size: 1},
},
}
bRes, res, err := tqc.Batch("remote", bReq)
require.Nil(t, err)
assert.Equal(t, 200, res.StatusCode)
assert.Equal(t, "basic", bRes.TransferAdapterName)
if assert.Equal(t, 1, len(bRes.Objects)) {
assert.Equal(t, "a", bRes.Objects[0].Oid)
}
}
func TestAPIBatchOnlyBasic(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/objects/batch" {
w.WriteHeader(404)
return
}
assert.Equal(t, "POST", r.Method)
bReq := &batchRequest{}
err := json.NewDecoder(r.Body).Decode(bReq)
r.Body.Close()
assert.Nil(t, err)
assert.Equal(t, 0, len(bReq.TransferAdapterNames))
if assert.Equal(t, 1, len(bReq.Objects)) {
assert.Equal(t, "a", bReq.Objects[0].Oid)
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(&batchResponse{
TransferAdapterName: "basic",
})
}))
defer srv.Close()
c, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": srv.URL + "/api",
}))
require.Nil(t, err)
tqc := &tqClient{Client: c}
bReq := &batchRequest{
TransferAdapterNames: []string{"basic"},
Objects: []*api.ObjectResource{
&api.ObjectResource{Oid: "a", Size: 1},
},
}
bRes, res, err := tqc.Batch("remote", bReq)
require.Nil(t, err)
assert.Equal(t, 200, res.StatusCode)
assert.Equal(t, "basic", bRes.TransferAdapterName)
}
func TestAPIBatchEmptyObjects(t *testing.T) {
c, err := lfsapi.NewClient(nil, nil)
require.Nil(t, err)
tqc := &tqClient{Client: c}
bReq := &batchRequest{
TransferAdapterNames: []string{"basic", "whatev"},
}
bRes, res, err := tqc.Batch("remote", bReq)
require.Nil(t, err)
assert.Nil(t, res)
assert.Equal(t, "", bRes.TransferAdapterName)
assert.Equal(t, 0, len(bRes.Objects))
}

@ -9,9 +9,7 @@ import (
"regexp"
"strconv"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/localstorage"
"github.com/git-lfs/git-lfs/tools"
"github.com/rubyist/tracerx"
@ -44,7 +42,6 @@ func (a *basicDownloadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
f, fromByte, hashSoFar, err := a.checkResumeDownload(t)
if err != nil {
return err
@ -97,7 +94,7 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
// return errors.New("Object not found on the server.")
}
req, err := httputil.NewHttpRequest("GET", rel.Href, rel.Header)
req, err := a.newHTTPRequest("GET", rel)
if err != nil {
return err
}
@ -110,7 +107,7 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1))
}
res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated)
res, err := a.doHTTP(t, req)
if err != nil {
// Special-case status code 416 () - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
@ -121,7 +118,8 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
}
return errors.NewRetriableError(err)
}
httputil.LogTransfer(config.Config, "lfs.data.download", res)
a.apiClient.LogResponse("lfs.data.download", res)
defer res.Body.Close()
// Range request must return 206 & content range to confirm

@ -6,11 +6,9 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress"
)
@ -50,7 +48,7 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
// return fmt.Errorf("No upload action for this object.")
}
req, err := httputil.NewHttpRequest("PUT", rel.Href, rel.Header)
req, err := a.newHTTPRequest("PUT", rel)
if err != nil {
return err
}
@ -97,11 +95,12 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
req.Body = ioutil.NopCloser(reader)
res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated)
res, err := a.doHTTP(t, req)
if err != nil {
return errors.NewRetriableError(err)
}
httputil.LogTransfer(config.Config, "lfs.data.upload", res)
a.apiClient.LogResponse("lfs.data.upload", res)
// A status code of 403 likely means that an authentication token for the
// upload has expired. This can be safely retried.
@ -111,14 +110,17 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
}
if res.StatusCode > 299 {
return errors.Wrapf(nil, "Invalid status for %s: %d", httputil.TraceHttpReq(req), res.StatusCode)
return errors.Wrapf(nil, "Invalid status for %s %s: %d",
req.Method,
strings.SplitN(req.URL.String(), "?", 2)[0],
res.StatusCode,
)
}
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
cli := &lfsapi.Client{}
return verifyUpload(cli, t)
return verifyUpload(a.apiClient, t)
}
// startCallbackReader is a reader wrapper which calls a function as soon as the

@ -111,8 +111,7 @@ func (a *customAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error {
}
// If config says not to launch multiple processes, downgrade incoming value
newCfg := &Manifest{concurrentTransfers: 1}
return a.adapterBase.Begin(newCfg, cb)
return a.adapterBase.Begin(&customAdapterConfig{AdapterConfig: cfg}, cb)
}
func (a *customAdapter) ClearTempStorage() error {
@ -376,3 +375,11 @@ func configureCustomAdapters(git Env, m *Manifest) {
}
}
}
type customAdapterConfig struct {
AdapterConfig
}
func (c *customAdapterConfig) ConcurrentTransfers() int {
return 1
}

@ -3,17 +3,19 @@ package tq
import (
"testing"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCustomTransferBasicConfig(t *testing.T) {
path := "/path/to/binary"
cfg := config.NewFrom(config.Values{
Git: map[string]string{"lfs.customtransfer.testsimple.path": path},
})
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.customtransfer.testsimple.path": path,
}))
require.Nil(t, err)
m := NewManifestWithGitEnv("", cfg.Git)
m := NewManifestWithClient(cli)
u := m.NewUploadAdapter("testsimple")
assert.NotNil(t, u, "Upload adapter should be present")
cu, _ := u.(*customAdapter)
@ -34,16 +36,15 @@ func TestCustomTransferBasicConfig(t *testing.T) {
func TestCustomTransferDownloadConfig(t *testing.T) {
path := "/path/to/binary"
args := "-c 1 --whatever"
cfg := config.NewFrom(config.Values{
Git: map[string]string{
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.customtransfer.testdownload.path": path,
"lfs.customtransfer.testdownload.args": args,
"lfs.customtransfer.testdownload.concurrent": "false",
"lfs.customtransfer.testdownload.direction": "download",
},
})
}))
require.Nil(t, err)
m := NewManifestWithGitEnv("", cfg.Git)
m := NewManifestWithClient(cli)
u := m.NewUploadAdapter("testdownload")
assert.NotNil(t, u, "Upload adapter should always be created")
cu, _ := u.(*customAdapter)
@ -61,16 +62,15 @@ func TestCustomTransferDownloadConfig(t *testing.T) {
func TestCustomTransferUploadConfig(t *testing.T) {
path := "/path/to/binary"
args := "-c 1 --whatever"
cfg := config.NewFrom(config.Values{
Git: map[string]string{
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.customtransfer.testupload.path": path,
"lfs.customtransfer.testupload.args": args,
"lfs.customtransfer.testupload.concurrent": "false",
"lfs.customtransfer.testupload.direction": "upload",
},
})
}))
require.Nil(t, err)
m := NewManifestWithGitEnv("", cfg.Git)
m := NewManifestWithClient(cli)
d := m.NewDownloadAdapter("testupload")
assert.NotNil(t, d, "Download adapter should always be created")
cd, _ := d.(*customAdapter)
@ -88,16 +88,15 @@ func TestCustomTransferUploadConfig(t *testing.T) {
func TestCustomTransferBothConfig(t *testing.T) {
path := "/path/to/binary"
args := "-c 1 --whatever --yeah"
cfg := config.NewFrom(config.Values{
Git: map[string]string{
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.customtransfer.testboth.path": path,
"lfs.customtransfer.testboth.args": args,
"lfs.customtransfer.testboth.concurrent": "yes",
"lfs.customtransfer.testboth.direction": "both",
},
})
}))
require.Nil(t, err)
m := NewManifestWithGitEnv("", cfg.Git)
m := NewManifestWithClient(cli)
d := m.NewDownloadAdapter("testboth")
assert.NotNil(t, d, "Download adapter should be present")
cd, _ := d.(*customAdapter)

@ -3,6 +3,7 @@ package tq
import (
"sync"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/rubyist/tracerx"
)
@ -20,9 +21,14 @@ type Manifest struct {
tusTransfersAllowed bool
downloadAdapterFuncs map[string]NewAdapterFunc
uploadAdapterFuncs map[string]NewAdapterFunc
apiClient *lfsapi.Client
mu sync.Mutex
}
func (m *Manifest) APIClient() *lfsapi.Client {
return m.apiClient
}
func (m *Manifest) MaxRetries() int {
return m.maxRetries
}
@ -32,17 +38,24 @@ func (m *Manifest) ConcurrentTransfers() int {
}
func NewManifest() *Manifest {
return NewManifestWithGitEnv("", nil)
cli, err := lfsapi.NewClient(nil, nil)
if err != nil {
tracerx.Printf("unable to init tq.Manifest: %s", err)
return nil
}
return NewManifestWithClient(cli)
}
func NewManifestWithGitEnv(access string, git Env) *Manifest {
func NewManifestWithClient(apiClient *lfsapi.Client) *Manifest {
m := &Manifest{
apiClient: apiClient,
downloadAdapterFuncs: make(map[string]NewAdapterFunc),
uploadAdapterFuncs: make(map[string]NewAdapterFunc),
}
var tusAllowed bool
if git != nil {
if git := apiClient.GitEnv(); git != nil {
if v := git.Int("lfs.transfer.maxretries", 0); v > 0 {
m.maxRetries = v
}
@ -58,9 +71,7 @@ func NewManifestWithGitEnv(access string, git Env) *Manifest {
m.maxRetries = defaultMaxRetries
}
if access == "ntlm" {
m.concurrentTransfers = 1
} else if m.concurrentTransfers < 1 {
if m.concurrentTransfers < 1 {
m.concurrentTransfers = defaultConcurrentTransfers
}

51
tq/manifest_test.go Normal file

@ -0,0 +1,51 @@
package tq
import (
"testing"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestManifestIsConfigurable(t *testing.T) {
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.transfer.maxretries": "3",
}))
require.Nil(t, err)
m := NewManifestWithClient(cli)
assert.Equal(t, 3, m.MaxRetries())
}
func TestManifestChecksNTLM(t *testing.T) {
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.url": "http://foo",
"lfs.http://foo.access": "ntlm",
"lfs.concurrenttransfers": "3",
}))
require.Nil(t, err)
m := NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries())
}
func TestManifestClampsValidValues(t *testing.T) {
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.transfer.maxretries": "-1",
}))
require.Nil(t, err)
m := NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries())
}
func TestManifestIgnoresNonInts(t *testing.T) {
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.transfer.maxretries": "not_an_int",
}))
require.Nil(t, err)
m := NewManifestWithClient(cli)
assert.Equal(t, 1, m.MaxRetries())
}

@ -8,6 +8,7 @@ import (
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
)
type Direction int
@ -157,7 +158,27 @@ type NewAdapterFunc func(name string, dir Direction) Adapter
type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error
type AdapterConfig interface {
APIClient() *lfsapi.Client
ConcurrentTransfers() int
Remote() string
}
type adapterConfig struct {
apiClient *lfsapi.Client
concurrentTransfers int
remote string
}
func (c *adapterConfig) ConcurrentTransfers() int {
return c.concurrentTransfers
}
func (c *adapterConfig) APIClient() *lfsapi.Client {
return c.apiClient
}
func (c *adapterConfig) Remote() string {
return c.remote
}
// Adapter is implemented by types which can upload and/or download LFS

@ -5,8 +5,8 @@ import (
"sync"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress"
"github.com/rubyist/tracerx"
)
@ -94,6 +94,8 @@ func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
// adapters, and dealing with progress, errors and retries.
type TransferQueue struct {
direction Direction
client *tqClient
remote string
adapter Adapter
adapterInProgress bool
adapterInitMutex sync.Mutex
@ -147,9 +149,11 @@ func WithBufferDepth(depth int) Option {
}
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue {
func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{
direction: dir,
client: &tqClient{Client: manifest.APIClient()},
remote: remote,
errorc: make(chan error),
transfers: make(map[string]*objectTuple),
trMutex: &sync.Mutex{},
@ -281,16 +285,16 @@ func (q *TransferQueue) collectBatches() {
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
// processed.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
cfg := config.Config
next := q.makeBatch()
transferAdapterNames := q.manifest.GetAdapterNames(q.direction)
tracerx.Printf("tq: sending batch of size %d", len(batch))
objs, adapterName, err := api.Batch(
cfg, batch.ApiObjects(), q.transferKind(), transferAdapterNames,
)
bReq := &batchRequest{
Operation: q.transferKind(),
Objects: batch.ApiObjects(),
TransferAdapterNames: q.manifest.GetAdapterNames(q.direction),
}
bRes, _, err := q.client.Batch(q.remote, bReq)
if err != nil {
// If there was an error making the batch API call, mark all of
// the objects for retry, and return them along with the error
@ -309,12 +313,16 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
return next, err
}
q.useAdapter(adapterName)
if len(bRes.Objects) == 0 {
return next, nil
}
q.useAdapter(bRes.TransferAdapterName)
q.startProgress.Do(q.meter.Start)
toTransfer := make([]*Transfer, 0, len(objs))
toTransfer := make([]*Transfer, 0, len(bRes.Objects))
for _, o := range objs {
for _, o := range bRes.Objects {
if o.Error != nil {
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
q.Skip(o.Size)
@ -362,7 +370,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
}
}
retries := q.addToAdapter(toTransfer)
retries := q.addToAdapter(bRes.Endpoint, toTransfer)
for t := range retries {
q.rc.Increment(t.Oid)
count := q.rc.CountFor(t.Oid)
@ -385,10 +393,10 @@ func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
// closed.
//
// addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple {
func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple {
retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(); err != nil {
if err := q.ensureAdapterBegun(e); err != nil {
close(retries)
q.errorc <- err
@ -515,7 +523,7 @@ func (q *TransferQueue) transferKind() string {
}
}
func (q *TransferQueue) ensureAdapterBegun() error {
func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error {
q.adapterInitMutex.Lock()
defer q.adapterInitMutex.Unlock()
@ -530,7 +538,7 @@ func (q *TransferQueue) ensureAdapterBegun() error {
}
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
err := q.adapter.Begin(q.manifest, cb)
err := q.adapter.Begin(q.toAdapterCfg(e), cb)
if err != nil {
return err
}
@ -539,6 +547,20 @@ func (q *TransferQueue) ensureAdapterBegun() error {
return nil
}
func (q *TransferQueue) toAdapterCfg(e lfsapi.Endpoint) AdapterConfig {
apiClient := q.manifest.APIClient()
concurrency := q.manifest.ConcurrentTransfers()
if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
concurrency = 1
}
return &adapterConfig{
concurrentTransfers: concurrency,
apiClient: apiClient,
remote: q.remote,
}
}
// Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transfers to the queue. Any failed
// transfers will be automatically retried once.

@ -3,9 +3,9 @@ package tq
import (
"testing"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testAdapter struct {
@ -118,10 +118,12 @@ func testAdapterRegAndOverride(t *testing.T) {
}
func testAdapterRegButBasicOnly(t *testing.T) {
cfg := config.NewFrom(config.Values{
Git: map[string]string{"lfs.basictransfersonly": "yes"},
})
m := NewManifestWithGitEnv("", cfg.Git)
cli, err := lfsapi.NewClient(nil, lfsapi.TestEnv(map[string]string{
"lfs.basictransfersonly": "yes",
}))
require.Nil(t, err)
m := NewManifestWithClient(cli)
assert := assert.New(t)

@ -6,10 +6,9 @@ import (
"io/ioutil"
"os"
"strconv"
"strings"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress"
"github.com/rubyist/tracerx"
@ -49,12 +48,14 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
// 1. Send HEAD request to determine upload start point
// Request must include Tus-Resumable header (version)
tracerx.Printf("xfer: sending tus.io HEAD request for %q", t.Oid)
req, err := httputil.NewHttpRequest("HEAD", rel.Href, rel.Header)
req, err := a.newHTTPRequest("HEAD", rel)
if err != nil {
return err
}
req.Header.Set("Tus-Resumable", TusVersion)
res, err := httputil.DoHttpRequest(config.Config, req, false)
res, err := a.doHTTP(t, req)
if err != nil {
return errors.NewRetriableError(err)
}
@ -101,10 +102,11 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
// Response may include Upload-Expires header in which case check not passed
tracerx.Printf("xfer: sending tus.io PATCH request for %q", t.Oid)
req, err = httputil.NewHttpRequest("PATCH", rel.Href, rel.Header)
req, err = a.newHTTPRequest("PATCH", rel)
if err != nil {
return err
}
req.Header.Set("Tus-Resumable", TusVersion)
req.Header.Set("Upload-Offset", strconv.FormatInt(offset, 10))
req.Header.Set("Content-Type", "application/offset+octet-stream")
@ -135,11 +137,12 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
req.Body = ioutil.NopCloser(reader)
res, err = httputil.DoHttpRequest(config.Config, req, false)
res, err = a.doHTTP(t, req)
if err != nil {
return errors.NewRetriableError(err)
}
httputil.LogTransfer(config.Config, "lfs.data.upload", res)
a.apiClient.LogResponse("lfs.data.upload", res)
// A status code of 403 likely means that an authentication token for the
// upload has expired. This can be safely retried.
@ -149,7 +152,11 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
}
if res.StatusCode > 299 {
return errors.Wrapf(nil, "Invalid status for %s: %d", httputil.TraceHttpReq(req), res.StatusCode)
return errors.Wrapf(nil, "Invalid status for %s %s: %d",
req.Method,
strings.SplitN(req.URL.String(), "?", 2)[0],
res.StatusCode,
)
}
io.Copy(ioutil.Discard, res.Body)