Merge pull request #1650 from git-lfs/catfilebatch-v2

rewrite new catfilebatch implementation for upcoming gitscanner pkg
This commit is contained in:
risk danger olson 2016-11-16 16:50:15 -07:00 committed by GitHub
commit ced527048c
3 changed files with 251 additions and 121 deletions

@ -0,0 +1,126 @@
package lfs
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"strconv"
"github.com/git-lfs/git-lfs/errors"
)
// runCatFileBatch uses 'git cat-file --batch' to get the object contents of a
// git object, given its sha1. The contents will be decoded into a Git LFS
// pointer. Git Blob SHA1s are read from the sha1Ch channel and fed to STDIN.
// Results are parsed from STDOUT, and any elegible LFS pointers are sent to
// pointerCh. Any errors are sent to errCh. An error is returned if the 'git
// cat-file' command fails to start.
func runCatFileBatch(pointerCh chan *WrappedPointer, revs *StringChannelWrapper, errCh chan error) error {
cmd, err := startCommand("git", "cat-file", "--batch")
if err != nil {
return err
}
go catFileBatchOutput(pointerCh, cmd, errCh)
go catFileBatchInput(cmd, revs, errCh)
return nil
}
func catFileBatchOutput(pointerCh chan *WrappedPointer, cmd *wrappedCmd, errCh chan error) {
scanner := &catFileBatchScanner{r: cmd.Stdout}
for scanner.Scan() {
pointerCh <- scanner.Pointer()
}
if err := scanner.Err(); err != nil {
errCh <- err
}
stderr, _ := ioutil.ReadAll(cmd.Stderr)
err := cmd.Wait()
if err != nil {
errCh <- fmt.Errorf("Error in git cat-file --batch: %v %v", err, string(stderr))
}
close(pointerCh)
close(errCh)
}
func catFileBatchInput(cmd *wrappedCmd, revs *StringChannelWrapper, errCh chan error) {
for r := range revs.Results {
cmd.Stdin.Write([]byte(r + "\n"))
}
err := revs.Wait()
if err != nil {
// We can share errchan with other goroutine since that won't close it
// until we close the stdin below
errCh <- err
}
cmd.Stdin.Close()
}
type catFileBatchScanner struct {
r *bufio.Reader
pointer *WrappedPointer
err error
}
func (s *catFileBatchScanner) Pointer() *WrappedPointer {
return s.pointer
}
func (s *catFileBatchScanner) Err() error {
return s.err
}
func (s *catFileBatchScanner) Scan() bool {
s.pointer, s.err = nil, nil
p, err := scanPointer(s.r)
if err != nil {
// EOF halts scanning, but isn't a reportable error
if err != io.EOF {
s.err = err
}
return false
}
s.pointer = p
return true
}
func scanPointer(r *bufio.Reader) (*WrappedPointer, error) {
var pointer *WrappedPointer
for pointer == nil {
l, err := r.ReadBytes('\n')
if err != nil {
return nil, err
}
// Line is formatted:
// <sha1> <type> <size>
fields := bytes.Fields(l)
if len(fields) < 3 {
return nil, errors.Wrap(fmt.Errorf("Invalid: %q", string(l)), "git cat-file --batch:")
}
size, _ := strconv.Atoi(string(fields[2]))
p, err := DecodePointer(io.LimitReader(r, int64(size)))
if err == nil {
pointer = &WrappedPointer{
Sha1: string(fields[0]),
Size: p.Size,
Pointer: p,
}
}
_, err = r.ReadBytes('\n') // Extra \n inserted by cat-file
if err != nil {
return nil, err
}
}
return pointer, nil
}

@ -0,0 +1,117 @@
package lfs
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"strconv"
)
// runCatFileBatchCheck uses 'git cat-file --batch-check' to get the type and
// size of a git object. Any object that isn't of type blob and under the
// blobSizeCutoff will be ignored. revs is a channel over which strings
// containing git sha1s will be sent. It returns a channel from which sha1
// strings can be read.
func runCatFileBatchCheck(smallRevCh chan string, revs *StringChannelWrapper, errCh chan error) error {
cmd, err := startCommand("git", "cat-file", "--batch-check")
if err != nil {
return err
}
go catFileBatchCheckOutput(smallRevCh, cmd, errCh)
go catFileBatchCheckInput(cmd, revs, errCh)
return nil
}
func catFileBatchCheckOutput(smallRevCh chan string, cmd *wrappedCmd, errCh chan error) {
scanner := &catFileBatchCheckScanner{s: bufio.NewScanner(cmd.Stdout)}
for scanner.Scan() {
smallRevCh <- scanner.BlobOID()
}
if err := scanner.Err(); err != nil {
errCh <- err
}
stderr, _ := ioutil.ReadAll(cmd.Stderr)
err := cmd.Wait()
if err != nil {
errCh <- fmt.Errorf("Error in git cat-file --batch-check: %v %v", err, string(stderr))
}
close(smallRevCh)
close(errCh)
}
func catFileBatchCheckInput(cmd *wrappedCmd, revs *StringChannelWrapper, errCh chan error) {
for r := range revs.Results {
cmd.Stdin.Write([]byte(r + "\n"))
}
err := revs.Wait()
if err != nil {
// We can share errchan with other goroutine since that won't close it
// until we close the stdin below
errCh <- err
}
cmd.Stdin.Close()
}
type catFileBatchCheckScanner struct {
s *bufio.Scanner
blobOID string
err error
}
func (s *catFileBatchCheckScanner) BlobOID() string {
return s.blobOID
}
func (s *catFileBatchCheckScanner) Err() error {
return s.err
}
func (s *catFileBatchCheckScanner) Scan() bool {
s.blobOID, s.err = "", nil
b, err := scanBlobOID(s.s)
if err != nil {
// EOF halts scanning, but isn't a reportable error
if err != io.EOF {
s.err = err
}
return false
}
s.blobOID = b
return true
}
func scanBlobOID(s *bufio.Scanner) (string, error) {
objType := "blob"
for s.Scan() {
line := s.Text()
lineLen := len(line)
// Format is:
// <sha1> <type> <size>
// type is at a fixed spot, if we see that it's "blob", we can avoid
// splitting the line just to get the size.
if lineLen < 46 {
continue
}
if line[41:45] != objType {
continue
}
size, err := strconv.Atoi(line[46:lineLen])
if err != nil {
continue
}
if size < blobSizeCutoff {
return line[0:40], nil
}
}
return "", io.EOF
}

@ -499,66 +499,12 @@ func revListIndex(atRef string, cache bool, indexMap *indexFileMap) (*StringChan
// which strings containing git sha1s will be sent. It returns a channel
// from which sha1 strings can be read.
func catFileBatchCheck(revs *StringChannelWrapper) (*StringChannelWrapper, error) {
cmd, err := startCommand("git", "cat-file", "--batch-check")
if err != nil {
smallRevCh := make(chan string, chanBufSize)
errCh := make(chan error, 2) // up to 2 errors, one from each goroutine
if err := runCatFileBatchCheck(smallRevCh, revs, errCh); err != nil {
return nil, err
}
smallRevs := make(chan string, chanBufSize)
errchan := make(chan error, 2) // up to 2 errors, one from each goroutine
go func() {
scanner := bufio.NewScanner(cmd.Stdout)
for scanner.Scan() {
line := scanner.Text()
lineLen := len(line)
// Format is:
// <sha1> <type> <size>
// type is at a fixed spot, if we see that it's "blob", we can avoid
// splitting the line just to get the size.
if lineLen < 46 {
continue
}
if line[41:45] != "blob" {
continue
}
size, err := strconv.Atoi(line[46:lineLen])
if err != nil {
continue
}
if size < blobSizeCutoff {
smallRevs <- line[0:40]
}
}
stderr, _ := ioutil.ReadAll(cmd.Stderr)
err := cmd.Wait()
if err != nil {
errchan <- fmt.Errorf("Error in git cat-file --batch-check: %v %v", err, string(stderr))
}
close(smallRevs)
close(errchan)
}()
go func() {
for r := range revs.Results {
cmd.Stdin.Write([]byte(r + "\n"))
}
err := revs.Wait()
if err != nil {
// We can share errchan with other goroutine since that won't close it
// until we close the stdin below
errchan <- err
}
cmd.Stdin.Close()
}()
return NewStringChannelWrapper(smallRevs, errchan), nil
return NewStringChannelWrapper(smallRevCh, errCh), nil
}
// catFileBatch uses git cat-file --batch to get the object contents
@ -566,70 +512,12 @@ func catFileBatchCheck(revs *StringChannelWrapper) (*StringChannelWrapper, error
// a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
// will be sent. It returns a channel from which point.Pointers can be read.
func catFileBatch(revs *StringChannelWrapper) (*PointerChannelWrapper, error) {
cmd, err := startCommand("git", "cat-file", "--batch")
if err != nil {
pointerCh := make(chan *WrappedPointer, chanBufSize)
errCh := make(chan error, 5) // shared by 2 goroutines & may add more detail errors?
if err := runCatFileBatch(pointerCh, revs, errCh); err != nil {
return nil, err
}
pointers := make(chan *WrappedPointer, chanBufSize)
errchan := make(chan error, 5) // shared by 2 goroutines & may add more detail errors?
go func() {
for {
l, err := cmd.Stdout.ReadBytes('\n')
if err != nil {
break
}
// Line is formatted:
// <sha1> <type> <size>
fields := bytes.Fields(l)
s, _ := strconv.Atoi(string(fields[2]))
nbuf := make([]byte, s)
_, err = io.ReadFull(cmd.Stdout, nbuf)
if err != nil {
break // Legit errors
}
p, err := DecodePointer(bytes.NewBuffer(nbuf))
if err == nil {
pointers <- &WrappedPointer{
Sha1: string(fields[0]),
Size: p.Size,
Pointer: p,
}
}
_, err = cmd.Stdout.ReadBytes('\n') // Extra \n inserted by cat-file
if err != nil {
break
}
}
stderr, _ := ioutil.ReadAll(cmd.Stderr)
err = cmd.Wait()
if err != nil {
errchan <- fmt.Errorf("Error in git cat-file --batch: %v %v", err, string(stderr))
}
close(pointers)
close(errchan)
}()
go func() {
for r := range revs.Results {
cmd.Stdin.Write([]byte(r + "\n"))
}
err := revs.Wait()
if err != nil {
// We can share errchan with other goroutine since that won't close it
// until we close the stdin below
errchan <- err
}
cmd.Stdin.Close()
}()
return NewPointerChannelWrapper(pointers, errchan), nil
return NewPointerChannelWrapper(pointerCh, errCh), nil
}
type wrappedCmd struct {
@ -1107,7 +995,6 @@ type BaseChannelWrapper struct {
}
func (w *BaseChannelWrapper) Wait() error {
var err error
for e := range w.errorChan {
if err != nil {