Worker: separate contexts of upstream buffer

The upstream buffer takes care of two things: communication with Flamenco
Manager (first context) and buffering things in a SQLite database (second
context). This commit separates those two contexts, so that shutting down
the application isn't going to prevent buffering things in the database.
This commit is contained in:
Sybren A. Stüvel 2022-04-08 16:49:53 +02:00
parent 959a235c59
commit cc20b850ec
2 changed files with 47 additions and 27 deletions

@ -21,6 +21,13 @@ import (
// TODO: pull the SQLite stuff out of this file into a more global place, so
// that other areas of Flamenco Worker can also use it.
// Note that there are two contexts used in this file. One (`dbCtx`) is for
// database access and is a local, short-lived, background context. The other
// (`ctx`) is the one that's passed from the caller, which should indicate the
// global worker context. If that context is done, queueing updates in the
// database will still work, but all communication with Flamenco Manager will
// halt.
// UpstreamBufferDB implements the UpstreamBuffer interface using a database as backend.
type UpstreamBufferDB struct {
db *sql.DB
@ -35,6 +42,7 @@ type UpstreamBufferDB struct {
}
const defaultUpstreamFlushInterval = 30 * time.Second
const databaseContextTimeout = 10 * time.Second
var _ UpstreamBuffer = (*UpstreamBufferDB)(nil)
@ -54,7 +62,7 @@ func NewUpstreamBuffer(client FlamencoClient, clock TimeService) (*UpstreamBuffe
}
// OpenDB opens the database. Must be called once before using.
func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string) error {
func (ub *UpstreamBufferDB) OpenDB(dbCtx context.Context, databaseFilename string) error {
if ub.db != nil {
return errors.New("upstream buffer database already opened")
}
@ -64,13 +72,13 @@ func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string)
return fmt.Errorf("opening %s: %w", databaseFilename, err)
}
if err := db.PingContext(ctx); err != nil {
if err := db.PingContext(dbCtx); err != nil {
return fmt.Errorf("accessing %s: %w", databaseFilename, err)
}
ub.db = db
if err := ub.prepareDatabase(ctx); err != nil {
if err := ub.prepareDatabase(dbCtx); err != nil {
return err
}
@ -84,7 +92,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u
ub.dbMutex.Lock()
defer ub.dbMutex.Unlock()
queueSize, err := ub.queueSize(ctx)
queueSize, err := ub.queueSize()
if err != nil {
return fmt.Errorf("unable to determine upstream queue size: %w", err)
}
@ -93,7 +101,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u
if queueSize > 0 {
log.Debug().Int("queueSize", queueSize).
Msg("task updates already queued, immediately queueing new update")
return ub.queueTaskUpdate(ctx, taskID, update)
return ub.queueTaskUpdate(taskID, update)
}
// Try to deliver the update.
@ -101,7 +109,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u
if err != nil {
log.Warn().Err(err).Str("task", taskID).
Msg("error communicating with Manager, going to queue task update for sending later")
return ub.queueTaskUpdate(ctx, taskID, update)
return ub.queueTaskUpdate(taskID, update)
}
// The Manager responded, so no need to queue this update, even when there was an error.
@ -117,7 +125,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u
}
// Close releases the database. It does not try to flush any pending items.
func (ub *UpstreamBufferDB) Close(ctx context.Context) error {
func (ub *UpstreamBufferDB) Close() error {
if ub.db == nil {
return nil
}
@ -131,11 +139,11 @@ func (ub *UpstreamBufferDB) Close(ctx context.Context) error {
}
// prepareDatabase creates the database schema, if necessary.
func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
func (ub *UpstreamBufferDB) prepareDatabase(dbCtx context.Context) error {
ub.dbMutex.Lock()
defer ub.dbMutex.Unlock()
tx, err := ub.db.BeginTx(ctx, nil)
tx, err := ub.db.BeginTx(dbCtx, nil)
if err != nil {
return fmt.Errorf("beginning database transaction: %w", err)
}
@ -144,7 +152,7 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)`
log.Debug().Str("sql", stmt).Msg("creating database table")
if _, err := tx.ExecContext(ctx, stmt); err != nil {
if _, err := tx.ExecContext(dbCtx, stmt); err != nil {
return fmt.Errorf("creating database table: %w", err)
}
@ -155,15 +163,18 @@ func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
return nil
}
func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) {
func (ub *UpstreamBufferDB) queueSize() (int, error) {
if ub.db == nil {
log.Panic().Msg("no database opened, unable to inspect upstream queue")
}
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout)
defer dbCtxCancel()
var queueSize int
err := ub.db.
QueryRowContext(ctx, "SELECT count(*) FROM task_update_queue").
QueryRowContext(dbCtx, "SELECT count(*) FROM task_update_queue").
Scan(&queueSize)
switch {
@ -176,12 +187,15 @@ func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) {
}
}
func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
func (ub *UpstreamBufferDB) queueTaskUpdate(taskID string, update api.TaskUpdateJSONRequestBody) error {
if ub.db == nil {
log.Panic().Msg("no database opened, unable to queue task updates")
}
tx, err := ub.db.BeginTx(ctx, nil)
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout)
defer dbCtxCancel()
tx, err := ub.db.BeginTx(dbCtx, nil)
if err != nil {
return fmt.Errorf("beginning database transaction: %w", err)
}
@ -195,7 +209,7 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string,
stmt := `INSERT INTO task_update_queue (task_id, payload) VALUES (?, ?)`
log.Debug().Str("sql", stmt).Str("task", taskID).Msg("inserting task update")
if _, err := tx.ExecContext(ctx, stmt, taskID, blob); err != nil {
if _, err := tx.ExecContext(dbCtx, stmt, taskID, blob); err != nil {
return fmt.Errorf("queueing task update: %w", err)
}
@ -215,7 +229,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
}
// See if we need to flush at all.
queueSize, err := ub.queueSize(ctx)
queueSize, err := ub.queueSize()
switch {
case err != nil:
return fmt.Errorf("unable to determine queue size: %w", err)
@ -237,7 +251,10 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
}
func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) {
tx, err := ub.db.BeginTx(ctx, nil)
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout)
defer dbCtxCancel()
tx, err := ub.db.BeginTx(dbCtx, nil)
if err != nil {
return false, fmt.Errorf("beginning database transaction: %w", err)
}
@ -250,7 +267,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
var taskID string
var blob []byte
err = tx.QueryRowContext(ctx, stmt).Scan(&rowID, &taskID, &blob)
err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob)
switch {
case err == sql.ErrNoRows:
// Flush operation is done.
@ -268,7 +285,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
// than to discard it and ignore it ever happened.
logger.Warn().Err(err).
Msg("unable to unmarshal queued task update, discarding")
if err := ub.discardRow(ctx, tx, rowID); err != nil {
if err := ub.discardRow(tx, rowID); err != nil {
return false, err
}
return false, tx.Commit()
@ -295,17 +312,20 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
Msg("queued task update discarded by Manager, unknown reason")
}
if err := ub.discardRow(ctx, tx, rowID); err != nil {
if err := ub.discardRow(tx, rowID); err != nil {
return false, err
}
return false, tx.Commit()
}
func (ub *UpstreamBufferDB) discardRow(ctx context.Context, tx *sql.Tx, rowID int64) error {
func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error {
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout)
defer dbCtxCancel()
stmt := `DELETE FROM task_update_queue WHERE rowid = ?`
log.Trace().Str("sql", stmt).Int64("rowID", rowID).Msg("un-queueing task update")
_, err := tx.ExecContext(ctx, stmt, rowID)
_, err := tx.ExecContext(dbCtx, stmt, rowID)
if err != nil {
return fmt.Errorf("un-queueing task update: %w", err)
}
@ -334,7 +354,7 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() {
}
func rollbackTransaction(tx *sql.Tx) {
if err := tx.Rollback(); err != nil {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
log.Error().Err(err).Msg("rolling back transaction")
}
}

@ -46,7 +46,7 @@ func TestUpstreamBufferCloseUnopened(t *testing.T) {
defer mockCtrl.Finish()
ub, _ := mockUpstreamBufferDB(t, mockCtrl)
err := ub.Close(context.Background())
err := ub.Close()
assert.NoError(t, err, "Closing without opening should be OK")
}
@ -76,7 +76,7 @@ func TestUpstreamBufferManagerUnavailable(t *testing.T) {
assert.NoError(t, err)
// Check the queue size, it should have an item queued.
queueSize, err := ub.queueSize(ctx)
queueSize, err := ub.queueSize()
assert.NoError(t, err)
assert.Equal(t, 1, queueSize)
@ -94,10 +94,10 @@ func TestUpstreamBufferManagerUnavailable(t *testing.T) {
// Queue should be empty now.
ub.dbMutex.Lock()
queueSize, err = ub.queueSize(ctx)
queueSize, err = ub.queueSize()
ub.dbMutex.Unlock()
assert.NoError(t, err)
assert.Equal(t, 0, queueSize)
assert.NoError(t, ub.Close(ctx))
assert.NoError(t, ub.Close())
}