2016-12-12 00:28:47 +00:00
|
|
|
package tq
|
2016-06-03 15:32:46 +00:00
|
|
|
|
|
|
|
import (
|
2016-07-05 14:25:45 +00:00
|
|
|
"fmt"
|
2017-01-04 22:43:29 +00:00
|
|
|
"net/http"
|
2017-09-26 18:27:00 +00:00
|
|
|
"regexp"
|
2017-09-26 18:35:31 +00:00
|
|
|
"strings"
|
2016-06-03 15:32:46 +00:00
|
|
|
"sync"
|
|
|
|
|
2017-10-25 17:51:01 +00:00
|
|
|
"github.com/git-lfs/git-lfs/fs"
|
2017-01-04 22:22:31 +00:00
|
|
|
"github.com/git-lfs/git-lfs/lfsapi"
|
2016-06-03 15:32:46 +00:00
|
|
|
"github.com/rubyist/tracerx"
|
|
|
|
)
|
|
|
|
|
|
|
|
// adapterBase implements the common functionality for core adapters which
|
|
|
|
// process transfers with N workers handling an oid each, and which wait for
|
|
|
|
// authentication to succeed on one worker before proceeding
|
|
|
|
type adapterBase struct {
|
2017-10-25 17:51:01 +00:00
|
|
|
fs *fs.Filesystem
|
2016-06-03 15:32:46 +00:00
|
|
|
name string
|
|
|
|
direction Direction
|
|
|
|
transferImpl transferImplementation
|
2017-01-04 22:22:31 +00:00
|
|
|
apiClient *lfsapi.Client
|
|
|
|
remote string
|
2016-12-09 20:55:04 +00:00
|
|
|
jobChan chan *job
|
2017-03-23 16:29:31 +00:00
|
|
|
debugging bool
|
2016-12-12 00:52:00 +00:00
|
|
|
cb ProgressCallback
|
2016-06-03 15:32:46 +00:00
|
|
|
// WaitGroup to sync the completion of all workers
|
|
|
|
workerWait sync.WaitGroup
|
2016-12-09 20:55:04 +00:00
|
|
|
// WaitGroup to sync the completion of all in-flight jobs
|
|
|
|
jobWait *sync.WaitGroup
|
2016-06-03 15:32:46 +00:00
|
|
|
// WaitGroup to serialise the first transfer response to perform login if needed
|
|
|
|
authWait sync.WaitGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
// transferImplementation must be implemented to provide the actual upload/download
|
|
|
|
// implementation for all core transfer approaches that use adapterBase for
|
|
|
|
// convenience. This function will be called on multiple goroutines so it
|
|
|
|
// must be either stateless or thread safe. However it will never be called
|
|
|
|
// for the same oid in parallel.
|
|
|
|
// If authOkFunc is not nil, implementations must call it as early as possible
|
|
|
|
// when authentication succeeded, before the whole file content is transferred
|
|
|
|
type transferImplementation interface {
|
2016-07-08 15:30:46 +00:00
|
|
|
// WorkerStarting is called when a worker goroutine starts to process jobs
|
|
|
|
// Implementations can run some startup logic here & return some context if needed
|
|
|
|
WorkerStarting(workerNum int) (interface{}, error)
|
|
|
|
// WorkerEnding is called when a worker goroutine is shutting down
|
2016-07-08 15:31:55 +00:00
|
|
|
// Implementations can clean up per-worker resources here, context is as returned from WorkerStarting
|
2016-07-08 15:30:46 +00:00
|
|
|
WorkerEnding(workerNum int, ctx interface{})
|
2016-07-08 15:31:55 +00:00
|
|
|
// DoTransfer performs a single transfer within a worker. ctx is any context returned from WorkerStarting
|
2016-12-12 00:52:00 +00:00
|
|
|
DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
|
2017-10-25 17:51:01 +00:00
|
|
|
func newAdapterBase(f *fs.Filesystem, name string, dir Direction, ti transferImplementation) *adapterBase {
|
2016-12-09 20:55:04 +00:00
|
|
|
return &adapterBase{
|
2017-10-25 17:51:01 +00:00
|
|
|
fs: f,
|
2016-12-09 20:55:04 +00:00
|
|
|
name: name,
|
|
|
|
direction: dir,
|
|
|
|
transferImpl: ti,
|
2017-01-04 22:43:29 +00:00
|
|
|
jobWait: new(sync.WaitGroup),
|
2016-12-09 20:55:04 +00:00
|
|
|
}
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) Name() string {
|
|
|
|
return a.name
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) Direction() Direction {
|
|
|
|
return a.direction
|
|
|
|
}
|
|
|
|
|
2016-12-15 16:59:06 +00:00
|
|
|
func (a *adapterBase) Begin(cfg AdapterConfig, cb ProgressCallback) error {
|
2017-01-04 22:22:31 +00:00
|
|
|
a.apiClient = cfg.APIClient()
|
|
|
|
a.remote = cfg.Remote()
|
2016-06-03 15:32:46 +00:00
|
|
|
a.cb = cb
|
2016-12-09 20:55:04 +00:00
|
|
|
a.jobChan = make(chan *job, 100)
|
2017-03-23 16:29:31 +00:00
|
|
|
a.debugging = a.apiClient.OSEnv().Bool("GIT_TRANSFER_TRACE", false)
|
2016-12-15 16:59:06 +00:00
|
|
|
maxConcurrency := cfg.ConcurrentTransfers()
|
2016-06-03 15:32:46 +00:00
|
|
|
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q Begin() with %d workers", a.Name(), maxConcurrency)
|
2016-06-03 15:32:46 +00:00
|
|
|
|
|
|
|
a.workerWait.Add(maxConcurrency)
|
|
|
|
a.authWait.Add(1)
|
|
|
|
for i := 0; i < maxConcurrency; i++ {
|
2016-07-08 15:30:46 +00:00
|
|
|
ctx, err := a.transferImpl.WorkerStarting(i)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
go a.worker(i, ctx)
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q started", a.Name())
|
2016-06-03 15:32:46 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-09 20:55:04 +00:00
|
|
|
type job struct {
|
|
|
|
T *Transfer
|
|
|
|
|
2016-12-12 16:23:06 +00:00
|
|
|
results chan<- TransferResult
|
|
|
|
wg *sync.WaitGroup
|
2016-12-09 20:55:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (j *job) Done(err error) {
|
2016-12-12 16:23:06 +00:00
|
|
|
j.results <- TransferResult{j.T, err}
|
2016-12-09 20:55:04 +00:00
|
|
|
j.wg.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) Add(transfers ...*Transfer) <-chan TransferResult {
|
|
|
|
results := make(chan TransferResult, len(transfers))
|
|
|
|
|
|
|
|
a.jobWait.Add(len(transfers))
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for _, t := range transfers {
|
2016-12-12 16:23:06 +00:00
|
|
|
a.jobChan <- &job{t, results, a.jobWait}
|
2016-12-09 20:55:04 +00:00
|
|
|
}
|
|
|
|
a.jobWait.Wait()
|
2016-12-12 22:37:02 +00:00
|
|
|
|
|
|
|
close(results)
|
2016-12-09 20:55:04 +00:00
|
|
|
}()
|
|
|
|
|
|
|
|
return results
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) End() {
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q End()", a.Name())
|
2016-12-09 20:55:04 +00:00
|
|
|
|
|
|
|
a.jobWait.Wait()
|
2016-06-03 15:32:46 +00:00
|
|
|
close(a.jobChan)
|
2016-12-09 20:55:04 +00:00
|
|
|
|
2016-06-03 15:32:46 +00:00
|
|
|
// wait for all transfers to complete
|
|
|
|
a.workerWait.Wait()
|
2016-12-09 20:55:04 +00:00
|
|
|
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q stopped", a.Name())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) Trace(format string, args ...interface{}) {
|
|
|
|
if !a.debugging {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
tracerx.Printf(format, args...)
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// worker function, many of these run per adapter
|
2016-07-08 15:30:46 +00:00
|
|
|
func (a *adapterBase) worker(workerNum int, ctx interface{}) {
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d starting", a.Name(), workerNum)
|
2016-06-03 15:32:46 +00:00
|
|
|
waitForAuth := workerNum > 0
|
|
|
|
signalAuthOnResponse := workerNum == 0
|
|
|
|
|
|
|
|
// First worker is the only one allowed to start immediately
|
|
|
|
// The rest wait until successful response from 1st worker to
|
|
|
|
// make sure only 1 login prompt is presented if necessary
|
|
|
|
// Deliberately outside jobChan processing so we know worker 0 will process 1st item
|
|
|
|
if waitForAuth {
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d waiting for Auth", a.Name(), workerNum)
|
2016-06-03 15:32:46 +00:00
|
|
|
a.authWait.Wait()
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d auth signal received", a.Name(), workerNum)
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
|
2016-12-09 20:55:04 +00:00
|
|
|
for job := range a.jobChan {
|
|
|
|
t := job.T
|
|
|
|
|
2016-06-03 15:32:46 +00:00
|
|
|
var authCallback func()
|
|
|
|
if signalAuthOnResponse {
|
|
|
|
authCallback = func() {
|
|
|
|
a.authWait.Done()
|
2016-06-07 11:54:14 +00:00
|
|
|
signalAuthOnResponse = false
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
}
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d processing job for %q", a.Name(), workerNum, t.Oid)
|
2016-10-21 20:36:05 +00:00
|
|
|
|
2016-06-03 15:32:46 +00:00
|
|
|
// Actual transfer happens here
|
2016-07-05 14:25:45 +00:00
|
|
|
var err error
|
2016-12-14 17:57:34 +00:00
|
|
|
if t.Size < 0 {
|
|
|
|
err = fmt.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
|
2016-07-05 14:25:45 +00:00
|
|
|
} else {
|
2016-07-08 15:30:46 +00:00
|
|
|
err = a.transferImpl.DoTransfer(ctx, t, a.cb, authCallback)
|
2016-07-05 14:25:45 +00:00
|
|
|
}
|
2016-06-03 15:32:46 +00:00
|
|
|
|
2016-12-09 20:55:04 +00:00
|
|
|
// Mark the job as completed, and alter all listeners
|
|
|
|
job.Done(err)
|
2016-06-03 15:32:46 +00:00
|
|
|
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d finished job for %q", a.Name(), workerNum, t.Oid)
|
2016-06-03 15:32:46 +00:00
|
|
|
}
|
|
|
|
// This will only happen if no jobs were submitted; just wake up all workers to finish
|
|
|
|
if signalAuthOnResponse {
|
|
|
|
a.authWait.Done()
|
|
|
|
}
|
2017-03-23 16:29:31 +00:00
|
|
|
a.Trace("xfer: adapter %q worker %d stopping", a.Name(), workerNum)
|
2016-07-08 15:30:46 +00:00
|
|
|
a.transferImpl.WorkerEnding(workerNum, ctx)
|
2016-06-03 15:32:46 +00:00
|
|
|
a.workerWait.Done()
|
|
|
|
}
|
2016-06-08 16:03:05 +00:00
|
|
|
|
2017-09-26 18:27:00 +00:00
|
|
|
var httpRE = regexp.MustCompile(`\Ahttps?://`)
|
|
|
|
|
2017-01-04 22:43:29 +00:00
|
|
|
func (a *adapterBase) newHTTPRequest(method string, rel *Action) (*http.Request, error) {
|
2017-09-26 18:27:00 +00:00
|
|
|
if !httpRE.MatchString(rel.Href) {
|
2017-09-26 18:35:31 +00:00
|
|
|
urlfragment := strings.SplitN(rel.Href, "?", 2)[0]
|
|
|
|
return nil, fmt.Errorf("missing protocol: %q", urlfragment)
|
2017-09-26 18:27:00 +00:00
|
|
|
}
|
|
|
|
|
2017-01-04 22:43:29 +00:00
|
|
|
req, err := http.NewRequest(method, rel.Href, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for key, value := range rel.Header {
|
|
|
|
req.Header.Set(key, value)
|
|
|
|
}
|
|
|
|
|
|
|
|
return req, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *adapterBase) doHTTP(t *Transfer, req *http.Request) (*http.Response, error) {
|
|
|
|
if t.Authenticated {
|
|
|
|
return a.apiClient.Do(req)
|
|
|
|
}
|
|
|
|
return a.apiClient.DoWithAuth(a.remote, req)
|
|
|
|
}
|
|
|
|
|
2016-12-12 00:52:00 +00:00
|
|
|
func advanceCallbackProgress(cb ProgressCallback, t *Transfer, numBytes int64) {
|
2016-06-08 16:03:05 +00:00
|
|
|
if cb != nil {
|
|
|
|
// Must split into max int sizes since read count is int
|
|
|
|
const maxInt = int(^uint(0) >> 1)
|
|
|
|
for read := int64(0); read < numBytes; {
|
|
|
|
remainder := numBytes - read
|
|
|
|
if remainder > int64(maxInt) {
|
|
|
|
read += int64(maxInt)
|
2016-12-14 17:57:34 +00:00
|
|
|
cb(t.Name, t.Size, read, maxInt)
|
2016-06-08 16:03:05 +00:00
|
|
|
} else {
|
|
|
|
read += remainder
|
2016-12-14 17:57:34 +00:00
|
|
|
cb(t.Name, t.Size, read, int(remainder))
|
2016-06-08 16:03:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|