Merge branch 'api-master' into tq/extract-endpoint

Taylor Blau 2016-12-16 16:13:45 -07:00 committed by GitHub
commit 218347c417
56 changed files with 2374 additions and 893 deletions

.gitignore vendored (2 lines changed)

@ -1,7 +1,7 @@
bin/
benchmark/
out/
resources.syso
resource.syso
# only allow man/*.\d.ronn files
man/*

@ -22,28 +22,24 @@ var (
ErrNoOperationGiven = errors.New("lfs/api: no operation provided in schema")
)
// EndpointSource is an interface which encapsulates the behavior of returning
// `config.Endpoint`s based on a particular operation.
type EndpointSource interface {
// Endpoint returns the `config.Endpoint` associated with a given
// operation.
Endpoint(operation string) endpoint.Endpoint
}
// HttpLifecycle serves as the default implementation of the Lifecycle interface
// for HTTP requests. Internally, it leverages the *http.Client type to execute
// HTTP requests against a root *url.URL, as given in `NewHttpLifecycle`.
type HttpLifecycle struct {
endpoints EndpointSource
cfg *config.Configuration
}
var _ Lifecycle = new(HttpLifecycle)
// NewHttpLifecycle initializes a new instance of the *HttpLifecycle type with a
// new *http.Client, and the given root (see above).
func NewHttpLifecycle(endpoints EndpointSource) *HttpLifecycle {
// Passing a nil Configuration will use the global config
func NewHttpLifecycle(cfg *config.Configuration) *HttpLifecycle {
if cfg == nil {
cfg = config.Config
}
return &HttpLifecycle{
endpoints: endpoints,
cfg: cfg,
}
}
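A minimal usage sketch, assuming the api and config packages above: passing nil picks up the global configuration, while callers such as the tests below can pin an explicit one.

	l := api.NewHttpLifecycle(nil)                            // falls back to config.Config
	l = api.NewHttpLifecycle(config.NewFrom(config.Values{})) // explicit configuration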
@ -79,7 +75,7 @@ func (l *HttpLifecycle) Build(schema *RequestSchema) (*http.Request, error) {
return nil, err
}
if _, err = auth.GetCreds(config.Config, req); err != nil {
if _, err = auth.GetCreds(l.cfg, req); err != nil {
return nil, err
}
@ -103,7 +99,7 @@ func (l *HttpLifecycle) Build(schema *RequestSchema) (*http.Request, error) {
// Otherwise, the api.Response is returned, along with no error, signaling that
// the request completed successfully.
func (l *HttpLifecycle) Execute(req *http.Request, into interface{}) (Response, error) {
resp, err := httputil.DoHttpRequestWithRedirects(config.Config, req, []*http.Request{}, true)
resp, err := httputil.DoHttpRequestWithRedirects(l.cfg, req, []*http.Request{}, true)
if err != nil {
return nil, err
}
@ -137,7 +133,7 @@ func (l *HttpLifecycle) absolutePath(operation Operation, path string) (*url.URL
return nil, ErrNoOperationGiven
}
root, err := url.Parse(l.endpoints.Endpoint(string(operation)).Url)
root, err := url.Parse(l.cfg.Endpoint(string(operation)).Url)
if err != nil {
return nil, err
}

@ -11,23 +11,17 @@ import (
"github.com/stretchr/testify/assert"
)
type NopEndpointSource struct {
Root string
func NewTestConfig() *config.Configuration {
c := config.NewFrom(config.Values{})
c.SetManualEndpoint(config.Endpoint{Url: "https://example.com"})
return c
}
func (e *NopEndpointSource) Endpoint(op string) endpoint.Endpoint {
return endpoint.Endpoint{Url: e.Root}
}
var (
source = &NopEndpointSource{"https://example.com"}
)
func TestHttpLifecycleMakesRequestsAgainstAbsolutePath(t *testing.T) {
SetupTestCredentialsFunc()
defer RestoreCredentialsFunc()
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
req, err := l.Build(&api.RequestSchema{
Path: "/foo",
Operation: api.DownloadOperation,
@ -41,7 +35,7 @@ func TestHttpLifecycleAttachesQueryParameters(t *testing.T) {
SetupTestCredentialsFunc()
defer RestoreCredentialsFunc()
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
req, err := l.Build(&api.RequestSchema{
Path: "/foo",
Operation: api.DownloadOperation,
@ -58,7 +52,7 @@ func TestHttpLifecycleAttachesBodyWhenPresent(t *testing.T) {
SetupTestCredentialsFunc()
defer RestoreCredentialsFunc()
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
req, err := l.Build(&api.RequestSchema{
Operation: api.DownloadOperation,
Body: struct {
@ -77,7 +71,7 @@ func TestHttpLifecycleDoesNotAttachBodyWhenEmpty(t *testing.T) {
SetupTestCredentialsFunc()
defer RestoreCredentialsFunc()
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
req, err := l.Build(&api.RequestSchema{
Operation: api.DownloadOperation,
})
@ -90,7 +84,7 @@ func TestHttpLifecycleErrsWithoutOperation(t *testing.T) {
SetupTestCredentialsFunc()
defer RestoreCredentialsFunc()
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
req, err := l.Build(&api.RequestSchema{
Path: "/foo",
})
@ -113,7 +107,7 @@ func TestHttpLifecycleExecutesRequestWithoutBody(t *testing.T) {
req, _ := http.NewRequest("GET", server.URL+"/path", nil)
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
_, err := l.Execute(req, nil)
assert.True(t, called)
@ -138,7 +132,7 @@ func TestHttpLifecycleExecutesRequestWithBody(t *testing.T) {
req, _ := http.NewRequest("GET", server.URL+"/path", nil)
l := api.NewHttpLifecycle(source)
l := api.NewHttpLifecycle(NewTestConfig())
resp := new(Response)
_, err := l.Execute(req, resp)

@ -4,8 +4,6 @@ import (
"fmt"
"strconv"
"time"
"github.com/git-lfs/git-lfs/config"
)
// LockService is an API service which encapsulates the Git LFS Locking API.
@ -144,15 +142,8 @@ type Committer struct {
Email string `json:"email"`
}
// CurrentCommitter returns a Committer instance populated with the same
// credentials as would be used to author a commit. In particular, the
// "user.name" and "user.email" configuration values are used from the
// config.Config singleton.
func CurrentCommitter() Committer {
name, _ := config.Config.Git.Get("user.name")
email, _ := config.Config.Git.Get("user.email")
return Committer{name, email}
func NewCommitter(name, email string) Committer {
return Committer{Name: name, Email: email}
}
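A sketch of the intended call pattern: Go forwards multi-value returns, so NewCommitter pairs directly with the CurrentCommitter accessor added to *config.Configuration later in this commit (see locking/locks.go below).

	committer := api.NewCommitter(cfg.CurrentCommitter()) // user.name, user.email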
// LockRequest encapsulates the payload sent across the API when a client would

@ -7,7 +7,7 @@ environment:
clone_folder: $(GOPATH)\src\github.com\git-lfs\git-lfs
install:
- echo $(GOPATH)
- echo %GOPATH%
- rd C:\Go /s /q
- appveyor DownloadFile https://storage.googleapis.com/golang/go1.7.4.windows-amd64.zip
- 7z x go1.7.4.windows-amd64.zip -oC:\ >nul
@ -16,7 +16,6 @@ install:
- set PATH="C:\Program Files (x86)\Inno Setup 5";%PATH%
build_script:
- C:\Ruby23\DevKit\mingw\bin\windres.exe script\windows-installer\resources.rc -o resources.syso
- bash --login -c 'GOARCH=386 script/bootstrap'
- mv bin\git-lfs.exe git-lfs-x86.exe
- bash --login -c 'GOARCH=amd64 script/bootstrap'

@ -1,14 +1,6 @@
package commands
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"sync"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/filepathfilter"
"github.com/git-lfs/git-lfs/git"
"github.com/git-lfs/git-lfs/lfs"
@ -18,225 +10,58 @@ import (
func checkoutCommand(cmd *cobra.Command, args []string) {
requireInRepo()
// Parameters are filters
// firstly convert any pathspecs to the root of the repo, in case this is being executed in a sub-folder
var rootedpaths []string
inchan := make(chan string, 1)
outchan, err := lfs.ConvertCwdFilesRelativeToRepo(inchan)
if err != nil {
Panic(err, "Could not checkout")
}
for _, arg := range args {
inchan <- arg
rootedpaths = append(rootedpaths, <-outchan)
}
close(inchan)
filter := filepathfilter.New(rootedpaths, nil)
checkoutWithIncludeExclude(filter)
}
func checkoutFromFetchChan(in chan *lfs.WrappedPointer, filter *filepathfilter.Filter) {
ref, err := git.CurrentRef()
if err != nil {
Panic(err, "Could not checkout")
}
// Need to ScanTree to identify multiple files with the same content (fetch will only report oids once)
// use new gitscanner so mapping has all the scanned pointers before continuing
mapping := make(map[string][]*lfs.WrappedPointer)
chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil {
Panic(err, "Could not scan for Git LFS files")
return
}
mapping[p.Oid] = append(mapping[p.Oid], p)
})
chgitscanner.Filter = filter
if err := chgitscanner.ScanTree(ref.Sha, nil); err != nil {
ExitWithError(err)
}
chgitscanner.Close()
// Launch git update-index
c := make(chan *lfs.WrappedPointer)
var wait sync.WaitGroup
wait.Add(1)
go func() {
checkoutWithChan(c)
wait.Done()
}()
// Feed it from in, which comes from fetch
for p := range in {
// Add all of the files for this oid
for _, fp := range mapping[p.Oid] {
c <- fp
}
}
close(c)
wait.Wait()
}
func checkoutWithIncludeExclude(filter *filepathfilter.Filter) {
ref, err := git.CurrentRef()
if err != nil {
Panic(err, "Could not checkout")
}
// this func has to load all pointers into memory
var pointers []*lfs.WrappedPointer
var multiErr error
chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil {
if multiErr != nil {
multiErr = fmt.Errorf("%v\n%v", multiErr, err)
} else {
multiErr = err
}
return
}
pointers = append(pointers, p)
})
chgitscanner.Filter = filter
if err := chgitscanner.ScanTree(ref.Sha, nil); err != nil {
ExitWithError(err)
}
chgitscanner.Close()
if multiErr != nil {
Panic(multiErr, "Could not scan for Git LFS files")
}
var wait sync.WaitGroup
wait.Add(1)
c := make(chan *lfs.WrappedPointer, 1)
go func() {
checkoutWithChan(c)
wait.Done()
}()
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
meter.Start()
var totalBytes int64
for _, pointer := range pointers {
totalBytes += pointer.Size
meter.Add(totalBytes)
meter.StartTransfer(pointer.Name)
c <- pointer
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
singleCheckout := newSingleCheckout()
chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil {
LoggedError(err, "Scanner error")
return
}
totalBytes += p.Size
meter.Add(p.Size)
meter.StartTransfer(p.Name)
singleCheckout.Run(p)
// not strictly correct (parallel) but we don't have a callback & it's just local
// plus only 1 slot in channel so it'll block & be close
meter.TransferBytes("checkout", pointer.Name, pointer.Size, totalBytes, int(pointer.Size))
meter.FinishTransfer(pointer.Name)
meter.TransferBytes("checkout", p.Name, p.Size, totalBytes, int(p.Size))
meter.FinishTransfer(p.Name)
})
chgitscanner.Filter = filepathfilter.New(rootedPaths(args), nil)
if err := chgitscanner.ScanTree(ref.Sha); err != nil {
ExitWithError(err)
}
close(c)
wait.Wait()
meter.Start()
chgitscanner.Close()
meter.Finish()
singleCheckout.Close()
}
// Populate the working copy with the real content of objects where the file is
// either missing, or contains a matching pointer placeholder, from a list of pointers.
// If the file exists but has other content it is left alone
// Callers of this function MUST NOT Panic or otherwise exit the process
// without waiting for this function to shut down. If the process exits while
// update-index is in the middle of processing a file the git index can be left
// in a locked state.
func checkoutWithChan(in <-chan *lfs.WrappedPointer) {
// Get a converter from repo-relative to cwd-relative
// Since writing data & calling git update-index must be relative to cwd
repopathchan := make(chan string, 1)
cwdpathchan, err := lfs.ConvertRepoFilesRelativeToCwd(repopathchan)
// Parameters are filters
// firstly convert any pathspecs to the root of the repo, in case this is being
// executed in a sub-folder
func rootedPaths(args []string) []string {
pathConverter, err := lfs.NewCurrentToRepoPathConverter()
if err != nil {
Panic(err, "Could not convert file paths")
Panic(err, "Could not checkout")
}
// Don't fire up the update-index command until we have at least one file to
// give it. Otherwise git interprets the lack of arguments as a param-less update-index,
// which can trigger the entire working copy to be re-examined, which triggers clean filters
// and which has unexpected side effects (e.g. downloading filtered-out files)
var cmd *exec.Cmd
var updateIdxStdin io.WriteCloser
var updateIdxOut bytes.Buffer
// From this point on, git update-index is running. Code in this loop MUST
// NOT Panic() or otherwise cause the process to exit. If the process exits
// while update-index is in the middle of updating, the index can remain in a
// locked state.
// As files come in, write them to the wd and update the index
manifest := TransferManifest()
for pointer := range in {
// Check the content - either missing or still this pointer (not exist is ok)
filepointer, err := lfs.DecodePointerFromFile(pointer.Name)
if err != nil && !os.IsNotExist(err) {
if errors.IsNotAPointerError(err) {
// File has non-pointer content, leave it alone
continue
}
LoggedError(err, "Problem accessing %v", pointer.Name)
continue
}
if filepointer != nil && filepointer.Oid != pointer.Oid {
// User has probably manually reset a file to another commit
// while leaving it a pointer; don't mess with this
continue
}
repopathchan <- pointer.Name
cwdfilepath := <-cwdpathchan
err = lfs.PointerSmudgeToFile(cwdfilepath, pointer.Pointer, false, manifest, nil)
if err != nil {
if errors.IsDownloadDeclinedError(err) {
// acceptable error, data not local (fetch not run or include/exclude)
LoggedError(err, "Skipped checkout for %v, content not local. Use fetch to download.", pointer.Name)
} else {
LoggedError(err, "Could not checkout file")
continue
}
}
if cmd == nil {
// Fire up the update-index command
cmd = exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
cmd.Stdout = &updateIdxOut
cmd.Stderr = &updateIdxOut
updateIdxStdin, err = cmd.StdinPipe()
if err != nil {
Panic(err, "Could not update the index")
}
if err := cmd.Start(); err != nil {
Panic(err, "Could not update the index")
}
}
updateIdxStdin.Write([]byte(cwdfilepath + "\n"))
}
close(repopathchan)
if cmd != nil && updateIdxStdin != nil {
updateIdxStdin.Close()
if err := cmd.Wait(); err != nil {
LoggedError(err, "Error updating the git index:\n%s", updateIdxOut.String())
}
rootedpaths := make([]string, 0, len(args))
for _, arg := range args {
rootedpaths = append(rootedpaths, pathConverter.Convert(arg))
}
return rootedpaths
}
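For illustration, assuming a repository rooted at /repo and the command running in /repo/sub:

	// args:   []string{"file.dat", "../top.dat"}
	// result: []string{"sub/file.dat", "top.dat"}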
func init() {

@ -125,7 +125,7 @@ func pointersToFetchForRef(ref string, filter *filepathfilter.Filter) ([]*lfs.Wr
tempgitscanner.Filter = filter
if err := tempgitscanner.ScanTree(ref, nil); err != nil {
if err := tempgitscanner.ScanTree(ref); err != nil {
return nil, err
}
@ -133,18 +133,6 @@ func pointersToFetchForRef(ref string, filter *filepathfilter.Filter) ([]*lfs.Wr
return pointers, multiErr
}
func fetchRefToChan(ref string, filter *filepathfilter.Filter) chan *lfs.WrappedPointer {
c := make(chan *lfs.WrappedPointer)
pointers, err := pointersToFetchForRef(ref, filter)
if err != nil {
Panic(err, "Could not scan for Git LFS files")
}
go fetchAndReportToChan(pointers, filter, c)
return c
}
// Fetch all binaries for a given ref (that we don't have already)
func fetchRef(ref string, filter *filepathfilter.Filter) bool {
pointers, err := pointersToFetchForRef(ref, filter)
@ -326,7 +314,8 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
for _, p := range pointers {
tracerx.Printf("fetch %v [%v]", p.Name, p.Oid)
q.Add(lfs.NewDownloadable(p))
q.Add(downloadTransfer(p))
}
processQueue := time.Now()

@ -6,60 +6,43 @@ import (
"path/filepath"
"strings"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/git"
"github.com/git-lfs/git-lfs/locking"
"github.com/spf13/cobra"
)
var (
lockRemote string
lockRemoteHelp = "specify which remote to use when interacting with locks"
// TODO(taylor): consider making this (and the above flag) a property of
// some parent-command, or another similarly less ugly way of handling
// this
setLockRemoteFor = func(c *config.Configuration) {
c.CurrentRemote = lockRemote
}
)
func lockCommand(cmd *cobra.Command, args []string) {
setLockRemoteFor(cfg)
if len(args) == 0 {
Print("Usage: git lfs lock <path>")
return
}
latest, err := git.CurrentRemoteRef()
if err != nil {
Error(err.Error())
Exit("Unable to determine lastest remote ref for branch.")
}
path, err := lockPath(args[0])
if err != nil {
Exit(err.Error())
}
s, resp := API.Locks.Lock(&api.LockRequest{
Path: path,
Committer: api.CurrentCommitter(),
LatestRemoteCommit: latest.Sha,
})
if _, err := API.Do(s); err != nil {
Error(err.Error())
Exit("Error communicating with LFS API.")
if len(lockRemote) > 0 {
cfg.CurrentRemote = lockRemote
}
if len(resp.Err) > 0 {
Error(resp.Err)
Exit("Server unable to create lock.")
lockClient, err := locking.NewClient(cfg)
if err != nil {
Exit("Unable to create lock system: %v", err.Error())
}
defer lockClient.Close()
lock, err := lockClient.LockFile(path)
if err != nil {
Exit("Lock failed: %v", err)
}
Print("\n'%s' was locked (%s)", args[0], resp.Lock.Id)
Print("\n'%s' was locked (%s)", args[0], lock.Id)
}
// lockPath relativizes the given filepath such that it is relative to the root

@ -1,7 +1,7 @@
package commands
import (
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/locking"
"github.com/spf13/cobra"
)
@ -10,45 +10,33 @@ var (
)
func locksCommand(cmd *cobra.Command, args []string) {
setLockRemoteFor(cfg)
filters, err := locksCmdFlags.Filters()
if err != nil {
Error(err.Error())
Exit("Error building filters: %v", err)
}
var locks []api.Lock
query := &api.LockSearchRequest{Filters: filters}
for {
s, resp := API.Locks.Search(query)
if _, err := API.Do(s); err != nil {
Error(err.Error())
Exit("Error communicating with LFS API.")
}
if resp.Err != "" {
Error(resp.Err)
}
locks = append(locks, resp.Locks...)
if locksCmdFlags.Limit > 0 && len(locks) > locksCmdFlags.Limit {
locks = locks[:locksCmdFlags.Limit]
break
}
if resp.NextCursor != "" {
query.Cursor = resp.NextCursor
} else {
break
}
if len(lockRemote) > 0 {
cfg.CurrentRemote = lockRemote
}
Print("\n%d lock(s) matched query:", len(locks))
lockClient, err := locking.NewClient(cfg)
if err != nil {
Exit("Unable to create lock system: %v", err.Error())
}
defer lockClient.Close()
var lockCount int
locks, err := lockClient.SearchLocks(filters, locksCmdFlags.Limit, locksCmdFlags.Local)
// Print any we got before exiting
for _, lock := range locks {
Print("%s\t%s <%s>", lock.Path, lock.Committer.Name, lock.Committer.Email)
Print("%s\t%s <%s>", lock.Path, lock.Name, lock.Email)
lockCount++
}
if err != nil {
Exit("Error while retrieving locks: %v", err)
}
Print("\n%d lock(s) matched query.", lockCount)
}
// locksFlags wraps up and holds all of the flags that can be given to the
@ -63,13 +51,14 @@ type locksFlags struct {
// limit is an optional request parameter sent to the server used to
// limit the number of locks returned.
Limit int
// local limits the scope of lock reporting to the locally cached record
// of locks for the current user & doesn't query the server
Local bool
}
// Filters produces a slice of api.Filter instances based on the internal state
// of this locksFlags instance. The return value of this method is intended
// (and recommended) to be used with the api.LockSearchRequest type.
func (l *locksFlags) Filters() ([]api.Filter, error) {
filters := make([]api.Filter, 0)
// Filters produces a filter map based on this locksFlags instance.
func (l *locksFlags) Filters() (map[string]string, error) {
filters := make(map[string]string)
if l.Path != "" {
path, err := lockPath(l.Path)
@ -77,10 +66,10 @@ func (l *locksFlags) Filters() ([]api.Filter, error) {
return nil, err
}
filters = append(filters, api.Filter{"path", path})
filters["path"] = path
}
if l.Id != "" {
filters = append(filters, api.Filter{"id", l.Id})
filters["id"] = l.Id
}
return filters, nil
@ -96,5 +85,6 @@ func init() {
cmd.Flags().StringVarP(&locksCmdFlags.Path, "path", "p", "", "filter locks results matching a particular path")
cmd.Flags().StringVarP(&locksCmdFlags.Id, "id", "i", "", "filter locks results matching a particular ID")
cmd.Flags().IntVarP(&locksCmdFlags.Limit, "limit", "l", 0, "optional limit for number of results to return")
cmd.Flags().BoolVarP(&locksCmdFlags.Local, "local", "", false, "only list cached local record of own locks")
})
}

@ -42,7 +42,7 @@ func lsFilesCommand(cmd *cobra.Command, args []string) {
})
defer gitscanner.Close()
if err := gitscanner.ScanTree(ref, nil); err != nil {
if err := gitscanner.ScanTree(ref); err != nil {
Exit("Could not scan for Git LFS tree: %s", err)
}
}

@ -149,8 +149,10 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
if verifyRemote {
tracerx.Printf("VERIFYING: %v", file.Oid)
pointer := lfs.NewPointer(file.Oid, file.Size, nil)
verifyQueue.Add(lfs.NewDownloadable(&lfs.WrappedPointer{Pointer: pointer}))
verifyQueue.Add(downloadTransfer(&lfs.WrappedPointer{
Pointer: lfs.NewPointer(file.Oid, file.Size, nil),
}))
}
}
}

@ -2,9 +2,15 @@ package commands
import (
"fmt"
"sync"
"time"
"github.com/git-lfs/git-lfs/filepathfilter"
"github.com/git-lfs/git-lfs/git"
"github.com/git-lfs/git-lfs/lfs"
"github.com/git-lfs/git-lfs/progress"
"github.com/git-lfs/git-lfs/tq"
"github.com/rubyist/tracerx"
"github.com/spf13/cobra"
)
@ -38,8 +44,99 @@ func pull(filter *filepathfilter.Filter) {
Panic(err, "Could not pull")
}
c := fetchRefToChan(ref.Sha, filter)
checkoutFromFetchChan(c, filter)
pointers := newPointerMap()
meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
singleCheckout := newSingleCheckout()
q := newDownloadQueue(tq.WithProgress(meter))
gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
if err != nil {
LoggedError(err, "Scanner error")
return
}
if pointers.Seen(p) {
return
}
// no need to download objects that exist locally already
lfs.LinkOrCopyFromReference(p.Oid, p.Size)
if lfs.ObjectExistsOfSize(p.Oid, p.Size) {
singleCheckout.Run(p)
return
}
meter.Add(p.Size)
meter.StartTransfer(p.Name)
tracerx.Printf("fetch %v [%v]", p.Name, p.Oid)
pointers.Add(p)
q.Add(downloadTransfer(p))
})
gitscanner.Filter = filter
dlwatch := q.Watch()
var wg sync.WaitGroup
wg.Add(1)
go func() {
for oid := range dlwatch {
for _, p := range pointers.All(oid) {
singleCheckout.Run(p)
}
}
wg.Done()
}()
processQueue := time.Now()
if err := gitscanner.ScanTree(ref.Sha); err != nil {
ExitWithError(err)
}
meter.Start()
gitscanner.Close()
q.Wait()
wg.Wait()
tracerx.PerformanceSince("process queue", processQueue)
singleCheckout.Close()
for _, err := range q.Errors() {
FullError(err)
}
}
// tracks LFS objects being downloaded, according to their unique OIDs.
type pointerMap struct {
pointers map[string][]*lfs.WrappedPointer
mu sync.Mutex
}
func newPointerMap() *pointerMap {
return &pointerMap{pointers: make(map[string][]*lfs.WrappedPointer)}
}
func (m *pointerMap) Seen(p *lfs.WrappedPointer) bool {
m.mu.Lock()
defer m.mu.Unlock()
if existing, ok := m.pointers[p.Oid]; ok {
m.pointers[p.Oid] = append(existing, p)
return true
}
return false
}
func (m *pointerMap) Add(p *lfs.WrappedPointer) {
m.mu.Lock()
defer m.mu.Unlock()
m.pointers[p.Oid] = append(m.pointers[p.Oid], p)
}
func (m *pointerMap) All(oid string) []*lfs.WrappedPointer {
m.mu.Lock()
defer m.mu.Unlock()
pointers := m.pointers[oid]
delete(m.pointers, oid)
return pointers
}
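A sketch of the intended call sequence, using hypothetical pointers p1 and p2 that share an OID (q and singleCheckout as in pull above): only the first sighting queues a download; later sightings are recorded so the finished download fans out to every file using that blob.

	if !pointers.Seen(p1) {
		pointers.Add(p1)
		q.Add(downloadTransfer(p1)) // download the blob once per OID
	}
	pointers.Seen(p2) // true: p2 is appended under the same OID, no second download
	for _, p := range pointers.All(p1.Oid) { // p1 and p2; the entry is then dropped
		singleCheckout.Run(p)
	}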
func init() {

@ -1,20 +1,12 @@
package commands
import (
"errors"
"github.com/git-lfs/git-lfs/locking"
"github.com/git-lfs/git-lfs/api"
"github.com/spf13/cobra"
)
var (
// errNoMatchingLocks is an error returned when no matching locks were
// able to be resolved
errNoMatchingLocks = errors.New("lfs: no matching locks found")
// errLockAmbiguous is an error returned when multiple matching locks
// were found
errLockAmbiguous = errors.New("lfs: multiple locks found; ambiguous")
unlockCmdFlags unlockFlags
)
@ -29,67 +21,36 @@ type unlockFlags struct {
}
func unlockCommand(cmd *cobra.Command, args []string) {
setLockRemoteFor(cfg)
var id string
if len(lockRemote) > 0 {
cfg.CurrentRemote = lockRemote
}
lockClient, err := locking.NewClient(cfg)
if err != nil {
Exit("Unable to create lock system: %v", err.Error())
}
defer lockClient.Close()
if len(args) != 0 {
path, err := lockPath(args[0])
if err != nil {
Error(err.Error())
Exit("Unable to determine path: %v", err.Error())
}
if id, err = lockIdFromPath(path); err != nil {
Error(err.Error())
err = lockClient.UnlockFile(path, unlockCmdFlags.Force)
if err != nil {
Exit("Unable to unlock: %v", err.Error())
}
} else if unlockCmdFlags.Id != "" {
id = unlockCmdFlags.Id
err := lockClient.UnlockFileById(unlockCmdFlags.Id, unlockCmdFlags.Force)
if err != nil {
Exit("Unable to unlock %v: %v", unlockCmdFlags.Id, err.Error())
}
} else {
Error("Usage: git lfs unlock (--id my-lock-id | <path>)")
}
s, resp := API.Locks.Unlock(id, unlockCmdFlags.Force)
if _, err := API.Do(s); err != nil {
Error(err.Error())
Exit("Error communicating with LFS API.")
}
if len(resp.Err) > 0 {
Error(resp.Err)
Exit("Server unable to unlock lock.")
}
Print("'%s' was unlocked (%s)", args[0], resp.Lock.Id)
}
// lockIdFromPath makes a call to the LFS API and resolves the ID for the lock
// at the given path.
//
// If the API call failed, an error will be returned. If multiple locks matched
// the given path (should not happen during real-world usage), an error will be
// returned. If no locks matched the given path, an error will be returned.
//
// If the API call is successful, and only one lock matches the given filepath,
// then its ID will be returned, along with a value of "nil" for the error.
func lockIdFromPath(path string) (string, error) {
s, resp := API.Locks.Search(&api.LockSearchRequest{
Filters: []api.Filter{
{"path", path},
},
})
if _, err := API.Do(s); err != nil {
return "", err
}
switch len(resp.Locks) {
case 0:
return "", errNoMatchingLocks
case 1:
return resp.Locks[0].Id, nil
default:
return "", errLockAmbiguous
}
Print("'%s' was unlocked", args[0])
}
func init() {

@ -11,7 +11,6 @@ import (
"strings"
"time"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/filepathfilter"
@ -26,10 +25,6 @@ import (
//go:generate go run ../docs/man/mangen.go
var (
// API is a package-local instance of the API client for use within
// various command implementations.
API = api.NewClient(nil)
Debugging = false
ErrorBuffer = &bytes.Buffer{}
ErrorWriter = io.MultiWriter(os.Stderr, ErrorBuffer)
@ -67,6 +62,74 @@ func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *s
return filepathfilter.New(inc, exc)
}
func downloadTransfer(p *lfs.WrappedPointer) (name, path, oid string, size int64) {
path, _ = lfs.LocalMediaPath(p.Oid)
return p.Name, path, p.Oid, p.Size
}
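Because downloadTransfer returns exactly the (name, path, oid, size) tuple that the transfer queue's Add method accepts, call sites can splice the call directly into Add, as the hunks below do. A sketch, assuming p is a *lfs.WrappedPointer:

	q := newDownloadQueue()
	q.Add(downloadTransfer(p)) // Go forwards all four return values as arguments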
func uploadTransfer(oid, filename string) (*tq.Transfer, error) {
localMediaPath, err := lfs.LocalMediaPath(oid)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
if len(filename) > 0 {
if err = ensureFile(filename, localMediaPath); err != nil {
return nil, err
}
}
fi, err := os.Stat(localMediaPath)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
return &tq.Transfer{
Name: filename,
Path: localMediaPath,
Oid: oid,
Size: fi.Size(),
}, nil
}
// ensureFile makes sure that the cleanPath exists before pushing it. If it
// does not exist, it attempts to clean it by reading the file at smudgePath.
func ensureFile(smudgePath, cleanPath string) error {
if _, err := os.Stat(cleanPath); err == nil {
return nil
}
expectedOid := filepath.Base(cleanPath)
localPath := filepath.Join(config.LocalWorkingDir, smudgePath)
file, err := os.Open(localPath)
if err != nil {
return err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return err
}
cleaned, err := lfs.PointerClean(file, file.Name(), stat.Size(), nil)
if cleaned != nil {
cleaned.Teardown()
}
if err != nil {
return err
}
if expectedOid != cleaned.Oid {
return fmt.Errorf("Trying to push %q with OID %s.\nNot found in %s.", smudgePath, expectedOid, filepath.Dir(cleanPath))
}
return nil
}
// Error prints a formatted message to Stderr. It also gets printed to the
// panic log if one is created for this command.
func Error(format string, args ...interface{}) {

commands/pull.go Normal file (136 lines)

@ -0,0 +1,136 @@
package commands
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"sync"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfs"
"github.com/git-lfs/git-lfs/tq"
)
// Handles the process of checking out a single file, and updating the git
// index.
func newSingleCheckout() *singleCheckout {
// Get a converter from repo-relative to cwd-relative
// Since writing data & calling git update-index must be relative to cwd
pathConverter, err := lfs.NewRepoToCurrentPathConverter()
if err != nil {
Panic(err, "Could not convert file paths")
}
return &singleCheckout{
gitIndexer: &gitIndexer{},
pathConverter: pathConverter,
manifest: TransferManifest(),
}
}
type singleCheckout struct {
gitIndexer *gitIndexer
pathConverter lfs.PathConverter
manifest *tq.Manifest
}
func (c *singleCheckout) Run(p *lfs.WrappedPointer) {
// Check the content - either missing or still this pointer (not exist is ok)
filepointer, err := lfs.DecodePointerFromFile(p.Name)
if err != nil && !os.IsNotExist(err) {
if errors.IsNotAPointerError(err) {
// File has non-pointer content, leave it alone
return
}
LoggedError(err, "Checkout error: %s", err)
return
}
if filepointer != nil && filepointer.Oid != p.Oid {
// User has probably manually reset a file to another commit
// while leaving it a pointer; don't mess with this
return
}
cwdfilepath := c.pathConverter.Convert(p.Name)
err = lfs.PointerSmudgeToFile(cwdfilepath, p.Pointer, false, c.manifest, nil)
if err != nil {
if errors.IsDownloadDeclinedError(err) {
// acceptable error, data not local (fetch not run or include/exclude)
LoggedError(err, "Skipped checkout for %q, content not local. Use fetch to download.", p.Name)
} else {
FullError(fmt.Errorf("Could not check out %q", p.Name))
}
return
}
// errors are only returned when the gitIndexer is starting a new cmd
if err := c.gitIndexer.Add(cwdfilepath); err != nil {
Panic(err, "Could not update the index")
}
}
func (c *singleCheckout) Close() {
if err := c.gitIndexer.Close(); err != nil {
LoggedError(err, "Error updating the git index:\n%s", c.gitIndexer.Output())
}
}
// Don't fire up the update-index command until we have at least one file to
// give it. Otherwise git interprets the lack of arguments as a param-less update-index,
// which can trigger the entire working copy to be re-examined, which triggers clean filters
// and which has unexpected side effects (e.g. downloading filtered-out files)
type gitIndexer struct {
cmd *exec.Cmd
input io.WriteCloser
output bytes.Buffer
mu sync.Mutex
}
func (i *gitIndexer) Add(path string) error {
i.mu.Lock()
defer i.mu.Unlock()
if i.cmd == nil {
// Fire up the update-index command
i.cmd = exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
i.cmd.Stdout = &i.output
i.cmd.Stderr = &i.output
stdin, err := i.cmd.StdinPipe()
if err == nil {
err = i.cmd.Start()
}
if err != nil {
return err
}
i.input = stdin
}
i.input.Write([]byte(path + "\n"))
return nil
}
func (i *gitIndexer) Output() string {
return i.output.String()
}
func (i *gitIndexer) Close() error {
i.mu.Lock()
defer i.mu.Unlock()
if i.input != nil {
i.input.Close()
}
if i.cmd != nil {
return i.cmd.Wait()
}
return nil
}
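A lifecycle sketch with a hypothetical path: the indexer is lazy, so `git update-index` is only spawned on the first Add, and a checkout that touches no files never runs it.

	idx := &gitIndexer{}
	if err := idx.Add("sub/file.dat"); err != nil {
		// surfaced only when starting the command fails
	}
	if err := idx.Close(); err != nil { // close stdin, wait for update-index to exit
		LoggedError(err, "Error updating the git index:\n%s", idx.Output())
	}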

@ -113,7 +113,7 @@ func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize
}()
for _, p := range missing {
checkQueue.Add(lfs.NewDownloadable(p))
checkQueue.Add(downloadTransfer(p))
}
// Currently this is needed to flush the batch but is not enough to sync
@ -140,7 +140,7 @@ func uploadPointers(c *uploadContext, unfiltered []*lfs.WrappedPointer) {
q, pointers := c.prepareUpload(unfiltered)
for _, p := range pointers {
u, err := lfs.NewUploadable(p.Oid, p.Name)
t, err := uploadTransfer(p.Oid, p.Name)
if err != nil {
if errors.IsCleanPointerError(err) {
Exit(uploadMissingErr, p.Oid, p.Name, errors.GetContext(err, "pointer").(*lfs.Pointer).Oid)
@ -149,7 +149,7 @@ func uploadPointers(c *uploadContext, unfiltered []*lfs.WrappedPointer) {
}
}
q.Add(u)
q.Add(t.Name, t.Path, t.Oid, t.Size)
c.SetUploaded(p.Oid)
}

@ -394,3 +394,12 @@ func (c *Configuration) loadGitConfig() bool {
return false
}
// CurrentCommitter returns the name/email that would be used to author a commit
// with this configuration. In particular, the "user.name" and "user.email"
// configuration values are used
func (c *Configuration) CurrentCommitter() (name, email string) {
name, _ = c.Git.Get("user.name")
email, _ = c.Git.Get("user.email")
return
}

@ -31,7 +31,7 @@ that the client has configured. If omitted, the `basic` transfer adapter MUST
be assumed by the server.
* `objects` - An Array of objects to download.
* `oid` - String OID of the LFS object.
* `size` - Integer byte size of the LFS object.
* `size` - Integer byte size of the LFS object. Must be at least zero.
Note: Git LFS currently only supports the `basic` transfer adapter. This
property was added for future compatibility with some experimental transfer
@ -70,7 +70,7 @@ client will use the `basic` transfer adapter if the `transfer` property is
omitted.
* `objects` - An Array of objects to download.
* `oid` - String OID of the LFS object.
* `size` - Integer byte size of the LFS object.
* `size` - Integer byte size of the LFS object. Must be at least zero.
* `authenticated` - Optional boolean specifying whether the request for this
specific object is authenticated. If omitted or false, Git LFS will attempt
to [find credentials for this URL](./authentication.md).
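For illustration, an object entry that satisfies these constraints (placeholder values):

	{
	  "oid": "1111111",
	  "size": 123,
	  "authenticated": true
	}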

@ -21,7 +21,8 @@
"type": "string"
},
"size": {
"type": "number"
"type": "number",
"minimum": 0
},
"authenticated": {
"type": "boolean"

@ -78,6 +78,7 @@ func TestFilterAllows(t *testing.T) {
filterTest{true, []string{"test/fil*"}, nil},
filterTest{false, []string{"test/g*"}, nil},
filterTest{true, []string{"tes*/*"}, nil},
filterTest{true, []string{"[Tt]est/[Ff]ilename.dat"}, nil},
// Exclusion
filterTest{false, nil, []string{"*.dat"}},
filterTest{false, nil, []string{"file*.dat"}},
@ -96,6 +97,7 @@ func TestFilterAllows(t *testing.T) {
filterTest{false, nil, []string{"test/fil*"}},
filterTest{true, nil, []string{"test/g*"}},
filterTest{false, nil, []string{"tes*/*"}},
filterTest{false, nil, []string{"[Tt]est/[Ff]ilename.dat"}},
// Both
filterTest{true, []string{"test/filename.dat"}, []string{"test/notfilename.dat"}},

@ -1,3 +1,5 @@
//go:generate goversioninfo -icon=script/windows-installer/git-lfs-logo.ico
package main
import (

@ -1,45 +1,10 @@
package lfs
import (
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/tq"
)
type Downloadable struct {
pointer *WrappedPointer
object *api.ObjectResource
}
func (d *Downloadable) Object() *api.ObjectResource {
return d.object
}
func (d *Downloadable) Oid() string {
return d.pointer.Oid
}
func (d *Downloadable) Size() int64 {
return d.pointer.Size
}
func (d *Downloadable) Name() string {
return d.pointer.Name
}
func (d *Downloadable) Path() string {
p, _ := LocalMediaPath(d.pointer.Oid)
return p
}
func (d *Downloadable) SetObject(o *api.ObjectResource) {
d.object = o
}
func NewDownloadable(p *WrappedPointer) *Downloadable {
return &Downloadable{pointer: p}
}
// NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func NewDownloadCheckQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue {
allOptions := make([]tq.Option, len(options), len(options)+1)

@ -132,8 +132,8 @@ func (s *GitScanner) ScanAll(cb GitScannerCallback) error {
// ScanTree takes a ref and returns WrappedPointer objects in the tree at that
// ref. Differs from ScanRefs in that multiple files in the tree with the same
// content are all reported.
func (s *GitScanner) ScanTree(ref string, cb GitScannerCallback) error {
callback, err := firstGitScannerCallback(cb, s.callback)
func (s *GitScanner) ScanTree(ref string) error {
callback, err := firstGitScannerCallback(s.callback)
if err != nil {
return err
}

@ -10,7 +10,6 @@ import (
"github.com/git-lfs/git-lfs/tools"
"github.com/git-lfs/git-lfs/tq"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/progress"
@ -75,34 +74,21 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, download
func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error {
fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size))
xfers := manifest.GetDownloadAdapterNames()
obj, adapterName, err := api.BatchSingle(config.Config, &api.ObjectResource{Oid: ptr.Oid, Size: ptr.Size}, "download", xfers)
if err != nil {
return errors.Wrapf(err, "Error downloading %s: %s", filepath.Base(mediafile), err)
}
q := tq.NewTransferQueue(tq.Download, manifest)
q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size)
q.Wait()
if ptr.Size == 0 {
ptr.Size = obj.Size
}
adapter := manifest.NewDownloadAdapter(adapterName)
var tcb tq.ProgressCallback
if cb != nil {
tcb = func(name string, totalSize, readSoFar int64, readSinceLast int) error {
return cb(totalSize, readSoFar, readSinceLast)
if errs := q.Errors(); len(errs) > 0 {
var multiErr error
for _, e := range errs {
if multiErr != nil {
multiErr = fmt.Errorf("%v\n%v", multiErr, e)
} else {
multiErr = e
}
return errors.Wrapf(multiErr, "Error downloading %s (%s)", workingfile, ptr.Oid)
}
}
// Single download
err = adapter.Begin(1, tcb)
if err != nil {
return err
}
res := <-adapter.Add(tq.NewTransfer(filepath.Base(workingfile), obj, mediafile))
adapter.End()
if res.Error != nil {
return errors.Wrapf(err, "Error buffering media file: %s", res.Error)
}
return readLocalFile(writer, ptr, mediafile, workingfile, nil)
}

@ -1,108 +1,10 @@
package lfs
import (
"fmt"
"os"
"path/filepath"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/tq"
)
// Uploadable describes a file that can be uploaded.
type Uploadable struct {
oid string
OidPath string
Filename string
size int64
object *api.ObjectResource
}
func (u *Uploadable) Object() *api.ObjectResource {
return u.object
}
func (u *Uploadable) Oid() string {
return u.oid
}
func (u *Uploadable) Size() int64 {
return u.size
}
func (u *Uploadable) Name() string {
return u.Filename
}
func (u *Uploadable) SetObject(o *api.ObjectResource) {
u.object = o
}
func (u *Uploadable) Path() string {
return u.OidPath
}
// NewUploadable builds the Uploadable from the given information.
// "filename" can be empty if a raw object is pushed (see "object-id" flag in push command)/
func NewUploadable(oid, filename string) (*Uploadable, error) {
localMediaPath, err := LocalMediaPath(oid)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
if len(filename) > 0 {
if err := ensureFile(filename, localMediaPath); err != nil {
return nil, err
}
}
fi, err := os.Stat(localMediaPath)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
return &Uploadable{oid: oid, OidPath: localMediaPath, Filename: filename, size: fi.Size()}, nil
}
// ensureFile makes sure that the cleanPath exists before pushing it. If it
// does not exist, it attempts to clean it by reading the file at smudgePath.
func ensureFile(smudgePath, cleanPath string) error {
if _, err := os.Stat(cleanPath); err == nil {
return nil
}
expectedOid := filepath.Base(cleanPath)
localPath := filepath.Join(config.LocalWorkingDir, smudgePath)
file, err := os.Open(localPath)
if err != nil {
return err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return err
}
cleaned, err := PointerClean(file, file.Name(), stat.Size(), nil)
if cleaned != nil {
cleaned.Teardown()
}
if err != nil {
return err
}
if expectedOid != cleaned.Oid {
return fmt.Errorf("Trying to push %q with OID %s.\nNot found in %s.", smudgePath, expectedOid, filepath.Dir(cleanPath))
}
return nil
}
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue {
return tq.NewTransferQueue(tq.Upload, TransferManifest(cfg), options...)

@ -87,90 +87,98 @@ func GetPlatform() Platform {
return currentPlatform
}
type PathConverter interface {
Convert(string) string
}
// Convert filenames expressed relative to the root of the repo to be relative
// to the current working dir. Useful when needing to call git with results from a rooted command,
// but the user is in a subdir of their repo
// Pass in a channel which you will fill with relative files & receive a channel which will get results
func ConvertRepoFilesRelativeToCwd(repochan <-chan string) (<-chan string, error) {
wd, err := os.Getwd()
func NewRepoToCurrentPathConverter() (PathConverter, error) {
r, c, p, err := pathConverterArgs()
if err != nil {
return nil, fmt.Errorf("Unable to get working dir: %v", err)
}
wd = tools.ResolveSymlinks(wd)
// Early-out if working dir is root dir, same result
passthrough := false
if config.LocalWorkingDir == wd {
passthrough = true
return nil, err
}
outchan := make(chan string, 1)
return &repoToCurrentPathConverter{
repoDir: r,
currDir: c,
passthrough: p,
}, nil
}
go func() {
for f := range repochan {
if passthrough {
outchan <- f
continue
}
abs := filepath.Join(config.LocalWorkingDir, f)
rel, err := filepath.Rel(wd, abs)
if err != nil {
// Use absolute file instead
outchan <- abs
} else {
outchan <- rel
}
}
close(outchan)
}()
type repoToCurrentPathConverter struct {
repoDir string
currDir string
passthrough bool
}
return outchan, nil
func (p *repoToCurrentPathConverter) Convert(filename string) string {
if p.passthrough {
return filename
}
abs := filepath.Join(p.repoDir, filename)
rel, err := filepath.Rel(p.currDir, abs)
if err != nil {
// Use absolute file instead
return abs
} else {
return rel
}
}
// Convert filenames expressed relative to the current directory to be
// relative to the repo root. Useful when calling git with arguments that require them
// to be rooted but the user is in a subdir of their repo & expects to use relative args
// Pass in a channel which you will fill with relative files & receive a channel which will get results
func ConvertCwdFilesRelativeToRepo(cwdchan <-chan string) (<-chan string, error) {
curdir, err := os.Getwd()
func NewCurrentToRepoPathConverter() (PathConverter, error) {
r, c, p, err := pathConverterArgs()
if err != nil {
return nil, fmt.Errorf("Could not retrieve current directory: %v", err)
}
// Make sure to resolve symlinks
curdir = tools.ResolveSymlinks(curdir)
// Early-out if working dir is root dir, same result
passthrough := false
if config.LocalWorkingDir == curdir {
passthrough = true
return nil, err
}
outchan := make(chan string, 1)
go func() {
for p := range cwdchan {
if passthrough {
outchan <- p
continue
}
var abs string
if filepath.IsAbs(p) {
abs = tools.ResolveSymlinks(p)
} else {
abs = filepath.Join(curdir, p)
}
reltoroot, err := filepath.Rel(config.LocalWorkingDir, abs)
if err != nil {
// Can't do this, use absolute as best fallback
outchan <- abs
} else {
outchan <- reltoroot
}
}
close(outchan)
}()
return &currentToRepoPathConverter{
repoDir: r,
currDir: c,
passthrough: p,
}, nil
}
return outchan, nil
type currentToRepoPathConverter struct {
repoDir string
currDir string
passthrough bool
}
func (p *currentToRepoPathConverter) Convert(filename string) string {
if p.passthrough {
return filename
}
var abs string
if filepath.IsAbs(filename) {
abs = tools.ResolveSymlinks(filename)
} else {
abs = filepath.Join(p.currDir, filename)
}
reltoroot, err := filepath.Rel(p.repoDir, abs)
if err != nil {
// Can't do this, use absolute as best fallback
return abs
} else {
return reltoroot
}
}
func pathConverterArgs() (string, string, bool, error) {
currDir, err := os.Getwd()
if err != nil {
return "", "", false, fmt.Errorf("Unable to get working dir: %v", err)
}
currDir = tools.ResolveSymlinks(currDir)
return config.LocalWorkingDir, currDir, config.LocalWorkingDir == currDir, nil
}
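A sketch of both directions, assuming a repository rooted at /repo and a process working directory of /repo/sub:

	toRepo, _ := NewCurrentToRepoPathConverter()
	_ = toRepo.Convert("file.dat") // -> "sub/file.dat"

	toCwd, _ := NewRepoToCurrentPathConverter()
	_ = toCwd.Convert("sub/file.dat") // -> "file.dat"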
// Are we running on Windows? Need to handle some extra path shenanigans

lfsapi/errors.go Normal file (79 lines)

@ -0,0 +1,79 @@
package lfsapi
import (
"fmt"
"net/http"
"github.com/git-lfs/git-lfs/errors"
)
type ClientError struct {
Message string `json:"message"`
DocumentationUrl string `json:"documentation_url,omitempty"`
RequestId string `json:"request_id,omitempty"`
}
func (e *ClientError) Error() string {
msg := e.Message
if len(e.DocumentationUrl) > 0 {
msg += "\nDocs: " + e.DocumentationUrl
}
if len(e.RequestId) > 0 {
msg += "\nRequest ID: " + e.RequestId
}
return msg
}
func (c *Client) handleResponse(res *http.Response) error {
if res.StatusCode < 400 {
return nil
}
cliErr := &ClientError{}
err := decodeResponse(res, cliErr)
if err == nil {
if len(cliErr.Message) == 0 {
err = defaultError(res)
} else {
err = errors.Wrap(cliErr, "http")
}
}
if res.StatusCode == 401 {
return errors.NewAuthError(err)
}
if res.StatusCode > 499 && res.StatusCode != 501 && res.StatusCode != 507 && res.StatusCode != 509 {
return errors.NewFatalError(err)
}
return err
}
var (
defaultErrors = map[int]string{
400: "Client error: %s",
401: "Authorization error: %s\nCheck that you have proper access to the repository",
403: "Authorization error: %s\nCheck that you have proper access to the repository",
404: "Repository or object not found: %s\nCheck that it exists and that you have proper access to it",
429: "Rate limit exceeded: %s",
500: "Server error: %s",
501: "Not Implemented: %s",
507: "Insufficient server storage: %s",
509: "Bandwidth limit exceeded: %s",
}
)
func defaultError(res *http.Response) error {
var msgFmt string
if f, ok := defaultErrors[res.StatusCode]; ok {
msgFmt = f
} else if res.StatusCode < 500 {
msgFmt = defaultErrors[400] + fmt.Sprintf(" from HTTP %d", res.StatusCode)
} else {
msgFmt = defaultErrors[500] + fmt.Sprintf(" from HTTP %d", res.StatusCode)
}
return errors.Errorf(msgFmt, res.Request.URL)
}

lfsapi/lfsapi.go Normal file (42 lines)

@ -0,0 +1,42 @@
package lfsapi
import (
"encoding/json"
"net/http"
"regexp"
"github.com/git-lfs/git-lfs/errors"
)
var (
lfsMediaTypeRE = regexp.MustCompile(`\Aapplication/vnd\.git\-lfs\+json(;|\z)`)
jsonMediaTypeRE = regexp.MustCompile(`\Aapplication/json(;|\z)`)
)
type Client struct {
}
func (c *Client) Do(req *http.Request) (*http.Response, error) {
res, err := http.DefaultClient.Do(req)
if err != nil {
return res, err
}
return res, c.handleResponse(res)
}
func decodeResponse(res *http.Response, obj interface{}) error {
ctype := res.Header.Get("Content-Type")
if !(lfsMediaTypeRE.MatchString(ctype) || jsonMediaTypeRE.MatchString(ctype)) {
return nil
}
err := json.NewDecoder(res.Body).Decode(obj)
res.Body.Close()
if err != nil {
return errors.Wrapf(err, "Unable to parse HTTP response for %s %s", res.Request.Method, res.Request.URL)
}
return nil
}
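A minimal caller sketch (the URL is a placeholder): Do issues the request with the default HTTP client and turns non-2xx responses into typed errors via handleResponse.

	c := &lfsapi.Client{}
	req, _ := http.NewRequest("GET", "https://example.com/info/lfs/locks", nil)
	res, err := c.Do(req)
	if err != nil {
		// 401 -> auth error; most 5xx -> fatal error; otherwise a wrapped *ClientError
	}
	if res != nil {
		res.Body.Close()
	}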

locking/cache.go Normal file (101 lines)

@ -0,0 +1,101 @@
package locking
import (
"strings"
"github.com/git-lfs/git-lfs/tools/kv"
)
const (
// We want to use a single cache file for integrity, but to make it easy to
// list all locks, prefix the id->path map in a way we can identify (something
// that won't be in a path)
idKeyPrefix string = "*id*://"
)
type LockCache struct {
kv *kv.Store
}
func NewLockCache(filepath string) (*LockCache, error) {
kv, err := kv.NewStore(filepath)
if err != nil {
return nil, err
}
return &LockCache{kv}, nil
}
func (c *LockCache) encodeIdKey(id string) string {
// Safety against accidents
if !c.isIdKey(id) {
return idKeyPrefix + id
}
return id
}
func (c *LockCache) decodeIdKey(key string) string {
// Safety against accidents
if c.isIdKey(key) {
return key[len(idKeyPrefix):]
}
return key
}
func (c *LockCache) isIdKey(key string) bool {
return strings.HasPrefix(key, idKeyPrefix)
}
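For a lock with Id "101" and Path "folder/test1.dat", the single kv store then holds two entries (a sketch):

	// "folder/test1.dat" -> &Lock{...}  (path key; reported by Locks())
	// "*id*://101"       -> &Lock{...}  (encoded id key; skipped by Locks())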
// Cache a successful lock for faster local lookup later
func (c *LockCache) Add(l Lock) error {
// Store reference in both directions
// Path -> Lock
c.kv.Set(l.Path, &l)
// EncodedId -> Lock (encoded so we can easily identify)
c.kv.Set(c.encodeIdKey(l.Id), &l)
return nil
}
// Remove a cached lock by path because it's been relinquished
func (c *LockCache) RemoveByPath(filePath string) error {
ilock := c.kv.Get(filePath)
if lock, ok := ilock.(*Lock); ok && lock != nil {
c.kv.Remove(lock.Path)
// Id as key is encoded
c.kv.Remove(c.encodeIdKey(lock.Id))
}
return nil
}
// Remove a cached lock by id because it's been relinquished
func (c *LockCache) RemoveById(id string) error {
// Id as key is encoded
idkey := c.encodeIdKey(id)
ilock := c.kv.Get(idkey)
if lock, ok := ilock.(*Lock); ok && lock != nil {
c.kv.Remove(idkey)
c.kv.Remove(lock.Path)
}
return nil
}
// Get the list of cached locked files
func (c *LockCache) Locks() []Lock {
var locks []Lock
c.kv.Visit(func(key string, val interface{}) bool {
// Only report the path-keyed entries, not the reverse id-keyed ones
if !c.isIdKey(key) {
lock := val.(*Lock)
locks = append(locks, *lock)
}
return true // continue
})
return locks
}
// Clear the cache
func (c *LockCache) Clear() {
c.kv.RemoveAll()
}
// Save the cache
func (c *LockCache) Save() error {
return c.kv.Save()
}

locking/cache_test.go Normal file (61 lines)

@ -0,0 +1,61 @@
package locking
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestLockCache(t *testing.T) {
var err error
tmpf, err := ioutil.TempFile("", "testCacheLock")
assert.Nil(t, err)
defer func() {
os.Remove(tmpf.Name())
}()
tmpf.Close()
cache, err := NewLockCache(tmpf.Name())
assert.Nil(t, err)
testLocks := []Lock{
Lock{Path: "folder/test1.dat", Id: "101"},
Lock{Path: "folder/test2.dat", Id: "102"},
Lock{Path: "root.dat", Id: "103"},
}
for _, l := range testLocks {
err = cache.Add(l)
assert.Nil(t, err)
}
locks := cache.Locks()
for _, l := range testLocks {
assert.Contains(t, locks, l)
}
assert.Equal(t, len(testLocks), len(locks))
err = cache.RemoveByPath("folder/test2.dat")
assert.Nil(t, err)
locks = cache.Locks()
// delete item 1 from test locks
testLocks = append(testLocks[:1], testLocks[2:]...)
for _, l := range testLocks {
assert.Contains(t, locks, l)
}
assert.Equal(t, len(testLocks), len(locks))
err = cache.RemoveById("101")
assert.Nil(t, err)
locks = cache.Locks()
testLocks = testLocks[1:]
for _, l := range testLocks {
assert.Contains(t, locks, l)
}
assert.Equal(t, len(testLocks), len(locks))
}

locking/locks.go Normal file (276 lines)

@ -0,0 +1,276 @@
package locking
import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/git"
"github.com/git-lfs/git-lfs/tools/kv"
)
var (
// ErrNoMatchingLocks is an error returned when no matching locks were
// able to be resolved
ErrNoMatchingLocks = errors.New("lfs: no matching locks found")
// ErrLockAmbiguous is an error returned when multiple matching locks
// were found
ErrLockAmbiguous = errors.New("lfs: multiple locks found; ambiguous")
)
// Client is the main interface object for the locking package
type Client struct {
cfg *config.Configuration
apiClient *api.Client
cache *LockCache
}
// NewClient creates a new locking client with the given configuration
// You must call the returned object's `Close` method when you are finished with
// it
func NewClient(cfg *config.Configuration) (*Client, error) {
apiClient := api.NewClient(api.NewHttpLifecycle(cfg))
lockDir := filepath.Join(config.LocalGitStorageDir, "lfs")
err := os.MkdirAll(lockDir, 0755)
if err != nil {
return nil, err
}
lockFile := filepath.Join(lockDir, "lockcache.db")
cache, err := NewLockCache(lockFile)
if err != nil {
return nil, err
}
return &Client{cfg, apiClient, cache}, nil
}
// Close this client instance; must be called to dispose of resources
func (c *Client) Close() error {
return c.cache.Save()
}
// LockFile attempts to lock a file on the current remote
// path must be relative to the root of the repository
// Returns the lock id if successful, or an error
func (c *Client) LockFile(path string) (Lock, error) {
// TODO: this is not really the constraint we need to avoid merges, improve as per proposal
latest, err := git.CurrentRemoteRef()
if err != nil {
return Lock{}, err
}
s, resp := c.apiClient.Locks.Lock(&api.LockRequest{
Path: path,
Committer: api.NewCommitter(c.cfg.CurrentCommitter()),
LatestRemoteCommit: latest.Sha,
})
if _, err := c.apiClient.Do(s); err != nil {
return Lock{}, fmt.Errorf("Error communicating with LFS API: %v", err)
}
if len(resp.Err) > 0 {
return Lock{}, fmt.Errorf("Server unable to create lock: %v", resp.Err)
}
lock := c.newLockFromApi(*resp.Lock)
if err := c.cache.Add(lock); err != nil {
return Lock{}, fmt.Errorf("Error caching lock information: %v", err)
}
return lock, nil
}
// UnlockFile attempts to unlock a file on the current remote
// path must be relative to the root of the repository
// Force causes the file to be unlocked from other users as well
func (c *Client) UnlockFile(path string, force bool) error {
id, err := c.lockIdFromPath(path)
if err != nil {
return fmt.Errorf("Unable to get lock id: %v", err)
}
return c.UnlockFileById(id, force)
}
// UnlockFileById attempts to unlock a lock with a given id on the current remote
// Force causes the file to be unlocked from other users as well
func (c *Client) UnlockFileById(id string, force bool) error {
s, resp := c.apiClient.Locks.Unlock(id, force)
if _, err := c.apiClient.Do(s); err != nil {
return fmt.Errorf("Error communicating with LFS API: %v", err)
}
if len(resp.Err) > 0 {
return fmt.Errorf("Server unable to unlock lock: %v", resp.Err)
}
if err := c.cache.RemoveById(id); err != nil {
return fmt.Errorf("Error caching unlock information: %v", err)
}
return nil
}
// Lock is a record of a locked file
type Lock struct {
// Id is the unique identifier corresponding to this particular Lock. It
// must be consistent with the local copy, and the server's copy.
Id string
// Path is an absolute path to the file that is locked as a part of this
// lock.
Path string
// Name is the name of the person holding this lock
Name string
// Email address of the person holding this lock
Email string
// LockedAt is the time at which this lock was acquired.
LockedAt time.Time
}
func (c *Client) newLockFromApi(a api.Lock) Lock {
return Lock{
Id: a.Id,
Path: a.Path,
Name: a.Committer.Name,
Email: a.Committer.Email,
LockedAt: a.LockedAt,
}
}
// SearchLocks returns the locks which match the given name/value filter
// If limit > 0 then search stops at that number of locks
// If localOnly = true, don't query the server & report only own local locks
func (c *Client) SearchLocks(filter map[string]string, limit int, localOnly bool) (locks []Lock, err error) {
if localOnly {
return c.searchCachedLocks(filter, limit)
} else {
return c.searchRemoteLocks(filter, limit)
}
}
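A usage sketch, assuming an existing *Client (constructed as in the commands above): an empty filter matches everything, limit 0 means unbounded, and localOnly answers purely from the cache.

	filter := map[string]string{"path": "folder/test1.dat"}
	locks, err := client.SearchLocks(filter, 10, true) // cached locks only
	if err == nil {
		for _, l := range locks {
			fmt.Printf("%s is locked by %s <%s>\n", l.Path, l.Name, l.Email)
		}
	}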
func (c *Client) searchCachedLocks(filter map[string]string, limit int) ([]Lock, error) {
cachedlocks := c.cache.Locks()
path, filterByPath := filter["path"]
id, filterById := filter["id"]
lockCount := 0
locks := make([]Lock, 0, len(cachedlocks))
for _, l := range cachedlocks {
// Manually filter by Path/Id
if (filterByPath && path != l.Path) ||
(filterById && id != l.Id) {
continue
}
locks = append(locks, l)
lockCount++
if limit > 0 && lockCount >= limit {
break
}
}
return locks, nil
}
func (c *Client) searchRemoteLocks(filter map[string]string, limit int) ([]Lock, error) {
locks := make([]Lock, 0, limit)
apifilters := make([]api.Filter, 0, len(filter))
for k, v := range filter {
apifilters = append(apifilters, api.Filter{k, v})
}
query := &api.LockSearchRequest{Filters: apifilters}
for {
s, resp := c.apiClient.Locks.Search(query)
if _, err := c.apiClient.Do(s); err != nil {
return locks, fmt.Errorf("Error communicating with LFS API: %v", err)
}
if resp.Err != "" {
return locks, fmt.Errorf("Error response from LFS API: %v", resp.Err)
}
for _, l := range resp.Locks {
locks = append(locks, c.newLockFromApi(l))
if limit > 0 && len(locks) >= limit {
// Exit outer loop too
return locks, nil
}
}
if resp.NextCursor != "" {
query.Cursor = resp.NextCursor
} else {
break
}
}
return locks, nil
}
// lockIdFromPath makes a call to the LFS API and resolves the ID for the lock
// at the given path.
//
// If the API call failed, an error will be returned. If multiple locks matched
// the given path (should not happen during real-world usage), an error will be
// returned. If no locks matched the given path, an error will be returned.
//
// If the API call is successful, and only one lock matches the given filepath,
// then its ID will be returned, along with a value of "nil" for the error.
func (c *Client) lockIdFromPath(path string) (string, error) {
s, resp := c.apiClient.Locks.Search(&api.LockSearchRequest{
Filters: []api.Filter{
{"path", path},
},
})
if _, err := c.apiClient.Do(s); err != nil {
return "", err
}
switch len(resp.Locks) {
case 0:
return "", ErrNoMatchingLocks
case 1:
return resp.Locks[0].Id, nil
default:
return "", ErrLockAmbiguous
}
}
// Fetch locked files for the current committer and cache them locally
// This can be used to sync up locked files when moving machines
func (c *Client) refreshLockCache() error {
// TODO: filters don't seem to currently define how to search for a
// committer's email. Is it "committer.email"? For now, just iterate
locks, err := c.SearchLocks(nil, 0, false)
if err != nil {
return err
}
// We're going to overwrite the entire local cache
c.cache.Clear()
_, email := c.cfg.CurrentCommitter()
for _, l := range locks {
if l.Email == email {
c.cache.Add(l)
}
}
return nil
}
func init() {
kv.RegisterTypeForStorage(&Lock{})
}
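To make the flow above concrete, here is a minimal usage sketch (not part of this commit; the import paths follow the repository layout, the filter value is made up, and error handling is reduced to panics):

package main

import (
	"fmt"

	"github.com/git-lfs/git-lfs/config"
	"github.com/git-lfs/git-lfs/locking"
)

func main() {
	// NewClient and SearchLocks are the locking package APIs shown above.
	client, err := locking.NewClient(config.Config)
	if err != nil {
		panic(err)
	}
	// Report up to 10 locally cached locks under a path; the "path" filter
	// key is the one handled by searchCachedLocks.
	locks, err := client.SearchLocks(map[string]string{"path": "folder/test1.dat"}, 10, true)
	if err != nil {
		panic(err)
	}
	for _, l := range locks {
		fmt.Printf("%s locked by %s (%s)\n", l.Path, l.Name, l.Email)
	}
}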

98
locking/locks_test.go Normal file

@ -0,0 +1,98 @@
package locking
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"os"
"sort"
"testing"
"time"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/stretchr/testify/assert"
)
type TestLifecycle struct {
}
func (l *TestLifecycle) Build(schema *api.RequestSchema) (*http.Request, error) {
return http.NewRequest("GET", "http://dummy", nil)
}
func (l *TestLifecycle) Execute(req *http.Request, into interface{}) (api.Response, error) {
// Return test data including other users
locks := api.LockList{Locks: []api.Lock{
api.Lock{Id: "99", Path: "folder/test3.dat", Committer: api.Committer{Name: "Alice", Email: "alice@wonderland.com"}},
api.Lock{Id: "101", Path: "folder/test1.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}},
api.Lock{Id: "102", Path: "folder/test2.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}},
api.Lock{Id: "103", Path: "root.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}},
api.Lock{Id: "199", Path: "other/test1.dat", Committer: api.Committer{Name: "Charles", Email: "charles@incharge.com"}},
}}
locksJson, _ := json.Marshal(locks)
r := &http.Response{
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.0",
Body: ioutil.NopCloser(bytes.NewReader(locksJson)),
}
if into != nil {
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(into); err != nil {
return nil, err
}
}
return api.WrapHttpResponse(r), nil
}
func (l *TestLifecycle) Cleanup(resp api.Response) error {
return resp.Body().Close()
}
type LocksById []Lock
func (a LocksById) Len() int { return len(a) }
func (a LocksById) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LocksById) Less(i, j int) bool { return a[i].Id < a[j].Id }
func TestRefreshCache(t *testing.T) {
var err error
oldStore := config.LocalGitStorageDir
config.LocalGitStorageDir, err = ioutil.TempDir("", "testCacheLock")
assert.Nil(t, err)
defer func() {
os.RemoveAll(config.LocalGitStorageDir)
config.LocalGitStorageDir = oldStore
}()
cfg := config.NewFrom(config.Values{
Git: map[string]string{"user.name": "Fred", "user.email": "fred@bloggs.com"}})
client, err := NewClient(cfg)
assert.Nil(t, err)
// Override api client for testing
client.apiClient = api.NewClient(&TestLifecycle{})
// Should start with no cached items
locks, err := client.SearchLocks(nil, 0, true)
assert.Nil(t, err)
assert.Empty(t, locks)
// Should load from test data, just Fred's
err = client.refreshLockCache()
assert.Nil(t, err)
locks, err = client.SearchLocks(nil, 0, true)
assert.Nil(t, err)
// Need to include zero time in structure for equal to work
zeroTime := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
// Sort locks for stable comparison
sort.Sort(LocksById(locks))
assert.Equal(t, []Lock{
Lock{Path: "folder/test1.dat", Id: "101", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime},
Lock{Path: "folder/test2.dat", Id: "102", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime},
Lock{Path: "root.dat", Id: "103", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime},
}, locks)
}

@ -7,6 +7,7 @@ func Noop() Meter {
type nonMeter struct{}
func (m *nonMeter) Start() {}
func (m *nonMeter) Add(size int64) {}
func (m *nonMeter) Skip(size int64) {}
func (m *nonMeter) StartTransfer(name string) {}
func (m *nonMeter) TransferBytes(direction, name string, read, total int64, current int) {}

@ -4,6 +4,7 @@ package progress
type Meter interface {
Start()
Add(int64)
Skip(size int64)
StartTransfer(name string)
TransferBytes(direction, name string, read, total int64, current int)

@ -14,6 +14,14 @@ if [ -z "$GOPATH" ]; then
ln -s "$GOPATH" src/github.com/git-lfs/git-lfs
fi
if uname -s | grep -q "_NT-"; then
echo "Installing goversioninfo to embed resources into Windows executables..."
go get github.com/josephspurrier/goversioninfo/cmd/goversioninfo
export PATH=$PATH:$GOPATH/bin/windows_386
echo "Creating the resource.syso version information file..."
go generate
fi
script/fmt
rm -rf bin/*
GO15VENDOREXPERIMENT=1 go run script/*.go -cmd build "$@"

21
script/update-version Executable file

@ -0,0 +1,21 @@
#!/usr/bin/env bash
VERSION_STRING=$1
VERSION_ARRAY=( ${VERSION_STRING//./ } )
VERSION_MAJOR=${VERSION_ARRAY[0]}
VERSION_MINOR=${VERSION_ARRAY[1]}
VERSION_PATCH=${VERSION_ARRAY[2]:-0}
VERSION_BUILD=${VERSION_ARRAY[3]:-0}
# Update the version number git-lfs is reporting.
sed -i "s,\(Version = \"\).*\(\"\),\1$VERSION_STRING\2," config/version.go
# Update the version number in the RPM package.
sed -i "s,\(Version:[[:space:]]*\).*,\1$VERSION_STRING," rpm/SPECS/git-lfs.spec
# Update the version numbers in the Windows installer.
sed -i "s,\(\"Major\": \).*\,,\1$VERSION_MAJOR\,," versioninfo.json
sed -i "s,\(\"Minor\": \).*\,,\1$VERSION_MINOR\,," versioninfo.json
sed -i "s,\(\"Patch\": \).*\,,\1$VERSION_PATCH\,," versioninfo.json
sed -i "s,\(\"Build\": \).*,\1$VERSION_BUILD," versioninfo.json
sed -i "s,\(\"ProductVersion\": \"\).*\(\"\),\1$VERSION_STRING\2," versioninfo.json

@ -1,8 +1,10 @@
#define MyAppName "Git LFS"
; Misuse RemoveFileExt to strip the 4th patch-level version number.
; Arbitrarily choose the x86 executable here as both have the version embedded.
#define MyAppVersion RemoveFileExt(GetFileVersion("..\..\git-lfs-x86.exe"))
#define MyVersionInfoVersion GetFileVersion("..\..\git-lfs-x86.exe")
; Misuse RemoveFileExt to strip the 4th patch-level version number.
#define MyAppVersion RemoveFileExt(MyVersionInfoVersion)
#define MyAppPublisher "GitHub, Inc."
#define MyAppURL "https://git-lfs.github.com/"
@ -15,7 +17,7 @@
AppId={{286391DE-F778-44EA-9375-1B21AAA04FF0}
AppName={#MyAppName}
AppVersion={#MyAppVersion}
;AppVerName={#MyAppName} {#MyAppVersion}
AppCopyright=GitHub, Inc. and Git LFS contributors
AppPublisher={#MyAppPublisher}
AppPublisherURL={#MyAppURL}
AppSupportURL={#MyAppURL}
@ -32,6 +34,7 @@ DisableReadyPage=True
ArchitecturesInstallIn64BitMode=x64
ChangesEnvironment=yes
SetupIconFile=git-lfs-logo.ico
VersionInfoVersion={#MyVersionInfoVersion}
WizardImageFile=git-lfs-wizard-image.bmp
WizardSmallImageFile=git-lfs-logo.bmp

@ -1,20 +0,0 @@
main ICON "git-lfs-logo.ico"
1 VERSIONINFO
FILEVERSION 1,5,0,0
BEGIN
BLOCK "StringFileInfo"
BEGIN
BLOCK "040904b0" /* LANG_ENGLISH/SUBLANG_ENGLISH_US, Unicode CP */
BEGIN
VALUE "FileDescription", "Git LFS\0"
VALUE "ProductName", "Git Large File Storage (LFS)\0"
VALUE "ProductVersion", "1.5.0\0"
VALUE "LegalCopyright", "GitHub, Inc. and Git LFS contributors\0"
END
END
BLOCK "VarFileInfo"
BEGIN
VALUE "Translation", 0x409, 1200
END
END

@ -165,11 +165,11 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
for _, f := range outputs[0].Files {
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})
u, err := lfs.NewUploadable(f.Oid, "Test file")
t, err := uploadTransfer(f.Oid, "Test file")
if err != nil {
return nil, nil, err
}
uploadQueue.Add(u)
uploadQueue.Add(t.Name, t.Path, t.Oid, t.Size)
}
uploadQueue.Wait()
@ -287,6 +287,25 @@ func interleaveTestData(slice1, slice2 []TestObject) []TestObject {
return ret
}
func uploadTransfer(oid, filename string) (*tq.Transfer, error) {
localMediaPath, err := lfs.LocalMediaPath(oid)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
fi, err := os.Stat(localMediaPath)
if err != nil {
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
}
return &tq.Transfer{
Name: filename,
Path: localMediaPath,
Oid: oid,
Size: fi.Size(),
}, nil
}
func init() {
RootCmd.Flags().StringVarP(&apiUrl, "url", "u", "", "URL of the API (must supply this or --clone)")
RootCmd.Flags().StringVarP(&cloneUrl, "clone", "c", "", "Clone URL from which to find API (must supply this or --url)")

@ -28,7 +28,7 @@ begin_test "custom-transfer-wrong-path"
GIT_TRACE=1 git push origin master 2>&1 | tee pushcustom.log
# use PIPESTATUS otherwise we get exit code from tee
res=${PIPESTATUS[0]}
grep "xfer: Custom transfer adapter" pushcustom.log
grep "xfer: adapter \"testcustom\" Begin()" pushcustom.log
grep "Failed to start custom transfer command" pushcustom.log
if [ "$res" = "0" ]; then
echo "Push should have failed because of an incorrect custom transfer path."
@ -101,9 +101,8 @@ begin_test "custom-transfer-upload-download"
grep "xfer: started custom adapter process" fetchcustom.log
grep "xfer\[lfstest-customadapter\]:" fetchcustom.log
grep "11 of 11 files" fetchcustom.log
grep "11 of 11 files" fetchcustom.log
[ `find .git/lfs/objects -type f | wc -l` = 11 ]
)
end_test

@ -88,3 +88,48 @@ begin_test "list locks with pagination"
grep "4 lock(s) matched query" locks.log
)
end_test
begin_test "cached locks"
(
set -e
reponame="cached_locks"
setup_remote_repo "remote_$reponame"
clone_repo "remote_$reponame" "clone_$reponame"
git lfs track "*.dat"
echo "foo" > "cached1.dat"
echo "bar" > "cached2.dat"
git add "cached1.dat" "cached2.dat" ".gitattributes"
git commit -m "add files" | tee commit.log
grep "3 files changed" commit.log
grep "create mode 100644 cached1.dat" commit.log
grep "create mode 100644 cached2.dat" commit.log
grep "create mode 100644 .gitattributes" commit.log
git push origin master 2>&1 | tee push.log
grep "master -> master" push.log
GITLFSLOCKSENABLED=1 git lfs lock "cached1.dat" | tee lock.log
assert_server_lock "$(grep -oh "\((.*)\)" lock.log | tr -d "()")"
GITLFSLOCKSENABLED=1 git lfs lock "cached2.dat" | tee lock.log
assert_server_lock "$(grep -oh "\((.*)\)" lock.log | tr -d "()")"
GITLFSLOCKSENABLED=1 git lfs locks --local | tee locks.log
grep "2 lock(s) matched query" locks.log
# delete the remote to prove we're using the local records
git remote remove origin
GITLFSLOCKSENABLED=1 git lfs locks --local --path "cached1.dat" | tee locks.log
grep "1 lock(s) matched query" locks.log
grep "cached1.dat" locks.log
GITLFSLOCKSENABLED=1 git lfs locks --local --limit 1 | tee locks.log
grep "1 lock(s) matched query" locks.log
)
end_test

@ -507,7 +507,7 @@ begin_test "push (retry with expired actions)"
GIT_TRACE=1 git push origin master 2>&1 | tee push.log
[ "1" -eq "$(grep -c "expired, retrying..." push.log)" ]
[ "1" -eq "$(grep -c "enqueue retry" push.log)" ]
grep "(1 of 1 files)" push.log
)
end_test

230
tools/kv/keyvaluestore.go Normal file

@ -0,0 +1,230 @@
package kv
import (
"encoding/gob"
"fmt"
"io"
"os"
"sync"
)
// Store provides an in-memory key/value store which is persisted to
// a file. The file handle itself is not kept locked for the duration; it is
// only locked during load and save, to make it concurrency friendly. When
// saving, the store uses optimistic locking to determine whether the db on disk
// has been modified by another process; if so, it loads the latest version
// and re-applies the modifications made during this session. This means the
// Lost Update concurrency issue is possible, so don't use this if you need
// stronger integrity guarantees than a Read Committed isolation level.
type Store struct {
// Locks the entire store
mu sync.RWMutex
filename string
log []change
// version is used for optimistic locking; this field is incremented with
// every Save()
version int64
// db is the persistent data
db map[string]interface{}
}
// Type of operation; set or remove
type operation int
const (
// Set a value for a key
setOperation = operation(iota)
// Remove a value for a key
removeOperation = operation(iota)
)
type change struct {
op operation
key string
value interface{}
}
// NewStore creates a new key/value store and initialises it with contents from
// the named file, if it exists
func NewStore(filepath string) (*Store, error) {
kv := &Store{filename: filepath, db: make(map[string]interface{})}
return kv, kv.loadAndMergeIfNeeded()
}
// Set updates the key/value store in memory
// Changes are not persisted until you call Save()
func (k *Store) Set(key string, value interface{}) {
k.mu.Lock()
defer k.mu.Unlock()
k.db[key] = value
k.logChange(setOperation, key, value)
}
// Remove removes the key and its value from the store in memory
// Changes are not persisted until you call Save()
func (k *Store) Remove(key string) {
k.mu.Lock()
defer k.mu.Unlock()
delete(k.db, key)
k.logChange(removeOperation, key, nil)
}
// RemoveAll removes all entries from the store
// These changes are not persisted until you call Save()
func (k *Store) RemoveAll() {
k.mu.Lock()
defer k.mu.Unlock()
// Log all changes
for key := range k.db {
k.logChange(removeOperation, key, nil)
}
k.db = make(map[string]interface{})
}
// Visit walks through the entire store via a function; return false from
// your visitor function to halt the walk
func (k *Store) Visit(cb func(string, interface{}) bool) {
// Read-only lock
k.mu.RLock()
defer k.mu.RUnlock()
for k, v := range k.db {
if !cb(k, v) {
break
}
}
}
// Append a change to the log; mutex must already be locked
func (k *Store) logChange(op operation, key string, value interface{}) {
k.log = append(k.log, change{op, key, value})
}
// Get retrieves a value from the store, or nil if it is not present
func (k *Store) Get(key string) interface{} {
// Read-only lock
k.mu.RLock()
defer k.mu.RUnlock()
// zero value of interface{} is nil so this does what we want
return k.db[key]
}
// Save persists the changes made to disk
// If any changes have been written by other code they will be merged
func (k *Store) Save() error {
k.mu.Lock()
defer k.mu.Unlock()
// Short-circuit if we have no changes
if len(k.log) == 0 {
return nil
}
// First, peek at the version; open read/write to keep the lock between check & write
f, err := os.OpenFile(k.filename, os.O_RDWR|os.O_CREATE, 0664)
if err != nil {
return err
}
defer f.Close()
// Only try to merge if > 0 bytes, ignore empty files (decoder will fail)
if stat, _ := f.Stat(); stat.Size() > 0 {
k.loadAndMergeReaderIfNeeded(f)
// Now we overwrite the file
f.Seek(0, os.SEEK_SET)
f.Truncate(0)
}
k.version++
enc := gob.NewEncoder(f)
if err := enc.Encode(k.version); err != nil {
return fmt.Errorf("Error while writing version data to %v: %v", k.filename, err)
}
if err := enc.Encode(k.db); err != nil {
return fmt.Errorf("Error while writing new key/value data to %v: %v", k.filename, err)
}
// Clear log now that it's saved
k.log = nil
return nil
}
// Reads as little as possible from the passed in file to determine if the
// contents are different from the version already held. If so, reads the
// contents and merges with any outstanding changes. If not, stops early without
// reading the rest of the file
func (k *Store) loadAndMergeIfNeeded() error {
stat, err := os.Stat(k.filename)
if err != nil {
if os.IsNotExist(err) {
return nil // missing is OK
}
return err
}
// Do nothing if empty file
if stat.Size() == 0 {
return nil
}
f, err := os.OpenFile(k.filename, os.O_RDONLY, 0664)
if err != nil {
return err
}
defer f.Close()
return k.loadAndMergeReaderIfNeeded(f)
}
// As loadAndMergeIfNeeded but lets caller decide how to manage file handles
func (k *Store) loadAndMergeReaderIfNeeded(f io.Reader) error {
var versionOnDisk int64
// Decode *only* the version field to check whether anyone else has
// modified the db; gob serializes structs in order so it will always be 1st
dec := gob.NewDecoder(f)
err := dec.Decode(&versionOnDisk)
if err != nil {
return fmt.Errorf("Problem checking version of key/value data from %v: %v", k.filename, err)
}
// Totally uninitialised Version == 0, saved versions are always >=1
if versionOnDisk != k.version {
// Reload data & merge
var dbOnDisk map[string]interface{}
err = dec.Decode(&dbOnDisk)
if err != nil {
return fmt.Errorf("Problem reading updated key/value data from %v: %v", k.filename, err)
}
k.reapplyChanges(dbOnDisk)
k.version = versionOnDisk
}
return nil
}
// reapplyChanges replays the changes made since the last load onto baseDb
// and stores the result as our own DB
func (k *Store) reapplyChanges(baseDb map[string]interface{}) {
for _, change := range k.log {
switch change.op {
case setOperation:
baseDb[change.key] = change.value
case removeOperation:
delete(baseDb, change.key)
}
}
// Note: the log is not cleared here; that only happens on Save, since it's
// a list of unsaved changes
k.db = baseDb
}
// RegisterTypeForStorage registers a custom type (e.g. a struct) for
// use in the key value store. This is necessary if you intend to pass custom
// structs to Store.Set() rather than primitive types.
func RegisterTypeForStorage(val interface{}) {
gob.Register(val)
}
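A short, hypothetical sketch of the intended usage pattern (not part of this commit; the store path is illustrative):

package main

import (
	"fmt"

	"github.com/git-lfs/git-lfs/tools/kv"
)

func main() {
	store, err := kv.NewStore("/tmp/example.db") // illustrative path
	if err != nil {
		panic(err)
	}
	store.Set("key1", "value1")
	// Save persists the in-memory change log; if another process saved in
	// the meantime, its version is loaded first and our log is re-applied.
	if err := store.Save(); err != nil {
		panic(err)
	}
	store.Visit(func(k string, v interface{}) bool {
		fmt.Println(k, v)
		return true // keep walking
	})
}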

@ -0,0 +1,187 @@
package kv
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestStoreSimple(t *testing.T) {
tmpf, err := ioutil.TempFile("", "lfstest1")
assert.Nil(t, err)
filename := tmpf.Name()
defer os.Remove(filename)
tmpf.Close()
kvs, err := NewStore(filename)
assert.Nil(t, err)
// We'll include storing custom structs
type customData struct {
Val1 string
Val2 int
}
// Needed to store custom struct
RegisterTypeForStorage(&customData{})
kvs.Set("stringVal", "This is a string value")
kvs.Set("intVal", 3)
kvs.Set("floatVal", 3.142)
kvs.Set("structVal", &customData{"structTest", 20})
s := kvs.Get("stringVal")
assert.Equal(t, "This is a string value", s)
i := kvs.Get("intVal")
assert.Equal(t, 3, i)
f := kvs.Get("floatVal")
assert.Equal(t, 3.142, f)
c := kvs.Get("structVal")
assert.Equal(t, c, &customData{"structTest", 20})
n := kvs.Get("noValue")
assert.Nil(t, n)
kvs.Remove("stringVal")
s = kvs.Get("stringVal")
assert.Nil(t, s)
// Set the string value again before saving
kvs.Set("stringVal", "This is a string value")
err = kvs.Save()
assert.Nil(t, err)
kvs = nil
// Now confirm that we can read it all back
kvs2, err := NewStore(filename)
assert.Nil(t, err)
s = kvs2.Get("stringVal")
assert.Equal(t, "This is a string value", s)
i = kvs2.Get("intVal")
assert.Equal(t, 3, i)
f = kvs2.Get("floatVal")
assert.Equal(t, 3.142, f)
c = kvs2.Get("structVal")
assert.Equal(t, c, &customData{"structTest", 20})
n = kvs2.Get("noValue")
assert.Nil(t, n)
// Test remove all
kvs2.RemoveAll()
s = kvs2.Get("stringVal")
assert.Nil(t, s)
i = kvs2.Get("intVal")
assert.Nil(t, i)
f = kvs2.Get("floatVal")
assert.Nil(t, f)
c = kvs2.Get("structVal")
assert.Nil(t, c)
err = kvs2.Save()
assert.Nil(t, err)
kvs2 = nil
// Now confirm that we can read blank & get nothing
kvs, err = NewStore(filename)
assert.Nil(t, err)
kvs.Visit(func(k string, v interface{}) bool {
// Should not be called
assert.Fail(t, "Should be no entries")
return true
})
}
func TestStoreOptimisticConflict(t *testing.T) {
tmpf, err := ioutil.TempFile("", "lfstest2")
assert.Nil(t, err)
filename := tmpf.Name()
defer os.Remove(filename)
tmpf.Close()
kvs1, err := NewStore(filename)
assert.Nil(t, err)
kvs1.Set("key1", "value1")
kvs1.Set("key2", "value2")
kvs1.Set("key3", "value3")
err = kvs1.Save()
assert.Nil(t, err)
// Load second copy & modify
kvs2, err := NewStore(filename)
assert.Nil(t, err)
// New keys
kvs2.Set("key4", "value4_fromkvs2")
kvs2.Set("key5", "value5_fromkvs2")
// Modify a key too
kvs2.Set("key1", "value1_fromkvs2")
err = kvs2.Save()
assert.Nil(t, err)
// Now modify first copy & save; it should detect optimistic lock issue
// New item
kvs1.Set("key10", "value10")
// Overlapping item; since we save second this will overwrite one from kvs2
kvs1.Set("key4", "value4")
err = kvs1.Save()
assert.Nil(t, err)
// This should have merged changes from kvs2 in the process
v := kvs1.Get("key1")
assert.Equal(t, "value1_fromkvs2", v) // this one was modified by kvs2
v = kvs1.Get("key2")
assert.Equal(t, "value2", v)
v = kvs1.Get("key3")
assert.Equal(t, "value3", v)
v = kvs1.Get("key4")
assert.Equal(t, "value4", v) // we overwrote this so would not be merged
v = kvs1.Get("key5")
assert.Equal(t, "value5_fromkvs2", v)
}
func TestStoreReduceSize(t *testing.T) {
tmpf, err := ioutil.TempFile("", "lfstest3")
assert.Nil(t, err)
filename := tmpf.Name()
defer os.Remove(filename)
tmpf.Close()
kvs, err := NewStore(filename)
assert.Nil(t, err)
kvs.Set("key1", "I woke up in a Soho doorway")
kvs.Set("key2", "A policeman knew my name")
kvs.Set("key3", "He said 'You can go sleep at home tonight")
kvs.Set("key4", "If you can get up and walk away'")
assert.NotNil(t, kvs.Get("key1"))
assert.NotNil(t, kvs.Get("key2"))
assert.NotNil(t, kvs.Get("key3"))
assert.NotNil(t, kvs.Get("key4"))
assert.Nil(t, kvs.Save())
stat1, _ := os.Stat(filename)
// Remove all but 1 key & save smaller version
kvs.Remove("key2")
kvs.Remove("key3")
kvs.Remove("key4")
assert.Nil(t, kvs.Save())
// Now reload fresh & prove works
kvs = nil
kvs, err = NewStore(filename)
assert.Nil(t, err)
assert.NotNil(t, kvs.Get("key1"))
assert.Nil(t, kvs.Get("key2"))
assert.Nil(t, kvs.Get("key3"))
assert.Nil(t, kvs.Get("key4"))
stat2, _ := os.Stat(filename)
assert.True(t, stat2.Size() < stat1.Size(), "Size should have reduced, was %d now %d", stat1.Size(), stat2.Size())
}

@ -3,19 +3,10 @@ package tq
import (
"fmt"
"sync"
"time"
"github.com/git-lfs/git-lfs/errors"
"github.com/rubyist/tracerx"
)
const (
// objectExpirationToTransfer is the duration we expect to have passed
// from the time that the object's expires_at property is checked to
// when the transfer is executed.
objectExpirationToTransfer = 5 * time.Second
)
// adapterBase implements the common functionality for core adapters which
// process transfers with N workers handling an oid each, and which wait for
// authentication to succeed on one worker before proceeding
@ -69,9 +60,10 @@ func (a *adapterBase) Direction() Direction {
return a.direction
}
func (a *adapterBase) Begin(maxConcurrency int, cb ProgressCallback) error {
func (a *adapterBase) Begin(cfg AdapterConfig, cb ProgressCallback) error {
a.cb = cb
a.jobChan = make(chan *job, 100)
maxConcurrency := cfg.ConcurrentTransfers()
tracerx.Printf("xfer: adapter %q Begin() with %d workers", a.Name(), maxConcurrency)
@ -156,27 +148,12 @@ func (a *adapterBase) worker(workerNum int, ctx interface{}) {
signalAuthOnResponse = false
}
}
tracerx.Printf("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Object.Oid)
// transferTime is the time that we are to compare the transfer's
// `expired_at` property against.
//
// We add the `objectExpirationToTransfer` since there will be
// some time lost from this comparison to the time we actually
// transfer the object
transferTime := time.Now().Add(objectExpirationToTransfer)
tracerx.Printf("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Oid)
// Actual transfer happens here
var err error
if expAt, expired := t.Object.IsExpired(transferTime); expired {
tracerx.Printf("xfer: adapter %q worker %d found job for %q expired, retrying...", a.Name(), workerNum, t.Object.Oid)
err = errors.NewRetriableError(errors.Errorf(
"lfs/transfer: object %q expires at %s",
t.Object.Oid, expAt.In(time.Local).Format(time.RFC822),
))
} else if t.Object.Size < 0 {
tracerx.Printf("xfer: adapter %q worker %d found invalid size for %q (got: %d), retrying...", a.Name(), workerNum, t.Object.Oid, t.Object.Size)
err = fmt.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Object.Oid, t.Object.Size)
if t.Size < 0 {
err = fmt.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
} else {
err = a.transferImpl.DoTransfer(ctx, t, a.cb, authCallback)
}
@ -184,7 +161,7 @@ func (a *adapterBase) worker(workerNum int, ctx interface{}) {
// Mark the job as completed, and alert all listeners
job.Done(err)
tracerx.Printf("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Object.Oid)
tracerx.Printf("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Oid)
}
// This will only happen if no jobs were submitted; just wake up all workers to finish
if signalAuthOnResponse {
@ -203,10 +180,10 @@ func advanceCallbackProgress(cb ProgressCallback, t *Transfer, numBytes int64) {
remainder := numBytes - read
if remainder > int64(maxInt) {
read += int64(maxInt)
cb(t.Name, t.Object.Size, read, maxInt)
cb(t.Name, t.Size, read, maxInt)
} else {
read += remainder
cb(t.Name, t.Object.Size, read, int(remainder))
cb(t.Name, t.Size, read, int(remainder))
}
}

@ -72,7 +72,7 @@ func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.Fil
f.Close()
return nil, 0, nil, err
}
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Object.Oid, n)
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, n)
return f, n, hash, nil
}
@ -80,7 +80,7 @@ func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.Fil
// Create or open a download file for resuming
func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string {
// Not a temp file since we will be resuming it
return filepath.Join(a.tempDir(), t.Object.Oid+".tmp")
return filepath.Join(a.tempDir(), t.Oid+".tmp")
}
// download starts or resumes a download. Always closes dlFile if non-nil
@ -91,9 +91,10 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
defer dlFile.Close()
}
rel, ok := t.Object.Rel("download")
if !ok {
return errors.New("Object not found on the server.")
rel, err := t.Actions.Get("download")
if err != nil {
return err
}
req, err := httputil.NewHttpRequest("GET", rel.Href, rel.Header)
@ -103,17 +104,17 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
if fromByte > 0 {
if dlFile == nil || hash == nil {
return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Object.Oid, fromByte)
return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Oid, fromByte)
}
// We could just use a start byte, but since we know the length, be specific
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Object.Size-1))
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1))
}
res, err := httputil.DoHttpRequest(config.Config, req, t.Object.NeedsAuth())
res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated)
if err != nil {
// Special-case status code 416 (Requested Range Not Satisfiable) - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Object.Oid, fromByte)
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte)
dlFile.Close()
os.Remove(dlFile.Name())
return a.download(t, cb, authOkFunc, nil, 0, nil)
@ -150,11 +151,11 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
failReason = fmt.Sprintf("expected status code 206, received %d", res.StatusCode)
}
if rangeRequestOk {
tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Object.Oid, fromByte)
tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Oid, fromByte)
advanceCallbackProgress(cb, t, fromByte)
} else {
// Abort resume, perform regular download
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Object.Oid, fromByte, failReason)
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason)
dlFile.Close()
os.Remove(dlFile.Name())
if res.StatusCode == 200 {
@ -210,8 +211,8 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
return fmt.Errorf("can't close tempfile %q: %v", dlfilename, err)
}
if actual := hasher.Hash(); actual != t.Object.Oid {
return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Object.Oid, actual, written)
if actual := hasher.Hash(); actual != t.Oid {
return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Oid, actual, written)
}
return tools.RenameFileCopyPermissions(dlfilename, t.Path)

@ -1,17 +1,16 @@
package tq
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/httputil"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress"
)
@ -45,9 +44,10 @@ func (a *basicUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
rel, ok := t.Object.Rel("upload")
if !ok {
return fmt.Errorf("No upload action for this object.")
rel, err := t.Actions.Get("upload")
if err != nil {
return err
}
req, err := httputil.NewHttpRequest("PUT", rel.Href, rel.Header)
@ -62,10 +62,10 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
if req.Header.Get("Transfer-Encoding") == "chunked" {
req.TransferEncoding = []string{"chunked"}
} else {
req.Header.Set("Content-Length", strconv.FormatInt(t.Object.Size, 10))
req.Header.Set("Content-Length", strconv.FormatInt(t.Size, 10))
}
req.ContentLength = t.Object.Size
req.ContentLength = t.Size
f, err := os.OpenFile(t.Path, os.O_RDONLY, 0644)
if err != nil {
@ -84,7 +84,7 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
var reader io.Reader
reader = &progress.CallbackReader{
C: ccb,
TotalSize: t.Object.Size,
TotalSize: t.Size,
Reader: f,
}
@ -97,7 +97,7 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
req.Body = ioutil.NopCloser(reader)
res, err := httputil.DoHttpRequest(config.Config, req, t.Object.NeedsAuth())
res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated)
if err != nil {
return errors.NewRetriableError(err)
}
@ -117,7 +117,8 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
return api.VerifyUpload(config.Config, t.Object)
cli := &lfsapi.Client{}
return verifyUpload(cli, t)
}
// startCallbackReader is a reader wrapper which calls a function as soon as the

@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
@ -74,17 +73,17 @@ func NewCustomAdapterInitRequest(op string, concurrent bool, concurrentTransfers
}
type customAdapterTransferRequest struct { // common between upload/download
Event string `json:"event"`
Oid string `json:"oid"`
Size int64 `json:"size"`
Path string `json:"path,omitempty"`
Action *api.LinkRelation `json:"action"`
Event string `json:"event"`
Oid string `json:"oid"`
Size int64 `json:"size"`
Path string `json:"path,omitempty"`
Action *Action `json:"action"`
}
func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *api.LinkRelation) *customAdapterTransferRequest {
func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *Action) *customAdapterTransferRequest {
return &customAdapterTransferRequest{"upload", oid, size, path, action}
}
func NewCustomAdapterDownloadRequest(oid string, size int64, action *api.LinkRelation) *customAdapterTransferRequest {
func NewCustomAdapterDownloadRequest(oid string, size int64, action *Action) *customAdapterTransferRequest {
return &customAdapterTransferRequest{"download", oid, size, "", action}
}
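For illustration, a hypothetical sketch of the line-delimited JSON these constructors produce for a custom transfer process (the types are redeclared so the sketch is self-contained, and the oid, size, path and href values are made up):

package main

import (
	"encoding/json"
	"os"
)

// action and request mirror Action and customAdapterTransferRequest above.
type action struct {
	Href   string            `json:"href"`
	Header map[string]string `json:"header,omitempty"`
}

type request struct {
	Event  string  `json:"event"`
	Oid    string  `json:"oid"`
	Size   int64   `json:"size"`
	Path   string  `json:"path,omitempty"`
	Action *action `json:"action"`
}

func main() {
	req := request{
		Event:  "upload",
		Oid:    "deadbeef", // hypothetical values
		Size:   123,
		Path:   "/tmp/deadbeef",
		Action: &action{Href: "https://example.com/upload/deadbeef"},
	}
	// Encode emits one JSON object per line, e.g.:
	// {"event":"upload","oid":"deadbeef","size":123,"path":"/tmp/deadbeef","action":{"href":"https://example.com/upload/deadbeef"}}
	json.NewEncoder(os.Stdout).Encode(req)
}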
@ -98,26 +97,24 @@ func NewCustomAdapterTerminateRequest() *customAdapterTerminateRequest {
// A common struct that allows all types of response to be identified
type customAdapterResponseMessage struct {
Event string `json:"event"`
Error *api.ObjectError `json:"error"`
Oid string `json:"oid"`
Path string `json:"path,omitempty"` // always blank for upload
BytesSoFar int64 `json:"bytesSoFar"`
BytesSinceLast int `json:"bytesSinceLast"`
Event string `json:"event"`
Error *ObjectError `json:"error"`
Oid string `json:"oid"`
Path string `json:"path,omitempty"` // always blank for upload
BytesSoFar int64 `json:"bytesSoFar"`
BytesSinceLast int `json:"bytesSinceLast"`
}
func (a *customAdapter) Begin(maxConcurrency int, cb ProgressCallback) error {
// If config says not to launch multiple processes, downgrade incoming value
useConcurrency := maxConcurrency
if !a.concurrent {
useConcurrency = 1
func (a *customAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error {
a.originalConcurrency = cfg.ConcurrentTransfers()
if a.concurrent {
// Concurrent mode: use the common workers impl at the configured concurrency
return a.adapterBase.Begin(cfg, cb)
}
a.originalConcurrency = maxConcurrency
tracerx.Printf("xfer: Custom transfer adapter %q using concurrency %d", a.name, useConcurrency)
// Use common workers impl, but downgrade workers to number of processes
return a.adapterBase.Begin(useConcurrency, cb)
// If config says not to launch multiple processes, downgrade incoming value
newCfg := &Manifest{concurrentTransfers: 1}
return a.adapterBase.Begin(newCfg, cb)
}
func (a *customAdapter) ClearTempStorage() error {
@ -268,18 +265,18 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall
}
var authCalled bool
rel, ok := t.Object.Rel(a.getOperationName())
if !ok {
return errors.New("Object not found on the server.")
rel, err := t.Actions.Get(a.getOperationName())
if err != nil {
return err
}
var req *customAdapterTransferRequest
if a.direction == Upload {
req = NewCustomAdapterUploadRequest(t.Object.Oid, t.Object.Size, t.Path, rel)
req = NewCustomAdapterUploadRequest(t.Oid, t.Size, t.Path, rel)
} else {
req = NewCustomAdapterDownloadRequest(t.Object.Oid, t.Object.Size, rel)
req = NewCustomAdapterDownloadRequest(t.Oid, t.Size, rel)
}
err := a.sendMessage(customCtx, req)
if err != nil {
if err = a.sendMessage(customCtx, req); err != nil {
return err
}
@ -294,24 +291,24 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall
switch resp.Event {
case "progress":
// Progress
if resp.Oid != t.Object.Oid {
return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Object.Oid)
if resp.Oid != t.Oid {
return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Oid)
}
if cb != nil {
cb(t.Name, t.Object.Size, resp.BytesSoFar, resp.BytesSinceLast)
cb(t.Name, t.Size, resp.BytesSoFar, resp.BytesSinceLast)
}
wasAuthOk = resp.BytesSoFar > 0
case "complete":
// Download/Upload complete
if resp.Oid != t.Object.Oid {
return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Object.Oid)
if resp.Oid != t.Oid {
return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Oid)
}
if resp.Error != nil {
return fmt.Errorf("Error transferring %q: %v", t.Object.Oid, resp.Error)
return fmt.Errorf("Error transferring %q: %v", t.Oid, resp.Error)
}
if a.direction == Download {
// So we don't have to blindly trust external providers, check SHA
if err = tools.VerifyFileHash(t.Object.Oid, resp.Path); err != nil {
if err = tools.VerifyFileHash(t.Oid, resp.Path); err != nil {
return fmt.Errorf("Downloaded file failed checks: %v", err)
}
// Move file to final location
@ -319,7 +316,7 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall
return fmt.Errorf("Failed to copy downloaded file: %v", err)
}
} else if a.direction == Upload {
if err = api.VerifyUpload(config.Config, t.Object); err != nil {
if err = api.VerifyUpload(config.Config, toApiObject(t)); err != nil {
return err
}
}

@ -2,7 +2,13 @@
// NOTE: Subject to change, do not rely on this package from outside git-lfs source
package tq
import "github.com/git-lfs/git-lfs/api"
import (
"fmt"
"time"
"github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/errors"
)
type Direction int
@ -11,6 +17,137 @@ const (
Download = Direction(iota)
)
type Transfer struct {
Name string `json:"name"`
Oid string `json:"oid,omitempty"`
Size int64 `json:"size"`
Authenticated bool `json:"authenticated,omitempty"`
Actions ActionSet `json:"actions,omitempty"`
Error *ObjectError `json:"error,omitempty"`
Path string `json:"path"`
}
type ObjectError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// newTransfer creates a new Transfer instance
func newTransfer(name string, obj *api.ObjectResource, path string) *Transfer {
t := &Transfer{
Name: name,
Oid: obj.Oid,
Size: obj.Size,
Authenticated: obj.Authenticated,
Actions: make(ActionSet),
Path: path,
}
if obj.Error != nil {
t.Error = &ObjectError{
Code: obj.Error.Code,
Message: obj.Error.Message,
}
}
for rel, action := range obj.Actions {
t.Actions[rel] = &Action{
Href: action.Href,
Header: action.Header,
ExpiresAt: action.ExpiresAt,
}
}
return t
}
type Action struct {
Href string `json:"href"`
Header map[string]string `json:"header,omitempty"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
}
type ActionSet map[string]*Action
const (
// objectExpirationToTransfer is the duration we expect to have passed
// from the time that the object's expires_at property is checked to
// when the transfer is executed.
objectExpirationToTransfer = 5 * time.Second
)
func (as ActionSet) Get(rel string) (*Action, error) {
a, ok := as[rel]
if !ok {
return nil, &ActionMissingError{Rel: rel}
}
if !a.ExpiresAt.IsZero() && a.ExpiresAt.Before(time.Now().Add(objectExpirationToTransfer)) {
return nil, errors.NewRetriableError(&ActionExpiredErr{Rel: rel, At: a.ExpiresAt})
}
return a, nil
}
type ActionExpiredErr struct {
Rel string
At time.Time
}
func (e ActionExpiredErr) Error() string {
return fmt.Sprintf("tq: action %q expires at %s",
e.Rel, e.At.In(time.Local).Format(time.RFC822))
}
type ActionMissingError struct {
Rel string
}
func (e ActionMissingError) Error() string {
return fmt.Sprintf("tq: unable to find action %q", e.Rel)
}
func IsActionExpiredError(err error) bool {
if _, ok := err.(*ActionExpiredErr); ok {
return true
}
return false
}
func IsActionMissingError(err error) bool {
if _, ok := err.(*ActionMissingError); ok {
return true
}
return false
}
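As a hedged illustration of the two failure modes of ActionSet.Get, a hypothetical in-package helper (hrefFor is not part of this commit):

package tq

import "fmt"

// hrefFor resolves the href for a relation, treating a missing action as a
// hard failure. An expired action comes back from Get already wrapped in a
// retriable error, so it is passed through for the queue to retry.
func hrefFor(t *Transfer, rel string) (string, error) {
	a, err := t.Actions.Get(rel)
	if err != nil {
		if IsActionMissingError(err) {
			return "", fmt.Errorf("tq: no %q action for %q", rel, t.Oid)
		}
		return "", err // possibly a wrapped ActionExpiredErr
	}
	return a.Href, nil
}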
func toApiObject(t *Transfer) *api.ObjectResource {
o := &api.ObjectResource{
Oid: t.Oid,
Size: t.Size,
Authenticated: t.Authenticated,
Actions: make(map[string]*api.LinkRelation),
}
for rel, a := range t.Actions {
o.Actions[rel] = &api.LinkRelation{
Href: a.Href,
Header: a.Header,
ExpiresAt: a.ExpiresAt,
}
}
if t.Error != nil {
o.Error = &api.ObjectError{
Code: t.Error.Code,
Message: t.Error.Message,
}
}
return o
}
// NewAdapterFunc creates new instances of Adapter. Code that wishes
// to provide new Adapter instances should pass an implementation of this
// function to RegisterNewTransferAdapterFunc() on a *Manifest.
@ -19,6 +156,10 @@ type NewAdapterFunc func(name string, dir Direction) Adapter
type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error
type AdapterConfig interface {
ConcurrentTransfers() int
}
// Adapter is implemented by types which can upload and/or download LFS
// file content to a remote store. Each Adapter accepts one or more requests
// which it may schedule and parallelise in whatever way it chooses, clients of
@ -43,7 +184,7 @@ type Adapter interface {
// one or more Add calls. cfg.ConcurrentTransfers() controls the number of
// transfers that may be done at once. The passed in callback will receive
// updates on progress, and may be nil if not required by the client.
Begin(maxConcurrency int, cb ProgressCallback) error
Begin(cfg AdapterConfig, cb ProgressCallback) error
// Add queues a download/upload, which will complete asynchronously and
// notify the callbacks given to Begin()
Add(transfers ...*Transfer) (results <-chan TransferResult)
@ -56,22 +197,6 @@ type Adapter interface {
ClearTempStorage() error
}
// General struct for both uploads and downloads
type Transfer struct {
// Name of the file that triggered this transfer
Name string
// Object from API which provides the core data for this transfer
Object *api.ObjectResource
// Path for uploads is the source of data to send, for downloads is the
// location to place the final result
Path string
}
// NewTransfer creates a new Transfer instance
func NewTransfer(name string, obj *api.ObjectResource, path string) *Transfer {
return &Transfer{name, obj, path}
}
// Result of a transfer returned through CompletionChannel()
type TransferResult struct {
Transfer *Transfer

@ -15,15 +15,6 @@ const (
defaultBatchSize = 100
)
type Transferable interface {
Oid() string
Size() int64
Name() string
Path() string
Object() *api.ObjectResource
SetObject(*api.ObjectResource)
}
type retryCounter struct {
MaxRetries int `git:"lfs.transfer.maxretries"`
@ -71,31 +62,32 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) {
return count, count < r.MaxRetries
}
// Batch implements the sort.Interface interface and enables sorting on a slice
// of `Transferable`s by object size.
// batch implements sort.Interface and enables sorting on a slice of
// `*objectTuple`s by object size.
//
// This interface is implemented here so that the largest objects can be
// processed first. Since a new batch cannot be added until the
// current batch has finished processing, this enables us to reduce the risk of
// a single worker getting tied up on a large item at the end of a batch while
// all other workers are sitting idle.
type Batch []Transferable
type batch []*objectTuple
func (b Batch) ApiObjects() []*api.ObjectResource {
func (b batch) ApiObjects() []*api.ObjectResource {
transfers := make([]*api.ObjectResource, 0, len(b))
for _, t := range b {
transfers = append(transfers, &api.ObjectResource{
Oid: t.Oid(),
Size: t.Size(),
})
transfers = append(transfers, tupleToApiObject(t))
}
return transfers
}
func (b Batch) Len() int { return len(b) }
func (b Batch) Less(i, j int) bool { return b[i].Size() < b[j].Size() }
func (b Batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func tupleToApiObject(t *objectTuple) *api.ObjectResource {
return &api.ObjectResource{Oid: t.Oid, Size: t.Size}
}
func (b batch) Len() int { return len(b) }
func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
// TransferQueue organises the wider process of uploading and downloading,
// including calling the API, passing the actual transfer request to transfer
@ -108,11 +100,11 @@ type TransferQueue struct {
dryRun bool
meter progress.Meter
errors []error
transferables map[string]Transferable
transfers map[string]*objectTuple
batchSize int
bufferDepth int
// Channel for processing (and buffering) incoming items
incoming chan Transferable
incoming chan *objectTuple
errorc chan error // Channel for processing errors
watchers []chan string
trMutex *sync.Mutex
@ -127,6 +119,11 @@ type TransferQueue struct {
rc *retryCounter
}
type objectTuple struct {
Name, Path, Oid string
Size int64
}
type Option func(*TransferQueue)
func DryRun(dryRun bool) Option {
@ -152,12 +149,12 @@ func WithBufferDepth(depth int) Option {
// NewTransferQueue builds a TransferQueue; its direction and underlying mechanism are determined by the adapter
func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue {
q := &TransferQueue{
direction: dir,
errorc: make(chan error),
transferables: make(map[string]Transferable),
trMutex: &sync.Mutex{},
manifest: manifest,
rc: newRetryCounter(),
direction: dir,
errorc: make(chan error),
transfers: make(map[string]*objectTuple),
trMutex: &sync.Mutex{},
manifest: manifest,
rc: newRetryCounter(),
}
for _, opt := range options {
@ -173,7 +170,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *Tra
q.bufferDepth = q.batchSize
}
q.incoming = make(chan Transferable, q.bufferDepth)
q.incoming = make(chan *objectTuple, q.bufferDepth)
if q.meter == nil {
q.meter = progress.Noop()
@ -186,28 +183,35 @@ func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *Tra
return q
}
// Add adds a Transferable to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the Transferable "t" is new.
func (q *TransferQueue) Add(t Transferable) {
// Add adds an object to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the object is new.
func (q *TransferQueue) Add(name, path, oid string, size int64) {
t := &objectTuple{
Name: name,
Path: path,
Oid: oid,
Size: size,
}
if isNew := q.remember(t); !isNew {
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid())
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
return
}
q.incoming <- t
}
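A brief, hypothetical sketch of driving the queue with the new Add signature (enqueueOne and its argument values are illustrative only):

package tq

// enqueueOne illustrates the new Add signature, which takes the raw object
// fields (name, path, oid, size) instead of a Transferable implementation.
func enqueueOne(m *Manifest) {
	q := NewTransferQueue(Upload, m)
	q.Add("file.dat", "/path/to/file.dat", "deadbeef", 123)
	q.Wait()
}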
// remember remembers the Transferable "t" if the *TransferQueue doesn't already
// know about a Transferable with the same OID.
// remember remembers the object tuple "t" if the *TransferQueue doesn't
// already know about a transfer with the same OID.
//
// It returns whether or not the value is new.
func (q *TransferQueue) remember(t Transferable) bool {
func (q *TransferQueue) remember(t *objectTuple) bool {
q.trMutex.Lock()
defer q.trMutex.Unlock()
if _, ok := q.transferables[t.Oid()]; !ok {
if _, ok := q.transfers[t.Oid]; !ok {
q.wait.Add(1)
q.transferables[t.Oid()] = t
q.transfers[t.Oid] = t
return true
}
@ -221,7 +225,7 @@ func (q *TransferQueue) remember(t Transferable) bool {
// 2. While the batch contains less items than `q.batchSize` AND the channel
// is open, read one item from the `q.incoming` channel.
// a. If the read was a channel close, go to step 4.
// b. If the read was an object tuple, go to step 3.
// b. If the read was a TransferTransferable item, go to step 3.
// 3. Append the item to the batch.
// 4. Sort the batch by descending object size, make a batch API call, send
// the items to the `*adapterBase`.
@ -276,7 +280,7 @@ func (q *TransferQueue) collectBatches() {
//
// enqueueAndCollectRetriesFor blocks until the entire batch "batch" has been
// processed.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error) {
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
cfg := config.Config
next := q.makeBatch()
@ -293,8 +297,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error)
// that was encountered. If any of the objects couldn't be
// retried, they will be marked as failed.
for _, t := range batch {
if q.canRetryObject(t.Oid(), err) {
q.rc.Increment(t.Oid())
if q.canRetryObject(t.Oid, err) {
q.rc.Increment(t.Oid)
next = append(next, t)
} else {
@ -319,47 +323,51 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error)
continue
}
if _, needsTransfer := o.Rel(q.transferKind()); needsTransfer {
// If the object has a link relation for the kind of
// transfer that we want to perform, grab a Transferable
// that matches the object's OID.
q.trMutex.Lock()
t, ok := q.transferables[o.Oid]
q.trMutex.Unlock()
q.trMutex.Lock()
t, ok := q.transfers[o.Oid]
q.trMutex.Unlock()
if !ok {
// If we couldn't find any associated
// Transfer object, then we give up on the
// transfer by telling the progress meter to
// skip the number of bytes in "o".
q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
if ok {
// If we knew about an associated Transferable,
// begin the transfer.
t.SetObject(o)
q.meter.StartTransfer(t.Name())
toTransfer = append(toTransfer, NewTransfer(
t.Name(), t.Object(), t.Path(),
))
} else {
// If we couldn't find any associated
// Transferable object, then we give up on the
// transfer by telling the progress meter to
// skip the number of bytes in "o".
q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
q.Skip(o.Size)
q.wait.Done()
}
} else {
// Otherwise, if the object didn't need a transfer at
// all, skip and decrement it.
q.Skip(o.Size)
q.wait.Done()
} else {
tr := newTransfer(t.Name, o, t.Path)
if _, err := tr.Actions.Get(q.transferKind()); err != nil {
// XXX(taylor): duplication
if q.canRetryObject(tr.Oid, err) {
q.rc.Increment(tr.Oid)
count := q.rc.CountFor(tr.Oid)
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, tr.Oid, tr.Size)
next = append(next, t)
} else {
if !IsActionMissingError(err) {
q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
}
q.Skip(o.Size)
q.wait.Done()
}
} else {
q.meter.StartTransfer(t.Name)
toTransfer = append(toTransfer, tr)
}
}
}
retries := q.addToAdapter(toTransfer)
for t := range retries {
q.rc.Increment(t.Oid())
count := q.rc.CountFor(t.Oid())
q.rc.Increment(t.Oid)
count := q.rc.CountFor(t.Oid)
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid(), t.Size())
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)
next = append(next, t)
}
@ -369,23 +377,23 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error)
// makeBatch returns a new, empty batch, with a capacity equal to the maximum
// batch size designated by the `*TransferQueue`.
func (q *TransferQueue) makeBatch() Batch { return make(Batch, 0, q.batchSize) }
func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
// addToAdapter adds the given "pending" transfers to the transfer adapters and
// returns a channel of Transferables that are to be retried in the next batch.
// returns a channel of object tuples that are to be retried in the next batch.
// After all of the items in the batch have been processed, the channel is
// closed.
//
// addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan Transferable {
retries := make(chan Transferable, len(pending))
func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple {
retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(); err != nil {
close(retries)
q.errorc <- err
for _, t := range pending {
q.Skip(t.Object.Size)
q.Skip(t.Size)
q.wait.Done()
}
@ -426,9 +434,9 @@ func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult
// handleTransferResult observes the transfer result, sending it on the retries
// channel if it was able to be retried.
func (q *TransferQueue) handleTransferResult(
res TransferResult, retries chan<- Transferable,
res TransferResult, retries chan<- *objectTuple,
) {
oid := res.Transfer.Object.Oid
oid := res.Transfer.Oid
if res.Error != nil {
// If there was an error encountered when processing the
@ -441,7 +449,7 @@ func (q *TransferQueue) handleTransferResult(
tracerx.Printf("tq: retrying object %s", oid)
q.trMutex.Lock()
t, ok := q.transferables[oid]
t, ok := q.transfers[oid]
q.trMutex.Unlock()
if ok {
@ -522,7 +530,7 @@ func (q *TransferQueue) ensureAdapterBegun() error {
}
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
err := q.adapter.Begin(q.manifest.ConcurrentTransfers(), cb)
err := q.adapter.Begin(q.manifest, cb)
if err != nil {
return err
}
@ -532,7 +540,7 @@ func (q *TransferQueue) ensureAdapterBegun() error {
}
// Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transferables to the queue. Any failed
// called, Add will no longer add transfers to the queue. Any failed
// transfers will be automatically retried once.
func (q *TransferQueue) Wait() {
close(q.incoming)

@ -21,7 +21,7 @@ func (a *testAdapter) Direction() Direction {
return a.dir
}
func (a *testAdapter) Begin(maxConcurrency int, cb ProgressCallback) error {
func (a *testAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error {
return nil
}

@ -37,9 +37,10 @@ func (a *tusUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
rel, ok := t.Object.Rel("upload")
if !ok {
return fmt.Errorf("No upload action for this object.")
rel, err := t.Actions.Get("upload")
if err != nil {
return err
}
// Note not supporting the Creation extension since the batch API generates URLs
@ -47,7 +48,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
// 1. Send HEAD request to determine upload start point
// Request must include Tus-Resumable header (version)
tracerx.Printf("xfer: sending tus.io HEAD request for %q", t.Object.Oid)
tracerx.Printf("xfer: sending tus.io HEAD request for %q", t.Oid)
req, err := httputil.NewHttpRequest("HEAD", rel.Href, rel.Header)
if err != nil {
return err
@ -69,9 +70,9 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
}
// Upload-Offset=size means already completed (skip)
// Batch API will probably already detect this, but handle just in case
if offset >= t.Object.Size {
tracerx.Printf("xfer: tus.io HEAD offset %d indicates %q is already fully uploaded, skipping", offset, t.Object.Oid)
advanceCallbackProgress(cb, t, t.Object.Size)
if offset >= t.Size {
tracerx.Printf("xfer: tus.io HEAD offset %d indicates %q is already fully uploaded, skipping", offset, t.Oid)
advanceCallbackProgress(cb, t, t.Size)
return nil
}
@ -84,9 +85,9 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
// Upload-Offset=0 means start from scratch, but still send PATCH
if offset == 0 {
tracerx.Printf("xfer: tus.io uploading %q from start", t.Object.Oid)
tracerx.Printf("xfer: tus.io uploading %q from start", t.Oid)
} else {
tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Object.Oid, offset)
tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Oid, offset)
advanceCallbackProgress(cb, t, offset)
_, err := f.Seek(offset, os.SEEK_CUR)
if err != nil {
@ -99,7 +100,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
// Response Upload-Offset must be request Upload-Offset plus sent bytes
// Response may include Upload-Expires header in which case check not passed
tracerx.Printf("xfer: sending tus.io PATCH request for %q", t.Object.Oid)
tracerx.Printf("xfer: sending tus.io PATCH request for %q", t.Oid)
req, err = httputil.NewHttpRequest("PATCH", rel.Href, rel.Header)
if err != nil {
return err
@ -107,8 +108,8 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
req.Header.Set("Tus-Resumable", TusVersion)
req.Header.Set("Upload-Offset", strconv.FormatInt(offset, 10))
req.Header.Set("Content-Type", "application/offset+octet-stream")
req.Header.Set("Content-Length", strconv.FormatInt(t.Object.Size-offset, 10))
req.ContentLength = t.Object.Size - offset
req.Header.Set("Content-Length", strconv.FormatInt(t.Size-offset, 10))
req.ContentLength = t.Size - offset
// Ensure progress callbacks made while uploading
// Wrap callback to give name context
@ -121,7 +122,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
var reader io.Reader
reader = &progress.CallbackReader{
C: ccb,
TotalSize: t.Object.Size,
TotalSize: t.Size,
Reader: f,
}
@ -154,7 +155,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
return api.VerifyUpload(config.Config, t.Object)
return api.VerifyUpload(config.Config, toApiObject(t))
}
func configureTusAdapter(m *Manifest) {

44
tq/verify.go Normal file

@ -0,0 +1,44 @@
package tq
import (
"bytes"
"encoding/json"
"net/http"
"github.com/git-lfs/git-lfs/lfsapi"
)
func verifyUpload(c *lfsapi.Client, t *Transfer) error {
action, err := t.Actions.Get("verify")
if err != nil {
if IsActionMissingError(err) {
return nil
}
return err
}
by, err := json.Marshal(struct {
Oid string `json:"oid"`
Size int64 `json:"size"`
}{Oid: t.Oid, Size: t.Size})
if err != nil {
return err
}
req, err := http.NewRequest("POST", action.Href, bytes.NewReader(by))
if err != nil {
return err
}
for key, value := range action.Header {
req.Header.Set(key, value)
}
req.Header.Set("Content-Type", "application/vnd.git-lfs+json")
res, err := c.Do(req)
if err != nil {
return err
}
return res.Body.Close()
}

274
tq/verify_test.go Normal file

@ -0,0 +1,274 @@
package tq
import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/stretchr/testify/assert"
)
func TestVerifyWithoutAction(t *testing.T) {
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
}
assert.Nil(t, verifyUpload(c, tr))
}
func TestVerifySuccess(t *testing.T) {
var called uint32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
var tr Transfer
err := json.NewDecoder(r.Body).Decode(&tr)
assert.Nil(t, err)
assert.Equal(t, "abc", tr.Oid)
assert.EqualValues(t, 123, tr.Size)
assert.Equal(t, "bar", r.Header.Get("Foo"))
assert.Equal(t, "application/vnd.git-lfs+json", r.Header.Get("Content-Type"))
assert.Equal(t, "24", r.Header.Get("Content-Length"))
}))
defer srv.Close()
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{
Href: srv.URL + "/verify",
Header: map[string]string{
"foo": "bar",
},
},
},
}
assert.Nil(t, verifyUpload(c, tr))
assert.EqualValues(t, 1, called)
}
func TestVerifyAuthErrWithBody(t *testing.T) {
var called uint32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(401)
w.Write([]byte(`{"message":"custom auth error"}`))
}))
defer srv.Close()
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{
Href: srv.URL + "/verify",
},
},
}
err := verifyUpload(c, tr)
assert.NotNil(t, err)
assert.True(t, errors.IsAuthError(err))
assert.Equal(t, "Authentication required: http: custom auth error", err.Error())
assert.EqualValues(t, 1, called)
}
func TestVerifyFatalWithBody(t *testing.T) {
var called uint32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(500)
w.Write([]byte(`{"message":"custom fatal error"}`))
}))
defer srv.Close()
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{
Href: srv.URL + "/verify",
},
},
}
err := verifyUpload(c, tr)
assert.NotNil(t, err)
assert.True(t, errors.IsFatalError(err))
assert.Equal(t, "Fatal error: http: custom fatal error", err.Error())
assert.EqualValues(t, 1, called)
}
func TestVerifyWithNonFatal500WithBody(t *testing.T) {
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{},
},
}
var called uint32
nonFatalCodes := map[int]string{
501: "custom 501 error",
507: "custom 507 error",
509: "custom 509 error",
}
for nonFatalCode, expectedErr := range nonFatalCodes {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(nonFatalCode)
w.Write([]byte(`{"message":"` + expectedErr + `"}`))
}))
tr.Actions["verify"].Href = srv.URL + "/verify"
err := verifyUpload(c, tr)
t.Logf("non fatal code %d", nonFatalCode)
assert.NotNil(t, err)
assert.Equal(t, "http: "+expectedErr, err.Error())
srv.Close()
}
assert.EqualValues(t, 3, called)
}
func TestVerifyAuthErrWithoutBody(t *testing.T) {
var called uint32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.WriteHeader(401)
}))
defer srv.Close()
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{
Href: srv.URL + "/verify",
},
},
}
err := verifyUpload(c, tr)
assert.NotNil(t, err)
assert.True(t, errors.IsAuthError(err))
assert.True(t, strings.HasPrefix(err.Error(), "Authentication required: Authorization error:"), err.Error())
assert.EqualValues(t, 1, called)
}
func TestVerifyFatalWithoutBody(t *testing.T) {
var called uint32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.WriteHeader(500)
}))
defer srv.Close()
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{
Href: srv.URL + "/verify",
},
},
}
err := verifyUpload(c, tr)
assert.NotNil(t, err)
assert.True(t, errors.IsFatalError(err))
assert.True(t, strings.HasPrefix(err.Error(), "Fatal error: Server error:"), err.Error())
assert.EqualValues(t, 1, called)
}
func TestVerifyWithNonFatal500WithoutBody(t *testing.T) {
c := &lfsapi.Client{}
tr := &Transfer{
Oid: "abc",
Size: 123,
Actions: map[string]*Action{
"verify": &Action{},
},
}
var called uint32
nonFatalCodes := map[int]string{
501: "Not Implemented:",
507: "Insufficient server storage:",
509: "Bandwidth limit exceeded:",
}
for nonFatalCode, errPrefix := range nonFatalCodes {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.String() != "/verify" {
w.WriteHeader(http.StatusNotFound)
return
}
atomic.AddUint32(&called, 1)
w.WriteHeader(nonFatalCode)
}))
tr.Actions["verify"].Href = srv.URL + "/verify"
err := verifyUpload(c, tr)
t.Logf("non fatal code %d", nonFatalCode)
assert.NotNil(t, err)
assert.True(t, strings.HasPrefix(err.Error(), errPrefix))
srv.Close()
}
assert.EqualValues(t, 3, called)
}
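
These tests lean on lfsapi.Client translating HTTP status codes into error classes: 401 surfaces as an auth error, 500 as a fatal error, while 501, 507, and 509 remain non-fatal. A standalone sketch of that mapping, with a made-up function name (the client's internals are not shown in this diff):

package main

import "fmt"

// classifyStatus mirrors the behavior the tests above expect; the name and
// return values are illustrative assumptions, not the lfsapi API.
func classifyStatus(status int) string {
	switch status {
	case 401:
		return "auth" // surfaces as errors.IsAuthError
	case 500:
		return "fatal" // surfaces as errors.IsFatalError
	case 501, 507, 509:
		return "non-fatal" // server conditions the tests treat as recoverable
	default:
		return "ok"
	}
}

func main() {
	for _, code := range []int{401, 500, 501, 507, 509} {
		fmt.Println(code, classifyStatus(code))
	}
}
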

@ -1,17 +0,0 @@
#!/bin/bash
VERSION_STRING=$1
VERSION_ARRAY=( ${VERSION_STRING//./ } )
VERSION_MAJOR=${VERSION_ARRAY[0]}
VERSION_MINOR=${VERSION_ARRAY[1]}
VERSION_PATCH=${VERSION_ARRAY[2]}
# Update the version number git-lfs is reporting.
sed -i "s,\(Version = \"\).*\(\"\),\1$VERSION_STRING\2," config/version.go
# Update the version number in the RPM package.
sed -i "s,\(Version:[[:space:]]*\).*,\1$VERSION_STRING," rpm/SPECS/git-lfs.spec
# Update the version numbers in the Windows installer.
sed -i "s,\(FILEVERSION \).*,\1$VERSION_MAJOR\,$VERSION_MINOR\,$VERSION_PATCH\,0," script/windows-installer/resources.rc
sed -i "s,\([[:space:]]*VALUE \"ProductVersion\"\, \"\).*\(\\\\0\"\),\1$VERSION_STRING\2," script/windows-installer/resources.rc

18
versioninfo.json Normal file

@ -0,0 +1,18 @@
{
"FixedFileInfo":
{
"FileVersion": {
"Major": 1,
"Minor": 5,
"Patch": 0,
"Build": 0
}
},
"StringFileInfo":
{
"FileDescription": "Git LFS",
"LegalCopyright": "GitHub, Inc. and Git LFS contributors",
"ProductName": "Git Large File Storage (LFS)",
"ProductVersion": "1.5.0"
}
}
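
versioninfo.json matches the JSON input format of the goversioninfo tool (github.com/josephspurrier/goversioninfo). Assuming that tool is what consumes this file — the diff itself does not name the generator — a go:generate directive like the following would turn it into a resource.syso that the Go linker embeds in Windows builds:

// Illustrative only: generate the Windows version resource from
// versioninfo.json, assuming the goversioninfo tool is installed.
//go:generate goversioninfo versioninfo.json
package main
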