Syncrhonize uploads until one is successful, then allow concurrent

To prevent being asked for credentials n times (where n = # of
concurrent uploads), we process the upload queue synchronously until one
of the uploads succeeds, and then allow concurrent uploads.

This could be a bit faster if the condition is fired after the first
successful API hit, rather than the first successful total upload, but
some deeper refactoring will need to be done.
This commit is contained in:
rubyist 2015-04-29 10:37:27 -04:00
parent 23ea40978a
commit 5619c5cd4a

@ -9,6 +9,10 @@ import (
"sync/atomic"
)
var (
clientAuthorized = 0
)
// Uploadable describes a file that can be uploaded.
type Uploadable struct {
OIDPath string
@ -55,15 +59,17 @@ type UploadQueue struct {
files int
finished int64
size int64
authCond *sync.Cond
}
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(workers, files int) *UploadQueue {
return &UploadQueue{
uploadc: make(chan *Uploadable, files),
errorc: make(chan *WrappedError),
workers: workers,
files: files,
uploadc: make(chan *Uploadable, files),
errorc: make(chan *WrappedError),
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
}
}
@ -88,9 +94,19 @@ func (q *UploadQueue) Process() {
}
}()
workersReady := make(chan int, q.workers)
for i := 0; i < q.workers; i++ {
go func(n int) {
workersReady <- 1
for upload := range q.uploadc {
q.authCond.L.Lock()
if clientAuthorized == 0 {
q.authCond.Wait()
}
q.authCond.L.Unlock()
cb := func(total, read int64, current int) error {
bar.Add(current)
if upload.CB != nil {
@ -102,7 +118,14 @@ func (q *UploadQueue) Process() {
err := Upload(upload.OIDPath, upload.Filename, cb)
if err != nil {
q.errorc <- err
// This one failed, send a Signal() to wake up the next one
q.authCond.Signal()
} else {
// This one succeeded, set the auth condition to 1 and Broadcast()
clientAuthorized = 1
q.authCond.Broadcast()
}
f := atomic.AddInt64(&q.finished, 1)
bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
q.wg.Done()
@ -111,6 +134,8 @@ func (q *UploadQueue) Process() {
}
close(q.uploadc)
<-workersReady
q.authCond.Signal() // Signal the first goroutine to run
q.wg.Wait()
close(q.errorc)