Manager: add lifecycle events to the event bus

Send events on Manager startup & shutdown. To make this possible, events
destined for MQTT are now queued until an MQTT broker can be reached;
otherwise the startup event would be sent before the MQTT connection was
established.
Sybren A. Stüvel 2024-02-21 22:20:44 +01:00
parent fd9605583f
commit 12bfa82854
4 changed files with 120 additions and 33 deletions
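
For readers skimming the diff below, the queueing pattern this commit introduces can be summarised in a small, self-contained sketch: events go into a buffered channel and are only drained by a "queue runner" goroutine once a connection is available; when the buffer is full, the event is dropped with an error. The names used here (forwarder, queuedMessage, publish) are illustrative stand-ins, not the actual Flamenco API; the real implementation is MQTTForwarder, shown further down.

package main

import (
	"context"
	"fmt"
	"time"
)

// queuedMessage mirrors the idea of mqttQueuedMessage in the commit:
// a topic plus an already-serialised payload.
type queuedMessage struct {
	topic   string
	payload []byte
}

// forwarder stands in for the MQTT forwarder. The real code publishes via
// autopaho; here publish() just prints, to keep the sketch runnable.
type forwarder struct {
	queue chan queuedMessage
}

// Broadcast queues a message without blocking. When the queue is full the
// event is dropped and an error is reported, mirroring the commit.
func (f *forwarder) Broadcast(topic string, payload []byte) {
	select {
	case f.queue <- queuedMessage{topic: topic, payload: payload}:
		// Queued; it will be published once a connection is available.
	default:
		fmt.Printf("queue full, dropping event on %q\n", topic)
	}
}

// queueRunner drains the queue for as long as its context lives. In the real
// code it is started from the on-connection-up callback and cancelled on
// disconnect.
func (f *forwarder) queueRunner(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-f.queue:
			f.publish(msg)
		}
	}
}

func (f *forwarder) publish(msg queuedMessage) {
	fmt.Printf("publishing %s: %s\n", msg.topic, msg.payload)
}

func main() {
	f := &forwarder{queue: make(chan queuedMessage, 10)}

	// Broadcast before the "connection" is up: the event just sits in the queue.
	f.Broadcast("/lifecycle", []byte(`{"type":"manager-startup"}`))

	// Simulate the connection coming up: start the queue runner, which
	// publishes the queued startup event and anything broadcast afterwards.
	ctx, cancel := context.WithCancel(context.Background())
	go f.queueRunner(ctx)

	f.Broadcast("/jobs", []byte(`{"type":"job-update"}`))

	time.Sleep(100 * time.Millisecond) // give the runner time to drain the queue
	cancel()
}

Dropping events when the buffer is full, rather than blocking, keeps broadcasting from ever stalling the Manager; the trade-off is that a burst larger than the buffer (10 entries in this commit) loses events.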

@@ -38,6 +38,7 @@ import (
"projects.blender.org/studio/flamenco/internal/manager/timeout_checker"
"projects.blender.org/studio/flamenco/internal/own_url"
"projects.blender.org/studio/flamenco/internal/upnp_ssdp"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman"
"projects.blender.org/studio/flamenco/pkg/sysinfo"
)
@@ -56,6 +57,8 @@ const (
webappEntryPoint = "index.html"
)
type shutdownFunc func()
func main() {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
@@ -188,7 +191,19 @@ func runFlamencoManager() bool {
// once it closes.
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
installSignalHandler(mainCtxCancel)
triggerShutdown := func() {
// Notify that Flamenco is shutting down.
event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerShutdown)
eventBroker.BroadcastLifeCycleEvent(event)
// Give event bus some time to process the shutdown event.
time.Sleep(500 * time.Millisecond)
// Cancel the main context, triggering an application-wide shutdown.
mainCtxCancel()
}
installSignalHandler(triggerShutdown)
if mqttClient != nil {
mqttClient.Connect(mainCtx)
@@ -274,11 +289,17 @@ func runFlamencoManager() bool {
go openWebbrowser(mainCtx, urls[0])
}
// Notify that Flamenco has started.
{
event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerStartup)
eventBroker.BroadcastLifeCycleEvent(event)
}
// Allow the Flamenco API itself to trigger a shutdown as well.
log.Debug().Msg("waiting for a shutdown request from Flamenco")
doRestart := flamenco.WaitForShutdown(mainCtx)
log.Info().Bool("willRestart", doRestart).Msg("going to shut down the service")
mainCtxCancel()
triggerShutdown()
wg.Wait()
log.Info().Bool("willRestart", doRestart).Msg("Flamenco Manager service shut down")
@@ -362,14 +383,14 @@ func openDB(configService config.Service) *persistence.DB {
}
// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
func installSignalHandler(cancelFunc context.CancelFunc) {
func installSignalHandler(shutdownFunc shutdownFunc) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
signal.Notify(signals, syscall.SIGTERM)
go func() {
for signum := range signals {
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
cancelFunc()
shutdownFunc()
}
}()
}

@@ -0,0 +1,20 @@
package eventbus
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/pkg/api"
)
func NewLifeCycleEvent(lifeCycleType api.LifeCycleEventType) api.EventLifeCycle {
event := api.EventLifeCycle{
Type: lifeCycleType,
}
return event
}
func (b *Broker) BroadcastLifeCycleEvent(event api.EventLifeCycle) {
log.Debug().Interface("event", event).Msg("eventbus: broadcasting lifecycle event")
b.broadcast(TopicLifeCycle, event)
}

@@ -20,7 +20,8 @@ const (
keepAlive = 30 // seconds
connectRetryDelay = 10 * time.Second
mqttQoS = 1
mqttQoS = 1 // QoS field for MQTT events.
mqttQueueSize = 10 // How many events are queued when there is no connection to an MQTT broker.
)
type MQTTForwarder struct {
@@ -30,6 +31,9 @@ type MQTTForwarder struct {
// Context to use when publishing messages.
ctx context.Context
queue chan mqttQueuedMessage
queueCancel context.CancelFunc
}
var _ Forwarder = (*MQTTForwarder)(nil)
@@ -44,6 +48,11 @@ type MQTTClientConfig struct {
Password string `yaml:"password"`
}
type mqttQueuedMessage struct {
topic string
payload []byte
}
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
config.ClientID = strings.TrimSpace(config.ClientID)
@@ -66,6 +75,7 @@ func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
client := MQTTForwarder{
topicPrefix: config.TopicPrefix,
queue: make(chan mqttQueuedMessage, mqttQueueSize),
}
client.config = autopaho.ClientConfig{
BrokerUrls: []*url.URL{brokerURL},
@@ -93,11 +103,14 @@ func (m *MQTTForwarder) Connect(ctx context.Context) {
m.conn = conn
m.ctx = ctx
}
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
m.logger().Info().Msg("mqtt client: connection established")
queueCtx, queueCtxCancel := context.WithCancel(m.ctx)
m.queueCancel = queueCtxCancel
go m.queueRunner(queueCtx)
}
func (m *MQTTForwarder) onConnectionError(err error) {
@@ -109,6 +122,10 @@ func (m *MQTTForwarder) onClientError(err error) {
}
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
if m.queueCancel != nil {
m.queueCancel()
}
logEntry := m.logger().Warn()
if d.Properties != nil {
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
@@ -118,42 +135,70 @@ func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
logEntry.Msg("mqtt client: broker requested disconnect")
}
func (m *MQTTForwarder) queueRunner(queueRunnerCtx context.Context) {
m.logger().Debug().Msg("mqtt client: starting queue runner")
defer m.logger().Debug().Msg("mqtt client: stopping queue runner")
for {
select {
case <-queueRunnerCtx.Done():
return
case message := <-m.queue:
m.sendEvent(message)
}
}
}
func (m *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
fullTopic := m.topicPrefix + string(topic)
logger := m.logger().With().
Str("topic", fullTopic).
// Interface("event", payload).
Logger()
asJSON, err := json.Marshal(payload)
if err != nil {
logger.Error().AnErr("cause", err).Interface("event", payload).
m.logger().Error().
Str("topic", fullTopic).
AnErr("cause", err).
Interface("event", payload).
Msg("mqtt client: could not convert event to JSON")
return
}
// Publish will block so we run it in a goroutine.
// TODO: might be a good idea to do this at the event broker level, rather than in this function.
go func(topic string, msg []byte) {
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
QoS: mqttQoS,
Topic: topic,
Payload: msg,
})
switch {
case err != nil:
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
return
case pr.ReasonCode == 16:
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
return
case pr.ReasonCode != 0:
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
default:
logger.Debug().Msg("mqtt client: event sent to broker")
}
}(fullTopic, asJSON)
// Queue the message, if we can.
message := mqttQueuedMessage{
topic: fullTopic,
payload: asJSON,
}
select {
case m.queue <- message:
// All good, message is queued.
default:
m.logger().Error().
Str("topic", fullTopic).
Msg("mqtt client: could not send event, queue is full")
}
}
func (m *MQTTForwarder) sendEvent(message mqttQueuedMessage) {
logger := m.logger().With().
Str("topic", message.topic).
Logger()
pr, err := m.conn.Publish(m.ctx, &paho.Publish{
QoS: mqttQoS,
Topic: message.topic,
Payload: message.payload,
})
switch {
case err != nil:
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
return
case pr.ReasonCode == 16:
logger.Debug().Msg("mqtt client: event sent to broker, but there were no subscribers")
return
case pr.ReasonCode != 0:
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt broker")
default:
logger.Debug().Msg("mqtt client: event sent to broker")
}
}
func (m *MQTTForwarder) logger() *zerolog.Logger {

@@ -6,6 +6,7 @@ import "fmt"
const (
// Topics on which events are published.
TopicLifeCycle EventTopic = "/lifecycle" // sends api.EventLifeCycle
TopicJobUpdate EventTopic = "/jobs" // sends api.EventJobUpdate
TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.EventLastRenderedUpdate
TopicTaskUpdate EventTopic = "/task" // sends api.EventTaskUpdate