ssh: support multiple connections for one transfer

Since we currently support multiple connections for HTTPS, let's also
add multiple connections for pure SSH connections.  For now, we spawn a
whole new connection, but in the future we'll support using OpenSSH's
ControlMaster flag.

Introduce several new functions to create SSH connections and adjust the
number of connection being used.
This commit is contained in:
brian m. carlson 2021-03-30 16:17:05 +00:00
parent 594f8e386c
commit 898dc43d1d
No known key found for this signature in database
GPG Key ID: 2D0C9BC12F82B3A1
2 changed files with 84 additions and 13 deletions

@ -1,16 +1,38 @@
package ssh package ssh
import ( import (
"sync"
"github.com/git-lfs/git-lfs/config" "github.com/git-lfs/git-lfs/config"
"github.com/git-lfs/git-lfs/subprocess" "github.com/git-lfs/git-lfs/subprocess"
"github.com/git-lfs/pktline" "github.com/git-lfs/pktline"
) )
type SSHTransfer struct { type SSHTransfer struct {
conn *PktlineConnection lock *sync.RWMutex
conn []*PktlineConnection
osEnv config.Environment
gitEnv config.Environment
meta *SSHMetadata
operation string
} }
func NewSSHTransfer(osEnv config.Environment, gitEnv config.Environment, meta *SSHMetadata, operation string) (*SSHTransfer, error) { func NewSSHTransfer(osEnv config.Environment, gitEnv config.Environment, meta *SSHMetadata, operation string) (*SSHTransfer, error) {
conn, err := startConnection(osEnv, gitEnv, meta, operation)
if err != nil {
return nil, err
}
return &SSHTransfer{
lock: &sync.RWMutex{},
osEnv: osEnv,
gitEnv: gitEnv,
meta: meta,
operation: operation,
conn: []*PktlineConnection{conn},
}, nil
}
func startConnection(osEnv config.Environment, gitEnv config.Environment, meta *SSHMetadata, operation string) (*PktlineConnection, error) {
exe, args := GetLFSExeAndArgs(osEnv, gitEnv, meta, "git-lfs-transfer", operation) exe, args := GetLFSExeAndArgs(osEnv, gitEnv, meta, "git-lfs-transfer", operation)
cmd := subprocess.ExecCommand(exe, args...) cmd := subprocess.ExecCommand(exe, args...)
r, err := cmd.StdoutPipe() r, err := cmd.StdoutPipe()
@ -37,14 +59,63 @@ func NewSSHTransfer(osEnv config.Environment, gitEnv config.Environment, meta *S
pl: pl, pl: pl,
} }
err = conn.Start() err = conn.Start()
if err != nil { return conn, err
return nil, err
}
return &SSHTransfer{
conn: conn,
}, nil
} }
func (tr *SSHTransfer) Connection() *PktlineConnection { // Connection returns the nth connection (starting from 0) in this transfer
return tr.conn // instance or nil if there is no such item.
func (tr *SSHTransfer) Connection(n int) *PktlineConnection {
tr.lock.RLock()
defer tr.lock.RUnlock()
if n >= len(tr.conn) {
return nil
}
return tr.conn[n]
}
// ConnectionCount returns the number of connections this object has.
func (tr *SSHTransfer) ConnectionCount() int {
tr.lock.RLock()
defer tr.lock.RUnlock()
return len(tr.conn)
}
// SetConnectionCount sets the number of connections to the specified number.
func (tr *SSHTransfer) SetConnectionCount(n int) error {
tr.lock.Lock()
defer tr.lock.Unlock()
return tr.setConnectionCount(n)
}
// SetConnectionCountAtLeast sets the number of connections to be not less than
// the specified number.
func (tr *SSHTransfer) SetConnectionCountAtLeast(n int) error {
tr.lock.Lock()
defer tr.lock.Unlock()
count := len(tr.conn)
if n <= count {
return nil
}
return tr.setConnectionCount(n)
}
func (tr *SSHTransfer) setConnectionCount(n int) error {
count := len(tr.conn)
if n < count {
for _, item := range tr.conn[n:count] {
if err := item.End(); err != nil {
return err
}
}
tr.conn = tr.conn[0:n]
} else if n > count {
for i := count; i < n; i++ {
conn, err := startConnection(tr.osEnv, tr.gitEnv, tr.meta, tr.operation)
if err != nil {
return err
}
tr.conn = append(tr.conn, conn)
}
}
return nil
} }

@ -25,7 +25,7 @@ type SSHBatchClient struct {
} }
func (a *SSHBatchClient) batchInternal(args []string, batchLines []string) (int, []string, error) { func (a *SSHBatchClient) batchInternal(args []string, batchLines []string) (int, []string, error) {
conn := a.transfer.Connection() conn := a.transfer.Connection(0)
conn.Lock() conn.Lock()
defer conn.Unlock() defer conn.Unlock()
err := conn.SendMessageWithLines("batch", args, batchLines) err := conn.SendMessageWithLines("batch", args, batchLines)
@ -196,7 +196,7 @@ func (a *SSHAdapter) download(t *Transfer, cb ProgressCallback) error {
// doDownload starts a download. f is expected to be an existing file open in RW mode // doDownload starts a download. f is expected to be an existing file open in RW mode
func (a *SSHAdapter) doDownload(t *Transfer, f *os.File, cb ProgressCallback) error { func (a *SSHAdapter) doDownload(t *Transfer, f *os.File, cb ProgressCallback) error {
conn := a.transfer.Connection() conn := a.transfer.Connection(0)
args := a.argumentsForTransfer(t, "download") args := a.argumentsForTransfer(t, "download")
conn.Lock() conn.Lock()
defer conn.Unlock() defer conn.Unlock()
@ -266,7 +266,7 @@ func (a *SSHAdapter) doDownload(t *Transfer, f *os.File, cb ProgressCallback) er
} }
func (a *SSHAdapter) verifyUpload(t *Transfer) error { func (a *SSHAdapter) verifyUpload(t *Transfer) error {
conn := a.transfer.Connection() conn := a.transfer.Connection(0)
args := a.argumentsForTransfer(t, "upload") args := a.argumentsForTransfer(t, "upload")
conn.Lock() conn.Lock()
defer conn.Unlock() defer conn.Unlock()
@ -288,7 +288,7 @@ func (a *SSHAdapter) verifyUpload(t *Transfer) error {
} }
func (a *SSHAdapter) doUpload(t *Transfer, f *os.File, cb ProgressCallback) (int, []string, []string, error) { func (a *SSHAdapter) doUpload(t *Transfer, f *os.File, cb ProgressCallback) (int, []string, []string, error) {
conn := a.transfer.Connection() conn := a.transfer.Connection(0)
args := a.argumentsForTransfer(t, "upload") args := a.argumentsForTransfer(t, "upload")
// Ensure progress callbacks made while uploading // Ensure progress callbacks made while uploading