diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 9b3c5409..f3bceeab 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -26,6 +26,7 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/api_impl" "projects.blender.org/studio/flamenco/internal/manager/api_impl/dummy" "projects.blender.org/studio/flamenco/internal/manager/config" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" @@ -35,7 +36,6 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/task_logs" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" "projects.blender.org/studio/flamenco/internal/manager/timeout_checker" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/own_url" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" "projects.blender.org/studio/flamenco/pkg/shaman" @@ -152,29 +152,32 @@ func runFlamencoManager() bool { log.Fatal().Err(err).Msg("error loading job compilers") } - webUpdater := webupdates.New() + // Set up the event system. + eventBroker := eventbus.NewBroker() + socketio := eventbus.NewSocketIOForwarder() + eventBroker.AddForwarder(socketio) localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath) - logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater) + logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker) - taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) - sleepScheduler := sleep_scheduler.New(timeService, persist, webUpdater) + taskStateMachine := task_state_machine.NewStateMachine(persist, eventBroker, logStorage) + sleepScheduler := sleep_scheduler.New(timeService, persist, eventBroker) lastRender := last_rendered.New(localStorage) shamanServer := buildShamanServer(configService, isFirstRun) - jobDeleter := job_deleter.NewService(persist, localStorage, webUpdater, shamanServer) + jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer) flamenco := api_impl.NewFlamenco( - compiler, persist, webUpdater, logStorage, configService, + compiler, persist, eventBroker, logStorage, configService, taskStateMachine, shamanServer, timeService, lastRender, localStorage, sleepScheduler, jobDeleter) - e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage) + e := buildWebService(flamenco, persist, ssdp, socketio, urls, localStorage) timeoutChecker := timeout_checker.New( configService.Get().TaskTimeout, configService.Get().WorkerTimeout, - timeService, persist, taskStateMachine, logStorage, webUpdater) + timeService, persist, taskStateMachine, logStorage, eventBroker) // The main context determines the lifetime of the application. All // long-running goroutines need to keep an eye on this, and stop their work diff --git a/cmd/flamenco-manager/webservice.go b/cmd/flamenco-manager/webservice.go index 295cb230..ede66add 100644 --- a/cmd/flamenco-manager/webservice.go +++ b/cmd/flamenco-manager/webservice.go @@ -20,9 +20,9 @@ import ( "github.com/ziflex/lecho/v3" "projects.blender.org/studio/flamenco/internal/manager/api_impl" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/local_storage" "projects.blender.org/studio/flamenco/internal/manager/swagger_ui" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/web" @@ -32,7 +32,7 @@ func buildWebService( flamenco api.ServerInterface, persist api_impl.PersistenceService, ssdp *upnp_ssdp.Server, - webUpdater *webupdates.BiDirComms, + socketio *eventbus.SocketIOForwarder, ownURLs []url.URL, localStorage local_storage.StorageInfo, ) *echo.Echo { @@ -112,7 +112,7 @@ func buildWebService( // Register routes. api.RegisterHandlers(e, flamenco) - webUpdater.RegisterHandlers(e) + socketio.RegisterHandlers(e) swagger_ui.RegisterSwaggerUIStaticFiles(e) e.GET("/api/v3/openapi3.json", func(c echo.Context) error { return c.JSON(http.StatusOK, swagger) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 593e8244..52ab5d25 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -14,13 +14,13 @@ import ( "github.com/rs/zerolog" "projects.blender.org/studio/flamenco/internal/manager/config" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/sleep_scheduler" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -125,8 +125,8 @@ type ChangeBroadcaster interface { BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) type JobCompiler interface { ListJobTypes() api.AvailableJobTypes diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 35aca5e9..9e9918b2 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -16,9 +16,9 @@ import ( "github.com/labstack/echo/v4" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/crosspath" @@ -105,7 +105,7 @@ func (f *Flamenco) SubmitJob(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database") } - jobUpdate := webupdates.NewJobUpdate(dbJob) + jobUpdate := eventbus.NewJobUpdate(dbJob) f.broadcaster.BroadcastNewJob(jobUpdate) apiJob := jobDBtoAPI(dbJob) @@ -365,7 +365,7 @@ func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error { } // Broadcast this change to the SocketIO clients. - jobUpdate := webupdates.NewJobUpdate(dbJob) + jobUpdate := eventbus.NewJobUpdate(dbJob) f.broadcaster.BroadcastJobUpdate(jobUpdate) return e.NoContent(http.StatusNoContent) diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 590a481c..a2412fa4 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -7,8 +7,8 @@ import ( "net/http" "github.com/labstack/echo/v4" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -121,7 +121,7 @@ func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error { now := f.clock.Now() // Broadcast the fact that this worker was just deleted. - update := webupdates.NewWorkerUpdate(worker) + update := eventbus.NewWorkerUpdate(worker) update.DeletedAt = &now f.broadcaster.BroadcastWorkerUpdate(update) @@ -183,7 +183,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) } // Broadcast the change. - update := webupdates.NewWorkerUpdate(dbWorker) + update := eventbus.NewWorkerUpdate(dbWorker) f.broadcaster.BroadcastWorkerUpdate(update) return e.NoContent(http.StatusNoContent) @@ -228,7 +228,7 @@ func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error { } // Broadcast the change. - update := webupdates.NewWorkerUpdate(dbWorker) + update := eventbus.NewWorkerUpdate(dbWorker) f.broadcaster.BroadcastWorkerUpdate(update) return e.NoContent(http.StatusNoContent) @@ -267,7 +267,7 @@ func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error { } // SocketIO broadcast of tag deletion. - update := webupdates.NewWorkerTagDeletedUpdate(tagUUID) + update := eventbus.NewWorkerTagDeletedUpdate(tagUUID) f.broadcaster.BroadcastWorkerTagUpdate(update) logger.Info().Msg("worker tag deleted") @@ -347,7 +347,7 @@ func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error { } // SocketIO broadcast of tag update. - sioUpdate := webupdates.NewWorkerTagUpdate(dbTag) + sioUpdate := eventbus.NewWorkerTagUpdate(dbTag) f.broadcaster.BroadcastWorkerTagUpdate(sioUpdate) logger.Info().Msg("worker tag updated") @@ -419,7 +419,7 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error { logger.Info().Msg("created new worker tag") // SocketIO broadcast of tag creation. - sioUpdate := webupdates.NewWorkerTagUpdate(&dbTag) + sioUpdate := eventbus.NewWorkerTagUpdate(&dbTag) f.broadcaster.BroadcastNewWorkerTag(sioUpdate) return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag)) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index af1eb29d..8e77545a 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -13,10 +13,10 @@ import ( "github.com/labstack/echo/v4" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -105,7 +105,7 @@ func (f *Flamenco) SignOn(e echo.Context) error { } // Broadcast the status change to 'starting'. - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) if prevStatus != "" { update.PreviousStatus = &prevStatus } @@ -208,7 +208,7 @@ func (f *Flamenco) SignOff(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") } - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) update.PreviousStatus = &prevStatus f.broadcaster.BroadcastWorkerUpdate(update) @@ -285,7 +285,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { } } - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) update.PreviousStatus = &prevStatus f.broadcaster.BroadcastWorkerUpdate(update) @@ -367,7 +367,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { } // Broadcast a worker update so that the web frontend will show the newly assigned task. - update := webupdates.NewWorkerUpdate(worker) + update := eventbus.NewWorkerUpdate(worker) f.broadcaster.BroadcastWorkerUpdate(update) // Convert database objects to API objects: @@ -465,7 +465,7 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { } // Broadcast when the processing is done. - update := webupdates.NewLastRenderedUpdate(jobUUID) + update := eventbus.NewLastRenderedUpdate(jobUUID) update.Thumbnail = *thumbnailInfo f.broadcaster.BroadcastLastRenderedImage(update) }, diff --git a/internal/manager/eventbus/eventbus.go b/internal/manager/eventbus/eventbus.go new file mode 100644 index 00000000..c0b4fdfe --- /dev/null +++ b/internal/manager/eventbus/eventbus.go @@ -0,0 +1,40 @@ +package eventbus + +import ( + "sync" +) + +type ( + EventTopic string +) + +type Forwarder interface { + Broadcast(topic EventTopic, payload interface{}) +} + +type Broker struct { + forwarders []Forwarder + mutex sync.Mutex +} + +func NewBroker() *Broker { + return &Broker{ + forwarders: []Forwarder{}, + mutex: sync.Mutex{}, + } +} + +func (b *Broker) AddForwarder(forwarder Forwarder) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.forwarders = append(b.forwarders, forwarder) +} + +func (b *Broker) broadcast(topic EventTopic, payload interface{}) { + b.mutex.Lock() + defer b.mutex.Unlock() + + for _, forwarder := range b.forwarders { + forwarder.Broadcast(topic, payload) + } +} diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/eventbus/events_jobs.go similarity index 71% rename from internal/manager/webupdates/job_updates.go rename to internal/manager/eventbus/events_jobs.go index 0ad547a7..eca03e05 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/eventbus/events_jobs.go @@ -1,5 +1,5 @@ // SPDX-License-Identifier: GPL-3.0-or-later -package webupdates +package eventbus import ( "github.com/rs/zerolog/log" @@ -64,49 +64,45 @@ func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdat } } -// BroadcastJobUpdate sends the job update to clients. -func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) { - log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") - b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) -} - // BroadcastNewJob sends a "new job" notification to clients. // This function should be called when the job has been completely created, so // including its tasks. -func (b *BiDirComms) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) { +func (b *Broker) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) { if jobUpdate.PreviousStatus != nil { log.Warn().Interface("jobUpdate", jobUpdate).Msg("socketIO: new jobs should not have a previous state") jobUpdate.PreviousStatus = nil } log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job") - b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) + b.broadcast(TopicJobUpdate, jobUpdate) } -// BroadcastTaskUpdate sends the task update to clients. -func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { - log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update") - room := roomForJob(taskUpdate.JobId) - b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate) +func (b *Broker) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) { + log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") + b.broadcast(TopicJobUpdate, jobUpdate) } -// BroadcastLastRenderedImage sends the 'last-rendered' update to clients. -func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) { +func (b *Broker) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) { log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update") - room := roomForJob(update.JobId) - b.BroadcastTo(room, SIOEventLastRenderedUpdate, update) + topic := topicForJobLastRendered(update.JobId) + b.broadcast(topic, update) // TODO: throttle these via a last-in-one-out queue (see `pkg/last_in_one_out_queue`). - b.BroadcastTo(SocketIORoomLastRendered, SIOEventLastRenderedUpdate, update) + b.broadcast(TopicLastRenderedImage, update) } -// BroadcastTaskLogUpdate sends the task log chunk to clients. -func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { +func (b *Broker) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { + log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update") + topic := topicForJob(taskUpdate.JobId) + b.broadcast(topic, taskUpdate) +} + +func (b *Broker) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { // Don't log the contents here; logs can get big. - room := roomForTaskLog(taskLogUpdate.TaskId) + topic := topicForTaskLog(taskLogUpdate.TaskId) log.Debug(). Str("task", taskLogUpdate.TaskId). - Str("room", string(room)). + Str("topic", string(topic)). Msg("socketIO: broadcasting task log") - b.BroadcastTo(room, SIOEventTaskLogUpdate, taskLogUpdate) + b.broadcast(topic, taskLogUpdate) } diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/eventbus/events_workers.go similarity index 58% rename from internal/manager/webupdates/worker_updates.go rename to internal/manager/eventbus/events_workers.go index 5d6a3d7d..e208f4fb 100644 --- a/internal/manager/webupdates/worker_updates.go +++ b/internal/manager/eventbus/events_workers.go @@ -1,9 +1,7 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates +package eventbus import ( "github.com/rs/zerolog/log" - "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -38,19 +36,17 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { return workerUpdate } -// BroadcastWorkerUpdate sends the worker update to clients. -func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) { - log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update") - b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) -} - -// BroadcastNewWorker sends a "new worker" notification to clients. -func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) { +func (b *Broker) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) { if workerUpdate.PreviousStatus != nil { - log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state") + log.Warn().Interface("workerUpdate", workerUpdate).Msg("eventbus: new workers should not have a previous state") workerUpdate.PreviousStatus = nil } - log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker") - b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) + log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting new worker") + b.broadcast(TopicWorkerUpdate, workerUpdate) +} + +func (b *Broker) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) { + log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting worker update") + b.broadcast(TopicWorkerUpdate, workerUpdate) } diff --git a/internal/manager/webupdates/workertag_updates.go b/internal/manager/eventbus/events_workertags.go similarity index 55% rename from internal/manager/webupdates/workertag_updates.go rename to internal/manager/eventbus/events_workertags.go index 95d0ec71..4ec42601 100644 --- a/internal/manager/webupdates/workertag_updates.go +++ b/internal/manager/eventbus/events_workertags.go @@ -1,6 +1,4 @@ -package webupdates - -// SPDX-License-Identifier: GPL-3.0-or-later +package eventbus import ( "github.com/rs/zerolog/log" @@ -35,14 +33,12 @@ func NewWorkerTagDeletedUpdate(tagUUID string) api.SocketIOWorkerTagUpdate { return tagUpdate } -// BroadcastWorkerTagUpdate sends the worker tag update to clients. -func (b *BiDirComms) BroadcastWorkerTagUpdate(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { - log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting worker tag update") - b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +func (b *Broker) BroadcastWorkerTagUpdate(workerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting worker tag update") + b.broadcast(TopicWorkerTagUpdate, workerTagUpdate) } -// BroadcastNewWorkerTag sends a "new worker tag" notification to clients. -func (b *BiDirComms) BroadcastNewWorkerTag(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { - log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting new worker tag") - b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +func (b *Broker) BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting new worker tag") + b.broadcast(TopicWorkerTagUpdate, workerTagUpdate) } diff --git a/internal/manager/eventbus/socketio.go b/internal/manager/eventbus/socketio.go new file mode 100644 index 00000000..6b2e7abc --- /dev/null +++ b/internal/manager/eventbus/socketio.go @@ -0,0 +1,168 @@ +package eventbus + +import ( + "fmt" + "reflect" + + gosocketio "github.com/graarh/golang-socketio" + "github.com/graarh/golang-socketio/transport" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/uuid" + "projects.blender.org/studio/flamenco/pkg/api" +) + +type SocketIOEventType string + +const ( + SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription +) + +var socketIOEventTypes = map[string]string{ + reflect.TypeOf(api.SocketIOJobUpdate{}).Name(): "/jobs", + reflect.TypeOf(api.SocketIOTaskUpdate{}).Name(): "/task", + reflect.TypeOf(api.SocketIOLastRenderedUpdate{}).Name(): "/last-rendered", + reflect.TypeOf(api.SocketIOTaskLogUpdate{}).Name(): "/tasklog", + reflect.TypeOf(api.SocketIOWorkerTagUpdate{}).Name(): "/workertags", + reflect.TypeOf(api.SocketIOWorkerUpdate{}).Name(): "/workers", +} + +// SocketIOForwarder is an event forwarder via SocketIO. +type SocketIOForwarder struct { + sockserv *gosocketio.Server +} + +var _ Forwarder = (*SocketIOForwarder)(nil) + +type Message struct { + Name string `json:"name"` + Text string `json:"text"` +} + +func NewSocketIOForwarder() *SocketIOForwarder { + siof := SocketIOForwarder{ + sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()), + } + siof.registerSIOEventHandlers() + return &siof +} + +func (s *SocketIOForwarder) RegisterHandlers(router *echo.Echo) { + router.Any("/socket.io/", echo.WrapHandler(s.sockserv)) +} + +func (s *SocketIOForwarder) Broadcast(topic EventTopic, payload interface{}) { + // SocketIO has a concept of 'event types'. MQTT doesn't have this, and thus the Flamenco event + // system doesn't rely on it. We use the payload type name as event type. + payloadType := reflect.TypeOf(payload).Name() + eventType := socketIOEventTypes[payloadType] + log.Debug(). + Str("topic", string(topic)). + Str("eventType", eventType). + Interface("payload", payload). + Msg("socketIO: broadcasting message") + s.sockserv.BroadcastTo(string(topic), eventType, payload) +} + +func (s *SocketIOForwarder) registerSIOEventHandlers() { + log.Debug().Msg("initialising SocketIO") + + sio := s.sockserv + // the sio.On() and c.Join() calls only return an error when there is no + // server connected to them, but that's not possible with our setup. + // Errors are explicitly silenced (by assigning to _) to reduce clutter. + + // socket connection + _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Debug().Msg("socketIO: connected") + }) + + // socket disconnection + _ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Debug().Msg("socketIO: disconnected") + }) + + _ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Warn().Msg("socketIO: socketio error") + }) + + s.registerRoomEventHandlers() +} + +func sioLogger(c *gosocketio.Channel) zerolog.Logger { + logger := log.With(). + Str("clientID", c.Id()). + Str("remoteAddr", c.Ip()). + Logger() + return logger +} + +func (s *SocketIOForwarder) registerRoomEventHandlers() { + _ = s.sockserv.On(string(SIOEventSubscription), s.handleRoomSubscription) +} + +func (s *SocketIOForwarder) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { + logger := sioLogger(c) + logCtx := logger.With(). + Str("op", string(subs.Op)). + Str("type", string(subs.Type)) + if subs.Uuid != nil { + logCtx = logCtx.Str("uuid", string(*subs.Uuid)) + } + logger = logCtx.Logger() + + if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) { + logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") + return "invalid UUID, ignoring request" + } + + var sioRoom EventTopic + switch subs.Type { + case api.SocketIOSubscriptionTypeAllJobs: + sioRoom = TopicJobUpdate + case api.SocketIOSubscriptionTypeAllWorkers: + sioRoom = TopicWorkerUpdate + case api.SocketIOSubscriptionTypeAllLastRendered: + sioRoom = TopicLastRenderedImage + case api.SocketIOSubscriptionTypeAllWorkerTags: + sioRoom = TopicWorkerTagUpdate + case api.SocketIOSubscriptionTypeJob: + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") + return "operation on job requires a UUID" + } + sioRoom = topicForJob(*subs.Uuid) + case api.SocketIOSubscriptionTypeTasklog: + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") + return "operation on task requires a UUID" + } + sioRoom = topicForTaskLog(*subs.Uuid) + default: + logger.Warn().Msg("socketIO: unknown subscription type, ignoring") + return "unknown subscription type, ignoring request" + } + + var err error + switch subs.Op { + case api.SocketIOSubscriptionOperationSubscribe: + err = c.Join(string(sioRoom)) + case api.SocketIOSubscriptionOperationUnsubscribe: + err = c.Leave(string(sioRoom)) + default: + logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") + return "invalid subscription operation, ignoring request" + } + + if err != nil { + logger.Warn().Err(err).Msg("socketIO: performing subscription operation") + return fmt.Sprintf("unable to perform subscription operation: %v", err) + } + + logger.Debug().Msg("socketIO: subscription") + return "ok" +} diff --git a/internal/manager/eventbus/topics.go b/internal/manager/eventbus/topics.go new file mode 100644 index 00000000..96315f39 --- /dev/null +++ b/internal/manager/eventbus/topics.go @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +package eventbus + +import "fmt" + +const ( + // Topics on which events are published. + TopicJobUpdate EventTopic = "/jobs" // sends api.SocketIOJobUpdate + TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.SocketIOLastRenderedUpdate + TopicTaskUpdate EventTopic = "/task" // sends api.SocketIOTaskUpdate + TopicWorkerUpdate EventTopic = "/workers" // sends api.SocketIOWorkerUpdate + TopicWorkerTagUpdate EventTopic = "/workertags" // sends api.SocketIOWorkerTagUpdate + TopicSubscription EventTopic = "/subscription" // clients send api.SocketIOSubscription + + // Parameterised topics. + TopicJobSpecific EventTopic = "/jobs/%s" // %s = job UUID + TopicJobLastRendered EventTopic = "/jobs/%s/last-rendered" // %s = job UUID + TopicTaskLog EventTopic = "/tasklog/%s" // %s = task UUID +) + +// topicForJob will return the event topic for the given job. Clients subscribed +// to this topic receive info scoped to this job, so for example updates to all +// tasks of this job. +func topicForJob(jobUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicJobSpecific), jobUUID)) +} +func topicForJobLastRendered(jobUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicJobLastRendered), jobUUID)) +} + +// topicForTaskLog will return the event topic for receiving task logs of +// the the given task. +// +// Note that general task updates are sent to their job's topic, and not to this +// one. +func topicForTaskLog(taskUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicTaskLog), taskUUID)) +} diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go index ab0f1747..527d71c5 100644 --- a/internal/manager/job_deleter/interfaces.go +++ b/internal/manager/job_deleter/interfaces.go @@ -6,9 +6,9 @@ import ( "context" "time" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/local_storage" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -44,8 +44,8 @@ type ChangeBroadcaster interface { BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) type Shaman interface { // IsEnabled returns whether this Shaman service is enabled or not. diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go index 95fe7589..15b3eb9c 100644 --- a/internal/manager/job_deleter/job_deleter.go +++ b/internal/manager/job_deleter/job_deleter.go @@ -18,8 +18,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -76,7 +76,7 @@ func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) er } // Broadcast that this job was queued for deleted. - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) // Let the Run() goroutine know this job is ready for deletion. @@ -128,7 +128,7 @@ func (s *Service) broadcastAndQueueMassJobDeletion(ctx context.Context, jobUUIDs Msg("job deleter: unable to fetch job to send updates") continue } - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) } } diff --git a/internal/manager/sleep_scheduler/interfaces.go b/internal/manager/sleep_scheduler/interfaces.go index 03eaa85f..de653ad5 100644 --- a/internal/manager/sleep_scheduler/interfaces.go +++ b/internal/manager/sleep_scheduler/interfaces.go @@ -5,8 +5,8 @@ package sleep_scheduler import ( "context" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -33,5 +33,5 @@ type ChangeBroadcaster interface { BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) diff --git a/internal/manager/task_logs/task_logs.go b/internal/manager/task_logs/task_logs.go index ef48e120..bc6e3f4a 100644 --- a/internal/manager/task_logs/task_logs.go +++ b/internal/manager/task_logs/task_logs.go @@ -13,7 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/rs/zerolog" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -47,8 +47,8 @@ type ChangeBroadcaster interface { BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) // NewStorage creates a new log storage rooted at `basePath`. func NewStorage( @@ -72,7 +72,7 @@ func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText str } // Broadcast the task log to SocketIO clients. - taskUpdate := webupdates.NewTaskLogUpdate(taskID, logText) + taskUpdate := eventbus.NewTaskLogUpdate(taskID, logText) s.broadcaster.BroadcastTaskLogUpdate(taskUpdate) return nil } diff --git a/internal/manager/task_state_machine/interfaces.go b/internal/manager/task_state_machine/interfaces.go index 28e66d9f..829b0dc2 100644 --- a/internal/manager/task_state_machine/interfaces.go +++ b/internal/manager/task_state_machine/interfaces.go @@ -6,9 +6,9 @@ import ( "context" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_logs" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -49,8 +49,8 @@ type ChangeBroadcaster interface { BroadcastTaskUpdate(jobUpdate api.SocketIOTaskUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) // LogStorage writes to task logs. type LogStorage interface { diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 1e696dd9..38b02659 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -9,8 +9,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -88,7 +88,7 @@ func (sm *StateMachine) taskStatusChangeOnly( } // Broadcast this change to the SocketIO clients. - taskUpdate := webupdates.NewTaskUpdate(task) + taskUpdate := eventbus.NewTaskUpdate(task) taskUpdate.PreviousStatus = &oldTaskStatus sm.broadcaster.BroadcastTaskUpdate(taskUpdate) @@ -331,7 +331,7 @@ func (sm *StateMachine) jobStatusSet(ctx context.Context, } // Broadcast this change to the SocketIO clients. - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) jobUpdate.PreviousStatus = &oldJobStatus jobUpdate.RefreshTasks = result.massTaskUpdate sm.broadcaster.BroadcastJobUpdate(jobUpdate) diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index 2d2b35b8..ae8a9fd9 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -7,9 +7,9 @@ import ( "time" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -43,5 +43,5 @@ type ChangeBroadcaster interface { BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) diff --git a/internal/manager/webupdates/chat.go b/internal/manager/webupdates/chat.go deleted file mode 100644 index 4723abdb..00000000 --- a/internal/manager/webupdates/chat.go +++ /dev/null @@ -1,17 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import gosocketio "github.com/graarh/golang-socketio" - -func (b *BiDirComms) registerChatEventHandlers() { - _ = b.sockserv.On(string(SIOEventChatMessageRcv), - func(c *gosocketio.Channel, message Message) string { - logger := sioLogger(c) - logger.Info(). - Str("text", message.Text). - Str("name", message.Name). - Msg("socketIO: message received") - b.BroadcastTo(SocketIORoomChat, SIOEventChatMessageSend, message) - return "message sent successfully." - }) -} diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go deleted file mode 100644 index 2ddba497..00000000 --- a/internal/manager/webupdates/sio_rooms.go +++ /dev/null @@ -1,135 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import ( - "fmt" - - gosocketio "github.com/graarh/golang-socketio" - - "projects.blender.org/studio/flamenco/internal/uuid" - "projects.blender.org/studio/flamenco/pkg/api" -) - -// Separate type aliases for room names and event types; it's otherwise too easy -// to confuse the two. -type ( - SocketIORoomName string - SocketIOEventType string -) - -const ( - // Predefined SocketIO rooms. There will be others, but those will have a - // dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be - // listed here as constants. See `roomXXX()` functions for those. - SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. - SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. - SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. - SocketIORoomWorkerTags SocketIORoomName = "WorkerTags" // For worker tag updates. - - // For updates about ALL last-rendered images. Normally these are sent to a - // room specific to a particular job, but for the global "last rendered image" - // all updates are sent here too. - SocketIORoomLastRendered SocketIORoomName = "Last-Rendered" -) - -const ( - // Predefined SocketIO event types. - SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here - SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here - SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate - SIOEventLastRenderedUpdate SocketIOEventType = "/last-rendered" // sends api.SocketIOLastRenderedUpdate - SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate - SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate - SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate - SIOEventWorkerTagUpdate SocketIOEventType = "/workertags" // sends api.SocketIOWorkerTagUpdate - SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription -) - -func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) { - b.sockserv.BroadcastTo(string(room), string(eventType), payload) -} - -func (b *BiDirComms) registerRoomEventHandlers() { - _ = b.sockserv.On(string(SIOEventSubscription), b.handleRoomSubscription) -} - -func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { - logger := sioLogger(c) - logCtx := logger.With(). - Str("op", string(subs.Op)). - Str("type", string(subs.Type)) - if subs.Uuid != nil { - logCtx = logCtx.Str("uuid", string(*subs.Uuid)) - } - logger = logCtx.Logger() - - if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) { - logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") - return "invalid UUID, ignoring request" - } - - var sioRoom SocketIORoomName - switch subs.Type { - case api.SocketIOSubscriptionTypeAllJobs: - sioRoom = SocketIORoomJobs - case api.SocketIOSubscriptionTypeAllWorkers: - sioRoom = SocketIORoomWorkers - case api.SocketIOSubscriptionTypeAllLastRendered: - sioRoom = SocketIORoomLastRendered - case api.SocketIOSubscriptionTypeAllWorkerTags: - sioRoom = SocketIORoomWorkerTags - case api.SocketIOSubscriptionTypeJob: - if subs.Uuid == nil { - logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") - return "operation on job requires a UUID" - } - sioRoom = roomForJob(*subs.Uuid) - case api.SocketIOSubscriptionTypeTasklog: - if subs.Uuid == nil { - logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") - return "operation on task requires a UUID" - } - sioRoom = roomForTaskLog(*subs.Uuid) - default: - logger.Warn().Msg("socketIO: unknown subscription type, ignoring") - return "unknown subscription type, ignoring request" - } - - var err error - switch subs.Op { - case api.SocketIOSubscriptionOperationSubscribe: - err = c.Join(string(sioRoom)) - case api.SocketIOSubscriptionOperationUnsubscribe: - err = c.Leave(string(sioRoom)) - default: - logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") - return "invalid subscription operation, ignoring request" - } - - if err != nil { - logger.Warn().Err(err).Msg("socketIO: performing subscription operation") - return fmt.Sprintf("unable to perform subscription operation: %v", err) - } - - logger.Debug().Msg("socketIO: subscription") - return "ok" -} - -// roomForJob will return the SocketIO room name for the given job. Clients in -// this room will receive info scoped to this job, so for example updates to all -// tasks of this job. -// -// Note that `api.SocketIOJobUpdate`s themselves are sent to all SocketIO clients, and -// not to this room. -func roomForJob(jobUUID string) SocketIORoomName { - return SocketIORoomName("job-" + jobUUID) -} - -// roomForTaskLog will return the SocketIO room name for receiving task logs of -// the the given task. -// -// Note that general task updates (`api.SIOEventTaskUpdate`) are sent to their -// job's room, and not to this room. -func roomForTaskLog(taskUUID string) SocketIORoomName { - return SocketIORoomName("tasklog-" + taskUUID) -} diff --git a/internal/manager/webupdates/webupdates.go b/internal/manager/webupdates/webupdates.go deleted file mode 100644 index 6b31b4d8..00000000 --- a/internal/manager/webupdates/webupdates.go +++ /dev/null @@ -1,70 +0,0 @@ -// package webupdates uses SocketIO to send updates to a web client. -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import ( - gosocketio "github.com/graarh/golang-socketio" - "github.com/graarh/golang-socketio/transport" - "github.com/labstack/echo/v4" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -type BiDirComms struct { - sockserv *gosocketio.Server -} - -type Message struct { - Name string `json:"name"` - Text string `json:"text"` -} - -func New() *BiDirComms { - bdc := BiDirComms{ - sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()), - } - bdc.registerSIOEventHandlers() - return &bdc -} - -func (b *BiDirComms) RegisterHandlers(router *echo.Echo) { - router.Any("/socket.io/", echo.WrapHandler(b.sockserv)) -} - -func (b *BiDirComms) registerSIOEventHandlers() { - log.Debug().Msg("initialising SocketIO") - - sio := b.sockserv - // the sio.On() and c.Join() calls only return an error when there is no - // server connected to them, but that's not possible with our setup. - // Errors are explicitly silenced (by assigning to _) to reduce clutter. - - // socket connection - _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Debug().Msg("socketIO: connected") - _ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room. - }) - - // socket disconnection - _ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Debug().Msg("socketIO: disconnected") - }) - - _ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Warn().Msg("socketIO: socketio error") - }) - - b.registerChatEventHandlers() - b.registerRoomEventHandlers() -} - -func sioLogger(c *gosocketio.Channel) zerolog.Logger { - logger := log.With(). - Str("clientID", c.Id()). - Str("remoteAddr", c.Ip()). - Logger() - return logger -}