Merge pull request #1535 from github/multiple-retries

lfs/transfer_queue: support multiple retries per object
Taylor Blau 2016-09-22 13:22:05 -06:00 committed by GitHub
commit fae68106c9
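The heart of the change is a per-object retry budget: a failed transfer is retried until the object exhausts defaultMaxRetries, instead of the previous single queue-wide retry pass gated by the "retrying" flag. A minimal, sequential sketch of that policy — attemptWithRetries and its arguments are hypothetical and not part of the git-lfs API:

package main

import (
    "errors"
    "fmt"
)

// attemptWithRetries models the new policy in isolation: each object gets
// its own retry budget rather than sharing one queue-wide retry pass.
func attemptWithRetries(oid string, maxRetries uint32, try func() error) error {
    var err error
    for attempt := uint32(0); attempt <= maxRetries; attempt++ {
        if err = try(); err == nil {
            return nil
        }
        fmt.Printf("retry: %s failed attempt %d: %v\n", oid, attempt+1, err)
    }
    return err
}

func main() {
    calls := 0
    err := attemptWithRetries("deadbeef", 1, func() error {
        calls++
        if calls < 2 {
            return errors.New("transient network error")
        }
        return nil
    })
    fmt.Println("final error:", err, "calls made:", calls)
}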

@@ -2,7 +2,6 @@ package lfs
import (
"sync"
"sync/atomic"
"github.com/github/git-lfs/api"
"github.com/github/git-lfs/config"
@@ -14,7 +13,8 @@ import (
)
const (
batchSize = 100
batchSize = 100
defaultMaxRetries = 1
)
type Transferable interface {
@@ -30,7 +30,7 @@ type Transferable interface {
// TransferQueue organises the wider process of uploading and downloading,
// including calling the API, passing the actual transfer request to transfer
// adapters, and dealing with progress, errors and retries
// adapters, and dealing with progress, errors and retries.
type TransferQueue struct {
direction transfer.Direction
adapter transfer.TransferAdapter
@@ -38,11 +38,9 @@ type TransferQueue struct {
adapterResultChan chan transfer.TransferResult
adapterInitMutex sync.Mutex
dryRun bool
retrying uint32
meter *progress.ProgressMeter
errors []error
transferables map[string]Transferable
retries []Transferable
batcher *Batcher
apic chan Transferable // Channel for processing individual API requests
retriesc chan Transferable // Channel for processing retries
@@ -51,9 +49,17 @@ type TransferQueue struct {
trMutex *sync.Mutex
errorwait sync.WaitGroup
retrywait sync.WaitGroup
wait sync.WaitGroup // Incremented on Add(), decremented on transfer complete or skip
oldApiWorkers int // Number of non-batch API workers to spawn (deprecated)
manifest *transfer.Manifest
// wait is used to keep track of pending transfers. It is incremented
// once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried.
wait sync.WaitGroup
oldApiWorkers int // Number of non-batch API workers to spawn (deprecated)
manifest *transfer.Manifest
rmu sync.Mutex // rmu guards retryCount
retryCount map[string]uint32 // maps OIDs to number of retry attempts
// maxRetries is the maximum number of retries a single object can
// attempt to make before it will be dropped.
maxRetries uint32
}
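The new rmu, retryCount, and maxRetries fields amount to a small concurrency-safe counter keyed by OID. A standalone sketch of that bookkeeping, using a hypothetical retryTracker type rather than the TransferQueue itself:

package main

import (
    "fmt"
    "sync"
)

// retryTracker is a hypothetical stand-in for the retryCount/maxRetries
// bookkeeping: a mutex-guarded map from OID to attempt count.
type retryTracker struct {
    mu         sync.Mutex
    counts     map[string]uint32
    maxRetries uint32
}

// Record notes one more retry attempt for oid and reports whether the
// object is still within its retry budget.
func (r *retryTracker) Record(oid string) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.counts[oid]++
    return r.counts[oid] <= r.maxRetries
}

func main() {
    rt := &retryTracker{counts: make(map[string]uint32), maxRetries: 1}
    for i := 0; i < 3; i++ {
        fmt.Printf("attempt %d, retry allowed: %v\n", i+1, rt.Record("deadbeef"))
    }
    // Prints true, then false twice: the object is dropped once it has
    // used up its single allowed retry.
}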
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
@@ -71,6 +77,8 @@ func newTransferQueue(files int, size int64, dryRun bool, dir transfer.Direction
transferables: make(map[string]Transferable),
trMutex: &sync.Mutex{},
manifest: transfer.ConfigureManifest(transfer.NewManifest(), config.Config),
retryCount: make(map[string]uint32),
maxRetries: defaultMaxRetries,
}
q.errorwait.Add(1)
@@ -81,11 +89,14 @@ func newTransferQueue(files int, size int64, dryRun bool, dir transfer.Direction
return q
}
// Add adds a Transferable to the transfer queue.
// Add adds a Transferable to the transfer queue. It only increments the
// queue's wait count if the Transferable "t" has not been seen before.
func (q *TransferQueue) Add(t Transferable) {
q.wait.Add(1)
q.trMutex.Lock()
q.transferables[t.Oid()] = t
if _, ok := q.transferables[t.Oid()]; !ok {
q.wait.Add(1)
q.transferables[t.Oid()] = t
}
q.trMutex.Unlock()
if q.batcher != nil {
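Because the retry collector re-enqueues failed objects through Add(), the wait group must be incremented only the first time an OID is seen; otherwise each retry would add a wait that nothing ever releases. A small standalone illustration of that once-per-key pattern (dedupQueue and its fields are invented for this sketch):

package main

import (
    "fmt"
    "sync"
)

// dedupQueue increments its WaitGroup at most once per key, the same
// pattern the patched Add() applies with q.transferables and q.wait.
type dedupQueue struct {
    mu   sync.Mutex
    seen map[string]bool
    wait sync.WaitGroup
}

func (d *dedupQueue) Add(oid string) {
    d.mu.Lock()
    if !d.seen[oid] {
        d.wait.Add(1)
        d.seen[oid] = true
    }
    d.mu.Unlock()
}

func main() {
    q := &dedupQueue{seen: make(map[string]bool)}
    q.Add("deadbeef")
    q.Add("deadbeef") // a retry re-adds the same OID; no extra wait is taken
    q.wait.Done()     // a single completion releases the queue
    q.wait.Wait()
    fmt.Println("done")
}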
@@ -187,12 +198,24 @@ func (q *TransferQueue) ensureAdapterBegun() error {
return nil
}
// handleTransferResult is responsible for dealing with the result of a
// successful or failed transfer.
//
// If there was an error associated with the given transfer, "res.Error", and
// it is retriable (see: `q.canRetryObject`), it will be placed in the next
// batch and be retried. If that error is not retriable for any reason, the
// transfer will be marked as having failed, and the error will be reported.
//
// If the transfer was successful, the watchers of this transfer queue will be
// notified, and the transfer will be marked as having been completed.
func (q *TransferQueue) handleTransferResult(res transfer.TransferResult) {
oid := res.Transfer.Object.Oid
if res.Error != nil {
if q.canRetry(res.Error) {
tracerx.Printf("tq: retrying object %s", res.Transfer.Object.Oid)
if q.canRetryObject(oid, res.Error) {
tracerx.Printf("tq: retrying object %s", oid)
q.trMutex.Lock()
t, ok := q.transferables[res.Transfer.Object.Oid]
t, ok := q.transferables[oid]
q.trMutex.Unlock()
if ok {
q.retry(t)
@@ -201,18 +224,16 @@ func (q *TransferQueue) handleTransferResult(res transfer.TransferResult) {
}
} else {
q.errorc <- res.Error
q.wait.Done()
}
} else {
oid := res.Transfer.Object.Oid
for _, c := range q.watchers {
c <- oid
}
q.meter.FinishTransfer(res.Transfer.Name)
q.wait.Done()
}
q.wait.Done()
}
// Wait waits for the queue to finish processing all transfers. Once Wait is
@@ -228,20 +249,6 @@ func (q *TransferQueue) Wait() {
// Handle any retries
close(q.retriesc)
q.retrywait.Wait()
atomic.StoreUint32(&q.retrying, 1)
if len(q.retries) > 0 {
tracerx.Printf("tq: retrying %d failed transfers", len(q.retries))
for _, t := range q.retries {
q.Add(t)
}
if q.batcher != nil {
q.batcher.Exit()
}
q.wait.Wait()
}
atomic.StoreUint32(&q.retrying, 0)
close(q.apic)
q.finishAdapter()
@@ -272,12 +279,12 @@ func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) {
for t := range q.apic {
obj, err := t.LegacyCheck()
if err != nil {
if q.canRetry(err) {
if q.canRetryObject(obj.Oid, err) {
q.retry(t)
} else {
q.errorc <- err
q.wait.Done()
}
q.wait.Done()
continue
}
@@ -357,20 +364,22 @@ func (q *TransferQueue) batchApiRoutine() {
if err != nil {
if errors.IsNotImplementedError(err) {
git.Config.SetLocal("", "lfs.batch", "false")
go q.legacyFallback(batch)
return
}
if q.canRetry(err) {
for _, t := range batch {
q.retry(t.(Transferable))
var errOnce sync.Once
for _, o := range batch {
t := o.(Transferable)
if q.canRetryObject(t.Oid(), err) {
q.retry(t)
} else {
q.wait.Done()
errOnce.Do(func() { q.errorc <- err })
}
} else {
q.errorc <- err
}
q.wait.Add(-len(transfers))
continue
}
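When the batch API call itself fails, the same error applies to every object in the batch, but it should reach the error channel only once while each object still gets its own retry decision; that is what the added sync.Once arranges. A reduced, self-contained sketch of the pattern (the batch contents, canRetry stub, and channel are invented for illustration):

package main

import (
    "errors"
    "fmt"
    "sync"
)

func main() {
    batch := []string{"oid-1", "oid-2", "oid-3"}
    batchErr := errors.New("batch request failed")
    // Pretend only oid-1 is still within its retry budget.
    canRetry := func(oid string) bool { return oid == "oid-1" }

    errc := make(chan error, 1)
    var errOnce sync.Once

    for _, oid := range batch {
        if canRetry(oid) {
            fmt.Println("retrying", oid)
        } else {
            // Report the shared batch-level error at most once, no matter
            // how many objects have exhausted their retries.
            errOnce.Do(func() { errc <- batchErr })
            fmt.Println("dropping", oid)
        }
    }
    fmt.Println("errors reported:", len(errc))
}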
@@ -415,9 +424,26 @@ func (q *TransferQueue) errorCollector() {
q.errorwait.Done()
}
// retryCollector collects objects to retry, increments the number of times that
// they have been retried, and then enqueues them in the next batch, or legacy
// API channel. If the transfer queue is using a batcher, the batch will be
// flushed immediately.
//
// retryCollector runs in its own goroutine.
func (q *TransferQueue) retryCollector() {
for t := range q.retriesc {
q.retries = append(q.retries, t)
q.rmu.Lock()
q.retryCount[t.Oid()]++
count := q.retryCount[t.Oid()]
q.rmu.Unlock()
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid(), t.Size())
q.Add(t)
if q.batcher != nil {
tracerx.Printf("tq: flushing batch in response to retry #%d for %q", count, t.Oid(), t.Size())
q.batcher.Flush()
}
}
q.retrywait.Done()
}
@@ -460,12 +486,26 @@ func (q *TransferQueue) retry(t Transferable) {
q.retriesc <- t
}
// canRetry returns whether or not the given error "err" is retriable.
func (q *TransferQueue) canRetry(err error) bool {
if !errors.IsRetriableError(err) || atomic.LoadUint32(&q.retrying) == 1 {
return errors.IsRetriableError(err)
}
// canRetryObject returns whether the given error is retriable for the object
// given by "oid". If an OID has met its retry limit, it will not be retried
// again, regardless of the error. Otherwise, canRetryObject returns whether
// the given error "err" is retriable.
func (q *TransferQueue) canRetryObject(oid string, err error) bool {
q.rmu.Lock()
count := q.retryCount[oid]
q.rmu.Unlock()
if count > q.maxRetries {
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
return false
}
return true
return q.canRetry(err)
}
// Errors returns any errors encountered during transfer.