diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go index d684a5bd..8168c7b4 100644 --- a/lfs/transfer_queue.go +++ b/lfs/transfer_queue.go @@ -12,7 +12,7 @@ import ( ) const ( - batchSize = 100 + defaultBatchSize = 100 defaultMaxRetries = 1 ) @@ -100,6 +100,7 @@ type TransferQueue struct { meter progress.Meter errors []error transferables map[string]Transferable + batchSize int batcher *Batcher retriesc chan Transferable // Channel for processing retries errorc chan error // Channel for processing errors @@ -129,11 +130,15 @@ func WithProgress(m progress.Meter) transferQueueOption { } } +func WithBatchSize(size int) transferQueueOption { + return func(tq *TransferQueue) { tq.batchSize = size } +} + // newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter func newTransferQueue(dir transfer.Direction, options ...transferQueueOption) *TransferQueue { q := &TransferQueue{ + batchSize: defaultBatchSize, direction: dir, - retriesc: make(chan Transferable, batchSize), errorc: make(chan error), transferables: make(map[string]Transferable), trMutex: &sync.Mutex{}, @@ -145,6 +150,8 @@ func newTransferQueue(dir transfer.Direction, options ...transferQueueOption) *T opt(q) } + q.retriesc = make(chan Transferable, q.batchSize) + if q.meter == nil { q.meter = progress.Noop() } @@ -327,7 +334,7 @@ func (q *TransferQueue) Wait() { // Watch returns a channel where the queue will write the OID of each transfer // as it completes. The channel will be closed when the queue finishes processing. func (q *TransferQueue) Watch() chan string { - c := make(chan string, batchSize) + c := make(chan string, q.batchSize) q.watchers = append(q.watchers, c) return c } @@ -446,8 +453,8 @@ func (q *TransferQueue) run() { go q.errorCollector() go q.retryCollector() - tracerx.Printf("tq: running as batched queue, batch size of %d", batchSize) - q.batcher = NewBatcher(batchSize) + tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize) + q.batcher = NewBatcher(q.batchSize) go q.batchApiRoutine() }