Manager: Add MQTT client for sending events

Add an MQTT client to send events from the event bus to an MQTT broker.
This commit is contained in:
Sybren A. Stüvel 2024-02-03 01:04:30 +01:00
parent 2af9a3e98e
commit 4fe8605744
6 changed files with 189 additions and 5 deletions

@ -4,6 +4,10 @@ This file contains the history of changes to Flamenco. Only changes that might
be interesting for users are listed here, such as new features and fixes for
bugs in actually-released versions.
## 3.5 - in development
- Add MQTT support. Flamenco Manager can now send internal events to an MQTT broker.
## 3.4 - released 2024-01-12
- Fix [#104263: Error performing BAT pack in Windows with shared storage](https://projects.blender.org/studio/flamenco/issues/104263).

@ -156,6 +156,10 @@ func runFlamencoManager() bool {
eventBroker := eventbus.NewBroker()
socketio := eventbus.NewSocketIOForwarder()
eventBroker.AddForwarder(socketio)
mqttClient := eventbus.NewMQTTForwarder(configService.Get().MQTT.Client)
if mqttClient != nil {
eventBroker.AddForwarder(mqttClient)
}
localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath)
logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker)
@ -186,6 +190,10 @@ func runFlamencoManager() bool {
installSignalHandler(mainCtxCancel)
if mqttClient != nil {
mqttClient.Connect(mainCtx)
}
// Before doing anything new, clean up in case we made a mess in an earlier run.
taskStateMachine.CheckStuck(mainCtx)

6
go.mod

@ -10,6 +10,7 @@ require (
github.com/disintegration/imaging v1.6.2
github.com/dop251/goja v0.0.0-20230812105242-81d76064690d
github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d
github.com/eclipse/paho.golang v0.12.0
github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e
github.com/gertd/go-pluralize v0.2.1
github.com/getkin/kin-openapi v0.88.0
@ -24,7 +25,7 @@ require (
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8
github.com/pressly/goose/v3 v3.15.1
github.com/rs/zerolog v1.26.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.4
github.com/zcalusic/sysinfo v1.0.1
github.com/ziflex/lecho/v3 v3.1.0
golang.org/x/crypto v0.16.0
@ -47,7 +48,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/labstack/gommon v0.4.0 // indirect
@ -58,6 +59,7 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/tools v0.16.1 // indirect

15
go.sum

@ -31,6 +31,8 @@ github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d h1:W1n4DvpzZGOI
github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q=
github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE=
github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e h1:cG4ivpkHpkmWTaaLrgekDVR0xAr87V697T2c+WnUdiY=
github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e/go.mod h1:7xQpS/YtlWo38XfIqje9GgtlPuBRatYcL23GlYBtgWM=
github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA=
@ -72,6 +74,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC5IXdEA547bcoU5dozgQAfc8Onsg4=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
@ -81,8 +85,8 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graarh/golang-socketio v0.0.0-20170510162725-2c44953b9b5f h1:utzdm9zUvVWGRtIpkdE4+36n+Gv60kNb7mFvgGxLElY=
github.com/graarh/golang-socketio v0.0.0-20170510162725-2c44953b9b5f/go.mod h1:8gudiNCFh3ZfvInknmoXzPeV17FSH+X2J5k2cUPIwnA=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
@ -162,8 +166,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
@ -181,6 +186,8 @@ github.com/zcalusic/sysinfo v1.0.1 h1:cVh8q3codjh43AGRTa54dJ2Zq+qPejv8n2VWpxKViw
github.com/zcalusic/sysinfo v1.0.1/go.mod h1:LxwKwtQdbTIQc65drhjQzYzt0o7jfB80LrrZm7SWn8o=
github.com/ziflex/lecho/v3 v3.1.0 h1:65bSzSc0yw7EEhi44lMnkOI877ZzbE7tGDWfYCQXZwI=
github.com/ziflex/lecho/v3 v3.1.0/go.mod h1:dwQ6xCAKmSBHhwZ6XmiAiDptD7iklVkW7xQYGUncX0Q=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -220,6 +227,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

@ -21,6 +21,7 @@ import (
yaml "gopkg.in/yaml.v2"
"projects.blender.org/studio/flamenco/internal/appinfo"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/pkg/crosspath"
shaman_config "projects.blender.org/studio/flamenco/pkg/shaman/config"
)
@ -101,6 +102,8 @@ type Base struct {
// When this many workers have tried the task and failed, it will be hard-failed
// (even when there are workers left that could technically retry the task).
TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"`
MQTT MQTTConfig `yaml:"mqtt"`
}
// GarbageCollect contains the config options for the GC.
@ -117,6 +120,11 @@ type ShamanGarbageCollect struct {
SilentlyDisable bool `yaml:"-"`
}
// MQTTConfig contains the configuration options for MQTT broker (idea for the future) and client.
type MQTTConfig struct {
Client eventbus.MQTTClientConfig `yaml:"client"`
}
// Conf is the latest version of the configuration.
// Currently it is version 3.
type Conf struct {

@ -0,0 +1,153 @@
package eventbus
import (
"context"
"encoding/json"
"net/url"
"strings"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/rs/zerolog/log"
)
const (
defaultClientID = "flamenco"
keepAlive = 30 // seconds
connectRetryDelay = 10 * time.Second
mqttQoS = 1
)
type MQTTForwarder struct {
config autopaho.ClientConfig
conn *autopaho.ConnectionManager
topicPrefix string
// Context to use when publishing messages.
ctx context.Context
}
var _ Forwarder = (*MQTTForwarder)(nil)
// MQTTClientConfig contains the MQTT client configuration.
type MQTTClientConfig struct {
BrokerURL string `yaml:"broker"`
ClientID string `yaml:"clientID"`
TopicPrefix string `yaml:"topic_prefix"`
Username string `yaml:"username"`
Password string `yaml:"password"`
}
func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder {
config.BrokerURL = strings.TrimSpace(config.BrokerURL)
config.ClientID = strings.TrimSpace(config.ClientID)
if config.BrokerURL == "" {
return nil
}
if config.ClientID == "" {
config.ClientID = defaultClientID
}
serverURL, err := url.Parse(config.BrokerURL)
if err != nil {
log.Error().
Err(err).
Str("mqttServerURL", config.BrokerURL).
Msg("could not parse MQTT server URL, skipping creation of MQTT client")
return nil
}
client := MQTTForwarder{
topicPrefix: config.TopicPrefix,
}
client.config = autopaho.ClientConfig{
BrokerUrls: []*url.URL{serverURL},
KeepAlive: keepAlive,
ConnectRetryDelay: connectRetryDelay,
OnConnectionUp: client.onConnectionUp,
OnConnectError: client.onConnectionError,
Debug: paho.NOOPLogger{},
ClientConfig: paho.ClientConfig{
ClientID: config.ClientID,
OnClientError: client.onClientError,
OnServerDisconnect: client.onServerDisconnect,
},
}
client.config.SetUsernamePassword(config.Username, []byte(config.Password))
return &client
}
func (m *MQTTForwarder) Connect(ctx context.Context) {
log.Debug().Msg("mqtt client: connecting")
conn, err := autopaho.NewConnection(ctx, m.config)
if err != nil {
panic(err)
}
m.conn = conn
m.ctx = ctx
}
func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) {
log.Info().Msg("mqtt client: connection established")
}
func (m *MQTTForwarder) onConnectionError(err error) {
log.Warn().AnErr("cause", err).Msg("mqtt client: connection error")
}
func (m *MQTTForwarder) onClientError(err error) {
log.Warn().AnErr("cause", err).Msg("mqtt client: server requested disconnect")
}
func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) {
logEntry := log.Warn()
if d.Properties != nil {
logEntry = logEntry.Str("reason", d.Properties.ReasonString)
} else {
logEntry = logEntry.Int("reasonCode", int(d.ReasonCode))
}
logEntry.Msg("mqtt client: server requested disconnect")
}
func (c *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) {
fullTopic := c.topicPrefix + string(topic)
logger := log.With().
Str("topic", fullTopic).
Interface("event", payload).
Logger()
asJSON, err := json.Marshal(payload)
if err != nil {
logger.Error().AnErr("cause", err).Interface("event", payload).
Msg("mqtt client: could not convert event to JSON")
return
}
// Publish will block so we run it in a GoRoutine.
// TODO: might be a good idea todo this at the event broker level, rather than in this function.
go func(topic string, msg []byte) {
pr, err := c.conn.Publish(c.ctx, &paho.Publish{
QoS: mqttQoS,
Topic: topic,
Payload: msg,
})
switch {
case err != nil:
logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event")
return
case pr.ReasonCode == 16:
logger.Debug().Msg("mqtt client: event sent to server, but there were no subscribers")
return
case pr.ReasonCode != 0:
logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt server")
default:
logger.Debug().Msg("mqtt client: event sent to server")
}
}(fullTopic, asJSON)
}