allow tasks to emit Updates that override throttling

This commit is contained in:
rick olson 2017-09-22 12:37:42 -06:00
parent 78300fb8bd
commit a565382af3
3 changed files with 31 additions and 19 deletions

@ -198,7 +198,7 @@ func (l *Logger) logTask(task Task) {
var update *Update
for update = range task.Updates() {
if logAll || l.throttle == 0 || update.At.After(last.Add(l.throttle)) {
if logAll || l.throttle == 0 || !update.Throttled(last.Add(l.throttle)) {
l.logLine(update.S)
last = update.At
}

@ -26,8 +26,8 @@ func TestLoggerLogsTasks(t *testing.T) {
task := make(chan *Update)
go func() {
task <- &Update{"first", time.Now()}
task <- &Update{"second", time.Now()}
task <- &Update{"first", time.Now(), false}
task <- &Update{"second", time.Now(), false}
close(task)
}()
@ -45,14 +45,14 @@ func TestLoggerLogsMultipleTasksInOrder(t *testing.T) {
t1 := make(chan *Update)
go func() {
t1 <- &Update{"first", time.Now()}
t1 <- &Update{"second", time.Now()}
t1 <- &Update{"first", time.Now(), false}
t1 <- &Update{"second", time.Now(), false}
close(t1)
}()
t2 := make(chan *Update)
go func() {
t2 <- &Update{"third", time.Now()}
t2 <- &Update{"fourth", time.Now()}
t2 <- &Update{"third", time.Now(), false}
t2 <- &Update{"fourth", time.Now(), false}
close(t2)
}()
@ -82,10 +82,10 @@ func TestLoggerLogsMultipleTasksWithoutBlocking(t *testing.T) {
l.widthFn = func() int { return 0 }
l.enqueue(ChanTask(t1))
t1 <- &Update{"first", time.Now()}
t1 <- &Update{"first", time.Now(), false}
l.enqueue(ChanTask(t2))
close(t1)
t2 <- &Update{"second", time.Now()}
t2 <- &Update{"second", time.Now(), false}
close(t2)
l.Close()
@ -105,10 +105,11 @@ func TestLoggerThrottlesWrites(t *testing.T) {
go func() {
start := time.Now()
t1 <- &Update{"first", start} // t = 0 ms, throttle was open
t1 <- &Update{"second", start.Add(10 * time.Millisecond)} // t = 10+ε ms, throttle is closed
t1 <- &Update{"third", start.Add(20 * time.Millisecond)} // t = 20+ε ms, throttle was open
close(t1) // t = 20+2ε ms, throttle is closed
t1 <- &Update{"first", start, false} // t = 0 ms, throttle was open
t1 <- &Update{"forced", start.Add(10 * time.Millisecond), true} // t = 10+ε ms, throttle is closed
t1 <- &Update{"second", start.Add(10 * time.Millisecond), false} // t = 10+ε ms, throttle is closed
t1 <- &Update{"third", start.Add(26 * time.Millisecond), false} // t = 20+ε ms, throttle was open
close(t1) // t = 20+2ε ms, throttle is closed
}()
l := NewLogger(&buf)
@ -120,6 +121,7 @@ func TestLoggerThrottlesWrites(t *testing.T) {
assert.Equal(t, strings.Join([]string{
"first\r",
"forced\r",
"third\r",
"third, done\n",
}, ""), buf.String())
@ -132,9 +134,9 @@ func TestLoggerThrottlesLastWrite(t *testing.T) {
go func() {
start := time.Now()
t1 <- &Update{"first", start} // t = 0 ms, throttle was open
t1 <- &Update{"second", start.Add(10 * time.Millisecond)} // t = 10+ε ms, throttle is closed
close(t1) // t = 10+2ε ms, throttle is closed
t1 <- &Update{"first", start, false} // t = 0 ms, throttle was open
t1 <- &Update{"second", start.Add(10 * time.Millisecond), false} // t = 10+ε ms, throttle is closed
close(t1) // t = 10+2ε ms, throttle is closed
}()
l := NewLogger(&buf)
@ -159,9 +161,9 @@ func TestLoggerLogsAllDurableUpdates(t *testing.T) {
t1 := make(chan *Update)
go func() {
t1 <- &Update{"first", time.Now()} // t = 0+ε ms, throttle is open
t1 <- &Update{"second", time.Now()} // t = 0+2ε ms, throttle is closed
close(t1) // t = 0+3ε ms, throttle is closed
t1 <- &Update{"first", time.Now(), false} // t = 0+ε ms, throttle is open
t1 <- &Update{"second", time.Now(), false} // t = 0+2ε ms, throttle is closed
close(t1) // t = 0+3ε ms, throttle is closed
}()
l.enqueue(UnthrottledChanTask(t1))

@ -22,4 +22,14 @@ type Update struct {
S string
// At is the time that this update was sent.
At time.Time
// Force determines if this update should not be throttled.
Force bool
}
// Throttled determines whether this update should be throttled, based on the
// given earliest time of the next update. The caller should determine how often
// updates should be throttled. An Update with Force=true is never throttled.
func (u *Update) Throttled(next time.Time) bool {
return !(u.Force || u.At.After(next))
}