Fix T99434: Two-way Variables

Two-way variable implementation in the job submission end-point. Where
Flamenco v2 did the variable replacement in the add-on, this has now
been moved to the Manager itself. The only thing the add-on needs to
pass is its platform, so that the right values can be recognised.

This also implements two-way replacement when tasks are handed out, such
that the `{jobs}` value gets replaced to a value suitable for the
Worker's platform as well.
This commit is contained in:
Sybren A. Stüvel 2022-07-21 16:41:40 +02:00
parent 98555db1f6
commit 11a352968a
15 changed files with 683 additions and 44 deletions

@ -1,6 +1,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Optional, Union
import platform
import bpy
@ -43,6 +44,7 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
priority=50,
settings=settings,
metadata=metadata,
submitter_platform=platform.system().lower(),
)
return job

@ -17,5 +17,6 @@ curl -X 'POST' \
"sleep_repeats": 1,
"message": "Blender is {blender}"
},
"type": "echo-sleep-test"
"type": "echo-sleep-test",
"submitter_platform": "manager"
}'

@ -25,5 +25,6 @@ curl -v -X 'POST' \
"render_output_root": "/tmp/flamenco/",
"video_container_format": "MPEG1"
},
"priority": 50
"priority": 50,
"submitter_platform": "manager"
}'

@ -9,6 +9,7 @@ import (
"net/http"
"os"
"path"
"runtime"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
@ -76,6 +77,16 @@ func (f *Flamenco) SubmitJob(e echo.Context) error {
ctx := e.Request().Context()
submittedJob := api.SubmittedJob(job)
// Replace the special "manager" platform with the Manager's actual platform.
if submittedJob.SubmitterPlatform == "manager" {
submittedJob.SubmitterPlatform = runtime.GOOS
}
// Before compiling the job, replace the two-way variables. This ensures all
// the tasks also use those.
replaceTwoWayVariables(f.config, submittedJob)
authoredJob, err := f.jobCompiler.Compile(ctx, submittedJob)
if err != nil {
logger.Warn().Err(err).Msg("error compiling job")

@ -9,6 +9,7 @@ import (
"os"
"testing"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
@ -21,7 +22,7 @@ func ptr[T any](value T) *T {
return &value
}
func TestSubmitJob(t *testing.T) {
func TestSubmitJobWithoutSettings(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
@ -29,11 +30,18 @@ func TestSubmitJob(t *testing.T) {
worker := testWorker()
submittedJob := api.SubmittedJob{
Name: "поднео посао",
Type: "test",
Priority: 50,
Name: "поднео посао",
Type: "test",
Priority: 50,
SubmitterPlatform: worker.Platform,
}
mf.expectConvertTwoWayVariables(t,
config.VariableAudienceWorkers,
config.VariablePlatform(worker.Platform),
map[string]string{},
)
// Expect the job compiler to be called.
authoredJob := job_compilers.AuthoredJob{
JobID: "afc47568-bd9d-4368-8016-e91d945db36d",
@ -78,9 +86,96 @@ func TestSubmitJob(t *testing.T) {
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.SubmitJob(echoCtx)
assert.NoError(t, err)
}
func TestSubmitJobWithSettings(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
submittedJob := api.SubmittedJob{
Name: "поднео посао",
Type: "test",
Priority: 50,
SubmitterPlatform: worker.Platform,
Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{
"result": "/render/frames/exploding.kittens",
}},
Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{
"project": "/projects/exploding-kittens",
}},
}
mf.expectConvertTwoWayVariables(t,
config.VariableAudienceWorkers,
config.VariablePlatform(worker.Platform),
map[string]string{
"jobbies": "/render/jobs",
"frames": "/render/frames",
"projects": "/projects",
},
)
// Same job submittedJob, but then with two-way variables injected.
variableReplacedSettings := map[string]interface{}{
"result": "{frames}/exploding.kittens",
}
variableReplacedMetadata := map[string]string{
"project": "{projects}/exploding-kittens",
}
variableReplacedJob := submittedJob
variableReplacedJob.Settings = &api.JobSettings{AdditionalProperties: variableReplacedSettings}
variableReplacedJob.Metadata = &api.JobMetadata{AdditionalProperties: variableReplacedMetadata}
// Expect the job compiler to be called.
authoredJob := job_compilers.AuthoredJob{
JobID: "afc47568-bd9d-4368-8016-e91d945db36d",
Name: variableReplacedJob.Name,
JobType: variableReplacedJob.Type,
Priority: variableReplacedJob.Priority,
Status: api.JobStatusUnderConstruction,
Created: mf.clock.Now(),
Settings: variableReplacedJob.Settings.AdditionalProperties,
Metadata: variableReplacedJob.Metadata.AdditionalProperties,
}
mf.jobCompiler.EXPECT().Compile(gomock.Any(), variableReplacedJob).Return(&authoredJob, nil)
// Expect the job to be saved with 'queued' status:
queuedJob := authoredJob
queuedJob.Status = api.JobStatusQueued
mf.persistence.EXPECT().StoreAuthoredJob(gomock.Any(), queuedJob).Return(nil)
// Expect the job to be fetched from the database again:
dbJob := persistence.Job{
UUID: queuedJob.JobID,
Name: queuedJob.Name,
JobType: queuedJob.JobType,
Priority: queuedJob.Priority,
Status: queuedJob.Status,
Settings: variableReplacedSettings,
Metadata: variableReplacedMetadata,
}
mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil)
// Expect the new job to be broadcast.
jobUpdate := api.SocketIOJobUpdate{
Id: dbJob.UUID,
Name: &dbJob.Name,
Priority: dbJob.Priority,
Status: dbJob.Status,
Type: dbJob.JobType,
Updated: dbJob.UpdatedAt,
}
mf.broadcaster.EXPECT().BroadcastNewJob(jobUpdate)
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(submittedJob)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.SubmitJob(echoCtx)
assert.NoError(t, err)
}
func TestGetJobTypeHappy(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

@ -700,6 +700,18 @@ func (m *MockConfigService) EXPECT() *MockConfigServiceMockRecorder {
return m.recorder
}
// ConvertTwoWayVariables mocks base method.
func (m *MockConfigService) ConvertTwoWayVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ConvertTwoWayVariables", arg0, arg1, arg2, arg3)
}
// ConvertTwoWayVariables indicates an expected call of ConvertTwoWayVariables.
func (mr *MockConfigServiceMockRecorder) ConvertTwoWayVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConvertTwoWayVariables", reflect.TypeOf((*MockConfigService)(nil).ConvertTwoWayVariables), arg0, arg1, arg2, arg3)
}
// EffectiveStoragePath mocks base method.
func (m *MockConfigService) EffectiveStoragePath() string {
m.ctrl.T.Helper()
@ -715,17 +727,15 @@ func (mr *MockConfigServiceMockRecorder) EffectiveStoragePath() *gomock.Call {
}
// ExpandVariables mocks base method.
func (m *MockConfigService) ExpandVariables(arg0 string, arg1 config.VariableAudience, arg2 config.VariablePlatform) string {
func (m *MockConfigService) ExpandVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2)
ret0, _ := ret[0].(string)
return ret0
m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2, arg3)
}
// ExpandVariables indicates an expected call of ExpandVariables.
func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2, arg3)
}
// ForceFirstRun mocks base method.

@ -0,0 +1,73 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: VariableReplacer)
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
config "git.blender.org/flamenco/internal/manager/config"
gomock "github.com/golang/mock/gomock"
)
// MockVariableReplacer is a mock of VariableReplacer interface.
type MockVariableReplacer struct {
ctrl *gomock.Controller
recorder *MockVariableReplacerMockRecorder
}
// MockVariableReplacerMockRecorder is the mock recorder for MockVariableReplacer.
type MockVariableReplacerMockRecorder struct {
mock *MockVariableReplacer
}
// NewMockVariableReplacer creates a new mock instance.
func NewMockVariableReplacer(ctrl *gomock.Controller) *MockVariableReplacer {
mock := &MockVariableReplacer{ctrl: ctrl}
mock.recorder = &MockVariableReplacerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockVariableReplacer) EXPECT() *MockVariableReplacerMockRecorder {
return m.recorder
}
// ConvertTwoWayVariables mocks base method.
func (m *MockVariableReplacer) ConvertTwoWayVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ConvertTwoWayVariables", arg0, arg1, arg2, arg3)
}
// ConvertTwoWayVariables indicates an expected call of ConvertTwoWayVariables.
func (mr *MockVariableReplacerMockRecorder) ConvertTwoWayVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConvertTwoWayVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ConvertTwoWayVariables), arg0, arg1, arg2, arg3)
}
// ExpandVariables mocks base method.
func (m *MockVariableReplacer) ExpandVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2, arg3)
}
// ExpandVariables indicates an expected call of ExpandVariables.
func (mr *MockVariableReplacerMockRecorder) ExpandVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ExpandVariables), arg0, arg1, arg2, arg3)
}
// ResolveVariables mocks base method.
func (m *MockVariableReplacer) ResolveVariables(arg0 config.VariableAudience, arg1 config.VariablePlatform) map[string]config.ResolvedVariable {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ResolveVariables", arg0, arg1)
ret0, _ := ret[0].(map[string]config.ResolvedVariable)
return ret0
}
// ResolveVariables indicates an expected call of ResolveVariables.
func (mr *MockVariableReplacerMockRecorder) ResolveVariables(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResolveVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ResolveVariables), arg0, arg1)
}

@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/api_impl/mocks"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
@ -76,6 +77,55 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
}
}
func (mf *mockedFlamenco) expectExpandVariables(
t *testing.T,
expectAudience config.VariableAudience,
expectPlatform config.VariablePlatform,
variables map[string]string,
) *gomock.Call {
// Set up a fake configuration that matches the given variables.
c := config.DefaultConfig(func(c *config.Conf) {
for varName, varValue := range variables {
c.Variables[varName] = config.Variable{
Values: []config.VariableValue{
{Value: varValue, Audience: expectAudience, Platform: expectPlatform},
},
}
}
})
// Defer the mocked call to the fake configuration.
return mf.config.EXPECT().
ExpandVariables(gomock.Any(), gomock.Any(), expectAudience, expectPlatform).
DoAndReturn(c.ExpandVariables)
}
func (mf *mockedFlamenco) expectConvertTwoWayVariables(
t *testing.T,
expectAudience config.VariableAudience,
expectPlatform config.VariablePlatform,
variables map[string]string,
) *gomock.Call {
// Set up a fake configuration that matches the given variables.
c := config.DefaultConfig(func(c *config.Conf) {
for varName, varValue := range variables {
c.Variables[varName] = config.Variable{
IsTwoWay: true,
Values: []config.VariableValue{
{Value: varValue, Audience: expectAudience, Platform: expectPlatform},
},
}
}
})
// Defer the mocked call to the fake configuration.
return mf.config.EXPECT().
ConvertTwoWayVariables(gomock.Any(), gomock.Any(), expectAudience, expectPlatform).
DoAndReturn(c.ConvertTwoWayVariables)
}
// prepareMockedJSONRequest returns an `echo.Context` that has a JSON request body attached to it.
func (mf *mockedFlamenco) prepareMockedJSONRequest(requestBody interface{}) echo.Context {
bodyBytes, err := json.MarshalIndent(requestBody, "", " ")

@ -3,31 +3,45 @@ package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"sync"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
//go:generate go run github.com/golang/mock/mockgen -destination mocks/varrepl.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl VariableReplacer
type VariableReplacer interface {
ExpandVariables(valueToExpand string, audience config.VariableAudience, platform config.VariablePlatform) string
ExpandVariables(inputChannel <-chan string, outputChannel chan<- string, audience config.VariableAudience, platform config.VariablePlatform)
ResolveVariables(audience config.VariableAudience, platform config.VariablePlatform) map[string]config.ResolvedVariable
ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string, audience config.VariableAudience, platform config.VariablePlatform)
}
// replaceTaskVariables performs variable replacement for worker tasks.
func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, worker persistence.Worker) api.AssignedTask {
repl := func(value string) string {
return replacer.ExpandVariables(value, config.VariableAudienceWorkers, config.VariablePlatform(worker.Platform))
}
feeder := make(chan string, 1)
receiver := make(chan string, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
replacer.ExpandVariables(feeder, receiver,
config.VariableAudienceWorkers, config.VariablePlatform(worker.Platform))
}()
for cmdIndex, cmd := range task.Commands {
for key, value := range cmd.Parameters {
switch v := value.(type) {
case string:
task.Commands[cmdIndex].Parameters[key] = repl(v)
feeder <- v
task.Commands[cmdIndex].Parameters[key] = <-receiver
case []string:
replaced := make([]string, len(v))
for idx := range v {
replaced[idx] = repl(v[idx])
feeder <- v[idx]
replaced[idx] = <-receiver
}
task.Commands[cmdIndex].Parameters[key] = replaced
@ -36,7 +50,8 @@ func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, work
for idx := range v {
switch itemValue := v[idx].(type) {
case string:
replaced[idx] = repl(itemValue)
feeder <- itemValue
replaced[idx] = <-receiver
default:
replaced[idx] = itemValue
}
@ -49,5 +64,50 @@ func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, work
}
}
close(feeder)
wg.Wait()
close(receiver)
return task
}
// replaceTwoWayVariables replaces values with their variables.
// For example, when there is a variable `render = /render/flamenco`, an output
// path `/render/flamenco/output.png` will be replaced with
// `{render}/output.png`
//
// NOTE: this updates the job in place.
func replaceTwoWayVariables(replacer VariableReplacer, job api.SubmittedJob) {
feeder := make(chan string, 1)
receiver := make(chan string, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
replacer.ConvertTwoWayVariables(feeder, receiver,
config.VariableAudienceWorkers, config.VariablePlatform(job.SubmitterPlatform))
}()
// Only replace variables in settings and metadata, not in other job fields.
if job.Settings != nil {
for settingKey, settingValue := range job.Settings.AdditionalProperties {
stringValue, ok := settingValue.(string)
if !ok {
continue
}
feeder <- stringValue
job.Settings.AdditionalProperties[settingKey] = <-receiver
}
}
if job.Metadata != nil {
for metaKey, metaValue := range job.Metadata.AdditionalProperties {
feeder <- metaValue
job.Metadata.AdditionalProperties[metaKey] = <-receiver
}
}
close(feeder)
wg.Wait()
close(receiver)
}

@ -154,6 +154,122 @@ func TestReplaceJobsVariable(t *testing.T) {
}
}
func TestReplaceTwoWayVariables(t *testing.T) {
c := config.DefaultConfig(func(c *config.Conf) {
// Mock that the Manager is running Linux.
c.MockCurrentGOOSForTests("linux")
// Register one variable in the same way that the implicit 'jobs' variable is registered.
c.Variables["locally-set-path"] = config.Variable{
Values: []config.VariableValue{
{Value: "/render/frames", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll},
},
}
c.Variables["unused"] = config.Variable{
Values: []config.VariableValue{
{Value: "Ignore it, it'll be faaaain!", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll},
},
}
// These two-way variables should be used to translate the path as well.
c.Variables["project"] = config.Variable{
IsTwoWay: true,
Values: []config.VariableValue{
{Value: "/projects/sprite-fright", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll},
},
}
c.Variables["render"] = config.Variable{
IsTwoWay: true,
Values: []config.VariableValue{
{Value: "/render", Platform: config.VariablePlatformLinux, Audience: config.VariableAudienceWorkers},
{Value: "/Volumes/render", Platform: config.VariablePlatformDarwin, Audience: config.VariableAudienceWorkers},
{Value: "R:", Platform: config.VariablePlatformWindows, Audience: config.VariableAudienceWorkers},
},
}
})
// Test job without settings or metadata.
{
original := varReplSubmittedJob()
original.Settings = nil
original.Metadata = nil
replaced := varReplSubmittedJob()
replaced.Settings = nil
replaced.Metadata = nil
replaceTwoWayVariables(&c, replaced)
assert.Equal(t, original.Type, replaced.Type, "two-way variable replacement shouldn't happen on the Type property")
assert.Equal(t, original.Name, replaced.Name, "two-way variable replacement shouldn't happen on the Name property")
assert.Equal(t, original.Priority, replaced.Priority, "two-way variable replacement shouldn't happen on the Priority property")
assert.Equal(t, original.SubmitterPlatform, replaced.SubmitterPlatform)
assert.Nil(t, replaced.Settings)
assert.Nil(t, replaced.Metadata)
}
// Test with settings & metadata.
{
original := varReplSubmittedJob()
replaced := jsonWash(varReplSubmittedJob())
replaceTwoWayVariables(&c, replaced)
expectSettings := map[string]interface{}{
"blender_cmd": "{blender}",
"filepath": "{render}/jobs/sf/scene123.blend",
"render_output_root": "{render}/frames/sf/scene123",
"render_output_path": "{render}/frames/sf/scene123/Substituição variável bidirecional/######",
"different_prefix_path": "/backup/render/frames/sf/scene123", // two-way variables should only apply to prefixes.
"frames": "1-10",
"chunk_size": float64(3), // Changed type due to the JSON-washing.
"fps": float64(24), // Changed type due to the JSON-washing.
"extract_audio": true,
"images_or_video": "images",
"format": "PNG",
"output_file_extension": ".png",
}
expectMetadata := map[string]string{
"user.name": "Sybren Stüvel",
"project": "Sprite Fright",
"root": "{project}",
"scene": "{project}/scenes/123",
}
assert.Equal(t, original.Type, replaced.Type, "two-way variable replacement shouldn't happen on the Type property")
assert.Equal(t, original.Name, replaced.Name, "two-way variable replacement shouldn't happen on the Name property")
assert.Equal(t, original.Priority, replaced.Priority, "two-way variable replacement shouldn't happen on the Priority property")
assert.Equal(t, original.SubmitterPlatform, replaced.SubmitterPlatform)
assert.Equal(t, expectSettings, replaced.Settings.AdditionalProperties)
assert.Equal(t, expectMetadata, replaced.Metadata.AdditionalProperties)
}
}
func varReplSubmittedJob() api.SubmittedJob {
return api.SubmittedJob{
Type: "simple-blender-render",
Name: "Ignore it, it'll be faaaain!",
Priority: 50,
SubmitterPlatform: "linux",
Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{
"blender_cmd": "{blender}",
"filepath": "/render/jobs/sf/scene123.blend",
"render_output_root": "/render/frames/sf/scene123",
"render_output_path": "/render/frames/sf/scene123/Substituição variável bidirecional/######",
"different_prefix_path": "/backup/render/frames/sf/scene123",
"frames": "1-10",
"chunk_size": 3,
"fps": 24,
"extract_audio": true,
"images_or_video": "images",
"format": "PNG",
"output_file_extension": ".png",
}},
Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{
"user.name": "Sybren Stüvel",
"project": "Sprite Fright",
"root": "/projects/sprite-fright",
"scene": "/projects/sprite-fright/scenes/123",
}},
}
}
// jsonWash converts the given value to JSON and back.
// This makes sure the types are as closed to what the API will handle as
// possible, making the difference between "array of strings" and "array of

@ -13,6 +13,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
@ -35,6 +36,11 @@ func TestTaskScheduleHappy(t *testing.T) {
task := persistence.Task{
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
Job: &job,
Commands: []persistence.Command{
{Name: "test", Parameters: map[string]interface{}{
"param": "prefix-{variable}-suffix",
}},
},
}
ctx := echo.Request().Context()
@ -42,6 +48,11 @@ func TestTaskScheduleHappy(t *testing.T) {
mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil)
mf.persistence.EXPECT().TaskTouchedByWorker(bgCtx, &task)
mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
mf.expectExpandVariables(t,
config.VariableAudienceWorkers,
config.VariablePlatform(worker.Platform),
map[string]string{"variable": "value"},
)
mf.logStorage.EXPECT().WriteTimestamped(bgCtx, job.UUID, task.UUID,
"Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
@ -53,9 +64,13 @@ func TestTaskScheduleHappy(t *testing.T) {
// Check the response
assignedTask := api.AssignedTask{
Uuid: task.UUID,
Job: job.UUID,
Commands: []api.Command{},
Uuid: task.UUID,
Job: job.UUID,
Commands: []api.Command{
{Name: "test", Parameters: map[string]interface{}{
"param": "prefix-value-suffix",
}},
},
}
assertResponseJSON(t, echo, http.StatusOK, assignedTask)
resp := getRecordedResponse(echo)

@ -3,6 +3,8 @@ package config
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"io"
@ -114,6 +116,10 @@ type ShamanGarbageCollect struct {
type Conf struct {
Base `yaml:",inline"`
// Store GOOS in a variable so it can be modified by unit tests, making the
// test independent of the actual platform.
currentGOOS VariablePlatform `yaml:"-"`
// Variable name → Variable definition
Variables map[string]Variable `yaml:"variables"`
@ -163,10 +169,13 @@ func getConf() (Conf, error) {
// DefaultConfig returns a copy of the default configuration.
func DefaultConfig(override ...func(c *Conf)) Conf {
c := defaultConfig
c, err := defaultConfig.copy()
if err != nil {
panic(fmt.Sprintf("unable to create copy of default config: %v", err))
}
c.Meta.Version = latestConfigVersion
c.processAfterLoading(override...)
return c
return *c
}
// loadConf parses the given file and returns its contents as a Conf object.
@ -224,6 +233,13 @@ func (c *Conf) processAfterLoading(override ...func(c *Conf)) {
c.checkVariables()
}
// MockCurrentGOOSForTests can be used in unit tests to make the variable
// replacement system think it's running a different operating system. This
// should only be used to make the tests independent of the actual OS.
func (c *Conf) MockCurrentGOOSForTests(mockedGOOS string) {
c.currentGOOS = VariablePlatform(mockedGOOS)
}
func (c *Conf) processStorage() {
// The shared storage path should be absolute, but only if it's actually configured.
if c.SharedStoragePath != "" {
@ -267,6 +283,8 @@ func (c *Conf) EffectiveStoragePath() string {
func (c *Conf) addImplicitVariables() {
c.implicitVariables = make(map[string]Variable)
// The 'jobs' variable MUST be one-way only. There is no way that the Manager
// can know how this path can be reached on other platforms.
c.implicitVariables["jobs"] = Variable{
IsTwoWay: false,
Values: []VariableValue{
@ -387,29 +405,91 @@ func updateMap[K comparable, V any](target map[K]V, updateWith map[K]V) {
}
}
// ExpandVariables converts "{variable name}" to the value that belongs to the given audience and platform.
func (c *Conf) ExpandVariables(valueToExpand string, audience VariableAudience, platform VariablePlatform) string {
// ExpandVariables converts "{variable name}" to the value that belongs to the
// given audience and platform. The function iterates over all strings provided
// by the input channel, and sends the expanded result into the output channel.
// It will return when the input channel is closed.
func (c *Conf) ExpandVariables(inputChannel <-chan string, outputChannel chan<- string,
audience VariableAudience, platform VariablePlatform) {
// Get the variables for the given audience & platform.
varsForPlatform := c.getVariables(audience, platform)
if len(varsForPlatform) == 0 {
log.Warn().
Str("valueToExpand", valueToExpand).
Str("audience", string(audience)).
Str("platform", string(platform)).
Msg("no variables defined for this platform given this audience")
}
// Get the two-way variables for the Manager platform.
twoWayVars := make(map[string]string)
if platform != c.currentGOOS {
twoWayVars = c.GetTwoWayVariables(audience, c.currentGOOS)
}
doValueReplacement := func(valueToExpand string) string {
// Expand variables from {varname} to their value for the target platform.
for varname, varvalue := range varsForPlatform {
placeholder := fmt.Sprintf("{%s}", varname)
valueToExpand = strings.Replace(valueToExpand, placeholder, varvalue, -1)
}
// Go through the two-way variables, to make sure that the result of
// expanding variables gets the two-way variables applied as well. This is
// necessary to make implicitly-defined variable, which are only defined for
// the Manager's platform, usable for the target platform.
//
// Practically, this replaces "value for the Manager platform" with "value
// for the target platform".
for varname, managerValue := range twoWayVars {
targetValue, ok := varsForPlatform[varname]
if !ok {
continue
}
if !strings.HasPrefix(valueToExpand, managerValue) {
continue
}
valueToExpand = targetValue + valueToExpand[len(managerValue):]
}
return valueToExpand
}
// Variable replacement
for varname, varvalue := range varsForPlatform {
placeholder := fmt.Sprintf("{%s}", varname)
valueToExpand = strings.Replace(valueToExpand, placeholder, varvalue, -1)
for valueToExpand := range inputChannel {
outputChannel <- doValueReplacement(valueToExpand)
}
}
// ConvertTwoWayVariables converts the value of a variable with "{variable
// name}", but only for two-way variables. The function iterates over all
// strings provided by the input channel, and sends the expanded result into the
// output channel. It will return when the input channel is closed.
func (c *Conf) ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string,
audience VariableAudience, platform VariablePlatform) {
// Get the variables for the given audience & platform.
twoWayVars := c.GetTwoWayVariables(audience, platform)
if len(twoWayVars) == 0 {
log.Debug().
Str("audience", string(audience)).
Str("platform", string(platform)).
Msg("no two-way variables defined for this platform given this audience")
}
// TODO: this needs to go through multiple variable replacements, to make sure
// that, for example, the `{jobs}` variable gets the two-way variables applied
// as well.
doValueReplacement := func(valueToConvert string) string {
for varName, varValue := range twoWayVars {
if !strings.HasPrefix(valueToConvert, varValue) {
continue
}
valueToConvert = fmt.Sprintf("{%s}%s", varName, valueToConvert[len(varValue):])
}
return valueToExpand
return valueToConvert
}
for valueToExpand := range inputChannel {
outputChannel <- doValueReplacement(valueToExpand)
}
}
// getVariables returns the variable values for this (audience, platform) combination.
@ -428,6 +508,24 @@ func (c *Conf) getVariables(audience VariableAudience, platform VariablePlatform
return varsForPlatform
}
// GetTwoWayVariables returns the two-way variable values for this (audience,
// platform) combination. If no variables are found, just returns an empty map.
// If a value is defined for both the "all" platform and specifically the given
// platform, the specific platform definition wins.
func (c *Conf) GetTwoWayVariables(audience VariableAudience, platform VariablePlatform) map[string]string {
varsForPlatform := c.getVariables(audience, platform)
// Only keep the two-way variables.
twoWayVars := map[string]string{}
for varname, value := range varsForPlatform {
isTwoWay := c.implicitVariables[varname].IsTwoWay || c.Variables[varname].IsTwoWay
if isTwoWay {
twoWayVars[varname] = value
}
}
return twoWayVars
}
// ResolveVariables returns the variables for this (audience, platform) combination.
// If no variables are found, just returns an empty map. If a value is defined
// for both the "all" platform and specifically the given platform, the specific
@ -551,6 +649,23 @@ func (c *Conf) Write(filename string) error {
return nil
}
// copy creates a deep copy of this configuration.
func (c *Conf) copy() (*Conf, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
dec := gob.NewDecoder(&buf)
err := enc.Encode(c)
if err != nil {
return nil, err
}
var copy Conf
err = dec.Decode(&copy)
if err != nil {
return nil, err
}
return &copy, nil
}
// GetTestConfig returns the configuration for unit tests.
// The config is loaded from `test-flamenco-manager.yaml` in the directory
// containing the caller's source.

@ -92,4 +92,7 @@ var defaultConfig = Conf{
// },
// },
},
// This should not be set to anything else, except in unit tests.
currentGOOS: VariablePlatform(runtime.GOOS),
}

@ -71,9 +71,11 @@ func (s *Service) Save() error {
}
// Expose some functions on Conf here, for easier mocking of functionality via interfaces.
//
func (s *Service) ExpandVariables(valueToExpand string, audience VariableAudience, platform VariablePlatform) string {
return s.config.ExpandVariables(valueToExpand, audience, platform)
func (s *Service) ExpandVariables(inputChannel <-chan string, outputChannel chan<- string, audience VariableAudience, platform VariablePlatform) {
s.config.ExpandVariables(inputChannel, outputChannel, audience, platform)
}
func (s *Service) ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string, audience VariableAudience, platform VariablePlatform) {
s.config.ConvertTwoWayVariables(inputChannel, outputChannel, audience, platform)
}
func (s *Service) ResolveVariables(audience VariableAudience, platform VariablePlatform) map[string]ResolvedVariable {
return s.config.ResolveVariables(audience, platform)

@ -3,6 +3,7 @@ package config
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"sync"
"testing"
"git.blender.org/flamenco/pkg/crosspath"
@ -41,9 +42,6 @@ func TestVariableValidation(t *testing.T) {
assert.Equal(t, c.Variables["blender"].Values[1].Value, "/valid/path/blender")
}
// TODO: Test two-way variables. Even though they're not currently in the
// default configuration, they should work.
func TestStorageImplicitVariablesWithShaman(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
// Having the Shaman enabled should create an implicit variable "{jobs}" at the Shaman checkout path.
@ -51,7 +49,7 @@ func TestStorageImplicitVariablesWithShaman(t *testing.T) {
c.Shaman.Enabled = true
c.Variables["jobs"] = Variable{
IsTwoWay: true,
IsTwoWay: false,
Values: []VariableValue{
{
Audience: VariableAudienceAll,
@ -79,7 +77,7 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) {
c.Shaman.Enabled = false
c.Variables["jobs"] = Variable{
IsTwoWay: true,
IsTwoWay: false,
Values: []VariableValue{
{
Audience: VariableAudienceAll,
@ -99,3 +97,90 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) {
crosspath.ToSlash(c.SharedStoragePath),
c.implicitVariables["jobs"].Values[0].Value)
}
func TestExpandVariables(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
c.Variables["demo"] = Variable{
Values: []VariableValue{
{Value: "demo-value", Audience: VariableAudienceAll, Platform: VariablePlatformDarwin},
},
}
c.Variables["ffmpeg"] = Variable{
Values: []VariableValue{
{Value: "/path/to/ffmpeg", Audience: VariableAudienceUsers, Platform: VariablePlatformLinux},
{Value: "/path/to/ffmpeg/on/darwin", Audience: VariableAudienceUsers, Platform: VariablePlatformDarwin},
{Value: "C:/flamenco/ffmpeg", Audience: VariableAudienceUsers, Platform: VariablePlatformWindows},
},
}
})
feeder := make(chan string, 1)
receiver := make(chan string, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.ExpandVariables(feeder, receiver, VariableAudienceUsers, VariablePlatformWindows)
}()
feeder <- "unchanged value"
assert.Equal(t, "unchanged value", <-receiver)
feeder <- "{ffmpeg}"
assert.Equal(t, "C:/flamenco/ffmpeg", <-receiver)
feeder <- "{demo}"
assert.Equal(t, "{demo}", <-receiver, "missing value on the platform should not be replaced")
close(feeder)
wg.Wait()
close(receiver)
}
func TestExpandVariablesWithTwoWay(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
// Mock that the Manager is running on Linux right now.
c.currentGOOS = VariablePlatformLinux
// Register one variable in the same way that the implicit 'jobs' variable is registered.
c.Variables["locally-set-path"] = Variable{
Values: []VariableValue{
{Value: "/path/on/linux", Platform: VariablePlatformAll, Audience: VariableAudienceAll},
},
}
// This two-way variable should be used to translate the path as well.
c.Variables["platform-specifics"] = Variable{
IsTwoWay: true,
Values: []VariableValue{
{Value: "/path/on/linux", Platform: VariablePlatformLinux, Audience: VariableAudienceWorkers},
{Value: "/path/on/darwin", Platform: VariablePlatformDarwin, Audience: VariableAudienceWorkers},
{Value: "C:/path/on/windows", Platform: VariablePlatformWindows, Audience: VariableAudienceWorkers},
},
}
})
feeder := make(chan string, 1)
receiver := make(chan string, 1)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// Always target a different-than-current target platform.
c.ExpandVariables(feeder, receiver, VariableAudienceWorkers, VariablePlatformWindows)
}()
// Simple two-way variable replacement.
feeder <- "/path/on/linux/file.txt"
assert.Equal(t, "C:/path/on/windows/file.txt", <-receiver)
// {locally-set-path} expands to a value that's then further replaced by a two-way variable.
feeder <- "{locally-set-path}/should/be/remapped"
assert.Equal(t, "C:/path/on/windows/should/be/remapped", <-receiver)
close(feeder)
wg.Wait()
close(receiver)
}