Manager: add 'task failure list' to record workers failing tasks

The persistence layer can now store which worker failed which task, as
preparation for a blocklisting system. Such a system should be able to
determine whether there are still any workers left to do the work.
This commit is contained in:
Sybren A. Stüvel 2022-06-13 16:51:19 +02:00
parent e35911d106
commit c5debdeb70
3 changed files with 90 additions and 1 deletion

@ -7,7 +7,7 @@ import (
) )
func (db *DB) migrate() error { func (db *DB) migrate() error {
err := db.gormDB.AutoMigrate(&Job{}, &Task{}, &Worker{}) err := db.gormDB.AutoMigrate(&Job{}, &Task{}, &TaskFailure{}, &Worker{})
if err != nil { if err != nil {
return fmt.Errorf("failed to automigrate database: %v", err) return fmt.Errorf("failed to automigrate database: %v", err)
} }

@ -7,9 +7,12 @@ import (
"database/sql/driver" "database/sql/driver"
"encoding/json" "encoding/json"
"errors" "errors"
"math"
"time" "time"
"github.com/rs/zerolog/log"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
@ -95,6 +98,17 @@ func (js *StringStringMap) Scan(value interface{}) error {
return json.Unmarshal(b, &js) return json.Unmarshal(b, &js)
} }
// TaskFailure keeps track of which Worker failed which Task.
type TaskFailure struct {
	// Don't include the standard Gorm ID, UpdatedAt, or DeletedAt fields, as they're useless here.
	// Entries will never be updated, and should never be soft-deleted but just purged from existence.
	CreatedAt time.Time
	// TaskID and WorkerID together form the composite primary key, so a given
	// worker/task pair can be recorded at most once. autoIncrement is disabled
	// because Gorm would otherwise treat the first integer primary key column
	// as an auto-incrementing row ID.
	TaskID uint `gorm:"primaryKey;autoIncrement:false"`
	// ON DELETE CASCADE: removing a Task (or Worker, below) purges its failure
	// records along with it.
	Task *Task `gorm:"foreignkey:TaskID;references:ID;constraint:OnDelete:CASCADE"`
	WorkerID uint `gorm:"primaryKey;autoIncrement:false"`
	Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
}
// StoreJob stores an AuthoredJob and its tasks, and saves it to the database. // StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status. // The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error { func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
@ -411,3 +425,37 @@ func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error {
} }
return nil return nil
} }
// AddWorkerToTaskFailedList records that the given worker failed the given task.
// This information is not used directly by the task scheduler. It's used to
// determine whether there are any workers left to perform this task, and thus
// whether it should be hard- or soft-failed.
//
// Calling this multiple times with the same task/worker is a no-op.
//
// Returns the new number of workers that failed this task.
func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) {
	failure := TaskFailure{
		Task:   t,
		Worker: w,
	}

	// The (task, worker) composite primary key plus ON CONFLICT DO NOTHING
	// makes re-recording the same failure a harmless no-op.
	result := db.gormDB.WithContext(ctx).
		Clauses(clause.OnConflict{DoNothing: true}).
		Create(&failure)
	if result.Error != nil {
		return 0, result.Error
	}

	// Count how many distinct workers have failed this task so far.
	var count int64
	result = db.gormDB.WithContext(ctx).Model(&TaskFailure{}).
		Where("task_id=?", t.ID).
		Count(&count)

	// Integer literals are of type `int`, so that's just a bit nicer to work with
	// than `int64`.
	if count > math.MaxUint32 {
		log.Warn().Int64("numFailed", count).Msg("number of failed workers is crazy high, something is wrong here")
		return math.MaxUint32, result.Error
	}
	return int(count), result.Error
}

@ -263,6 +263,47 @@ func TestTaskTouchedByWorker(t *testing.T) {
assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second) assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second)
} }
func TestAddWorkerToTaskFailedList(t *testing.T) {
	ctx, close, db, _, authoredJob := jobTasksTestFixtures(t)
	defer close()

	task, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
	assert.NoError(t, err)

	worker1 := createWorker(ctx, t, db)

	// Create a second worker, using the 1st as template:
	secondWorker := *worker1
	secondWorker.ID = 0
	secondWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85"
	secondWorker.Name = "Worker 2"
	assert.NoError(t, db.SaveWorker(ctx, &secondWorker))
	worker2, err := db.FetchWorker(ctx, secondWorker.UUID)
	assert.NoError(t, err)

	// Record a failure and check the reported number of failing workers.
	expectNumFailed := func(expect int, worker *Worker) {
		numFailed, err := db.AddWorkerToTaskFailedList(ctx, task, worker)
		assert.NoError(t, err)
		assert.Equal(t, expect, numFailed)
	}

	// First failure should be registered just fine.
	expectNumFailed(1, worker1)
	// Calling again should be a no-op and not cause any errors.
	expectNumFailed(1, worker1)
	// Another worker should be able to fail this task as well.
	expectNumFailed(2, worker2)

	// Deleting the task should also delete the failures.
	assert.NoError(t, db.DeleteJob(ctx, authoredJob.JobID))
	var failureCount int64
	countTx := db.gormDB.Model(&TaskFailure{}).Count(&failureCount)
	assert.NoError(t, countTx.Error)
	assert.Zero(t, failureCount)
}
func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
task1 := job_compilers.AuthoredTask{ task1 := job_compilers.AuthoredTask{
Name: "render-1-3", Name: "render-1-3",