Manager: start replacing GORM with SQLC

GORM has certain downsides:

- Code-first approach, where queries have to be translated to the Go code
  required to execute them.
- GORM comes with its own SQLite implementation, which doesn't provide an
  on-connect callback. This means that new connections cannot correctly
  enable foreign key constraints, causing database consistency issues.

[SQLC](https://sqlc.dev/) solves these issues for us.

This commit doesn't fully replace GORM with SQLC, but introduces it for
a few queries. Once all queries have been converted, GORM can be removed
completely.
This commit is contained in:
Sybren A. Stüvel 2024-03-03 20:15:39 +01:00
parent 7b31eba8d7
commit c046094880
13 changed files with 1149 additions and 29 deletions

@ -0,0 +1,189 @@
package main
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"database/sql"
"flag"
"fmt"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
_ "modernc.org/sqlite"
)
var (
// Tables and/or indices to skip when writing the schema.
// Anything that is *not* to be seen by sqlc should be listed here.
skips = map[SQLiteSchema]bool{
// Goose manages its own versioning table. SQLC should ignore its existence.
{Type: "table", Name: "goose_db_version"}: true,
}
tableNameDequoter = regexp.MustCompile("^(?:CREATE TABLE )(\"([^\"]+)\")")
)
type SQLiteSchema struct {
Type string
Name string
TableName string
RootPage int
SQL sql.NullString
}
func saveSchema(ctx context.Context, sqlOutPath string) error {
db, err := sql.Open("sqlite", "flamenco-manager.sqlite")
if err != nil {
return err
}
defer db.Close()
rows, err := db.QueryContext(ctx, "select * from sqlite_schema order by type desc, name asc")
if err != nil {
return err
}
defer rows.Close()
sqlBuilder := strings.Builder{}
for rows.Next() {
var data SQLiteSchema
if err := rows.Scan(
&data.Type,
&data.Name,
&data.TableName,
&data.RootPage,
&data.SQL,
); err != nil {
return err
}
if strings.HasPrefix(data.Name, "sqlite_") {
continue
}
if skips[SQLiteSchema{Type: data.Type, Name: data.Name}] {
continue
}
if !data.SQL.Valid {
continue
}
sql := tableNameDequoter.ReplaceAllString(data.SQL.String, "CREATE TABLE $2")
sqlBuilder.WriteString(sql)
sqlBuilder.WriteString(";\n")
}
sqlBytes := []byte(sqlBuilder.String())
if err := os.WriteFile(sqlOutPath, sqlBytes, os.ModePerm); err != nil {
return fmt.Errorf("writing to %s: %w", sqlOutPath, err)
}
log.Info().Str("path", sqlOutPath).Msg("schema written to file")
return nil
}
// SqlcConfig models the minimal subset of the sqlc.yaml we need to parse.
type SqlcConfig struct {
Version string `yaml:"version"`
SQL []struct {
Schema string `yaml:"schema"`
} `yaml:"sql"`
}
func main() {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
parseCliArgs()
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
defer mainCtxCancel()
installSignalHandler(mainCtxCancel)
schemaPath := schemaPathFromSqlcYAML()
if err := saveSchema(mainCtx, schemaPath); err != nil {
log.Fatal().Err(err).Msg("couldn't export schema")
}
}
// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
func installSignalHandler(cancelFunc context.CancelFunc) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
signal.Notify(signals, syscall.SIGTERM)
go func() {
for signum := range signals {
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
cancelFunc()
}
}()
}
func parseCliArgs() {
var quiet, debug, trace bool
flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
flag.Parse()
var logLevel zerolog.Level
switch {
case trace:
logLevel = zerolog.TraceLevel
case debug:
logLevel = zerolog.DebugLevel
case quiet:
logLevel = zerolog.WarnLevel
default:
logLevel = zerolog.InfoLevel
}
zerolog.SetGlobalLevel(logLevel)
}
func schemaPathFromSqlcYAML() string {
var sqlcConfig SqlcConfig
{
sqlcConfigBytes, err := os.ReadFile("sqlc.yaml")
if err != nil {
log.Fatal().Err(err).Msg("cannot read sqlc.yaml")
}
if err := yaml.Unmarshal(sqlcConfigBytes, &sqlcConfig); err != nil {
log.Fatal().Err(err).Msg("cannot parse sqlc.yaml")
}
}
if sqlcConfig.Version != "2" {
log.Fatal().
Str("version", sqlcConfig.Version).
Str("expected", "2").
Msg("unexpected version in sqlc.yaml")
}
if len(sqlcConfig.SQL) != 1 {
log.Fatal().
Int("sql items", len(sqlcConfig.SQL)).
Msg("sqlc.yaml should contain a single item in the 'sql' list")
}
schema := sqlcConfig.SQL[0].Schema
if schema == "" {
log.Fatal().Msg("sqlc.yaml should have a 'schema' key in the 'sql' item")
}
return schema
}

@ -8,11 +8,11 @@ import (
"fmt"
"time"
"github.com/glebarez/sqlite"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
// sqlite "projects.blender.org/studio/flamenco/pkg/gorm-modernc-sqlite"
"github.com/glebarez/sqlite"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
)
// DB provides the database interface.
@ -171,6 +171,17 @@ func (db *DB) Close() error {
return sqldb.Close()
}
// queries returns the SQLC Queries struct, connected to this database.
// It is intended that all GORM queries will be migrated to use this interface
// instead.
func (db *DB) queries() (*sqlc.Queries, error) {
sqldb, err := db.gormDB.DB()
if err != nil {
return nil, fmt.Errorf("could not get low-level database driver: %w", err)
}
return sqlc.New(sqldb), nil
}
func (db *DB) pragmaForeignKeys(enabled bool) error {
var (
value int

@ -17,6 +17,7 @@ import (
"gorm.io/gorm/clause"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -252,19 +253,20 @@ func (db *DB) storeAuthoredJobTaks(
// FetchJob fetches a single job, without fetching its tasks.
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
dbJob := Job{}
findResult := db.gormDB.WithContext(ctx).
Limit(1).
Preload("WorkerTag").
Find(&dbJob, "uuid = ?", jobUUID)
if findResult.Error != nil {
return nil, jobError(findResult.Error, "fetching job")
}
if dbJob.ID == 0 {
return nil, ErrJobNotFound
queries, err := db.queries()
if err != nil {
return nil, err
}
return &dbJob, nil
sqlcJob, err := queries.FetchJob(ctx, jobUUID)
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, ErrJobNotFound
case err != nil:
return nil, jobError(err, "fetching job")
}
return convertSqlcJob(sqlcJob)
}
// DeleteJob deletes a job from the database.
@ -279,24 +281,39 @@ func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error {
return ErrDeletingWithoutFK
}
tx := db.gormDB.WithContext(ctx).
Where("uuid = ?", jobUUID).
Delete(&Job{})
if tx.Error != nil {
return jobError(tx.Error, "deleting job")
queries, err := db.queries()
if err != nil {
return err
}
if err := queries.DeleteJob(ctx, jobUUID); err != nil {
return jobError(err, "deleting job")
}
return nil
}
// RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now".
func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
queries, err := db.queries()
if err != nil {
return err
}
// Update the given job itself, so we don't have to re-fetch it from the database.
j.DeleteRequestedAt.Time = db.gormDB.NowFunc()
j.DeleteRequestedAt.Valid = true
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{DeleteRequestedAt: j.DeleteRequestedAt})
if tx.Error != nil {
return jobError(tx.Error, "queueing job for deletion")
params := sqlc.RequestJobDeletionParams{
Now: j.DeleteRequestedAt,
JobID: int64(j.ID),
}
log.Trace().
Str("job", j.UUID).
Time("deletedAt", params.Now.Time).
Msg("database: marking job as deletion-requested")
if err := queries.RequestJobDeletion(ctx, params); err != nil {
return jobError(err, "queueing job for deletion")
}
return nil
}
@ -713,3 +730,42 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err
return workers, tx.Error
}
// convertSqlcJob converts a job from the SQLC-generated model to the model
// expected by the rest of the code. This is mostly in place to aid in the GORM
// to SQLC migration. It is intended that eventually the rest of the code will
// use the same SQLC-generated model.
func convertSqlcJob(job sqlc.Job) (*Job, error) {
dbJob := Job{
Model: Model{
ID: uint(job.ID),
CreatedAt: job.CreatedAt,
UpdatedAt: job.UpdatedAt.Time,
},
UUID: job.Uuid,
Name: job.Name,
JobType: job.JobType,
Priority: int(job.Priority),
Status: api.JobStatus(job.Status),
Activity: job.Activity,
DeleteRequestedAt: job.DeleteRequestedAt,
Storage: JobStorageInfo{
ShamanCheckoutID: job.StorageShamanCheckoutID,
},
}
if err := json.Unmarshal(job.Settings, &dbJob.Settings); err != nil {
return nil, jobError(err, fmt.Sprintf("job %s has invalid settings: %v", job.Uuid, err))
}
if err := json.Unmarshal(job.Metadata, &dbJob.Metadata); err != nil {
return nil, jobError(err, fmt.Sprintf("job %s has invalid metadata: %v", job.Uuid, err))
}
if job.WorkerTagID.Valid {
workerTagID := uint(job.WorkerTagID.Int64)
dbJob.WorkerTagID = &workerTagID
}
return &dbJob, nil
}

@ -288,10 +288,10 @@ func TestFetchJobsDeletionRequested(t *testing.T) {
// Ensure different requests get different timestamps,
// out of chronological order.
timestamps := []time.Time{
// timestamps for 'delete requested at' and 'updated at'
now.Add(-3 * time.Second), now.Add(-3 * time.Second),
now.Add(-1 * time.Second), now.Add(-1 * time.Second),
now.Add(-5 * time.Second), now.Add(-5 * time.Second),
// timestamps for 'delete requested at'.
now.Add(-3 * time.Second),
now.Add(-1 * time.Second),
now.Add(-5 * time.Second),
}
currentTimestampIndex := 0
db.gormDB.NowFunc = func() time.Time {

@ -0,0 +1,15 @@
-- Drop tables that were in use in beta versions of Flamenco. These might exist
-- in developer databases, as well as databases of studios following the `main`
-- branch, such as Blender Studio.
--
-- WARNING: this migration simply drops the tables. Their data is erased, and
-- cannot be brought back by rolling the migration back.
--
-- +goose Up
DROP INDEX IF EXISTS `idx_worker_clusters_uuid`;
DROP TABLE IF EXISTS `worker_cluster_membership`;
DROP TABLE IF EXISTS `worker_clusters`;
-- +goose Down
-- Do not recreate these tables, as no release of Flamenco ever used them.
-- Also their contents wouldn't be brought back anyway.

@ -0,0 +1,371 @@
-- GORM automigration wasn't smart, and thus the database had more nullable
-- columns than necessary. This migration makes columns that should never be
-- NULL actually NOT NULL.
--
-- Since this migration recreates all tables in the database, this is now also
-- done in a way that makes the schema more compatible with sqlc (which is
-- mostly removing various quotes and backticks, and replacing char(N) with
-- varchar(N)). sqlc is the tool that'll replace GORM.
--
-- +goose Up
CREATE TABLE temp_last_rendereds (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
job_id integer DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_last_rendereds_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
INSERT INTO temp_last_rendereds SELECT * FROM last_rendereds;
DROP TABLE last_rendereds;
ALTER TABLE temp_last_rendereds RENAME TO last_rendereds;
CREATE TABLE temp_task_dependencies (
task_id integer NOT NULL,
dependency_id integer NOT NULL,
PRIMARY KEY (task_id, dependency_id),
CONSTRAINT fk_task_dependencies_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
CONSTRAINT fk_task_dependencies_dependencies FOREIGN KEY (dependency_id) REFERENCES tasks(id) ON DELETE CASCADE
);
INSERT INTO temp_task_dependencies SELECT * FROM task_dependencies;
DROP TABLE task_dependencies;
ALTER TABLE temp_task_dependencies RENAME TO task_dependencies;
CREATE TABLE temp_task_failures (
created_at datetime NOT NULL,
task_id integer NOT NULL,
worker_id integer NOT NULL,
PRIMARY KEY (task_id, worker_id),
CONSTRAINT fk_task_failures_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
CONSTRAINT fk_task_failures_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
INSERT INTO temp_task_failures SELECT * FROM task_failures;
DROP TABLE task_failures;
ALTER TABLE temp_task_failures RENAME TO task_failures;
CREATE TABLE temp_worker_tag_membership (
worker_tag_id integer NOT NULL,
worker_id integer NOT NULL,
PRIMARY KEY (worker_tag_id, worker_id),
CONSTRAINT fk_worker_tag_membership_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE CASCADE,
CONSTRAINT fk_worker_tag_membership_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
INSERT INTO temp_worker_tag_membership SELECT * FROM worker_tag_membership;
DROP TABLE worker_tag_membership;
ALTER TABLE temp_worker_tag_membership RENAME TO worker_tag_membership;
CREATE TABLE temp_worker_tags (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) UNIQUE DEFAULT '' NOT NULL,
description varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id)
);
INSERT INTO temp_worker_tags SELECT * FROM worker_tags;
DROP TABLE worker_tags;
ALTER TABLE temp_worker_tags RENAME TO worker_tags;
CREATE TABLE temp_jobs (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
job_type varchar(32) DEFAULT '' NOT NULL,
priority smallint DEFAULT 0 NOT NULL,
status varchar(32) DEFAULT '' NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
settings jsonb NOT NULL,
metadata jsonb NOT NULL,
delete_requested_at datetime,
storage_shaman_checkout_id varchar(255) DEFAULT '' NOT NULL,
worker_tag_id integer,
PRIMARY KEY (id),
CONSTRAINT fk_jobs_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE SET NULL
);
-- This is using an explicit set of columns, as my development machine had an
-- extra column in there that caused errors. If anybody ever ran a beta where
-- the `worker_tag_id` was called `worker_cluster_id`, they'd have that extra
-- column too.
INSERT INTO temp_jobs SELECT
id,
created_at,
updated_at,
uuid,
name,
job_type,
priority,
status,
activity,
settings,
metadata,
delete_requested_at,
storage_shaman_checkout_id,
worker_tag_id
FROM jobs;
DROP TABLE jobs;
ALTER TABLE temp_jobs RENAME TO jobs;
CREATE TABLE temp_workers (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
secret varchar(255) DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
address varchar(39) DEFAULT '' NOT NULL,
platform varchar(16) DEFAULT '' NOT NULL,
software varchar(32) DEFAULT '' NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
last_seen_at datetime,
status_requested varchar(16) DEFAULT '' NOT NULL,
lazy_status_request smallint DEFAULT false NOT NULL,
supported_task_types varchar(255) DEFAULT '' NOT NULL,
deleted_at datetime,
can_restart smallint DEFAULT false NOT NULL,
PRIMARY KEY (id)
);
UPDATE workers SET supported_task_types = '' where supported_task_types is NULL;
INSERT INTO temp_workers SELECT * FROM workers;
DROP TABLE workers;
ALTER TABLE temp_workers RENAME TO workers;
CREATE TABLE temp_job_blocks (
id integer NOT NULL,
created_at datetime NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
worker_id integer DEFAULT 0 NOT NULL,
task_type text NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_job_blocks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_job_blocks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
INSERT INTO temp_job_blocks SELECT * FROM job_blocks;
DROP TABLE job_blocks;
ALTER TABLE temp_job_blocks RENAME TO job_blocks;
CREATE TABLE temp_sleep_schedules (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
worker_id integer UNIQUE DEFAULT 0 NOT NULL,
is_active numeric DEFAULT false NOT NULL,
days_of_week text DEFAULT '' NOT NULL,
start_time text DEFAULT '' NOT NULL,
end_time text DEFAULT '' NOT NULL,
next_check datetime,
PRIMARY KEY (id),
CONSTRAINT fk_sleep_schedules_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
INSERT INTO temp_sleep_schedules SELECT * FROM sleep_schedules;
DROP TABLE sleep_schedules;
ALTER TABLE temp_sleep_schedules RENAME TO sleep_schedules;
CREATE TABLE temp_tasks (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
type varchar(32) DEFAULT '' NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
priority smallint DEFAULT 50 NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
worker_id integer,
last_touched_at datetime,
commands jsonb NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL
);
INSERT INTO temp_tasks SELECT * FROM tasks;
DROP TABLE tasks;
ALTER TABLE temp_tasks RENAME TO tasks;
-- Recreate the indices on the new tables.
CREATE INDEX idx_worker_tags_uuid ON worker_tags(uuid);
CREATE INDEX idx_jobs_uuid ON jobs(uuid);
CREATE INDEX idx_workers_address ON workers(address);
CREATE INDEX idx_workers_last_seen_at ON workers(last_seen_at);
CREATE INDEX idx_workers_deleted_at ON workers(deleted_at);
CREATE INDEX idx_workers_uuid ON workers(uuid);
CREATE UNIQUE INDEX job_worker_tasktype ON job_blocks(job_id, worker_id, task_type);
CREATE INDEX idx_sleep_schedules_is_active ON sleep_schedules(is_active);
CREATE INDEX idx_sleep_schedules_worker_id ON sleep_schedules(worker_id);
CREATE INDEX idx_tasks_uuid ON tasks(uuid);
CREATE INDEX idx_tasks_last_touched_at ON tasks(last_touched_at);
-- +goose Down
CREATE TABLE `temp_last_rendereds` (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`job_id` integer DEFAULT 0,
PRIMARY KEY (`id`),
CONSTRAINT `fk_last_rendereds_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_last_rendereds SELECT * FROM last_rendereds;
DROP TABLE last_rendereds;
ALTER TABLE temp_last_rendereds RENAME TO `last_rendereds`;
CREATE TABLE `temp_task_dependencies` (
`task_id` integer,
`dependency_id` integer,
PRIMARY KEY (`task_id`, `dependency_id`),
CONSTRAINT `fk_task_dependencies_task` FOREIGN KEY (`task_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_task_dependencies_dependencies` FOREIGN KEY (`dependency_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_task_dependencies SELECT * FROM task_dependencies;
DROP TABLE task_dependencies;
ALTER TABLE temp_task_dependencies RENAME TO `task_dependencies`;
CREATE TABLE `temp_task_failures` (
`created_at` datetime,
`task_id` integer,
`worker_id` integer,
PRIMARY KEY (`task_id`, `worker_id`),
CONSTRAINT `fk_task_failures_task` FOREIGN KEY (`task_id`) REFERENCES `tasks`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_task_failures_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_task_failures SELECT * FROM task_failures;
DROP TABLE task_failures;
ALTER TABLE temp_task_failures RENAME TO `task_failures`;
CREATE TABLE `temp_worker_tag_membership` (
`worker_tag_id` integer,
`worker_id` integer,
PRIMARY KEY (`worker_tag_id`, `worker_id`),
CONSTRAINT `fk_worker_tag_membership_worker_tag` FOREIGN KEY (`worker_tag_id`) REFERENCES `worker_tags`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_worker_tag_membership_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_worker_tag_membership SELECT * FROM worker_tag_membership;
DROP TABLE worker_tag_membership;
ALTER TABLE temp_worker_tag_membership RENAME TO `worker_tag_membership`;
CREATE TABLE "temp_worker_tags" (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`uuid` char(36) UNIQUE DEFAULT "",
`name` varchar(64) UNIQUE DEFAULT "",
`description` varchar(255) DEFAULT "",
PRIMARY KEY (`id`)
);
INSERT INTO temp_worker_tags SELECT * FROM worker_tags;
DROP TABLE worker_tags;
ALTER TABLE temp_worker_tags RENAME TO `worker_tags`;
CREATE TABLE "temp_jobs" (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`uuid` char(36) UNIQUE DEFAULT "",
`name` varchar(64) DEFAULT "",
`job_type` varchar(32) DEFAULT "",
`priority` smallint DEFAULT 0,
`status` varchar(32) DEFAULT "",
`activity` varchar(255) DEFAULT "",
`settings` jsonb,
`metadata` jsonb,
`delete_requested_at` datetime,
`storage_shaman_checkout_id` varchar(255) DEFAULT "",
`worker_tag_id` integer,
PRIMARY KEY(`id`),
CONSTRAINT `fk_jobs_worker_tag` FOREIGN KEY(`worker_tag_id`) REFERENCES `worker_tags`(`id`) ON DELETE SET NULL
);
INSERT INTO temp_jobs SELECT * FROM jobs;
DROP TABLE jobs;
ALTER TABLE temp_jobs RENAME TO `jobs`;
CREATE TABLE "temp_workers" (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`deleted_at` datetime,
`uuid` char(36) UNIQUE DEFAULT "",
`secret` varchar(255) DEFAULT "",
`name` varchar(64) DEFAULT "",
`address` varchar(39) DEFAULT "",
`platform` varchar(16) DEFAULT "",
`software` varchar(32) DEFAULT "",
`status` varchar(16) DEFAULT "",
`last_seen_at` datetime,
`status_requested` varchar(16) DEFAULT "",
`lazy_status_request` smallint DEFAULT false,
`supported_task_types` varchar(255) DEFAULT "",
`can_restart` smallint DEFAULT false,
PRIMARY KEY (`id`)
);
INSERT INTO temp_workers SELECT * FROM workers;
DROP TABLE workers;
ALTER TABLE temp_workers RENAME TO `workers`;
CREATE TABLE "temp_job_blocks" (
`id` integer,
`created_at` datetime,
`job_id` integer DEFAULT 0,
`worker_id` integer DEFAULT 0,
`task_type` text,
PRIMARY KEY (`id`),
CONSTRAINT `fk_job_blocks_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_job_blocks_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_job_blocks SELECT * FROM job_blocks;
DROP TABLE job_blocks;
ALTER TABLE temp_job_blocks RENAME TO `job_blocks`;
CREATE TABLE "temp_sleep_schedules" (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`worker_id` integer UNIQUE DEFAULT 0,
`is_active` numeric DEFAULT false,
`days_of_week` text DEFAULT "",
`start_time` text DEFAULT "",
`end_time` text DEFAULT "",
`next_check` datetime,
PRIMARY KEY (`id`),
CONSTRAINT `fk_sleep_schedules_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE CASCADE
);
INSERT INTO temp_sleep_schedules SELECT * FROM sleep_schedules;
DROP TABLE sleep_schedules;
ALTER TABLE temp_sleep_schedules RENAME TO `sleep_schedules`;
CREATE TABLE "temp_tasks" (
`id` integer,
`created_at` datetime,
`updated_at` datetime,
`uuid` char(36) UNIQUE DEFAULT "",
`name` varchar(64) DEFAULT "",
`type` varchar(32) DEFAULT "",
`job_id` integer DEFAULT 0,
`priority` smallint DEFAULT 50,
`status` varchar(16) DEFAULT "",
`worker_id` integer,
`last_touched_at` datetime,
`commands` jsonb,
`activity` varchar(255) DEFAULT "",
PRIMARY KEY (`id`),
CONSTRAINT `fk_tasks_job` FOREIGN KEY (`job_id`) REFERENCES `jobs`(`id`) ON DELETE CASCADE,
CONSTRAINT `fk_tasks_worker` FOREIGN KEY (`worker_id`) REFERENCES `workers`(`id`) ON DELETE
SET NULL
);
INSERT INTO temp_tasks SELECT * FROM tasks;
DROP TABLE tasks;
ALTER TABLE temp_tasks RENAME TO `tasks`;
CREATE INDEX `idx_worker_tags_uuid` ON `worker_tags`(`uuid`);
CREATE INDEX `idx_jobs_uuid` ON `jobs`(`uuid`);
CREATE INDEX `idx_workers_address` ON `workers`(`address`);
CREATE INDEX `idx_workers_last_seen_at` ON `workers`(`last_seen_at`);
CREATE INDEX `idx_workers_deleted_at` ON `workers`(`deleted_at`);
CREATE INDEX `idx_workers_uuid` ON `workers`(`uuid`);
CREATE UNIQUE INDEX `job_worker_tasktype` ON `job_blocks`(`job_id`, `worker_id`, `task_type`);
CREATE INDEX `idx_sleep_schedules_is_active` ON `sleep_schedules`(`is_active`);
CREATE INDEX `idx_sleep_schedules_worker_id` ON `sleep_schedules`(`worker_id`);
CREATE INDEX `idx_tasks_uuid` ON `tasks`(`uuid`);
CREATE INDEX `idx_tasks_last_touched_at` ON `tasks`(`last_touched_at`);

@ -0,0 +1,31 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
package sqlc
import (
"context"
"database/sql"
)
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
return &Queries{
db: tx,
}
}

@ -0,0 +1,115 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
package sqlc
import (
"database/sql"
"encoding/json"
"time"
)
type Job struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
Uuid string
Name string
JobType string
Priority int64
Status string
Activity string
Settings json.RawMessage
Metadata json.RawMessage
DeleteRequestedAt sql.NullTime
StorageShamanCheckoutID string
WorkerTagID sql.NullInt64
}
type JobBlock struct {
ID int64
CreatedAt time.Time
JobID int64
WorkerID int64
TaskType string
}
type LastRendered struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
JobID int64
}
type SleepSchedule struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
WorkerID int64
IsActive float64
DaysOfWeek string
StartTime string
EndTime string
NextCheck sql.NullTime
}
type Task struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
Uuid string
Name string
Type string
JobID int64
Priority int64
Status string
WorkerID sql.NullInt64
LastTouchedAt sql.NullTime
Commands json.RawMessage
Activity string
}
type TaskDependency struct {
TaskID int64
DependencyID int64
}
type TaskFailure struct {
CreatedAt time.Time
TaskID int64
WorkerID int64
}
type Worker struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
Uuid string
Secret string
Name string
Address string
Platform string
Software string
Status string
LastSeenAt sql.NullTime
StatusRequested string
LazyStatusRequest int64
SupportedTaskTypes string
DeletedAt sql.NullTime
CanRestart int64
}
type WorkerTag struct {
ID int64
CreatedAt time.Time
UpdatedAt sql.NullTime
Uuid string
Name string
Description string
}
type WorkerTagMembership struct {
WorkerTagID int64
WorkerID int64
}

@ -0,0 +1,35 @@
-- Jobs / Tasks queries
--
-- name: CreateJob :exec
INSERT INTO jobs (
created_at,
uuid,
name,
job_type,
priority,
status,
activity,
settings,
metadata,
storage_shaman_checkout_id
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? );
-- name: DeleteJob :exec
DELETE FROM jobs WHERE uuid = ?;
-- name: RequestJobDeletion :exec
UPDATE jobs SET
updated_at = @now,
delete_requested_at = @now
WHERE id = sqlc.arg('job_id');
-- name: FetchTask :one
SELECT * FROM tasks
WHERE uuid = ? LIMIT 1;
-- name: FetchJob :one
SELECT * FROM jobs
WHERE uuid = ? LIMIT 1;

@ -0,0 +1,139 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.25.0
// source: query.sql
package sqlc
import (
"context"
"database/sql"
"encoding/json"
"time"
)
const createJob = `-- name: CreateJob :exec
INSERT INTO jobs (
created_at,
uuid,
name,
job_type,
priority,
status,
activity,
settings,
metadata,
storage_shaman_checkout_id
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
`
type CreateJobParams struct {
CreatedAt time.Time
Uuid string
Name string
JobType string
Priority int64
Status string
Activity string
Settings json.RawMessage
Metadata json.RawMessage
StorageShamanCheckoutID string
}
// Jobs / Tasks queries
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
_, err := q.db.ExecContext(ctx, createJob,
arg.CreatedAt,
arg.Uuid,
arg.Name,
arg.JobType,
arg.Priority,
arg.Status,
arg.Activity,
arg.Settings,
arg.Metadata,
arg.StorageShamanCheckoutID,
)
return err
}
const deleteJob = `-- name: DeleteJob :exec
DELETE FROM jobs WHERE uuid = ?
`
func (q *Queries) DeleteJob(ctx context.Context, uuid string) error {
_, err := q.db.ExecContext(ctx, deleteJob, uuid)
return err
}
const fetchJob = `-- name: FetchJob :one
SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs
WHERE uuid = ? LIMIT 1
`
func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) {
row := q.db.QueryRowContext(ctx, fetchJob, uuid)
var i Job
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Uuid,
&i.Name,
&i.JobType,
&i.Priority,
&i.Status,
&i.Activity,
&i.Settings,
&i.Metadata,
&i.DeleteRequestedAt,
&i.StorageShamanCheckoutID,
&i.WorkerTagID,
)
return i, err
}
const fetchTask = `-- name: FetchTask :one
SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity FROM tasks
WHERE uuid = ? LIMIT 1
`
func (q *Queries) FetchTask(ctx context.Context, uuid string) (Task, error) {
row := q.db.QueryRowContext(ctx, fetchTask, uuid)
var i Task
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Uuid,
&i.Name,
&i.Type,
&i.JobID,
&i.Priority,
&i.Status,
&i.WorkerID,
&i.LastTouchedAt,
&i.Commands,
&i.Activity,
)
return i, err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET
updated_at = ?1,
delete_requested_at = ?1
WHERE id = ?2
`
type RequestJobDeletionParams struct {
Now sql.NullTime
JobID int64
}
func (q *Queries) RequestJobDeletion(ctx context.Context, arg RequestJobDeletionParams) error {
_, err := q.db.ExecContext(ctx, requestJobDeletion, arg.Now, arg.JobID)
return err
}

@ -0,0 +1,128 @@
CREATE TABLE job_blocks (
id integer NOT NULL,
created_at datetime NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
worker_id integer DEFAULT 0 NOT NULL,
task_type text NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_job_blocks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_job_blocks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
CREATE TABLE jobs (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
job_type varchar(32) DEFAULT '' NOT NULL,
priority smallint DEFAULT 0 NOT NULL,
status varchar(32) DEFAULT '' NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
settings jsonb NOT NULL,
metadata jsonb NOT NULL,
delete_requested_at datetime,
storage_shaman_checkout_id varchar(255) DEFAULT '' NOT NULL,
worker_tag_id integer,
PRIMARY KEY (id),
CONSTRAINT fk_jobs_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE SET NULL
);
CREATE TABLE last_rendereds (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
job_id integer DEFAULT 0 NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_last_rendereds_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
CREATE TABLE sleep_schedules (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
worker_id integer UNIQUE DEFAULT 0 NOT NULL,
is_active numeric DEFAULT false NOT NULL,
days_of_week text DEFAULT '' NOT NULL,
start_time text DEFAULT '' NOT NULL,
end_time text DEFAULT '' NOT NULL,
next_check datetime,
PRIMARY KEY (id),
CONSTRAINT fk_sleep_schedules_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
CREATE TABLE task_dependencies (
task_id integer NOT NULL,
dependency_id integer NOT NULL,
PRIMARY KEY (task_id, dependency_id),
CONSTRAINT fk_task_dependencies_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
CONSTRAINT fk_task_dependencies_dependencies FOREIGN KEY (dependency_id) REFERENCES tasks(id) ON DELETE CASCADE
);
CREATE TABLE task_failures (
created_at datetime NOT NULL,
task_id integer NOT NULL,
worker_id integer NOT NULL,
PRIMARY KEY (task_id, worker_id),
CONSTRAINT fk_task_failures_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
CONSTRAINT fk_task_failures_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
CREATE TABLE tasks (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
type varchar(32) DEFAULT '' NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
priority smallint DEFAULT 50 NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
worker_id integer,
last_touched_at datetime,
commands jsonb NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL
);
CREATE TABLE worker_tag_membership (
worker_tag_id integer NOT NULL,
worker_id integer NOT NULL,
PRIMARY KEY (worker_tag_id, worker_id),
CONSTRAINT fk_worker_tag_membership_worker_tag FOREIGN KEY (worker_tag_id) REFERENCES worker_tags(id) ON DELETE CASCADE,
CONSTRAINT fk_worker_tag_membership_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE
);
CREATE TABLE worker_tags (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) UNIQUE DEFAULT '' NOT NULL,
description varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE workers (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
secret varchar(255) DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
address varchar(39) DEFAULT '' NOT NULL,
platform varchar(16) DEFAULT '' NOT NULL,
software varchar(32) DEFAULT '' NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
last_seen_at datetime,
status_requested varchar(16) DEFAULT '' NOT NULL,
lazy_status_request smallint DEFAULT false NOT NULL,
supported_task_types varchar(255) DEFAULT '' NOT NULL,
deleted_at datetime,
can_restart smallint DEFAULT false NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX idx_jobs_uuid ON jobs(uuid);
CREATE INDEX idx_sleep_schedules_is_active ON sleep_schedules(is_active);
CREATE INDEX idx_sleep_schedules_worker_id ON sleep_schedules(worker_id);
CREATE INDEX idx_tasks_last_touched_at ON tasks(last_touched_at);
CREATE INDEX idx_tasks_uuid ON tasks(uuid);
CREATE INDEX idx_worker_tags_uuid ON worker_tags(uuid);
CREATE INDEX idx_workers_address ON workers(address);
CREATE INDEX idx_workers_deleted_at ON workers(deleted_at);
CREATE INDEX idx_workers_last_seen_at ON workers(last_seen_at);
CREATE INDEX idx_workers_uuid ON workers(uuid);
CREATE UNIQUE INDEX job_worker_tasktype ON job_blocks(job_id, worker_id, task_type);

13
sqlc.yaml Normal file

@ -0,0 +1,13 @@
version: "2"
sql:
- engine: "sqlite"
schema: "internal/manager/persistence/sqlc/schema.sql"
queries: "internal/manager/persistence/sqlc/query.sql"
gen:
go:
out: "internal/manager/persistence/sqlc"
overrides:
- db_type: "jsonb"
go_type:
import: "encoding/json"
type: "RawMessage"

@ -3,10 +3,27 @@ title: Database
weight: 50
---
Flamenco Manager and Worker use SQLite as database, and Gorm as
object-relational mapper.
Flamenco Manager and Worker use SQLite as database, and GORM as
object-relational mapper (but see the note below).
Since SQLite has limited support for altering table schemas, migration requires
copying old data to a temporary table with the new schema, then swap out the
tables. Because of this, avoid `NOT NULL` columns, as they will be problematic
in this process.
## SQLC
Flamenco mostly uses [GORM][gorm] for interfacing with its SQLite database. This
is gradually being phased out, to be replaced with [SQLC][sqlc].
To generate the SQLC schema file:
```sh
make db-migrate-up
go run ./cmd/sqlc-export-schema
```
To generate Go code with SQLC after changing `schema.sql` or `queries.sql`:
```sh
go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
sqlc generate
```