lfs/batcher: un-buffer the input channel
In preparation for the next commit where I'll add a Truncate() method to the batcher, the `input` channel needs to guarentee to its callers that all items within it have been processed (specifically before returning from `Add()`). Since the input channel was buffered, the following situation could easily occur: 1. Several calls to Add() are issued, which get buffered by the "input" channel. 2. Items are processed on a separate goroutine, draining the buffer. 3. While the buffer still has items in it, a call to Truncate() causes an alternate select/case branch to fire (leaving all of the buffered items to be dropped on the floor). 4. A batch is returned containing all of the _processed_ items from `input`. 5. Buffered items are (perhaps) returned in the next batch. Since the time spent blocking is short and I could not find instances where this channel buffering is relied upon, this made the most sense of all the synchronization methods (`WaitGroup`s, `Cond`s, etc)
This commit is contained in:
parent
b7b1f2b798
commit
98873d87c4
@ -19,7 +19,7 @@ type Batcher struct {
|
||||
func NewBatcher(batchSize int) *Batcher {
|
||||
b := &Batcher{
|
||||
batchSize: batchSize,
|
||||
input: make(chan interface{}, batchSize),
|
||||
input: make(chan interface{}),
|
||||
batchReady: make(chan []interface{}),
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ func NewBatcher(batchSize int) *Batcher {
|
||||
// goroutines.
|
||||
func (b *Batcher) Add(t interface{}) {
|
||||
if atomic.CompareAndSwapUint32(&b.exited, 1, 0) {
|
||||
b.input = make(chan interface{}, b.batchSize)
|
||||
b.input = make(chan interface{})
|
||||
go b.acceptInput()
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user