From 28539fbaa20c6af289abc983129af4081a537c90 Mon Sep 17 00:00:00 2001 From: Taylor Blau Date: Wed, 2 Aug 2017 13:17:39 -0600 Subject: [PATCH] tq: teach Watch() to return <-chan *tq.Transfer --- commands/command_fetch.go | 4 ++-- commands/command_prune.go | 8 ++++---- commands/command_pull.go | 4 ++-- tq/transfer_queue.go | 13 +++++++------ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/commands/command_fetch.go b/commands/command_fetch.go index e1fc12d0..dac89702 100644 --- a/commands/command_fetch.go +++ b/commands/command_fetch.go @@ -301,8 +301,8 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil oidToPointers[pointer.Oid] = append(plist, pointer) } - for oid := range dlwatch { - plist, ok := oidToPointers[oid] + for t := range dlwatch { + plist, ok := oidToPointers[t.Oid] if !ok { continue } diff --git a/commands/command_prune.go b/commands/command_prune.go index 291219f9..72945d6f 100644 --- a/commands/command_prune.go +++ b/commands/command_prune.go @@ -117,7 +117,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo var verifiedObjects tools.StringSet var totalSize int64 var verboseOutput bytes.Buffer - var verifyc chan string + var verifyc chan *tq.Transfer var verifywait sync.WaitGroup if verifyRemote { @@ -128,9 +128,9 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo verifyc = verifyQueue.Watch() verifywait.Add(1) go func() { - for oid := range verifyc { - verifiedObjects.Add(oid) - tracerx.Printf("VERIFIED: %v", oid) + for t := range verifyc { + verifiedObjects.Add(t.Oid) + tracerx.Printf("VERIFIED: %v", t.Oid) progressChan <- PruneProgress{PruneProgressTypeVerify, 1} } verifywait.Done() diff --git a/commands/command_pull.go b/commands/command_pull.go index de4f56d5..dc543ad5 100644 --- a/commands/command_pull.go +++ b/commands/command_pull.go @@ -81,8 +81,8 @@ func pull(remote string, filter *filepathfilter.Filter) { wg.Add(1) go func() { - for oid := range dlwatch { - for _, p := range pointers.All(oid) { + for t := range dlwatch { + for _, p := range pointers.All(t.Oid) { singleCheckout.Run(p) } } diff --git a/tq/transfer_queue.go b/tq/transfer_queue.go index 67f4e9a7..6416621c 100644 --- a/tq/transfer_queue.go +++ b/tq/transfer_queue.go @@ -104,7 +104,7 @@ type TransferQueue struct { // Channel for processing (and buffering) incoming items incoming chan *objectTuple errorc chan error // Channel for processing errors - watchers []chan string + watchers []chan *Transfer trMutex *sync.Mutex collectorWait sync.WaitGroup errorwait sync.WaitGroup @@ -533,7 +533,7 @@ func (q *TransferQueue) handleTransferResult( // Otherwise, if the transfer was successful, notify all of the // watchers, and mark it as finished. for _, c := range q.watchers { - c <- oid + c <- res.Transfer } q.meter.FinishTransfer(res.Transfer.Name) @@ -636,10 +636,11 @@ func (q *TransferQueue) Wait() { q.errorwait.Wait() } -// Watch returns a channel where the queue will write the OID of each transfer -// as it completes. The channel will be closed when the queue finishes processing. -func (q *TransferQueue) Watch() chan string { - c := make(chan string, q.batchSize) +// Watch returns a channel where the queue will write the value of each transfer +// as it completes. The channel will be closed when the queue finishes +// processing. +func (q *TransferQueue) Watch() chan *Transfer { + c := make(chan *Transfer, q.batchSize) q.watchers = append(q.watchers, c) return c }