Add CLI utility to recreate tasks of jobs

Due to an issue (which has been fixed in the previous commit), all tasks
in the database were deleted when starting Flamenco. This tool attempts
to recompile the job and recreate its tasks.

The statuses of the tasks are set based on the job status. Basically:

- job active → tasks queued
- job completed → tasks completed
- job cancelled / failed → tasks cancelled
- otherwise → tasks queued

To ensure that the tool is only used to create tasks from scratch, it
refuses to work on a job that still has tasks in the database.
This commit is contained in:
Sybren A. Stüvel 2023-07-10 14:10:15 +02:00
parent 06738b8aa4
commit b58f1e15f1
4 changed files with 350 additions and 63 deletions

1
.gitignore vendored

@ -13,6 +13,7 @@
/flamenco-worker_race
/shaman-checkout-id-setter
/stresser
/job-creator
/addon-packer
flamenco-manager.yaml
flamenco-worker.yaml

@ -69,6 +69,10 @@ flamenco-worker:
stresser:
go build -v ${BUILD_FLAGS} ${PKG}/cmd/stresser
.PHONY: job-creator
job-creator:
go build -v ${BUILD_FLAGS} ${PKG}/cmd/job-creator
addon-packer: cmd/addon-packer/addon-packer.go
go build -v ${BUILD_FLAGS} ${PKG}/cmd/addon-packer

261
cmd/job-creator/main.go Normal file

@ -0,0 +1,261 @@
package main
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"flag"
"io/fs"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/benbjohnson/clock"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/appinfo"
"git.blender.org/flamenco/internal/manager/config"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
var cliArgs struct {
version bool
jobUUID string
}
func main() {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
log.Info().
Str("version", appinfo.ApplicationVersion).
Str("git", appinfo.ApplicationGitHash).
Str("releaseCycle", appinfo.ReleaseCycle).
Str("os", runtime.GOOS).
Str("arch", runtime.GOARCH).
Msgf("starting %v job compiler", appinfo.ApplicationName)
parseCliArgs()
if cliArgs.version {
return
}
if cliArgs.jobUUID == "" {
log.Fatal().Msg("give me a job UUID to regenerate tasks for")
}
// Load configuration.
configService := config.NewService()
err := configService.Load()
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Error().Err(err).Msg("loading configuration")
}
isFirstRun, err := configService.IsFirstRun()
switch {
case err != nil:
log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not")
case isFirstRun:
log.Info().Msg("This seems to be your first run of Flamenco, this tool won't work.")
return
}
// Construct the services.
persist := openDB(*configService)
defer persist.Close()
timeService := clock.New()
compiler, err := job_compilers.Load(timeService)
if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers")
}
// The main context determines the lifetime of the application. All
// long-running goroutines need to keep an eye on this, and stop their work
// once it closes.
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
defer mainCtxCancel()
installSignalHandler(mainCtxCancel)
recompile(mainCtx, cliArgs.jobUUID, persist, compiler)
}
// recompile regenerates the job's tasks.
func recompile(ctx context.Context, jobUUID string, db *persistence.DB, compiler *job_compilers.Service) {
dbJob, err := db.FetchJob(ctx, jobUUID)
if err != nil {
log.Fatal().Err(err).Msg("could not get job from database")
}
logger := log.With().Str("job", jobUUID).Logger()
logger.Info().Msg("found job")
dbTasks, err := db.FetchTasksOfJob(ctx, dbJob)
if err != nil {
log.Fatal().Err(err).Msg("could not query database for tasks")
}
if len(dbTasks) > 0 {
// This tool has only been tested with jobs that have had their tasks completely lost.
log.Fatal().
Int("numTasks", len(dbTasks)).
Msg("this job still has tasks, this is not a situation this tool should be used in")
}
// Recompile the job.
fakeSubmittedJob := constructSubmittedJob(dbJob)
authoredJob, err := compiler.Compile(ctx, fakeSubmittedJob)
if err != nil {
logger.Fatal().Err(err).Msg("could not recompile job")
}
sanityCheck(logger, dbJob, authoredJob)
// Store the recompiled tasks.
if err := db.StoreAuthoredJobTaks(ctx, dbJob, authoredJob); err != nil {
logger.Fatal().Err(err).Msg("error storing recompiled tasks")
}
logger.Info().Msg("new tasks have been stored")
updateTaskStatuses(ctx, logger, db, dbJob)
logger.Info().Msg("job recompilation seems to have worked out")
}
func constructSubmittedJob(dbJob *persistence.Job) api.SubmittedJob {
fakeSubmittedJob := api.SubmittedJob{
Name: dbJob.Name,
Priority: dbJob.Priority,
SubmitterPlatform: "reconstrutor", // The platform shouldn't matter, as all paths have already been replaced.
Type: dbJob.JobType,
TypeEtag: nil,
Settings: &api.JobSettings{AdditionalProperties: make(map[string]interface{})},
Metadata: &api.JobMetadata{AdditionalProperties: make(map[string]string)},
}
for key, value := range dbJob.Settings {
fakeSubmittedJob.Settings.AdditionalProperties[key] = value
}
for key, value := range dbJob.Metadata {
fakeSubmittedJob.Metadata.AdditionalProperties[key] = value
}
if dbJob.WorkerTag != nil {
fakeSubmittedJob.WorkerTag = &dbJob.WorkerTag.UUID
} else if dbJob.WorkerTagID != nil {
panic("WorkerTagID is set, but WorkerTag is not")
}
return fakeSubmittedJob
}
// Check that the authored job is consistent with the original job.
func sanityCheck(logger zerolog.Logger, expect *persistence.Job, actual *job_compilers.AuthoredJob) {
if actual.Name != expect.Name {
logger.Fatal().
Str("expected", expect.Name).
Str("actual", actual.Name).
Msg("recompilation did not produce expected name")
}
if actual.JobType != expect.JobType {
logger.Fatal().
Str("expected", expect.JobType).
Str("actual", actual.JobType).
Msg("recompilation did not produce expected job type")
}
}
func updateTaskStatuses(ctx context.Context, logger zerolog.Logger, db *persistence.DB, dbJob *persistence.Job) {
logger = logger.With().Str("jobStatus", string(dbJob.Status)).Logger()
// Update the task statuses based on the job status. This is NOT using the
// state machine, as these tasks are not actually going from one state to the
// other. They are just being updated in the database.
taskStatusMap := map[api.JobStatus]api.TaskStatus{
api.JobStatusActive: api.TaskStatusQueued,
api.JobStatusCancelRequested: api.TaskStatusCanceled,
api.JobStatusCanceled: api.TaskStatusCanceled,
api.JobStatusCompleted: api.TaskStatusCompleted,
api.JobStatusFailed: api.TaskStatusCanceled,
api.JobStatusPaused: api.TaskStatusPaused,
api.JobStatusQueued: api.TaskStatusQueued,
api.JobStatusRequeueing: api.TaskStatusQueued,
api.JobStatusUnderConstruction: api.TaskStatusQueued,
}
newTaskStatus, ok := taskStatusMap[dbJob.Status]
if !ok {
logger.Warn().Msg("unknown job status, not touching task statuses")
return
}
logger = logger.With().Str("taskStatus", string(newTaskStatus)).Logger()
err := db.UpdateJobsTaskStatuses(ctx, dbJob, newTaskStatus, "reset task status after job reconstruction")
if err != nil {
logger.Fatal().Msg("could not update task statuses")
}
logger.Info().Msg("task statuses have been updated based on the job status")
}
func parseCliArgs() {
var quiet, debug, trace bool
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
flag.StringVar(&cliArgs.jobUUID, "job", "", "Job UUID to regenerate")
flag.Parse()
var logLevel zerolog.Level
switch {
case trace:
logLevel = zerolog.TraceLevel
case debug:
logLevel = zerolog.DebugLevel
case quiet:
logLevel = zerolog.WarnLevel
default:
logLevel = zerolog.InfoLevel
}
zerolog.SetGlobalLevel(logLevel)
}
// openDB opens the database or dies.
func openDB(configService config.Service) *persistence.DB {
dsn := configService.Get().DatabaseDSN
if dsn == "" {
log.Fatal().Msg("configure the database in flamenco-manager.yaml")
}
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer dbCtxCancel()
persist, err := persistence.OpenDB(dbCtx, dsn)
if err != nil {
log.Fatal().
Err(err).
Str("dsn", dsn).
Msg("error opening database")
}
return persist
}
// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
func installSignalHandler(cancelFunc context.CancelFunc) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
signal.Notify(signals, syscall.SIGTERM)
go func() {
for signum := range signals {
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
cancelFunc()
}
}()
}

@ -162,72 +162,93 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
return jobError(err, "storing job")
}
uuidToTask := make(map[string]*Task)
for _, authoredTask := range authoredJob.Tasks {
var commands []Command
for _, authoredCommand := range authoredTask.Commands {
commands = append(commands, Command{
Name: authoredCommand.Name,
Parameters: StringInterfaceMap(authoredCommand.Parameters),
})
}
dbTask := Task{
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
Job: &dbJob,
Priority: authoredTask.Priority,
Status: api.TaskStatusQueued,
Commands: commands,
// dependencies are stored below.
}
if err := tx.Create(&dbTask).Error; err != nil {
return taskError(err, "storing task: %v", err)
}
uuidToTask[authoredTask.UUID] = &dbTask
}
// Store the dependencies between tasks.
for _, authoredTask := range authoredJob.Tasks {
if len(authoredTask.Dependencies) == 0 {
continue
}
dbTask, ok := uuidToTask[authoredTask.UUID]
if !ok {
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
}
deps := make([]*Task, len(authoredTask.Dependencies))
for i, t := range authoredTask.Dependencies {
depTask, ok := uuidToTask[t.UUID]
if !ok {
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
}
deps[i] = depTask
}
dependenciesbatchsize := 1000
for j := 0; j < len(deps); j += dependenciesbatchsize {
end := j + dependenciesbatchsize
if end > len(deps) {
end = len(deps)
}
currentDeps := deps[j:end]
dbTask.Dependencies = currentDeps
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
if subQuery.Error != nil {
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
}
}
}
return nil
return db.storeAuthoredJobTaks(ctx, tx, &dbJob, &authoredJob)
})
}
// StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks.
// It stores `authoredJob`'s tasks, but attaches them to the already-persisted `job`.
func (db *DB) StoreAuthoredJobTaks(
ctx context.Context,
job *Job,
authoredJob *job_compilers.AuthoredJob,
) error {
tx := db.gormDB.WithContext(ctx)
return db.storeAuthoredJobTaks(ctx, tx, job, authoredJob)
}
func (db *DB) storeAuthoredJobTaks(
ctx context.Context,
tx *gorm.DB,
dbJob *Job,
authoredJob *job_compilers.AuthoredJob,
) error {
uuidToTask := make(map[string]*Task)
for _, authoredTask := range authoredJob.Tasks {
var commands []Command
for _, authoredCommand := range authoredTask.Commands {
commands = append(commands, Command{
Name: authoredCommand.Name,
Parameters: StringInterfaceMap(authoredCommand.Parameters),
})
}
dbTask := Task{
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
Job: dbJob,
Priority: authoredTask.Priority,
Status: api.TaskStatusQueued,
Commands: commands,
// dependencies are stored below.
}
if err := tx.Create(&dbTask).Error; err != nil {
return taskError(err, "storing task: %v", err)
}
uuidToTask[authoredTask.UUID] = &dbTask
}
// Store the dependencies between tasks.
for _, authoredTask := range authoredJob.Tasks {
if len(authoredTask.Dependencies) == 0 {
continue
}
dbTask, ok := uuidToTask[authoredTask.UUID]
if !ok {
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
}
deps := make([]*Task, len(authoredTask.Dependencies))
for i, t := range authoredTask.Dependencies {
depTask, ok := uuidToTask[t.UUID]
if !ok {
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
}
deps[i] = depTask
}
dependenciesbatchsize := 1000
for j := 0; j < len(deps); j += dependenciesbatchsize {
end := j + dependenciesbatchsize
if end > len(deps) {
end = len(deps)
}
currentDeps := deps[j:end]
dbTask.Dependencies = currentDeps
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
if subQuery.Error != nil {
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
}
}
}
return nil
}
// FetchJob fetches a single job, without fetching its tasks.
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
dbJob := Job{}