diff --git a/.gitignore b/.gitignore index c5d11ab7..b215777e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ bin/ benchmark/ out/ -resources.syso +resource.syso # only allow man/*.\d.ronn files man/* diff --git a/api/http_lifecycle.go b/api/http_lifecycle.go index 9a6eb160..63bf9cee 100644 --- a/api/http_lifecycle.go +++ b/api/http_lifecycle.go @@ -22,28 +22,24 @@ var ( ErrNoOperationGiven = errors.New("lfs/api: no operation provided in schema") ) -// EndpointSource is an interface which encapsulates the behavior of returning -// `config.Endpoint`s based on a particular operation. -type EndpointSource interface { - // Endpoint returns the `config.Endpoint` assosciated with a given - // operation. - Endpoint(operation string) endpoint.Endpoint -} - // HttpLifecycle serves as the default implementation of the Lifecycle interface // for HTTP requests. Internally, it leverages the *http.Client type to execute // HTTP requests against a root *url.URL, as given in `NewHttpLifecycle`. type HttpLifecycle struct { - endpoints EndpointSource + cfg *config.Configuration } var _ Lifecycle = new(HttpLifecycle) // NewHttpLifecycle initializes a new instance of the *HttpLifecycle type with a // new *http.Client, and the given root (see above). -func NewHttpLifecycle(endpoints EndpointSource) *HttpLifecycle { +// Passing a nil Configuration will use the global config +func NewHttpLifecycle(cfg *config.Configuration) *HttpLifecycle { + if cfg == nil { + cfg = config.Config + } return &HttpLifecycle{ - endpoints: endpoints, + cfg: cfg, } } @@ -79,7 +75,7 @@ func (l *HttpLifecycle) Build(schema *RequestSchema) (*http.Request, error) { return nil, err } - if _, err = auth.GetCreds(config.Config, req); err != nil { + if _, err = auth.GetCreds(l.cfg, req); err != nil { return nil, err } @@ -103,7 +99,7 @@ func (l *HttpLifecycle) Build(schema *RequestSchema) (*http.Request, error) { // Otherwise, the api.Response is returned, along with no error, signaling that // the request completed successfully. func (l *HttpLifecycle) Execute(req *http.Request, into interface{}) (Response, error) { - resp, err := httputil.DoHttpRequestWithRedirects(config.Config, req, []*http.Request{}, true) + resp, err := httputil.DoHttpRequestWithRedirects(l.cfg, req, []*http.Request{}, true) if err != nil { return nil, err } @@ -137,7 +133,7 @@ func (l *HttpLifecycle) absolutePath(operation Operation, path string) (*url.URL return nil, ErrNoOperationGiven } - root, err := url.Parse(l.endpoints.Endpoint(string(operation)).Url) + root, err := url.Parse(l.cfg.Endpoint(string(operation)).Url) if err != nil { return nil, err } diff --git a/api/http_lifecycle_test.go b/api/http_lifecycle_test.go index 40e3fd91..f5c1eda7 100644 --- a/api/http_lifecycle_test.go +++ b/api/http_lifecycle_test.go @@ -11,23 +11,17 @@ import ( "github.com/stretchr/testify/assert" ) -type NopEndpointSource struct { - Root string +func NewTestConfig() *config.Configuration { + c := config.NewFrom(config.Values{}) + c.SetManualEndpoint(config.Endpoint{Url: "https://example.com"}) + return c } -func (e *NopEndpointSource) Endpoint(op string) endpoint.Endpoint { - return endpoint.Endpoint{Url: e.Root} -} - -var ( - source = &NopEndpointSource{"https://example.com"} -) - func TestHttpLifecycleMakesRequestsAgainstAbsolutePath(t *testing.T) { SetupTestCredentialsFunc() defer RestoreCredentialsFunc() - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) req, err := l.Build(&api.RequestSchema{ Path: "/foo", Operation: api.DownloadOperation, @@ -41,7 +35,7 @@ func TestHttpLifecycleAttachesQueryParameters(t *testing.T) { SetupTestCredentialsFunc() defer RestoreCredentialsFunc() - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) req, err := l.Build(&api.RequestSchema{ Path: "/foo", Operation: api.DownloadOperation, @@ -58,7 +52,7 @@ func TestHttpLifecycleAttachesBodyWhenPresent(t *testing.T) { SetupTestCredentialsFunc() defer RestoreCredentialsFunc() - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) req, err := l.Build(&api.RequestSchema{ Operation: api.DownloadOperation, Body: struct { @@ -77,7 +71,7 @@ func TestHttpLifecycleDoesNotAttachBodyWhenEmpty(t *testing.T) { SetupTestCredentialsFunc() defer RestoreCredentialsFunc() - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) req, err := l.Build(&api.RequestSchema{ Operation: api.DownloadOperation, }) @@ -90,7 +84,7 @@ func TestHttpLifecycleErrsWithoutOperation(t *testing.T) { SetupTestCredentialsFunc() defer RestoreCredentialsFunc() - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) req, err := l.Build(&api.RequestSchema{ Path: "/foo", }) @@ -113,7 +107,7 @@ func TestHttpLifecycleExecutesRequestWithoutBody(t *testing.T) { req, _ := http.NewRequest("GET", server.URL+"/path", nil) - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) _, err := l.Execute(req, nil) assert.True(t, called) @@ -138,7 +132,7 @@ func TestHttpLifecycleExecutesRequestWithBody(t *testing.T) { req, _ := http.NewRequest("GET", server.URL+"/path", nil) - l := api.NewHttpLifecycle(source) + l := api.NewHttpLifecycle(NewTestConfig()) resp := new(Response) _, err := l.Execute(req, resp) diff --git a/api/lock_api.go b/api/lock_api.go index 7dd2df55..345f63f2 100644 --- a/api/lock_api.go +++ b/api/lock_api.go @@ -4,8 +4,6 @@ import ( "fmt" "strconv" "time" - - "github.com/git-lfs/git-lfs/config" ) // LockService is an API service which encapsulates the Git LFS Locking API. @@ -144,15 +142,8 @@ type Committer struct { Email string `json:"email"` } -// CurrentCommitter returns a Committer instance populated with the same -// credentials as would be used to author a commit. In particular, the -// "user.name" and "user.email" configuration values are used from the -// config.Config singleton. -func CurrentCommitter() Committer { - name, _ := config.Config.Git.Get("user.name") - email, _ := config.Config.Git.Get("user.email") - - return Committer{name, email} +func NewCommitter(name, email string) Committer { + return Committer{Name: name, Email: email} } // LockRequest encapsulates the payload sent across the API when a client would diff --git a/appveyor.yml b/appveyor.yml index 06cfabea..e07bcf45 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,7 +7,7 @@ environment: clone_folder: $(GOPATH)\src\github.com\git-lfs\git-lfs install: - - echo $(GOPATH) + - echo %GOPATH% - rd C:\Go /s /q - appveyor DownloadFile https://storage.googleapis.com/golang/go1.7.4.windows-amd64.zip - 7z x go1.7.4.windows-amd64.zip -oC:\ >nul @@ -16,7 +16,6 @@ install: - set PATH="C:\Program Files (x86)\Inno Setup 5";%PATH% build_script: - - C:\Ruby23\DevKit\mingw\bin\windres.exe script\windows-installer\resources.rc -o resources.syso - bash --login -c 'GOARCH=386 script/bootstrap' - mv bin\git-lfs.exe git-lfs-x86.exe - bash --login -c 'GOARCH=amd64 script/bootstrap' diff --git a/commands/command_checkout.go b/commands/command_checkout.go index 6d21cae8..9b9a31bb 100644 --- a/commands/command_checkout.go +++ b/commands/command_checkout.go @@ -1,14 +1,6 @@ package commands import ( - "bytes" - "fmt" - "io" - "os" - "os/exec" - "sync" - - "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/filepathfilter" "github.com/git-lfs/git-lfs/git" "github.com/git-lfs/git-lfs/lfs" @@ -18,225 +10,58 @@ import ( func checkoutCommand(cmd *cobra.Command, args []string) { requireInRepo() - - // Parameters are filters - // firstly convert any pathspecs to the root of the repo, in case this is being executed in a sub-folder - var rootedpaths []string - - inchan := make(chan string, 1) - outchan, err := lfs.ConvertCwdFilesRelativeToRepo(inchan) - if err != nil { - Panic(err, "Could not checkout") - } - for _, arg := range args { - inchan <- arg - rootedpaths = append(rootedpaths, <-outchan) - } - close(inchan) - - filter := filepathfilter.New(rootedpaths, nil) - checkoutWithIncludeExclude(filter) -} - -func checkoutFromFetchChan(in chan *lfs.WrappedPointer, filter *filepathfilter.Filter) { ref, err := git.CurrentRef() if err != nil { Panic(err, "Could not checkout") } - // Need to ScanTree to identify multiple files with the same content (fetch will only report oids once) - // use new gitscanner so mapping has all the scanned pointers before continuing - mapping := make(map[string][]*lfs.WrappedPointer) - chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { - if err != nil { - Panic(err, "Could not scan for Git LFS files") - return - } - mapping[p.Oid] = append(mapping[p.Oid], p) - }) - chgitscanner.Filter = filter - - if err := chgitscanner.ScanTree(ref.Sha, nil); err != nil { - ExitWithError(err) - } - - chgitscanner.Close() - - // Launch git update-index - c := make(chan *lfs.WrappedPointer) - - var wait sync.WaitGroup - wait.Add(1) - - go func() { - checkoutWithChan(c) - wait.Done() - }() - - // Feed it from in, which comes from fetch - for p := range in { - // Add all of the files for this oid - for _, fp := range mapping[p.Oid] { - c <- fp - } - } - close(c) - wait.Wait() -} - -func checkoutWithIncludeExclude(filter *filepathfilter.Filter) { - ref, err := git.CurrentRef() - if err != nil { - Panic(err, "Could not checkout") - } - - // this func has to load all pointers into memory - var pointers []*lfs.WrappedPointer - var multiErr error - chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { - if err != nil { - if multiErr != nil { - multiErr = fmt.Errorf("%v\n%v", multiErr, err) - } else { - multiErr = err - } - return - } - - pointers = append(pointers, p) - }) - - chgitscanner.Filter = filter - - if err := chgitscanner.ScanTree(ref.Sha, nil); err != nil { - ExitWithError(err) - } - chgitscanner.Close() - - if multiErr != nil { - Panic(multiErr, "Could not scan for Git LFS files") - } - - var wait sync.WaitGroup - wait.Add(1) - - c := make(chan *lfs.WrappedPointer, 1) - - go func() { - checkoutWithChan(c) - wait.Done() - }() - - meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) - meter.Start() var totalBytes int64 - for _, pointer := range pointers { - totalBytes += pointer.Size - meter.Add(totalBytes) - meter.StartTransfer(pointer.Name) - c <- pointer + meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) + singleCheckout := newSingleCheckout() + chgitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { + if err != nil { + LoggedError(err, "Scanner error") + return + } + + totalBytes += p.Size + meter.Add(p.Size) + meter.StartTransfer(p.Name) + + singleCheckout.Run(p) + // not strictly correct (parallel) but we don't have a callback & it's just local // plus only 1 slot in channel so it'll block & be close - meter.TransferBytes("checkout", pointer.Name, pointer.Size, totalBytes, int(pointer.Size)) - meter.FinishTransfer(pointer.Name) + meter.TransferBytes("checkout", p.Name, p.Size, totalBytes, int(p.Size)) + meter.FinishTransfer(p.Name) + }) + + chgitscanner.Filter = filepathfilter.New(rootedPaths(args), nil) + + if err := chgitscanner.ScanTree(ref.Sha); err != nil { + ExitWithError(err) } - close(c) - wait.Wait() + + meter.Start() + chgitscanner.Close() meter.Finish() + singleCheckout.Close() } -// Populate the working copy with the real content of objects where the file is -// either missing, or contains a matching pointer placeholder, from a list of pointers. -// If the file exists but has other content it is left alone -// Callers of this function MUST NOT Panic or otherwise exit the process -// without waiting for this function to shut down. If the process exits while -// update-index is in the middle of processing a file the git index can be left -// in a locked state. -func checkoutWithChan(in <-chan *lfs.WrappedPointer) { - // Get a converter from repo-relative to cwd-relative - // Since writing data & calling git update-index must be relative to cwd - repopathchan := make(chan string, 1) - cwdpathchan, err := lfs.ConvertRepoFilesRelativeToCwd(repopathchan) +// Parameters are filters +// firstly convert any pathspecs to the root of the repo, in case this is being +// executed in a sub-folder +func rootedPaths(args []string) []string { + pathConverter, err := lfs.NewCurrentToRepoPathConverter() if err != nil { - Panic(err, "Could not convert file paths") + Panic(err, "Could not checkout") } - // Don't fire up the update-index command until we have at least one file to - // give it. Otherwise git interprets the lack of arguments to mean param-less update-index - // which can trigger entire working copy to be re-examined, which triggers clean filters - // and which has unexpected side effects (e.g. downloading filtered-out files) - var cmd *exec.Cmd - var updateIdxStdin io.WriteCloser - var updateIdxOut bytes.Buffer - - // From this point on, git update-index is running. Code in this loop MUST - // NOT Panic() or otherwise cause the process to exit. If the process exits - // while update-index is in the middle of updating, the index can remain in a - // locked state. - - // As files come in, write them to the wd and update the index - - manifest := TransferManifest() - - for pointer := range in { - - // Check the content - either missing or still this pointer (not exist is ok) - filepointer, err := lfs.DecodePointerFromFile(pointer.Name) - if err != nil && !os.IsNotExist(err) { - if errors.IsNotAPointerError(err) { - // File has non-pointer content, leave it alone - continue - } - LoggedError(err, "Problem accessing %v", pointer.Name) - continue - } - - if filepointer != nil && filepointer.Oid != pointer.Oid { - // User has probably manually reset a file to another commit - // while leaving it a pointer; don't mess with this - continue - } - - repopathchan <- pointer.Name - cwdfilepath := <-cwdpathchan - - err = lfs.PointerSmudgeToFile(cwdfilepath, pointer.Pointer, false, manifest, nil) - if err != nil { - if errors.IsDownloadDeclinedError(err) { - // acceptable error, data not local (fetch not run or include/exclude) - LoggedError(err, "Skipped checkout for %v, content not local. Use fetch to download.", pointer.Name) - } else { - LoggedError(err, "Could not checkout file") - continue - } - } - - if cmd == nil { - // Fire up the update-index command - cmd = exec.Command("git", "update-index", "-q", "--refresh", "--stdin") - cmd.Stdout = &updateIdxOut - cmd.Stderr = &updateIdxOut - updateIdxStdin, err = cmd.StdinPipe() - if err != nil { - Panic(err, "Could not update the index") - } - - if err := cmd.Start(); err != nil { - Panic(err, "Could not update the index") - } - - } - - updateIdxStdin.Write([]byte(cwdfilepath + "\n")) - } - close(repopathchan) - - if cmd != nil && updateIdxStdin != nil { - updateIdxStdin.Close() - if err := cmd.Wait(); err != nil { - LoggedError(err, "Error updating the git index:\n%s", updateIdxOut.String()) - } + rootedpaths := make([]string, 0, len(args)) + for _, arg := range args { + rootedpaths = append(rootedpaths, pathConverter.Convert(arg)) } + return rootedpaths } func init() { diff --git a/commands/command_fetch.go b/commands/command_fetch.go index 7b346a3f..05404fba 100644 --- a/commands/command_fetch.go +++ b/commands/command_fetch.go @@ -125,7 +125,7 @@ func pointersToFetchForRef(ref string, filter *filepathfilter.Filter) ([]*lfs.Wr tempgitscanner.Filter = filter - if err := tempgitscanner.ScanTree(ref, nil); err != nil { + if err := tempgitscanner.ScanTree(ref); err != nil { return nil, err } @@ -133,18 +133,6 @@ func pointersToFetchForRef(ref string, filter *filepathfilter.Filter) ([]*lfs.Wr return pointers, multiErr } -func fetchRefToChan(ref string, filter *filepathfilter.Filter) chan *lfs.WrappedPointer { - c := make(chan *lfs.WrappedPointer) - pointers, err := pointersToFetchForRef(ref, filter) - if err != nil { - Panic(err, "Could not scan for Git LFS files") - } - - go fetchAndReportToChan(pointers, filter, c) - - return c -} - // Fetch all binaries for a given ref (that we don't have already) func fetchRef(ref string, filter *filepathfilter.Filter) bool { pointers, err := pointersToFetchForRef(ref, filter) @@ -326,7 +314,8 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil for _, p := range pointers { tracerx.Printf("fetch %v [%v]", p.Name, p.Oid) - q.Add(lfs.NewDownloadable(p)) + + q.Add(downloadTransfer(p)) } processQueue := time.Now() diff --git a/commands/command_lock.go b/commands/command_lock.go index c5c5c7c9..8ccf63e5 100644 --- a/commands/command_lock.go +++ b/commands/command_lock.go @@ -6,60 +6,43 @@ import ( "path/filepath" "strings" - "github.com/git-lfs/git-lfs/api" - "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/git" + "github.com/git-lfs/git-lfs/locking" "github.com/spf13/cobra" ) var ( lockRemote string lockRemoteHelp = "specify which remote to use when interacting with locks" - - // TODO(taylor): consider making this (and the above flag) a property of - // some parent-command, or another similarly less ugly way of handling - // this - setLockRemoteFor = func(c *config.Configuration) { - c.CurrentRemote = lockRemote - } ) func lockCommand(cmd *cobra.Command, args []string) { - setLockRemoteFor(cfg) if len(args) == 0 { Print("Usage: git lfs lock ") return } - latest, err := git.CurrentRemoteRef() - if err != nil { - Error(err.Error()) - Exit("Unable to determine lastest remote ref for branch.") - } - path, err := lockPath(args[0]) if err != nil { Exit(err.Error()) } - s, resp := API.Locks.Lock(&api.LockRequest{ - Path: path, - Committer: api.CurrentCommitter(), - LatestRemoteCommit: latest.Sha, - }) - - if _, err := API.Do(s); err != nil { - Error(err.Error()) - Exit("Error communicating with LFS API.") + if len(lockRemote) > 0 { + cfg.CurrentRemote = lockRemote } - if len(resp.Err) > 0 { - Error(resp.Err) - Exit("Server unable to create lock.") + lockClient, err := locking.NewClient(cfg) + if err != nil { + Exit("Unable to create lock system: %v", err.Error()) + } + defer lockClient.Close() + lock, err := lockClient.LockFile(path) + if err != nil { + Exit("Lock failed: %v", err) } - Print("\n'%s' was locked (%s)", args[0], resp.Lock.Id) + Print("\n'%s' was locked (%s)", args[0], lock.Id) } // lockPaths relativizes the given filepath such that it is relative to the root diff --git a/commands/command_locks.go b/commands/command_locks.go index 10771ca0..13fc6965 100644 --- a/commands/command_locks.go +++ b/commands/command_locks.go @@ -1,7 +1,7 @@ package commands import ( - "github.com/git-lfs/git-lfs/api" + "github.com/git-lfs/git-lfs/locking" "github.com/spf13/cobra" ) @@ -10,45 +10,33 @@ var ( ) func locksCommand(cmd *cobra.Command, args []string) { - setLockRemoteFor(cfg) filters, err := locksCmdFlags.Filters() if err != nil { - Error(err.Error()) + Exit("Error building filters: %v", err) } - var locks []api.Lock - - query := &api.LockSearchRequest{Filters: filters} - for { - s, resp := API.Locks.Search(query) - if _, err := API.Do(s); err != nil { - Error(err.Error()) - Exit("Error communicating with LFS API.") - } - - if resp.Err != "" { - Error(resp.Err) - } - - locks = append(locks, resp.Locks...) - - if locksCmdFlags.Limit > 0 && len(locks) > locksCmdFlags.Limit { - locks = locks[:locksCmdFlags.Limit] - break - } - - if resp.NextCursor != "" { - query.Cursor = resp.NextCursor - } else { - break - } + if len(lockRemote) > 0 { + cfg.CurrentRemote = lockRemote } - - Print("\n%d lock(s) matched query:", len(locks)) + lockClient, err := locking.NewClient(cfg) + if err != nil { + Exit("Unable to create lock system: %v", err.Error()) + } + defer lockClient.Close() + var lockCount int + locks, err := lockClient.SearchLocks(filters, locksCmdFlags.Limit, locksCmdFlags.Local) + // Print any we got before exiting for _, lock := range locks { - Print("%s\t%s <%s>", lock.Path, lock.Committer.Name, lock.Committer.Email) + Print("%s\t%s <%s>", lock.Path, lock.Name, lock.Email) + lockCount++ } + + if err != nil { + Exit("Error while retrieving locks: %v", err) + } + + Print("\n%d lock(s) matched query.", lockCount) } // locksFlags wraps up and holds all of the flags that can be given to the @@ -63,13 +51,14 @@ type locksFlags struct { // limit is an optional request parameter sent to the server used to // limit the Limit int + // local limits the scope of lock reporting to the locally cached record + // of locks for the current user & doesn't query the server + Local bool } -// Filters produces a slice of api.Filter instances based on the internal state -// of this locksFlags instance. The return value of this method is capable (and -// recommend to be used with) the api.LockSearchRequest type. -func (l *locksFlags) Filters() ([]api.Filter, error) { - filters := make([]api.Filter, 0) +// Filters produces a filter based on locksFlags instance. +func (l *locksFlags) Filters() (map[string]string, error) { + filters := make(map[string]string) if l.Path != "" { path, err := lockPath(l.Path) @@ -77,10 +66,10 @@ func (l *locksFlags) Filters() ([]api.Filter, error) { return nil, err } - filters = append(filters, api.Filter{"path", path}) + filters["path"] = path } if l.Id != "" { - filters = append(filters, api.Filter{"id", l.Id}) + filters["id"] = l.Id } return filters, nil @@ -96,5 +85,6 @@ func init() { cmd.Flags().StringVarP(&locksCmdFlags.Path, "path", "p", "", "filter locks results matching a particular path") cmd.Flags().StringVarP(&locksCmdFlags.Id, "id", "i", "", "filter locks results matching a particular ID") cmd.Flags().IntVarP(&locksCmdFlags.Limit, "limit", "l", 0, "optional limit for number of results to return") + cmd.Flags().BoolVarP(&locksCmdFlags.Local, "local", "", false, "only list cached local record of own locks") }) } diff --git a/commands/command_ls_files.go b/commands/command_ls_files.go index ec0d12d0..af2b4f51 100644 --- a/commands/command_ls_files.go +++ b/commands/command_ls_files.go @@ -42,7 +42,7 @@ func lsFilesCommand(cmd *cobra.Command, args []string) { }) defer gitscanner.Close() - if err := gitscanner.ScanTree(ref, nil); err != nil { + if err := gitscanner.ScanTree(ref); err != nil { Exit("Could not scan for Git LFS tree: %s", err) } } diff --git a/commands/command_prune.go b/commands/command_prune.go index 94682277..b1314c73 100644 --- a/commands/command_prune.go +++ b/commands/command_prune.go @@ -149,8 +149,10 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo if verifyRemote { tracerx.Printf("VERIFYING: %v", file.Oid) - pointer := lfs.NewPointer(file.Oid, file.Size, nil) - verifyQueue.Add(lfs.NewDownloadable(&lfs.WrappedPointer{Pointer: pointer})) + + verifyQueue.Add(downloadTransfer(&lfs.WrappedPointer{ + Pointer: lfs.NewPointer(file.Oid, file.Size, nil), + })) } } } diff --git a/commands/command_pull.go b/commands/command_pull.go index 4fced5dc..6bd6e97c 100644 --- a/commands/command_pull.go +++ b/commands/command_pull.go @@ -2,9 +2,15 @@ package commands import ( "fmt" + "sync" + "time" "github.com/git-lfs/git-lfs/filepathfilter" "github.com/git-lfs/git-lfs/git" + "github.com/git-lfs/git-lfs/lfs" + "github.com/git-lfs/git-lfs/progress" + "github.com/git-lfs/git-lfs/tq" + "github.com/rubyist/tracerx" "github.com/spf13/cobra" ) @@ -38,8 +44,99 @@ func pull(filter *filepathfilter.Filter) { Panic(err, "Could not pull") } - c := fetchRefToChan(ref.Sha, filter) - checkoutFromFetchChan(c, filter) + pointers := newPointerMap() + meter := progress.NewMeter(progress.WithOSEnv(cfg.Os)) + singleCheckout := newSingleCheckout() + q := newDownloadQueue(tq.WithProgress(meter)) + gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) { + if err != nil { + LoggedError(err, "Scanner error") + return + } + + if pointers.Seen(p) { + return + } + + // no need to download objects that exist locally already + lfs.LinkOrCopyFromReference(p.Oid, p.Size) + if lfs.ObjectExistsOfSize(p.Oid, p.Size) { + singleCheckout.Run(p) + return + } + + meter.Add(p.Size) + meter.StartTransfer(p.Name) + tracerx.Printf("fetch %v [%v]", p.Name, p.Oid) + pointers.Add(p) + q.Add(downloadTransfer(p)) + }) + + gitscanner.Filter = filter + + dlwatch := q.Watch() + var wg sync.WaitGroup + wg.Add(1) + + go func() { + for oid := range dlwatch { + for _, p := range pointers.All(oid) { + singleCheckout.Run(p) + } + } + wg.Done() + }() + + processQueue := time.Now() + if err := gitscanner.ScanTree(ref.Sha); err != nil { + ExitWithError(err) + } + + meter.Start() + gitscanner.Close() + q.Wait() + wg.Wait() + tracerx.PerformanceSince("process queue", processQueue) + + singleCheckout.Close() + + for _, err := range q.Errors() { + FullError(err) + } +} + +// tracks LFS objects being downloaded, according to their unique OIDs. +type pointerMap struct { + pointers map[string][]*lfs.WrappedPointer + mu sync.Mutex +} + +func newPointerMap() *pointerMap { + return &pointerMap{pointers: make(map[string][]*lfs.WrappedPointer)} +} + +func (m *pointerMap) Seen(p *lfs.WrappedPointer) bool { + m.mu.Lock() + defer m.mu.Unlock() + if existing, ok := m.pointers[p.Oid]; ok { + m.pointers[p.Oid] = append(existing, p) + return true + } + return false +} + +func (m *pointerMap) Add(p *lfs.WrappedPointer) { + m.mu.Lock() + defer m.mu.Unlock() + m.pointers[p.Oid] = append(m.pointers[p.Oid], p) +} + +func (m *pointerMap) All(oid string) []*lfs.WrappedPointer { + m.mu.Lock() + defer m.mu.Unlock() + pointers := m.pointers[oid] + delete(m.pointers, oid) + return pointers } func init() { diff --git a/commands/command_unlock.go b/commands/command_unlock.go index a52aa731..505d57fa 100644 --- a/commands/command_unlock.go +++ b/commands/command_unlock.go @@ -1,20 +1,12 @@ package commands import ( - "errors" + "github.com/git-lfs/git-lfs/locking" - "github.com/git-lfs/git-lfs/api" "github.com/spf13/cobra" ) var ( - // errNoMatchingLocks is an error returned when no matching locks were - // able to be resolved - errNoMatchingLocks = errors.New("lfs: no matching locks found") - // errLockAmbiguous is an error returned when multiple matching locks - // were found - errLockAmbiguous = errors.New("lfs: multiple locks found; ambiguous") - unlockCmdFlags unlockFlags ) @@ -29,67 +21,36 @@ type unlockFlags struct { } func unlockCommand(cmd *cobra.Command, args []string) { - setLockRemoteFor(cfg) - var id string + if len(lockRemote) > 0 { + cfg.CurrentRemote = lockRemote + } + + lockClient, err := locking.NewClient(cfg) + if err != nil { + Exit("Unable to create lock system: %v", err.Error()) + } + defer lockClient.Close() if len(args) != 0 { path, err := lockPath(args[0]) if err != nil { - Error(err.Error()) + Exit("Unable to determine path: %v", err.Error()) } - if id, err = lockIdFromPath(path); err != nil { - Error(err.Error()) + err = lockClient.UnlockFile(path, unlockCmdFlags.Force) + if err != nil { + Exit("Unable to unlock: %v", err.Error()) } } else if unlockCmdFlags.Id != "" { - id = unlockCmdFlags.Id + err := lockClient.UnlockFileById(unlockCmdFlags.Id, unlockCmdFlags.Force) + if err != nil { + Exit("Unable to unlock %v: %v", unlockCmdFlags.Id, err.Error()) + } } else { Error("Usage: git lfs unlock (--id my-lock-id | )") } - s, resp := API.Locks.Unlock(id, unlockCmdFlags.Force) - - if _, err := API.Do(s); err != nil { - Error(err.Error()) - Exit("Error communicating with LFS API.") - } - - if len(resp.Err) > 0 { - Error(resp.Err) - Exit("Server unable to unlock lock.") - } - - Print("'%s' was unlocked (%s)", args[0], resp.Lock.Id) -} - -// lockIdFromPath makes a call to the LFS API and resolves the ID for the locked -// locked at the given path. -// -// If the API call failed, an error will be returned. If multiple locks matched -// the given path (should not happen during real-world usage), an error will be -// returnd. If no locks matched the given path, an error will be returned. -// -// If the API call is successful, and only one lock matches the given filepath, -// then its ID will be returned, along with a value of "nil" for the error. -func lockIdFromPath(path string) (string, error) { - s, resp := API.Locks.Search(&api.LockSearchRequest{ - Filters: []api.Filter{ - {"path", path}, - }, - }) - - if _, err := API.Do(s); err != nil { - return "", err - } - - switch len(resp.Locks) { - case 0: - return "", errNoMatchingLocks - case 1: - return resp.Locks[0].Id, nil - default: - return "", errLockAmbiguous - } + Print("'%s' was unlocked", args[0]) } func init() { diff --git a/commands/commands.go b/commands/commands.go index 85235df2..cc45317c 100644 --- a/commands/commands.go +++ b/commands/commands.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/filepathfilter" @@ -26,10 +25,6 @@ import ( //go:generate go run ../docs/man/mangen.go var ( - // API is a package-local instance of the API client for use within - // various command implementations. - API = api.NewClient(nil) - Debugging = false ErrorBuffer = &bytes.Buffer{} ErrorWriter = io.MultiWriter(os.Stderr, ErrorBuffer) @@ -67,6 +62,74 @@ func buildFilepathFilter(config *config.Configuration, includeArg, excludeArg *s return filepathfilter.New(inc, exc) } +func downloadTransfer(p *lfs.WrappedPointer) (name, path, oid string, size int64) { + path, _ = lfs.LocalMediaPath(p.Oid) + + return p.Name, path, p.Oid, p.Size +} + +func uploadTransfer(oid, filename string) (*tq.Transfer, error) { + localMediaPath, err := lfs.LocalMediaPath(oid) + if err != nil { + return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) + } + + if len(filename) > 0 { + if err = ensureFile(filename, localMediaPath); err != nil { + return nil, err + } + } + + fi, err := os.Stat(localMediaPath) + if err != nil { + return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) + } + + return &tq.Transfer{ + Name: filename, + Path: localMediaPath, + Oid: oid, + Size: fi.Size(), + }, nil +} + +// ensureFile makes sure that the cleanPath exists before pushing it. If it +// does not exist, it attempts to clean it by reading the file at smudgePath. +func ensureFile(smudgePath, cleanPath string) error { + if _, err := os.Stat(cleanPath); err == nil { + return nil + } + + expectedOid := filepath.Base(cleanPath) + localPath := filepath.Join(config.LocalWorkingDir, smudgePath) + file, err := os.Open(localPath) + if err != nil { + return err + } + + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return err + } + + cleaned, err := lfs.PointerClean(file, file.Name(), stat.Size(), nil) + if cleaned != nil { + cleaned.Teardown() + } + + if err != nil { + return err + } + + if expectedOid != cleaned.Oid { + return fmt.Errorf("Trying to push %q with OID %s.\nNot found in %s.", smudgePath, expectedOid, filepath.Dir(cleanPath)) + } + + return nil +} + // Error prints a formatted message to Stderr. It also gets printed to the // panic log if one is created for this command. func Error(format string, args ...interface{}) { diff --git a/commands/pull.go b/commands/pull.go new file mode 100644 index 00000000..ce764982 --- /dev/null +++ b/commands/pull.go @@ -0,0 +1,136 @@ +package commands + +import ( + "bytes" + "fmt" + "io" + "os" + "os/exec" + "sync" + + "github.com/git-lfs/git-lfs/errors" + "github.com/git-lfs/git-lfs/lfs" + "github.com/git-lfs/git-lfs/tq" +) + +// Handles the process of checking out a single file, and updating the git +// index. +func newSingleCheckout() *singleCheckout { + // Get a converter from repo-relative to cwd-relative + // Since writing data & calling git update-index must be relative to cwd + pathConverter, err := lfs.NewRepoToCurrentPathConverter() + if err != nil { + Panic(err, "Could not convert file paths") + } + + return &singleCheckout{ + gitIndexer: &gitIndexer{}, + pathConverter: pathConverter, + manifest: TransferManifest(), + } +} + +type singleCheckout struct { + gitIndexer *gitIndexer + pathConverter lfs.PathConverter + manifest *tq.Manifest +} + +func (c *singleCheckout) Run(p *lfs.WrappedPointer) { + // Check the content - either missing or still this pointer (not exist is ok) + filepointer, err := lfs.DecodePointerFromFile(p.Name) + if err != nil && !os.IsNotExist(err) { + if errors.IsNotAPointerError(err) { + // File has non-pointer content, leave it alone + return + } + + LoggedError(err, "Checkout error: %s", err) + return + } + + if filepointer != nil && filepointer.Oid != p.Oid { + // User has probably manually reset a file to another commit + // while leaving it a pointer; don't mess with this + return + } + + cwdfilepath := c.pathConverter.Convert(p.Name) + + err = lfs.PointerSmudgeToFile(cwdfilepath, p.Pointer, false, c.manifest, nil) + if err != nil { + if errors.IsDownloadDeclinedError(err) { + // acceptable error, data not local (fetch not run or include/exclude) + LoggedError(err, "Skipped checkout for %q, content not local. Use fetch to download.", p.Name) + } else { + FullError(fmt.Errorf("Could not check out %q", p.Name)) + } + return + } + + // errors are only returned when the gitIndexer is starting a new cmd + if err := c.gitIndexer.Add(cwdfilepath); err != nil { + Panic(err, "Could not update the index") + } +} + +func (c *singleCheckout) Close() { + if err := c.gitIndexer.Close(); err != nil { + LoggedError(err, "Error updating the git index:\n%s", c.gitIndexer.Output()) + } +} + +// Don't fire up the update-index command until we have at least one file to +// give it. Otherwise git interprets the lack of arguments to mean param-less update-index +// which can trigger entire working copy to be re-examined, which triggers clean filters +// and which has unexpected side effects (e.g. downloading filtered-out files) +type gitIndexer struct { + cmd *exec.Cmd + input io.WriteCloser + output bytes.Buffer + mu sync.Mutex +} + +func (i *gitIndexer) Add(path string) error { + i.mu.Lock() + defer i.mu.Unlock() + + if i.cmd == nil { + // Fire up the update-index command + i.cmd = exec.Command("git", "update-index", "-q", "--refresh", "--stdin") + i.cmd.Stdout = &i.output + i.cmd.Stderr = &i.output + stdin, err := i.cmd.StdinPipe() + if err == nil { + err = i.cmd.Start() + } + + if err != nil { + return err + } + + i.input = stdin + } + + i.input.Write([]byte(path + "\n")) + return nil +} + +func (i *gitIndexer) Output() string { + return i.output.String() +} + +func (i *gitIndexer) Close() error { + i.mu.Lock() + defer i.mu.Unlock() + + if i.input != nil { + i.input.Close() + } + + if i.cmd != nil { + return i.cmd.Wait() + } + + return nil +} diff --git a/commands/uploader.go b/commands/uploader.go index d5472ec3..6634b7cf 100644 --- a/commands/uploader.go +++ b/commands/uploader.go @@ -113,7 +113,7 @@ func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize }() for _, p := range missing { - checkQueue.Add(lfs.NewDownloadable(p)) + checkQueue.Add(downloadTransfer(p)) } // Currently this is needed to flush the batch but is not enough to sync @@ -140,7 +140,7 @@ func uploadPointers(c *uploadContext, unfiltered []*lfs.WrappedPointer) { q, pointers := c.prepareUpload(unfiltered) for _, p := range pointers { - u, err := lfs.NewUploadable(p.Oid, p.Name) + t, err := uploadTransfer(p.Oid, p.Name) if err != nil { if errors.IsCleanPointerError(err) { Exit(uploadMissingErr, p.Oid, p.Name, errors.GetContext(err, "pointer").(*lfs.Pointer).Oid) @@ -149,7 +149,7 @@ func uploadPointers(c *uploadContext, unfiltered []*lfs.WrappedPointer) { } } - q.Add(u) + q.Add(t.Name, t.Path, t.Oid, t.Size) c.SetUploaded(p.Oid) } diff --git a/config/config.go b/config/config.go index ea1eaa35..14b9efec 100644 --- a/config/config.go +++ b/config/config.go @@ -394,3 +394,12 @@ func (c *Configuration) loadGitConfig() bool { return false } + +// CurrentCommitter returns the name/email that would be used to author a commit +// with this configuration. In particular, the "user.name" and "user.email" +// configuration values are used +func (c *Configuration) CurrentCommitter() (name, email string) { + name, _ = c.Git.Get("user.name") + email, _ = c.Git.Get("user.email") + return +} diff --git a/docs/api/batch.md b/docs/api/batch.md index 730dacfb..7a661613 100644 --- a/docs/api/batch.md +++ b/docs/api/batch.md @@ -31,7 +31,7 @@ that the client has configured. If omitted, the `basic` transfer adapter MUST be assumed by the server. * `objects` - An Array of objects to download. * `oid` - String OID of the LFS object. - * `size` - Integer byte size of the LFS object. + * `size` - Integer byte size of the LFS object. Must be at least zero. Note: Git LFS currently only supports the `basic` transfer adapter. This property was added for future compatibility with some experimental transfer @@ -70,7 +70,7 @@ client will use the `basic` transfer adapter if the `transfer` property is omitted. * `objects` - An Array of objects to download. * `oid` - String OID of the LFS object. - * `size` - Integer byte size of the LFS object. + * `size` - Integer byte size of the LFS object. Must be at least zero. * `authenticated` - Optional boolean specifying whether the request for this specific object is authenticated. If omitted or false, Git LFS will attempt to [find credentials for this URL](./authentication.md). diff --git a/docs/api/schemas/http-batch-request-schema.json b/docs/api/schemas/http-batch-request-schema.json index f50e4451..95e22cf3 100644 --- a/docs/api/schemas/http-batch-request-schema.json +++ b/docs/api/schemas/http-batch-request-schema.json @@ -21,7 +21,8 @@ "type": "string" }, "size": { - "type": "number" + "type": "number", + "minimum": 0 }, "authenticated": { "type": "boolean" diff --git a/filepathfilter/filepathfilter_test.go b/filepathfilter/filepathfilter_test.go index 10819c36..ab678cfc 100644 --- a/filepathfilter/filepathfilter_test.go +++ b/filepathfilter/filepathfilter_test.go @@ -78,6 +78,7 @@ func TestFilterAllows(t *testing.T) { filterTest{true, []string{"test/fil*"}, nil}, filterTest{false, []string{"test/g*"}, nil}, filterTest{true, []string{"tes*/*"}, nil}, + filterTest{true, []string{"[Tt]est/[Ff]ilename.dat"}, nil}, // Exclusion filterTest{false, nil, []string{"*.dat"}}, filterTest{false, nil, []string{"file*.dat"}}, @@ -96,6 +97,7 @@ func TestFilterAllows(t *testing.T) { filterTest{false, nil, []string{"test/fil*"}}, filterTest{true, nil, []string{"test/g*"}}, filterTest{false, nil, []string{"tes*/*"}}, + filterTest{false, nil, []string{"[Tt]est/[Ff]ilename.dat"}}, // Both filterTest{true, []string{"test/filename.dat"}, []string{"test/notfilename.dat"}}, diff --git a/git-lfs.go b/git-lfs.go index 823dcebd..c0dad72a 100644 --- a/git-lfs.go +++ b/git-lfs.go @@ -1,3 +1,5 @@ +//go:generate goversioninfo -icon=script/windows-installer/git-lfs-logo.ico + package main import ( diff --git a/lfs/download_queue.go b/lfs/download_queue.go index 62360259..7e4b7ae2 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -1,45 +1,10 @@ package lfs import ( - "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/tq" ) -type Downloadable struct { - pointer *WrappedPointer - object *api.ObjectResource -} - -func (d *Downloadable) Object() *api.ObjectResource { - return d.object -} - -func (d *Downloadable) Oid() string { - return d.pointer.Oid -} - -func (d *Downloadable) Size() int64 { - return d.pointer.Size -} - -func (d *Downloadable) Name() string { - return d.pointer.Name -} - -func (d *Downloadable) Path() string { - p, _ := LocalMediaPath(d.pointer.Oid) - return p -} - -func (d *Downloadable) SetObject(o *api.ObjectResource) { - d.object = o -} - -func NewDownloadable(p *WrappedPointer) *Downloadable { - return &Downloadable{pointer: p} -} - // NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download func NewDownloadCheckQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue { allOptions := make([]tq.Option, len(options), len(options)+1) diff --git a/lfs/gitscanner.go b/lfs/gitscanner.go index 123e6cfc..952842c4 100644 --- a/lfs/gitscanner.go +++ b/lfs/gitscanner.go @@ -132,8 +132,8 @@ func (s *GitScanner) ScanAll(cb GitScannerCallback) error { // ScanTree takes a ref and returns WrappedPointer objects in the tree at that // ref. Differs from ScanRefs in that multiple files in the tree with the same // content are all reported. -func (s *GitScanner) ScanTree(ref string, cb GitScannerCallback) error { - callback, err := firstGitScannerCallback(cb, s.callback) +func (s *GitScanner) ScanTree(ref string) error { + callback, err := firstGitScannerCallback(s.callback) if err != nil { return err } diff --git a/lfs/pointer_smudge.go b/lfs/pointer_smudge.go index d86a0410..df556396 100644 --- a/lfs/pointer_smudge.go +++ b/lfs/pointer_smudge.go @@ -10,7 +10,6 @@ import ( "github.com/git-lfs/git-lfs/tools" "github.com/git-lfs/git-lfs/tq" - "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/progress" @@ -75,34 +74,21 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, download func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, manifest *tq.Manifest, cb progress.CopyCallback) error { fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size)) - xfers := manifest.GetDownloadAdapterNames() - obj, adapterName, err := api.BatchSingle(config.Config, &api.ObjectResource{Oid: ptr.Oid, Size: ptr.Size}, "download", xfers) - if err != nil { - return errors.Wrapf(err, "Error downloading %s: %s", filepath.Base(mediafile), err) - } + q := tq.NewTransferQueue(tq.Download, manifest) + q.Add(filepath.Base(workingfile), mediafile, ptr.Oid, ptr.Size) + q.Wait() - if ptr.Size == 0 { - ptr.Size = obj.Size - } - - adapter := manifest.NewDownloadAdapter(adapterName) - var tcb tq.ProgressCallback - if cb != nil { - tcb = func(name string, totalSize, readSoFar int64, readSinceLast int) error { - return cb(totalSize, readSoFar, readSinceLast) + if errs := q.Errors(); len(errs) > 0 { + var multiErr error + for _, e := range errs { + if multiErr != nil { + multiErr = fmt.Errorf("%v\n%v", multiErr, e) + } else { + multiErr = e + } + return errors.Wrapf(multiErr, "Error downloading %s (%s)", workingfile, ptr.Oid) } } - // Single download - err = adapter.Begin(1, tcb) - if err != nil { - return err - } - res := <-adapter.Add(tq.NewTransfer(filepath.Base(workingfile), obj, mediafile)) - adapter.End() - - if res.Error != nil { - return errors.Wrapf(err, "Error buffering media file: %s", res.Error) - } return readLocalFile(writer, ptr, mediafile, workingfile, nil) } diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 6c13a39f..f8df98c3 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -1,108 +1,10 @@ package lfs import ( - "fmt" - "os" - "path/filepath" - - "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/config" - "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/tq" ) -// Uploadable describes a file that can be uploaded. -type Uploadable struct { - oid string - OidPath string - Filename string - size int64 - object *api.ObjectResource -} - -func (u *Uploadable) Object() *api.ObjectResource { - return u.object -} - -func (u *Uploadable) Oid() string { - return u.oid -} - -func (u *Uploadable) Size() int64 { - return u.size -} - -func (u *Uploadable) Name() string { - return u.Filename -} - -func (u *Uploadable) SetObject(o *api.ObjectResource) { - u.object = o -} - -func (u *Uploadable) Path() string { - return u.OidPath -} - -// NewUploadable builds the Uploadable from the given information. -// "filename" can be empty if a raw object is pushed (see "object-id" flag in push command)/ -func NewUploadable(oid, filename string) (*Uploadable, error) { - localMediaPath, err := LocalMediaPath(oid) - if err != nil { - return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) - } - - if len(filename) > 0 { - if err := ensureFile(filename, localMediaPath); err != nil { - return nil, err - } - } - - fi, err := os.Stat(localMediaPath) - if err != nil { - return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) - } - - return &Uploadable{oid: oid, OidPath: localMediaPath, Filename: filename, size: fi.Size()}, nil -} - -// ensureFile makes sure that the cleanPath exists before pushing it. If it -// does not exist, it attempts to clean it by reading the file at smudgePath. -func ensureFile(smudgePath, cleanPath string) error { - if _, err := os.Stat(cleanPath); err == nil { - return nil - } - - expectedOid := filepath.Base(cleanPath) - localPath := filepath.Join(config.LocalWorkingDir, smudgePath) - file, err := os.Open(localPath) - if err != nil { - return err - } - - defer file.Close() - - stat, err := file.Stat() - if err != nil { - return err - } - - cleaned, err := PointerClean(file, file.Name(), stat.Size(), nil) - if cleaned != nil { - cleaned.Teardown() - } - - if err != nil { - return err - } - - if expectedOid != cleaned.Oid { - return fmt.Errorf("Trying to push %q with OID %s.\nNot found in %s.", smudgePath, expectedOid, filepath.Dir(cleanPath)) - } - - return nil -} - // NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. func NewUploadQueue(cfg *config.Configuration, options ...tq.Option) *tq.TransferQueue { return tq.NewTransferQueue(tq.Upload, TransferManifest(cfg), options...) diff --git a/lfs/util.go b/lfs/util.go index 16836978..06b67e7d 100644 --- a/lfs/util.go +++ b/lfs/util.go @@ -87,90 +87,98 @@ func GetPlatform() Platform { return currentPlatform } +type PathConverter interface { + Convert(string) string +} + // Convert filenames expressed relative to the root of the repo relative to the // current working dir. Useful when needing to calling git with results from a rooted command, // but the user is in a subdir of their repo // Pass in a channel which you will fill with relative files & receive a channel which will get results -func ConvertRepoFilesRelativeToCwd(repochan <-chan string) (<-chan string, error) { - wd, err := os.Getwd() +func NewRepoToCurrentPathConverter() (PathConverter, error) { + r, c, p, err := pathConverterArgs() if err != nil { - return nil, fmt.Errorf("Unable to get working dir: %v", err) - } - wd = tools.ResolveSymlinks(wd) - - // Early-out if working dir is root dir, same result - passthrough := false - if config.LocalWorkingDir == wd { - passthrough = true + return nil, err } - outchan := make(chan string, 1) + return &repoToCurrentPathConverter{ + repoDir: r, + currDir: c, + passthrough: p, + }, nil +} - go func() { - for f := range repochan { - if passthrough { - outchan <- f - continue - } - abs := filepath.Join(config.LocalWorkingDir, f) - rel, err := filepath.Rel(wd, abs) - if err != nil { - // Use absolute file instead - outchan <- abs - } else { - outchan <- rel - } - } - close(outchan) - }() +type repoToCurrentPathConverter struct { + repoDir string + currDir string + passthrough bool +} - return outchan, nil +func (p *repoToCurrentPathConverter) Convert(filename string) string { + if p.passthrough { + return filename + } + + abs := filepath.Join(p.repoDir, filename) + rel, err := filepath.Rel(p.currDir, abs) + if err != nil { + // Use absolute file instead + return abs + } else { + return rel + } } // Convert filenames expressed relative to the current directory to be // relative to the repo root. Useful when calling git with arguments that requires them // to be rooted but the user is in a subdir of their repo & expects to use relative args // Pass in a channel which you will fill with relative files & receive a channel which will get results -func ConvertCwdFilesRelativeToRepo(cwdchan <-chan string) (<-chan string, error) { - curdir, err := os.Getwd() +func NewCurrentToRepoPathConverter() (PathConverter, error) { + r, c, p, err := pathConverterArgs() if err != nil { - return nil, fmt.Errorf("Could not retrieve current directory: %v", err) - } - // Make sure to resolve symlinks - curdir = tools.ResolveSymlinks(curdir) - - // Early-out if working dir is root dir, same result - passthrough := false - if config.LocalWorkingDir == curdir { - passthrough = true + return nil, err } - outchan := make(chan string, 1) - go func() { - for p := range cwdchan { - if passthrough { - outchan <- p - continue - } - var abs string - if filepath.IsAbs(p) { - abs = tools.ResolveSymlinks(p) - } else { - abs = filepath.Join(curdir, p) - } - reltoroot, err := filepath.Rel(config.LocalWorkingDir, abs) - if err != nil { - // Can't do this, use absolute as best fallback - outchan <- abs - } else { - outchan <- reltoroot - } - } - close(outchan) - }() + return ¤tToRepoPathConverter{ + repoDir: r, + currDir: c, + passthrough: p, + }, nil +} - return outchan, nil +type currentToRepoPathConverter struct { + repoDir string + currDir string + passthrough bool +} +func (p *currentToRepoPathConverter) Convert(filename string) string { + if p.passthrough { + return filename + } + + var abs string + if filepath.IsAbs(filename) { + abs = tools.ResolveSymlinks(filename) + } else { + abs = filepath.Join(p.currDir, filename) + } + reltoroot, err := filepath.Rel(p.repoDir, abs) + if err != nil { + // Can't do this, use absolute as best fallback + return abs + } else { + return reltoroot + } +} + +func pathConverterArgs() (string, string, bool, error) { + currDir, err := os.Getwd() + if err != nil { + return "", "", false, fmt.Errorf("Unable to get working dir: %v", err) + } + currDir = tools.ResolveSymlinks(currDir) + return config.LocalWorkingDir, currDir, config.LocalWorkingDir == currDir, nil } // Are we running on Windows? Need to handle some extra path shenanigans diff --git a/lfsapi/errors.go b/lfsapi/errors.go new file mode 100644 index 00000000..58b13eeb --- /dev/null +++ b/lfsapi/errors.go @@ -0,0 +1,79 @@ +package lfsapi + +import ( + "fmt" + "net/http" + + "github.com/git-lfs/git-lfs/errors" +) + +type ClientError struct { + Message string `json:"message"` + DocumentationUrl string `json:"documentation_url,omitempty"` + RequestId string `json:"request_id,omitempty"` +} + +func (e *ClientError) Error() string { + msg := e.Message + if len(e.DocumentationUrl) > 0 { + msg += "\nDocs: " + e.DocumentationUrl + } + if len(e.RequestId) > 0 { + msg += "\nRequest ID: " + e.RequestId + } + return msg +} + +func (c *Client) handleResponse(res *http.Response) error { + if res.StatusCode < 400 { + return nil + } + + cliErr := &ClientError{} + err := decodeResponse(res, cliErr) + if err == nil { + if len(cliErr.Message) == 0 { + err = defaultError(res) + } else { + err = errors.Wrap(cliErr, "http") + } + } + + if res.StatusCode == 401 { + return errors.NewAuthError(err) + } + + if res.StatusCode > 499 && res.StatusCode != 501 && res.StatusCode != 507 && res.StatusCode != 509 { + return errors.NewFatalError(err) + } + + return err +} + +var ( + defaultErrors = map[int]string{ + 400: "Client error: %s", + 401: "Authorization error: %s\nCheck that you have proper access to the repository", + 403: "Authorization error: %s\nCheck that you have proper access to the repository", + 404: "Repository or object not found: %s\nCheck that it exists and that you have proper access to it", + 429: "Rate limit exceeded: %s", + 500: "Server error: %s", + 501: "Not Implemented: %s", + 507: "Insufficient server storage: %s", + 509: "Bandwidth limit exceeded: %s", + } +) + +func defaultError(res *http.Response) error { + var msgFmt string + + if f, ok := defaultErrors[res.StatusCode]; ok { + msgFmt = f + } else if res.StatusCode < 500 { + msgFmt = defaultErrors[400] + fmt.Sprintf(" from HTTP %d", res.StatusCode) + } else { + msgFmt = defaultErrors[500] + fmt.Sprintf(" from HTTP %d", res.StatusCode) + } + + return errors.Errorf(msgFmt, res.Request.URL) +} diff --git a/lfsapi/lfsapi.go b/lfsapi/lfsapi.go new file mode 100644 index 00000000..4260f9bc --- /dev/null +++ b/lfsapi/lfsapi.go @@ -0,0 +1,42 @@ +package lfsapi + +import ( + "encoding/json" + "net/http" + "regexp" + + "github.com/git-lfs/git-lfs/errors" +) + +var ( + lfsMediaTypeRE = regexp.MustCompile(`\Aapplication/vnd\.git\-lfs\+json(;|\z)`) + jsonMediaTypeRE = regexp.MustCompile(`\Aapplication/json(;|\z)`) +) + +type Client struct { +} + +func (c *Client) Do(req *http.Request) (*http.Response, error) { + res, err := http.DefaultClient.Do(req) + if err != nil { + return res, err + } + + return res, c.handleResponse(res) +} + +func decodeResponse(res *http.Response, obj interface{}) error { + ctype := res.Header.Get("Content-Type") + if !(lfsMediaTypeRE.MatchString(ctype) || jsonMediaTypeRE.MatchString(ctype)) { + return nil + } + + err := json.NewDecoder(res.Body).Decode(obj) + res.Body.Close() + + if err != nil { + return errors.Wrapf(err, "Unable to parse HTTP response for %s %s", res.Request.Method, res.Request.URL) + } + + return nil +} diff --git a/locking/cache.go b/locking/cache.go new file mode 100644 index 00000000..765e7298 --- /dev/null +++ b/locking/cache.go @@ -0,0 +1,101 @@ +package locking + +import ( + "strings" + + "github.com/git-lfs/git-lfs/tools/kv" +) + +const ( + // We want to use a single cache file for integrity, but to make it easy to + // list all locks, prefix the id->path map in a way we can identify (something + // that won't be in a path) + idKeyPrefix string = "*id*://" +) + +type LockCache struct { + kv *kv.Store +} + +func NewLockCache(filepath string) (*LockCache, error) { + kv, err := kv.NewStore(filepath) + if err != nil { + return nil, err + } + return &LockCache{kv}, nil +} + +func (c *LockCache) encodeIdKey(id string) string { + // Safety against accidents + if !c.isIdKey(id) { + return idKeyPrefix + id + } + return id +} +func (c *LockCache) decodeIdKey(key string) string { + // Safety against accidents + if c.isIdKey(key) { + return key[len(idKeyPrefix):] + } + return key +} +func (c *LockCache) isIdKey(key string) bool { + return strings.HasPrefix(key, idKeyPrefix) +} + +// Cache a successful lock for faster local lookup later +func (c *LockCache) Add(l Lock) error { + // Store reference in both directions + // Path -> Lock + c.kv.Set(l.Path, &l) + // EncodedId -> Lock (encoded so we can easily identify) + c.kv.Set(c.encodeIdKey(l.Id), &l) + return nil +} + +// Remove a cached lock by path becuase it's been relinquished +func (c *LockCache) RemoveByPath(filePath string) error { + ilock := c.kv.Get(filePath) + if lock, ok := ilock.(*Lock); ok && lock != nil { + c.kv.Remove(lock.Path) + // Id as key is encoded + c.kv.Remove(c.encodeIdKey(lock.Id)) + } + return nil +} + +// Remove a cached lock by id because it's been relinquished +func (c *LockCache) RemoveById(id string) error { + // Id as key is encoded + idkey := c.encodeIdKey(id) + ilock := c.kv.Get(idkey) + if lock, ok := ilock.(*Lock); ok && lock != nil { + c.kv.Remove(idkey) + c.kv.Remove(lock.Path) + } + return nil +} + +// Get the list of cached locked files +func (c *LockCache) Locks() []Lock { + var locks []Lock + c.kv.Visit(func(key string, val interface{}) bool { + // Only report file->id entries not reverse + if !c.isIdKey(key) { + lock := val.(*Lock) + locks = append(locks, *lock) + } + return true // continue + }) + return locks +} + +// Clear the cache +func (c *LockCache) Clear() { + c.kv.RemoveAll() +} + +// Save the cache +func (c *LockCache) Save() error { + return c.kv.Save() +} diff --git a/locking/cache_test.go b/locking/cache_test.go new file mode 100644 index 00000000..c8f28fc8 --- /dev/null +++ b/locking/cache_test.go @@ -0,0 +1,61 @@ +package locking + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLockCache(t *testing.T) { + var err error + + tmpf, err := ioutil.TempFile("", "testCacheLock") + assert.Nil(t, err) + defer func() { + os.Remove(tmpf.Name()) + }() + tmpf.Close() + + cache, err := NewLockCache(tmpf.Name()) + assert.Nil(t, err) + + testLocks := []Lock{ + Lock{Path: "folder/test1.dat", Id: "101"}, + Lock{Path: "folder/test2.dat", Id: "102"}, + Lock{Path: "root.dat", Id: "103"}, + } + + for _, l := range testLocks { + err = cache.Add(l) + assert.Nil(t, err) + } + + locks := cache.Locks() + for _, l := range testLocks { + assert.Contains(t, locks, l) + } + assert.Equal(t, len(testLocks), len(locks)) + + err = cache.RemoveByPath("folder/test2.dat") + assert.Nil(t, err) + + locks = cache.Locks() + // delete item 1 from test locls + testLocks = append(testLocks[:1], testLocks[2:]...) + for _, l := range testLocks { + assert.Contains(t, locks, l) + } + assert.Equal(t, len(testLocks), len(locks)) + + err = cache.RemoveById("101") + assert.Nil(t, err) + + locks = cache.Locks() + testLocks = testLocks[1:] + for _, l := range testLocks { + assert.Contains(t, locks, l) + } + assert.Equal(t, len(testLocks), len(locks)) +} diff --git a/locking/locks.go b/locking/locks.go new file mode 100644 index 00000000..e1dd8ac2 --- /dev/null +++ b/locking/locks.go @@ -0,0 +1,276 @@ +package locking + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/git-lfs/git-lfs/api" + "github.com/git-lfs/git-lfs/config" + "github.com/git-lfs/git-lfs/git" + "github.com/git-lfs/git-lfs/tools/kv" +) + +var ( + // ErrNoMatchingLocks is an error returned when no matching locks were + // able to be resolved + ErrNoMatchingLocks = errors.New("lfs: no matching locks found") + // ErrLockAmbiguous is an error returned when multiple matching locks + // were found + ErrLockAmbiguous = errors.New("lfs: multiple locks found; ambiguous") +) + +// Client is the main interface object for the locking package +type Client struct { + cfg *config.Configuration + apiClient *api.Client + cache *LockCache +} + +// NewClient creates a new locking client with the given configuration +// You must call the returned object's `Close` method when you are finished with +// it +func NewClient(cfg *config.Configuration) (*Client, error) { + + apiClient := api.NewClient(api.NewHttpLifecycle(cfg)) + + lockDir := filepath.Join(config.LocalGitStorageDir, "lfs") + err := os.MkdirAll(lockDir, 0755) + if err != nil { + return nil, err + } + lockFile := filepath.Join(lockDir, "lockcache.db") + cache, err := NewLockCache(lockFile) + if err != nil { + return nil, err + } + return &Client{cfg, apiClient, cache}, nil +} + +// Close this client instance; must be called to dispose of resources +func (c *Client) Close() error { + return c.cache.Save() +} + +// LockFile attempts to lock a file on the current remote +// path must be relative to the root of the repository +// Returns the lock id if successful, or an error +func (c *Client) LockFile(path string) (Lock, error) { + + // TODO: this is not really the constraint we need to avoid merges, improve as per proposal + latest, err := git.CurrentRemoteRef() + if err != nil { + return Lock{}, err + } + + s, resp := c.apiClient.Locks.Lock(&api.LockRequest{ + Path: path, + Committer: api.NewCommitter(c.cfg.CurrentCommitter()), + LatestRemoteCommit: latest.Sha, + }) + + if _, err := c.apiClient.Do(s); err != nil { + return Lock{}, fmt.Errorf("Error communicating with LFS API: %v", err) + } + + if len(resp.Err) > 0 { + return Lock{}, fmt.Errorf("Server unable to create lock: %v", resp.Err) + } + + lock := c.newLockFromApi(*resp.Lock) + + if err := c.cache.Add(lock); err != nil { + return Lock{}, fmt.Errorf("Error caching lock information: %v", err) + } + + return lock, nil +} + +// UnlockFile attempts to unlock a file on the current remote +// path must be relative to the root of the repository +// Force causes the file to be unlocked from other users as well +func (c *Client) UnlockFile(path string, force bool) error { + + id, err := c.lockIdFromPath(path) + if err != nil { + return fmt.Errorf("Unable to get lock id: %v", err) + } + + return c.UnlockFileById(id, force) + +} + +// UnlockFileById attempts to unlock a lock with a given id on the current remote +// Force causes the file to be unlocked from other users as well +func (c *Client) UnlockFileById(id string, force bool) error { + s, resp := c.apiClient.Locks.Unlock(id, force) + + if _, err := c.apiClient.Do(s); err != nil { + return fmt.Errorf("Error communicating with LFS API: %v", err) + } + + if len(resp.Err) > 0 { + return fmt.Errorf("Server unable to unlock lock: %v", resp.Err) + } + + if err := c.cache.RemoveById(id); err != nil { + return fmt.Errorf("Error caching unlock information: %v", err) + } + + return nil +} + +// Lock is a record of a locked file +type Lock struct { + // Id is the unique identifier corresponding to this particular Lock. It + // must be consistent with the local copy, and the server's copy. + Id string + // Path is an absolute path to the file that is locked as a part of this + // lock. + Path string + // Name is the name of the person holding this lock + Name string + // Email address of the person holding this lock + Email string + // LockedAt is the time at which this lock was acquired. + LockedAt time.Time +} + +func (c *Client) newLockFromApi(a api.Lock) Lock { + return Lock{ + Id: a.Id, + Path: a.Path, + Name: a.Committer.Name, + Email: a.Committer.Email, + LockedAt: a.LockedAt, + } +} + +// SearchLocks returns a channel of locks which match the given name/value filter +// If limit > 0 then search stops at that number of locks +// If localOnly = true, don't query the server & report only own local locks +func (c *Client) SearchLocks(filter map[string]string, limit int, localOnly bool) (locks []Lock, err error) { + + if localOnly { + return c.searchCachedLocks(filter, limit) + } else { + return c.searchRemoteLocks(filter, limit) + } +} + +func (c *Client) searchCachedLocks(filter map[string]string, limit int) ([]Lock, error) { + cachedlocks := c.cache.Locks() + path, filterByPath := filter["path"] + id, filterById := filter["id"] + lockCount := 0 + locks := make([]Lock, 0, len(cachedlocks)) + for _, l := range cachedlocks { + // Manually filter by Path/Id + if (filterByPath && path != l.Path) || + (filterById && id != l.Id) { + continue + } + locks = append(locks, l) + lockCount++ + if limit > 0 && lockCount >= limit { + break + } + } + return locks, nil +} + +func (c *Client) searchRemoteLocks(filter map[string]string, limit int) ([]Lock, error) { + locks := make([]Lock, 0, limit) + + apifilters := make([]api.Filter, 0, len(filter)) + for k, v := range filter { + apifilters = append(apifilters, api.Filter{k, v}) + } + query := &api.LockSearchRequest{Filters: apifilters} + for { + s, resp := c.apiClient.Locks.Search(query) + if _, err := c.apiClient.Do(s); err != nil { + return locks, fmt.Errorf("Error communicating with LFS API: %v", err) + } + + if resp.Err != "" { + return locks, fmt.Errorf("Error response from LFS API: %v", resp.Err) + } + + for _, l := range resp.Locks { + locks = append(locks, c.newLockFromApi(l)) + if limit > 0 && len(locks) >= limit { + // Exit outer loop too + return locks, nil + } + } + + if resp.NextCursor != "" { + query.Cursor = resp.NextCursor + } else { + break + } + } + + return locks, nil + +} + +// lockIdFromPath makes a call to the LFS API and resolves the ID for the locked +// locked at the given path. +// +// If the API call failed, an error will be returned. If multiple locks matched +// the given path (should not happen during real-world usage), an error will be +// returnd. If no locks matched the given path, an error will be returned. +// +// If the API call is successful, and only one lock matches the given filepath, +// then its ID will be returned, along with a value of "nil" for the error. +func (c *Client) lockIdFromPath(path string) (string, error) { + s, resp := c.apiClient.Locks.Search(&api.LockSearchRequest{ + Filters: []api.Filter{ + {"path", path}, + }, + }) + + if _, err := c.apiClient.Do(s); err != nil { + return "", err + } + + switch len(resp.Locks) { + case 0: + return "", ErrNoMatchingLocks + case 1: + return resp.Locks[0].Id, nil + default: + return "", ErrLockAmbiguous + } +} + +// Fetch locked files for the current committer and cache them locally +// This can be used to sync up locked files when moving machines +func (c *Client) refreshLockCache() error { + // TODO: filters don't seem to currently define how to search for a + // committer's email. Is it "committer.email"? For now, just iterate + locks, err := c.SearchLocks(nil, 0, false) + if err != nil { + return err + } + + // We're going to overwrite the entire local cache + c.cache.Clear() + + _, email := c.cfg.CurrentCommitter() + for _, l := range locks { + if l.Email == email { + c.cache.Add(l) + } + } + + return nil +} + +func init() { + kv.RegisterTypeForStorage(&Lock{}) +} diff --git a/locking/locks_test.go b/locking/locks_test.go new file mode 100644 index 00000000..a5f8d7f2 --- /dev/null +++ b/locking/locks_test.go @@ -0,0 +1,98 @@ +package locking + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "os" + "sort" + "testing" + "time" + + "github.com/git-lfs/git-lfs/api" + "github.com/git-lfs/git-lfs/config" + "github.com/stretchr/testify/assert" +) + +type TestLifecycle struct { +} + +func (l *TestLifecycle) Build(schema *api.RequestSchema) (*http.Request, error) { + return http.NewRequest("GET", "http://dummy", nil) +} + +func (l *TestLifecycle) Execute(req *http.Request, into interface{}) (api.Response, error) { + // Return test data including other users + locks := api.LockList{Locks: []api.Lock{ + api.Lock{Id: "99", Path: "folder/test3.dat", Committer: api.Committer{Name: "Alice", Email: "alice@wonderland.com"}}, + api.Lock{Id: "101", Path: "folder/test1.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}}, + api.Lock{Id: "102", Path: "folder/test2.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}}, + api.Lock{Id: "103", Path: "root.dat", Committer: api.Committer{Name: "Fred", Email: "fred@bloggs.com"}}, + api.Lock{Id: "199", Path: "other/test1.dat", Committer: api.Committer{Name: "Charles", Email: "charles@incharge.com"}}, + }} + locksJson, _ := json.Marshal(locks) + r := &http.Response{ + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.0", + Body: ioutil.NopCloser(bytes.NewReader(locksJson)), + } + if into != nil { + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(into); err != nil { + return nil, err + } + } + return api.WrapHttpResponse(r), nil +} +func (l *TestLifecycle) Cleanup(resp api.Response) error { + return resp.Body().Close() +} + +type LocksById []Lock + +func (a LocksById) Len() int { return len(a) } +func (a LocksById) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a LocksById) Less(i, j int) bool { return a[i].Id < a[j].Id } + +func TestRefreshCache(t *testing.T) { + var err error + oldStore := config.LocalGitStorageDir + config.LocalGitStorageDir, err = ioutil.TempDir("", "testCacheLock") + assert.Nil(t, err) + defer func() { + os.RemoveAll(config.LocalGitStorageDir) + config.LocalGitStorageDir = oldStore + }() + + cfg := config.NewFrom(config.Values{ + Git: map[string]string{"user.name": "Fred", "user.email": "fred@bloggs.com"}}) + client, err := NewClient(cfg) + assert.Nil(t, err) + // Override api client for testing + client.apiClient = api.NewClient(&TestLifecycle{}) + + // Should start with no cached items + locks, err := client.SearchLocks(nil, 0, true) + assert.Nil(t, err) + assert.Empty(t, locks) + + // Should load from test data, just Fred's + err = client.refreshLockCache() + assert.Nil(t, err) + + locks, err = client.SearchLocks(nil, 0, true) + assert.Nil(t, err) + // Need to include zero time in structure for equal to work + zeroTime := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC) + + // Sort locks for stable comparison + sort.Sort(LocksById(locks)) + assert.Equal(t, []Lock{ + Lock{Path: "folder/test1.dat", Id: "101", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime}, + Lock{Path: "folder/test2.dat", Id: "102", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime}, + Lock{Path: "root.dat", Id: "103", Name: "Fred", Email: "fred@bloggs.com", LockedAt: zeroTime}, + }, locks) + +} diff --git a/progress/noop.go b/progress/noop.go index 0ab4f3c6..27b36e2c 100644 --- a/progress/noop.go +++ b/progress/noop.go @@ -7,6 +7,7 @@ func Noop() Meter { type nonMeter struct{} func (m *nonMeter) Start() {} +func (m *nonMeter) Add(size int64) {} func (m *nonMeter) Skip(size int64) {} func (m *nonMeter) StartTransfer(name string) {} func (m *nonMeter) TransferBytes(direction, name string, read, total int64, current int) {} diff --git a/progress/progress.go b/progress/progress.go index 21ae289a..c016b3c5 100644 --- a/progress/progress.go +++ b/progress/progress.go @@ -4,6 +4,7 @@ package progress type Meter interface { Start() + Add(int64) Skip(size int64) StartTransfer(name string) TransferBytes(direction, name string, read, total int64, current int) diff --git a/script/bootstrap b/script/bootstrap index 1ed05341..e57a1b80 100755 --- a/script/bootstrap +++ b/script/bootstrap @@ -14,6 +14,14 @@ if [ -z "$GOPATH" ]; then ln -s "$GOPATH" src/github.com/git-lfs/git-lfs fi +if uname -s | grep -q "_NT-"; then + echo "Installing goversioninfo to embed resources into Windows executables..." + go get github.com/josephspurrier/goversioninfo/cmd/goversioninfo + export PATH=$PATH:$GOPATH/bin/windows_386 + echo "Creating the resource.syso version information file..." + go generate +fi + script/fmt rm -rf bin/* GO15VENDOREXPERIMENT=1 go run script/*.go -cmd build "$@" diff --git a/script/update-version b/script/update-version new file mode 100755 index 00000000..1742ff88 --- /dev/null +++ b/script/update-version @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +VERSION_STRING=$1 +VERSION_ARRAY=( ${VERSION_STRING//./ } ) +VERSION_MAJOR=${VERSION_ARRAY[0]} +VERSION_MINOR=${VERSION_ARRAY[1]} +VERSION_PATCH=${VERSION_ARRAY[2]:-0} +VERSION_BUILD=${VERSION_ARRAY[3]:-0} + +# Update the version number git-lfs is reporting. +sed -i "s,\(Version = \"\).*\(\"\),\1$VERSION_STRING\2," config/version.go + +# Update the version number in the RPM package. +sed -i "s,\(Version:[[:space:]]*\).*,\1$VERSION_STRING," rpm/SPECS/git-lfs.spec + +# Update the version numbers in the Windows installer. +sed -i "s,\(\"Major\": \).*\,,\1$VERSION_MAJOR\,," versioninfo.json +sed -i "s,\(\"Minor\": \).*\,,\1$VERSION_MINOR\,," versioninfo.json +sed -i "s,\(\"Patch\": \).*\,,\1$VERSION_PATCH\,," versioninfo.json +sed -i "s,\(\"Build\": \).*,\1$VERSION_BUILD," versioninfo.json +sed -i "s,\(\"ProductVersion\": \"\).*\(\"\),\1$VERSION_STRING\2," versioninfo.json diff --git a/script/windows-installer/inno-setup-git-lfs-installer.iss b/script/windows-installer/inno-setup-git-lfs-installer.iss index d1a288d9..227da56c 100644 --- a/script/windows-installer/inno-setup-git-lfs-installer.iss +++ b/script/windows-installer/inno-setup-git-lfs-installer.iss @@ -1,8 +1,10 @@ #define MyAppName "Git LFS" -; Misuse RemoveFileExt to strip the 4th patch-level version number. ; Arbitrarily choose the x86 executable here as both have the version embedded. -#define MyAppVersion RemoveFileExt(GetFileVersion("..\..\git-lfs-x86.exe")) +#define MyVersionInfoVersion GetFileVersion("..\..\git-lfs-x86.exe") + +; Misuse RemoveFileExt to strip the 4th patch-level version number. +#define MyAppVersion RemoveFileExt(MyVersionInfoVersion) #define MyAppPublisher "GitHub, Inc." #define MyAppURL "https://git-lfs.github.com/" @@ -15,7 +17,7 @@ AppId={{286391DE-F778-44EA-9375-1B21AAA04FF0} AppName={#MyAppName} AppVersion={#MyAppVersion} -;AppVerName={#MyAppName} {#MyAppVersion} +AppCopyright=GitHub, Inc. and Git LFS contributors AppPublisher={#MyAppPublisher} AppPublisherURL={#MyAppURL} AppSupportURL={#MyAppURL} @@ -32,6 +34,7 @@ DisableReadyPage=True ArchitecturesInstallIn64BitMode=x64 ChangesEnvironment=yes SetupIconFile=git-lfs-logo.ico +VersionInfoVersion={#MyVersionInfoVersion} WizardImageFile=git-lfs-wizard-image.bmp WizardSmallImageFile=git-lfs-logo.bmp diff --git a/script/windows-installer/resources.rc b/script/windows-installer/resources.rc deleted file mode 100644 index 5fb0b451..00000000 --- a/script/windows-installer/resources.rc +++ /dev/null @@ -1,20 +0,0 @@ -main ICON "git-lfs-logo.ico" -1 VERSIONINFO -FILEVERSION 1,5,0,0 -BEGIN - BLOCK "StringFileInfo" - BEGIN - BLOCK "040904b0" /* LANG_ENGLISH/SUBLANG_ENGLISH_US, Unicode CP */ - BEGIN - VALUE "FileDescription", "Git LFS\0" - VALUE "ProductName", "Git Large File Storage (LFS)\0" - VALUE "ProductVersion", "1.5.0\0" - VALUE "LegalCopyright", "GitHub, Inc. and Git LFS contributors\0" - END - END - - BLOCK "VarFileInfo" - BEGIN - VALUE "Translation", 0x409, 1200 - END -END diff --git a/test/git-lfs-test-server-api/main.go b/test/git-lfs-test-server-api/main.go index efd0af7e..11aa840c 100644 --- a/test/git-lfs-test-server-api/main.go +++ b/test/git-lfs-test-server-api/main.go @@ -165,11 +165,11 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) { for _, f := range outputs[0].Files { oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size}) - u, err := lfs.NewUploadable(f.Oid, "Test file") + t, err := uploadTransfer(f.Oid, "Test file") if err != nil { return nil, nil, err } - uploadQueue.Add(u) + uploadQueue.Add(t.Name, t.Path, t.Oid, t.Size) } uploadQueue.Wait() @@ -287,6 +287,25 @@ func interleaveTestData(slice1, slice2 []TestObject) []TestObject { return ret } +func uploadTransfer(oid, filename string) (*tq.Transfer, error) { + localMediaPath, err := lfs.LocalMediaPath(oid) + if err != nil { + return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) + } + + fi, err := os.Stat(localMediaPath) + if err != nil { + return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid) + } + + return &tq.Transfer{ + Name: filename, + Path: localMediaPath, + Oid: oid, + Size: fi.Size(), + }, nil +} + func init() { RootCmd.Flags().StringVarP(&apiUrl, "url", "u", "", "URL of the API (must supply this or --clone)") RootCmd.Flags().StringVarP(&cloneUrl, "clone", "c", "", "Clone URL from which to find API (must supply this or --url)") diff --git a/test/test-custom-transfers.sh b/test/test-custom-transfers.sh index 9a32bb76..e1ebd079 100755 --- a/test/test-custom-transfers.sh +++ b/test/test-custom-transfers.sh @@ -28,7 +28,7 @@ begin_test "custom-transfer-wrong-path" GIT_TRACE=1 git push origin master 2>&1 | tee pushcustom.log # use PIPESTATUS otherwise we get exit code from tee res=${PIPESTATUS[0]} - grep "xfer: Custom transfer adapter" pushcustom.log + grep "xfer: adapter \"testcustom\" Begin()" pushcustom.log grep "Failed to start custom transfer command" pushcustom.log if [ "$res" = "0" ]; then echo "Push should have failed because of an incorrect custom transfer path." @@ -101,9 +101,8 @@ begin_test "custom-transfer-upload-download" grep "xfer: started custom adapter process" fetchcustom.log grep "xfer\[lfstest-customadapter\]:" fetchcustom.log - grep "11 of 11 files" fetchcustom.log + grep "11 of 11 files" fetchcustom.log [ `find .git/lfs/objects -type f | wc -l` = 11 ] ) end_test - diff --git a/test/test-locks.sh b/test/test-locks.sh index 9d0f32b1..43f1a94a 100755 --- a/test/test-locks.sh +++ b/test/test-locks.sh @@ -88,3 +88,48 @@ begin_test "list locks with pagination" grep "4 lock(s) matched query" locks.log ) end_test + +begin_test "cached locks" +( + set -e + + reponame="cached_locks" + setup_remote_repo "remote_$reponame" + clone_repo "remote_$reponame" "clone_$reponame" + + git lfs track "*.dat" + echo "foo" > "cached1.dat" + echo "bar" > "cached2.dat" + + git add "cached1.dat" "cached2.dat" ".gitattributes" + git commit -m "add files" | tee commit.log + grep "3 files changed" commit.log + grep "create mode 100644 cached1.dat" commit.log + grep "create mode 100644 cached2.dat" commit.log + grep "create mode 100644 .gitattributes" commit.log + + git push origin master 2>&1 | tee push.log + grep "master -> master" push.log + + GITLFSLOCKSENABLED=1 git lfs lock "cached1.dat" | tee lock.log + assert_server_lock "$(grep -oh "\((.*)\)" lock.log | tr -d "()")" + + GITLFSLOCKSENABLED=1 git lfs lock "cached2.dat" | tee lock.log + assert_server_lock "$(grep -oh "\((.*)\)" lock.log | tr -d "()")" + + GITLFSLOCKSENABLED=1 git lfs locks --local | tee locks.log + grep "2 lock(s) matched query" locks.log + + # delete the remote to prove we're using the local records + git remote remove origin + + GITLFSLOCKSENABLED=1 git lfs locks --local --path "cached1.dat" | tee locks.log + grep "1 lock(s) matched query" locks.log + grep "cached1.dat" locks.log + + GITLFSLOCKSENABLED=1 git lfs locks --local --limit 1 | tee locks.log + grep "1 lock(s) matched query" locks.log +) +end_test + + diff --git a/test/test-push.sh b/test/test-push.sh index 860a3867..d161c915 100755 --- a/test/test-push.sh +++ b/test/test-push.sh @@ -507,7 +507,7 @@ begin_test "push (retry with expired actions)" GIT_TRACE=1 git push origin master 2>&1 | tee push.log - [ "1" -eq "$(grep -c "expired, retrying..." push.log)" ] + [ "1" -eq "$(grep -c "enqueue retry" push.log)" ] grep "(1 of 1 files)" push.log ) end_test diff --git a/tools/kv/keyvaluestore.go b/tools/kv/keyvaluestore.go new file mode 100644 index 00000000..0699b98f --- /dev/null +++ b/tools/kv/keyvaluestore.go @@ -0,0 +1,230 @@ +package kv + +import ( + "encoding/gob" + "fmt" + "io" + "os" + "sync" +) + +// Store provides an in-memory key/value store which is persisted to +// a file. The file handle itself is not kept locked for the duration; it is +// only locked during load and save, to make it concurrency friendly. When +// saving, the store uses optimistic locking to determine whether the db on disk +// has been modified by another process; in which case it loads the latest +// version and re-applies modifications made during this session. This means +// the Lost Update db concurrency issue is possible; so don't use this if you +// need more DB integrity than Read Committed isolation levels. +type Store struct { + // Locks the entire store + mu sync.RWMutex + filename string + log []change + + // This is the persistent data + // version for optimistic locking, this field is incremented with every Save() + version int64 + db map[string]interface{} +} + +// Type of operation; set or remove +type operation int + +const ( + // Set a value for a key + setOperation = operation(iota) + // Removed a value for a key + removeOperation = operation(iota) +) + +type change struct { + op operation + key string + value interface{} +} + +// NewStore creates a new key/value store and initialises it with contents from +// the named file, if it exists +func NewStore(filepath string) (*Store, error) { + kv := &Store{filename: filepath, db: make(map[string]interface{})} + return kv, kv.loadAndMergeIfNeeded() +} + +// Set updates the key/value store in memory +// Changes are not persisted until you call Save() +func (k *Store) Set(key string, value interface{}) { + k.mu.Lock() + defer k.mu.Unlock() + + k.db[key] = value + k.logChange(setOperation, key, value) +} + +// Remove removes the key and its value from the store in memory +// Changes are not persisted until you call Save() +func (k *Store) Remove(key string) { + k.mu.Lock() + defer k.mu.Unlock() + + delete(k.db, key) + k.logChange(removeOperation, key, nil) +} + +// RemoveAll removes all entries from the store +// These changes are not persisted until you call Save() +func (k *Store) RemoveAll() { + k.mu.Lock() + defer k.mu.Unlock() + + // Log all changes + for key, _ := range k.db { + k.logChange(removeOperation, key, nil) + } + k.db = make(map[string]interface{}) +} + +// Visit walks through the entire store via a function; return false from +// your visitor function to halt the walk +func (k *Store) Visit(cb func(string, interface{}) bool) { + // Read-only lock + k.mu.RLock() + defer k.mu.RUnlock() + + for k, v := range k.db { + if !cb(k, v) { + break + } + } +} + +// Append a change to the log; mutex must already be locked +func (k *Store) logChange(op operation, key string, value interface{}) { + k.log = append(k.log, change{op, key, value}) +} + +// Get retrieves a value from the store, or nil if it is not present +func (k *Store) Get(key string) interface{} { + // Read-only lock + k.mu.RLock() + defer k.mu.RUnlock() + + // zero value of interface{} is nil so this does what we want + return k.db[key] +} + +// Save persists the changes made to disk +// If any changes have been written by other code they will be merged +func (k *Store) Save() error { + k.mu.Lock() + defer k.mu.Unlock() + + // Short-circuit if we have no changes + if len(k.log) == 0 { + return nil + } + + // firstly peek at version; open read/write to keep lock between check & write + f, err := os.OpenFile(k.filename, os.O_RDWR|os.O_CREATE, 0664) + if err != nil { + return err + } + + defer f.Close() + + // Only try to merge if > 0 bytes, ignore empty files (decoder will fail) + if stat, _ := f.Stat(); stat.Size() > 0 { + k.loadAndMergeReaderIfNeeded(f) + // Now we overwrite the file + f.Seek(0, os.SEEK_SET) + f.Truncate(0) + } + + k.version++ + + enc := gob.NewEncoder(f) + if err := enc.Encode(k.version); err != nil { + return fmt.Errorf("Error while writing version data to %v: %v", k.filename, err) + } + if err := enc.Encode(k.db); err != nil { + return fmt.Errorf("Error while writing new key/value data to %v: %v", k.filename, err) + } + // Clear log now that it's saved + k.log = nil + + return nil +} + +// Reads as little as possible from the passed in file to determine if the +// contents are different from the version already held. If so, reads the +// contents and merges with any outstanding changes. If not, stops early without +// reading the rest of the file +func (k *Store) loadAndMergeIfNeeded() error { + stat, err := os.Stat(k.filename) + if err != nil { + if os.IsNotExist(err) { + return nil // missing is OK + } + return err + } + // Do nothing if empty file + if stat.Size() == 0 { + return nil + } + + f, err := os.OpenFile(k.filename, os.O_RDONLY, 0664) + if err == nil { + defer f.Close() + return k.loadAndMergeReaderIfNeeded(f) + } else { + return err + } +} + +// As loadAndMergeIfNeeded but lets caller decide how to manage file handles +func (k *Store) loadAndMergeReaderIfNeeded(f io.Reader) error { + var versionOnDisk int64 + // Decode *only* the version field to check whether anyone else has + // modified the db; gob serializes structs in order so it will always be 1st + dec := gob.NewDecoder(f) + err := dec.Decode(&versionOnDisk) + if err != nil { + return fmt.Errorf("Problem checking version of key/value data from %v: %v", k.filename, err) + } + // Totally uninitialised Version == 0, saved versions are always >=1 + if versionOnDisk != k.version { + // Reload data & merge + var dbOnDisk map[string]interface{} + err = dec.Decode(&dbOnDisk) + if err != nil { + return fmt.Errorf("Problem reading updated key/value data from %v: %v", k.filename, err) + } + k.reapplyChanges(dbOnDisk) + k.version = versionOnDisk + } + return nil +} + +// reapplyChanges replays the changes made since the last load onto baseDb +// and stores the result as our own DB +func (k *Store) reapplyChanges(baseDb map[string]interface{}) { + for _, change := range k.log { + switch change.op { + case setOperation: + baseDb[change.key] = change.value + case removeOperation: + delete(baseDb, change.key) + } + } + // Note, log is not cleared here, that only happens on Save since it's a + // list of unsaved changes + k.db = baseDb + +} + +// RegisterTypeForStorage registers a custom type (e.g. a struct) for +// use in the key value store. This is necessary if you intend to pass custom +// structs to Store.Set() rather than primitive types. +func RegisterTypeForStorage(val interface{}) { + gob.Register(val) +} diff --git a/tools/kv/keyvaluestore_test.go b/tools/kv/keyvaluestore_test.go new file mode 100644 index 00000000..2f531f35 --- /dev/null +++ b/tools/kv/keyvaluestore_test.go @@ -0,0 +1,187 @@ +package kv + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStoreSimple(t *testing.T) { + tmpf, err := ioutil.TempFile("", "lfstest1") + assert.Nil(t, err) + filename := tmpf.Name() + defer os.Remove(filename) + tmpf.Close() + + kvs, err := NewStore(filename) + assert.Nil(t, err) + + // We'll include storing custom structs + type customData struct { + Val1 string + Val2 int + } + // Needed to store custom struct + RegisterTypeForStorage(&customData{}) + + kvs.Set("stringVal", "This is a string value") + kvs.Set("intVal", 3) + kvs.Set("floatVal", 3.142) + kvs.Set("structVal", &customData{"structTest", 20}) + + s := kvs.Get("stringVal") + assert.Equal(t, "This is a string value", s) + i := kvs.Get("intVal") + assert.Equal(t, 3, i) + f := kvs.Get("floatVal") + assert.Equal(t, 3.142, f) + c := kvs.Get("structVal") + assert.Equal(t, c, &customData{"structTest", 20}) + n := kvs.Get("noValue") + assert.Nil(t, n) + + kvs.Remove("stringVal") + s = kvs.Get("stringVal") + assert.Nil(t, s) + // Set the string value again before saving + kvs.Set("stringVal", "This is a string value") + + err = kvs.Save() + assert.Nil(t, err) + kvs = nil + + // Now confirm that we can read it all back + kvs2, err := NewStore(filename) + assert.Nil(t, err) + s = kvs2.Get("stringVal") + assert.Equal(t, "This is a string value", s) + i = kvs2.Get("intVal") + assert.Equal(t, 3, i) + f = kvs2.Get("floatVal") + assert.Equal(t, 3.142, f) + c = kvs2.Get("structVal") + assert.Equal(t, c, &customData{"structTest", 20}) + n = kvs2.Get("noValue") + assert.Nil(t, n) + + // Test remove all + kvs2.RemoveAll() + s = kvs2.Get("stringVal") + assert.Nil(t, s) + i = kvs2.Get("intVal") + assert.Nil(t, i) + f = kvs2.Get("floatVal") + assert.Nil(t, f) + c = kvs2.Get("structVal") + assert.Nil(t, c) + + err = kvs2.Save() + assert.Nil(t, err) + kvs2 = nil + + // Now confirm that we can read blank & get nothing + kvs, err = NewStore(filename) + kvs.Visit(func(k string, v interface{}) bool { + // Should not be called + assert.Fail(t, "Should be no entries") + return true + }) + +} + +func TestStoreOptimisticConflict(t *testing.T) { + tmpf, err := ioutil.TempFile("", "lfstest2") + assert.Nil(t, err) + filename := tmpf.Name() + defer os.Remove(filename) + tmpf.Close() + + kvs1, err := NewStore(filename) + assert.Nil(t, err) + + kvs1.Set("key1", "value1") + kvs1.Set("key2", "value2") + kvs1.Set("key3", "value3") + err = kvs1.Save() + assert.Nil(t, err) + + // Load second copy & modify + kvs2, err := NewStore(filename) + assert.Nil(t, err) + // New keys + kvs2.Set("key4", "value4_fromkvs2") + kvs2.Set("key5", "value5_fromkvs2") + // Modify a key too + kvs2.Set("key1", "value1_fromkvs2") + err = kvs2.Save() + assert.Nil(t, err) + + // Now modify first copy & save; it should detect optimistic lock issue + // New item + kvs1.Set("key10", "value10") + // Overlapping item; since we save second this will overwrite one from kvs2 + kvs1.Set("key4", "value4") + err = kvs1.Save() + assert.Nil(t, err) + + // This should have merged changes from kvs2 in the process + v := kvs1.Get("key1") + assert.Equal(t, "value1_fromkvs2", v) // this one was modified by kvs2 + v = kvs1.Get("key2") + assert.Equal(t, "value2", v) + v = kvs1.Get("key3") + assert.Equal(t, "value3", v) + v = kvs1.Get("key4") + assert.Equal(t, "value4", v) // we overwrote this so would not be merged + v = kvs1.Get("key5") + assert.Equal(t, "value5_fromkvs2", v) + +} + +func TestStoreReduceSize(t *testing.T) { + tmpf, err := ioutil.TempFile("", "lfstest3") + assert.Nil(t, err) + filename := tmpf.Name() + defer os.Remove(filename) + tmpf.Close() + + kvs, err := NewStore(filename) + assert.Nil(t, err) + + kvs.Set("key1", "I woke up in a Soho doorway") + kvs.Set("key2", "A policeman knew my name") + kvs.Set("key3", "He said 'You can go sleep at home tonight") + kvs.Set("key4", "If you can get up and walk away'") + + assert.NotNil(t, kvs.Get("key1")) + assert.NotNil(t, kvs.Get("key2")) + assert.NotNil(t, kvs.Get("key3")) + assert.NotNil(t, kvs.Get("key4")) + + assert.Nil(t, kvs.Save()) + + stat1, _ := os.Stat(filename) + + // Remove all but 1 key & save smaller version + kvs.Remove("key2") + kvs.Remove("key3") + kvs.Remove("key4") + assert.Nil(t, kvs.Save()) + + // Now reload fresh & prove works + kvs = nil + + kvs, err = NewStore(filename) + assert.Nil(t, err) + assert.NotNil(t, kvs.Get("key1")) + assert.Nil(t, kvs.Get("key2")) + assert.Nil(t, kvs.Get("key3")) + assert.Nil(t, kvs.Get("key4")) + + stat2, _ := os.Stat(filename) + + assert.True(t, stat2.Size() < stat1.Size(), "Size should have reduced, was %d now %d", stat1.Size(), stat2.Size()) + +} diff --git a/tq/adapterbase.go b/tq/adapterbase.go index 6b842bc2..85262e51 100644 --- a/tq/adapterbase.go +++ b/tq/adapterbase.go @@ -3,19 +3,10 @@ package tq import ( "fmt" "sync" - "time" - "github.com/git-lfs/git-lfs/errors" "github.com/rubyist/tracerx" ) -const ( - // objectExpirationToTransfer is the duration we expect to have passed - // from the time that the object's expires_at property is checked to - // when the transfer is executed. - objectExpirationToTransfer = 5 * time.Second -) - // adapterBase implements the common functionality for core adapters which // process transfers with N workers handling an oid each, and which wait for // authentication to succeed on one worker before proceeding @@ -69,9 +60,10 @@ func (a *adapterBase) Direction() Direction { return a.direction } -func (a *adapterBase) Begin(maxConcurrency int, cb ProgressCallback) error { +func (a *adapterBase) Begin(cfg AdapterConfig, cb ProgressCallback) error { a.cb = cb a.jobChan = make(chan *job, 100) + maxConcurrency := cfg.ConcurrentTransfers() tracerx.Printf("xfer: adapter %q Begin() with %d workers", a.Name(), maxConcurrency) @@ -156,27 +148,12 @@ func (a *adapterBase) worker(workerNum int, ctx interface{}) { signalAuthOnResponse = false } } - tracerx.Printf("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Object.Oid) - - // transferTime is the time that we are to compare the transfer's - // `expired_at` property against. - // - // We add the `objectExpirationToTransfer` since there will be - // some time lost from this comparison to the time we actually - // transfer the object - transferTime := time.Now().Add(objectExpirationToTransfer) + tracerx.Printf("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Oid) // Actual transfer happens here var err error - if expAt, expired := t.Object.IsExpired(transferTime); expired { - tracerx.Printf("xfer: adapter %q worker %d found job for %q expired, retrying...", a.Name(), workerNum, t.Object.Oid) - err = errors.NewRetriableError(errors.Errorf( - "lfs/transfer: object %q expires at %s", - t.Object.Oid, expAt.In(time.Local).Format(time.RFC822), - )) - } else if t.Object.Size < 0 { - tracerx.Printf("xfer: adapter %q worker %d found invalid size for %q (got: %d), retrying...", a.Name(), workerNum, t.Object.Oid, t.Object.Size) - err = fmt.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Object.Oid, t.Object.Size) + if t.Size < 0 { + err = fmt.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size) } else { err = a.transferImpl.DoTransfer(ctx, t, a.cb, authCallback) } @@ -184,7 +161,7 @@ func (a *adapterBase) worker(workerNum int, ctx interface{}) { // Mark the job as completed, and alter all listeners job.Done(err) - tracerx.Printf("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Object.Oid) + tracerx.Printf("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Oid) } // This will only happen if no jobs were submitted; just wake up all workers to finish if signalAuthOnResponse { @@ -203,10 +180,10 @@ func advanceCallbackProgress(cb ProgressCallback, t *Transfer, numBytes int64) { remainder := numBytes - read if remainder > int64(maxInt) { read += int64(maxInt) - cb(t.Name, t.Object.Size, read, maxInt) + cb(t.Name, t.Size, read, maxInt) } else { read += remainder - cb(t.Name, t.Object.Size, read, int(remainder)) + cb(t.Name, t.Size, read, int(remainder)) } } diff --git a/tq/basic_download.go b/tq/basic_download.go index 98335876..766bca93 100644 --- a/tq/basic_download.go +++ b/tq/basic_download.go @@ -72,7 +72,7 @@ func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.Fil f.Close() return nil, 0, nil, err } - tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Object.Oid, n) + tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, n) return f, n, hash, nil } @@ -80,7 +80,7 @@ func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.Fil // Create or open a download file for resuming func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string { // Not a temp file since we will be resuming it - return filepath.Join(a.tempDir(), t.Object.Oid+".tmp") + return filepath.Join(a.tempDir(), t.Oid+".tmp") } // download starts or resumes and download. Always closes dlFile if non-nil @@ -91,9 +91,10 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk defer dlFile.Close() } - rel, ok := t.Object.Rel("download") - if !ok { - return errors.New("Object not found on the server.") + rel, err := t.Actions.Get("download") + if err != nil { + return err + // return errors.New("Object not found on the server.") } req, err := httputil.NewHttpRequest("GET", rel.Href, rel.Header) @@ -103,17 +104,17 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk if fromByte > 0 { if dlFile == nil || hash == nil { - return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Object.Oid, fromByte) + return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Oid, fromByte) } // We could just use a start byte, but since we know the length be specific - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Object.Size-1)) + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1)) } - res, err := httputil.DoHttpRequest(config.Config, req, t.Object.NeedsAuth()) + res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated) if err != nil { // Special-case status code 416 () - fall back if fromByte > 0 && dlFile != nil && res.StatusCode == 416 { - tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Object.Oid, fromByte) + tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte) dlFile.Close() os.Remove(dlFile.Name()) return a.download(t, cb, authOkFunc, nil, 0, nil) @@ -150,11 +151,11 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk failReason = fmt.Sprintf("expected status code 206, received %d", res.StatusCode) } if rangeRequestOk { - tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Object.Oid, fromByte) + tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Oid, fromByte) advanceCallbackProgress(cb, t, fromByte) } else { // Abort resume, perform regular download - tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Object.Oid, fromByte, failReason) + tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason) dlFile.Close() os.Remove(dlFile.Name()) if res.StatusCode == 200 { @@ -210,8 +211,8 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk return fmt.Errorf("can't close tempfile %q: %v", dlfilename, err) } - if actual := hasher.Hash(); actual != t.Object.Oid { - return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Object.Oid, actual, written) + if actual := hasher.Hash(); actual != t.Oid { + return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Oid, actual, written) } return tools.RenameFileCopyPermissions(dlfilename, t.Path) diff --git a/tq/basic_upload.go b/tq/basic_upload.go index 60f935d2..d03b9cea 100644 --- a/tq/basic_upload.go +++ b/tq/basic_upload.go @@ -1,17 +1,16 @@ package tq import ( - "fmt" "io" "io/ioutil" "os" "path/filepath" "strconv" - "github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/httputil" + "github.com/git-lfs/git-lfs/lfsapi" "github.com/git-lfs/git-lfs/progress" ) @@ -45,9 +44,10 @@ func (a *basicUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) { } func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error { - rel, ok := t.Object.Rel("upload") - if !ok { - return fmt.Errorf("No upload action for this object.") + rel, err := t.Actions.Get("upload") + if err != nil { + return err + // return fmt.Errorf("No upload action for this object.") } req, err := httputil.NewHttpRequest("PUT", rel.Href, rel.Header) @@ -62,10 +62,10 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres if req.Header.Get("Transfer-Encoding") == "chunked" { req.TransferEncoding = []string{"chunked"} } else { - req.Header.Set("Content-Length", strconv.FormatInt(t.Object.Size, 10)) + req.Header.Set("Content-Length", strconv.FormatInt(t.Size, 10)) } - req.ContentLength = t.Object.Size + req.ContentLength = t.Size f, err := os.OpenFile(t.Path, os.O_RDONLY, 0644) if err != nil { @@ -84,7 +84,7 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres var reader io.Reader reader = &progress.CallbackReader{ C: ccb, - TotalSize: t.Object.Size, + TotalSize: t.Size, Reader: f, } @@ -97,7 +97,7 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres req.Body = ioutil.NopCloser(reader) - res, err := httputil.DoHttpRequest(config.Config, req, t.Object.NeedsAuth()) + res, err := httputil.DoHttpRequest(config.Config, req, !t.Authenticated) if err != nil { return errors.NewRetriableError(err) } @@ -117,7 +117,8 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres io.Copy(ioutil.Discard, res.Body) res.Body.Close() - return api.VerifyUpload(config.Config, t.Object) + cli := &lfsapi.Client{} + return verifyUpload(cli, t) } // startCallbackReader is a reader wrapper which calls a function as soon as the diff --git a/tq/custom.go b/tq/custom.go index 50b0bbe8..bacd46e5 100644 --- a/tq/custom.go +++ b/tq/custom.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "encoding/json" - "errors" "fmt" "io" "os/exec" @@ -74,17 +73,17 @@ func NewCustomAdapterInitRequest(op string, concurrent bool, concurrentTransfers } type customAdapterTransferRequest struct { // common between upload/download - Event string `json:"event"` - Oid string `json:"oid"` - Size int64 `json:"size"` - Path string `json:"path,omitempty"` - Action *api.LinkRelation `json:"action"` + Event string `json:"event"` + Oid string `json:"oid"` + Size int64 `json:"size"` + Path string `json:"path,omitempty"` + Action *Action `json:"action"` } -func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *api.LinkRelation) *customAdapterTransferRequest { +func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *Action) *customAdapterTransferRequest { return &customAdapterTransferRequest{"upload", oid, size, path, action} } -func NewCustomAdapterDownloadRequest(oid string, size int64, action *api.LinkRelation) *customAdapterTransferRequest { +func NewCustomAdapterDownloadRequest(oid string, size int64, action *Action) *customAdapterTransferRequest { return &customAdapterTransferRequest{"download", oid, size, "", action} } @@ -98,26 +97,24 @@ func NewCustomAdapterTerminateRequest() *customAdapterTerminateRequest { // A common struct that allows all types of response to be identified type customAdapterResponseMessage struct { - Event string `json:"event"` - Error *api.ObjectError `json:"error"` - Oid string `json:"oid"` - Path string `json:"path,omitempty"` // always blank for upload - BytesSoFar int64 `json:"bytesSoFar"` - BytesSinceLast int `json:"bytesSinceLast"` + Event string `json:"event"` + Error *ObjectError `json:"error"` + Oid string `json:"oid"` + Path string `json:"path,omitempty"` // always blank for upload + BytesSoFar int64 `json:"bytesSoFar"` + BytesSinceLast int `json:"bytesSinceLast"` } -func (a *customAdapter) Begin(maxConcurrency int, cb ProgressCallback) error { - // If config says not to launch multiple processes, downgrade incoming value - useConcurrency := maxConcurrency - if !a.concurrent { - useConcurrency = 1 +func (a *customAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error { + a.originalConcurrency = cfg.ConcurrentTransfers() + if a.concurrent { + // Use common workers impl, but downgrade workers to number of processes + return a.adapterBase.Begin(cfg, cb) } - a.originalConcurrency = maxConcurrency - tracerx.Printf("xfer: Custom transfer adapter %q using concurrency %d", a.name, useConcurrency) - - // Use common workers impl, but downgrade workers to number of processes - return a.adapterBase.Begin(useConcurrency, cb) + // If config says not to launch multiple processes, downgrade incoming value + newCfg := &Manifest{concurrentTransfers: 1} + return a.adapterBase.Begin(newCfg, cb) } func (a *customAdapter) ClearTempStorage() error { @@ -268,18 +265,18 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall } var authCalled bool - rel, ok := t.Object.Rel(a.getOperationName()) - if !ok { - return errors.New("Object not found on the server.") + rel, err := t.Actions.Get(a.getOperationName()) + if err != nil { + return err + // return errors.New("Object not found on the server.") } var req *customAdapterTransferRequest if a.direction == Upload { - req = NewCustomAdapterUploadRequest(t.Object.Oid, t.Object.Size, t.Path, rel) + req = NewCustomAdapterUploadRequest(t.Oid, t.Size, t.Path, rel) } else { - req = NewCustomAdapterDownloadRequest(t.Object.Oid, t.Object.Size, rel) + req = NewCustomAdapterDownloadRequest(t.Oid, t.Size, rel) } - err := a.sendMessage(customCtx, req) - if err != nil { + if err = a.sendMessage(customCtx, req); err != nil { return err } @@ -294,24 +291,24 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall switch resp.Event { case "progress": // Progress - if resp.Oid != t.Object.Oid { - return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Object.Oid) + if resp.Oid != t.Oid { + return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Oid) } if cb != nil { - cb(t.Name, t.Object.Size, resp.BytesSoFar, resp.BytesSinceLast) + cb(t.Name, t.Size, resp.BytesSoFar, resp.BytesSinceLast) } wasAuthOk = resp.BytesSoFar > 0 case "complete": // Download/Upload complete - if resp.Oid != t.Object.Oid { - return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Object.Oid) + if resp.Oid != t.Oid { + return fmt.Errorf("Unexpected oid %q in response, expecting %q", resp.Oid, t.Oid) } if resp.Error != nil { - return fmt.Errorf("Error transferring %q: %v", t.Object.Oid, resp.Error) + return fmt.Errorf("Error transferring %q: %v", t.Oid, resp.Error) } if a.direction == Download { // So we don't have to blindly trust external providers, check SHA - if err = tools.VerifyFileHash(t.Object.Oid, resp.Path); err != nil { + if err = tools.VerifyFileHash(t.Oid, resp.Path); err != nil { return fmt.Errorf("Downloaded file failed checks: %v", err) } // Move file to final location @@ -319,7 +316,7 @@ func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCall return fmt.Errorf("Failed to copy downloaded file: %v", err) } } else if a.direction == Upload { - if err = api.VerifyUpload(config.Config, t.Object); err != nil { + if err = api.VerifyUpload(config.Config, toApiObject(t)); err != nil { return err } } diff --git a/tq/transfer.go b/tq/transfer.go index 1af8af20..84135f43 100644 --- a/tq/transfer.go +++ b/tq/transfer.go @@ -2,7 +2,13 @@ // NOTE: Subject to change, do not rely on this package from outside git-lfs source package tq -import "github.com/git-lfs/git-lfs/api" +import ( + "fmt" + "time" + + "github.com/git-lfs/git-lfs/api" + "github.com/git-lfs/git-lfs/errors" +) type Direction int @@ -11,6 +17,137 @@ const ( Download = Direction(iota) ) +type Transfer struct { + Name string `json:"name"` + Oid string `json:"oid,omitempty"` + Size int64 `json:"size"` + Authenticated bool `json:"authenticated,omitempty"` + Actions ActionSet `json:"actions,omitempty"` + Error *ObjectError `json:"error,omitempty"` + Path string `json:"path"` +} + +type ObjectError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// newTransfer creates a new Transfer instance +func newTransfer(name string, obj *api.ObjectResource, path string) *Transfer { + t := &Transfer{ + Name: name, + Oid: obj.Oid, + Size: obj.Size, + Authenticated: obj.Authenticated, + Actions: make(ActionSet), + Path: path, + } + + if obj.Error != nil { + t.Error = &ObjectError{ + Code: obj.Error.Code, + Message: obj.Error.Message, + } + } + + for rel, action := range obj.Actions { + t.Actions[rel] = &Action{ + Href: action.Href, + Header: action.Header, + ExpiresAt: action.ExpiresAt, + } + } + + return t + +} + +type Action struct { + Href string `json:"href"` + Header map[string]string `json:"header,omitempty"` + ExpiresAt time.Time `json:"expires_at,omitempty"` +} + +type ActionSet map[string]*Action + +const ( + // objectExpirationToTransfer is the duration we expect to have passed + // from the time that the object's expires_at property is checked to + // when the transfer is executed. + objectExpirationToTransfer = 5 * time.Second +) + +func (as ActionSet) Get(rel string) (*Action, error) { + a, ok := as[rel] + if !ok { + return nil, &ActionMissingError{Rel: rel} + } + + if !a.ExpiresAt.IsZero() && a.ExpiresAt.Before(time.Now().Add(objectExpirationToTransfer)) { + return nil, errors.NewRetriableError(&ActionExpiredErr{Rel: rel, At: a.ExpiresAt}) + } + + return a, nil +} + +type ActionExpiredErr struct { + Rel string + At time.Time +} + +func (e ActionExpiredErr) Error() string { + return fmt.Sprintf("tq: action %q expires at %s", + e.Rel, e.At.In(time.Local).Format(time.RFC822)) +} + +type ActionMissingError struct { + Rel string +} + +func (e ActionMissingError) Error() string { + return fmt.Sprintf("tq: unable to find action %q", e.Rel) +} + +func IsActionExpiredError(err error) bool { + if _, ok := err.(*ActionExpiredErr); ok { + return true + } + return false +} + +func IsActionMissingError(err error) bool { + if _, ok := err.(*ActionMissingError); ok { + return true + } + return false +} + +func toApiObject(t *Transfer) *api.ObjectResource { + o := &api.ObjectResource{ + Oid: t.Oid, + Size: t.Size, + Authenticated: t.Authenticated, + Actions: make(map[string]*api.LinkRelation), + } + + for rel, a := range t.Actions { + o.Actions[rel] = &api.LinkRelation{ + Href: a.Href, + Header: a.Header, + ExpiresAt: a.ExpiresAt, + } + } + + if t.Error != nil { + o.Error = &api.ObjectError{ + Code: t.Error.Code, + Message: t.Error.Message, + } + } + + return o +} + // NewAdapterFunc creates new instances of Adapter. Code that wishes // to provide new Adapter instances should pass an implementation of this // function to RegisterNewTransferAdapterFunc() on a *Manifest. @@ -19,6 +156,10 @@ type NewAdapterFunc func(name string, dir Direction) Adapter type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error +type AdapterConfig interface { + ConcurrentTransfers() int +} + // Adapter is implemented by types which can upload and/or download LFS // file content to a remote store. Each Adapter accepts one or more requests // which it may schedule and parallelise in whatever way it chooses, clients of @@ -43,7 +184,7 @@ type Adapter interface { // one or more Add calls. maxConcurrency controls the number of transfers // that may be done at once. The passed in callback will receive updates on // progress. Either argument may be nil if not required by the client. - Begin(maxConcurrency int, cb ProgressCallback) error + Begin(cfg AdapterConfig, cb ProgressCallback) error // Add queues a download/upload, which will complete asynchronously and // notify the callbacks given to Begin() Add(transfers ...*Transfer) (results <-chan TransferResult) @@ -56,22 +197,6 @@ type Adapter interface { ClearTempStorage() error } -// General struct for both uploads and downloads -type Transfer struct { - // Name of the file that triggered this transfer - Name string - // Object from API which provides the core data for this transfer - Object *api.ObjectResource - // Path for uploads is the source of data to send, for downloads is the - // location to place the final result - Path string -} - -// NewTransfer creates a new Transfer instance -func NewTransfer(name string, obj *api.ObjectResource, path string) *Transfer { - return &Transfer{name, obj, path} -} - // Result of a transfer returned through CompletionChannel() type TransferResult struct { Transfer *Transfer diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 15b8f771..febd9f38 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -15,15 +15,6 @@ const ( defaultBatchSize = 100 ) -type Transferable interface { - Oid() string - Size() int64 - Name() string - Path() string - Object() *api.ObjectResource - SetObject(*api.ObjectResource) -} - type retryCounter struct { MaxRetries int `git:"lfs.transfer.maxretries"` @@ -71,31 +62,32 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) { return count, count < r.MaxRetries } -// Batch implements the sort.Interface interface and enables sorting on a slice -// of `Transferable`s by object size. +// batch implements the sort.Interface interface and enables sorting on a slice +// of `*Transfer`s by object size. // // This interface is implemented here so that the largest objects can be // processed first. Since adding a new batch is unable to occur until the // current batch has finished processing, this enables us to reduce the risk of // a single worker getting tied up on a large item at the end of a batch while // all other workers are sitting idle. -type Batch []Transferable +type batch []*objectTuple -func (b Batch) ApiObjects() []*api.ObjectResource { +func (b batch) ApiObjects() []*api.ObjectResource { transfers := make([]*api.ObjectResource, 0, len(b)) for _, t := range b { - transfers = append(transfers, &api.ObjectResource{ - Oid: t.Oid(), - Size: t.Size(), - }) + transfers = append(transfers, tupleToApiObject(t)) } return transfers } -func (b Batch) Len() int { return len(b) } -func (b Batch) Less(i, j int) bool { return b[i].Size() < b[j].Size() } -func (b Batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func tupleToApiObject(t *objectTuple) *api.ObjectResource { + return &api.ObjectResource{Oid: t.Oid, Size: t.Size} +} + +func (b batch) Len() int { return len(b) } +func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size } +func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] } // TransferQueue organises the wider process of uploading and downloading, // including calling the API, passing the actual transfer request to transfer @@ -108,11 +100,11 @@ type TransferQueue struct { dryRun bool meter progress.Meter errors []error - transferables map[string]Transferable + transfers map[string]*objectTuple batchSize int bufferDepth int // Channel for processing (and buffering) incoming items - incoming chan Transferable + incoming chan *objectTuple errorc chan error // Channel for processing errors watchers []chan string trMutex *sync.Mutex @@ -127,6 +119,11 @@ type TransferQueue struct { rc *retryCounter } +type objectTuple struct { + Name, Path, Oid string + Size int64 +} + type Option func(*TransferQueue) func DryRun(dryRun bool) Option { @@ -152,12 +149,12 @@ func WithBufferDepth(depth int) Option { // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *TransferQueue { q := &TransferQueue{ - direction: dir, - errorc: make(chan error), - transferables: make(map[string]Transferable), - trMutex: &sync.Mutex{}, - manifest: manifest, - rc: newRetryCounter(), + direction: dir, + errorc: make(chan error), + transfers: make(map[string]*objectTuple), + trMutex: &sync.Mutex{}, + manifest: manifest, + rc: newRetryCounter(), } for _, opt := range options { @@ -173,7 +170,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *Tra q.bufferDepth = q.batchSize } - q.incoming = make(chan Transferable, q.bufferDepth) + q.incoming = make(chan *objectTuple, q.bufferDepth) if q.meter == nil { q.meter = progress.Noop() @@ -186,28 +183,35 @@ func NewTransferQueue(dir Direction, manifest *Manifest, options ...Option) *Tra return q } -// Add adds a Transferable to the transfer queue. It only increments the amount -// of waiting the TransferQueue has to do if the Transferable "t" is new. -func (q *TransferQueue) Add(t Transferable) { +// Add adds a *Transfer to the transfer queue. It only increments the amount +// of waiting the TransferQueue has to do if the *Transfer "t" is new. +func (q *TransferQueue) Add(name, path, oid string, size int64) { + t := &objectTuple{ + Name: name, + Path: path, + Oid: oid, + Size: size, + } + if isNew := q.remember(t); !isNew { - tracerx.Printf("already transferring %q, skipping duplicate", t.Oid()) + tracerx.Printf("already transferring %q, skipping duplicate", t.Oid) return } q.incoming <- t } -// remember remembers the Transferable "t" if the *TransferQueue doesn't already -// know about a Transferable with the same OID. +// remember remembers the *Transfer "t" if the *TransferQueue doesn't already +// know about a Transfer with the same OID. // // It returns if the value is new or not. -func (q *TransferQueue) remember(t Transferable) bool { +func (q *TransferQueue) remember(t *objectTuple) bool { q.trMutex.Lock() defer q.trMutex.Unlock() - if _, ok := q.transferables[t.Oid()]; !ok { + if _, ok := q.transfers[t.Oid]; !ok { q.wait.Add(1) - q.transferables[t.Oid()] = t + q.transfers[t.Oid] = t return true } @@ -221,7 +225,7 @@ func (q *TransferQueue) remember(t Transferable) bool { // 2. While the batch contains less items than `q.batchSize` AND the channel // is open, read one item from the `q.incoming` channel. // a. If the read was a channel close, go to step 4. -// b. If the read was a Transferable item, go to step 3. +// b. If the read was a TransferTransferable item, go to step 3. // 3. Append the item to the batch. // 4. Sort the batch by descending object size, make a batch API call, send // the items to the `*adapterBase`. @@ -276,7 +280,7 @@ func (q *TransferQueue) collectBatches() { // // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been // processed. -func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error) { +func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) { cfg := config.Config next := q.makeBatch() @@ -293,8 +297,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error) // that was encountered. If any of the objects couldn't be // retried, they will be marked as failed. for _, t := range batch { - if q.canRetryObject(t.Oid(), err) { - q.rc.Increment(t.Oid()) + if q.canRetryObject(t.Oid, err) { + q.rc.Increment(t.Oid) next = append(next, t) } else { @@ -319,47 +323,51 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error) continue } - if _, needsTransfer := o.Rel(q.transferKind()); needsTransfer { - // If the object has a link relation for the kind of - // transfer that we want to perform, grab a Transferable - // that matches the object's OID. - q.trMutex.Lock() - t, ok := q.transferables[o.Oid] - q.trMutex.Unlock() + q.trMutex.Lock() + t, ok := q.transfers[o.Oid] + q.trMutex.Unlock() + if !ok { + // If we couldn't find any associated + // Transfer object, then we give up on the + // transfer by telling the progress meter to + // skip the number of bytes in "o". + q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid) - if ok { - // If we knew about an associated Transferable, - // begin the transfer. - t.SetObject(o) - q.meter.StartTransfer(t.Name()) - - toTransfer = append(toTransfer, NewTransfer( - t.Name(), t.Object(), t.Path(), - )) - } else { - // If we couldn't find any associated - // Transferable object, then we give up on the - // transfer by telling the progress meter to - // skip the number of bytes in "o". - q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid) - - q.Skip(o.Size) - q.wait.Done() - } - } else { - // Otherwise, if the object didn't need a transfer at - // all, skip and decrement it. q.Skip(o.Size) q.wait.Done() + } else { + tr := newTransfer(t.Name, o, t.Path) + + if _, err := tr.Actions.Get(q.transferKind()); err != nil { + // XXX(taylor): duplication + if q.canRetryObject(tr.Oid, err) { + q.rc.Increment(tr.Oid) + count := q.rc.CountFor(tr.Oid) + + tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, tr.Oid, tr.Size) + next = append(next, t) + } else { + if !IsActionMissingError(err) { + q.errorc <- errors.Errorf("[%v] %v", tr.Name, err) + } + + q.Skip(o.Size) + q.wait.Done() + } + + } else { + q.meter.StartTransfer(t.Name) + toTransfer = append(toTransfer, tr) + } } } retries := q.addToAdapter(toTransfer) for t := range retries { - q.rc.Increment(t.Oid()) - count := q.rc.CountFor(t.Oid()) + q.rc.Increment(t.Oid) + count := q.rc.CountFor(t.Oid) - tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid(), t.Size()) + tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size) next = append(next, t) } @@ -369,23 +377,23 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch Batch) (Batch, error) // makeBatch returns a new, empty batch, with a capacity equal to the maximum // batch size designated by the `*TransferQueue`. -func (q *TransferQueue) makeBatch() Batch { return make(Batch, 0, q.batchSize) } +func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) } // addToAdapter adds the given "pending" transfers to the transfer adapters and -// returns a channel of Transferables that are to be retried in the next batch. +// returns a channel of Transfers that are to be retried in the next batch. // After all of the items in the batch have been processed, the channel is // closed. // // addToAdapter returns immediately, and does not block. -func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan Transferable { - retries := make(chan Transferable, len(pending)) +func (q *TransferQueue) addToAdapter(pending []*Transfer) <-chan *objectTuple { + retries := make(chan *objectTuple, len(pending)) if err := q.ensureAdapterBegun(); err != nil { close(retries) q.errorc <- err for _, t := range pending { - q.Skip(t.Object.Size) + q.Skip(t.Size) q.wait.Done() } @@ -426,9 +434,9 @@ func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult // handleTransferResult observes the transfer result, sending it on the retries // channel if it was able to be retried. func (q *TransferQueue) handleTransferResult( - res TransferResult, retries chan<- Transferable, + res TransferResult, retries chan<- *objectTuple, ) { - oid := res.Transfer.Object.Oid + oid := res.Transfer.Oid if res.Error != nil { // If there was an error encountered when processing the @@ -441,7 +449,7 @@ func (q *TransferQueue) handleTransferResult( tracerx.Printf("tq: retrying object %s", oid) q.trMutex.Lock() - t, ok := q.transferables[oid] + t, ok := q.transfers[oid] q.trMutex.Unlock() if ok { @@ -522,7 +530,7 @@ func (q *TransferQueue) ensureAdapterBegun() error { } tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name()) - err := q.adapter.Begin(q.manifest.ConcurrentTransfers(), cb) + err := q.adapter.Begin(q.manifest, cb) if err != nil { return err } @@ -532,7 +540,7 @@ func (q *TransferQueue) ensureAdapterBegun() error { } // Wait waits for the queue to finish processing all transfers. Once Wait is -// called, Add will no longer add transferables to the queue. Any failed +// called, Add will no longer add transfers to the queue. Any failed // transfers will be automatically retried once. func (q *TransferQueue) Wait() { close(q.incoming) diff --git a/tq/transfer_test.go b/tq/transfer_test.go index ad4dc862..96640146 100644 --- a/tq/transfer_test.go +++ b/tq/transfer_test.go @@ -21,7 +21,7 @@ func (a *testAdapter) Direction() Direction { return a.dir } -func (a *testAdapter) Begin(maxConcurrency int, cb ProgressCallback) error { +func (a *testAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error { return nil } diff --git a/tq/tus_upload.go b/tq/tus_upload.go index a4801fee..dced036e 100644 --- a/tq/tus_upload.go +++ b/tq/tus_upload.go @@ -37,9 +37,10 @@ func (a *tusUploadAdapter) WorkerEnding(workerNum int, ctx interface{}) { } func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error { - rel, ok := t.Object.Rel("upload") - if !ok { - return fmt.Errorf("No upload action for this object.") + rel, err := t.Actions.Get("upload") + if err != nil { + return err + // return fmt.Errorf("No upload action for this object.") } // Note not supporting the Creation extension since the batch API generates URLs @@ -47,7 +48,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC // 1. Send HEAD request to determine upload start point // Request must include Tus-Resumable header (version) - tracerx.Printf("xfer: sending tus.io HEAD request for %q", t.Object.Oid) + tracerx.Printf("xfer: sending tus.io HEAD request for %q", t.Oid) req, err := httputil.NewHttpRequest("HEAD", rel.Href, rel.Header) if err != nil { return err @@ -69,9 +70,9 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC } // Upload-Offset=size means already completed (skip) // Batch API will probably already detect this, but handle just in case - if offset >= t.Object.Size { - tracerx.Printf("xfer: tus.io HEAD offset %d indicates %q is already fully uploaded, skipping", offset, t.Object.Oid) - advanceCallbackProgress(cb, t, t.Object.Size) + if offset >= t.Size { + tracerx.Printf("xfer: tus.io HEAD offset %d indicates %q is already fully uploaded, skipping", offset, t.Oid) + advanceCallbackProgress(cb, t, t.Size) return nil } @@ -84,9 +85,9 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC // Upload-Offset=0 means start from scratch, but still send PATCH if offset == 0 { - tracerx.Printf("xfer: tus.io uploading %q from start", t.Object.Oid) + tracerx.Printf("xfer: tus.io uploading %q from start", t.Oid) } else { - tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Object.Oid, offset) + tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Oid, offset) advanceCallbackProgress(cb, t, offset) _, err := f.Seek(offset, os.SEEK_CUR) if err != nil { @@ -99,7 +100,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC // Response Upload-Offset must be request Upload-Offset plus sent bytes // Response may include Upload-Expires header in which case check not passed - tracerx.Printf("xfer: sending tus.io PATCH request for %q", t.Object.Oid) + tracerx.Printf("xfer: sending tus.io PATCH request for %q", t.Oid) req, err = httputil.NewHttpRequest("PATCH", rel.Href, rel.Header) if err != nil { return err @@ -107,8 +108,8 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC req.Header.Set("Tus-Resumable", TusVersion) req.Header.Set("Upload-Offset", strconv.FormatInt(offset, 10)) req.Header.Set("Content-Type", "application/offset+octet-stream") - req.Header.Set("Content-Length", strconv.FormatInt(t.Object.Size-offset, 10)) - req.ContentLength = t.Object.Size - offset + req.Header.Set("Content-Length", strconv.FormatInt(t.Size-offset, 10)) + req.ContentLength = t.Size - offset // Ensure progress callbacks made while uploading // Wrap callback to give name context @@ -121,7 +122,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC var reader io.Reader reader = &progress.CallbackReader{ C: ccb, - TotalSize: t.Object.Size, + TotalSize: t.Size, Reader: f, } @@ -154,7 +155,7 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC io.Copy(ioutil.Discard, res.Body) res.Body.Close() - return api.VerifyUpload(config.Config, t.Object) + return api.VerifyUpload(config.Config, toApiObject(t)) } func configureTusAdapter(m *Manifest) { diff --git a/tq/verify.go b/tq/verify.go new file mode 100644 index 00000000..ac0654b8 --- /dev/null +++ b/tq/verify.go @@ -0,0 +1,44 @@ +package tq + +import ( + "bytes" + "encoding/json" + "net/http" + + "github.com/git-lfs/git-lfs/lfsapi" +) + +func verifyUpload(c *lfsapi.Client, t *Transfer) error { + action, err := t.Actions.Get("verify") + if err != nil { + if IsActionMissingError(err) { + return nil + } + return err + } + + by, err := json.Marshal(struct { + Oid string `json:"oid"` + Size int64 `json:"size"` + }{Oid: t.Oid, Size: t.Size}) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", action.Href, bytes.NewReader(by)) + if err != nil { + return err + } + + for key, value := range action.Header { + req.Header.Set(key, value) + } + req.Header.Set("Content-Type", "application/vnd.git-lfs+json") + + res, err := c.Do(req) + if err != nil { + return err + } + + return res.Body.Close() +} diff --git a/tq/verify_test.go b/tq/verify_test.go new file mode 100644 index 00000000..205781e0 --- /dev/null +++ b/tq/verify_test.go @@ -0,0 +1,274 @@ +package tq + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + + "github.com/git-lfs/git-lfs/errors" + "github.com/git-lfs/git-lfs/lfsapi" + "github.com/stretchr/testify/assert" +) + +func TestVerifyWithoutAction(t *testing.T) { + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + } + + assert.Nil(t, verifyUpload(c, tr)) +} + +func TestVerifySuccess(t *testing.T) { + var called uint32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + + var tr Transfer + err := json.NewDecoder(r.Body).Decode(&tr) + + assert.Nil(t, err) + assert.Equal(t, "abc", tr.Oid) + assert.EqualValues(t, 123, tr.Size) + assert.Equal(t, "bar", r.Header.Get("Foo")) + assert.Equal(t, "application/vnd.git-lfs+json", r.Header.Get("Content-Type")) + assert.Equal(t, "24", r.Header.Get("Content-Length")) + })) + defer srv.Close() + + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{ + Href: srv.URL + "/verify", + Header: map[string]string{ + "foo": "bar", + }, + }, + }, + } + + assert.Nil(t, verifyUpload(c, tr)) + assert.EqualValues(t, 1, called) +} + +func TestVerifyAuthErrWithBody(t *testing.T) { + var called uint32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(401) + w.Write([]byte(`{"message":"custom auth error"}`)) + })) + defer srv.Close() + + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{ + Href: srv.URL + "/verify", + }, + }, + } + + err := verifyUpload(c, tr) + assert.NotNil(t, err) + assert.True(t, errors.IsAuthError(err)) + assert.Equal(t, "Authentication required: http: custom auth error", err.Error()) + assert.EqualValues(t, 1, called) +} + +func TestVerifyFatalWithBody(t *testing.T) { + var called uint32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`{"message":"custom fatal error"}`)) + })) + defer srv.Close() + + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{ + Href: srv.URL + "/verify", + }, + }, + } + + err := verifyUpload(c, tr) + assert.NotNil(t, err) + assert.True(t, errors.IsFatalError(err)) + assert.Equal(t, "Fatal error: http: custom fatal error", err.Error()) + assert.EqualValues(t, 1, called) +} + +func TestVerifyWithNonFatal500WithBody(t *testing.T) { + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{}, + }, + } + + var called uint32 + + nonFatalCodes := map[int]string{ + 501: "custom 501 error", + 507: "custom 507 error", + 509: "custom 509 error", + } + + for nonFatalCode, expectedErr := range nonFatalCodes { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(nonFatalCode) + w.Write([]byte(`{"message":"` + expectedErr + `"}`)) + })) + + tr.Actions["verify"].Href = srv.URL + "/verify" + err := verifyUpload(c, tr) + t.Logf("non fatal code %d", nonFatalCode) + assert.NotNil(t, err) + assert.Equal(t, "http: "+expectedErr, err.Error()) + srv.Close() + } + + assert.EqualValues(t, 3, called) +} + +func TestVerifyAuthErrWithoutBody(t *testing.T) { + var called uint32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.WriteHeader(401) + })) + defer srv.Close() + + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{ + Href: srv.URL + "/verify", + }, + }, + } + + err := verifyUpload(c, tr) + assert.NotNil(t, err) + assert.True(t, errors.IsAuthError(err)) + assert.True(t, strings.HasPrefix(err.Error(), "Authentication required: Authorization error:"), err.Error()) + assert.EqualValues(t, 1, called) +} + +func TestVerifyFatalWithoutBody(t *testing.T) { + var called uint32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.WriteHeader(500) + })) + defer srv.Close() + + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{ + Href: srv.URL + "/verify", + }, + }, + } + + err := verifyUpload(c, tr) + assert.NotNil(t, err) + assert.True(t, errors.IsFatalError(err)) + assert.True(t, strings.HasPrefix(err.Error(), "Fatal error: Server error:"), err.Error()) + assert.EqualValues(t, 1, called) +} + +func TestVerifyWithNonFatal500WithoutBody(t *testing.T) { + c := &lfsapi.Client{} + tr := &Transfer{ + Oid: "abc", + Size: 123, + Actions: map[string]*Action{ + "verify": &Action{}, + }, + } + + var called uint32 + + nonFatalCodes := map[int]string{ + 501: "Not Implemented:", + 507: "Insufficient server storage:", + 509: "Bandwidth limit exceeded:", + } + + for nonFatalCode, errPrefix := range nonFatalCodes { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.String() != "/verify" { + w.WriteHeader(http.StatusNotFound) + return + } + + atomic.AddUint32(&called, 1) + w.WriteHeader(nonFatalCode) + })) + + tr.Actions["verify"].Href = srv.URL + "/verify" + err := verifyUpload(c, tr) + t.Logf("non fatal code %d", nonFatalCode) + assert.NotNil(t, err) + assert.True(t, strings.HasPrefix(err.Error(), errPrefix)) + srv.Close() + } + + assert.EqualValues(t, 3, called) +} diff --git a/update-version.sh b/update-version.sh deleted file mode 100755 index a1d2292e..00000000 --- a/update-version.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -VERSION_STRING=$1 -VERSION_ARRAY=( ${VERSION_STRING//./ } ) -VERSION_MAJOR=${VERSION_ARRAY[0]} -VERSION_MINOR=${VERSION_ARRAY[1]} -VERSION_PATCH=${VERSION_ARRAY[2]} - -# Update the version number git-lfs is reporting. -sed -i "s,\(Version = \"\).*\(\"\),\1$VERSION_STRING\2," config/version.go - -# Update the version number in the RPM package. -sed -i "s,\(Version:[[:space:]]*\).*,\1$VERSION_STRING," rpm/SPECS/git-lfs.spec - -# Update the version numbers in the Windows installer. -sed -i "s,\(FILEVERSION \).*,\1$VERSION_MAJOR\,$VERSION_MINOR\,$VERSION_PATCH\,0," script/windows-installer/resources.rc -sed -i "s,\([[:space:]]*VALUE \"ProductVersion\"\, \"\).*\(\\\\0\"\),\1$VERSION_STRING\2," script/windows-installer/resources.rc diff --git a/versioninfo.json b/versioninfo.json new file mode 100644 index 00000000..3d1e5ec7 --- /dev/null +++ b/versioninfo.json @@ -0,0 +1,18 @@ +{ + "FixedFileInfo": + { + "FileVersion": { + "Major": 1, + "Minor": 5, + "Patch": 0, + "Build": 0 + } + }, + "StringFileInfo": + { + "FileDescription": "Git LFS", + "LegalCopyright": "GitHub, Inc. and Git LFS contributors", + "ProductName": "Git Large File Storage (LFS)", + "ProductVersion": "1.5.0" + } +}