From 3de4b9cd0dff02e3d28ab8dd7d4b7e9c848e80c7 Mon Sep 17 00:00:00 2001 From: Taylor Blau Date: Mon, 7 Aug 2017 14:29:16 -0600 Subject: [PATCH] tq: accept new items during batch processing --- tq/transfer_queue.go | 86 +++++++++++++++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 21 deletions(-) diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 41b2b5bd..891e17b5 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -110,17 +110,15 @@ type TransferQueue struct { cb progress.CopyCallback meter progress.Meter errors []error - // transfers maps transfer OIDs to a set of transfers with the same OID. - transfers map[string]*objects - batchSize int - bufferDepth int - // Channel for processing (and buffering) incoming items - incoming chan *objectTuple - errorc chan error // Channel for processing errors - watchers []chan *Transfer - trMutex *sync.Mutex - collectorWait sync.WaitGroup - errorwait sync.WaitGroup + transfers map[string]*objects + batchSize int + bufferDepth int + incoming chan *objectTuple // Channel for processing incoming items + errorc chan error // Channel for processing errors + watchers []chan *Transfer + trMutex *sync.Mutex + collectorWait sync.WaitGroup + errorwait sync.WaitGroup // wait is used to keep track of pending transfers. It is incremented // once per unique OID on Add(), and is decremented when that transfer // is marked as completed or failed, but not retried. @@ -303,14 +301,17 @@ func (q *TransferQueue) remember(t *objectTuple) objects { // 2. While the batch contains less items than `q.batchSize` AND the channel // is open, read one item from the `q.incoming` channel. // a. If the read was a channel close, go to step 4. -// b. If the read was a TransferTransferable item, go to step 3. +// b. If the read was a transferable item, go to step 3. // 3. Append the item to the batch. // 4. Sort the batch by descending object size, make a batch API call, send // the items to the `*adapterBase`. -// 5. Process the worker results, incrementing and appending retries if -// possible. -// 6. If the `q.incoming` channel is open, go to step 2. -// 7. If the next batch is empty AND the `q.incoming` channel is closed, +// 5. In a separate goroutine, process the worker results, incrementing and +// appending retries if possible. On the main goroutine, accept new items +// into "pending". +// 6. Concat() the "next" and "pending" batches such that no more items than +// the maximum allowed per batch are in next, and the rest are in pending. +// 7. If the `q.incoming` channel is open, go to step 2. +// 8. If the next batch is empty AND the `q.incoming` channel is closed, // terminate immediately. // // collectBatches runs in its own goroutine. @@ -319,6 +320,7 @@ func (q *TransferQueue) collectBatches() { var closing bool next := q.makeBatch() + pending := q.makeBatch() for { for !closing && (len(next) < q.batchSize) { @@ -335,16 +337,58 @@ func (q *TransferQueue) collectBatches() { // size. sort.Sort(sort.Reverse(next)) - retries, err := q.enqueueAndCollectRetriesFor(batch) - if err != nil { - q.errorc <- err - } + done := make(chan struct{}) + + var retries batch + + go func() { + var err error + + retries, err = q.enqueueAndCollectRetriesFor(next) + if err != nil { + q.errorc <- err + } + + close(done) + }() + + var collected batch + collected, closing = q.collectPendingUntil(done) + + // Ensure the next batch is filled with, in order: + // + // - retries from the previous batch, + // - new additions that were enqueued behind retries, & + // - items collected while the batch was processing. + next, pending = retries.Concat(append(pending, collected...), q.batchSize) if closing && len(next) == 0 { + // If len(next) == 0, there are no items in "pending", + // and it is safe to exit. break } + } +} - next = retries +// collectPendingUntil collects items from q.incoming into a "pending" batch +// until the given "done" channel is written to, or is closed. +// +// A "pending" batch is returned, along with whether or not "q.incoming" is +// closed. +func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) { + for { + select { + case t, ok := <-q.incoming: + if !ok { + closing = true + <-done + return + } + + pending = append(pending, t) + case <-done: + return + } } }