Flamenco/internal/worker/worker.go
Sybren A. Stüvel 3e72391cbf Restartable workers
When the worker is started with `-restart-exit-code 47` or has
`restart_exit_code=47` in `flamenco-worker.yaml`, it's marked as
'restartable'. This will enable two worker actions 'Restart
(immediately)' and 'Restart (after task is finished)' in the Manager web
interface. When a worker is asked to restart, it will exit with exit
code `47`. Of course any positive exit code can be used here.
2023-08-14 16:00:09 +02:00

94 lines
2.1 KiB
Go

package worker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"sync"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/pkg/api"
)
// Worker performs regular Flamenco Worker operations.
//
// Instances are meant to be created with NewWorker, which allocates the
// channels, wait group, mutex and state-starter map used below.
type Worker struct {
// Closed by Close() to signal that the Worker is shutting down.
doneChan chan struct{}
// Waited on by Close() after doneChan has been closed.
doneWg *sync.WaitGroup
// Will be closed by the Worker when it wants to shut down. See Worker.WaitForShutdown().
shutdown chan struct{}
// When true at the moment `shutdown` is closed, WaitForShutdown() reports
// ReasonRestartReq instead of ReasonShutdownReq.
restartAfterShutdown bool
// Client passed to NewWorker().
client FlamencoClient
// Current worker status; NewWorker() initialises it to api.WorkerStatusStarting.
state api.WorkerStatus
stateStarters map[api.WorkerStatus]StateStarter // gotoStateXXX functions
// NOTE(review): presumably guards `state` — confirm against changeState().
stateMutex *sync.Mutex
// Task runner passed to NewWorker().
taskRunner TaskRunner
}
// StateStarter is the function type stored in Worker.stateStarters, keyed by
// the worker status it belongs to.
// NOTE(review): presumably called when the worker enters that status — confirm
// against the state machine code.
type StateStarter func(context.Context)
// TaskRunner is the dependency a Worker receives (via NewWorker) for running
// tasks.
type TaskRunner interface {
// Run takes the context and the assigned task, and returns an error.
Run(ctx context.Context, task api.AssignedTask) error
}
// NewWorker builds a ready-to-start Worker around the given Flamenco client
// and task runner.
//
// The returned Worker starts out in api.WorkerStatusStarting, with freshly
// allocated done/shutdown channels, wait group, mutex and an empty
// state-starter map. setupStateMachine() is called on it before it is
// returned.
func NewWorker(
	flamenco FlamencoClient,
	taskRunner TaskRunner,
) *Worker {
	w := new(Worker)

	// Injected dependencies.
	w.client = flamenco
	w.taskRunner = taskRunner

	// State machine bookkeeping.
	w.state = api.WorkerStatusStarting
	w.stateStarters = map[api.WorkerStatus]StateStarter{}
	w.stateMutex = &sync.Mutex{}

	// Shutdown signalling.
	w.doneChan = make(chan struct{})
	w.doneWg = &sync.WaitGroup{}
	w.shutdown = make(chan struct{})

	w.setupStateMachine()
	return w
}
// Start starts the worker by sending it to the given state.
//
// It only forwards the context and the requested state to changeState().
func (w *Worker) Start(ctx context.Context, state api.WorkerStatus) {
w.changeState(ctx, state)
}
// Close gracefully shuts down the Worker.
//
// It logs a debug message, closes doneChan, and then blocks until doneWg
// reports that everything registered on it is done.
//
// NOTE(review): closing an already-closed channel panics in Go, so Close must
// be called at most once per Worker.
func (w *Worker) Close() {
log.Debug().Msg("worker gracefully shutting down")
close(w.doneChan)
// Block until all work tracked by the wait group has finished.
w.doneWg.Wait()
}
// ShutdownReason tells the caller of Worker.WaitForShutdown why the wait ended.
type ShutdownReason int

// Possible ShutdownReason values, numbered consecutively from zero.
const (
	// ReasonContextClosed means the main Context closed.
	ReasonContextClosed = ShutdownReason(iota)
	// ReasonShutdownReq means the Manager requested a shutdown.
	ReasonShutdownReq
	// ReasonRestartReq means the Manager requested a restart.
	ReasonRestartReq
)
// WaitForShutdown blocks until either the given context is done or the
// Worker's shutdown channel is closed, and reports which of the two happened.
//
// Returns ReasonContextClosed when the context finished. Otherwise the
// shutdown channel was closed, and the result is ReasonRestartReq if the
// Worker was flagged to restart after shutdown, or ReasonShutdownReq if not.
func (w *Worker) WaitForShutdown(ctx context.Context) ShutdownReason {
	select {
	case <-w.shutdown:
		// Fall through to determine which kind of shutdown this is.
	case <-ctx.Done():
		return ReasonContextClosed
	}

	reason := ReasonShutdownReq
	if w.restartAfterShutdown {
		reason = ReasonRestartReq
	}
	return reason
}