From 11a352968ad1909b6c1c7d8645e29f0a52d3002e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 21 Jul 2022 16:41:40 +0200 Subject: [PATCH] Fix T99434: Two-way Variables Two-way variable implementation in the job submission end-point. Where Flamenco v2 did the variable replacement in the add-on, this has now been moved to the Manager itself. The only thing the add-on needs to pass is its platform, so that the right values can be recognised. This also implements two-way replacement when tasks are handed out, such that the `{jobs}` value gets replaced to a value suitable for the Worker's platform as well. --- addon/flamenco/job_submission.py | 2 + debug-job-echo.sh | 3 +- debug-job-render.sh | 3 +- internal/manager/api_impl/jobs.go | 11 ++ internal/manager/api_impl/jobs_test.go | 105 ++++++++++++- .../api_impl/mocks/api_impl_mock.gen.go | 22 ++- .../manager/api_impl/mocks/varrepl.gen.go | 73 +++++++++ internal/manager/api_impl/support_test.go | 50 +++++++ internal/manager/api_impl/varrepl.go | 74 ++++++++- internal/manager/api_impl/varrepl_test.go | 116 ++++++++++++++ internal/manager/api_impl/workers_test.go | 21 ++- internal/manager/config/config.go | 141 ++++++++++++++++-- internal/manager/config/defaults.go | 3 + internal/manager/config/service.go | 8 +- internal/manager/config/settings_test.go | 95 +++++++++++- 15 files changed, 683 insertions(+), 44 deletions(-) create mode 100644 internal/manager/api_impl/mocks/varrepl.gen.go diff --git a/addon/flamenco/job_submission.py b/addon/flamenco/job_submission.py index 24a2da58..fbd67348 100644 --- a/addon/flamenco/job_submission.py +++ b/addon/flamenco/job_submission.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING, Optional, Union +import platform import bpy @@ -43,6 +44,7 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]: priority=50, settings=settings, metadata=metadata, + submitter_platform=platform.system().lower(), ) return job diff --git a/debug-job-echo.sh b/debug-job-echo.sh index b7e03ff7..55477dba 100755 --- a/debug-job-echo.sh +++ b/debug-job-echo.sh @@ -17,5 +17,6 @@ curl -X 'POST' \ "sleep_repeats": 1, "message": "Blender is {blender}" }, - "type": "echo-sleep-test" + "type": "echo-sleep-test", + "submitter_platform": "manager" }' diff --git a/debug-job-render.sh b/debug-job-render.sh index bdf0ec9a..04ccb6e2 100755 --- a/debug-job-render.sh +++ b/debug-job-render.sh @@ -25,5 +25,6 @@ curl -v -X 'POST' \ "render_output_root": "/tmp/flamenco/", "video_container_format": "MPEG1" }, - "priority": 50 + "priority": 50, + "submitter_platform": "manager" }' diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 648b6116..df1c6837 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path" + "runtime" "github.com/labstack/echo/v4" "github.com/rs/zerolog" @@ -76,6 +77,16 @@ func (f *Flamenco) SubmitJob(e echo.Context) error { ctx := e.Request().Context() submittedJob := api.SubmittedJob(job) + + // Replace the special "manager" platform with the Manager's actual platform. + if submittedJob.SubmitterPlatform == "manager" { + submittedJob.SubmitterPlatform = runtime.GOOS + } + + // Before compiling the job, replace the two-way variables. This ensures all + // the tasks also use those. + replaceTwoWayVariables(f.config, submittedJob) + authoredJob, err := f.jobCompiler.Compile(ctx, submittedJob) if err != nil { logger.Warn().Err(err).Msg("error compiling job") diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 567c88c6..a6bbe11a 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -9,6 +9,7 @@ import ( "os" "testing" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" @@ -21,7 +22,7 @@ func ptr[T any](value T) *T { return &value } -func TestSubmitJob(t *testing.T) { +func TestSubmitJobWithoutSettings(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -29,11 +30,18 @@ func TestSubmitJob(t *testing.T) { worker := testWorker() submittedJob := api.SubmittedJob{ - Name: "поднео посао", - Type: "test", - Priority: 50, + Name: "поднео посао", + Type: "test", + Priority: 50, + SubmitterPlatform: worker.Platform, } + mf.expectConvertTwoWayVariables(t, + config.VariableAudienceWorkers, + config.VariablePlatform(worker.Platform), + map[string]string{}, + ) + // Expect the job compiler to be called. authoredJob := job_compilers.AuthoredJob{ JobID: "afc47568-bd9d-4368-8016-e91d945db36d", @@ -78,9 +86,96 @@ func TestSubmitJob(t *testing.T) { requestWorkerStore(echoCtx, &worker) err := mf.flamenco.SubmitJob(echoCtx) assert.NoError(t, err) - } +func TestSubmitJobWithSettings(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + submittedJob := api.SubmittedJob{ + Name: "поднео посао", + Type: "test", + Priority: 50, + SubmitterPlatform: worker.Platform, + Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{ + "result": "/render/frames/exploding.kittens", + }}, + Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{ + "project": "/projects/exploding-kittens", + }}, + } + + mf.expectConvertTwoWayVariables(t, + config.VariableAudienceWorkers, + config.VariablePlatform(worker.Platform), + map[string]string{ + "jobbies": "/render/jobs", + "frames": "/render/frames", + "projects": "/projects", + }, + ) + + // Same job submittedJob, but then with two-way variables injected. + variableReplacedSettings := map[string]interface{}{ + "result": "{frames}/exploding.kittens", + } + variableReplacedMetadata := map[string]string{ + "project": "{projects}/exploding-kittens", + } + variableReplacedJob := submittedJob + variableReplacedJob.Settings = &api.JobSettings{AdditionalProperties: variableReplacedSettings} + variableReplacedJob.Metadata = &api.JobMetadata{AdditionalProperties: variableReplacedMetadata} + + // Expect the job compiler to be called. + authoredJob := job_compilers.AuthoredJob{ + JobID: "afc47568-bd9d-4368-8016-e91d945db36d", + Name: variableReplacedJob.Name, + JobType: variableReplacedJob.Type, + Priority: variableReplacedJob.Priority, + Status: api.JobStatusUnderConstruction, + Created: mf.clock.Now(), + Settings: variableReplacedJob.Settings.AdditionalProperties, + Metadata: variableReplacedJob.Metadata.AdditionalProperties, + } + mf.jobCompiler.EXPECT().Compile(gomock.Any(), variableReplacedJob).Return(&authoredJob, nil) + + // Expect the job to be saved with 'queued' status: + queuedJob := authoredJob + queuedJob.Status = api.JobStatusQueued + mf.persistence.EXPECT().StoreAuthoredJob(gomock.Any(), queuedJob).Return(nil) + + // Expect the job to be fetched from the database again: + dbJob := persistence.Job{ + UUID: queuedJob.JobID, + Name: queuedJob.Name, + JobType: queuedJob.JobType, + Priority: queuedJob.Priority, + Status: queuedJob.Status, + Settings: variableReplacedSettings, + Metadata: variableReplacedMetadata, + } + mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil) + + // Expect the new job to be broadcast. + jobUpdate := api.SocketIOJobUpdate{ + Id: dbJob.UUID, + Name: &dbJob.Name, + Priority: dbJob.Priority, + Status: dbJob.Status, + Type: dbJob.JobType, + Updated: dbJob.UpdatedAt, + } + mf.broadcaster.EXPECT().BroadcastNewJob(jobUpdate) + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(submittedJob) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.SubmitJob(echoCtx) + assert.NoError(t, err) +} func TestGetJobTypeHappy(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index ad493e0b..9240895e 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -700,6 +700,18 @@ func (m *MockConfigService) EXPECT() *MockConfigServiceMockRecorder { return m.recorder } +// ConvertTwoWayVariables mocks base method. +func (m *MockConfigService) ConvertTwoWayVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ConvertTwoWayVariables", arg0, arg1, arg2, arg3) +} + +// ConvertTwoWayVariables indicates an expected call of ConvertTwoWayVariables. +func (mr *MockConfigServiceMockRecorder) ConvertTwoWayVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConvertTwoWayVariables", reflect.TypeOf((*MockConfigService)(nil).ConvertTwoWayVariables), arg0, arg1, arg2, arg3) +} + // EffectiveStoragePath mocks base method. func (m *MockConfigService) EffectiveStoragePath() string { m.ctrl.T.Helper() @@ -715,17 +727,15 @@ func (mr *MockConfigServiceMockRecorder) EffectiveStoragePath() *gomock.Call { } // ExpandVariables mocks base method. -func (m *MockConfigService) ExpandVariables(arg0 string, arg1 config.VariableAudience, arg2 config.VariablePlatform) string { +func (m *MockConfigService) ExpandVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2) - ret0, _ := ret[0].(string) - return ret0 + m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2, arg3) } // ExpandVariables indicates an expected call of ExpandVariables. -func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockConfigServiceMockRecorder) ExpandVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockConfigService)(nil).ExpandVariables), arg0, arg1, arg2, arg3) } // ForceFirstRun mocks base method. diff --git a/internal/manager/api_impl/mocks/varrepl.gen.go b/internal/manager/api_impl/mocks/varrepl.gen.go new file mode 100644 index 00000000..6b7da3c9 --- /dev/null +++ b/internal/manager/api_impl/mocks/varrepl.gen.go @@ -0,0 +1,73 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: VariableReplacer) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + config "git.blender.org/flamenco/internal/manager/config" + gomock "github.com/golang/mock/gomock" +) + +// MockVariableReplacer is a mock of VariableReplacer interface. +type MockVariableReplacer struct { + ctrl *gomock.Controller + recorder *MockVariableReplacerMockRecorder +} + +// MockVariableReplacerMockRecorder is the mock recorder for MockVariableReplacer. +type MockVariableReplacerMockRecorder struct { + mock *MockVariableReplacer +} + +// NewMockVariableReplacer creates a new mock instance. +func NewMockVariableReplacer(ctrl *gomock.Controller) *MockVariableReplacer { + mock := &MockVariableReplacer{ctrl: ctrl} + mock.recorder = &MockVariableReplacerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockVariableReplacer) EXPECT() *MockVariableReplacerMockRecorder { + return m.recorder +} + +// ConvertTwoWayVariables mocks base method. +func (m *MockVariableReplacer) ConvertTwoWayVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ConvertTwoWayVariables", arg0, arg1, arg2, arg3) +} + +// ConvertTwoWayVariables indicates an expected call of ConvertTwoWayVariables. +func (mr *MockVariableReplacerMockRecorder) ConvertTwoWayVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConvertTwoWayVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ConvertTwoWayVariables), arg0, arg1, arg2, arg3) +} + +// ExpandVariables mocks base method. +func (m *MockVariableReplacer) ExpandVariables(arg0 <-chan string, arg1 chan<- string, arg2 config.VariableAudience, arg3 config.VariablePlatform) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ExpandVariables", arg0, arg1, arg2, arg3) +} + +// ExpandVariables indicates an expected call of ExpandVariables. +func (mr *MockVariableReplacerMockRecorder) ExpandVariables(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExpandVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ExpandVariables), arg0, arg1, arg2, arg3) +} + +// ResolveVariables mocks base method. +func (m *MockVariableReplacer) ResolveVariables(arg0 config.VariableAudience, arg1 config.VariablePlatform) map[string]config.ResolvedVariable { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ResolveVariables", arg0, arg1) + ret0, _ := ret[0].(map[string]config.ResolvedVariable) + return ret0 +} + +// ResolveVariables indicates an expected call of ResolveVariables. +func (mr *MockVariableReplacerMockRecorder) ResolveVariables(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResolveVariables", reflect.TypeOf((*MockVariableReplacer)(nil).ResolveVariables), arg0, arg1) +} diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index bdd85ecf..23a5d27c 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "git.blender.org/flamenco/internal/manager/api_impl/mocks" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" ) @@ -76,6 +77,55 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { } } +func (mf *mockedFlamenco) expectExpandVariables( + t *testing.T, + expectAudience config.VariableAudience, + expectPlatform config.VariablePlatform, + variables map[string]string, +) *gomock.Call { + + // Set up a fake configuration that matches the given variables. + c := config.DefaultConfig(func(c *config.Conf) { + for varName, varValue := range variables { + c.Variables[varName] = config.Variable{ + Values: []config.VariableValue{ + {Value: varValue, Audience: expectAudience, Platform: expectPlatform}, + }, + } + } + }) + + // Defer the mocked call to the fake configuration. + return mf.config.EXPECT(). + ExpandVariables(gomock.Any(), gomock.Any(), expectAudience, expectPlatform). + DoAndReturn(c.ExpandVariables) +} + +func (mf *mockedFlamenco) expectConvertTwoWayVariables( + t *testing.T, + expectAudience config.VariableAudience, + expectPlatform config.VariablePlatform, + variables map[string]string, +) *gomock.Call { + + // Set up a fake configuration that matches the given variables. + c := config.DefaultConfig(func(c *config.Conf) { + for varName, varValue := range variables { + c.Variables[varName] = config.Variable{ + IsTwoWay: true, + Values: []config.VariableValue{ + {Value: varValue, Audience: expectAudience, Platform: expectPlatform}, + }, + } + } + }) + + // Defer the mocked call to the fake configuration. + return mf.config.EXPECT(). + ConvertTwoWayVariables(gomock.Any(), gomock.Any(), expectAudience, expectPlatform). + DoAndReturn(c.ConvertTwoWayVariables) +} + // prepareMockedJSONRequest returns an `echo.Context` that has a JSON request body attached to it. func (mf *mockedFlamenco) prepareMockedJSONRequest(requestBody interface{}) echo.Context { bodyBytes, err := json.MarshalIndent(requestBody, "", " ") diff --git a/internal/manager/api_impl/varrepl.go b/internal/manager/api_impl/varrepl.go index d36fe929..ed7c680d 100644 --- a/internal/manager/api_impl/varrepl.go +++ b/internal/manager/api_impl/varrepl.go @@ -3,31 +3,45 @@ package api_impl // SPDX-License-Identifier: GPL-3.0-or-later import ( + "sync" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" ) +//go:generate go run github.com/golang/mock/mockgen -destination mocks/varrepl.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl VariableReplacer type VariableReplacer interface { - ExpandVariables(valueToExpand string, audience config.VariableAudience, platform config.VariablePlatform) string + ExpandVariables(inputChannel <-chan string, outputChannel chan<- string, audience config.VariableAudience, platform config.VariablePlatform) ResolveVariables(audience config.VariableAudience, platform config.VariablePlatform) map[string]config.ResolvedVariable + ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string, audience config.VariableAudience, platform config.VariablePlatform) } // replaceTaskVariables performs variable replacement for worker tasks. func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, worker persistence.Worker) api.AssignedTask { - repl := func(value string) string { - return replacer.ExpandVariables(value, config.VariableAudienceWorkers, config.VariablePlatform(worker.Platform)) - } + feeder := make(chan string, 1) + receiver := make(chan string, 1) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + replacer.ExpandVariables(feeder, receiver, + config.VariableAudienceWorkers, config.VariablePlatform(worker.Platform)) + }() for cmdIndex, cmd := range task.Commands { for key, value := range cmd.Parameters { switch v := value.(type) { case string: - task.Commands[cmdIndex].Parameters[key] = repl(v) + feeder <- v + task.Commands[cmdIndex].Parameters[key] = <-receiver + case []string: replaced := make([]string, len(v)) for idx := range v { - replaced[idx] = repl(v[idx]) + feeder <- v[idx] + replaced[idx] = <-receiver } task.Commands[cmdIndex].Parameters[key] = replaced @@ -36,7 +50,8 @@ func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, work for idx := range v { switch itemValue := v[idx].(type) { case string: - replaced[idx] = repl(itemValue) + feeder <- itemValue + replaced[idx] = <-receiver default: replaced[idx] = itemValue } @@ -49,5 +64,50 @@ func replaceTaskVariables(replacer VariableReplacer, task api.AssignedTask, work } } + close(feeder) + wg.Wait() + close(receiver) + return task } + +// replaceTwoWayVariables replaces values with their variables. +// For example, when there is a variable `render = /render/flamenco`, an output +// path `/render/flamenco/output.png` will be replaced with +// `{render}/output.png` +// +// NOTE: this updates the job in place. +func replaceTwoWayVariables(replacer VariableReplacer, job api.SubmittedJob) { + feeder := make(chan string, 1) + receiver := make(chan string, 1) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + replacer.ConvertTwoWayVariables(feeder, receiver, + config.VariableAudienceWorkers, config.VariablePlatform(job.SubmitterPlatform)) + }() + + // Only replace variables in settings and metadata, not in other job fields. + if job.Settings != nil { + for settingKey, settingValue := range job.Settings.AdditionalProperties { + stringValue, ok := settingValue.(string) + if !ok { + continue + } + feeder <- stringValue + job.Settings.AdditionalProperties[settingKey] = <-receiver + } + } + if job.Metadata != nil { + for metaKey, metaValue := range job.Metadata.AdditionalProperties { + feeder <- metaValue + job.Metadata.AdditionalProperties[metaKey] = <-receiver + } + } + + close(feeder) + wg.Wait() + close(receiver) +} diff --git a/internal/manager/api_impl/varrepl_test.go b/internal/manager/api_impl/varrepl_test.go index df8559d1..eabb43f0 100644 --- a/internal/manager/api_impl/varrepl_test.go +++ b/internal/manager/api_impl/varrepl_test.go @@ -154,6 +154,122 @@ func TestReplaceJobsVariable(t *testing.T) { } } +func TestReplaceTwoWayVariables(t *testing.T) { + c := config.DefaultConfig(func(c *config.Conf) { + // Mock that the Manager is running Linux. + c.MockCurrentGOOSForTests("linux") + + // Register one variable in the same way that the implicit 'jobs' variable is registered. + c.Variables["locally-set-path"] = config.Variable{ + Values: []config.VariableValue{ + {Value: "/render/frames", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll}, + }, + } + c.Variables["unused"] = config.Variable{ + Values: []config.VariableValue{ + {Value: "Ignore it, it'll be faaaain!", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll}, + }, + } + // These two-way variables should be used to translate the path as well. + c.Variables["project"] = config.Variable{ + IsTwoWay: true, + Values: []config.VariableValue{ + {Value: "/projects/sprite-fright", Platform: config.VariablePlatformAll, Audience: config.VariableAudienceAll}, + }, + } + c.Variables["render"] = config.Variable{ + IsTwoWay: true, + Values: []config.VariableValue{ + {Value: "/render", Platform: config.VariablePlatformLinux, Audience: config.VariableAudienceWorkers}, + {Value: "/Volumes/render", Platform: config.VariablePlatformDarwin, Audience: config.VariableAudienceWorkers}, + {Value: "R:", Platform: config.VariablePlatformWindows, Audience: config.VariableAudienceWorkers}, + }, + } + }) + + // Test job without settings or metadata. + { + original := varReplSubmittedJob() + original.Settings = nil + original.Metadata = nil + replaced := varReplSubmittedJob() + replaced.Settings = nil + replaced.Metadata = nil + replaceTwoWayVariables(&c, replaced) + + assert.Equal(t, original.Type, replaced.Type, "two-way variable replacement shouldn't happen on the Type property") + assert.Equal(t, original.Name, replaced.Name, "two-way variable replacement shouldn't happen on the Name property") + assert.Equal(t, original.Priority, replaced.Priority, "two-way variable replacement shouldn't happen on the Priority property") + assert.Equal(t, original.SubmitterPlatform, replaced.SubmitterPlatform) + assert.Nil(t, replaced.Settings) + assert.Nil(t, replaced.Metadata) + } + + // Test with settings & metadata. + { + original := varReplSubmittedJob() + replaced := jsonWash(varReplSubmittedJob()) + replaceTwoWayVariables(&c, replaced) + + expectSettings := map[string]interface{}{ + "blender_cmd": "{blender}", + "filepath": "{render}/jobs/sf/scene123.blend", + "render_output_root": "{render}/frames/sf/scene123", + "render_output_path": "{render}/frames/sf/scene123/Substituição variável bidirecional/######", + "different_prefix_path": "/backup/render/frames/sf/scene123", // two-way variables should only apply to prefixes. + "frames": "1-10", + "chunk_size": float64(3), // Changed type due to the JSON-washing. + "fps": float64(24), // Changed type due to the JSON-washing. + "extract_audio": true, + "images_or_video": "images", + "format": "PNG", + "output_file_extension": ".png", + } + expectMetadata := map[string]string{ + "user.name": "Sybren Stüvel", + "project": "Sprite Fright", + "root": "{project}", + "scene": "{project}/scenes/123", + } + + assert.Equal(t, original.Type, replaced.Type, "two-way variable replacement shouldn't happen on the Type property") + assert.Equal(t, original.Name, replaced.Name, "two-way variable replacement shouldn't happen on the Name property") + assert.Equal(t, original.Priority, replaced.Priority, "two-way variable replacement shouldn't happen on the Priority property") + assert.Equal(t, original.SubmitterPlatform, replaced.SubmitterPlatform) + assert.Equal(t, expectSettings, replaced.Settings.AdditionalProperties) + assert.Equal(t, expectMetadata, replaced.Metadata.AdditionalProperties) + } +} + +func varReplSubmittedJob() api.SubmittedJob { + return api.SubmittedJob{ + Type: "simple-blender-render", + Name: "Ignore it, it'll be faaaain!", + Priority: 50, + SubmitterPlatform: "linux", + Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{ + "blender_cmd": "{blender}", + "filepath": "/render/jobs/sf/scene123.blend", + "render_output_root": "/render/frames/sf/scene123", + "render_output_path": "/render/frames/sf/scene123/Substituição variável bidirecional/######", + "different_prefix_path": "/backup/render/frames/sf/scene123", + "frames": "1-10", + "chunk_size": 3, + "fps": 24, + "extract_audio": true, + "images_or_video": "images", + "format": "PNG", + "output_file_extension": ".png", + }}, + Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{ + "user.name": "Sybren Stüvel", + "project": "Sprite Fright", + "root": "/projects/sprite-fright", + "scene": "/projects/sprite-fright/scenes/123", + }}, + } +} + // jsonWash converts the given value to JSON and back. // This makes sure the types are as closed to what the API will handle as // possible, making the difference between "array of strings" and "array of diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 972bdc4a..70903af4 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -13,6 +13,7 @@ import ( "github.com/labstack/echo/v4" "github.com/stretchr/testify/assert" + "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" @@ -35,6 +36,11 @@ func TestTaskScheduleHappy(t *testing.T) { task := persistence.Task{ UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503", Job: &job, + Commands: []persistence.Command{ + {Name: "test", Parameters: map[string]interface{}{ + "param": "prefix-{variable}-suffix", + }}, + }, } ctx := echo.Request().Context() @@ -42,6 +48,11 @@ func TestTaskScheduleHappy(t *testing.T) { mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil) mf.persistence.EXPECT().TaskTouchedByWorker(bgCtx, &task) mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) + mf.expectExpandVariables(t, + config.VariableAudienceWorkers, + config.VariablePlatform(worker.Platform), + map[string]string{"variable": "value"}, + ) mf.logStorage.EXPECT().WriteTimestamped(bgCtx, job.UUID, task.UUID, "Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)") @@ -53,9 +64,13 @@ func TestTaskScheduleHappy(t *testing.T) { // Check the response assignedTask := api.AssignedTask{ - Uuid: task.UUID, - Job: job.UUID, - Commands: []api.Command{}, + Uuid: task.UUID, + Job: job.UUID, + Commands: []api.Command{ + {Name: "test", Parameters: map[string]interface{}{ + "param": "prefix-value-suffix", + }}, + }, } assertResponseJSON(t, echo, http.StatusOK, assignedTask) resp := getRecordedResponse(echo) diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index 912c10ac..a0ff4b23 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -3,6 +3,8 @@ package config // SPDX-License-Identifier: GPL-3.0-or-later import ( + "bytes" + "encoding/gob" "errors" "fmt" "io" @@ -114,6 +116,10 @@ type ShamanGarbageCollect struct { type Conf struct { Base `yaml:",inline"` + // Store GOOS in a variable so it can be modified by unit tests, making the + // test independent of the actual platform. + currentGOOS VariablePlatform `yaml:"-"` + // Variable name → Variable definition Variables map[string]Variable `yaml:"variables"` @@ -163,10 +169,13 @@ func getConf() (Conf, error) { // DefaultConfig returns a copy of the default configuration. func DefaultConfig(override ...func(c *Conf)) Conf { - c := defaultConfig + c, err := defaultConfig.copy() + if err != nil { + panic(fmt.Sprintf("unable to create copy of default config: %v", err)) + } c.Meta.Version = latestConfigVersion c.processAfterLoading(override...) - return c + return *c } // loadConf parses the given file and returns its contents as a Conf object. @@ -224,6 +233,13 @@ func (c *Conf) processAfterLoading(override ...func(c *Conf)) { c.checkVariables() } +// MockCurrentGOOSForTests can be used in unit tests to make the variable +// replacement system think it's running a different operating system. This +// should only be used to make the tests independent of the actual OS. +func (c *Conf) MockCurrentGOOSForTests(mockedGOOS string) { + c.currentGOOS = VariablePlatform(mockedGOOS) +} + func (c *Conf) processStorage() { // The shared storage path should be absolute, but only if it's actually configured. if c.SharedStoragePath != "" { @@ -267,6 +283,8 @@ func (c *Conf) EffectiveStoragePath() string { func (c *Conf) addImplicitVariables() { c.implicitVariables = make(map[string]Variable) + // The 'jobs' variable MUST be one-way only. There is no way that the Manager + // can know how this path can be reached on other platforms. c.implicitVariables["jobs"] = Variable{ IsTwoWay: false, Values: []VariableValue{ @@ -387,29 +405,91 @@ func updateMap[K comparable, V any](target map[K]V, updateWith map[K]V) { } } -// ExpandVariables converts "{variable name}" to the value that belongs to the given audience and platform. -func (c *Conf) ExpandVariables(valueToExpand string, audience VariableAudience, platform VariablePlatform) string { +// ExpandVariables converts "{variable name}" to the value that belongs to the +// given audience and platform. The function iterates over all strings provided +// by the input channel, and sends the expanded result into the output channel. +// It will return when the input channel is closed. +func (c *Conf) ExpandVariables(inputChannel <-chan string, outputChannel chan<- string, + audience VariableAudience, platform VariablePlatform) { + + // Get the variables for the given audience & platform. varsForPlatform := c.getVariables(audience, platform) if len(varsForPlatform) == 0 { log.Warn(). - Str("valueToExpand", valueToExpand). Str("audience", string(audience)). Str("platform", string(platform)). Msg("no variables defined for this platform given this audience") + } + + // Get the two-way variables for the Manager platform. + twoWayVars := make(map[string]string) + if platform != c.currentGOOS { + twoWayVars = c.GetTwoWayVariables(audience, c.currentGOOS) + } + + doValueReplacement := func(valueToExpand string) string { + // Expand variables from {varname} to their value for the target platform. + for varname, varvalue := range varsForPlatform { + placeholder := fmt.Sprintf("{%s}", varname) + valueToExpand = strings.Replace(valueToExpand, placeholder, varvalue, -1) + } + + // Go through the two-way variables, to make sure that the result of + // expanding variables gets the two-way variables applied as well. This is + // necessary to make implicitly-defined variable, which are only defined for + // the Manager's platform, usable for the target platform. + // + // Practically, this replaces "value for the Manager platform" with "value + // for the target platform". + for varname, managerValue := range twoWayVars { + targetValue, ok := varsForPlatform[varname] + if !ok { + continue + } + if !strings.HasPrefix(valueToExpand, managerValue) { + continue + } + valueToExpand = targetValue + valueToExpand[len(managerValue):] + } + return valueToExpand } - // Variable replacement - for varname, varvalue := range varsForPlatform { - placeholder := fmt.Sprintf("{%s}", varname) - valueToExpand = strings.Replace(valueToExpand, placeholder, varvalue, -1) + for valueToExpand := range inputChannel { + outputChannel <- doValueReplacement(valueToExpand) + } +} + +// ConvertTwoWayVariables converts the value of a variable with "{variable +// name}", but only for two-way variables. The function iterates over all +// strings provided by the input channel, and sends the expanded result into the +// output channel. It will return when the input channel is closed. +func (c *Conf) ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string, + audience VariableAudience, platform VariablePlatform) { + + // Get the variables for the given audience & platform. + twoWayVars := c.GetTwoWayVariables(audience, platform) + if len(twoWayVars) == 0 { + log.Debug(). + Str("audience", string(audience)). + Str("platform", string(platform)). + Msg("no two-way variables defined for this platform given this audience") } - // TODO: this needs to go through multiple variable replacements, to make sure - // that, for example, the `{jobs}` variable gets the two-way variables applied - // as well. + doValueReplacement := func(valueToConvert string) string { + for varName, varValue := range twoWayVars { + if !strings.HasPrefix(valueToConvert, varValue) { + continue + } + valueToConvert = fmt.Sprintf("{%s}%s", varName, valueToConvert[len(varValue):]) + } - return valueToExpand + return valueToConvert + } + + for valueToExpand := range inputChannel { + outputChannel <- doValueReplacement(valueToExpand) + } } // getVariables returns the variable values for this (audience, platform) combination. @@ -428,6 +508,24 @@ func (c *Conf) getVariables(audience VariableAudience, platform VariablePlatform return varsForPlatform } +// GetTwoWayVariables returns the two-way variable values for this (audience, +// platform) combination. If no variables are found, just returns an empty map. +// If a value is defined for both the "all" platform and specifically the given +// platform, the specific platform definition wins. +func (c *Conf) GetTwoWayVariables(audience VariableAudience, platform VariablePlatform) map[string]string { + varsForPlatform := c.getVariables(audience, platform) + + // Only keep the two-way variables. + twoWayVars := map[string]string{} + for varname, value := range varsForPlatform { + isTwoWay := c.implicitVariables[varname].IsTwoWay || c.Variables[varname].IsTwoWay + if isTwoWay { + twoWayVars[varname] = value + } + } + return twoWayVars +} + // ResolveVariables returns the variables for this (audience, platform) combination. // If no variables are found, just returns an empty map. If a value is defined // for both the "all" platform and specifically the given platform, the specific @@ -551,6 +649,23 @@ func (c *Conf) Write(filename string) error { return nil } +// copy creates a deep copy of this configuration. +func (c *Conf) copy() (*Conf, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + dec := gob.NewDecoder(&buf) + err := enc.Encode(c) + if err != nil { + return nil, err + } + var copy Conf + err = dec.Decode(©) + if err != nil { + return nil, err + } + return ©, nil +} + // GetTestConfig returns the configuration for unit tests. // The config is loaded from `test-flamenco-manager.yaml` in the directory // containing the caller's source. diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index e547e90f..7091b1e0 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -92,4 +92,7 @@ var defaultConfig = Conf{ // }, // }, }, + + // This should not be set to anything else, except in unit tests. + currentGOOS: VariablePlatform(runtime.GOOS), } diff --git a/internal/manager/config/service.go b/internal/manager/config/service.go index c61ac9a4..c58cafee 100644 --- a/internal/manager/config/service.go +++ b/internal/manager/config/service.go @@ -71,9 +71,11 @@ func (s *Service) Save() error { } // Expose some functions on Conf here, for easier mocking of functionality via interfaces. -// -func (s *Service) ExpandVariables(valueToExpand string, audience VariableAudience, platform VariablePlatform) string { - return s.config.ExpandVariables(valueToExpand, audience, platform) +func (s *Service) ExpandVariables(inputChannel <-chan string, outputChannel chan<- string, audience VariableAudience, platform VariablePlatform) { + s.config.ExpandVariables(inputChannel, outputChannel, audience, platform) +} +func (s *Service) ConvertTwoWayVariables(inputChannel <-chan string, outputChannel chan<- string, audience VariableAudience, platform VariablePlatform) { + s.config.ConvertTwoWayVariables(inputChannel, outputChannel, audience, platform) } func (s *Service) ResolveVariables(audience VariableAudience, platform VariablePlatform) map[string]ResolvedVariable { return s.config.ResolveVariables(audience, platform) diff --git a/internal/manager/config/settings_test.go b/internal/manager/config/settings_test.go index ca62b0d1..0d59601e 100644 --- a/internal/manager/config/settings_test.go +++ b/internal/manager/config/settings_test.go @@ -3,6 +3,7 @@ package config // SPDX-License-Identifier: GPL-3.0-or-later import ( + "sync" "testing" "git.blender.org/flamenco/pkg/crosspath" @@ -41,9 +42,6 @@ func TestVariableValidation(t *testing.T) { assert.Equal(t, c.Variables["blender"].Values[1].Value, "/valid/path/blender") } -// TODO: Test two-way variables. Even though they're not currently in the -// default configuration, they should work. - func TestStorageImplicitVariablesWithShaman(t *testing.T) { c := DefaultConfig(func(c *Conf) { // Having the Shaman enabled should create an implicit variable "{jobs}" at the Shaman checkout path. @@ -51,7 +49,7 @@ func TestStorageImplicitVariablesWithShaman(t *testing.T) { c.Shaman.Enabled = true c.Variables["jobs"] = Variable{ - IsTwoWay: true, + IsTwoWay: false, Values: []VariableValue{ { Audience: VariableAudienceAll, @@ -79,7 +77,7 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) { c.Shaman.Enabled = false c.Variables["jobs"] = Variable{ - IsTwoWay: true, + IsTwoWay: false, Values: []VariableValue{ { Audience: VariableAudienceAll, @@ -99,3 +97,90 @@ func TestStorageImplicitVariablesWithoutShaman(t *testing.T) { crosspath.ToSlash(c.SharedStoragePath), c.implicitVariables["jobs"].Values[0].Value) } + +func TestExpandVariables(t *testing.T) { + c := DefaultConfig(func(c *Conf) { + c.Variables["demo"] = Variable{ + Values: []VariableValue{ + {Value: "demo-value", Audience: VariableAudienceAll, Platform: VariablePlatformDarwin}, + }, + } + c.Variables["ffmpeg"] = Variable{ + Values: []VariableValue{ + {Value: "/path/to/ffmpeg", Audience: VariableAudienceUsers, Platform: VariablePlatformLinux}, + {Value: "/path/to/ffmpeg/on/darwin", Audience: VariableAudienceUsers, Platform: VariablePlatformDarwin}, + {Value: "C:/flamenco/ffmpeg", Audience: VariableAudienceUsers, Platform: VariablePlatformWindows}, + }, + } + }) + + feeder := make(chan string, 1) + receiver := make(chan string, 1) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c.ExpandVariables(feeder, receiver, VariableAudienceUsers, VariablePlatformWindows) + }() + + feeder <- "unchanged value" + assert.Equal(t, "unchanged value", <-receiver) + + feeder <- "{ffmpeg}" + assert.Equal(t, "C:/flamenco/ffmpeg", <-receiver) + + feeder <- "{demo}" + assert.Equal(t, "{demo}", <-receiver, "missing value on the platform should not be replaced") + + close(feeder) + wg.Wait() + close(receiver) +} + +func TestExpandVariablesWithTwoWay(t *testing.T) { + + c := DefaultConfig(func(c *Conf) { + // Mock that the Manager is running on Linux right now. + c.currentGOOS = VariablePlatformLinux + + // Register one variable in the same way that the implicit 'jobs' variable is registered. + c.Variables["locally-set-path"] = Variable{ + Values: []VariableValue{ + {Value: "/path/on/linux", Platform: VariablePlatformAll, Audience: VariableAudienceAll}, + }, + } + // This two-way variable should be used to translate the path as well. + c.Variables["platform-specifics"] = Variable{ + IsTwoWay: true, + Values: []VariableValue{ + {Value: "/path/on/linux", Platform: VariablePlatformLinux, Audience: VariableAudienceWorkers}, + {Value: "/path/on/darwin", Platform: VariablePlatformDarwin, Audience: VariableAudienceWorkers}, + {Value: "C:/path/on/windows", Platform: VariablePlatformWindows, Audience: VariableAudienceWorkers}, + }, + } + }) + + feeder := make(chan string, 1) + receiver := make(chan string, 1) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // Always target a different-than-current target platform. + c.ExpandVariables(feeder, receiver, VariableAudienceWorkers, VariablePlatformWindows) + }() + + // Simple two-way variable replacement. + feeder <- "/path/on/linux/file.txt" + assert.Equal(t, "C:/path/on/windows/file.txt", <-receiver) + + // {locally-set-path} expands to a value that's then further replaced by a two-way variable. + feeder <- "{locally-set-path}/should/be/remapped" + assert.Equal(t, "C:/path/on/windows/should/be/remapped", <-receiver) + + close(feeder) + wg.Wait() + close(receiver) +}