More robust handling of parallel attempts to download the same file

1. git-lfs now only writes to unique temp files created with `ioutil.TempFile`
   that are open with `O_CREATE|O_EXCL`
2. Partially-downloaded file is now atomically borrowed and returned back via `os.Rename`
3. `.part <-> .tmp` and `.tmp -> final` renames are allowed to fail and are handled appropriately

This is a continuation of #3813
Fixes #2825

There are several error codepaths where we borrow .part file but remove it instead of returning back.
I believe that it is OK and in those erroneous cases it is better to restart download from scratch
instead of attempting to use possibly-corrupt .part file.
This commit is contained in:
Marat Radchenko 2019-09-19 15:15:11 +03:00
parent 881e9880d9
commit 1edb976a92

@ -42,52 +42,77 @@ func (a *basicDownloadAdapter) WorkerEnding(workerNum int, ctx interface{}) {
} }
func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error { func (a *basicDownloadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
f, fromByte, hashSoFar, err := a.checkResumeDownload(t) // Reserve a temporary filename. We need to make sure nobody operates on the file simultaneously with us.
f, err := tools.TempFile(a.tempDir(), t.Oid, a.fs)
if err != nil { if err != nil {
return err return err
} }
return a.download(t, cb, authOkFunc, f, fromByte, hashSoFar) defer func() {
} f.Close()
// This will delete temp file if:
// - we failed to fully download file and move it to final location including the case when final location already
// exists because other parallel git-lfs processes downloaded file
// - we also failed to move it to a partially-downloaded location
os.Remove(f.Name())
}()
// Checks to see if a download can be resumed, and if so returns a non-nil locked file, byte start and hash // Close file because we will attempt to move partially-downloaded one on top of it
func (a *basicDownloadAdapter) checkResumeDownload(t *Transfer) (outFile *os.File, fromByte int64, hashSoFar hash.Hash, e error) { if err := f.Close(); err != nil {
// lock the file by opening it for read/write, rather than checking Stat() etc return err
// which could be subject to race conditions by other processes
f, err := os.OpenFile(a.downloadFilename(t), os.O_RDWR, 0644)
if err != nil {
// Create a new file instead, must not already exist or error (permissions / race condition)
newfile, err := os.OpenFile(a.downloadFilename(t), os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0644)
return newfile, 0, nil, err
} }
// Successfully opened an existing file at this point // Attempt to resume download. No error checking here. If we fail, we'll simply download from the start
// Read any existing data into hash then return file handle at end os.Rename(a.downloadFilename(t), f.Name())
// Open temp file. It is either empty or partially downloaded
f, err = os.OpenFile(f.Name(), os.O_RDWR, 0644)
if err != nil {
return err
}
// Read any existing data into hash
hash := tools.NewLfsContentHash() hash := tools.NewLfsContentHash()
n, err := io.Copy(hash, f) fromByte, err := io.Copy(hash, f)
if err != nil {
return err
}
// Ensure that partial file seems valid
if fromByte > 0 {
if fromByte < t.Size-1 {
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, fromByte)
} else {
// Somehow we have more data than expected. Let's retry from the beginning.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return err
}
if err := f.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
}
}
err = a.download(t, cb, authOkFunc, f, fromByte, hash)
if err != nil { if err != nil {
f.Close() f.Close()
return nil, 0, nil, err // Rename file so next download can resume from where we stopped.
// No error checking here, if rename fails then file will be deleted and there just will be no download resuming
os.Rename(f.Name(), a.downloadFilename(t))
} }
tracerx.Printf("xfer: Attempting to resume download of %q from byte %d", t.Oid, n)
return f, n, hash, nil
return err
} }
// Create or open a download file for resuming // Returns path where partially downloaded file should be stored for download resuming
func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string { func (a *basicDownloadAdapter) downloadFilename(t *Transfer) string {
// Not a temp file since we will be resuming it return filepath.Join(a.tempDir(), t.Oid+".part")
return filepath.Join(a.tempDir(), t.Oid+".tmp")
} }
// download starts or resumes and download. Always closes dlFile if non-nil // download starts or resumes and download. dlFile is expected to be an existing file open in RW mode
func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOkFunc func(), dlFile *os.File, fromByte int64, hash hash.Hash) error { func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOkFunc func(), dlFile *os.File, fromByte int64, hash hash.Hash) error {
if dlFile != nil {
// ensure we always close dlFile. Note that this does not conflict with the
// early close below, as close is idempotent.
defer dlFile.Close()
}
rel, err := t.Rel("download") rel, err := t.Rel("download")
if err != nil { if err != nil {
return err return err
@ -102,23 +127,8 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
} }
if fromByte > 0 { if fromByte > 0 {
if dlFile == nil || hash == nil {
return fmt.Errorf("Cannot restart %v from %d without a file & hash", t.Oid, fromByte)
}
if fromByte < t.Size-1 {
// We could just use a start byte, but since we know the length be specific // We could just use a start byte, but since we know the length be specific
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1)) req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", fromByte, t.Size-1))
} else {
// Somehow we have more data than expected. Let's retry
// from the top.
dlFile.Close()
os.Remove(dlFile.Name())
dlFile = nil
fromByte = 0
hash = nil
}
} }
req = a.apiClient.LogRequest(req, "lfs.data.download") req = a.apiClient.LogRequest(req, "lfs.data.download")
@ -133,9 +143,13 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
// Special-case status code 416 () - fall back // Special-case status code 416 () - fall back
if fromByte > 0 && dlFile != nil && res.StatusCode == 416 { if fromByte > 0 && dlFile != nil && res.StatusCode == 416 {
tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte) tracerx.Printf("xfer: server rejected resume download request for %q from byte %d; re-downloading from start", t.Oid, fromByte)
dlFile.Close() if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
os.Remove(dlFile.Name()) return err
return a.download(t, cb, authOkFunc, nil, 0, nil) }
if err := dlFile.Truncate(0); err != nil {
return err
}
return a.download(t, cb, authOkFunc, dlFile, 0, nil)
} }
// Special-cae status code 429 - retry after certain time // Special-cae status code 429 - retry after certain time
@ -183,17 +197,22 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
} else { } else {
// Abort resume, perform regular download // Abort resume, perform regular download
tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason) tracerx.Printf("xfer: failed to resume download for %q from byte %d: %s. Re-downloading from start", t.Oid, fromByte, failReason)
dlFile.Close()
os.Remove(dlFile.Name()) if _, err := dlFile.Seek(0, io.SeekStart); err != nil {
return err
}
if err := dlFile.Truncate(0); err != nil {
return err
}
fromByte = 0
hash = nil
if res.StatusCode == 200 { if res.StatusCode == 200 {
// If status code was 200 then server just ignored Range header and // If status code was 200 then server just ignored Range header and
// sent everything. Don't re-request, use this one from byte 0 // sent everything. Don't re-request, use this one from byte 0
dlFile = nil
fromByte = 0
hash = nil
} else { } else {
// re-request needed // re-request needed
return a.download(t, cb, authOkFunc, nil, 0, nil) return a.download(t, cb, authOkFunc, dlFile, fromByte, hash)
} }
} }
} }
@ -214,14 +233,6 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
hasher = tools.NewHashingReader(httpReader) hasher = tools.NewHashingReader(httpReader)
} }
if dlFile == nil {
// New file start
dlFile, err = os.OpenFile(a.downloadFilename(t), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer dlFile.Close()
}
dlfilename := dlFile.Name() dlfilename := dlFile.Name()
// Wrap callback to give name context // Wrap callback to give name context
ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error { ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
@ -234,12 +245,13 @@ func (a *basicDownloadAdapter) download(t *Transfer, cb ProgressCallback, authOk
if err != nil { if err != nil {
return errors.Wrapf(err, "cannot write data to tempfile %q", dlfilename) return errors.Wrapf(err, "cannot write data to tempfile %q", dlfilename)
} }
if err := dlFile.Close(); err != nil {
return fmt.Errorf("can't close tempfile %q: %v", dlfilename, err)
}
if actual := hasher.Hash(); actual != t.Oid { if actual := hasher.Hash(); actual != t.Oid {
return fmt.Errorf("Expected OID %s, got %s after %d bytes written", t.Oid, actual, written) return fmt.Errorf("expected OID %s, got %s after %d bytes written", t.Oid, actual, written)
}
if err := dlFile.Close(); err != nil {
return fmt.Errorf("can't close tempfile %q: %v", dlfilename, err)
} }
err = tools.RenameFileCopyPermissions(dlfilename, t.Path) err = tools.RenameFileCopyPermissions(dlfilename, t.Path)