Can batch downloads
This commit is contained in:
parent
c869fc6ef5
commit
3bb2846e6b
@ -10,9 +10,14 @@ import (
|
||||
"github.com/cheggaaa/pb"
|
||||
)
|
||||
|
||||
type Downloadable struct {
|
||||
Pointer *wrappedPointer
|
||||
Object *objectResource
|
||||
}
|
||||
|
||||
// DownloadQueue provides a queue that will allow concurrent uploads.
|
||||
type DownloadQueue struct {
|
||||
downloadc chan *wrappedPointer
|
||||
downloadc chan *Downloadable
|
||||
errorc chan *WrappedError
|
||||
errors []*WrappedError
|
||||
wg sync.WaitGroup
|
||||
@ -21,7 +26,7 @@ type DownloadQueue struct {
|
||||
finished int64
|
||||
size int64
|
||||
authCond *sync.Cond
|
||||
pointers []*wrappedPointer
|
||||
downloadables map[string]*Downloadable
|
||||
bar *pb.ProgressBar
|
||||
clientAuthorized int32
|
||||
}
|
||||
@ -29,30 +34,63 @@ type DownloadQueue struct {
|
||||
// NewDownloadQueue builds a DownloadQueue, allowing `workers` concurrent downloads.
|
||||
func NewDownloadQueue(workers, files int) *DownloadQueue {
|
||||
return &DownloadQueue{
|
||||
downloadc: make(chan *wrappedPointer, files),
|
||||
errorc: make(chan *WrappedError),
|
||||
workers: workers,
|
||||
files: files,
|
||||
authCond: sync.NewCond(&sync.Mutex{}),
|
||||
downloadc: make(chan *Downloadable, files),
|
||||
errorc: make(chan *WrappedError),
|
||||
workers: workers,
|
||||
files: files,
|
||||
authCond: sync.NewCond(&sync.Mutex{}),
|
||||
downloadables: make(map[string]*Downloadable),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds an object to the download queue.
|
||||
func (q *DownloadQueue) Add(p *wrappedPointer) {
|
||||
q.pointers = append(q.pointers, p)
|
||||
q.downloadables[p.Oid] = &Downloadable{Pointer: p}
|
||||
}
|
||||
|
||||
func (q *DownloadQueue) processBatch() {
|
||||
q.files = 0
|
||||
downloads := make([]*objectResource, 0, len(q.downloadables))
|
||||
for _, d := range q.downloadables {
|
||||
downloads = append(downloads, &objectResource{Oid: d.Pointer.Oid, Size: d.Pointer.Size})
|
||||
}
|
||||
|
||||
objects, err := Batch(downloads)
|
||||
if err != nil {
|
||||
q.errorc <- err
|
||||
sendApiEvent(apiEventFail)
|
||||
return
|
||||
}
|
||||
|
||||
for _, o := range objects {
|
||||
if _, ok := o.Links["download"]; ok {
|
||||
// This object can be downloaded
|
||||
if downloadable, ok := q.downloadables[o.Oid]; ok {
|
||||
q.files++
|
||||
q.wg.Add(1)
|
||||
downloadable.Object = o
|
||||
q.downloadc <- downloadable
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(q.downloadc)
|
||||
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, q.files))
|
||||
q.bar.Start()
|
||||
sendApiEvent(apiEventSuccess) // Wake up upload workers
|
||||
}
|
||||
|
||||
// apiWorker processes the queue, making the POST calls and
|
||||
// feeding the results to uploadWorkers
|
||||
func (q *DownloadQueue) processIndividual() {
|
||||
apic := make(chan *wrappedPointer, q.files)
|
||||
apic := make(chan *Downloadable, q.files)
|
||||
workersReady := make(chan int, q.workers)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < q.workers; i++ {
|
||||
go func() {
|
||||
workersReady <- 1
|
||||
for p := range apic {
|
||||
for d := range apic {
|
||||
// If an API authorization has not occured, we wait until we're woken up.
|
||||
q.authCond.L.Lock()
|
||||
if atomic.LoadInt32(&q.clientAuthorized) == 0 {
|
||||
@ -60,7 +98,7 @@ func (q *DownloadQueue) processIndividual() {
|
||||
}
|
||||
q.authCond.L.Unlock()
|
||||
|
||||
_, err := DownloadCheck(p.Oid)
|
||||
obj, err := DownloadCheck(d.Pointer.Oid)
|
||||
if err != nil {
|
||||
q.errorc <- err
|
||||
wg.Done()
|
||||
@ -68,18 +106,19 @@ func (q *DownloadQueue) processIndividual() {
|
||||
}
|
||||
|
||||
q.wg.Add(1)
|
||||
q.downloadc <- p
|
||||
d.Object = obj
|
||||
q.downloadc <- d
|
||||
wg.Done()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.pointers)))
|
||||
q.bar.Prefix(fmt.Sprintf("(%d of %d files) ", q.finished, len(q.downloadables)))
|
||||
q.bar.Start()
|
||||
|
||||
for _, p := range q.pointers {
|
||||
for _, d := range q.downloadables {
|
||||
wg.Add(1)
|
||||
apic <- p
|
||||
apic <- d
|
||||
}
|
||||
|
||||
<-workersReady
|
||||
@ -126,8 +165,8 @@ func (q *DownloadQueue) Process() {
|
||||
// These are the worker goroutines that process uploads
|
||||
go func(n int) {
|
||||
|
||||
for ptr := range q.downloadc {
|
||||
fullPath := filepath.Join(LocalWorkingDir, ptr.Name)
|
||||
for d := range q.downloadc {
|
||||
fullPath := filepath.Join(LocalWorkingDir, d.Pointer.Name)
|
||||
output, err := os.Create(fullPath)
|
||||
if err != nil {
|
||||
q.errorc <- Error(err)
|
||||
@ -141,8 +180,7 @@ func (q *DownloadQueue) Process() {
|
||||
q.bar.Add(current)
|
||||
return nil
|
||||
}
|
||||
// TODO need a callback
|
||||
if err := PointerSmudge(output, ptr.Pointer, ptr.Name, cb); err != nil {
|
||||
if err := PointerSmudgeObject(output, d.Pointer.Pointer, d.Object, cb); err != nil {
|
||||
q.errorc <- Error(err)
|
||||
}
|
||||
|
||||
@ -153,11 +191,11 @@ func (q *DownloadQueue) Process() {
|
||||
}(i)
|
||||
}
|
||||
|
||||
// if Config.BatchTransfer() {
|
||||
// q.processBatch()
|
||||
// } else {
|
||||
q.processIndividual()
|
||||
// }
|
||||
if Config.BatchTransfer() {
|
||||
q.processBatch()
|
||||
} else {
|
||||
q.processIndividual()
|
||||
}
|
||||
|
||||
q.wg.Wait()
|
||||
close(q.errorc)
|
||||
|
@ -37,9 +37,75 @@ func PointerSmudge(writer io.Writer, ptr *Pointer, workingfile string, cb CopyCa
|
||||
|
||||
if wErr != nil {
|
||||
return &SmudgeError{ptr.Oid, mediafile, wErr}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func PointerSmudgeObject(writer io.Writer, ptr *Pointer, obj *objectResource, cb CopyCallback) error {
|
||||
mediafile, err := LocalMediaPath(obj.Oid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stat, statErr := os.Stat(mediafile)
|
||||
if statErr == nil && stat != nil {
|
||||
fileSize := stat.Size()
|
||||
if fileSize == 0 || fileSize != obj.Size {
|
||||
tracerx.Printf("Removing %s, size %d is invalid", mediafile, fileSize)
|
||||
os.RemoveAll(mediafile)
|
||||
stat = nil
|
||||
}
|
||||
}
|
||||
|
||||
var wErr *WrappedError
|
||||
if statErr != nil || stat == nil {
|
||||
wErr = downloadObject(writer, ptr, obj, mediafile, cb)
|
||||
} else {
|
||||
wErr = readLocalFile(writer, ptr, mediafile, cb)
|
||||
}
|
||||
|
||||
if wErr != nil {
|
||||
sendApiEvent(apiEventFail)
|
||||
return &SmudgeError{obj.Oid, mediafile, wErr}
|
||||
}
|
||||
|
||||
sendApiEvent(apiEventSuccess)
|
||||
return nil
|
||||
}
|
||||
|
||||
func downloadObject(writer io.Writer, ptr *Pointer, obj *objectResource, mediafile string, cb CopyCallback) *WrappedError {
|
||||
reader, size, wErr := DownloadObject(obj)
|
||||
if reader != nil {
|
||||
defer reader.Close()
|
||||
}
|
||||
|
||||
// TODO this can be unified with the same code in downloadFile
|
||||
if wErr != nil {
|
||||
wErr.Errorf("Error downloading %s.", mediafile)
|
||||
return wErr
|
||||
}
|
||||
|
||||
if ptr.Size == 0 {
|
||||
ptr.Size = size
|
||||
}
|
||||
|
||||
mediaFile, err := contentaddressable.NewFile(mediafile)
|
||||
if err != nil {
|
||||
return Errorf(err, "Error opening media file buffer.")
|
||||
}
|
||||
|
||||
_, err = CopyWithCallback(mediaFile, reader, ptr.Size, cb)
|
||||
if err == nil {
|
||||
err = mediaFile.Accept()
|
||||
}
|
||||
mediaFile.Close()
|
||||
|
||||
if err != nil {
|
||||
return Errorf(err, "Error buffering media file.")
|
||||
}
|
||||
|
||||
return readLocalFile(writer, ptr, mediafile, nil)
|
||||
}
|
||||
|
||||
func downloadFile(writer io.Writer, ptr *Pointer, workingfile, mediafile string, cb CopyCallback) *WrappedError {
|
||||
|
Loading…
Reference in New Issue
Block a user