diff --git a/commands/command_fetch.go b/commands/command_fetch.go
index c730426b..fad6586c 100644
--- a/commands/command_fetch.go
+++ b/commands/command_fetch.go
@@ -40,15 +40,10 @@ func fetchCommand(cmd *cobra.Command, args []string) {
 
 	q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
 
-	size := int64(0)
 	for _, p := range pointers {
-		size += p.Size
 		q.Add(lfs.NewDownloadable(p))
 	}
 
-	pb := lfs.NewProgressMeter(len(pointers), size)
-	q.Monitor(pb)
-
 	target, err := git.ResolveRef(ref)
 	if err != nil {
 		Panic(err, "Could not resolve git ref")
diff --git a/commands/command_pre_push.go b/commands/command_pre_push.go
index 6dec142d..ef3d7995 100644
--- a/commands/command_pre_push.go
+++ b/commands/command_pre_push.go
@@ -1,6 +1,7 @@
 package commands
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"strings"
@@ -71,16 +72,19 @@ func prePushCommand(cmd *cobra.Command, args []string) {
 		Panic(err, "Error scanning for Git LFS files")
 	}
 
+	if len(pointers) > 0 && !prePushDryRun {
+		fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(pointers))
+	}
+
 	uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
 
-	size := int64(0)
 	for _, pointer := range pointers {
 		if prePushDryRun {
 			Print("push %s", pointer.Name)
+			uploadQueue.SuppressProgress()
 			continue
 		}
 
-		size += pointer.Size
 		u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name)
 		if wErr != nil {
 			if cleanPointerErr, ok := wErr.Err.(*lfs.CleanedPointerError); ok {
@@ -97,9 +101,6 @@
 	}
 
 	if !prePushDryRun {
-		pb := lfs.NewProgressMeter(len(pointers), size)
-		uploadQueue.Monitor(pb)
-
 		uploadQueue.Wait()
 		for _, err := range uploadQueue.Errors() {
 			if Debugging || err.Panic {
diff --git a/commands/command_push.go b/commands/command_push.go
index 19946175..b26b2248 100644
--- a/commands/command_push.go
+++ b/commands/command_push.go
@@ -1,6 +1,7 @@
 package commands
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"strings"
@@ -32,16 +33,17 @@ func uploadsBetweenRefs(left string, right string) *lfs.TransferQueue {
 		Panic(err, "Error scanning for Git LFS files")
 	}
 
+	if len(pointers) > 0 && !pushDryRun {
+		fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(pointers))
+	}
 	uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
 
-	size := int64(0)
 	for i, pointer := range pointers {
 		if pushDryRun {
 			Print("push %s", pointer.Name)
			continue
 		}
 
-		size += pointer.Size
 		tracerx.Printf("prepare upload: %s %s %d/%d", pointer.Oid, pointer.Name, i+1, len(pointers))
 
 		u, wErr := lfs.NewUploadable(pointer.Oid, pointer.Name)
@@ -55,17 +57,15 @@ func uploadsBetweenRefs(left string, right string) *lfs.TransferQueue {
 		uploadQueue.Add(u)
 	}
 
-	pb := lfs.NewProgressMeter(len(pointers), size)
-	uploadQueue.Monitor(pb)
-
 	return uploadQueue
 }
 
 func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue {
+	if len(oids) > 0 && !pushDryRun {
+		fmt.Fprintf(os.Stdout, "Checking %d Git LFS files\n", len(oids))
+	}
 	uploadQueue := lfs.NewUploadQueue(lfs.Config.ConcurrentTransfers(), len(oids))
 
-	size := int64(0)
-
 	for i, oid := range oids {
 		if pushDryRun {
 			Print("push object ID %s", oid)
@@ -81,13 +81,9 @@ func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue {
 				Exit(wErr.Error())
 			}
 		}
-		size += u.Size()
 		uploadQueue.Add(u)
 	}
 
-	pb := lfs.NewProgressMeter(len(oids), size)
-	uploadQueue.Monitor(pb)
-
 	return uploadQueue
 }
 
diff --git a/lfs/progress_meter.go b/lfs/progress_meter.go
index 04be3b9b..a5d18216 100644
--- a/lfs/progress_meter.go
+++ b/lfs/progress_meter.go
@@ -4,18 +4,25 @@ import (
( "fmt" "os" "path/filepath" + "strings" "sync/atomic" + "time" - "github.com/github/git-lfs/vendor/_nuts/github.com/cheggaaa/pb" + "github.com/github/git-lfs/vendor/_nuts/github.com/olekukonko/ts" ) type ProgressMeter struct { - totalBytes int64 - startedFiles int64 - totalFiles int - bar *pb.ProgressBar - logger *progressLogger - fileIndex map[string]int64 // Maps a file name to its transfer number + transferringFiles int64 + finishedFiles int64 + totalFiles int64 + skippedFiles int64 + totalBytes int64 + currentBytes int64 + startTime time.Time + finished chan interface{} + logger *progressLogger + fileIndex map[string]int64 // Maps a file name to its transfer number + show bool } type progressEvent int @@ -26,45 +33,57 @@ const ( transferFinish ) -func NewProgressMeter(files int, bytes int64) *ProgressMeter { - bar := pb.New64(bytes) - bar.SetUnits(pb.U_BYTES) - bar.ShowBar = false - bar.Prefix(fmt.Sprintf("(0 of %d files) ", files)) - bar.Start() - +func NewProgressMeter() *ProgressMeter { logger, err := newProgressLogger() if err != nil { - // TODO display an error + fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err) } - return &ProgressMeter{ - totalBytes: bytes, - totalFiles: files, - bar: bar, - logger: logger, - fileIndex: make(map[string]int64), + pm := &ProgressMeter{ + logger: logger, + startTime: time.Now(), + fileIndex: make(map[string]int64), + finished: make(chan interface{}), + show: true, } + + go pm.writer() + + return pm +} + +func (p *ProgressMeter) Add(name string, size int64) { + atomic.AddInt64(&p.totalBytes, size) + idx := atomic.AddInt64(&p.transferringFiles, 1) + p.fileIndex[name] = idx +} + +func (p *ProgressMeter) Skip() { + atomic.AddInt64(&p.skippedFiles, 1) } func (p *ProgressMeter) Log(event progressEvent, direction, name string, read, total int64, current int) { switch event { - case transferStart: - idx := atomic.AddInt64(&p.startedFiles, 1) - p.fileIndex[name] = idx case transferBytes: - p.bar.Add(current) + atomic.AddInt64(&p.currentBytes, int64(current)) p.logBytes(direction, name, read, total) case transferFinish: + atomic.AddInt64(&p.finishedFiles, 1) delete(p.fileIndex, name) } - - p.bar.Prefix(fmt.Sprintf("(%d of %d files) ", p.startedFiles, p.totalFiles)) } func (p *ProgressMeter) Finish() { - p.bar.Finish() + close(p.finished) + p.update() p.logger.Close() + if p.show { + fmt.Fprintf(os.Stdout, "\n") + } +} + +func (p *ProgressMeter) Suppress() { + p.show = false } func (p *ProgressMeter) logBytes(direction, name string, read, total int64) { @@ -75,6 +94,43 @@ func (p *ProgressMeter) logBytes(direction, name string, read, total int64) { } } +func (p *ProgressMeter) writer() { + p.update() + for { + select { + case <-p.finished: + return + case <-time.After(time.Millisecond * 200): + p.update() + } + } +} + +func (p *ProgressMeter) update() { + if !p.show { + return + } + + width := 80 // default to 80 chars wide if ts.GetSize() fails + size, err := ts.GetSize() + if err == nil { + width = size.Col() + } + + out := fmt.Sprintf("\r(%d of %d files), %s/%s", + p.transferringFiles, + p.finishedFiles, + formatBytes(p.currentBytes), + formatBytes(p.totalBytes)) + + if skipped := atomic.LoadInt64(&p.skippedFiles); skipped > 0 { + out += fmt.Sprintf(", Skipped: %d", skipped) + } + + padding := strings.Repeat(" ", width-len(out)) + fmt.Fprintf(os.Stdout, out+padding) +} + // progressLogger provides a wrapper around an os.File that can either // write to the file or ignore all writes completely. 
 type progressLogger struct {
@@ -133,3 +189,18 @@ func newProgressLogger() (*progressLogger, error) {
 
 	return &progressLogger{true, file}, nil
 }
+
+func formatBytes(i int64) string {
+	switch {
+	case i > 1099511627776:
+		return fmt.Sprintf("%#0.2f TB", float64(i)/1099511627776)
+	case i > 1073741824:
+		return fmt.Sprintf("%#0.2f GB", float64(i)/1073741824)
+	case i > 1048576:
+		return fmt.Sprintf("%#0.2f MB", float64(i)/1048576)
+	case i > 1024:
+		return fmt.Sprintf("%#0.2f KB", float64(i)/1024)
+	}
+
+	return fmt.Sprintf("%d B", i)
+}
diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go
index 7016ca4e..f3bd8577 100644
--- a/lfs/transfer_queue.go
+++ b/lfs/transfer_queue.go
@@ -24,6 +24,7 @@ type Transferable interface {
 
 // TransferQueue provides a queue that will allow concurrent transfers.
 type TransferQueue struct {
+	meter        *ProgressMeter
 	workers      int // Number of transfer workers to spawn
 	transferKind string
 	errors       []*WrappedError
@@ -33,13 +34,13 @@ type TransferQueue struct {
 	transferc    chan Transferable  // Channel for processing transfers
 	errorc       chan *WrappedError // Channel for processing errors
 	watchers     []chan string
-	monitors     []*ProgressMeter
 	wait         sync.WaitGroup
 }
 
 // newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
 func newTransferQueue(workers int) *TransferQueue {
 	q := &TransferQueue{
+		meter:     NewProgressMeter(),
 		apic:      make(chan Transferable, batchSize),
 		transferc: make(chan Transferable, batchSize),
 		errorc:    make(chan *WrappedError),
@@ -80,9 +81,7 @@ func (q *TransferQueue) Wait() {
 		close(watcher)
 	}
 
-	for _, mon := range q.monitors {
-		mon.Finish()
-	}
+	q.meter.Finish()
 }
 
 // Watch returns a channel where the queue will write the OID of each transfer
@@ -93,8 +92,9 @@ func (q *TransferQueue) Watch() chan string {
 	return c
 }
 
-func (q *TransferQueue) Monitor(m *ProgressMeter) {
-	q.monitors = append(q.monitors, m)
+// SuppressProgress turns off progress metering for the TransferQueue
+func (q *TransferQueue) SuppressProgress() {
+	q.meter.Suppress()
 }
 
 // individualApiRoutine processes the queue of transfers one at a time by making
@@ -185,9 +185,11 @@ func (q *TransferQueue) batchApiRoutine() {
 				transfer.SetObject(o)
 				q.transferc <- transfer
 			} else {
+				q.meter.Skip()
 				q.wait.Done()
 			}
 		} else {
+			q.meter.Skip()
 			q.wait.Done()
 		}
 	}
@@ -204,17 +206,11 @@ func (q *TransferQueue) errorCollector() {
 func (q *TransferQueue) transferWorker() {
 	for transfer := range q.transferc {
 		cb := func(total, read int64, current int) error {
-			// Log out to monitors
-			for _, mon := range q.monitors {
-				mon.Log(transferBytes, q.transferKind, transfer.Name(), read, total, current)
-			}
+			q.meter.Log(transferBytes, q.transferKind, transfer.Name(), read, total, current)
 			return nil
 		}
 
-		for _, mon := range q.monitors {
-			mon.Log(transferStart, q.transferKind, transfer.Name(), 0, 0, 0)
-		}
-
+		q.meter.Add(transfer.Name(), transfer.Size())
 		if err := transfer.Transfer(cb); err != nil {
 			q.errorc <- err
 		} else {
@@ -224,9 +220,7 @@
 			}
 		}
 
-		for _, mon := range q.monitors {
-			mon.Log(transferFinish, q.transferKind, transfer.Name(), 0, 0, 0)
-		}
+		q.meter.Log(transferFinish, q.transferKind, transfer.Name(), 0, 0, 0)
 
 		q.wait.Done()
 	}
diff --git a/test/test-pre-push.sh b/test/test-pre-push.sh
index eefc5684..e5f7c559 100755
--- a/test/test-pre-push.sh
+++ b/test/test-pre-push.sh
@@ -17,7 +17,7 @@ begin_test "pre-push"
   echo "refs/heads/master master refs/heads/master 0000000000000000000000000000000000000000" |
     git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
"$GITSERVER/$reponame" 2>&1 | tee push.log - grep "(0 of 0 files) 0 B 0" push.log + grep "(0 of 0 files)" push.log git lfs track "*.dat" echo "hi" > hi.dat @@ -67,6 +67,7 @@ begin_test "pre-push dry-run" git lfs pre-push --dry-run origin "$GITSERVER/$reponame" 2>&1 | tee push.log grep "push hi.dat" push.log + cat push.log [ `wc -l < push.log` = 1 ] refute_server_object "$reponame" 2840e0eafda1d0760771fe28b91247cf81c76aa888af28a850b5648a338dc15b diff --git a/test/test-push.sh b/test/test-push.sh index b6282cad..992999bd 100755 --- a/test/test-push.sh +++ b/test/test-push.sh @@ -17,7 +17,6 @@ begin_test "push" git lfs push origin master 2>&1 | tee push.log grep "(1 of 1 files)" push.log - [ $(wc -l < push.log) -eq 1 ] git checkout -b push-b echo "push b" > b.dat @@ -26,7 +25,6 @@ begin_test "push" git lfs push origin push-b 2>&1 | tee push.log grep "(2 of 2 files)" push.log - [ $(wc -l < push.log) -eq 1 ] ) end_test @@ -74,7 +72,6 @@ begin_test "push object id(s)" 4c48d2a6991c9895bcddcf027e1e4907280bcf21975492b1afbade396d6a3340 \ 2>&1 | tee push.log grep "(1 of 1 files)" push.log - [ $(wc -l < push.log) -eq 1 ] echo "push b" > b.dat git add b.dat @@ -85,6 +82,5 @@ begin_test "push object id(s)" 82be50ad35070a4ef3467a0a650c52d5b637035e7ad02c36652e59d01ba282b7 \ 2>&1 | tee push.log grep "(2 of 2 files)" push.log - [ $(wc -l < push.log) -eq 1 ] ) end_test