Make the batcher more general and not specific to Transferable

This commit is contained in:
Steve Streeting 2016-05-18 16:54:54 +01:00
parent ab53c53e72
commit 54e86de23a
2 changed files with 14 additions and 13 deletions

@ -11,16 +11,16 @@ import "sync/atomic"
type Batcher struct {
exited uint32
batchSize int
input chan Transferable
batchReady chan []Transferable
input chan interface{}
batchReady chan []interface{}
}
// NewBatcher creates a Batcher with the batchSize.
func NewBatcher(batchSize int) *Batcher {
b := &Batcher{
batchSize: batchSize,
input: make(chan Transferable, batchSize),
batchReady: make(chan []Transferable),
input: make(chan interface{}, batchSize),
batchReady: make(chan []interface{}),
}
go b.acceptInput()
@ -29,9 +29,9 @@ func NewBatcher(batchSize int) *Batcher {
// Add adds an item to the batcher. Add is safe to call from multiple
// goroutines.
func (b *Batcher) Add(t Transferable) {
func (b *Batcher) Add(t interface{}) {
if atomic.CompareAndSwapUint32(&b.exited, 1, 0) {
b.input = make(chan Transferable, b.batchSize)
b.input = make(chan interface{}, b.batchSize)
go b.acceptInput()
}
@ -40,7 +40,7 @@ func (b *Batcher) Add(t Transferable) {
// Next will wait for the one of the above batch triggers to occur and return
// the accumulated batch.
func (b *Batcher) Next() []Transferable {
func (b *Batcher) Next() []interface{} {
return <-b.batchReady
}
@ -58,7 +58,7 @@ func (b *Batcher) acceptInput() {
exit := false
for {
batch := make([]Transferable, 0, b.batchSize)
batch := make([]interface{}, 0, b.batchSize)
Loop:
for len(batch) < b.batchSize {
t, ok := <-b.input

@ -174,13 +174,13 @@ func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) {
// legacyFallback is used when a batch request is made to a server that does
// not support the batch endpoint. When this happens, the Transferables are
// fed from the batcher into apic to be processed individually.
func (q *TransferQueue) legacyFallback(failedBatch []Transferable) {
func (q *TransferQueue) legacyFallback(failedBatch []interface{}) {
tracerx.Printf("tq: batch api not implemented, falling back to individual")
q.launchIndividualApiRoutines()
for _, t := range failedBatch {
q.apic <- t
q.apic <- t.(Transferable)
}
for {
@ -190,7 +190,7 @@ func (q *TransferQueue) legacyFallback(failedBatch []Transferable) {
}
for _, t := range batch {
q.apic <- t
q.apic <- t.(Transferable)
}
}
}
@ -210,7 +210,8 @@ func (q *TransferQueue) batchApiRoutine() {
tracerx.Printf("tq: sending batch of size %d", len(batch))
transfers := make([]*api.ObjectResource, 0, len(batch))
for _, t := range batch {
for _, i := range batch {
t := i.(Transferable)
transfers = append(transfers, &api.ObjectResource{Oid: t.Oid(), Size: t.Size()})
}
@ -225,7 +226,7 @@ func (q *TransferQueue) batchApiRoutine() {
if q.canRetry(err) {
for _, t := range batch {
q.retry(t)
q.retry(t.(Transferable))
}
} else {
q.errorc <- err