Merge pull request #1746 from git-lfs/tq-options

Introduce transfer queue options
This commit is contained in:
risk danger olson 2016-12-07 13:13:29 -07:00 committed by GitHub
commit 110605a288
10 changed files with 90 additions and 71 deletions

@ -127,8 +127,7 @@ func checkoutWithIncludeExclude(filter *filepathfilter.Filter) {
wait.Done() wait.Done()
}() }()
logPath, _ := cfg.Os.Get("GIT_LFS_PROGRESS") meter := progress.NewMeter(progress.WithOSEnv(cfg.Os))
meter := progress.NewMeter(logPath)
meter.Start() meter.Start()
var totalBytes int64 var totalBytes int64
for _, pointer := range pointers { for _, pointer := range pointers {

@ -290,7 +290,7 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
} }
ready, pointers, meter := readyAndMissingPointers(allpointers, filter) ready, pointers, meter := readyAndMissingPointers(allpointers, filter)
q := lfs.NewDownloadQueue(meter, false) q := lfs.NewDownloadQueue(lfs.WithProgress(meter))
if out != nil { if out != nil {
// If we already have it, or it won't be fetched // If we already have it, or it won't be fetched

@ -262,8 +262,7 @@ func determineIncludeExcludePaths(config *config.Configuration, includeArg, excl
} }
func buildProgressMeter() *progress.ProgressMeter { func buildProgressMeter() *progress.ProgressMeter {
logPath, _ := cfg.Os.Get("GIT_LFS_PROGRESS") return progress.NewMeter(progress.WithOSEnv(cfg.Os))
return progress.NewMeter(logPath)
} }
// isCommandEnabled returns whether the environment variable GITLFS<CMD>ENABLED // isCommandEnabled returns whether the environment variable GITLFS<CMD>ENABLED

@ -75,7 +75,7 @@ func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*lfs.Tr
// build the TransferQueue, automatically skipping any missing objects that // build the TransferQueue, automatically skipping any missing objects that
// the server already has. // the server already has.
uploadQueue := lfs.NewUploadQueue(meter, c.DryRun) uploadQueue := lfs.NewUploadQueue(lfs.WithProgress(meter), lfs.DryRun(c.DryRun))
for _, p := range missingLocalObjects { for _, p := range missingLocalObjects {
if c.HasUploaded(p.Oid) { if c.HasUploaded(p.Oid) {
// if the server already has this object, call Skip() on // if the server already has this object, call Skip() on

@ -2,7 +2,6 @@ package lfs
import ( import (
"github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/progress"
"github.com/git-lfs/git-lfs/transfer" "github.com/git-lfs/git-lfs/transfer"
) )
@ -41,11 +40,11 @@ func NewDownloadable(p *WrappedPointer) *Downloadable {
} }
// NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download // NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
func NewDownloadCheckQueue() *TransferQueue { func NewDownloadCheckQueue(options ...TransferQueueOption) *TransferQueue {
return newTransferQueue(transfer.Download, nil, true) return newTransferQueue(transfer.Download, options...)
} }
// NewDownloadQueue builds a DownloadQueue, allowing concurrent downloads. // NewDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
func NewDownloadQueue(meter *progress.ProgressMeter, dryRun bool) *TransferQueue { func NewDownloadQueue(options ...TransferQueueOption) *TransferQueue {
return newTransferQueue(transfer.Download, meter, dryRun) return newTransferQueue(transfer.Download, options...)
} }

@ -115,16 +115,24 @@ type TransferQueue struct {
rc *retryCounter rc *retryCounter
} }
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter type TransferQueueOption func(*TransferQueue)
func newTransferQueue(dir transfer.Direction, meter progress.Meter, dryRun bool) *TransferQueue {
if meter == nil {
meter = progress.Noop()
}
func DryRun(dryRun bool) TransferQueueOption {
return func(tq *TransferQueue) {
tq.dryRun = dryRun
}
}
func WithProgress(m progress.Meter) TransferQueueOption {
return func(tq *TransferQueue) {
tq.meter = m
}
}
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func newTransferQueue(dir transfer.Direction, options ...TransferQueueOption) *TransferQueue {
q := &TransferQueue{ q := &TransferQueue{
direction: dir, direction: dir,
dryRun: dryRun,
meter: meter,
retriesc: make(chan Transferable, batchSize), retriesc: make(chan Transferable, batchSize),
errorc: make(chan error), errorc: make(chan error),
transferables: make(map[string]Transferable), transferables: make(map[string]Transferable),
@ -133,6 +141,14 @@ func newTransferQueue(dir transfer.Direction, meter progress.Meter, dryRun bool)
rc: newRetryCounter(config.Config), rc: newRetryCounter(config.Config),
} }
for _, opt := range options {
opt(q)
}
if q.meter == nil {
q.meter = progress.Noop()
}
q.errorwait.Add(1) q.errorwait.Add(1)
q.retrywait.Add(1) q.retrywait.Add(1)
q.run() q.run()

@ -8,7 +8,6 @@ import (
"github.com/git-lfs/git-lfs/api" "github.com/git-lfs/git-lfs/api"
"github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/progress"
"github.com/git-lfs/git-lfs/transfer" "github.com/git-lfs/git-lfs/transfer"
) )
@ -68,8 +67,8 @@ func NewUploadable(oid, filename string) (*Uploadable, error) {
} }
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. // NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(meter *progress.ProgressMeter, dryRun bool) *TransferQueue { func NewUploadQueue(options ...TransferQueueOption) *TransferQueue {
return newTransferQueue(transfer.Upload, meter, dryRun) return newTransferQueue(transfer.Upload, options...)
} }
// ensureFile makes sure that the cleanPath exists before pushing it. If it // ensureFile makes sure that the cleanPath exists before pushing it. If it

@ -1,10 +1,6 @@
package progress package progress
import ( import "os"
"fmt"
"os"
"path/filepath"
)
// progressLogger provides a wrapper around an os.File that can either // progressLogger provides a wrapper around an os.File that can either
// write to the file or ignore all writes completely. // write to the file or ignore all writes completely.
@ -16,12 +12,12 @@ type progressLogger struct {
// Write will write to the file and perform a Sync() if writing succeeds. // Write will write to the file and perform a Sync() if writing succeeds.
func (l *progressLogger) Write(b []byte) error { func (l *progressLogger) Write(b []byte) error {
if l.writeData { if l.writeData {
return nil
}
if _, err := l.log.Write(b); err != nil { if _, err := l.log.Write(b); err != nil {
return err return err
} }
return l.log.Sync() return l.log.Sync()
}
return nil
} }
// Close will call Close() on the underlying file // Close will call Close() on the underlying file
@ -37,28 +33,3 @@ func (l *progressLogger) Close() error {
func (l *progressLogger) Shutdown() { func (l *progressLogger) Shutdown() {
l.writeData = false l.writeData = false
} }
// newProgressLogger creates a progressLogger with a log file path.
// If a log file is able to be created, the logger will write to the file. If
// there is an err creating the file, the logger will ignore all writes.
func newProgressLogger(logPath string) (*progressLogger, error) {
if len(logPath) == 0 {
return &progressLogger{}, nil
}
if !filepath.IsAbs(logPath) {
return &progressLogger{}, fmt.Errorf("GIT_LFS_PROGRESS must be an absolute path")
}
cbDir := filepath.Dir(logPath)
if err := os.MkdirAll(cbDir, 0755); err != nil {
return &progressLogger{}, err
}
file, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return &progressLogger{}, err
}
return &progressLogger{true, file}, nil
}

@ -3,6 +3,7 @@ package progress
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -32,28 +33,64 @@ type ProgressMeter struct {
DryRun bool DryRun bool
} }
func NewMeter(logPath string) *ProgressMeter { type env interface {
return NewProgressMeter(0, 0, false, logPath) Get(key string) (val string, ok bool)
} }
// NewProgressMeter creates a new ProgressMeter for the number and size of type MeterOption func(*ProgressMeter)
// files given.
func NewProgressMeter(estFiles int, estBytes int64, dryRun bool, logPath string) *ProgressMeter { func WithLogFile(name string) MeterOption {
logger, err := newProgressLogger(logPath) printErr := func(err string) {
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err) fmt.Fprintf(os.Stderr, "Error creating progress logger: %s\n", err)
} }
return &ProgressMeter{ return func(m *ProgressMeter) {
logger: logger, if len(name) == 0 {
return
}
if !filepath.IsAbs(name) {
printErr("GIT_LFS_PROGRESS must be an absolute path")
return
}
cbDir := filepath.Dir(name)
if err := os.MkdirAll(cbDir, 0755); err != nil {
printErr(err.Error())
return
}
file, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
printErr(err.Error())
return
}
m.logger.writeData = true
m.logger.log = file
}
}
func WithOSEnv(os env) MeterOption {
name, _ := os.Get("GIT_LFS_PROGRESS")
return WithLogFile(name)
}
// NewMeter creates a new ProgressMeter.
func NewMeter(options ...MeterOption) *ProgressMeter {
m := &ProgressMeter{
logger: &progressLogger{},
startTime: time.Now(), startTime: time.Now(),
fileIndex: make(map[string]int64), fileIndex: make(map[string]int64),
fileIndexMutex: &sync.Mutex{}, fileIndexMutex: &sync.Mutex{},
finished: make(chan interface{}), finished: make(chan interface{}),
estimatedFiles: int32(estFiles),
estimatedBytes: estBytes,
DryRun: dryRun,
} }
for _, opt := range options {
opt(m)
}
return m
} }
func (p *ProgressMeter) Start() { func (p *ProgressMeter) Start() {

@ -138,8 +138,7 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
const oidCount = 50 const oidCount = 50
oidsExist = make([]TestObject, 0, oidCount) oidsExist = make([]TestObject, 0, oidCount)
oidsMissing = make([]TestObject, 0, oidCount) oidsMissing = make([]TestObject, 0, oidCount)
logPath, _ := config.Config.Os.Get("GIT_LFS_PROGRESS") meter := progress.NewMeter(progress.WithOSEnv(config.Config.Os))
meter := progress.NewMeter(logPath)
// Build test data for existing files & upload // Build test data for existing files & upload
// Use test repo for this to simplify the process of making sure data matches oid // Use test repo for this to simplify the process of making sure data matches oid
@ -159,7 +158,7 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
outputs := repo.AddCommits([]*test.CommitInput{&commit}) outputs := repo.AddCommits([]*test.CommitInput{&commit})
// now upload // now upload
uploadQueue := lfs.NewUploadQueue(meter, false) uploadQueue := lfs.NewUploadQueue(lfs.WithProgress(meter))
for _, f := range outputs[0].Files { for _, f := range outputs[0].Files {
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size}) oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})