Manager: Introduce event bus system

Introduce an "event bus"-like system. It's more like a fan-out
broadcaster for certain events. Instead of directly sending events to
SocketIO, they are now sent to the broker, which in turn sends it to any
registered "forwarder". Currently there is ony one forwarder, for
SocketIO.

This opens the door for a proper MQTT client that sends the same events
to an MQTT server.
This commit is contained in:
Sybren A. Stüvel 2024-02-03 22:51:29 +01:00
parent 4f8d39f74a
commit 76a24243f0
22 changed files with 339 additions and 324 deletions

@ -26,6 +26,7 @@ import (
"projects.blender.org/studio/flamenco/internal/manager/api_impl"
"projects.blender.org/studio/flamenco/internal/manager/api_impl/dummy"
"projects.blender.org/studio/flamenco/internal/manager/config"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/job_deleter"
"projects.blender.org/studio/flamenco/internal/manager/last_rendered"
@ -35,7 +36,6 @@ import (
"projects.blender.org/studio/flamenco/internal/manager/task_logs"
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine"
"projects.blender.org/studio/flamenco/internal/manager/timeout_checker"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/own_url"
"projects.blender.org/studio/flamenco/internal/upnp_ssdp"
"projects.blender.org/studio/flamenco/pkg/shaman"
@ -152,29 +152,32 @@ func runFlamencoManager() bool {
log.Fatal().Err(err).Msg("error loading job compilers")
}
webUpdater := webupdates.New()
// Set up the event system.
eventBroker := eventbus.NewBroker()
socketio := eventbus.NewSocketIOForwarder()
eventBroker.AddForwarder(socketio)
localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath)
logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater)
logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
sleepScheduler := sleep_scheduler.New(timeService, persist, webUpdater)
taskStateMachine := task_state_machine.NewStateMachine(persist, eventBroker, logStorage)
sleepScheduler := sleep_scheduler.New(timeService, persist, eventBroker)
lastRender := last_rendered.New(localStorage)
shamanServer := buildShamanServer(configService, isFirstRun)
jobDeleter := job_deleter.NewService(persist, localStorage, webUpdater, shamanServer)
jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer)
flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService,
compiler, persist, eventBroker, logStorage, configService,
taskStateMachine, shamanServer, timeService, lastRender,
localStorage, sleepScheduler, jobDeleter)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage)
e := buildWebService(flamenco, persist, ssdp, socketio, urls, localStorage)
timeoutChecker := timeout_checker.New(
configService.Get().TaskTimeout,
configService.Get().WorkerTimeout,
timeService, persist, taskStateMachine, logStorage, webUpdater)
timeService, persist, taskStateMachine, logStorage, eventBroker)
// The main context determines the lifetime of the application. All
// long-running goroutines need to keep an eye on this, and stop their work

@ -20,9 +20,9 @@ import (
"github.com/ziflex/lecho/v3"
"projects.blender.org/studio/flamenco/internal/manager/api_impl"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/local_storage"
"projects.blender.org/studio/flamenco/internal/manager/swagger_ui"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/upnp_ssdp"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/web"
@ -32,7 +32,7 @@ func buildWebService(
flamenco api.ServerInterface,
persist api_impl.PersistenceService,
ssdp *upnp_ssdp.Server,
webUpdater *webupdates.BiDirComms,
socketio *eventbus.SocketIOForwarder,
ownURLs []url.URL,
localStorage local_storage.StorageInfo,
) *echo.Echo {
@ -112,7 +112,7 @@ func buildWebService(
// Register routes.
api.RegisterHandlers(e, flamenco)
webUpdater.RegisterHandlers(e)
socketio.RegisterHandlers(e)
swagger_ui.RegisterSwaggerUIStaticFiles(e)
e.GET("/api/v3/openapi3.json", func(c echo.Context) error {
return c.JSON(http.StatusOK, swagger)

@ -14,13 +14,13 @@ import (
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/config"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/job_deleter"
"projects.blender.org/studio/flamenco/internal/manager/last_rendered"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/sleep_scheduler"
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman"
)
@ -125,8 +125,8 @@ type ChangeBroadcaster interface {
BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker.
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)
type JobCompiler interface {
ListJobTypes() api.AvailableJobTypes

@ -16,9 +16,9 @@ import (
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/crosspath"
@ -105,7 +105,7 @@ func (f *Flamenco) SubmitJob(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database")
}
jobUpdate := webupdates.NewJobUpdate(dbJob)
jobUpdate := eventbus.NewJobUpdate(dbJob)
f.broadcaster.BroadcastNewJob(jobUpdate)
apiJob := jobDBtoAPI(dbJob)
@ -365,7 +365,7 @@ func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error {
}
// Broadcast this change to the SocketIO clients.
jobUpdate := webupdates.NewJobUpdate(dbJob)
jobUpdate := eventbus.NewJobUpdate(dbJob)
f.broadcaster.BroadcastJobUpdate(jobUpdate)
return e.NoContent(http.StatusNoContent)

@ -7,8 +7,8 @@ import (
"net/http"
"github.com/labstack/echo/v4"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -121,7 +121,7 @@ func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error {
now := f.clock.Now()
// Broadcast the fact that this worker was just deleted.
update := webupdates.NewWorkerUpdate(worker)
update := eventbus.NewWorkerUpdate(worker)
update.DeletedAt = &now
f.broadcaster.BroadcastWorkerUpdate(update)
@ -183,7 +183,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
}
// Broadcast the change.
update := webupdates.NewWorkerUpdate(dbWorker)
update := eventbus.NewWorkerUpdate(dbWorker)
f.broadcaster.BroadcastWorkerUpdate(update)
return e.NoContent(http.StatusNoContent)
@ -228,7 +228,7 @@ func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error {
}
// Broadcast the change.
update := webupdates.NewWorkerUpdate(dbWorker)
update := eventbus.NewWorkerUpdate(dbWorker)
f.broadcaster.BroadcastWorkerUpdate(update)
return e.NoContent(http.StatusNoContent)
@ -267,7 +267,7 @@ func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error {
}
// SocketIO broadcast of tag deletion.
update := webupdates.NewWorkerTagDeletedUpdate(tagUUID)
update := eventbus.NewWorkerTagDeletedUpdate(tagUUID)
f.broadcaster.BroadcastWorkerTagUpdate(update)
logger.Info().Msg("worker tag deleted")
@ -347,7 +347,7 @@ func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error {
}
// SocketIO broadcast of tag update.
sioUpdate := webupdates.NewWorkerTagUpdate(dbTag)
sioUpdate := eventbus.NewWorkerTagUpdate(dbTag)
f.broadcaster.BroadcastWorkerTagUpdate(sioUpdate)
logger.Info().Msg("worker tag updated")
@ -419,7 +419,7 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error {
logger.Info().Msg("created new worker tag")
// SocketIO broadcast of tag creation.
sioUpdate := webupdates.NewWorkerTagUpdate(&dbTag)
sioUpdate := eventbus.NewWorkerTagUpdate(&dbTag)
f.broadcaster.BroadcastNewWorkerTag(sioUpdate)
return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag))

@ -13,10 +13,10 @@ import (
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/last_rendered"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -105,7 +105,7 @@ func (f *Flamenco) SignOn(e echo.Context) error {
}
// Broadcast the status change to 'starting'.
update := webupdates.NewWorkerUpdate(w)
update := eventbus.NewWorkerUpdate(w)
if prevStatus != "" {
update.PreviousStatus = &prevStatus
}
@ -208,7 +208,7 @@ func (f *Flamenco) SignOff(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks")
}
update := webupdates.NewWorkerUpdate(w)
update := eventbus.NewWorkerUpdate(w)
update.PreviousStatus = &prevStatus
f.broadcaster.BroadcastWorkerUpdate(update)
@ -285,7 +285,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
}
}
update := webupdates.NewWorkerUpdate(w)
update := eventbus.NewWorkerUpdate(w)
update.PreviousStatus = &prevStatus
f.broadcaster.BroadcastWorkerUpdate(update)
@ -367,7 +367,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
}
// Broadcast a worker update so that the web frontend will show the newly assigned task.
update := webupdates.NewWorkerUpdate(worker)
update := eventbus.NewWorkerUpdate(worker)
f.broadcaster.BroadcastWorkerUpdate(update)
// Convert database objects to API objects:
@ -465,7 +465,7 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
}
// Broadcast when the processing is done.
update := webupdates.NewLastRenderedUpdate(jobUUID)
update := eventbus.NewLastRenderedUpdate(jobUUID)
update.Thumbnail = *thumbnailInfo
f.broadcaster.BroadcastLastRenderedImage(update)
},

@ -0,0 +1,40 @@
package eventbus
import (
"sync"
)
type (
EventTopic string
)
type Forwarder interface {
Broadcast(topic EventTopic, payload interface{})
}
type Broker struct {
forwarders []Forwarder
mutex sync.Mutex
}
func NewBroker() *Broker {
return &Broker{
forwarders: []Forwarder{},
mutex: sync.Mutex{},
}
}
func (b *Broker) AddForwarder(forwarder Forwarder) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.forwarders = append(b.forwarders, forwarder)
}
func (b *Broker) broadcast(topic EventTopic, payload interface{}) {
b.mutex.Lock()
defer b.mutex.Unlock()
for _, forwarder := range b.forwarders {
forwarder.Broadcast(topic, payload)
}
}

@ -1,5 +1,5 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
package eventbus
import (
"github.com/rs/zerolog/log"
@ -64,49 +64,45 @@ func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdat
}
}
// BroadcastJobUpdate sends the job update to clients.
func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) {
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update")
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
}
// BroadcastNewJob sends a "new job" notification to clients.
// This function should be called when the job has been completely created, so
// including its tasks.
func (b *BiDirComms) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) {
func (b *Broker) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) {
if jobUpdate.PreviousStatus != nil {
log.Warn().Interface("jobUpdate", jobUpdate).Msg("socketIO: new jobs should not have a previous state")
jobUpdate.PreviousStatus = nil
}
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job")
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
b.broadcast(TopicJobUpdate, jobUpdate)
}
// BroadcastTaskUpdate sends the task update to clients.
func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) {
log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update")
room := roomForJob(taskUpdate.JobId)
b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate)
func (b *Broker) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) {
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update")
b.broadcast(TopicJobUpdate, jobUpdate)
}
// BroadcastLastRenderedImage sends the 'last-rendered' update to clients.
func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) {
func (b *Broker) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) {
log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update")
room := roomForJob(update.JobId)
b.BroadcastTo(room, SIOEventLastRenderedUpdate, update)
topic := topicForJobLastRendered(update.JobId)
b.broadcast(topic, update)
// TODO: throttle these via a last-in-one-out queue (see `pkg/last_in_one_out_queue`).
b.BroadcastTo(SocketIORoomLastRendered, SIOEventLastRenderedUpdate, update)
b.broadcast(TopicLastRenderedImage, update)
}
// BroadcastTaskLogUpdate sends the task log chunk to clients.
func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) {
func (b *Broker) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) {
log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update")
topic := topicForJob(taskUpdate.JobId)
b.broadcast(topic, taskUpdate)
}
func (b *Broker) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) {
// Don't log the contents here; logs can get big.
room := roomForTaskLog(taskLogUpdate.TaskId)
topic := topicForTaskLog(taskLogUpdate.TaskId)
log.Debug().
Str("task", taskLogUpdate.TaskId).
Str("room", string(room)).
Str("topic", string(topic)).
Msg("socketIO: broadcasting task log")
b.BroadcastTo(room, SIOEventTaskLogUpdate, taskLogUpdate)
b.broadcast(topic, taskLogUpdate)
}

@ -1,9 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
package eventbus
import (
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -38,19 +36,17 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate {
return workerUpdate
}
// BroadcastWorkerUpdate sends the worker update to clients.
func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) {
log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update")
b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
}
// BroadcastNewWorker sends a "new worker" notification to clients.
func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) {
func (b *Broker) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) {
if workerUpdate.PreviousStatus != nil {
log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state")
log.Warn().Interface("workerUpdate", workerUpdate).Msg("eventbus: new workers should not have a previous state")
workerUpdate.PreviousStatus = nil
}
log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker")
b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting new worker")
b.broadcast(TopicWorkerUpdate, workerUpdate)
}
func (b *Broker) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) {
log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting worker update")
b.broadcast(TopicWorkerUpdate, workerUpdate)
}

@ -1,6 +1,4 @@
package webupdates
// SPDX-License-Identifier: GPL-3.0-or-later
package eventbus
import (
"github.com/rs/zerolog/log"
@ -35,14 +33,12 @@ func NewWorkerTagDeletedUpdate(tagUUID string) api.SocketIOWorkerTagUpdate {
return tagUpdate
}
// BroadcastWorkerTagUpdate sends the worker tag update to clients.
func (b *BiDirComms) BroadcastWorkerTagUpdate(WorkerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting worker tag update")
b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate)
func (b *Broker) BroadcastWorkerTagUpdate(workerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting worker tag update")
b.broadcast(TopicWorkerTagUpdate, workerTagUpdate)
}
// BroadcastNewWorkerTag sends a "new worker tag" notification to clients.
func (b *BiDirComms) BroadcastNewWorkerTag(WorkerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting new worker tag")
b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate)
func (b *Broker) BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting new worker tag")
b.broadcast(TopicWorkerTagUpdate, workerTagUpdate)
}

@ -0,0 +1,168 @@
package eventbus
import (
"fmt"
"reflect"
gosocketio "github.com/graarh/golang-socketio"
"github.com/graarh/golang-socketio/transport"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)
type SocketIOEventType string
const (
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)
var socketIOEventTypes = map[string]string{
reflect.TypeOf(api.SocketIOJobUpdate{}).Name(): "/jobs",
reflect.TypeOf(api.SocketIOTaskUpdate{}).Name(): "/task",
reflect.TypeOf(api.SocketIOLastRenderedUpdate{}).Name(): "/last-rendered",
reflect.TypeOf(api.SocketIOTaskLogUpdate{}).Name(): "/tasklog",
reflect.TypeOf(api.SocketIOWorkerTagUpdate{}).Name(): "/workertags",
reflect.TypeOf(api.SocketIOWorkerUpdate{}).Name(): "/workers",
}
// SocketIOForwarder is an event forwarder via SocketIO.
type SocketIOForwarder struct {
sockserv *gosocketio.Server
}
var _ Forwarder = (*SocketIOForwarder)(nil)
type Message struct {
Name string `json:"name"`
Text string `json:"text"`
}
func NewSocketIOForwarder() *SocketIOForwarder {
siof := SocketIOForwarder{
sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()),
}
siof.registerSIOEventHandlers()
return &siof
}
func (s *SocketIOForwarder) RegisterHandlers(router *echo.Echo) {
router.Any("/socket.io/", echo.WrapHandler(s.sockserv))
}
func (s *SocketIOForwarder) Broadcast(topic EventTopic, payload interface{}) {
// SocketIO has a concept of 'event types'. MQTT doesn't have this, and thus the Flamenco event
// system doesn't rely on it. We use the payload type name as event type.
payloadType := reflect.TypeOf(payload).Name()
eventType := socketIOEventTypes[payloadType]
log.Debug().
Str("topic", string(topic)).
Str("eventType", eventType).
Interface("payload", payload).
Msg("socketIO: broadcasting message")
s.sockserv.BroadcastTo(string(topic), eventType, payload)
}
func (s *SocketIOForwarder) registerSIOEventHandlers() {
log.Debug().Msg("initialising SocketIO")
sio := s.sockserv
// the sio.On() and c.Join() calls only return an error when there is no
// server connected to them, but that's not possible with our setup.
// Errors are explicitly silenced (by assigning to _) to reduce clutter.
// socket connection
_ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Debug().Msg("socketIO: connected")
})
// socket disconnection
_ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Debug().Msg("socketIO: disconnected")
})
_ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Warn().Msg("socketIO: socketio error")
})
s.registerRoomEventHandlers()
}
func sioLogger(c *gosocketio.Channel) zerolog.Logger {
logger := log.With().
Str("clientID", c.Id()).
Str("remoteAddr", c.Ip()).
Logger()
return logger
}
func (s *SocketIOForwarder) registerRoomEventHandlers() {
_ = s.sockserv.On(string(SIOEventSubscription), s.handleRoomSubscription)
}
func (s *SocketIOForwarder) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string {
logger := sioLogger(c)
logCtx := logger.With().
Str("op", string(subs.Op)).
Str("type", string(subs.Type))
if subs.Uuid != nil {
logCtx = logCtx.Str("uuid", string(*subs.Uuid))
}
logger = logCtx.Logger()
if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) {
logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request")
return "invalid UUID, ignoring request"
}
var sioRoom EventTopic
switch subs.Type {
case api.SocketIOSubscriptionTypeAllJobs:
sioRoom = TopicJobUpdate
case api.SocketIOSubscriptionTypeAllWorkers:
sioRoom = TopicWorkerUpdate
case api.SocketIOSubscriptionTypeAllLastRendered:
sioRoom = TopicLastRenderedImage
case api.SocketIOSubscriptionTypeAllWorkerTags:
sioRoom = TopicWorkerTagUpdate
case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")
return "operation on job requires a UUID"
}
sioRoom = topicForJob(*subs.Uuid)
case api.SocketIOSubscriptionTypeTasklog:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID")
return "operation on task requires a UUID"
}
sioRoom = topicForTaskLog(*subs.Uuid)
default:
logger.Warn().Msg("socketIO: unknown subscription type, ignoring")
return "unknown subscription type, ignoring request"
}
var err error
switch subs.Op {
case api.SocketIOSubscriptionOperationSubscribe:
err = c.Join(string(sioRoom))
case api.SocketIOSubscriptionOperationUnsubscribe:
err = c.Leave(string(sioRoom))
default:
logger.Warn().Msg("socketIO: invalid subscription operation, ignoring")
return "invalid subscription operation, ignoring request"
}
if err != nil {
logger.Warn().Err(err).Msg("socketIO: performing subscription operation")
return fmt.Sprintf("unable to perform subscription operation: %v", err)
}
logger.Debug().Msg("socketIO: subscription")
return "ok"
}

@ -0,0 +1,38 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package eventbus
import "fmt"
const (
// Topics on which events are published.
TopicJobUpdate EventTopic = "/jobs" // sends api.SocketIOJobUpdate
TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.SocketIOLastRenderedUpdate
TopicTaskUpdate EventTopic = "/task" // sends api.SocketIOTaskUpdate
TopicWorkerUpdate EventTopic = "/workers" // sends api.SocketIOWorkerUpdate
TopicWorkerTagUpdate EventTopic = "/workertags" // sends api.SocketIOWorkerTagUpdate
TopicSubscription EventTopic = "/subscription" // clients send api.SocketIOSubscription
// Parameterised topics.
TopicJobSpecific EventTopic = "/jobs/%s" // %s = job UUID
TopicJobLastRendered EventTopic = "/jobs/%s/last-rendered" // %s = job UUID
TopicTaskLog EventTopic = "/tasklog/%s" // %s = task UUID
)
// topicForJob will return the event topic for the given job. Clients subscribed
// to this topic receive info scoped to this job, so for example updates to all
// tasks of this job.
func topicForJob(jobUUID string) EventTopic {
return EventTopic(fmt.Sprintf(string(TopicJobSpecific), jobUUID))
}
func topicForJobLastRendered(jobUUID string) EventTopic {
return EventTopic(fmt.Sprintf(string(TopicJobLastRendered), jobUUID))
}
// topicForTaskLog will return the event topic for receiving task logs of
// the the given task.
//
// Note that general task updates are sent to their job's topic, and not to this
// one.
func topicForTaskLog(taskUUID string) EventTopic {
return EventTopic(fmt.Sprintf(string(TopicTaskLog), taskUUID))
}

@ -6,9 +6,9 @@ import (
"context"
"time"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/local_storage"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman"
)
@ -44,8 +44,8 @@ type ChangeBroadcaster interface {
BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)
type Shaman interface {
// IsEnabled returns whether this Shaman service is enabled or not.

@ -18,8 +18,8 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman"
)
@ -76,7 +76,7 @@ func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) er
}
// Broadcast that this job was queued for deleted.
jobUpdate := webupdates.NewJobUpdate(job)
jobUpdate := eventbus.NewJobUpdate(job)
s.changeBroadcaster.BroadcastJobUpdate(jobUpdate)
// Let the Run() goroutine know this job is ready for deletion.
@ -128,7 +128,7 @@ func (s *Service) broadcastAndQueueMassJobDeletion(ctx context.Context, jobUUIDs
Msg("job deleter: unable to fetch job to send updates")
continue
}
jobUpdate := webupdates.NewJobUpdate(job)
jobUpdate := eventbus.NewJobUpdate(job)
s.changeBroadcaster.BroadcastJobUpdate(jobUpdate)
}
}

@ -5,8 +5,8 @@ package sleep_scheduler
import (
"context"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -33,5 +33,5 @@ type ChangeBroadcaster interface {
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker.
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)

@ -13,7 +13,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -47,8 +47,8 @@ type ChangeBroadcaster interface {
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)
// NewStorage creates a new log storage rooted at `basePath`.
func NewStorage(
@ -72,7 +72,7 @@ func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText str
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(taskID, logText)
taskUpdate := eventbus.NewTaskLogUpdate(taskID, logText)
s.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
return nil
}

@ -6,9 +6,9 @@ import (
"context"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/task_logs"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -49,8 +49,8 @@ type ChangeBroadcaster interface {
BroadcastTaskUpdate(jobUpdate api.SocketIOTaskUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)
// LogStorage writes to task logs.
type LogStorage interface {

@ -9,8 +9,8 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -88,7 +88,7 @@ func (sm *StateMachine) taskStatusChangeOnly(
}
// Broadcast this change to the SocketIO clients.
taskUpdate := webupdates.NewTaskUpdate(task)
taskUpdate := eventbus.NewTaskUpdate(task)
taskUpdate.PreviousStatus = &oldTaskStatus
sm.broadcaster.BroadcastTaskUpdate(taskUpdate)
@ -331,7 +331,7 @@ func (sm *StateMachine) jobStatusSet(ctx context.Context,
}
// Broadcast this change to the SocketIO clients.
jobUpdate := webupdates.NewJobUpdate(job)
jobUpdate := eventbus.NewJobUpdate(job)
jobUpdate.PreviousStatus = &oldJobStatus
jobUpdate.RefreshTasks = result.massTaskUpdate
sm.broadcaster.BroadcastJobUpdate(jobUpdate)

@ -7,9 +7,9 @@ import (
"time"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -43,5 +43,5 @@ type ChangeBroadcaster interface {
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)
// ChangeBroadcaster should be a subset of eventbus.Broker.
var _ ChangeBroadcaster = (*eventbus.Broker)(nil)

@ -1,17 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
import gosocketio "github.com/graarh/golang-socketio"
func (b *BiDirComms) registerChatEventHandlers() {
_ = b.sockserv.On(string(SIOEventChatMessageRcv),
func(c *gosocketio.Channel, message Message) string {
logger := sioLogger(c)
logger.Info().
Str("text", message.Text).
Str("name", message.Name).
Msg("socketIO: message received")
b.BroadcastTo(SocketIORoomChat, SIOEventChatMessageSend, message)
return "message sent successfully."
})
}

@ -1,135 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
import (
"fmt"
gosocketio "github.com/graarh/golang-socketio"
"projects.blender.org/studio/flamenco/internal/uuid"
"projects.blender.org/studio/flamenco/pkg/api"
)
// Separate type aliases for room names and event types; it's otherwise too easy
// to confuse the two.
type (
SocketIORoomName string
SocketIOEventType string
)
const (
// Predefined SocketIO rooms. There will be others, but those will have a
// dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be
// listed here as constants. See `roomXXX()` functions for those.
SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates.
SocketIORoomWorkerTags SocketIORoomName = "WorkerTags" // For worker tag updates.
// For updates about ALL last-rendered images. Normally these are sent to a
// room specific to a particular job, but for the global "last rendered image"
// all updates are sent here too.
SocketIORoomLastRendered SocketIORoomName = "Last-Rendered"
)
const (
// Predefined SocketIO event types.
SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here
SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate
SIOEventLastRenderedUpdate SocketIOEventType = "/last-rendered" // sends api.SocketIOLastRenderedUpdate
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate
SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate
SIOEventWorkerTagUpdate SocketIOEventType = "/workertags" // sends api.SocketIOWorkerTagUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)
func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) {
b.sockserv.BroadcastTo(string(room), string(eventType), payload)
}
func (b *BiDirComms) registerRoomEventHandlers() {
_ = b.sockserv.On(string(SIOEventSubscription), b.handleRoomSubscription)
}
func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string {
logger := sioLogger(c)
logCtx := logger.With().
Str("op", string(subs.Op)).
Str("type", string(subs.Type))
if subs.Uuid != nil {
logCtx = logCtx.Str("uuid", string(*subs.Uuid))
}
logger = logCtx.Logger()
if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) {
logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request")
return "invalid UUID, ignoring request"
}
var sioRoom SocketIORoomName
switch subs.Type {
case api.SocketIOSubscriptionTypeAllJobs:
sioRoom = SocketIORoomJobs
case api.SocketIOSubscriptionTypeAllWorkers:
sioRoom = SocketIORoomWorkers
case api.SocketIOSubscriptionTypeAllLastRendered:
sioRoom = SocketIORoomLastRendered
case api.SocketIOSubscriptionTypeAllWorkerTags:
sioRoom = SocketIORoomWorkerTags
case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")
return "operation on job requires a UUID"
}
sioRoom = roomForJob(*subs.Uuid)
case api.SocketIOSubscriptionTypeTasklog:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID")
return "operation on task requires a UUID"
}
sioRoom = roomForTaskLog(*subs.Uuid)
default:
logger.Warn().Msg("socketIO: unknown subscription type, ignoring")
return "unknown subscription type, ignoring request"
}
var err error
switch subs.Op {
case api.SocketIOSubscriptionOperationSubscribe:
err = c.Join(string(sioRoom))
case api.SocketIOSubscriptionOperationUnsubscribe:
err = c.Leave(string(sioRoom))
default:
logger.Warn().Msg("socketIO: invalid subscription operation, ignoring")
return "invalid subscription operation, ignoring request"
}
if err != nil {
logger.Warn().Err(err).Msg("socketIO: performing subscription operation")
return fmt.Sprintf("unable to perform subscription operation: %v", err)
}
logger.Debug().Msg("socketIO: subscription")
return "ok"
}
// roomForJob will return the SocketIO room name for the given job. Clients in
// this room will receive info scoped to this job, so for example updates to all
// tasks of this job.
//
// Note that `api.SocketIOJobUpdate`s themselves are sent to all SocketIO clients, and
// not to this room.
func roomForJob(jobUUID string) SocketIORoomName {
return SocketIORoomName("job-" + jobUUID)
}
// roomForTaskLog will return the SocketIO room name for receiving task logs of
// the the given task.
//
// Note that general task updates (`api.SIOEventTaskUpdate`) are sent to their
// job's room, and not to this room.
func roomForTaskLog(taskUUID string) SocketIORoomName {
return SocketIORoomName("tasklog-" + taskUUID)
}

@ -1,70 +0,0 @@
// package webupdates uses SocketIO to send updates to a web client.
// SPDX-License-Identifier: GPL-3.0-or-later
package webupdates
import (
gosocketio "github.com/graarh/golang-socketio"
"github.com/graarh/golang-socketio/transport"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
type BiDirComms struct {
sockserv *gosocketio.Server
}
type Message struct {
Name string `json:"name"`
Text string `json:"text"`
}
func New() *BiDirComms {
bdc := BiDirComms{
sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()),
}
bdc.registerSIOEventHandlers()
return &bdc
}
func (b *BiDirComms) RegisterHandlers(router *echo.Echo) {
router.Any("/socket.io/", echo.WrapHandler(b.sockserv))
}
func (b *BiDirComms) registerSIOEventHandlers() {
log.Debug().Msg("initialising SocketIO")
sio := b.sockserv
// the sio.On() and c.Join() calls only return an error when there is no
// server connected to them, but that's not possible with our setup.
// Errors are explicitly silenced (by assigning to _) to reduce clutter.
// socket connection
_ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Debug().Msg("socketIO: connected")
_ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room.
})
// socket disconnection
_ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Debug().Msg("socketIO: disconnected")
})
_ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) {
logger := sioLogger(c)
logger.Warn().Msg("socketIO: socketio error")
})
b.registerChatEventHandlers()
b.registerRoomEventHandlers()
}
func sioLogger(c *gosocketio.Channel) zerolog.Logger {
logger := log.With().
Str("clientID", c.Id()).
Str("remoteAddr", c.Ip()).
Logger()
return logger
}