brian m. carlson 00623425a2
tq: make Manifest an interface
Right now, any time we instantiate a Manifest object, we create an API
client, and when we create the API client, if we're using SSH, we try to
make a connection to the server.  However, we often instantiate a
Manifest object when performing operations such as smudging data, which
means that a user who merely creates an archive locally can be prompted
for an SSH password, which is undesirable.

Let's take a first step toward fixing this by making Manifest an
interface.  Right now, it has one concrete implementation,
concreteManifest, which can be used to access the internals, and we
provide methods to upgrade a Manifest from the interface to the concrete
type and to determine whether it has been upgraded or not.  We attempt to
upgrade it any time we need to access its internals.  In the future,
we'll also offer a lazyManifest, which will defer instantiating its inner
concreteManifest until we attempt to upgrade it.  But for now, only
implement the concreteManifest to make it clearer what's changing.

Similarly, we make our TransferQueue upgradable so that we don't
upgrade its Manifest right away.

In both cases, we'll want to use the lazyManifest to delay the
instantiation of the API client (and hence the starting of the SSH
connection) in a future commit.
2023-03-23 16:55:57 +00:00

408 lines
13 KiB

package tq
import (
// Adapter for custom transfer via external process
type customAdapter struct {
path string
args string
concurrent bool
originalConcurrency int
standalone bool
// traceWriter captures a process's stderr so that it can later be written to
// the trace log. Bytes are accumulated in buf; processName is used to label
// the trace output.
type traceWriter struct {
	buf         bytes.Buffer
	processName string
}

// Write appends b to the internal buffer and returns the number of bytes
// buffered together with any error reported by the buffer.
func (t *traceWriter) Write(b []byte) (int, error) {
	n, err := t.buf.Write(b)
	return n, err
}
func (t *traceWriter) Flush() {
var err error
for err == nil {
var s string
s, err = t.buf.ReadString('\n')
if len(s) > 0 {
tracerx.Printf("xfer[%v]: %v", t.processName, strings.TrimSpace(s))
type customAdapterWorkerContext struct {
workerNum int
cmd *subprocess.Cmd
stdout io.ReadCloser
bufferedOut *bufio.Reader
stdin io.WriteCloser
errTracer *traceWriter
// customAdapterInitRequest is the JSON message sent to a freshly started
// transfer process to initialise it.
type customAdapterInitRequest struct {
	Event               string `json:"event"`
	Operation           string `json:"operation"`
	Remote              string `json:"remote"`
	Concurrent          bool   `json:"concurrent"`
	ConcurrentTransfers int    `json:"concurrenttransfers"`
}

// NewCustomAdapterInitRequest returns an init request (Event "init") for the
// given operation and remote, carrying the concurrency flag and the number of
// concurrent transfers.
func NewCustomAdapterInitRequest(
	op string, remote string, concurrent bool, concurrentTransfers int,
) *customAdapterInitRequest {
	return &customAdapterInitRequest{
		Event:               "init",
		Operation:           op,
		Remote:              remote,
		Concurrent:          concurrent,
		ConcurrentTransfers: concurrentTransfers,
	}
}
type customAdapterTransferRequest struct {
// common between upload/download
Event string `json:"event"`
Oid string `json:"oid"`
Size int64 `json:"size"`
Path string `json:"path,omitempty"`
Action *Action `json:"action"`
func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *Action) *customAdapterTransferRequest {
return &customAdapterTransferRequest{"upload", oid, size, path, action}
func NewCustomAdapterDownloadRequest(oid string, size int64, action *Action) *customAdapterTransferRequest {
return &customAdapterTransferRequest{"download", oid, size, "", action}
// customAdapterTerminateRequest is the JSON message asking the transfer
// process to shut down.
type customAdapterTerminateRequest struct {
	Event string `json:"event"`
}

// NewCustomAdapterTerminateRequest returns a request with Event "terminate".
func NewCustomAdapterTerminateRequest() *customAdapterTerminateRequest {
	return &customAdapterTerminateRequest{Event: "terminate"}
}
// A common struct that allows all types of response to be identified
type customAdapterResponseMessage struct {
Event string `json:"event"`
Error *ObjectError `json:"error"`
Oid string `json:"oid"`
Path string `json:"path,omitempty"` // always blank for upload
BytesSoFar int64 `json:"bytesSoFar"`
BytesSinceLast int `json:"bytesSinceLast"`
func (a *customAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error {
a.originalConcurrency = cfg.ConcurrentTransfers()
if a.concurrent {
// Use common workers impl, but downgrade workers to number of processes
return a.adapterBase.Begin(cfg, cb)
// If config says not to launch multiple processes, downgrade incoming value
return a.adapterBase.Begin(&customAdapterConfig{AdapterConfig: cfg}, cb)
func (a *customAdapter) WorkerStarting(workerNum int) (interface{}, error) {
// Start a process per worker
// If concurrent = false we have already dialled back workers to 1
a.Trace("xfer: starting up custom transfer process %q for worker %d",, workerNum)
cmdName, cmdArgs := subprocess.FormatForShell(subprocess.ShellQuoteSingle(a.path), a.args)
cmd, err := subprocess.ExecCommand(cmdName, cmdArgs...)
if err != nil {
return nil, errors.New(tr.Tr.Get("failed to find custom transfer command %q remote: %v", a.path, err))
outp, err := cmd.StdoutPipe()
if err != nil {
return nil, errors.New(tr.Tr.Get("failed to get stdout for custom transfer command %q remote: %v", a.path, err))
inp, err := cmd.StdinPipe()
if err != nil {
return nil, errors.New(tr.Tr.Get("failed to get stdin for custom transfer command %q remote: %v", a.path, err))
// Capture stderr to trace
tracer := &traceWriter{}
tracer.processName = filepath.Base(a.path)
cmd.Stderr = tracer
err = cmd.Start()
if err != nil {
return nil, errors.New(tr.Tr.Get("failed to start custom transfer command %q remote: %v", a.path, err))
// Set up buffered reader/writer since we operate on lines
ctx := &customAdapterWorkerContext{workerNum, cmd, outp, bufio.NewReader(outp), inp, tracer}
// send initiate message
initReq := NewCustomAdapterInitRequest(
a.getOperationName(), a.remote, a.concurrent, a.originalConcurrency,
resp, err := a.exchangeMessage(ctx, initReq)
if err != nil {
return nil, err
if resp.Error != nil {
return nil, errors.New(tr.Tr.Get("error initializing custom adapter %q worker %d: %v",, workerNum, resp.Error))
a.Trace("xfer: started custom adapter process %q for worker %d OK", a.path, workerNum)
// Save this process context and use in future callbacks
return ctx, nil
func (a *customAdapter) getOperationName() string {
if a.direction == Download {
return "download"
return "upload"
// sendMessage sends a JSON message to the custom adapter process
func (a *customAdapter) sendMessage(ctx *customAdapterWorkerContext, req interface{}) error {
b, err := json.Marshal(req)
if err != nil {
return err
a.Trace("xfer: Custom adapter worker %d sending message: %v", ctx.workerNum, string(b))
// Line oriented JSON
b = append(b, '\n')
_, err = ctx.stdin.Write(b)
return err
func (a *customAdapter) readResponse(ctx *customAdapterWorkerContext) (*customAdapterResponseMessage, error) {
line, err := ctx.bufferedOut.ReadString('\n')
if err != nil {
return nil, err
a.Trace("xfer: Custom adapter worker %d received response: %v", ctx.workerNum, strings.TrimSpace(line))
resp := &customAdapterResponseMessage{}
err = json.Unmarshal([]byte(line), resp)
return resp, err
// exchangeMessage sends a message to a process and reads a response if resp != nil
// Only fatal errors to communicate return an error, errors may be embedded in reply
func (a *customAdapter) exchangeMessage(ctx *customAdapterWorkerContext, req interface{}) (*customAdapterResponseMessage, error) {
err := a.sendMessage(ctx, req)
if err != nil {
return nil, err
return a.readResponse(ctx)
// shutdownWorkerProcess terminates gracefully a custom adapter process
// returns an error if it couldn't shut down gracefully (caller may abortWorkerProcess)
func (a *customAdapter) shutdownWorkerProcess(ctx *customAdapterWorkerContext) error {
defer ctx.errTracer.Flush()
a.Trace("xfer: Shutting down adapter worker %d", ctx.workerNum)
finishChan := make(chan error, 1)
go func() {
termReq := NewCustomAdapterTerminateRequest()
err := a.sendMessage(ctx, termReq)
if err != nil {
finishChan <- err
finishChan <- ctx.cmd.Wait()
select {
case err := <-finishChan:
return err
case <-time.After(30 * time.Second):
return errors.New(tr.Tr.Get("timeout while shutting down worker process %d", ctx.workerNum))
// abortWorkerProcess terminates & aborts untidily, most probably breakdown of comms or internal error
func (a *customAdapter) abortWorkerProcess(ctx *customAdapterWorkerContext) {
a.Trace("xfer: Aborting worker process: %d", ctx.workerNum)
func (a *customAdapter) WorkerEnding(workerNum int, ctx interface{}) {
customCtx, ok := ctx.(*customAdapterWorkerContext)
if !ok {
tracerx.Printf("Context object for custom transfer %q was of the wrong type",
err := a.shutdownWorkerProcess(customCtx)
if err != nil {
tracerx.Printf("xfer: error finishing up custom transfer process %q worker %d, aborting: %v", a.path, customCtx.workerNum, err)
func (a *customAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error {
if ctx == nil {
return errors.New(tr.Tr.Get("custom transfer %q was not properly initialized, see previous errors",
customCtx, ok := ctx.(*customAdapterWorkerContext)
if !ok {
return errors.New(tr.Tr.Get("context object for custom transfer %q was of the wrong type",
var authCalled bool
rel, err := t.Rel(a.getOperationName())
if err != nil {
return err
if rel == nil && !a.standalone {
return errors.Errorf(tr.Tr.Get("Object %s not found on the server.", t.Oid))
var req *customAdapterTransferRequest
if a.direction == Upload {
req = NewCustomAdapterUploadRequest(t.Oid, t.Size, t.Path, rel)
} else {
req = NewCustomAdapterDownloadRequest(t.Oid, t.Size, rel)
if err = a.sendMessage(customCtx, req); err != nil {
return err
// 1..N replies (including progress & one of download / upload)
var complete bool
for !complete {
resp, err := a.readResponse(customCtx)
if err != nil {
return err
var wasAuthOk bool
switch resp.Event {
case "progress":
// Progress
if resp.Oid != t.Oid {
return errors.New(tr.Tr.Get("unexpected OID %q in response, expecting %q", resp.Oid, t.Oid))
if cb != nil {
cb(t.Name, t.Size, resp.BytesSoFar, resp.BytesSinceLast)
wasAuthOk = resp.BytesSoFar > 0
case "complete":
// Download/Upload complete
if resp.Oid != t.Oid {
return errors.New(tr.Tr.Get("unexpected OID %q in response, expecting %q", resp.Oid, t.Oid))
if resp.Error != nil {
return errors.New(tr.Tr.Get("error transferring %q: %v", t.Oid, resp.Error))
if a.direction == Download {
// So we don't have to blindly trust external providers, check SHA
if err = tools.VerifyFileHash(t.Oid, resp.Path); err != nil {
return errors.New(tr.Tr.Get("downloaded file failed checks: %v", err))
// Move file to final location
if err = tools.RenameFileCopyPermissions(resp.Path, t.Path); err != nil {
return errors.New(tr.Tr.Get("failed to copy downloaded file: %v", err))
} else if a.direction == Upload {
if err = verifyUpload(a.apiClient, a.remote, t); err != nil {
return err
wasAuthOk = true
complete = true
return errors.New(tr.Tr.Get("invalid message %q from custom adapter %q", resp.Event,
// Fall through from both progress and completion messages
// Call auth on first progress or success to free up other workers
if wasAuthOk && authOkFunc != nil && !authCalled {
authCalled = true
return nil
func newCustomAdapter(f *fs.Filesystem, name string, dir Direction, path, args string, concurrent, standalone bool) *customAdapter {
c := &customAdapter{newAdapterBase(f, name, dir, nil), path, args, concurrent, 3, standalone}
// self implements impl
c.transferImpl = c
return c
const (
	// standaloneFileName is the name under which the built-in standalone
	// file transfer adapter is registered.
	standaloneFileName = "lfs-standalone-file"
)
func configureDefaultCustomAdapters(git Env, m *concreteManifest) {
newfunc := func(name string, dir Direction) Adapter {
standalone := m.standaloneTransferAgent != ""
return newCustomAdapter(m.fs, standaloneFileName, dir, "git-lfs", "standalone-file", false, standalone)
m.RegisterNewAdapterFunc(standaloneFileName, Download, newfunc)
m.RegisterNewAdapterFunc(standaloneFileName, Upload, newfunc)
// Initialise custom adapters based on current config
func configureCustomAdapters(git Env, m *concreteManifest) {
configureDefaultCustomAdapters(git, m)
pathRegex := regexp.MustCompile(`lfs.customtransfer.([^.]+).path`)
for k, _ := range git.All() {
match := pathRegex.FindStringSubmatch(k)
if match == nil {
name := match[1]
path, _ := git.Get(k)
// retrieve other values
args, _ := git.Get(fmt.Sprintf("lfs.customtransfer.%s.args", name))
concurrent := git.Bool(fmt.Sprintf("lfs.customtransfer.%s.concurrent", name), true)
direction, _ := git.Get(fmt.Sprintf("lfs.customtransfer.%s.direction", name))
if len(direction) == 0 {
direction = "both"
} else {
direction = strings.ToLower(direction)
// Separate closure for each since we need to capture vars above
newfunc := func(name string, dir Direction) Adapter {
standalone := m.standaloneTransferAgent != ""
return newCustomAdapter(m.fs, name, dir, path, args, concurrent, standalone)
if direction == "download" || direction == "both" {
m.RegisterNewAdapterFunc(name, Download, newfunc)
if direction == "upload" || direction == "both" {
m.RegisterNewAdapterFunc(name, Upload, newfunc)
type customAdapterConfig struct {
func (c *customAdapterConfig) ConcurrentTransfers() int {
return 1