Fix #104201: Task Limit error in Flamenco Manager
Insert tasks in batches so that the required SQL query stays within the limits of SQLite. No changes to the API, only to the persistence layer. Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104205
This commit is contained in:
parent
ef53304b1b
commit
0502498dfa
@ -208,11 +208,19 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
|
||||
}
|
||||
deps[i] = depTask
|
||||
}
|
||||
|
||||
dbTask.Dependencies = deps
|
||||
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: deps})
|
||||
dependenciesbatchsize := 1000
|
||||
for j := 0; j < len(deps); j += dependenciesbatchsize {
|
||||
end := j + dependenciesbatchsize
|
||||
if end > len(deps) {
|
||||
end = len(deps)
|
||||
}
|
||||
currentDeps := deps[j:end]
|
||||
dbTask.Dependencies = currentDeps
|
||||
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
|
||||
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
|
||||
if subQuery.Error != nil {
|
||||
return taskError(subQuery.Error, "unable to store dependencies of task %q", authoredTask.UUID)
|
||||
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,6 +258,21 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
|
||||
assert.Equal(t, 3, numTotal)
|
||||
}
|
||||
|
||||
func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping test in short mode")
|
||||
}
|
||||
numtasks := 3500
|
||||
ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks)
|
||||
defer close()
|
||||
|
||||
numQueued, numTotal, err := db.CountTasksOfJobInStatus(ctx, job, api.TaskStatusQueued)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, numtasks, numQueued)
|
||||
assert.Equal(t, numtasks, numTotal)
|
||||
|
||||
}
|
||||
|
||||
func TestFetchJobsInStatus(t *testing.T) {
|
||||
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
|
||||
defer close()
|
||||
@ -594,6 +609,36 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
|
||||
return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3)
|
||||
}
|
||||
|
||||
func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob {
|
||||
//Generates all of the render jobs
|
||||
prevtasks := make([]*job_compilers.AuthoredTask, 0)
|
||||
for i := 0; i < numTasks-1; i++ {
|
||||
currtask := job_compilers.AuthoredTask{
|
||||
Name: "render-" + fmt.Sprintf("%d", i),
|
||||
Type: "blender-render",
|
||||
UUID: uuid.New(),
|
||||
Commands: []job_compilers.AuthoredCommand{},
|
||||
}
|
||||
prevtasks = append(prevtasks, &currtask)
|
||||
}
|
||||
// Generates the preview video command with Dependencies
|
||||
videoJob := job_compilers.AuthoredTask{
|
||||
Name: "preview-video",
|
||||
Type: "ffmpeg",
|
||||
UUID: uuid.New(),
|
||||
Commands: []job_compilers.AuthoredCommand{},
|
||||
Dependencies: prevtasks,
|
||||
}
|
||||
// convert pointers to values and generate job
|
||||
taskvalues := make([]job_compilers.AuthoredTask, len(prevtasks))
|
||||
for i, ptr := range prevtasks {
|
||||
taskvalues[i] = *ptr
|
||||
}
|
||||
taskvalues = append(taskvalues, videoJob)
|
||||
return createTestAuthoredJob(uuid.New(), taskvalues...)
|
||||
|
||||
}
|
||||
|
||||
func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
|
||||
job := job_compilers.AuthoredJob{
|
||||
JobID: jobID,
|
||||
@ -676,6 +721,16 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D
|
||||
return ctx, cancel, db, dbJob, authoredJob
|
||||
}
|
||||
|
||||
// This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks
|
||||
func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) {
|
||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong)
|
||||
|
||||
authoredJob := createTestAuthoredJobWithNumTasks(numtasks)
|
||||
dbJob := persistAuthoredJob(t, ctx, db, authoredJob)
|
||||
|
||||
return ctx, cancel, db, dbJob, authoredJob
|
||||
}
|
||||
|
||||
func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker {
|
||||
w := Worker{
|
||||
UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4",
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
)
|
||||
|
||||
const schedulerTestTimeout = 100 * time.Millisecond
|
||||
const schedulerTestTimeoutlong = 5000 * time.Millisecond
|
||||
|
||||
func TestNoTasks(t *testing.T) {
|
||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
||||
|
Loading…
Reference in New Issue
Block a user