diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 2bdb673e..1ab687ea 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -166,6 +166,18 @@ func (sm *StateMachine) jobStatusIfAThenB( return sm.JobStatusChange(ctx, job, thenStatus, reason) } +// isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks. +func (sm *StateMachine) isJobPausingComplete(ctx context.Context, job *persistence.Job) (bool, error) { + if job.Status != api.JobStatusPauseRequested { + return false, nil + } + numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) + if err != nil { + return false, err + } + return numActive == 0, nil +} + // updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { // If no more tasks can run, cancel the job. @@ -180,6 +192,15 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job") } + // Deal with the special case when the job is in pause-requested status. + toBePaused, err := sm.isJobPausingComplete(ctx, job) + if err != nil { + return err + } + if toBePaused { + return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation") + } + return nil } @@ -204,6 +225,16 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger } // If the job didn't fail, this failure indicates that at least the job is active. failLogger.Info().Msg("task failed, but not enough to fail the job") + + // Deal with the special case when the job is in pause-requested status. + toBePaused, err := sm.isJobPausingComplete(ctx, job) + if err != nil { + return err + } + if toBePaused { + return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure") + } + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, "task failed, but not enough to fail the job") } @@ -218,6 +249,16 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg logger.Info().Msg("all tasks of job are completed, job is completed") return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") } + + // Deal with the special case when the job is in pause-requested status. + toBePaused, err := sm.isJobPausingComplete(ctx, job) + if err != nil { + return err + } + if toBePaused { + return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion") + } + logger.Info(). Int("taskNumTotal", numTotal). Int("taskNumComplete", numComplete). @@ -369,7 +410,7 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange( // Every case in this switch MUST return, for sanity sake. switch job.Status { - case api.JobStatusCompleted, api.JobStatusCanceled: + case api.JobStatusCompleted, api.JobStatusCanceled, api.JobStatusPaused: // Nothing to do; this will happen as a response to all tasks receiving this status. return tasksUpdateResult{}, nil @@ -385,6 +426,13 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange( massTaskUpdate: true, }, err + case api.JobStatusPauseRequested: + jobStatus, err := sm.pauseTasks(ctx, logger, job) + return tasksUpdateResult{ + followingJobStatus: jobStatus, + massTaskUpdate: true, + }, err + case api.JobStatusRequeueing: jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus) return tasksUpdateResult{ @@ -438,6 +486,38 @@ func (sm *StateMachine) cancelTasks( return "", nil } +func (sm *StateMachine) pauseTasks( + ctx context.Context, logger zerolog.Logger, job *persistence.Job, +) (api.JobStatus, error) { + logger.Info().Msg("pausing tasks of job") + + // Any task that might run in the future should get paused. + // Active tasks should remain active until finished. + taskStatusesToPause := []api.TaskStatus{ + api.TaskStatusQueued, + api.TaskStatusSoftFailed, + } + err := sm.persist.UpdateJobsTaskStatusesConditional( + ctx, job, taskStatusesToPause, api.TaskStatusPaused, + fmt.Sprintf("Manager paused this task because the job got status %q.", job.Status), + ) + if err != nil { + return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err) + } + + // If pausing was requested, it has now happened, so the job can transition. + toBePaused, err := sm.isJobPausingComplete(ctx, job) + if err != nil { + return "", err + } + if toBePaused { + logger.Info().Msg("all tasks of job paused, job can go to 'paused' status") + return api.JobStatusPaused, nil + } + + return api.JobStatusPauseRequested, nil +} + // requeueTasks re-queues all tasks of the job. // // This function assumes that the current job status is "requeueing". diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index 97f67fc4..531bd7ca 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -336,6 +336,94 @@ func TestJobCancelWithSomeCompletedTasks(t *testing.T) { require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusCancelRequested, "someone wrote a unittest")) } +func TestJobPauseWithAllQueuedTasks(t *testing.T) { + mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) + defer mockCtrl.Finish() + + task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued) + task2 := taskOfSameJob(task1, api.TaskStatusQueued) + task3 := taskOfSameJob(task2, api.TaskStatusQueued) + job := task3.Job + + mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested) + + // Expect pausing of the job to trigger pausing of all its queued tasks. + mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job, + []api.TaskStatus{ + api.TaskStatusQueued, + api.TaskStatusSoftFailed, + }, + api.TaskStatusPaused, + "Manager paused this task because the job got status \"pause-requested\".", + ) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive). + Return(0, 3, nil) + mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused) + mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested) + mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused) + + require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest")) +} + +func TestJobPauseWithSomeCompletedTasks(t *testing.T) { + mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) + defer mockCtrl.Finish() + + task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusCompleted) + task2 := taskOfSameJob(task1, api.TaskStatusQueued) + task3 := taskOfSameJob(task2, api.TaskStatusQueued) + job := task3.Job + + mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested) + + // Expect pausing of the job to trigger pausing of all its queued tasks. + mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job, + []api.TaskStatus{ + api.TaskStatusQueued, + api.TaskStatusSoftFailed, + }, + api.TaskStatusPaused, + "Manager paused this task because the job got status \"pause-requested\".", + ) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive). + Return(0, 3, nil) + mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused) + mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested) + mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused) + + require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest")) +} + +func TestJobPauseWithSomeActiveTasks(t *testing.T) { + mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) + defer mockCtrl.Finish() + + task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive) + task2 := taskOfSameJob(task1, api.TaskStatusCompleted) + task3 := taskOfSameJob(task2, api.TaskStatusQueued) + job := task3.Job + + mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested) + + // Expect pausing of the job to trigger pausing of all its queued tasks. + mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job, + []api.TaskStatus{ + api.TaskStatusQueued, + api.TaskStatusSoftFailed, + }, + api.TaskStatusPaused, + "Manager paused this task because the job got status \"pause-requested\".", + ) + mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, + api.TaskStatusActive). + Return(1, 3, nil) + mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested) + + require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest")) +} + func TestCheckStuck(t *testing.T) { mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) defer mockCtrl.Finish() diff --git a/web/app/src/components/jobs/JobActionsBar.vue b/web/app/src/components/jobs/JobActionsBar.vue index 615ffae7..d69deff6 100644 --- a/web/app/src/components/jobs/JobActionsBar.vue +++ b/web/app/src/components/jobs/JobActionsBar.vue @@ -8,6 +8,9 @@ + @@ -69,6 +72,9 @@ export default { onButtonRequeue() { return this._handleJobActionPromise(this.jobs.requeueJobs(), 'requeueing'); }, + onButtonPause() { + return this._handleJobActionPromise(this.jobs.pauseJobs(), 'marked for pausing'); + }, _handleJobActionPromise(promise, description) { return promise.then(() => { diff --git a/web/app/src/stores/jobs.js b/web/app/src/stores/jobs.js index 84c31552..3e524717 100644 --- a/web/app/src/stores/jobs.js +++ b/web/app/src/stores/jobs.js @@ -33,6 +33,9 @@ export const useJobs = defineStore('jobs', { canRequeue() { return this._anyJobWithStatus(['canceled', 'completed', 'failed', 'paused']); }, + canPause() { + return this._anyJobWithStatus(['active', 'queued', 'canceled']); + }, }, actions: { setIsJobless(isJobless) { @@ -74,6 +77,9 @@ export const useJobs = defineStore('jobs', { cancelJobs() { return this._setJobStatus('cancel-requested'); }, + pauseJobs() { + return this._setJobStatus('pause-requested'); + }, requeueJobs() { return this._setJobStatus('requeueing'); }, diff --git a/web/app/src/stores/tasks.js b/web/app/src/stores/tasks.js index afcdf393..351dff6e 100644 --- a/web/app/src/stores/tasks.js +++ b/web/app/src/stores/tasks.js @@ -2,6 +2,7 @@ import { defineStore } from 'pinia'; import * as API from '@/manager-api'; import { getAPIClient } from '@/api-client'; +import { useJobs } from '@/stores/jobs'; const jobsAPI = new API.JobsApi(getAPIClient()); @@ -19,6 +20,21 @@ export const useTasks = defineStore('tasks', { }), getters: { canCancel() { + const jobs = useJobs(); + const activeJob = jobs.activeJob; + + if (!activeJob) { + console.warn('no active job, unable to determine whether the active task is cancellable'); + return false; + } + + if (activeJob.status == 'pause-requested') { + // Cancelling a task should not be possible while the job is being paused. + // In the future this might be supported, see issue #104315. + return false; + } + + // Allow cancellation for specified task statuses. return this._anyTaskWithStatus(['queued', 'active', 'soft-failed']); }, canRequeue() {