Flamenco/internal/stresser/fake_worker.go
Sybren A. Stüvel ab8ecc24cc Cleanup: Add missing license specifiers
Add license specifiers to Go files that were missing them:

```
// SPDX-License-Identifier: GPL-3.0-or-later
```

No functional changes.
2022-07-25 16:08:07 +02:00

128 lines
3.4 KiB
Go

package stresser
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/worker"
"git.blender.org/flamenco/pkg/api"
)
// Delays applied between consecutive attempts to fetch the task to stress test.
const (
	// durationNoTask is the pause before retrying when there is no task now.
	durationNoTask time.Duration = time.Second

	// durationFetchFailed is the pause before retrying when fetching failed
	// somehow.
	durationFetchFailed time.Duration = 2 * time.Second
)
// ErrTaskReassigned is the very same error value as worker.ErrTaskReassigned,
// made available under this package's name. sendTaskUpdate returns it when
// the Manager answers a task update with 409 Conflict.
var ErrTaskReassigned = worker.ErrTaskReassigned

// ErrTaskUpdateRejected is wrapped by sendTaskUpdate when the Manager answers
// a task update with a status other than 204 No Content or 409 Conflict.
// Match it with errors.Is.
var ErrTaskUpdateRejected = errors.New("task update was rejected")
// GetFlamencoClient obtains a Flamenco client via worker.RegisterOrSignOn and
// returns it once the startup state has been acknowledged to the Manager.
//
// The sign-on call runs under a context derived from ctx that times out after
// 10 seconds. The stresser only supports the "awake" state: any other
// requested startup state is logged at fatal level. The acknowledgement itself
// uses the caller's ctx, not the time-limited startup context.
func GetFlamencoClient(
	ctx context.Context,
	config worker.WorkerConfigWithCredentials,
) worker.FlamencoClient {
	const signOnTimeout = 10 * time.Second

	signOnCtx, cancelSignOn := context.WithTimeout(ctx, signOnTimeout)
	defer cancelSignOn()

	flamenco, requestedState := worker.RegisterOrSignOn(signOnCtx, config)
	if requestedState != api.WorkerStatusAwake {
		log.Fatal().
			Str("requestedStartupState", string(requestedState)).
			Msg("stresser should always be awake")
	}

	ackStateChange(ctx, flamenco, requestedState)
	return flamenco
}
// fetchTask keeps asking the Manager for a task until one is assigned, and
// returns that task. It returns nil when ctx is done before a task was
// obtained.
//
// The first attempt happens without delay. After that the loop sleeps for
// durationNoTask when the Manager had nothing to hand out (204 No Content),
// and for durationFetchFailed after a request error, a 403 response, or any
// unrecognised response. A 423 response is logged at fatal level, as the
// stresser does not support status changes requested by the Manager.
func fetchTask(ctx context.Context, client worker.FlamencoClient) *api.AssignedTask {
	// Zero delay, so the very first fetch is attempted immediately.
	delay := time.Duration(0)

	for {
		select {
		case <-ctx.Done():
			log.Debug().Msg("task fetching interrupted by context cancellation")
			return nil
		case <-time.After(delay):
		}

		log.Debug().Msg("fetching tasks")
		response, fetchErr := client.ScheduleTaskWithResponse(ctx)
		if fetchErr != nil {
			log.Error().Err(fetchErr).Msg("error obtaining task")
			delay = durationFetchFailed
			continue
		}

		// The checks below are ordered exactly like the response kinds are
		// prioritised: task, status change, access denied, no content, other.
		if task := response.JSON200; task != nil {
			return task
		}

		if statusChange := response.JSON423; statusChange != nil {
			log.Fatal().Str("requestedStatus", string(statusChange.StatusRequested)).
				Msg("Manager requests status change, stresser does not support this")
			return nil
		}

		if denied := response.JSON403; denied != nil {
			log.Error().
				Int("code", response.StatusCode()).
				Str("error", string(denied.Message)).
				Msg("access denied")
			delay = durationFetchFailed
			continue
		}

		if response.StatusCode() == http.StatusNoContent {
			log.Debug().Msg("no task available")
			delay = durationNoTask
			continue
		}

		log.Warn().
			Int("code", response.StatusCode()).
			Str("error", string(response.Body)).
			Msg("unable to obtain task for unknown reason")
		delay = durationFetchFailed
	}
}
// ackStateChange tells the Manager that this worker is now in the given state.
//
// Both a failed request and a 'default' (error) response from the Manager are
// logged at fatal level; nothing is returned to the caller.
func ackStateChange(ctx context.Context, client worker.FlamencoClient, state api.WorkerStatus) {
	stateLog := log.With().Str("state", string(state)).Logger()
	stateLog.Debug().Msg("notifying Manager of our state")

	body := api.WorkerStateChangedJSONRequestBody{Status: state}
	reply, err := client.WorkerStateChangedWithResponse(ctx, body)

	switch {
	case err != nil:
		stateLog.Fatal().Err(err).Msg("unable to notify Manager of status change")

	case reply.JSONDefault != nil:
		// The 'default' response is for error cases.
		stateLog.Fatal().
			Str("httpCode", reply.HTTPResponse.Status).
			Interface("error", reply.JSONDefault).
			Msg("error sending status change to Manager")
	}
}
// sendTaskUpdate sends a single update for the task with the given ID to the
// Manager.
//
// It returns:
//   - nil when the Manager answers with 204 No Content;
//   - worker.ErrTaskReassigned when the Manager answers with 409 Conflict;
//   - an error wrapping ErrTaskUpdateRejected for any other status, carrying
//     both the task ID and the HTTP status code so the rejection can be
//     diagnosed from the error alone;
//   - the client's error, unmodified, when the request itself failed.
func sendTaskUpdate(ctx context.Context, client worker.FlamencoClient, taskID string, update api.TaskUpdate) error {
	resp, err := client.TaskUpdateWithResponse(ctx, taskID, api.TaskUpdateJSONRequestBody(update))
	if err != nil {
		return err
	}

	switch code := resp.StatusCode(); code {
	case http.StatusNoContent:
		return nil
	case http.StatusConflict:
		return worker.ErrTaskReassigned
	default:
		// Include the status code: without it a 401, 404 and 500 are
		// indistinguishable to whoever reads the error.
		return fmt.Errorf("%w: task=%s code=%d", ErrTaskUpdateRejected, taskID, code)
	}
}