git-lfs/transfer/basic_download.go

233 lines
7.5 KiB
Go

package transfer
import (
"errors"
"fmt"
"hash"
"io"
"os"
"path/filepath"
"regexp"
"strconv"
"github.com/github/git-lfs/config"
"github.com/github/git-lfs/errutil"
"github.com/github/git-lfs/httputil"
"github.com/github/git-lfs/localstorage"
"github.com/github/git-lfs/tools"
"github.com/rubyist/tracerx"
)
// Adapter for basic HTTP downloads, includes resuming via HTTP Range
type basicDownloadAdapter struct {
*adapterBase
}
func (a *basicDownloadAdapter) ClearTempStorage() error {
return os.RemoveAll(a.tempDir())
}
func (a *basicDownloadAdapter) tempDir() string {
// Must be dedicated to this adapter as deleted by ClearTempStorage
// Also make local to this repo not global, and separate to localstorage temp,
// which gets cleared at the end of every invocation
d := filepath.Join(localstorage.Objects().RootDir, "incomplete")
if err := os.MkdirAll(d, 0755); err != nil {
return os.TempDir()
}
return d
}
func (a *basicDownloadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
return nil, nil
}
func (a *basicDownloadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb TransferProgressCallback, authOkFunc func()) error {
f, fromByte, hashSoFar, err := a.checkResumeDownload(t)
if err != nil {
return err
}
return a.download(t, cb, authOkFunc, f, fromByte, hashSoFar)
}
// Checks to see if a download can be resumed, and if so returns a non-nil locked file, byte start and hash
func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.File, fromByte int64, hashSoFar hash.Hash, e error) {
// lock the file by opening it for read/write, rather than checking Stat() etc
// which could be subject to race conditions by other processes
f, err := os.OpenFile(a.downloadFilename(t), os.O_RDWR, 0644)
if err != nil {
// Create a new file instead, must not already exist or error (permissions / race condition)
newfile, err := os.OpenFile(a.downloadFilename(t), os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0644)
return newfile, 0, nil, err
}
// Successfully opened an existing file at this point
// Read any existing data into hash then return file handle at end
hash := tools.NewLfsContentHash()
n, err := io.Copy(hash, f)
if err != nil {
f.Close()
return nil, 0, nil, err
}
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Object.Oid, n)
return f, n, hash, nil
}
// Create or open a download file for resuming
func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string {
// Not a temp file since we will be resuming it
return filepath.Join(a.tempDir(), t.Object.Oid+".tmp")
}
// download starts or resumes and download. Always closes dlFile if non-nil
func (a *basicDownloadAdapter) download(t *Transfer, cb TransferProgressCallback, authOkFunc func(), dlFile *os.File, fromByte int64, hash hash.Hash) error {
if dlFile != nil {
// ensure we always close dlFile. Note that this does not conflict with the
// early close below, as close is idempotent.
defer dlFile.Close()
}
rel, ok := t.Object.Rel("download")
if !ok {
return errors.New("Object not found on the server.")
}
req, err := httputil.NewHttpRequest("GET", rel.Href, rel.Header)
if err != nil {
return err
}
if fromByte > 0 {
if dlFile == nil || hash == nil {
return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Object.Oid, fromByte)
}
// We could just use a start byte, but since we know the length be specific
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Object.Size-1))
}
res, err := httputil.DoHttpRequest(config.Config, req, t.Object.NeedsAuth())
if err != nil {
// Special-case status code 416 () - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Object.Oid, fromByte)
dlFile.Close()
os.Remove(dlFile.Name())
return a.download(t, cb, authOkFunc, nil, 0, nil)
}
return errutil.NewRetriableError(err)
}
httputil.LogTransfer(config.Config, "lfs.data.download", res)
defer res.Body.Close()
// Range request must return 206 & content range to confirm
if fromByte > 0 {
rangeRequestOk := false
var failReason string
// check 206 and Content-Range, fall back if either not as expected
if res.StatusCode == 206 {
// Probably a successful range request, check Content-Range
if rangeHdr := res.Header.Get("Content-Range"); rangeHdr != "" {
regex := regexp.MustCompile(`bytes (\d+)\-.*`)
match := regex.FindStringSubmatch(rangeHdr)
if match != nil && len(match) > 1 {
contentStart, _ := strconv.ParseInt(match[1], 10, 64)
if contentStart == fromByte {
rangeRequestOk = true
} else {
failReason = fmt.Sprintf("Content-Range start byte incorrect: %s expected %d", match[1], fromByte)
}
} else {
failReason = fmt.Sprintf("badly formatted Content-Range header: %q", rangeHdr)
}
} else {
failReason = "missing Content-Range header in response"
}
} else {
failReason = fmt.Sprintf("expected status code 206, received %d", res.StatusCode)
}
if rangeRequestOk {
tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Object.Oid, fromByte)
advanceCallbackProgress(cb, t, fromByte)
} else {
// Abort resume, perform regular download
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Object.Oid, fromByte, failReason)
dlFile.Close()
os.Remove(dlFile.Name())
if res.StatusCode == 200 {
// If status code was 200 then server just ignored Range header and
// sent everything. Don't re-request, use this one from byte 0
dlFile = nil
fromByte = 0
hash = nil
} else {
// re-request needed
return a.download(t, cb, authOkFunc, nil, 0, nil)
}
}
}
// Signal auth OK on success response, before starting download to free up
// other workers immediately
if authOkFunc != nil {
authOkFunc()
}
var hasher *tools.HashingReader
if fromByte > 0 && hash != nil {
// pre-load hashing reader with previous content
hasher = tools.NewHashingReaderPreloadHash(res.Body, hash)
} else {
hasher = tools.NewHashingReader(res.Body)
}
if dlFile == nil {
// New file start
dlFile, err = os.OpenFile(a.downloadFilename(t), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer dlFile.Close()
}
dlfilename := dlFile.Name()
// Wrap callback to give name context
ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
if cb != nil {
return cb(t.Name, totalSize, readSoFar+fromByte, readSinceLast)
}
return nil
}
written, err := tools.CopyWithCallback(dlFile, hasher, res.ContentLength, ccb)
if err != nil {
return fmt.Errorf("cannot write data to tempfile %q: %v", dlfilename, err)
}
if err := dlFile.Close(); err != nil {
return fmt.Errorf("can't close tempfile %q: %v", dlfilename, err)
}
if actual := hasher.Hash(); actual != t.Object.Oid {
return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Object.Oid, actual, written)
}
return tools.RenameFileCopyPermissions(dlfilename, t.Path)
}
func configureBasicDownloadAdapter(m *Manifest) {
m.RegisterNewTransferAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) TransferAdapter {
switch dir {
case Download:
bd := &basicDownloadAdapter{newAdapterBase(name, dir, nil)}
// self implements impl
bd.transferImpl = bd
return bd
case Upload:
panic("Should never ask this func to upload")
}
return nil
})
}