Manager: convert FetchWorkerTask() from gorm to sqlc

Ref: #104305

No functional changes
This commit is contained in:
Sybren A. Stüvel 2024-07-02 10:16:41 +02:00
parent 7db91294c2
commit 3b2b5c47f3
4 changed files with 118 additions and 39 deletions

@ -47,6 +47,21 @@ WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.type IN (sqlc.slice('supported_task_types'))
ORDER BY jobs.priority DESC, tasks.priority DESC;
-- name: FetchWorkerTask :one
-- Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on.
SELECT
sqlc.embed(tasks),
sqlc.embed(jobs),
(tasks.status = @task_status_active AND jobs.status = @job_status_active) as is_active
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE
tasks.worker_id = @worker_id
ORDER BY
is_active DESC,
tasks.updated_at DESC
LIMIT 1;
-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=@worker_id, last_touched_at=@now, updated_at=@now

@ -82,6 +82,70 @@ func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg
return i, err
}
const fetchWorkerTask = `-- name: FetchWorkerTask :one
SELECT
tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity,
jobs.id, jobs.created_at, jobs.updated_at, jobs.uuid, jobs.name, jobs.job_type, jobs.priority, jobs.status, jobs.activity, jobs.settings, jobs.metadata, jobs.delete_requested_at, jobs.storage_shaman_checkout_id, jobs.worker_tag_id,
(tasks.status = ?1 AND jobs.status = ?2) as is_active
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE
tasks.worker_id = ?3
ORDER BY
is_active DESC,
tasks.updated_at DESC
LIMIT 1
`
type FetchWorkerTaskParams struct {
TaskStatusActive string
JobStatusActive string
WorkerID sql.NullInt64
}
type FetchWorkerTaskRow struct {
Task Task
Job Job
IsActive interface{}
}
// Find the currently-active task assigned to a Worker. If not found, find the last task this Worker worked on.
func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams) (FetchWorkerTaskRow, error) {
row := q.db.QueryRowContext(ctx, fetchWorkerTask, arg.TaskStatusActive, arg.JobStatusActive, arg.WorkerID)
var i FetchWorkerTaskRow
err := row.Scan(
&i.Task.ID,
&i.Task.CreatedAt,
&i.Task.UpdatedAt,
&i.Task.UUID,
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
&i.Task.LastTouchedAt,
&i.Task.Commands,
&i.Task.Activity,
&i.Job.ID,
&i.Job.CreatedAt,
&i.Job.UpdatedAt,
&i.Job.UUID,
&i.Job.Name,
&i.Job.JobType,
&i.Job.Priority,
&i.Job.Status,
&i.Job.Activity,
&i.Job.Settings,
&i.Job.Metadata,
&i.Job.DeleteRequestedAt,
&i.Job.StorageShamanCheckoutID,
&i.Job.WorkerTagID,
&i.IsActive,
)
return i, err
}
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks

@ -10,7 +10,6 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/pkg/api"
@ -164,14 +163,3 @@ func findTaskForWorker(
}
return row.Task, nil
}
// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task
// that's already assigned to this worker, and is in a runnable state.
func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB {
return tx.
Joins("left join jobs on tasks.job_id = jobs.id").
Where("tasks.status = ?", api.TaskStatusActive).
Where("jobs.status in ?", schedulableJobStatuses).
Where("tasks.worker_id = ?", w.ID). // assigned to this worker
Limit(1)
}

@ -5,6 +5,7 @@ package persistence
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
@ -191,38 +192,49 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
// FetchWorkerTask returns the most recent task assigned to the given Worker.
func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error) {
task := Task{}
// See if there is a task assigned to this worker in the same way that the
// task scheduler does.
query := db.gormDB.WithContext(ctx)
query = taskAssignedAndRunnableQuery(query, worker)
tx := query.
Order("tasks.updated_at").
Preload("Job").
Find(&task)
if tx.Error != nil {
return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID)
}
if task.ID != 0 {
// Found a task!
return &task, nil
queries, err := db.queries()
if err != nil {
return nil, err
}
// If not found, just find the last-modified task associated with this Worker.
tx = db.gormDB.WithContext(ctx).
Where("worker_id = ?", worker.ID).
Order("tasks.updated_at DESC").
Preload("Job").
Find(&task)
if tx.Error != nil {
return nil, taskError(tx.Error, "fetching task assigned to Worker %s", worker.UUID)
}
if task.ID == 0 {
// Convert the WorkerID to a NullInt64. As task.worker_id can be NULL, this is
// what sqlc expects us to pass in.
workerID := sql.NullInt64{Int64: int64(worker.ID), Valid: true}
row, err := queries.FetchWorkerTask(ctx, sqlc.FetchWorkerTaskParams{
TaskStatusActive: string(api.TaskStatusActive),
JobStatusActive: string(api.JobStatusActive),
WorkerID: workerID,
})
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, nil
case err != nil:
return nil, taskError(err, "fetching task assigned to Worker %s", worker.UUID)
}
return &task, nil
// Found a task!
if row.Job.ID == 0 {
panic(fmt.Sprintf("task found but with no job: %#v", row))
}
if row.Task.ID == 0 {
panic(fmt.Sprintf("task found but with zero ID: %#v", row))
}
// Convert the task & job to gorm data types.
gormTask, err := convertSqlcTask(row.Task, row.Job.UUID, worker.UUID)
if err != nil {
return nil, err
}
gormJob, err := convertSqlcJob(row.Job)
if err != nil {
return nil, err
}
gormTask.Job = &gormJob
gormTask.Worker = worker
return gormTask, nil
}
func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {