From 721b057e40bf84e87c17ed75fcf81601b0a8bf7a Mon Sep 17 00:00:00 2001 From: Taylor Blau Date: Thu, 3 Aug 2017 14:44:53 -0600 Subject: [PATCH] tq: use *objects to remember objects sharing the same OID --- tq/transfer_queue.go | 85 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 6416621c..08cb654f 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -98,9 +98,10 @@ type TransferQueue struct { cb progress.CopyCallback meter progress.Meter errors []error - transfers map[string]*objectTuple - batchSize int - bufferDepth int + // transfers maps transfer OIDs to a set of transfers with the same OID. + transfers map[string]*objects + batchSize int + bufferDepth int // Channel for processing (and buffering) incoming items incoming chan *objectTuple errorc chan error // Channel for processing errors @@ -116,6 +117,34 @@ type TransferQueue struct { rc *retryCounter } +// objects holds a set of objects. +type objects struct { + completed bool + objects []*objectTuple +} + +// All returns all *objectTuple's contained in the *objects set. +func (s *objects) All() []*objectTuple { + return s.objects +} + +// Append returns a new *objects with the given *objectTuple(s) appended to the +// end of the known objects. +func (s *objects) Append(os ...*objectTuple) *objects { + return &objects{ + completed: s.completed, + objects: append(s.objects, os...), + } +} + +// First returns the first *objectTuple in the chain of objects. +func (s *objects) First() *objectTuple { + if len(s.objects) == 0 { + return nil + } + return s.objects[0] +} + type objectTuple struct { Name, Path, Oid string Size int64 @@ -156,7 +185,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options client: &tqClient{Client: manifest.APIClient()}, remote: remote, errorc: make(chan error), - transfers: make(map[string]*objectTuple), + transfers: make(map[string]*objects), trMutex: &sync.Mutex{}, manifest: manifest, rc: newRetryCounter(), @@ -198,7 +227,9 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) { Size: size, } - if isNew := q.remember(t); !isNew { + if objs := q.remember(t); len(objs.objects) > 1 { + // If the chain is not done, there is no reason to enqueue this + // transfer into 'q.incoming'. tracerx.Printf("already transferring %q, skipping duplicate", t.Oid) return } @@ -210,17 +241,22 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) { // know about a Transfer with the same OID. // // It returns if the value is new or not. -func (q *TransferQueue) remember(t *objectTuple) bool { +func (q *TransferQueue) remember(t *objectTuple) objects { q.trMutex.Lock() defer q.trMutex.Unlock() if _, ok := q.transfers[t.Oid]; !ok { q.wait.Add(1) - q.transfers[t.Oid] = t + q.transfers[t.Oid] = &objects{ + objects: []*objectTuple{t}, + } - return true + return *q.transfers[t.Oid] } - return false + + q.transfers[t.Oid] = q.transfers[t.Oid].Append(t) + + return *q.transfers[t.Oid] } // collectBatches collects batches in a loop, prioritizing failed items from the @@ -344,7 +380,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) } q.trMutex.Lock() - t, ok := q.transfers[o.Oid] + objects, ok := q.transfers[o.Oid] q.trMutex.Unlock() if !ok { // If we couldn't find any associated @@ -356,7 +392,9 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) q.Skip(o.Size) q.wait.Done() } else { - tr := newTransfer(o, t.Name, t.Path) + // Pick t[0], since it will cover all transfers with the + // same OID. + tr := newTransfer(o, objects.First().Name, objects.First().Path) if a, err := tr.Rel(q.direction.String()); err != nil { // XXX(taylor): duplication @@ -365,7 +403,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) count := q.rc.CountFor(tr.Oid) tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err) - next = append(next, t) + next = append(next, objects.First()) } else { q.errorc <- errors.Errorf("[%v] %v", tr.Name, err) @@ -376,7 +414,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) q.Skip(o.Size) q.wait.Done() } else { - q.meter.StartTransfer(t.Name) + q.meter.StartTransfer(objects.First().Name) toTransfer = append(toTransfer, tr) } } @@ -513,11 +551,11 @@ func (q *TransferQueue) handleTransferResult( tracerx.Printf("tq: retrying object %s: %s", oid, res.Error) q.trMutex.Lock() - t, ok := q.transfers[oid] + objects, ok := q.transfers[oid] q.trMutex.Unlock() if ok { - retries <- t + retries <- objects.First() } else { q.errorc <- res.Error } @@ -530,12 +568,27 @@ func (q *TransferQueue) handleTransferResult( q.wait.Done() } } else { + q.trMutex.Lock() + objects := q.transfers[oid] + objects.completed = true + // Otherwise, if the transfer was successful, notify all of the // watchers, and mark it as finished. for _, c := range q.watchers { - c <- res.Transfer + // Send one update for each transfer with the + // same OID. + for _, t := range objects.All() { + c <- &Transfer{ + Name: t.Name, + Path: t.Path, + Oid: t.Oid, + Size: t.Size, + } + } } + q.trMutex.Unlock() + q.meter.FinishTransfer(res.Transfer.Name) q.wait.Done() }