Manager: Convert JobHasTasksInStatus and CountTasksOfJobInStatus to sqlc

No functional changes.
This commit is contained in:
Sybren A. Stüvel 2024-05-20 21:18:19 +02:00
parent b66490831c
commit 4ab853da40
4 changed files with 100 additions and 24 deletions

@ -715,38 +715,44 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke
} }
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) { func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
var numTasksInStatus int64 queries, err := db.queries()
tx := db.gormDB.WithContext(ctx). if err != nil {
Model(&Task{}). return false, err
Where("job_id", job.ID).
Where("status", taskStatus).
Count(&numTasksInStatus)
if tx.Error != nil {
return false, taskError(tx.Error, "counting tasks of job %s in status %q", job.UUID, taskStatus)
} }
return numTasksInStatus > 0, nil
count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{
JobID: int64(job.ID),
TaskStatus: string(taskStatus),
})
if err != nil {
return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus)
}
return count > 0, nil
} }
// CountTasksOfJobInStatus counts the number of tasks in the job.
// It returns two counts, one is the number of tasks in the given statuses, the
// other is the total number of tasks of the job.
func (db *DB) CountTasksOfJobInStatus( func (db *DB) CountTasksOfJobInStatus(
ctx context.Context, ctx context.Context,
job *Job, job *Job,
taskStatuses ...api.TaskStatus, taskStatuses ...api.TaskStatus,
) (numInStatus, numTotal int, err error) { ) (numInStatus, numTotal int, err error) {
type Result struct { queries, err := db.queries()
Status api.TaskStatus if err != nil {
NumTasks int return 0, 0, err
} }
var results []Result
tx := db.gormDB.WithContext(ctx). // Convert from []api.TaskStatus to []string for feeding to sqlc.
Model(&Task{}). statusesAsStrings := make([]string, len(taskStatuses))
Select("status, count(*) as num_tasks"). for index := range taskStatuses {
Where("job_id", job.ID). statusesAsStrings[index] = string(taskStatuses[index])
Group("status"). }
Scan(&results)
if tx.Error != nil { results, err := queries.JobCountTaskStatuses(ctx, int64(job.ID))
return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatuses) if err != nil {
return 0, 0, jobError(err, "count tasks of job %s in status %q", job.UUID, taskStatuses)
} }
// Create lookup table for which statuses to count. // Create lookup table for which statuses to count.
@ -757,10 +763,10 @@ func (db *DB) CountTasksOfJobInStatus(
// Count the number of tasks per status. // Count the number of tasks per status.
for _, result := range results { for _, result := range results {
if countStatus[result.Status] { if countStatus[api.TaskStatus(result.Status)] {
numInStatus += result.NumTasks numInStatus += int(result.NumTasks)
} }
numTotal += result.NumTasks numTotal += int(result.NumTasks)
} }
return return

@ -396,6 +396,12 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 0, numActive) assert.Equal(t, 0, numActive)
assert.Equal(t, 3, numTotal) assert.Equal(t, 3, numTotal)
numCounted, numTotal, err := db.CountTasksOfJobInStatus(ctx, job,
api.TaskStatusFailed, api.TaskStatusQueued)
require.NoError(t, err)
assert.Equal(t, 3, numCounted)
assert.Equal(t, 3, numTotal)
} }
func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) { func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) {

@ -120,3 +120,14 @@ UPDATE tasks SET
updated_at = @updated_at, updated_at = @updated_at,
worker_id = @worker_id worker_id = @worker_id
WHERE id=@id; WHERE id=@id;
-- name: JobCountTasksInStatus :one
-- Fetch number of tasks in the given status, of the given job.
SELECT count(*) as num_tasks FROM tasks
WHERE job_id = @job_id AND status = @task_status;
-- name: JobCountTaskStatuses :many
-- Fetch (status, num tasks in that status) rows for the given job.
SELECT status, count(*) as num_tasks FROM tasks
WHERE job_id = @job_id
GROUP BY status;

@ -396,6 +396,59 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
return items, nil return items, nil
} }
const jobCountTaskStatuses = `-- name: JobCountTaskStatuses :many
SELECT status, count(*) as num_tasks FROM tasks
WHERE job_id = ?1
GROUP BY status
`
type JobCountTaskStatusesRow struct {
Status string
NumTasks int64
}
// Fetch (status, num tasks in that status) rows for the given job.
func (q *Queries) JobCountTaskStatuses(ctx context.Context, jobID int64) ([]JobCountTaskStatusesRow, error) {
rows, err := q.db.QueryContext(ctx, jobCountTaskStatuses, jobID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []JobCountTaskStatusesRow
for rows.Next() {
var i JobCountTaskStatusesRow
if err := rows.Scan(&i.Status, &i.NumTasks); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const jobCountTasksInStatus = `-- name: JobCountTasksInStatus :one
SELECT count(*) as num_tasks FROM tasks
WHERE job_id = ?1 AND status = ?2
`
type JobCountTasksInStatusParams struct {
JobID int64
TaskStatus string
}
// Fetch number of tasks in the given status, of the given job.
func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksInStatusParams) (int64, error) {
row := q.db.QueryRowContext(ctx, jobCountTasksInStatus, arg.JobID, arg.TaskStatus)
var num_tasks int64
err := row.Scan(&num_tasks)
return num_tasks, err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET UPDATE jobs SET
updated_at = ?1, updated_at = ?1,