Rename job status `requeued` to `requeueing`
This commit is contained in:
parent
ce43eead9a
commit
7b664475ca
@ -37,7 +37,7 @@ Note that list is **not** in any specific order.
|
||||
- [ ] Port the old 'fail-requested' task status handling code to the new Manager
|
||||
- [ ] Ensure "task state machine" can run in a single database transaction.
|
||||
- [x] At startup check & fix "stuck" jobs.
|
||||
Example: jobs in statuses `cancel-requested`, `requeued`, etc.
|
||||
Example: jobs in statuses `cancel-requested`, `requeueing`, etc.
|
||||
- [ ] Task timeout monitoring
|
||||
- [ ] Worker blocklisting & failed task requeueing
|
||||
- [ ] Worker timeout monitoring
|
||||
|
@ -124,7 +124,7 @@ func TestFetchJobsInStatus(t *testing.T) {
|
||||
// Update a job status, query for two of the three used statuses.
|
||||
job1.Status = api.JobStatusQueued
|
||||
assert.NoError(t, db.SaveJobStatus(ctx, job1))
|
||||
job2.Status = api.JobStatusRequeued
|
||||
job2.Status = api.JobStatusRequeueing
|
||||
assert.NoError(t, db.SaveJobStatus(ctx, job2))
|
||||
|
||||
jobs, err = db.FetchJobsInStatus(ctx, api.JobStatusQueued, api.JobStatusUnderConstruction)
|
||||
|
@ -140,7 +140,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
|
||||
switch task.Status {
|
||||
case api.TaskStatusQueued:
|
||||
// Re-queueing a task on a completed job should re-queue the job too.
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued, "task was queued")
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
|
||||
|
||||
case api.TaskStatusPaused:
|
||||
// Pausing a task has no impact on the job.
|
||||
@ -409,7 +409,7 @@ func (sm *StateMachine) updateTasksAfterJobStatusChange(
|
||||
massTaskUpdate: true,
|
||||
}, err
|
||||
|
||||
case api.JobStatusRequeued:
|
||||
case api.JobStatusRequeueing:
|
||||
jobStatus, err := sm.requeueTasks(ctx, logger, job, oldJobStatus)
|
||||
return tasksUpdateResult{
|
||||
followingJobStatus: jobStatus,
|
||||
@ -464,14 +464,14 @@ func (sm *StateMachine) cancelTasks(
|
||||
|
||||
// requeueTasks re-queues all tasks of the job.
|
||||
//
|
||||
// This function assumes that the current job status is "requeued".
|
||||
// This function assumes that the current job status is "requeueing".
|
||||
//
|
||||
// Returns the new job status, if this status transition should be followed by
|
||||
// another one.
|
||||
func (sm *StateMachine) requeueTasks(
|
||||
ctx context.Context, logger zerolog.Logger, job *persistence.Job, oldJobStatus api.JobStatus,
|
||||
) (api.JobStatus, error) {
|
||||
if job.Status != api.JobStatusRequeued {
|
||||
if job.Status != api.JobStatusRequeueing {
|
||||
logger.Warn().Msg("unexpected job status in StateMachine::requeueTasks()")
|
||||
}
|
||||
|
||||
@ -505,7 +505,7 @@ func (sm *StateMachine) requeueTasks(
|
||||
|
||||
// TODO: also reset the 'failed by workers' blacklist.
|
||||
|
||||
// The appropriate tasks have been requeued, so now the job can go from "requeued" to "queued".
|
||||
// The appropriate tasks have been requeued, so now the job can go from "requeueing" to "queued".
|
||||
return api.JobStatusQueued, nil
|
||||
}
|
||||
|
||||
@ -539,7 +539,7 @@ func (sm *StateMachine) checkTaskCompletion(
|
||||
// to run at startup of Flamenco Manager, and checks to see if there are any
|
||||
// jobs in a status that a human will not be able to fix otherwise.
|
||||
func (sm *StateMachine) CheckStuck(ctx context.Context) {
|
||||
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued)
|
||||
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("unable to fetch stuck jobs")
|
||||
return
|
||||
|
@ -143,19 +143,19 @@ func TestTaskStatusChangeRequeueOnCompletedJob(t *testing.T) {
|
||||
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// T: completed > queued --> J: completed > requeued > queued
|
||||
// T: completed > queued --> J: completed > requeueing > queued
|
||||
task := taskWithStatus(api.JobStatusCompleted, api.TaskStatusCompleted)
|
||||
mocks.expectSaveTaskWithStatus(t, task, api.TaskStatusQueued)
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusCompleted, api.TaskStatusQueued)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusRequeued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusCompleted, api.JobStatusRequeued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusRequeued, api.JobStatusQueued)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusCompleted, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(task.Job, api.JobStatusRequeueing, api.JobStatusQueued)
|
||||
|
||||
// Expect queueing of the job to trigger queueing of all its tasks, if those tasks were all completed before.
|
||||
// 2 out of 3 completed, because one was just queued.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(2, 3, nil)
|
||||
mocks.persist.EXPECT().UpdateJobsTaskStatuses(ctx, task.Job, api.TaskStatusQueued,
|
||||
"Queued because job transitioned status from \"completed\" to \"requeued\"",
|
||||
"Queued because job transitioned status from \"completed\" to \"requeueing\"",
|
||||
)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusQueued)
|
||||
|
||||
@ -236,7 +236,7 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) {
|
||||
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
|
||||
job := task1.Job
|
||||
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued)
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing)
|
||||
|
||||
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
|
||||
@ -248,15 +248,15 @@ func TestJobRequeueWithSomeCompletedTasks(t *testing.T) {
|
||||
api.TaskStatusSoftFailed,
|
||||
},
|
||||
api.TaskStatusQueued,
|
||||
"Queued because job transitioned status from \"active\" to \"requeued\"",
|
||||
"Queued because job transitioned status from \"active\" to \"requeueing\"",
|
||||
)
|
||||
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued)
|
||||
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusRequeued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
|
||||
|
||||
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unittest"))
|
||||
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeueing, "someone wrote a unittest"))
|
||||
}
|
||||
|
||||
func TestJobRequeueWithAllCompletedTasks(t *testing.T) {
|
||||
@ -270,12 +270,12 @@ func TestJobRequeueWithAllCompletedTasks(t *testing.T) {
|
||||
// task3 := taskOfSameJob(task2, api.TaskStatusCompleted)
|
||||
job := task1.Job
|
||||
|
||||
call1 := mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued)
|
||||
call1 := mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing)
|
||||
|
||||
// Expect queueing of the job to trigger queueing of all its not-yet-completed tasks.
|
||||
updateCall := mocks.persist.EXPECT().
|
||||
UpdateJobsTaskStatuses(ctx, job, api.TaskStatusQueued,
|
||||
"Queued because job transitioned status from \"completed\" to \"requeued\"").
|
||||
"Queued because job transitioned status from \"completed\" to \"requeueing\"").
|
||||
After(call1)
|
||||
|
||||
saveJobCall := mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued).After(updateCall)
|
||||
@ -285,10 +285,10 @@ func TestJobRequeueWithAllCompletedTasks(t *testing.T) {
|
||||
Return(0, 3, nil). // By now all tasks are queued.
|
||||
After(saveJobCall)
|
||||
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusCompleted, api.JobStatusRequeued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusCompleted, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
|
||||
|
||||
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeued, "someone wrote a unit test"))
|
||||
assert.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusRequeueing, "someone wrote a unit test"))
|
||||
}
|
||||
|
||||
func TestJobCancelWithSomeCompletedTasks(t *testing.T) {
|
||||
@ -330,9 +330,9 @@ func TestCheckStuck(t *testing.T) {
|
||||
// task2 := taskOfSameJob(task1, api.TaskStatusFailed)
|
||||
// task3 := taskOfSameJob(task2, api.TaskStatusSoftFailed)
|
||||
job := task1.Job
|
||||
job.Status = api.JobStatusRequeued
|
||||
job.Status = api.JobStatusRequeueing
|
||||
|
||||
mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeued).
|
||||
mocks.persist.EXPECT().FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing).
|
||||
Return([]*persistence.Job{job}, nil)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted).Return(1, 3, nil)
|
||||
|
||||
@ -348,11 +348,11 @@ func TestCheckStuck(t *testing.T) {
|
||||
)
|
||||
|
||||
// Expect Job -> Queued and non-completed tasks -> Queued.
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeued) // should be called once for the current status
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusRequeueing) // should be called once for the current status
|
||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusQueued) // and then with the new status
|
||||
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusRequeued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeued, api.JobStatusQueued)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusRequeueing)
|
||||
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusRequeueing, api.JobStatusQueued)
|
||||
|
||||
sm.CheckStuck(ctx)
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ package task_state_machine
|
||||
import "git.blender.org/flamenco/pkg/api"
|
||||
|
||||
var (
|
||||
// Task statuses that always get requeued when the job is requeued.
|
||||
// Task statuses that always get requeued when the job is requeueing.
|
||||
nonCompletedStatuses = []api.TaskStatus{
|
||||
api.TaskStatusCanceled,
|
||||
api.TaskStatusFailed,
|
||||
|
@ -49,7 +49,7 @@
|
||||
--color-status-soft-failed: hsl(356 70% 40%);
|
||||
|
||||
--color-status-queued: hsl(276, 100%, 50%);
|
||||
--color-status-requeued: hsl(276, 100%, 50%);
|
||||
--color-status-requeueing: hsl(276, 100%, 50%);
|
||||
--color-status-canceled: hsl(194 100% 46%);
|
||||
--color-status-paused: hsl(194 20% 46%);
|
||||
|
||||
@ -347,8 +347,8 @@ ul.status-filter-bar .status-filter-indicator .indicator {
|
||||
.status-cancel-requested {
|
||||
--indicator-color: var(--color-status-cancel-requested);
|
||||
}
|
||||
.status-requeued {
|
||||
--indicator-color: var(--color-status-requeued);
|
||||
.status-requeueing {
|
||||
--indicator-color: var(--color-status-requeueing);
|
||||
}
|
||||
.status-under-construction {
|
||||
--indicator-color: var(--color-status-under-construction);
|
||||
|
@ -29,7 +29,7 @@ export default {
|
||||
},
|
||||
onButtonRequeue() {
|
||||
return this._handleJobActionPromise(
|
||||
this.jobs.requeueJobs(), "requeued");
|
||||
this.jobs.requeueJobs(), "requeueing");
|
||||
},
|
||||
|
||||
_handleJobActionPromise(promise, description) {
|
||||
|
@ -24,7 +24,7 @@ export default {
|
||||
},
|
||||
onButtonRequeue() {
|
||||
return this._handleTaskActionPromise(
|
||||
this.tasks.requeueTasks(), "requeued");
|
||||
this.tasks.requeueTasks(), "requeueing");
|
||||
},
|
||||
|
||||
_handleTaskActionPromise(promise, description) {
|
||||
|
@ -64,7 +64,7 @@ export const useJobs = defineStore('jobs', {
|
||||
* code now assumes that only the active job needs to be operated on.
|
||||
*/
|
||||
cancelJobs() { return this._setJobStatus("cancel-requested"); },
|
||||
requeueJobs() { return this._setJobStatus("requeued"); },
|
||||
requeueJobs() { return this._setJobStatus("requeueing"); },
|
||||
|
||||
// Internal methods.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user