Sybren A. Stüvel
02fac6a4df
Change the package base name of the Go code, from `git.blender.org/flamenco` to `projects.blender.org/studio/flamenco`. The old location, `git.blender.org`, has no longer been use since the [migration to Gitea][1]. The new package names now reflect the actual location where Flamenco is hosted. [1]: https://code.blender.org/2023/02/new-blender-development-infrastructure/
210 lines
5.2 KiB
Go
210 lines
5.2 KiB
Go
package worker
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
const (
|
|
// How long to wait to fetch another task...
|
|
durationNoTask = 2 * time.Second // ... if there is no task now.
|
|
durationFetchFailed = 10 * time.Second // ... if fetching failed somehow.
|
|
durationTaskComplete = 2 * time.Second // ... when a task was completed.
|
|
|
|
mayKeepRunningPeriod = 10 * time.Second
|
|
)
|
|
|
|
// Implement error interface for `api.MayKeepRunning` to indicate a task run was
|
|
// aborted due to the Manager saying "NO".
|
|
type taskRunAborted api.MayKeepRunning
|
|
|
|
func (tra taskRunAborted) Error() string {
|
|
switch {
|
|
case tra.MayKeepRunning:
|
|
return "task could have been kept running"
|
|
case tra.StatusChangeRequested:
|
|
return "worker status change requested"
|
|
case tra.Reason == "":
|
|
return "manager said NO"
|
|
}
|
|
return tra.Reason
|
|
}
|
|
|
|
func (w *Worker) gotoStateAwake(ctx context.Context) {
|
|
w.stateMutex.Lock()
|
|
w.state = api.WorkerStatusAwake
|
|
w.stateMutex.Unlock()
|
|
|
|
w.doneWg.Add(2)
|
|
w.ackStateChange(ctx, w.state)
|
|
|
|
go w.runStateAwake(ctx)
|
|
}
|
|
|
|
// runStateAwake fetches a task and executes it, in an endless loop.
|
|
func (w *Worker) runStateAwake(ctx context.Context) {
|
|
defer func() {
|
|
err := recover()
|
|
if err != nil {
|
|
w.SignOff(ctx)
|
|
logger := w.loggerWithStatus()
|
|
logger.Panic().
|
|
Interface("panic", err).
|
|
Msg("panic, so signed off and going to stop")
|
|
}
|
|
}()
|
|
|
|
defer w.doneWg.Done()
|
|
defer log.Debug().Msg("stopping state 'awake'")
|
|
|
|
for {
|
|
task := w.fetchTask(ctx)
|
|
if task == nil {
|
|
return
|
|
}
|
|
|
|
// The task runner's listener will be responsible for sending results back
|
|
// to the Manager. This code only needs to fetch a task and run it.
|
|
err := w.runTask(ctx, *task)
|
|
if err != nil {
|
|
var abortError taskRunAborted
|
|
if errors.As(err, &abortError) {
|
|
log.Warn().
|
|
Str("task", task.Uuid).
|
|
Str("reason", err.Error()).
|
|
Msg("task aborted by request of Manager")
|
|
} else if errors.Is(err, context.Canceled) {
|
|
log.Warn().Interface("task", *task).Msg("task aborted due to context being closed")
|
|
} else {
|
|
log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
|
|
}
|
|
}
|
|
|
|
// Do some rate limiting. This is mostly useful while developing.
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(durationTaskComplete):
|
|
}
|
|
}
|
|
}
|
|
|
|
// fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained.
|
|
// Returns nil when a task could not be obtained and the period loop was cancelled.
|
|
func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
|
|
logger := w.loggerWithStatus()
|
|
|
|
// Initially don't wait at all.
|
|
var wait time.Duration
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Debug().Msg("task fetching interrupted by context cancellation")
|
|
return nil
|
|
case <-w.doneChan:
|
|
logger.Debug().Msg("task fetching interrupted by shutdown")
|
|
return nil
|
|
case <-time.After(wait):
|
|
}
|
|
|
|
logger.Debug().Msg("fetching tasks")
|
|
resp, err := w.client.ScheduleTaskWithResponse(ctx)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error obtaining task")
|
|
wait = durationFetchFailed
|
|
continue
|
|
}
|
|
switch {
|
|
case resp.JSON200 != nil:
|
|
log.Info().
|
|
Interface("task", resp.JSON200).
|
|
Msg("obtained task")
|
|
return resp.JSON200
|
|
case resp.JSON423 != nil:
|
|
log.Info().
|
|
Str("requestedStatus", string(resp.JSON423.StatusRequested)).
|
|
Msg("Manager requests status change")
|
|
w.changeState(ctx, resp.JSON423.StatusRequested)
|
|
return nil
|
|
case resp.JSON403 != nil:
|
|
log.Error().
|
|
Int("code", resp.StatusCode()).
|
|
Str("error", string(resp.JSON403.Message)).
|
|
Msg("access denied")
|
|
wait = durationFetchFailed
|
|
case resp.StatusCode() == http.StatusNoContent:
|
|
log.Debug().Msg("no task available")
|
|
// TODO: implement gradual back-off, to avoid too frequent checks when the
|
|
// farm is idle.
|
|
wait = durationNoTask
|
|
default:
|
|
log.Warn().
|
|
Int("code", resp.StatusCode()).
|
|
Str("error", string(resp.Body)).
|
|
Msg("unable to obtain task for unknown reason")
|
|
wait = durationFetchFailed
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// runTask runs the given task.
|
|
func (w *Worker) runTask(ctx context.Context, task api.AssignedTask) error {
|
|
// Create a sub-context to manage the life-span of both the running of the
|
|
// task and the loop to check whether we're still allowed to run it.
|
|
taskCtx, taskCancel := context.WithCancel(ctx)
|
|
defer taskCancel()
|
|
|
|
var taskRunnerErr, abortReason error
|
|
|
|
// Run the actual task in a separate goroutine.
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
defer taskCancel()
|
|
taskRunnerErr = w.taskRunner.Run(taskCtx, task)
|
|
}()
|
|
|
|
// Do a periodic check to see if we're actually allowed to run this task.
|
|
checkloop:
|
|
for {
|
|
select {
|
|
case <-taskCtx.Done():
|
|
// The task is done, no more need to check.
|
|
break checkloop
|
|
case <-time.After(mayKeepRunningPeriod):
|
|
// Time to do another check.
|
|
break
|
|
}
|
|
|
|
mkr := w.mayIKeepRunning(taskCtx, task.Uuid)
|
|
if mkr.MayKeepRunning {
|
|
continue
|
|
}
|
|
|
|
abortReason = taskRunAborted(mkr)
|
|
taskCancel()
|
|
break checkloop
|
|
}
|
|
|
|
// Wait for the task runner to either complete or abort.
|
|
wg.Wait()
|
|
|
|
if abortReason != nil {
|
|
return abortReason
|
|
}
|
|
|
|
return taskRunnerErr
|
|
}
|