lfs/batcher: refactor run
goroutine, introduce type Lot
This commit introduces several refactorings to the `run` method (now called `acceptInput`) on the Batcher type, and introduces the type `Lot`. - A call to `acceptInput` directly does not spawn a goroutine. The recommended way to initialize is to spawn it as a goroutine, which is how the constructor method handles initialization of the Batcher type. - The responsibility of creating new batches is pushed down into the `newBatch` func, which allocates a slice of `Transferable`s and sets its capacity at the maximum batch size by calling into the NewLot constructor func. - The Lot type lets us make use of several convenience methods by wrapping the []Transferable type. Pushed out the responsibility of checking whether a given "batch" (now Lot) is full. Also delegates the responsibility of Lot allocation et. al. further down.
This commit is contained in:
parent
3f1eec26a2
commit
37a82cb630
@ -8,21 +8,41 @@ package lfs
|
||||
// * Exit() is called
|
||||
// When a timeout, Flush(), or Exit() occurs, the group may be smaller than the
|
||||
// batch size.
|
||||
|
||||
// A Lot represents a group of Transferables that was packaged up by a Batcher.
|
||||
type Lot []Transferable
|
||||
|
||||
func NewLot(l, c int) Lot {
|
||||
return Lot(make([]Transferable, l, c))
|
||||
}
|
||||
|
||||
// IsFull returns whether or not the given instance of a Lot is full, as
|
||||
// specified by the capacity given as the first argument.
|
||||
func (l Lot) IsFull(capacity int) bool {
|
||||
return len(l) == capacity
|
||||
}
|
||||
|
||||
// Add returns a new Lot that contains all of the contents of the existing Lot,
|
||||
// as well as the contents of the variadic Transferables argument.
|
||||
func (l Lot) Add(ts ...Transferable) Lot {
|
||||
return Lot(append(l, ts...))
|
||||
}
|
||||
|
||||
type Batcher struct {
|
||||
batchSize int
|
||||
input chan Transferable
|
||||
batchReady chan []Transferable
|
||||
batchSize int
|
||||
input chan Transferable
|
||||
lotReady chan Lot
|
||||
}
|
||||
|
||||
// NewBatcher creates a Batcher with the batchSize.
|
||||
func NewBatcher(batchSize int) *Batcher {
|
||||
b := &Batcher{
|
||||
batchSize: batchSize,
|
||||
input: make(chan Transferable, batchSize),
|
||||
batchReady: make(chan []Transferable),
|
||||
batchSize: batchSize,
|
||||
input: make(chan Transferable, batchSize),
|
||||
lotReady: make(chan Lot),
|
||||
}
|
||||
|
||||
b.run()
|
||||
go b.acceptInput()
|
||||
return b
|
||||
}
|
||||
|
||||
@ -34,8 +54,8 @@ func (b *Batcher) Add(t Transferable) {
|
||||
|
||||
// Next will wait for the one of the above batch triggers to occur and return
|
||||
// the accumulated batch.
|
||||
func (b *Batcher) Next() []Transferable {
|
||||
return <-b.batchReady
|
||||
func (b *Batcher) Next() Lot {
|
||||
return <-b.lotReady
|
||||
}
|
||||
|
||||
// Exit stops all batching and allows Next() to return. Calling Add() after
|
||||
@ -44,29 +64,35 @@ func (b *Batcher) Exit() {
|
||||
close(b.input)
|
||||
}
|
||||
|
||||
func (b *Batcher) run() {
|
||||
go func() {
|
||||
exit := false
|
||||
for {
|
||||
batch := make([]Transferable, 0, b.batchSize)
|
||||
Loop:
|
||||
for i := 0; i < b.batchSize; i++ {
|
||||
select {
|
||||
case t, ok := <-b.input:
|
||||
if ok {
|
||||
batch = append(batch, t)
|
||||
} else {
|
||||
exit = true // input channel was closed by Exit()
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
// acceptInput runs in its own goroutine and accepts input from external
|
||||
// clients. It fills and dispenses batches in a sequential order: for a batch
|
||||
// size N, N items will be processed before a new batch is ready.
|
||||
func (b *Batcher) acceptInput() {
|
||||
exit := false
|
||||
|
||||
for {
|
||||
lot := b.newLot()
|
||||
Loop:
|
||||
for !lot.IsFull(b.batchSize) {
|
||||
t, ok := <-b.input
|
||||
if !ok {
|
||||
exit = true // input channel was closed by Exit()
|
||||
break Loop
|
||||
}
|
||||
|
||||
b.batchReady <- batch
|
||||
|
||||
if exit {
|
||||
return
|
||||
}
|
||||
lot = lot.Add(t)
|
||||
}
|
||||
}()
|
||||
|
||||
b.lotReady <- lot
|
||||
|
||||
if exit {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// newBatch allocates a slice of Transferables with the capacity of the set
|
||||
// batch size.
|
||||
func (b *Batcher) newLot() Lot {
|
||||
return NewLot(0, b.batchSize)
|
||||
}
|
||||
|
@ -6,6 +6,55 @@ import (
|
||||
"github.com/github/git-lfs/vendor/_nuts/github.com/technoweenie/assert"
|
||||
)
|
||||
|
||||
type lotTestCase struct {
|
||||
Length int
|
||||
Capacity int
|
||||
}
|
||||
|
||||
func (c lotTestCase) Lot() Lot {
|
||||
return NewLot(c.Length, c.Capacity)
|
||||
}
|
||||
|
||||
func TestLotConstruction(t *testing.T) {
|
||||
cases := []lotTestCase{
|
||||
{0, 0},
|
||||
{3, 3},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
l := c.Lot()
|
||||
|
||||
assert.Equal(t, c.Length, len(l))
|
||||
assert.Equal(t, c.Capacity, cap(l))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLotFullness(t *testing.T) {
|
||||
c := lotTestCase{0, 3}
|
||||
l := c.Lot()
|
||||
|
||||
assert.Equal(t, false, l.IsFull(c.Capacity))
|
||||
|
||||
for i := 0; i < c.Capacity; i++ {
|
||||
l = l.Add(&Downloadable{})
|
||||
}
|
||||
|
||||
assert.Equal(t, true, l.IsFull(c.Capacity))
|
||||
}
|
||||
|
||||
func TestLotAdding(t *testing.T) {
|
||||
c := lotTestCase{0, 10}
|
||||
l := c.Lot()
|
||||
|
||||
assert.Equal(t, 0, len(l))
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
l = l.Add(&Downloadable{})
|
||||
}
|
||||
|
||||
assert.Equal(t, 10, len(l))
|
||||
}
|
||||
|
||||
func TestBatcherSizeMet(t *testing.T) {
|
||||
runBatcherTests([]batcherTestCase{
|
||||
{2, 4, false},
|
||||
|
Loading…
Reference in New Issue
Block a user