lfs,tq: move type *lfs.TransferQueue to pkg "tq"
This commit is contained in:
parent
f17a07ea70
commit
c5ce4b7287
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/git-lfs/git-lfs/git"
|
"github.com/git-lfs/git-lfs/git"
|
||||||
"github.com/git-lfs/git-lfs/lfs"
|
"github.com/git-lfs/git-lfs/lfs"
|
||||||
"github.com/git-lfs/git-lfs/progress"
|
"github.com/git-lfs/git-lfs/progress"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
"github.com/rubyist/tracerx"
|
"github.com/rubyist/tracerx"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
@ -290,7 +291,7 @@ func fetchAndReportToChan(allpointers []*lfs.WrappedPointer, filter *filepathfil
|
|||||||
}
|
}
|
||||||
|
|
||||||
ready, pointers, meter := readyAndMissingPointers(allpointers, filter)
|
ready, pointers, meter := readyAndMissingPointers(allpointers, filter)
|
||||||
q := lfs.NewDownloadQueue(lfs.WithProgress(meter))
|
q := lfs.NewDownloadQueue(tq.WithProgress(meter))
|
||||||
|
|
||||||
if out != nil {
|
if out != nil {
|
||||||
// If we already have it, or it won't be fetched
|
// If we already have it, or it won't be fetched
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/git-lfs/git-lfs/localstorage"
|
"github.com/git-lfs/git-lfs/localstorage"
|
||||||
"github.com/git-lfs/git-lfs/progress"
|
"github.com/git-lfs/git-lfs/progress"
|
||||||
"github.com/git-lfs/git-lfs/tools"
|
"github.com/git-lfs/git-lfs/tools"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
"github.com/rubyist/tracerx"
|
"github.com/rubyist/tracerx"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
@ -111,7 +112,7 @@ func prune(fetchPruneConfig config.FetchPruneConfig, verifyRemote, dryRun, verbo
|
|||||||
prunableObjects := make([]string, 0, len(localObjects)/2)
|
prunableObjects := make([]string, 0, len(localObjects)/2)
|
||||||
|
|
||||||
// Build list of prunables (also queue for verify at same time if applicable)
|
// Build list of prunables (also queue for verify at same time if applicable)
|
||||||
var verifyQueue *lfs.TransferQueue
|
var verifyQueue *tq.TransferQueue
|
||||||
var verifiedObjects tools.StringSet
|
var verifiedObjects tools.StringSet
|
||||||
var totalSize int64
|
var totalSize int64
|
||||||
var verboseOutput bytes.Buffer
|
var verboseOutput bytes.Buffer
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/git-lfs/git-lfs/errors"
|
"github.com/git-lfs/git-lfs/errors"
|
||||||
"github.com/git-lfs/git-lfs/lfs"
|
"github.com/git-lfs/git-lfs/lfs"
|
||||||
"github.com/git-lfs/git-lfs/tools"
|
"github.com/git-lfs/git-lfs/tools"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
)
|
)
|
||||||
|
|
||||||
var uploadMissingErr = "%s does not exist in .git/lfs/objects. Tried %s, which matches %s."
|
var uploadMissingErr = "%s does not exist in .git/lfs/objects. Tried %s, which matches %s."
|
||||||
@ -34,7 +35,7 @@ func (c *uploadContext) HasUploaded(oid string) bool {
|
|||||||
return c.uploadedOids.Contains(oid)
|
return c.uploadedOids.Contains(oid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*lfs.TransferQueue, []*lfs.WrappedPointer) {
|
func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*tq.TransferQueue, []*lfs.WrappedPointer) {
|
||||||
numUnfiltered := len(unfiltered)
|
numUnfiltered := len(unfiltered)
|
||||||
uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
|
uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
|
||||||
missingLocalObjects := make([]*lfs.WrappedPointer, 0, numUnfiltered)
|
missingLocalObjects := make([]*lfs.WrappedPointer, 0, numUnfiltered)
|
||||||
@ -74,7 +75,7 @@ func (c *uploadContext) prepareUpload(unfiltered []*lfs.WrappedPointer) (*lfs.Tr
|
|||||||
|
|
||||||
// build the TransferQueue, automatically skipping any missing objects that
|
// build the TransferQueue, automatically skipping any missing objects that
|
||||||
// the server already has.
|
// the server already has.
|
||||||
uploadQueue := lfs.NewUploadQueue(lfs.WithProgress(meter), lfs.DryRun(c.DryRun))
|
uploadQueue := lfs.NewUploadQueue(tq.WithProgress(meter), tq.DryRun(c.DryRun))
|
||||||
for _, p := range missingLocalObjects {
|
for _, p := range missingLocalObjects {
|
||||||
if c.HasUploaded(p.Oid) {
|
if c.HasUploaded(p.Oid) {
|
||||||
// if the server already has this object, call Skip() on
|
// if the server already has this object, call Skip() on
|
||||||
|
@ -2,6 +2,7 @@ package lfs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/git-lfs/git-lfs/api"
|
"github.com/git-lfs/git-lfs/api"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
"github.com/git-lfs/git-lfs/transfer"
|
"github.com/git-lfs/git-lfs/transfer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -40,11 +41,11 @@ func NewDownloadable(p *WrappedPointer) *Downloadable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
|
// NewDownloadCheckQueue builds a checking queue, checks that objects are there but doesn't download
|
||||||
func NewDownloadCheckQueue(options ...transferQueueOption) *TransferQueue {
|
func NewDownloadCheckQueue(options ...tq.TransferQueueOption) *tq.TransferQueue {
|
||||||
return newTransferQueue(transfer.Download, options...)
|
return tq.NewTransferQueue(transfer.Download, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
|
// NewDownloadQueue builds a DownloadQueue, allowing concurrent downloads.
|
||||||
func NewDownloadQueue(options ...transferQueueOption) *TransferQueue {
|
func NewDownloadQueue(options ...tq.TransferQueueOption) *tq.TransferQueue {
|
||||||
return newTransferQueue(transfer.Download, options...)
|
return tq.NewTransferQueue(transfer.Download, options...)
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/git-lfs/git-lfs/api"
|
"github.com/git-lfs/git-lfs/api"
|
||||||
"github.com/git-lfs/git-lfs/config"
|
"github.com/git-lfs/git-lfs/config"
|
||||||
"github.com/git-lfs/git-lfs/errors"
|
"github.com/git-lfs/git-lfs/errors"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
"github.com/git-lfs/git-lfs/transfer"
|
"github.com/git-lfs/git-lfs/transfer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,8 +68,8 @@ func NewUploadable(oid, filename string) (*Uploadable, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
|
// NewUploadQueue builds an UploadQueue, allowing `workers` concurrent uploads.
|
||||||
func NewUploadQueue(options ...transferQueueOption) *TransferQueue {
|
func NewUploadQueue(options ...tq.TransferQueueOption) *tq.TransferQueue {
|
||||||
return newTransferQueue(transfer.Upload, options...)
|
return tq.NewTransferQueue(transfer.Upload, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureFile makes sure that the cleanPath exists before pushing it. If it
|
// ensureFile makes sure that the cleanPath exists before pushing it. If it
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/git-lfs/git-lfs/lfs"
|
"github.com/git-lfs/git-lfs/lfs"
|
||||||
"github.com/git-lfs/git-lfs/progress"
|
"github.com/git-lfs/git-lfs/progress"
|
||||||
"github.com/git-lfs/git-lfs/test"
|
"github.com/git-lfs/git-lfs/test"
|
||||||
|
"github.com/git-lfs/git-lfs/tq"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -158,7 +159,7 @@ func buildTestData() (oidsExist, oidsMissing []TestObject, err error) {
|
|||||||
outputs := repo.AddCommits([]*test.CommitInput{&commit})
|
outputs := repo.AddCommits([]*test.CommitInput{&commit})
|
||||||
|
|
||||||
// now upload
|
// now upload
|
||||||
uploadQueue := lfs.NewUploadQueue(lfs.WithProgress(meter))
|
uploadQueue := lfs.NewUploadQueue(tq.WithProgress(meter))
|
||||||
for _, f := range outputs[0].Files {
|
for _, f := range outputs[0].Files {
|
||||||
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})
|
oidsExist = append(oidsExist, TestObject{Oid: f.Oid, Size: f.Size})
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package lfs
|
package tq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
@ -137,30 +137,30 @@ type TransferQueue struct {
|
|||||||
rc *retryCounter
|
rc *retryCounter
|
||||||
}
|
}
|
||||||
|
|
||||||
type transferQueueOption func(*TransferQueue)
|
type TransferQueueOption func(*TransferQueue)
|
||||||
|
|
||||||
func DryRun(dryRun bool) transferQueueOption {
|
func DryRun(dryRun bool) TransferQueueOption {
|
||||||
return func(tq *TransferQueue) {
|
return func(tq *TransferQueue) {
|
||||||
tq.dryRun = dryRun
|
tq.dryRun = dryRun
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithProgress(m progress.Meter) transferQueueOption {
|
func WithProgress(m progress.Meter) TransferQueueOption {
|
||||||
return func(tq *TransferQueue) {
|
return func(tq *TransferQueue) {
|
||||||
tq.meter = m
|
tq.meter = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBatchSize(size int) transferQueueOption {
|
func WithBatchSize(size int) TransferQueueOption {
|
||||||
return func(tq *TransferQueue) { tq.batchSize = size }
|
return func(tq *TransferQueue) { tq.batchSize = size }
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithBufferDepth(depth int) transferQueueOption {
|
func WithBufferDepth(depth int) TransferQueueOption {
|
||||||
return func(tq *TransferQueue) { tq.bufferDepth = depth }
|
return func(tq *TransferQueue) { tq.bufferDepth = depth }
|
||||||
}
|
}
|
||||||
|
|
||||||
// newTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
|
// NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
|
||||||
func newTransferQueue(dir transfer.Direction, options ...transferQueueOption) *TransferQueue {
|
func NewTransferQueue(dir transfer.Direction, options ...TransferQueueOption) *TransferQueue {
|
||||||
q := &TransferQueue{
|
q := &TransferQueue{
|
||||||
batchSize: defaultBatchSize,
|
batchSize: defaultBatchSize,
|
||||||
bufferDepth: defaultBatchSize,
|
bufferDepth: defaultBatchSize,
|
@ -1,4 +1,4 @@
|
|||||||
package lfs
|
package tq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
Loading…
Reference in New Issue
Block a user