tq: teach the *TransferQueue to accept objects with the same OID

This commit is contained in:
Taylor Blau 2017-08-03 14:45:06 -06:00
parent 721b057e40
commit 120228c9ae

@ -150,6 +150,15 @@ type objectTuple struct {
Size int64
}
func (o *objectTuple) ToTransfer() *Transfer {
return &Transfer{
Name: o.Name,
Path: o.Path,
Oid: o.Oid,
Size: o.Size,
}
}
type Option func(*TransferQueue)
func DryRun(dryRun bool) Option {
@ -219,6 +228,13 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
// Add adds a *Transfer to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the *Transfer "t" is new.
//
// If another transfer(s) with the same OID has been added to the *TransferQueue
// already, the given transfer will not be enqueued, but will be sent to any
// channel created by Watch() once the oldest transfer has completed.
//
// Only one file will be transferred to/from the Path element of the first
// transfer.
func (q *TransferQueue) Add(name, path, oid string, size int64) {
t := &objectTuple{
Name: name,
@ -228,6 +244,15 @@ func (q *TransferQueue) Add(name, path, oid string, size int64) {
}
if objs := q.remember(t); len(objs.objects) > 1 {
if objs.completed {
// If there is already a completed transfer chain for
// this OID, then this object is already "done", and can
// be sent through as completed to the watchers.
for _, w := range q.watchers {
w <- t.ToTransfer()
}
}
// If the chain is not done, there is no reason to enqueue this
// transfer into 'q.incoming'.
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
@ -690,8 +715,9 @@ func (q *TransferQueue) Wait() {
}
// Watch returns a channel where the queue will write the value of each transfer
// as it completes. The channel will be closed when the queue finishes
// processing.
// as it completes. If multiple transfers exist with the same OID, they will all
// be recorded here, even though only one actual transfer took place. The
// channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan *Transfer {
c := make(chan *Transfer, q.batchSize)
q.watchers = append(q.watchers, c)