Manager: prepend timestamp to "task assigned to worker" task log entries

Add a new `clock` service to the Flamenco struct, which allows us to mock
the passing of time, and thus test for timestamps in a stable fashion.
This commit is contained in:
Sybren A. Stüvel 2022-06-09 11:24:02 +02:00
parent 7c43b9e1bc
commit 75903a2da3
9 changed files with 43 additions and 8 deletions

@ -161,7 +161,9 @@ func buildFlamencoAPI(
}
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath)
shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco(compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer)
flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService)
return flamenco
}

@ -22,6 +22,7 @@ type Flamenco struct {
config ConfigService
stateMachine TaskStateMachine
shaman Shaman
clock TimeService
// The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like
@ -40,6 +41,7 @@ func NewFlamenco(
cs ConfigService,
sm TaskStateMachine,
sha Shaman,
ts TimeService,
) *Flamenco {
return &Flamenco{
jobCompiler: jc,
@ -49,6 +51,7 @@ func NewFlamenco(
config: cs,
stateMachine: sm,
shaman: sha,
clock: ts,
}
}

@ -8,7 +8,9 @@ package api_impl
import (
"context"
"io"
"time"
"github.com/benbjohnson/clock"
"github.com/rs/zerolog"
"git.blender.org/flamenco/internal/manager/job_compilers"
@ -124,3 +126,11 @@ type Shaman interface {
}
var _ Shaman = (*shaman.Server)(nil)
// TimeService provides functionality from the stdlib `time` module, but in a
// way that allows mocking.
type TimeService interface {
Now() time.Time
}
var _ TimeService = (clock.Clock)(nil)

@ -7,8 +7,10 @@ import (
"fmt"
"net/http"
"os"
"time"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/persistence"
@ -219,6 +221,13 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error {
return e.String(http.StatusOK, tail)
}
// taskLogAppendTimestamped writes the given log text, prefixed with the current
// date & time, to the task's log.
func (f *Flamenco) taskLogAppendTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error {
now := f.clock.Now().Format(time.RFC3339)
return f.logStorage.Write(logger, jobID, taskID, now+" "+logText)
}
func jobDBtoAPI(dbJob *persistence.Job) api.Job {
apiJob := api.Job{
SubmittedJob: api.SubmittedJob{

@ -26,8 +26,8 @@ func TestFetchTask(t *testing.T) {
dbTask := persistence.Task{
Model: gorm.Model{
ID: 327,
CreatedAt: time.Now().Add(-30 * time.Second),
UpdatedAt: time.Now(),
CreatedAt: mf.clock.Now().Add(-30 * time.Second),
UpdatedAt: mf.clock.Now(),
},
UUID: taskUUID,
Name: "симпатичная задача",

@ -8,7 +8,6 @@ import (
"net/http"
"os"
"testing"
"time"
"git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/persistence"
@ -41,7 +40,7 @@ func TestSubmitJob(t *testing.T) {
JobType: submittedJob.Type,
Priority: submittedJob.Priority,
Status: api.JobStatusUnderConstruction,
Created: time.Now(),
Created: mf.clock.Now(),
}
mf.jobCompiler.EXPECT().Compile(gomock.Any(), submittedJob).Return(&authoredJob, nil)

@ -10,7 +10,9 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
@ -30,6 +32,7 @@ type mockedFlamenco struct {
config *mocks.MockConfigService
stateMachine *mocks.MockTaskStateMachine
shaman *mocks.MockShaman
clock *clock.Mock
}
func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
@ -40,7 +43,15 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
cs := mocks.NewMockConfigService(mockCtrl)
sm := mocks.NewMockTaskStateMachine(mockCtrl)
sha := mocks.NewMockShaman(mockCtrl)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha)
clock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00")
if err != nil {
panic(err)
}
clock.Set(mockedNow)
f := NewFlamenco(jc, ps, cb, ls, cs, sm, sha, clock)
return mockedFlamenco{
flamenco: f,
@ -50,6 +61,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
logStorage: ls,
config: cs,
stateMachine: sm,
clock: clock,
}
}

@ -319,7 +319,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
}
// Add a note to the task log about the worker assignment.
err = f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID,
err = f.taskLogAppendTimestamped(logger, dbTask.Job.UUID, dbTask.UUID,
fmt.Sprintf("Task assigned to worker %s (%s)", worker.Name, worker.UUID))
if err != nil {
logger.Error().Err(err).Msg("error writing to task log")

@ -36,7 +36,7 @@ func TestTaskScheduleHappy(t *testing.T) {
mf.persistence.EXPECT().ScheduleTask(echo.Request().Context(), &worker).Return(&task, nil)
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task.UUID,
"Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
"2022-06-09T11:14:41+02:00 Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
err := mf.flamenco.ScheduleTask(echo)
assert.NoError(t, err)