git-lfs/lfs/transfer_queue.go
2015-09-02 15:25:40 -04:00

328 lines
7.9 KiB
Go

package lfs
import (
"path/filepath"
"sync"
"github.com/github/git-lfs/git"
"github.com/github/git-lfs/vendor/_nuts/github.com/rubyist/tracerx"
)
// batchSize is the capacity handed to NewBatcher and the buffer size of the
// queue's api, transfer and watcher channels.
const batchSize = 100

// maxBatchRetries bounds how many times batchApiRoutine resubmits a batch
// after a retriable error.
const maxBatchRetries = 3
// Transferable is a single object that a TransferQueue can process.
type Transferable interface {
	// Oid identifies the object. The queue uses it as the key for its
	// transferables map and for retry bookkeeping.
	Oid() string

	// Name is the label the queue reports to the progress meter.
	Name() string

	// Size is the object's size in bytes. It is sent in batch requests and
	// reported to the progress meter when the object is skipped.
	Size() int64

	// Check asks the API about this object individually. The queue treats a
	// nil object with a nil error as "nothing to transfer".
	Check() (*objectResource, error)

	// SetObject is called by the queue with the object returned by the API
	// before the Transferable is handed to a transfer worker.
	SetObject(*objectResource)

	// Object is not called by the queue itself.
	// NOTE(review): presumably returns the value stored by SetObject; confirm
	// against the implementations.
	Object() *objectResource

	// Transfer performs the transfer, invoking the callback to report
	// progress.
	Transfer(CopyCallback) error
}
// TransferQueue provides a queue that will allow concurrent transfers.
//
// Transferables enter through Add, are resolved against the API either one at
// a time (apic) or in batches (batcher), and are then handed to the transfer
// workers through transferc. Errors from any stage are funnelled through
// errorc and collected in errors.
type TransferQueue struct {
// meter reports progress; it is created from the expected file count and size.
meter *ProgressMeter
workers int // Number of transfer workers to spawn
// transferKind is passed to Batch, to the object's Rel lookup and to the
// progress meter.
// NOTE(review): it is not assigned anywhere in this view; presumably the
// code that constructs upload/download queues sets it. Confirm.
transferKind string
// errors accumulates everything received on errorc (see errorCollector).
errors []error
// transferables holds every added Transferable keyed by its Oid, so batch
// responses can be matched back to the item that was queued.
transferables map[string]Transferable
// batcher is non-nil only when run starts the queue in batch mode.
batcher *Batcher
apic chan Transferable // Channel for processing individual API requests
transferc chan Transferable // Channel for processing transfers
errorc chan error // Channel for processing errors
// watchers are the channels returned by Watch; each receives the OID of
// every successfully transferred object and is closed by Wait.
watchers []chan string
// wait counts outstanding items: Add increments it, and it is decremented
// when an item is transferred, skipped or fails.
wait sync.WaitGroup
// retries is the set of ids that have already been granted a retry.
retries map[string]interface{}
// retrylock guards retries.
retrylock sync.Mutex
}
// newTransferQueue builds a TransferQueue and starts its goroutines.
//
// files and size describe the expected workload and, together with dryRun,
// are forwarded to the progress meter. The number of concurrent transfer
// workers is taken from Config.ConcurrentTransfers().
func newTransferQueue(files int, size int64, dryRun bool) *TransferQueue {
	queue := new(TransferQueue)

	queue.meter = NewProgressMeter(files, size, dryRun)
	queue.apic = make(chan Transferable, batchSize)
	queue.transferc = make(chan Transferable, batchSize)
	// errorc is unbuffered: senders block until errorCollector receives.
	queue.errorc = make(chan error)
	queue.workers = Config.ConcurrentTransfers()
	queue.transferables = make(map[string]Transferable)
	queue.retries = make(map[string]interface{})

	queue.run()

	return queue
}
// Add registers t with the queue and schedules it for processing.
//
// The wait group is incremented and t is recorded under its Oid. In batch
// mode t is handed to the batcher; otherwise it is sent on the individual API
// channel, which blocks while that channel's buffer is full.
func (q *TransferQueue) Add(t Transferable) {
	q.wait.Add(1)
	q.transferables[t.Oid()] = t

	if q.batcher == nil {
		q.apic <- t
		return
	}

	q.batcher.Add(t)
}
// Wait blocks until every added item has been accounted for, then shuts the
// queue down: the batcher (if any) is told to exit, the api, transfer and
// error channels are closed, every watcher channel is closed, and the
// progress meter is finished.
//
// Because the channels are closed here, the queue must not be added to after
// Wait has been called.
func (q *TransferQueue) Wait() {
	q.wait.Wait()

	if b := q.batcher; b != nil {
		b.Exit()
	}

	close(q.apic)
	close(q.transferc)
	close(q.errorc)

	for i := range q.watchers {
		close(q.watchers[i])
	}

	q.meter.Finish()
}
// Watch registers and returns a new channel, buffered to batchSize, on which
// the queue writes the OID of each transfer as it completes successfully.
// The channel is closed when Wait finishes the queue.
func (q *TransferQueue) Watch() chan string {
	watcher := make(chan string, batchSize)
	q.watchers = append(q.watchers, watcher)
	return watcher
}
// individualApiRoutine drains apic, calling Check on one Transferable at a
// time and forwarding the ones that need transferring to the transfer
// workers. Only these API checks are sequential per routine; the transfers
// themselves may still run concurrently.
//
// When apiWaiter is non-nil, each successful check starts the progress meter
// and makes a non-blocking send on apiWaiter, which lets the launcher know it
// may start additional API routines.
func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) {
	for t := range q.apic {
		obj, err := t.Check()
		if err != nil {
			if !q.canRetry(err, t.Oid()) {
				q.errorc <- err
			} else {
				// Add increments the wait group again before this attempt
				// is marked done below.
				q.Add(t)
			}
			q.wait.Done()
			continue
		}

		if apiWaiter != nil {
			q.meter.Start()
			select {
			case apiWaiter <- 1:
			default:
				// Nobody is waiting any more; do not block.
			}
		}

		if obj == nil {
			// The API returned no object: nothing to transfer for this item.
			q.meter.Skip(t.Size())
			q.wait.Done()
			continue
		}

		t.SetObject(obj)
		q.meter.Add(t.Name())
		q.transferc <- t
	}
}
// legacyFallback switches the queue to individual API calls after a batch
// request was rejected by a server that does not implement the batch
// endpoint. It starts the individual API routines, re-feeds the batch that
// failed, and then keeps forwarding every further batch from the batcher
// into apic until the batcher returns nil.
func (q *TransferQueue) legacyFallback(failedBatch []Transferable) {
	tracerx.Printf("tq: batch api not implemented, falling back to individual")

	q.launchIndividualApiRoutines()

	// feed pushes each item of a batch onto the individual API channel.
	feed := func(items []Transferable) {
		for _, item := range items {
			q.apic <- item
		}
	}

	feed(failedBatch)

	for next := q.batcher.Next(); next != nil; next = q.batcher.Next() {
		feed(next)
	}
}
// batchApiRoutine processes the queue of transfers using the batch endpoint,
// making only one POST call per batch of objects. The results are then handed
// off to the transfer workers.
//
// The routine ends when the batcher returns a nil batch, or when the server
// reports that the batch endpoint is not implemented; in the latter case
// lfs.batch is set to "false" in the local git config and processing continues
// in legacyFallback.
func (q *TransferQueue) batchApiRoutine() {
	var startProgress sync.Once

	// batchRetries counts resubmissions across all batches handled by this
	// routine, not per batch.
	batchRetries := 0

	for {
		batch := q.batcher.Next()
		if batch == nil {
			break
		}

		tracerx.Printf("tq: sending batch of size %d", len(batch))

		transfers := make([]*objectResource, 0, len(batch))
		for _, t := range batch {
			transfers = append(transfers, &objectResource{Oid: t.Oid(), Size: t.Size()})
		}

		objects, err := Batch(transfers, q.transferKind)
		if err != nil {
			if IsNotImplementedError(err) {
				configFile := filepath.Join(LocalGitDir, "config")
				git.Config.SetLocal(configFile, "lfs.batch", "false")

				go q.legacyFallback(batch)
				return
			}

			// Batch operation retries should be caused by network issues. We
			// want to retry these failures, but limit it to maxBatchRetries
			// total retries, otherwise a serious network issue could cause an
			// infinite loop of retried calls.
			if IsRetriableError(err) && batchRetries < maxBatchRetries {
				batchRetries++
				tracerx.Printf("tq: resubmitting batch: %s", err)
				for _, t := range batch {
					q.Add(t)
				}
			} else {
				q.errorc <- err
			}

			// Retire this attempt's items. When the batch was resubmitted,
			// Add has already counted them again above.
			q.wait.Add(-len(transfers))
			continue
		}

		startProgress.Do(q.meter.Start)

		for _, o := range objects {
			if _, ok := o.Rel(q.transferKind); !ok {
				// No link for this transfer kind: nothing to do for the object.
				q.meter.Skip(o.Size)
				q.wait.Done()
				continue
			}

			// This object has an error
			if o.Error != nil {
				q.errorc <- Error(o.Error)
				q.meter.Skip(o.Size)
				q.wait.Done()
				continue
			}

			transfer, ok := q.transferables[o.Oid]
			if !ok {
				// The response names an object that was never queued. The
				// lookup yielded a nil Transferable, so the size must come
				// from the response object rather than from transfer.
				q.meter.Skip(o.Size)
				q.wait.Done()
				continue
			}

			// This object needs to be transferred
			transfer.SetObject(o)
			q.meter.Add(transfer.Name())
			q.transferc <- transfer
		}
	}
}
// errorCollector runs as a goroutine and appends every error received on
// errorc to q.errors. It returns once errorc has been closed.
func (q *TransferQueue) errorCollector() {
	for {
		err, open := <-q.errorc
		if !open {
			return
		}
		q.errors = append(q.errors, err)
	}
}
// transferWorker runs as a goroutine and performs the transfers received on
// transferc until that channel is closed.
//
// A successful transfer has its OID sent to every watcher. A failed transfer
// is re-added to the queue if canRetry allows it, otherwise its error is
// reported on errorc. In every case the meter is told the transfer finished
// and the wait group is decremented for this attempt.
func (q *TransferQueue) transferWorker() {
	for transfer := range q.transferc {
		// progress forwards byte counts to the meter and never aborts the
		// transfer.
		progress := func(total, read int64, current int) error {
			q.meter.TransferBytes(q.transferKind, transfer.Name(), read, total, current)
			return nil
		}

		err := transfer.Transfer(progress)
		switch {
		case err == nil:
			oid := transfer.Oid()
			for _, watcher := range q.watchers {
				watcher <- oid
			}
		case q.canRetry(err, transfer.Oid()):
			tracerx.Printf("tq: retrying object %s", transfer.Oid())
			q.Add(transfer)
		default:
			q.errorc <- err
		}

		q.meter.FinishTransfer(transfer.Name())
		q.wait.Done()
	}
}
// launchIndividualApiRoutines starts the individual API workers in two
// stages. A single worker is started first; once it signals its first
// successful API request, the remaining workers-1 workers are started. This
// prevents being prompted for credentials multiple times at once when they
// are needed.
//
// The staging happens in a background goroutine, so this method returns
// immediately.
func (q *TransferQueue) launchIndividualApiRoutines() {
	go func() {
		firstSuccess := make(chan interface{})
		go q.individualApiRoutine(firstSuccess)

		// Blocks until the first worker reports a successful check.
		<-firstSuccess

		for n := 1; n < q.workers; n++ {
			go q.individualApiRoutine(nil)
		}
	}()
}
// run starts the queue's goroutines: the error collector, q.workers transfer
// workers, and the API stage. The API stage is the batch routine when
// Config.BatchTransfer() is true (a Batcher is created for it), and the
// individual API routines otherwise.
func (q *TransferQueue) run() {
	go q.errorCollector()

	tracerx.Printf("tq: starting %d transfer workers", q.workers)
	for started := 0; started < q.workers; started++ {
		go q.transferWorker()
	}

	if !Config.BatchTransfer() {
		tracerx.Printf("tq: running as individual queue")
		q.launchIndividualApiRoutines()
		return
	}

	tracerx.Printf("tq: running as batched queue, batch size of %d", batchSize)
	q.batcher = NewBatcher(batchSize)
	go q.batchApiRoutine()
}
// canRetry reports whether the operation identified by id may be retried
// after failing with err. It returns true only when err is retriable and id
// has not been granted a retry before; granting the retry records id so that
// later calls for the same id return false.
func (q *TransferQueue) canRetry(err error, id string) bool {
	if !IsRetriableError(err) {
		return false
	}

	q.retrylock.Lock()
	defer q.retrylock.Unlock()

	_, alreadyRetried := q.retries[id]
	if !alreadyRetried {
		q.retries[id] = struct{}{}
	}

	return !alreadyRetried
}
// Errors returns any errors encountered during transfer.
//
// The returned slice is the queue's own slice, the one errorCollector appends
// to; it is not copied.
// NOTE(review): nothing visible here synchronizes this read with the
// errorCollector goroutine. Presumably callers only call Errors after Wait has
// returned; confirm that this is sufficient.
func (q *TransferQueue) Errors() []error {
return q.errors
}