Merge pull request #285 from github/multitransfer

Endpoint for batch upload/download operations
This commit is contained in:
risk danger olson 2015-05-28 10:50:31 -06:00
commit ac67b68440
24 changed files with 1187 additions and 251 deletions

@ -68,7 +68,7 @@ func PerformanceSince(what string, t time.Time) {
func PerformanceSinceKey(key, what string, t time.Time) { func PerformanceSinceKey(key, what string, t time.Time) {
tracer := getTracer(key) tracer := getTracer(key)
if tracer.enabled && tracer.performance { if tracer.performance {
since := time.Since(t) since := time.Since(t)
fmt.Fprintf(tracer.w, "performance %s: %.9f s\n", what, since.Seconds()) fmt.Fprintf(tracer.w, "performance %s: %.9f s\n", what, since.Seconds())
} }
@ -114,7 +114,7 @@ func initializeTracer(key string) *tracer {
trace := os.Getenv(fmt.Sprintf("%s_TRACE", key)) trace := os.Getenv(fmt.Sprintf("%s_TRACE", key))
if trace == "" || strings.ToLower(trace) == "false" { if trace == "" || strings.ToLower(trace) == "false" {
return tracer tracer.enabled = false
} }
perf := os.Getenv(fmt.Sprintf("%s_TRACE_PERFORMANCE", key)) perf := os.Getenv(fmt.Sprintf("%s_TRACE_PERFORMANCE", key))
@ -122,6 +122,10 @@ func initializeTracer(key string) *tracer {
tracer.performance = true tracer.performance = true
} }
if !tracer.enabled && !tracer.performance {
return tracer
}
fd, err := strconv.Atoi(trace) fd, err := strconv.Atoi(trace)
if err != nil { if err != nil {
// Not a number, it could be a path for a log file // Not a number, it could be a path for a log file

2
Godeps

@ -6,4 +6,4 @@ github.com/olekukonko/ts ecf753e7c962639ab5a1fb46f7da627d4c
github.com/spf13/cobra 864687ae689edc28688c67edef47e3d2ad651a1b github.com/spf13/cobra 864687ae689edc28688c67edef47e3d2ad651a1b
github.com/spf13/pflag 463bdc838f2b35e9307e91d480878bda5fff7232 github.com/spf13/pflag 463bdc838f2b35e9307e91d480878bda5fff7232
github.com/technoweenie/go-contentaddressable 38171def3cd15e3b76eb156219b3d48704643899 github.com/technoweenie/go-contentaddressable 38171def3cd15e3b76eb156219b3d48704643899
github.com/rubyist/tracerx f6aa9369b3277bc21384878e8279642da722f407 github.com/rubyist/tracerx 51cd50e73e07cc41c22caec66af15313dff1aebb

113
commands/command_get.go Normal file

@ -0,0 +1,113 @@
package commands
import (
"os"
"os/exec"
"time"
"github.com/github/git-lfs/git"
"github.com/github/git-lfs/lfs"
"github.com/rubyist/tracerx"
"github.com/spf13/cobra"
)
var (
	// getCmd registers `git lfs get`, which downloads the Git LFS objects
	// for a given ref (see getCommand for the implementation).
	getCmd = &cobra.Command{
		Use:   "get",
		Short: "get",
		Run:   getCommand,
	}
)
// getCommand downloads all Git LFS objects for the given ref, or for the
// currently checked out ref when no argument is supplied. When the ref being
// fetched resolves to the checked-out commit, the smudged files are also
// written to the working directory and the git index is refreshed as each
// object arrives.
func getCommand(cmd *cobra.Command, args []string) {
	var ref string
	var err error

	if len(args) == 1 {
		ref = args[0]
	} else {
		ref, err = git.CurrentRef()
		if err != nil {
			Panic(err, "Could not get")
		}
	}

	pointers, err := lfs.ScanRefs(ref, "")
	if err != nil {
		Panic(err, "Could not scan for Git LFS files")
	}

	q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
	for _, p := range pointers {
		q.Add(lfs.NewDownloadable(p))
	}

	target, err := git.ResolveRef(ref)
	if err != nil {
		Panic(err, "Could not resolve git ref")
	}

	current, err := git.CurrentRef()
	if err != nil {
		Panic(err, "Could not get the current git ref")
	}

	// done is closed by the copier goroutine (below) once every watched file
	// has been written and the index refresh has completed. Without waiting
	// on it, this command could return — and the process exit — while files
	// were still being copied into the working directory.
	var done chan struct{}

	if target == current {
		// We just downloaded the files for the current ref; copy them into
		// the working directory and update the git index. This runs in a
		// goroutine so files are copied as they come in, for efficiency.
		watch := q.Watch()
		done = make(chan struct{})

		go func() {
			defer close(done)

			files := make(map[string]*lfs.WrappedPointer, len(pointers))
			for _, pointer := range pointers {
				files[pointer.Oid] = pointer
			}

			// Fire up the update-index command. Named idxCmd so it does not
			// shadow the cobra command parameter.
			idxCmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
			stdin, err := idxCmd.StdinPipe()
			if err != nil {
				Panic(err, "Could not update the index")
			}

			if err := idxCmd.Start(); err != nil {
				Panic(err, "Could not update the index")
			}

			// As files come in, write them to the working directory and feed
			// their names to update-index over stdin.
			for oid := range watch {
				pointer, ok := files[oid]
				if !ok {
					continue
				}

				file, err := os.Create(pointer.Name)
				if err != nil {
					Panic(err, "Could not create working directory file")
				}

				if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
					file.Close()
					Panic(err, "Could not write working directory file")
				}
				file.Close()

				if _, err := stdin.Write([]byte(pointer.Name + "\n")); err != nil {
					Panic(err, "Could not update the index")
				}
			}

			stdin.Close()
			if err := idxCmd.Wait(); err != nil {
				Panic(err, "Error updating the git index")
			}
		}()
	}

	processQueue := time.Now()
	q.Process()
	tracerx.PerformanceSince("process queue", processQueue)

	if done != nil {
		<-done
	}
}
// init wires the `get` subcommand into the root `git lfs` command.
func init() {
	RootCmd.AddCommand(getCmd)
}

@ -71,7 +71,7 @@ func prePushCommand(cmd *cobra.Command, args []string) {
Panic(err, "Error scanning for Git LFS files") Panic(err, "Error scanning for Git LFS files")
} }
uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentUploads(), len(pointers)) uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
for i, pointer := range pointers { for i, pointer := range pointers {
if prePushDryRun { if prePushDryRun {

@ -98,7 +98,7 @@ func pushCommand(cmd *cobra.Command, args []string) {
Panic(err, "Error scanning for Git LFS files") Panic(err, "Error scanning for Git LFS files")
} }
uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentUploads(), len(pointers)) uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
for i, pointer := range pointers { for i, pointer := range pointers {
if pushDryRun { if pushDryRun {

@ -266,6 +266,73 @@ only appears on a 200 status.
* 403 - The user has **read**, but not **write** access. * 403 - The user has **read**, but not **write** access.
* 404 - The repository does not exist for the user. * 404 - The repository does not exist for the user.
## POST /objects/batch
This request retrieves the metadata for a batch of objects, given a JSON body
containing an object with an array of objects with the oid and size of each object.
NOTE: This is an experimental API that is subject to change. It will ship disabled
by default in Git LFS v0.5.2. You can enable it if your Git LFS server supports it
with `git config lfs.batch true`.
```
> POST https://git-lfs-server.com/objects/batch HTTP/1.1
> Accept: application/vnd.git-lfs+json
> Content-Type: application/vnd.git-lfs+json
> Authorization: Basic ... (if authentication is needed)
>
> {
> "objects": [
> {
> "oid": "1111111",
> "size": 123
> }
> ]
> }
>
< HTTP/1.1 200 OK
< Content-Type: application/vnd.git-lfs+json
<
< {
< "objects": [
<   {
<     "oid": "1111111",
<     "_links": {
<       "upload": {
<         "href": "https://some-upload.com",
<         "header": {
<           "Key": "value"
<         }
<       },
<       "verify": {
<         "href": "https://some-callback.com",
<         "header": {
<           "Key": "value"
<         }
<       }
<     }
<   }
< ]
< }
```
The response will be an object containing an array of objects with one of
multiple link relations, each with an `href` property and an optional `header`
property.
* `upload` - This relation describes how to upload the object. Expect this
relation when the object has not been previously uploaded.
* `verify` - The server can specify a URL for the client to hit after
successfully uploading an object. This is an optional relation for the case that
the server has not verified the object.
* `download` - This relation describes how to download the object content. This
only appears if an object has been previously uploaded.
### Responses
* 200 - OK
* 401 - The authentication credentials are incorrect.
* 403 - The user has **read**, but not **write** access.
* 404 - The repository does not exist for the user.
## Verification ## Verification
When Git LFS clients issue a POST request to initiate an object upload, the When Git LFS clients issue a POST request to initiate an object upload, the

@ -0,0 +1,31 @@
git-lfs-get(1) -- Download all Git LFS files for a given ref
============================================================
## SYNOPSIS
`git lfs get` [<ref>]
## DESCRIPTION
Download any Git LFS objects for a given ref. If no ref is given,
the currently checked out ref will be used.
If the given ref is the same as the currently checked out ref, the
files will be written to the working directory.
## EXAMPLES
* Get the LFS objects for the current ref
`git lfs get`
* Get the LFS objects for a branch
`git lfs get mybranch`
* Get the LFS objects for a commit
`git lfs get e445b45c1c9c6282614f201b62778e4c0688b5c8`
Part of the git-lfs(1) suite.

@ -22,8 +22,12 @@ func LsRemote(remote, remoteRef string) (string, error) {
return simpleExec(nil, "git", "ls-remote", remote, remoteRef) return simpleExec(nil, "git", "ls-remote", remote, remoteRef)
} }
func ResolveRef(ref string) (string, error) {
return simpleExec(nil, "git", "rev-parse", ref)
}
func CurrentRef() (string, error) { func CurrentRef() (string, error) {
return simpleExec(nil, "git", "rev-parse", "HEAD") return ResolveRef("HEAD")
} }
func CurrentBranch() (string, error) { func CurrentBranch() (string, error) {
@ -36,7 +40,7 @@ func CurrentRemoteRef() (string, error) {
return "", err return "", err
} }
return simpleExec(nil, "git", "rev-parse", remote) return ResolveRef(remote)
} }
func CurrentRemote() (string, error) { func CurrentRemote() (string, error) {
@ -57,6 +61,11 @@ func CurrentRemote() (string, error) {
return remote + "/" + branch, nil return remote + "/" + branch, nil
} }
// UpdateIndex quietly refreshes the git index entry for a single file.
func UpdateIndex(file string) error {
	if _, err := simpleExec(nil, "git", "update-index", "-q", "--refresh", file); err != nil {
		return err
	}
	return nil
}
type gitConfig struct { type gitConfig struct {
} }

@ -110,9 +110,12 @@ func Download(oid string) (io.ReadCloser, int64, *WrappedError) {
res, obj, wErr := doApiRequest(req, creds) res, obj, wErr := doApiRequest(req, creds)
if wErr != nil { if wErr != nil {
sendApiEvent(apiEventFail)
return nil, 0, wErr return nil, 0, wErr
} }
sendApiEvent(apiEventSuccess)
req, creds, err = obj.NewRequest("download", "GET") req, creds, err = obj.NewRequest("download", "GET")
if err != nil { if err != nil {
return nil, 0, Error(err) return nil, 0, Error(err)
@ -130,21 +133,87 @@ type byteCloser struct {
*bytes.Reader *bytes.Reader
} }
// DownloadCheck performs the per-object API call for a download without
// transferring any content. It confirms a "download" relation exists on the
// returned object before handing the metadata back to the caller.
func DownloadCheck(oid string) (*objectResource, *WrappedError) {
	req, creds, err := newApiRequest("GET", oid)
	if err != nil {
		return nil, Error(err)
	}

	_, obj, wErr := doApiRequest(req, creds)
	if wErr != nil {
		return nil, wErr
	}

	// Building (and discarding) the download request verifies the relation
	// is present in the object's links.
	if _, _, err := obj.NewRequest("download", "GET"); err != nil {
		return nil, Error(err)
	}

	return obj, nil
}
// DownloadObject opens a stream of the object's content using its "download"
// link relation. The caller is responsible for closing the returned body.
func DownloadObject(obj *objectResource) (io.ReadCloser, int64, *WrappedError) {
	req, creds, err := obj.NewRequest("download", "GET")
	if err != nil {
		return nil, 0, Error(err)
	}

	resp, wErr := doHttpRequest(req, creds)
	if wErr != nil {
		return nil, 0, wErr
	}

	return resp.Body, resp.ContentLength, nil
}
func (b *byteCloser) Close() error { func (b *byteCloser) Close() error {
return nil return nil
} }
func Upload(oidPath, filename string, cb CopyCallback) *WrappedError { func Batch(objects []*objectResource) ([]*objectResource, *WrappedError) {
oid := filepath.Base(oidPath) if len(objects) == 0 {
file, err := os.Open(oidPath) return nil, nil
if err != nil {
return Error(err)
} }
defer file.Close()
stat, err := file.Stat() o := map[string][]*objectResource{"objects": objects}
by, err := json.Marshal(o)
if err != nil { if err != nil {
return Error(err) return nil, Error(err)
}
req, creds, err := newApiRequest("POST", "batch")
if err != nil {
return nil, Error(err)
}
req.Header.Set("Content-Type", mediaType)
req.Header.Set("Content-Length", strconv.Itoa(len(by)))
req.ContentLength = int64(len(by))
req.Body = &byteCloser{bytes.NewReader(by)}
tracerx.Printf("api: batch %d files", len(objects))
res, objs, wErr := doApiBatchRequest(req, creds)
if wErr != nil {
sendApiEvent(apiEventFail)
return nil, wErr
}
sendApiEvent(apiEventSuccess)
if res.StatusCode != 200 {
return nil, Errorf(nil, "Invalid status for %s %s: %d", req.Method, req.URL, res.StatusCode)
}
return objs, nil
}
func UploadCheck(oidPath string) (*objectResource, *WrappedError) {
oid := filepath.Base(oidPath)
stat, err := os.Stat(oidPath)
if err != nil {
sendApiEvent(apiEventFail)
return nil, Error(err)
} }
reqObj := &objectResource{ reqObj := &objectResource{
@ -154,12 +223,14 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
by, err := json.Marshal(reqObj) by, err := json.Marshal(reqObj)
if err != nil { if err != nil {
return Error(err) sendApiEvent(apiEventFail)
return nil, Error(err)
} }
req, creds, err := newApiRequest("POST", oid) req, creds, err := newApiRequest("POST", oid)
if err != nil { if err != nil {
return Error(err) sendApiEvent(apiEventFail)
return nil, Error(err)
} }
req.Header.Set("Content-Type", mediaType) req.Header.Set("Content-Type", mediaType)
@ -167,28 +238,41 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
req.ContentLength = int64(len(by)) req.ContentLength = int64(len(by))
req.Body = &byteCloser{bytes.NewReader(by)} req.Body = &byteCloser{bytes.NewReader(by)}
tracerx.Printf("api: uploading %s (%s)", filename, oid) tracerx.Printf("api: uploading (%s)", oid)
res, obj, wErr := doApiRequest(req, creds) res, obj, wErr := doApiRequest(req, creds)
if wErr != nil { if wErr != nil {
sendApiEvent(apiEventFail) sendApiEvent(apiEventFail)
return wErr return nil, wErr
} }
sendApiEvent(apiEventSuccess) sendApiEvent(apiEventSuccess)
if res.StatusCode == 200 {
return nil, nil
}
return obj, nil
}
func UploadObject(o *objectResource, cb CopyCallback) *WrappedError {
path, err := LocalMediaPath(o.Oid)
if err != nil {
return Error(err)
}
file, err := os.Open(path)
if err != nil {
return Error(err)
}
defer file.Close()
reader := &CallbackReader{ reader := &CallbackReader{
C: cb, C: cb,
TotalSize: reqObj.Size, TotalSize: o.Size,
Reader: file, Reader: file,
} }
if res.StatusCode == 200 { req, creds, err := o.NewRequest("upload", "PUT")
// Drain the reader to update any progress bars
io.Copy(ioutil.Discard, reader)
return nil
}
req, creds, err = obj.NewRequest("upload", "PUT")
if err != nil { if err != nil {
return Error(err) return Error(err)
} }
@ -196,12 +280,12 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
if len(req.Header.Get("Content-Type")) == 0 { if len(req.Header.Get("Content-Type")) == 0 {
req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Type", "application/octet-stream")
} }
req.Header.Set("Content-Length", strconv.FormatInt(reqObj.Size, 10)) req.Header.Set("Content-Length", strconv.FormatInt(o.Size, 10))
req.ContentLength = reqObj.Size req.ContentLength = o.Size
req.Body = ioutil.NopCloser(reader) req.Body = ioutil.NopCloser(reader)
res, wErr = doHttpRequest(req, creds) res, wErr := doHttpRequest(req, creds)
if wErr != nil { if wErr != nil {
return wErr return wErr
} }
@ -213,13 +297,18 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
io.Copy(ioutil.Discard, res.Body) io.Copy(ioutil.Discard, res.Body)
res.Body.Close() res.Body.Close()
req, creds, err = obj.NewRequest("verify", "POST") req, creds, err = o.NewRequest("verify", "POST")
if err == objectRelationDoesNotExist { if err == objectRelationDoesNotExist {
return nil return nil
} else if err != nil { } else if err != nil {
return Error(err) return Error(err)
} }
by, err := json.Marshal(o)
if err != nil {
return Error(err)
}
req.Header.Set("Content-Type", mediaType) req.Header.Set("Content-Type", mediaType)
req.Header.Set("Content-Length", strconv.Itoa(len(by))) req.Header.Set("Content-Length", strconv.Itoa(len(by)))
req.ContentLength = int64(len(by)) req.ContentLength = int64(len(by))
@ -310,6 +399,23 @@ func doApiRequest(req *http.Request, creds Creds) (*http.Response, *objectResour
return res, obj, nil return res, obj, nil
} }
// doApiBatchRequest issues a batch API call and decodes the response body,
// returning the slice of objects found under the "objects" key.
func doApiBatchRequest(req *http.Request, creds Creds) (*http.Response, []*objectResource, *WrappedError) {
	res, wErr := doApiRequestWithRedirects(req, creds, make([]*http.Request, 0, 4))
	if wErr != nil {
		return res, nil, wErr
	}

	var body map[string][]*objectResource
	if wErr = decodeApiResponse(res, &body); wErr != nil {
		setErrorResponseContext(wErr, res)
	}

	return res, body["objects"], wErr
}
func handleResponse(res *http.Response) *WrappedError { func handleResponse(res *http.Response) *WrappedError {
if res.StatusCode < 400 { if res.StatusCode < 400 {
return nil return nil
@ -382,9 +488,11 @@ func newApiRequest(method, oid string) (*http.Request, Creds, error) {
objectOid := oid objectOid := oid
operation := "download" operation := "download"
if method == "POST" { if method == "POST" {
if oid != "batch" {
objectOid = "" objectOid = ""
operation = "upload" operation = "upload"
} }
}
res, err := sshAuthenticate(endpoint, operation, oid) res, err := sshAuthenticate(endpoint, operation, oid)
if err != nil { if err != nil {

@ -71,12 +71,12 @@ func (c *Configuration) Endpoint() Endpoint {
return c.RemoteEndpoint(defaultRemote) return c.RemoteEndpoint(defaultRemote)
} }
func (c *Configuration) ConcurrentUploads() int { func (c *Configuration) ConcurrentTransfers() int {
uploads := 3 uploads := 3
if v, ok := c.GitConfig("lfs.concurrentuploads"); ok { if v, ok := c.GitConfig("lfs.concurrenttransfers"); ok {
n, err := strconv.Atoi(v) n, err := strconv.Atoi(v)
if err == nil { if err == nil && n > 0 {
uploads = n uploads = n
} }
} }
@ -84,6 +84,20 @@ func (c *Configuration) ConcurrentUploads() int {
return uploads return uploads
} }
// BatchTransfer reports whether the experimental batch API is enabled via the
// lfs.batch git config setting. A value of "true", an empty (present but
// blank) value, or any non-zero integer enables it; anything else — including
// the setting being absent — disables it.
func (c *Configuration) BatchTransfer() bool {
	v, ok := c.GitConfig("lfs.batch")
	if !ok {
		return false
	}

	if v == "true" || v == "" {
		return true
	}

	// Any numeric value except 0 is considered true.
	n, err := strconv.Atoi(v)
	return err == nil && n != 0
}
func (c *Configuration) RemoteEndpoint(remote string) Endpoint { func (c *Configuration) RemoteEndpoint(remote string) Endpoint {
if len(remote) == 0 { if len(remote) == 0 {
remote = defaultRemote remote = defaultRemote

@ -193,3 +193,138 @@ func TestObjectsUrl(t *testing.T) {
} }
} }
} }
// Tests for Configuration.ConcurrentTransfers: lfs.concurrenttransfers is
// honored when it parses as a positive integer, and falls back to the
// default of 3 otherwise.

func TestConcurrentTransfersSetValue(t *testing.T) {
	cfg := &Configuration{
		gitConfig: map[string]string{"lfs.concurrenttransfers": "5"},
	}
	assert.Equal(t, 5, cfg.ConcurrentTransfers())
}

func TestConcurrentTransfersDefault(t *testing.T) {
	assert.Equal(t, 3, (&Configuration{}).ConcurrentTransfers())
}

func TestConcurrentTransfersZeroValue(t *testing.T) {
	cfg := &Configuration{
		gitConfig: map[string]string{"lfs.concurrenttransfers": "0"},
	}
	assert.Equal(t, 3, cfg.ConcurrentTransfers())
}

func TestConcurrentTransfersNonNumeric(t *testing.T) {
	cfg := &Configuration{
		gitConfig: map[string]string{"lfs.concurrenttransfers": "elephant"},
	}
	assert.Equal(t, 3, cfg.ConcurrentTransfers())
}

func TestConcurrentTransfersNegativeValue(t *testing.T) {
	cfg := &Configuration{
		gitConfig: map[string]string{"lfs.concurrenttransfers": "-5"},
	}
	assert.Equal(t, 3, cfg.ConcurrentTransfers())
}
// Tests for Configuration.BatchTransfer: lfs.batch counts as true when set
// to "true", the empty string, or any non-zero integer; any other value —
// or the setting being absent — counts as false.

// batchConfig builds a Configuration with lfs.batch set to the given value.
func batchConfig(value string) *Configuration {
	return &Configuration{
		gitConfig: map[string]string{"lfs.batch": value},
	}
}

func TestBatchTrue(t *testing.T) {
	assert.Equal(t, true, batchConfig("true").BatchTransfer())
}

func TestBatchNumeric1IsTrue(t *testing.T) {
	assert.Equal(t, true, batchConfig("1").BatchTransfer())
}

func TestBatchNumeric0IsFalse(t *testing.T) {
	assert.Equal(t, false, batchConfig("0").BatchTransfer())
}

func TestBatchOtherNumericsAreTrue(t *testing.T) {
	assert.Equal(t, true, batchConfig("42").BatchTransfer())
}

func TestBatchNegativeNumericsAreTrue(t *testing.T) {
	assert.Equal(t, true, batchConfig("-1").BatchTransfer())
}

func TestBatchNonBooleanIsFalse(t *testing.T) {
	assert.Equal(t, false, batchConfig("elephant").BatchTransfer())
}

func TestBatchPresentButBlankIsTrue(t *testing.T) {
	assert.Equal(t, true, batchConfig("").BatchTransfer())
}

func TestBatchAbsentIsFalse(t *testing.T) {
	assert.Equal(t, false, (&Configuration{}).BatchTransfer())
}

45
lfs/download_queue.go Normal file

@ -0,0 +1,45 @@
package lfs
// Downloadable wraps a Git LFS pointer so it can be run through a
// TransferQueue, implementing the Transferable interface.
type Downloadable struct {
	Pointer *WrappedPointer
	object  *objectResource
}

// NewDownloadable builds a Downloadable for the given pointer.
func NewDownloadable(p *WrappedPointer) *Downloadable {
	d := &Downloadable{Pointer: p}
	return d
}

// Check asks the API whether the object can be downloaded, returning its
// transfer metadata.
func (d *Downloadable) Check() (*objectResource, *WrappedError) {
	return DownloadCheck(d.Pointer.Oid)
}

// Transfer downloads the object into the media directory, reporting
// progress through cb. It does not touch the working directory.
func (d *Downloadable) Transfer(cb CopyCallback) *WrappedError {
	if err := PointerSmudgeObject(d.Pointer.Pointer, d.object, cb); err != nil {
		return Error(err)
	}
	return nil
}

// Object returns the API metadata previously attached via SetObject.
func (d *Downloadable) Object() *objectResource {
	return d.object
}

// Oid returns the object's content address.
func (d *Downloadable) Oid() string {
	return d.Pointer.Oid
}

// Size returns the object's size in bytes.
func (d *Downloadable) Size() int64 {
	return d.Pointer.Size
}

// SetObject attaches API transfer metadata to this download.
func (d *Downloadable) SetObject(o *objectResource) {
	d.object = o
}

// NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads.
func NewDownloadQueue(workers, files int) *TransferQueue {
	q := newTransferQueue(workers, files)
	q.transferKind = "download"
	return q
}

@ -52,11 +52,13 @@ func LocalMediaPath(sha string) (string, error) {
func Environ() []string { func Environ() []string {
osEnviron := os.Environ() osEnviron := os.Environ()
env := make([]string, 4, len(osEnviron)+4) env := make([]string, 6, len(osEnviron)+6)
env[0] = fmt.Sprintf("LocalWorkingDir=%s", LocalWorkingDir) env[0] = fmt.Sprintf("LocalWorkingDir=%s", LocalWorkingDir)
env[1] = fmt.Sprintf("LocalGitDir=%s", LocalGitDir) env[1] = fmt.Sprintf("LocalGitDir=%s", LocalGitDir)
env[2] = fmt.Sprintf("LocalMediaDir=%s", LocalMediaDir) env[2] = fmt.Sprintf("LocalMediaDir=%s", LocalMediaDir)
env[3] = fmt.Sprintf("TempDir=%s", TempDir) env[3] = fmt.Sprintf("TempDir=%s", TempDir)
env[4] = fmt.Sprintf("ConcurrentTransfers=%d", Config.ConcurrentTransfers())
env[5] = fmt.Sprintf("BatchTransfer=%v", Config.BatchTransfer())
for _, e := range osEnviron { for _, e := range osEnviron {
if !strings.Contains(e, "GIT_") { if !strings.Contains(e, "GIT_") {

@ -31,6 +31,7 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa
if statErr != nil || stat == nil { if statErr != nil || stat == nil {
wErr = downloadFile(writer, ptr, workingfile, mediafile, cb) wErr = downloadFile(writer, ptr, workingfile, mediafile, cb)
} else { } else {
sendApiEvent(apiEventSuccess)
wErr = readLocalFile(writer, ptr, mediafile, cb) wErr = readLocalFile(writer, ptr, mediafile, cb)
} }
@ -41,6 +42,71 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa
return nil return nil
} }
// PointerSmudgeObject uses a Pointer and objectResource to download the object
// to the media directory. It does not write the file to the working directory.
// An existing media file with a zero or mismatched size is discarded and
// re-downloaded.
func PointerSmudgeObject(ptr *Pointer, obj *objectResource, cb CopyCallback) error {
	mediafile, err := LocalMediaPath(obj.Oid)
	if err != nil {
		return err
	}

	fi, statErr := os.Stat(mediafile)
	cached := statErr == nil && fi != nil

	if cached {
		size := fi.Size()
		if size == 0 || size != obj.Size {
			tracerx.Printf("Removing %s, size %d is invalid", mediafile, size)
			os.RemoveAll(mediafile)
			cached = false
		}
	}

	if !cached {
		if wErr := downloadObject(ptr, obj, mediafile, cb); wErr != nil {
			sendApiEvent(apiEventFail)
			return &SmudgeError{obj.Oid, mediafile, wErr}
		}
	}

	sendApiEvent(apiEventSuccess)
	return nil
}
// downloadObject streams the object's content into mediafile through a
// content-addressable buffer, reporting progress via cb.
func downloadObject(ptr *Pointer, obj *objectResource, mediafile string, cb CopyCallback) *WrappedError {
	reader, size, wErr := DownloadObject(obj)
	if reader != nil {
		defer reader.Close()
	}

	// TODO this can be unified with the same code in downloadFile
	if wErr != nil {
		wErr.Errorf("Error downloading %s.", mediafile)
		return wErr
	}

	if ptr.Size == 0 {
		ptr.Size = size
	}

	buffer, err := contentaddressable.NewFile(mediafile)
	if err != nil {
		return Errorf(err, "Error opening media file buffer.")
	}

	// Accept the buffered content only if the copy completed without error.
	if _, err = CopyWithCallback(buffer, reader, ptr.Size, cb); err == nil {
		err = buffer.Accept()
	}
	buffer.Close()

	if err != nil {
		return Errorf(err, "Error buffering media file.")
	}

	return nil
}
func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, cb CopyCallback) *WrappedError { func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, cb CopyCallback) *WrappedError {
fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size)) fmt.Fprintf(os.Stderr, "Downloading %s (%s)\n", workingfile, pb.FormatBytes(ptr.Size))
reader, size, wErr := Download(filepath.Base(mediafile)) reader, size, wErr := Download(filepath.Base(mediafile))

@ -26,10 +26,10 @@ const (
chanBufSize = 100 chanBufSize = 100
) )
// wrappedPointer wraps a pointer.Pointer and provides the git sha1 // WrappedPointer wraps a pointer.Pointer and provides the git sha1
// and the file name associated with the object, taken from the // and the file name associated with the object, taken from the
// rev-list output. // rev-list output.
type wrappedPointer struct { type WrappedPointer struct {
Sha1 string Sha1 string
Name string Name string
SrcName string SrcName string
@ -49,9 +49,9 @@ type indexFile struct {
var z40 = regexp.MustCompile(`\^?0{40}`) var z40 = regexp.MustCompile(`\^?0{40}`)
// ScanRefs takes a ref and returns a slice of wrappedPointer objects // ScanRefs takes a ref and returns a slice of WrappedPointer objects
// for all Git LFS pointers it finds for that ref. // for all Git LFS pointers it finds for that ref.
func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) { func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) {
nameMap := make(map[string]string, 0) nameMap := make(map[string]string, 0)
start := time.Now() start := time.Now()
@ -74,7 +74,7 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
return nil, err return nil, err
} }
pointers := make([]*wrappedPointer, 0) pointers := make([]*WrappedPointer, 0)
for p := range pointerc { for p := range pointerc {
if name, ok := nameMap[p.Sha1]; ok { if name, ok := nameMap[p.Sha1]; ok {
p.Name = name p.Name = name
@ -85,9 +85,9 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
return pointers, nil return pointers, nil
} }
// ScanIndex returns a slice of wrappedPointer objects for all // ScanIndex returns a slice of WrappedPointer objects for all
// Git LFS pointers it finds in the index. // Git LFS pointers it finds in the index.
func ScanIndex() ([]*wrappedPointer, error) { func ScanIndex() ([]*WrappedPointer, error) {
nameMap := make(map[string]*indexFile, 0) nameMap := make(map[string]*indexFile, 0)
start := time.Now() start := time.Now()
@ -132,7 +132,7 @@ func ScanIndex() ([]*wrappedPointer, error) {
return nil, err return nil, err
} }
pointers := make([]*wrappedPointer, 0) pointers := make([]*WrappedPointer, 0)
for p := range pointerc { for p := range pointerc {
if e, ok := nameMap[p.Sha1]; ok { if e, ok := nameMap[p.Sha1]; ok {
p.Name = e.Name p.Name = e.Name
@ -288,13 +288,13 @@ func catFileBatchCheck(revs chan string) (chan string, error) {
// of a git object, given its sha1. The contents will be decoded into // of a git object, given its sha1. The contents will be decoded into
// a Git LFS pointer. revs is a channel over which strings containing Git SHA1s // a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
// will be sent. It returns a channel from which point.Pointers can be read. // will be sent. It returns a channel from which point.Pointers can be read.
func catFileBatch(revs chan string) (chan *wrappedPointer, error) { func catFileBatch(revs chan string) (chan *WrappedPointer, error) {
cmd, err := startCommand("git", "cat-file", "--batch") cmd, err := startCommand("git", "cat-file", "--batch")
if err != nil { if err != nil {
return nil, err return nil, err
} }
pointers := make(chan *wrappedPointer, chanBufSize) pointers := make(chan *WrappedPointer, chanBufSize)
go func() { go func() {
for { for {
@ -316,7 +316,7 @@ func catFileBatch(revs chan string) (chan *wrappedPointer, error) {
p, err := DecodePointer(bytes.NewBuffer(nbuf)) p, err := DecodePointer(bytes.NewBuffer(nbuf))
if err == nil { if err == nil {
pointers <- &wrappedPointer{ pointers <- &WrappedPointer{
Sha1: string(fields[0]), Sha1: string(fields[0]),
Size: p.Size, Size: p.Size,
Pointer: p, Pointer: p,

230
lfs/transfer_queue.go Normal file

@ -0,0 +1,230 @@
package lfs
import (
"fmt"
"sync"
"sync/atomic"
"github.com/cheggaaa/pb"
)
// Transferable is an object that can be sent through a TransferQueue.
type Transferable interface {
	// Check performs the per-object meta API call, returning the object's
	// transfer metadata, or nil when no transfer is needed.
	Check() (*objectResource, *WrappedError)
	// Transfer moves the object's content, reporting progress via the callback.
	Transfer(CopyCallback) *WrappedError
	// Object returns the metadata previously attached with SetObject.
	Object() *objectResource
	// Oid returns the object's content address.
	Oid() string
	// Size returns the object's size in bytes.
	Size() int64
	// SetObject attaches API transfer metadata before Transfer is called.
	SetObject(*objectResource)
}
// TransferQueue provides a queue that will allow concurrent transfers.
type TransferQueue struct {
	transferc        chan Transferable       // feeds checked transferables to the worker goroutines
	errorc           chan *WrappedError      // collects errors from workers and meta calls
	watchers         []chan string           // each receives the OID of every completed transfer
	errors           []*WrappedError         // accumulated errors, returned by Errors()
	wg               sync.WaitGroup          // tracks transfers that have been handed to workers
	workers          int                     // number of concurrent transfer workers
	files            int                     // number of files expected to transfer
	finished         int64                   // completed-transfer count (updated atomically)
	size             int64                   // total size in bytes; seeds the progress bar
	authCond         *sync.Cond              // parks workers until an API call has authenticated
	transferables    map[string]Transferable // queued objects, keyed by OID
	bar              *pb.ProgressBar
	clientAuthorized int32                   // set to 1 (atomically) once an API request succeeds
	transferKind     string                  // link relation to use, e.g. "download" or "upload"
}
// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
func newTransferQueue(workers, files int) *TransferQueue {
	q := &TransferQueue{
		errorc:        make(chan *WrappedError),
		transferc:     make(chan Transferable, files),
		transferables: make(map[string]Transferable),
		watchers:      make([]chan string, 0),
		authCond:      sync.NewCond(&sync.Mutex{}),
	}
	q.workers = workers
	q.files = files
	return q
}
// Add adds a Transferable to the transfer queue. Entries are keyed by OID,
// so adding the same object twice queues it only once. Add must be called
// before Process.
func (q *TransferQueue) Add(t Transferable) {
	q.transferables[t.Oid()] = t
}
// Watch returns a channel where the queue will write the OID of each transfer
// as it completes. The channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan string {
	watcher := make(chan string, q.files)
	q.watchers = append(q.watchers, watcher)
	return watcher
}
// processIndividual processes the queue of transfers one at a time by making
// a POST call for each object, feeding the results to the transfer workers.
// If configured, the object transfers can still happen concurrently, the
// sequential nature here is only for the meta POST calls.
func (q *TransferQueue) processIndividual() {
	apic := make(chan Transferable, q.files)
	// workersReady is buffered so every worker can announce itself without
	// blocking; only one receive happens below, to know at least one worker
	// has started before the first authCond signal.
	workersReady := make(chan int, q.workers)
	var wg sync.WaitGroup

	for i := 0; i < q.workers; i++ {
		go func() {
			workersReady <- 1
			for t := range apic {
				// If an API authorization has not occurred, we wait until we're woken up.
				q.authCond.L.Lock()
				if atomic.LoadInt32(&q.clientAuthorized) == 0 {
					q.authCond.Wait()
				}
				q.authCond.L.Unlock()
				obj, err := t.Check()
				if err != nil {
					q.errorc <- err
					wg.Done()
					continue
				}
				// Only objects that actually need transferring are queued;
				// q.wg tracks them so Process can wait for the transfers.
				if obj != nil {
					q.wg.Add(1)
					t.SetObject(obj)
					q.transferc <- t
				}
				wg.Done()
			}
		}()
	}

	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.transferables)))
	q.bar.Start()

	for _, t := range q.transferables {
		wg.Add(1)
		apic <- t
	}

	// Wait until at least one worker is running, then wake exactly one so a
	// single goroutine makes the first (authenticating) API call.
	<-workersReady
	q.authCond.Signal() // Signal the first goroutine to run
	close(apic)

	// wg covers the meta calls; once they're done no more sends to transferc
	// can happen, so it is safe to close.
	wg.Wait()
	close(q.transferc)
}
// processBatch processes the queue of transfers using the batch endpoint,
// making only one POST call for all objects. The results are then handed
// off to the transfer workers.
func (q *TransferQueue) processBatch() {
	// q.files is recomputed below: only objects whose response carries a
	// link of q.transferKind actually need transferring.
	q.files = 0
	transfers := make([]*objectResource, 0, len(q.transferables))
	for _, t := range q.transferables {
		transfers = append(transfers, &objectResource{Oid: t.Oid(), Size: t.Size()})
	}

	objects, err := Batch(transfers)
	if err != nil {
		// NOTE(review): on this path transferc is never closed, which leaves
		// the worker goroutines started by Process blocked — confirm whether
		// the early return here is intentional.
		q.errorc <- err
		sendApiEvent(apiEventFail)
		return
	}

	for _, o := range objects {
		if _, ok := o.Links[q.transferKind]; ok {
			// This object needs to be transfered
			if transfer, ok := q.transferables[o.Oid]; ok {
				q.files++
				q.wg.Add(1)
				transfer.SetObject(o)
				q.transferc <- transfer
			}
		}
	}

	close(q.transferc)
	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
	q.bar.Start()
	sendApiEvent(apiEventSuccess) // Wake up transfer workers
}
// Process starts the transfer queue and displays a progress bar. Process will
// do individual or batch transfers depending on the Config.BatchTransfer() value.
// Process will transfer files sequentially or concurrently depending on the
// Config.ConcurrentTransfers() value.
func (q *TransferQueue) Process() {
	q.bar = pb.New64(q.size)
	q.bar.SetUnits(pb.U_BYTES)
	q.bar.ShowBar = false

	// This goroutine collects errors returned from transfers; it exits when
	// errorc is closed below.
	go func() {
		for err := range q.errorc {
			q.errors = append(q.errors, err)
		}
	}()

	// This goroutine watches for apiEvents. In order to prevent multiple
	// credential requests from happening, the queue is processed sequentially
	// until an API request succeeds (meaning authentication has happened successfully).
	// Once an API request succeeds, all worker goroutines are woken up and allowed
	// to process transfers. Once a success happens, this goroutine exits.
	go func() {
		for {
			event := <-apiEvent
			switch event {
			case apiEventSuccess:
				atomic.StoreInt32(&q.clientAuthorized, 1)
				q.authCond.Broadcast() // Wake all remaining goroutines
				return
			case apiEventFail:
				q.authCond.Signal() // Wake the next goroutine
			}
		}
	}()

	for i := 0; i < q.workers; i++ {
		// These are the worker goroutines that process transfers; they exit
		// when transferc is closed by processBatch/processIndividual.
		go func(n int) {
			for transfer := range q.transferc {
				cb := func(total, read int64, current int) error {
					q.bar.Add(current)
					return nil
				}
				if err := transfer.Transfer(cb); err != nil {
					q.errorc <- err
				} else {
					// Notify every watcher of the completed OID.
					oid := transfer.Oid()
					for _, c := range q.watchers {
						c <- oid
					}
				}
				f := atomic.AddInt64(&q.finished, 1)
				q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
				q.wg.Done()
			}
		}(i)
	}

	if Config.BatchTransfer() {
		q.processBatch()
	} else {
		q.processIndividual()
	}

	// Wait for all queued transfers, then shut down the error collector and
	// the watcher channels.
	q.wg.Wait()
	close(q.errorc)

	for _, watcher := range q.watchers {
		close(watcher)
	}

	q.bar.Finish()
}
// Errors reports every error accumulated while the queue was processed.
// Call it only after Process has returned.
func (q *TransferQueue) Errors() []*WrappedError {
	collected := q.errors
	return collected
}

@ -4,22 +4,16 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"sync/atomic"
"github.com/cheggaaa/pb"
)
var (
clientAuthorized = int32(0)
) )
// Uploadable describes a file that can be uploaded. // Uploadable describes a file that can be uploaded.
type Uploadable struct { type Uploadable struct {
OIDPath string oid string
OidPath string
Filename string Filename string
CB CopyCallback CB CopyCallback
Size int64 size int64
object *objectResource
} }
// NewUploadable builds the Uploadable from the given information. // NewUploadable builds the Uploadable from the given information.
@ -47,123 +41,46 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W
defer file.Close() defer file.Close()
} }
return &Uploadable{path, filename, cb, fi.Size()}, nil return &Uploadable{oid: oid, OidPath: path, Filename: filename, CB: cb, size: fi.Size()}, nil
} }
// UploadQueue provides a queue that will allow concurrent uploads. func (u *Uploadable) Check() (*objectResource, *WrappedError) {
type UploadQueue struct { return UploadCheck(u.OidPath)
uploadc chan *Uploadable
errorc chan *WrappedError
errors []*WrappedError
wg sync.WaitGroup
workers int
files int
finished int64
size int64
authCond *sync.Cond
} }
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. func (u *Uploadable) Transfer(cb CopyCallback) *WrappedError {
func NewUploadQueue(workers, files int) *UploadQueue { wcb := func(total, read int64, current int) error {
return &UploadQueue{ cb(total, read, current)
uploadc: make(chan *Uploadable, files), if u.CB != nil {
errorc: make(chan *WrappedError), return u.CB(total, read, current)
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
}
}
// Add adds an Uploadable to the upload queue.
func (q *UploadQueue) Add(u *Uploadable) {
q.wg.Add(1)
q.size += u.Size
q.uploadc <- u
}
// Process starts the upload queue and displays a progress bar.
func (q *UploadQueue) Process() {
bar := pb.New64(q.size)
bar.SetUnits(pb.U_BYTES)
bar.ShowBar = false
bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
bar.Start()
// This goroutine collects errors returned from uploads
go func() {
for err := range q.errorc {
q.errors = append(q.errors, err)
}
}()
// This goroutine watches for apiEvents. In order to prevent multiple
// credential requests from happening, the queue is processed sequentially
// until an API request succeeds (meaning authenication has happened successfully).
// Once the an API request succeeds, all worker goroutines are woken up and allowed
// to process uploads. Once a success happens, this goroutine exits.
go func() {
for {
event := <-apiEvent
switch event {
case apiEventSuccess:
atomic.StoreInt32(&clientAuthorized, 1)
q.authCond.Broadcast() // Wake all remaining goroutines
return
case apiEventFail:
q.authCond.Signal() // Wake the next goroutine
}
}
}()
// This will block Process() until the worker goroutines are spun up and ready
// to process uploads.
workersReady := make(chan int, q.workers)
for i := 0; i < q.workers; i++ {
// These are the worker goroutines that process uploads
go func(n int) {
workersReady <- 1
for upload := range q.uploadc {
// If an API authorization has not occured, we wait until we're woken up.
q.authCond.L.Lock()
if atomic.LoadInt32(&clientAuthorized) == 0 {
q.authCond.Wait()
}
q.authCond.L.Unlock()
cb := func(total, read int64, current int) error {
bar.Add(current)
if upload.CB != nil {
return upload.CB(total, read, current)
} }
return nil return nil
} }
err := Upload(upload.OIDPath, upload.Filename, cb) return UploadObject(u.object, wcb)
if err != nil {
q.errorc <- err
}
f := atomic.AddInt64(&q.finished, 1)
bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
q.wg.Done()
}
}(i)
}
close(q.uploadc)
<-workersReady
q.authCond.Signal() // Signal the first goroutine to run
q.wg.Wait()
close(q.errorc)
bar.Finish()
} }
// Errors returns any errors encountered during uploading. func (u *Uploadable) Object() *objectResource {
func (q *UploadQueue) Errors() []*WrappedError { return u.object
return q.errors }
func (u *Uploadable) Oid() string {
return u.oid
}
func (u *Uploadable) Size() int64 {
return u.size
}
func (u *Uploadable) SetObject(o *objectResource) {
u.object = o
}
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(workers, files int) *TransferQueue {
q := newTransferQueue(workers, files)
q.transferKind = "upload"
return q
} }
// ensureFile makes sure that the cleanPath exists before pushing it. If it // ensureFile makes sure that the cleanPath exists before pushing it. If it

@ -9,7 +9,6 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"path/filepath"
"strconv" "strconv"
"testing" "testing"
) )
@ -18,6 +17,11 @@ func TestExistingUpload(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -51,7 +55,7 @@ func TestExistingUpload(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -60,6 +64,8 @@ func TestExistingUpload(t *testing.T) {
} }
obj := &objectResource{ obj := &objectResource{
Oid: reqObj.Oid,
Size: reqObj.Size,
Links: map[string]*linkRelation{ Links: map[string]*linkRelation{
"upload": &linkRelation{ "upload": &linkRelation{
Href: server.URL + "/upload", Href: server.URL + "/upload",
@ -99,22 +105,18 @@ func TestExistingUpload(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// stores callbacks o, wErr := UploadCheck(oidPath)
calls := make([][]int64, 0, 5)
cb := func(total int64, written int64, current int) error {
calls = append(calls, []int64{total, written})
return nil
}
wErr := Upload(oidPath, "", cb)
if wErr != nil { if wErr != nil {
t.Fatal(wErr) t.Fatal(wErr)
} }
if o != nil {
t.Errorf("Got an object back")
}
if !postCalled { if !postCalled {
t.Errorf("POST not called") t.Errorf("POST not called")
@ -133,6 +135,11 @@ func TestUploadWithRedirect(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -186,7 +193,7 @@ func TestUploadWithRedirect(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -221,21 +228,30 @@ func TestUploadWithRedirect(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/redirect") Config.SetConfig("lfs.url", server.URL+"/redirect")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wErr := Upload(oidPath, "", nil) obj, wErr := UploadCheck(oidPath)
if wErr != nil { if wErr != nil {
t.Fatal(wErr) t.Fatal(wErr)
} }
if obj != nil {
t.Fatal("Received an object")
}
} }
func TestSuccessfulUploadWithVerify(t *testing.T) { func TestSuccessfulUploadWithVerify(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -269,7 +285,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -278,6 +294,8 @@ func TestSuccessfulUploadWithVerify(t *testing.T) {
} }
obj := &objectResource{ obj := &objectResource{
Oid: reqObj.Oid,
Size: reqObj.Size,
Links: map[string]*linkRelation{ Links: map[string]*linkRelation{
"upload": &linkRelation{ "upload": &linkRelation{
Href: server.URL + "/upload", Href: server.URL + "/upload",
@ -369,7 +387,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -383,7 +401,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -395,7 +413,11 @@ func TestSuccessfulUploadWithVerify(t *testing.T) {
return nil return nil
} }
wErr := Upload(oidPath, "", cb) obj, wErr := UploadCheck(oidPath)
if wErr != nil {
t.Fatal(wErr)
}
wErr = UploadObject(obj, cb)
if wErr != nil { if wErr != nil {
t.Fatal(wErr) t.Fatal(wErr)
} }
@ -428,6 +450,11 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -460,7 +487,7 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -469,6 +496,8 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) {
} }
obj := &objectResource{ obj := &objectResource{
Oid: reqObj.Oid,
Size: reqObj.Size,
Links: map[string]*linkRelation{ Links: map[string]*linkRelation{
"upload": &linkRelation{ "upload": &linkRelation{
Href: server.URL + "/upload", Href: server.URL + "/upload",
@ -532,12 +561,16 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wErr := Upload(oidPath, "", nil) obj, wErr := UploadCheck(oidPath)
if wErr != nil {
t.Fatal(wErr)
}
wErr = UploadObject(obj, nil)
if wErr != nil { if wErr != nil {
t.Fatal(wErr) t.Fatal(wErr)
} }
@ -555,6 +588,11 @@ func TestUploadApiError(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = olddir
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -567,14 +605,14 @@ func TestUploadApiError(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wErr := Upload(oidPath, "", nil) _, wErr := UploadCheck(oidPath)
if wErr == nil { if wErr == nil {
t.Fatal("no error?") t.Fatal(wErr)
} }
if wErr.Panic { if wErr.Panic {
@ -594,6 +632,11 @@ func TestUploadStorageError(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -626,7 +669,7 @@ func TestUploadStorageError(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -635,6 +678,8 @@ func TestUploadStorageError(t *testing.T) {
} }
obj := &objectResource{ obj := &objectResource{
Oid: reqObj.Oid,
Size: reqObj.Size,
Links: map[string]*linkRelation{ Links: map[string]*linkRelation{
"upload": &linkRelation{ "upload": &linkRelation{
Href: server.URL + "/upload", Href: server.URL + "/upload",
@ -667,14 +712,18 @@ func TestUploadStorageError(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wErr := Upload(oidPath, "", nil) obj, wErr := UploadCheck(oidPath)
if wErr != nil {
t.Fatal(wErr)
}
wErr = UploadObject(obj, nil)
if wErr == nil { if wErr == nil {
t.Fatal("no error?") t.Fatal("Expected an error")
} }
if wErr.Panic { if wErr.Panic {
@ -698,6 +747,11 @@ func TestUploadVerifyError(t *testing.T) {
mux := http.NewServeMux() mux := http.NewServeMux()
server := httptest.NewServer(mux) server := httptest.NewServer(mux)
tmp := tempdir(t) tmp := tempdir(t)
olddir := LocalMediaDir
LocalMediaDir = tmp
defer func() {
LocalMediaDir = olddir
}()
defer server.Close() defer server.Close()
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
@ -731,7 +785,7 @@ func TestUploadVerifyError(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if reqObj.Oid != "oid" { if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
t.Errorf("invalid oid from request: %s", reqObj.Oid) t.Errorf("invalid oid from request: %s", reqObj.Oid)
} }
@ -740,6 +794,8 @@ func TestUploadVerifyError(t *testing.T) {
} }
obj := &objectResource{ obj := &objectResource{
Oid: reqObj.Oid,
Size: reqObj.Size,
Links: map[string]*linkRelation{ Links: map[string]*linkRelation{
"upload": &linkRelation{ "upload": &linkRelation{
Href: server.URL + "/upload", Href: server.URL + "/upload",
@ -804,14 +860,18 @@ func TestUploadVerifyError(t *testing.T) {
Config.SetConfig("lfs.url", server.URL+"/media") Config.SetConfig("lfs.url", server.URL+"/media")
oidPath := filepath.Join(tmp, "oid") oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
t.Fatal(err) t.Fatal(err)
} }
wErr := Upload(oidPath, "", nil) obj, wErr := UploadCheck(oidPath)
if wErr != nil {
t.Fatal(wErr)
}
wErr = UploadObject(obj, nil)
if wErr == nil { if wErr == nil {
t.Fatal("no error?") t.Fatal("Expected an error")
} }
if wErr.Panic { if wErr.Panic {

@ -85,7 +85,11 @@ func lfsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/vnd.git-lfs+json") w.Header().Set("Content-Type", "application/vnd.git-lfs+json")
switch r.Method { switch r.Method {
case "POST": case "POST":
if strings.HasSuffix(r.URL.String(), "batch") {
lfsBatchHandler(w, r)
} else {
lfsPostHandler(w, r) lfsPostHandler(w, r)
}
case "GET": case "GET":
lfsGetHandler(w, r) lfsGetHandler(w, r)
default: default:
@ -111,6 +115,8 @@ func lfsPostHandler(w http.ResponseWriter, r *http.Request) {
} }
res := &lfsObject{ res := &lfsObject{
Oid: obj.Oid,
Size: obj.Size,
Links: map[string]lfsLink{ Links: map[string]lfsLink{
"upload": lfsLink{ "upload": lfsLink{
Href: server.URL + "/storage/" + obj.Oid, Href: server.URL + "/storage/" + obj.Oid,
@ -162,6 +168,50 @@ func lfsGetHandler(w http.ResponseWriter, r *http.Request) {
w.Write(by) w.Write(by)
} }
// lfsBatchHandler services the test server's batch endpoint: it echoes back
// every requested object with an "upload" link pointing at /storage/{oid}.
func lfsBatchHandler(w http.ResponseWriter, r *http.Request) {
	// Tee the request body into a buffer so the raw JSON can be logged
	// after it has been decoded.
	buf := &bytes.Buffer{}
	tee := io.TeeReader(r.Body, buf)
	var objs map[string][]lfsObject
	err := json.NewDecoder(tee).Decode(&objs)
	// Drain and close the body so the connection can be reused.
	io.Copy(ioutil.Discard, r.Body)
	r.Body.Close()

	log.Println("REQUEST")
	log.Println(buf.String())

	if err != nil {
		// Test helper: abort the whole test process on a malformed request.
		log.Fatal(err)
	}

	// Build one response object per requested object, each with an upload href.
	res := []lfsObject{}
	for _, obj := range objs["objects"] {
		o := lfsObject{
			Oid:  obj.Oid,
			Size: obj.Size,
			Links: map[string]lfsLink{
				"upload": lfsLink{
					Href: server.URL + "/storage/" + obj.Oid,
				},
			},
		}

		res = append(res, o)
	}

	ores := map[string][]lfsObject{"objects": res}

	by, err := json.Marshal(ores)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("RESPONSE: 200")
	log.Println(string(by))

	w.WriteHeader(200)
	w.Write(by)
}
// handles any /storage/{oid} requests // handles any /storage/{oid} requests
func storageHandler(w http.ResponseWriter, r *http.Request) { func storageHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("storage %s %s\n", r.Method, r.URL) log.Printf("storage %s %s\n", r.Method, r.URL)

67
test/test-batch-transfer.sh Executable file

@ -0,0 +1,67 @@
#!/bin/sh
# This is a sample Git LFS test. See test/README.md and testhelpers.sh for
# more documentation.

. "test/testlib.sh"

begin_test "batch transfer"
(
  set -e

  # This initializes a new bare git repository in test/remote.
  # These remote repositories are global to every test, so keep the names
  # unique.
  reponame="$(basename "$0" ".sh")"
  setup_remote_repo "$reponame"

  # Clone the repository from the test Git server. This is empty, and will be
  # used to test a "git pull" below. The repo is cloned to $TRASHDIR/clone
  clone_repo "$reponame" clone

  # Clone the repository again to $TRASHDIR/repo. This will be used to commit
  # and push objects.
  clone_repo "$reponame" repo

  # This executes Git LFS from the local repo that was just cloned.
  git lfs track "*.dat" 2>&1 | tee track.log
  grep "Tracking \*.dat" track.log

  # Commit a single tracked file so there is exactly one object to transfer.
  contents="a"
  contents_oid=$(printf "$contents" | shasum -a 256 | cut -f 1 -d " ")

  printf "$contents" > a.dat
  git add a.dat
  git add .gitattributes
  git commit -m "add a.dat" 2>&1 | tee commit.log
  grep "master (root-commit)" commit.log
  grep "2 files changed" commit.log
  grep "create mode 100644 a.dat" commit.log
  grep "create mode 100644 .gitattributes" commit.log

  [ "a" = "$(cat a.dat)" ]

  # This is a small shell function that runs several git commands together.
  assert_pointer "master" "a.dat" "$contents_oid" 1

  # The object must not exist on the server before the push.
  refute_server_object "$contents_oid"

  # Ensure batch transfer is turned on for this repo
  git config --add --local lfs.batch true

  # This pushes to the remote repository set up at the top of the test.
  git push origin master 2>&1 | tee push.log
  grep "(1 of 1 files)" push.log
  grep "master -> master" push.log

  assert_server_object "$contents_oid" "$contents"

  # change to the clone's working directory
  cd ../clone

  # Pulling must download the object through the batch endpoint as well.
  git pull 2>&1 | grep "Downloading a.dat (1 B)"

  [ "a" = "$(cat a.dat)" ]

  assert_pointer "master" "a.dat" "$contents_oid" 1
)
end_test

@ -14,6 +14,8 @@ begin_test "env with no remote"
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)
@ -35,6 +37,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)
@ -62,6 +66,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)
@ -87,6 +93,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)
@ -115,6 +123,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)
@ -145,6 +155,42 @@ LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=3
BatchTransfer=false
$(env | grep "^GIT")
")
actual=$(git lfs env)
[ "$expected" = "$actual" ]
cd .git
[ "$expected" = "$actual" ]
)
end_test
begin_test "env with multiple remotes and lfs url and batch configs"
(
set -e
reponame="env-multiple-remotes-lfs-batch-configs"
mkdir $reponame
cd $reponame
git init
git remote add origin "$GITSERVER/env-origin-remote"
git remote add other "$GITSERVER/env-other-remote"
git config lfs.url "http://foo/bar"
git config lfs.batch true
git config lfs.concurrenttransfers 5
git config remote.origin.lfsurl "http://custom/origin"
git config remote.other.lfsurl "http://custom/other"
expected=$(printf "Endpoint=http://foo/bar
Endpoint (other)=http://custom/other
LocalWorkingDir=$TRASHDIR/$reponame
LocalGitDir=$TRASHDIR/$reponame/.git
LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
ConcurrentTransfers=5
BatchTransfer=true
$(env | grep "^GIT") $(env | grep "^GIT")
") ")
actual=$(git lfs env) actual=$(git lfs env)

@ -48,7 +48,7 @@ begin_test "happy path"
# This pushes to the remote repository set up at the top of the test. # This pushes to the remote repository set up at the top of the test.
git push origin master 2>&1 | tee push.log git push origin master 2>&1 | tee push.log
grep "(1 of 1 files) 1 B / 1 B 100.00 %" push.log grep "(1 of 1 files)" push.log
grep "master -> master" push.log grep "master -> master" push.log
assert_server_object "$contents_oid" "$contents" assert_server_object "$contents_oid" "$contents"

@ -16,11 +16,8 @@ begin_test "pre-push"
echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" | echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
tee push.log | tee push.log
grep "(0 of 0 files) 0 B 0" || { grep "(0 of 0 files) 0 B 0" push.log
cat push.log
exit 1
}
git lfs track "*.dat" git lfs track "*.dat"
echo "hi" > hi.dat echo "hi" > hi.dat
@ -33,31 +30,23 @@ begin_test "pre-push"
curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \ curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \
-u "user:pass" \ -u "user:pass" \
-H "Accept: application/vnd.git-lfs+json" 2>&1 | -H "Accept: application/vnd.git-lfs+json" 2>&1 |
tee http.log | tee http.log
grep "404 Not Found" || {
cat http.log grep "404 Not Found" http.log
exit 1
}
# push file to the git lfs server # push file to the git lfs server
echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" | echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
tee push.log | tee push.log
grep "(1 of 1 files) 3 B / 3 B 100.00 %" || { grep "(1 of 1 files)" push.log
cat push.log
exit 1
}
# now the file exists # now the file exists
curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \ curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \
-u "user:pass" \ -u "user:pass" \
-o lfs.json \ -o lfs.json \
-H "Accept: application/vnd.git-lfs+json" 2>&1 | -H "Accept: application/vnd.git-lfs+json" 2>&1 |
tee http.log | tee http.log
grep "200 OK" || { grep "200 OK" http.log
cat http.log
exit 1
}
grep "download" lfs.json || { grep "download" lfs.json || {
cat lfs.json cat lfs.json
@ -95,28 +84,19 @@ begin_test "pre-push dry-run"
curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \ curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \
-u "user:pass" \ -u "user:pass" \
-H "Accept: application/vnd.git-lfs+json" 2>&1 | -H "Accept: application/vnd.git-lfs+json" 2>&1 |
tee http.log | tee http.log
grep "404 Not Found" || { grep "404 Not Found" http.log
cat http.log
exit 1
}
echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" | echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 |
tee push.log | tee push.log
grep "push hi.dat" || { grep "push hi.dat" push.log
cat push.log
exit 1
}
# file still doesn't exist # file still doesn't exist
curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \ curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \
-u "user:pass" \ -u "user:pass" \
-H "Accept: application/vnd.git-lfs+json" 2>&1 | -H "Accept: application/vnd.git-lfs+json" 2>&1 |
tee http.log | tee http.log
grep "404 Not Found" || { grep "404 Not Found" http.log
cat http.log
exit 1
}
) )
end_test end_test

@ -15,24 +15,16 @@ begin_test "push"
git add .gitattributes a.dat git add .gitattributes a.dat
git commit -m "add a.dat" git commit -m "add a.dat"
git lfs push origin master 2>&1 | git lfs push origin master 2>&1 | tee push.log
tee push.log | grep "(1 of 1 files)" push.log
grep "(1 of 1 files) 7 B / 7 B 100.00 %" || {
cat push.log
exit 1
}
git checkout -b push-b git checkout -b push-b
echo "push b" > b.dat echo "push b" > b.dat
git add b.dat git add b.dat
git commit -m "add b.dat" git commit -m "add b.dat"
git lfs push origin push-b 2>&1 | git lfs push origin push-b 2>&1 | tee push.log
tee push.log | grep "(2 of 2 files)" push.log
grep "(2 of 2 files) 14 B / 14 B 100.00 %" || {
cat push.log
exit 1
}
) )
end_test end_test