diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 3d46a941..a515d6cc 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -198,6 +198,10 @@ type queriesTX struct { // queries returns the SQLC Queries struct, connected to this database. // It is intended that all GORM queries will be migrated to use this interface // instead. +// +// After calling this function, all queries should use this transaction until it +// is closed (either committed or rolled back). Otherwise SQLite will deadlock, +// as it will make any other query wait until this transaction is done. func (db *DB) queriesWithTX() (*queriesTX, error) { sqldb, err := db.gormDB.DB() if err != nil { diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 0a512db4..8e17e73b 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -147,44 +147,66 @@ type TaskFailure struct { // StoreJob stores an AuthoredJob and its tasks, and saves it to the database. // The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status. func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error { - return db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - // TODO: separate conversion of struct types from storing things in the database. - dbJob := Job{ - UUID: authoredJob.JobID, - Name: authoredJob.Name, - JobType: authoredJob.JobType, - Status: authoredJob.Status, - Priority: authoredJob.Priority, - Settings: StringInterfaceMap(authoredJob.Settings), - Metadata: StringStringMap(authoredJob.Metadata), - Storage: JobStorageInfo{ - ShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID, - }, + + // Run all queries in a single transaction. + qtx, err := db.queriesWithTX() + if err != nil { + return err + } + defer qtx.rollback() + + // Serialise the embedded JSON. + settings, err := json.Marshal(authoredJob.Settings) + if err != nil { + return fmt.Errorf("converting job settings to JSON: %w", err) + } + metadata, err := json.Marshal(authoredJob.Metadata) + if err != nil { + return fmt.Errorf("converting job metadata to JSON: %w", err) + } + + // Create the job itself. + params := sqlc.CreateJobParams{ + CreatedAt: db.gormDB.NowFunc(), + UUID: authoredJob.JobID, + Name: authoredJob.Name, + JobType: authoredJob.JobType, + Priority: int64(authoredJob.Priority), + Status: string(authoredJob.Status), + Settings: settings, + Metadata: metadata, + StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID, + } + + if authoredJob.WorkerTagUUID != "" { + dbTag, err := qtx.queries.FetchWorkerTagByUUID(ctx, authoredJob.WorkerTagUUID) + switch { + case errors.Is(err, sql.ErrNoRows): + return fmt.Errorf("no worker tag %q found", authoredJob.WorkerTagUUID) + case err != nil: + return fmt.Errorf("could not find worker tag %q: %w", authoredJob.WorkerTagUUID, err) } + params.WorkerTagID = sql.NullInt64{Int64: dbTag.WorkerTag.ID, Valid: true} + } - log.Debug(). - Str("job", dbJob.UUID). - Str("type", dbJob.JobType). - Str("name", dbJob.Name). - Str("status", string(dbJob.Status)). - Msg("persistence: storing authored job") + log.Debug(). + Str("job", params.UUID). + Str("type", params.JobType). + Str("name", params.Name). + Str("status", params.Status). + Msg("persistence: storing authored job") - // Find and assign the worker tag. - if authoredJob.WorkerTagUUID != "" { - dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID) - if err != nil { - return err - } - dbJob.WorkerTagID = &dbTag.ID - dbJob.WorkerTag = dbTag - } + jobID, err := qtx.queries.CreateJob(ctx, params) + if err != nil { + return jobError(err, "storing job") + } - if err := tx.Create(&dbJob).Error; err != nil { - return jobError(err, "storing job") - } + err = db.storeAuthoredJobTaks(ctx, qtx, jobID, &authoredJob) + if err != nil { + return err + } - return db.storeAuthoredJobTaks(ctx, tx, &dbJob, &authoredJob) - }) + return qtx.commit() } // StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks. @@ -194,19 +216,41 @@ func (db *DB) StoreAuthoredJobTaks( job *Job, authoredJob *job_compilers.AuthoredJob, ) error { - tx := db.gormDB.WithContext(ctx) - return db.storeAuthoredJobTaks(ctx, tx, job, authoredJob) + qtx, err := db.queriesWithTX() + if err != nil { + return err + } + defer qtx.rollback() + + err = db.storeAuthoredJobTaks(ctx, qtx, int64(job.ID), authoredJob) + if err != nil { + return err + } + + return qtx.commit() } +// storeAuthoredJobTaks stores the tasks of the authored job. +// Note that this function does NOT commit the database transaction. That is up +// to the caller. func (db *DB) storeAuthoredJobTaks( ctx context.Context, - tx *gorm.DB, - dbJob *Job, + qtx *queriesTX, + jobID int64, authoredJob *job_compilers.AuthoredJob, ) error { + type TaskInfo struct { + ID int64 + UUID string + Name string + } - uuidToTask := make(map[string]*Task) + // Give every task the same creation timestamp. + now := db.gormDB.NowFunc() + + uuidToTask := make(map[string]TaskInfo) for _, authoredTask := range authoredJob.Tasks { + // Marshal commands to JSON. var commands []Command for _, authoredCommand := range authoredTask.Commands { commands = append(commands, Command{ @@ -214,31 +258,41 @@ func (db *DB) storeAuthoredJobTaks( Parameters: StringInterfaceMap(authoredCommand.Parameters), }) } + commandsJSON, err := json.Marshal(commands) + if err != nil { + return fmt.Errorf("could not convert commands of task %q to JSON: %w", + authoredTask.Name, err) + } - dbTask := Task{ - Name: authoredTask.Name, - Type: authoredTask.Type, - UUID: authoredTask.UUID, - Job: dbJob, - Priority: authoredTask.Priority, - Status: api.TaskStatusQueued, - Commands: commands, + taskParams := sqlc.CreateTaskParams{ + CreatedAt: now, + Name: authoredTask.Name, + Type: authoredTask.Type, + UUID: authoredTask.UUID, + JobID: jobID, + Priority: int64(authoredTask.Priority), + Status: string(api.TaskStatusQueued), + Commands: commandsJSON, // dependencies are stored below. } log.Debug(). - Str("task", dbTask.UUID). - Str("job", dbJob.UUID). - Str("type", dbTask.Type). - Str("name", dbTask.Name). - Str("status", string(dbTask.Status)). + Str("task", taskParams.UUID). + Str("type", taskParams.Type). + Str("name", taskParams.Name). + Str("status", string(taskParams.Status)). Msg("persistence: storing authored task") - if err := tx.Create(&dbTask).Error; err != nil { + taskID, err := qtx.queries.CreateTask(ctx, taskParams) + if err != nil { return taskError(err, "storing task: %v", err) } - uuidToTask[authoredTask.UUID] = &dbTask + uuidToTask[authoredTask.UUID] = TaskInfo{ + ID: taskID, + UUID: taskParams.UUID, + Name: taskParams.Name, + } } // Store the dependencies between tasks. @@ -247,18 +301,27 @@ func (db *DB) storeAuthoredJobTaks( continue } - dbTask, ok := uuidToTask[authoredTask.UUID] + taskInfo, ok := uuidToTask[authoredTask.UUID] if !ok { return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID) } - deps := make([]*Task, len(authoredTask.Dependencies)) - for i, t := range authoredTask.Dependencies { - depTask, ok := uuidToTask[t.UUID] + deps := make([]*TaskInfo, len(authoredTask.Dependencies)) + for idx, authoredDep := range authoredTask.Dependencies { + depTask, ok := uuidToTask[authoredDep.UUID] if !ok { - return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID) + return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", authoredDep.UUID) } - deps[i] = depTask + + err := qtx.queries.StoreTaskDependency(ctx, sqlc.StoreTaskDependencyParams{ + TaskID: taskInfo.ID, + DependencyID: depTask.ID, + }) + if err != nil { + return taskError(err, "error storing task %q depending on task %q", authoredTask.UUID, depTask.UUID) + } + + deps[idx] = &depTask } if log.Debug().Enabled() { @@ -267,26 +330,11 @@ func (db *DB) storeAuthoredJobTaks( depNames[i] = dep.Name } log.Debug(). - Str("task", dbTask.UUID). - Str("name", dbTask.Name). + Str("task", taskInfo.UUID). + Str("name", taskInfo.Name). Strs("dependencies", depNames). Msg("persistence: storing authored task dependencies") } - - dependenciesbatchsize := 1000 - for j := 0; j < len(deps); j += dependenciesbatchsize { - end := j + dependenciesbatchsize - if end > len(deps) { - end = len(deps) - } - currentDeps := deps[j:end] - dbTask.Dependencies = currentDeps - tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID) - subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps}) - if subQuery.Error != nil { - return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end) - } - } } return nil diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 154fbeec..9d5e61d4 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -1,7 +1,8 @@ --- name: CreateJob :exec +-- name: CreateJob :execlastid INSERT INTO jobs ( created_at, + updated_at, uuid, name, job_type, @@ -10,9 +11,49 @@ INSERT INTO jobs ( activity, settings, metadata, - storage_shaman_checkout_id + storage_shaman_checkout_id, + worker_tag_id ) -VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); +VALUES ( + @created_at, + @created_at, + @uuid, + @name, + @job_type, + @priority, + @status, + @activity, + @settings, + @metadata, + @storage_shaman_checkout_id, + @worker_tag_id +); + +-- name: CreateTask :execlastid +INSERT INTO tasks ( + created_at, + updated_at, + uuid, + name, + type, + job_id, + priority, + status, + commands +) VALUES ( + @created_at, + @created_at, + @uuid, + @name, + @type, + @job_id, + @priority, + @status, + @commands +); + +-- name: StoreTaskDependency :exec +INSERT INTO task_dependencies (task_id, dependency_id) VALUES (@task_id, @dependency_id); -- name: FetchJob :one -- Fetch a job by its UUID. diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 4fd399fb..27470b1c 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -63,9 +63,10 @@ func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (in return num_failed, err } -const createJob = `-- name: CreateJob :exec +const createJob = `-- name: CreateJob :execlastid INSERT INTO jobs ( created_at, + updated_at, uuid, name, job_type, @@ -74,9 +75,23 @@ INSERT INTO jobs ( activity, settings, metadata, - storage_shaman_checkout_id + storage_shaman_checkout_id, + worker_tag_id +) +VALUES ( + ?1, + ?1, + ?2, + ?3, + ?4, + ?5, + ?6, + ?7, + ?8, + ?9, + ?10, + ?11 ) -VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) ` type CreateJobParams struct { @@ -90,10 +105,11 @@ type CreateJobParams struct { Settings json.RawMessage Metadata json.RawMessage StorageShamanCheckoutID string + WorkerTagID sql.NullInt64 } -func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error { - _, err := q.db.ExecContext(ctx, createJob, +func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) (int64, error) { + result, err := q.db.ExecContext(ctx, createJob, arg.CreatedAt, arg.UUID, arg.Name, @@ -104,8 +120,64 @@ func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error { arg.Settings, arg.Metadata, arg.StorageShamanCheckoutID, + arg.WorkerTagID, ) - return err + if err != nil { + return 0, err + } + return result.LastInsertId() +} + +const createTask = `-- name: CreateTask :execlastid +INSERT INTO tasks ( + created_at, + updated_at, + uuid, + name, + type, + job_id, + priority, + status, + commands +) VALUES ( + ?1, + ?1, + ?2, + ?3, + ?4, + ?5, + ?6, + ?7, + ?8 +) +` + +type CreateTaskParams struct { + CreatedAt time.Time + UUID string + Name string + Type string + JobID int64 + Priority int64 + Status string + Commands json.RawMessage +} + +func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (int64, error) { + result, err := q.db.ExecContext(ctx, createTask, + arg.CreatedAt, + arg.UUID, + arg.Name, + arg.Type, + arg.JobID, + arg.Priority, + arg.Status, + arg.Commands, + ) + if err != nil { + return 0, err + } + return result.LastInsertId() } const deleteJob = `-- name: DeleteJob :exec @@ -805,6 +877,20 @@ func (q *Queries) SetLastRendered(ctx context.Context, arg SetLastRenderedParams return err } +const storeTaskDependency = `-- name: StoreTaskDependency :exec +INSERT INTO task_dependencies (task_id, dependency_id) VALUES (?1, ?2) +` + +type StoreTaskDependencyParams struct { + TaskID int64 + DependencyID int64 +} + +func (q *Queries) StoreTaskDependency(ctx context.Context, arg StoreTaskDependencyParams) error { + _, err := q.db.ExecContext(ctx, storeTaskDependency, arg.TaskID, arg.DependencyID) + return err +} + const taskAssignToWorker = `-- name: TaskAssignToWorker :exec UPDATE tasks SET updated_at = ?1, diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index a940f854..c3c0c527 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -60,6 +60,11 @@ LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) LEFT JOIN workers on (m.worker_id = workers.id) WHERE workers.uuid = @uuid; +-- name: FetchWorkerTagByUUID :one +SELECT sqlc.embed(worker_tags) +FROM worker_tags +WHERE worker_tags.uuid = @uuid; + -- name: SoftDeleteWorker :execrows UPDATE workers SET deleted_at=@deleted_at WHERE uuid=@uuid; diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index 5762a1c8..ea0d8d43 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -129,6 +129,30 @@ func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error) return i, err } +const fetchWorkerTagByUUID = `-- name: FetchWorkerTagByUUID :one +SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description +FROM worker_tags +WHERE worker_tags.uuid = ?1 +` + +type FetchWorkerTagByUUIDRow struct { + WorkerTag WorkerTag +} + +func (q *Queries) FetchWorkerTagByUUID(ctx context.Context, uuid string) (FetchWorkerTagByUUIDRow, error) { + row := q.db.QueryRowContext(ctx, fetchWorkerTagByUUID, uuid) + var i FetchWorkerTagByUUIDRow + err := row.Scan( + &i.WorkerTag.ID, + &i.WorkerTag.CreatedAt, + &i.WorkerTag.UpdatedAt, + &i.WorkerTag.UUID, + &i.WorkerTag.Name, + &i.WorkerTag.Description, + ) + return i, err +} + const fetchWorkerTags = `-- name: FetchWorkerTags :many SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description FROM worker_tags