Implement may-I-keep-running protocol

Worker and Manager implementation of the "may-I-keep-running" protocol.

While running a task, the Worker periodically asks the Manager whether
it is still allowed to keep running that task; a sketch of this check
loop follows the list below. This allows the Manager to abort commands
on Workers when:

- the Worker should go to another state (typically 'asleep' or
  'shutdown'),
- the task changed status from 'active' to something non-runnable
  (typically 'canceled' when the job as a whole is canceled),
- the task has been assigned to a different Worker. This can happen when
  a Worker loses its connection to its Manager, resulting in a task
  timeout (not yet implemented), after which the task can be assigned to
  another Worker. If connectivity is then restored, the first Worker
  should abort (last-assigned Worker wins).
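
The Worker-side check loop can be sketched roughly as follows. This is a
minimal, self-contained sketch, not the code added by this commit;
superviseTask, runTask, and askManager are illustrative names only. The
idea is the same: run the task under a cancellable context, poll the
Manager on a fixed period, and cancel the task as soon as the answer is "no".

package worker

import (
	"context"
	"errors"
	"time"
)

// superviseTask runs a task while periodically asking the Manager whether
// the Worker may keep running it. Both callbacks are illustrative:
// runTask would execute the command, askManager would perform the
// may-I-keep-running request and return false when the Manager says "no".
func superviseTask(
	ctx context.Context,
	runTask func(context.Context) error,
	askManager func(context.Context) bool,
	checkPeriod time.Duration,
) error {
	taskCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Run the task itself in a goroutine so this function can keep polling.
	done := make(chan error, 1)
	go func() { done <- runTask(taskCtx) }()

	ticker := time.NewTicker(checkPeriod)
	defer ticker.Stop()

	for {
		select {
		case err := <-done:
			// The task finished (or failed) on its own.
			return err
		case <-ticker.C:
			if askManager(taskCtx) {
				continue
			}
			// Manager said "no": cancel the task and wait for it to stop.
			cancel()
			<-done
			return errors.New("task aborted by request of Manager")
		}
	}
}
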
Sybren A. Stüvel 2022-05-12 11:04:05 +02:00
parent fd16f7939e
commit 0b39f229a1
12 changed files with 323 additions and 21 deletions

@@ -7,13 +7,14 @@ curl -X 'POST' \
-d '{
"metadata": {
"project": "Debugging Flamenco",
"user.name": "コードモンキー"
"user.name": "dr. Sybren",
"duration": "long"
},
"name": "Talk & Sleep",
"priority": 50,
"name": "Talk & Sleep longer",
"priority": 3,
"settings": {
"sleep_duration_seconds": 2,
"message": "{blender}"
"sleep_duration_seconds": 20,
"message": "Blender is {blender}"
},
"type": "echo-sleep-test"
}'

@@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strconv"
"sync"
"time"
"git.blender.org/flamenco/internal/manager/job_compilers"
@@ -29,6 +30,11 @@ type Flamenco struct {
config ConfigService
stateMachine TaskStateMachine
shaman Shaman
// The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like
// `MayWorkerRun` to prevent similar race conditions.
taskSchedulerMutex sync.Mutex
}
var _ api.ServerInterface = (*Flamenco)(nil)

@@ -4,6 +4,7 @@ package api_impl
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
@@ -15,6 +16,7 @@ import (
"golang.org/x/crypto/bcrypt"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/pkg/api"
)
@@ -249,6 +251,9 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
worker := requestWorkerOrPanic(e)
logger.Debug().Msg("worker requesting task")
f.taskSchedulerMutex.Lock()
defer f.taskSchedulerMutex.Unlock()
// Check that this worker is actually allowed to do work.
requiredStatusToGetTask := api.WorkerStatusAwake
if worker.Status != api.WorkerStatusAwake {
@@ -307,3 +312,58 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
customisedTask := replaceTaskVariables(f.config, apiTask, *worker)
return e.JSON(http.StatusOK, customisedTask)
}
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)
if _, err := uuid.Parse(taskID); err != nil {
logger.Debug().Msg("invalid task ID received")
return sendAPIError(e, http.StatusBadRequest, "task ID not valid")
}
logger = logger.With().Str("task", taskID).Logger()
// Lock the task scheduler so that tasks don't get reassigned while we perform our checks.
f.taskSchedulerMutex.Lock()
defer f.taskSchedulerMutex.Unlock()
// Fetch the task, to see if this worker is allowed to run it.
ctx := e.Request().Context()
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
if errors.Is(err, persistence.ErrTaskNotFound) {
mkr := api.MayKeepRunning{Reason: "Task not found"}
return e.JSON(http.StatusOK, mkr)
}
logger.Error().Err(err).Msg("MayWorkerRun: cannot fetch task")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
if dbTask == nil {
panic("task could not be fetched, but database gave no error either")
}
mkr := mayWorkerRun(worker, dbTask)
if mkr.MayKeepRunning {
// TODO: record that this worker "touched" this task, for timeout calculations.
}
return e.JSON(http.StatusOK, mkr)
}
// mayWorkerRun checks the worker and the task, to see if this worker may keep running this task.
func mayWorkerRun(worker *persistence.Worker, dbTask *persistence.Task) api.MayKeepRunning {
if worker.StatusRequested != "" {
return api.MayKeepRunning{
Reason: "worker status change requested",
StatusChangeRequested: true,
}
}
if dbTask.WorkerID == nil || *dbTask.WorkerID != worker.ID {
return api.MayKeepRunning{Reason: "task not assigned to this worker"}
}
if !task_state_machine.IsRunnableTaskStatus(dbTask.Status) {
return api.MayKeepRunning{Reason: fmt.Sprintf("task is in non-runnable status %q", dbTask.Status)}
}
return api.MayKeepRunning{MayKeepRunning: true}
}

@@ -8,6 +8,7 @@ import (
"testing"
"github.com/golang/mock/gomock"
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/persistence"
@@ -139,3 +140,80 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
resp := getRecordedResponse(echo)
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
}
func TestMayWorkerRun(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
prepareRequest := func() echo.Context {
echo := mf.prepareMockedRequest(nil)
requestWorkerStore(echo, &worker)
return echo
}
job := persistence.Job{
UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0",
}
task := persistence.Task{
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
Job: &job,
Status: api.TaskStatusActive,
}
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil).AnyTimes()
// Test: unhappy, task unassigned
{
echo := prepareRequest()
err := mf.flamenco.MayWorkerRun(echo, task.UUID)
assert.NoError(t, err)
assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{
MayKeepRunning: false,
Reason: "task not assigned to this worker",
})
}
// Test: happy, task assigned to this worker.
{
echo := prepareRequest()
task.WorkerID = &worker.ID
err := mf.flamenco.MayWorkerRun(echo, task.UUID)
assert.NoError(t, err)
assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{
MayKeepRunning: true,
})
}
// Test: unhappy, assigned but cancelled.
{
echo := prepareRequest()
task.WorkerID = &worker.ID
task.Status = api.TaskStatusCanceled
err := mf.flamenco.MayWorkerRun(echo, task.UUID)
assert.NoError(t, err)
assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{
MayKeepRunning: false,
Reason: "task is in non-runnable status \"canceled\"",
})
}
// Test: unhappy, assigned and runnable but worker should go to bed.
{
worker.StatusRequested = api.WorkerStatusAsleep
echo := prepareRequest()
task.WorkerID = &worker.ID
task.Status = api.TaskStatusActive
err := mf.flamenco.MayWorkerRun(echo, task.UUID)
assert.NoError(t, err)
assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{
MayKeepRunning: false,
Reason: "worker status change requested",
StatusChangeRequested: true,
})
}
}

@@ -54,16 +54,6 @@ type ChangeBroadcaster interface {
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
var (
// Task statuses that always get requeued when the job is requeued.
nonCompletedStatuses = []api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
)
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine {
return &StateMachine{
persist: persist,

@@ -0,0 +1,26 @@
package task_state_machine
import "git.blender.org/flamenco/pkg/api"
var (
// Task statuses that always get requeued when the job is requeued.
nonCompletedStatuses = []api.TaskStatus{
api.TaskStatusCanceled,
api.TaskStatusFailed,
api.TaskStatusPaused,
api.TaskStatusSoftFailed,
}
// Workers are allowed to keep running tasks when they are in this status.
// 'queued', 'claimed-by-manager', and 'soft-failed' aren't considered runnable,
// as those statuses indicate the task wasn't assigned to a Worker by the scheduler.
runnableStatuses = map[api.TaskStatus]bool{
api.TaskStatusActive: true,
}
)
// IsRunnableTaskStatus returns whether the given status is considered "runnable".
// In other words, workers are allowed to keep running such tasks.
func IsRunnableTaskStatus(status api.TaskStatus) bool {
return runnableStatuses[status]
}

@@ -35,10 +35,7 @@ func (w *Worker) runStateAsleep(ctx context.Context) {
logger.Debug().Msg("asleep state interrupted by shutdown")
return
case <-time.After(durationSleepCheck):
newStatus := w.queryManagerForStateChange(ctx)
if newStatus != nil {
logger.Debug().Str("newStatus", string(*newStatus)).Msg("asleep state interrupted by state change")
w.changeState(ctx, *newStatus)
if w.changeStateIfRequested(ctx) {
return
}
}

@@ -6,6 +6,7 @@ import (
"context"
"errors"
"net/http"
"sync"
"time"
"github.com/rs/zerolog/log"
@@ -18,8 +19,26 @@ const (
durationNoTask = 2 * time.Second // ... if there is no task now.
durationFetchFailed = 10 * time.Second // ... if fetching failed somehow.
durationTaskComplete = 2 * time.Second // ... when a task was completed.
mayKeepRunningPeriod = 1 * time.Second
)
// Implement error interface for `api.MayKeepRunning` to indicate a task run was
// aborted due to the Manager saying "NO".
type taskRunAborted api.MayKeepRunning
func (tra taskRunAborted) Error() string {
switch {
case tra.MayKeepRunning:
return "task could have been kept running"
case tra.StatusChangeRequested:
return "worker status change requested"
case tra.Reason == "":
return "manager said NO"
}
return tra.Reason
}
func (w *Worker) gotoStateAwake(ctx context.Context) {
w.stateMutex.Lock()
w.state = api.WorkerStatusAwake
@@ -55,9 +74,15 @@ func (w *Worker) runStateAwake(ctx context.Context) {
// The task runner's listener will be responsible for sending results back
// to the Manager. This code only needs to fetch a task and run it.
err := w.taskRunner.Run(ctx, *task)
err := w.runTask(ctx, *task)
if err != nil {
if errors.Is(err, context.Canceled) {
var abortError taskRunAborted
if errors.As(err, &abortError) {
log.Warn().
Str("task", task.Uuid).
Str("reason", err.Error()).
Msg("task aborted by request of Manager")
} else if errors.Is(err, context.Canceled) {
log.Warn().Interface("task", *task).Msg("task aborted due to context being closed")
} else {
log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
@@ -128,3 +153,53 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
}
}
// runTask runs the given task.
func (w *Worker) runTask(ctx context.Context, task api.AssignedTask) error {
// Create a sub-context to manage the life-span of both the running of the
// task and the loop to check whether we're still allowed to run it.
taskCtx, taskCancel := context.WithCancel(ctx)
defer taskCancel()
var taskRunnerErr, abortReason error
// Run the actual task in a separate goroutine.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer taskCancel()
taskRunnerErr = w.taskRunner.Run(taskCtx, task)
}()
// Do a periodic check to see if we're actually allowed to run this task.
checkloop:
for {
select {
case <-taskCtx.Done():
// The task is done, no more need to check.
break checkloop
case <-time.After(mayKeepRunningPeriod):
// Time to do another check.
break
}
mkr := w.mayIKeepRunning(taskCtx, task.Uuid)
if mkr.MayKeepRunning {
continue
}
abortReason = taskRunAborted(mkr)
taskCancel()
break checkloop
}
// Wait for the task runner to either complete or abort.
wg.Wait()
if abortReason != nil {
return abortReason
}
return taskRunnerErr
}

@@ -36,6 +36,24 @@ func (w *Worker) changeState(ctx context.Context, newState api.WorkerStatus) {
starter(ctx)
}
// changeStateIfRequested asks the Manager whether a status change is required
// or not, and if so, goes to that state.
// Returns `true` when the status was changed, so that the caller knows to stop
// whatever it's doing.
func (w *Worker) changeStateIfRequested(ctx context.Context) bool {
newStatus := w.queryManagerForStateChange(ctx)
if newStatus == nil {
return false
}
log.Info().
Str("currentStatus", string(w.state)).
Str("newStatus", string(*newStatus)).
Msg("Manager requested state change")
w.changeState(ctx, *newStatus)
return true
}
// Confirm that we're now in a certain state.
//
// This ACK can be given without a request from the server, for example to support

@@ -37,3 +37,38 @@ func (w *Worker) queryManagerForStateChange(ctx context.Context) *api.WorkerStat
return nil
}
// mayIKeepRunning asks the Manager whether we can keep running a certain task.
// Any error communicating with the Manager is logged but otherwise ignored.
func (w *Worker) mayIKeepRunning(ctx context.Context, taskID string) api.MayKeepRunning {
resp, err := w.client.MayWorkerRunWithResponse(ctx, taskID)
if err != nil {
log.Warn().
Err(err).
Str("task", taskID).
Msg("error asking Manager may-I-keep-running task")
return api.MayKeepRunning{MayKeepRunning: true}
}
switch {
case resp.JSON200 != nil:
mkr := *resp.JSON200
logCtx := log.With().
Str("task", taskID).
Bool("mayKeepRunning", mkr.MayKeepRunning).
Bool("statusChangeRequested", mkr.StatusChangeRequested)
if mkr.Reason != "" {
logCtx = logCtx.Str("reason", mkr.Reason)
}
logger := logCtx.Logger()
logger.Debug().Msg("may-i-keep-running response")
return mkr
default:
log.Warn().
Str("task", taskID).
Int("code", resp.StatusCode()).
Str("error", string(resp.Body)).
Msg("unable to check may-i-keep-running for unknown reason")
return api.MayKeepRunning{MayKeepRunning: true}
}
}

@@ -18,3 +18,7 @@ const pinia = createPinia()
app.use(pinia)
app.use(router)
app.mount('#app')
// For debugging.
import { useJobs } from '@/stores/jobs';
window.jobs = useJobs();

@@ -46,6 +46,18 @@ export default {
}),
mounted() {
window.jobsView = this;
this.jobs.$subscribe((mutation, state) => {
console.log("Pinia mutation:", mutation)
console.log("Pinia state :", state)
// // import { MutationType } from 'pinia'
// mutation.type // 'direct' | 'patch object' | 'patch function'
// // same as cartStore.$id
// mutation.storeId // 'cart'
// // only available with mutation.type === 'patch object'
// mutation.payload // patch object passed to cartStore.$patch()
})
this._fetchJob(this.jobID);
this._fetchTask(this.taskID);
},