From ebc81aedb9af1dfb24f492ff93115000c2da18ea Mon Sep 17 00:00:00 2001 From: rubyist Date: Wed, 27 May 2015 15:45:18 -0400 Subject: [PATCH] Run update-index as a single background process Reorganize the transfer queue to provide a channel to watch for object OIDs as they finish. This can be used in the `get` command to feed a goroutine that will copy the file to the working directory and inform the update-index process about it as the transfers finish. This leads to a greatly reduced amount of time spent updating the index after a get. --- commands/command_get.go | 74 +++++++++++++++++++++++++---------------- lfs/download_queue.go | 4 +-- lfs/scanner.go | 22 ++++++------ lfs/transfer_queue.go | 18 ++++++++++ 4 files changed, 76 insertions(+), 42 deletions(-) diff --git a/commands/command_get.go b/commands/command_get.go index 47818514..2cb8ab1c 100644 --- a/commands/command_get.go +++ b/commands/command_get.go @@ -43,53 +43,69 @@ func getCommand(cmd *cobra.Command, args []string) { q.Add(lfs.NewDownloadable(p)) } - processQueue := time.Now() - q.Process() - tracerx.PerformanceSince("process queue", processQueue) - target, err := git.ResolveRef(ref) if err != nil { + Panic(err, "Could not resolve git ref") } current, err := git.CurrentRef() if err != nil { + Panic(err, "Could not get the current git ref") } if target == current { // We just downloaded the files for the current ref, we can copy them into - // the working directory and update the git index - updateWd := time.Now() - for _, pointer := range pointers { - file, err := os.Create(pointer.Name) + // the working directory and update the git index. We're doing this in a + // goroutine so they can be copied as they come in, for efficiency. + watch := q.Watch() + + go func() { + files := make(map[string]*lfs.WrappedPointer, len(pointers)) + for _, pointer := range pointers { + files[pointer.Oid] = pointer + } + + // Fire up the update-index command + cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin") + stdin, err := cmd.StdinPipe() if err != nil { - Panic(err, "Could not create working directory file") + Panic(err, "Could not update the index") } - if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil { - Panic(err, "Could not write working directory file") + if err := cmd.Start(); err != nil { + Panic(err, "Could not update the index") } - } - tracerx.PerformanceSince("update working directory", updateWd) - updateIndex := time.Now() - cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin") - stdin, err := cmd.StdinPipe() - if err != nil { - Panic(err, "Could not update the index") - } + // As files come in, write them to the wd and update the index + for oid := range watch { + pointer, ok := files[oid] + if !ok { + continue + } - if err := cmd.Start(); err != nil { - Panic(err, "Could not update the index") - } + file, err := os.Create(pointer.Name) + if err != nil { + Panic(err, "Could not create working directory file") + } - for _, pointer := range pointers { - stdin.Write([]byte(pointer.Name + "\n")) - } - stdin.Close() - cmd.Wait() - tracerx.PerformanceSince("update index", updateIndex) + if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil { + Panic(err, "Could not write working directory file") + } + file.Close() + + stdin.Write([]byte(pointer.Name + "\n")) + } + + stdin.Close() + if err := cmd.Wait(); err != nil { + Panic(err, "Error updating the git index") + } + }() + + processQueue := time.Now() + q.Process() + tracerx.PerformanceSince("process queue", processQueue) } - } func init() { diff --git a/lfs/download_queue.go b/lfs/download_queue.go index 4822d4ea..c0ba4ae2 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -1,11 +1,11 @@ package lfs type Downloadable struct { - Pointer *wrappedPointer + Pointer *WrappedPointer object *objectResource } -func NewDownloadable(p *wrappedPointer) *Downloadable { +func NewDownloadable(p *WrappedPointer) *Downloadable { return &Downloadable{Pointer: p} } diff --git a/lfs/scanner.go b/lfs/scanner.go index 57c3d7d1..0478e45d 100644 --- a/lfs/scanner.go +++ b/lfs/scanner.go @@ -26,10 +26,10 @@ const ( chanBufSize = 100 ) -// wrappedPointer wraps a pointer.Pointer and provides the git sha1 +// WrappedPointer wraps a pointer.Pointer and provides the git sha1 // and the file name associated with the object, taken from the // rev-list output. -type wrappedPointer struct { +type WrappedPointer struct { Sha1 string Name string SrcName string @@ -49,9 +49,9 @@ type indexFile struct { var z40 = regexp.MustCompile(`\^?0{40}`) -// ScanRefs takes a ref and returns a slice of wrappedPointer objects +// ScanRefs takes a ref and returns a slice of WrappedPointer objects // for all Git LFS pointers it finds for that ref. -func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) { +func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) { nameMap := make(map[string]string, 0) start := time.Now() @@ -74,7 +74,7 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) { return nil, err } - pointers := make([]*wrappedPointer, 0) + pointers := make([]*WrappedPointer, 0) for p := range pointerc { if name, ok := nameMap[p.Sha1]; ok { p.Name = name @@ -85,9 +85,9 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) { return pointers, nil } -// ScanIndex returns a slice of wrappedPointer objects for all +// ScanIndex returns a slice of WrappedPointer objects for all // Git LFS pointers it finds in the index. -func ScanIndex() ([]*wrappedPointer, error) { +func ScanIndex() ([]*WrappedPointer, error) { nameMap := make(map[string]*indexFile, 0) start := time.Now() @@ -132,7 +132,7 @@ func ScanIndex() ([]*wrappedPointer, error) { return nil, err } - pointers := make([]*wrappedPointer, 0) + pointers := make([]*WrappedPointer, 0) for p := range pointerc { if e, ok := nameMap[p.Sha1]; ok { p.Name = e.Name @@ -288,13 +288,13 @@ func catFileBatchCheck(revs chan string) (chan string, error) { // of a git object, given its sha1. The contents will be decoded into // a Git LFS pointer. revs is a channel over which strings containing Git SHA1s // will be sent. It returns a channel from which point.Pointers can be read. -func catFileBatch(revs chan string) (chan *wrappedPointer, error) { +func catFileBatch(revs chan string) (chan *WrappedPointer, error) { cmd, err := startCommand("git", "cat-file", "--batch") if err != nil { return nil, err } - pointers := make(chan *wrappedPointer, chanBufSize) + pointers := make(chan *WrappedPointer, chanBufSize) go func() { for { @@ -316,7 +316,7 @@ func catFileBatch(revs chan string) (chan *wrappedPointer, error) { p, err := DecodePointer(bytes.NewBuffer(nbuf)) if err == nil { - pointers <- &wrappedPointer{ + pointers <- &WrappedPointer{ Sha1: string(fields[0]), Size: p.Size, Pointer: p, diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go index 84839028..477a598b 100644 --- a/lfs/transfer_queue.go +++ b/lfs/transfer_queue.go @@ -21,6 +21,7 @@ type Transferable interface { type TransferQueue struct { transferc chan Transferable errorc chan *WrappedError + watchers []chan string errors []*WrappedError wg sync.WaitGroup workers int @@ -39,6 +40,7 @@ func newTransferQueue(workers, files int) *TransferQueue { return &TransferQueue{ transferc: make(chan Transferable, files), errorc: make(chan *WrappedError), + watchers: make([]chan string, 0), workers: workers, files: files, authCond: sync.NewCond(&sync.Mutex{}), @@ -51,6 +53,14 @@ func (q *TransferQueue) Add(t Transferable) { q.transferables[t.Oid()] = t } +// Watch returns a channel where the queue will write the OID of each transfer +// as it completes. The channel will be closed when the queue finishes processing. +func (q *TransferQueue) Watch() chan string { + c := make(chan string, q.files) + q.watchers = append(q.watchers, c) + return c +} + // processIndividual processes the queue of transfers one at a time by making // a POST call for each object, feeding the results to the transfer workers. // If configured, the object transfers can still happen concurrently, the @@ -185,6 +195,11 @@ func (q *TransferQueue) Process() { if err := transfer.Transfer(cb); err != nil { q.errorc <- err + } else { + oid := transfer.Oid() + for _, c := range q.watchers { + c <- oid + } } f := atomic.AddInt64(&q.finished, 1) @@ -202,6 +217,9 @@ func (q *TransferQueue) Process() { q.wg.Wait() close(q.errorc) + for _, watcher := range q.watchers { + close(watcher) + } q.bar.Finish() }