From 54e86de23afab7adc5ce525f7e7ccda9515aad8d Mon Sep 17 00:00:00 2001 From: Steve Streeting Date: Wed, 18 May 2016 16:54:54 +0100 Subject: [PATCH] Make the batcher more general and not specific to Transferable --- lfs/batcher.go | 16 ++++++++-------- lfs/transfer_queue.go | 11 ++++++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/lfs/batcher.go b/lfs/batcher.go index c0ad8c98..e90263e7 100644 --- a/lfs/batcher.go +++ b/lfs/batcher.go @@ -11,16 +11,16 @@ import "sync/atomic" type Batcher struct { exited uint32 batchSize int - input chan Transferable - batchReady chan []Transferable + input chan interface{} + batchReady chan []interface{} } // NewBatcher creates a Batcher with the batchSize. func NewBatcher(batchSize int) *Batcher { b := &Batcher{ batchSize: batchSize, - input: make(chan Transferable, batchSize), - batchReady: make(chan []Transferable), + input: make(chan interface{}, batchSize), + batchReady: make(chan []interface{}), } go b.acceptInput() @@ -29,9 +29,9 @@ func NewBatcher(batchSize int) *Batcher { // Add adds an item to the batcher. Add is safe to call from multiple // goroutines. -func (b *Batcher) Add(t Transferable) { +func (b *Batcher) Add(t interface{}) { if atomic.CompareAndSwapUint32(&b.exited, 1, 0) { - b.input = make(chan Transferable, b.batchSize) + b.input = make(chan interface{}, b.batchSize) go b.acceptInput() } @@ -40,7 +40,7 @@ func (b *Batcher) Add(t Transferable) { // Next will wait for the one of the above batch triggers to occur and return // the accumulated batch. -func (b *Batcher) Next() []Transferable { +func (b *Batcher) Next() []interface{} { return <-b.batchReady } @@ -58,7 +58,7 @@ func (b *Batcher) acceptInput() { exit := false for { - batch := make([]Transferable, 0, b.batchSize) + batch := make([]interface{}, 0, b.batchSize) Loop: for len(batch) < b.batchSize { t, ok := <-b.input diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go index 1a6e5904..0d3bfa96 100644 --- a/lfs/transfer_queue.go +++ b/lfs/transfer_queue.go @@ -174,13 +174,13 @@ func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) { // legacyFallback is used when a batch request is made to a server that does // not support the batch endpoint. When this happens, the Transferables are // fed from the batcher into apic to be processed individually. -func (q *TransferQueue) legacyFallback(failedBatch []Transferable) { +func (q *TransferQueue) legacyFallback(failedBatch []interface{}) { tracerx.Printf("tq: batch api not implemented, falling back to individual") q.launchIndividualApiRoutines() for _, t := range failedBatch { - q.apic <- t + q.apic <- t.(Transferable) } for { @@ -190,7 +190,7 @@ func (q *TransferQueue) legacyFallback(failedBatch []Transferable) { } for _, t := range batch { - q.apic <- t + q.apic <- t.(Transferable) } } } @@ -210,7 +210,8 @@ func (q *TransferQueue) batchApiRoutine() { tracerx.Printf("tq: sending batch of size %d", len(batch)) transfers := make([]*api.ObjectResource, 0, len(batch)) - for _, t := range batch { + for _, i := range batch { + t := i.(Transferable) transfers = append(transfers, &api.ObjectResource{Oid: t.Oid(), Size: t.Size()}) } @@ -225,7 +226,7 @@ func (q *TransferQueue) batchApiRoutine() { if q.canRetry(err) { for _, t := range batch { - q.retry(t) + q.retry(t.(Transferable)) } } else { q.errorc <- err