Manager: re-queue tasks of timed-out workers

Allow other workers to pick up the task(s) assigned to a timed-out worker.
This commit is contained in:
Sybren A. Stüvel 2022-06-10 17:55:22 +02:00
parent 7d5aae25b5
commit 986b647967
4 changed files with 25 additions and 3 deletions

@ -26,6 +26,7 @@ var _ PersistenceService = (*persistence.DB)(nil)
type TaskStateMachine interface {
// TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job.
TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error
RequeueTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error
}
var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)

@ -105,6 +105,20 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder {
return m.recorder
}
// RequeueTasksOfWorker mocks base method.
func (m *MockTaskStateMachine) RequeueTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RequeueTasksOfWorker", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// RequeueTasksOfWorker indicates an expected call of RequeueTasksOfWorker.
func (mr *MockTaskStateMachineMockRecorder) RequeueTasksOfWorker(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueTasksOfWorker), arg0, arg1, arg2)
}
// TaskStatusChange mocks base method.
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error {
m.ctrl.T.Helper()

@ -53,6 +53,10 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc
logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out worker to database")
}
// Re-queue all tasks assigned to this worker.
err = ttc.taskStateMachine.RequeueTasksOfWorker(ctx, worker, "worker timed out")
if err != nil {
logger.Error().Err(err).Msg("TimeoutChecker: error re-queueing tasks of timed-out worker")
}
// TODO: broadcast worker change via SocketIO
}

@ -35,10 +35,15 @@ func TestWorkerTimeout(t *testing.T) {
StatusRequested: api.WorkerStatusAwake,
}
// No tasks are timing out in this test.
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).Return([]*persistence.Task{}, nil)
mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).
Return([]*persistence.Worker{&worker}, nil)
// Expect all tasks assigned to the worker to get requeued.
mocks.taskStateMachine.EXPECT().RequeueTasksOfWorker(mocks.ctx, &worker, "worker timed out")
persistedWorker := worker
persistedWorker.Status = api.WorkerStatusError
// Any queued up status change should be cleared, as the Worker is not allowed
@ -46,8 +51,6 @@ func TestWorkerTimeout(t *testing.T) {
persistedWorker.StatusChangeClear()
mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil)
// TODO: expect all tasks assigned to the worker to get requeued.
// All the timeouts should be handled after the initial sleep.
mocks.clock.Add(timeoutInitialSleep)
}