introduce an uploadContext struct to share in push and pre-push

This commit is contained in:
risk danger olson 2016-04-06 13:06:34 -06:00
parent 31dc0c2807
commit abd9b1e456
6 changed files with 214 additions and 222 deletions

@ -15,9 +15,8 @@ var (
Use: "pre-push", Use: "pre-push",
Run: prePushCommand, Run: prePushCommand,
} }
prePushDryRun = false prePushDryRun = false
prePushDeleteBranch = strings.Repeat("0", 40) prePushDeleteBranch = strings.Repeat("0", 40)
prePushMissingErrMsg = "%s is an LFS pointer to %s, which does not exist in .git/lfs/objects.\n\nRun 'git lfs fsck' to verify Git LFS objects."
) )
// prePushCommand is run through Git's pre-push hook. The pre-push hook passes // prePushCommand is run through Git's pre-push hook. The pre-push hook passes
@ -43,7 +42,6 @@ var (
// In the case of deleting a branch, no attempts to push Git LFS objects will be // In the case of deleting a branch, no attempts to push Git LFS objects will be
// made. // made.
func prePushCommand(cmd *cobra.Command, args []string) { func prePushCommand(cmd *cobra.Command, args []string) {
if len(args) == 0 { if len(args) == 0 {
Print("This should be run through Git's pre-push hook. Run `git lfs update` to install it.") Print("This should be run through Git's pre-push hook. Run `git lfs update` to install it.")
os.Exit(1) os.Exit(1)
@ -55,6 +53,13 @@ func prePushCommand(cmd *cobra.Command, args []string) {
} }
lfs.Config.CurrentRemote = args[0] lfs.Config.CurrentRemote = args[0]
ctx := newUploadContext()
ctx.DryRun = prePushDryRun
scanOpt := lfs.NewScanRefsOptions()
scanOpt.ScanMode = lfs.ScanLeftToRemoteMode
scanOpt.RemoteName = ctx.RemoteName
// We can be passed multiple lines of refs // We can be passed multiple lines of refs
scanner := bufio.NewScanner(os.Stdin) scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() { for scanner.Scan() {
@ -69,113 +74,13 @@ func prePushCommand(cmd *cobra.Command, args []string) {
continue continue
} }
prePushRef(left, right) pointers, err := lfs.ScanRefs(left, right, scanOpt)
}
}
func prePushRef(left, right string) {
// Just use scanner here
scanOpt := lfs.NewScanRefsOptions()
scanOpt.ScanMode = lfs.ScanLeftToRemoteMode
scanOpt.RemoteName = lfs.Config.CurrentRemote
pointers, err := lfs.ScanRefs(left, right, scanOpt)
if err != nil {
Panic(err, "Error scanning for Git LFS files")
}
totalSize := int64(0)
for _, p := range pointers {
totalSize += p.Size
}
// Objects to skip because they're missing locally but on server
var skipObjects lfs.StringSet
if !prePushDryRun {
// Do this as a pre-flight check since upload queue starts immediately
skipObjects = prePushCheckForMissingObjects(pointers)
}
uploadQueue := lfs.NewUploadQueue(len(pointers), totalSize, prePushDryRun)
for _, pointer := range pointers {
if prePushDryRun {
Print("push %s => %s", pointer.Oid, pointer.Name)
continue
}
if skipObjects.Contains(pointer.Oid) {
// object missing locally but on server, don't bother
continue
}
u, err := lfs.NewUploadable(pointer.Oid, pointer.Name)
if err != nil { if err != nil {
if lfs.IsCleanPointerError(err) { Panic(err, "Error scanning for Git LFS files")
Exit(prePushMissingErrMsg, pointer.Name, lfs.ErrorGetContext(err, "pointer").(*lfs.Pointer).Oid)
} else {
ExitWithError(err)
}
} }
uploadQueue.Add(u) ctx.Upload(pointers)
} }
if !prePushDryRun {
uploadQueue.Wait()
for _, err := range uploadQueue.Errors() {
if Debugging || lfs.IsFatalError(err) {
LoggedError(err, err.Error())
} else {
if inner := lfs.GetInnerError(err); inner != nil {
Error(inner.Error())
}
Error(err.Error())
}
}
if len(uploadQueue.Errors()) > 0 {
os.Exit(2)
}
}
}
func prePushCheckForMissingObjects(pointers []*lfs.WrappedPointer) (objectsOnServer lfs.StringSet) {
var missingLocalObjects []*lfs.WrappedPointer
var missingSize int64
var skipObjects = lfs.NewStringSetWithCapacity(len(pointers))
for _, pointer := range pointers {
if !lfs.ObjectExistsOfSize(pointer.Oid, pointer.Size) {
// We think we need to push this but we don't have it
// Store for server checking later
missingLocalObjects = append(missingLocalObjects, pointer)
missingSize += pointer.Size
}
}
if len(missingLocalObjects) == 0 {
return nil
}
checkQueue := lfs.NewDownloadCheckQueue(len(missingLocalObjects), missingSize, true)
for _, p := range missingLocalObjects {
checkQueue.Add(lfs.NewDownloadCheckable(p))
}
// this channel is filled with oids for which Check() succeeded & Transfer() was called
transferc := checkQueue.Watch()
done := make(chan int)
go func() {
for oid := range transferc {
skipObjects.Add(oid)
}
done <- 1
}()
// Currently this is needed to flush the batch but is not enough to sync transferc completely
checkQueue.Wait()
<-done
return skipObjects
} }
// decodeRefs pulls the sha1s out of the line read from the pre-push // decodeRefs pulls the sha1s out of the line read from the pre-push

@ -23,115 +23,83 @@ var (
// shares some global vars and functions with command_pre_push.go // shares some global vars and functions with command_pre_push.go
) )
func uploadsBetweenRefs(left string, right string) *lfs.TransferQueue { func uploadsBetweenRefs(ctx *uploadContext, left string, right string) {
tracerx.Printf("Upload between %v and %v", left, right) tracerx.Printf("Upload between %v and %v", left, right)
// Just use scanner here scanOpt := lfs.NewScanRefsOptions()
pointers, err := lfs.ScanRefs(left, right, nil) scanOpt.ScanMode = lfs.ScanRefsMode
scanOpt.RemoteName = ctx.RemoteName
pointers, err := lfs.ScanRefs(left, right, scanOpt)
if err != nil { if err != nil {
Panic(err, "Error scanning for Git LFS files") Panic(err, "Error scanning for Git LFS files")
} }
return uploadPointers(pointers)
ctx.Upload(pointers)
} }
func uploadsBetweenRefAndRemote(remote string, refs []string) *lfs.TransferQueue { func uploadsBetweenRefAndRemote(ctx *uploadContext, refnames []string) {
tracerx.Printf("Upload refs %v to remote %v", refs, remote) tracerx.Printf("Upload refs %v to remote %v", refnames, ctx.RemoteName)
scanOpt := lfs.NewScanRefsOptions() scanOpt := lfs.NewScanRefsOptions()
scanOpt.ScanMode = lfs.ScanLeftToRemoteMode scanOpt.ScanMode = lfs.ScanLeftToRemoteMode
scanOpt.RemoteName = remote scanOpt.RemoteName = ctx.RemoteName
if pushAll { if pushAll {
if len(refs) == 0 { scanOpt.ScanMode = lfs.ScanRefsMode
pointers := scanAll()
Print("Pushing objects...")
return uploadPointers(pointers)
} else {
scanOpt.ScanMode = lfs.ScanRefsMode
}
} }
// keep a unique set of pointers refs, err := refsByNames(refnames)
oidPointerMap := make(map[string]*lfs.WrappedPointer) if err != nil {
Error(err.Error())
Exit("Error getting local refs.")
}
for _, ref := range refs { for _, ref := range refs {
pointers, err := lfs.ScanRefs(ref, "", scanOpt) pointers, err := lfs.ScanRefs(ref.Name, "", scanOpt)
if err != nil { if err != nil {
Panic(err, "Error scanning for Git LFS files in the %q ref", ref) Panic(err, "Error scanning for Git LFS files in the %q ref", ref.Name)
} }
for _, p := range pointers { ctx.Upload(pointers)
oidPointerMap[p.Oid] = p
}
} }
i := 0
pointers := make([]*lfs.WrappedPointer, len(oidPointerMap))
for _, pointer := range oidPointerMap {
pointers[i] = pointer
i += 1
}
return uploadPointers(pointers)
} }
func uploadPointers(pointers []*lfs.WrappedPointer) *lfs.TransferQueue { func uploadsWithObjectIDs(ctx *uploadContext, oids []string) {
totalSize := int64(0) pointers := make([]*lfs.WrappedPointer, len(oids))
for _, p := range pointers {
totalSize += p.Size for idx, oid := range oids {
pointers[idx] = &lfs.WrappedPointer{Pointer: &lfs.Pointer{Oid: oid}}
} }
skipObjects := prePushCheckForMissingObjects(pointers) ctx.Upload(pointers)
uploadQueue := lfs.NewUploadQueue(len(pointers), totalSize, pushDryRun)
for i, pointer := range pointers {
if pushDryRun {
Print("push %s => %s", pointer.Oid, pointer.Name)
continue
}
if _, skip := skipObjects[pointer.Oid]; skip {
// object missing locally but on server, don't bother
continue
}
tracerx.Printf("prepare upload: %s %s %d/%d", pointer.Oid, pointer.Name, i+1, len(pointers))
u, err := lfs.NewUploadable(pointer.Oid, pointer.Name)
if err != nil {
ExitWithError(err)
}
uploadQueue.Add(u)
}
return uploadQueue
} }
func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue { func refsByNames(refnames []string) ([]*git.Ref, error) {
uploads := []*lfs.Uploadable{} localrefs, err := git.LocalRefs()
totalSize := int64(0) if err != nil {
return nil, err
for i, oid := range oids {
if pushDryRun {
Print("push object ID %s", oid)
continue
}
tracerx.Printf("prepare upload: %s %d/%d", oid, i+1, len(oids))
u, err := lfs.NewUploadable(oid, "")
if err != nil {
ExitWithError(err)
}
uploads = append(uploads, u)
} }
uploadQueue := lfs.NewUploadQueue(len(oids), totalSize, pushDryRun) if pushAll && len(refnames) == 0 {
return localrefs, nil
for _, u := range uploads {
uploadQueue.Add(u)
} }
return uploadQueue reflookup := make(map[string]*git.Ref, len(localrefs))
for _, ref := range localrefs {
reflookup[ref.Name] = ref
}
refs := make([]*git.Ref, len(refnames))
for i, name := range refnames {
if ref, ok := reflookup[name]; ok {
refs[i] = ref
} else {
refs[i] = &git.Ref{name, git.RefTypeOther, name}
}
}
return refs, nil
} }
// pushCommand pushes local objects to a Git LFS server. It takes two // pushCommand pushes local objects to a Git LFS server. It takes two
@ -144,8 +112,6 @@ func uploadsWithObjectIDs(oids []string) *lfs.TransferQueue {
// pushCommand calculates the git objects to send by looking comparing the range // pushCommand calculates the git objects to send by looking comparing the range
// of commits between the local and remote git servers. // of commits between the local and remote git servers.
func pushCommand(cmd *cobra.Command, args []string) { func pushCommand(cmd *cobra.Command, args []string) {
var uploadQueue *lfs.TransferQueue
if len(args) == 0 { if len(args) == 0 {
Print("Specify a remote and a remote branch name (`git lfs push origin master`)") Print("Specify a remote and a remote branch name (`git lfs push origin master`)")
os.Exit(1) os.Exit(1)
@ -157,6 +123,9 @@ func pushCommand(cmd *cobra.Command, args []string) {
} }
lfs.Config.CurrentRemote = args[0] lfs.Config.CurrentRemote = args[0]
ctx := newUploadContext()
ctx.DryRun = pushDryRun
if useStdin { if useStdin {
requireStdin("Run this command from the Git pre-push hook, or leave the --stdin flag off.") requireStdin("Run this command from the Git pre-push hook, or leave the --stdin flag off.")
@ -178,39 +147,21 @@ func pushCommand(cmd *cobra.Command, args []string) {
return return
} }
uploadQueue = uploadsBetweenRefs(left, right) uploadsBetweenRefs(ctx, left, right)
} else if pushObjectIDs { } else if pushObjectIDs {
if len(args) < 2 { if len(args) < 2 {
Print("Usage: git lfs push --object-id <remote> <lfs-object-id> [lfs-object-id] ...") Print("Usage: git lfs push --object-id <remote> <lfs-object-id> [lfs-object-id] ...")
return return
} }
uploadQueue = uploadsWithObjectIDs(args[1:]) uploadsWithObjectIDs(ctx, args[1:])
} else { } else {
if len(args) < 1 { if len(args) < 1 {
Print("Usage: git lfs push --dry-run <remote> [ref]") Print("Usage: git lfs push --dry-run <remote> [ref]")
return return
} }
uploadQueue = uploadsBetweenRefAndRemote(args[0], args[1:]) uploadsBetweenRefAndRemote(ctx, args[1:])
}
if !pushDryRun {
uploadQueue.Wait()
for _, err := range uploadQueue.Errors() {
if Debugging || lfs.IsFatalError(err) {
LoggedError(err, err.Error())
} else {
if inner := lfs.GetInnerError(err); inner != nil {
Error(inner.Error())
}
Error(err.Error())
}
}
if len(uploadQueue.Errors()) > 0 {
os.Exit(2)
}
} }
} }

130
commands/uploader.go Normal file

@ -0,0 +1,130 @@
package commands
import (
"os"
"github.com/github/git-lfs/lfs"
)
var uploadMissingErr = "%s does not exist in .git/lfs/objects. Tried %s, which matches %s."
type uploadContext struct {
RemoteName string
DryRun bool
uploadedOids lfs.StringSet
}
func newUploadContext() *uploadContext {
return &uploadContext{
uploadedOids: lfs.NewStringSet(),
RemoteName: lfs.Config.CurrentRemote,
}
}
func (c *uploadContext) Upload(unfilteredPointers []*lfs.WrappedPointer) {
filtered := c.filterUploadedObjects(noSkip, unfilteredPointers)
totalSize := int64(0)
for _, p := range filtered {
totalSize += p.Size
}
uploadQueue := lfs.NewUploadQueue(len(filtered), totalSize, c.DryRun)
if c.DryRun {
for _, pointer := range filtered {
Print("push %s => %s", pointer.Oid, pointer.Name)
c.uploadedOids.Add(pointer.Oid)
}
return
}
c.filterServerObjects(filtered)
pointers := c.filterUploadedObjects(uploadQueue, filtered)
for _, pointer := range pointers {
u, err := lfs.NewUploadable(pointer.Oid, pointer.Name)
if err != nil {
if lfs.IsCleanPointerError(err) {
Exit(uploadMissingErr, pointer.Oid, pointer.Name, lfs.ErrorGetContext(err, "pointer").(*lfs.Pointer).Oid)
} else {
ExitWithError(err)
}
}
uploadQueue.Add(u)
c.uploadedOids.Add(pointer.Oid)
}
uploadQueue.Wait()
for _, err := range uploadQueue.Errors() {
if Debugging || lfs.IsFatalError(err) {
LoggedError(err, err.Error())
} else {
if inner := lfs.GetInnerError(err); inner != nil {
Error(inner.Error())
}
Error(err.Error())
}
}
if len(uploadQueue.Errors()) > 0 {
os.Exit(2)
}
}
func (c *uploadContext) filterUploadedObjects(q transferQueueSkip, pointers []*lfs.WrappedPointer) []*lfs.WrappedPointer {
filtered := make([]*lfs.WrappedPointer, 0, len(pointers))
for _, pointer := range pointers {
if c.uploadedOids.Contains(pointer.Oid) {
q.Skip(pointer.Size)
} else {
filtered = append(filtered, pointer)
}
}
return filtered
}
func (c *uploadContext) filterServerObjects(pointers []*lfs.WrappedPointer) {
missingLocalObjects := make([]*lfs.WrappedPointer, 0, len(pointers))
missingSize := int64(0)
for _, pointer := range pointers {
if !lfs.ObjectExistsOfSize(pointer.Oid, pointer.Size) {
// We think we need to push this but we don't have it
// Store for server checking later
missingLocalObjects = append(missingLocalObjects, pointer)
missingSize += pointer.Size
}
}
if len(missingLocalObjects) == 0 {
return
}
checkQueue := lfs.NewDownloadCheckQueue(len(missingLocalObjects), missingSize, true)
for _, p := range missingLocalObjects {
checkQueue.Add(lfs.NewDownloadCheckable(p))
}
// this channel is filled with oids for which Check() succeeded & Transfer() was called
transferc := checkQueue.Watch()
done := make(chan int)
go func() {
for oid := range transferc {
c.uploadedOids.Add(oid)
}
done <- 1
}()
// Currently this is needed to flush the batch but is not enough to sync transferc completely
checkQueue.Wait()
<-done
}
type transferQueueSkip interface {
Skip(int64)
}
type skipNoOp struct{}
func (s *skipNoOp) Skip(n int64) {}
var noSkip = &skipNoOp{}

@ -75,6 +75,10 @@ func (q *TransferQueue) Add(t Transferable) {
q.apic <- t q.apic <- t
} }
func (q *TransferQueue) Skip(size int64) {
q.meter.Skip(size)
}
// Wait waits for the queue to finish processing all transfers. Once Wait is // Wait waits for the queue to finish processing all transfers. Once Wait is
// called, Add will no longer add transferables to the queue. Any failed // called, Add will no longer add transferables to the queue. Any failed
// transfers will be automatically retried once. // transfers will be automatically retried once.
@ -153,7 +157,7 @@ func (q *TransferQueue) individualApiRoutine(apiWaiter chan interface{}) {
q.meter.Add(t.Name()) q.meter.Add(t.Name())
q.transferc <- t q.transferc <- t
} else { } else {
q.meter.Skip(t.Size()) q.Skip(t.Size())
q.wait.Done() q.wait.Done()
} }
} }
@ -228,7 +232,7 @@ func (q *TransferQueue) batchApiRoutine() {
for _, o := range objects { for _, o := range objects {
if o.Error != nil { if o.Error != nil {
q.errorc <- Errorf(o.Error, "[%v] %v", o.Oid, o.Error.Message) q.errorc <- Errorf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
q.meter.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
continue continue
} }
@ -240,11 +244,11 @@ func (q *TransferQueue) batchApiRoutine() {
q.meter.Add(transfer.Name()) q.meter.Add(transfer.Name())
q.transferc <- transfer q.transferc <- transfer
} else { } else {
q.meter.Skip(transfer.Size()) q.Skip(transfer.Size())
q.wait.Done() q.wait.Done()
} }
} else { } else {
q.meter.Skip(o.Size) q.Skip(o.Size)
q.wait.Done() q.wait.Done()
} }
} }

@ -189,7 +189,7 @@ begin_test "pre-push with missing pointer not on server"
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
tee push.log tee push.log
set -e set -e
grep "new.dat is an LFS pointer to 7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c, which does not exist in .git/lfs/objects" push.log grep "7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c does not exist in .git/lfs/objects. Tried new.dat, which matches 7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c." push.log
) )
end_test end_test
@ -254,7 +254,7 @@ begin_test "pre-push with missing pointer not on server (BATCH)"
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 | git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
tee push.log tee push.log
set -e set -e
grep "new.dat is an LFS pointer to 7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c, which does not exist in .git/lfs/objects" push.log grep "7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c does not exist in .git/lfs/objects. Tried new.dat, which matches 7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c." push.log
) )
end_test end_test
@ -448,7 +448,7 @@ begin_test "pre-push unfetched deleted remote branch & server GC"
# we hadn't done git fetch --prune yet # we hadn't done git fetch --prune yet
mv branch-to-delete.ref .git/refs/remotes/origin/branch-to-delete mv branch-to-delete.ref .git/refs/remotes/origin/branch-to-delete
# Confirm that local cache of remote branch is back # Confirm that local cache of remote branch is back
git branch -r 2>&1 | tee branch-r.log git branch -r 2>&1 | tee branch-r.log
grep "origin/branch-to-delete" branch-r.log grep "origin/branch-to-delete" branch-r.log
# Now push later branch which should now need to re-push previous commits LFS too # Now push later branch which should now need to re-push previous commits LFS too
git push origin branch-to-push-after git push origin branch-to-push-after
@ -509,10 +509,10 @@ begin_test "pre-push delete branch"
assert_server_object "$reponame" "${oid[$a]}" assert_server_object "$reponame" "${oid[$a]}"
done done
# deleting a branch with git push should not fail # deleting a branch with git push should not fail
# (requires correct special casing of "(delete) 0000000000.." in hook) # (requires correct special casing of "(delete) 0000000000.." in hook)
git push origin --delete branch-to-delete git push origin --delete branch-to-delete
) )
end_test end_test

@ -132,7 +132,7 @@ begin_test "push --all (no ref args)"
[ $(grep -c "push" < push.log) -eq 6 ] [ $(grep -c "push" < push.log) -eq 6 ]
git push --all origin 2>&1 | tee push.log git push --all origin 2>&1 | tee push.log
grep "5 files" push.log # should be 6? [ $(grep -c "(3 of 3 files)" push.log) -eq 2 ]
assert_server_object "$reponame-$suffix" "$oid1" assert_server_object "$reponame-$suffix" "$oid1"
assert_server_object "$reponame-$suffix" "$oid2" assert_server_object "$reponame-$suffix" "$oid2"
assert_server_object "$reponame-$suffix" "$oid3" assert_server_object "$reponame-$suffix" "$oid3"
@ -151,7 +151,7 @@ begin_test "push --all (no ref args)"
refute_server_object "$reponame-$suffix-2" "$extraoid" refute_server_object "$reponame-$suffix-2" "$extraoid"
rm ".git/lfs/objects/${oid1:0:2}/${oid1:2:2}/$oid1" rm ".git/lfs/objects/${oid1:0:2}/${oid1:2:2}/$oid1"
# dry run doesn't change echo "dry run missing local object that exists on server"
git lfs push --dry-run --all origin 2>&1 | tee push.log git lfs push --dry-run --all origin 2>&1 | tee push.log
grep "push $oid1 => file1.dat" push.log grep "push $oid1 => file1.dat" push.log
grep "push $oid2 => file1.dat" push.log grep "push $oid2 => file1.dat" push.log
@ -162,7 +162,10 @@ begin_test "push --all (no ref args)"
[ $(grep -c "push" push.log) -eq 6 ] [ $(grep -c "push" push.log) -eq 6 ]
git push --all origin 2>&1 | tee push.log git push --all origin 2>&1 | tee push.log
grep "5 files, 1 skipped" push.log # should be 5? grep "(2 of 3 files, 1 skipped)" push.log
grep "(3 of 3 files)" push.log
grep "files)" push.log
grep "skipped)" push.log
assert_server_object "$reponame-$suffix-2" "$oid2" assert_server_object "$reponame-$suffix-2" "$oid2"
assert_server_object "$reponame-$suffix-2" "$oid3" assert_server_object "$reponame-$suffix-2" "$oid3"
assert_server_object "$reponame-$suffix-2" "$oid4" assert_server_object "$reponame-$suffix-2" "$oid4"
@ -483,4 +486,3 @@ begin_test "push ambiguous branch name"
) )
end_test end_test