tq: use *objects to remember objects sharing the same OID

This commit is contained in:
Taylor Blau 2017-08-03 14:44:53 -06:00
parent 63c6bd4c97
commit 721b057e40

@ -98,7 +98,8 @@ type TransferQueue struct {
cb progress.CopyCallback cb progress.CopyCallback
meter progress.Meter meter progress.Meter
errors []error errors []error
transfers map[string]*objectTuple // transfers maps transfer OIDs to a set of transfers with the same OID.
transfers map[string]*objects
batchSize int batchSize int
bufferDepth int bufferDepth int
// Channel for processing (and buffering) incoming items // Channel for processing (and buffering) incoming items
@ -116,6 +117,34 @@ type TransferQueue struct {
rc *retryCounter rc *retryCounter
} }
// objects holds a set of objects.
type objects struct {
completed bool
objects []*objectTuple
}
// All returns all *objectTuple's contained in the *objects set.
func (s *objects) All() []*objectTuple {
return s.objects
}
// Append returns a new *objects with the given *objectTuple(s) appended to the
// end of the known objects.
func (s *objects) Append(os ...*objectTuple) *objects {
return &objects{
completed: s.completed,
objects: append(s.objects, os...),
}
}
// First returns the first *objectTuple in the chain of objects.
func (s *objects) First() *objectTuple {
if len(s.objects) == 0 {
return nil
}
return s.objects[0]
}
type objectTuple struct { type objectTuple struct {
Name, Path, Oid string Name, Path, Oid string
Size int64 Size int64
@ -156,7 +185,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
client: &tqClient{Client: manifest.APIClient()}, client: &tqClient{Client: manifest.APIClient()},
remote: remote, remote: remote,
errorc: make(chan error), errorc: make(chan error),
transfers: make(map[string]*objectTuple), transfers: make(map[string]*objects),
trMutex: &sync.Mutex{}, trMutex: &sync.Mutex{},
manifest: manifest, manifest: manifest,
rc: newRetryCounter(), rc: newRetryCounter(),
@ -198,7 +227,9 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) {
Size: size, Size: size,
} }
if isNew := q.remember(t); !isNew { if objs := q.remember(t); len(objs.objects) > 1 {
// If the chain is not done, there is no reason to enqueue this
// transfer into 'q.incoming'.
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid) tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
return return
} }
@ -210,17 +241,22 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) {
// know about a Transfer with the same OID. // know about a Transfer with the same OID.
// //
// It returns if the value is new or not. // It returns if the value is new or not.
func (q *TransferQueue) remember(t *objectTuple) bool { func (q *TransferQueue) remember(t *objectTuple) objects {
q.trMutex.Lock() q.trMutex.Lock()
defer q.trMutex.Unlock() defer q.trMutex.Unlock()
if _, ok := q.transfers[t.Oid]; !ok { if _, ok := q.transfers[t.Oid]; !ok {
q.wait.Add(1) q.wait.Add(1)
q.transfers[t.Oid] = t q.transfers[t.Oid] = &objects{
objects: []*objectTuple{t},
return true
} }
return false
return *q.transfers[t.Oid]
}
q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
return *q.transfers[t.Oid]
} }
// collectBatches collects batches in a loop, prioritizing failed items from the // collectBatches collects batches in a loop, prioritizing failed items from the
@ -344,7 +380,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
} }
q.trMutex.Lock() q.trMutex.Lock()
t, ok := q.transfers[o.Oid] objects, ok := q.transfers[o.Oid]
q.trMutex.Unlock() q.trMutex.Unlock()
if !ok { if !ok {
// If we couldn't find any associated // If we couldn't find any associated
@ -356,7 +392,9 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
} else { } else {
tr := newTransfer(o, t.Name, t.Path) // Pick t[0], since it will cover all transfers with the
// same OID.
tr := newTransfer(o, objects.First().Name, objects.First().Path)
if a, err := tr.Rel(q.direction.String()); err != nil { if a, err := tr.Rel(q.direction.String()); err != nil {
// XXX(taylor): duplication // XXX(taylor): duplication
@ -365,7 +403,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
count := q.rc.CountFor(tr.Oid) count := q.rc.CountFor(tr.Oid)
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err) tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
next = append(next, t) next = append(next, objects.First())
} else { } else {
q.errorc <- errors.Errorf("[%v] %v", tr.Name, err) q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
@ -376,7 +414,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
q.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
} else { } else {
q.meter.StartTransfer(t.Name) q.meter.StartTransfer(objects.First().Name)
toTransfer = append(toTransfer, tr) toTransfer = append(toTransfer, tr)
} }
} }
@ -513,11 +551,11 @@ func (q *TransferQueue) handleTransferResult(
tracerx.Printf("tq: retrying object %s: %s", oid, res.Error) tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
q.trMutex.Lock() q.trMutex.Lock()
t, ok := q.transfers[oid] objects, ok := q.transfers[oid]
q.trMutex.Unlock() q.trMutex.Unlock()
if ok { if ok {
retries <- t retries <- objects.First()
} else { } else {
q.errorc <- res.Error q.errorc <- res.Error
} }
@ -530,11 +568,26 @@ func (q *TransferQueue) handleTransferResult(
q.wait.Done() q.wait.Done()
} }
} else { } else {
q.trMutex.Lock()
objects := q.transfers[oid]
objects.completed = true
// Otherwise, if the transfer was successful, notify all of the // Otherwise, if the transfer was successful, notify all of the
// watchers, and mark it as finished. // watchers, and mark it as finished.
for _, c := range q.watchers { for _, c := range q.watchers {
c <- res.Transfer // Send one update for each transfer with the
// same OID.
for _, t := range objects.All() {
c <- &Transfer{
Name: t.Name,
Path: t.Path,
Oid: t.Oid,
Size: t.Size,
} }
}
}
q.trMutex.Unlock()
q.meter.FinishTransfer(res.Transfer.Name) q.meter.FinishTransfer(res.Transfer.Name)
q.wait.Done() q.wait.Done()