progress: add helper to build callback reader for ReadSeekCloser body
This commit is contained in:
parent
3621b987e5
commit
e04316f742
16
api/api.go
16
api/api.go
@ -11,7 +11,6 @@ import (
|
||||
"github.com/git-lfs/git-lfs/config"
|
||||
"github.com/git-lfs/git-lfs/errors"
|
||||
"github.com/git-lfs/git-lfs/httputil"
|
||||
"github.com/git-lfs/git-lfs/tools"
|
||||
|
||||
"github.com/rubyist/tracerx"
|
||||
)
|
||||
@ -54,7 +53,7 @@ func Batch(cfg *config.Configuration, objects []*ObjectResource, operation strin
|
||||
req.Header.Set("Content-Type", MediaType)
|
||||
req.Header.Set("Content-Length", strconv.Itoa(len(by)))
|
||||
req.ContentLength = int64(len(by))
|
||||
req.Body = tools.NewReadSeekCloserWrapper(bytes.NewReader(by))
|
||||
req.Body = newByteBody(by)
|
||||
|
||||
tracerx.Printf("api: batch %d files", len(objects))
|
||||
|
||||
@ -85,3 +84,16 @@ func Batch(cfg *config.Configuration, objects []*ObjectResource, operation strin
|
||||
|
||||
return bresp.Objects, bresp.TransferAdapterName, nil
|
||||
}
|
||||
|
||||
// temporary copied code from lfsapi, since api is going away
|
||||
func newByteBody(by []byte) *closingByteReader {
|
||||
return &closingByteReader{Reader: bytes.NewReader(by)}
|
||||
}
|
||||
|
||||
type closingByteReader struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (r *closingByteReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -8,7 +8,34 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestWriterWithCallback(t *testing.T) {
|
||||
func TestBodyWithCallback(t *testing.T) {
|
||||
called := 0
|
||||
calledRead := make([]int64, 0, 2)
|
||||
|
||||
cb := func(total int64, read int64, current int) error {
|
||||
called += 1
|
||||
calledRead = append(calledRead, read)
|
||||
assert.Equal(t, 5, int(total))
|
||||
return nil
|
||||
}
|
||||
reader := progress.NewByteBodyWithCallback([]byte("BOOYA"), 5, cb)
|
||||
|
||||
readBuf := make([]byte, 3)
|
||||
n, err := reader.Read(readBuf)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "BOO", string(readBuf[0:n]))
|
||||
|
||||
n, err = reader.Read(readBuf)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "YA", string(readBuf[0:n]))
|
||||
|
||||
assert.Equal(t, 2, called)
|
||||
assert.Len(t, calledRead, 2)
|
||||
assert.Equal(t, 3, int(calledRead[0]))
|
||||
assert.Equal(t, 5, int(calledRead[1]))
|
||||
}
|
||||
|
||||
func TestReadWithCallback(t *testing.T) {
|
||||
called := 0
|
||||
calledRead := make([]int64, 0, 2)
|
||||
|
||||
|
@ -19,11 +19,11 @@ func MarshalToRequest(req *http.Request, obj interface{}) error {
|
||||
}
|
||||
|
||||
req.ContentLength = int64(len(by))
|
||||
req.Body = byteReaderBody(by)
|
||||
req.Body = NewByteBody(by)
|
||||
return nil
|
||||
}
|
||||
|
||||
func byteReaderBody(by []byte) ReadSeekCloser {
|
||||
func NewByteBody(by []byte) ReadSeekCloser {
|
||||
return &closingByteReader{Reader: bytes.NewReader(by)}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,45 @@
|
||||
package progress
|
||||
|
||||
import "io"
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
)
|
||||
|
||||
type CopyCallback func(totalSize int64, readSoFar int64, readSinceLast int) error
|
||||
|
||||
type bodyWithCallback struct {
|
||||
c CopyCallback
|
||||
totalSize int64
|
||||
readSize int64
|
||||
ReadSeekCloser
|
||||
}
|
||||
|
||||
func NewByteBodyWithCallback(by []byte, totalSize int64, cb CopyCallback) ReadSeekCloser {
|
||||
return NewBodyWithCallback(NewByteBody(by), totalSize, cb)
|
||||
}
|
||||
|
||||
func NewBodyWithCallback(body ReadSeekCloser, totalSize int64, cb CopyCallback) ReadSeekCloser {
|
||||
return &bodyWithCallback{
|
||||
c: cb,
|
||||
totalSize: totalSize,
|
||||
ReadSeekCloser: body,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *bodyWithCallback) Read(p []byte) (int, error) {
|
||||
n, err := r.ReadSeekCloser.Read(p)
|
||||
|
||||
if n > 0 {
|
||||
r.readSize += int64(n)
|
||||
}
|
||||
|
||||
if err == nil && r.c != nil {
|
||||
err = r.c(r.totalSize, r.readSize, n)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
type CallbackReader struct {
|
||||
C CopyCallback
|
||||
TotalSize int64
|
||||
@ -24,3 +60,21 @@ func (w *CallbackReader) Read(p []byte) (int, error) {
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// prevent import cycle
|
||||
type ReadSeekCloser interface {
|
||||
io.Seeker
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
func NewByteBody(by []byte) ReadSeekCloser {
|
||||
return &closingByteReader{Reader: bytes.NewReader(by)}
|
||||
}
|
||||
|
||||
type closingByteReader struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (r *closingByteReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -178,14 +178,7 @@ func performUpload(oid string, size int64, a *action, fromPath string, writer, e
|
||||
sendProgress(oid, readSoFar, readSinceLast, writer, errWriter)
|
||||
return nil
|
||||
}
|
||||
var reader io.Reader
|
||||
reader = &progress.CallbackReader{
|
||||
C: cb,
|
||||
TotalSize: size,
|
||||
Reader: f,
|
||||
}
|
||||
|
||||
req.Body = ioutil.NopCloser(reader)
|
||||
req.Body = progress.NewBodyWithCallback(f, size, cb)
|
||||
|
||||
res, err := httputil.DoHttpRequest(cfg, req, true)
|
||||
if err != nil {
|
||||
|
@ -10,28 +10,6 @@ import (
|
||||
"github.com/git-lfs/git-lfs/progress"
|
||||
)
|
||||
|
||||
type readSeekCloserWrapper struct {
|
||||
readSeeker io.ReadSeeker
|
||||
}
|
||||
|
||||
func (r *readSeekCloserWrapper) Read(p []byte) (n int, err error) {
|
||||
return r.readSeeker.Read(p)
|
||||
}
|
||||
|
||||
func (r *readSeekCloserWrapper) Seek(offset int64, whence int) (int64, error) {
|
||||
return r.readSeeker.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (r *readSeekCloserWrapper) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewReadSeekCloserWrapper wraps an io.ReadSeeker and implements a no-op Close() function
|
||||
// to make it an io.ReadCloser
|
||||
func NewReadSeekCloserWrapper(r io.ReadSeeker) io.ReadCloser {
|
||||
return &readSeekCloserWrapper{r}
|
||||
}
|
||||
|
||||
// CopyWithCallback copies reader to writer while performing a progress callback
|
||||
func CopyWithCallback(writer io.Writer, reader io.Reader, totalSize int64, cb progress.CopyCallback) (int64, error) {
|
||||
if success, _ := CloneFile(writer, reader); success {
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/git-lfs/git-lfs/errors"
|
||||
"github.com/git-lfs/git-lfs/lfsapi"
|
||||
"github.com/git-lfs/git-lfs/progress"
|
||||
)
|
||||
|
||||
@ -79,21 +80,17 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var reader io.Reader
|
||||
reader = &progress.CallbackReader{
|
||||
C: ccb,
|
||||
TotalSize: t.Size,
|
||||
Reader: f,
|
||||
}
|
||||
var reader lfsapi.ReadSeekCloser = progress.NewBodyWithCallback(f, t.Size, ccb)
|
||||
|
||||
// Signal auth was ok on first read; this frees up other workers to start
|
||||
if authOkFunc != nil {
|
||||
reader = newStartCallbackReader(reader, func(*startCallbackReader) {
|
||||
reader = newStartCallbackReader(reader, func() error {
|
||||
authOkFunc()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
req.Body = ioutil.NopCloser(reader)
|
||||
req.Body = reader
|
||||
|
||||
res, err := a.doHTTP(t, req)
|
||||
if err != nil {
|
||||
@ -126,20 +123,25 @@ func (a *basicUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb Progres
|
||||
// startCallbackReader is a reader wrapper which calls a function as soon as the
|
||||
// first Read() call is made. This callback is only made once
|
||||
type startCallbackReader struct {
|
||||
r io.Reader
|
||||
cb func(*startCallbackReader)
|
||||
cb func() error
|
||||
cbDone bool
|
||||
lfsapi.ReadSeekCloser
|
||||
}
|
||||
|
||||
func (s *startCallbackReader) Read(p []byte) (n int, err error) {
|
||||
if !s.cbDone && s.cb != nil {
|
||||
s.cb(s)
|
||||
if err := s.cb(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s.cbDone = true
|
||||
}
|
||||
return s.r.Read(p)
|
||||
return s.ReadSeekCloser.Read(p)
|
||||
}
|
||||
func newStartCallbackReader(r lfsapi.ReadSeekCloser, cb func() error) *startCallbackReader {
|
||||
return &startCallbackReader{
|
||||
ReadSeekCloser: r,
|
||||
cb: cb,
|
||||
}
|
||||
func newStartCallbackReader(r io.Reader, cb func(*startCallbackReader)) *startCallbackReader {
|
||||
return &startCallbackReader{r, cb, false}
|
||||
}
|
||||
|
||||
func configureBasicUploadAdapter(m *Manifest) {
|
||||
|
@ -90,10 +90,6 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
|
||||
} else {
|
||||
tracerx.Printf("xfer: tus.io resuming upload %q from %d", t.Oid, offset)
|
||||
advanceCallbackProgress(cb, t, offset)
|
||||
_, err := f.Seek(offset, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "tus upload")
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Send PATCH request with byte start point (even if 0) in Upload-Offset
|
||||
@ -121,21 +117,21 @@ func (a *tusUploadAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressC
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var reader io.Reader
|
||||
reader = &progress.CallbackReader{
|
||||
C: ccb,
|
||||
TotalSize: t.Size,
|
||||
Reader: f,
|
||||
}
|
||||
|
||||
var reader lfsapi.ReadSeekCloser = progress.NewBodyWithCallback(f, t.Size, ccb)
|
||||
reader = newStartCallbackReader(reader, func() error {
|
||||
// seek to the offset since lfsapi.Client rewinds the body
|
||||
if _, err := f.Seek(offset, os.SEEK_CUR); err != nil {
|
||||
return err
|
||||
}
|
||||
// Signal auth was ok on first read; this frees up other workers to start
|
||||
if authOkFunc != nil {
|
||||
reader = newStartCallbackReader(reader, func(*startCallbackReader) {
|
||||
authOkFunc()
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
req.Body = ioutil.NopCloser(reader)
|
||||
req.Body = reader
|
||||
|
||||
res, err = a.doHTTP(t, req)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user