The batcher Reset can be implicit

This commit is contained in:
rubyist 2015-09-09 10:18:49 -04:00
parent 66aa279f28
commit e48d6feb92
2 changed files with 7 additions and 15 deletions

@ -32,6 +32,11 @@ func NewBatcher(batchSize int) *Batcher {
// Add adds an item to the batcher. Add is safe to call from multiple // Add adds an item to the batcher. Add is safe to call from multiple
// goroutines. // goroutines.
func (b *Batcher) Add(t Transferable) { func (b *Batcher) Add(t Transferable) {
if atomic.CompareAndSwapUint32(&b.exited, 1, 0) {
b.input = make(chan Transferable, b.batchSize)
go b.acceptInput()
}
b.input <- t b.input <- t
} }
@ -48,14 +53,6 @@ func (b *Batcher) Exit() {
close(b.input) close(b.input)
} }
// Reset allows an Exit()ed batcher to be re-started.
func (b *Batcher) Reset() {
if atomic.CompareAndSwapUint32(&b.exited, 1, 0) {
b.input = make(chan Transferable, b.batchSize)
go b.acceptInput()
}
}
// acceptInput runs in its own goroutine and accepts input from external // acceptInput runs in its own goroutine and accepts input from external
// clients. It fills and dispenses batches in a sequential order: for a batch // clients. It fills and dispenses batches in a sequential order: for a batch
// size N, N items will be processed before a new batch is ready. // size N, N items will be processed before a new batch is ready.

@ -89,17 +89,12 @@ func (q *TransferQueue) Wait() {
q.retrywait.Wait() q.retrywait.Wait()
atomic.StoreUint32(&q.retrying, 1) atomic.StoreUint32(&q.retrying, 1)
if len(q.retries) > 0 { if len(q.retries) > 0 && q.batcher != nil {
tracerx.Printf("tq: retrying %d failed transfers", len(q.retries)) tracerx.Printf("tq: retrying %d failed transfers", len(q.retries))
if q.batcher != nil {
q.batcher.Reset()
}
for _, t := range q.retries { for _, t := range q.retries {
q.Add(t) q.Add(t)
} }
if q.batcher != nil {
q.batcher.Exit() q.batcher.Exit()
}
q.wait.Wait() q.wait.Wait()
} }