Progress meter needs to be more dynamic for batches of n

This removes the `pb` based progress bar for the transfer queue and adds
a simpler custom bar. pb really wants to know total counts and byte
sizes up front but when doing batches of n we would prefer to not know
that. This is due to the fact that the batcher will block until it is
drained instead of processing all pointers and storing them in memory
while transfer operations run. This also clarifies when files are
skipped because the server already has them. Progress output here
contains the number of finished transfers/bytes, the number of
transfers/bytes currently in the queue, and the number of files skipped,
if any.
This commit is contained in:
rubyist 2015-07-27 12:43:41 -04:00
parent 1c4b0aafd3
commit ab5b35a8c0
7 changed files with 125 additions and 71 deletions

@ -40,15 +40,10 @@ func fetchCommand(cmd *cobra.Command, args []string) {
q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
size := int64(0)
for _, p := range pointers { for _, p := range pointers {
size += p.Size
q.Add(lfs.NewDownloadable(p)) q.Add(lfs.NewDownloadable(p))
} }
pb := lfs.NewProgressMeter(len(pointers), size)
q.Monitor(pb)
target, err := git.ResolveRef(ref) target, err := git.ResolveRef(ref)
if err != nil { if err != nil {
Panic(err, "Could not resolve git ref") Panic(err, "Could not resolve git ref")

@ -1,6 +1,7 @@
package commands package commands
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
@ -71,16 +72,19 @@ func prePushCommand(cmd *cobra.Command, args []string) {
Panic(err, "Error scanning for Git LFS files") Panic(err, "Error scanning for Git LFS files")
} }
if len(pointers) > 0 && !prePushDryRun {
fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(pointers))
}
uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
size := int64(0)
for _, pointer := range pointers { for _, pointer := range pointers {
if prePushDryRun { if prePushDryRun {
Print("push %s", pointer.Name) Print("push %s", pointer.Name)
uploadQueue.SuppressProgress()
continue continue
} }
size += pointer.Size
u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name) u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name)
if wErr != nil { if wErr != nil {
if cleanPointerErr, ok := wErr.Err.(*lfs.CleanedPointerError); ok { if cleanPointerErr, ok := wErr.Err.(*lfs.CleanedPointerError); ok {
@ -97,9 +101,6 @@ func prePushCommand(cmd *cobra.Command, args []string) {
} }
if !prePushDryRun { if !prePushDryRun {
pb := lfs.NewProgressMeter(len(pointers), size)
uploadQueue.Monitor(pb)
uploadQueue.Wait() uploadQueue.Wait()
for _, err := range uploadQueue.Errors() { for _, err := range uploadQueue.Errors() {
if Debugging || err.Panic { if Debugging || err.Panic {

@ -1,6 +1,7 @@
package commands package commands
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
@ -32,16 +33,17 @@ func uploadsBetweenRefs(left string, right string) *lfs.TransferQueue {
Panic(err, "Error scanning for Git LFS files") Panic(err, "Error scanning for Git LFS files")
} }
if len(pointers) > 0 && !pushDryRun {
fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(pointers))
}
uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
size := int64(0)
for i, pointer := range pointers { for i, pointer := range pointers {
if pushDryRun { if pushDryRun {
Print("push %s", pointer.Name) Print("push %s", pointer.Name)
continue continue
} }
size += pointer.Size
tracerx.Printf("prepare upload: %s %s %d/%d", pointer.Oid, pointer.Name, i+1, len(pointers)) tracerx.Printf("prepare upload: %s %s %d/%d", pointer.Oid, pointer.Name, i+1, len(pointers))
u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name) u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name)
@ -55,17 +57,15 @@ func uploadsBetweenRefs(left string, right string) *lfs.TransferQueue {
uploadQueue.Add(u) uploadQueue.Add(u)
} }
pb := lfs.NewProgressMeter(len(pointers), size)
uploadQueue.Monitor(pb)
return uploadQueue return uploadQueue
} }
func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue { func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue {
if len(oids) > 0 && !pushDryRun {
fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(oids))
}
uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(oids)) uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(oids))
size := int64(0)
for i, oid := range oids { for i, oid := range oids {
if pushDryRun { if pushDryRun {
Print("push object ID %s", oid) Print("push object ID %s", oid)
@ -81,13 +81,9 @@ func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue {
Exit(wErr.Error()) Exit(wErr.Error())
} }
} }
size += u.Size()
uploadQueue.Add(u) uploadQueue.Add(u)
} }
pb := lfs.NewProgressMeter(len(oids), size)
uploadQueue.Monitor(pb)
return uploadQueue return uploadQueue
} }

@ -4,18 +4,25 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync/atomic" "sync/atomic"
"time"
"github.com/github/git-lfs/vendor/_nuts/github.com/cheggaaa/pb" "github.com/github/git-lfs/vendor/_nuts/github.com/olekukonko/ts"
) )
type ProgressMeter struct { type ProgressMeter struct {
totalBytes int64 transferringFiles int64
startedFiles int64 finishedFiles int64
totalFiles int totalFiles int64
bar *pb.ProgressBar skippedFiles int64
logger *progressLogger totalBytes int64
fileIndex map[string]int64 // Maps a file name to its transfer number currentBytes int64
startTime time.Time
finished chan interface{}
logger *progressLogger
fileIndex map[string]int64 // Maps a file name to its transfer number
show bool
} }
type progressEvent int type progressEvent int
@ -26,45 +33,57 @@ const (
transferFinish transferFinish
) )
func NewProgressMeter(files int, bytes int64) *ProgressMeter { func NewProgressMeter() *ProgressMeter {
bar := pb.New64(bytes)
bar.SetUnits(pb.U_BYTES)
bar.ShowBar = false
bar.Prefix(fmt.Sprintf("(0 of %d files) ", files))
bar.Start()
logger, err := newProgressLogger() logger, err := newProgressLogger()
if err != nil { if err != nil {
// TODO display an error fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err)
} }
return &ProgressMeter{ pm := &ProgressMeter{
totalBytes: bytes, logger: logger,
totalFiles: files, startTime: time.Now(),
bar: bar, fileIndex: make(map[string]int64),
logger: logger, finished: make(chan interface{}),
fileIndex: make(map[string]int64), show: true,
} }
go pm.writer()
return pm
}
// Add records that a transfer for name has been queued: it grows the
// meter's running byte total by size and assigns the file a 1-based
// transfer number, remembered in fileIndex.
//
// NOTE(review): fileIndex is a plain map that is written here and
// deleted from elsewhere; if Add and the delete can run on different
// goroutines this is a data race — confirm callers serialize access or
// guard the map with a mutex.
func (p *ProgressMeter) Add(name string, size int64) {
	atomic.AddInt64(&p.totalBytes, size)
	idx := atomic.AddInt64(&p.transferringFiles, 1)
	p.fileIndex[name] = idx
}
// Skip counts a file that will not be transferred because the server
// already has it; the meter appends a "Skipped: n" note to its output
// when this count is nonzero.
func (p *ProgressMeter) Skip() {
	atomic.AddInt64(&p.skippedFiles, 1)
}
func (p *ProgressMeter) Log(event progressEvent, direction, name string, read, total int64, current int) { func (p *ProgressMeter) Log(event progressEvent, direction, name string, read, total int64, current int) {
switch event { switch event {
case transferStart:
idx := atomic.AddInt64(&p.startedFiles, 1)
p.fileIndex[name] = idx
case transferBytes: case transferBytes:
p.bar.Add(current) atomic.AddInt64(&p.currentBytes, int64(current))
p.logBytes(direction, name, read, total) p.logBytes(direction, name, read, total)
case transferFinish: case transferFinish:
atomic.AddInt64(&p.finishedFiles, 1)
delete(p.fileIndex, name) delete(p.fileIndex, name)
} }
p.bar.Prefix(fmt.Sprintf("(%d of %d files) ", p.startedFiles, p.totalFiles))
} }
func (p *ProgressMeter) Finish() { func (p *ProgressMeter) Finish() {
p.bar.Finish() close(p.finished)
p.update()
p.logger.Close() p.logger.Close()
if p.show {
fmt.Fprintf(os.Stdout, "\n")
}
}
func (p *ProgressMeter) Suppress() {
p.show = false
} }
func (p *ProgressMeter) logBytes(direction, name string, read, total int64) { func (p *ProgressMeter) logBytes(direction, name string, read, total int64) {
@ -75,6 +94,43 @@ func (p *ProgressMeter) logBytes(direction, name string, read, total int64) {
} }
} }
// writer repaints the progress line immediately and then every 200ms
// until Finish closes p.finished. Run it on its own goroutine.
//
// Uses a single reusable Ticker instead of calling time.After on every
// loop iteration, which would allocate a fresh timer per tick.
func (p *ProgressMeter) writer() {
	p.update()

	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-p.finished:
			return
		case <-ticker.C:
			p.update()
		}
	}
}
// update repaints the single-line progress display on stdout:
// "(finished of queued files), currentBytes/totalBytes[, Skipped: n]".
// It is a no-op after Suppress() has been called.
//
// Fixes relative to the previous revision:
//   - the file counts were printed in the wrong order, showing
//     "(queued of finished)"; "X of Y" means X completed out of Y
//     queued, matching the test expectations like "(1 of 1 files)"
//   - the counters are atomics and are now read with atomic.LoadInt64,
//     since the transfer workers update them concurrently
//   - the padding width is clamped at zero; strings.Repeat panics on a
//     negative count when the terminal is narrower than the output
//   - fmt.Fprint is used instead of Fprintf with a runtime string as
//     the format (a stray '%' would otherwise be misinterpreted)
func (p *ProgressMeter) update() {
	if !p.show {
		return
	}

	width := 80 // default to 80 chars wide if ts.GetSize() fails
	if size, err := ts.GetSize(); err == nil {
		width = size.Col()
	}

	out := fmt.Sprintf("\r(%d of %d files), %s/%s",
		atomic.LoadInt64(&p.finishedFiles),
		atomic.LoadInt64(&p.transferringFiles),
		formatBytes(atomic.LoadInt64(&p.currentBytes)),
		formatBytes(atomic.LoadInt64(&p.totalBytes)))

	if skipped := atomic.LoadInt64(&p.skippedFiles); skipped > 0 {
		out += fmt.Sprintf(", Skipped: %d", skipped)
	}

	// Pad to the full terminal width so a shorter repaint fully
	// overwrites the previous, longer line.
	if pad := width - len(out); pad > 0 {
		out += strings.Repeat(" ", pad)
	}

	fmt.Fprint(os.Stdout, out)
}
// progressLogger provides a wrapper around an os.File that can either // progressLogger provides a wrapper around an os.File that can either
// write to the file or ignore all writes completely. // write to the file or ignore all writes completely.
type progressLogger struct { type progressLogger struct {
@ -133,3 +189,18 @@ func newProgressLogger() (*progressLogger, error) {
return &progressLogger{true, file}, nil return &progressLogger{true, file}, nil
} }
// formatBytes renders a byte count in the largest binary unit strictly
// exceeded by i, with two decimal places (e.g. "3.27 MB"). Values of
// 1024 or fewer bytes — note: strictly greater-than comparisons — are
// shown as a plain integer with a "B" suffix.
func formatBytes(i int64) string {
	const (
		kb int64 = 1 << 10
		mb int64 = 1 << 20
		gb int64 = 1 << 30
		tb int64 = 1 << 40
	)

	switch {
	case i > tb:
		return fmt.Sprintf("%#0.2f TB", float64(i)/float64(tb))
	case i > gb:
		return fmt.Sprintf("%#0.2f GB", float64(i)/float64(gb))
	case i > mb:
		return fmt.Sprintf("%#0.2f MB", float64(i)/float64(mb))
	case i > kb:
		return fmt.Sprintf("%#0.2f KB", float64(i)/float64(kb))
	default:
		return fmt.Sprintf("%d B", i)
	}
}

@ -24,6 +24,7 @@ type Transferable interface {
// TransferQueue provides a queue that will allow concurrent transfers. // TransferQueue provides a queue that will allow concurrent transfers.
type TransferQueue struct { type TransferQueue struct {
meter *ProgressMeter
workers int // Number of transfer workers to spawn workers int // Number of transfer workers to spawn
transferKind string transferKind string
errors []*WrappedError errors []*WrappedError
@ -33,13 +34,13 @@ type TransferQueue struct {
transferc chan Transferable // Channel for processing transfers transferc chan Transferable // Channel for processing transfers
errorc chan *WrappedError // Channel for processing errors errorc chan *WrappedError // Channel for processing errors
watchers []chan string watchers []chan string
monitors []*ProgressMeter
wait sync.WaitGroup wait sync.WaitGroup
} }
// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers. // newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
func newTransferQueue(workers int) *TransferQueue { func newTransferQueue(workers int) *TransferQueue {
q := &TransferQueue{ q := &TransferQueue{
meter: NewProgressMeter(),
apic: make(chan Transferable, batchSize), apic: make(chan Transferable, batchSize),
transferc: make(chan Transferable, batchSize), transferc: make(chan Transferable, batchSize),
errorc: make(chan *WrappedError), errorc: make(chan *WrappedError),
@ -80,9 +81,7 @@ func (q *TransferQueue) Wait() {
close(watcher) close(watcher)
} }
for _, mon := range q.monitors { q.meter.Finish()
mon.Finish()
}
} }
// Watch returns a channel where the queue will write the OID of each transfer // Watch returns a channel where the queue will write the OID of each transfer
@ -93,8 +92,9 @@ func (q *TransferQueue) Watch() chan string {
return c return c
} }
func (q *TransferQueue) Monitor(m *ProgressMeter) { // SuppressProgress turns off progress metering for the TransferQueue
q.monitors = append(q.monitors, m) func (q *TransferQueue) SuppressProgress() {
q.meter.Suppress()
} }
// individualApiRoutine processes the queue of transfers one at a time by making // individualApiRoutine processes the queue of transfers one at a time by making
@ -185,9 +185,11 @@ func (q *TransferQueue) batchApiRoutine() {
transfer.SetObject(o) transfer.SetObject(o)
q.transferc <- transfer q.transferc <- transfer
} else { } else {
q.meter.Skip()
q.wait.Done() q.wait.Done()
} }
} else { } else {
q.meter.Skip()
q.wait.Done() q.wait.Done()
} }
} }
@ -204,17 +206,11 @@ func (q *TransferQueue) errorCollector() {
func (q *TransferQueue) transferWorker() { func (q *TransferQueue) transferWorker() {
for transfer := range q.transferc { for transfer := range q.transferc {
cb := func(total, read int64, current int) error { cb := func(total, read int64, current int) error {
// Log out to monitors q.meter.Log(transferBytes, q.transferKind, transfer.Name(), read, total, current)
for _, mon := range q.monitors {
mon.Log(transferBytes, q.transferKind, transfer.Name(), read, total, current)
}
return nil return nil
} }
for _, mon := range q.monitors { q.meter.Add(transfer.Name(), transfer.Size())
mon.Log(transferStart, q.transferKind, transfer.Name(), 0, 0, 0)
}
if err := transfer.Transfer(cb); err != nil { if err := transfer.Transfer(cb); err != nil {
q.errorc <- err q.errorc <- err
} else { } else {
@ -224,9 +220,7 @@ func (q *TransferQueue) transferWorker() {
} }
} }
for _, mon := range q.monitors { q.meter.Log(transferFinish, q.transferKind, transfer.Name(), 0, 0, 0)
mon.Log(transferFinish, q.transferKind, transfer.Name(), 0, 0, 0)
}
q.wait.Done() q.wait.Done()
} }

@ -17,7 +17,7 @@ begin_test "pre-push"
echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" | echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
tee push.log tee push.log
grep "(0 of 0 files) 0 B 0" push.log grep "(0 of 0 files)" push.log
git lfs track "*.dat" git lfs track "*.dat"
echo "hi" > hi.dat echo "hi" > hi.dat
@ -67,6 +67,7 @@ begin_test "pre-push dry-run"
git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 |
tee push.log tee push.log
grep "push hi.dat" push.log grep "push hi.dat" push.log
cat push.log
[ `wc -l < push.log` = 1 ] [ `wc -l < push.log` = 1 ]
refute_server_object "$reponame" 2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b refute_server_object "$reponame" 2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b

@ -17,7 +17,6 @@ begin_test "push"
git lfs push origin master 2>&1 | tee push.log git lfs push origin master 2>&1 | tee push.log
grep "(1 of 1 files)" push.log grep "(1 of 1 files)" push.log
[ $(wc -l < push.log) -eq 1 ]
git checkout -b push-b git checkout -b push-b
echo "push b" > b.dat echo "push b" > b.dat
@ -26,7 +25,6 @@ begin_test "push"
git lfs push origin push-b 2>&1 | tee push.log git lfs push origin push-b 2>&1 | tee push.log
grep "(2 of 2 files)" push.log grep "(2 of 2 files)" push.log
[ $(wc -l < push.log) -eq 1 ]
) )
end_test end_test
@ -74,7 +72,6 @@ begin_test "push object id(s)"
4c48d2a6991c9895bcddcf027e1e4907280bcf21975492b1afbade396d6a3340 \ 4c48d2a6991c9895bcddcf027e1e4907280bcf21975492b1afbade396d6a3340 \
2>&1 | tee push.log 2>&1 | tee push.log
grep "(1 of 1 files)" push.log grep "(1 of 1 files)" push.log
[ $(wc -l < push.log) -eq 1 ]
echo "push b" > b.dat echo "push b" > b.dat
git add b.dat git add b.dat
@ -85,6 +82,5 @@ begin_test "push object id(s)"
82be50ad35070a4ef3467a0a650c52d5b637035e7ad02c36652e59d01ba282b7 \ 82be50ad35070a4ef3467a0a650c52d5b637035e7ad02c36652e59d01ba282b7 \
2>&1 | tee push.log 2>&1 | tee push.log
grep "(2 of 2 files)" push.log grep "(2 of 2 files)" push.log
[ $(wc -l < push.log) -eq 1 ]
) )
end_test end_test