diff --git a/cmd/sqlc-export-schema/main.go b/cmd/sqlc-export-schema/main.go new file mode 100644 index 00000000..ba84a798 --- /dev/null +++ b/cmd/sqlc-export-schema/main.go @@ -0,0 +1,189 @@ +package main + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "database/sql" + "flag" + "fmt" + "os" + "os/signal" + "regexp" + "strings" + "syscall" + "time" + + "github.com/mattn/go-colorable" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "gopkg.in/yaml.v2" + + _ "modernc.org/sqlite" +) + +var ( + // Tables and/or indices to skip when writing the schema. + // Anything that is *not* to be seen by sqlc should be listed here. + skips = map[SQLiteSchema]bool{ + // Goose manages its own versioning table. SQLC should ignore its existence. + {Type: "table", Name: "goose_db_version"}: true, + } + + tableNameDequoter = regexp.MustCompile("^(?:CREATE TABLE )(\"([^\"]+)\")") +) + +type SQLiteSchema struct { + Type string + Name string + TableName string + RootPage int + SQL sql.NullString +} + +func saveSchema(ctx context.Context, sqlOutPath string) error { + db, err := sql.Open("sqlite", "flamenco-manager.sqlite") + if err != nil { + return err + } + defer db.Close() + + rows, err := db.QueryContext(ctx, "select * from sqlite_schema order by type desc, name asc") + if err != nil { + return err + } + defer rows.Close() + + sqlBuilder := strings.Builder{} + + for rows.Next() { + var data SQLiteSchema + if err := rows.Scan( + &data.Type, + &data.Name, + &data.TableName, + &data.RootPage, + &data.SQL, + ); err != nil { + return err + } + if strings.HasPrefix(data.Name, "sqlite_") { + continue + } + if skips[SQLiteSchema{Type: data.Type, Name: data.Name}] { + continue + } + if !data.SQL.Valid { + continue + } + + sql := tableNameDequoter.ReplaceAllString(data.SQL.String, "CREATE TABLE $2") + + sqlBuilder.WriteString(sql) + sqlBuilder.WriteString(";\n") + } + + sqlBytes := []byte(sqlBuilder.String()) + if err := os.WriteFile(sqlOutPath, sqlBytes, os.ModePerm); err != nil { + return fmt.Errorf("writing to %s: %w", sqlOutPath, err) + } + + log.Info().Str("path", sqlOutPath).Msg("schema written to file") + return nil +} + +// SqlcConfig models the minimal subset of the sqlc.yaml we need to parse. +type SqlcConfig struct { + Version string `yaml:"version"` + SQL []struct { + Schema string `yaml:"schema"` + } `yaml:"sql"` +} + +func main() { + output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} + log.Logger = log.Output(output) + parseCliArgs() + + mainCtx, mainCtxCancel := context.WithCancel(context.Background()) + defer mainCtxCancel() + + installSignalHandler(mainCtxCancel) + + schemaPath := schemaPathFromSqlcYAML() + + if err := saveSchema(mainCtx, schemaPath); err != nil { + log.Fatal().Err(err).Msg("couldn't export schema") + } +} + +// installSignalHandler spawns a goroutine that handles incoming POSIX signals. +func installSignalHandler(cancelFunc context.CancelFunc) { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + signal.Notify(signals, syscall.SIGTERM) + go func() { + for signum := range signals { + log.Info().Str("signal", signum.String()).Msg("signal received, shutting down") + cancelFunc() + } + }() +} + +func parseCliArgs() { + var quiet, debug, trace bool + + flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.") + flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.") + flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.") + + flag.Parse() + + var logLevel zerolog.Level + switch { + case trace: + logLevel = zerolog.TraceLevel + case debug: + logLevel = zerolog.DebugLevel + case quiet: + logLevel = zerolog.WarnLevel + default: + logLevel = zerolog.InfoLevel + } + zerolog.SetGlobalLevel(logLevel) +} + +func schemaPathFromSqlcYAML() string { + var sqlcConfig SqlcConfig + + { + sqlcConfigBytes, err := os.ReadFile("sqlc.yaml") + if err != nil { + log.Fatal().Err(err).Msg("cannot read sqlc.yaml") + } + + if err := yaml.Unmarshal(sqlcConfigBytes, &sqlcConfig); err != nil { + log.Fatal().Err(err).Msg("cannot parse sqlc.yaml") + } + } + + if sqlcConfig.Version != "2" { + log.Fatal(). + Str("version", sqlcConfig.Version). + Str("expected", "2"). + Msg("unexpected version in sqlc.yaml") + } + + if len(sqlcConfig.SQL) != 1 { + log.Fatal(). + Int("sql items", len(sqlcConfig.SQL)). + Msg("sqlc.yaml should contain a single item in the 'sql' list") + } + + schema := sqlcConfig.SQL[0].Schema + if schema == "" { + log.Fatal().Msg("sqlc.yaml should have a 'schema' key in the 'sql' item") + } + + return schema +} diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 78e222a0..77574f7a 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -8,11 +8,11 @@ import ( "fmt" "time" + "github.com/glebarez/sqlite" "github.com/rs/zerolog/log" "gorm.io/gorm" - // sqlite "projects.blender.org/studio/flamenco/pkg/gorm-modernc-sqlite" - "github.com/glebarez/sqlite" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // DB provides the database interface. @@ -171,6 +171,17 @@ func (db *DB) Close() error { return sqldb.Close() } +// queries returns the SQLC Queries struct, connected to this database. +// It is intended that all GORM queries will be migrated to use this interface +// instead. +func (db *DB) queries() (*sqlc.Queries, error) { + sqldb, err := db.gormDB.DB() + if err != nil { + return nil, fmt.Errorf("could not get low-level database driver: %w", err) + } + return sqlc.New(sqldb), nil +} + func (db *DB) pragmaForeignKeys(enabled bool) error { var ( value int diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 90c77ca6..5d7ede83 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -17,6 +17,7 @@ import ( "gorm.io/gorm/clause" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -252,19 +253,20 @@ func (db *DB) storeAuthoredJobTaks( // FetchJob fetches a single job, without fetching its tasks. func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { - dbJob := Job{} - findResult := db.gormDB.WithContext(ctx). - Limit(1). - Preload("WorkerTag"). - Find(&dbJob, "uuid = ?", jobUUID) - if findResult.Error != nil { - return nil, jobError(findResult.Error, "fetching job") - } - if dbJob.ID == 0 { - return nil, ErrJobNotFound + queries, err := db.queries() + if err != nil { + return nil, err } - return &dbJob, nil + sqlcJob, err := queries.FetchJob(ctx, jobUUID) + switch { + case errors.Is(err, sql.ErrNoRows): + return nil, ErrJobNotFound + case err != nil: + return nil, jobError(err, "fetching job") + } + + return convertSqlcJob(sqlcJob) } // DeleteJob deletes a job from the database. @@ -279,24 +281,39 @@ func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error { return ErrDeletingWithoutFK } - tx := db.gormDB.WithContext(ctx). - Where("uuid = ?", jobUUID). - Delete(&Job{}) - if tx.Error != nil { - return jobError(tx.Error, "deleting job") + queries, err := db.queries() + if err != nil { + return err + } + + if err := queries.DeleteJob(ctx, jobUUID); err != nil { + return jobError(err, "deleting job") } return nil } // RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now". func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { + queries, err := db.queries() + if err != nil { + return err + } + + // Update the given job itself, so we don't have to re-fetch it from the database. j.DeleteRequestedAt.Time = db.gormDB.NowFunc() j.DeleteRequestedAt.Valid = true - tx := db.gormDB.WithContext(ctx). - Model(j). - Updates(Job{DeleteRequestedAt: j.DeleteRequestedAt}) - if tx.Error != nil { - return jobError(tx.Error, "queueing job for deletion") + + params := sqlc.RequestJobDeletionParams{ + Now: j.DeleteRequestedAt, + JobID: int64(j.ID), + } + + log.Trace(). + Str("job", j.UUID). + Time("deletedAt", params.Now.Time). + Msg("database: marking job as deletion-requested") + if err := queries.RequestJobDeletion(ctx, params); err != nil { + return jobError(err, "queueing job for deletion") } return nil } @@ -713,3 +730,42 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err return workers, tx.Error } + +// convertSqlcJob converts a job from the SQLC-generated model to the model +// expected by the rest of the code. This is mostly in place to aid in the GORM +// to SQLC migration. It is intended that eventually the rest of the code will +// use the same SQLC-generated model. +func convertSqlcJob(job sqlc.Job) (*Job, error) { + dbJob := Job{ + Model: Model{ + ID: uint(job.ID), + CreatedAt: job.CreatedAt, + UpdatedAt: job.UpdatedAt.Time, + }, + UUID: job.Uuid, + Name: job.Name, + JobType: job.JobType, + Priority: int(job.Priority), + Status: api.JobStatus(job.Status), + Activity: job.Activity, + DeleteRequestedAt: job.DeleteRequestedAt, + Storage: JobStorageInfo{ + ShamanCheckoutID: job.StorageShamanCheckoutID, + }, + } + + if err := json.Unmarshal(job.Settings, &dbJob.Settings); err != nil { + return nil, jobError(err, fmt.Sprintf("job %s has invalid settings: %v", job.Uuid, err)) + } + + if err := json.Unmarshal(job.Metadata, &dbJob.Metadata); err != nil { + return nil, jobError(err, fmt.Sprintf("job %s has invalid metadata: %v", job.Uuid, err)) + } + + if job.WorkerTagID.Valid { + workerTagID := uint(job.WorkerTagID.Int64) + dbJob.WorkerTagID = &workerTagID + } + + return &dbJob, nil +} diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 0e51aaad..5fbc9f9c 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -288,10 +288,10 @@ func TestFetchJobsDeletionRequested(t *testing.T) { // Ensure different requests get different timestamps, // out of chronological order. timestamps := []time.Time{ - // timestamps for 'delete requested at' and 'updated at' - now.Add(-3 * time.Second), now.Add(-3 * time.Second), - now.Add(-1 * time.Second), now.Add(-1 * time.Second), - now.Add(-5 * time.Second), now.Add(-5 * time.Second), + // timestamps for 'delete requested at'. + now.Add(-3 * time.Second), + now.Add(-1 * time.Second), + now.Add(-5 * time.Second), } currentTimestampIndex := 0 db.gormDB.NowFunc = func() time.Time { diff --git a/internal/manager/persistence/migrations/0003_drop_worker_clusters.sql b/internal/manager/persistence/migrations/0003_drop_worker_clusters.sql new file mode 100644 index 00000000..4ae7b85a --- /dev/null +++ b/internal/manager/persistence/migrations/0003_drop_worker_clusters.sql @@ -0,0 +1,15 @@ +-- Drop tables that were in use in beta versions of Flamenco. These might exist +-- in developer databases, as well as databases of studios following the `main` +-- branch, such as Blender Studio. +-- +-- WARNING: this migration simply drops the tables. Their data is erased, and +-- cannot be brought back by rolling the migration back. +-- +-- +goose Up +DROP INDEX IF EXISTS `idx_worker_clusters_uuid`; +DROP TABLE IF EXISTS `worker_cluster_membership`; +DROP TABLE IF EXISTS `worker_clusters`; + +-- +goose Down +-- Do not recreate these tables, as no release of Flamenco ever used them. +-- Also their contents wouldn't be brought back anyway. diff --git a/internal/manager/persistence/migrations/0004_sqlc_compat_and_more_nonnull.sql b/internal/manager/persistence/migrations/0004_sqlc_compat_and_more_nonnull.sql new file mode 100644 index 00000000..0857d0b4 --- /dev/null +++ b/internal/manager/persistence/migrations/0004_sqlc_compat_and_more_nonnull.sql @@ -0,0 +1,371 @@ +-- GORM automigration wasn't smart, and thus the database had more nullable +-- columns than necessary. This migration makes columns that should never be +-- NULL actually NOT NULL. +-- +-- Since this migration recreates all tables in the database, this is now also +-- done in a way that makes the schema more compatible with sqlc (which is +-- mostly removing various quotes and backticks, and replacing char(N) with +-- varchar(N)). sqlc is the tool that'll replace GORM. +-- +-- +goose Up +CREATE TABLE temp_last_rendereds ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + job_id integer DEFAULT 0 NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_last_rendereds_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE +); +INSERT INTO temp_last_rendereds SELECT * FROM last_rendereds; +DROP TABLE last_rendereds; +ALTER TABLE temp_last_rendereds RENAME TO last_rendereds; + +CREATE TABLE temp_task_dependencies ( + task_id integer NOT NULL, + dependency_id integer NOT NULL, + PRIMARY KEY (task_id, dependency_id), + CONSTRAINT fk_task_dependencies_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, + CONSTRAINT fk_task_dependencies_dependencies FOREIGN KEY (dependency_id) REFERENCES tasks(id) ON DELETE CASCADE +); +INSERT INTO temp_task_dependencies SELECT * FROM task_dependencies; +DROP TABLE task_dependencies; +ALTER TABLE temp_task_dependencies RENAME TO task_dependencies; + +CREATE TABLE temp_task_failures ( + created_at datetime NOT NULL, + task_id integer NOT NULL, + worker_id integer NOT NULL, + PRIMARY KEY (task_id, worker_id), + CONSTRAINT fk_task_failures_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, + CONSTRAINT fk_task_failures_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +INSERT INTO temp_task_failures SELECT * FROM task_failures; +DROP TABLE task_failures; +ALTER TABLE temp_task_failures RENAME TO task_failures; + +CREATE TABLE temp_worker_tag_membership ( + worker_tag_id integer NOT NULL, + worker_id integer NOT NULL, + PRIMARY KEY (worker_tag_id, worker_id), + CONSTRAINT fk_worker_tag_membership_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE CASCADE, + CONSTRAINT fk_worker_tag_membership_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +INSERT INTO temp_worker_tag_membership SELECT * FROM worker_tag_membership; +DROP TABLE worker_tag_membership; +ALTER TABLE temp_worker_tag_membership RENAME TO worker_tag_membership; + +CREATE TABLE temp_worker_tags ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) UNIQUE DEFAULT '' NOT NULL, + description varchar(255) DEFAULT '' NOT NULL, + PRIMARY KEY (id) +); +INSERT INTO temp_worker_tags SELECT * FROM worker_tags; +DROP TABLE worker_tags; +ALTER TABLE temp_worker_tags RENAME TO worker_tags; + +CREATE TABLE temp_jobs ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + job_type varchar(32) DEFAULT '' NOT NULL, + priority smallint DEFAULT 0 NOT NULL, + status varchar(32) DEFAULT '' NOT NULL, + activity varchar(255) DEFAULT '' NOT NULL, + settings jsonb NOT NULL, + metadata jsonb NOT NULL, + delete_requested_at datetime, + storage_shaman_checkout_id varchar(255) DEFAULT '' NOT NULL, + worker_tag_id integer, + PRIMARY KEY (id), + CONSTRAINT fk_jobs_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE SET NULL +); +-- This is using an explicit set of columns, as my development machine had an +-- extra column in there that caused errors. If anybody ever ran a beta where +-- the `worker_tag_id` was called `worker_cluster_id`, they'd have that extra +-- column too. +INSERT INTO temp_jobs SELECT + id, + created_at, + updated_at, + uuid, + name, + job_type, + priority, + status, + activity, + settings, + metadata, + delete_requested_at, + storage_shaman_checkout_id, + worker_tag_id +FROM jobs; +DROP TABLE jobs; +ALTER TABLE temp_jobs RENAME TO jobs; + +CREATE TABLE temp_workers ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + secret varchar(255) DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + address varchar(39) DEFAULT '' NOT NULL, + platform varchar(16) DEFAULT '' NOT NULL, + software varchar(32) DEFAULT '' NOT NULL, + status varchar(16) DEFAULT '' NOT NULL, + last_seen_at datetime, + status_requested varchar(16) DEFAULT '' NOT NULL, + lazy_status_request smallint DEFAULT false NOT NULL, + supported_task_types varchar(255) DEFAULT '' NOT NULL, + deleted_at datetime, + can_restart smallint DEFAULT false NOT NULL, + PRIMARY KEY (id) +); +UPDATE workers SET supported_task_types = '' where supported_task_types is NULL; +INSERT INTO temp_workers SELECT * FROM workers; +DROP TABLE workers; +ALTER TABLE temp_workers RENAME TO workers; + +CREATE TABLE temp_job_blocks ( + id integer NOT NULL, + created_at datetime NOT NULL, + job_id integer DEFAULT 0 NOT NULL, + worker_id integer DEFAULT 0 NOT NULL, + task_type text NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_job_blocks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE, + CONSTRAINT fk_job_blocks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +INSERT INTO temp_job_blocks SELECT * FROM job_blocks; +DROP TABLE job_blocks; +ALTER TABLE temp_job_blocks RENAME TO job_blocks; + +CREATE TABLE temp_sleep_schedules ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + worker_id integer UNIQUE DEFAULT 0 NOT NULL, + is_active numeric DEFAULT false NOT NULL, + days_of_week text DEFAULT '' NOT NULL, + start_time text DEFAULT '' NOT NULL, + end_time text DEFAULT '' NOT NULL, + next_check datetime, + PRIMARY KEY (id), + CONSTRAINT fk_sleep_schedules_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +INSERT INTO temp_sleep_schedules SELECT * FROM sleep_schedules; +DROP TABLE sleep_schedules; +ALTER TABLE temp_sleep_schedules RENAME TO sleep_schedules; + +CREATE TABLE temp_tasks ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + type varchar(32) DEFAULT '' NOT NULL, + job_id integer DEFAULT 0 NOT NULL, + priority smallint DEFAULT 50 NOT NULL, + status varchar(16) DEFAULT '' NOT NULL, + worker_id integer, + last_touched_at datetime, + commands jsonb NOT NULL, + activity varchar(255) DEFAULT '' NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE, + CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL +); +INSERT INTO temp_tasks SELECT * FROM tasks; +DROP TABLE tasks; +ALTER TABLE temp_tasks RENAME TO tasks; + +-- Recreate the indices on the new tables. +CREATE INDEX idx_worker_tags_uuid ON worker_tags(uuid); +CREATE INDEX idx_jobs_uuid ON jobs(uuid); +CREATE INDEX idx_workers_address ON workers(address); +CREATE INDEX idx_workers_last_seen_at ON workers(last_seen_at); +CREATE INDEX idx_workers_deleted_at ON workers(deleted_at); +CREATE INDEX idx_workers_uuid ON workers(uuid); +CREATE UNIQUE INDEX job_worker_tasktype ON job_blocks(job_id, worker_id, task_type); +CREATE INDEX idx_sleep_schedules_is_active ON sleep_schedules(is_active); +CREATE INDEX idx_sleep_schedules_worker_id ON sleep_schedules(worker_id); +CREATE INDEX idx_tasks_uuid ON tasks(uuid); +CREATE INDEX idx_tasks_last_touched_at ON tasks(last_touched_at); + +-- +goose Down + +CREATE TABLE `temp_last_rendereds` ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `job_id` integer DEFAULT 0, + PRIMARY KEY (`id`), + CONSTRAINT `fk_last_rendereds_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_last_rendereds SELECT * FROM last_rendereds; +DROP TABLE last_rendereds; +ALTER TABLE temp_last_rendereds RENAME TO `last_rendereds`; + +CREATE TABLE `temp_task_dependencies` ( + `task_id` integer, + `dependency_id` integer, + PRIMARY KEY (`task_id`, `dependency_id`), + CONSTRAINT `fk_task_dependencies_task` FOREIGN KEY (`task_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_task_dependencies_dependencies` FOREIGN KEY (`dependency_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_task_dependencies SELECT * FROM task_dependencies; +DROP TABLE task_dependencies; +ALTER TABLE temp_task_dependencies RENAME TO `task_dependencies`; + +CREATE TABLE `temp_task_failures` ( + `created_at` datetime, + `task_id` integer, + `worker_id` integer, + PRIMARY KEY (`task_id`, `worker_id`), + CONSTRAINT `fk_task_failures_task` FOREIGN KEY (`task_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_task_failures_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_task_failures SELECT * FROM task_failures; +DROP TABLE task_failures; +ALTER TABLE temp_task_failures RENAME TO `task_failures`; + +CREATE TABLE `temp_worker_tag_membership` ( + `worker_tag_id` integer, + `worker_id` integer, + PRIMARY KEY (`worker_tag_id`, `worker_id`), + CONSTRAINT `fk_worker_tag_membership_worker_tag` FOREIGN KEY (`worker_tag_id`) REFERENCES `worker_tags`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_worker_tag_membership_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_worker_tag_membership SELECT * FROM worker_tag_membership; +DROP TABLE worker_tag_membership; +ALTER TABLE temp_worker_tag_membership RENAME TO `worker_tag_membership`; + +CREATE TABLE "temp_worker_tags" ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `uuid` char(36) UNIQUE DEFAULT "", + `name` varchar(64) UNIQUE DEFAULT "", + `description` varchar(255) DEFAULT "", + PRIMARY KEY (`id`) +); +INSERT INTO temp_worker_tags SELECT * FROM worker_tags; +DROP TABLE worker_tags; +ALTER TABLE temp_worker_tags RENAME TO `worker_tags`; + +CREATE TABLE "temp_jobs" ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `uuid` char(36) UNIQUE DEFAULT "", + `name` varchar(64) DEFAULT "", + `job_type` varchar(32) DEFAULT "", + `priority` smallint DEFAULT 0, + `status` varchar(32) DEFAULT "", + `activity` varchar(255) DEFAULT "", + `settings` jsonb, + `metadata` jsonb, + `delete_requested_at` datetime, + `storage_shaman_checkout_id` varchar(255) DEFAULT "", + `worker_tag_id` integer, + PRIMARY KEY(`id`), + CONSTRAINT `fk_jobs_worker_tag` FOREIGN KEY(`worker_tag_id`) REFERENCES `worker_tags`(`id`) ON DELETE SET NULL +); +INSERT INTO temp_jobs SELECT * FROM jobs; +DROP TABLE jobs; +ALTER TABLE temp_jobs RENAME TO `jobs`; + +CREATE TABLE "temp_workers" ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `deleted_at` datetime, + `uuid` char(36) UNIQUE DEFAULT "", + `secret` varchar(255) DEFAULT "", + `name` varchar(64) DEFAULT "", + `address` varchar(39) DEFAULT "", + `platform` varchar(16) DEFAULT "", + `software` varchar(32) DEFAULT "", + `status` varchar(16) DEFAULT "", + `last_seen_at` datetime, + `status_requested` varchar(16) DEFAULT "", + `lazy_status_request` smallint DEFAULT false, + `supported_task_types` varchar(255) DEFAULT "", + `can_restart` smallint DEFAULT false, + PRIMARY KEY (`id`) +); +INSERT INTO temp_workers SELECT * FROM workers; +DROP TABLE workers; +ALTER TABLE temp_workers RENAME TO `workers`; + +CREATE TABLE "temp_job_blocks" ( + `id` integer, + `created_at` datetime, + `job_id` integer DEFAULT 0, + `worker_id` integer DEFAULT 0, + `task_type` text, + PRIMARY KEY (`id`), + CONSTRAINT `fk_job_blocks_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_job_blocks_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_job_blocks SELECT * FROM job_blocks; +DROP TABLE job_blocks; +ALTER TABLE temp_job_blocks RENAME TO `job_blocks`; + +CREATE TABLE "temp_sleep_schedules" ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `worker_id` integer UNIQUE DEFAULT 0, + `is_active` numeric DEFAULT false, + `days_of_week` text DEFAULT "", + `start_time` text DEFAULT "", + `end_time` text DEFAULT "", + `next_check` datetime, + PRIMARY KEY (`id`), + CONSTRAINT `fk_sleep_schedules_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE +); +INSERT INTO temp_sleep_schedules SELECT * FROM sleep_schedules; +DROP TABLE sleep_schedules; +ALTER TABLE temp_sleep_schedules RENAME TO `sleep_schedules`; + +CREATE TABLE "temp_tasks" ( + `id` integer, + `created_at` datetime, + `updated_at` datetime, + `uuid` char(36) UNIQUE DEFAULT "", + `name` varchar(64) DEFAULT "", + `type` varchar(32) DEFAULT "", + `job_id` integer DEFAULT 0, + `priority` smallint DEFAULT 50, + `status` varchar(16) DEFAULT "", + `worker_id` integer, + `last_touched_at` datetime, + `commands` jsonb, + `activity` varchar(255) DEFAULT "", + PRIMARY KEY (`id`), + CONSTRAINT `fk_tasks_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_tasks_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE + SET NULL +); +INSERT INTO temp_tasks SELECT * FROM tasks; +DROP TABLE tasks; +ALTER TABLE temp_tasks RENAME TO `tasks`; + +CREATE INDEX `idx_worker_tags_uuid` ON `worker_tags`(`uuid`); +CREATE INDEX `idx_jobs_uuid` ON `jobs`(`uuid`); +CREATE INDEX `idx_workers_address` ON `workers`(`address`); +CREATE INDEX `idx_workers_last_seen_at` ON `workers`(`last_seen_at`); +CREATE INDEX `idx_workers_deleted_at` ON `workers`(`deleted_at`); +CREATE INDEX `idx_workers_uuid` ON `workers`(`uuid`); +CREATE UNIQUE INDEX `job_worker_tasktype` ON `job_blocks`(`job_id`, `worker_id`, `task_type`); +CREATE INDEX `idx_sleep_schedules_is_active` ON `sleep_schedules`(`is_active`); +CREATE INDEX `idx_sleep_schedules_worker_id` ON `sleep_schedules`(`worker_id`); +CREATE INDEX `idx_tasks_uuid` ON `tasks`(`uuid`); +CREATE INDEX `idx_tasks_last_touched_at` ON `tasks`(`last_touched_at`); diff --git a/internal/manager/persistence/sqlc/db.go b/internal/manager/persistence/sqlc/db.go new file mode 100644 index 00000000..8ed64d13 --- /dev/null +++ b/internal/manager/persistence/sqlc/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package sqlc + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/manager/persistence/sqlc/models.go b/internal/manager/persistence/sqlc/models.go new file mode 100644 index 00000000..c5845f3b --- /dev/null +++ b/internal/manager/persistence/sqlc/models.go @@ -0,0 +1,115 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 + +package sqlc + +import ( + "database/sql" + "encoding/json" + "time" +) + +type Job struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + Uuid string + Name string + JobType string + Priority int64 + Status string + Activity string + Settings json.RawMessage + Metadata json.RawMessage + DeleteRequestedAt sql.NullTime + StorageShamanCheckoutID string + WorkerTagID sql.NullInt64 +} + +type JobBlock struct { + ID int64 + CreatedAt time.Time + JobID int64 + WorkerID int64 + TaskType string +} + +type LastRendered struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + JobID int64 +} + +type SleepSchedule struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + WorkerID int64 + IsActive float64 + DaysOfWeek string + StartTime string + EndTime string + NextCheck sql.NullTime +} + +type Task struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + Uuid string + Name string + Type string + JobID int64 + Priority int64 + Status string + WorkerID sql.NullInt64 + LastTouchedAt sql.NullTime + Commands json.RawMessage + Activity string +} + +type TaskDependency struct { + TaskID int64 + DependencyID int64 +} + +type TaskFailure struct { + CreatedAt time.Time + TaskID int64 + WorkerID int64 +} + +type Worker struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + Uuid string + Secret string + Name string + Address string + Platform string + Software string + Status string + LastSeenAt sql.NullTime + StatusRequested string + LazyStatusRequest int64 + SupportedTaskTypes string + DeletedAt sql.NullTime + CanRestart int64 +} + +type WorkerTag struct { + ID int64 + CreatedAt time.Time + UpdatedAt sql.NullTime + Uuid string + Name string + Description string +} + +type WorkerTagMembership struct { + WorkerTagID int64 + WorkerID int64 +} diff --git a/internal/manager/persistence/sqlc/query.sql b/internal/manager/persistence/sqlc/query.sql new file mode 100644 index 00000000..3318db5a --- /dev/null +++ b/internal/manager/persistence/sqlc/query.sql @@ -0,0 +1,35 @@ + +-- Jobs / Tasks queries +-- + +-- name: CreateJob :exec +INSERT INTO jobs ( + created_at, + uuid, + name, + job_type, + priority, + status, + activity, + settings, + metadata, + storage_shaman_checkout_id +) +VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); + +-- name: DeleteJob :exec +DELETE FROM jobs WHERE uuid = ?; + +-- name: RequestJobDeletion :exec +UPDATE jobs SET + updated_at = @now, + delete_requested_at = @now +WHERE id = sqlc.arg('job_id'); + +-- name: FetchTask :one +SELECT * FROM tasks +WHERE uuid = ? LIMIT 1; + +-- name: FetchJob :one +SELECT * FROM jobs +WHERE uuid = ? LIMIT 1; diff --git a/internal/manager/persistence/sqlc/query.sql.go b/internal/manager/persistence/sqlc/query.sql.go new file mode 100644 index 00000000..caec9700 --- /dev/null +++ b/internal/manager/persistence/sqlc/query.sql.go @@ -0,0 +1,139 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: query.sql + +package sqlc + +import ( + "context" + "database/sql" + "encoding/json" + "time" +) + +const createJob = `-- name: CreateJob :exec + +INSERT INTO jobs ( + created_at, + uuid, + name, + job_type, + priority, + status, + activity, + settings, + metadata, + storage_shaman_checkout_id +) +VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) +` + +type CreateJobParams struct { + CreatedAt time.Time + Uuid string + Name string + JobType string + Priority int64 + Status string + Activity string + Settings json.RawMessage + Metadata json.RawMessage + StorageShamanCheckoutID string +} + +// Jobs / Tasks queries +func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error { + _, err := q.db.ExecContext(ctx, createJob, + arg.CreatedAt, + arg.Uuid, + arg.Name, + arg.JobType, + arg.Priority, + arg.Status, + arg.Activity, + arg.Settings, + arg.Metadata, + arg.StorageShamanCheckoutID, + ) + return err +} + +const deleteJob = `-- name: DeleteJob :exec +DELETE FROM jobs WHERE uuid = ? +` + +func (q *Queries) DeleteJob(ctx context.Context, uuid string) error { + _, err := q.db.ExecContext(ctx, deleteJob, uuid) + return err +} + +const fetchJob = `-- name: FetchJob :one +SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs +WHERE uuid = ? LIMIT 1 +` + +func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { + row := q.db.QueryRowContext(ctx, fetchJob, uuid) + var i Job + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Uuid, + &i.Name, + &i.JobType, + &i.Priority, + &i.Status, + &i.Activity, + &i.Settings, + &i.Metadata, + &i.DeleteRequestedAt, + &i.StorageShamanCheckoutID, + &i.WorkerTagID, + ) + return i, err +} + +const fetchTask = `-- name: FetchTask :one +SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity FROM tasks +WHERE uuid = ? LIMIT 1 +` + +func (q *Queries) FetchTask(ctx context.Context, uuid string) (Task, error) { + row := q.db.QueryRowContext(ctx, fetchTask, uuid) + var i Task + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Uuid, + &i.Name, + &i.Type, + &i.JobID, + &i.Priority, + &i.Status, + &i.WorkerID, + &i.LastTouchedAt, + &i.Commands, + &i.Activity, + ) + return i, err +} + +const requestJobDeletion = `-- name: RequestJobDeletion :exec +UPDATE jobs SET + updated_at = ?1, + delete_requested_at = ?1 +WHERE id = ?2 +` + +type RequestJobDeletionParams struct { + Now sql.NullTime + JobID int64 +} + +func (q *Queries) RequestJobDeletion(ctx context.Context, arg RequestJobDeletionParams) error { + _, err := q.db.ExecContext(ctx, requestJobDeletion, arg.Now, arg.JobID) + return err +} diff --git a/internal/manager/persistence/sqlc/schema.sql b/internal/manager/persistence/sqlc/schema.sql new file mode 100644 index 00000000..916fe101 --- /dev/null +++ b/internal/manager/persistence/sqlc/schema.sql @@ -0,0 +1,128 @@ +CREATE TABLE job_blocks ( + id integer NOT NULL, + created_at datetime NOT NULL, + job_id integer DEFAULT 0 NOT NULL, + worker_id integer DEFAULT 0 NOT NULL, + task_type text NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_job_blocks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE, + CONSTRAINT fk_job_blocks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +CREATE TABLE jobs ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + job_type varchar(32) DEFAULT '' NOT NULL, + priority smallint DEFAULT 0 NOT NULL, + status varchar(32) DEFAULT '' NOT NULL, + activity varchar(255) DEFAULT '' NOT NULL, + settings jsonb NOT NULL, + metadata jsonb NOT NULL, + delete_requested_at datetime, + storage_shaman_checkout_id varchar(255) DEFAULT '' NOT NULL, + worker_tag_id integer, + PRIMARY KEY (id), + CONSTRAINT fk_jobs_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE SET NULL +); +CREATE TABLE last_rendereds ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + job_id integer DEFAULT 0 NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_last_rendereds_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE +); +CREATE TABLE sleep_schedules ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + worker_id integer UNIQUE DEFAULT 0 NOT NULL, + is_active numeric DEFAULT false NOT NULL, + days_of_week text DEFAULT '' NOT NULL, + start_time text DEFAULT '' NOT NULL, + end_time text DEFAULT '' NOT NULL, + next_check datetime, + PRIMARY KEY (id), + CONSTRAINT fk_sleep_schedules_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +CREATE TABLE task_dependencies ( + task_id integer NOT NULL, + dependency_id integer NOT NULL, + PRIMARY KEY (task_id, dependency_id), + CONSTRAINT fk_task_dependencies_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, + CONSTRAINT fk_task_dependencies_dependencies FOREIGN KEY (dependency_id) REFERENCES tasks(id) ON DELETE CASCADE +); +CREATE TABLE task_failures ( + created_at datetime NOT NULL, + task_id integer NOT NULL, + worker_id integer NOT NULL, + PRIMARY KEY (task_id, worker_id), + CONSTRAINT fk_task_failures_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, + CONSTRAINT fk_task_failures_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +CREATE TABLE tasks ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + type varchar(32) DEFAULT '' NOT NULL, + job_id integer DEFAULT 0 NOT NULL, + priority smallint DEFAULT 50 NOT NULL, + status varchar(16) DEFAULT '' NOT NULL, + worker_id integer, + last_touched_at datetime, + commands jsonb NOT NULL, + activity varchar(255) DEFAULT '' NOT NULL, + PRIMARY KEY (id), + CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE, + CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL +); +CREATE TABLE worker_tag_membership ( + worker_tag_id integer NOT NULL, + worker_id integer NOT NULL, + PRIMARY KEY (worker_tag_id, worker_id), + CONSTRAINT fk_worker_tag_membership_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE CASCADE, + CONSTRAINT fk_worker_tag_membership_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE +); +CREATE TABLE worker_tags ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + name varchar(64) UNIQUE DEFAULT '' NOT NULL, + description varchar(255) DEFAULT '' NOT NULL, + PRIMARY KEY (id) +); +CREATE TABLE workers ( + id integer NOT NULL, + created_at datetime NOT NULL, + updated_at datetime, + uuid varchar(36) UNIQUE DEFAULT '' NOT NULL, + secret varchar(255) DEFAULT '' NOT NULL, + name varchar(64) DEFAULT '' NOT NULL, + address varchar(39) DEFAULT '' NOT NULL, + platform varchar(16) DEFAULT '' NOT NULL, + software varchar(32) DEFAULT '' NOT NULL, + status varchar(16) DEFAULT '' NOT NULL, + last_seen_at datetime, + status_requested varchar(16) DEFAULT '' NOT NULL, + lazy_status_request smallint DEFAULT false NOT NULL, + supported_task_types varchar(255) DEFAULT '' NOT NULL, + deleted_at datetime, + can_restart smallint DEFAULT false NOT NULL, + PRIMARY KEY (id) +); +CREATE INDEX idx_jobs_uuid ON jobs(uuid); +CREATE INDEX idx_sleep_schedules_is_active ON sleep_schedules(is_active); +CREATE INDEX idx_sleep_schedules_worker_id ON sleep_schedules(worker_id); +CREATE INDEX idx_tasks_last_touched_at ON tasks(last_touched_at); +CREATE INDEX idx_tasks_uuid ON tasks(uuid); +CREATE INDEX idx_worker_tags_uuid ON worker_tags(uuid); +CREATE INDEX idx_workers_address ON workers(address); +CREATE INDEX idx_workers_deleted_at ON workers(deleted_at); +CREATE INDEX idx_workers_last_seen_at ON workers(last_seen_at); +CREATE INDEX idx_workers_uuid ON workers(uuid); +CREATE UNIQUE INDEX job_worker_tasktype ON job_blocks(job_id, worker_id, task_type); diff --git a/sqlc.yaml b/sqlc.yaml new file mode 100644 index 00000000..d99d8136 --- /dev/null +++ b/sqlc.yaml @@ -0,0 +1,13 @@ +version: "2" +sql: + - engine: "sqlite" + schema: "internal/manager/persistence/sqlc/schema.sql" + queries: "internal/manager/persistence/sqlc/query.sql" + gen: + go: + out: "internal/manager/persistence/sqlc" + overrides: + - db_type: "jsonb" + go_type: + import: "encoding/json" + type: "RawMessage" diff --git a/web/project-website/content/development/database/_index.md b/web/project-website/content/development/database/_index.md index 05acee67..ca43f6e3 100644 --- a/web/project-website/content/development/database/_index.md +++ b/web/project-website/content/development/database/_index.md @@ -3,10 +3,27 @@ title: Database weight: 50 --- -Flamenco Manager and Worker use SQLite as database, and Gorm as -object-relational mapper. +Flamenco Manager and Worker use SQLite as database, and GORM as +object-relational mapper (but see the note below). Since SQLite has limited support for altering table schemas, migration requires copying old data to a temporary table with the new schema, then swap out the tables. Because of this, avoid `NOT NULL` columns, as they will be problematic in this process. + +## SQLC + +Flamenco mostly uses [GORM][gorm] for interfacing with its SQLite database. This +is gradually being phased out, to be replaced with [SQLC][sqlc]. + +To generate the SQLC schema file: +```sh +make db-migrate-up +go run ./cmd/sqlc-export-schema +``` + +To generate Go code with SQLC after changing `schema.sql` or `queries.sql`: +```sh +go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest +sqlc generate +```