rewrite new catfilebatch implementation for upcoming gitscanner pkg
This commit is contained in:
parent
4935486492
commit
01e9e59a91
84
lfs/gitscanner_catfilebatch.go
Normal file
84
lfs/gitscanner_catfilebatch.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package lfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// runCatFileBatch uses 'git cat-file --batch' to get the object contents of a
|
||||||
|
// git object, given its sha1. The contents will be decoded into a Git LFS
|
||||||
|
// pointer. Git Blob SHA1s are read from the sha1Ch channel and fed to STDIN.
|
||||||
|
// Results are parsed from STDOUT, and any elegible LFS pointers are sent to
|
||||||
|
// pointerCh. Any errors are sent to errCh. An error is returned if the 'git
|
||||||
|
// cat-file' command fails to start.
|
||||||
|
func runCatFileBatch(pointerCh chan *WrappedPointer, sha1Ch <-chan string, errCh chan error) error {
|
||||||
|
cmd, err := startCommand("git", "cat-file", "--batch")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go catFileBatchOutput(pointerCh, cmd, errCh)
|
||||||
|
go catFileBatchInput(cmd, sha1Ch, errCh)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func catFileBatchOutput(pointerCh chan *WrappedPointer, cmd *wrappedCmd, errCh chan error) {
|
||||||
|
for {
|
||||||
|
l, err := cmd.Stdout.ReadBytes('\n')
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
errCh <- errors.Wrap(err, "git cat-file --batch:")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Line is formatted:
|
||||||
|
// <sha1> <type> <size>
|
||||||
|
fields := bytes.Fields(l)
|
||||||
|
s, _ := strconv.Atoi(string(fields[2]))
|
||||||
|
|
||||||
|
nbuf := make([]byte, s)
|
||||||
|
_, err = io.ReadFull(cmd.Stdout, nbuf)
|
||||||
|
if err != nil {
|
||||||
|
break // Legit errors
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := DecodePointer(bytes.NewBuffer(nbuf))
|
||||||
|
if err == nil {
|
||||||
|
pointerCh <- &WrappedPointer{
|
||||||
|
Sha1: string(fields[0]),
|
||||||
|
Size: p.Size,
|
||||||
|
Pointer: p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = cmd.Stdout.ReadBytes('\n') // Extra \n inserted by cat-file
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
errCh <- errors.Wrap(err, "git cat-file --batch:")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stderr, _ := ioutil.ReadAll(cmd.Stderr)
|
||||||
|
err := cmd.Wait()
|
||||||
|
if err != nil {
|
||||||
|
errCh <- fmt.Errorf("Error in git cat-file --batch: %v %v", err, string(stderr))
|
||||||
|
}
|
||||||
|
|
||||||
|
close(pointerCh)
|
||||||
|
close(errCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func catFileBatchInput(cmd *wrappedCmd, sha1Ch <-chan string, errCh chan error) {
|
||||||
|
for r := range sha1Ch {
|
||||||
|
cmd.Stdin.Write([]byte(r + "\n"))
|
||||||
|
}
|
||||||
|
cmd.Stdin.Close()
|
||||||
|
}
|
79
lfs/gitscanner_catfilebatch_test.go
Normal file
79
lfs/gitscanner_catfilebatch_test.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package lfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCatFileBatchValidInput(t *testing.T) {
|
||||||
|
outc := make(chan *WrappedPointer)
|
||||||
|
inc := make(chan string, 2)
|
||||||
|
errc := make(chan error)
|
||||||
|
shut1 := make(chan bool)
|
||||||
|
shut2 := make(chan bool)
|
||||||
|
|
||||||
|
go func(t *testing.T, shut chan bool, errc chan error) {
|
||||||
|
for err := range errc {
|
||||||
|
t.Errorf("err channel: %+v", err)
|
||||||
|
}
|
||||||
|
shut <- true
|
||||||
|
}(t, shut1, errc)
|
||||||
|
|
||||||
|
go func(t *testing.T, shut chan bool, outc chan *WrappedPointer) {
|
||||||
|
expected := []*WrappedPointer{
|
||||||
|
&WrappedPointer{
|
||||||
|
Sha1: "60c8d8ab2adcf57a391163a7eeb0cdb8bf348e44",
|
||||||
|
Size: 12345,
|
||||||
|
Pointer: &Pointer{
|
||||||
|
Oid: "4d7a214614ab2935c943f9e0ff69d22eadbb8f32b1258daaa5e2ca24d17e2393",
|
||||||
|
Size: 12345,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&WrappedPointer{
|
||||||
|
Sha1: "e71d7db5669ed8dda17b4b1dceb30cb14745c591",
|
||||||
|
Size: 12347,
|
||||||
|
Pointer: &Pointer{
|
||||||
|
Oid: "7d7a214614ab2935c943f9e0ff69d22eadbb8f32b1258daaa5e2ca24d17e2393",
|
||||||
|
Size: 12347,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
t.Log("reading output in test")
|
||||||
|
for actual := range outc {
|
||||||
|
t.Logf("found actual: %+v", actual)
|
||||||
|
if len(expected) <= i {
|
||||||
|
t.Errorf("Cannot access index %d of output", i)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, expected[i].Sha1, actual.Sha1)
|
||||||
|
assert.Equal(t, expected[i].Oid, actual.Oid)
|
||||||
|
assert.Equal(t, expected[i].Size, actual.Size)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(expected) > i {
|
||||||
|
t.Errorf("got to index %d of %d", i, len(expected))
|
||||||
|
}
|
||||||
|
|
||||||
|
shut <- true
|
||||||
|
}(t, shut2, outc)
|
||||||
|
|
||||||
|
if err := runCatFileBatch(outc, inc, errc); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("sending input")
|
||||||
|
inc <- "126fd41019b623ce1621a638d2535b26e0edb4df"
|
||||||
|
inc <- "60c8d8ab2adcf57a391163a7eeb0cdb8bf348e44"
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
inc <- "e71d7db5669ed8dda17b4b1dceb30cb14745c591"
|
||||||
|
close(inc)
|
||||||
|
|
||||||
|
<-shut1
|
||||||
|
<-shut2
|
||||||
|
}
|
@ -566,68 +566,11 @@ func catFileBatchCheck(revs *StringChannelWrapper) (*StringChannelWrapper, error
|
|||||||
// a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
|
// a Git LFS pointer. revs is a channel over which strings containing Git SHA1s
|
||||||
// will be sent. It returns a channel from which point.Pointers can be read.
|
// will be sent. It returns a channel from which point.Pointers can be read.
|
||||||
func catFileBatch(revs *StringChannelWrapper) (*PointerChannelWrapper, error) {
|
func catFileBatch(revs *StringChannelWrapper) (*PointerChannelWrapper, error) {
|
||||||
cmd, err := startCommand("git", "cat-file", "--batch")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pointers := make(chan *WrappedPointer, chanBufSize)
|
pointers := make(chan *WrappedPointer, chanBufSize)
|
||||||
errchan := make(chan error, 5) // shared by 2 goroutines & may add more detail errors?
|
errchan := make(chan error, 5) // shared by 2 goroutines & may add more detail errors?
|
||||||
|
if err := runCatFileBatch(pointers, revs.Results, errchan); err != nil {
|
||||||
go func() {
|
return nil, err
|
||||||
for {
|
}
|
||||||
l, err := cmd.Stdout.ReadBytes('\n')
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Line is formatted:
|
|
||||||
// <sha1> <type> <size>
|
|
||||||
fields := bytes.Fields(l)
|
|
||||||
s, _ := strconv.Atoi(string(fields[2]))
|
|
||||||
|
|
||||||
nbuf := make([]byte, s)
|
|
||||||
_, err = io.ReadFull(cmd.Stdout, nbuf)
|
|
||||||
if err != nil {
|
|
||||||
break // Legit errors
|
|
||||||
}
|
|
||||||
|
|
||||||
p, err := DecodePointer(bytes.NewBuffer(nbuf))
|
|
||||||
if err == nil {
|
|
||||||
pointers <- &WrappedPointer{
|
|
||||||
Sha1: string(fields[0]),
|
|
||||||
Size: p.Size,
|
|
||||||
Pointer: p,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = cmd.Stdout.ReadBytes('\n') // Extra \n inserted by cat-file
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stderr, _ := ioutil.ReadAll(cmd.Stderr)
|
|
||||||
err = cmd.Wait()
|
|
||||||
if err != nil {
|
|
||||||
errchan <- fmt.Errorf("Error in git cat-file --batch: %v %v", err, string(stderr))
|
|
||||||
}
|
|
||||||
close(pointers)
|
|
||||||
close(errchan)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for r := range revs.Results {
|
|
||||||
cmd.Stdin.Write([]byte(r + "\n"))
|
|
||||||
}
|
|
||||||
err := revs.Wait()
|
|
||||||
if err != nil {
|
|
||||||
// We can share errchan with other goroutine since that won't close it
|
|
||||||
// until we close the stdin below
|
|
||||||
errchan <- err
|
|
||||||
}
|
|
||||||
cmd.Stdin.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return NewPointerChannelWrapper(pointers, errchan), nil
|
return NewPointerChannelWrapper(pointers, errchan), nil
|
||||||
}
|
}
|
||||||
@ -1107,7 +1050,6 @@ type BaseChannelWrapper struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *BaseChannelWrapper) Wait() error {
|
func (w *BaseChannelWrapper) Wait() error {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
for e := range w.errorChan {
|
for e := range w.errorChan {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user