Very basic non-functional framework for a task runner

Also has some login/logout functionality for storing stuff in the DB.
This commit is contained in:
Sybren A. Stüvel 2022-01-31 16:05:27 +01:00
parent d3071146da
commit be89349632
11 changed files with 217 additions and 20 deletions

@ -67,8 +67,8 @@ func main() {
shutdownComplete = make(chan struct{})
taskRunner := struct{}{}
w = worker.NewWorker(client, taskRunner)
taskRunner := worker.TaskExecutor{}
w = worker.NewWorker(client, &taskRunner)
// Handle Ctrl+C
c := make(chan os.Signal, 1)

@ -42,6 +42,7 @@ type PersistenceService interface {
CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)
SaveWorker(ctx context.Context, w *persistence.Worker) error
}
type JobCompiler interface {

@ -79,7 +79,7 @@ func WorkerAuth(ctx context.Context, authInfo *openapi3filter.AuthenticationInpu
return nil
}
// requestWorker returns the Worker associated with this HTTP request.
// requestWorker returns the Worker associated with this HTTP request, or nil if there is none.
func requestWorker(e echo.Context) *persistence.Worker {
ctx := e.Request().Context()
worker, ok := ctx.Value(workerKey).(*persistence.Worker)
@ -88,3 +88,13 @@ func requestWorker(e echo.Context) *persistence.Worker {
}
return nil
}
// requestWorkerOrPanic returns the Worker associated with this HTTP request, or panics if there is none.
func requestWorkerOrPanic(e echo.Context) *persistence.Worker {
w := requestWorker(e)
if w == nil {
logger := requestLogger(e)
logger.Panic().Msg("no worker available where one was expected")
}
return w
}

@ -90,10 +90,23 @@ func (f *Flamenco) SignOn(e echo.Context) error {
logger.Info().Msg("worker signing on")
return e.JSON(http.StatusOK, &api.WorkerStateChange{
// TODO: look up proper status in DB.
StatusRequested: api.WorkerStatusAwake,
})
w := requestWorkerOrPanic(e)
w.Status = api.WorkerStatusStarting
err = f.persist.SaveWorker(e.Request().Context(), w)
if err != nil {
logger.Warn().Err(err).
Str("newStatus", string(w.Status)).
Msg("error storing Worker in database")
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
}
resp := api.WorkerStateChange{}
if w.StatusRequested != "" {
resp.StatusRequested = w.StatusRequested
} else {
resp.StatusRequested = api.WorkerStatusAwake
}
return e.JSON(http.StatusOK, resp)
}
func (f *Flamenco) SignOff(e echo.Context) error {
@ -107,8 +120,20 @@ func (f *Flamenco) SignOff(e echo.Context) error {
}
logger.Info().Msg("worker signing off")
w := requestWorkerOrPanic(e)
w.Status = api.WorkerStatusOffline
// TODO: check whether we should pass the request context here, or a generic
// background context, as this should be stored even when the HTTP connection
// is aborted.
err = f.persist.SaveWorker(e.Request().Context(), w)
if err != nil {
logger.Warn().
Err(err).
Str("newStatus", string(w.Status)).
Msg("error storing worker status in database")
return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database")
}
// TODO: store status in DB.
return e.String(http.StatusNoContent, "")
}
@ -131,6 +156,17 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
}
logger.Info().Str("newStatus", string(req.Status)).Msg("worker changed status")
w := requestWorkerOrPanic(e)
w.Status = req.Status
err = f.persist.SaveWorker(e.Request().Context(), w)
if err != nil {
logger.Warn().Err(err).
Str("newStatus", string(w.Status)).
Msg("error storing Worker in database")
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
}
return e.String(http.StatusNoContent, "")
}

@ -34,11 +34,12 @@ type Worker struct {
Secret string `gorm:"type:varchar(255);not null"`
Name string `gorm:"type:varchar(64);not null"`
Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address.
LastActivity string `gorm:"type:varchar(255);not null"`
Platform string `gorm:"type:varchar(16);not null"`
Software string `gorm:"type:varchar(32);not null"`
Status api.WorkerStatus `gorm:"type:varchar(16);not null"`
Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address.
LastActivity string `gorm:"type:varchar(255);not null"`
Platform string `gorm:"type:varchar(16);not null"`
Software string `gorm:"type:varchar(32);not null"`
Status api.WorkerStatus `gorm:"type:varchar(16);not null"`
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);not null;default:''"`
SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types.
}
@ -58,3 +59,10 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) {
}
return &w, nil
}
func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
if err := db.gormDB.Save(w).Error; err != nil {
return fmt.Errorf("error saving worker: %v", err)
}
return nil
}

@ -1,5 +1,25 @@
package worker
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"net/http"

@ -1,5 +1,25 @@
package worker
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"errors"
@ -34,13 +54,20 @@ func (w *Worker) gotoStateAwake(ctx context.Context) {
func (w *Worker) runStateAwake(ctx context.Context) {
defer w.doneWg.Done()
task := w.fetchTask(ctx)
if task == nil {
return
}
// TODO: actually execute the task
log.Error().Interface("task", *task).Msg("task execution not implemented yet")
for {
task := w.fetchTask(ctx)
if task == nil {
return
}
err := w.taskRunner.Run(ctx, *task)
if err != nil {
log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
}
// TODO: send the result of the execution back to the Manager.
}
}
// fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained.
@ -70,6 +97,7 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
resp, err := w.client.ScheduleTaskWithResponse(ctx)
if err != nil {
log.Error().Err(err).Msg("error obtaining task")
return nil
}
switch {
case resp.JSON200 != nil:

@ -1,5 +1,25 @@
package worker
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"os"

@ -1,5 +1,25 @@
package worker
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"

@ -0,0 +1,52 @@
package worker
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"context"
"errors"
"time"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
type TaskExecutor struct{}
var _ TaskRunner = (*TaskExecutor)(nil)
func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
logger := log.With().Str("task", task.Uuid).Logger()
logger.Info().Str("taskType", task.TaskType).Msg("starting task")
for _, cmd := range task.Commands {
cmdLogger := logger.With().Str("command", cmd.Name).Interface("settings", cmd.Settings).Logger()
cmdLogger.Info().Msg("running command")
select {
case <-ctx.Done():
cmdLogger.Warn().Msg("command execution aborted due to context shutdown")
case <-time.After(1 * time.Second):
cmdLogger.Debug().Msg("mocked duration of command")
}
}
return errors.New("task running not implemented")
}

@ -29,7 +29,9 @@ type Worker struct {
type StateStarter func(context.Context)
type TaskRunner interface{}
type TaskRunner interface {
Run(ctx context.Context, task api.AssignedTask) error
}
// NewWorker constructs and returns a new Worker.
func NewWorker(