From 0842701c0eae8cb302de84b17d5b5f1ba1488ce5 Mon Sep 17 00:00:00 2001
From: Taylor Blau
Date: Sun, 6 Aug 2017 11:03:42 -0600
Subject: [PATCH 1/3] tq: rename 'batch' to 'next'

---
 tq/transfer_queue.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go
index 78ef9d6f..2017eabb 100644
--- a/tq/transfer_queue.go
+++ b/tq/transfer_queue.go
@@ -306,33 +306,33 @@ func (q *TransferQueue) collectBatches() {
     defer q.collectorWait.Done()
 
     var closing bool
-    batch := q.makeBatch()
+    next := q.makeBatch()
 
     for {
-        for !closing && (len(batch) < q.batchSize) {
+        for !closing && (len(next) < q.batchSize) {
             t, ok := <-q.incoming
             if !ok {
                 closing = true
                 break
             }
 
-            batch = append(batch, t)
+            next = append(next, t)
         }
 
         // Before enqueuing the next batch, sort by descending object
         // size.
-        sort.Sort(sort.Reverse(batch))
+        sort.Sort(sort.Reverse(next))
 
         retries, err := q.enqueueAndCollectRetriesFor(batch)
         if err != nil {
             q.errorc <- err
         }
 
-        if closing && len(retries) == 0 {
+        if closing && len(next) == 0 {
             break
         }
 
-        batch = retries
+        next = retries
     }
 }
 

From 17eeb67401f394d0e3334bfe781c2c3cb526d642 Mon Sep 17 00:00:00 2001
From: Taylor Blau
Date: Sun, 6 Aug 2017 11:05:02 -0600
Subject: [PATCH 2/3] tq: teach Concat() to batch type

---
 tq/transfer_queue.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go
index 2017eabb..41b2b5bd 100644
--- a/tq/transfer_queue.go
+++ b/tq/transfer_queue.go
@@ -72,6 +72,18 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) {
 // all other workers are sitting idle.
 type batch []*objectTuple
 
+// Concat concatenates two batches together, returning a single, clamped batch as
+// "left", and the remainder of elements as "right". If the union of the
+// receiver and "other" has cardinality less than "size", "right" will be
+// returned as nil.
+func (b batch) Concat(other batch, size int) (left, right batch) {
+    u := batch(append(b, other...))
+    if len(u) <= size {
+        return u, nil
+    }
+    return u[:size], u[size:]
+}
+
 func (b batch) ToTransfers() []*Transfer {
     transfers := make([]*Transfer, 0, len(b))
     for _, t := range b {

From 3de4b9cd0dff02e3d28ab8dd7d4b7e9c848e80c7 Mon Sep 17 00:00:00 2001
From: Taylor Blau
Date: Mon, 7 Aug 2017 14:29:16 -0600
Subject: [PATCH 3/3] tq: accept new items during batch processing

---
 tq/transfer_queue.go | 86 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 65 insertions(+), 21 deletions(-)

diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go
index 41b2b5bd..891e17b5 100644
--- a/tq/transfer_queue.go
+++ b/tq/transfer_queue.go
@@ -110,17 +110,15 @@ type TransferQueue struct {
     cb        progress.CopyCallback
     meter     progress.Meter
     errors    []error
-    // transfers maps transfer OIDs to a set of transfers with the same OID.
-    transfers map[string]*objects
-    batchSize int
-    bufferDepth int
-    // Channel for processing (and buffering) incoming items
-    incoming chan *objectTuple
-    errorc   chan error // Channel for processing errors
-    watchers []chan *Transfer
-    trMutex  *sync.Mutex
-    collectorWait sync.WaitGroup
-    errorwait sync.WaitGroup
+    transfers     map[string]*objects
+    batchSize     int
+    bufferDepth   int
+    incoming      chan *objectTuple // Channel for processing incoming items
+    errorc        chan error        // Channel for processing errors
+    watchers      []chan *Transfer
+    trMutex       *sync.Mutex
+    collectorWait sync.WaitGroup
+    errorwait     sync.WaitGroup
     // wait is used to keep track of pending transfers. It is incremented
     // once per unique OID on Add(), and is decremented when that transfer
     // is marked as completed or failed, but not retried.
@@ -303,14 +301,17 @@ func (q *TransferQueue) remember(t *objectTuple) objects {
 // 2. While the batch contains less items than `q.batchSize` AND the channel
 //    is open, read one item from the `q.incoming` channel.
 //    a. If the read was a channel close, go to step 4.
-//    b. If the read was a TransferTransferable item, go to step 3.
+//    b. If the read was a transferable item, go to step 3.
 // 3. Append the item to the batch.
 // 4. Sort the batch by descending object size, make a batch API call, send
 //    the items to the `*adapterBase`.
-// 5. Process the worker results, incrementing and appending retries if
-//    possible.
-// 6. If the `q.incoming` channel is open, go to step 2.
-// 7. If the next batch is empty AND the `q.incoming` channel is closed,
+// 5. In a separate goroutine, process the worker results, incrementing and
+//    appending retries if possible. On the main goroutine, accept new items
+//    into "pending".
+// 6. Concat() the "next" and "pending" batches such that no more items than
+//    the maximum allowed per batch are in next, and the rest are in pending.
+// 7. If the `q.incoming` channel is open, go to step 2.
+// 8. If the next batch is empty AND the `q.incoming` channel is closed,
 //    terminate immediately.
 //
 // collectBatches runs in its own goroutine.
@@ -319,6 +320,7 @@ func (q *TransferQueue) collectBatches() {
 
     var closing bool
     next := q.makeBatch()
+    pending := q.makeBatch()
 
     for {
         for !closing && (len(next) < q.batchSize) {
@@ -335,16 +337,58 @@ func (q *TransferQueue) collectBatches() {
         // size.
         sort.Sort(sort.Reverse(next))
 
-        retries, err := q.enqueueAndCollectRetriesFor(batch)
-        if err != nil {
-            q.errorc <- err
-        }
+        done := make(chan struct{})
+
+        var retries batch
+
+        go func() {
+            var err error
+
+            retries, err = q.enqueueAndCollectRetriesFor(next)
+            if err != nil {
+                q.errorc <- err
+            }
+
+            close(done)
+        }()
+
+        var collected batch
+        collected, closing = q.collectPendingUntil(done)
+
+        // Ensure the next batch is filled with, in order:
+        //
+        //   - retries from the previous batch,
+        //   - new additions that were enqueued behind retries, &
+        //   - items collected while the batch was processing.
+        next, pending = retries.Concat(append(pending, collected...), q.batchSize)
 
         if closing && len(next) == 0 {
+            // If len(next) == 0, there are no items in "pending",
+            // and it is safe to exit.
             break
         }
+    }
+}
 
-        next = retries
+// collectPendingUntil collects items from q.incoming into a "pending" batch
+// until the given "done" channel is written to, or is closed.
+//
+// A "pending" batch is returned, along with whether or not "q.incoming" is
+// closed.
+func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
+    for {
+        select {
+        case t, ok := <-q.incoming:
+            if !ok {
+                closing = true
+                <-done
+                return
+            }
+
+            pending = append(pending, t)
+        case <-done:
+            return
+        }
     }
 }
 
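The clamping rule that PATCH 2/3 gives Concat() is easiest to see with concrete sizes. The standalone sketch below mirrors the method on a simplified batch of strings so it runs on its own; the package layout, the sample items, and the batch size of 3 are illustrative and are not taken from the tq package.

package main

import "fmt"

// batch stands in for the tq package's batch type, simplified to strings so
// that the sketch is self-contained.
type batch []string

// Concat follows the same rule as the patched method: the combined elements
// are split into a "left" batch of at most size items, with any overflow
// returned as "right" (nil when everything fits).
func (b batch) Concat(other batch, size int) (left, right batch) {
    u := batch(append(b, other...))
    if len(u) <= size {
        return u, nil
    }
    return u[:size], u[size:]
}

func main() {
    retries := batch{"a", "b"}
    pending := batch{"c", "d", "e"}

    // With a batch size of 3, the retries fill the front of the next batch
    // and the overflow stays pending for the following round.
    next, rest := retries.Concat(pending, 3)
    fmt.Println(next, rest) // [a b c] [d e]
}

Because PATCH 3/3 always calls Concat() with the retries as the receiver, retried items occupy the front of "left", which is how the series keeps retries ahead of newer additions in the next batch.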
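The control flow PATCH 3/3 builds around collectPendingUntil() — run the current batch in a goroutine while the main loop keeps a select case open on the incoming channel until a "done" signal — can be sketched with plain channels outside the queue. Everything below (the collectUntil helper, the int items, the 50ms sleep) is an illustrative stand-in, not code from git-lfs.

package main

import (
    "fmt"
    "time"
)

// collectUntil mirrors collectPendingUntil: it buffers items from incoming
// until done is closed, and reports whether incoming itself was closed.
func collectUntil(incoming <-chan int, done <-chan struct{}) (pending []int, closing bool) {
    for {
        select {
        case v, ok := <-incoming:
            if !ok {
                // Input is exhausted; wait for the in-flight batch to finish.
                closing = true
                <-done
                return
            }
            pending = append(pending, v)
        case <-done:
            return
        }
    }
}

func main() {
    incoming := make(chan int)
    done := make(chan struct{})

    // Simulate a batch that takes a while to process.
    go func() {
        time.Sleep(50 * time.Millisecond)
        close(done)
    }()

    // Feed new items while the "batch" is in flight.
    go func() {
        for i := 1; i <= 3; i++ {
            incoming <- i
        }
    }()

    pending, closing := collectUntil(incoming, done)
    fmt.Println(pending, closing) // typically [1 2 3] false
}

The property the series relies on shows up here as well: because the collector always has a case ready on the incoming channel, producers are not stalled for the whole duration of an in-flight batch, only until the collector loops back around.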