git-lfs/tq/basic_download.go
brian m. carlson 00623425a2
tq: make Manifest an interface
Right now, any time we instantiate a Manifest object, we create an API
client, and when we create the API client, if we're using SSH, we try to
make a connection to the server.  However, we often instantiate a
Manifest object when performing various functionality such as smudging
data, which means that when a user creates an archive locally, they can
be prompted for an SSH password, which is undesirable.

Let's take a first step to fixing this by making Manifest an interface.
Right now, it has one concrete version, a concreteManifest, which can be
used to access the internals, and we provide methods to upgrade it from
the interface to the concrete type and determine whether it's upgraded
or not.  We attempt to upgrade it any time we need to access its
internals.  In the future, we'll also offer a lazyManifest, which is
lazy and will only instantiate the concreteManifest inside when we
attempt to upgrade it to the latter.  But for now, only implement the
concreteManifest to make it clearer what's changing.

Similarly, we make our TransferQueue upgradable so that we don't
upgrade its Manifest right away.

In both cases, we'll want to use the lazyManifest to delay the
instantiation of the API client (and hence the starting of the SSH
connection) in a future commit.
2023-03-23 16:55:57 +00:00

287 lines
8.5 KiB
Go

package tq
import (
"fmt"
"hash"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"github.com/git-lfs/git-lfs/v3/errors"
"github.com/git-lfs/git-lfs/v3/tools"
"github.com/git-lfs/git-lfs/v3/tr"
"github.com/rubyist/tracerx"
)
// Adapter for basic HTTP downloads, includes resuming via HTTP Range
type basicDownloadAdapter struct {
*adapterBase
}
func (a *basicDownloadAdapter) tempDir() string {
// Shared with the SSH adapter.
d := filepath.Join(a.fs.LFSStorageDir, "incomplete")
if err := tools.MkdirAll(d, a.fs); err != nil {
return os.TempDir()
}
return d
}
func (a *basicDownloadAdapter) WorkerStarting(workerNum int) (interface{}, error) {
return nil, nil
}
func (a *basicDownloadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
}
func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
// Reserve a temporary filename. We need to make sure nobody operates on the file simultaneously with us.
f, err := tools.TempFile(a.tempDir(), t.Oid, a.fs)
if err != nil {
return err
}
tmpName := f.Name()
defer func() {
// Fail-safe: Most implementation of os.File.Close() does nil check
if f != nil {
f.Close()
}
// This will delete temp file if:
// - we failed to fully download file and move it to final location including the case when final location already
// exists because other parallel git-lfs processes downloaded file
// - we also failed to move it to a partially-downloaded location
os.Remove(tmpName)
}()
// Close file because we will attempt to move partially-downloaded one on top of it
if err := f.Close(); err != nil {
return err
}
// Attempt to resume download. No error checking here. If we fail, we'll simply download from the start
tools.RobustRename(a.downloadFilename(t), f.Name())
// Open temp file. It is either empty or partially downloaded
f, err = os.OpenFile(f.Name(), os.O_RDWR, 0644)
if err != nil {
return err
}
// Read any existing data into hash
hash := tools.NewLfsContentHash()
fromByte, err := io.Copy(hash, f)
if err != nil {
return err
}
// Ensure that partial file seems valid
if fromByte > 0 {
if fromByte < t.Size-1 {
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, fromByte)
} else {
// Somehow we have more data than expected. Let's retry from the beginning.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
}
}
err = a.download(t, cb, authOkFunc, f, fromByte, hash)
if err != nil {
f.Close()
// Rename file so next download can resume from where we stopped.
// No error checking here, if rename fails then file will be deleted and there just will be no download resuming
tools.RobustRename(f.Name(), a.downloadFilename(t))
}
return err
}
// Returns path where partially downloaded file should be stored for download resuming
func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string {
return filepath.Join(a.tempDir(), t.Oid+".part")
}
// download starts or resumes and download. dlFile is expected to be an existing file open in RW mode
func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOkFunc func(), dlFile *os.File, fromByte int64, hash hash.Hash) error {
rel, err := t.Rel("download")
if err != nil {
return err
}
if rel == nil {
return errors.Errorf(tr.Tr.Get("Object %s not found on the server.", t.Oid))
}
req, err := a.newHTTPRequest("GET", rel)
if err != nil {
return err
}
if fromByte > 0 {
// We could just use a start byte, but since we know the length be specific
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1))
}
req = a.apiClient.LogRequest(req, "lfs.data.download")
res, err := a.makeRequest(t, req)
if err != nil {
if res == nil {
// We encountered a network or similar error which caused us
// to not receive a response at all.
return errors.NewRetriableError(err)
}
// Special-case status code 416 () - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte)
if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
return err
}
if err := dlFile.Truncate(0); err != nil {
return err
}
return a.download(t, cb, authOkFunc, dlFile, 0, nil)
}
// Special-cae status code 429 - retry after certain time
if res.StatusCode == 429 {
retLaterErr := errors.NewRetriableLaterError(err, res.Header["Retry-After"][0])
if retLaterErr != nil {
return retLaterErr
}
}
return errors.NewRetriableError(err)
}
defer res.Body.Close()
// Range request must return 206 & content range to confirm
if fromByte > 0 {
rangeRequestOk := false
var failReason string
// check 206 and Content-Range, fall back if either not as expected
if res.StatusCode == 206 {
// Probably a successful range request, check Content-Range
if rangeHdr := res.Header.Get("Content-Range"); rangeHdr != "" {
regex := regexp.MustCompile(`bytes (\d+)\-.*`)
match := regex.FindStringSubmatch(rangeHdr)
if match != nil && len(match) > 1 {
contentStart, _ := strconv.ParseInt(match[1], 10, 64)
if contentStart == fromByte {
rangeRequestOk = true
} else {
failReason = fmt.Sprintf("Content-Range start byte incorrect: %s expected %d", match[1], fromByte)
}
} else {
failReason = fmt.Sprintf("badly formatted Content-Range header: %q", rangeHdr)
}
} else {
failReason = "missing Content-Range header in response"
}
} else {
failReason = fmt.Sprintf("expected status code 206, received %d", res.StatusCode)
}
if rangeRequestOk {
tracerx.Printf("xfer: server accepted resume download request: %q from byte %d", t.Oid, fromByte)
advanceCallbackProgress(cb, t, fromByte)
} else {
// Abort resume, perform regular download
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason)
if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
return err
}
if err := dlFile.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
if res.StatusCode == 200 {
// If status code was 200 then server just ignored Range header and
// sent everything. Don't re-request, use this one from byte 0
} else {
// re-request needed
return a.download(t, cb, authOkFunc, dlFile, fromByte, hash)
}
}
}
// Signal auth OK on success response, before starting download to free up
// other workers immediately
if authOkFunc != nil {
authOkFunc()
}
var hasher *tools.HashingReader
httpReader := tools.NewRetriableReader(res.Body)
if fromByte > 0 && hash != nil {
// pre-load hashing reader with previous content
hasher = tools.NewHashingReaderPreloadHash(httpReader, hash)
} else {
hasher = tools.NewHashingReader(httpReader)
}
dlfilename := dlFile.Name()
// Wrap callback to give name context
ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
if cb != nil {
return cb(t.Name, totalSize, readSoFar+fromByte, readSinceLast)
}
return nil
}
written, err := tools.CopyWithCallback(dlFile, hasher, res.ContentLength, ccb)
if err != nil {
return errors.Wrapf(err, tr.Tr.Get("cannot write data to temporary file %q", dlfilename))
}
if actual := hasher.Hash(); actual != t.Oid {
return errors.New(tr.Tr.Get("expected OID %s, got %s after %d bytes written", t.Oid, actual, written))
}
if err := dlFile.Close(); err != nil {
return errors.New(tr.Tr.Get("can't close temporary file %q: %v", dlfilename, err))
}
err = tools.RenameFileCopyPermissions(dlfilename, t.Path)
if _, err2 := os.Stat(t.Path); err2 == nil {
// Target file already exists, possibly was downloaded by other git-lfs process
return nil
}
return err
}
func configureBasicDownloadAdapter(m *concreteManifest) {
m.RegisterNewAdapterFunc(BasicAdapterName, Download, func(name string, dir Direction) Adapter {
switch dir {
case Download:
bd := &basicDownloadAdapter{newAdapterBase(m.fs, name, dir, nil)}
// self implements impl
bd.transferImpl = bd
return bd
case Upload:
panic(tr.Tr.Get("Should never ask this function to upload"))
}
return nil
})
}
func (a *basicDownloadAdapter) makeRequest(t *Transfer, req *http.Request) (*http.Response, error) {
res, err := a.doHTTP(t, req)
if errors.IsAuthError(err) && len(req.Header.Get("Authorization")) == 0 {
return a.makeRequest(t, req)
}
return res, err
}