git-lfs/git/githistory/log/log.go

228 lines
5.2 KiB
Go

package log
import (
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"time"
"github.com/git-lfs/git-lfs/tools"
"github.com/olekukonko/ts"
)
// DefaultLoggingThrottle is the throttle interval that NewLogger assigns to
// each *Logger it creates.
const DefaultLoggingThrottle = 200 * time.Millisecond
// Logger writes the progress of a series of tasks to an io.Writer. Tasks are
// handled one at a time, in the order in which they were enqueued, and each
// task is logged until completion before the next one begins.
type Logger struct {
	// sink receives everything the logger writes.
	sink io.Writer
	// widthFn reports the width of the terminal that this logger is
	// running within; it determines how far each progress line is padded.
	widthFn func() int
	// throttle is the minimum amount of time that must pass between two
	// logged updates of a throttled task. A zero value disables
	// throttling.
	throttle time.Duration

	// queue is the unbuffered channel on which newly enqueued tasks
	// arrive.
	queue chan Task
	// tasks carries tasks to the goroutine that logs them.
	tasks chan Task

	// wg is incremented when tasks are enqueued and decremented when a
	// task has finished being logged.
	wg *sync.WaitGroup
}
// NewLogger returns a new *Logger that writes to "sink" and pads each line to
// the current terminal width. A nil "sink" is replaced with ioutil.Discard.
//
// The returned logger has its consume goroutine already running.
func NewLogger(sink io.Writer) *Logger {
	if sink == nil {
		sink = ioutil.Discard
	}

	// terminalWidth reports the terminal's column count, falling back to
	// 80 columns when the size cannot be determined.
	terminalWidth := func() int {
		if size, err := ts.GetSize(); err == nil {
			return size.Col()
		}
		return 80
	}

	logger := &Logger{
		sink:     sink,
		throttle: DefaultLoggingThrottle,
		widthFn:  terminalWidth,
		queue:    make(chan Task),
		tasks:    make(chan Task),
		wg:       new(sync.WaitGroup),
	}

	go logger.consume()

	return logger
}
// Close closes the incoming queue, so that no new Tasks may be `enqueue()`'d,
// and then blocks until l.wg reports that every enqueued Task has finished.
//
// Calling Close on a nil *Logger does nothing.
func (l *Logger) Close() {
	if l != nil {
		close(l.queue)
		l.wg.Wait()
	}
}
// Waiter creates a new *WaitingTask with the given message, enqueues it, and
// returns it to the caller.
func (l *Logger) Waiter(msg string) *WaitingTask {
	task := NewWaitingTask(msg)
	l.enqueue(task)

	return task
}
// Percentage creates a new *PercentageTask from the given message and total,
// enqueues it, and returns it to the caller.
func (l *Logger) Percentage(msg string, total uint64) *PercentageTask {
	task := NewPercentageTask(msg, total)
	l.enqueue(task)

	return task
}
// List creates a new *ListTask with the given message, enqueues it, and
// returns it to the caller.
func (l *Logger) List(msg string) *ListTask {
	task := NewListTask(msg)
	l.enqueue(task)

	return task
}
// enqueue submits each of the given Tasks "ts" to the logger, in order.
//
// On a nil *Logger nothing is logged. Instead, one goroutine is started per
// task to drain and discard that task's Updates().
func (l *Logger) enqueue(ts ...Task) {
	if l != nil {
		// Register the tasks with the WaitGroup before handing them
		// over; logTask releases one count per finished task.
		l.wg.Add(len(ts))
		for i := range ts {
			l.queue <- ts[i]
		}
		return
	}

	for i := range ts {
		go func(task Task) {
			for range task.Updates() {
				// Discard all updates.
			}
		}(ts[i])
	}
}
// consume creates a pseudo-infinite buffer between the incoming set of tasks
// and the queue of tasks to work on, so that enqueueing a task does not have
// to wait for the task that is currently being logged.
//
// Tasks received from l.queue are held in an in-memory FIFO and handed, one at
// a time and in order, to a worker goroutine that logs each task until
// completion before accepting the next.
//
// consume returns once l.queue has been closed and every buffered task has
// been handed to the worker. l.tasks is closed on return, which lets the
// worker goroutine exit after it finishes its final task.
func (l *Logger) consume() {
	go func() {
		// Process the single next task in sequence until completion,
		// then consume the next task.
		for task := range l.tasks {
			l.logTask(task)
		}
	}()
	defer close(l.tasks)

	var pending []Task
	for {
		if len(pending) == 0 {
			// Nothing is buffered, so the only possible events
			// are a) l.queue closing, or b) a new task arriving.
			task, ok := <-l.queue
			if !ok {
				// If the queue is closed, no more new tasks
				// may be added, and there is nothing left to
				// hand over.
				return
			}

			// Buffer the task instead of sending it to l.tasks
			// directly: a direct send blocks until the worker is
			// idle, and l.queue would go unserviced (stalling
			// every enqueue) in the meantime.
			pending = append(pending, task)
			continue
		}

		// There is at least one buffered task: wait for either a) a
		// new task to enter the queue, or b) the worker to become
		// ready to accept the oldest buffered task.
		select {
		case task, ok := <-l.queue:
			if !ok {
				// No new tasks can arrive. Flush the buffer
				// so that every enqueued task is still logged
				// and releases its l.wg count; returning
				// early would leave Close waiting forever.
				for _, t := range pending {
					l.tasks <- t
				}
				return
			}
			pending = append(pending, task)
		case l.tasks <- pending[0]:
			// The worker accepted the oldest task. Clear its slot
			// so the backing array does not keep it alive, then
			// "pop" it off of the pending set.
			pending[0] = nil
			pending = pending[1:]
		}
	}
}
// logTask logs the set of updates from a given task to the sink, then logs a
// "done" message, and then marks the task as done on l.wg.
//
// By default, the *Logger throttles log entry updates to once per the duration
// of time specified by `l.throttle time.Duration`.
//
// If that duration is 0, or the task reports that it is not throttled (its
// Throttled() method returns false), then all entries will be logged.
//
// If the task finishes without sending a single update, nothing is written
// for it: there is no message to attach the "done" suffix to.
func (l *Logger) logTask(task Task) {
	defer l.wg.Done()

	logAll := !task.Throttled()

	var last time.Time
	var update *Update
	for update = range task.Updates() {
		if logAll || l.throttle == 0 || update.At.After(last.Add(l.throttle)) {
			l.logLine(update.S)
			last = update.At
		}
	}

	if update == nil {
		// The range loop never assigned "update", meaning the task
		// sent no updates at all. Dereferencing it below would panic.
		return
	}

	// Always log the final update, even when it was throttled above, so
	// that the finished line reflects the task's last state.
	l.log(fmt.Sprintf("%s, done\n", update.S))
}
// logLine writes "str" as a complete terminal line: the string is padded on
// the right with spaces up to the width reported by l.widthFn, and is followed
// by a carriage return so that the cursor moves back to the beginning of the
// line.
//
// It returns the number of bytes "n" written to the sink and the error "err",
// if one was encountered.
func (l *Logger) logLine(str string) (n int, err error) {
	// MaxInt(0, ...) is used so that the repeat count is never negative
	// when "str" is longer than the terminal is wide.
	gap := tools.MaxInt(0, l.widthFn()-len(str))

	return l.log(str + strings.Repeat(" ", gap) + "\r")
}
// log writes "str" to the sink exactly as given, without adding padding or a
// line terminator.
//
// It returns the number of bytes "n" written to the sink and the error "err",
// if one was encountered.
func (l *Logger) log(str string) (n int, err error) {
	n, err = fmt.Fprint(l.sink, str)
	return n, err
}