From e04316f742ede29246e29b3b687bbb8dc6c504df Mon Sep 17 00:00:00 2001 From: risk danger olson Date: Fri, 6 Jan 2017 13:31:45 -0700 Subject: [PATCH] progress: add helper to build callback reader for ReadSeekCloser body --- api/api.go | 16 +++++++-- lfs/util_test.go | 29 +++++++++++++++- lfsapi/body.go | 4 +-- progress/copycallback.go | 56 ++++++++++++++++++++++++++++++- test/cmd/lfstest-customadapter.go | 9 +---- tools/iotools.go | 22 ------------ tq/basic_upload.go | 30 +++++++++-------- tq/tus_upload.go | 28 +++++++--------- 8 files changed, 128 insertions(+), 66 deletions(-) diff --git a/api/api.go b/api/api.go index 9cdc6005..ba34409c 100644 --- a/api/api.go +++ b/api/api.go @@ -11,7 +11,6 @@ import ( "github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/errors" "github.com/git-lfs/git-lfs/httputil" - "github.com/git-lfs/git-lfs/tools" "github.com/rubyist/tracerx" ) @@ -54,7 +53,7 @@ func Batch(cfg *config.Configuration, objects []*ObjectResource, operation strin req.Header.Set("Content-Type", MediaType) req.Header.Set("Content-Length", strconv.Itoa(len(by))) req.ContentLength = int64(len(by)) - req.Body = tools.NewReadSeekCloserWrapper(bytes.NewReader(by)) + req.Body = newByteBody(by) tracerx.Printf("api: batch %d files", len(objects)) @@ -85,3 +84,16 @@ func Batch(cfg *config.Configuration, objects []*ObjectResource, operation strin return bresp.Objects, bresp.TransferAdapterName, nil } + +// temporary copied code from lfsapi, since api is going away +func newByteBody(by []byte) *closingByteReader { + return &closingByteReader{Reader: bytes.NewReader(by)} +} + +type closingByteReader struct { + *bytes.Reader +} + +func (r *closingByteReader) Close() error { + return nil +} diff --git a/lfs/util_test.go b/lfs/util_test.go index 2697dc3d..ae0c61e8 100644 --- a/lfs/util_test.go +++ b/lfs/util_test.go @@ -8,7 +8,34 @@ import ( "github.com/stretchr/testify/assert" ) -func TestWriterWithCallback(t *testing.T) { +func TestBodyWithCallback(t *testing.T) { + called := 0 + calledRead := make([]int64, 0, 2) + + cb := func(total int64, read int64, current int) error { + called += 1 + calledRead = append(calledRead, read) + assert.Equal(t, 5, int(total)) + return nil + } + reader := progress.NewByteBodyWithCallback([]byte("BOOYA"), 5, cb) + + readBuf := make([]byte, 3) + n, err := reader.Read(readBuf) + assert.Nil(t, err) + assert.Equal(t, "BOO", string(readBuf[0:n])) + + n, err = reader.Read(readBuf) + assert.Nil(t, err) + assert.Equal(t, "YA", string(readBuf[0:n])) + + assert.Equal(t, 2, called) + assert.Len(t, calledRead, 2) + assert.Equal(t, 3, int(calledRead[0])) + assert.Equal(t, 5, int(calledRead[1])) +} + +func TestReadWithCallback(t *testing.T) { called := 0 calledRead := make([]int64, 0, 2) diff --git a/lfsapi/body.go b/lfsapi/body.go index 1fc2aef6..3b54c9f2 100644 --- a/lfsapi/body.go +++ b/lfsapi/body.go @@ -19,11 +19,11 @@ func MarshalToRequest(req *http.Request, obj interface{}) error { } req.ContentLength = int64(len(by)) - req.Body = byteReaderBody(by) + req.Body = NewByteBody(by) return nil } -func byteReaderBody(by []byte) ReadSeekCloser { +func NewByteBody(by []byte) ReadSeekCloser { return &closingByteReader{Reader: bytes.NewReader(by)} } diff --git a/progress/copycallback.go b/progress/copycallback.go index b9075fd2..f291abe8 100644 --- a/progress/copycallback.go +++ b/progress/copycallback.go @@ -1,9 +1,45 @@ package progress -import "io" +import ( + "bytes" + "io" +) type CopyCallback func(totalSize int64, readSoFar int64, readSinceLast int) error +type bodyWithCallback struct { + c CopyCallback + totalSize int64 + readSize int64 + ReadSeekCloser +} + +func NewByteBodyWithCallback(by []byte, totalSize int64, cb CopyCallback) ReadSeekCloser { + return NewBodyWithCallback(NewByteBody(by), totalSize, cb) +} + +func NewBodyWithCallback(body ReadSeekCloser, totalSize int64, cb CopyCallback) ReadSeekCloser { + return &bodyWithCallback{ + c: cb, + totalSize: totalSize, + ReadSeekCloser: body, + } +} + +func (r *bodyWithCallback) Read(p []byte) (int, error) { + n, err := r.ReadSeekCloser.Read(p) + + if n > 0 { + r.readSize += int64(n) + } + + if err == nil && r.c != nil { + err = r.c(r.totalSize, r.readSize, n) + } + + return n, err +} + type CallbackReader struct { C CopyCallback TotalSize int64 @@ -24,3 +60,21 @@ func (w *CallbackReader) Read(p []byte) (int, error) { return n, err } + +// prevent import cycle +type ReadSeekCloser interface { + io.Seeker + io.ReadCloser +} + +func NewByteBody(by []byte) ReadSeekCloser { + return &closingByteReader{Reader: bytes.NewReader(by)} +} + +type closingByteReader struct { + *bytes.Reader +} + +func (r *closingByteReader) Close() error { + return nil +} diff --git a/test/cmd/lfstest-customadapter.go b/test/cmd/lfstest-customadapter.go index c126c398..a5624f5c 100644 --- a/test/cmd/lfstest-customadapter.go +++ b/test/cmd/lfstest-customadapter.go @@ -178,14 +178,7 @@ func performUpload(oid string, size int64, a *action, fromPath string, writer, e sendProgress(oid, readSoFar, readSinceLast, writer, errWriter) return nil } - var reader io.Reader - reader = &progress.CallbackReader{ - C: cb, - TotalSize: size, - Reader: f, - } - - req.Body = ioutil.NopCloser(reader) + req.Body = progress.NewBodyWithCallback(f, size, cb) res, err := httputil.DoHttpRequest(cfg, req, true) if err != nil { diff --git a/tools/iotools.go b/tools/iotools.go index 8c1077fa..13b5f7ed 100644 --- a/tools/iotools.go +++ b/tools/iotools.go @@ -10,28 +10,6 @@ import ( "github.com/git-lfs/git-lfs/progress" ) -type readSeekCloserWrapper struct { - readSeeker io.ReadSeeker -} - -func (r *readSeekCloserWrapper) Read(p []byte) (n int, err error) { - return r.readSeeker.Read(p) -} - -func (r *readSeekCloserWrapper) Seek(offset int64, whence int) (int64, error) { - return r.readSeeker.Seek(offset, whence) -} - -func (r *readSeekCloserWrapper) Close() error { - return nil -} - -// NewReadSeekCloserWrapper wraps an io.ReadSeeker and implements a no-op Close() function -// to make it an io.ReadCloser -func NewReadSeekCloserWrapper(r io.ReadSeeker) io.ReadCloser { - return &readSeekCloserWrapper{r} -} - // CopyWithCallback copies reader to writer while performing a progress callback func CopyWithCallback(writer io.Writer, reader io.Reader, totalSize int64, cb progress.CopyCallback) (int64, error) { if success, _ := CloneFile(writer, reader); success { diff --git a/tq/basic_upload.go b/tq/basic_upload.go index b1cb9fb7..41b9d082 100644 --- a/tq/basic_upload.go +++ b/tq/basic_upload.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/git-lfs/git-lfs/errors" + "github.com/git-lfs/git-lfs/lfsapi" "github.com/git-lfs/git-lfs/progress" ) @@ -79,21 +80,17 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres } return nil } - var reader io.Reader - reader = &progress.CallbackReader{ - C: ccb, - TotalSize: t.Size, - Reader: f, - } + var reader lfsapi.ReadSeekCloser = progress.NewBodyWithCallback(f, t.Size, ccb) // Signal auth was ok on first read; this frees up other workers to start if authOkFunc != nil { - reader = newStartCallbackReader(reader, func(*startCallbackReader) { + reader = newStartCallbackReader(reader, func() error { authOkFunc() + return nil }) } - req.Body = ioutil.NopCloser(reader) + req.Body = reader res, err := a.doHTTP(t, req) if err != nil { @@ -126,20 +123,25 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres // startCallbackReader is a reader wrapper which calls a function as soon as the // first Read() call is made. This callback is only made once type startCallbackReader struct { - r io.Reader - cb func(*startCallbackReader) + cb func() error cbDone bool + lfsapi.ReadSeekCloser } func (s *startCallbackReader) Read(p []byte) (n int, err error) { if !s.cbDone && s.cb != nil { - s.cb(s) + if err := s.cb(); err != nil { + return 0, err + } s.cbDone = true } - return s.r.Read(p) + return s.ReadSeekCloser.Read(p) } -func newStartCallbackReader(r io.Reader, cb func(*startCallbackReader)) *startCallbackReader { - return &startCallbackReader{r, cb, false} +func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCallbackReader { + return &startCallbackReader{ + ReadSeekCloser: r, + cb: cb, + } } func configureBasicUploadAdapter(m *Manifest) { diff --git a/tq/tus_upload.go b/tq/tus_upload.go index 9722fc24..75317a39 100644 --- a/tq/tus_upload.go +++ b/tq/tus_upload.go @@ -90,10 +90,6 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC } else { tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Oid, offset) advanceCallbackProgress(cb, t, offset) - _, err := f.Seek(offset, os.SEEK_CUR) - if err != nil { - return errors.Wrap(err, "tus upload") - } } // 2. Send PATCH request with byte start point (even if 0) in Upload-Offset @@ -121,21 +117,21 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC } return nil } - var reader io.Reader - reader = &progress.CallbackReader{ - C: ccb, - TotalSize: t.Size, - Reader: f, - } - // Signal auth was ok on first read; this frees up other workers to start - if authOkFunc != nil { - reader = newStartCallbackReader(reader, func(*startCallbackReader) { + var reader lfsapi.ReadSeekCloser = progress.NewBodyWithCallback(f, t.Size, ccb) + reader = newStartCallbackReader(reader, func() error { + // seek to the offset since lfsapi.Client rewinds the body + if _, err := f.Seek(offset, os.SEEK_CUR); err != nil { + return err + } + // Signal auth was ok on first read; this frees up other workers to start + if authOkFunc != nil { authOkFunc() - }) - } + } + return nil + }) - req.Body = ioutil.NopCloser(reader) + req.Body = reader res, err = a.doHTTP(t, req) if err != nil {