lfs/tq: extract RetryCounter
This commit is contained in:
parent
06b4c1463d
commit
b5cba63f36
@ -28,6 +28,68 @@ type Transferable interface {
|
||||
LegacyCheck() (*api.ObjectResource, error)
|
||||
}
|
||||
|
||||
type retryCounter struct {
|
||||
// MaxRetries is the maximum number of retries a single object can
|
||||
// attempt to make before it will be dropped.
|
||||
MaxRetries int `git:"lfs.transfer.maxretries"`
|
||||
|
||||
// cmu guards count
|
||||
cmu sync.Mutex
|
||||
// count maps OIDs to number of retry attempts
|
||||
count map[string]int
|
||||
}
|
||||
|
||||
// newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
|
||||
// value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
|
||||
// was provided.
|
||||
//
|
||||
// If it encountered an error in Unmarshaling the *config.Configuration, it will
|
||||
// be returned, otherwise nil.
|
||||
func newRetryCounter(cfg *config.Configuration) *retryCounter {
|
||||
rc := &retryCounter{
|
||||
MaxRetries: defaultMaxRetries,
|
||||
|
||||
count: make(map[string]int),
|
||||
}
|
||||
|
||||
if err := cfg.Unmarshal(rc); err != nil {
|
||||
tracerx.Printf("rc: error parsing config, falling back to default values...: %v", err)
|
||||
rc.MaxRetries = 1
|
||||
}
|
||||
|
||||
if rc.MaxRetries < 1 {
|
||||
tracerx.Printf("rc: invalid retry count: %d, defaulting to %d", rc.MaxRetries, 1)
|
||||
rc.MaxRetries = 1
|
||||
}
|
||||
|
||||
return rc
|
||||
}
|
||||
|
||||
// Increment increments the number of retries for a given OID. It is safe to
|
||||
// call across multiple goroutines.
|
||||
func (r *retryCounter) Increment(oid string) {
|
||||
r.cmu.Lock()
|
||||
defer r.cmu.Unlock()
|
||||
|
||||
r.count[oid]++
|
||||
}
|
||||
|
||||
// CountFor returns the current number of retries for a given OID. It is safe to
|
||||
// call across multiple goroutines.
|
||||
func (r *retryCounter) CountFor(oid string) int {
|
||||
r.cmu.Lock()
|
||||
defer r.cmu.Unlock()
|
||||
|
||||
return r.count[oid]
|
||||
}
|
||||
|
||||
// CanRetry returns the current number of retries, and whether or not it exceeds
|
||||
// the maximum number of retries (see: retryCounter.MaxRetries).
|
||||
func (r *retryCounter) CanRetry(oid string) (int, bool) {
|
||||
count := r.CountFor(oid)
|
||||
return count, count < r.MaxRetries
|
||||
}
|
||||
|
||||
// TransferQueue organises the wider process of uploading and downloading,
|
||||
// including calling the API, passing the actual transfer request to transfer
|
||||
// adapters, and dealing with progress, errors and retries.
|
||||
@ -55,16 +117,14 @@ type TransferQueue struct {
|
||||
wait sync.WaitGroup
|
||||
oldApiWorkers int // Number of non-batch API workers to spawn (deprecated)
|
||||
manifest *transfer.Manifest
|
||||
rmu sync.Mutex // rmu guards retryCount
|
||||
retryCount map[string]uint32 // maps OIDs to number of retry attempts
|
||||
// maxRetries is the maximum number of retries a single object can
|
||||
// attempt to make before it will be dropped.
|
||||
maxRetries uint32
|
||||
rc *retryCounter
|
||||
}
|
||||
|
||||
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
|
||||
func newTransferQueue(files int, size int64, dryRun bool, dir transfer.Direction) *TransferQueue {
|
||||
logPath, _ := config.Config.Os.Get("GIT_LFS_PROGRESS")
|
||||
cfg := config.Config
|
||||
|
||||
logPath, _ := cfg.Os.Get("GIT_LFS_PROGRESS")
|
||||
|
||||
q := &TransferQueue{
|
||||
direction: dir,
|
||||
@ -77,8 +137,7 @@ func newTransferQueue(files int, size int64, dryRun bool, dir transfer.Direction
|
||||
transferables: make(map[string]Transferable),
|
||||
trMutex: &sync.Mutex{},
|
||||
manifest: transfer.ConfigureManifest(transfer.NewManifest(), config.Config),
|
||||
retryCount: make(map[string]uint32),
|
||||
maxRetries: defaultMaxRetries,
|
||||
rc: newRetryCounter(cfg),
|
||||
}
|
||||
|
||||
q.errorwait.Add(1)
|
||||
@ -432,10 +491,8 @@ func (q *TransferQueue) errorCollector() {
|
||||
// retryCollector runs in its own goroutine.
|
||||
func (q *TransferQueue) retryCollector() {
|
||||
for t := range q.retriesc {
|
||||
q.rmu.Lock()
|
||||
q.retryCount[t.Oid()]++
|
||||
count := q.retryCount[t.Oid()]
|
||||
q.rmu.Unlock()
|
||||
q.rc.Increment(t.Oid())
|
||||
count := q.rc.CountFor(t.Oid())
|
||||
|
||||
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid(), t.Size())
|
||||
|
||||
@ -496,11 +553,7 @@ func (q *TransferQueue) canRetry(err error) bool {
|
||||
// able to be retried again. If so, canRetryObject returns whether or not that
|
||||
// given error "err" is retriable.
|
||||
func (q *TransferQueue) canRetryObject(oid string, err error) bool {
|
||||
q.rmu.Lock()
|
||||
count := q.retryCount[oid]
|
||||
q.rmu.Unlock()
|
||||
|
||||
if count > q.maxRetries {
|
||||
if count, ok := q.rc.CanRetry(oid); !ok {
|
||||
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
|
||||
return false
|
||||
}
|
||||
|
62
lfs/transfer_queue_test.go
Normal file
62
lfs/transfer_queue_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
package lfs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/github/git-lfs/config"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRetryCounterDefaultsToFixedRetries(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{}))
|
||||
|
||||
assert.Equal(t, 1, rc.MaxRetries)
|
||||
}
|
||||
|
||||
func TestRetryCounterIsConfigurable(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{
|
||||
Git: map[string]string{
|
||||
"lfs.transfer.maxretries": "3",
|
||||
},
|
||||
}))
|
||||
|
||||
assert.Equal(t, 3, rc.MaxRetries)
|
||||
}
|
||||
|
||||
func TestRetryCounterClampsValidValues(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{
|
||||
Git: map[string]string{
|
||||
"lfs.transfer.maxretries": "-1",
|
||||
},
|
||||
}))
|
||||
|
||||
assert.Equal(t, 1, rc.MaxRetries)
|
||||
}
|
||||
|
||||
func TestRetryCounterIgnoresNonInts(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{
|
||||
Git: map[string]string{
|
||||
"lfs.transfer.maxretries": "not_an_int",
|
||||
},
|
||||
}))
|
||||
|
||||
assert.Equal(t, 1, rc.MaxRetries)
|
||||
}
|
||||
|
||||
func TestRetryCounterIncrementsObjects(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{}))
|
||||
|
||||
rc.Increment("oid")
|
||||
|
||||
assert.Equal(t, 1, rc.CountFor("oid"))
|
||||
}
|
||||
|
||||
func TestRetryCounterCanNotRetryAfterExceedingRetryCount(t *testing.T) {
|
||||
rc := newRetryCounter(config.NewFrom(config.Values{}))
|
||||
|
||||
rc.Increment("oid")
|
||||
count, canRetry := rc.CanRetry("oid")
|
||||
|
||||
assert.Equal(t, 1, count)
|
||||
assert.False(t, canRetry)
|
||||
}
|
Loading…
Reference in New Issue
Block a user