From a3bb99756735f33331a42721fb20c759f34b52c1 Mon Sep 17 00:00:00 2001 From: rubyist Date: Thu, 21 May 2015 12:36:49 -0400 Subject: [PATCH] Pull common queue code into a TransferQueue, reduce duplicated code --- commands/command_get.go | 2 +- lfs/download_queue.go | 222 ++++++-------------------------------- lfs/transfer_queue.go | 205 +++++++++++++++++++++++++++++++++++ lfs/upload_queue.go | 231 +++++++--------------------------------- 4 files changed, 279 insertions(+), 381 deletions(-) create mode 100644 lfs/transfer_queue.go diff --git a/commands/command_get.go b/commands/command_get.go index d30a89dd..1e8d8b26 100644 --- a/commands/command_get.go +++ b/commands/command_get.go @@ -36,7 +36,7 @@ func getCommand(cmd *cobra.Command, args []string) { q := lfs.NewDownloadQueue(lfs.Config.ConcurrentTransfers(), len(pointers)) for _, p := range pointers { - q.Add(p) + q.Add(lfs.NewDownloadable(p)) } q.Process() diff --git a/lfs/download_queue.go b/lfs/download_queue.go index d46a1065..4822d4ea 100644 --- a/lfs/download_queue.go +++ b/lfs/download_queue.go @@ -1,197 +1,45 @@ package lfs -import ( - "fmt" - "github.com/cheggaaa/pb" - "sync" - "sync/atomic" -) - type Downloadable struct { Pointer *wrappedPointer - Object *objectResource + object *objectResource } -// DownloadQueue provides a queue that will allow concurrent uploads. 
-type DownloadQueue struct { - downloadc chan *Downloadable - errorc chan *WrappedError - errors []*WrappedError - wg sync.WaitGroup - workers int - files int - finished int64 - size int64 - authCond *sync.Cond - downloadables map[string]*Downloadable - bar *pb.ProgressBar - clientAuthorized int32 +func NewDownloadable(p *wrappedPointer) *Downloadable { + return &Downloadable{Pointer: p} +} + +func (d *Downloadable) Check() (*objectResource, *WrappedError) { + return DownloadCheck(d.Pointer.Oid) +} + +func (d *Downloadable) Transfer(cb CopyCallback) *WrappedError { + err := PointerSmudgeObject(d.Pointer.Pointer, d.object, cb) + if err != nil { + return Error(err) + } + return nil +} + +func (d *Downloadable) Object() *objectResource { + return d.object +} + +func (d *Downloadable) Oid() string { + return d.Pointer.Oid +} + +func (d *Downloadable) Size() int64 { + return d.Pointer.Size +} + +func (d *Downloadable) SetObject(o *objectResource) { + d.object = o } // NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads. -func NewDownloadQueue(workers, files int) *DownloadQueue { - return &DownloadQueue{ - downloadc: make(chan *Downloadable, files), - errorc: make(chan *WrappedError), - workers: workers, - files: files, - authCond: sync.NewCond(&sync.Mutex{}), - downloadables: make(map[string]*Downloadable), - } -} - -// Add adds an object to the download queue. 
-func (q *DownloadQueue) Add(p *wrappedPointer) { - q.downloadables[p.Oid] = &Downloadable{Pointer: p} -} - -func (q *DownloadQueue) processBatch() { - q.files = 0 - downloads := make([]*objectResource, 0, len(q.downloadables)) - for _, d := range q.downloadables { - downloads = append(downloads, &objectResource{Oid: d.Pointer.Oid, Size: d.Pointer.Size}) - } - - objects, err := Batch(downloads) - if err != nil { - q.errorc <- err - sendApiEvent(apiEventFail) - return - } - - for _, o := range objects { - if _, ok := o.Links["download"]; ok { - // This object can be downloaded - if downloadable, ok := q.downloadables[o.Oid]; ok { - q.files++ - q.wg.Add(1) - downloadable.Object = o - q.downloadc <- downloadable - } - } - } - - close(q.downloadc) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) - q.bar.Start() - sendApiEvent(apiEventSuccess) // Wake up upload workers -} - -// apiWorker processes the queue, making the POST calls and -// feeding the results to uploadWorkers -func (q *DownloadQueue) processIndividual() { - apic := make(chan *Downloadable, q.files) - workersReady := make(chan int, q.workers) - var wg sync.WaitGroup - - for i := 0; i < q.workers; i++ { - go func() { - workersReady <- 1 - for d := range apic { - // If an API authorization has not occured, we wait until we're woken up. 
- q.authCond.L.Lock() - if atomic.LoadInt32(&q.clientAuthorized) == 0 { - q.authCond.Wait() - } - q.authCond.L.Unlock() - - obj, err := DownloadCheck(d.Pointer.Oid) - if err != nil { - q.errorc <- err - wg.Done() - continue - } - - q.wg.Add(1) - d.Object = obj - q.downloadc <- d - wg.Done() - } - }() - } - - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables))) - q.bar.Start() - - for _, d := range q.downloadables { - wg.Add(1) - apic <- d - } - - <-workersReady - q.authCond.Signal() // Signal the first goroutine to run - close(apic) - wg.Wait() - - close(q.downloadc) -} - -// Process starts the download queue and displays a progress bar. -func (q *DownloadQueue) Process() { - q.bar = pb.New64(q.size) - q.bar.SetUnits(pb.U_BYTES) - q.bar.ShowBar = false - - // This goroutine collects errors returned from downloads - go func() { - for err := range q.errorc { - q.errors = append(q.errors, err) - } - }() - - // This goroutine watches for apiEvents. In order to prevent multiple - // credential requests from happening, the queue is processed sequentially - // until an API request succeeds (meaning authenication has happened successfully). - // Once the an API request succeeds, all worker goroutines are woken up and allowed - // to process downloads. Once a success happens, this goroutine exits. 
- go func() { - for { - event := <-apiEvent - switch event { - case apiEventSuccess: - atomic.StoreInt32(&q.clientAuthorized, 1) - q.authCond.Broadcast() // Wake all remaining goroutines - return - case apiEventFail: - q.authCond.Signal() // Wake the next goroutine - } - } - }() - - for i := 0; i < q.workers; i++ { - // These are the worker goroutines that process uploads - go func(n int) { - - for d := range q.downloadc { - cb := func(total, read int64, current int) error { - q.bar.Add(current) - return nil - } - - if err := PointerSmudgeObject(d.Pointer.Pointer, d.Object, cb); err != nil { - q.errorc <- Error(err) - } - - f := atomic.AddInt64(&q.finished, 1) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) - q.wg.Done() - } - }(i) - } - - if Config.BatchTransfer() { - q.processBatch() - } else { - q.processIndividual() - } - - q.wg.Wait() - close(q.errorc) - - q.bar.Finish() -} - -// Errors returns any errors encountered during uploading. -func (q *DownloadQueue) Errors() []*WrappedError { - return q.errors +func NewDownloadQueue(workers, files int) *TransferQueue { + q := newTransferQueue(workers, files) + q.transferKind = "download" + return q } diff --git a/lfs/transfer_queue.go b/lfs/transfer_queue.go new file mode 100644 index 00000000..acf1ff9f --- /dev/null +++ b/lfs/transfer_queue.go @@ -0,0 +1,205 @@ +package lfs + +import ( + "fmt" + "github.com/cheggaaa/pb" + "sync" + "sync/atomic" +) + +type Transferable interface { + Check() (*objectResource, *WrappedError) + Transfer(CopyCallback) *WrappedError + Object() *objectResource + Oid() string + Size() int64 + SetObject(*objectResource) +} + +// TransferQueue provides a queue that will allow concurrent transfers. 
+type TransferQueue struct { + transferc chan Transferable + errorc chan *WrappedError + errors []*WrappedError + wg sync.WaitGroup + workers int + files int + finished int64 + size int64 + authCond *sync.Cond + transferables map[string]Transferable + bar *pb.ProgressBar + clientAuthorized int32 + transferKind string +} + +// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers. +func newTransferQueue(workers, files int) *TransferQueue { + return &TransferQueue{ + transferc: make(chan Transferable, files), + errorc: make(chan *WrappedError), + workers: workers, + files: files, + authCond: sync.NewCond(&sync.Mutex{}), + transferables: make(map[string]Transferable), + } +} + +// Add adds a Transferable to the transfer queue. +func (q *TransferQueue) Add(t Transferable) { + q.transferables[t.Oid()] = t +} + +// apiWorker processes the queue, making the POST calls and +// feeding the results to the transfer workers +func (q *TransferQueue) processIndividual() { + apic := make(chan Transferable, q.files) + workersReady := make(chan int, q.workers) + var wg sync.WaitGroup + + for i := 0; i < q.workers; i++ { + go func() { + workersReady <- 1 + for t := range apic { + // If an API authorization has not occurred, we wait until we're woken up. 
+ q.authCond.L.Lock() + if atomic.LoadInt32(&q.clientAuthorized) == 0 { + q.authCond.Wait() + } + q.authCond.L.Unlock() + + obj, err := t.Check() + if err != nil { + q.errorc <- err + wg.Done() + continue + } + if obj != nil { + q.wg.Add(1) + t.SetObject(obj) + q.transferc <- t + } + wg.Done() + } + }() + } + + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.transferables))) + q.bar.Start() + + for _, t := range q.transferables { + wg.Add(1) + apic <- t + } + + <-workersReady + q.authCond.Signal() // Signal the first goroutine to run + close(apic) + wg.Wait() + + close(q.transferc) +} + +// batchWorker makes the batch POST call, feeding the results +// to the transfer workers +func (q *TransferQueue) processBatch() { + q.files = 0 + transfers := make([]*objectResource, 0, len(q.transferables)) + for _, t := range q.transferables { + transfers = append(transfers, &objectResource{Oid: t.Oid(), Size: t.Size()}) + } + + objects, err := Batch(transfers) + if err != nil { + q.errorc <- err + sendApiEvent(apiEventFail) + return + } + + for _, o := range objects { + if _, ok := o.Links[q.transferKind]; ok { + // This object needs to be transferred + if transfer, ok := q.transferables[o.Oid]; ok { + q.files++ + q.wg.Add(1) + transfer.SetObject(o) + q.transferc <- transfer + } + } + } + + close(q.transferc) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) + q.bar.Start() + sendApiEvent(apiEventSuccess) // Wake up upload workers +} + +// Process starts the transfer queue and displays a progress bar. +func (q *TransferQueue) Process() { + q.bar = pb.New64(q.size) + q.bar.SetUnits(pb.U_BYTES) + q.bar.ShowBar = false + + // This goroutine collects errors returned from transfers + go func() { + for err := range q.errorc { + q.errors = append(q.errors, err) + } + }() + + // This goroutine watches for apiEvents. 
In order to prevent multiple + // credential requests from happening, the queue is processed sequentially + // until an API request succeeds (meaning authentication has happened successfully). + // Once an API request succeeds, all worker goroutines are woken up and allowed + // to process transfers. Once a success happens, this goroutine exits. + go func() { + for { + event := <-apiEvent + switch event { + case apiEventSuccess: + atomic.StoreInt32(&q.clientAuthorized, 1) + q.authCond.Broadcast() // Wake all remaining goroutines + return + case apiEventFail: + q.authCond.Signal() // Wake the next goroutine + } + } + }() + + for i := 0; i < q.workers; i++ { + // These are the worker goroutines that process transfers + go func(n int) { + + for transfer := range q.transferc { + cb := func(total, read int64, current int) error { + q.bar.Add(current) + return nil + } + + if err := transfer.Transfer(cb); err != nil { + q.errorc <- err + } + + f := atomic.AddInt64(&q.finished, 1) + q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) + q.wg.Done() + } + }(i) + } + + if Config.BatchTransfer() { + q.processBatch() + } else { + q.processIndividual() + } + + q.wg.Wait() + close(q.errorc) + + q.bar.Finish() +} + +// Errors returns any errors encountered during transfer. +func (q *TransferQueue) Errors() []*WrappedError { + return q.errors +} diff --git a/lfs/upload_queue.go b/lfs/upload_queue.go index 6abdbe8b..002f0521 100644 --- a/lfs/upload_queue.go +++ b/lfs/upload_queue.go @@ -2,20 +2,17 @@ package lfs import ( "fmt" - "github.com/cheggaaa/pb" "os" "path/filepath" - "sync" - "sync/atomic" ) // Uploadable describes a file that can be uploaded. 
type Uploadable struct { - OID string - OIDPath string + oid string + OidPath string Filename string CB CopyCallback - Size int64 + size int64 object *objectResource } @@ -44,198 +41,46 @@ func NewUploadable(oid, filename string, index, totalFiles int) (*Uploadable, *W defer file.Close() } - return &Uploadable{OID: oid, OIDPath: path, Filename: filename, CB: cb, Size: fi.Size()}, nil + return &Uploadable{oid: oid, OidPath: path, Filename: filename, CB: cb, size: fi.Size()}, nil } -// UploadQueue provides a queue that will allow concurrent uploads. -type UploadQueue struct { - uploadc chan *Uploadable - errorc chan *WrappedError - errors []*WrappedError - wg sync.WaitGroup - workers int - files int - finished int64 - size int64 - authCond *sync.Cond - uploadables map[string]*Uploadable - bar *pb.ProgressBar - clientAuthorized int32 +func (u *Uploadable) Check() (*objectResource, *WrappedError) { + return UploadCheck(u.OidPath) +} + +func (u *Uploadable) Transfer(cb CopyCallback) *WrappedError { + wcb := func(total, read int64, current int) error { + cb(total, read, current) + if u.CB != nil { + return u.CB(total, read, current) + } + return nil + } + + return UploadObject(u.object, wcb) +} + +func (u *Uploadable) Object() *objectResource { + return u.object +} + +func (u *Uploadable) Oid() string { + return u.oid +} + +func (u *Uploadable) Size() int64 { + return u.size +} + +func (u *Uploadable) SetObject(o *objectResource) { + u.object = o } // NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads. -func NewUploadQueue(workers, files int) *UploadQueue { - return &UploadQueue{ - uploadc: make(chan *Uploadable, files), - errorc: make(chan *WrappedError), - workers: workers, - files: files, - authCond: sync.NewCond(&sync.Mutex{}), - uploadables: make(map[string]*Uploadable), - } -} - -// Add adds an Uploadable to the upload queue. 
-func (q *UploadQueue) Add(u *Uploadable) { - q.uploadables[u.OID] = u -} - -// apiWorker processes the queue, making the POST calls and -// feeding the results to uploadWorkers -func (q *UploadQueue) processIndividual() { - apic := make(chan *Uploadable, q.files) - workersReady := make(chan int, q.workers) - var wg sync.WaitGroup - - for i := 0; i < q.workers; i++ { - go func() { - workersReady <- 1 - for u := range apic { - // If an API authorization has not occured, we wait until we're woken up. - q.authCond.L.Lock() - if atomic.LoadInt32(&q.clientAuthorized) == 0 { - q.authCond.Wait() - } - q.authCond.L.Unlock() - - obj, err := UploadCheck(u.OIDPath) - if err != nil { - q.errorc <- err - wg.Done() - continue - } - if obj != nil { - q.wg.Add(1) - u.object = obj - q.uploadc <- u - } - wg.Done() - } - }() - } - - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.uploadables))) - q.bar.Start() - - for _, u := range q.uploadables { - wg.Add(1) - apic <- u - } - - <-workersReady - q.authCond.Signal() // Signal the first goroutine to run - close(apic) - wg.Wait() - - close(q.uploadc) -} - -// batchWorker makes the batch POST call, feeding the results -// to the uploadWorkers -func (q *UploadQueue) processBatch() { - q.files = 0 - uploads := make([]*objectResource, 0, len(q.uploadables)) - for _, u := range q.uploadables { - uploads = append(uploads, &objectResource{Oid: u.OID, Size: u.Size}) - } - - objects, err := Batch(uploads) - if err != nil { - q.errorc <- err - sendApiEvent(apiEventFail) - return - } - - for _, o := range objects { - if _, ok := o.Links["upload"]; ok { - // This object needs to be uploaded - if uploadable, ok := q.uploadables[o.Oid]; ok { - q.files++ - q.wg.Add(1) - uploadable.object = o - q.uploadc <- uploadable - } - } - } - - close(q.uploadc) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files)) - q.bar.Start() - sendApiEvent(apiEventSuccess) // Wake up upload workers -} - -// Process starts the upload queue and 
displays a progress bar. -func (q *UploadQueue) Process() { - q.bar = pb.New64(q.size) - q.bar.SetUnits(pb.U_BYTES) - q.bar.ShowBar = false - - // This goroutine collects errors returned from uploads - go func() { - for err := range q.errorc { - q.errors = append(q.errors, err) - } - }() - - // This goroutine watches for apiEvents. In order to prevent multiple - // credential requests from happening, the queue is processed sequentially - // until an API request succeeds (meaning authenication has happened successfully). - // Once the an API request succeeds, all worker goroutines are woken up and allowed - // to process uploads. Once a success happens, this goroutine exits. - go func() { - for { - event := <-apiEvent - switch event { - case apiEventSuccess: - atomic.StoreInt32(&q.clientAuthorized, 1) - q.authCond.Broadcast() // Wake all remaining goroutines - return - case apiEventFail: - q.authCond.Signal() // Wake the next goroutine - } - } - }() - - for i := 0; i < q.workers; i++ { - // These are the worker goroutines that process uploads - go func(n int) { - - for upload := range q.uploadc { - cb := func(total, read int64, current int) error { - q.bar.Add(current) - if upload.CB != nil { - return upload.CB(total, read, current) - } - return nil - } - - err := UploadObject(upload.object, cb) - if err != nil { - q.errorc <- err - } - - f := atomic.AddInt64(&q.finished, 1) - q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", f, q.files)) - q.wg.Done() - } - }(i) - } - - if Config.BatchTransfer() { - q.processBatch() - } else { - q.processIndividual() - } - - q.wg.Wait() - close(q.errorc) - - q.bar.Finish() -} - -// Errors returns any errors encountered during uploading. -func (q *UploadQueue) Errors() []*WrappedError { - return q.errors +func NewUploadQueue(workers, files int) *TransferQueue { + q := newTransferQueue(workers, files) + q.transferKind = "upload" + return q } // ensureFile makes sure that the cleanPath exists before pushing it. If it