2015-05-21 16:36:49 +00:00
|
|
|
package lfs
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
2015-09-04 20:01:48 +00:00
|
|
|
"sync/atomic"
|
2015-05-21 17:47:52 +00:00
|
|
|
|
2015-06-02 15:19:59 +00:00
|
|
|
"github.com/github/git-lfs/git"
|
2015-07-09 20:51:55 +00:00
|
|
|
"github.com/github/git-lfs/vendor/_nuts/github.com/rubyist/tracerx"
|
2015-05-21 16:36:49 +00:00
|
|
|
)
|
|
|
|
|
2015-07-09 19:59:25 +00:00
|
|
|
// batchSize is passed to NewBatcher as the batch size when running in batch
// mode. It is also the buffer length of the queue's internal Transferable
// channels (apic, transferc, retriesc) and of every channel returned by Watch.
const batchSize = 100
|
|
|
|
|
2015-05-21 16:36:49 +00:00
|
|
|
type Transferable interface {
|
2015-11-25 12:01:45 +00:00
|
|
|
Check() (*ObjectResource, error)
|
2015-08-21 15:48:52 +00:00
|
|
|
Transfer(CopyCallback) error
|
2015-11-25 12:01:45 +00:00
|
|
|
Object() *ObjectResource
|
2015-05-21 16:36:49 +00:00
|
|
|
Oid() string
|
|
|
|
Size() int64
|
2015-06-19 17:59:30 +00:00
|
|
|
Name() string
|
2015-11-25 12:01:45 +00:00
|
|
|
SetObject(*ObjectResource)
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// TransferQueue provides a queue that will allow concurrent transfers.
|
|
|
|
type TransferQueue struct {
|
2015-09-04 20:01:48 +00:00
|
|
|
retrying uint32
|
2015-07-27 16:43:41 +00:00
|
|
|
meter *ProgressMeter
|
2015-07-10 20:05:04 +00:00
|
|
|
workers int // Number of transfer workers to spawn
|
2015-07-09 18:57:05 +00:00
|
|
|
transferKind string
|
2015-08-21 15:48:52 +00:00
|
|
|
errors []error
|
2015-07-09 18:57:05 +00:00
|
|
|
transferables map[string]Transferable
|
2015-09-04 20:01:48 +00:00
|
|
|
retries []Transferable
|
2015-07-09 18:57:05 +00:00
|
|
|
batcher *Batcher
|
2015-08-21 15:48:52 +00:00
|
|
|
apic chan Transferable // Channel for processing individual API requests
|
|
|
|
transferc chan Transferable // Channel for processing transfers
|
2015-09-04 20:01:48 +00:00
|
|
|
retriesc chan Transferable // Channel for processing retries
|
2015-08-21 15:48:52 +00:00
|
|
|
errorc chan error // Channel for processing errors
|
2015-07-09 18:57:05 +00:00
|
|
|
watchers []chan string
|
2015-09-04 15:21:26 +00:00
|
|
|
errorwait sync.WaitGroup
|
2015-09-04 20:01:48 +00:00
|
|
|
retrywait sync.WaitGroup
|
2015-07-09 18:57:05 +00:00
|
|
|
wait sync.WaitGroup
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// newTransferQueue builds a TransferQueue, allowing `workers` concurrent transfers.
|
2015-07-30 14:43:50 +00:00
|
|
|
func newTransferQueue(files int, size int64, dryRun bool) *TransferQueue {
|
2015-07-09 18:21:49 +00:00
|
|
|
q := &TransferQueue{
|
2015-07-30 14:23:01 +00:00
|
|
|
meter: NewProgressMeter(files, size, dryRun),
|
2015-07-09 19:59:25 +00:00
|
|
|
apic: make(chan Transferable, batchSize),
|
|
|
|
transferc: make(chan Transferable, batchSize),
|
2015-09-04 20:01:48 +00:00
|
|
|
retriesc: make(chan Transferable, batchSize),
|
2015-08-21 15:48:52 +00:00
|
|
|
errorc: make(chan error),
|
2015-07-30 14:43:50 +00:00
|
|
|
workers: Config.ConcurrentTransfers(),
|
2015-05-21 16:36:49 +00:00
|
|
|
transferables: make(map[string]Transferable),
|
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Add(1)
|
2015-09-04 20:01:48 +00:00
|
|
|
q.retrywait.Add(1)
|
2015-09-04 15:21:26 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
q.run()
|
|
|
|
|
|
|
|
return q
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-05-26 13:56:43 +00:00
|
|
|
// Add adds a Transferable to the transfer queue.
|
2015-05-21 16:36:49 +00:00
|
|
|
func (q *TransferQueue) Add(t Transferable) {
|
2015-07-09 18:21:49 +00:00
|
|
|
q.wait.Add(1)
|
2015-05-21 16:36:49 +00:00
|
|
|
q.transferables[t.Oid()] = t
|
2015-07-09 18:21:49 +00:00
|
|
|
|
|
|
|
if q.batcher != nil {
|
|
|
|
q.batcher.Add(t)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
q.apic <- t
|
|
|
|
}
|
|
|
|
|
2015-09-09 14:24:50 +00:00
|
|
|
// Wait waits for the queue to finish processing all transfers. Once Wait is
|
|
|
|
// called, Add will no longer add transferables to the queue. Any failed
|
|
|
|
// transfers will be automatically retried once.
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) Wait() {
|
|
|
|
if q.batcher != nil {
|
|
|
|
q.batcher.Exit()
|
|
|
|
}
|
2015-07-10 20:05:04 +00:00
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
q.wait.Wait()
|
|
|
|
|
|
|
|
// Handle any retries
|
|
|
|
close(q.retriesc)
|
|
|
|
q.retrywait.Wait()
|
|
|
|
atomic.StoreUint32(&q.retrying, 1)
|
|
|
|
|
2015-09-21 14:07:51 +00:00
|
|
|
if len(q.retries) > 0 {
|
2015-09-08 14:09:57 +00:00
|
|
|
tracerx.Printf("tq: retrying %d failed transfers", len(q.retries))
|
2015-09-04 20:01:48 +00:00
|
|
|
for _, t := range q.retries {
|
|
|
|
q.Add(t)
|
|
|
|
}
|
2015-09-21 14:07:51 +00:00
|
|
|
if q.batcher != nil {
|
|
|
|
q.batcher.Exit()
|
|
|
|
}
|
2015-09-04 20:01:48 +00:00
|
|
|
q.wait.Wait()
|
|
|
|
}
|
|
|
|
|
2015-09-09 14:24:50 +00:00
|
|
|
atomic.StoreUint32(&q.retrying, 0)
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
close(q.apic)
|
|
|
|
close(q.transferc)
|
|
|
|
close(q.errorc)
|
2015-07-10 20:05:04 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for _, watcher := range q.watchers {
|
|
|
|
close(watcher)
|
|
|
|
}
|
2015-07-10 19:25:59 +00:00
|
|
|
|
2015-07-27 16:43:41 +00:00
|
|
|
q.meter.Finish()
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Wait()
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-05-27 19:45:18 +00:00
|
|
|
// Watch returns a channel where the queue will write the OID of each transfer
|
|
|
|
// as it completes. The channel will be closed when the queue finishes processing.
|
|
|
|
func (q *TransferQueue) Watch() chan string {
|
2015-07-09 19:59:25 +00:00
|
|
|
c := make(chan string, batchSize)
|
2015-05-27 19:45:18 +00:00
|
|
|
q.watchers = append(q.watchers, c)
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
// individualApiRoutine processes the queue of transfers one at a time by making
|
2015-05-26 13:56:43 +00:00
|
|
|
// a POST call for each object, feeding the results to the transfer workers.
|
|
|
|
// If configured, the object transfers can still happen concurrently, the
|
|
|
|
// sequential nature here is only for the meta POST calls.
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) {
|
|
|
|
for t := range q.apic {
|
|
|
|
obj, err := t.Check()
|
|
|
|
if err != nil {
|
2015-09-04 20:01:48 +00:00
|
|
|
if q.canRetry(err) {
|
|
|
|
q.retry(t)
|
2015-08-18 19:53:04 +00:00
|
|
|
} else {
|
|
|
|
q.errorc <- err
|
|
|
|
}
|
2015-08-11 16:33:28 +00:00
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
continue
|
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
if apiWaiter != nil { // Signal to launch more individual api workers
|
2015-07-31 14:30:08 +00:00
|
|
|
q.meter.Start()
|
2015-07-09 18:21:49 +00:00
|
|
|
select {
|
|
|
|
case apiWaiter <- 1:
|
|
|
|
default:
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if obj != nil {
|
|
|
|
t.SetObject(obj)
|
2015-07-27 20:41:57 +00:00
|
|
|
q.meter.Add(t.Name())
|
2015-07-09 18:21:49 +00:00
|
|
|
q.transferc <- t
|
2015-08-10 14:07:20 +00:00
|
|
|
} else {
|
|
|
|
q.meter.Skip(t.Size())
|
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-07-09 19:57:33 +00:00
|
|
|
// legacyFallback is used when a batch request is made to a server that does
|
|
|
|
// not support the batch endpoint. When this happens, the Transferables are
|
2015-07-29 15:06:38 +00:00
|
|
|
// fed from the batcher into apic to be processed individually.
|
2015-07-09 19:57:33 +00:00
|
|
|
func (q *TransferQueue) legacyFallback(failedBatch []Transferable) {
|
|
|
|
tracerx.Printf("tq: batch api not implemented, falling back to individual")
|
|
|
|
|
|
|
|
q.launchIndividualApiRoutines()
|
|
|
|
|
|
|
|
for _, t := range failedBatch {
|
|
|
|
q.apic <- t
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
batch := q.batcher.Next()
|
|
|
|
if batch == nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, t := range batch {
|
|
|
|
q.apic <- t
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
// batchApiRoutine processes the queue of transfers using the batch endpoint,
|
|
|
|
// making only one POST call for all objects. The results are then handed
|
|
|
|
// off to the transfer workers.
|
|
|
|
func (q *TransferQueue) batchApiRoutine() {
|
2015-07-31 14:30:08 +00:00
|
|
|
var startProgress sync.Once
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for {
|
|
|
|
batch := q.batcher.Next()
|
|
|
|
if batch == nil {
|
|
|
|
break
|
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-07-09 20:01:57 +00:00
|
|
|
tracerx.Printf("tq: sending batch of size %d", len(batch))
|
|
|
|
|
2015-11-25 12:01:45 +00:00
|
|
|
transfers := make([]*ObjectResource, 0, len(batch))
|
2015-07-09 18:21:49 +00:00
|
|
|
for _, t := range batch {
|
2015-11-25 12:01:45 +00:00
|
|
|
transfers = append(transfers, &ObjectResource{Oid: t.Oid(), Size: t.Size()})
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
objects, err := Batch(transfers, q.transferKind)
|
|
|
|
if err != nil {
|
2015-08-21 15:48:52 +00:00
|
|
|
if IsNotImplementedError(err) {
|
2015-09-02 19:28:30 +00:00
|
|
|
git.Config.SetLocal("", "lfs.batch", "false")
|
2015-07-09 19:57:33 +00:00
|
|
|
|
|
|
|
go q.legacyFallback(batch)
|
|
|
|
return
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-08-17 21:21:08 +00:00
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
if q.canRetry(err) {
|
2015-08-18 19:53:04 +00:00
|
|
|
for _, t := range batch {
|
2015-09-04 20:01:48 +00:00
|
|
|
q.retry(t)
|
2015-08-18 19:53:04 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
q.errorc <- err
|
|
|
|
}
|
2015-09-01 18:31:40 +00:00
|
|
|
|
2015-08-18 19:53:04 +00:00
|
|
|
q.wait.Add(-len(transfers))
|
2015-07-15 21:33:37 +00:00
|
|
|
continue
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-06-15 11:46:02 +00:00
|
|
|
|
2015-07-31 14:30:08 +00:00
|
|
|
startProgress.Do(q.meter.Start)
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for _, o := range objects {
|
2015-09-21 20:19:50 +00:00
|
|
|
if o.Error != nil {
|
2015-10-13 14:35:13 +00:00
|
|
|
q.errorc <- Errorf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
|
2015-09-21 20:19:50 +00:00
|
|
|
q.meter.Skip(o.Size)
|
|
|
|
q.wait.Done()
|
|
|
|
continue
|
|
|
|
}
|
2015-08-07 13:32:28 +00:00
|
|
|
|
2015-09-21 20:19:50 +00:00
|
|
|
if _, ok := o.Rel(q.transferKind); ok {
|
2015-07-27 12:58:27 +00:00
|
|
|
// This object needs to be transferred
|
2015-07-09 18:21:49 +00:00
|
|
|
if transfer, ok := q.transferables[o.Oid]; ok {
|
|
|
|
transfer.SetObject(o)
|
2015-07-27 20:41:57 +00:00
|
|
|
q.meter.Add(transfer.Name())
|
2015-07-09 18:21:49 +00:00
|
|
|
q.transferc <- transfer
|
2015-07-17 01:29:32 +00:00
|
|
|
} else {
|
2015-07-27 20:41:57 +00:00
|
|
|
q.meter.Skip(transfer.Size())
|
2015-07-17 01:29:32 +00:00
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-07-17 01:29:32 +00:00
|
|
|
} else {
|
2015-07-27 20:41:57 +00:00
|
|
|
q.meter.Skip(o.Size)
|
2015-07-17 01:29:32 +00:00
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
// This goroutine collects errors returned from transfers
|
|
|
|
func (q *TransferQueue) errorCollector() {
|
|
|
|
for err := range q.errorc {
|
|
|
|
q.errors = append(q.errors, err)
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) retryCollector() {
|
|
|
|
for t := range q.retriesc {
|
|
|
|
q.retries = append(q.retries, t)
|
|
|
|
}
|
|
|
|
q.retrywait.Done()
|
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) transferWorker() {
|
|
|
|
for transfer := range q.transferc {
|
|
|
|
cb := func(total, read int64, current int) error {
|
2015-07-27 20:55:23 +00:00
|
|
|
q.meter.TransferBytes(q.transferKind, transfer.Name(), read, total, current)
|
2015-07-09 18:21:49 +00:00
|
|
|
return nil
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
if err := transfer.Transfer(cb); err != nil {
|
2015-09-04 20:01:48 +00:00
|
|
|
if q.canRetry(err) {
|
2015-08-18 19:53:04 +00:00
|
|
|
tracerx.Printf("tq: retrying object %s", transfer.Oid())
|
2015-09-04 20:01:48 +00:00
|
|
|
q.retry(transfer)
|
2015-08-17 23:02:20 +00:00
|
|
|
} else {
|
|
|
|
q.errorc <- err
|
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
} else {
|
|
|
|
oid := transfer.Oid()
|
|
|
|
for _, c := range q.watchers {
|
|
|
|
c <- oid
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-07-27 20:55:23 +00:00
|
|
|
q.meter.FinishTransfer(transfer.Name())
|
2015-07-10 19:25:59 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
q.wait.Done()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// launchIndividualApiRoutines first launches a single api worker. When it
|
|
|
|
// receives the first successful api request it launches workers - 1 more
|
|
|
|
// workers. This prevents being prompted for credentials multiple times at once
|
|
|
|
// when they're needed.
|
|
|
|
func (q *TransferQueue) launchIndividualApiRoutines() {
|
2015-06-19 17:59:30 +00:00
|
|
|
go func() {
|
2015-07-09 18:21:49 +00:00
|
|
|
apiWaiter := make(chan interface{})
|
|
|
|
go q.individualApiRoutine(apiWaiter)
|
2015-06-19 17:59:30 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
<-apiWaiter
|
2015-06-19 17:59:30 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for i := 0; i < q.workers-1; i++ {
|
|
|
|
go q.individualApiRoutine(nil)
|
|
|
|
}
|
2015-06-19 17:59:30 +00:00
|
|
|
}()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-06-19 17:59:30 +00:00
|
|
|
|
2015-07-10 20:05:04 +00:00
|
|
|
// run starts the transfer queue, doing individual or batch transfers depending
|
|
|
|
// on the Config.BatchTransfer() value. run will transfer files sequentially or
|
|
|
|
// concurrently depending on the Config.ConcurrentTransfers() value.
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) run() {
|
|
|
|
go q.errorCollector()
|
2015-09-04 20:01:48 +00:00
|
|
|
go q.retryCollector()
|
2015-06-19 17:59:30 +00:00
|
|
|
|
2015-07-09 20:01:57 +00:00
|
|
|
tracerx.Printf("tq: starting %d transfer workers", q.workers)
|
2015-05-21 16:36:49 +00:00
|
|
|
for i := 0; i < q.workers; i++ {
|
2015-07-09 18:21:49 +00:00
|
|
|
go q.transferWorker()
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if Config.BatchTransfer() {
|
2015-07-09 20:01:57 +00:00
|
|
|
tracerx.Printf("tq: running as batched queue, batch size of %d", batchSize)
|
2015-07-09 19:59:25 +00:00
|
|
|
q.batcher = NewBatcher(batchSize)
|
2015-07-09 18:21:49 +00:00
|
|
|
go q.batchApiRoutine()
|
2015-05-21 16:36:49 +00:00
|
|
|
} else {
|
2015-07-09 20:01:57 +00:00
|
|
|
tracerx.Printf("tq: running as individual queue")
|
2015-07-09 18:21:49 +00:00
|
|
|
q.launchIndividualApiRoutines()
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) retry(t Transferable) {
|
|
|
|
q.retriesc <- t
|
|
|
|
}
|
2015-08-18 19:53:04 +00:00
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) canRetry(err error) bool {
|
|
|
|
if !IsRetriableError(err) || atomic.LoadUint32(&q.retrying) == 1 {
|
2015-08-17 23:02:20 +00:00
|
|
|
return false
|
|
|
|
}
|
2015-09-04 13:42:37 +00:00
|
|
|
|
2015-08-17 23:02:20 +00:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2015-05-26 13:56:43 +00:00
|
|
|
// Errors returns any errors encountered during transfer.
|
2015-08-21 15:48:52 +00:00
|
|
|
func (q *TransferQueue) Errors() []error {
|
2015-05-21 16:36:49 +00:00
|
|
|
return q.errors
|
|
|
|
}
|