Manager: re-queue previously failed tasks of worker when blocklisting
When a Worker is blocked from a job, re-queue its previously failed tasks so that other workers can give them a try.
This commit is contained in:
parent
b95bed1f96
commit
046853932d
@ -75,6 +75,7 @@ type TaskStateMachine interface {
|
||||
JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus, reason string) error
|
||||
|
||||
RequeueActiveTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error
|
||||
RequeueFailedTasksOfWorkerOfJob(ctx context.Context, worker *persistence.Worker, job *persistence.Job, reason string) error
|
||||
}
|
||||
|
||||
// TaskStateMachine should be a subset of task_state_machine.StateMachine.
|
||||
|
@ -679,6 +679,20 @@ func (mr *MockTaskStateMachineMockRecorder) RequeueActiveTasksOfWorker(arg0, arg
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueActiveTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueActiveTasksOfWorker), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// RequeueFailedTasksOfWorkerOfJob mocks base method.
|
||||
func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 *persistence.Job, arg3 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "RequeueFailedTasksOfWorkerOfJob", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// RequeueFailedTasksOfWorkerOfJob indicates an expected call of RequeueFailedTasksOfWorkerOfJob.
|
||||
func (mr *MockTaskStateMachineMockRecorder) RequeueFailedTasksOfWorkerOfJob(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueFailedTasksOfWorkerOfJob", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueFailedTasksOfWorkerOfJob), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// TaskStatusChange mocks base method.
|
||||
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -2,17 +2,17 @@
|
||||
_meta:
|
||||
version: 3
|
||||
mode: develop
|
||||
listen: '[::0]:8083'
|
||||
listen: "[::0]:8083"
|
||||
own_url: http://192.168.3.108:8083/
|
||||
flamenco: http://localhost:51234/
|
||||
manager_id: 5852bc5198377351f95d103e
|
||||
manager_secret: SRVwA7wAxPRfudvqTDOLXwPn1cDRIlADz5Ef9kHk7d52Us
|
||||
|
||||
task_logs_path: /tmp/flamenco-unittests
|
||||
blacklist_threshold: 3
|
||||
blocklist_threshold: 3
|
||||
|
||||
shaman:
|
||||
enabled: false
|
||||
enabled: false
|
||||
|
||||
variables:
|
||||
blender:
|
||||
@ -25,7 +25,7 @@ variables:
|
||||
platform: linux
|
||||
value: /opt/myblenderbuild/blender
|
||||
- platform: windows
|
||||
value: 'c:/temp/blender.exe'
|
||||
value: "c:/temp/blender.exe"
|
||||
- platform: darwin
|
||||
value: /opt/myblenderbuild/blender
|
||||
ffmpeg:
|
||||
|
@ -161,15 +161,22 @@ func (f *Flamenco) onTaskFailed(
|
||||
}
|
||||
|
||||
logger = logger.With().Str("taskType", task.Type).Logger()
|
||||
shouldHardFail, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
|
||||
wasBlacklisted, shoudlFailJob, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
|
||||
if err != nil {
|
||||
return fmt.Errorf("block-listing worker: %w", err)
|
||||
}
|
||||
if shouldHardFail {
|
||||
// Hard failure because of blocklisting should simply fail the entire job.
|
||||
// There are no more workers left to finish it.
|
||||
if shoudlFailJob {
|
||||
// There are no more workers left to finish the job.
|
||||
return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
|
||||
}
|
||||
if wasBlacklisted {
|
||||
// Requeue all tasks of this job & task type that were hard-failed before by this worker.
|
||||
reason := fmt.Sprintf("worker %s was blocked from tasks of type %q", worker.Name, task.Type)
|
||||
err := f.stateMachine.RequeueFailedTasksOfWorkerOfJob(ctx, worker, task.Job, reason)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Determine whether this is soft or hard failure.
|
||||
threshold := f.config.Get().TaskFailAfterSoftFailCount
|
||||
@ -186,22 +193,18 @@ func (f *Flamenco) onTaskFailed(
|
||||
// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
|
||||
// there are any workers left to run tasks of this type.
|
||||
//
|
||||
// Returns "must hard-fail". That is, returns `false` if there are still workers
|
||||
// left to run tasks of this type, on this job.
|
||||
//
|
||||
// If the worker is NOT block-listed at this moment, always returns `false`.
|
||||
//
|
||||
// Returns `true` if ALL workers that can execute this task type are blocked
|
||||
// from working on this job.
|
||||
// Returns whether the worker was blacklisted, and whether the entire job should
|
||||
// be failed (in case this was the last worker that could have worked on this
|
||||
// task).
|
||||
func (f *Flamenco) maybeBlocklistWorker(
|
||||
ctx context.Context,
|
||||
logger zerolog.Logger,
|
||||
worker *persistence.Worker,
|
||||
task *persistence.Task,
|
||||
) (bool, error) {
|
||||
) (wasBlacklisted, shouldFailJob bool, err error) {
|
||||
numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
|
||||
return false, false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
|
||||
}
|
||||
// The received task update hasn't been persisted in the database yet,
|
||||
// so we should count that too.
|
||||
@ -213,17 +216,17 @@ func (f *Flamenco) maybeBlocklistWorker(
|
||||
// TODO: This might need special handling, as this worker will be blocked
|
||||
// from retrying this particular task. It could have been the last worker to
|
||||
// be allowed this task type; if that is the case, the job is now stuck.
|
||||
return false, nil
|
||||
return false, false, nil
|
||||
}
|
||||
|
||||
// Blocklist the Worker.
|
||||
if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
|
||||
return false, err
|
||||
return true, false, err
|
||||
}
|
||||
|
||||
// Return hard-failure if there are no workers left for this task type.
|
||||
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
|
||||
return numWorkers == 0, err
|
||||
return true, numWorkers == 0, err
|
||||
}
|
||||
|
||||
func (f *Flamenco) blocklistWorker(
|
||||
@ -239,8 +242,6 @@ func (f *Flamenco) blocklistWorker(
|
||||
if err != nil {
|
||||
return fmt.Errorf("adding worker to block list: %w", err)
|
||||
}
|
||||
|
||||
// TODO: requeue all tasks of this job & task type that were hard-failed by this worker.
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -590,6 +590,12 @@ func TestBlockingAfterFailure(t *testing.T) {
|
||||
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
|
||||
"Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.")
|
||||
|
||||
// Because the job didn't fail in its entirety, the tasks previously failed
|
||||
// by the Worker should be requeued so they can be picked up by another.
|
||||
mf.stateMachine.EXPECT().RequeueFailedTasksOfWorkerOfJob(
|
||||
gomock.Any(), &worker, &mockJob,
|
||||
"worker дрон was blocked from tasks of type \"misc\"")
|
||||
|
||||
// Do the call.
|
||||
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
||||
requestWorkerStore(echoCtx, &worker)
|
||||
@ -619,6 +625,8 @@ func TestBlockingAfterFailure(t *testing.T) {
|
||||
mf.stateMachine.EXPECT().
|
||||
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
|
||||
|
||||
// Because the job failed, there is no need to re-queue any tasks previously failed by this worker.
|
||||
|
||||
// Do the call.
|
||||
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
||||
requestWorkerStore(echoCtx, &worker)
|
||||
@ -652,6 +660,8 @@ func TestBlockingAfterFailure(t *testing.T) {
|
||||
mf.stateMachine.EXPECT().
|
||||
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
|
||||
|
||||
// Because the job failed, there is no need to re-queue any tasks previously failed by this worker.
|
||||
|
||||
// Do the call.
|
||||
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
||||
requestWorkerStore(echoCtx, &worker)
|
||||
@ -743,5 +753,4 @@ func TestMayWorkerRun(t *testing.T) {
|
||||
StatusChangeRequested: true,
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -287,6 +287,21 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worker, taskStatus api.TaskStatus, job *Job) ([]*Task, error) {
|
||||
result := []*Task{}
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&Task{}).
|
||||
Joins("Job").
|
||||
Where("tasks.worker_id = ?", worker.ID).
|
||||
Where("tasks.status = ?", taskStatus).
|
||||
Where("job.id = ?", job.ID).
|
||||
Scan(&result)
|
||||
if tx.Error != nil {
|
||||
return nil, taskError(tx.Error, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, job.UUID)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
||||
var numTasksInStatus int64
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
|
@ -34,6 +34,7 @@ type PersistenceService interface {
|
||||
|
||||
FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*persistence.Job, error)
|
||||
FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
|
||||
FetchTasksOfWorkerInStatusOfJob(context.Context, *persistence.Worker, api.TaskStatus, *persistence.Job) ([]*persistence.Task, error)
|
||||
}
|
||||
|
||||
// PersistenceService should be a subset of persistence.DB
|
||||
|
@ -93,6 +93,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, a
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatus", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatus), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatusOfJob mocks base method.
|
||||
func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatusOfJob", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].([]*persistence.Task)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FetchTasksOfWorkerInStatusOfJob indicates an expected call of FetchTasksOfWorkerInStatusOfJob.
|
||||
func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatusOfJob(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTasksOfWorkerInStatusOfJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchTasksOfWorkerInStatusOfJob), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// JobHasTasksInStatus mocks base method.
|
||||
func (m *MockPersistenceService) JobHasTasksInStatus(arg0 context.Context, arg1 *persistence.Job, arg2 api.TaskStatus) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -4,7 +4,6 @@ package task_state_machine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
@ -18,17 +17,47 @@ func (sm *StateMachine) RequeueActiveTasksOfWorker(
|
||||
ctx context.Context,
|
||||
worker *persistence.Worker,
|
||||
reason string,
|
||||
) error {
|
||||
// Fetch the tasks to update.
|
||||
tasks, err := sm.persist.FetchTasksOfWorkerInStatus(
|
||||
ctx, worker, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sm.requeueTasksOfWorker(ctx, tasks, worker, reason)
|
||||
}
|
||||
|
||||
// RequeueFailedTasksOfWorkerOfJob re-queues all failed tasks of this worker on this job.
|
||||
//
|
||||
// `reason`: a string that can be appended to text like "Task requeued because "
|
||||
func (sm *StateMachine) RequeueFailedTasksOfWorkerOfJob(
|
||||
ctx context.Context,
|
||||
worker *persistence.Worker,
|
||||
job *persistence.Job,
|
||||
reason string,
|
||||
) error {
|
||||
// Fetch the tasks to update.
|
||||
tasks, err := sm.persist.FetchTasksOfWorkerInStatusOfJob(
|
||||
ctx, worker, api.TaskStatusFailed, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sm.requeueTasksOfWorker(ctx, tasks, worker, reason)
|
||||
}
|
||||
|
||||
func (sm *StateMachine) requeueTasksOfWorker(
|
||||
ctx context.Context,
|
||||
tasks []*persistence.Task,
|
||||
worker *persistence.Worker,
|
||||
reason string,
|
||||
) error {
|
||||
logger := log.With().
|
||||
Str("worker", worker.UUID).
|
||||
Str("reason", reason).
|
||||
Logger()
|
||||
|
||||
// Fetch the tasks to update.
|
||||
tasks, err := sm.persist.FetchTasksOfWorkerInStatus(ctx, worker, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetching tasks of worker %s in status %q: %w", worker.UUID, api.TaskStatusActive, err)
|
||||
}
|
||||
|
||||
// Run each task change through the task state machine.
|
||||
var lastErr error
|
||||
for _, task := range tasks {
|
||||
|
Loading…
Reference in New Issue
Block a user