Worker: Refactor the running of subprocesses

Blender and FFmpeg were run in the same way, using copy-pasted code. This
is now abstracted away into the CLI runner, which in turn is moved into
its own subpackage.

No functional changes.
This commit is contained in:
Sybren A. Stüvel 2022-07-28 14:29:46 +02:00
parent c42665322b
commit c79fe55068
8 changed files with 170 additions and 129 deletions

@ -23,6 +23,7 @@ import (
"git.blender.org/flamenco/internal/appinfo"
"git.blender.org/flamenco/internal/find_ffmpeg"
"git.blender.org/flamenco/internal/worker"
"git.blender.org/flamenco/internal/worker/cli_runner"
)
var (
@ -132,7 +133,7 @@ func main() {
return
}
cliRunner := worker.NewCLIRunner()
cliRunner := cli_runner.NewCLIRunner()
listener = worker.NewListener(client, buffer)
cmdRunner := worker.NewCommandExecutor(cliRunner, listener, timeService)
taskRunner := worker.NewTaskExecutor(cmdRunner, listener)

@ -1,20 +0,0 @@
package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"os/exec"
)
// CLIRunner is a wrapper around exec.CommandContext() to allow mocking.
type CLIRunner struct {
}
func NewCLIRunner() *CLIRunner {
return &CLIRunner{}
}
func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd {
return exec.CommandContext(ctx, name, arg...)
}

@ -0,0 +1,103 @@
package cli_runner
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"github.com/rs/zerolog"
)
// The buffer size used to read stdout/stderr output from subprocesses.
// Effectively this determines the maximum line length that can be handled.
const StdoutBufferSize = 40 * 1024
// CLIRunner is a wrapper around exec.CommandContext() to allow mocking.
type CLIRunner struct {
}
func NewCLIRunner() *CLIRunner {
return &CLIRunner{}
}
func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd {
return exec.CommandContext(ctx, name, arg...)
}
// RunWithTextOutput runs a command and sends its output line-by-line to the
// lineChannel. Stdout and stderr are combined.
func (cli *CLIRunner) RunWithTextOutput(
ctx context.Context,
logger zerolog.Logger,
execCmd *exec.Cmd,
logChunker LogChunker,
lineChannel chan<- string,
) error {
outPipe, err := execCmd.StdoutPipe()
if err != nil {
return err
}
execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout.
if err := execCmd.Start(); err != nil {
logger.Error().Err(err).Msg("error starting CLI execution")
return err
}
blenderPID := execCmd.Process.Pid
logger = logger.With().Int("pid", blenderPID).Logger()
reader := bufio.NewReaderSize(outPipe, StdoutBufferSize)
for {
lineBytes, isPrefix, readErr := reader.ReadLine()
if readErr == io.EOF {
break
}
if readErr != nil {
logger.Error().Err(err).Msg("error reading stdout/err")
return err
}
line := string(lineBytes)
if isPrefix {
logger.Warn().
Str("line", fmt.Sprintf("%s...", line[:256])).
Int("lineLength", len(line)).
Msg("unexpectedly long line read, truncating")
}
logger.Debug().Msg(line)
if lineChannel != nil {
lineChannel <- line
}
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err)
}
}
if err := logChunker.Flush(ctx); err != nil {
return fmt.Errorf("flushing log chunker: %w", err)
}
if err := execCmd.Wait(); err != nil {
logger.Error().Err(err).Msg("error in CLI execution")
return err
}
if execCmd.ProcessState.Success() {
logger.Info().Msg("command exited succesfully")
} else {
logger.Error().
Int("exitCode", execCmd.ProcessState.ExitCode()).
Msg("command exited abnormally")
return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode())
}
return nil
}

@ -0,0 +1,12 @@
package cli_runner
// SPDX-License-Identifier: GPL-3.0-or-later
import "context"
type LogChunker interface {
// Flush sends any buffered logs to the listener.
Flush(ctx context.Context) error
// Append log lines to the buffer, sending to the listener when the buffer gets too large.
Append(ctx context.Context, logLines ...string) error
}

@ -5,12 +5,11 @@ package worker
/* This file contains the commands in the "blender" type group. */
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"regexp"
"sync"
"github.com/google/shlex"
"github.com/rs/zerolog"
@ -20,10 +19,6 @@ import (
"git.blender.org/flamenco/pkg/crosspath"
)
// The buffer size used to read stdout/stderr output from Blender.
// Effectively this determines the maximum line length that can be handled.
const StdoutBufferSize = 40 * 1024
var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'")
type BlenderParameters struct {
@ -43,65 +38,39 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
return err
}
outPipe, err := execCmd.StdoutPipe()
if err != nil {
return err
}
execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout.
if err := execCmd.Start(); err != nil {
logger.Error().Err(err).Msg("error starting CLI execution")
return err
}
blenderPID := execCmd.Process.Pid
logger = logger.With().Int("pid", blenderPID).Logger()
reader := bufio.NewReaderSize(outPipe, StdoutBufferSize)
logChunker := NewLogChunker(taskID, ce.listener, ce.timeService)
lineChannel := make(chan string)
for {
lineBytes, isPrefix, readErr := reader.ReadLine()
if readErr == io.EOF {
break
}
if readErr != nil {
logger.Error().Err(err).Msg("error reading stdout/err")
return err
}
line := string(lineBytes)
if isPrefix {
logger.Warn().
Str("line", fmt.Sprintf("%s...", line[:256])).
Int("lineLength", len(line)).
Msg("unexpectedly long line read, truncating")
// Process the output of Blender.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for line := range lineChannel {
ce.processLineBlender(ctx, logger, taskID, line)
}
}()
logger.Debug().Msg(line)
ce.processLineBlender(ctx, logger, taskID, line)
// Run the subprocess.
subprocessErr := ce.cli.RunWithTextOutput(ctx,
logger,
execCmd,
logChunker,
lineChannel,
)
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", blenderPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err)
}
}
if err := logChunker.Flush(ctx); err != nil {
return fmt.Errorf("flushing log chunker: %w", err)
}
// Wait for the processing to stop.
close(lineChannel)
wg.Wait()
if err := execCmd.Wait(); err != nil {
logger.Error().Err(err).Msg("error in CLI execution")
return err
}
if execCmd.ProcessState.Success() {
logger.Info().Msg("command exited succesfully")
} else {
logger.Error().
if subprocessErr != nil {
logger.Error().Err(subprocessErr).
Int("exitCode", execCmd.ProcessState.ExitCode()).
Msg("command exited abnormally")
return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode())
return subprocessErr
}
logger.Info().Msg("command exited succesfully")
return nil
}

@ -12,6 +12,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/worker/cli_runner"
"git.blender.org/flamenco/pkg/api"
)
@ -50,6 +51,13 @@ type TimeService interface {
// CommandLineRunner is an interface around exec.CommandContext().
type CommandLineRunner interface {
CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd
RunWithTextOutput(
ctx context.Context,
logger zerolog.Logger,
execCmd *exec.Cmd,
logChunker cli_runner.LogChunker,
lineChannel chan<- string,
) error
}
// ErrNoExecCmd means CommandLineRunner.CommandContext() returned nil.

@ -5,11 +5,9 @@ package worker
/* This file contains the commands in the "ffmpeg" type group. */
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
@ -47,63 +45,17 @@ func (ce *CommandExecutor) cmdFramesToVideo(ctx context.Context, logger zerolog.
}
defer cleanup()
outPipe, err := execCmd.StdoutPipe()
if err != nil {
return err
}
execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout.
if err := execCmd.Start(); err != nil {
logger.Error().Err(err).Msg("error starting CLI execution")
return err
}
ffmpegPID := execCmd.Process.Pid
logger = logger.With().Int("pid", ffmpegPID).Logger()
reader := bufio.NewReaderSize(outPipe, StdoutBufferSize)
logChunker := NewLogChunker(taskID, ce.listener, ce.timeService)
subprocessErr := ce.cli.RunWithTextOutput(ctx, logger, execCmd, logChunker, nil)
for {
lineBytes, isPrefix, readErr := reader.ReadLine()
if readErr == io.EOF {
break
}
if readErr != nil {
logger.Error().Err(err).Msg("error reading stdout/err")
return err
}
line := string(lineBytes)
if isPrefix {
logger.Warn().
Str("line", fmt.Sprintf("%s...", line[:256])).
Int("lineLength", len(line)).
Msg("unexpectedly long line read, truncating")
}
logger.Debug().Msg(line)
if err := logChunker.Append(ctx, fmt.Sprintf("pid=%d > %s", ffmpegPID, line)); err != nil {
return fmt.Errorf("appending log entry to log chunker: %w", err)
}
}
if err := logChunker.Flush(ctx); err != nil {
return fmt.Errorf("flushing log chunker: %w", err)
}
if err := execCmd.Wait(); err != nil {
logger.Error().Err(err).Msg("error in CLI execution")
return err
}
if execCmd.ProcessState.Success() {
logger.Info().Msg("command exited succesfully")
} else {
logger.Error().
if subprocessErr != nil {
logger.Error().Err(subprocessErr).
Int("exitCode", execCmd.ProcessState.ExitCode()).
Msg("command exited abnormally")
return fmt.Errorf("command exited abnormally with code %d", execCmd.ProcessState.ExitCode())
return subprocessErr
}
logger.Info().Msg("command exited succesfully")
return nil
}

@ -9,7 +9,9 @@ import (
exec "os/exec"
reflect "reflect"
cli_runner "git.blender.org/flamenco/internal/worker/cli_runner"
gomock "github.com/golang/mock/gomock"
zerolog "github.com/rs/zerolog"
)
// MockCommandLineRunner is a mock of CommandLineRunner interface.
@ -53,3 +55,17 @@ func (mr *MockCommandLineRunnerMockRecorder) CommandContext(arg0, arg1 interface
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommandContext", reflect.TypeOf((*MockCommandLineRunner)(nil).CommandContext), varargs...)
}
// RunWithTextOutput mocks base method.
func (m *MockCommandLineRunner) RunWithTextOutput(arg0 context.Context, arg1 zerolog.Logger, arg2 *exec.Cmd, arg3 cli_runner.LogChunker, arg4 chan<- string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RunWithTextOutput", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(error)
return ret0
}
// RunWithTextOutput indicates an expected call of RunWithTextOutput.
func (mr *MockCommandLineRunnerMockRecorder) RunWithTextOutput(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunWithTextOutput", reflect.TypeOf((*MockCommandLineRunner)(nil).RunWithTextOutput), arg0, arg1, arg2, arg3, arg4)
}