From 120228c9ae280ce59dbc10193fa730b8bc5fc087 Mon Sep 17 00:00:00 2001 From: Taylor Blau Date: Thu, 3 Aug 2017 14:45:06 -0600 Subject: [PATCH] tq: teach the *TransferQueue to accept objects with the same OID --- tq/transfer_queue.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 08cb654f..78ef9d6f 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -150,6 +150,15 @@ type objectTuple struct { Size int64 } +func (o *objectTuple) ToTransfer() *Transfer { + return &Transfer{ + Name: o.Name, + Path: o.Path, + Oid: o.Oid, + Size: o.Size, + } +} + type Option func(*TransferQueue) func DryRun(dryRun bool) Option { @@ -219,6 +228,13 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options // Add adds a *Transfer to the transfer queue. It only increments the amount // of waiting the TransferQueue has to do if the *Transfer "t" is new. +// +// If another transfer(s) with the same OID has been added to the *TransferQueue +// already, the given transfer will not be enqueued, but will be sent to any +// channel created by Watch() once the oldest transfer has completed. +// +// Only one file will be transferred to/from the Path element of the first +// transfer. func (q *TransferQueue) Add(name, path, oid string, size int64) { t := &objectTuple{ Name: name, @@ -228,6 +244,15 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) { } if objs := q.remember(t); len(objs.objects) > 1 { + if objs.completed { + // If there is already a completed transfer chain for + // this OID, then this object is already "done", and can + // be sent through as completed to the watchers. + for _, w := range q.watchers { + w <- t.ToTransfer() + } + } + // If the chain is not done, there is no reason to enqueue this // transfer into 'q.incoming'. tracerx.Printf("already transferring %q, skipping duplicate", t.Oid) @@ -690,8 +715,9 @@ func (q *TransferQueue) Wait() { } // Watch returns a channel where the queue will write the value of each transfer -// as it completes. The channel will be closed when the queue finishes -// processing. +// as it completes. If multiple transfers exist with the same OID, they will all +// be recorded here, even though only one actual transfer took place. The +// channel will be closed when the queue finishes processing. func (q *TransferQueue) Watch() chan *Transfer { c := make(chan *Transfer, q.batchSize) q.watchers = append(q.watchers, c)