2015-05-21 16:36:49 +00:00
|
|
|
package lfs
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
2015-05-21 17:47:52 +00:00
|
|
|
|
2016-11-15 17:01:18 +00:00
|
|
|
"github.com/git-lfs/git-lfs/api"
|
|
|
|
"github.com/git-lfs/git-lfs/config"
|
|
|
|
"github.com/git-lfs/git-lfs/errors"
|
|
|
|
"github.com/git-lfs/git-lfs/progress"
|
|
|
|
"github.com/git-lfs/git-lfs/transfer"
|
2016-05-23 18:02:27 +00:00
|
|
|
"github.com/rubyist/tracerx"
|
2015-05-21 16:36:49 +00:00
|
|
|
)
|
|
|
|
|
2015-07-09 19:59:25 +00:00
|
|
|
const (
|
2016-09-21 19:46:34 +00:00
|
|
|
batchSize = 100
|
|
|
|
defaultMaxRetries = 1
|
2015-07-09 19:59:25 +00:00
|
|
|
)
|
|
|
|
|
2015-05-21 16:36:49 +00:00
|
|
|
type Transferable interface {
|
|
|
|
Oid() string
|
|
|
|
Size() int64
|
2015-06-19 17:59:30 +00:00
|
|
|
Name() string
|
2016-05-25 15:41:47 +00:00
|
|
|
Path() string
|
|
|
|
Object() *api.ObjectResource
|
2016-05-16 11:31:23 +00:00
|
|
|
SetObject(*api.ObjectResource)
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2016-09-28 18:25:37 +00:00
|
|
|
type retryCounter struct {
|
|
|
|
// MaxRetries is the maximum number of retries a single object can
|
|
|
|
// attempt to make before it will be dropped.
|
|
|
|
MaxRetries int `git:"lfs.transfer.maxretries"`
|
|
|
|
|
|
|
|
// cmu guards count
|
|
|
|
cmu sync.Mutex
|
|
|
|
// count maps OIDs to number of retry attempts
|
|
|
|
count map[string]int
|
|
|
|
}
|
|
|
|
|
|
|
|
// newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
|
|
|
|
// value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
|
|
|
|
// was provided.
|
|
|
|
//
|
|
|
|
// If it encountered an error in Unmarshaling the *config.Configuration, it will
|
|
|
|
// be returned, otherwise nil.
|
|
|
|
func newRetryCounter(cfg *config.Configuration) *retryCounter {
|
|
|
|
rc := &retryCounter{
|
|
|
|
MaxRetries: defaultMaxRetries,
|
|
|
|
|
|
|
|
count: make(map[string]int),
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := cfg.Unmarshal(rc); err != nil {
|
|
|
|
tracerx.Printf("rc: error parsing config, falling back to default values...: %v", err)
|
|
|
|
rc.MaxRetries = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
if rc.MaxRetries < 1 {
|
|
|
|
tracerx.Printf("rc: invalid retry count: %d, defaulting to %d", rc.MaxRetries, 1)
|
|
|
|
rc.MaxRetries = 1
|
|
|
|
}
|
|
|
|
|
|
|
|
return rc
|
|
|
|
}
|
|
|
|
|
|
|
|
// Increment increments the number of retries for a given OID. It is safe to
|
|
|
|
// call across multiple goroutines.
|
|
|
|
func (r *retryCounter) Increment(oid string) {
|
|
|
|
r.cmu.Lock()
|
|
|
|
defer r.cmu.Unlock()
|
|
|
|
|
|
|
|
r.count[oid]++
|
|
|
|
}
|
|
|
|
|
|
|
|
// CountFor returns the current number of retries for a given OID. It is safe to
|
|
|
|
// call across multiple goroutines.
|
|
|
|
func (r *retryCounter) CountFor(oid string) int {
|
|
|
|
r.cmu.Lock()
|
|
|
|
defer r.cmu.Unlock()
|
|
|
|
|
|
|
|
return r.count[oid]
|
|
|
|
}
|
|
|
|
|
|
|
|
// CanRetry returns the current number of retries, and whether or not it exceeds
|
|
|
|
// the maximum number of retries (see: retryCounter.MaxRetries).
|
|
|
|
func (r *retryCounter) CanRetry(oid string) (int, bool) {
|
|
|
|
count := r.CountFor(oid)
|
|
|
|
return count, count < r.MaxRetries
|
|
|
|
}
|
|
|
|
|
2016-05-24 16:05:02 +00:00
|
|
|
// TransferQueue organises the wider process of uploading and downloading,
|
|
|
|
// including calling the API, passing the actual transfer request to transfer
|
2016-09-21 19:46:34 +00:00
|
|
|
// adapters, and dealing with progress, errors and retries.
|
2015-05-21 16:36:49 +00:00
|
|
|
type TransferQueue struct {
|
2016-06-01 16:33:01 +00:00
|
|
|
direction transfer.Direction
|
2016-05-25 15:41:47 +00:00
|
|
|
adapter transfer.TransferAdapter
|
|
|
|
adapterInProgress bool
|
|
|
|
adapterResultChan chan transfer.TransferResult
|
|
|
|
adapterInitMutex sync.Mutex
|
|
|
|
dryRun bool
|
|
|
|
meter *progress.ProgressMeter
|
|
|
|
errors []error
|
|
|
|
transferables map[string]Transferable
|
|
|
|
batcher *Batcher
|
|
|
|
retriesc chan Transferable // Channel for processing retries
|
|
|
|
errorc chan error // Channel for processing errors
|
|
|
|
watchers []chan string
|
|
|
|
trMutex *sync.Mutex
|
|
|
|
errorwait sync.WaitGroup
|
|
|
|
retrywait sync.WaitGroup
|
2016-09-21 19:46:34 +00:00
|
|
|
// wait is used to keep track of pending transfers. It is incremented
|
|
|
|
// once per unique OID on Add(), and is decremented when that transfer
|
|
|
|
// is marked as completed or failed, but not retried.
|
2016-11-18 18:21:09 +00:00
|
|
|
wait sync.WaitGroup
|
|
|
|
manifest *transfer.Manifest
|
|
|
|
rc *retryCounter
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2016-05-25 15:41:47 +00:00
|
|
|
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
|
2016-06-01 16:33:01 +00:00
|
|
|
func newTransferQueue(files int, size int64, dryRun bool, dir transfer.Direction) *TransferQueue {
|
2016-09-28 18:25:37 +00:00
|
|
|
cfg := config.Config
|
|
|
|
|
|
|
|
logPath, _ := cfg.Os.Get("GIT_LFS_PROGRESS")
|
2016-08-10 20:23:03 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
q := &TransferQueue{
|
2016-06-01 16:33:01 +00:00
|
|
|
direction: dir,
|
2016-05-25 15:41:47 +00:00
|
|
|
dryRun: dryRun,
|
2016-08-10 20:23:03 +00:00
|
|
|
meter: progress.NewProgressMeter(files, size, dryRun, logPath),
|
2015-09-04 20:01:48 +00:00
|
|
|
retriesc: make(chan Transferable, batchSize),
|
2015-08-21 15:48:52 +00:00
|
|
|
errorc: make(chan error),
|
2015-05-21 16:36:49 +00:00
|
|
|
transferables: make(map[string]Transferable),
|
2016-04-22 20:24:50 +00:00
|
|
|
trMutex: &sync.Mutex{},
|
2016-08-09 22:07:41 +00:00
|
|
|
manifest: transfer.ConfigureManifest(transfer.NewManifest(), config.Config),
|
2016-09-28 18:25:37 +00:00
|
|
|
rc: newRetryCounter(cfg),
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Add(1)
|
2015-09-04 20:01:48 +00:00
|
|
|
q.retrywait.Add(1)
|
2015-09-04 15:21:26 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
q.run()
|
|
|
|
|
|
|
|
return q
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2016-09-21 19:46:34 +00:00
|
|
|
// Add adds a Transferable to the transfer queue. It only increments the amount
|
|
|
|
// of waiting the TransferQueue has to do if the Transferable "t" is new.
|
2015-05-21 16:36:49 +00:00
|
|
|
func (q *TransferQueue) Add(t Transferable) {
|
2016-04-22 20:24:50 +00:00
|
|
|
q.trMutex.Lock()
|
2016-09-21 19:46:34 +00:00
|
|
|
if _, ok := q.transferables[t.Oid()]; !ok {
|
|
|
|
q.wait.Add(1)
|
|
|
|
q.transferables[t.Oid()] = t
|
2016-10-20 18:07:19 +00:00
|
|
|
q.trMutex.Unlock()
|
|
|
|
} else {
|
|
|
|
tracerx.Printf("already transferring %q, skipping duplicate", t)
|
|
|
|
q.trMutex.Unlock()
|
|
|
|
return
|
2016-09-21 19:46:34 +00:00
|
|
|
}
|
2015-07-09 18:21:49 +00:00
|
|
|
|
|
|
|
if q.batcher != nil {
|
|
|
|
q.batcher.Add(t)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2016-06-02 11:44:17 +00:00
|
|
|
func (q *TransferQueue) useAdapter(name string) {
|
|
|
|
q.adapterInitMutex.Lock()
|
|
|
|
defer q.adapterInitMutex.Unlock()
|
|
|
|
|
|
|
|
if q.adapter != nil {
|
|
|
|
if q.adapter.Name() == name {
|
|
|
|
// re-use, this is the normal path
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// If the adapter we're using isn't the same as the one we've been
|
|
|
|
// told to use now, must wait for the current one to finish then switch
|
|
|
|
// This will probably never happen but is just in case server starts
|
|
|
|
// changing adapter support in between batches
|
|
|
|
q.finishAdapter()
|
|
|
|
}
|
2016-08-09 22:07:41 +00:00
|
|
|
q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
|
2016-06-01 16:33:01 +00:00
|
|
|
}
|
2016-06-02 11:11:45 +00:00
|
|
|
|
2016-06-02 11:44:17 +00:00
|
|
|
func (q *TransferQueue) finishAdapter() {
|
|
|
|
if q.adapterInProgress {
|
|
|
|
q.adapter.End()
|
|
|
|
q.adapterInProgress = false
|
|
|
|
q.adapter = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-25 15:41:47 +00:00
|
|
|
func (q *TransferQueue) addToAdapter(t Transferable) {
|
|
|
|
tr := transfer.NewTransfer(t.Name(), t.Object(), t.Path())
|
|
|
|
|
|
|
|
if q.dryRun {
|
|
|
|
// Don't actually transfer
|
|
|
|
res := transfer.TransferResult{tr, nil}
|
|
|
|
q.handleTransferResult(res)
|
|
|
|
return
|
|
|
|
}
|
2016-07-11 10:17:10 +00:00
|
|
|
err := q.ensureAdapterBegun()
|
|
|
|
if err != nil {
|
|
|
|
q.errorc <- err
|
|
|
|
q.Skip(t.Size())
|
|
|
|
q.wait.Done()
|
|
|
|
return
|
|
|
|
}
|
2016-05-25 15:41:47 +00:00
|
|
|
q.adapter.Add(tr)
|
|
|
|
}
|
|
|
|
|
2016-04-06 19:06:34 +00:00
|
|
|
func (q *TransferQueue) Skip(size int64) {
|
|
|
|
q.meter.Skip(size)
|
|
|
|
}
|
|
|
|
|
2016-05-25 15:41:47 +00:00
|
|
|
func (q *TransferQueue) transferKind() string {
|
2016-06-01 16:33:01 +00:00
|
|
|
if q.direction == transfer.Download {
|
2016-05-25 15:41:47 +00:00
|
|
|
return "download"
|
|
|
|
} else {
|
|
|
|
return "upload"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-07-11 10:17:10 +00:00
|
|
|
func (q *TransferQueue) ensureAdapterBegun() error {
|
2016-05-25 15:41:47 +00:00
|
|
|
q.adapterInitMutex.Lock()
|
|
|
|
defer q.adapterInitMutex.Unlock()
|
|
|
|
|
|
|
|
if q.adapterInProgress {
|
2016-07-11 10:17:10 +00:00
|
|
|
return nil
|
2016-05-25 15:41:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
adapterResultChan := make(chan transfer.TransferResult, 20)
|
|
|
|
|
|
|
|
// Progress callback - receives byte updates
|
2016-05-27 11:52:18 +00:00
|
|
|
cb := func(name string, total, read int64, current int) error {
|
|
|
|
q.meter.TransferBytes(q.transferKind(), name, read, total, current)
|
2016-05-25 15:41:47 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
|
2016-07-11 10:17:10 +00:00
|
|
|
err := q.adapter.Begin(config.Config.ConcurrentTransfers(), cb, adapterResultChan)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-05-25 15:41:47 +00:00
|
|
|
q.adapterInProgress = true
|
|
|
|
|
|
|
|
// Collector for completed transfers
|
|
|
|
// q.wait.Done() in handleTransferResult is enough to know when this is complete for all transfers
|
|
|
|
go func() {
|
|
|
|
for res := range adapterResultChan {
|
|
|
|
q.handleTransferResult(res)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2016-07-11 10:17:10 +00:00
|
|
|
return nil
|
2016-05-25 15:41:47 +00:00
|
|
|
}
|
|
|
|
|
2016-09-21 19:46:34 +00:00
|
|
|
// handleTransferResult is responsible for dealing with the result of a
|
|
|
|
// successful or failed transfer.
|
|
|
|
//
|
|
|
|
// If there was an error assosicated with the given transfer, "res.Error", and
|
|
|
|
// it is retriable (see: `q.canRetryObject`), it will be placed in the next
|
|
|
|
// batch and be retried. If that error is not retriable for any reason, the
|
|
|
|
// transfer will be marked as having failed, and the error will be reported.
|
|
|
|
//
|
|
|
|
// If the transfer was successful, the watchers of this transfer queue will be
|
|
|
|
// notified, and the transfer will be marked as having been completed.
|
2016-05-25 15:41:47 +00:00
|
|
|
func (q *TransferQueue) handleTransferResult(res transfer.TransferResult) {
|
2016-09-21 19:46:34 +00:00
|
|
|
oid := res.Transfer.Object.Oid
|
|
|
|
|
2016-05-25 15:41:47 +00:00
|
|
|
if res.Error != nil {
|
2016-09-21 19:46:34 +00:00
|
|
|
if q.canRetryObject(oid, res.Error) {
|
|
|
|
tracerx.Printf("tq: retrying object %s", oid)
|
2016-05-25 15:41:47 +00:00
|
|
|
q.trMutex.Lock()
|
2016-09-21 19:46:34 +00:00
|
|
|
t, ok := q.transferables[oid]
|
2016-05-25 15:41:47 +00:00
|
|
|
q.trMutex.Unlock()
|
|
|
|
if ok {
|
|
|
|
q.retry(t)
|
|
|
|
} else {
|
|
|
|
q.errorc <- res.Error
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
q.errorc <- res.Error
|
2016-09-21 19:46:34 +00:00
|
|
|
q.wait.Done()
|
2016-05-25 15:41:47 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
for _, c := range q.watchers {
|
|
|
|
c <- oid
|
|
|
|
}
|
2016-07-05 23:38:54 +00:00
|
|
|
|
|
|
|
q.meter.FinishTransfer(res.Transfer.Name)
|
2016-09-21 19:46:34 +00:00
|
|
|
q.wait.Done()
|
2016-05-25 15:41:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-09-09 14:24:50 +00:00
|
|
|
// Wait waits for the queue to finish processing all transfers. Once Wait is
|
|
|
|
// called, Add will no longer add transferables to the queue. Any failed
|
|
|
|
// transfers will be automatically retried once.
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) Wait() {
|
|
|
|
if q.batcher != nil {
|
|
|
|
q.batcher.Exit()
|
|
|
|
}
|
2015-07-10 20:05:04 +00:00
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
q.wait.Wait()
|
|
|
|
|
|
|
|
// Handle any retries
|
|
|
|
close(q.retriesc)
|
|
|
|
q.retrywait.Wait()
|
2015-09-09 14:24:50 +00:00
|
|
|
|
2016-06-02 11:44:17 +00:00
|
|
|
q.finishAdapter()
|
2015-07-09 18:21:49 +00:00
|
|
|
close(q.errorc)
|
2015-07-10 20:05:04 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for _, watcher := range q.watchers {
|
|
|
|
close(watcher)
|
|
|
|
}
|
2015-07-10 19:25:59 +00:00
|
|
|
|
2015-07-27 16:43:41 +00:00
|
|
|
q.meter.Finish()
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Wait()
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-05-27 19:45:18 +00:00
|
|
|
// Watch returns a channel where the queue will write the OID of each transfer
|
|
|
|
// as it completes. The channel will be closed when the queue finishes processing.
|
|
|
|
func (q *TransferQueue) Watch() chan string {
|
2015-07-09 19:59:25 +00:00
|
|
|
c := make(chan string, batchSize)
|
2015-05-27 19:45:18 +00:00
|
|
|
q.watchers = append(q.watchers, c)
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
// batchApiRoutine processes the queue of transfers using the batch endpoint,
|
|
|
|
// making only one POST call for all objects. The results are then handed
|
|
|
|
// off to the transfer workers.
|
|
|
|
func (q *TransferQueue) batchApiRoutine() {
|
2015-07-31 14:30:08 +00:00
|
|
|
var startProgress sync.Once
|
|
|
|
|
2016-08-09 22:07:41 +00:00
|
|
|
transferAdapterNames := q.manifest.GetAdapterNames(q.direction)
|
2016-06-01 16:33:01 +00:00
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
for {
|
|
|
|
batch := q.batcher.Next()
|
|
|
|
if batch == nil {
|
|
|
|
break
|
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2015-07-09 20:01:57 +00:00
|
|
|
tracerx.Printf("tq: sending batch of size %d", len(batch))
|
|
|
|
|
2016-05-16 11:31:23 +00:00
|
|
|
transfers := make([]*api.ObjectResource, 0, len(batch))
|
2016-05-18 15:54:54 +00:00
|
|
|
for _, i := range batch {
|
|
|
|
t := i.(Transferable)
|
2016-05-16 11:31:23 +00:00
|
|
|
transfers = append(transfers, &api.ObjectResource{Oid: t.Oid(), Size: t.Size()})
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2016-06-02 09:30:13 +00:00
|
|
|
if len(transfers) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2016-08-09 19:18:18 +00:00
|
|
|
objs, adapterName, err := api.Batch(config.Config, transfers, q.transferKind(), transferAdapterNames)
|
2015-07-09 18:21:49 +00:00
|
|
|
if err != nil {
|
2016-09-21 19:46:34 +00:00
|
|
|
var errOnce sync.Once
|
|
|
|
for _, o := range batch {
|
|
|
|
t := o.(Transferable)
|
|
|
|
|
|
|
|
if q.canRetryObject(t.Oid(), err) {
|
|
|
|
q.retry(t)
|
|
|
|
} else {
|
|
|
|
errOnce.Do(func() { q.errorc <- err })
|
2016-10-10 21:57:12 +00:00
|
|
|
q.wait.Done()
|
2015-08-18 19:53:04 +00:00
|
|
|
}
|
|
|
|
}
|
2015-09-01 18:31:40 +00:00
|
|
|
|
2015-07-15 21:33:37 +00:00
|
|
|
continue
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-06-15 11:46:02 +00:00
|
|
|
|
2016-06-02 11:44:17 +00:00
|
|
|
q.useAdapter(adapterName)
|
2015-07-31 14:30:08 +00:00
|
|
|
startProgress.Do(q.meter.Start)
|
|
|
|
|
2016-06-02 11:11:45 +00:00
|
|
|
for _, o := range objs {
|
2015-09-21 20:19:50 +00:00
|
|
|
if o.Error != nil {
|
2016-08-18 20:35:11 +00:00
|
|
|
q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
|
2016-04-06 19:06:34 +00:00
|
|
|
q.Skip(o.Size)
|
2015-09-21 20:19:50 +00:00
|
|
|
q.wait.Done()
|
|
|
|
continue
|
|
|
|
}
|
2015-08-07 13:32:28 +00:00
|
|
|
|
2016-05-25 15:41:47 +00:00
|
|
|
if _, ok := o.Rel(q.transferKind()); ok {
|
2015-07-27 12:58:27 +00:00
|
|
|
// This object needs to be transferred
|
2016-04-22 20:24:50 +00:00
|
|
|
q.trMutex.Lock()
|
|
|
|
transfer, ok := q.transferables[o.Oid]
|
|
|
|
q.trMutex.Unlock()
|
|
|
|
|
|
|
|
if ok {
|
2015-07-09 18:21:49 +00:00
|
|
|
transfer.SetObject(o)
|
2015-07-27 20:41:57 +00:00
|
|
|
q.meter.Add(transfer.Name())
|
2016-05-25 15:41:47 +00:00
|
|
|
q.addToAdapter(transfer)
|
2015-07-17 01:29:32 +00:00
|
|
|
} else {
|
2016-04-06 19:06:34 +00:00
|
|
|
q.Skip(transfer.Size())
|
2015-07-17 01:29:32 +00:00
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-07-17 01:29:32 +00:00
|
|
|
} else {
|
2016-04-06 19:06:34 +00:00
|
|
|
q.Skip(o.Size)
|
2015-07-17 01:29:32 +00:00
|
|
|
q.wait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-07-09 18:21:49 +00:00
|
|
|
// This goroutine collects errors returned from transfers
|
|
|
|
func (q *TransferQueue) errorCollector() {
|
|
|
|
for err := range q.errorc {
|
|
|
|
q.errors = append(q.errors, err)
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
2015-09-04 15:21:26 +00:00
|
|
|
q.errorwait.Done()
|
2015-07-09 18:21:49 +00:00
|
|
|
}
|
2015-05-21 16:36:49 +00:00
|
|
|
|
2016-09-21 19:46:34 +00:00
|
|
|
// retryCollector collects objects to retry, increments the number of times that
|
|
|
|
// they have been retried, and then enqueues them in the next batch, or legacy
|
|
|
|
// API channel. If the transfer queue is using a batcher, the batch will be
|
|
|
|
// flushed immediately.
|
|
|
|
//
|
|
|
|
// retryCollector runs in its own goroutine.
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) retryCollector() {
|
|
|
|
for t := range q.retriesc {
|
2016-09-28 18:25:37 +00:00
|
|
|
q.rc.Increment(t.Oid())
|
|
|
|
count := q.rc.CountFor(t.Oid())
|
2016-09-21 19:46:34 +00:00
|
|
|
|
|
|
|
tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid(), t.Size())
|
|
|
|
|
2016-10-20 18:07:19 +00:00
|
|
|
// XXX(taylor): reuse some of the logic in
|
|
|
|
// `*TransferQueue.Add(t)` here to circumvent banned duplicate
|
|
|
|
// OIDs
|
2016-09-21 19:46:34 +00:00
|
|
|
if q.batcher != nil {
|
2016-10-14 19:00:49 +00:00
|
|
|
tracerx.Printf("tq: flushing batch in response to retry #%d for %q (size: %d)", count, t.Oid(), t.Size())
|
2016-10-20 18:07:19 +00:00
|
|
|
|
|
|
|
q.batcher.Add(t)
|
2016-09-21 19:46:34 +00:00
|
|
|
q.batcher.Flush()
|
|
|
|
}
|
2015-09-04 20:01:48 +00:00
|
|
|
}
|
|
|
|
q.retrywait.Done()
|
|
|
|
}
|
|
|
|
|
2015-07-10 20:05:04 +00:00
|
|
|
// run starts the transfer queue, doing individual or batch transfers depending
|
|
|
|
// on the Config.BatchTransfer() value. run will transfer files sequentially or
|
|
|
|
// concurrently depending on the Config.ConcurrentTransfers() value.
|
2015-07-09 18:21:49 +00:00
|
|
|
func (q *TransferQueue) run() {
|
|
|
|
go q.errorCollector()
|
2015-09-04 20:01:48 +00:00
|
|
|
go q.retryCollector()
|
2015-06-19 17:59:30 +00:00
|
|
|
|
2016-11-08 18:22:29 +00:00
|
|
|
tracerx.Printf("tq: running as batched queue, batch size of %d", batchSize)
|
|
|
|
q.batcher = NewBatcher(batchSize)
|
|
|
|
go q.batchApiRoutine()
|
2015-05-21 16:36:49 +00:00
|
|
|
}
|
|
|
|
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) retry(t Transferable) {
|
|
|
|
q.retriesc <- t
|
|
|
|
}
|
2015-08-18 19:53:04 +00:00
|
|
|
|
2016-09-21 19:46:34 +00:00
|
|
|
// canRetry returns whether or not the given error "err" is retriable.
|
2015-09-04 20:01:48 +00:00
|
|
|
func (q *TransferQueue) canRetry(err error) bool {
|
2016-09-21 19:46:34 +00:00
|
|
|
return errors.IsRetriableError(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// canRetryObject returns whether the given error is retriable for the object
|
|
|
|
// given by "oid". If the an OID has met its retry limit, then it will not be
|
|
|
|
// able to be retried again. If so, canRetryObject returns whether or not that
|
|
|
|
// given error "err" is retriable.
|
|
|
|
func (q *TransferQueue) canRetryObject(oid string, err error) bool {
|
2016-09-28 18:25:37 +00:00
|
|
|
if count, ok := q.rc.CanRetry(oid); !ok {
|
2016-09-21 19:46:34 +00:00
|
|
|
tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
|
2015-08-17 23:02:20 +00:00
|
|
|
return false
|
|
|
|
}
|
2015-09-04 13:42:37 +00:00
|
|
|
|
2016-09-21 19:46:34 +00:00
|
|
|
return q.canRetry(err)
|
2015-08-17 23:02:20 +00:00
|
|
|
}
|
|
|
|
|
2015-05-26 13:56:43 +00:00
|
|
|
// Errors returns any errors encountered during transfer.
|
2015-08-21 15:48:52 +00:00
|
|
|
func (q *TransferQueue) Errors() []error {
|
2015-05-21 16:36:49 +00:00
|
|
|
return q.errors
|
|
|
|
}
|