diff --git a/.gitignore b/.gitignore index 38588051..151fe05c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ /flamenco-worker /flamenco-manager_race /flamenco-worker_race +/shaman-checkout-id-setter /stresser /addon-packer flamenco-manager.yaml diff --git a/cmd/shaman-checkout-id-setter/main.go b/cmd/shaman-checkout-id-setter/main.go new file mode 100644 index 00000000..18624347 --- /dev/null +++ b/cmd/shaman-checkout-id-setter/main.go @@ -0,0 +1,177 @@ +package main + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "bufio" + "context" + "errors" + "flag" + "io/fs" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/mattn/go-colorable" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/appinfo" + "git.blender.org/flamenco/internal/manager/config" + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +func main() { + output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} + log.Logger = log.Output(output) + log.Info(). + Str("version", appinfo.ApplicationVersion). + Str("git", appinfo.ApplicationGitHash). + Str("releaseCycle", appinfo.ReleaseCycle). + Str("os", runtime.GOOS). + Str("arch", runtime.GOARCH). + Msgf("starting %v shaman-checkout-id-setter", appinfo.ApplicationName) + + log.Warn().Msg("Use with care, and at your own risk.") + log.Warn().Msg("This is an experimental program, and may ruin your entire Flamenco database.") + log.Warn().Msg("Press Enter to continue.") + _, _ = bufio.NewReader(os.Stdin).ReadBytes('\n') + + parseCliArgs() + + // Load configuration. + configService := config.NewService() + err := configService.Load() + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Error().Err(err).Msg("loading configuration") + } + + // Reject working on a brand new installation. + isFirstRun, err := configService.IsFirstRun() + switch { + case err != nil: + log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not") + case isFirstRun: + log.Fatal().Msg("this should be run on an already-used database") + } + + // Find the {jobs} variable. + vars := configService.ResolveVariables(config.VariableAudienceWorkers, config.VariablePlatform(runtime.GOOS)) + jobsPath := vars["jobs"].Value + if jobsPath == "" { + log.Fatal().Msg("unable to resolve 'jobs' variable") + } + + // Connect to the database. + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer ctxCancel() + persist := openDB(*configService) + defer persist.Close() + + // Get all jobs from the database. + jobs, err := persist.QueryJobs(ctx, api.JobsQuery{}) + if err != nil { + log.Fatal().Err(err).Msg("unable to fetch jobs") + } + + log.Info().Int("numJobs", len(jobs)).Msg("processing all jobs") + numJobsUpdated := 0 + for _, job := range jobs { + logger := log.With().Uint("id", job.ID).Str("uuid", job.UUID).Logger() + + if job.Storage.ShamanCheckoutID != "" { + logger.Info().Str("checkoutID", job.Storage.ShamanCheckoutID).Msg("job already has a Shaman checkout ID") + continue + } + + logger.Debug().Msg("processing job") + + // Find the 'blendfile' setting. + blendfile, ok := job.Settings["blendfile"].(string) + if !ok { + logger.Info().Msg("skipping job, it has no `blendfile` setting") + continue + } + + // See if it starts with `{jobs}`, otherwise it's not submitted via Shaman. + relpath, found := strings.CutPrefix(blendfile, "{jobs}"+string(os.PathSeparator)) + if !found { + logger.Info().Str("blendfile", blendfile).Msg("skipping job, its blendfile setting doesn't start with `{jobs}/`") + continue + } + + // See if there is a `pack-info.txt` file next to the blend file. This is + // another indication that we have the right directory. + packInfoPath := filepath.Join(jobsPath, filepath.Dir(relpath), "pack-info.txt") + _, err := os.Stat(packInfoPath) + switch { + case errors.Is(err, os.ErrNotExist): + logger.Warn().Str("packInfo", packInfoPath).Msg("skipping job, pack-info.txt not found where expected") + continue + case err != nil: + logger.Fatal().Str("packInfo", packInfoPath).Msg("error accessing pack-info.txt") + } + + // Extract the checkout ID from the blend file path. + checkoutID := filepath.Dir(relpath) + logger = logger.With().Str("checkoutID", checkoutID).Logger() + + // Store it on the job. + logger.Info().Msg("updating job") + job.Storage.ShamanCheckoutID = checkoutID + if err := persist.SaveJobStorageInfo(ctx, job); err != nil { + logger.Error().Err(err).Msg("error saving job to the database") + continue + } + + numJobsUpdated++ + } + + log.Info().Msgf("done, updated %d of %d jobs", numJobsUpdated, len(jobs)) +} + +func parseCliArgs() { + var quiet, debug, trace bool + + flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.") + flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.") + flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.") + + flag.Parse() + + var logLevel zerolog.Level + switch { + case trace: + logLevel = zerolog.TraceLevel + case debug: + logLevel = zerolog.DebugLevel + case quiet: + logLevel = zerolog.WarnLevel + default: + logLevel = zerolog.InfoLevel + } + zerolog.SetGlobalLevel(logLevel) +} + +// openDB opens the database or dies. +func openDB(configService config.Service) *persistence.DB { + dsn := configService.Get().DatabaseDSN + if dsn == "" { + log.Fatal().Msg("configure the database in flamenco-manager.yaml") + } + + dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer dbCtxCancel() + persist, err := persistence.OpenDB(dbCtx, dsn) + if err != nil { + log.Fatal(). + Err(err). + Str("dsn", dsn). + Msg("error opening database") + } + + return persist +} diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 432a5b2e..de809523 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -306,6 +306,17 @@ func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { return nil } +// SaveJobStorageInfo saves the job's Storage field. +func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error { + tx := db.gormDB.WithContext(ctx). + Model(j). + Updates(Job{Storage: j.Storage}) + if tx.Error != nil { + return jobError(tx.Error, "saving job storage") + } + return nil +} + func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { dbTask := Task{} tx := db.gormDB.WithContext(ctx).