Pull common queue code into a TransferQueue, reduce duplicated code

rubyist 2015-05-21 12:36:49 -04:00
parent f8f4ad230a
commit a3bb997567
4 changed files with 279 additions and 381 deletions
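The shape of the refactor: the near-identical DownloadQueue and UploadQueue collapse into one TransferQueue that works against a Transferable interface, and each kind keeps only a thin constructor that sets transferKind. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real lfs package:

package main

import "fmt"

// Transferable abstracts what the queue needs from an item; it mirrors
// the interface introduced in lfs/transfer_queue.go below, minus the
// lfs-specific types.
type Transferable interface {
	Oid() string
	Transfer() error
}

// TransferQueue is shared by both kinds; transferKind selects behavior.
type TransferQueue struct {
	transferKind string
	items        []Transferable
}

func (q *TransferQueue) Add(t Transferable) { q.items = append(q.items, t) }

func (q *TransferQueue) Process() {
	for _, t := range q.items {
		if err := t.Transfer(); err != nil {
			fmt.Printf("%s %s failed: %v\n", q.transferKind, t.Oid(), err)
		}
	}
}

// The former per-kind queues reduce to constructors, as in this commit.
func NewDownloadQueue() *TransferQueue { return &TransferQueue{transferKind: "download"} }
func NewUploadQueue() *TransferQueue   { return &TransferQueue{transferKind: "upload"} }

// fakeItem is a stand-in Transferable for demonstration only.
type fakeItem struct{ oid string }

func (f fakeItem) Oid() string     { return f.oid }
func (f fakeItem) Transfer() error { return nil }

func main() {
	q := NewDownloadQueue()
	q.Add(fakeItem{oid: "abc123"})
	q.Process() // prints nothing, since fakeItem.Transfer never fails
}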

@@ -36,7 +36,7 @@ func getCommand(cmd *cobra.Command, args []string) {
q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
for _, p := range pointers {
q.Add(p)
q.Add(lfs.NewDownloadable(p))
}
q.Process()
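
As the hunk above shows, callers now wrap each pointer in a Transferable via lfs.NewDownloadable before queueing it. A sketch of the resulting call-site flow; this is not self-contained (it assumes the get command's pointers slice), and the error-reporting line is hypothetical, based on the Errors accessor in the new file below:

q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers))
for _, p := range pointers {
	q.Add(lfs.NewDownloadable(p)) // wrap each pointer in a Transferable
}
q.Process()
for _, err := range q.Errors() {
	fmt.Fprintln(os.Stderr, err) // hypothetical reporting of accumulated *WrappedError values
}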

@@ -1,197 +1,45 @@
package lfs
import (
"fmt"
"github.com/cheggaaa/pb"
"sync"
"sync/atomic"
)
type Downloadable struct {
Pointer *wrappedPointer
Object *objectResource
object *objectResource
}
// DownloadQueue provides a queue that will allow concurrent downloads.
type DownloadQueue struct {
downloadc chan *Downloadable
errorc chan *WrappedError
errors []*WrappedError
wg sync.WaitGroup
workers int
files int
finished int64
size int64
authCond *sync.Cond
downloadables map[string]*Downloadable
bar *pb.ProgressBar
clientAuthorized int32
func NewDownloadable(p *wrappedPointer) *Downloadable {
return &Downloadable{Pointer: p}
}
// NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads.
func NewDownloadQueue(workers, files int) *DownloadQueue {
return &DownloadQueue{
downloadc: make(chan *Downloadable, files),
errorc: make(chan *WrappedError),
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
downloadables: make(map[string]*Downloadable),
}
func (d *Downloadable) Check() (*objectResource, *WrappedError) {
return DownloadCheck(d.Pointer.Oid)
}
// Add adds an object to the download queue.
func (q *DownloadQueue) Add(p *wrappedPointer) {
q.downloadables[p.Oid] = &Downloadable{Pointer: p}
}
func (q *DownloadQueue) processBatch() {
q.files = 0
downloads := make([]*objectResource, 0, len(q.downloadables))
for _, d := range q.downloadables {
downloads = append(downloads, &objectResource{Oid: d.Pointer.Oid, Size: d.Pointer.Size})
}
objects, err := Batch(downloads)
func (d *Downloadable) Transfer(cb CopyCallback) *WrappedError {
err := PointerSmudgeObject(d.Pointer.Pointer, d.object, cb)
if err != nil {
q.errorc <- err
sendApiEvent(apiEventFail)
return
return Error(err)
}
for _, o := range objects {
if _, ok := o.Links["download"]; ok {
// This object can be downloaded
if downloadable, ok := q.downloadables[o.Oid]; ok {
q.files++
q.wg.Add(1)
downloadable.Object = o
q.downloadc <- downloadable
}
}
}
close(q.downloadc)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
q.bar.Start()
sendApiEvent(apiEventSuccess) // Wake up download workers
}
// processIndividual processes the queue, making the check API calls and
// feeding the results to the download workers
func (q *DownloadQueue) processIndividual() {
apic := make(chan *Downloadable, q.files)
workersReady := make(chan int, q.workers)
var wg sync.WaitGroup
for i := 0; i < q.workers; i++ {
go func() {
workersReady <- 1
for d := range apic {
// If an API authorization has not occurred, we wait until we're woken up.
q.authCond.L.Lock()
if atomic.LoadInt32(&q.clientAuthorized) == 0 {
q.authCond.Wait()
}
q.authCond.L.Unlock()
obj, err := DownloadCheck(d.Pointer.Oid)
if err != nil {
q.errorc <- err
wg.Done()
continue
}
q.wg.Add(1)
d.Object = obj
q.downloadc <- d
wg.Done()
}
}()
}
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables)))
q.bar.Start()
for _, d := range q.downloadables {
wg.Add(1)
apic <- d
}
<-workersReady
q.authCond.Signal() // Signal the first goroutine to run
close(apic)
wg.Wait()
close(q.downloadc)
}
// Process starts the download queue and displays a progress bar.
func (q *DownloadQueue) Process() {
q.bar = pb.New64(q.size)
q.bar.SetUnits(pb.U_BYTES)
q.bar.ShowBar = false
// This goroutine collects errors returned from downloads
go func() {
for err := range q.errorc {
q.errors = append(q.errors, err)
}
}()
// This goroutine watches for apiEvents. In order to prevent multiple
// credential requests from happening, the queue is processed sequentially
// until an API request succeeds (meaning authentication has succeeded).
// Once an API request succeeds, all worker goroutines are woken up and allowed
// to process downloads. Once a success happens, this goroutine exits.
go func() {
for {
event := <-apiEvent
switch event {
case apiEventSuccess:
atomic.StoreInt32(&q.clientAuthorized, 1)
q.authCond.Broadcast() // Wake all remaining goroutines
return
case apiEventFail:
q.authCond.Signal() // Wake the next goroutine
}
}
}()
for i := 0; i < q.workers; i++ {
// These are the worker goroutines that process downloads
go func(n int) {
for d := range q.downloadc {
cb := func(total, read int64, current int) error {
q.bar.Add(current)
return nil
}
if err := PointerSmudgeObject(d.Pointer.Pointer, d.Object, cb); err != nil {
q.errorc <- Error(err)
func (d *Downloadable) Object() *objectResource {
return d.object
}
f := atomic.AddInt64(&q.finished, 1)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
q.wg.Done()
}
}(i)
func (d *Downloadable) Oid() string {
return d.Pointer.Oid
}
if Config.BatchTransfer() {
q.processBatch()
} else {
q.processIndividual()
func (d *Downloadable) Size() int64 {
return d.Pointer.Size
}
q.wg.Wait()
close(q.errorc)
q.bar.Finish()
func (d *Downloadable) SetObject(o *objectResource) {
d.object = o
}
// Errors returns any errors encountered during downloading.
func (q *DownloadQueue) Errors() []*WrappedError {
return q.errors
// NewDownloadQueue builds a TransferQueue for downloads, allowing `workers` concurrent downloads.
func NewDownloadQueue(workers, files int) *TransferQueue {
q := newTransferQueue(workers, files)
q.transferKind = "download"
return q
}

lfs/transfer_queue.go (new file, 205 lines)

@@ -0,0 +1,205 @@
package lfs
import (
"fmt"
"github.com/cheggaaa/pb"
"sync"
"sync/atomic"
)
type Transferable interface {
Check() (*objectResource, *WrappedError)
Transfer(CopyCallback) *WrappedError
Object() *objectResource
Oid() string
Size() int64
SetObject(*objectResource)
}
// TransferQueue provides a queue that will allow concurrent transfers.
type TransferQueue struct {
transferc chan Transferable
errorc chan *WrappedError
errors []*WrappedError
wg sync.WaitGroup
workers int
files int
finished int64
size int64
authCond *sync.Cond
transferables map[string]Transferable
bar *pb.ProgressBar
clientAuthorized int32
transferKind string
}
// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
func newTransferQueue(workers, files int) *TransferQueue {
return &TransferQueue{
transferc: make(chan Transferable, files),
errorc: make(chan *WrappedError),
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
transferables: make(map[string]Transferable),
}
}
// Add adds a Transferable to the transfer queue.
func (q *TransferQueue) Add(t Transferable) {
q.transferables[t.Oid()] = t
}
// processIndividual processes the queue, making the check API calls and
// feeding the results to the transfer workers
func (q *TransferQueue) processIndividual() {
apic := make(chan Transferable, q.files)
workersReady := make(chan int, q.workers)
var wg sync.WaitGroup
for i := 0; i < q.workers; i++ {
go func() {
workersReady <- 1
for t := range apic {
// If an API authorization has not occurred, we wait until we're woken up.
q.authCond.L.Lock()
if atomic.LoadInt32(&q.clientAuthorized) == 0 {
q.authCond.Wait()
}
q.authCond.L.Unlock()
obj, err := t.Check()
if err != nil {
q.errorc <- err
wg.Done()
continue
}
if obj != nil {
q.wg.Add(1)
t.SetObject(obj)
q.transferc <- t
}
wg.Done()
}
}()
}
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.transferables)))
q.bar.Start()
for _, t := range q.transferables {
wg.Add(1)
apic <- t
}
<-workersReady
q.authCond.Signal() // Signal the first goroutine to run
close(apic)
wg.Wait()
close(q.transferc)
}
// processBatch makes the batch POST call, feeding the results
// to the transfer workers
func (q *TransferQueue) processBatch() {
q.files = 0
transfers := make([]*objectResource, 0, len(q.transferables))
for _, t := range q.transferables {
transfers = append(transfers, &objectResource{Oid: t.Oid(), Size: t.Size()})
}
objects, err := Batch(transfers)
if err != nil {
q.errorc <- err
sendApiEvent(apiEventFail)
return
}
for _, o := range objects {
if _, ok := o.Links[q.transferKind]; ok {
// This object needs to be transferred
if transfer, ok := q.transferables[o.Oid]; ok {
q.files++
q.wg.Add(1)
transfer.SetObject(o)
q.transferc <- transfer
}
}
}
close(q.transferc)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
q.bar.Start()
sendApiEvent(apiEventSuccess) // Wake up transfer workers
}
// Process starts the transfer queue and displays a progress bar.
func (q *TransferQueue) Process() {
q.bar = pb.New64(q.size)
q.bar.SetUnits(pb.U_BYTES)
q.bar.ShowBar = false
// This goroutine collects errors returned from transfers
go func() {
for err := range q.errorc {
q.errors = append(q.errors, err)
}
}()
// This goroutine watches for apiEvents. In order to prevent multiple
// credential requests from happening, the queue is processed sequentially
// until an API request succeeds (meaning authentication has succeeded).
// Once an API request succeeds, all worker goroutines are woken up and allowed
// to process transfers. Once a success happens, this goroutine exits.
go func() {
for {
event := <-apiEvent
switch event {
case apiEventSuccess:
atomic.StoreInt32(&q.clientAuthorized, 1)
q.authCond.Broadcast() // Wake all remaining goroutines
return
case apiEventFail:
q.authCond.Signal() // Wake the next goroutine
}
}
}()
for i := 0; i < q.workers; i++ {
// These are the worker goroutines that process transfers
go func(n int) {
for transfer := range q.transferc {
cb := func(total, read int64, current int) error {
q.bar.Add(current)
return nil
}
if err := transfer.Transfer(cb); err != nil {
q.errorc <- err
}
f := atomic.AddInt64(&q.finished, 1)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
q.wg.Done()
}
}(i)
}
if Config.BatchTransfer() {
q.processBatch()
} else {
q.processIndividual()
}
q.wg.Wait()
close(q.errorc)
q.bar.Finish()
}
// Errors returns any errors encountered during transfer.
func (q *TransferQueue) Errors() []*WrappedError {
return q.errors
}
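
One payoff of the interface above is extensibility: a new transfer kind only needs to satisfy Transferable and set a transferKind matching a key in the API's Links map. A sketch under that assumption; Verifiable, VerifyCheck, and VerifyObject are invented for illustration and are not part of this commit:

// Hypothetical: a "verify" transfer kind. VerifyCheck and VerifyObject
// stand in for kind-specific API calls and do not exist in git-lfs.
type Verifiable struct {
	oid    string
	size   int64
	object *objectResource
}

func (v *Verifiable) Check() (*objectResource, *WrappedError) { return VerifyCheck(v.oid) }
func (v *Verifiable) Transfer(cb CopyCallback) *WrappedError  { return VerifyObject(v.object, cb) }
func (v *Verifiable) Object() *objectResource                 { return v.object }
func (v *Verifiable) Oid() string                             { return v.oid }
func (v *Verifiable) Size() int64                             { return v.size }
func (v *Verifiable) SetObject(o *objectResource)             { v.object = o }

// NewVerifyQueue mirrors NewDownloadQueue/NewUploadQueue in this commit.
func NewVerifyQueue(workers, files int) *TransferQueue {
	q := newTransferQueue(workers, files)
	q.transferKind = "verify" // must match a key in objectResource.Links
	return q
}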

@@ -2,20 +2,17 @@ package lfs
import (
"fmt"
"github.com/cheggaaa/pb"
"os"
"path/filepath"
"sync"
"sync/atomic"
)
// Uploadable describes a file that can be uploaded.
type Uploadable struct {
OID string
OIDPath string
oid string
OidPath string
Filename string
CB CopyCallback
Size int64
size int64
object *objectResource
}
@@ -44,198 +41,46 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W
defer file.Close()
}
return &Uploadable{OID: oid, OIDPath: path, Filename: filename, CB: cb, Size: fi.Size()}, nil
return &Uploadable{oid: oid, OidPath: path, Filename: filename, CB: cb, size: fi.Size()}, nil
}
// UploadQueue provides a queue that will allow concurrent uploads.
type UploadQueue struct {
uploadc chan *Uploadable
errorc chan *WrappedError
errors []*WrappedError
wg sync.WaitGroup
workers int
files int
finished int64
size int64
authCond *sync.Cond
uploadables map[string]*Uploadable
bar *pb.ProgressBar
clientAuthorized int32
func (u *Uploadable) Check() (*objectResource, *WrappedError) {
return UploadCheck(u.OidPath)
}
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
func NewUploadQueue(workers, files int) *UploadQueue {
return &UploadQueue{
uploadc: make(chan *Uploadable, files),
errorc: make(chan *WrappedError),
workers: workers,
files: files,
authCond: sync.NewCond(&sync.Mutex{}),
uploadables: make(map[string]*Uploadable),
}
}
// Add adds an Uploadable to the upload queue.
func (q *UploadQueue) Add(u *Uploadable) {
q.uploadables[u.OID] = u
}
// processIndividual processes the queue, making the check API calls and
// feeding the results to the upload workers
func (q *UploadQueue) processIndividual() {
apic := make(chan *Uploadable, q.files)
workersReady := make(chan int, q.workers)
var wg sync.WaitGroup
for i := 0; i < q.workers; i++ {
go func() {
workersReady <- 1
for u := range apic {
// If an API authorization has not occurred, we wait until we're woken up.
q.authCond.L.Lock()
if atomic.LoadInt32(&q.clientAuthorized) == 0 {
q.authCond.Wait()
}
q.authCond.L.Unlock()
obj, err := UploadCheck(u.OIDPath)
if err != nil {
q.errorc <- err
wg.Done()
continue
}
if obj != nil {
q.wg.Add(1)
u.object = obj
q.uploadc <- u
}
wg.Done()
}
}()
}
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.uploadables)))
q.bar.Start()
for _, u := range q.uploadables {
wg.Add(1)
apic <- u
}
<-workersReady
q.authCond.Signal() // Signal the first goroutine to run
close(apic)
wg.Wait()
close(q.uploadc)
}
// processBatch makes the batch POST call, feeding the results
// to the upload workers
func (q *UploadQueue) processBatch() {
q.files = 0
uploads := make([]*objectResource, 0, len(q.uploadables))
for _, u := range q.uploadables {
uploads = append(uploads, &objectResource{Oid: u.OID, Size: u.Size})
}
objects, err := Batch(uploads)
if err != nil {
q.errorc <- err
sendApiEvent(apiEventFail)
return
}
for _, o := range objects {
if _, ok := o.Links["upload"]; ok {
// This object needs to be uploaded
if uploadable, ok := q.uploadables[o.Oid]; ok {
q.files++
q.wg.Add(1)
uploadable.object = o
q.uploadc <- uploadable
}
}
}
close(q.uploadc)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
q.bar.Start()
sendApiEvent(apiEventSuccess) // Wake up upload workers
}
// Process starts the upload queue and displays a progress bar.
func (q *UploadQueue) Process() {
q.bar = pb.New64(q.size)
q.bar.SetUnits(pb.U_BYTES)
q.bar.ShowBar = false
// This goroutine collects errors returned from uploads
go func() {
for err := range q.errorc {
q.errors = append(q.errors, err)
}
}()
// This goroutine watches for apiEvents. In order to prevent multiple
// credential requests from happening, the queue is processed sequentially
// until an API request succeeds (meaning authentication has succeeded).
// Once an API request succeeds, all worker goroutines are woken up and allowed
// to process uploads. Once a success happens, this goroutine exits.
go func() {
for {
event := <-apiEvent
switch event {
case apiEventSuccess:
atomic.StoreInt32(&q.clientAuthorized, 1)
q.authCond.Broadcast() // Wake all remaining goroutines
return
case apiEventFail:
q.authCond.Signal() // Wake the next goroutine
}
}
}()
for i := 0; i < q.workers; i++ {
// These are the worker goroutines that process uploads
go func(n int) {
for upload := range q.uploadc {
cb := func(total, read int64, current int) error {
q.bar.Add(current)
if upload.CB != nil {
return upload.CB(total, read, current)
func (u *Uploadable) Transfer(cb CopyCallback) *WrappedError {
wcb := func(total, read int64, current int) error {
cb(total, read, current)
if u.CB != nil {
return u.CB(total, read, current)
}
return nil
}
err := UploadObject(upload.object, cb)
if err != nil {
q.errorc <- err
return UploadObject(u.object, wcb)
}
f := atomic.AddInt64(&q.finished, 1)
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files))
q.wg.Done()
}
}(i)
func (u *Uploadable) Object() *objectResource {
return u.object
}
if Config.BatchTransfer() {
q.processBatch()
} else {
q.processIndividual()
func (u *Uploadable) Oid() string {
return u.oid
}
q.wg.Wait()
close(q.errorc)
q.bar.Finish()
func (u *Uploadable) Size() int64 {
return u.size
}
// Errors returns any errors encountered during uploading.
func (q *UploadQueue) Errors() []*WrappedError {
return q.errors
func (u *Uploadable) SetObject(o *objectResource) {
u.object = o
}
// NewUploadQueue builds a TransferQueue for uploads, allowing `workers` concurrent uploads.
func NewUploadQueue(workers, files int) *TransferQueue {
q := newTransferQueue(workers, files)
q.transferKind = "upload"
return q
}
// ensureFile makes sure that the cleanPath exists before pushing it. If it