tq: teach Watch() to return <-chan *tq.Transfer

This commit is contained in:
Taylor Blau 2017-08-02 13:17:39 -06:00
parent 330e899466
commit 28539fbaa2
4 changed files with 15 additions and 14 deletions

@ -301,8 +301,8 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
oidToPointers[pointer.Oid] = append(plist, pointer)
}
for oid := range dlwatch {
plist, ok := oidToPointers[oid]
for t := range dlwatch {
plist, ok := oidToPointers[t.Oid]
if !ok {
continue
}

@ -117,7 +117,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
var verifiedObjects tools.StringSet
var totalSize int64
var verboseOutput bytes.Buffer
var verifyc chan string
var verifyc chan *tq.Transfer
var verifywait sync.WaitGroup
if verifyRemote {
@ -128,9 +128,9 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
verifyc = verifyQueue.Watch()
verifywait.Add(1)
go func() {
for oid := range verifyc {
verifiedObjects.Add(oid)
tracerx.Printf("VERIFIED: %v", oid)
for t := range verifyc {
verifiedObjects.Add(t.Oid)
tracerx.Printf("VERIFIED: %v", t.Oid)
progressChan <- PruneProgress{PruneProgressTypeVerify, 1}
}
verifywait.Done()

@ -81,8 +81,8 @@ func pull(remote string, filter *filepathfilter.Filter) {
wg.Add(1)
go func() {
for oid := range dlwatch {
for _, p := range pointers.All(oid) {
for t := range dlwatch {
for _, p := range pointers.All(t.Oid) {
singleCheckout.Run(p)
}
}

@ -104,7 +104,7 @@ type TransferQueue struct {
// Channel for processing (and buffering) incoming items
incoming chan *objectTuple
errorc chan error // Channel for processing errors
watchers []chan string
watchers []chan *Transfer
trMutex *sync.Mutex
collectorWait sync.WaitGroup
errorwait sync.WaitGroup
@ -533,7 +533,7 @@ func (q *TransferQueue) handleTransferResult(
// Otherwise, if the transfer was successful, notify all of the
// watchers, and mark it as finished.
for _, c := range q.watchers {
c <- oid
c <- res.Transfer
}
q.meter.FinishTransfer(res.Transfer.Name)
@ -636,10 +636,11 @@ func (q *TransferQueue) Wait() {
q.errorwait.Wait()
}
// Watch returns a channel where the queue will write the OID of each transfer
// as it completes. The channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan string {
c := make(chan string, q.batchSize)
// Watch returns a channel where the queue will write the value of each transfer
// as it completes. The channel will be closed when the queue finishes
// processing.
func (q *TransferQueue) Watch() chan *Transfer {
c := make(chan *Transfer, q.batchSize)
q.watchers = append(q.watchers, c)
return c
}