diff --git a/FEATURES.md b/FEATURES.md index f6b581d9..986bc1a7 100644 --- a/FEATURES.md +++ b/FEATURES.md @@ -53,6 +53,7 @@ Note that list is **not** in any specific order. - [ ] Show blocklist in web interface, and allow removal of workers. - [x] Worker timeout monitoring - [ ] Last rendered image display + - [ ] Expand the processing queue from a single channel to a queue per job, so that a spammy job doesn't starve the other jobs from queueing images. - [ ] Web interface: Support actions on multiple selected things - [ ] Workers diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 8a6388fe..16552571 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -32,6 +32,8 @@ import ( "git.blender.org/flamenco/internal/manager/api_impl" "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" + "git.blender.org/flamenco/internal/manager/last_rendered" + "git.blender.org/flamenco/internal/manager/local_storage" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/swagger_ui" "git.blender.org/flamenco/internal/manager/task_logs" @@ -110,9 +112,15 @@ func main() { timeService := clock.New() webUpdater := webupdates.New() + + // TODO: the local storage now is hard-coded to use the same sub-directory as the task log storage. + // This should be refactored so that the task logs storage uses the localStorage object as well. + localStorage := local_storage.NewNextToExe("task-logs") logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) + taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) - flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater) + lastRender := last_rendered.New(localStorage) + flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater, lastRender) e := buildWebService(flamenco, persist, ssdp, webUpdater, urls) timeoutChecker := timeout_checker.New( @@ -129,6 +137,13 @@ func main() { // done, the main() function will return and the process will stop. wg := new(sync.WaitGroup) + // Run the "last rendered image" processor. + wg.Add(1) + go func() { + defer wg.Done() + lastRender.Run(mainCtx) + }() + // Start the web server. wg.Add(1) go func() { @@ -171,6 +186,7 @@ func buildFlamencoAPI( taskStateMachine *task_state_machine.StateMachine, logStorage *task_logs.Storage, webUpdater *webupdates.BiDirComms, + lastRender *last_rendered.LastRenderedProcessor, ) api.ServerInterface { compiler, err := job_compilers.Load(timeService) if err != nil { @@ -179,7 +195,7 @@ func buildFlamencoAPI( shamanServer := shaman.NewServer(configService.Get().Shaman, nil) flamenco := api_impl.NewFlamenco( compiler, persist, webUpdater, logStorage, configService, - taskStateMachine, shamanServer, timeService) + taskStateMachine, shamanServer, timeService, lastRender) return flamenco } diff --git a/go.mod b/go.mod index fc525020..daab0309 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/disintegration/imaging v1.6.2 // indirect github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect github.com/gertd/go-pluralize v0.2.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -48,6 +49,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect + golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect golang.org/x/mod v0.4.2 // indirect golang.org/x/sys v0.0.0-20211103235746-7861aae1554b // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index de6c7293..e8d65458 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d/go. github.com/deepmap/oapi-codegen v1.9.0 h1:qpyRY+dzjMai5QejjA53ebnBtcSvIcZOtYwVlsgdxOc= github.com/deepmap/oapi-codegen v1.9.0/go.mod h1:7t4DbSxmAffcTEgrWvsPYEE2aOARZ8ZKWp3hDuZkHNc= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c= +github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 h1:Izz0+t1Z5nI16/II7vuEo/nHjodOg0p7+OiDpjX5t1E= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dop251/goja v0.0.0-20211217115348-3f9136fa235d h1:XT7Qdmcuwgsgz4GXejX7R5Morysk2GOpeguYJ9JoF5c= @@ -169,6 +171,8 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e h1:1SzTfNOXwIS2oWiMF+6qu0OUDKb0dauo6MoDUQyu+yU= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 h1:hVwzHzIUGRjiF7EcUjqNxk3NCfkPxbDKRdnNE1Rpg0U= +golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index df68b28a..fb7f51f9 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -23,6 +23,7 @@ type Flamenco struct { stateMachine TaskStateMachine shaman Shaman clock TimeService + lastRender LastRendered // The task scheduler can be locked to prevent multiple Workers from getting // the same task. It is also used for certain other queries, like @@ -42,6 +43,7 @@ func NewFlamenco( sm TaskStateMachine, sha Shaman, ts TimeService, + lr LastRendered, ) *Flamenco { return &Flamenco{ jobCompiler: jc, @@ -52,6 +54,7 @@ func NewFlamenco( stateMachine: sm, shaman: sha, clock: ts, + lastRender: lr, } } diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 63a999ba..3448515a 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -15,6 +15,7 @@ import ( "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" + "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/webupdates" @@ -23,7 +24,7 @@ import ( ) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -114,6 +115,14 @@ type LogStorage interface { Tail(jobID, taskID string) (string, error) } +// LastRendered processes the "last rendered" images. +type LastRendered interface { + // QueueImage queues an image for processing. Returns + // `last_rendered.ErrQueueFull` if there is no more space in the queue for + // new images. + QueueImage(payload last_rendered.Payload) error +} + type ConfigService interface { VariableReplacer diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9f84d763..d6215974 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman) +// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered) // Package mocks is a generated GoMock package. package mocks @@ -11,6 +11,7 @@ import ( config "git.blender.org/flamenco/internal/manager/config" job_compilers "git.blender.org/flamenco/internal/manager/job_compilers" + last_rendered "git.blender.org/flamenco/internal/manager/last_rendered" persistence "git.blender.org/flamenco/internal/manager/persistence" api "git.blender.org/flamenco/pkg/api" gomock "github.com/golang/mock/gomock" @@ -801,3 +802,40 @@ func (mr *MockShamanMockRecorder) Requirements(arg0, arg1 interface{}) *gomock.C mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Requirements", reflect.TypeOf((*MockShaman)(nil).Requirements), arg0, arg1) } + +// MockLastRendered is a mock of LastRendered interface. +type MockLastRendered struct { + ctrl *gomock.Controller + recorder *MockLastRenderedMockRecorder +} + +// MockLastRenderedMockRecorder is the mock recorder for MockLastRendered. +type MockLastRenderedMockRecorder struct { + mock *MockLastRendered +} + +// NewMockLastRendered creates a new mock instance. +func NewMockLastRendered(ctrl *gomock.Controller) *MockLastRendered { + mock := &MockLastRendered{ctrl: ctrl} + mock.recorder = &MockLastRenderedMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLastRendered) EXPECT() *MockLastRenderedMockRecorder { + return m.recorder +} + +// QueueImage mocks base method. +func (m *MockLastRendered) QueueImage(arg0 last_rendered.Payload) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueImage", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// QueueImage indicates an expected call of QueueImage. +func (mr *MockLastRenderedMockRecorder) QueueImage(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueImage", reflect.TypeOf((*MockLastRendered)(nil).QueueImage), arg0) +} diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index b90c7ee5..7007f257 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -32,6 +32,7 @@ type mockedFlamenco struct { stateMachine *mocks.MockTaskStateMachine shaman *mocks.MockShaman clock *clock.Mock + lastRender *mocks.MockLastRendered } func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { @@ -42,6 +43,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { cs := mocks.NewMockConfigService(mockCtrl) sm := mocks.NewMockTaskStateMachine(mockCtrl) sha := mocks.NewMockShaman(mockCtrl) + lr := mocks.NewMockLastRendered(mockCtrl) clock := clock.NewMock() mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") @@ -50,7 +52,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { } clock.Set(mockedNow) - f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock) + f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock, lr) return mockedFlamenco{ flamenco: f, @@ -61,6 +63,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { config: cs, stateMachine: sm, clock: clock, + lastRender: lr, } } @@ -126,7 +129,11 @@ func assertResponseJSON(t *testing.T, echoCtx echo.Context, expectStatusCode int assert.JSONEq(t, string(expectJSON), string(actualJSON)) } -func assertResponseAPIError(t *testing.T, echoCtx echo.Context, expectStatusCode int, expectMessage string) { +func assertResponseAPIError(t *testing.T, echoCtx echo.Context, expectStatusCode int, expectMessage string, fmtArgs ...interface{}) { + if len(fmtArgs) > 0 { + expectMessage = fmt.Sprintf(expectMessage, fmtArgs...) + } + assertResponseJSON(t, echoCtx, expectStatusCode, api.Error{ Code: int32(expectStatusCode), Message: expectMessage, @@ -140,6 +147,13 @@ func assertResponseNoContent(t *testing.T, echoCtx echo.Context) { assert.Zero(t, resp.Body.Len(), "HTTP 204 No Content should have no content, got %v", resp.Body.String()) } +// assertResponseNoBody asserts the response has no body and the given status. +func assertResponseNoBody(t *testing.T, echoCtx echo.Context, expectStatus int) { + resp := getRecordedResponseRecorder(echoCtx) + assert.Equal(t, expectStatus, resp.Code, "Unexpected status: %v", resp.Result().Status) + assert.Zero(t, resp.Body.Len(), "HTTP response have no content, got %v", resp.Body.String()) +} + func testWorker() persistence.Worker { return persistence.Worker{ Model: persistence.Model{ID: 1}, diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 8901721c..95667dce 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "io" "net/http" "strings" "time" @@ -14,6 +15,7 @@ import ( "github.com/rs/zerolog" "golang.org/x/crypto/bcrypt" + "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/webupdates" @@ -347,6 +349,84 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { return e.JSON(http.StatusOK, customisedTask) } +func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { + ctx := e.Request().Context() + filesize := e.Request().ContentLength + worker := requestWorkerOrPanic(e) + logger := requestLogger(e).With(). + Str("task", taskID). + Int64("imageSizeBytes", filesize). + Logger() + + err := f.workerSeen(ctx, logger, worker) + if err != nil { + return sendAPIError(e, http.StatusInternalServerError, "error updating 'last seen' timestamp of worker: %v", err) + } + + // Check the file size: + switch { + case filesize <= 0: + logger.Warn().Msg("TaskOutputProduced: Worker did not sent Content-Length header") + return sendAPIError(e, http.StatusLengthRequired, "Content-Length header required") + case filesize > last_rendered.MaxImageSizeBytes: + logger.Warn(). + Int64("imageSizeBytesMax", last_rendered.MaxImageSizeBytes). + Msg("TaskOutputProduced: Worker sent too large last-rendered image") + return sendAPIError(e, http.StatusRequestEntityTooLarge, + "image too large; should be max %v bytes", last_rendered.MaxImageSizeBytes) + } + + // Fetch the task, to find its job UUID: + dbTask, err := f.persist.FetchTask(ctx, taskID) + switch { + case errors.Is(err, persistence.ErrTaskNotFound): + return e.JSON(http.StatusNotFound, "Task does not exist") + case err != nil: + logger.Error().Err(err).Msg("TaskOutputProduced: cannot fetch task") + return sendAPIError(e, http.StatusInternalServerError, "error fetching task") + case dbTask == nil: + panic("task could not be fetched, but database gave no error either") + } + + // Read the image bytes into memory. + imageBytes, err := io.ReadAll(e.Request().Body) + if err != nil { + logger.Warn().Err(err).Msg("TaskOutputProduced: error reading image from request") + return sendAPIError(e, http.StatusBadRequest, "error reading request body: %v", err) + } + + // Create the "last rendered" payload. + payload := last_rendered.Payload{ + JobUUID: dbTask.Job.UUID, + WorkerUUID: worker.UUID, + MimeType: e.Request().Header.Get("Content-Type"), + Image: imageBytes, + } + + // Queue the image for processing: + err = f.lastRender.QueueImage(payload) + if err != nil { + switch { + case errors.Is(err, last_rendered.ErrMimeTypeUnsupported): + logger.Warn(). + Str("mimeType", payload.MimeType). + Msg("TaskOutputProduced: Worker sent unsupported mime type") + return sendAPIError(e, http.StatusUnsupportedMediaType, "unsupported mime type %q", payload.MimeType) + case errors.Is(err, last_rendered.ErrQueueFull): + logger.Info(). + Msg("TaskOutputProduced: image processing queue is full, ignoring request") + return sendAPIError(e, http.StatusTooManyRequests, "image processing queue is full") + default: + logger.Error().Err(err). + Msg("TaskOutputProduced: error queueing image") + return sendAPIError(e, http.StatusInternalServerError, "error queueing image for processing: %v", err) + } + } + + logger.Info().Msg("TaskOutputProduced: accepted last-rendered image for processing") + return e.NoContent(http.StatusAccepted) +} + func (f *Flamenco) workerPingedTask( ctx context.Context, logger zerolog.Logger, diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index cec388e3..9ded970e 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -3,7 +3,9 @@ package api_impl // SPDX-License-Identifier: GPL-3.0-or-later import ( + "bytes" "context" + "io" "net/http" "testing" @@ -11,6 +13,7 @@ import ( "github.com/labstack/echo/v4" "github.com/stretchr/testify/assert" + "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/pkg/api" ) @@ -455,3 +458,103 @@ func TestMayWorkerRun(t *testing.T) { }) } } + +func TestTaskOutputProduced(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + prepareRequest := func(body io.Reader) echo.Context { + echo := mf.prepareMockedRequest(body) + requestWorkerStore(echo, &worker) + return echo + } + + job := persistence.Job{ + UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0", + } + task := persistence.Task{ + UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503", + Job: &job, + Status: api.TaskStatusActive, + } + + // Mock body to use in the request. + bodyBytes := []byte("JPEG file contents") + + // Test: unhappy, missing Content-Length header. + { + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + + echo := prepareRequest(nil) + err := mf.flamenco.TaskOutputProduced(echo, task.UUID) + assert.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusLengthRequired, "Content-Length header required") + } + + // Test: unhappy, too large Content-Length header. + { + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + + bodyBytes := bytes.Repeat([]byte("x"), int(last_rendered.MaxImageSizeBytes+1)) + if int64(len(bodyBytes)) != last_rendered.MaxImageSizeBytes+1 { + panic("cannot generate enough bytes") + } + + echo := prepareRequest(bytes.NewReader(bodyBytes)) + err := mf.flamenco.TaskOutputProduced(echo, task.UUID) + assert.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusRequestEntityTooLarge, + "image too large; should be max %v bytes", last_rendered.MaxImageSizeBytes) + } + + // Test: unhappy, wrong mime type + { + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil) + + echo := prepareRequest(bytes.NewReader(bodyBytes)) + echo.Request().Header.Set("Content-Type", "image/openexr") + mf.lastRender.EXPECT().QueueImage(gomock.Any()).Return(last_rendered.ErrMimeTypeUnsupported) + + err := mf.flamenco.TaskOutputProduced(echo, task.UUID) + assert.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusUnsupportedMediaType, `unsupported mime type "image/openexr"`) + } + + // Test: unhappy, queue full + { + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil) + + echo := prepareRequest(bytes.NewReader(bodyBytes)) + mf.lastRender.EXPECT().QueueImage(gomock.Any()).Return(last_rendered.ErrQueueFull) + + err := mf.flamenco.TaskOutputProduced(echo, task.UUID) + assert.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusTooManyRequests, "image processing queue is full") + } + + // Test: happy + { + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil) + + echo := prepareRequest(bytes.NewReader(bodyBytes)) + echo.Request().Header.Set("Content-Type", "image/jpeg") + expectPayload := last_rendered.Payload{ + JobUUID: job.UUID, + WorkerUUID: worker.UUID, + MimeType: "image/jpeg", + Image: bodyBytes, + } + mf.lastRender.EXPECT().QueueImage(expectPayload).Return(nil) + + err := mf.flamenco.TaskOutputProduced(echo, task.UUID) + assert.NoError(t, err) + assertResponseNoBody(t, echo, http.StatusAccepted) + } + +} diff --git a/internal/manager/last_rendered/image_processing.go b/internal/manager/last_rendered/image_processing.go new file mode 100644 index 00000000..b2f6697d --- /dev/null +++ b/internal/manager/last_rendered/image_processing.go @@ -0,0 +1,79 @@ +package last_rendered + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "bytes" + "errors" + "fmt" + "image" + "image/jpeg" + "os" + "path/filepath" + + // Import for side-effect of registering decoder. + _ "image/png" + + "github.com/disintegration/imaging" +) + +var ( + supportedMimeTypes = map[string]bool{ + "image/jpeg": true, + "image/png": true, + } + + ErrMimeTypeUnsupported = errors.New("mime type unsupported") +) + +// decodeImage checks the payload mime type, and if okay, decodes the image and returns it. +// Returns `ErrMimeTypeUnsupported` if the mime type is unsupported. +func decodeImage(payload Payload) (image.Image, error) { + if !supportedMimeTypes[payload.MimeType] { + return nil, ErrMimeTypeUnsupported + } + + reader := bytes.NewReader(payload.Image) + img, _, err := image.Decode(reader) + if err != nil { + return nil, fmt.Errorf("decoding image: %w", err) + } + return img, nil +} + +// saveJPEG writes the given image to a JPEG file. +func saveJPEG(targetpath string, img image.Image) error { + // Ensure the directory exists. + targetdir := filepath.Dir(targetpath) + err := os.MkdirAll(targetdir, os.ModePerm) + if err != nil { + return fmt.Errorf("creating directory %s: %w", targetdir, err) + } + + file, err := os.Create(targetpath) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + + options := jpeg.Options{ + Quality: thumbnailJPEGQuality, + } + err = jpeg.Encode(file, img, &options) + if err != nil { + return fmt.Errorf("encoding as JPEG: %w", err) + } + + err = file.Close() + if err != nil { + return fmt.Errorf("closing file: %w", err) + } + return nil +} + +func downscaleImage(spec thumbspec, img image.Image) image.Image { + // Fill out the entire frame, cropping the image if necessary: + // return imaging.Fill(img, spec.maxWidth, spec.maxHeight, imaging.Center, imaging.Lanczos) + + // Fit the image to the frame, potentially resulting in either a narrower or lower image: + return imaging.Fit(img, spec.maxWidth, spec.maxHeight, imaging.Lanczos) +} diff --git a/internal/manager/last_rendered/last_rendered.go b/internal/manager/last_rendered/last_rendered.go new file mode 100644 index 00000000..cb4d6691 --- /dev/null +++ b/internal/manager/last_rendered/last_rendered.go @@ -0,0 +1,167 @@ +package last_rendered + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "path/filepath" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const ( + // MaxImageSizeBytes is the maximum size in bytes allowed for to-be-processed images. + MaxImageSizeBytes int64 = 10 * 1024 * 1024 + + // queueSize determines how many images can be queued in memory before rejecting + // new requests to process. + queueSize = 3 + + thumbnailJPEGQuality = 80 +) + +var ( + ErrQueueFull = errors.New("queue full") + + // thumbnails specifies the thumbnail sizes. For efficiency, they should be + // listed from large to small, as each thumbnail is the input for the next + // one. + thumbnails = []thumbspec{ + {"last-rendered.jpg", 1920, 1080}, + {"last-rendered-small.jpg", 600, 338}, + {"last-rendered-tiny.jpg", 48, 28}, + } +) + +type Storage interface { + // ForJob returns the directory path for storing job-related files. + ForJob(jobUUID string) string +} + +// LastRenderedProcessor processes "last-rendered" images and stores them with +// the job. +type LastRenderedProcessor struct { + storage Storage + + // TODO: expand this queue to be per job, so that one spammy job doesn't block + // the queue for other jobs. + queue chan Payload + + processingDonecallback func(jobUUID string) +} + +// Payload contains the actual image to process. +type Payload struct { + JobUUID string // Used to determine the directory to store the image. + WorkerUUID string // Just for logging. + MimeType string + Image []byte +} + +// thumbspec specifies a thumbnail size & filename. +type thumbspec struct { + filename string + maxWidth int + maxHeight int +} + +func New(storage Storage) *LastRenderedProcessor { + return &LastRenderedProcessor{ + storage: storage, + queue: make(chan Payload, queueSize), + } +} + +// SetCallback registers a 'done' callback, which will be called after the job +// received a new last-rendered image. +// There is only one such callback, so calling this will overwrite the +// previously-set callback function. Pass `nil` to un-set. +func (lrp *LastRenderedProcessor) SetCallback(processingDonecallback func(jobUUID string)) { + lrp.processingDonecallback = processingDonecallback +} + +// Run is the main loop for the processing of images. It will keep running until +// the context is closed. +func (lrp *LastRenderedProcessor) Run(ctx context.Context) { + log.Debug().Msg("last-rendered: queue runner running") + defer log.Debug().Msg("last-rendered: queue runner shutting down") + + for { + select { + case <-ctx.Done(): + return + case payload := <-lrp.queue: + lrp.processImage(payload) + } + } +} + +// QueueImage queues an image for processing. +// Returns `ErrQueueFull` if there is no more space in the queue for new images. +func (lrp *LastRenderedProcessor) QueueImage(payload Payload) error { + logger := payload.sublogger(log.Logger) + select { + case lrp.queue <- payload: + logger.Debug().Msg("last-rendered: queued image for processing") + return nil + default: + logger.Debug().Msg("last-rendered: unable to queue image for processing") + return ErrQueueFull + } +} + +// processImage down-scales the image to a few thumbnails for presentation in +// the web interface, and stores those in a job-specific directory. +// +// Because this is intended as internal queue-processing function, errors are +// logged but not returned. +func (lrp *LastRenderedProcessor) processImage(payload Payload) { + jobDir := lrp.storage.ForJob(payload.JobUUID) + + logger := log.With().Str("jobDir", jobDir).Logger() + logger = payload.sublogger(logger) + + // Decode the image. + image, err := decodeImage(payload) + if err != nil { + logger.Error().Err(err).Msg("last-rendered: unable to decode image") + return + } + + // Generate the thumbnails. + for _, spec := range thumbnails { + thumbLogger := spec.sublogger(logger) + thumbLogger.Trace().Msg("last-rendered: creating thumbnail") + + image = downscaleImage(spec, image) + + imgpath := filepath.Join(jobDir, spec.filename) + if err := saveJPEG(imgpath, image); err != nil { + thumbLogger.Error().Err(err).Msg("last-rendered: error saving thumbnail") + break + } + } + + // Call the callback, if provided. + if lrp.processingDonecallback != nil { + lrp.processingDonecallback(payload.JobUUID) + } +} + +func (p Payload) sublogger(logger zerolog.Logger) zerolog.Logger { + return logger.With(). + Str("job", p.JobUUID). + Str("producedByWorker", p.WorkerUUID). + Str("mime", p.MimeType). + Logger() +} + +func (spec thumbspec) sublogger(logger zerolog.Logger) zerolog.Logger { + return logger.With(). + Int("width", spec.maxWidth). + Int("height", spec.maxHeight). + Str("filename", spec.filename). + Logger() +} diff --git a/internal/manager/last_rendered/last_rendered_test.go b/internal/manager/last_rendered/last_rendered_test.go new file mode 100644 index 00000000..faa98911 --- /dev/null +++ b/internal/manager/last_rendered/last_rendered_test.go @@ -0,0 +1,112 @@ +package last_rendered + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "image" + "os" + "path/filepath" + "testing" + + "git.blender.org/flamenco/internal/manager/local_storage" + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + storage := local_storage.NewNextToExe("lrp") + defer storage.MustErase() + + callback := func(string) {} + lrp := New(storage) + assert.Equal(t, lrp.storage, storage) + assert.NotNil(t, lrp.queue) + assert.Nil(t, lrp.processingDonecallback) + + lrp.SetCallback(callback) + assert.NotNil(t, lrp.processingDonecallback) +} + +func TestQueueImage(t *testing.T) { + // Canary test. + if !assert.Equal(t, queueSize, 3) { + t.Fatalf("queueSize must be 3 for this test, but is %v", queueSize) + } + + payload := Payload{ + JobUUID: "2205227c-593c-46ac-a0d7-e115d4e80dd4", + MimeType: "image/png", + Image: []byte("PNG file contents"), + } + + storage := local_storage.NewNextToExe("lrp") + defer storage.MustErase() + lrp := New(storage) + + assert.NoError(t, lrp.QueueImage(payload)) + assert.NoError(t, lrp.QueueImage(payload)) + assert.NoError(t, lrp.QueueImage(payload)) + assert.ErrorIs(t, lrp.QueueImage(payload), ErrQueueFull) +} + +func TestProcessImage(t *testing.T) { + // Load the test image. Note that this intentionally has an approximate 21:9 + // ratio, whereas the thumbnail specs define a 16:9 ratio. + imgBytes, err := os.ReadFile("last_rendered_test.jpg") + if !assert.NoError(t, err) { + t.FailNow() + } + + jobID := "e078438b-c9f5-43e6-9e86-52f8be91dd12" + payload := Payload{ + JobUUID: jobID, + MimeType: "image/jpeg", + Image: imgBytes, + } + + storage := local_storage.NewNextToExe("lrp") + defer storage.MustErase() + lrp := New(storage) + + callbackCount := 0 + lrp.SetCallback(func(callbackJobID string) { + assert.Equal(t, jobID, callbackJobID) + callbackCount++ + }) + + // Sanity check: the thumbnails shouldn't exist yet. + jobdir := storage.ForJob(jobID) + assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered.jpg")) + assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg")) + assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg")) + + lrp.processImage(payload) + + // The files should exist now. + assert.FileExists(t, filepath.Join(jobdir, "last-rendered.jpg")) + assert.FileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg")) + assert.FileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg")) + + assert.Equal(t, callbackCount, 1, "the 'done' callback should be called exactly once") + + // Check the sizes, they should match the thumbspec. + assertImageSize := func(spec thumbspec) { + path := filepath.Join(jobdir, spec.filename) + file, err := os.Open(path) + if !assert.NoError(t, err, "thumbnail %s should be openable", spec.filename) { + return + } + + img, format, err := image.Decode(file) + if !assert.NoErrorf(t, err, "thumbnail %s should be decodable", spec.filename) { + return + } + + assert.Equalf(t, "jpeg", format, "thumbnail %s not written in the expected format", spec.filename) + assert.LessOrEqualf(t, img.Bounds().Dx(), spec.maxWidth, "thumbnail %s has wrong width", spec.filename) + assert.LessOrEqualf(t, img.Bounds().Dy(), spec.maxHeight, "thumbnail %s has wrong height", spec.filename) + } + + for _, spec := range thumbnails { + assertImageSize(spec) + } +} diff --git a/internal/manager/last_rendered/last_rendered_test.jpg b/internal/manager/last_rendered/last_rendered_test.jpg new file mode 100644 index 00000000..cad62a7d Binary files /dev/null and b/internal/manager/last_rendered/last_rendered_test.jpg differ