lfs/tq: promote batchSize to instance variable

This commit is contained in:
Taylor Blau 2016-12-09 10:23:49 -07:00
parent 63b8639b59
commit 29f6b94e23

@ -12,7 +12,7 @@ import (
)
const (
batchSize = 100
defaultBatchSize = 100
defaultMaxRetries = 1
)
@ -100,6 +100,7 @@ type TransferQueue struct {
meter progress.Meter
errors []error
transferables map[string]Transferable
batchSize int
batcher *Batcher
retriesc chan Transferable // Channel for processing retries
errorc chan error // Channel for processing errors
@ -129,11 +130,15 @@ func WithProgress(m progress.Meter) transferQueueOption {
}
}
func WithBatchSize(size int) transferQueueOption {
return func(tq *TransferQueue) { tq.batchSize = size }
}
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func newTransferQueue(dir transfer.Direction, options ...transferQueueOption) *TransferQueue {
q := &TransferQueue{
batchSize: defaultBatchSize,
direction: dir,
retriesc: make(chan Transferable, batchSize),
errorc: make(chan error),
transferables: make(map[string]Transferable),
trMutex: &sync.Mutex{},
@ -145,6 +150,8 @@ func newTransferQueue(dir transfer.Direction, options ...transferQueueOption) *T
opt(q)
}
q.retriesc = make(chan Transferable, q.batchSize)
if q.meter == nil {
q.meter = progress.Noop()
}
@ -327,7 +334,7 @@ func (q *TransferQueue) Wait() {
// Watch returns a channel where the queue will write the OID of each transfer
// as it completes. The channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan string {
c := make(chan string, batchSize)
c := make(chan string, q.batchSize)
q.watchers = append(q.watchers, c)
return c
}
@ -446,8 +453,8 @@ func (q *TransferQueue) run() {
go q.errorCollector()
go q.retryCollector()
tracerx.Printf("tq: running as batched queue, batch size of %d", batchSize)
q.batcher = NewBatcher(batchSize)
tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
q.batcher = NewBatcher(q.batchSize)
go q.batchApiRoutine()
}