From 694a6fda305560a1b7f6f9a7a4c796fd8a410c0d Mon Sep 17 00:00:00 2001
From: rubyist
Date: Tue, 5 May 2015 11:11:14 -0400
Subject: [PATCH 01/36] Draft an endpoint for batch upload/download operations

---
 docs/api.md | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/docs/api.md b/docs/api.md
index 4d8d6ddb..13810341 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -266,6 +266,66 @@ only appears on a 200 status.
* 403 - The user has **read**, but not **write** access.
* 404 - The repository does not exist for the user.

+## POST /objects/batch
+
+This request retrieves the metadata for a batch of objects, given a JSON body
+containing an array of objects with the oid and size of each object.
+
```
> POST https://git-lfs-server.com/objects/batch HTTP/1.1
> Accept: application/vnd.git-lfs+json
> Content-Type: application/vnd.git-lfs+json
> Authorization: Basic ... (if authentication is needed)
>
> [
>   {
>     "oid": "1111111",
>     "size": 123
>   }
> ]
>
< HTTP/1.1 200 OK
< Content-Type: application/vnd.git-lfs+json
<
< [
<   {
<     "oid": "1111111",
<     "_links": {
<       "upload": {
<         "href": "https://some-upload.com",
<         "header": {
<           "Key": "value"
<         }
<       },
<       "verify": {
<         "href": "https://some-callback.com",
<         "header": {
<           "Key": "value"
<         }
<       }
<     }
<   }
< ]
```

+The response will be an array of objects containing one or more link relations,
+each with an `href` property and an optional `header` property.
+
* `upload` - This relation describes how to upload the object. Expect this
when the object has not been previously uploaded.
* `verify` - The server can specify a URL for the client to hit after
successfully uploading an object. This is an optional relation for cases where
the server has not yet verified the object.
* `download` - This relation describes how to download the object content. This
only appears if an object has been previously uploaded.

+### Responses
+
+* 200 - OK
+* 401 - The authentication credentials are incorrect.
+* 403 - The user has **read**, but not **write** access.
+* 404 - The repository does not exist for the user.
+
 ## Verification

 When Git LFS clients issue a POST request to initiate an object upload, the

From 978471f77dbc0555fdb486f1fd7c16aa52920428 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Tue, 5 May 2015 16:08:37 -0400
Subject: [PATCH 02/36] accept and return an object with an array instead of
 an array

---
 docs/api.md | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/docs/api.md b/docs/api.md
index 13810341..3fe22c04 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -269,7 +269,7 @@
## POST /objects/batch

This request retrieves the metadata for a batch of objects, given a JSON body
-containing an array of objects with the oid and size of each object.
+containing an object with an array of objects giving the oid and size of each object.

```
> POST https://git-lfs-server.com/objects/batch HTTP/1.1
> Accept: application/vnd.git-lfs+json
> Content-Type: application/vnd.git-lfs+json
> Authorization: Basic ... (if authentication is needed)
>
-> [
->   {
->     "oid": "1111111",
->     "size": 123
->   }
-> ]
+> {
+>   "objects": [
+>     {
+>       "oid": "1111111",
+>       "size": 123
+>     }
+>   ]
+> }
>
< HTTP/1.1 200 OK
< Content-Type: application/vnd.git-lfs+json
<
-< [
-<   {
+< {
+<   "objects": [
+<     {
<     "oid": "1111111",
<     "_links": {
<       "upload": {
@@ -304,12 +306,13 @@ containing an array of objects with the oid and size of each object.
<         }
<       }
<     }
-<   }
-< ]
+<     }
+<   ]
+< }
```

-The response will be an array of objects containing one or more link relations,
-each with an `href` property and an optional `header` property.
+The response will be an object containing an array of objects, each with one
+or more link relations that have an `href` property and an optional `header`
+property.

* `upload` - This relation describes how to upload the object. Expect this
when the object has not been previously uploaded.

From 52dac4f75a3a29b0e496092710c86fa40862d2ef Mon Sep 17 00:00:00 2001
From: rubyist
Date: Thu, 7 May 2015 14:33:51 -0400
Subject: [PATCH 03/36] Initial rough draft of client code for the Batch
 endpoint and batch uploads

---
 lfs/client.go       | 188 +++++++++++++++++++++++++++++++++++++++++++-
 lfs/upload_queue.go | 155 +++++++++++++++++++++++++++---------
 2 files changed, 302 insertions(+), 41 deletions(-)

diff --git a/lfs/client.go b/lfs/client.go
index a2ea99ad..17a40093 100644
--- a/lfs/client.go
+++ b/lfs/client.go
@@ -133,6 +133,46 @@ func (b *byteCloser) Close() error {
	return nil
}

+func Batch(uploads []*Uploadable) ([]*objectResource, *WrappedError) {
+	type objects struct {
+		Objects []*objectResource `json:"objects"`
+	}
+
+	o := &objects{make([]*objectResource, 0, len(uploads))}
+	for _, u := range uploads {
+		o.Objects = append(o.Objects, &objectResource{Oid: u.OID, Size: u.Size})
+	}
+
+	by, err := json.Marshal(o)
+	if err != nil {
+		return nil, Error(err)
+	}
+
+	req, creds, err := newApiRequest("POST", "batch")
+	if err != nil {
+		return nil, Error(err)
+	}
+
+	req.Header.Set("Content-Type", mediaType)
+	req.Header.Set("Content-Length", strconv.Itoa(len(by)))
+	req.ContentLength = int64(len(by))
+	req.Body = &byteCloser{bytes.NewReader(by)}
+
+	tracerx.Printf("api: batch %d files", len(uploads))
+	res, objs, wErr := doApiBatchRequest(req, creds)
+	if wErr != nil {
+		sendApiEvent(apiEventFail)
+		return nil, wErr
+	}
+
+	sendApiEvent(apiEventSuccess)
+	if res.StatusCode != 200 {
+		return nil, Errorf(nil, "Invalid status for %s %s: %d", req.Method, req.URL, res.StatusCode)
+	}
+
+	return objs, nil
+}
+
 func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
	oid := filepath.Base(oidPath)
	file, err := os.Open(oidPath)
@@ -231,6 +271,126 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError {
	return wErr
}

+func UploadCheck(oidPath string) (*objectResource, *WrappedError) {
+	oid := filepath.Base(oidPath)
+	file, err := os.Open(oidPath)
+	if err != nil {
+		sendApiEvent(apiEventFail)
+		return nil, Error(err)
+	}
+	defer file.Close()
+
+	stat, err := file.Stat() // TODO: stat the path without opening the file
+	if err != nil {
+		sendApiEvent(apiEventFail)
+		return nil, Error(err)
+	}
+
+	reqObj := &objectResource{
+		Oid:  oid,
+		Size: stat.Size(),
+	}
+
+	by, err := json.Marshal(reqObj)
+	if err != nil {
+		sendApiEvent(apiEventFail)
+		return nil, Error(err)
+	}
+
+	req, creds, err := newApiRequest("POST", oid)
+	if err != nil {
+		sendApiEvent(apiEventFail)
+		return nil, Error(err)
+	}
+
+	req.Header.Set("Content-Type", mediaType)
+	req.Header.Set("Content-Length", strconv.Itoa(len(by)))
+
req.ContentLength = int64(len(by)) + req.Body = &byteCloser{bytes.NewReader(by)} + + tracerx.Printf("api: uploading (%s)", oid) + res, obj, wErr := doApiRequest(req, creds) + if wErr != nil { + sendApiEvent(apiEventFail) + return nil, wErr + } + + sendApiEvent(apiEventSuccess) + + if res.StatusCode == 200 { + return nil, nil + } + + return obj, nil +} + +func UploadObject(o *objectResource, cb CopyCallback) *WrappedError { + path, err := LocalMediaPath(o.Oid) + if err != nil { + return Error(err) + } + + file, err := os.Open(path) + if err != nil { + return Error(err) + } + defer file.Close() + + reader := &CallbackReader{ + C: cb, + TotalSize: o.Size, + Reader: file, + } + + req, creds, err := o.NewRequest("upload", "PUT") + if err != nil { + return Error(err) + } + + if len(req.Header.Get("Content-Type")) == 0 { + req.Header.Set("Content-Type", "application/octet-stream") + } + req.Header.Set("Content-Length", strconv.FormatInt(o.Size, 10)) + req.ContentLength = o.Size + + req.Body = ioutil.NopCloser(reader) + + res, wErr := doHttpRequest(req, creds) + if wErr != nil { + return wErr + } + + if res.StatusCode > 299 { + return Errorf(nil, "Invalid status for %s %s: %d", req.Method, req.URL, res.StatusCode) + } + + io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + + req, creds, err = o.NewRequest("verify", "POST") + if err == objectRelationDoesNotExist { + return nil + } else if err != nil { + return Error(err) + } + + by, err := json.Marshal(o) + if err != nil { + return Error(err) + } + + req.Header.Set("Content-Type", mediaType) + req.Header.Set("Content-Length", strconv.Itoa(len(by))) + req.ContentLength = int64(len(by)) + req.Body = ioutil.NopCloser(bytes.NewReader(by)) + res, wErr = doHttpRequest(req, creds) + + io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + + return wErr +} + func doHttpRequest(req *http.Request, creds Creds) (*http.Response, *WrappedError) { res, err := DoHTTP(Config, req) @@ -308,6 +468,27 @@ func doApiRequest(req *http.Request, creds Creds) (*http.Response, *objectResour return res, obj, wErr } +func doApiBatchRequest(req *http.Request, creds Creds) (*http.Response, []*objectResource, *WrappedError) { + via := make([]*http.Request, 0, 4) + res, wErr := doApiRequestWithRedirects(req, creds, via) + if wErr != nil { + return res, nil, wErr + } + + type ro struct { + Objects []*objectResource `json:"objects"` + } + + var objs ro + wErr = decodeApiResponse(res, &objs) + + if wErr != nil { + setErrorResponseContext(wErr, res) + } + + return res, objs.Objects, wErr +} + func handleResponse(res *http.Response) *WrappedError { if res.StatusCode < 400 { return nil @@ -343,6 +524,7 @@ func decodeApiResponse(res *http.Response, obj interface{}) *WrappedError { res.Body.Close() if err != nil { + fmt.Printf("DECODE ERROR: %s\n", err) return Errorf(err, "Unable to parse HTTP response for %s %s", res.Request.Method, res.Request.URL) } @@ -380,8 +562,10 @@ func newApiRequest(method, oid string) (*http.Request, Creds, error) { objectOid := oid operation := "download" if method == "POST" { - objectOid = "" - operation = "upload" + if oid != "batch" { + objectOid = "" + operation = "upload" + } } res, err := sshAuthenticate(endpoint, operation, oid) diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 145fd1fe..ceeb3907 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -15,10 +15,12 @@ var ( // Uploadable describes a file that can be uploaded. 
type Uploadable struct {
+	OID      string
	OIDPath  string
	Filename string
	CB       CopyCallback
	Size     int64
+	object   *objectResource
}

// NewUploadable builds the Uploadable from the given information.
@@ -46,47 +48,130 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W
		defer file.Close()
	}

-	return &Uploadable{path, filename, cb, fi.Size()}, nil
+	return &Uploadable{OID: oid, OIDPath: path, Filename: filename, CB: cb, Size: fi.Size()}, nil
}

// UploadQueue provides a queue that will allow concurrent uploads.
type UploadQueue struct {
-	uploadc  chan *Uploadable
-	errorc   chan *WrappedError
-	errors   []*WrappedError
-	wg       sync.WaitGroup
-	workers  int
-	files    int
-	finished int64
-	size     int64
-	authCond *sync.Cond
+	uploadc     chan *Uploadable
+	errorc      chan *WrappedError
+	errors      []*WrappedError
+	wg          sync.WaitGroup
+	workers     int
+	files       int
+	finished    int64
+	size        int64
+	authCond    *sync.Cond
+	uploadables map[string]*Uploadable
+	bar         *pb.ProgressBar
}

// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(workers, files int) *UploadQueue {
	return &UploadQueue{
-		uploadc:  make(chan *Uploadable, files),
-		errorc:   make(chan *WrappedError),
-		workers:  workers,
-		files:    files,
-		authCond: sync.NewCond(&sync.Mutex{}),
+		uploadc:     make(chan *Uploadable, files),
+		errorc:      make(chan *WrappedError),
+		workers:     workers,
+		files:       files,
+		authCond:    sync.NewCond(&sync.Mutex{}),
+		uploadables: make(map[string]*Uploadable),
	}
}

// Add adds an Uploadable to the upload queue.
func (q *UploadQueue) Add(u *Uploadable) {
-	q.wg.Add(1)
-	q.size += u.Size
-	q.uploadc <- u
+	q.uploadables[u.OID] = u
}

+// processIndividual processes the queue, making the individual POST calls
+// and feeding the results to the upload workers
+func (q *UploadQueue) processIndividual() {
+	apic := make(chan *Uploadable, q.workers)
+	workersReady := make(chan int, q.workers)
+	var wg sync.WaitGroup
+
+	for i := 0; i < q.workers; i++ {
+		go func() {
+			workersReady <- 1
+			for u := range apic {
+				// If an API authorization has not occurred, we wait until we're woken up.
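+				// A success broadcasts on authCond and releases every waiting
+				// worker; a failure signals only the next one, so the user is
+				// prompted for credentials at most once.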
+ q.authCond.L.Lock() + if atomic.LoadInt32(&clientAuthorized) == 0 { + q.authCond.Wait() + } + q.authCond.L.Unlock() + + obj, err := UploadCheck(u.OIDPath) + if err != nil { + q.errorc <- err + wg.Done() + continue + } + if obj != nil { + q.wg.Add(1) + u.object = obj + q.uploadc <- u + } + wg.Done() + } + }() + } + + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.uploadables))) + q.bar.Start() + + for _, u := range q.uploadables { + wg.Add(1) + apic <- u + } + + <-workersReady + q.authCond.Signal() // Signal the first goroutine to run + close(apic) + wg.Wait() + + close(q.uploadc) +} + +// batchWorker makes the batch POST call, feeding the results +// to the uploadWorkers +func (q *UploadQueue) processBatch() { + q.files = 0 + uploads := make([]*Uploadable, 0, len(q.uploadables)) + for _, u := range q.uploadables { + uploads = append(uploads, u) + } + + objects, err := Batch(uploads) + if err != nil { + q.errorc <- err + sendApiEvent(apiEventFail) + return + } + + for _, o := range objects { + if _, ok := o.Links["upload"]; ok { + // This object needs to be uploaded + if uploadable, ok := q.uploadables[o.Oid]; ok { + q.files++ + q.wg.Add(1) + uploadable.object = o + q.uploadc <- uploadable + } + } + } + + close(q.uploadc) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) + q.bar.Start() + sendApiEvent(apiEventSuccess) // Wake up upload workers } // Process starts the upload queue and displays a progress bar. func (q *UploadQueue) Process() { - bar := pb.New64(q.size) - bar.SetUnits(pb.U_BYTES) - bar.ShowBar = false - bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) - bar.Start() + q.bar = pb.New64(q.size) + q.bar.SetUnits(pb.U_BYTES) + q.bar.ShowBar = false // This goroutine collects errors returned from uploads go func() { @@ -116,48 +201,40 @@ func (q *UploadQueue) Process() { // This will block Process() until the worker goroutines are spun up and ready // to process uploads. - workersReady := make(chan int, q.workers) for i := 0; i < q.workers; i++ { // These are the worker goroutines that process uploads go func(n int) { - workersReady <- 1 for upload := range q.uploadc { - // If an API authorization has not occured, we wait until we're woken up. - q.authCond.L.Lock() - if atomic.LoadInt32(&clientAuthorized) == 0 { - q.authCond.Wait() - } - q.authCond.L.Unlock() - cb := func(total, read int64, current int) error { - bar.Add(current) + q.bar.Add(current) if upload.CB != nil { return upload.CB(total, read, current) } return nil } - err := Upload(upload.OIDPath, upload.Filename, cb) + err := UploadObject(upload.object, cb) if err != nil { q.errorc <- err } f := atomic.AddInt64(&q.finished, 1) - bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) q.wg.Done() } }(i) } - close(q.uploadc) - <-workersReady - q.authCond.Signal() // Signal the first goroutine to run + // Assume batch for now + //q.processBatch() + q.processIndividual() + q.wg.Wait() close(q.errorc) - bar.Finish() + q.bar.Finish() } // Errors returns any errors encountered during uploading. 
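The wire exchange that `Batch` and `doApiBatchRequest` implement above can be
sketched end to end as a standalone program. This is a minimal sketch of the
request/response shape only: the endpoint URL and oid below are placeholders,
and the real client also wires in credentials, ssh authentication, and the
`WrappedError` plumbing shown in the diff.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// link mirrors one entry of the response's `_links` map.
type link struct {
	Href   string            `json:"href"`
	Header map[string]string `json:"header,omitempty"`
}

// object mirrors the elements of the `objects` array in both directions.
type object struct {
	Oid   string          `json:"oid"`
	Size  int64           `json:"size"`
	Links map[string]link `json:"_links,omitempty"`
}

func main() {
	// Placeholder endpoint; the real client derives this from the Git remote.
	const url = "https://git-lfs-server.com/objects/batch"

	body, err := json.Marshal(map[string][]object{
		"objects": {{Oid: "1111111", Size: 123}},
	})
	if err != nil {
		panic(err)
	}

	req, err := http.NewRequest("POST", url, bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Accept", "application/vnd.git-lfs+json")
	req.Header.Set("Content-Type", "application/vnd.git-lfs+json")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	var parsed map[string][]object
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		panic(err)
	}

	// An "upload" relation means the server does not have the object yet;
	// a "download" relation means it does.
	for _, o := range parsed["objects"] {
		if _, ok := o.Links["upload"]; ok {
			fmt.Println("needs upload:", o.Oid)
		}
	}
}
```

Keeping the request and response to a single `objects` array is what lets one
call serve both directions; the server expresses what should happen to each
object purely through the link relations it returns.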
From a870cc83b9fbf674e489e1a6424f78a551d75d50 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 7 May 2015 14:37:30 -0400 Subject: [PATCH 04/36] Flip batch with config --- lfs/config.go | 7 +++++++ lfs/upload_queue.go | 8 +++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lfs/config.go b/lfs/config.go index 1c3cab9a..b013fabe 100644 --- a/lfs/config.go +++ b/lfs/config.go @@ -83,6 +83,13 @@ func (c *Configuration) ConcurrentUploads() int { return uploads } +func (c *Configuration) BatchTransfer() bool { + if v, ok := c.GitConfig("lfs.batch"); ok { + return v == "true" + } + return false +} + func (c *Configuration) RemoteEndpoint(remote string) Endpoint { if len(remote) == 0 { remote = defaultRemote diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index ceeb3907..12db5a36 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -227,9 +227,11 @@ func (q *UploadQueue) Process() { }(i) } - // Assume batch for now - //q.processBatch() - q.processIndividual() + if Config.BatchTransfer() { + q.processBatch() + } else { + q.processIndividual() + } q.wg.Wait() close(q.errorc) From 93381bf0d49cd39f72d0cc9975e84856cd967435 Mon Sep 17 00:00:00 2001 From: rubyist Date: Tue, 12 May 2015 10:35:55 -0400 Subject: [PATCH 05/36] Batch takes objectResource instead of Uploadable --- lfs/client.go | 11 ++++------- lfs/upload_queue.go | 7 ++----- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/lfs/client.go b/lfs/client.go index 17a40093..314afced 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -133,15 +133,12 @@ func (b *byteCloser) Close() error { return nil } -func Batch(uploads []*Uploadable) ([]*objectResource, *WrappedError) { - type objects struct { +func Batch(objects []*objectResource) ([]*objectResource, *WrappedError) { + type objres struct { Objects []*objectResource `json:"objects"` } - o := &objects{make([]*objectResource, 0, len(uploads))} - for _, u := range uploads { - o.Objects = append(o.Objects, &objectResource{Oid: u.OID, Size: u.Size}) - } + o := &objres{objects} by, err := json.Marshal(o) if err != nil { @@ -158,7 +155,7 @@ func Batch(uploads []*Uploadable) ([]*objectResource, *WrappedError) { req.ContentLength = int64(len(by)) req.Body = &byteCloser{bytes.NewReader(by)} - tracerx.Printf("api: batch %d files", len(uploads)) + tracerx.Printf("api: batch %d files", len(objects)) res, objs, wErr := doApiBatchRequest(req, creds) if wErr != nil { sendApiEvent(apiEventFail) diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 12db5a36..205fefad 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -137,9 +137,9 @@ func (q *UploadQueue) processIndividual() { // to the uploadWorkers func (q *UploadQueue) processBatch() { q.files = 0 - uploads := make([]*Uploadable, 0, len(q.uploadables)) + uploads := make([]*objectResource, 0, len(q.uploadables)) for _, u := range q.uploadables { - uploads = append(uploads, u) + uploads = append(uploads, &objectResource{Oid: u.OID, Size: u.Size}) } objects, err := Batch(uploads) @@ -199,9 +199,6 @@ func (q *UploadQueue) Process() { } }() - // This will block Process() until the worker goroutines are spun up and ready - // to process uploads. 
- for i := 0; i < q.workers; i++ { // These are the worker goroutines that process uploads go func(n int) { From b4a48449f9fae20863f54612578b95d7fe5f4eb3 Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 13 May 2015 10:23:49 -0400 Subject: [PATCH 06/36] Start a download queue --- lfs/client.go | 28 ++++++ lfs/download_queue.go | 205 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 lfs/download_queue.go diff --git a/lfs/client.go b/lfs/client.go index 314afced..83d05aa7 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -129,6 +129,34 @@ type byteCloser struct { *bytes.Reader } +func DownloadCheck(oid string) (*objectResource, *WrappedError) { + req, creds, err := newApiRequest("GET", oid) + if err != nil { + return nil, Error(err) + } + + _, obj, wErr := doApiRequest(req, creds) + if wErr != nil { + return nil, wErr + } + + return obj, nil +} + +func DownloadObject(obj *objectResource) (io.ReadCloser, int64, *WrappedError) { + req, creds, err := obj.NewRequest("download", "GET") + if err != nil { + return nil, 0, Error(err) + } + + res, wErr := doHttpRequest(req, creds) + if wErr != nil { + return nil, 0, wErr + } + + return res.Body, res.ContentLength, nil +} + func (b *byteCloser) Close() error { return nil } diff --git a/lfs/download_queue.go b/lfs/download_queue.go new file mode 100644 index 00000000..16d0182b --- /dev/null +++ b/lfs/download_queue.go @@ -0,0 +1,205 @@ +package lfs + +import ( + "fmt" + "github.com/cheggaaa/pb" + "sync" + "sync/atomic" +) + +var ( + downloadClientAuthorized = int32(0) +) + +type Downloadable struct { + OID string + Size int64 + Filename string + CB CopyCallback + object *objectResource +} + +// DownloadQueue provides a queue that will allow concurrent uploads. +type DownloadQueue struct { + downloadc chan *Downloadable + errorc chan *WrappedError + errors []*WrappedError + wg sync.WaitGroup + workers int + files int + finished int64 + size int64 + authCond *sync.Cond + downloadables map[string]*Downloadable + bar *pb.ProgressBar +} + +// NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. +func NewDownloadQueue(workers, files int) *DownloadQueue { + return &DownloadQueue{ + downloadc: make(chan *Downloadable, files), + errorc: make(chan *WrappedError), + workers: workers, + files: files, + authCond: sync.NewCond(&sync.Mutex{}), + downloadables: make(map[string]*Downloadable), + } +} + +// Add adds an object to the download queue. +func (q *DownloadQueue) Add(oid, filename string, size int64) { + // TODO create the callback and such + q.downloadables[oid] = &Downloadable{OID: oid, Filename: filename, Size: size} +} + +// apiWorker processes the queue, making the POST calls and +// feeding the results to uploadWorkers +func (q *DownloadQueue) processIndividual() { + apic := make(chan *Downloadable, q.workers) + workersReady := make(chan int, q.workers) + var wg sync.WaitGroup + + for i := 0; i < q.workers; i++ { + go func() { + workersReady <- 1 + for d := range apic { + // If an API authorization has not occured, we wait until we're woken up. 
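+				// downloadClientAuthorized is flipped by the apiEvent watcher
+				// in Process once the first request succeeds.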
+ q.authCond.L.Lock() + if atomic.LoadInt32(&downloadClientAuthorized) == 0 { + q.authCond.Wait() + } + q.authCond.L.Unlock() + + obj, err := DownloadCheck(d.OID) + if err != nil { + q.errorc <- err + wg.Done() + continue + } + if obj != nil { + q.wg.Add(1) + d.object = obj + q.downloadc <- d + } + wg.Done() + } + }() + } + + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables))) + q.bar.Start() + + for _, d := range q.downloadables { + wg.Add(1) + apic <- d + } + + <-workersReady + q.authCond.Signal() // Signal the first goroutine to run + close(apic) + wg.Wait() + + close(q.downloadc) +} + +// batchWorker makes the batch POST call, feeding the results +// to the uploadWorkers +func (q *DownloadQueue) processBatch() { + q.files = 0 + downloads := make([]*objectResource, 0, len(q.downloadables)) + for _, d := range q.downloadables { + downloads = append(downloads, &objectResource{Oid: d.OID, Size: d.Size}) + } + + objects, err := Batch(downloads) + if err != nil { + q.errorc <- err + sendApiEvent(apiEventFail) + return + } + + for _, o := range objects { + if _, ok := o.Links["download"]; ok { + // This object can be downloaded + if downloadable, ok := q.downloadables[o.Oid]; ok { + q.files++ + q.wg.Add(1) + downloadable.object = o + q.downloadc <- downloadable + } + } + } + + close(q.downloadc) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) + q.bar.Start() + sendApiEvent(apiEventSuccess) // Wake up download workers +} + +// Process starts the download queue and displays a progress bar. +func (q *DownloadQueue) Process() { + q.bar = pb.New64(q.size) + q.bar.SetUnits(pb.U_BYTES) + q.bar.ShowBar = false + + // This goroutine collects errors returned from downloads + go func() { + for err := range q.errorc { + q.errors = append(q.errors, err) + } + }() + + // This goroutine watches for apiEvents. In order to prevent multiple + // credential requests from happening, the queue is processed sequentially + // until an API request succeeds (meaning authenication has happened successfully). + // Once the an API request succeeds, all worker goroutines are woken up and allowed + // to process downloads. Once a success happens, this goroutine exits. + go func() { + for { + event := <-apiEvent + switch event { + case apiEventSuccess: + atomic.StoreInt32(&downloadClientAuthorized, 1) + q.authCond.Broadcast() // Wake all remaining goroutines + return + case apiEventFail: + q.authCond.Signal() // Wake the next goroutine + } + } + }() + + for i := 0; i < q.workers; i++ { + // These are the worker goroutines that process uploads + go func(n int) { + + for download := range q.downloadc { + _, _, err := DownloadObject(download.object) + if err != nil { + q.errorc <- err + } + + // TODO: Process the download + + f := atomic.AddInt64(&q.finished, 1) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) + q.wg.Done() + } + }(i) + } + + if Config.BatchTransfer() { + q.processBatch() + } else { + q.processIndividual() + } + + q.wg.Wait() + close(q.errorc) + + q.bar.Finish() +} + +// Errors returns any errors encountered during uploading. 
+func (q *DownloadQueue) Errors() []*WrappedError { + return q.errors +} From bf966e0958f43d4137fb6161a7b62cc05776ec1f Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 13 May 2015 10:25:23 -0400 Subject: [PATCH 07/36] Should be part of the queues --- lfs/download_queue.go | 31 ++++++++++++++----------------- lfs/upload_queue.go | 31 ++++++++++++++----------------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/lfs/download_queue.go b/lfs/download_queue.go index 16d0182b..33244f2c 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -7,10 +7,6 @@ import ( "sync/atomic" ) -var ( - downloadClientAuthorized = int32(0) -) - type Downloadable struct { OID string Size int64 @@ -21,17 +17,18 @@ type Downloadable struct { // DownloadQueue provides a queue that will allow concurrent uploads. type DownloadQueue struct { - downloadc chan *Downloadable - errorc chan *WrappedError - errors []*WrappedError - wg sync.WaitGroup - workers int - files int - finished int64 - size int64 - authCond *sync.Cond - downloadables map[string]*Downloadable - bar *pb.ProgressBar + downloadc chan *Downloadable + errorc chan *WrappedError + errors []*WrappedError + wg sync.WaitGroup + workers int + files int + finished int64 + size int64 + authCond *sync.Cond + downloadables map[string]*Downloadable + bar *pb.ProgressBar + clientAuthorized int32 } // NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. @@ -65,7 +62,7 @@ func (q *DownloadQueue) processIndividual() { for d := range apic { // If an API authorization has not occured, we wait until we're woken up. q.authCond.L.Lock() - if atomic.LoadInt32(&downloadClientAuthorized) == 0 { + if atomic.LoadInt32(&q.clientAuthorized) == 0 { q.authCond.Wait() } q.authCond.L.Unlock() @@ -159,7 +156,7 @@ func (q *DownloadQueue) Process() { event := <-apiEvent switch event { case apiEventSuccess: - atomic.StoreInt32(&downloadClientAuthorized, 1) + atomic.StoreInt32(&q.clientAuthorized, 1) q.authCond.Broadcast() // Wake all remaining goroutines return case apiEventFail: diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 205fefad..714896b3 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -9,10 +9,6 @@ import ( "sync/atomic" ) -var ( - clientAuthorized = int32(0) -) - // Uploadable describes a file that can be uploaded. type Uploadable struct { OID string @@ -53,17 +49,18 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W // UploadQueue provides a queue that will allow concurrent uploads. type UploadQueue struct { - uploadc chan *Uploadable - errorc chan *WrappedError - errors []*WrappedError - wg sync.WaitGroup - workers int - files int - finished int64 - size int64 - authCond *sync.Cond - uploadables map[string]*Uploadable - bar *pb.ProgressBar + uploadc chan *Uploadable + errorc chan *WrappedError + errors []*WrappedError + wg sync.WaitGroup + workers int + files int + finished int64 + size int64 + authCond *sync.Cond + uploadables map[string]*Uploadable + bar *pb.ProgressBar + clientAuthorized int32 } // NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. @@ -96,7 +93,7 @@ func (q *UploadQueue) processIndividual() { for u := range apic { // If an API authorization has not occured, we wait until we're woken up. 
q.authCond.L.Lock() - if atomic.LoadInt32(&clientAuthorized) == 0 { + if atomic.LoadInt32(&q.clientAuthorized) == 0 { q.authCond.Wait() } q.authCond.L.Unlock() @@ -190,7 +187,7 @@ func (q *UploadQueue) Process() { event := <-apiEvent switch event { case apiEventSuccess: - atomic.StoreInt32(&clientAuthorized, 1) + atomic.StoreInt32(&q.clientAuthorized, 1) q.authCond.Broadcast() // Wake all remaining goroutines return case apiEventFail: From f2ecad00b71decf36c85d3dc18c0aa92d0976e52 Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 13 May 2015 11:27:06 -0400 Subject: [PATCH 08/36] ConcurrentUploads => ConcurrentTransfers --- commands/command_push.go | 2 +- commands/commands_pre_push.go | 2 +- lfs/config.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commands/command_push.go b/commands/command_push.go index d2818ea2..0aeaac6a 100644 --- a/commands/command_push.go +++ b/commands/command_push.go @@ -97,7 +97,7 @@ func pushCommand(cmd *cobra.Command, args []string) { Panic(err, "Error scanning for Git LFS files") } - uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentUploads(), len(pointers)) + uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) for i, pointer := range pointers { if pushDryRun { diff --git a/commands/commands_pre_push.go b/commands/commands_pre_push.go index 754abe34..87360231 100644 --- a/commands/commands_pre_push.go +++ b/commands/commands_pre_push.go @@ -70,7 +70,7 @@ func prePushCommand(cmd *cobra.Command, args []string) { Panic(err, "Error scanning for Git LFS files") } - uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentUploads(), len(pointers)) + uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) for i, pointer := range pointers { if prePushDryRun { diff --git a/lfs/config.go b/lfs/config.go index b013fabe..ace38767 100644 --- a/lfs/config.go +++ b/lfs/config.go @@ -70,10 +70,10 @@ func (c *Configuration) Endpoint() Endpoint { return c.RemoteEndpoint(defaultRemote) } -func (c *Configuration) ConcurrentUploads() int { +func (c *Configuration) ConcurrentTransfers() int { uploads := 3 - if v, ok := c.GitConfig("lfs.concurrentuploads"); ok { + if v, ok := c.GitConfig("lfs.concurrenttransfers"); ok { n, err := strconv.Atoi(v) if err == nil { uploads = n From bf10519579736da6abc4362de4db2b7db14a9660 Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 13 May 2015 11:27:30 -0400 Subject: [PATCH 09/36] Naming consistency --- commands/{commands_pre_push.go => command_pre_push.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename commands/{commands_pre_push.go => command_pre_push.go} (100%) diff --git a/commands/commands_pre_push.go b/commands/command_pre_push.go similarity index 100% rename from commands/commands_pre_push.go rename to commands/command_pre_push.go From c869fc6ef504259eac32e8fc807dcb016dc37210 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 14 May 2015 12:33:29 -0400 Subject: [PATCH 10/36] Basic "lfs get" that takes a ref, queues downloads --- commands/command_get.go | 46 ++++++++++++++++ lfs/client.go | 8 +++ lfs/download_queue.go | 117 +++++++++++++++------------------------- lfs/pointer_smudge.go | 8 +-- lfs/upload_queue.go | 2 +- 5 files changed, 103 insertions(+), 78 deletions(-) create mode 100644 commands/command_get.go diff --git a/commands/command_get.go b/commands/command_get.go new file mode 100644 index 00000000..52d02674 --- /dev/null +++ b/commands/command_get.go @@ -0,0 +1,46 @@ +package commands + +import ( + "github.com/github/git-lfs/git" + 
"github.com/github/git-lfs/lfs" + "github.com/spf13/cobra" +) + +var ( + getCmd = &cobra.Command{ + Use: "get", + Short: "get", + Run: getCommand, + } +) + +func getCommand(cmd *cobra.Command, args []string) { + var ref string + var err error + + if len(args) == 1 { + ref = args[0] + } else { + ref, err = git.CurrentRef() + if err != nil { + Panic(err, "Could not get") + } + } + + pointers, err := lfs.ScanRefs(ref, "") + if err != nil { + Panic(err, "Could not scan for Git LFS files") + } + + q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) + + for _, p := range pointers { + q.Add(p) + } + + q.Process() +} + +func init() { + RootCmd.AddCommand(getCmd) +} diff --git a/lfs/client.go b/lfs/client.go index 83d05aa7..3eabcf54 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -109,9 +109,12 @@ func Download(oid string) (io.ReadCloser, int64, *WrappedError) { res, obj, wErr := doApiRequest(req, creds) if wErr != nil { + sendApiEvent(apiEventFail) return nil, 0, wErr } + sendApiEvent(apiEventSuccess) + req, creds, err = obj.NewRequest("download", "GET") if err != nil { return nil, 0, Error(err) @@ -140,6 +143,11 @@ func DownloadCheck(oid string) (*objectResource, *WrappedError) { return nil, wErr } + _, _, err = obj.NewRequest("download", "GET") + if err != nil { + return nil, Error(err) + } + return obj, nil } diff --git a/lfs/download_queue.go b/lfs/download_queue.go index 33244f2c..0a2aa66c 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -2,22 +2,17 @@ package lfs import ( "fmt" - "github.com/cheggaaa/pb" + "os" + "path/filepath" "sync" "sync/atomic" -) -type Downloadable struct { - OID string - Size int64 - Filename string - CB CopyCallback - object *objectResource -} + "github.com/cheggaaa/pb" +) // DownloadQueue provides a queue that will allow concurrent uploads. type DownloadQueue struct { - downloadc chan *Downloadable + downloadc chan *wrappedPointer errorc chan *WrappedError errors []*WrappedError wg sync.WaitGroup @@ -26,7 +21,7 @@ type DownloadQueue struct { finished int64 size int64 authCond *sync.Cond - downloadables map[string]*Downloadable + pointers []*wrappedPointer bar *pb.ProgressBar clientAuthorized int32 } @@ -34,32 +29,30 @@ type DownloadQueue struct { // NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. func NewDownloadQueue(workers, files int) *DownloadQueue { return &DownloadQueue{ - downloadc: make(chan *Downloadable, files), - errorc: make(chan *WrappedError), - workers: workers, - files: files, - authCond: sync.NewCond(&sync.Mutex{}), - downloadables: make(map[string]*Downloadable), + downloadc: make(chan *wrappedPointer, files), + errorc: make(chan *WrappedError), + workers: workers, + files: files, + authCond: sync.NewCond(&sync.Mutex{}), } } // Add adds an object to the download queue. 
-func (q *DownloadQueue) Add(oid, filename string, size int64) { - // TODO create the callback and such - q.downloadables[oid] = &Downloadable{OID: oid, Filename: filename, Size: size} +func (q *DownloadQueue) Add(p *wrappedPointer) { + q.pointers = append(q.pointers, p) } // apiWorker processes the queue, making the POST calls and // feeding the results to uploadWorkers func (q *DownloadQueue) processIndividual() { - apic := make(chan *Downloadable, q.workers) + apic := make(chan *wrappedPointer, q.files) workersReady := make(chan int, q.workers) var wg sync.WaitGroup for i := 0; i < q.workers; i++ { go func() { workersReady <- 1 - for d := range apic { + for p := range apic { // If an API authorization has not occured, we wait until we're woken up. q.authCond.L.Lock() if atomic.LoadInt32(&q.clientAuthorized) == 0 { @@ -67,28 +60,26 @@ func (q *DownloadQueue) processIndividual() { } q.authCond.L.Unlock() - obj, err := DownloadCheck(d.OID) + _, err := DownloadCheck(p.Oid) if err != nil { q.errorc <- err wg.Done() continue } - if obj != nil { - q.wg.Add(1) - d.object = obj - q.downloadc <- d - } + + q.wg.Add(1) + q.downloadc <- p wg.Done() } }() } - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables))) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.pointers))) q.bar.Start() - for _, d := range q.downloadables { + for _, p := range q.pointers { wg.Add(1) - apic <- d + apic <- p } <-workersReady @@ -99,40 +90,6 @@ func (q *DownloadQueue) processIndividual() { close(q.downloadc) } -// batchWorker makes the batch POST call, feeding the results -// to the uploadWorkers -func (q *DownloadQueue) processBatch() { - q.files = 0 - downloads := make([]*objectResource, 0, len(q.downloadables)) - for _, d := range q.downloadables { - downloads = append(downloads, &objectResource{Oid: d.OID, Size: d.Size}) - } - - objects, err := Batch(downloads) - if err != nil { - q.errorc <- err - sendApiEvent(apiEventFail) - return - } - - for _, o := range objects { - if _, ok := o.Links["download"]; ok { - // This object can be downloaded - if downloadable, ok := q.downloadables[o.Oid]; ok { - q.files++ - q.wg.Add(1) - downloadable.object = o - q.downloadc <- downloadable - } - } - } - - close(q.downloadc) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) - q.bar.Start() - sendApiEvent(apiEventSuccess) // Wake up download workers -} - // Process starts the download queue and displays a progress bar. 
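// Until batch support for downloads is wired in, it feeds the workers
// through processIndividual only.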
func (q *DownloadQueue) Process() { q.bar = pb.New64(q.size) @@ -169,13 +126,25 @@ func (q *DownloadQueue) Process() { // These are the worker goroutines that process uploads go func(n int) { - for download := range q.downloadc { - _, _, err := DownloadObject(download.object) + for ptr := range q.downloadc { + fullPath := filepath.Join(LocalWorkingDir, ptr.Name) + output, err := os.Create(fullPath) if err != nil { - q.errorc <- err + q.errorc <- Error(err) + f := atomic.AddInt64(&q.finished, 1) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) + q.wg.Done() + continue } - // TODO: Process the download + cb := func(total, read int64, current int) error { + q.bar.Add(current) + return nil + } + // TODO need a callback + if err := PointerSmudge(output, ptr.Pointer, ptr.Name, cb); err != nil { + q.errorc <- Error(err) + } f := atomic.AddInt64(&q.finished, 1) q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) @@ -184,11 +153,11 @@ func (q *DownloadQueue) Process() { }(i) } - if Config.BatchTransfer() { - q.processBatch() - } else { - q.processIndividual() - } + // if Config.BatchTransfer() { + // q.processBatch() + // } else { + q.processIndividual() + // } q.wg.Wait() close(q.errorc) diff --git a/lfs/pointer_smudge.go b/lfs/pointer_smudge.go index 4281547b..52d547fa 100644 --- a/lfs/pointer_smudge.go +++ b/lfs/pointer_smudge.go @@ -2,12 +2,13 @@ package lfs import ( "fmt" - "github.com/cheggaaa/pb" - "github.com/rubyist/tracerx" - "github.com/technoweenie/go-contentaddressable" "io" "os" "path/filepath" + + "github.com/cheggaaa/pb" + "github.com/rubyist/tracerx" + "github.com/technoweenie/go-contentaddressable" ) func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCallback) error { @@ -30,6 +31,7 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa if statErr != nil || stat == nil { wErr = downloadFile(writer, ptr, workingfile, mediafile, cb) } else { + sendApiEvent(apiEventSuccess) wErr = readLocalFile(writer, ptr, mediafile, cb) } diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 714896b3..6abdbe8b 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -83,7 +83,7 @@ func (q *UploadQueue) Add(u *Uploadable) { // apiWorker processes the queue, making the POST calls and // feeding the results to uploadWorkers func (q *UploadQueue) processIndividual() { - apic := make(chan *Uploadable, q.workers) + apic := make(chan *Uploadable, q.files) workersReady := make(chan int, q.workers) var wg sync.WaitGroup From 3bb2846e6b50c6181efdb972be23c948bef8fc1d Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 14 May 2015 14:56:30 -0400 Subject: [PATCH 11/36] Can batch downloads --- lfs/download_queue.go | 86 +++++++++++++++++++++++++++++++------------ lfs/pointer_smudge.go | 70 ++++++++++++++++++++++++++++++++++- 2 files changed, 130 insertions(+), 26 deletions(-) diff --git a/lfs/download_queue.go b/lfs/download_queue.go index 0a2aa66c..cab3ea27 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -10,9 +10,14 @@ import ( "github.com/cheggaaa/pb" ) +type Downloadable struct { + Pointer *wrappedPointer + Object *objectResource +} + // DownloadQueue provides a queue that will allow concurrent uploads. 
type DownloadQueue struct { - downloadc chan *wrappedPointer + downloadc chan *Downloadable errorc chan *WrappedError errors []*WrappedError wg sync.WaitGroup @@ -21,7 +26,7 @@ type DownloadQueue struct { finished int64 size int64 authCond *sync.Cond - pointers []*wrappedPointer + downloadables map[string]*Downloadable bar *pb.ProgressBar clientAuthorized int32 } @@ -29,30 +34,63 @@ type DownloadQueue struct { // NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. func NewDownloadQueue(workers, files int) *DownloadQueue { return &DownloadQueue{ - downloadc: make(chan *wrappedPointer, files), - errorc: make(chan *WrappedError), - workers: workers, - files: files, - authCond: sync.NewCond(&sync.Mutex{}), + downloadc: make(chan *Downloadable, files), + errorc: make(chan *WrappedError), + workers: workers, + files: files, + authCond: sync.NewCond(&sync.Mutex{}), + downloadables: make(map[string]*Downloadable), } } // Add adds an object to the download queue. func (q *DownloadQueue) Add(p *wrappedPointer) { - q.pointers = append(q.pointers, p) + q.downloadables[p.Oid] = &Downloadable{Pointer: p} +} + +func (q *DownloadQueue) processBatch() { + q.files = 0 + downloads := make([]*objectResource, 0, len(q.downloadables)) + for _, d := range q.downloadables { + downloads = append(downloads, &objectResource{Oid: d.Pointer.Oid, Size: d.Pointer.Size}) + } + + objects, err := Batch(downloads) + if err != nil { + q.errorc <- err + sendApiEvent(apiEventFail) + return + } + + for _, o := range objects { + if _, ok := o.Links["download"]; ok { + // This object can be downloaded + if downloadable, ok := q.downloadables[o.Oid]; ok { + q.files++ + q.wg.Add(1) + downloadable.Object = o + q.downloadc <- downloadable + } + } + } + + close(q.downloadc) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) + q.bar.Start() + sendApiEvent(apiEventSuccess) // Wake up upload workers } // apiWorker processes the queue, making the POST calls and // feeding the results to uploadWorkers func (q *DownloadQueue) processIndividual() { - apic := make(chan *wrappedPointer, q.files) + apic := make(chan *Downloadable, q.files) workersReady := make(chan int, q.workers) var wg sync.WaitGroup for i := 0; i < q.workers; i++ { go func() { workersReady <- 1 - for p := range apic { + for d := range apic { // If an API authorization has not occured, we wait until we're woken up. 
q.authCond.L.Lock() if atomic.LoadInt32(&q.clientAuthorized) == 0 { @@ -60,7 +98,7 @@ func (q *DownloadQueue) processIndividual() { } q.authCond.L.Unlock() - _, err := DownloadCheck(p.Oid) + obj, err := DownloadCheck(d.Pointer.Oid) if err != nil { q.errorc <- err wg.Done() @@ -68,18 +106,19 @@ func (q *DownloadQueue) processIndividual() { } q.wg.Add(1) - q.downloadc <- p + d.Object = obj + q.downloadc <- d wg.Done() } }() } - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.pointers))) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables))) q.bar.Start() - for _, p := range q.pointers { + for _, d := range q.downloadables { wg.Add(1) - apic <- p + apic <- d } <-workersReady @@ -126,8 +165,8 @@ func (q *DownloadQueue) Process() { // These are the worker goroutines that process uploads go func(n int) { - for ptr := range q.downloadc { - fullPath := filepath.Join(LocalWorkingDir, ptr.Name) + for d := range q.downloadc { + fullPath := filepath.Join(LocalWorkingDir, d.Pointer.Name) output, err := os.Create(fullPath) if err != nil { q.errorc <- Error(err) @@ -141,8 +180,7 @@ func (q *DownloadQueue) Process() { q.bar.Add(current) return nil } - // TODO need a callback - if err := PointerSmudge(output, ptr.Pointer, ptr.Name, cb); err != nil { + if err := PointerSmudgeObject(output, d.Pointer.Pointer, d.Object, cb); err != nil { q.errorc <- Error(err) } @@ -153,11 +191,11 @@ func (q *DownloadQueue) Process() { }(i) } - // if Config.BatchTransfer() { - // q.processBatch() - // } else { - q.processIndividual() - // } + if Config.BatchTransfer() { + q.processBatch() + } else { + q.processIndividual() + } q.wg.Wait() close(q.errorc) diff --git a/lfs/pointer_smudge.go b/lfs/pointer_smudge.go index 52d547fa..26e4ae8a 100644 --- a/lfs/pointer_smudge.go +++ b/lfs/pointer_smudge.go @@ -37,9 +37,75 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa if wErr != nil { return &SmudgeError{ptr.Oid, mediafile, wErr} - } else { - return nil } + + return nil +} + +func PointerSmudgeObject(writer io.Writer, ptr *Pointer, obj *objectResource, cb CopyCallback) error { + mediafile, err := LocalMediaPath(obj.Oid) + if err != nil { + return err + } + + stat, statErr := os.Stat(mediafile) + if statErr == nil && stat != nil { + fileSize := stat.Size() + if fileSize == 0 || fileSize != obj.Size { + tracerx.Printf("Removing %s, size %d is invalid", mediafile, fileSize) + os.RemoveAll(mediafile) + stat = nil + } + } + + var wErr *WrappedError + if statErr != nil || stat == nil { + wErr = downloadObject(writer, ptr, obj, mediafile, cb) + } else { + wErr = readLocalFile(writer, ptr, mediafile, cb) + } + + if wErr != nil { + sendApiEvent(apiEventFail) + return &SmudgeError{obj.Oid, mediafile, wErr} + } + + sendApiEvent(apiEventSuccess) + return nil +} + +func downloadObject(writer io.Writer, ptr *Pointer, obj *objectResource, mediafile string, cb CopyCallback) *WrappedError { + reader, size, wErr := DownloadObject(obj) + if reader != nil { + defer reader.Close() + } + + // TODO this can be unified with the same code in downloadFile + if wErr != nil { + wErr.Errorf("Error downloading %s.", mediafile) + return wErr + } + + if ptr.Size == 0 { + ptr.Size = size + } + + mediaFile, err := contentaddressable.NewFile(mediafile) + if err != nil { + return Errorf(err, "Error opening media file buffer.") + } + + _, err = CopyWithCallback(mediaFile, reader, ptr.Size, cb) + if err == nil { + err = mediaFile.Accept() + } + mediaFile.Close() + + if err != nil { + return 
Errorf(err, "Error buffering media file.") + } + + return readLocalFile(writer, ptr, mediafile, nil) } func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, cb CopyCallback) *WrappedError { From de200c6a71463e33ec26164e70b9bca2f33f9afa Mon Sep 17 00:00:00 2001 From: rubyist Date: Mon, 18 May 2015 10:57:51 -0400 Subject: [PATCH 12/36] Using structs here is unnecessary, a map will do --- lfs/client.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lfs/client.go b/lfs/client.go index 3eabcf54..f6a4b7b2 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -170,11 +170,11 @@ func (b *byteCloser) Close() error { } func Batch(objects []*objectResource) ([]*objectResource, *WrappedError) { - type objres struct { - Objects []*objectResource `json:"objects"` + if len(objects) == 0 { + return nil, nil } - o := &objres{objects} + o := map[string][]*objectResource{"objects": objects} by, err := json.Marshal(o) if err != nil { @@ -508,18 +508,14 @@ func doApiBatchRequest(req *http.Request, creds Creds) (*http.Response, []*objec return res, nil, wErr } - type ro struct { - Objects []*objectResource `json:"objects"` - } - - var objs ro + var objs map[string][]*objectResource wErr = decodeApiResponse(res, &objs) if wErr != nil { setErrorResponseContext(wErr, res) } - return res, objs.Objects, wErr + return res, objs["objects"], wErr } func handleResponse(res *http.Response) *WrappedError { From 1d87b73ef0eef7c2e8d4317cde6fc343fd3a6d87 Mon Sep 17 00:00:00 2001 From: rubyist Date: Mon, 18 May 2015 11:00:20 -0400 Subject: [PATCH 13/36] Don't need to open if we're just using Stat() --- lfs/client.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lfs/client.go b/lfs/client.go index f6a4b7b2..8b1c9b34 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -306,14 +306,8 @@ func Upload(oidPath, filename string, cb CopyCallback) *WrappedError { func UploadCheck(oidPath string) (*objectResource, *WrappedError) { oid := filepath.Base(oidPath) - file, err := os.Open(oidPath) - if err != nil { - sendApiEvent(apiEventFail) - return nil, Error(err) - } - defer file.Close() - stat, err := file.Stat() // Stat without opening TODO + stat, err := os.Stat(oidPath) if err != nil { sendApiEvent(apiEventFail) return nil, Error(err) From f905afa2702b8a7732c4108c1ddb443d3cd2c50d Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 10:29:17 -0400 Subject: [PATCH 14/36] Make PointerSmudgeObject only download to the media directory --- lfs/download_queue.go | 18 +++--------------- lfs/pointer_smudge.go | 21 ++++++++++----------- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/lfs/download_queue.go b/lfs/download_queue.go index cab3ea27..d46a1065 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -2,12 +2,9 @@ package lfs import ( "fmt" - "os" - "path/filepath" + "github.com/cheggaaa/pb" "sync" "sync/atomic" - - "github.com/cheggaaa/pb" ) type Downloadable struct { @@ -166,21 +163,12 @@ func (q *DownloadQueue) Process() { go func(n int) { for d := range q.downloadc { - fullPath := filepath.Join(LocalWorkingDir, d.Pointer.Name) - output, err := os.Create(fullPath) - if err != nil { - q.errorc <- Error(err) - f := atomic.AddInt64(&q.finished, 1) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) - q.wg.Done() - continue - } - cb := func(total, read int64, current int) error { q.bar.Add(current) return nil } - if err := PointerSmudgeObject(output, d.Pointer.Pointer, d.Object, cb); err != nil { + + if err := 
PointerSmudgeObject(d.Pointer.Pointer, d.Object, cb); err != nil { q.errorc <- Error(err) } diff --git a/lfs/pointer_smudge.go b/lfs/pointer_smudge.go index 26e4ae8a..a34d431e 100644 --- a/lfs/pointer_smudge.go +++ b/lfs/pointer_smudge.go @@ -42,7 +42,9 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa return nil } -func PointerSmudgeObject(writer io.Writer, ptr *Pointer, obj *objectResource, cb CopyCallback) error { +// PointerSmudgeObject uses a Pointer and objectResource to download the object to the +// media directory. It does not write the file to the working directory. +func PointerSmudgeObject(ptr *Pointer, obj *objectResource, cb CopyCallback) error { mediafile, err := LocalMediaPath(obj.Oid) if err != nil { return err @@ -58,23 +60,20 @@ func PointerSmudgeObject(writer io.Writer, ptr *Pointer, obj *objectResource, cb } } - var wErr *WrappedError if statErr != nil || stat == nil { - wErr = downloadObject(writer, ptr, obj, mediafile, cb) - } else { - wErr = readLocalFile(writer, ptr, mediafile, cb) - } + wErr := downloadObject(ptr, obj, mediafile, cb) - if wErr != nil { - sendApiEvent(apiEventFail) - return &SmudgeError{obj.Oid, mediafile, wErr} + if wErr != nil { + sendApiEvent(apiEventFail) + return &SmudgeError{obj.Oid, mediafile, wErr} + } } sendApiEvent(apiEventSuccess) return nil } -func downloadObject(writer io.Writer, ptr *Pointer, obj *objectResource, mediafile string, cb CopyCallback) *WrappedError { +func downloadObject(ptr *Pointer, obj *objectResource, mediafile string, cb CopyCallback) *WrappedError { reader, size, wErr := DownloadObject(obj) if reader != nil { defer reader.Close() @@ -105,7 +104,7 @@ func downloadObject(writer io.Writer, ptr *Pointer, obj *objectResource, mediafi return Errorf(err, "Error buffering media file.") } - return readLocalFile(writer, ptr, mediafile, nil) + return nil } func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, cb CopyCallback) *WrappedError { From 685e8771d3ab811a02a8c40216c7b5c34793c7dd Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 10:36:42 -0400 Subject: [PATCH 15/36] Add some git code to resolve a ref to a sha --- git/git.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/git/git.go b/git/git.go index fe6e01db..932f27f4 100644 --- a/git/git.go +++ b/git/git.go @@ -21,8 +21,12 @@ func LsRemote(remote, remoteRef string) (string, error) { return simpleExec(nil, "git", "ls-remote", remote, remoteRef) } +func ResolveRef(ref string) (string, error) { + return simpleExec(nil, "git", "rev-parse", ref) +} + func CurrentRef() (string, error) { - return simpleExec(nil, "git", "rev-parse", "HEAD") + return ResolveRef("HEAD") } func CurrentBranch() (string, error) { @@ -35,7 +39,7 @@ func CurrentRemoteRef() (string, error) { return "", err } - return simpleExec(nil, "git", "rev-parse", remote) + return ResolveRef(remote) } func CurrentRemote() (string, error) { From e445b45c1c9c6282614f201b62778e4c0688b5c8 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 10:49:27 -0400 Subject: [PATCH 16/36] If the ref we 'get' is the ref we're on, we can write to the working directory --- commands/command_get.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/commands/command_get.go b/commands/command_get.go index 52d02674..d30a89dd 100644 --- a/commands/command_get.go +++ b/commands/command_get.go @@ -4,6 +4,7 @@ import ( "github.com/github/git-lfs/git" "github.com/github/git-lfs/lfs" "github.com/spf13/cobra" + 
"os" ) var ( @@ -39,6 +40,28 @@ func getCommand(cmd *cobra.Command, args []string) { } q.Process() + + target, err := git.ResolveRef(ref) + if err != nil { + } + + current, err := git.CurrentRef() + if err != nil { + } + + if target == current { + for _, pointer := range pointers { + file, err := os.Create(pointer.Name) + if err != nil { + Panic(err, "Could not create working directory file") + } + + err = lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil) + if err != nil { + Panic(err, "Could not write working directory file") + } + } + } } func init() { From f8f4ad230ab7cd08f067ff95a3450dca105b40da Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 11:20:54 -0400 Subject: [PATCH 17/36] Start a man page for lfs get --- docs/man/git-lfs-get.1.ronn | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docs/man/git-lfs-get.1.ronn diff --git a/docs/man/git-lfs-get.1.ronn b/docs/man/git-lfs-get.1.ronn new file mode 100644 index 00000000..7d38173c --- /dev/null +++ b/docs/man/git-lfs-get.1.ronn @@ -0,0 +1,31 @@ +git-lfs-get(1) -- Download all Git LFS files for a given ref +============================================================ + +## SYNOPSIS + +`git lfs get` [] + +## DESCRIPTION + +Download any Git LFS objects for a given ref. If no ref is given, +the currently checked out ref will be used. + +If the given ref is the same as the currently checked out ref, the +files will be written to the working directory. + +## EXAMPLES + +* Get the LFS objects for the current ref + + `git lfs get` + +* Get the LFS objects for a branch + + `git lfs get mybranch` + +* Get the LFS objects for a commit + + `git lfs get e445b45c1c9c6282614f201b62778e4c0688b5c8` + +Part of the git-lfs(1) suite. + From a3bb99756735f33331a42721fb20c759f34b52c1 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 12:36:49 -0400 Subject: [PATCH 18/36] Pull common queue code into a TransferQueue, reduce duplicated code --- commands/command_get.go | 2 +- lfs/download_queue.go | 222 ++++++-------------------------------- lfs/transfer_queue.go | 205 +++++++++++++++++++++++++++++++++++ lfs/upload_queue.go | 231 +++++++--------------------------------- 4 files changed, 279 insertions(+), 381 deletions(-) create mode 100644 lfs/transfer_queue.go diff --git a/commands/command_get.go b/commands/command_get.go index d30a89dd..1e8d8b26 100644 --- a/commands/command_get.go +++ b/commands/command_get.go @@ -36,7 +36,7 @@ func getCommand(cmd *cobra.Command, args []string) { q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) for _, p := range pointers { - q.Add(p) + q.Add(lfs.NewDownloadable(p)) } q.Process() diff --git a/lfs/download_queue.go b/lfs/download_queue.go index d46a1065..4822d4ea 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -1,197 +1,45 @@ package lfs -import ( - "fmt" - "github.com/cheggaaa/pb" - "sync" - "sync/atomic" -) - type Downloadable struct { Pointer *wrappedPointer - Object *objectResource + object *objectResource } -// DownloadQueue provides a queue that will allow concurrent uploads. 
-type DownloadQueue struct { - downloadc chan *Downloadable - errorc chan *WrappedError - errors []*WrappedError - wg sync.WaitGroup - workers int - files int - finished int64 - size int64 - authCond *sync.Cond - downloadables map[string]*Downloadable - bar *pb.ProgressBar - clientAuthorized int32 +func NewDownloadable(p *wrappedPointer) *Downloadable { + return &Downloadable{Pointer: p} +} + +func (d *Downloadable) Check() (*objectResource, *WrappedError) { + return DownloadCheck(d.Pointer.Oid) +} + +func (d *Downloadable) Transfer(cb CopyCallback) *WrappedError { + err := PointerSmudgeObject(d.Pointer.Pointer, d.object, cb) + if err != nil { + return Error(err) + } + return nil +} + +func (d *Downloadable) Object() *objectResource { + return d.object +} + +func (d *Downloadable) Oid() string { + return d.Pointer.Oid +} + +func (d *Downloadable) Size() int64 { + return d.Pointer.Size +} + +func (d *Downloadable) SetObject(o *objectResource) { + d.object = o } // NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. -func NewDownloadQueue(workers, files int) *DownloadQueue { - return &DownloadQueue{ - downloadc: make(chan *Downloadable, files), - errorc: make(chan *WrappedError), - workers: workers, - files: files, - authCond: sync.NewCond(&sync.Mutex{}), - downloadables: make(map[string]*Downloadable), - } -} - -// Add adds an object to the download queue. -func (q *DownloadQueue) Add(p *wrappedPointer) { - q.downloadables[p.Oid] = &Downloadable{Pointer: p} -} - -func (q *DownloadQueue) processBatch() { - q.files = 0 - downloads := make([]*objectResource, 0, len(q.downloadables)) - for _, d := range q.downloadables { - downloads = append(downloads, &objectResource{Oid: d.Pointer.Oid, Size: d.Pointer.Size}) - } - - objects, err := Batch(downloads) - if err != nil { - q.errorc <- err - sendApiEvent(apiEventFail) - return - } - - for _, o := range objects { - if _, ok := o.Links["download"]; ok { - // This object can be downloaded - if downloadable, ok := q.downloadables[o.Oid]; ok { - q.files++ - q.wg.Add(1) - downloadable.Object = o - q.downloadc <- downloadable - } - } - } - - close(q.downloadc) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) - q.bar.Start() - sendApiEvent(apiEventSuccess) // Wake up upload workers -} - -// apiWorker processes the queue, making the POST calls and -// feeding the results to uploadWorkers -func (q *DownloadQueue) processIndividual() { - apic := make(chan *Downloadable, q.files) - workersReady := make(chan int, q.workers) - var wg sync.WaitGroup - - for i := 0; i < q.workers; i++ { - go func() { - workersReady <- 1 - for d := range apic { - // If an API authorization has not occured, we wait until we're woken up. - q.authCond.L.Lock() - if atomic.LoadInt32(&q.clientAuthorized) == 0 { - q.authCond.Wait() - } - q.authCond.L.Unlock() - - obj, err := DownloadCheck(d.Pointer.Oid) - if err != nil { - q.errorc <- err - wg.Done() - continue - } - - q.wg.Add(1) - d.Object = obj - q.downloadc <- d - wg.Done() - } - }() - } - - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables))) - q.bar.Start() - - for _, d := range q.downloadables { - wg.Add(1) - apic <- d - } - - <-workersReady - q.authCond.Signal() // Signal the first goroutine to run - close(apic) - wg.Wait() - - close(q.downloadc) -} - -// Process starts the download queue and displays a progress bar. 
-func (q *DownloadQueue) Process() {
-	q.bar = pb.New64(q.size)
-	q.bar.SetUnits(pb.U_BYTES)
-	q.bar.ShowBar = false
-
-	// This goroutine collects errors returned from downloads
-	go func() {
-		for err := range q.errorc {
-			q.errors = append(q.errors, err)
-		}
-	}()
-
-	// This goroutine watches for apiEvents. In order to prevent multiple
-	// credential requests from happening, the queue is processed sequentially
-	// until an API request succeeds (meaning authenication has happened successfully).
-	// Once the an API request succeeds, all worker goroutines are woken up and allowed
-	// to process downloads. Once a success happens, this goroutine exits.
-	go func() {
-		for {
-			event := <-apiEvent
-			switch event {
-			case apiEventSuccess:
-				atomic.StoreInt32(&q.clientAuthorized, 1)
-				q.authCond.Broadcast() // Wake all remaining goroutines
-				return
-			case apiEventFail:
-				q.authCond.Signal() // Wake the next goroutine
-			}
-		}
-	}()
-
-	for i := 0; i < q.workers; i++ {
-		// These are the worker goroutines that process uploads
-		go func(n int) {
-
-			for d := range q.downloadc {
-				cb := func(total, read int64, current int) error {
-					q.bar.Add(current)
-					return nil
-				}
-
-				if err := PointerSmudgeObject(d.Pointer.Pointer, d.Object, cb); err != nil {
-					q.errorc <- Error(err)
-				}
-
-				f := atomic.AddInt64(&q.finished, 1)
-				q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
-				q.wg.Done()
-			}
-		}(i)
-	}
-
-	if Config.BatchTransfer() {
-		q.processBatch()
-	} else {
-		q.processIndividual()
-	}
-
-	q.wg.Wait()
-	close(q.errorc)
-
-	q.bar.Finish()
-}
-
-// Errors returns any errors encountered during uploading.
-func (q *DownloadQueue) Errors() []*WrappedError {
-	return q.errors
+func NewDownloadQueue(workers, files int) *TransferQueue {
+	q := newTransferQueue(workers, files)
+	q.transferKind = "download"
+	return q
 }
diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go
new file mode 100644
index 00000000..acf1ff9f
--- /dev/null
+++ b/lfs/transfer_queue.go
@@ -0,0 +1,205 @@
+package lfs
+
+import (
+	"fmt"
+	"github.com/cheggaaa/pb"
+	"sync"
+	"sync/atomic"
+)
+
+type Transferable interface {
+	Check() (*objectResource, *WrappedError)
+	Transfer(CopyCallback) *WrappedError
+	Object() *objectResource
+	Oid() string
+	Size() int64
+	SetObject(*objectResource)
+}
+
+// TransferQueue provides a queue that will allow concurrent transfers.
+type TransferQueue struct {
+	transferc        chan Transferable
+	errorc           chan *WrappedError
+	errors           []*WrappedError
+	wg               sync.WaitGroup
+	workers          int
+	files            int
+	finished         int64
+	size             int64
+	authCond         *sync.Cond
+	transferables    map[string]Transferable
+	bar              *pb.ProgressBar
+	clientAuthorized int32
+	transferKind     string
+}
+
+// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
+func newTransferQueue(workers, files int) *TransferQueue {
+	return &TransferQueue{
+		transferc:     make(chan Transferable, files),
+		errorc:        make(chan *WrappedError),
+		workers:       workers,
+		files:         files,
+		authCond:      sync.NewCond(&sync.Mutex{}),
+		transferables: make(map[string]Transferable),
+	}
+}
+
+// Add adds an Uploadable to the upload queue.
+func (q *TransferQueue) Add(t Transferable) {
+	q.transferables[t.Oid()] = t
+}
+
+// apiWorker processes the queue, making the POST calls and
+// feeding the results to uploadWorkers
+func (q *TransferQueue) processIndividual() {
+	apic := make(chan Transferable, q.files)
+	workersReady := make(chan int, q.workers)
+	var wg sync.WaitGroup
+
+	for i := 0; i < q.workers; i++ {
+		go func() {
+			workersReady <- 1
+			for t := range apic {
+				// If an API authorization has not occurred, we wait until we're woken up.
+				q.authCond.L.Lock()
+				if atomic.LoadInt32(&q.clientAuthorized) == 0 {
+					q.authCond.Wait()
+				}
+				q.authCond.L.Unlock()
+
+				obj, err := t.Check()
+				if err != nil {
+					q.errorc <- err
+					wg.Done()
+					continue
+				}
+				if obj != nil {
+					q.wg.Add(1)
+					t.SetObject(obj)
+					q.transferc <- t
+				}
+				wg.Done()
+			}
+		}()
+	}
+
+	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.transferables)))
+	q.bar.Start()
+
+	for _, t := range q.transferables {
+		wg.Add(1)
+		apic <- t
+	}
+
+	<-workersReady
+	q.authCond.Signal() // Signal the first goroutine to run
+	close(apic)
+	wg.Wait()
+
+	close(q.transferc)
+}
+
+// batchWorker makes the batch POST call, feeding the results
+// to the transfer workers
+func (q *TransferQueue) processBatch() {
+	q.files = 0
+	transfers := make([]*objectResource, 0, len(q.transferables))
+	for _, t := range q.transferables {
+		transfers = append(transfers, &objectResource{Oid: t.Oid(), Size: t.Size()})
+	}
+
+	objects, err := Batch(transfers)
+	if err != nil {
+		q.errorc <- err
+		sendApiEvent(apiEventFail)
+		return
+	}
+
+	for _, o := range objects {
+		if _, ok := o.Links[q.transferKind]; ok {
+			// This object needs to be transferred
+			if transfer, ok := q.transferables[o.Oid]; ok {
+				q.files++
+				q.wg.Add(1)
+				transfer.SetObject(o)
+				q.transferc <- transfer
+			}
+		}
+	}
+
+	close(q.transferc)
+	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
+	q.bar.Start()
+	sendApiEvent(apiEventSuccess) // Wake up upload workers
+}
+
+// Process starts the upload queue and displays a progress bar.
+func (q *TransferQueue) Process() {
+	q.bar = pb.New64(q.size)
+	q.bar.SetUnits(pb.U_BYTES)
+	q.bar.ShowBar = false
+
+	// This goroutine collects errors returned from uploads
+	go func() {
+		for err := range q.errorc {
+			q.errors = append(q.errors, err)
+		}
+	}()
+
+	// This goroutine watches for apiEvents. In order to prevent multiple
+	// credential requests from happening, the queue is processed sequentially
+	// until an API request succeeds (meaning authentication has happened successfully).
+	// Once an API request succeeds, all worker goroutines are woken up and allowed
+	// to process uploads. Once a success happens, this goroutine exits.
+	go func() {
+		for {
+			event := <-apiEvent
+			switch event {
+			case apiEventSuccess:
+				atomic.StoreInt32(&q.clientAuthorized, 1)
+				q.authCond.Broadcast() // Wake all remaining goroutines
+				return
+			case apiEventFail:
+				q.authCond.Signal() // Wake the next goroutine
+			}
+		}
+	}()
+
+	for i := 0; i < q.workers; i++ {
+		// These are the worker goroutines that process uploads
+		go func(n int) {
+
+			for transfer := range q.transferc {
+				cb := func(total, read int64, current int) error {
+					q.bar.Add(current)
+					return nil
+				}
+
+				if err := transfer.Transfer(cb); err != nil {
+					q.errorc <- err
+				}
+
+				f := atomic.AddInt64(&q.finished, 1)
+				q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
+				q.wg.Done()
+			}
+		}(i)
+	}
+
+	if Config.BatchTransfer() {
+		q.processBatch()
+	} else {
+		q.processIndividual()
+	}
+
+	q.wg.Wait()
+	close(q.errorc)
+
+	q.bar.Finish()
+}
+
+// Errors returns any errors encountered during uploading.
+func (q *TransferQueue) Errors() []*WrappedError {
+	return q.errors
+}
diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go
index 6abdbe8b..002f0521 100644
--- a/lfs/upload_queue.go
+++ b/lfs/upload_queue.go
@@ -2,20 +2,17 @@ package lfs
 
 import (
 	"fmt"
-	"github.com/cheggaaa/pb"
 	"os"
 	"path/filepath"
-	"sync"
-	"sync/atomic"
 )
 
 // Uploadable describes a file that can be uploaded.
 type Uploadable struct {
-	OID      string
-	OIDPath  string
+	oid      string
+	OidPath  string
 	Filename string
 	CB       CopyCallback
-	Size     int64
+	size     int64
 	object   *objectResource
 }
 
@@ -44,198 +41,46 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W
 		defer file.Close()
 	}
 
-	return &Uploadable{OID: oid, OIDPath: path, Filename: filename, CB: cb, Size: fi.Size()}, nil
+	return &Uploadable{oid: oid, OidPath: path, Filename: filename, CB: cb, size: fi.Size()}, nil
 }
 
-// UploadQueue provides a queue that will allow concurrent uploads.
-type UploadQueue struct {
-	uploadc          chan *Uploadable
-	errorc           chan *WrappedError
-	errors           []*WrappedError
-	wg               sync.WaitGroup
-	workers          int
-	files            int
-	finished         int64
-	size             int64
-	authCond         *sync.Cond
-	uploadables      map[string]*Uploadable
-	bar              *pb.ProgressBar
-	clientAuthorized int32
+func (u *Uploadable) Check() (*objectResource, *WrappedError) {
+	return UploadCheck(u.OidPath)
+}
+
+func (u *Uploadable) Transfer(cb CopyCallback) *WrappedError {
+	wcb := func(total, read int64, current int) error {
+		cb(total, read, current)
+		if u.CB != nil {
+			return u.CB(total, read, current)
+		}
+		return nil
+	}
+
+	return UploadObject(u.object, wcb)
+}
+
+func (u *Uploadable) Object() *objectResource {
+	return u.object
+}
+
+func (u *Uploadable) Oid() string {
+	return u.oid
+}
+
+func (u *Uploadable) Size() int64 {
+	return u.size
+}
+
+func (u *Uploadable) SetObject(o *objectResource) {
+	u.object = o
 }
 
 // NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
-func NewUploadQueue(workers, files int) *UploadQueue {
-	return &UploadQueue{
-		uploadc:     make(chan *Uploadable, files),
-		errorc:      make(chan *WrappedError),
-		workers:     workers,
-		files:       files,
-		authCond:    sync.NewCond(&sync.Mutex{}),
-		uploadables: make(map[string]*Uploadable),
-	}
-}
-
-// Add adds an Uploadable to the upload queue.
-func (q *UploadQueue) Add(u *Uploadable) {
-	q.uploadables[u.OID] = u
-}
-
-// apiWorker processes the queue, making the POST calls and
-// feeding the results to uploadWorkers
-func (q *UploadQueue) processIndividual() {
-	apic := make(chan *Uploadable, q.files)
-	workersReady := make(chan int, q.workers)
-	var wg sync.WaitGroup
-
-	for i := 0; i < q.workers; i++ {
-		go func() {
-			workersReady <- 1
-			for u := range apic {
-				// If an API authorization has not occured, we wait until we're woken up.
-				q.authCond.L.Lock()
-				if atomic.LoadInt32(&q.clientAuthorized) == 0 {
-					q.authCond.Wait()
-				}
-				q.authCond.L.Unlock()
-
-				obj, err := UploadCheck(u.OIDPath)
-				if err != nil {
-					q.errorc <- err
-					wg.Done()
-					continue
-				}
-				if obj != nil {
-					q.wg.Add(1)
-					u.object = obj
-					q.uploadc <- u
-				}
-				wg.Done()
-			}
-		}()
-	}
-
-	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.uploadables)))
-	q.bar.Start()
-
-	for _, u := range q.uploadables {
-		wg.Add(1)
-		apic <- u
-	}
-
-	<-workersReady
-	q.authCond.Signal() // Signal the first goroutine to run
-	close(apic)
-	wg.Wait()
-
-	close(q.uploadc)
-}
-
-// batchWorker makes the batch POST call, feeding the results
-// to the uploadWorkers
-func (q *UploadQueue) processBatch() {
-	q.files = 0
-	uploads := make([]*objectResource, 0, len(q.uploadables))
-	for _, u := range q.uploadables {
-		uploads = append(uploads, &objectResource{Oid: u.OID, Size: u.Size})
-	}
-
-	objects, err := Batch(uploads)
-	if err != nil {
-		q.errorc <- err
-		sendApiEvent(apiEventFail)
-		return
-	}
-
-	for _, o := range objects {
-		if _, ok := o.Links["upload"]; ok {
-			// This object needs to be uploaded
-			if uploadable, ok := q.uploadables[o.Oid]; ok {
-				q.files++
-				q.wg.Add(1)
-				uploadable.object = o
-				q.uploadc <- uploadable
-			}
-		}
-	}
-
-	close(q.uploadc)
-	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
-	q.bar.Start()
-	sendApiEvent(apiEventSuccess) // Wake up upload workers
-}
-
-// Process starts the upload queue and displays a progress bar.
-func (q *UploadQueue) Process() {
-	q.bar = pb.New64(q.size)
-	q.bar.SetUnits(pb.U_BYTES)
-	q.bar.ShowBar = false
-
-	// This goroutine collects errors returned from uploads
-	go func() {
-		for err := range q.errorc {
-			q.errors = append(q.errors, err)
-		}
-	}()
-
-	// This goroutine watches for apiEvents. In order to prevent multiple
-	// credential requests from happening, the queue is processed sequentially
-	// until an API request succeeds (meaning authenication has happened successfully).
-	// Once the an API request succeeds, all worker goroutines are woken up and allowed
-	// to process uploads. Once a success happens, this goroutine exits.
-	go func() {
-		for {
-			event := <-apiEvent
-			switch event {
-			case apiEventSuccess:
-				atomic.StoreInt32(&q.clientAuthorized, 1)
-				q.authCond.Broadcast() // Wake all remaining goroutines
-				return
-			case apiEventFail:
-				q.authCond.Signal() // Wake the next goroutine
-			}
-		}
-	}()
-
-	for i := 0; i < q.workers; i++ {
-		// These are the worker goroutines that process uploads
-		go func(n int) {
-
-			for upload := range q.uploadc {
-				cb := func(total, read int64, current int) error {
-					q.bar.Add(current)
-					if upload.CB != nil {
-						return upload.CB(total, read, current)
-					}
-					return nil
-				}
-
-				err := UploadObject(upload.object, cb)
-				if err != nil {
-					q.errorc <- err
-				}
-
-				f := atomic.AddInt64(&q.finished, 1)
-				q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
-				q.wg.Done()
-			}
-		}(i)
-	}
-
-	if Config.BatchTransfer() {
-		q.processBatch()
-	} else {
-		q.processIndividual()
-	}
-
-	q.wg.Wait()
-	close(q.errorc)
-
-	q.bar.Finish()
-}
-
-// Errors returns any errors encountered during uploading.
-func (q *UploadQueue) Errors() []*WrappedError {
-	return q.errors
+func NewUploadQueue(workers, files int) *TransferQueue {
+	q := newTransferQueue(workers, files)
+	q.transferKind = "upload"
+	return q
 }
 
 // ensureFile makes sure that the cleanPath exists before pushing it. If it

From ee68f55d249141584e6f28991d871d3b92d194f9 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Thu, 21 May 2015 14:11:25 -0400
Subject: [PATCH 19/36] The test server needs to give the oid and size back in
 the json

---
 test/cmd/lfstest-gitserver.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/test/cmd/lfstest-gitserver.go b/test/cmd/lfstest-gitserver.go
index d1da10f8..af520c92 100644
--- a/test/cmd/lfstest-gitserver.go
+++ b/test/cmd/lfstest-gitserver.go
@@ -111,6 +111,8 @@ func lfsPostHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	res := &lfsObject{
+		Oid:  obj.Oid,
+		Size: obj.Size,
 		Links: map[string]lfsLink{
 			"upload": lfsLink{
 				Href: server.URL + "/storage/" + obj.Oid,

From e9646c042d945e57f5f74cf01f60de78f9f0d121 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Thu, 21 May 2015 14:11:39 -0400
Subject: [PATCH 20/36] Update happy path output test for batched progress
 output

---
 test/test-happy-path.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test-happy-path.sh b/test/test-happy-path.sh
index c1d1840a..df909a3d 100755
--- a/test/test-happy-path.sh
+++ b/test/test-happy-path.sh
@@ -51,7 +51,7 @@ begin_test "happy path"
 
   # This pushes to the remote repository set up at the top of the test.
   out=$(git push origin master 2>&1)
-  echo "$out" | grep "(1 of 1 files) 1 B / 1 B 100.00 %"
+  echo "$out" | grep "(1 of 1 files)"
   echo "$out" | grep "master -> master"
 
   assert_server_object "$contents_oid" "$contents"

From cc960a6261b965369b3bdef894c5059efc92818a Mon Sep 17 00:00:00 2001
From: rubyist
Date: Thu, 21 May 2015 16:51:02 -0400
Subject: [PATCH 21/36] Add batch endpoint to integration test server

---
 test/cmd/lfstest-gitserver.go | 50 ++++++++++++++++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)

diff --git a/test/cmd/lfstest-gitserver.go b/test/cmd/lfstest-gitserver.go
index af520c92..e13feb18 100644
--- a/test/cmd/lfstest-gitserver.go
+++ b/test/cmd/lfstest-gitserver.go
@@ -85,7 +85,11 @@ func lfsHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "application/vnd.git-lfs+json")
 	switch r.Method {
 	case "POST":
-		lfsPostHandler(w, r)
+		if strings.HasSuffix(r.URL.String(), "batch") {
+			lfsBatchHandler(w, r)
+		} else {
+			lfsPostHandler(w, r)
+		}
 	case "GET":
 		lfsGetHandler(w, r)
 	default:
@@ -164,6 +168,50 @@ func lfsGetHandler(w http.ResponseWriter, r *http.Request) {
 	w.Write(by)
 }
 
+func lfsBatchHandler(w http.ResponseWriter, r *http.Request) {
+	buf := &bytes.Buffer{}
+	tee := io.TeeReader(r.Body, buf)
+	var objs map[string][]lfsObject
+	err := json.NewDecoder(tee).Decode(&objs)
+	io.Copy(ioutil.Discard, r.Body)
+	r.Body.Close()
+
+	log.Println("REQUEST")
+	log.Println(buf.String())
+
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	res := []lfsObject{}
+	for _, obj := range objs["objects"] {
+		o := lfsObject{
+			Oid:  obj.Oid,
+			Size: obj.Size,
+			Links: map[string]lfsLink{
+				"upload": lfsLink{
+					Href: server.URL + "/storage/" + obj.Oid,
+				},
+			},
+		}
+
+		res = append(res, o)
+	}
+
+	ores := map[string][]lfsObject{"objects": res}
+
+	by, err := json.Marshal(ores)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	log.Println("RESPONSE: 200")
+	log.Println(string(by))
+
+	w.WriteHeader(200)
+	w.Write(by)
+}
+
 // handles any /storage/{oid} requests
 func storageHandler(w http.ResponseWriter, r *http.Request) {
 	log.Printf("storage %s %s\n", r.Method, r.URL)

From 740ba62254709b0f0a7f52bcbd507d05ce826727 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Thu, 21 May 2015 16:51:15 -0400
Subject: [PATCH 22/36] Add an integration test that pushes with batch enabled

---
 test/test-batch-transfer.sh | 74 +++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
 create mode 100755 test/test-batch-transfer.sh

diff --git a/test/test-batch-transfer.sh b/test/test-batch-transfer.sh
new file mode 100755
index 00000000..9fced1e0
--- /dev/null
+++ b/test/test-batch-transfer.sh
@@ -0,0 +1,74 @@
+#!/bin/sh
+# This is a sample Git LFS test. See test/README.md and testhelpers.sh for
+# more documentation.
+
+. "test/testlib.sh"
+
+begin_test "batch transfer"
+(
+  set -e
+
+  # This initializes a new bare git repository in test/remote.
+  # These remote repositories are global to every test, so keep the names
+  # unique.
+  reponame="$(basename "$0" ".sh")"
+  setup_remote_repo "$reponame"
+
+  # Clone the repository from the test Git server. This is empty, and will be
+  # used to test a "git pull" below. The repo is cloned to $TRASHDIR/clone
+  clone_repo "$reponame" clone
+
+  # Clone the repository again to $TRASHDIR/repo. This will be used to commit
+  # and push objects.
+  clone_repo "$reponame" repo
+
+  # This executes Git LFS from the local repo that was just cloned.
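+  # ($GITLFS is provided by the test harness and is assumed to point at the
+  # locally built git-lfs binary; a later patch in this series switches these
+  # calls to plain `git lfs`.)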
+  out=$($GITLFS track "*.dat" 2>&1)
+  echo "$out" | grep "Tracking \*.dat"
+
+  contents=$(printf "a")
+  contents_oid=$(printf "$contents" | shasum -a 256 | cut -f 1 -d " ")
+
+  printf "$contents" > a.dat
+  git add a.dat
+  git add .gitattributes
+  out=$(git commit -m "add a.dat" 2>&1)
+  echo "$out" | grep "master (root-commit)"
+  echo "$out" | grep "2 files changed"
+  echo "$out" | grep "create mode 100644 a.dat"
+  echo "$out" | grep "create mode 100644 .gitattributes"
+
+  out=$(cat a.dat)
+  if [ "$out" != "a" ]; then
+    exit 1
+  fi
+
+
+  # This is a small shell function that runs several git commands together.
+  assert_pointer "master" "a.dat" "$contents_oid" 1
+
+  refute_server_object "$contents_oid"
+
+  # Ensure batch transfer is turned on for this repo
+  git config --add --local lfs.batch true
+
+  # This pushes to the remote repository set up at the top of the test.
+  out=$(git push origin master 2>&1)
+  echo "$out" | grep "(1 of 1 files)"
+  echo "$out" | grep "master -> master"
+
+  assert_server_object "$contents_oid" "$contents"
+
+  # change to the clone's working directory
+  cd ../clone
+
+  git pull 2>&1 | grep "Downloading a.dat (1 B)"
+
+  out=$(cat a.dat)
+  if [ "$out" != "a" ]; then
+    exit 1
+  fi
+
+  assert_pointer "master" "a.dat" "$contents_oid" 1
+)
+end_test

From 8d2d8264f79007b349f70764932dae0d64b0419a Mon Sep 17 00:00:00 2001
From: rubyist
Date: Fri, 22 May 2015 10:10:01 -0400
Subject: [PATCH 23/36] Update the index when replacing files in the wd with
 `get`

---
 commands/command_get.go | 9 +++++++--
 git/git.go              | 5 +++++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/commands/command_get.go b/commands/command_get.go
index 96a189ec..362655ab 100644
--- a/commands/command_get.go
+++ b/commands/command_get.go
@@ -51,16 +51,21 @@ func getCommand(cmd *cobra.Command, args []string) {
 	}
 
 	if target == current {
+		// We just downloaded the files for the current ref, we can copy them into
+		// the working directory and update the git index
 		for _, pointer := range pointers {
 			file, err := os.Create(pointer.Name)
 			if err != nil {
 				Panic(err, "Could not create working directory file")
 			}
 
-			err = lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil)
-			if err != nil {
+			if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
 				Panic(err, "Could not write working directory file")
 			}
+
+			if err := git.UpdateIndex(pointer.Name); err != nil {
+				Panic(err, "Could not update index")
+			}
 		}
 	}
 }
diff --git a/git/git.go b/git/git.go
index a95397b0..77f31fa2 100644
--- a/git/git.go
+++ b/git/git.go
@@ -61,6 +61,11 @@ func CurrentRemote() (string, error) {
 	return remote + "/" + branch, nil
 }
 
+func UpdateIndex(file string) error {
+	_, err := simpleExec(nil, "git", "update-index", "-q", "--refresh", file)
+	return err
+}
+
 type gitConfig struct {
 }

From 8ed808cc05d093bf1f183cdc5274c47c79e5f4d2 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Tue, 26 May 2015 09:47:27 -0400
Subject: [PATCH 24/36] Remove debugging print

---
 lfs/client.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lfs/client.go b/lfs/client.go
index 2e3dfe76..83f63acb 100644
--- a/lfs/client.go
+++ b/lfs/client.go
@@ -549,7 +549,6 @@ func decodeApiResponse(res *http.Response, obj interface{}) *WrappedError {
 	res.Body.Close()
 
 	if err != nil {
-		fmt.Printf("DECODE ERROR: %s\n", err)
 		return Errorf(err, "Unable to parse HTTP response for %s %s", res.Request.Method, res.Request.URL)
 	}

From 565928c872fcb626f1ebdafb3d971a04e7c32fe3 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Tue, 26 May 2015 09:56:43 -0400
Subject: [PATCH 25/36] Update some documentation

---
 lfs/transfer_queue.go | 28 +++++++++++++++++-----------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go
index 9c507d2f..84839028 100644
--- a/lfs/transfer_queue.go
+++ b/lfs/transfer_queue.go
@@ -46,13 +46,15 @@ func newTransferQueue(workers, files int) *TransferQueue {
 	}
 }
 
-// Add adds an Uploadable to the upload queue.
+// Add adds a Transferable to the transfer queue.
 func (q *TransferQueue) Add(t Transferable) {
 	q.transferables[t.Oid()] = t
 }
 
-// apiWorker processes the queue, making the POST calls and
-// feeding the results to uploadWorkers
+// processIndividual processes the queue of transfers one at a time by making
+// a POST call for each object, feeding the results to the transfer workers.
+// If configured, the object transfers can still happen concurrently, the
+// sequential nature here is only for the meta POST calls.
 func (q *TransferQueue) processIndividual() {
 	apic := make(chan Transferable, q.files)
 	workersReady := make(chan int, q.workers)
@@ -101,8 +103,9 @@ func (q *TransferQueue) processIndividual() {
 	close(q.transferc)
 }
 
-// batchWorker makes the batch POST call, feeding the results
-// to the transfer workers
+// processBatch processes the queue of transfers using the batch endpoint,
+// making only one POST call for all objects. The results are then handed
+// off to the transfer workers.
 func (q *TransferQueue) processBatch() {
 	q.files = 0
 	transfers := make([]*objectResource, 0, len(q.transferables))
@@ -132,16 +135,19 @@ func (q *TransferQueue) processBatch() {
 	close(q.transferc)
 	q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
 	q.bar.Start()
-	sendApiEvent(apiEventSuccess) // Wake up upload workers
+	sendApiEvent(apiEventSuccess) // Wake up transfer workers
 }
 
-// Process starts the upload queue and displays a progress bar.
+// Process starts the transfer queue and displays a progress bar. Process will
+// do individual or batch transfers depending on the Config.BatchTransfer() value.
+// Process will transfer files sequentially or concurrently depending on the
+// Config.ConcurrentTransfers() value.
 func (q *TransferQueue) Process() {
 	q.bar = pb.New64(q.size)
 	q.bar.SetUnits(pb.U_BYTES)
 	q.bar.ShowBar = false
 
-	// This goroutine collects errors returned from uploads
+	// This goroutine collects errors returned from transfers
 	go func() {
 		for err := range q.errorc {
 			q.errors = append(q.errors, err)
@@ -152,7 +158,7 @@ func (q *TransferQueue) Process() {
 	// credential requests from happening, the queue is processed sequentially
 	// until an API request succeeds (meaning authentication has happened successfully).
 	// Once an API request succeeds, all worker goroutines are woken up and allowed
-	// to process uploads. Once a success happens, this goroutine exits.
+	// to process transfers. Once a success happens, this goroutine exits.
 	go func() {
 		for {
 			event := <-apiEvent
@@ -168,7 +174,7 @@ func (q *TransferQueue) Process() {
 	}()
 
 	for i := 0; i < q.workers; i++ {
-		// These are the worker goroutines that process uploads
+		// These are the worker goroutines that process transfers
 		go func(n int) {
 
 			for transfer := range q.transferc {
@@ -200,7 +206,7 @@ func (q *TransferQueue) Process() {
 	q.bar.Finish()
 }
 
-// Errors returns any errors encountered during uploading.
+// Errors returns any errors encountered during transfer.
 func (q *TransferQueue) Errors() []*WrappedError {
 	return q.errors
 }

From 135d59bc4e9b30e9d80e9b9be90279a2e5231e52 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Tue, 26 May 2015 15:42:37 -0400
Subject: [PATCH 26/36] More robust config parsing for concurrent/batch vals

* Ensure concurrent values are at least 1
* Ensure batch boolean follows git config's rules
* Tests for each

---
 lfs/config.go      |  11 +++-
 lfs/config_test.go | 135 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/lfs/config.go b/lfs/config.go
index 9b95ecd4..288984ef 100644
--- a/lfs/config.go
+++ b/lfs/config.go
@@ -76,7 +76,7 @@ func (c *Configuration) ConcurrentTransfers() int {
 
 	if v, ok := c.GitConfig("lfs.concurrenttransfers"); ok {
 		n, err := strconv.Atoi(v)
-		if err == nil {
+		if err == nil && n > 0 {
 			uploads = n
 		}
 	}
@@ -86,7 +86,14 @@ func (c *Configuration) ConcurrentTransfers() int {
 
 func (c *Configuration) BatchTransfer() bool {
 	if v, ok := c.GitConfig("lfs.batch"); ok {
-		return v == "true"
+		if v == "true" || v == "" {
+			return true
+		}
+
+		// Any numeric value except 0 is considered true
+		if n, err := strconv.Atoi(v); err == nil && n != 0 {
+			return true
+		}
 	}
 	return false
 }
diff --git a/lfs/config_test.go b/lfs/config_test.go
index cb14e6fa..1508f72d 100644
--- a/lfs/config_test.go
+++ b/lfs/config_test.go
@@ -193,3 +193,138 @@ func TestObjectsUrl(t *testing.T) {
 		}
 	}
 }
+
+func TestConcurrentTransfersSetValue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.concurrenttransfers": "5",
+		},
+	}
+
+	n := config.ConcurrentTransfers()
+	assert.Equal(t, 5, n)
+}
+
+func TestConcurrentTransfersDefault(t *testing.T) {
+	config := &Configuration{}
+
+	n := config.ConcurrentTransfers()
+	assert.Equal(t, 3, n)
+}
+
+func TestConcurrentTransfersZeroValue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.concurrenttransfers": "0",
+		},
+	}
+
+	n := config.ConcurrentTransfers()
+	assert.Equal(t, 3, n)
+}
+
+func TestConcurrentTransfersNonNumeric(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.concurrenttransfers": "elephant",
+		},
+	}
+
+	n := config.ConcurrentTransfers()
+	assert.Equal(t, 3, n)
+}
+
+func TestConcurrentTransfersNegativeValue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.concurrenttransfers": "-5",
+		},
+	}
+
+	n := config.ConcurrentTransfers()
+	assert.Equal(t, 3, n)
+}
+
+func TestBatchTrue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "true",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, true, v)
+}
+
+func TestBatchNumeric1IsTrue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "1",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, true, v)
+}
+
+func TestBatchNumeric0IsFalse(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "0",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, false, v)
+}
+
+func TestBatchOtherNumericsAreTrue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "42",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, true, v)
+}
+
+func TestBatchNegativeNumericsAreTrue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "-1",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, true, v)
+}
+
+func TestBatchNonBooleanIsFalse(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "elephant",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, false, v)
+}
+
+func TestBatchPresentButBlankIsTrue(t *testing.T) {
+	config := &Configuration{
+		gitConfig: map[string]string{
+			"lfs.batch": "",
+		},
+	}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, true, v)
+}
+
+func TestBatchAbsentIsFalse(t *testing.T) {
+	config := &Configuration{}
+
+	v := config.BatchTransfer()
+	assert.Equal(t, false, v)
+}

From 36246c58ae74965d059043bb4f1e08c1fe73dc66 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Wed, 27 May 2015 10:37:06 -0400
Subject: [PATCH 27/36] Add the concurrent and batch settings to `lfs env`
 output

---
 lfs/lfs.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lfs/lfs.go b/lfs/lfs.go
index f01cb4a5..76762a8d 100644
--- a/lfs/lfs.go
+++ b/lfs/lfs.go
@@ -52,11 +52,13 @@ func LocalMediaPath(sha string) (string, error) {
 
 func Environ() []string {
 	osEnviron := os.Environ()
-	env := make([]string, 4, len(osEnviron)+4)
+	env := make([]string, 6, len(osEnviron)+6)
 	env[0] = fmt.Sprintf("LocalWorkingDir=%s", LocalWorkingDir)
 	env[1] = fmt.Sprintf("LocalGitDir=%s", LocalGitDir)
 	env[2] = fmt.Sprintf("LocalMediaDir=%s", LocalMediaDir)
 	env[3] = fmt.Sprintf("TempDir=%s", TempDir)
+	env[4] = fmt.Sprintf("ConcurrentTransfers=%d", Config.ConcurrentTransfers())
+	env[5] = fmt.Sprintf("BatchTransfer=%v", Config.BatchTransfer())
 
 	for _, e := range osEnviron {
 		if !strings.Contains(e, "GIT_") {

From b44af514e991e75bbbe18f26e2f070e4af7b7b10 Mon Sep 17 00:00:00 2001
From: rubyist
Date: Wed, 27 May 2015 15:03:12 -0400
Subject: [PATCH 28/36] Update tracerx so it can run perf tracing without
 regular tracing

---
 .vendor/src/github.com/rubyist/tracerx/tracerx.go | 8 ++++++--
 Godeps                                            | 2 +-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/.vendor/src/github.com/rubyist/tracerx/tracerx.go b/.vendor/src/github.com/rubyist/tracerx/tracerx.go
index 5246ab63..a4bdcf4b 100644
--- a/.vendor/src/github.com/rubyist/tracerx/tracerx.go
+++ b/.vendor/src/github.com/rubyist/tracerx/tracerx.go
@@ -68,7 +68,7 @@ func PerformanceSince(what string, t time.Time) {
 
 func PerformanceSinceKey(key, what string, t time.Time) {
 	tracer := getTracer(key)
-	if tracer.enabled && tracer.performance {
+	if tracer.performance {
 		since := time.Since(t)
 		fmt.Fprintf(tracer.w, "performance %s: %.9f s\n", what, since.Seconds())
 	}
@@ -114,7 +114,7 @@ func initializeTracer(key string) *tracer {
 
 	trace := os.Getenv(fmt.Sprintf("%s_TRACE", key))
 	if trace == "" || strings.ToLower(trace) == "false" {
-		return tracer
+		tracer.enabled = false
 	}
 
 	perf := os.Getenv(fmt.Sprintf("%s_TRACE_PERFORMANCE", key))
@@ -122,6 +122,10 @@ func initializeTracer(key string) *tracer {
 		tracer.performance = true
 	}
 
+	if !tracer.enabled && !tracer.performance {
+		return tracer
+	}
+
 	fd, err := strconv.Atoi(trace)
 	if err != nil {
 		// Not a number, it could be a path for a log file
diff --git a/Godeps b/Godeps
index 3831059a..5fd6349c 100644
--- a/Godeps
+++ b/Godeps
@@ -6,4 +6,4 @@ github.com/olekukonko/ts ecf753e7c962639ab5a1fb46f7da627d4c
 github.com/spf13/cobra 864687ae689edc28688c67edef47e3d2ad651a1b
 github.com/spf13/pflag 463bdc838f2b35e9307e91d480878bda5fff7232
 github.com/technoweenie/go-contentaddressable 38171def3cd15e3b76eb156219b3d48704643899
-github.com/rubyist/tracerx f6aa9369b3277bc21384878e8279642da722f407
+github.com/rubyist/tracerx 51cd50e73e07cc41c22caec66af15313dff1aebb

From d059f105a3ad3f566a8e2344767ca0c13e565d6b Mon Sep 17 00:00:00 2001
From: rubyist
Date: Wed, 27 May 2015 15:03:28 -0400
Subject: [PATCH 29/36] Add some perf tracing to lfs get, run a single
 update-index process

---
 commands/command_get.go | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/commands/command_get.go b/commands/command_get.go
index 362655ab..47818514 100644
--- a/commands/command_get.go
+++ b/commands/command_get.go
@@ -2,9 +2,12 @@ package commands
 
 import (
 	"os"
+	"os/exec"
+	"time"
 
 	"github.com/github/git-lfs/git"
 	"github.com/github/git-lfs/lfs"
+	"github.com/rubyist/tracerx"
 	"github.com/spf13/cobra"
 )
 
@@ -40,7 +43,9 @@ func getCommand(cmd *cobra.Command, args []string) {
 		q.Add(lfs.NewDownloadable(p))
 	}
 
+	processQueue := time.Now()
 	q.Process()
+	tracerx.PerformanceSince("process queue", processQueue)
 
 	target, err := git.ResolveRef(ref)
 	if err != nil {
@@ -53,6 +58,7 @@ func getCommand(cmd *cobra.Command, args []string) {
 	if target == current {
 		// We just downloaded the files for the current ref, we can copy them into
 		// the working directory and update the git index
+		updateWd := time.Now()
 		for _, pointer := range pointers {
 			file, err := os.Create(pointer.Name)
 			if err != nil {
@@ -62,12 +68,28 @@ func getCommand(cmd *cobra.Command, args []string) {
 			if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
 				Panic(err, "Could not write working directory file")
 			}
-
-			if err := git.UpdateIndex(pointer.Name); err != nil {
-				Panic(err, "Could not update index")
-			}
 		}
+		tracerx.PerformanceSince("update working directory", updateWd)
+
+		updateIndex := time.Now()
+		cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
+		stdin, err := cmd.StdinPipe()
+		if err != nil {
+			Panic(err, "Could not update the index")
+		}
+
+		if err := cmd.Start(); err != nil {
+			Panic(err, "Could not update the index")
+		}
+
+		for _, pointer := range pointers {
+			stdin.Write([]byte(pointer.Name + "\n"))
+		}
+		stdin.Close()
+		cmd.Wait()
+		tracerx.PerformanceSince("update index", updateIndex)
 	}
+
 }
 
 func init() {

From ebc81aedb9af1dfb24f492ff93115000c2da18ea Mon Sep 17 00:00:00 2001
From: rubyist
Date: Wed, 27 May 2015 15:45:18 -0400
Subject: [PATCH 30/36] Run update-index as a single background process

Reorganize the transfer queue to provide a channel to watch for object
OIDs as they finish. This can be used in the `get` command to feed a
goroutine that will copy the file to the working directory and inform
the update-index process about it as the transfers finish.

This leads to a greatly reduced amount of time spent updating the index
after a get.
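As a minimal, self-contained sketch of the watch-channel pattern (with
simplified stand-in types, not the actual git-lfs code): Process()
publishes each completed OID to every watcher channel and closes the
watchers when the queue drains, so a consumer goroutine can handle files
while transfers are still running.

    package main

    import "fmt"

    type queue struct {
    	oids     []string
    	watchers []chan string
    }

    // Watch returns a buffered channel of completed OIDs; buffering keeps
    // Process from blocking if the consumer falls behind.
    func (q *queue) Watch() chan string {
    	c := make(chan string, len(q.oids))
    	q.watchers = append(q.watchers, c)
    	return c
    }

    // Process announces each OID as its (stand-in) transfer completes,
    // then closes every watcher so consumer range loops terminate.
    func (q *queue) Process() {
    	for _, oid := range q.oids {
    		for _, w := range q.watchers {
    			w <- oid
    		}
    	}
    	for _, w := range q.watchers {
    		close(w)
    	}
    }

    func main() {
    	q := &queue{oids: []string{"aaa", "bbb"}}
    	watch := q.Watch()
    	done := make(chan struct{})
    	go func() { // consumer runs while Process is still working
    		for oid := range watch {
    			fmt.Println("completed:", oid) // e.g. smudge file, feed update-index
    		}
    		close(done)
    	}()
    	q.Process()
    	<-done
    }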
---
 commands/command_get.go | 74 +++++++++++++++++++++++++----------------
 lfs/download_queue.go   |  4 +--
 lfs/scanner.go          | 22 ++++++------
 lfs/transfer_queue.go   | 18 ++++++++++
 4 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/commands/command_get.go b/commands/command_get.go
index 47818514..2cb8ab1c 100644
--- a/commands/command_get.go
+++ b/commands/command_get.go
@@ -43,53 +43,69 @@ func getCommand(cmd *cobra.Command, args []string) {
 		q.Add(lfs.NewDownloadable(p))
 	}
 
-	processQueue := time.Now()
-	q.Process()
-	tracerx.PerformanceSince("process queue", processQueue)
-
 	target, err := git.ResolveRef(ref)
 	if err != nil {
+		Panic(err, "Could not resolve git ref")
 	}
 
 	current, err := git.CurrentRef()
 	if err != nil {
+		Panic(err, "Could not get the current git ref")
 	}
 
 	if target == current {
 		// We just downloaded the files for the current ref, we can copy them into
-		// the working directory and update the git index
-		updateWd := time.Now()
-		for _, pointer := range pointers {
-			file, err := os.Create(pointer.Name)
+		// the working directory and update the git index. We're doing this in a
+		// goroutine so they can be copied as they come in, for efficiency.
+		watch := q.Watch()
+
+		go func() {
+			files := make(map[string]*lfs.WrappedPointer, len(pointers))
+			for _, pointer := range pointers {
+				files[pointer.Oid] = pointer
+			}
+
+			// Fire up the update-index command
+			cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
+			stdin, err := cmd.StdinPipe()
 			if err != nil {
-				Panic(err, "Could not create working directory file")
+				Panic(err, "Could not update the index")
 			}
 
-			if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
-				Panic(err, "Could not write working directory file")
+			if err := cmd.Start(); err != nil {
+				Panic(err, "Could not update the index")
 			}
-		}
-		tracerx.PerformanceSince("update working directory", updateWd)
 
-		updateIndex := time.Now()
-		cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
-		stdin, err := cmd.StdinPipe()
-		if err != nil {
-			Panic(err, "Could not update the index")
-		}
+			// As files come in, write them to the wd and update the index
+			for oid := range watch {
+				pointer, ok := files[oid]
+				if !ok {
+					continue
+				}
 
-		if err := cmd.Start(); err != nil {
-			Panic(err, "Could not update the index")
-		}
+				file, err := os.Create(pointer.Name)
+				if err != nil {
+					Panic(err, "Could not create working directory file")
+				}
 
-		for _, pointer := range pointers {
-			stdin.Write([]byte(pointer.Name + "\n"))
-		}
-		stdin.Close()
-		cmd.Wait()
-		tracerx.PerformanceSince("update index", updateIndex)
+				if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
+					Panic(err, "Could not write working directory file")
+				}
+				file.Close()
+
+				stdin.Write([]byte(pointer.Name + "\n"))
+			}
+
+			stdin.Close()
+			if err := cmd.Wait(); err != nil {
+				Panic(err, "Error updating the git index")
+			}
+		}()
+
+		processQueue := time.Now()
+		q.Process()
+		tracerx.PerformanceSince("process queue", processQueue)
 	}
 }
 
 func init() {
diff --git a/lfs/download_queue.go b/lfs/download_queue.go
index 4822d4ea..c0ba4ae2 100644
--- a/lfs/download_queue.go
+++ b/lfs/download_queue.go
@@ -1,11 +1,11 @@
 package lfs
 
 type Downloadable struct {
-	Pointer *wrappedPointer
+	Pointer *WrappedPointer
 	object  *objectResource
 }
 
-func NewDownloadable(p *wrappedPointer) *Downloadable {
+func NewDownloadable(p *WrappedPointer) *Downloadable {
 	return &Downloadable{Pointer: p}
 }
 
diff --git a/lfs/scanner.go b/lfs/scanner.go
index 57c3d7d1..0478e45d 100644
--- a/lfs/scanner.go
+++ b/lfs/scanner.go
@@ -26,10 +26,10 @@ const (
 	chanBufSize = 100
 )
 
-// wrappedPointer wraps a pointer.Pointer and provides the git sha1
+// WrappedPointer wraps a pointer.Pointer and provides the git sha1
 // and the file name associated with the object, taken from the
 // rev-list output.
-type wrappedPointer struct {
+type WrappedPointer struct {
 	Sha1    string
 	Name    string
 	SrcName string
@@ -49,9 +49,9 @@ type indexFile struct {
 
 var z40 = regexp.MustCompile(`\^?0{40}`)
 
-// ScanRefs takes a ref and returns a slice of wrappedPointer objects
+// ScanRefs takes a ref and returns a slice of WrappedPointer objects
 // for all Git LFS pointers it finds for that ref.
-func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
+func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) {
 	nameMap := make(map[string]string, 0)
 
 	start := time.Now()
@@ -74,7 +74,7 @@ func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) {
 		return nil, err
 	}
 
-	pointers := make([]*wrappedPointer, 0)
+	pointers := make([]*WrappedPointer, 0)
 	for p := range pointerc {
 		if name, ok := nameMap[p.Sha1]; ok {
 			p.Name = name
@@ -85,9 +85,9 @@ func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) {
 	return pointers, nil
 }
 
-// ScanIndex returns a slice of wrappedPointer objects for all
+// ScanIndex returns a slice of WrappedPointer objects for all
 // Git LFS pointers it finds in the index.
-func ScanIndex() ([]*wrappedPointer, error) {
+func ScanIndex() ([]*WrappedPointer, error) {
 	nameMap := make(map[string]*indexFile, 0)
 
 	start := time.Now()
@@ -132,7 +132,7 @@ func ScanIndex() ([]*WrappedPointer, error) {
 		return nil, err
 	}
 
-	pointers := make([]*wrappedPointer, 0)
+	pointers := make([]*WrappedPointer, 0)
 	for p := range pointerc {
 		if e, ok := nameMap[p.Sha1]; ok {
 			p.Name = e.Name
@@ -288,13 +288,13 @@ func catFileBatchCheck(revs chan string) (chan string, error) {
 // of a git object, given its sha1. The contents will be decoded into
 // a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
 // will be sent. It returns a channel from which point.Pointers can be read.
-func catFileBatch(revs chan string) (chan *wrappedPointer, error) {
+func catFileBatch(revs chan string) (chan *WrappedPointer, error) {
 	cmd, err := startCommand("git", "cat-file", "--batch")
 	if err != nil {
 		return nil, err
 	}
 
-	pointers := make(chan *wrappedPointer, chanBufSize)
+	pointers := make(chan *WrappedPointer, chanBufSize)
 
 	go func() {
 		for {
@@ -316,7 +316,7 @@ func catFileBatch(revs chan string) (chan *WrappedPointer, error) {
 
 			p, err := DecodePointer(bytes.NewBuffer(nbuf))
 			if err == nil {
-				pointers <- &wrappedPointer{
+				pointers <- &WrappedPointer{
 					Sha1:    string(fields[0]),
 					Size:    p.Size,
 					Pointer: p,
diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go
index 84839028..477a598b 100644
--- a/lfs/transfer_queue.go
+++ b/lfs/transfer_queue.go
@@ -21,6 +21,7 @@ type Transferable interface {
 type TransferQueue struct {
 	transferc chan Transferable
 	errorc    chan *WrappedError
+	watchers  []chan string
 	errors    []*WrappedError
 	wg        sync.WaitGroup
 	workers   int
@@ -39,6 +40,7 @@ func newTransferQueue(workers, files int) *TransferQueue {
 	return &TransferQueue{
 		transferc: make(chan Transferable, files),
 		errorc:    make(chan *WrappedError),
+		watchers:  make([]chan string, 0),
 		workers:   workers,
 		files:     files,
 		authCond:  sync.NewCond(&sync.Mutex{}),
@@ -51,6 +53,14 @@ func newTransferQueue(workers, files int) *TransferQueue {
 func (q *TransferQueue) Add(t Transferable) {
 	q.transferables[t.Oid()] = t
 }
 
+// Watch returns a channel where the queue will write the OID of each transfer
+// as it completes. The channel will be closed when the queue finishes processing.
+func (q *TransferQueue) Watch() chan string {
+	c := make(chan string, q.files)
+	q.watchers = append(q.watchers, c)
+	return c
+}
+
 // processIndividual processes the queue of transfers one at a time by making
 // a POST call for each object, feeding the results to the transfer workers.
 // If configured, the object transfers can still happen concurrently, the
 // sequential nature here is only for the meta POST calls.
@@ -185,6 +195,11 @@ func (q *TransferQueue) Process() {
 
 				if err := transfer.Transfer(cb); err != nil {
 					q.errorc <- err
+				} else {
+					oid := transfer.Oid()
+					for _, c := range q.watchers {
+						c <- oid
+					}
 				}
 
 				f := atomic.AddInt64(&q.finished, 1)
@@ -202,6 +217,9 @@ func (q *TransferQueue) Process() {
 
 	q.wg.Wait()
 	close(q.errorc)
+	for _, watcher := range q.watchers {
+		close(watcher)
+	}
 
 	q.bar.Finish()
 }

From 589592c32e77279b7246576d6db584969bbcca3b Mon Sep 17 00:00:00 2001
From: Rick Olson
Date: Wed, 27 May 2015 14:09:58 -0600
Subject: [PATCH 31/36] fix env tests

---
 test/test-env.sh | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/test/test-env.sh b/test/test-env.sh
index 1c83866b..8abdaf20 100755
--- a/test/test-env.sh
+++ b/test/test-env.sh
@@ -14,6 +14,8 @@ begin_test "env with no remote"
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)
@@ -35,6 +37,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)
@@ -62,6 +66,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)
@@ -87,6 +93,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)
@@ -115,6 +123,8 @@ LocalWorkingDir=$TRASHDIR/$reponame
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)
@@ -145,6 +155,42 @@ LocalWorkingDir=$TRASHDIR/$reponame
 LocalGitDir=$TRASHDIR/$reponame/.git
 LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
 TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=3
+BatchTransfer=false
+$(env | grep "^GIT")
+")
+  actual=$(git lfs env)
+  [ "$expected" = "$actual" ]
+
+  cd .git
+
+  [ "$expected" = "$actual" ]
+)
+end_test
+
+begin_test "env with multiple remotes and lfs url and batch configs"
+(
+  set -e
+  reponame="env-multiple-remotes-lfs-batch-configs"
+  mkdir $reponame
+  cd $reponame
+  git init
+  git remote add origin "$GITSERVER/env-origin-remote"
+  git remote add other "$GITSERVER/env-other-remote"
+  git config lfs.url "http://foo/bar"
+  git config lfs.batch true
+  git config lfs.concurrenttransfers 5
+  git config remote.origin.lfsurl "http://custom/origin"
+  git config remote.other.lfsurl "http://custom/other"
+
+  expected=$(printf "Endpoint=http://foo/bar
+Endpoint (other)=http://custom/other
+LocalWorkingDir=$TRASHDIR/$reponame
+LocalGitDir=$TRASHDIR/$reponame/.git
+LocalMediaDir=$TRASHDIR/$reponame/.git/lfs/objects
+TempDir=$TRASHDIR/$reponame/.git/lfs/tmp
+ConcurrentTransfers=5
+BatchTransfer=true
 $(env | grep "^GIT")
 ")
   actual=$(git lfs env)

From e3173e282dd1867374e424b28904e34e7df08a23 Mon Sep 17 00:00:00 2001
From: Rick Olson
Date: Wed, 27 May 2015 14:12:18 -0600
Subject: [PATCH 32/36] fix batch tests

$GITLFS is no longer needed, just 'git lfs'

grep and string comparison improvements from test-happy-path.sh

---
 test/test-batch-transfer.sh | 27 ++++++++++-----------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git a/test/test-batch-transfer.sh b/test/test-batch-transfer.sh
index 9fced1e0..42ecc3c3 100755
--- a/test/test-batch-transfer.sh
+++ b/test/test-batch-transfer.sh
@@ -23,26 +23,22 @@ begin_test "batch transfer"
   clone_repo "$reponame" repo
 
   # This executes Git LFS from the local repo that was just cloned.
-  out=$($GITLFS track "*.dat" 2>&1)
-  echo "$out" | grep "Tracking \*.dat"
+  git lfs track "*.dat" 2>&1 | tee track.log
+  grep "Tracking \*.dat" track.log
 
-  contents=$(printf "a")
+  contents="a"
   contents_oid=$(printf "$contents" | shasum -a 256 | cut -f 1 -d " ")
 
   printf "$contents" > a.dat
   git add a.dat
   git add .gitattributes
-  out=$(git commit -m "add a.dat" 2>&1)
-  echo "$out" | grep "master (root-commit)"
-  echo "$out" | grep "2 files changed"
-  echo "$out" | grep "create mode 100644 a.dat"
-  echo "$out" | grep "create mode 100644 .gitattributes"
-
-  out=$(cat a.dat)
-  if [ "$out" != "a" ]; then
-    exit 1
-  fi
+  git commit -m "add a.dat" 2>&1 | tee commit.log
+  grep "master (root-commit)" commit.log
+  grep "2 files changed" commit.log
+  grep "create mode 100644 a.dat" commit.log
+  grep "create mode 100644 .gitattributes" commit.log
 
+  [ "a" = "$(cat a.dat)" ]
 
   # This is a small shell function that runs several git commands together.
   assert_pointer "master" "a.dat" "$contents_oid" 1

From 71ebdfdbffa4ae79a588b948c05004d2bbe6a1cd Mon Sep 17 00:00:00 2001
From: Rick Olson
Date: Wed, 27 May 2015 14:18:02 -0600
Subject: [PATCH 33/36] fix push and pre-push tests

---
 test/test-pre-push.sh | 50 +++++++++++++------------------------------
 test/test-push.sh     | 16 ++++----------
 2 files changed, 19 insertions(+), 47 deletions(-)

diff --git a/test/test-pre-push.sh b/test/test-pre-push.sh
index 1e250ed5..42bfbfe4 100755
--- a/test/test-pre-push.sh
+++ b/test/test-pre-push.sh
@@ -16,11 +16,8 @@ begin_test "pre-push"
 
   echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
     git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
-    tee push.log |
-    grep "(0 of 0 files) 0 B 0" || {
-    cat push.log
-    exit 1
-  }
+    tee push.log
+  grep "(0 of 0 files) 0 B 0" push.log
 
   git lfs track "*.dat"
   echo "hi" > hi.dat
@@ -33,31 +30,23 @@ begin_test "pre-push"
   curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \
     -u "user:pass" \
     -H "Accept: application/vnd.git-lfs+json" 2>&1 |
-    tee http.log |
-    grep "404 Not Found" || {
-    cat http.log
-    exit 1
-  }
+    tee http.log
+
+  grep "404 Not Found" http.log
 
   # push file to the git lfs server
   echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
    git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
-    tee push.log |
-    grep "(1 of 1 files) 3 B / 3 B 100.00 %" || {
-    cat push.log
-    exit 1
-  }
+    tee push.log
+  grep "(1 of 1 files)" push.log
 
  # now the file exists
  curl -v "$GITSERVER/$reponame.git/info/lfs/objects/98ea6e4f216f2fb4b69fff9b3a44842c38686ca685f3f55dc48c5d3fb1107be4" \
    -u "user:pass" \
    -o lfs.json \
    -H "Accept: application/vnd.git-lfs+json" 2>&1 |
application/vnd.git-lfs+json" 2>&1 | - tee http.log | - grep "200 OK" || { - cat http.log - exit 1 - } + tee http.log + grep "200 OK" http.log grep "download" lfs.json || { cat lfs.json @@ -95,28 +84,19 @@ begin_test "pre-push dry-run" curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \ -u "user:pass" \ -H "Accept: application/vnd.git-lfs+json" 2>&1 | - tee http.log | - grep "404 Not Found" || { - cat http.log - exit 1 - } + tee http.log + grep "404 Not Found" http.log echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" | git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 | - tee push.log | - grep "push hi.dat" || { - cat push.log - exit 1 - } + tee push.log + grep "push hi.dat" push.log # file still doesn't exist curl -v "$GITSERVER/$reponame.git/info/lfs/objects/2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b" \ -u "user:pass" \ -H "Accept: application/vnd.git-lfs+json" 2>&1 | - tee http.log | - grep "404 Not Found" || { - cat http.log - exit 1 - } + tee http.log + grep "404 Not Found" http.log ) end_test diff --git a/test/test-push.sh b/test/test-push.sh index fbfc3936..a63843c3 100755 --- a/test/test-push.sh +++ b/test/test-push.sh @@ -15,24 +15,16 @@ begin_test "push" git add .gitattributes a.dat git commit -m "add a.dat" - git lfs push origin master 2>&1 | - tee push.log | - grep "(1 of 1 files) 7 B / 7 B 100.00 %" || { - cat push.log - exit 1 - } + git lfs push origin master 2>&1 | tee push.log + grep "(1 of 1 files)" push.log git checkout -b push-b echo "push b" > b.dat git add b.dat git commit -m "add b.dat" - git lfs push origin push-b 2>&1 | - tee push.log | - grep "(2 of 2 files) 14 B / 14 B 100.00 %" || { - cat push.log - exit 1 - } + git lfs push origin push-b 2>&1 | tee push.log + grep "(2 of 2 files)" push.log ) end_test From efdcb06b856b248c99c9374b835f9331ff3ba97b Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 27 May 2015 16:56:03 -0400 Subject: [PATCH 34/36] Remove unused Upload() function, fix upload tests to use new functions --- lfs/client.go | 98 ----------------------------------- lfs/upload_test.go | 124 +++++++++++++++++++++++++++++++++------------ 2 files changed, 92 insertions(+), 130 deletions(-) diff --git a/lfs/client.go b/lfs/client.go index 494c6125..e7ab7454 100644 --- a/lfs/client.go +++ b/lfs/client.go @@ -207,104 +207,6 @@ func Batch(objects []*objectResource) ([]*objectResource, *WrappedError) { return objs, nil } -func Upload(oidPath, filename string, cb CopyCallback) *WrappedError { - oid := filepath.Base(oidPath) - file, err := os.Open(oidPath) - if err != nil { - return Error(err) - } - defer file.Close() - - stat, err := file.Stat() - if err != nil { - return Error(err) - } - - reqObj := &objectResource{ - Oid: oid, - Size: stat.Size(), - } - - by, err := json.Marshal(reqObj) - if err != nil { - return Error(err) - } - - req, creds, err := newApiRequest("POST", oid) - if err != nil { - return Error(err) - } - - req.Header.Set("Content-Type", mediaType) - req.Header.Set("Content-Length", strconv.Itoa(len(by))) - req.ContentLength = int64(len(by)) - req.Body = &byteCloser{bytes.NewReader(by)} - - tracerx.Printf("api: uploading %s (%s)", filename, oid) - res, obj, wErr := doApiRequest(req, creds) - if wErr != nil { - sendApiEvent(apiEventFail) - return wErr - } - - sendApiEvent(apiEventSuccess) - - reader := &CallbackReader{ - C: cb, - TotalSize: reqObj.Size, - Reader: file, - } - - if res.StatusCode == 200 { - // 
-		io.Copy(ioutil.Discard, reader)
-		return nil
-	}
-
-	req, creds, err = obj.NewRequest("upload", "PUT")
-	if err != nil {
-		return Error(err)
-	}
-
-	if len(req.Header.Get("Content-Type")) == 0 {
-		req.Header.Set("Content-Type", "application/octet-stream")
-	}
-	req.Header.Set("Content-Length", strconv.FormatInt(reqObj.Size, 10))
-	req.ContentLength = reqObj.Size
-
-	req.Body = ioutil.NopCloser(reader)
-
-	res, wErr = doHttpRequest(req, creds)
-	if wErr != nil {
-		return wErr
-	}
-
-	if res.StatusCode > 299 {
-		return Errorf(nil, "Invalid status for %s %s: %d", req.Method, req.URL, res.StatusCode)
-	}
-
-	io.Copy(ioutil.Discard, res.Body)
-	res.Body.Close()
-
-	req, creds, err = obj.NewRequest("verify", "POST")
-	if err == objectRelationDoesNotExist {
-		return nil
-	} else if err != nil {
-		return Error(err)
-	}
-
-	req.Header.Set("Content-Type", mediaType)
-	req.Header.Set("Content-Length", strconv.Itoa(len(by)))
-	req.ContentLength = int64(len(by))
-	req.Body = ioutil.NopCloser(bytes.NewReader(by))
-	res, wErr = doHttpRequest(req, creds)
-
-	io.Copy(ioutil.Discard, res.Body)
-	res.Body.Close()
-
-	return wErr
-}
-
 func UploadCheck(oidPath string) (*objectResource, *WrappedError) {
 	oid := filepath.Base(oidPath)
 
diff --git a/lfs/upload_test.go b/lfs/upload_test.go
index b048f0d1..3bcc7031 100644
--- a/lfs/upload_test.go
+++ b/lfs/upload_test.go
@@ -9,7 +9,6 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"path/filepath"
 	"strconv"
 	"testing"
 )
@@ -18,6 +17,11 @@ func TestExistingUpload(t *testing.T) {
 	mux := http.NewServeMux()
 	server := httptest.NewServer(mux)
 	tmp := tempdir(t)
+	olddir := LocalMediaDir
+	LocalMediaDir = tmp
+	defer func() {
+		LocalMediaDir = olddir
+	}()
 	defer server.Close()
 	defer os.RemoveAll(tmp)
 
@@ -51,7 +55,7 @@ func TestExistingUpload(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if reqObj.Oid != "oid" {
+		if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
 			t.Errorf("invalid oid from request: %s", reqObj.Oid)
 		}
 
@@ -60,6 +64,8 @@ func TestExistingUpload(t *testing.T) {
 		}
 
 		obj := &objectResource{
+			Oid:  reqObj.Oid,
+			Size: reqObj.Size,
 			Links: map[string]*linkRelation{
 				"upload": &linkRelation{
 					Href: server.URL + "/upload",
@@ -99,22 +105,18 @@ func TestExistingUpload(t *testing.T) {
 
 	Config.SetConfig("lfs.url", server.URL+"/media")
 
-	oidPath := filepath.Join(tmp, "oid")
:= filepath.Join(tmp, "oid") + oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11") if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { t.Fatal(err) } - wErr := Upload(oidPath, "", nil) + obj, wErr := UploadCheck(oidPath) if wErr != nil { t.Fatal(wErr) } + + if obj != nil { + t.Fatal("Received an object") + } } func TestSuccessfulUploadWithVerify(t *testing.T) { mux := http.NewServeMux() server := httptest.NewServer(mux) tmp := tempdir(t) + olddir := LocalMediaDir + LocalMediaDir = tmp + defer func() { + LocalMediaDir = olddir + }() defer server.Close() defer os.RemoveAll(tmp) @@ -269,7 +285,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) { t.Fatal(err) } - if reqObj.Oid != "oid" { + if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" { t.Errorf("invalid oid from request: %s", reqObj.Oid) } @@ -278,6 +294,8 @@ func TestSuccessfulUploadWithVerify(t *testing.T) { } obj := &objectResource{ + Oid: reqObj.Oid, + Size: reqObj.Size, Links: map[string]*linkRelation{ "upload": &linkRelation{ Href: server.URL + "/upload", @@ -369,7 +387,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) { t.Fatal(err) } - if reqObj.Oid != "oid" { + if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" { t.Errorf("invalid oid from request: %s", reqObj.Oid) } @@ -383,7 +401,7 @@ func TestSuccessfulUploadWithVerify(t *testing.T) { Config.SetConfig("lfs.url", server.URL+"/media") - oidPath := filepath.Join(tmp, "oid") + oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11") if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { t.Fatal(err) } @@ -395,7 +413,11 @@ func TestSuccessfulUploadWithVerify(t *testing.T) { return nil } - wErr := Upload(oidPath, "", cb) + obj, wErr := UploadCheck(oidPath) + if wErr != nil { + t.Fatal(wErr) + } + wErr = UploadObject(obj, cb) if wErr != nil { t.Fatal(wErr) } @@ -428,6 +450,11 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) { mux := http.NewServeMux() server := httptest.NewServer(mux) tmp := tempdir(t) + olddir := LocalMediaDir + LocalMediaDir = tmp + defer func() { + LocalMediaDir = olddir + }() defer server.Close() defer os.RemoveAll(tmp) @@ -460,7 +487,7 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) { t.Fatal(err) } - if reqObj.Oid != "oid" { + if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" { t.Errorf("invalid oid from request: %s", reqObj.Oid) } @@ -469,6 +496,8 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) { } obj := &objectResource{ + Oid: reqObj.Oid, + Size: reqObj.Size, Links: map[string]*linkRelation{ "upload": &linkRelation{ Href: server.URL + "/upload", @@ -532,12 +561,16 @@ func TestSuccessfulUploadWithoutVerify(t *testing.T) { Config.SetConfig("lfs.url", server.URL+"/media") - oidPath := filepath.Join(tmp, "oid") + oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11") if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil { t.Fatal(err) } - wErr := Upload(oidPath, "", nil) + obj, wErr := UploadCheck(oidPath) + if wErr != nil { + t.Fatal(wErr) + } + wErr = UploadObject(obj, nil) if wErr != nil { t.Fatal(wErr) } @@ -555,6 +588,11 @@ func TestUploadApiError(t *testing.T) { mux := http.NewServeMux() server := httptest.NewServer(mux) tmp := tempdir(t) + olddir := LocalMediaDir + LocalMediaDir = olddir + defer func() { + LocalMediaDir = olddir + }() defer server.Close() defer os.RemoveAll(tmp) @@ -567,14 +605,14 @@ func TestUploadApiError(t *testing.T) { Config.SetConfig("lfs.url", server.URL+"/media") - 
+ oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
 if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
 t.Fatal(err)
 }

- wErr := Upload(oidPath, "", nil)
+ _, wErr := UploadCheck(oidPath)
 if wErr == nil {
- t.Fatal("no error?")
+ t.Fatal("Expected an error")
 }

 if wErr.Panic {
@@ -594,6 +632,11 @@ func TestUploadStorageError(t *testing.T) {
 mux := http.NewServeMux()
 server := httptest.NewServer(mux)
 tmp := tempdir(t)
+ olddir := LocalMediaDir
+ LocalMediaDir = tmp
+ defer func() {
+ LocalMediaDir = olddir
+ }()
 defer server.Close()
 defer os.RemoveAll(tmp)

@@ -626,7 +669,7 @@ func TestUploadStorageError(t *testing.T) {
 t.Fatal(err)
 }

- if reqObj.Oid != "oid" {
+ if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
 t.Errorf("invalid oid from request: %s", reqObj.Oid)
 }

@@ -635,6 +678,8 @@ func TestUploadStorageError(t *testing.T) {
 }

 obj := &objectResource{
+ Oid: reqObj.Oid,
+ Size: reqObj.Size,
 Links: map[string]*linkRelation{
 "upload": &linkRelation{
 Href: server.URL + "/upload",
@@ -667,14 +712,18 @@ func TestUploadStorageError(t *testing.T) {

 Config.SetConfig("lfs.url", server.URL+"/media")

- oidPath := filepath.Join(tmp, "oid")
+ oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
 if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
 t.Fatal(err)
 }

- wErr := Upload(oidPath, "", nil)
+ obj, wErr := UploadCheck(oidPath)
+ if wErr != nil {
+ t.Fatal(wErr)
+ }
+ wErr = UploadObject(obj, nil)
 if wErr == nil {
- t.Fatal("no error?")
+ t.Fatal("Expected an error")
 }

 if wErr.Panic {
@@ -698,6 +747,11 @@ func TestUploadVerifyError(t *testing.T) {
 mux := http.NewServeMux()
 server := httptest.NewServer(mux)
 tmp := tempdir(t)
+ olddir := LocalMediaDir
+ LocalMediaDir = tmp
+ defer func() {
+ LocalMediaDir = olddir
+ }()
 defer server.Close()
 defer os.RemoveAll(tmp)

@@ -731,7 +785,7 @@ func TestUploadVerifyError(t *testing.T) {
 t.Fatal(err)
 }

- if reqObj.Oid != "oid" {
+ if reqObj.Oid != "988881adc9fc3655077dc2d4d757d480b5ea0e11" {
 t.Errorf("invalid oid from request: %s", reqObj.Oid)
 }

@@ -740,6 +794,8 @@ func TestUploadVerifyError(t *testing.T) {
 }

 obj := &objectResource{
+ Oid: reqObj.Oid,
+ Size: reqObj.Size,
 Links: map[string]*linkRelation{
 "upload": &linkRelation{
 Href: server.URL + "/upload",
@@ -804,14 +860,18 @@ func TestUploadVerifyError(t *testing.T) {

 Config.SetConfig("lfs.url", server.URL+"/media")

- oidPath := filepath.Join(tmp, "oid")
+ oidPath, _ := LocalMediaPath("988881adc9fc3655077dc2d4d757d480b5ea0e11")
 if err := ioutil.WriteFile(oidPath, []byte("test"), 0744); err != nil {
 t.Fatal(err)
 }

- wErr := Upload(oidPath, "", nil)
+ obj, wErr := UploadCheck(oidPath)
+ if wErr != nil {
+ t.Fatal(wErr)
+ }
+ wErr = UploadObject(obj, nil)
 if wErr == nil {
- t.Fatal("no error?")
+ t.Fatal("Expected an error")
 }

 if wErr.Panic {

From 2fbe6eac3fe5b2f7b77ac7a1fbf26eeb933e9613 Mon Sep 17 00:00:00 2001
From: Rick Olson
Date: Wed, 27 May 2015 15:09:53 -0600
Subject: [PATCH 35/36] update test to use tee

---
 test/test-batch-transfer.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/test-batch-transfer.sh b/test/test-batch-transfer.sh
index 42ecc3c3..42fc6bea 100755
--- a/test/test-batch-transfer.sh
+++ b/test/test-batch-transfer.sh
@@ -49,9 +49,9 @@ begin_test "batch transfer"
 git config --add --local lfs.batch true

 # This pushes to the remote repository set up at the top of the test.
- out=$(git push origin master 2>&1)
- echo "$out" | grep "(1 of 1 files)"
- echo "$out" | grep "master -> master"
+ git push origin master 2>&1 | tee push.log
+ grep "(1 of 1 files)" push.log
+ grep "master -> master" push.log

 assert_server_object "$contents_oid" "$contents"

From 7df39996faca1bd5a0021cd5a462331648b0fd8a Mon Sep 17 00:00:00 2001
From: risk danger olson
Date: Thu, 28 May 2015 10:43:05 -0600
Subject: [PATCH 36/36] mention in the API spec that the batch api is still experimental

---
 docs/api.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/api.md b/docs/api.md
index 4a31c708..32ab4be5 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -271,6 +271,10 @@ only appears on a 200 status.

 This request retrieves the metadata for a batch of objects, given a JSON body
 containing an object with an array of objects with the oid and size of each object.

+NOTE: This is an experimental API that is subject to change. It will ship disabled
+by default in Git LFS v0.5.2. If your Git LFS server supports it, you can enable
+it with `git config lfs.batch true`.
+
 ```
 > POST https://git-lfs-server.com/objects/batch HTTP/1.1
 > Accept: application/vnd.git-lfs+json