From ad4313b93fef0ccd2477e3944281e5205ae7e897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 11 Feb 2022 17:09:58 +0100 Subject: [PATCH] Untested command runner with 'sleep' and 'echo' commands --- internal/worker/command_executor.go | 63 +++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/internal/worker/command_executor.go b/internal/worker/command_executor.go index 6f7b2d96..5de9dca0 100644 --- a/internal/worker/command_executor.go +++ b/internal/worker/command_executor.go @@ -23,7 +23,10 @@ package worker import ( "context" "errors" + "fmt" + "time" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) @@ -37,19 +40,73 @@ type CommandListener interface { type CommandExecutor struct { listener CommandListener + // registry maps a command name to a function that runs that command. + registry map[string]commandCallable } var _ CommandRunner = (*CommandExecutor)(nil) +type commandCallable func(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error + func NewCommandExecutor(listener CommandListener) *CommandExecutor { - return &CommandExecutor{ + ce := &CommandExecutor{ listener: listener, } + ce.registry = map[string]commandCallable{ + "echo": ce.cmdEcho, + "sleep": ce.cmdSleep, + } + return ce } -func (te *CommandExecutor) Run(ctx context.Context, taskID TaskID, cmd api.Command) error { +func (ce *CommandExecutor) Run(ctx context.Context, taskID TaskID, cmd api.Command) error { logger := log.With().Str("task", string(taskID)).Str("command", cmd.Name).Logger() logger.Info().Interface("settings", cmd.Settings).Msg("running command") - return errors.New("command running not implemented") + runner, ok := ce.registry[cmd.Name] + if !ok { + return fmt.Errorf("unknown command: %q", cmd.Name) + } + + return runner(ctx, logger, taskID, cmd) +} + +func (ce *CommandExecutor) cmdEcho(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error { + message, ok := cmd.Settings["message"] + if !ok { + return fmt.Errorf("missing 'message' setting") + } + messageStr := fmt.Sprintf("%v", message) + + logger.Info().Str("message", messageStr).Msg("echo") + logLines := []string{ + fmt.Sprintf("echo: %q", messageStr), + } + + if err := ce.listener.LogProduced(taskID, logLines); err != nil { + return err + } + return nil +} + +func (ce *CommandExecutor) cmdSleep(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error { + + sleepTime, ok := cmd.Settings["time_in_seconds"] + if !ok { + return errors.New("missing setting 'time_in_seconds'") + } + + var duration time.Duration + switch v := sleepTime.(type) { + case int: + duration = time.Duration(v) * time.Second + default: + log.Warn().Interface("time_in_seconds", v).Msg("bad type for setting 'time_in_seconds', expected int") + return fmt.Errorf("bad type for setting 'time_in_seconds', expected int, not %v", v) + } + + log.Info().Str("duration", duration.String()).Msg("sleep") + time.Sleep(duration) + + return nil }