Run update-index as a single background process

Reorganize the transfer queue to provide a channel that reports each
object's OID as its transfer finishes. This can be used in the `get`
command to feed a goroutine that copies each file to the working
directory and informs the update-index process about it as the transfers
finish. This greatly reduces the time spent updating the index after a get.
This commit is contained in:
rubyist 2015-05-27 15:45:18 -04:00
parent d059f105a3
commit ebc81aedb9
4 changed files with 76 additions and 42 deletions

@ -43,53 +43,69 @@ func getCommand(cmd *cobra.Command, args []string) {
q.Add(lfs.NewDownloadable(p))
}
processQueue := time.Now()
q.Process()
tracerx.PerformanceSince("process queue", processQueue)
target, err := git.ResolveRef(ref)
if err != nil {
Panic(err, "Could not resolve git ref")
}
current, err := git.CurrentRef()
if err != nil {
Panic(err, "Could not get the current git ref")
}
if target == current {
// We just downloaded the files for the current ref, we can copy them into
// the working directory and update the git index
updateWd := time.Now()
for _, pointer := range pointers {
file, err := os.Create(pointer.Name)
// the working directory and update the git index. We're doing this in a
// goroutine so they can be copied as they come in, for efficiency.
watch := q.Watch()
go func() {
files := make(map[string]*lfs.WrappedPointer, len(pointers))
for _, pointer := range pointers {
files[pointer.Oid] = pointer
}
// Fire up the update-index command
cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
stdin, err := cmd.StdinPipe()
if err != nil {
Panic(err, "Could not create working directory file")
Panic(err, "Could not update the index")
}
if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
Panic(err, "Could not write working directory file")
if err := cmd.Start(); err != nil {
Panic(err, "Could not update the index")
}
}
tracerx.PerformanceSince("update working directory", updateWd)
updateIndex := time.Now()
cmd := exec.Command("git", "update-index", "-q", "--refresh", "--stdin")
stdin, err := cmd.StdinPipe()
if err != nil {
Panic(err, "Could not update the index")
}
// As files come in, write them to the wd and update the index
for oid := range watch {
pointer, ok := files[oid]
if !ok {
continue
}
if err := cmd.Start(); err != nil {
Panic(err, "Could not update the index")
}
file, err := os.Create(pointer.Name)
if err != nil {
Panic(err, "Could not create working directory file")
}
for _, pointer := range pointers {
stdin.Write([]byte(pointer.Name + "\n"))
}
stdin.Close()
cmd.Wait()
tracerx.PerformanceSince("update index", updateIndex)
if err := lfs.PointerSmudge(file, pointer.Pointer, pointer.Name, nil); err != nil {
Panic(err, "Could not write working directory file")
}
file.Close()
stdin.Write([]byte(pointer.Name + "\n"))
}
stdin.Close()
if err := cmd.Wait(); err != nil {
Panic(err, "Error updating the git index")
}
}()
processQueue := time.Now()
q.Process()
tracerx.PerformanceSince("process queue", processQueue)
}
}
func init() {

@ -1,11 +1,11 @@
package lfs
type Downloadable struct {
Pointer *wrappedPointer
Pointer *WrappedPointer
object *objectResource
}
func NewDownloadable(p *wrappedPointer) *Downloadable {
func NewDownloadable(p *WrappedPointer) *Downloadable {
return &Downloadable{Pointer: p}
}

@ -26,10 +26,10 @@ const (
chanBufSize = 100
)
// wrappedPointer wraps a pointer.Pointer and provides the git sha1
// WrappedPointer wraps a pointer.Pointer and provides the git sha1
// and the file name associated with the object, taken from the
// rev-list output.
type wrappedPointer struct {
type WrappedPointer struct {
Sha1 string
Name string
SrcName string
@ -49,9 +49,9 @@ type indexFile struct {
var z40 = regexp.MustCompile(`\^?0{40}`)
// ScanRefs takes a ref and returns a slice of wrappedPointer objects
// ScanRefs takes a ref and returns a slice of WrappedPointer objects
// for all Git LFS pointers it finds for that ref.
func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
func ScanRefs(refLeft, refRight string) ([]*WrappedPointer, error) {
nameMap := make(map[string]string, 0)
start := time.Now()
@ -74,7 +74,7 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
return nil, err
}
pointers := make([]*wrappedPointer, 0)
pointers := make([]*WrappedPointer, 0)
for p := range pointerc {
if name, ok := nameMap[p.Sha1]; ok {
p.Name = name
@ -85,9 +85,9 @@ func ScanRefs(refLeft, refRight string) ([]*wrappedPointer, error) {
return pointers, nil
}
// ScanIndex returns a slice of wrappedPointer objects for all
// ScanIndex returns a slice of WrappedPointer objects for all
// Git LFS pointers it finds in the index.
func ScanIndex() ([]*wrappedPointer, error) {
func ScanIndex() ([]*WrappedPointer, error) {
nameMap := make(map[string]*indexFile, 0)
start := time.Now()
@ -132,7 +132,7 @@ func ScanIndex() ([]*wrappedPointer, error) {
return nil, err
}
pointers := make([]*wrappedPointer, 0)
pointers := make([]*WrappedPointer, 0)
for p := range pointerc {
if e, ok := nameMap[p.Sha1]; ok {
p.Name = e.Name
@ -288,13 +288,13 @@ func catFileBatchCheck(revs chan string) (chan string, error) {
// of a git object, given its sha1. The contents will be decoded into
// a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
// will be sent. It returns a channel from which point.Pointers can be read.
func catFileBatch(revs chan string) (chan *wrappedPointer, error) {
func catFileBatch(revs chan string) (chan *WrappedPointer, error) {
cmd, err := startCommand("git", "cat-file", "--batch")
if err != nil {
return nil, err
}
pointers := make(chan *wrappedPointer, chanBufSize)
pointers := make(chan *WrappedPointer, chanBufSize)
go func() {
for {
@ -316,7 +316,7 @@ func catFileBatch(revs chan string) (chan *wrappedPointer, error) {
p, err := DecodePointer(bytes.NewBuffer(nbuf))
if err == nil {
pointers <- &wrappedPointer{
pointers <- &WrappedPointer{
Sha1: string(fields[0]),
Size: p.Size,
Pointer: p,

@ -21,6 +21,7 @@ type Transferable interface {
type TransferQueue struct {
transferc chan Transferable
errorc chan *WrappedError
watchers []chan string
errors []*WrappedError
wg sync.WaitGroup
workers int
@ -39,6 +40,7 @@ func newTransferQueue(workers, files int) *TransferQueue {
return &TransferQueue{
transferc: make(chan Transferable, files),
errorc: make(chan *WrappedError),
watchers: make([]chan string, 0),
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
@ -51,6 +53,14 @@ func (q *TransferQueue) Add(t Transferable) {
q.transferables[t.Oid()] = t
}
// Watch registers a new watcher on the queue and returns its channel. The
// queue sends the OID of every transfer that completes without error to each
// registered channel, and closes the channels once Process has finished. The
// channel is buffered with a capacity of q.files.
func (q *TransferQueue) Watch() chan string {
	watcher := make(chan string, q.files)
	q.watchers = append(q.watchers, watcher)
	return watcher
}
// processIndividual processes the queue of transfers one at a time by making
// a POST call for each object, feeding the results to the transfer workers.
// If configured, the object transfers can still happen concurrently, the
@ -185,6 +195,11 @@ func (q *TransferQueue) Process() {
if err := transfer.Transfer(cb); err != nil {
q.errorc <- err
} else {
oid := transfer.Oid()
for _, c := range q.watchers {
c <- oid
}
}
f := atomic.AddInt64(&q.finished, 1)
@ -202,6 +217,9 @@ func (q *TransferQueue) Process() {
q.wg.Wait()
close(q.errorc)
for _, watcher := range q.watchers {
close(watcher)
}
q.bar.Finish()
}