Merge pull request #2483 from git-lfs/tq-infinite-buffer

tq: make Add() accept new items during batch processing
This commit is contained in:
Taylor Blau 2017-08-07 14:52:24 -06:00 committed by GitHub
commit 8605c80fbf

@ -72,6 +72,18 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) {
// all other workers are sitting idle. // all other workers are sitting idle.
type batch []*objectTuple type batch []*objectTuple
// Concat concatenates two batches together, returning a single, clamped batch as
// "left", and the remainder of elements as "right". If the union of the
// receiver and "other" has cardinality less than "size", "right" will be
// returned as nil.
func (b batch) Concat(other batch, size int) (left, right batch) {
	// Build the union in a freshly-allocated slice. Appending directly to
	// "b" (as in `append(b, other...)`) would write into the receiver's
	// backing array whenever it has spare capacity, silently clobbering
	// elements that a caller-held alias of "b" may still reference.
	u := make(batch, 0, len(b)+len(other))
	u = append(u, b...)
	u = append(u, other...)

	if len(u) <= size {
		return u, nil
	}
	return u[:size], u[size:]
}
func (b batch) ToTransfers() []*Transfer { func (b batch) ToTransfers() []*Transfer {
transfers := make([]*Transfer, 0, len(b)) transfers := make([]*Transfer, 0, len(b))
for _, t := range b { for _, t := range b {
@ -98,17 +110,15 @@ type TransferQueue struct {
cb progress.CopyCallback cb progress.CopyCallback
meter progress.Meter meter progress.Meter
errors []error errors []error
// transfers maps transfer OIDs to a set of transfers with the same OID. transfers map[string]*objects
transfers map[string]*objects batchSize int
batchSize int bufferDepth int
bufferDepth int incoming chan *objectTuple // Channel for processing incoming items
// Channel for processing (and buffering) incoming items errorc chan error // Channel for processing errors
incoming chan *objectTuple watchers []chan *Transfer
errorc chan error // Channel for processing errors trMutex *sync.Mutex
watchers []chan *Transfer collectorWait sync.WaitGroup
trMutex *sync.Mutex errorwait sync.WaitGroup
collectorWait sync.WaitGroup
errorwait sync.WaitGroup
// wait is used to keep track of pending transfers. It is incremented // wait is used to keep track of pending transfers. It is incremented
// once per unique OID on Add(), and is decremented when that transfer // once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried. // is marked as completed or failed, but not retried.
@ -291,14 +301,17 @@ func (q *TransferQueue) remember(t *objectTuple) objects {
// 2. While the batch contains less items than `q.batchSize` AND the channel // 2. While the batch contains less items than `q.batchSize` AND the channel
// is open, read one item from the `q.incoming` channel. // is open, read one item from the `q.incoming` channel.
// a. If the read was a channel close, go to step 4. // a. If the read was a channel close, go to step 4.
// b. If the read was a TransferTransferable item, go to step 3. // b. If the read was a transferable item, go to step 3.
// 3. Append the item to the batch. // 3. Append the item to the batch.
// 4. Sort the batch by descending object size, make a batch API call, send // 4. Sort the batch by descending object size, make a batch API call, send
// the items to the `*adapterBase`. // the items to the `*adapterBase`.
// 5. Process the worker results, incrementing and appending retries if // 5. In a separate goroutine, process the worker results, incrementing and
// possible. // appending retries if possible. On the main goroutine, accept new items
// 6. If the `q.incoming` channel is open, go to step 2. // into "pending".
// 7. If the next batch is empty AND the `q.incoming` channel is closed, // 6. Concat() the "next" and "pending" batches such that no more items than
// the maximum allowed per batch are in next, and the rest are in pending.
// 7. If the `q.incoming` channel is open, go to step 2.
// 8. If the next batch is empty AND the `q.incoming` channel is closed,
// terminate immediately. // terminate immediately.
// //
// collectBatches runs in its own goroutine. // collectBatches runs in its own goroutine.
@ -306,33 +319,76 @@ func (q *TransferQueue) collectBatches() {
defer q.collectorWait.Done() defer q.collectorWait.Done()
var closing bool var closing bool
batch := q.makeBatch() next := q.makeBatch()
pending := q.makeBatch()
for { for {
for !closing && (len(batch) < q.batchSize) { for !closing && (len(next) < q.batchSize) {
t, ok := <-q.incoming t, ok := <-q.incoming
if !ok { if !ok {
closing = true closing = true
break break
} }
batch = append(batch, t) next = append(next, t)
} }
// Before enqueuing the next batch, sort by descending object // Before enqueuing the next batch, sort by descending object
// size. // size.
sort.Sort(sort.Reverse(batch)) sort.Sort(sort.Reverse(next))
retries, err := q.enqueueAndCollectRetriesFor(batch) done := make(chan struct{})
if err != nil {
q.errorc <- err
}
if closing && len(retries) == 0 { var retries batch
go func() {
var err error
retries, err = q.enqueueAndCollectRetriesFor(next)
if err != nil {
q.errorc <- err
}
close(done)
}()
var collected batch
collected, closing = q.collectPendingUntil(done)
// Ensure the next batch is filled with, in order:
//
// - retries from the previous batch,
// - new additions that were enqueued behind retries, &
// - items collected while the batch was processing.
next, pending = retries.Concat(append(pending, collected...), q.batchSize)
if closing && len(next) == 0 {
// If len(next) == 0, there are no items in "pending",
// and it is safe to exit.
break break
} }
}
}
batch = retries // collectPendingUntil collects items from q.incoming into a "pending" batch
// until the given "done" channel is written to, or is closed.
//
// A "pending" batch is returned, along with whether or not "q.incoming" is
// closed.
func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
for {
select {
case t, ok := <-q.incoming:
if !ok {
closing = true
<-done
return
}
pending = append(pending, t)
case <-done:
return
}
} }
} }