diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index 4b7d4ce5..9f072e11 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -26,6 +26,7 @@ var _ PersistenceService = (*persistence.DB)(nil) type TaskStateMachine interface { // TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job. TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error + RequeueTasksOfWorker(ctx context.Context, worker *persistence.Worker, reason string) error } var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go index 7152148c..77a981f1 100644 --- a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -105,6 +105,20 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { return m.recorder } +// RequeueTasksOfWorker mocks base method. +func (m *MockTaskStateMachine) RequeueTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequeueTasksOfWorker", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequeueTasksOfWorker indicates an expected call of RequeueTasksOfWorker. +func (mr *MockTaskStateMachineMockRecorder) RequeueTasksOfWorker(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequeueTasksOfWorker", reflect.TypeOf((*MockTaskStateMachine)(nil).RequeueTasksOfWorker), arg0, arg1, arg2) +} + // TaskStatusChange mocks base method. func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error { m.ctrl.T.Helper() diff --git a/internal/manager/timeout_checker/workers.go b/internal/manager/timeout_checker/workers.go index ab39513f..8bdce881 100644 --- a/internal/manager/timeout_checker/workers.go +++ b/internal/manager/timeout_checker/workers.go @@ -53,6 +53,10 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out worker to database") } - // Re-queue all tasks assigned to this worker. + err = ttc.taskStateMachine.RequeueTasksOfWorker(ctx, worker, "worker timed out") + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error re-queueing tasks of timed-out worker") + } + // TODO: broadcast worker change via SocketIO } diff --git a/internal/manager/timeout_checker/workers_test.go b/internal/manager/timeout_checker/workers_test.go index 1d95edfa..5be02249 100644 --- a/internal/manager/timeout_checker/workers_test.go +++ b/internal/manager/timeout_checker/workers_test.go @@ -35,10 +35,15 @@ func TestWorkerTimeout(t *testing.T) { StatusRequested: api.WorkerStatusAwake, } + // No tasks are timing out in this test. mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).Return([]*persistence.Task{}, nil) + mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()). Return([]*persistence.Worker{&worker}, nil) + // Expect all tasks assigned to the worker to get requeued. + mocks.taskStateMachine.EXPECT().RequeueTasksOfWorker(mocks.ctx, &worker, "worker timed out") + persistedWorker := worker persistedWorker.Status = api.WorkerStatusError // Any queued up status change should be cleared, as the Worker is not allowed @@ -46,8 +51,6 @@ func TestWorkerTimeout(t *testing.T) { persistedWorker.StatusChangeClear() mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil) - // TODO: expect all tasks assigned to the worker to get requeued. - // All the timeouts should be handled after the initial sleep. mocks.clock.Add(timeoutInitialSleep) }