brian m. carlson eb83fcda24
Avoid deadlock when transfer queue fails
In 1412d6e4 ("Don't fail if we lack objects the server has",
2019-04-30), we changed the code to abort later if a missing object
occurs.  In doing so, we had to consider the case where the transfer
queue aborts early for some reason and ensure that the sync.WaitGroup
does not unnecessarily block due to outstanding objects never getting processed.

However, the approach we used, which was to explicitly add the number of
items we skipped processing, was error prone and didn't cover all cases.
Notably, a DNS failure could randomly cause a hang during a push.  Solve
this by creating a class for a wait group which is abortable and simply
abort it if we encounter an error, preventing any deadlocks caused by
miscounting the number of items.
2019-09-09 17:06:59 +00:00

997 lines
27 KiB

package tq
import (
const (
	// defaultBatchSize is the batch size NewTransferQueue falls back to when
	// the configured batch size is not positive.
	defaultBatchSize = 100
)
type retryCounter struct {
MaxRetries int `git:"lfs.transfer.maxretries"`
// cmu guards count
cmu sync.Mutex
// count maps OIDs to number of retry attempts
count map[string]int
// newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
// value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
// was provided.
// If it encountered an error in Unmarshaling the *config.Configuration, it will
// be returned, otherwise nil.
func newRetryCounter() *retryCounter {
return &retryCounter{
MaxRetries: defaultMaxRetries,
count: make(map[string]int),
// Increment increments the number of retries for a given OID. It is safe to
// call across multiple goroutines.
func (r *retryCounter) Increment(oid string) {
defer r.cmu.Unlock()
// CountFor returns the current number of retries for a given OID. It is safe to
// call across multiple goroutines.
func (r *retryCounter) CountFor(oid string) int {
defer r.cmu.Unlock()
return r.count[oid]
// CanRetry returns the current number of retries, and whether or not it exceeds
// the maximum number of retries (see: retryCounter.MaxRetries).
func (r *retryCounter) CanRetry(oid string) (int, bool) {
count := r.CountFor(oid)
return count, count < r.MaxRetries
// batch is a slice of `*objectTuple`s. It implements the sort.Interface
// interface and enables sorting by object size.
// This interface is implemented here so that the largest objects can be
// processed first. Since adding a new batch is unable to occur until the
// current batch has finished processing, this enables us to reduce the risk of
// a single worker getting tied up on a large item at the end of a batch while
// all other workers are sitting idle.
type batch []*objectTuple
// Concat concatenates two batches together, returning a single, clamped batch as
// "left", and the remainder of elements as "right". If the union of the
// receiver and "other" has cardinality less than "size", "right" will be
// returned as nil. Any object tuple that is not currently able to be retried
// (ie Retry-After response), will also go into the right batch. Also, when object(s)
// are returend that are rate-limited, return the minimum duration required to wait until
// a object is ready.
func (b batch) Concat(other batch, size int) (left, right batch, minWait time.Duration) {
u := batch(append(b, other...))
for _, ot := range u {
if time.Now().After(ot.ReadyTime) {
// The current time is past the time the object should
// be available.
left = append(left, ot)
} else {
// The time hasn't passed for the object.
right = append(right, ot)
wait := time.Until(ot.ReadyTime)
if minWait == 0 {
minWait = wait
} else if wait < minWait {
minWait = wait
if len(left) <= size {
// If the size of left fits the given size limit, return with no adjustments.
return left, right, minWait
// If left is too large, trip left up to size and append the rest to right.
right = append(right, left[size:]...)
left = left[:size]
return left, right, minWait
func (b batch) ToTransfers() []*Transfer {
transfers := make([]*Transfer, 0, len(b))
for _, t := range b {
transfers = append(transfers, &Transfer{Oid: t.Oid, Size: t.Size, Missing: t.Missing})
return transfers
// Len reports how many object tuples the batch holds.
func (b batch) Len() int {
	return len(b)
}

// Less reports whether the object at index i is smaller than the object at
// index j.
func (b batch) Less(i, j int) bool {
	return b[j].Size > b[i].Size
}

// Swap exchanges the object tuples at indices i and j.
func (b batch) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}
// abortableWaitGroup is a wait group that can be released early: Abort
// cancels every outstanding Add so that Wait returns even though the matching
// Done calls never arrive.
type abortableWaitGroup struct {
	wq sync.WaitGroup
	// counter mirrors the number of outstanding items registered with wq,
	// so that Abort knows how many to cancel.
	counter int
	// mu guards counter and aborted, and serializes updates to wq.
	mu sync.Mutex
	// aborted is set by Abort. Once set, Add and Done are no-ops, so a late
	// Done cannot drive wq negative (which panics) and a late Add cannot
	// make Wait block again.
	aborted bool
}

// newAbortableWaitGroup returns an empty, ready-to-use *abortableWaitGroup.
func newAbortableWaitGroup() *abortableWaitGroup {
	return &abortableWaitGroup{}
}

// Add registers delta additional outstanding items. It does nothing once the
// group has been aborted.
func (q *abortableWaitGroup) Add(delta int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.aborted {
		return
	}
	q.counter += delta
	q.wq.Add(delta)
}

// Done marks one outstanding item as finished. It does nothing once the group
// has been aborted.
func (q *abortableWaitGroup) Done() {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.aborted {
		return
	}
	q.counter -= 1
	q.wq.Done()
}

// Abort cancels all outstanding items so that Wait returns immediately. Calling
// Abort more than once is harmless.
func (q *abortableWaitGroup) Abort() {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.aborted {
		return
	}
	q.wq.Add(-q.counter)
	q.counter = 0
	q.aborted = true
}

// Wait blocks until every outstanding item is done, or the group is aborted.
func (q *abortableWaitGroup) Wait() {
	q.wq.Wait()
}
// TransferQueue organises the wider process of uploading and downloading,
// including calling the API, passing the actual transfer request to transfer
// adapters, and dealing with progress, errors and retries.
type TransferQueue struct {
direction Direction
client *tqClient
remote string
ref *git.Ref
adapter Adapter
adapterInProgress bool
adapterInitMutex sync.Mutex
dryRun bool
cb tools.CopyCallback
meter *Meter
errors []error
transfers map[string]*objects
batchSize int
bufferDepth int
incoming chan *objectTuple // Channel for processing incoming items
errorc chan error // Channel for processing errors
watchers []chan *Transfer
trMutex *sync.Mutex
collectorWait sync.WaitGroup
errorwait sync.WaitGroup
// wait is used to keep track of pending transfers. It is incremented
// once per unique OID on Add(), and is decremented when that transfer
// is marked as completed or failed, but not retried.
wait *abortableWaitGroup
manifest *Manifest
rc *retryCounter
// unsupportedContentType indicates whether the transfer queue ever saw
// an HTTP 422 response indicating that their upload destination does
// not support Content-Type detection.
unsupportedContentType bool
// objects holds a set of objects.
type objects struct {
completed bool
objects []*objectTuple
// All returns all *objectTuple's contained in the *objects set.
func (s *objects) All() []*objectTuple {
return s.objects
// Append returns a new *objects with the given *objectTuple(s) appended to the
// end of the known objects.
func (s *objects) Append(os ...*objectTuple) *objects {
return &objects{
completed: s.completed,
objects: append(s.objects, os...),
// First returns the first *objectTuple in the chain of objects.
func (s *objects) First() *objectTuple {
if len(s.objects) == 0 {
return nil
return s.objects[0]
type objectTuple struct {
Name, Path, Oid string
Size int64
Missing bool
ReadyTime time.Time
func (o *objectTuple) ToTransfer() *Transfer {
return &Transfer{
Name: o.Name,
Path: o.Path,
Oid: o.Oid,
Size: o.Size,
Missing: o.Missing,
// Option adjusts a *TransferQueue. Options are accepted by NewTransferQueue.
type Option func(*TransferQueue)
func DryRun(dryRun bool) Option {
return func(tq *TransferQueue) {
tq.dryRun = dryRun
func WithProgress(m *Meter) Option {
return func(tq *TransferQueue) {
tq.meter = m
func RemoteRef(ref *git.Ref) Option {
return func(tq *TransferQueue) {
tq.ref = ref
func WithProgressCallback(cb tools.CopyCallback) Option {
return func(tq *TransferQueue) {
tq.cb = cb
func WithBatchSize(size int) Option {
return func(tq *TransferQueue) { tq.batchSize = size }
func WithBufferDepth(depth int) Option {
return func(tq *TransferQueue) { tq.bufferDepth = depth }
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
q := &TransferQueue{
direction: dir,
client: &tqClient{Client: manifest.APIClient()},
remote: remote,
errorc: make(chan error),
transfers: make(map[string]*objects),
trMutex: &sync.Mutex{},
manifest: manifest,
rc: newRetryCounter(),
wait: newAbortableWaitGroup(),
for _, opt := range options {
q.rc.MaxRetries = q.manifest.maxRetries
q.client.MaxRetries = q.manifest.maxRetries
if q.batchSize <= 0 {
q.batchSize = defaultBatchSize
if q.bufferDepth <= 0 {
q.bufferDepth = q.batchSize
if q.meter != nil {
q.meter.Direction = q.direction
q.incoming = make(chan *objectTuple, q.bufferDepth)
return q
// Add adds a *Transfer to the transfer queue. It only increments the amount
// of waiting the TransferQueue has to do if the *Transfer "t" is new.
// If another transfer(s) with the same OID has been added to the *TransferQueue
// already, the given transfer will not be enqueued, but will be sent to any
// channel created by Watch() once the oldest transfer has completed.
// Only one file will be transferred to/from the Path element of the first
// transfer.
func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool) {
t := &objectTuple{
Name: name,
Path: path,
Oid: oid,
Size: size,
Missing: missing,
if objs := q.remember(t); len(objs.objects) > 1 {
if objs.completed {
// If there is already a completed transfer chain for
// this OID, then this object is already "done", and can
// be sent through as completed to the watchers.
for _, w := range q.watchers {
w <- t.ToTransfer()
// If the chain is not done, there is no reason to enqueue this
// transfer into 'q.incoming'.
tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
q.incoming <- t
// remember remembers the *Transfer "t" if the *TransferQueue doesn't already
// know about a Transfer with the same OID.
// It returns if the value is new or not.
func (q *TransferQueue) remember(t *objectTuple) objects {
defer q.trMutex.Unlock()
if _, ok := q.transfers[t.Oid]; !ok {
q.transfers[t.Oid] = &objects{
objects: []*objectTuple{t},
return *q.transfers[t.Oid]
q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
return *q.transfers[t.Oid]
// collectBatches collects batches in a loop, prioritizing failed items from the
// previous before adding new items. The process works as follows:
// 1. Create a new batch, of size `q.batchSize`, and containing no items
// 2. While the batch contains less items than `q.batchSize` AND the channel
// is open, read one item from the `q.incoming` channel.
// a. If the read was a channel close, go to step 4.
// b. If the read was a transferable item, go to step 3.
// 3. Append the item to the batch.
// 4. Sort the batch by descending object size, make a batch API call, send
// the items to the `*adapterBase`.
// 5. In a separate goroutine, process the worker results, incrementing and
// appending retries if possible. On the main goroutine, accept new items
// into "pending".
// 6. Concat() the "next" and "pending" batches such that no more items than
// the maximum allowed per batch are in next, and the rest are in pending.
// 7. If the `q.incoming` channel is open, go to step 2.
// 8. If the next batch is empty AND the `q.incoming` channel is closed,
// terminate immediately.
// collectBatches runs in its own goroutine.
func (q *TransferQueue) collectBatches() {
defer q.collectorWait.Done()
var closing bool
next := q.makeBatch()
pending := q.makeBatch()
for {
for !closing && (len(next) < q.batchSize) {
t, ok := <-q.incoming
if !ok {
closing = true
next = append(next, t)
// Before enqueuing the next batch, sort by descending object
// size.
done := make(chan struct{})
var retries batch
var err error
go func() {
defer close(done)
if len(next) == 0 {
retries, err = q.enqueueAndCollectRetriesFor(next)
if err != nil {
q.errorc <- err
var collected batch
collected, closing = q.collectPendingUntil(done)
// If we've encountered a serious error here, abort immediately;
// don't process further batches. Abort the wait queue so that
// we don't deadlock waiting for objects to complete when they
// never will.
if err != nil {
// Ensure the next batch is filled with, in order:
// - retries from the previous batch,
// - new additions that were enqueued behind retries, &
// - items collected while the batch was processing.
var minWaitTime time.Duration
next, pending, minWaitTime = retries.Concat(append(pending, collected...), q.batchSize)
if len(next) == 0 && len(pending) != 0 {
// There are some pending that cound not be queued.
// Wait the requested time before resuming loop.
} else if len(next) == 0 && len(pending) == 0 && closing {
// There are no items remaining, it is safe to break
// collectPendingUntil collects items from q.incoming into a "pending" batch
// until the given "done" channel is written to, or is closed.
// A "pending" batch is returned, along with whether or not "q.incoming" is
// closed.
func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
for {
select {
case t, ok := <-q.incoming:
if !ok {
closing = true
pending = append(pending, t)
case <-done:
// enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch
// containing all of the objects that failed from the previous batch and had
// retries available to them.
// If an error was encountered while making the API request, _all_ of the items
// from the previous batch (that have retries available to them) will be
// returned immediately, along with the error that was encountered.
// enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
// processed.
// Note that the parameter "batch" shadows the type of the same name inside
// this function.
// NOTE(review): several statements of this function (block terminators, and
// possibly bookkeeping calls) are not visible in this view; the notes below
// flag the places where that matters — confirm against the full file.
func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
next := q.makeBatch()
tracerx.Printf("tq: sending batch of size %d", len(batch))
var bRes *BatchResponse
if q.manifest.standaloneTransferAgent != "" {
// Trust the external transfer agent can do everything by itself: build
// the response locally instead of calling the Batch API.
objects := make([]*Transfer, 0, len(batch))
for _, t := range batch {
objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path, Missing: t.Missing})
bRes = &BatchResponse{
Objects: objects,
TransferAdapterName: q.manifest.standaloneTransferAgent,
} else {
// Query the Git LFS server for what transfer method to use and
// details such as URLs, authentication, etc.
var err error
bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers())
if err != nil {
// If there was an error making the batch API call, mark all of
// the objects for retry, and return them along with the error
// that was encountered. If any of the objects couldn't be
// retried, they will be marked as failed.
for _, t := range batch {
if q.canRetryObject(t.Oid, err) {
next = append(next, t)
} else if readyTime, canRetry := q.canRetryObjectLater(t.Oid, err); canRetry {
// NOTE(review): time.Duration.Seconds() returns a float64, but it is
// formatted with %s here — confirm the intended verb.
tracerx.Printf("tq: retrying object %s after %s seconds.", t.Oid, time.Until(readyTime).Seconds())
// The error is cleared because the object will be retried after
// readyTime instead of being reported.
err = nil
t.ReadyTime = readyTime
next = append(next, t)
} else {
// NOTE(review): no statement is visible in this branch for objects
// that can be retried neither now nor later — confirm how they are
// marked as failed.
return next, err
// Nothing was returned for this batch, so there is nothing to transfer.
if len(bRes.Objects) == 0 {
return next, nil
// We check first that all of the objects we want to upload are present,
// and abort if any are missing. We'll never have any objects marked as
// missing except possibly on upload, so just skip iterating over the
// objects in that case.
if q.direction == Upload {
for _, o := range bRes.Objects {
// If the server already has the object, the list of
// actions will be empty. It's fine if the file is
// missing in that case, since we don't need to upload
// it.
if o.Missing && len(o.Actions) != 0 {
return nil, errors.Errorf("Unable to find source for object %v (try running git lfs fetch --all)", o.Oid)
// toTransfer gathers the objects that are handed to the transfer adapter.
toTransfer := make([]*Transfer, 0, len(bRes.Objects))
for _, o := range bRes.Objects {
// A per-object error from the server is reported on the error channel.
if o.Error != nil {
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
// NOTE(review): q.transfers is read here without q.trMutex being taken in
// the visible lines, while remember() guards the same map with it —
// confirm the locking.
objects, ok := q.transfers[o.Oid]
if !ok {
// If we couldn't find any associated
// Transfer object, then we give up on the
// transfer by telling the progress meter to
// skip the number of bytes in "o".
q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
} else {
// Pick t[0], since it will cover all transfers with the
// same OID.
tr := newTransfer(o, objects.First().Name, objects.First().Path)
if a, err := tr.Rel(q.direction.String()); err != nil {
// XXX(taylor): duplication
if q.canRetryObject(tr.Oid, err) {
count := q.rc.CountFor(tr.Oid)
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
next = append(next, objects.First())
} else {
q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
} else if a == nil && q.manifest.standaloneTransferAgent == "" {
// NOTE(review): no statement is visible in this branch; presumably an
// object without an action for this direction needs no transfer —
// confirm.
} else {
toTransfer = append(toTransfer, tr)
// Hand the transfers to the adapter and drain its retries channel, which
// addToAdapter documents as being closed once the batch has been processed.
retries := q.addToAdapter(bRes.endpoint, toTransfer)
for t := range retries {
count := q.rc.CountFor(t.Oid)
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)
next = append(next, t)
return next, nil
// makeBatch creates an empty batch whose capacity is the maximum batch size
// configured on the `*TransferQueue`.
func (q *TransferQueue) makeBatch() batch {
	capacity := q.batchSize
	return make(batch, 0, capacity)
}
// addToAdapter adds the given "pending" transfers to the transfer adapters and
// returns a channel of Transfers that are to be retried in the next batch.
// After all of the items in the batch have been processed, the channel is
// closed.
// addToAdapter returns immediately, and does not block.
func (q *TransferQueue) addToAdapter(e lfshttp.Endpoint, pending []*Transfer) <-chan *objectTuple {
retries := make(chan *objectTuple, len(pending))
if err := q.ensureAdapterBegun(e); err != nil {
q.errorc <- err
for _, t := range pending {
return retries
present, missingResults := q.partitionTransfers(pending)
go func() {
defer close(retries)
var results <-chan TransferResult
if q.dryRun {
results = q.makeDryRunResults(present)
} else {
results = q.adapter.Add(present...)
for _, res := range missingResults {
q.handleTransferResult(res, retries)
for res := range results {
q.handleTransferResult(res, retries)
return retries
func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
if q.direction != Upload {
return transfers, nil
present = make([]*Transfer, 0, len(transfers))
results = make([]TransferResult, 0, len(transfers))
for _, t := range transfers {
var err error
if t.Size < 0 {
err = errors.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
} else {
fd, serr := os.Stat(t.Path)
if serr != nil {
if os.IsNotExist(serr) {
err = newObjectMissingError(t.Name, t.Oid)
} else {
err = serr
} else if t.Size != fd.Size() {
err = newCorruptObjectError(t.Name, t.Oid)
if err != nil {
results = append(results, TransferResult{
Transfer: t,
Error: err,
} else {
present = append(present, t)
// makeDryRunResults returns a channel populated immediately with "successful"
// results for all of the given transfers in "ts".
func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {
results := make(chan TransferResult, len(ts))
for _, t := range ts {
results <- TransferResult{t, nil}
return results
// handleTransferResult observes the transfer result, sending it on the retries
// channel if it was able to be retried.
// "res" is the result reported for a single transfer, and "retries" receives
// the first object tuple recorded for the result's OID when a retry is due.
// NOTE(review): block terminators, and possibly bookkeeping statements, are not
// visible in this view; the notes below flag the places where that matters.
func (q *TransferQueue) handleTransferResult(
res TransferResult, retries chan<- *objectTuple,
) {
oid := res.Transfer.Oid
if res.Error != nil {
// If there was an error encountered when processing the
// transfer (res.Transfer), handle the error as is appropriate:
if readyTime, canRetry := q.canRetryObjectLater(oid, res.Error); canRetry {
// If the object can't be retried now, but can be
// after a certain period of time, send it to
// the retry channel with a time when it's ready.
// NOTE(review): time.Duration.Seconds() returns a float64, but it is
// formatted with %s here — confirm the intended verb.
tracerx.Printf("tq: retrying object %s after %s seconds.", oid, time.Until(readyTime).Seconds())
// NOTE(review): q.transfers is read here without q.trMutex being taken in
// the visible lines, while remember() guards the same map with it —
// confirm the locking.
objects, ok := q.transfers[oid]
if ok {
t := objects.First()
t.ReadyTime = readyTime
retries <- t
} else {
// An OID the queue has no record of cannot be retried; report the error.
q.errorc <- res.Error
} else if q.canRetryObject(oid, res.Error) {
// If the object can be retried, send it on the retries
// channel, where it will be read at the call-site and
// its retry count will be incremented.
tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
objects, ok := q.transfers[oid]
if ok {
retries <- objects.First()
} else {
q.errorc <- res.Error
} else {
// If the error wasn't retriable, OR the object has
// exceeded its retry budget, it will NOT be sent to
// the retry channel, and the error will be reported
// immediately (unless the error is in response to a
// HTTP 422).
if errors.IsUnprocessableEntityError(res.Error) {
// Remembered so that Wait can print the Content-Type hint.
q.unsupportedContentType = true
} else {
q.errorc <- res.Error
} else {
// NOTE(review): unlike the error branches above, this lookup does not check
// whether the OID is known before using the result — confirm that an
// unknown OID cannot reach this point.
objects := q.transfers[oid]
objects.completed = true
// Otherwise, if the transfer was successful, notify all of the
// watchers, and mark it as finished.
for _, c := range q.watchers {
// Send one update for each transfer with the
// same OID.
for _, t := range objects.All() {
c <- &Transfer{
Name: t.Name,
Path: t.Path,
Oid: t.Oid,
Size: t.Size,
func (q *TransferQueue) useAdapter(name string) {
defer q.adapterInitMutex.Unlock()
if q.adapter != nil {
if q.adapter.Name() == name {
// re-use, this is the normal path
// If the adapter we're using isn't the same as the one we've been
// told to use now, must wait for the current one to finish then switch
// This will probably never happen but is just in case server starts
// changing adapter support in between batches
q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
// finishAdapter resets the queue's adapter state: if an adapter is in
// progress, it is marked as no longer in progress and dropped from the queue.
// NOTE(review): nothing visible here shuts the adapter itself down before it is
// dropped — confirm whether a call on q.adapter is expected ahead of the reset.
func (q *TransferQueue) finishAdapter() {
if q.adapterInProgress {
q.adapterInProgress = false
q.adapter = nil
// BatchSize returns the batch size of the receiving *TransferQueue, or, the
// number of transfers to accept before beginning work on them.
func (q *TransferQueue) BatchSize() int {
return q.batchSize
// Skip takes the size, in bytes, of an object that will not be transferred.
// NOTE(review): no body is visible for Skip in this view; presumably it tells
// the progress meter to skip "size" bytes — confirm against the full file.
func (q *TransferQueue) Skip(size int64) {
func (q *TransferQueue) ensureAdapterBegun(e lfshttp.Endpoint) error {
defer q.adapterInitMutex.Unlock()
if q.adapterInProgress {
return nil
// Progress callback - receives byte updates
cb := func(name string, total, read int64, current int) error {
q.meter.TransferBytes(q.direction.String(), name, read, total, current)
if q.cb != nil {
// NOTE: this is the mechanism by which the logpath
// specified by GIT_LFS_PROGRESS is written to.
// See: lfs.downloadFile() for more.
q.cb(total, read, current)
return nil
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
err := q.adapter.Begin(q.toAdapterCfg(e), cb)
if err != nil {
return err
q.adapterInProgress = true
return nil
func (q *TransferQueue) toAdapterCfg(e lfshttp.Endpoint) AdapterConfig {
apiClient := q.manifest.APIClient()
concurrency := q.manifest.ConcurrentTransfers()
access := apiClient.Endpoints.AccessFor(e.Url)
if access.Mode() == lfsapi.NTLMAccess {
concurrency = 1
return &adapterConfig{
concurrentTransfers: concurrency,
apiClient: apiClient,
remote: q.remote,
var (
	// contentTypeWarning is the message printed when a server returns an
	// HTTP 422 at the end of a push. Each element is printed as one line.
	contentTypeWarning = []string{
		"Uploading failed due to unsupported Content-Type header(s).",
		"Consider disabling Content-Type detection with:",
		" $ git config lfs.contenttype false",
	}
)
// Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transfers to the queue.
// NOTE(review): the previous wording said failed transfers are "retried once";
// the retry logic visible in this file is bounded by retryCounter.MaxRetries
// instead — confirm.
// NOTE(review): the statements that stop the queue and wait for outstanding
// work are not visible in this view — confirm against the full file.
func (q *TransferQueue) Wait() {
// Every channel handed out by Watch() is visited here; Watch() documents that
// those channels are closed when the queue finishes processing.
for _, watcher := range q.watchers {
// If any transfer was rejected with an HTTP 422, print the Content-Type hint
// to standard error, one "info:" line per entry of contentTypeWarning.
if q.unsupportedContentType {
for _, line := range contentTypeWarning {
fmt.Fprintf(os.Stderr, "info: %s\n", line)
// Watch returns a channel where the queue will write the value of each transfer
// as it completes. If multiple transfers exist with the same OID, they will all
// be recorded here, even though only one actual transfer took place. The
// channel will be closed when the queue finishes processing.
func (q *TransferQueue) Watch() chan *Transfer {
c := make(chan *Transfer, q.batchSize)
q.watchers = append(q.watchers, c)
return c
// This goroutine collects errors returned from transfers
func (q *TransferQueue) errorCollector() {
for err := range q.errorc {
q.errors = append(q.errors, err)
// run begins the transfer queue. It transfers files sequentially or
// concurrently depending on the Config.ConcurrentTransfers() value.
func (q *TransferQueue) run() {
tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
go q.errorCollector()
go q.collectBatches()
// canRetry returns whether or not the given error "err" is retriable.
func (q *TransferQueue) canRetry(err error) bool {
return errors.IsRetriableError(err)
// canRetryLater returns the number of seconds until an error can be retried and if the error
// is a delayed-retriable error.
func (q *TransferQueue) canRetryLater(err error) (time.Time, bool) {
return errors.IsRetriableLaterError(err)
// canRetryObject returns whether the given error is retriable for the object
// given by "oid". If the an OID has met its retry limit, then it will not be
// able to be retried again. If so, canRetryObject returns whether or not that
// given error "err" is retriable.
func (q *TransferQueue) canRetryObject(oid string, err error) bool {
if count, ok := q.rc.CanRetry(oid); !ok {
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
return false
return q.canRetry(err)
func (q *TransferQueue) canRetryObjectLater(oid string, err error) (time.Time, bool) {
if count, ok := q.rc.CanRetry(oid); !ok {
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
return time.Time{}, false
return q.canRetryLater(err)
// Errors returns any errors encountered during transfer.
func (q *TransferQueue) Errors() []error {
return q.errors