progress,tools,tq: implement ResetProgress on progress.BodyReader

This commit is contained in:
Taylor Blau 2017-02-17 16:52:33 -07:00
parent 6124c447b2
commit 589dffbefa
5 changed files with 61 additions and 141 deletions

@ -7,26 +7,30 @@ import (
type CopyCallback func(totalSize int64, readSoFar int64, readSinceLast int) error
type bodyWithCallback struct {
type BodyWithCallback struct {
c CopyCallback
totalSize int64
readSize int64
ReadSeekCloser
}
func NewByteBodyWithCallback(by []byte, totalSize int64, cb CopyCallback) ReadSeekCloser {
var _ ReadSeekCloser = (*BodyWithCallback)(nil)
func NewByteBodyWithCallback(by []byte, totalSize int64, cb CopyCallback) *BodyWithCallback {
return NewBodyWithCallback(NewByteBody(by), totalSize, cb)
}
func NewBodyWithCallback(body ReadSeekCloser, totalSize int64, cb CopyCallback) ReadSeekCloser {
return &bodyWithCallback{
func NewBodyWithCallback(body ReadSeekCloser, totalSize int64, cb CopyCallback) *BodyWithCallback {
return &BodyWithCallback{
c: cb,
totalSize: totalSize,
ReadSeekCloser: body,
}
}
func (r *bodyWithCallback) Read(p []byte) (int, error) {
// Read wraps the underlying Reader's "Read" method. It also captures the number
// of bytes read, and calls the callback.
func (r *BodyWithCallback) Read(p []byte) (int, error) {
n, err := r.ReadSeekCloser.Read(p)
if n > 0 {
@ -39,6 +43,27 @@ func (r *bodyWithCallback) Read(p []byte) (int, error) {
return n, err
}
// Seek wraps the underlying Seeker's "Seek" method, updating the number of
// bytes that have been consumed by this reader.
func (r *BodyWithCallback) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
r.readSize = offset
case io.SeekCurrent:
r.readSize += offset
case io.SeekEnd:
r.readSize = r.totalSize + offset
}
return r.ReadSeekCloser.Seek(offset, whence)
}
// ResetProgress calls the callback with a negative read size equal to the
// total number of bytes read so far, effectively "resetting" the progress.
func (r *BodyWithCallback) ResetProgress() error {
return r.c(r.totalSize, r.readSize, -int(r.readSize))
}
type CallbackReader struct {
C CopyCallback
TotalSize int64

@ -72,3 +72,29 @@ func TestEOFReaderReturnsEOFs(t *testing.T) {
assert.Equal(t, 1, n)
assert.Equal(t, io.EOF, err)
}
func TestBodyCallbackReaderCountsReads(t *testing.T) {
br := NewByteBodyWithCallback([]byte{0x1, 0x2, 0x3, 0x4}, 4, nil)
assert.EqualValues(t, 0, br.readSize)
p := make([]byte, 8)
n, err := br.Read(p)
assert.Equal(t, 4, n)
assert.Nil(t, err)
assert.EqualValues(t, 4, br.readSize)
}
func TestBodyCallbackReaderUpdatesOffsetOnSeek(t *testing.T) {
br := NewByteBodyWithCallback([]byte{0x1, 0x2, 0x3, 0x4}, 4, nil)
br.Seek(1, io.SeekStart)
assert.EqualValues(t, 1, br.readSize)
br.Seek(1, io.SeekCurrent)
assert.EqualValues(t, 2, br.readSize)
br.Seek(-1, io.SeekEnd)
assert.EqualValues(t, 3, br.readSize)
}

@ -8,7 +8,6 @@ import (
"io"
"io/ioutil"
"os"
"sync/atomic"
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/progress"
@ -145,71 +144,3 @@ func Spool(to io.Writer, from io.Reader) (n int64, err error) {
return io.Copy(to, spool)
}
type ReadSeekCloser interface {
io.Reader
io.Seeker
io.Closer
}
// CountingReadSeekCloser wraps an underlying reader and keeps track of the
// number of bytes that that reader has read.
type CountingReadSeekCloser struct {
ReadSeekCloser
// n is the number of bytes that have been read from the underlying
// reader.
n int64
// n is the number of bytes that have been read from the underlying
// reader.
totalSize int64
}
var _ io.Reader = (*CountingReadSeekCloser)(nil)
// NewCountingReadSeekCloser wraps a given io.Reader "r" as a
// CountingReadSeekCloser. If "r" already is a CountingReadSeekCloser, it will
// be returned as is.
func NewCountingReadSeekCloser(r ReadSeekCloser, total int64) *CountingReadSeekCloser {
if cr, ok := r.(*CountingReadSeekCloser); ok {
return cr
}
return &CountingReadSeekCloser{
ReadSeekCloser: r,
totalSize: total,
}
}
// Read wraps the underlying Reader's "Read" method. It also captures the number
// of bytes read, and atomically updates the running count, making this method
// safe to call across multiple goroutines.
func (r *CountingReadSeekCloser) Read(p []byte) (n int, err error) {
n, err = r.ReadSeekCloser.Read(p)
atomic.AddInt64(&r.n, int64(n))
return
}
// Seek wraps the underlying Seeker's "Seak" method, atomically updating the
// number of bytes that have been consumed by this reader. Since the data access
// is atomic, this method is safe to call across multiple goroutines.
func (r *CountingReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
atomic.SwapInt64(&r.n, offset)
case io.SeekCurrent:
atomic.AddInt64(&r.n, offset)
case io.SeekEnd:
atomic.SwapInt64(&r.n, r.totalSize+offset)
}
return r.ReadSeekCloser.Seek(offset, whence)
}
// N returns the number of bytes read from the underlying reader.
//
// N uses accesses the number of bytes read atomically, so this method is safe
// to call across multiple goroutines.
func (r *CountingReadSeekCloser) N() int64 { return atomic.LoadInt64(&r.n) }

@ -64,48 +64,6 @@ func TestRetriableReaderDoesNotRewrap(t *testing.T) {
}
func TestCountingReaderCountsReads(t *testing.T) {
cr := tools.NewCountingReadSeekCloser(NopReadSeekCloser(bytes.NewReader(
[]byte{0x1, 0x2, 0x3, 0x4},
)), 0)
assert.EqualValues(t, 0, cr.N())
p := make([]byte, 8)
n, err := cr.Read(p)
assert.Equal(t, 4, n)
assert.Nil(t, err)
assert.EqualValues(t, 4, cr.N())
}
func TestCountingReaderPassesErrors(t *testing.T) {
expected := errors.New("some err")
cr := tools.NewCountingReadSeekCloser(NopReadSeekCloser(&ErrReader{expected}), -1)
p := make([]byte, 4)
n, err := cr.Read(p)
assert.Equal(t, 0, n)
assert.Equal(t, expected, err)
}
func TestCountingReaderUpdatesOffsetOnSeek(t *testing.T) {
cr := tools.NewCountingReadSeekCloser(NopReadSeekCloser(bytes.NewReader(
[]byte{0x1, 0x2, 0x3, 0x4},
)), 4)
cr.Seek(1, io.SeekStart)
assert.EqualValues(t, 1, cr.N())
cr.Seek(1, io.SeekCurrent)
assert.EqualValues(t, 2, cr.N())
cr.Seek(-1, io.SeekEnd)
assert.EqualValues(t, 3, cr.N())
}
// ErrReader implements io.Reader and only returns errors.
type ErrReader struct {
// err is the error that this reader will return.
@ -116,18 +74,3 @@ type ErrReader struct {
func (e *ErrReader) Read(p []byte) (n int, err error) {
return 0, e.err
}
// Seek implements io.Seeker.Seek and returns (0, e.err).
func (e *ErrReader) Seek(offset int64, whence int) (int64, error) {
return 0, e.err
}
type readSeekCloser struct {
io.ReadSeeker
}
func NopReadSeekCloser(r io.ReadSeeker) tools.ReadSeekCloser {
return &readSeekCloser{r}
}
func (rsc *readSeekCloser) Close() error { return nil }

@ -11,7 +11,6 @@ import (
"github.com/git-lfs/git-lfs/errors"
"github.com/git-lfs/git-lfs/lfsapi"
"github.com/git-lfs/git-lfs/progress"
"github.com/git-lfs/git-lfs/tools"
)
const (
@ -73,11 +72,6 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
}
defer f.Close()
stat, err := f.Stat()
if err != nil {
return errors.Wrap(err, "basic upload stat")
}
// Ensure progress callbacks made while uploading
// Wrap callback to give name context
ccb := func(totalSize int64, readSoFar int64, readSinceLast int) error {
@ -87,8 +81,8 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
return nil
}
fcount := tools.NewCountingReadSeekCloser(f, stat.Size())
var reader lfsapi.ReadSeekCloser = progress.NewBodyWithCallback(fcount, t.Size, ccb)
cbr := progress.NewBodyWithCallback(f, t.Size, ccb)
var reader lfsapi.ReadSeekCloser = cbr
// Signal auth was ok on first read; this frees up other workers to start
if authOkFunc != nil {
@ -108,8 +102,9 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
// Either way, let's decrement the number of bytes that we've
// read _so far_, so that the next iteration doesn't re-transfer
// those bytes, according to the progress meter.
offset := fcount.N()
ccb(t.Size, offset, -int(offset))
if perr := cbr.ResetProgress(); perr != nil {
err = errors.Wrap(err, perr.Error())
}
return errors.NewRetriableError(err)
}