Manager: add "last rendered image" processing pipeline

Add a handler for the OpenAPI `taskOutputProduced` operation, and an
image thumbnailing goroutine.

The queue of images to process + the function to handle queued images
is managed by `last_rendered.LastRenderedProcessor`. This queue currently
simply allows 3 requests; this should be improved such that it keeps
track of the job IDs as well, as with the current approach a spammy job
can starve the updates from a more calm job.
This commit is contained in:
Sybren A. Stüvel 2022-06-24 16:48:57 +02:00
parent a43826ce0c
commit e687c95e5d
14 changed files with 634 additions and 6 deletions

@ -53,6 +53,7 @@ Note that list is **not** in any specific order.
- [ ] Show blocklist in web interface, and allow removal of workers.
- [x] Worker timeout monitoring
- [ ] Last rendered image display
- [ ] Expand the processing queue from a single channel to a queue per job, so that a spammy job doesn't starve the other jobs from queueing images.
- [ ] Web interface: Support actions on multiple selected things
- [ ] Workers

@ -32,6 +32,8 @@ import (
"git.blender.org/flamenco/internal/manager/api_impl"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/local_storage"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/swagger_ui"
"git.blender.org/flamenco/internal/manager/task_logs"
@ -110,9 +112,15 @@ func main() {
timeService := clock.New()
webUpdater := webupdates.New()
// TODO: the local storage now is hard-coded to use the same sub-directory as the task log storage.
// This should be refactored so that the task logs storage uses the localStorage object as well.
localStorage := local_storage.NewNextToExe("task-logs")
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater)
lastRender := last_rendered.New(localStorage)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater, lastRender)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
timeoutChecker := timeout_checker.New(
@ -129,6 +137,13 @@ func main() {
// done, the main() function will return and the process will stop.
wg := new(sync.WaitGroup)
// Run the "last rendered image" processor.
wg.Add(1)
go func() {
defer wg.Done()
lastRender.Run(mainCtx)
}()
// Start the web server.
wg.Add(1)
go func() {
@ -171,6 +186,7 @@ func buildFlamencoAPI(
taskStateMachine *task_state_machine.StateMachine,
logStorage *task_logs.Storage,
webUpdater *webupdates.BiDirComms,
lastRender *last_rendered.LastRenderedProcessor,
) api.ServerInterface {
compiler, err := job_compilers.Load(timeService)
if err != nil {
@ -179,7 +195,7 @@ func buildFlamencoAPI(
shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService)
taskStateMachine, shamanServer, timeService, lastRender)
return flamenco
}

2
go.mod

@ -29,6 +29,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/disintegration/imaging v1.6.2 // indirect
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/gertd/go-pluralize v0.2.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
@ -48,6 +49,7 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b // indirect
golang.org/x/text v0.3.7 // indirect

4
go.sum

@ -11,6 +11,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d/go.
github.com/deepmap/oapi-codegen v1.9.0 h1:qpyRY+dzjMai5QejjA53ebnBtcSvIcZOtYwVlsgdxOc=
github.com/deepmap/oapi-codegen v1.9.0/go.mod h1:7t4DbSxmAffcTEgrWvsPYEE2aOARZ8ZKWp3hDuZkHNc=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 h1:Izz0+t1Z5nI16/II7vuEo/nHjodOg0p7+OiDpjX5t1E=
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
github.com/dop251/goja v0.0.0-20211217115348-3f9136fa235d h1:XT7Qdmcuwgsgz4GXejX7R5Morysk2GOpeguYJ9JoF5c=
@ -169,6 +171,8 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e h1:1SzTfNOXwIS2oWiMF+6qu0OUDKb0dauo6MoDUQyu+yU=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 h1:hVwzHzIUGRjiF7EcUjqNxk3NCfkPxbDKRdnNE1Rpg0U=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=

@ -23,6 +23,7 @@ type Flamenco struct {
stateMachine TaskStateMachine
shaman Shaman
clock TimeService
lastRender LastRendered
// The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like
@ -42,6 +43,7 @@ func NewFlamenco(
sm TaskStateMachine,
sha Shaman,
ts TimeService,
lr LastRendered,
) *Flamenco {
return &Flamenco{
jobCompiler: jc,
@ -52,6 +54,7 @@ func NewFlamenco(
stateMachine: sm,
shaman: sha,
clock: ts,
lastRender: lr,
}
}

@ -15,6 +15,7 @@ import (
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/webupdates"
@ -23,7 +24,7 @@ import (
)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered
type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -114,6 +115,14 @@ type LogStorage interface {
Tail(jobID, taskID string) (string, error)
}
// LastRendered processes the "last rendered" images.
type LastRendered interface {
// QueueImage queues an image for processing. Returns
// `last_rendered.ErrQueueFull` if there is no more space in the queue for
// new images.
QueueImage(payload last_rendered.Payload) error
}
type ConfigService interface {
VariableReplacer

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman)
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered)
// Package mocks is a generated GoMock package.
package mocks
@ -11,6 +11,7 @@ import (
config "git.blender.org/flamenco/internal/manager/config"
job_compilers "git.blender.org/flamenco/internal/manager/job_compilers"
last_rendered "git.blender.org/flamenco/internal/manager/last_rendered"
persistence "git.blender.org/flamenco/internal/manager/persistence"
api "git.blender.org/flamenco/pkg/api"
gomock "github.com/golang/mock/gomock"
@ -801,3 +802,40 @@ func (mr *MockShamanMockRecorder) Requirements(arg0, arg1 interface{}) *gomock.C
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Requirements", reflect.TypeOf((*MockShaman)(nil).Requirements), arg0, arg1)
}
// MockLastRendered is a mock of LastRendered interface.
type MockLastRendered struct {
ctrl *gomock.Controller
recorder *MockLastRenderedMockRecorder
}
// MockLastRenderedMockRecorder is the mock recorder for MockLastRendered.
type MockLastRenderedMockRecorder struct {
mock *MockLastRendered
}
// NewMockLastRendered creates a new mock instance.
func NewMockLastRendered(ctrl *gomock.Controller) *MockLastRendered {
mock := &MockLastRendered{ctrl: ctrl}
mock.recorder = &MockLastRenderedMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLastRendered) EXPECT() *MockLastRenderedMockRecorder {
return m.recorder
}
// QueueImage mocks base method.
func (m *MockLastRendered) QueueImage(arg0 last_rendered.Payload) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueueImage", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// QueueImage indicates an expected call of QueueImage.
func (mr *MockLastRenderedMockRecorder) QueueImage(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueImage", reflect.TypeOf((*MockLastRendered)(nil).QueueImage), arg0)
}

@ -32,6 +32,7 @@ type mockedFlamenco struct {
stateMachine *mocks.MockTaskStateMachine
shaman *mocks.MockShaman
clock *clock.Mock
lastRender *mocks.MockLastRendered
}
func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
@ -42,6 +43,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
cs := mocks.NewMockConfigService(mockCtrl)
sm := mocks.NewMockTaskStateMachine(mockCtrl)
sha := mocks.NewMockShaman(mockCtrl)
lr := mocks.NewMockLastRendered(mockCtrl)
clock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00")
@ -50,7 +52,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
}
clock.Set(mockedNow)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock, lr)
return mockedFlamenco{
flamenco: f,
@ -61,6 +63,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
config: cs,
stateMachine: sm,
clock: clock,
lastRender: lr,
}
}
@ -126,7 +129,11 @@ func assertResponseJSON(t *testing.T, echoCtx echo.Context, expectStatusCode int
assert.JSONEq(t, string(expectJSON), string(actualJSON))
}
func assertResponseAPIError(t *testing.T, echoCtx echo.Context, expectStatusCode int, expectMessage string) {
func assertResponseAPIError(t *testing.T, echoCtx echo.Context, expectStatusCode int, expectMessage string, fmtArgs ...interface{}) {
if len(fmtArgs) > 0 {
expectMessage = fmt.Sprintf(expectMessage, fmtArgs...)
}
assertResponseJSON(t, echoCtx, expectStatusCode, api.Error{
Code: int32(expectStatusCode),
Message: expectMessage,
@ -140,6 +147,13 @@ func assertResponseNoContent(t *testing.T, echoCtx echo.Context) {
assert.Zero(t, resp.Body.Len(), "HTTP 204 No Content should have no content, got %v", resp.Body.String())
}
// assertResponseNoBody asserts the response has no body and the given status.
func assertResponseNoBody(t *testing.T, echoCtx echo.Context, expectStatus int) {
resp := getRecordedResponseRecorder(echoCtx)
assert.Equal(t, expectStatus, resp.Code, "Unexpected status: %v", resp.Result().Status)
assert.Zero(t, resp.Body.Len(), "HTTP response have no content, got %v", resp.Body.String())
}
func testWorker() persistence.Worker {
return persistence.Worker{
Model: persistence.Model{ID: 1},

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
@ -14,6 +15,7 @@ import (
"github.com/rs/zerolog"
"golang.org/x/crypto/bcrypt"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/webupdates"
@ -347,6 +349,84 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
return e.JSON(http.StatusOK, customisedTask)
}
func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
ctx := e.Request().Context()
filesize := e.Request().ContentLength
worker := requestWorkerOrPanic(e)
logger := requestLogger(e).With().
Str("task", taskID).
Int64("imageSizeBytes", filesize).
Logger()
err := f.workerSeen(ctx, logger, worker)
if err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error updating 'last seen' timestamp of worker: %v", err)
}
// Check the file size:
switch {
case filesize <= 0:
logger.Warn().Msg("TaskOutputProduced: Worker did not sent Content-Length header")
return sendAPIError(e, http.StatusLengthRequired, "Content-Length header required")
case filesize > last_rendered.MaxImageSizeBytes:
logger.Warn().
Int64("imageSizeBytesMax", last_rendered.MaxImageSizeBytes).
Msg("TaskOutputProduced: Worker sent too large last-rendered image")
return sendAPIError(e, http.StatusRequestEntityTooLarge,
"image too large; should be max %v bytes", last_rendered.MaxImageSizeBytes)
}
// Fetch the task, to find its job UUID:
dbTask, err := f.persist.FetchTask(ctx, taskID)
switch {
case errors.Is(err, persistence.ErrTaskNotFound):
return e.JSON(http.StatusNotFound, "Task does not exist")
case err != nil:
logger.Error().Err(err).Msg("TaskOutputProduced: cannot fetch task")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
case dbTask == nil:
panic("task could not be fetched, but database gave no error either")
}
// Read the image bytes into memory.
imageBytes, err := io.ReadAll(e.Request().Body)
if err != nil {
logger.Warn().Err(err).Msg("TaskOutputProduced: error reading image from request")
return sendAPIError(e, http.StatusBadRequest, "error reading request body: %v", err)
}
// Create the "last rendered" payload.
payload := last_rendered.Payload{
JobUUID: dbTask.Job.UUID,
WorkerUUID: worker.UUID,
MimeType: e.Request().Header.Get("Content-Type"),
Image: imageBytes,
}
// Queue the image for processing:
err = f.lastRender.QueueImage(payload)
if err != nil {
switch {
case errors.Is(err, last_rendered.ErrMimeTypeUnsupported):
logger.Warn().
Str("mimeType", payload.MimeType).
Msg("TaskOutputProduced: Worker sent unsupported mime type")
return sendAPIError(e, http.StatusUnsupportedMediaType, "unsupported mime type %q", payload.MimeType)
case errors.Is(err, last_rendered.ErrQueueFull):
logger.Info().
Msg("TaskOutputProduced: image processing queue is full, ignoring request")
return sendAPIError(e, http.StatusTooManyRequests, "image processing queue is full")
default:
logger.Error().Err(err).
Msg("TaskOutputProduced: error queueing image")
return sendAPIError(e, http.StatusInternalServerError, "error queueing image for processing: %v", err)
}
}
logger.Info().Msg("TaskOutputProduced: accepted last-rendered image for processing")
return e.NoContent(http.StatusAccepted)
}
func (f *Flamenco) workerPingedTask(
ctx context.Context,
logger zerolog.Logger,

@ -3,7 +3,9 @@ package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"context"
"io"
"net/http"
"testing"
@ -11,6 +13,7 @@ import (
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
@ -455,3 +458,103 @@ func TestMayWorkerRun(t *testing.T) {
})
}
}
func TestTaskOutputProduced(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
prepareRequest := func(body io.Reader) echo.Context {
echo := mf.prepareMockedRequest(body)
requestWorkerStore(echo, &worker)
return echo
}
job := persistence.Job{
UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0",
}
task := persistence.Task{
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
Job: &job,
Status: api.TaskStatusActive,
}
// Mock body to use in the request.
bodyBytes := []byte("JPEG file contents")
// Test: unhappy, missing Content-Length header.
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
echo := prepareRequest(nil)
err := mf.flamenco.TaskOutputProduced(echo, task.UUID)
assert.NoError(t, err)
assertResponseAPIError(t, echo, http.StatusLengthRequired, "Content-Length header required")
}
// Test: unhappy, too large Content-Length header.
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
bodyBytes := bytes.Repeat([]byte("x"), int(last_rendered.MaxImageSizeBytes+1))
if int64(len(bodyBytes)) != last_rendered.MaxImageSizeBytes+1 {
panic("cannot generate enough bytes")
}
echo := prepareRequest(bytes.NewReader(bodyBytes))
err := mf.flamenco.TaskOutputProduced(echo, task.UUID)
assert.NoError(t, err)
assertResponseAPIError(t, echo, http.StatusRequestEntityTooLarge,
"image too large; should be max %v bytes", last_rendered.MaxImageSizeBytes)
}
// Test: unhappy, wrong mime type
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil)
echo := prepareRequest(bytes.NewReader(bodyBytes))
echo.Request().Header.Set("Content-Type", "image/openexr")
mf.lastRender.EXPECT().QueueImage(gomock.Any()).Return(last_rendered.ErrMimeTypeUnsupported)
err := mf.flamenco.TaskOutputProduced(echo, task.UUID)
assert.NoError(t, err)
assertResponseAPIError(t, echo, http.StatusUnsupportedMediaType, `unsupported mime type "image/openexr"`)
}
// Test: unhappy, queue full
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil)
echo := prepareRequest(bytes.NewReader(bodyBytes))
mf.lastRender.EXPECT().QueueImage(gomock.Any()).Return(last_rendered.ErrQueueFull)
err := mf.flamenco.TaskOutputProduced(echo, task.UUID)
assert.NoError(t, err)
assertResponseAPIError(t, echo, http.StatusTooManyRequests, "image processing queue is full")
}
// Test: happy
{
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil)
echo := prepareRequest(bytes.NewReader(bodyBytes))
echo.Request().Header.Set("Content-Type", "image/jpeg")
expectPayload := last_rendered.Payload{
JobUUID: job.UUID,
WorkerUUID: worker.UUID,
MimeType: "image/jpeg",
Image: bodyBytes,
}
mf.lastRender.EXPECT().QueueImage(expectPayload).Return(nil)
err := mf.flamenco.TaskOutputProduced(echo, task.UUID)
assert.NoError(t, err)
assertResponseNoBody(t, echo, http.StatusAccepted)
}
}

@ -0,0 +1,79 @@
package last_rendered
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"bytes"
"errors"
"fmt"
"image"
"image/jpeg"
"os"
"path/filepath"
// Import for side-effect of registering decoder.
_ "image/png"
"github.com/disintegration/imaging"
)
var (
supportedMimeTypes = map[string]bool{
"image/jpeg": true,
"image/png": true,
}
ErrMimeTypeUnsupported = errors.New("mime type unsupported")
)
// decodeImage checks the payload mime type, and if okay, decodes the image and returns it.
// Returns `ErrMimeTypeUnsupported` if the mime type is unsupported.
func decodeImage(payload Payload) (image.Image, error) {
if !supportedMimeTypes[payload.MimeType] {
return nil, ErrMimeTypeUnsupported
}
reader := bytes.NewReader(payload.Image)
img, _, err := image.Decode(reader)
if err != nil {
return nil, fmt.Errorf("decoding image: %w", err)
}
return img, nil
}
// saveJPEG writes the given image to a JPEG file.
func saveJPEG(targetpath string, img image.Image) error {
// Ensure the directory exists.
targetdir := filepath.Dir(targetpath)
err := os.MkdirAll(targetdir, os.ModePerm)
if err != nil {
return fmt.Errorf("creating directory %s: %w", targetdir, err)
}
file, err := os.Create(targetpath)
if err != nil {
return fmt.Errorf("creating file: %w", err)
}
options := jpeg.Options{
Quality: thumbnailJPEGQuality,
}
err = jpeg.Encode(file, img, &options)
if err != nil {
return fmt.Errorf("encoding as JPEG: %w", err)
}
err = file.Close()
if err != nil {
return fmt.Errorf("closing file: %w", err)
}
return nil
}
func downscaleImage(spec thumbspec, img image.Image) image.Image {
// Fill out the entire frame, cropping the image if necessary:
// return imaging.Fill(img, spec.maxWidth, spec.maxHeight, imaging.Center, imaging.Lanczos)
// Fit the image to the frame, potentially resulting in either a narrower or lower image:
return imaging.Fit(img, spec.maxWidth, spec.maxHeight, imaging.Lanczos)
}

@ -0,0 +1,167 @@
package last_rendered
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"path/filepath"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
const (
// MaxImageSizeBytes is the maximum size in bytes allowed for to-be-processed images.
MaxImageSizeBytes int64 = 10 * 1024 * 1024
// queueSize determines how many images can be queued in memory before rejecting
// new requests to process.
queueSize = 3
thumbnailJPEGQuality = 80
)
var (
ErrQueueFull = errors.New("queue full")
// thumbnails specifies the thumbnail sizes. For efficiency, they should be
// listed from large to small, as each thumbnail is the input for the next
// one.
thumbnails = []thumbspec{
{"last-rendered.jpg", 1920, 1080},
{"last-rendered-small.jpg", 600, 338},
{"last-rendered-tiny.jpg", 48, 28},
}
)
type Storage interface {
// ForJob returns the directory path for storing job-related files.
ForJob(jobUUID string) string
}
// LastRenderedProcessor processes "last-rendered" images and stores them with
// the job.
type LastRenderedProcessor struct {
storage Storage
// TODO: expand this queue to be per job, so that one spammy job doesn't block
// the queue for other jobs.
queue chan Payload
processingDonecallback func(jobUUID string)
}
// Payload contains the actual image to process.
type Payload struct {
JobUUID string // Used to determine the directory to store the image.
WorkerUUID string // Just for logging.
MimeType string
Image []byte
}
// thumbspec specifies a thumbnail size & filename.
type thumbspec struct {
filename string
maxWidth int
maxHeight int
}
func New(storage Storage) *LastRenderedProcessor {
return &LastRenderedProcessor{
storage: storage,
queue: make(chan Payload, queueSize),
}
}
// SetCallback registers a 'done' callback, which will be called after the job
// received a new last-rendered image.
// There is only one such callback, so calling this will overwrite the
// previously-set callback function. Pass `nil` to un-set.
func (lrp *LastRenderedProcessor) SetCallback(processingDonecallback func(jobUUID string)) {
lrp.processingDonecallback = processingDonecallback
}
// Run is the main loop for the processing of images. It will keep running until
// the context is closed.
func (lrp *LastRenderedProcessor) Run(ctx context.Context) {
log.Debug().Msg("last-rendered: queue runner running")
defer log.Debug().Msg("last-rendered: queue runner shutting down")
for {
select {
case <-ctx.Done():
return
case payload := <-lrp.queue:
lrp.processImage(payload)
}
}
}
// QueueImage queues an image for processing.
// Returns `ErrQueueFull` if there is no more space in the queue for new images.
func (lrp *LastRenderedProcessor) QueueImage(payload Payload) error {
logger := payload.sublogger(log.Logger)
select {
case lrp.queue <- payload:
logger.Debug().Msg("last-rendered: queued image for processing")
return nil
default:
logger.Debug().Msg("last-rendered: unable to queue image for processing")
return ErrQueueFull
}
}
// processImage down-scales the image to a few thumbnails for presentation in
// the web interface, and stores those in a job-specific directory.
//
// Because this is intended as internal queue-processing function, errors are
// logged but not returned.
func (lrp *LastRenderedProcessor) processImage(payload Payload) {
jobDir := lrp.storage.ForJob(payload.JobUUID)
logger := log.With().Str("jobDir", jobDir).Logger()
logger = payload.sublogger(logger)
// Decode the image.
image, err := decodeImage(payload)
if err != nil {
logger.Error().Err(err).Msg("last-rendered: unable to decode image")
return
}
// Generate the thumbnails.
for _, spec := range thumbnails {
thumbLogger := spec.sublogger(logger)
thumbLogger.Trace().Msg("last-rendered: creating thumbnail")
image = downscaleImage(spec, image)
imgpath := filepath.Join(jobDir, spec.filename)
if err := saveJPEG(imgpath, image); err != nil {
thumbLogger.Error().Err(err).Msg("last-rendered: error saving thumbnail")
break
}
}
// Call the callback, if provided.
if lrp.processingDonecallback != nil {
lrp.processingDonecallback(payload.JobUUID)
}
}
func (p Payload) sublogger(logger zerolog.Logger) zerolog.Logger {
return logger.With().
Str("job", p.JobUUID).
Str("producedByWorker", p.WorkerUUID).
Str("mime", p.MimeType).
Logger()
}
func (spec thumbspec) sublogger(logger zerolog.Logger) zerolog.Logger {
return logger.With().
Int("width", spec.maxWidth).
Int("height", spec.maxHeight).
Str("filename", spec.filename).
Logger()
}

@ -0,0 +1,112 @@
package last_rendered
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"image"
"os"
"path/filepath"
"testing"
"git.blender.org/flamenco/internal/manager/local_storage"
"github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
storage := local_storage.NewNextToExe("lrp")
defer storage.MustErase()
callback := func(string) {}
lrp := New(storage)
assert.Equal(t, lrp.storage, storage)
assert.NotNil(t, lrp.queue)
assert.Nil(t, lrp.processingDonecallback)
lrp.SetCallback(callback)
assert.NotNil(t, lrp.processingDonecallback)
}
func TestQueueImage(t *testing.T) {
// Canary test.
if !assert.Equal(t, queueSize, 3) {
t.Fatalf("queueSize must be 3 for this test, but is %v", queueSize)
}
payload := Payload{
JobUUID: "2205227c-593c-46ac-a0d7-e115d4e80dd4",
MimeType: "image/png",
Image: []byte("PNG file contents"),
}
storage := local_storage.NewNextToExe("lrp")
defer storage.MustErase()
lrp := New(storage)
assert.NoError(t, lrp.QueueImage(payload))
assert.NoError(t, lrp.QueueImage(payload))
assert.NoError(t, lrp.QueueImage(payload))
assert.ErrorIs(t, lrp.QueueImage(payload), ErrQueueFull)
}
func TestProcessImage(t *testing.T) {
// Load the test image. Note that this intentionally has an approximate 21:9
// ratio, whereas the thumbnail specs define a 16:9 ratio.
imgBytes, err := os.ReadFile("last_rendered_test.jpg")
if !assert.NoError(t, err) {
t.FailNow()
}
jobID := "e078438b-c9f5-43e6-9e86-52f8be91dd12"
payload := Payload{
JobUUID: jobID,
MimeType: "image/jpeg",
Image: imgBytes,
}
storage := local_storage.NewNextToExe("lrp")
defer storage.MustErase()
lrp := New(storage)
callbackCount := 0
lrp.SetCallback(func(callbackJobID string) {
assert.Equal(t, jobID, callbackJobID)
callbackCount++
})
// Sanity check: the thumbnails shouldn't exist yet.
jobdir := storage.ForJob(jobID)
assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered.jpg"))
assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg"))
assert.NoFileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg"))
lrp.processImage(payload)
// The files should exist now.
assert.FileExists(t, filepath.Join(jobdir, "last-rendered.jpg"))
assert.FileExists(t, filepath.Join(jobdir, "last-rendered-small.jpg"))
assert.FileExists(t, filepath.Join(jobdir, "last-rendered-tiny.jpg"))
assert.Equal(t, callbackCount, 1, "the 'done' callback should be called exactly once")
// Check the sizes, they should match the thumbspec.
assertImageSize := func(spec thumbspec) {
path := filepath.Join(jobdir, spec.filename)
file, err := os.Open(path)
if !assert.NoError(t, err, "thumbnail %s should be openable", spec.filename) {
return
}
img, format, err := image.Decode(file)
if !assert.NoErrorf(t, err, "thumbnail %s should be decodable", spec.filename) {
return
}
assert.Equalf(t, "jpeg", format, "thumbnail %s not written in the expected format", spec.filename)
assert.LessOrEqualf(t, img.Bounds().Dx(), spec.maxWidth, "thumbnail %s has wrong width", spec.filename)
assert.LessOrEqualf(t, img.Bounds().Dy(), spec.maxHeight, "thumbnail %s has wrong height", spec.filename)
}
for _, spec := range thumbnails {
assertImageSize(spec)
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 238 KiB