Manager: convert storing authored jobs from gorm to sqlc

Convert the creation of new jobs & tasks in the database (which happens
by storing the 'authored job' + its tasks) from gorm to sqlc.
This commit is contained in:
Sybren A. Stüvel 2024-06-30 23:23:07 +02:00
parent bfe47ea394
commit 842d1ab9a4
6 changed files with 293 additions and 85 deletions

@ -198,6 +198,10 @@ type queriesTX struct {
// queriesWithTX returns the SQLC Queries struct, connected to this database in a transaction.
// It is intended that all GORM queries will be migrated to use this interface
// instead.
//
// After calling this function, all queries should use this transaction until it
// is closed (either committed or rolled back). Otherwise SQLite will deadlock,
// as it will make any other query wait until this transaction is done.
func (db *DB) queriesWithTX() (*queriesTX, error) {
sqldb, err := db.gormDB.DB()
if err != nil {

@ -147,44 +147,66 @@ type TaskFailure struct {
// StoreAuthoredJob stores an AuthoredJob and its tasks in the database.
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
return db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// TODO: separate conversion of struct types from storing things in the database.
dbJob := Job{
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
Status: authoredJob.Status,
Priority: authoredJob.Priority,
Settings: StringInterfaceMap(authoredJob.Settings),
Metadata: StringStringMap(authoredJob.Metadata),
Storage: JobStorageInfo{
ShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
},
// Run all queries in a single transaction.
qtx, err := db.queriesWithTX()
if err != nil {
return err
}
// Roll back on every early return below; the success path ends with qtx.commit().
// NOTE(review): presumably rollback() is harmless once commit() has succeeded — confirm.
defer qtx.rollback()
// Serialise the embedded JSON.
settings, err := json.Marshal(authoredJob.Settings)
if err != nil {
return fmt.Errorf("converting job settings to JSON: %w", err)
}
metadata, err := json.Marshal(authoredJob.Metadata)
if err != nil {
return fmt.Errorf("converting job metadata to JSON: %w", err)
}
// Create the job itself.
params := sqlc.CreateJobParams{
CreatedAt: db.gormDB.NowFunc(),
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
Priority: int64(authoredJob.Priority),
Status: string(authoredJob.Status),
Settings: settings,
Metadata: metadata,
StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
}
// The worker tag is optional; it is only looked up when the job names one.
if authoredJob.WorkerTagUUID != "" {
dbTag, err := qtx.queries.FetchWorkerTagByUUID(ctx, authoredJob.WorkerTagUUID)
switch {
case errors.Is(err, sql.ErrNoRows):
return fmt.Errorf("no worker tag %q found", authoredJob.WorkerTagUUID)
case err != nil:
return fmt.Errorf("could not find worker tag %q: %w", authoredJob.WorkerTagUUID, err)
}
// Reference the tag by its database ID; the field stays NULL when no tag was given.
params.WorkerTagID = sql.NullInt64{Int64: dbTag.WorkerTag.ID, Valid: true}
}
log.Debug().
Str("job", dbJob.UUID).
Str("type", dbJob.JobType).
Str("name", dbJob.Name).
Str("status", string(dbJob.Status)).
Msg("persistence: storing authored job")
log.Debug().
Str("job", params.UUID).
Str("type", params.JobType).
Str("name", params.Name).
Str("status", params.Status).
Msg("persistence: storing authored job")
// Find and assign the worker tag.
if authoredJob.WorkerTagUUID != "" {
dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID)
if err != nil {
return err
}
dbJob.WorkerTagID = &dbTag.ID
dbJob.WorkerTag = dbTag
}
// CreateJob returns the database ID of the newly inserted job row.
jobID, err := qtx.queries.CreateJob(ctx, params)
if err != nil {
return jobError(err, "storing job")
}
if err := tx.Create(&dbJob).Error; err != nil {
return jobError(err, "storing job")
}
// Store the tasks in the same transaction, linked to the job via its database ID.
err = db.storeAuthoredJobTaks(ctx, qtx, jobID, &authoredJob)
if err != nil {
return err
}
return db.storeAuthoredJobTaks(ctx, tx, &dbJob, &authoredJob)
})
// Job and tasks were stored without error; commit the transaction.
return qtx.commit()
}
// StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks.
@ -194,19 +216,41 @@ func (db *DB) StoreAuthoredJobTaks(
job *Job,
authoredJob *job_compilers.AuthoredJob,
) error {
tx := db.gormDB.WithContext(ctx)
return db.storeAuthoredJobTaks(ctx, tx, job, authoredJob)
qtx, err := db.queriesWithTX()
if err != nil {
return err
}
defer qtx.rollback()
err = db.storeAuthoredJobTaks(ctx, qtx, int64(job.ID), authoredJob)
if err != nil {
return err
}
return qtx.commit()
}
// storeAuthoredJobTaks stores the tasks of the authored job.
// Note that this function does NOT commit the database transaction. That is up
// to the caller.
func (db *DB) storeAuthoredJobTaks(
ctx context.Context,
tx *gorm.DB,
dbJob *Job,
qtx *queriesTX,
jobID int64,
authoredJob *job_compilers.AuthoredJob,
) error {
type TaskInfo struct {
ID int64
UUID string
Name string
}
uuidToTask := make(map[string]*Task)
// Give every task the same creation timestamp.
now := db.gormDB.NowFunc()
uuidToTask := make(map[string]TaskInfo)
for _, authoredTask := range authoredJob.Tasks {
// Marshal commands to JSON.
var commands []Command
for _, authoredCommand := range authoredTask.Commands {
commands = append(commands, Command{
@ -214,31 +258,41 @@ func (db *DB) storeAuthoredJobTaks(
Parameters: StringInterfaceMap(authoredCommand.Parameters),
})
}
commandsJSON, err := json.Marshal(commands)
if err != nil {
return fmt.Errorf("could not convert commands of task %q to JSON: %w",
authoredTask.Name, err)
}
dbTask := Task{
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
Job: dbJob,
Priority: authoredTask.Priority,
Status: api.TaskStatusQueued,
Commands: commands,
taskParams := sqlc.CreateTaskParams{
CreatedAt: now,
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
JobID: jobID,
Priority: int64(authoredTask.Priority),
Status: string(api.TaskStatusQueued),
Commands: commandsJSON,
// dependencies are stored below.
}
log.Debug().
Str("task", dbTask.UUID).
Str("job", dbJob.UUID).
Str("type", dbTask.Type).
Str("name", dbTask.Name).
Str("status", string(dbTask.Status)).
Str("task", taskParams.UUID).
Str("type", taskParams.Type).
Str("name", taskParams.Name).
Str("status", string(taskParams.Status)).
Msg("persistence: storing authored task")
if err := tx.Create(&dbTask).Error; err != nil {
taskID, err := qtx.queries.CreateTask(ctx, taskParams)
if err != nil {
return taskError(err, "storing task: %v", err)
}
uuidToTask[authoredTask.UUID] = &dbTask
uuidToTask[authoredTask.UUID] = TaskInfo{
ID: taskID,
UUID: taskParams.UUID,
Name: taskParams.Name,
}
}
// Store the dependencies between tasks.
@ -247,18 +301,27 @@ func (db *DB) storeAuthoredJobTaks(
continue
}
dbTask, ok := uuidToTask[authoredTask.UUID]
taskInfo, ok := uuidToTask[authoredTask.UUID]
if !ok {
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
}
deps := make([]*Task, len(authoredTask.Dependencies))
for i, t := range authoredTask.Dependencies {
depTask, ok := uuidToTask[t.UUID]
deps := make([]*TaskInfo, len(authoredTask.Dependencies))
for idx, authoredDep := range authoredTask.Dependencies {
depTask, ok := uuidToTask[authoredDep.UUID]
if !ok {
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", authoredDep.UUID)
}
deps[i] = depTask
err := qtx.queries.StoreTaskDependency(ctx, sqlc.StoreTaskDependencyParams{
TaskID: taskInfo.ID,
DependencyID: depTask.ID,
})
if err != nil {
return taskError(err, "error storing task %q depending on task %q", authoredTask.UUID, depTask.UUID)
}
deps[idx] = &depTask
}
if log.Debug().Enabled() {
@ -267,26 +330,11 @@ func (db *DB) storeAuthoredJobTaks(
depNames[i] = dep.Name
}
log.Debug().
Str("task", dbTask.UUID).
Str("name", dbTask.Name).
Str("task", taskInfo.UUID).
Str("name", taskInfo.Name).
Strs("dependencies", depNames).
Msg("persistence: storing authored task dependencies")
}
dependenciesbatchsize := 1000
for j := 0; j < len(deps); j += dependenciesbatchsize {
end := j + dependenciesbatchsize
if end > len(deps) {
end = len(deps)
}
currentDeps := deps[j:end]
dbTask.Dependencies = currentDeps
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
if subQuery.Error != nil {
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
}
}
}
return nil

@ -1,7 +1,8 @@
-- name: CreateJob :exec
-- name: CreateJob :execlastid
INSERT INTO jobs (
created_at,
updated_at,
uuid,
name,
job_type,
@ -10,9 +11,49 @@ INSERT INTO jobs (
activity,
settings,
metadata,
storage_shaman_checkout_id
storage_shaman_checkout_id,
worker_tag_id
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? );
VALUES (
@created_at,
@created_at,
@uuid,
@name,
@job_type,
@priority,
@status,
@activity,
@settings,
@metadata,
@storage_shaman_checkout_id,
@worker_tag_id
);
-- name: CreateTask :execlastid
-- Insert a new task and return the ID of the inserted row.
-- The creation timestamp is also used as the initial updated_at value.
INSERT INTO tasks (
created_at,
updated_at,
uuid,
name,
type,
job_id,
priority,
status,
commands
) VALUES (
@created_at,
@created_at,
@uuid,
@name,
@type,
@job_id,
@priority,
@status,
@commands
);
-- name: StoreTaskDependency :exec
-- Record that the task with ID task_id depends on the task with ID dependency_id.
INSERT INTO task_dependencies (task_id, dependency_id) VALUES (@task_id, @dependency_id);
-- name: FetchJob :one
-- Fetch a job by its UUID.

@ -63,9 +63,10 @@ func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (in
return num_failed, err
}
const createJob = `-- name: CreateJob :exec
const createJob = `-- name: CreateJob :execlastid
INSERT INTO jobs (
created_at,
updated_at,
uuid,
name,
job_type,
@ -74,9 +75,23 @@ INSERT INTO jobs (
activity,
settings,
metadata,
storage_shaman_checkout_id
storage_shaman_checkout_id,
worker_tag_id
)
VALUES (
?1,
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8,
?9,
?10,
?11
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
`
type CreateJobParams struct {
@ -90,10 +105,11 @@ type CreateJobParams struct {
Settings json.RawMessage
Metadata json.RawMessage
StorageShamanCheckoutID string
WorkerTagID sql.NullInt64
}
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
_, err := q.db.ExecContext(ctx, createJob,
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) (int64, error) {
result, err := q.db.ExecContext(ctx, createJob,
arg.CreatedAt,
arg.UUID,
arg.Name,
@ -104,8 +120,64 @@ func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
arg.Settings,
arg.Metadata,
arg.StorageShamanCheckoutID,
arg.WorkerTagID,
)
return err
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// createTask is the SQL statement executed by Queries.CreateTask. Placeholder ?1
// (the creation timestamp) fills both the created_at and updated_at columns.
const createTask = `-- name: CreateTask :execlastid
INSERT INTO tasks (
created_at,
updated_at,
uuid,
name,
type,
job_id,
priority,
status,
commands
) VALUES (
?1,
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8
)
`
// CreateTaskParams holds the values that Queries.CreateTask binds to the
// placeholders of the createTask statement, in field order.
type CreateTaskParams struct {
// CreatedAt is bound to ?1, which the statement uses for both created_at and updated_at.
CreatedAt time.Time
UUID string
Name string
Type string
// JobID is the value for the job_id column, linking the task to its job.
JobID int64
Priority int64
Status string
// Commands is the value for the commands column, as raw JSON.
Commands json.RawMessage
}
// CreateTask runs the createTask statement with the values from arg and
// returns the last-insert ID reported for the new tasks row. When executing
// the statement fails, the returned ID is 0 and the database error is passed
// on unchanged.
func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (int64, error) {
	// The order of these values must match placeholders ?1 through ?8.
	bindings := []interface{}{
		arg.CreatedAt,
		arg.UUID,
		arg.Name,
		arg.Type,
		arg.JobID,
		arg.Priority,
		arg.Status,
		arg.Commands,
	}

	res, execErr := q.db.ExecContext(ctx, createTask, bindings...)
	if execErr != nil {
		return 0, execErr
	}
	return res.LastInsertId()
}
const deleteJob = `-- name: DeleteJob :exec
@ -805,6 +877,20 @@ func (q *Queries) SetLastRendered(ctx context.Context, arg SetLastRenderedParams
return err
}
// storeTaskDependency is the SQL statement executed by Queries.StoreTaskDependency.
// It inserts one (task_id, dependency_id) row into task_dependencies.
const storeTaskDependency = `-- name: StoreTaskDependency :exec
INSERT INTO task_dependencies (task_id, dependency_id) VALUES (?1, ?2)
`
// StoreTaskDependencyParams holds the values that Queries.StoreTaskDependency
// binds to the placeholders of the storeTaskDependency statement.
type StoreTaskDependencyParams struct {
// TaskID is bound to ?1, the task_id column.
TaskID int64
// DependencyID is bound to ?2, the dependency_id column.
DependencyID int64
}
// StoreTaskDependency runs the storeTaskDependency statement, inserting a
// single (task_id, dependency_id) row built from arg. The result of the
// statement is discarded; only the error, if any, is returned.
func (q *Queries) StoreTaskDependency(ctx context.Context, arg StoreTaskDependencyParams) error {
	if _, execErr := q.db.ExecContext(ctx, storeTaskDependency, arg.TaskID, arg.DependencyID); execErr != nil {
		return execErr
	}
	return nil
}
const taskAssignToWorker = `-- name: TaskAssignToWorker :exec
UPDATE tasks SET
updated_at = ?1,

@ -60,6 +60,11 @@ LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id)
LEFT JOIN workers on (m.worker_id = workers.id)
WHERE workers.uuid = @uuid;
-- name: FetchWorkerTagByUUID :one
-- Fetch a single worker tag by its UUID.
SELECT sqlc.embed(worker_tags)
FROM worker_tags
WHERE worker_tags.uuid = @uuid;
-- name: SoftDeleteWorker :execrows
UPDATE workers SET deleted_at=@deleted_at
WHERE uuid=@uuid;

@ -129,6 +129,30 @@ func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error)
return i, err
}
// fetchWorkerTagByUUID is the SQL statement executed by Queries.FetchWorkerTagByUUID.
// It selects the worker_tags columns of the row whose uuid equals ?1.
const fetchWorkerTagByUUID = `-- name: FetchWorkerTagByUUID :one
SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description
FROM worker_tags
WHERE worker_tags.uuid = ?1
`
// FetchWorkerTagByUUIDRow is the result type of Queries.FetchWorkerTagByUUID.
type FetchWorkerTagByUUIDRow struct {
// WorkerTag receives the columns selected from the worker_tags table.
WorkerTag WorkerTag
}
// FetchWorkerTagByUUID runs the fetchWorkerTagByUUID query for the given UUID
// and scans the resulting row into the WorkerTag field of the returned value.
// The error from scanning (including the one reported when the query yields no
// row) is returned as-is, together with whatever was scanned so far.
func (q *Queries) FetchWorkerTagByUUID(ctx context.Context, uuid string) (FetchWorkerTagByUUIDRow, error) {
	var found FetchWorkerTagByUUIDRow
	tag := &found.WorkerTag

	// Scan targets are listed in the same order as the selected columns.
	scanErr := q.db.QueryRowContext(ctx, fetchWorkerTagByUUID, uuid).Scan(
		&tag.ID,
		&tag.CreatedAt,
		&tag.UpdatedAt,
		&tag.UUID,
		&tag.Name,
		&tag.Description,
	)
	return found, scanErr
}
const fetchWorkerTags = `-- name: FetchWorkerTags :many
SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description
FROM worker_tags