git-lfs/tq/meter.go

301 lines
6.8 KiB
Go
Raw Normal View History

package tq
import (
"fmt"
"os"
"path/filepath"
2016-02-23 19:00:45 +00:00
"sync"
"sync/atomic"
"time"
"github.com/git-lfs/git-lfs/tasklog"
"github.com/git-lfs/git-lfs/tools"
"github.com/git-lfs/git-lfs/tools/humanize"
)
// Meter provides a progress bar type output for the TransferQueue. It
2015-07-27 20:55:23 +00:00
// is given an estimated file count and size up front and tracks the number of
// files and bytes transferred as well as the number of files and bytes that
// get skipped because the transfer is unnecessary.
type Meter struct {
finishedFiles int64 // int64s must come first for struct alignment
skippedFiles int64
transferringFiles int64
estimatedBytes int64
currentBytes int64
skippedBytes int64
estimatedFiles int32
paused uint32
logToFile uint32
logger *tools.SyncWriter
fileIndex map[string]int64 // Maps a file name to its transfer number
2016-02-23 19:00:45 +00:00
fileIndexMutex *sync.Mutex
2016-12-07 20:27:42 +00:00
dryRun bool
updates chan *tasklog.Update
}
type env interface {
Get(key string) (val string, ok bool)
}
type meterOption func(*Meter)
2016-12-07 23:54:24 +00:00
// WithLogFile is an option for NewMeter() that sends updates to a text file.
2016-12-07 20:18:33 +00:00
func WithLogFile(name string) meterOption {
printErr := func(err string) {
fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err)
}
return func(m *Meter) {
if len(name) == 0 {
return
}
if !filepath.IsAbs(name) {
printErr("GIT_LFS_PROGRESS must be an absolute path")
return
}
cbDir := filepath.Dir(name)
if err := os.MkdirAll(cbDir, 0755); err != nil {
printErr(err.Error())
return
}
file, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
printErr(err.Error())
return
}
m.logToFile = 1
m.logger = tools.NewSyncWriter(file)
}
}
2016-12-07 23:54:24 +00:00
// WithOSEnv is an option for NewMeter() that sends updates to the text file
// path specified in the OS Env.
2016-12-07 20:18:33 +00:00
func WithOSEnv(os env) meterOption {
name, _ := os.Get("GIT_LFS_PROGRESS")
return WithLogFile(name)
}
type MeterOption struct {
// DryRun is an option that determines whether updates should be sent to
// stdout.
DryRun bool
// LogFile is an option that sends updates to a text file.
LogFile string
// OS is an option that sends updates to the text file path specified in
// the OS Env.
OS interface {
Get(key string) (val string, ok bool)
}
}
func (o *MeterOption) configure(m *Meter) {
m.dryRun = o.DryRun
if f, ok := o.logFile(); ok {
m.logToFile = 1
m.logger = tools.NewSyncWriter(f)
}
}
func (o *MeterOption) logFile() (*os.File, bool) {
var name string = o.LogFile
if len(name) == 0 {
name, _ = o.OS.Get("GIT_LFS_PROGRESS")
if len(name) == 0 {
return nil, false
}
}
printErr := func(err string) {
fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err)
}
if !filepath.IsAbs(name) {
printErr("GIT_LFS_PROGRESS must be an absolute path")
return nil, false
}
if err := os.MkdirAll(filepath.Dir(name), 0755); err != nil {
printErr(err.Error())
return nil, false
}
file, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
printErr(err.Error())
return nil, false
}
return file, true
}
// NewMeter creates a new Meter.
func NewMeter(opt *MeterOption) *Meter {
m := &Meter{
fileIndex: make(map[string]int64),
2016-02-23 19:00:45 +00:00
fileIndexMutex: &sync.Mutex{},
updates: make(chan *tasklog.Update),
}
opt.configure(m)
return m
}
2016-12-07 23:54:24 +00:00
// Start begins sending status updates to the optional log file, and stdout.
func (p *Meter) Start() {
if p == nil {
return
}
atomic.StoreUint32(&p.paused, 0)
}
2017-01-26 16:31:12 +00:00
// Pause stops sending status updates temporarily, until Start() is called again.
func (p *Meter) Pause() {
if p == nil {
return
}
atomic.StoreUint32(&p.paused, 1)
2017-01-24 21:11:38 +00:00
}
// Add tells the progress meter that a single file of the given size will
// possibly be transferred. If a file doesn't need to be transferred for some
// reason, be sure to call Skip(int64) with the same size.
func (p *Meter) Add(size int64) {
if p == nil {
return
}
defer p.update()
atomic.AddInt32(&p.estimatedFiles, 1)
atomic.AddInt64(&p.estimatedBytes, size)
}
2015-07-27 20:55:23 +00:00
// Skip tells the progress meter that a file of size `size` is being skipped
// because the transfer is unnecessary.
func (p *Meter) Skip(size int64) {
if p == nil {
return
}
defer p.update()
atomic.AddInt64(&p.skippedFiles, 1)
atomic.AddInt64(&p.skippedBytes, size)
// Reduce bytes and files so progress easier to parse
atomic.AddInt32(&p.estimatedFiles, -1)
atomic.AddInt64(&p.estimatedBytes, -size)
}
// StartTransfer tells the progress meter that a transferring file is being
// added to the TransferQueue.
func (p *Meter) StartTransfer(name string) {
if p == nil {
return
}
defer p.update()
idx := atomic.AddInt64(&p.transferringFiles, 1)
p.fileIndexMutex.Lock()
p.fileIndex[name] = idx
p.fileIndexMutex.Unlock()
}
2015-07-27 20:55:23 +00:00
// TransferBytes increments the number of bytes transferred
func (p *Meter) TransferBytes(direction, name string, read, total int64, current int) {
if p == nil {
return
}
defer p.update()
2015-07-27 20:55:23 +00:00
atomic.AddInt64(&p.currentBytes, int64(current))
p.logBytes(direction, name, read, total)
}
// FinishTransfer increments the finished transfer count
func (p *Meter) FinishTransfer(name string) {
if p == nil {
return
}
defer p.update()
2015-07-27 20:55:23 +00:00
atomic.AddInt64(&p.finishedFiles, 1)
2016-02-23 19:00:45 +00:00
p.fileIndexMutex.Lock()
2015-07-27 20:55:23 +00:00
delete(p.fileIndex, name)
2016-02-23 19:00:45 +00:00
p.fileIndexMutex.Unlock()
}
// Finish shuts down the Meter.
func (p *Meter) Finish() {
if p == nil {
return
}
p.update()
close(p.updates)
}
func (p *Meter) Updates() <-chan *tasklog.Update {
if p == nil {
return nil
}
return p.updates
}
func (p *Meter) Throttled() bool {
return true
}
func (p *Meter) update() {
if p.skipUpdate() {
return
}
p.updates <- &tasklog.Update{
S: p.str(),
At: time.Now(),
}
}
func (p *Meter) skipUpdate() bool {
return p.dryRun ||
(p.estimatedFiles == 0 && p.skippedFiles == 0) ||
atomic.LoadUint32(&p.paused) == 1
}
func (p *Meter) str() string {
// (%d of %d files, %d skipped) %f B / %f B, %f B skipped
// skipped counts only show when > 0
out := fmt.Sprintf("\rGit LFS: (%d of %d files",
p.finishedFiles,
p.estimatedFiles)
if p.skippedFiles > 0 {
out += fmt.Sprintf(", %d skipped", p.skippedFiles)
}
out += fmt.Sprintf(") %s / %s",
humanize.FormatBytes(uint64(p.currentBytes)),
humanize.FormatBytes(uint64(p.estimatedBytes)))
if p.skippedBytes > 0 {
out += fmt.Sprintf(", %s skipped",
humanize.FormatBytes(uint64(p.skippedBytes)))
}
return out
}
func (p *Meter) logBytes(direction, name string, read, total int64) {
p.fileIndexMutex.Lock()
idx := p.fileIndex[name]
p.fileIndexMutex.Unlock()
line := fmt.Sprintf("%s %d/%d %d/%d %s\n", direction, idx, p.estimatedFiles, read, total, name)
if atomic.LoadUint32(&p.logToFile) == 1 {
if err := p.logger.Write([]byte(line)); err != nil {
atomic.StoreUint32(&p.logToFile, 0)
}
}
}