Add integration tests; check other places where 429 could occur
This commit is contained in:
parent
2197f76dde
commit
4fe0a8db4f
@ -3,6 +3,7 @@ package errors
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@ -355,18 +356,25 @@ type retriableLaterError struct {
|
||||
timeAvailable time.Time
|
||||
}
|
||||
|
||||
func NewRetriableLaterErrorFromDelay(err error, retryAfter int) error {
|
||||
return retriableLaterError{
|
||||
wrappedError: newWrappedError(err, ""),
|
||||
timeAvailable: time.Now().Add(time.Duration(retryAfter) * time.Second),
|
||||
func NewRetriableLaterError(err error, header string) error {
|
||||
secs, err := strconv.Atoi(header)
|
||||
if err == nil {
|
||||
return retriableLaterError{
|
||||
wrappedError: newWrappedError(err, ""),
|
||||
timeAvailable: time.Now().Add(time.Duration(secs) * time.Second),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewRetriableLaterErrorFromTime(err error, timeAvailable time.Time) error {
|
||||
return retriableLaterError{
|
||||
wrappedError: newWrappedError(err, ""),
|
||||
timeAvailable: timeAvailable,
|
||||
time, err := time.Parse(time.RFC1123, header)
|
||||
if err == nil {
|
||||
return retriableLaterError{
|
||||
wrappedError: newWrappedError(err, ""),
|
||||
timeAvailable: time,
|
||||
}
|
||||
}
|
||||
|
||||
// We could not return a successful error from the Retry-After header.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e retriableLaterError) RetriableLaterError() (time.Time, bool) {
|
||||
|
@ -3,9 +3,7 @@ package lfshttp
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/git-lfs/git-lfs/errors"
|
||||
)
|
||||
@ -67,16 +65,9 @@ func (c *Client) handleResponse(res *http.Response) error {
|
||||
if res.StatusCode == 429 {
|
||||
// The Retry-After header could be set, check to see if it exists.
|
||||
h := res.Header.Get("Retry-After")
|
||||
if h != "" {
|
||||
retryAfter, err := strconv.Atoi(h)
|
||||
if err == nil {
|
||||
return errors.NewRetriableLaterErrorFromDelay(err, retryAfter)
|
||||
}
|
||||
|
||||
date, err := time.Parse(time.RFC1123, h)
|
||||
if err == nil {
|
||||
return errors.NewRetriableLaterErrorFromTime(err, date)
|
||||
}
|
||||
retLaterErr := errors.NewRetriableLaterError(err, h)
|
||||
if retLaterErr != nil {
|
||||
return retLaterErr
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ var (
|
||||
"status-batch-403", "status-batch-404", "status-batch-410", "status-batch-422", "status-batch-500",
|
||||
"status-storage-403", "status-storage-404", "status-storage-410", "status-storage-422", "status-storage-500", "status-storage-503",
|
||||
"status-batch-resume-206", "batch-resume-fail-fallback", "return-expired-action", "return-expired-action-forever", "return-invalid-size",
|
||||
"object-authenticated", "storage-download-retry", "storage-upload-retry", "unknown-oid",
|
||||
"object-authenticated", "storage-download-retry", "storage-upload-retry", "storage-upload-retry-later", "unknown-oid",
|
||||
"send-verify-action", "send-deprecated-links",
|
||||
}
|
||||
)
|
||||
@ -242,6 +242,50 @@ func lfsUrl(repo, oid string) string {
|
||||
return server.URL + "/storage/" + oid + "?r=" + repo
|
||||
}
|
||||
|
||||
const (
|
||||
secondsToRefillTokens = 10
|
||||
refillTokenCount = 5
|
||||
)
|
||||
|
||||
var (
|
||||
requestTokens = make(map[string]int)
|
||||
retryStartTimes = make(map[string]time.Time)
|
||||
laterRetriesMu sync.Mutex
|
||||
)
|
||||
|
||||
// checkRateLimit tracks the various requests to the git-server. If it is the first
|
||||
// request of its kind, then a times is started, that when it is finished, a certain
|
||||
// number of requests become available.
|
||||
func checkRateLimit(api, direction, repo, oid string) (seconds int, isWait bool) {
|
||||
laterRetriesMu.Lock()
|
||||
defer laterRetriesMu.Unlock()
|
||||
key := strings.Join([]string{direction, repo, oid}, ":")
|
||||
if requestsRemaining, ok := requestTokens[key]; !ok || requestsRemaining == 0 {
|
||||
if retryStartTimes[key] == (time.Time{}) {
|
||||
// If time is not initialized, set it to now
|
||||
retryStartTimes[key] = time.Now()
|
||||
}
|
||||
// The user is not allowed to make a request now and must wait for the required
|
||||
// time to pass.
|
||||
secsPassed := time.Since(retryStartTimes[key]).Seconds()
|
||||
if secsPassed >= float64(secondsToRefillTokens) {
|
||||
// The required time has passed.
|
||||
requestTokens[key] = refillTokenCount
|
||||
return 0, false
|
||||
}
|
||||
return secondsToRefillTokens - int(secsPassed) + 1, true
|
||||
}
|
||||
|
||||
requestTokens[key]--
|
||||
|
||||
// Tokens are now over, record time.
|
||||
if requestTokens[key] == 0 {
|
||||
retryStartTimes[key] = time.Now()
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
|
||||
var (
|
||||
retries = make(map[string]uint32)
|
||||
retriesMu sync.Mutex
|
||||
@ -621,6 +665,15 @@ func storageHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte("malformed content"))
|
||||
|
||||
return
|
||||
}
|
||||
case "storage-upload-retry-later":
|
||||
if timeLeft, isWaiting := checkRateLimit("storage", "upload", repo, oid); isWaiting {
|
||||
w.Header().Set("Retry-After", strconv.Itoa(timeLeft))
|
||||
w.WriteHeader(http.StatusTooManyRequests)
|
||||
|
||||
w.Write([]byte("rate limit reached"))
|
||||
fmt.Println("Setting header to: ", strconv.Itoa(timeLeft))
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -658,7 +711,14 @@ func storageHandler(w http.ResponseWriter, r *http.Request) {
|
||||
resumeAt := int64(0)
|
||||
|
||||
if by, ok := largeObjects.Get(repo, oid); ok {
|
||||
if len(by) == len("storage-download-retry") && string(by) == "storage-download-retry" {
|
||||
if len(by) == len("storage-download-retry-later") && string(by) == "storage-download-retry-later" {
|
||||
if secsToWait, wait := checkRateLimit("storage", "download", repo, oid); wait {
|
||||
statusCode = http.StatusTooManyRequests
|
||||
w.Header().Set("Retry-After", strconv.Itoa(secsToWait))
|
||||
by = []byte("rate limit reached")
|
||||
fmt.Println("Setting header to: ", strconv.Itoa(secsToWait))
|
||||
}
|
||||
} else if len(by) == len("storage-download-retry") && string(by) == "storage-download-retry" {
|
||||
if retries, ok := incrementRetriesFor("storage", "download", repo, oid, false); ok && retries < 3 {
|
||||
statusCode = 500
|
||||
by = []byte("malformed content")
|
||||
|
70
t/t-batch-retries-ratelimit.sh
Normal file
70
t/t-batch-retries-ratelimit.sh
Normal file
@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
. "$(dirname "$0")/testlib.sh"
|
||||
|
||||
begin_test "batch storage upload causes retries"
|
||||
(
|
||||
set -e
|
||||
|
||||
reponame="batch-storage-upload-retry-later"
|
||||
setup_remote_repo "$reponame"
|
||||
clone_repo "$reponame" batch-storage-repo-upload
|
||||
|
||||
contents="storage-upload-retry-later"
|
||||
oid="$(calc_oid "$contents")"
|
||||
printf "%s" "$contents" > a.dat
|
||||
|
||||
git lfs track "*.dat"
|
||||
git add .gitattributes a.dat
|
||||
git commit -m "initial commit"
|
||||
|
||||
GIT_TRACE=1 git push origin master 2>&1 | tee push.log
|
||||
if [ "0" -ne "${PIPESTATUS[0]}" ]; then
|
||||
echo >&2 "fatal: expected \`git push origin master\` to succeed ..."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
assert_server_object "$reponame" "$oid"
|
||||
)
|
||||
end_test
|
||||
|
||||
begin_test "batch storage download causes retries"
|
||||
(
|
||||
set -e
|
||||
|
||||
reponame="batch-storage-download-retry-later"
|
||||
setup_remote_repo "$reponame"
|
||||
clone_repo "$reponame" batch-storage-repo-download
|
||||
|
||||
contents="storage-download-retry-later"
|
||||
oid="$(calc_oid "$contents")"
|
||||
printf "%s" "$contents" > a.dat
|
||||
|
||||
git lfs track "*.dat"
|
||||
git add .gitattributes a.dat
|
||||
git commit -m "initial commit"
|
||||
|
||||
git push origin master
|
||||
assert_server_object "$reponame" "$oid"
|
||||
|
||||
pushd ..
|
||||
git \
|
||||
-c "filter.lfs.process=" \
|
||||
-c "filter.lfs.smudge=cat" \
|
||||
-c "filter.lfs.required=false" \
|
||||
clone "$GITSERVER/$reponame" "$reponame-assert"
|
||||
|
||||
cd "$reponame-assert"
|
||||
|
||||
git config credential.helper lfstest
|
||||
|
||||
GIT_TRACE=1 git lfs pull origin master 2>&1 | tee pull.log
|
||||
if [ "0" -ne "${PIPESTATUS[0]}" ]; then
|
||||
echo >&2 "fatal: expected \`git lfs pull origin master\` to succeed ..."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
assert_local_object "$oid" "${#contents}"
|
||||
popd
|
||||
)
|
||||
end_test
|
@ -119,6 +119,15 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
|
||||
os.Remove(dlFile.Name())
|
||||
return a.download(t, cb, authOkFunc, nil, 0, nil)
|
||||
}
|
||||
|
||||
// Special-cae status code 429 - retry after certain time
|
||||
if res.StatusCode == 429 {
|
||||
retLaterErr := errors.NewRetriableLaterError(err, res.Header["Retry-After"][0])
|
||||
if retLaterErr != nil {
|
||||
return retLaterErr
|
||||
}
|
||||
}
|
||||
|
||||
return errors.NewRetriableError(err)
|
||||
}
|
||||
|
||||
|
@ -123,6 +123,12 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
|
||||
err = errors.Wrap(err, perr.Error())
|
||||
}
|
||||
|
||||
if res.StatusCode == 429 {
|
||||
retLaterErr := errors.NewRetriableLaterError(err, res.Header["Retry-After"][0])
|
||||
if retLaterErr != nil {
|
||||
return retLaterErr
|
||||
}
|
||||
}
|
||||
return errors.NewRetriableError(err)
|
||||
}
|
||||
|
||||
|
@ -674,23 +674,7 @@ func (q *TransferQueue) handleTransferResult(
|
||||
if res.Error != nil {
|
||||
// If there was an error encountered when processing the
|
||||
// transfer (res.Transfer), handle the error as is appropriate:
|
||||
|
||||
if q.canRetryObject(oid, res.Error) {
|
||||
// If the object can be retried, send it on the retries
|
||||
// channel, where it will be read at the call-site and
|
||||
// its retry count will be incremented.
|
||||
tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
|
||||
|
||||
q.trMutex.Lock()
|
||||
objects, ok := q.transfers[oid]
|
||||
q.trMutex.Unlock()
|
||||
|
||||
if ok {
|
||||
retries <- objects.First()
|
||||
} else {
|
||||
q.errorc <- res.Error
|
||||
}
|
||||
} else if readyTime, canRetry := q.canRetryObjectLater(oid, res.Error); canRetry {
|
||||
if readyTime, canRetry := q.canRetryObjectLater(oid, res.Error); canRetry {
|
||||
// If the object can't be retried now, but can be
|
||||
// after a certain period of time, send it to
|
||||
// the retry channel with a time when it's ready.
|
||||
@ -706,6 +690,21 @@ func (q *TransferQueue) handleTransferResult(
|
||||
} else {
|
||||
q.errorc <- res.Error
|
||||
}
|
||||
} else if q.canRetryObject(oid, res.Error) {
|
||||
// If the object can be retried, send it on the retries
|
||||
// channel, where it will be read at the call-site and
|
||||
// its retry count will be incremented.
|
||||
tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
|
||||
|
||||
q.trMutex.Lock()
|
||||
objects, ok := q.transfers[oid]
|
||||
q.trMutex.Unlock()
|
||||
|
||||
if ok {
|
||||
retries <- objects.First()
|
||||
} else {
|
||||
q.errorc <- res.Error
|
||||
}
|
||||
} else {
|
||||
// If the error wasn't retriable, OR the object has
|
||||
// exceeded its retry budget, it will be NOT be sent to
|
||||
|
Loading…
Reference in New Issue
Block a user