Manager: implement API endpoint for deleting jobs

Implement the `deleteJob` API endpoint. Calling this endpoint will mark
the job as "deletion requested", after which it's queued for actual
deletion. This makes the API response fast, even when there is a lot of
work to do in the background.

A new background service "job deleter" keeps track of the queue of such
jobs, and performs the actual deletion. It removes:

- Shaman checkout for the job (but see below)
- Manager-local files of the job (task logs, last-rendered images)
- The job itself

The removal is done in the above order, so the job is only removed from the
database if the rest of the removal was succesful.

Shaman checkouts are only removed if the job was submitted with Flamenco
version 3.2. Earlier versions did not record enough information to reliably
do this.
This commit is contained in:
Sybren A. Stüvel 2023-01-04 01:11:09 +01:00
parent 2e5f5ffadd
commit 791d877ff1
19 changed files with 1102 additions and 42 deletions

@ -27,6 +27,7 @@ import (
"git.blender.org/flamenco/internal/manager/api_impl/dummy"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/job_deleter"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/local_storage"
"git.blender.org/flamenco/internal/manager/persistence"
@ -144,6 +145,11 @@ func runFlamencoManager() bool {
// go persist.PeriodicMaintenanceLoop(mainCtx)
timeService := clock.New()
compiler, err := job_compilers.Load(timeService)
if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers")
}
webUpdater := webupdates.New()
localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath)
@ -154,8 +160,13 @@ func runFlamencoManager() bool {
lastRender := last_rendered.New(localStorage)
shamanServer := buildShamanServer(configService, isFirstRun)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine,
shamanServer, logStorage, webUpdater, lastRender, localStorage, sleepScheduler)
jobDeleter := job_deleter.NewService(persist, localStorage, webUpdater, shamanServer)
flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService, lastRender,
localStorage, sleepScheduler, jobDeleter)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage)
timeoutChecker := timeout_checker.New(
@ -222,6 +233,13 @@ func runFlamencoManager() bool {
sleepScheduler.Run(mainCtx)
}()
// Run the Job Deleter.
wg.Add(1)
go func() {
defer wg.Done()
jobDeleter.Run(mainCtx)
}()
// Log the URLs last, hopefully that makes them more visible / encouraging to go to for users.
go func() {
time.Sleep(100 * time.Millisecond)
@ -245,29 +263,6 @@ func runFlamencoManager() bool {
return doRestart
}
func buildFlamencoAPI(
timeService clock.Clock,
configService *config.Service,
persist *persistence.DB,
taskStateMachine *task_state_machine.StateMachine,
shamanServer api_impl.Shaman,
logStorage *task_logs.Storage,
webUpdater *webupdates.BiDirComms,
lastRender *last_rendered.LastRenderedProcessor,
localStorage local_storage.StorageInfo,
sleepScheduler *sleep_scheduler.SleepScheduler,
) *api_impl.Flamenco {
compiler, err := job_compilers.Load(timeService)
if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers")
}
flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService, lastRender,
localStorage, sleepScheduler)
return flamenco
}
func buildShamanServer(configService *config.Service, isFirstRun bool) api_impl.Shaman {
if isFirstRun {
log.Info().Msg("Not starting Shaman storage service, as this is the first run of Flamenco. Configure the shared storage location first.")

@ -27,6 +27,7 @@ type Flamenco struct {
lastRender LastRendered
localStorage LocalStorage
sleepScheduler WorkerSleepScheduler
jobDeleter JobDeleter
// The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like
@ -53,6 +54,7 @@ func NewFlamenco(
lr LastRendered,
localStorage LocalStorage,
wss WorkerSleepScheduler,
jd JobDeleter,
) *Flamenco {
return &Flamenco{
jobCompiler: jc,
@ -66,6 +68,7 @@ func NewFlamenco(
lastRender: lr,
localStorage: localStorage,
sleepScheduler: wss,
jobDeleter: jd,
done: make(chan struct{}),
}

@ -33,3 +33,6 @@ func (ds *DummyShaman) FileStoreCheck(ctx context.Context, checksum string, file
func (ds *DummyShaman) FileStore(ctx context.Context, file io.ReadCloser, checksum string, filesize int64, canDefer bool, originalFilename string) error {
return ErrDummyShaman
}
func (ds *DummyShaman) EraseCheckout(checkoutID string) error {
return ErrDummyShaman
}

@ -15,6 +15,7 @@ import (
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/job_deleter"
"git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/sleep_scheduler"
@ -25,7 +26,7 @@ import (
)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter
type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -197,6 +198,9 @@ type Shaman interface {
// return early when another client finishes uploading the exact same file, to
// prevent double uploads.
FileStore(ctx context.Context, file io.ReadCloser, checksum string, filesize int64, canDefer bool, originalFilename string) error
// EraseCheckout deletes the symlinks and the directory structure that makes up the checkout.
EraseCheckout(checkoutID string) error
}
var _ Shaman = (*shaman.Server)(nil)
@ -216,3 +220,9 @@ type WorkerSleepScheduler interface {
}
var _ WorkerSleepScheduler = (*sleep_scheduler.SleepScheduler)(nil)
type JobDeleter interface {
QueueJobDeletion(ctx context.Context, job *persistence.Job) error
}
var _ JobDeleter = (*job_deleter.Service)(nil)

@ -160,6 +160,40 @@ func (f *Flamenco) compileSubmittedJob(ctx context.Context, logger zerolog.Logge
return f.jobCompiler.Compile(ctx, submittedJob)
}
// DeleteJob marks the job as "deletion requested" so that the job deletion
// service can actually delete it.
func (f *Flamenco) DeleteJob(e echo.Context, jobID string) error {
logger := requestLogger(e).With().
Str("job", jobID).
Logger()
dbJob, err := f.fetchJob(e, logger, jobID)
if dbJob == nil {
// f.fetchJob already sent a response.
return err
}
logger = logger.With().
Str("currentstatus", string(dbJob.Status)).
Logger()
logger.Info().Msg("job deletion requested")
// All the required info is known, this can keep running even when the client
// disconnects.
ctx := context.Background()
err = f.jobDeleter.QueueJobDeletion(ctx, dbJob)
switch {
case persistence.ErrIsDBBusy(err):
logger.Error().AnErr("cause", err).Msg("database too busy to queue job deletion")
return sendAPIErrorDBBusy(e, "too busy to queue job deletion, try again later")
case err != nil:
logger.Error().AnErr("cause", err).Msg("error queueing job deletion")
return sendAPIError(e, http.StatusInternalServerError, "error queueing job deletion")
default:
return e.NoContent(http.StatusNoContent)
}
}
// SetJobStatus is used by the web interface to change a job's status.
func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error {
logger := requestLogger(e).With().

@ -804,3 +804,31 @@ func TestFetchGlobalLastRenderedInfo(t *testing.T) {
}
}
func TestDeleteJob(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
jobID := "18a9b096-d77e-438c-9be2-74397038298b"
dbJob := persistence.Job{
Model: persistence.Model{ID: 47},
UUID: jobID,
Name: "test job",
Status: api.JobStatusFailed,
Settings: persistence.StringInterfaceMap{},
Metadata: persistence.StringStringMap{},
}
// Set up expectations.
echoCtx := mf.prepareMockedRequest(nil)
mf.persistence.EXPECT().FetchJob(moremock.ContextWithDeadline(), jobID).Return(&dbJob, nil)
mf.jobDeleter.EXPECT().QueueJobDeletion(gomock.Any(), &dbJob)
// Do the call.
err := mf.flamenco.DeleteJob(echoCtx, jobID)
assert.NoError(t, err)
assertResponseNoContent(t, echoCtx)
}

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler)
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter)
// Package mocks is a generated GoMock package.
package mocks
@ -979,6 +979,20 @@ func (mr *MockShamanMockRecorder) Checkout(arg0, arg1 interface{}) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Checkout", reflect.TypeOf((*MockShaman)(nil).Checkout), arg0, arg1)
}
// EraseCheckout mocks base method.
func (m *MockShaman) EraseCheckout(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EraseCheckout", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// EraseCheckout indicates an expected call of EraseCheckout.
func (mr *MockShamanMockRecorder) EraseCheckout(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EraseCheckout", reflect.TypeOf((*MockShaman)(nil).EraseCheckout), arg0)
}
// FileStore mocks base method.
func (m *MockShaman) FileStore(arg0 context.Context, arg1 io.ReadCloser, arg2 string, arg3 int64, arg4 bool, arg5 string) error {
m.ctrl.T.Helper()
@ -1219,3 +1233,40 @@ func (mr *MockWorkerSleepSchedulerMockRecorder) WorkerStatus(arg0, arg1 interfac
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerStatus", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).WorkerStatus), arg0, arg1)
}
// MockJobDeleter is a mock of JobDeleter interface.
type MockJobDeleter struct {
ctrl *gomock.Controller
recorder *MockJobDeleterMockRecorder
}
// MockJobDeleterMockRecorder is the mock recorder for MockJobDeleter.
type MockJobDeleterMockRecorder struct {
mock *MockJobDeleter
}
// NewMockJobDeleter creates a new mock instance.
func NewMockJobDeleter(ctrl *gomock.Controller) *MockJobDeleter {
mock := &MockJobDeleter{ctrl: ctrl}
mock.recorder = &MockJobDeleterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockJobDeleter) EXPECT() *MockJobDeleterMockRecorder {
return m.recorder
}
// QueueJobDeletion mocks base method.
func (m *MockJobDeleter) QueueJobDeletion(arg0 context.Context, arg1 *persistence.Job) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueueJobDeletion", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// QueueJobDeletion indicates an expected call of QueueJobDeletion.
func (mr *MockJobDeleterMockRecorder) QueueJobDeletion(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueJobDeletion", reflect.TypeOf((*MockJobDeleter)(nil).QueueJobDeletion), arg0, arg1)
}

@ -36,6 +36,7 @@ type mockedFlamenco struct {
lastRender *mocks.MockLastRendered
localStorage *mocks.MockLocalStorage
sleepScheduler *mocks.MockWorkerSleepScheduler
jobDeleter *mocks.MockJobDeleter
// Place for some tests to store a temporary directory.
tempdir string
@ -52,6 +53,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
lr := mocks.NewMockLastRendered(mockCtrl)
localStore := mocks.NewMockLocalStorage(mockCtrl)
wss := mocks.NewMockWorkerSleepScheduler(mockCtrl)
jd := mocks.NewMockJobDeleter(mockCtrl)
clock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00")
@ -60,7 +62,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
}
clock.Set(mockedNow)
f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss)
f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss, jd)
return mockedFlamenco{
flamenco: f,
@ -75,6 +77,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
lastRender: lr,
localStorage: localStore,
sleepScheduler: wss,
jobDeleter: jd,
}
}

@ -0,0 +1,53 @@
package job_deleter
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"git.blender.org/flamenco/internal/manager/local_storage"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api"
"git.blender.org/flamenco/pkg/shaman"
)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/job_deleter PersistenceService,Storage,ChangeBroadcaster,Shaman
type PersistenceService interface {
FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error)
RequestJobDeletion(ctx context.Context, j *persistence.Job) error
// FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs.
FetchJobsDeletionRequested(ctx context.Context) ([]string, error)
DeleteJob(ctx context.Context, jobUUID string) error
}
// PersistenceService should be a subset of persistence.DB
var _ PersistenceService = (*persistence.DB)(nil)
type Storage interface {
// RemoveJobStorage removes from disk the directory for storing job-related files.
RemoveJobStorage(ctx context.Context, jobUUID string) error
}
var _ Storage = (*local_storage.StorageInfo)(nil)
type ChangeBroadcaster interface {
// BroadcastJobUpdate sends the job update to SocketIO clients.
BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
type Shaman interface {
// IsEnabled returns whether this Shaman service is enabled or not.
IsEnabled() bool
// EraseCheckout deletes the symlinks and the directory structure that makes up the checkout.
EraseCheckout(checkoutID string) error
}
var _ Shaman = (*shaman.Server)(nil)

@ -0,0 +1,180 @@
// package job_deleter has functionality to delete jobs.
//
// Requesting deletion marks the job as "deletion requested" in the database.
// This is relatively fast, and persistent. After this, the job is queued for
// actual deletion by a different goroutine.
//
// At startup of the service the database is inspected and still-pending
// deletion requests are queued.
//
// SPDX-License-Identifier: GPL-3.0-or-later
package job_deleter
import (
"context"
"errors"
"fmt"
"time"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/shaman"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// jobDeletionQueueSize determines how many job deletion requests can be kept in
// memory at a time. This is variable to allow unit testing with lower limits.
var jobDeletionQueueSize = defaultJobDeletionQueueSize
const defaultJobDeletionQueueSize = 100
// Service can mark jobs as "deletion requested", as well as delete those jobs
// in a background goroutine.
type Service struct {
// Injected dependencies.
persist PersistenceService
storage Storage
changeBroadcaster ChangeBroadcaster
shaman Shaman
queue chan string // Job UUIDs to process.
}
// NewService constructs a new job deletion service.
// `shaman` can be nil if Shaman checkouts shouldn't be erased.
func NewService(
persist PersistenceService,
storage Storage,
changeBroadcaster ChangeBroadcaster,
shaman Shaman,
) *Service {
return &Service{
persist: persist,
storage: storage,
changeBroadcaster: changeBroadcaster,
shaman: shaman,
queue: make(chan string, jobDeletionQueueSize),
}
}
func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) error {
logger := log.With().Str("job", job.UUID).Logger()
logger.Info().Msg("job deleter: queueing job for deletion")
err := s.persist.RequestJobDeletion(ctx, job)
if err != nil {
return fmt.Errorf("requesting job deletion: %w", err)
}
// Let the Run() goroutine know this job is ready for deletion.
select {
case s.queue <- job.UUID:
logger.Debug().Msg("job deleter: job succesfully queued for deletion")
case <-time.After(100 * time.Millisecond):
logger.Debug().Msg("job deleter: job deletion queue is full")
}
return nil
}
// Run processes the queue of deletion requests. It starts by building up a
// queue of still-pending job deletions.
func (s *Service) Run(ctx context.Context) {
s.queuePendingDeletions(ctx)
log.Debug().Msg("job deleter: running")
defer log.Debug().Msg("job deleter: shutting down")
for {
select {
case <-ctx.Done():
return
case jobUUID := <-s.queue:
s.deleteJob(ctx, jobUUID)
case <-time.After(1 * time.Minute):
// Inspect the database to see if there was anything marked for deletion
// without getting into our queue. This can happen when lots of jobs are
// queued in quick succession, as then the queue channel gets full.
if len(s.queue) == 0 {
s.queuePendingDeletions(ctx)
}
}
}
}
func (s *Service) queuePendingDeletions(ctx context.Context) {
log.Debug().Msg("job deleter: finding pending deletions")
jobUUIDs, err := s.persist.FetchJobsDeletionRequested(ctx)
if err != nil {
log.Warn().AnErr("cause", err).Msg("job deleter: could not find jobs to be deleted in database")
return
}
for _, jobUUID := range jobUUIDs {
select {
case s.queue <- jobUUID:
log.Debug().Str("job", jobUUID).Msg("job deleter: job queued for deletion")
case <-time.After(100 * time.Millisecond):
log.Info().Msg("job deleter: job deletion queue is full")
break
}
}
}
func (s *Service) deleteJob(ctx context.Context, jobUUID string) error {
logger := log.With().Str("job", jobUUID).Logger()
err := s.deleteShamanCheckout(ctx, logger, jobUUID)
if err != nil {
return err
}
logger.Info().Msg("job deleter: removing logs, last-rendered images, etc.")
if err := s.storage.RemoveJobStorage(ctx, jobUUID); err != nil {
logger.Error().Err(err).Msg("job deleter: error removing job logs, job deletion aborted")
return err
}
logger.Info().Msg("job deleter: removing job from database")
if err := s.persist.DeleteJob(ctx, jobUUID); err != nil {
logger.Error().Err(err).Msg("job deleter: unable to remove job from database")
return err
}
// TODO: broadcast that this job was deleted.
logger.Info().Msg("job deleter: job removal complete")
return nil
}
func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logger, jobUUID string) error {
if !s.shaman.IsEnabled() {
logger.Debug().Msg("job deleter: Shaman not enabled, skipping job file deletion")
return nil
}
// To erase the Shaman checkout we need more info than just its UUID.
dbJob, err := s.persist.FetchJob(ctx, jobUUID)
if err != nil {
return fmt.Errorf("unable to fetch job from database: %w", err)
}
checkoutID := dbJob.Storage.ShamanCheckoutID
if checkoutID == "" {
logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion")
return nil
}
err = s.shaman.EraseCheckout(checkoutID)
switch {
case errors.Is(err, shaman.ErrDoesNotExist):
logger.Info().Msg("job deleter: Shaman checkout directory does not exist, ignoring")
return nil
case err != nil:
logger.Info().Err(err).Msg("job deleter: Shaman checkout directory could not be erased")
return err
}
return nil
}

@ -0,0 +1,193 @@
package job_deleter
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"testing"
"git.blender.org/flamenco/internal/manager/job_deleter/mocks"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/shaman"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
type JobDeleterMocks struct {
persist *mocks.MockPersistenceService
storage *mocks.MockStorage
broadcaster *mocks.MockChangeBroadcaster
shaman *mocks.MockShaman
ctx context.Context
cancel context.CancelFunc
}
func TestQueueJobDeletion(t *testing.T) {
s, finish, mocks := jobDeleterTestFixtures(t)
defer finish()
job1 := &persistence.Job{UUID: "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b"}
mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job1)
assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job1))
// Call twice more to overflow the queue.
job2 := &persistence.Job{UUID: "e8fbe41c-ed24-46df-ba63-8d4f5524071b"}
mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job2)
assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job2))
job3 := &persistence.Job{UUID: "deeab6ba-02cd-42c0-b7bc-2367a2f04c7d"}
mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job3)
assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job3))
if assert.Len(t, s.queue, 2, "the first two job UUID should be queued") {
assert.Equal(t, job1.UUID, <-s.queue)
assert.Equal(t, job2.UUID, <-s.queue)
}
}
func TestQueuePendingDeletions(t *testing.T) {
s, finish, mocks := jobDeleterTestFixtures(t)
defer finish()
// Queue one more job than fits.
job1 := "aa420164-926a-45d5-ae8b-510ff3d2cd4d"
job2 := "e5feadee-999e-48c2-853d-9db94e7623b0"
job3 := "8516ac60-787c-411e-80a7-026456034da4"
mocks.persist.EXPECT().
FetchJobsDeletionRequested(mocks.ctx).
Return([]string{job1, job2, job3}, nil)
s.queuePendingDeletions(mocks.ctx)
if assert.Len(t, s.queue, 2, "the first two job UUIDs should be queued") {
assert.Equal(t, job1, <-s.queue)
assert.Equal(t, job2, <-s.queue)
}
}
func TestQueuePendingDeletionsUnhappy(t *testing.T) {
s, finish, mocks := jobDeleterTestFixtures(t)
defer finish()
// Any error fetching the deletion-requested jobs should just be logged, and
// not cause any issues.
mocks.persist.EXPECT().
FetchJobsDeletionRequested(mocks.ctx).
Return(nil, errors.New("mocked DB failure"))
s.queuePendingDeletions(mocks.ctx)
assert.Len(t, s.queue, 0)
}
func TestDeleteJobWithoutShaman(t *testing.T) {
s, finish, mocks := jobDeleterTestFixtures(t)
defer finish()
jobUUID := "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b"
mocks.shaman.EXPECT().IsEnabled().Return(false).AnyTimes()
mocks.persist.EXPECT().
FetchJobsDeletionRequested(mocks.ctx).
Return([]string{jobUUID}, nil).
AnyTimes()
// Mock log storage deletion failure. This should prevent the deletion from the database.
mocks.storage.EXPECT().
RemoveJobStorage(mocks.ctx, jobUUID).
Return(errors.New("intended log file deletion failure"))
assert.Error(t, s.deleteJob(mocks.ctx, jobUUID))
// Mock that log storage deletion is ok, but database is not.
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID).
Return(errors.New("mocked DB error"))
assert.Error(t, s.deleteJob(mocks.ctx, jobUUID))
// Mock that everything went OK.
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
// TODO: mocks.broadcaster.EXPECT().BroadcastJobUpdate(...)
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
}
func TestDeleteJobWithShaman(t *testing.T) {
s, finish, mocks := jobDeleterTestFixtures(t)
defer finish()
jobUUID := "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b"
mocks.shaman.EXPECT().IsEnabled().Return(true).AnyTimes()
mocks.persist.EXPECT().
FetchJobsDeletionRequested(mocks.ctx).
Return([]string{jobUUID}, nil).
AnyTimes()
shamanCheckoutID := "010_0431_lighting"
dbJob := persistence.Job{
UUID: jobUUID,
Name: "сцена/shot/010_0431_lighting",
Storage: persistence.JobStorageInfo{
ShamanCheckoutID: shamanCheckoutID,
},
}
mocks.persist.EXPECT().FetchJob(mocks.ctx, jobUUID).Return(&dbJob, nil).AnyTimes()
// Mock that Shaman deletion failed. The rest of the deletion should be
// blocked by this.
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID).Return(errors.New("mocked failure"))
assert.Error(t, s.deleteJob(mocks.ctx, jobUUID))
// Mock that Shaman deletion couldn't happen because the checkout dir doesn't
// exist. The rest of the deletion should continue.
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID).Return(shaman.ErrDoesNotExist)
// Mock log storage deletion failure. This should prevent the deletion from the database.
mocks.storage.EXPECT().
RemoveJobStorage(mocks.ctx, jobUUID).
Return(errors.New("intended log file deletion failure"))
assert.Error(t, s.deleteJob(mocks.ctx, jobUUID))
// Mock that log storage deletion is ok, but database is not.
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID)
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID).
Return(errors.New("mocked DB error"))
assert.Error(t, s.deleteJob(mocks.ctx, jobUUID))
// Mock that everything went OK.
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID)
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
// TODO: mocks.broadcaster.EXPECT().BroadcastJobUpdate(...)
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
}
func jobDeleterTestFixtures(t *testing.T) (*Service, func(), *JobDeleterMocks) {
mockCtrl := gomock.NewController(t)
mocks := &JobDeleterMocks{
persist: mocks.NewMockPersistenceService(mockCtrl),
storage: mocks.NewMockStorage(mockCtrl),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
shaman: mocks.NewMockShaman(mockCtrl),
}
ctx, cancel := context.WithCancel(context.Background())
mocks.ctx = ctx
mocks.cancel = cancel
// This should be called at the end of each unit test.
finish := func() {
mocks.cancel()
jobDeletionQueueSize = defaultJobDeletionQueueSize
}
jobDeletionQueueSize = 2
s := NewService(
mocks.persist,
mocks.storage,
mocks.broadcaster,
mocks.shaman,
)
return s, finish, mocks
}

@ -0,0 +1,218 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/job_deleter (interfaces: PersistenceService,Storage,ChangeBroadcaster,Shaman)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
persistence "git.blender.org/flamenco/internal/manager/persistence"
api "git.blender.org/flamenco/pkg/api"
gomock "github.com/golang/mock/gomock"
)
// MockPersistenceService is a mock of PersistenceService interface.
type MockPersistenceService struct {
ctrl *gomock.Controller
recorder *MockPersistenceServiceMockRecorder
}
// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService.
type MockPersistenceServiceMockRecorder struct {
mock *MockPersistenceService
}
// NewMockPersistenceService creates a new mock instance.
func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService {
mock := &MockPersistenceService{ctrl: ctrl}
mock.recorder = &MockPersistenceServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder {
return m.recorder
}
// DeleteJob mocks base method.
func (m *MockPersistenceService) DeleteJob(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteJob", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteJob indicates an expected call of DeleteJob.
func (mr *MockPersistenceServiceMockRecorder) DeleteJob(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteJob", reflect.TypeOf((*MockPersistenceService)(nil).DeleteJob), arg0, arg1)
}
// FetchJob mocks base method.
func (m *MockPersistenceService) FetchJob(arg0 context.Context, arg1 string) (*persistence.Job, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchJob", arg0, arg1)
ret0, _ := ret[0].(*persistence.Job)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchJob indicates an expected call of FetchJob.
func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1)
}
// FetchJobsDeletionRequested mocks base method.
func (m *MockPersistenceService) FetchJobsDeletionRequested(arg0 context.Context) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchJobsDeletionRequested", arg0)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchJobsDeletionRequested indicates an expected call of FetchJobsDeletionRequested.
func (mr *MockPersistenceServiceMockRecorder) FetchJobsDeletionRequested(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsDeletionRequested", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsDeletionRequested), arg0)
}
// RequestJobDeletion mocks base method.
func (m *MockPersistenceService) RequestJobDeletion(arg0 context.Context, arg1 *persistence.Job) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RequestJobDeletion", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// RequestJobDeletion indicates an expected call of RequestJobDeletion.
func (mr *MockPersistenceServiceMockRecorder) RequestJobDeletion(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobDeletion", reflect.TypeOf((*MockPersistenceService)(nil).RequestJobDeletion), arg0, arg1)
}
// MockStorage is a mock of Storage interface.
type MockStorage struct {
ctrl *gomock.Controller
recorder *MockStorageMockRecorder
}
// MockStorageMockRecorder is the mock recorder for MockStorage.
type MockStorageMockRecorder struct {
mock *MockStorage
}
// NewMockStorage creates a new mock instance.
func NewMockStorage(ctrl *gomock.Controller) *MockStorage {
mock := &MockStorage{ctrl: ctrl}
mock.recorder = &MockStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStorage) EXPECT() *MockStorageMockRecorder {
return m.recorder
}
// RemoveJobStorage mocks base method.
func (m *MockStorage) RemoveJobStorage(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveJobStorage", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// RemoveJobStorage indicates an expected call of RemoveJobStorage.
func (mr *MockStorageMockRecorder) RemoveJobStorage(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobStorage", reflect.TypeOf((*MockStorage)(nil).RemoveJobStorage), arg0, arg1)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller
recorder *MockChangeBroadcasterMockRecorder
}
// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster.
type MockChangeBroadcasterMockRecorder struct {
mock *MockChangeBroadcaster
}
// NewMockChangeBroadcaster creates a new mock instance.
func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster {
mock := &MockChangeBroadcaster{ctrl: ctrl}
mock.recorder = &MockChangeBroadcasterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastJobUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastJobUpdate(arg0 api.SocketIOJobUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastJobUpdate", arg0)
}
// BroadcastJobUpdate indicates an expected call of BroadcastJobUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0)
}
// MockShaman is a mock of Shaman interface.
type MockShaman struct {
ctrl *gomock.Controller
recorder *MockShamanMockRecorder
}
// MockShamanMockRecorder is the mock recorder for MockShaman.
type MockShamanMockRecorder struct {
mock *MockShaman
}
// NewMockShaman creates a new mock instance.
func NewMockShaman(ctrl *gomock.Controller) *MockShaman {
mock := &MockShaman{ctrl: ctrl}
mock.recorder = &MockShamanMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockShaman) EXPECT() *MockShamanMockRecorder {
return m.recorder
}
// EraseCheckout mocks base method.
func (m *MockShaman) EraseCheckout(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EraseCheckout", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// EraseCheckout indicates an expected call of EraseCheckout.
func (mr *MockShamanMockRecorder) EraseCheckout(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EraseCheckout", reflect.TypeOf((*MockShaman)(nil).EraseCheckout), arg0)
}
// IsEnabled mocks base method.
func (m *MockShaman) IsEnabled() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsEnabled")
ret0, _ := ret[0].(bool)
return ret0
}
// IsEnabled indicates an expected call of IsEnabled.
func (mr *MockShamanMockRecorder) IsEnabled() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEnabled", reflect.TypeOf((*MockShaman)(nil).IsEnabled))
}

@ -3,6 +3,8 @@ package local_storage
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -37,21 +39,44 @@ func (si StorageInfo) ForJob(jobUUID string) string {
return filepath.Join(si.rootPath, relPathForJob(jobUUID))
}
// Erase removes the entire storage directory from disk.
func (si StorageInfo) Erase() error {
// A few safety measures before erasing the planet.
if si.rootPath == "" {
return fmt.Errorf("%+v.Erase(): refusing to erase empty directory", si)
}
if crosspath.IsRoot(si.rootPath) {
return fmt.Errorf("%+v.Erase(): refusing to erase root directory", si)
}
if home, found := os.LookupEnv("HOME"); found && home == si.rootPath {
return fmt.Errorf("%+v.Erase(): refusing to erase home directory %s", si, home)
func (si StorageInfo) RemoveJobStorage(ctx context.Context, jobUUID string) error {
path := si.ForJob(jobUUID)
log.Info().Str("path", path).Msg("erasing manager-local job storage directory")
if err := removeDirectory(path); err != nil {
return fmt.Errorf("unable to erase %q: %w", path, err)
}
log.Debug().Str("path", si.rootPath).Msg("erasing storage directory")
return os.RemoveAll(si.rootPath)
// The path should be in some intermediate path
// (`root/intermediate/job-uuid`), which might need removing if it's empty.
intermediate := filepath.Dir(path)
if intermediate == si.rootPath {
// There is no intermediate dir for jobless situations. Or maybe the rest of
// the code changed since this function was written. Regardless of the
// reason, this function shouldn't remove the local storage root.
return nil
}
if err := os.Remove(intermediate); err != nil {
// This is absolutely fine, as it'll happen when the directory is not empty
// and thus shouldn't be removed anyway.
log.Trace().
Str("job", jobUUID).
Str("path", intermediate).
AnErr("cause", err).
Msg("RemoveJobStorage() could not remove intermediate directory, this is fine")
}
return nil
}
// Erase removes the entire storage directory from disk.
func (si StorageInfo) Erase() error {
log.Info().Str("path", si.rootPath).Msg("erasing storage directory")
if err := removeDirectory(si.rootPath); err != nil {
return fmt.Errorf("unable to erase %q: %w", si.rootPath, err)
}
return nil
}
// MustErase removes the entire storage directory from disk, and panics if it
@ -97,3 +122,20 @@ func getSuitableStorageRoot() string {
// Fall back to "." if all else fails.
return "."
}
// removeDirectory removes the given path, but only if it is not a root path and
// not the user's home directory.
func removeDirectory(path string) error {
if path == "" {
return fmt.Errorf("refusing to erase empty directory path (%q)", path)
}
if crosspath.IsRoot(path) {
return errors.New("refusing to erase root directory")
}
if home, found := os.LookupEnv("HOME"); found && home == path {
return errors.New("refusing to erase home directory")
}
log.Debug().Str("path", path).Msg("erasing directory")
return os.RemoveAll(path)
}

@ -3,12 +3,14 @@ package local_storage
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewNextToExe(t *testing.T) {
@ -56,3 +58,51 @@ func TestErase(t *testing.T) {
assert.NoError(t, si.Erase())
assert.NoDirExists(t, si.rootPath, "Erase() should erase the root path, and everything in it")
}
func TestRemoveJobStorage(t *testing.T) {
si := NewNextToExe("task-logs")
jobUUID := "08e126ef-d773-468b-8bab-19a8213cf2ff"
jobPath := si.ForJob(jobUUID)
assert.NoDirExists(t, jobPath, "getting a path should not create it")
assert.NoError(t, os.MkdirAll(jobPath, os.ModePerm))
assert.DirExists(t, jobPath, "os.MkdirAll is borked")
taskFile := filepath.Join(jobPath, "task-07c33f32-b345-4da9-8834-9c91532cd97e.txt")
assert.NoError(t, os.WriteFile(taskFile, []byte("dummy task log"), 0o777))
assert.NoError(t, si.RemoveJobStorage(context.Background(), jobUUID))
assert.NoDirExists(t, jobPath, "RemoveJobStorage() should erase the entire job-specific storage dir, and everything in it")
// See if the test assumption (that job dir is in another sub-dir of the root,
// `root/job-xxyy/xxyyzzblablabla`) still holds.
intermediate := filepath.Dir(jobPath)
require.NotEqual(t, si.rootPath, intermediate,
"Expected job path %s to be in child directory of root %s", jobPath, si.rootPath)
assert.NoDirExists(t, intermediate, "RemoveJobStorage() should remove empty intermediate paths")
assert.DirExists(t, si.rootPath, "RemoveJobStorage() should keep the root path")
}
func TestRemoveJobStorageWithoutJobUUID(t *testing.T) {
si := NewNextToExe("task-logs")
jobPath := si.ForJob("")
assert.NoDirExists(t, jobPath, "getting a path should not create it")
assert.NoError(t, os.MkdirAll(jobPath, os.ModePerm))
assert.DirExists(t, jobPath, "os.MkdirAll is borked")
taskFile := filepath.Join(jobPath, "task-07c33f32-b345-4da9-8834-9c91532cd97e.txt")
assert.NoError(t, os.WriteFile(taskFile, []byte("dummy task log"), 0o777))
assert.NoError(t, si.RemoveJobStorage(context.Background(), ""))
assert.NoDirExists(t, jobPath, "RemoveJobStorage() should erase the entire job-specific storage dir, and everything in it")
// See if the test assumption (that a jobless dir is directly inside the root) still holds.
intermediate := filepath.Dir(jobPath)
require.Equal(t, si.rootPath, intermediate,
"Expected job path %s to be a direct child of root %s", jobPath, si.rootPath)
assert.DirExists(t, si.rootPath, "RemoveJobStorage() should keep the root path")
}

@ -4,6 +4,7 @@ package persistence
import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
@ -31,12 +32,19 @@ type Job struct {
Settings StringInterfaceMap `gorm:"type:jsonb"`
Metadata StringStringMap `gorm:"type:jsonb"`
DeleteRequestedAt sql.NullTime
Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"`
}
type StringInterfaceMap map[string]interface{}
type StringStringMap map[string]string
// DeleteRequested returns whether deletion of this job was requested.
func (j *Job) DeleteRequested() bool {
return j.DeleteRequestedAt.Valid
}
// JobStorageInfo contains info about where the job files are stored. It is
// intended to be used when removing a job, which may include the removal of its
// files.
@ -202,7 +210,8 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
// FetchJob fetches a single job, without fetching its tasks.
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
dbJob := Job{}
findResult := db.gormDB.WithContext(ctx).First(&dbJob, "uuid = ?", jobUUID)
findResult := db.gormDB.WithContext(ctx).
First(&dbJob, "uuid = ?", jobUUID)
if findResult.Error != nil {
return nil, jobError(findResult.Error, "fetching job")
}
@ -222,6 +231,41 @@ func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error {
return nil
}
// RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now".
func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
j.DeleteRequestedAt.Time = db.gormDB.NowFunc()
j.DeleteRequestedAt.Valid = true
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{DeleteRequestedAt: j.DeleteRequestedAt})
if tx.Error != nil {
return jobError(tx.Error, "deleting job")
}
return nil
}
func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) {
var jobs []*Job
tx := db.gormDB.WithContext(ctx).
Model(&Job{}).
Select("UUID").
Where("delete_requested_at is not NULL").
Order("delete_requested_at").
Scan(&jobs)
if tx.Error != nil {
return nil, jobError(tx.Error, "fetching jobs marked for deletion")
}
uuids := make([]string, len(jobs))
for i := range jobs {
uuids[i] = jobs[i].UUID
}
return uuids, nil
}
func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
var jobs []*Job

@ -101,6 +101,84 @@ func TestDeleteJob(t *testing.T) {
tx = db.gormDB.Model(&Task{}).Count(&numTasks)
assert.NoError(t, tx.Error)
assert.Equal(t, int64(0), numTasks, "tasks should have been deleted along with their job")
// TODO: test that blocklist entries and task dependencies are gone too.
}
func TestRequestJobDeletion(t *testing.T) {
ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t)
defer close()
// Create another job, to see it's not touched by deleting the first one.
authoredJob2 := duplicateJobAndTasks(authoredJob1)
persistAuthoredJob(t, ctx, db, authoredJob2)
mockNow := time.Now()
db.gormDB.NowFunc = func() time.Time { return mockNow }
err := db.RequestJobDeletion(ctx, job1)
assert.NoError(t, err)
assert.True(t, job1.DeleteRequested())
assert.True(t, job1.DeleteRequestedAt.Valid)
assert.Equal(t, job1.DeleteRequestedAt.Time, mockNow)
dbJob1, err := db.FetchJob(ctx, job1.UUID)
assert.NoError(t, err)
assert.True(t, job1.DeleteRequested())
assert.True(t, dbJob1.DeleteRequestedAt.Valid)
assert.WithinDuration(t, mockNow, dbJob1.DeleteRequestedAt.Time, time.Second)
// Other jobs shouldn't be touched.
dbJob2, err := db.FetchJob(ctx, authoredJob2.JobID)
assert.NoError(t, err)
assert.False(t, dbJob2.DeleteRequested())
assert.False(t, dbJob2.DeleteRequestedAt.Valid)
}
func TestFetchJobsDeletionRequested(t *testing.T) {
ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t)
defer close()
now := time.Now()
db.gormDB.NowFunc = func() time.Time { return now }
authoredJob2 := duplicateJobAndTasks(authoredJob1)
job2 := persistAuthoredJob(t, ctx, db, authoredJob2)
authoredJob3 := duplicateJobAndTasks(authoredJob1)
job3 := persistAuthoredJob(t, ctx, db, authoredJob3)
authoredJob4 := duplicateJobAndTasks(authoredJob1)
persistAuthoredJob(t, ctx, db, authoredJob4)
// Ensure different requests get different timestamps,
// out of chronological order.
timestamps := []time.Time{
// timestamps for 'delete requested at' and 'updated at'
now.Add(-3 * time.Second), now.Add(-3 * time.Second),
now.Add(-1 * time.Second), now.Add(-1 * time.Second),
now.Add(-5 * time.Second), now.Add(-5 * time.Second),
}
currentTimestampIndex := 0
db.gormDB.NowFunc = func() time.Time {
now := timestamps[currentTimestampIndex]
currentTimestampIndex++
return now
}
err := db.RequestJobDeletion(ctx, job1)
assert.NoError(t, err)
err = db.RequestJobDeletion(ctx, job2)
assert.NoError(t, err)
err = db.RequestJobDeletion(ctx, job3)
assert.NoError(t, err)
actualUUIDs, err := db.FetchJobsDeletionRequested(ctx)
assert.NoError(t, err)
assert.Len(t, actualUUIDs, 3, "3 out of 4 jobs were marked for deletion")
// Expect UUIDs in chronological order of deletion requests, so that the
// oldest request is handled first.
expectUUIDs := []string{job3.UUID, job1.UUID, job2.UUID}
assert.Equal(t, expectUUIDs, actualUUIDs)
}
func TestJobHasTasksInStatus(t *testing.T) {

@ -39,6 +39,11 @@ import (
"git.blender.org/flamenco/pkg/shaman/touch"
)
var (
// ErrDoesNotExist is returned by EraseCheckout().
ErrDoesNotExist = errors.New("checkout does not exist")
)
// Manager creates checkouts and provides info about missing files.
type Manager struct {
checkoutBasePath string
@ -161,11 +166,21 @@ func (m *Manager) PrepareCheckout(requestedCheckoutPath string) (ResolvedCheckou
}
// EraseCheckout removes the checkout directory structure identified by the ID.
// Returns ErrDoesNotExist if the checkout with this ID does not exist.
func (m *Manager) EraseCheckout(checkoutID string) error {
checkoutPaths, err := m.pathForCheckout(checkoutID)
if err != nil {
return err
}
_, err = os.Stat(checkoutPaths.absolutePath)
switch {
case err == nil:
break
case errors.Is(err, os.ErrNotExist):
return ErrDoesNotExist
default:
return err
}
logger := log.With().
Str("checkoutPath", checkoutPaths.absolutePath).

@ -23,6 +23,7 @@
package checkout
import (
"context"
"io/ioutil"
"os"
"path/filepath"
@ -30,6 +31,7 @@ import (
"testing"
"time"
"git.blender.org/flamenco/pkg/api"
"git.blender.org/flamenco/pkg/shaman/config"
"git.blender.org/flamenco/pkg/shaman/filestore"
"github.com/stretchr/testify/assert"
@ -97,3 +99,54 @@ func TestPrepareCheckout(t *testing.T) {
assert.NotEqual(t, requestedCheckoutPath, resolved.RelativePath)
assert.True(t, strings.HasPrefix(resolved.RelativePath, requestedCheckoutPath+"-"))
}
func TestEraseCheckout(t *testing.T) {
manager, cleanup := createTestManager()
defer cleanup()
ctx := context.Background()
filestore.LinkTestFileStore(manager.fileStore.BasePath())
// Create a few checkouts to test with.
checkout1 := api.ShamanCheckout{
CheckoutPath: "á hausinn á þér",
Files: []api.ShamanFileSpec{
{Sha: "590c148428d5c35fab3ebad2f3365bb469ab9c531b60831f3e826c472027a0b9", Size: 3367, Path: "subdir/replacer.py"},
{Sha: "80b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", Size: 7488, Path: "feed.py"},
{Sha: "914853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", Size: 486, Path: "httpstuff.py"},
{Sha: "d6fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", Size: 7217, Path: "много ликова.py"},
},
}
checkoutID1, err := manager.Checkout(ctx, checkout1)
require.NoError(t, err)
checkout2 := checkout1
checkout2.CheckoutPath = "één ander pad"
checkoutID2, err := manager.Checkout(ctx, checkout2)
require.NoError(t, err)
// Check that removing one works, without deleting the other.
require.NoError(t, manager.EraseCheckout(checkoutID1))
checkoutPath1, err := manager.pathForCheckout(checkoutID1)
require.NoError(t, err)
checkoutPath2, err := manager.pathForCheckout(checkoutID2)
require.NoError(t, err)
assert.NoDirExists(t, checkoutPath1.absolutePath, "actual checkout path should have been erased")
assert.DirExists(t, checkoutPath2.absolutePath, "the other checkout path should have been kept")
assert.DirExists(t, manager.fileStore.StoragePath(), "Shaman storage path should be kept")
// Check that non-checkout paths should be refused.
require.Error(t, manager.EraseCheckout(manager.fileStore.BasePath()))
}
func TestEraseCheckoutNonExisting(t *testing.T) {
manager, cleanup := createTestManager()
defer cleanup()
filestore.LinkTestFileStore(manager.fileStore.BasePath())
// Erasing a non-existing checkout should return a specific error.
require.Error(t, manager.EraseCheckout("não existe"))
}

@ -36,6 +36,8 @@ import (
"github.com/rs/zerolog/log"
)
var ErrDoesNotExist = checkout.ErrDoesNotExist
// Server represents a Shaman Server.
type Server struct {
config config.Config
@ -153,3 +155,8 @@ func (s *Server) FileStore(ctx context.Context, file io.ReadCloser, checksum str
// the caller without relying on types declared in the `fileserver` package?
return err
}
// EraseCheckout deletes the symlinks and the directory structure that makes up the checkout.
func (s *Server) EraseCheckout(checkoutID string) error {
return s.checkoutMan.EraseCheckout(checkoutID)
}