lfs/batcher: rename Truncate() to Flush()

This commit is contained in:
Taylor Blau 2016-09-19 17:04:24 -06:00
parent a0d0635c82
commit 65109df3b9
2 changed files with 12 additions and 12 deletions

@ -9,9 +9,9 @@ import (
// be added to the batcher from multiple goroutines and pulled off in groups
// when one of the following conditions occurs:
// * The batch size is reached
// * Truncate() is called, forcing the batch to be returned immediately, as-is
// * Flush() is called, forcing the batch to be returned immediately, as-is
// * Exit() is called
// When an Exit() or Truncate() occurs, the group may be smaller than the batch
// When an Exit() or Flush() occurs, the group may be smaller than the batch
// size.
type Batcher struct {
exited uint32
@ -19,7 +19,7 @@ type Batcher struct {
wg sync.WaitGroup
input chan interface{}
batchReady chan []interface{}
truncate chan interface{}
flush chan interface{}
}
// NewBatcher creates a Batcher with the batchSize.
@ -28,7 +28,7 @@ func NewBatcher(batchSize int) *Batcher {
batchSize: batchSize,
input: make(chan interface{}, batchSize),
batchReady: make(chan []interface{}),
truncate: make(chan interface{}),
flush: make(chan interface{}),
}
go b.acceptInput()
@ -40,7 +40,7 @@ func NewBatcher(batchSize int) *Batcher {
func (b *Batcher) Add(ts ...interface{}) {
if atomic.CompareAndSwapUint32(&b.exited, 1, 0) {
b.input = make(chan interface{}, b.batchSize)
b.truncate = make(chan interface{})
b.flush = make(chan interface{})
go b.acceptInput()
}
@ -57,10 +57,10 @@ func (b *Batcher) Next() []interface{} {
return <-b.batchReady
}
// Truncate causes the current batch to halt accumulation and return
// Flush causes the current batch to halt accumulation and return
// immediately, even if it is smaller than the given batch size.
func (b *Batcher) Truncate() {
b.truncate <- struct{}{}
func (b *Batcher) Flush() {
b.flush <- struct{}{}
}
// Exit stops all batching and allows Next() to return. Calling Add() after
@ -68,7 +68,7 @@ func (b *Batcher) Truncate() {
func (b *Batcher) Exit() {
atomic.StoreUint32(&b.exited, 1)
close(b.input)
close(b.truncate)
close(b.flush)
}
// acceptInput runs in its own goroutine and accepts input from external
@ -92,7 +92,7 @@ func (b *Batcher) acceptInput() {
batch = append(batch, t)
b.wg.Done()
case <-b.truncate:
case <-b.flush:
break Acc
}
}

@ -24,13 +24,13 @@ func TestBatcherReturnsIncompleteBatchesWhenExiting(t *testing.T) {
}, t)
}
func TestBatcherTruncatesPartialBatches(t *testing.T) {
func TestBatcherFlushesPartialBatches(t *testing.T) {
first, second := "first", "second"
b := NewBatcher(3)
b.Add(first)
b.Add(second)
b.Truncate()
b.Flush()
batch := b.Next()