Worker: buffer task updates when Manager offline
Queue task updates in an SQLite database when the Manager is unreachable, periodically checking whether they can be flushed.
This commit is contained in:
parent
e948c5d936
commit
2a1f5a0df4
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,6 +1,7 @@
|
||||
*.exe
|
||||
/*-poc
|
||||
/*.sqlite
|
||||
*.sqlite
|
||||
*.db
|
||||
*.blend[0-9]
|
||||
|
||||
/flamenco-worker.yaml
|
||||
|
@ -43,6 +43,7 @@ import (
|
||||
var (
|
||||
w *worker.Worker
|
||||
listener *worker.Listener
|
||||
buffer *worker.UpstreamBufferDB
|
||||
shutdownComplete chan struct{}
|
||||
)
|
||||
|
||||
@ -83,9 +84,12 @@ func main() {
|
||||
shutdownComplete = make(chan struct{})
|
||||
workerCtx, workerCtxCancel := context.WithCancel(context.Background())
|
||||
|
||||
cliRunner := worker.NewCLIRunner()
|
||||
listener = worker.NewListener(client)
|
||||
timeService := clock.New()
|
||||
buffer = upstreamBufferOrDie(client, timeService)
|
||||
go buffer.Flush(workerCtx) // Immediately try to flush any updates.
|
||||
|
||||
cliRunner := worker.NewCLIRunner()
|
||||
listener = worker.NewListener(client, buffer)
|
||||
cmdRunner := worker.NewCommandExecutor(cliRunner, listener, timeService)
|
||||
taskRunner := worker.NewTaskExecutor(cmdRunner, listener)
|
||||
w = worker.NewWorker(client, taskRunner)
|
||||
@ -121,6 +125,9 @@ func shutdown(signum os.Signal) {
|
||||
w.SignOff(shutdownCtx)
|
||||
w.Close()
|
||||
listener.Wait()
|
||||
if err := buffer.Close(shutdownCtx); err != nil {
|
||||
log.Error().Err(err).Msg("closing upstream task buffer")
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
@ -155,3 +162,20 @@ func parseCliArgs() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func upstreamBufferOrDie(client worker.FlamencoClient, timeService clock.Clock) *worker.UpstreamBufferDB {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
buffer, err := worker.NewUpstreamBuffer(client, timeService)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("unable to create task update queue database")
|
||||
}
|
||||
|
||||
// TODO: make filename configurable?
|
||||
if err := buffer.OpenDB(ctx, "flamenco-worker-queue.db"); err != nil {
|
||||
log.Fatal().Err(err).Msg("unable to open task update queue database")
|
||||
}
|
||||
|
||||
return buffer
|
||||
}
|
||||
|
18
go.mod
18
go.mod
@ -9,6 +9,7 @@ require (
|
||||
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7
|
||||
github.com/getkin/kin-openapi v0.88.0
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/graarh/golang-socketio v0.0.0-20170510162725-2c44953b9b5f
|
||||
github.com/labstack/echo/v4 v4.6.1
|
||||
@ -18,9 +19,11 @@ require (
|
||||
github.com/ziflex/lecho/v3 v3.1.0
|
||||
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e
|
||||
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
gorm.io/driver/postgres v1.0.8
|
||||
gorm.io/gorm v1.21.4
|
||||
modernc.org/sqlite v1.14.6
|
||||
)
|
||||
|
||||
require (
|
||||
@ -32,7 +35,6 @@ require (
|
||||
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
|
||||
github.com/gofrs/uuid v4.0.0+incompatible // indirect
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
|
||||
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
|
||||
github.com/gorilla/mux v1.8.0 // indirect
|
||||
github.com/gorilla/websocket v1.4.2 // indirect
|
||||
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
|
||||
@ -44,21 +46,31 @@ require (
|
||||
github.com/jackc/pgtype v1.6.2 // indirect
|
||||
github.com/jackc/pgx/v4 v4.10.1 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.1 // indirect
|
||||
github.com/jinzhu/now v1.1.4 // indirect
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
|
||||
github.com/labstack/gommon v0.3.1 // indirect
|
||||
github.com/lib/pq v1.10.0 // indirect
|
||||
github.com/mailru/easyjson v0.7.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.14 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
|
||||
github.com/shopspring/decimal v1.2.0 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasttemplate v1.2.1 // indirect
|
||||
golang.org/x/mod v0.4.2 // indirect
|
||||
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b // indirect
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
|
||||
golang.org/x/tools v0.1.7 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
lukechampine.com/uint128 v1.1.1 // indirect
|
||||
modernc.org/cc/v3 v3.35.22 // indirect
|
||||
modernc.org/ccgo/v3 v3.15.13 // indirect
|
||||
modernc.org/libc v1.14.5 // indirect
|
||||
modernc.org/mathutil v1.4.1 // indirect
|
||||
modernc.org/memory v1.0.5 // indirect
|
||||
modernc.org/opt v0.1.1 // indirect
|
||||
modernc.org/strutil v1.1.1 // indirect
|
||||
modernc.org/token v1.0.0 // indirect
|
||||
)
|
||||
|
150
go.sum
150
go.sum
@ -23,6 +23,8 @@ github.com/dop251/goja v0.0.0-20211217115348-3f9136fa235d h1:XT7Qdmcuwgsgz4GXejX
|
||||
github.com/dop251/goja v0.0.0-20211217115348-3f9136fa235d/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk=
|
||||
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7 h1:tYwu/z8Y0NkkzGEh3z21mSWggMg4LwLRFucLS7TjARg=
|
||||
github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/getkin/kin-openapi v0.80.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
|
||||
github.com/getkin/kin-openapi v0.88.0 h1:BjJ2JERWJbYE1o1RGEj/5LmR5qw7ecfl3O3su4ImR+0=
|
||||
github.com/getkin/kin-openapi v0.88.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
|
||||
@ -58,6 +60,8 @@ github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
|
||||
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
@ -124,10 +128,13 @@ github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv
|
||||
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.1 h1:g39TucaRWyV3dwDO++eEc6qf8TVIQ/Da48WmqjZ3i7E=
|
||||
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/jinzhu/now v1.1.4 h1:tHnRBy1i5F2Dh8BAFxqFzxKqqvezXrL2OW1TnX+Mlas=
|
||||
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@ -181,6 +188,8 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
github.com/mattn/go-sqlite3 v1.14.10 h1:MLn+5bFRlWMGoSRmJour3CL1w/qL96mvipqpwQW/Sfk=
|
||||
github.com/mattn/go-sqlite3 v1.14.10/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
@ -191,6 +200,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
|
||||
@ -294,6 +305,7 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201126233918-771906719818/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
@ -301,8 +313,10 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211031064116-611d5d643895/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b h1:1VkfZQv42XQlA/jchYumAnv1UPo6RgF9rJFkTgZIxO4=
|
||||
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
@ -328,6 +342,7 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn
|
||||
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200918232735-d647fc253266/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
|
||||
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210114065538-d78b04bdf963/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
|
||||
@ -361,4 +376,137 @@ gorm.io/driver/postgres v1.0.8/go.mod h1:4eOzrI1MUfm6ObJU/UcmbXyiHSs8jSwH95G5P5d
|
||||
gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
|
||||
gorm.io/gorm v1.21.4 h1:J0xfPJMRfHgpVcYLrEAIqY/apdvTIkrltPQNHQLq9Qc=
|
||||
gorm.io/gorm v1.21.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
|
||||
gorm.io/gorm v1.23.1 h1:aj5IlhDzEPsoIyOPtTRVI+SyaN1u6k613sbt4pwbxG0=
|
||||
gorm.io/gorm v1.23.1/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
lukechampine.com/uint128 v1.1.1 h1:pnxCASz787iMf+02ssImqk6OLt+Z5QHMoZyUXR4z6JU=
|
||||
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
|
||||
modernc.org/cc/v3 v3.33.6/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.33.9/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.33.11/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.34.0/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.0/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.4/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.5/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.7/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.8/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.10/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.15/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.16/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.17/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.18/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.20/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/cc/v3 v3.35.22 h1:BzShpwCAP7TWzFppM4k2t03RhXhgYqaibROWkrWq7lE=
|
||||
modernc.org/cc/v3 v3.35.22/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
|
||||
modernc.org/ccgo/v3 v3.9.5/go.mod h1:umuo2EP2oDSBnD3ckjaVUXMrmeAw8C8OSICVa0iFf60=
|
||||
modernc.org/ccgo/v3 v3.10.0/go.mod h1:c0yBmkRFi7uW4J7fwx/JiijwOjeAeR2NoSaRVFPmjMw=
|
||||
modernc.org/ccgo/v3 v3.11.0/go.mod h1:dGNposbDp9TOZ/1KBxghxtUp/bzErD0/0QW4hhSaBMI=
|
||||
modernc.org/ccgo/v3 v3.11.1/go.mod h1:lWHxfsn13L3f7hgGsGlU28D9eUOf6y3ZYHKoPaKU0ag=
|
||||
modernc.org/ccgo/v3 v3.11.3/go.mod h1:0oHunRBMBiXOKdaglfMlRPBALQqsfrCKXgw9okQ3GEw=
|
||||
modernc.org/ccgo/v3 v3.12.4/go.mod h1:Bk+m6m2tsooJchP/Yk5ji56cClmN6R1cqc9o/YtbgBQ=
|
||||
modernc.org/ccgo/v3 v3.12.6/go.mod h1:0Ji3ruvpFPpz+yu+1m0wk68pdr/LENABhTrDkMDWH6c=
|
||||
modernc.org/ccgo/v3 v3.12.8/go.mod h1:Hq9keM4ZfjCDuDXxaHptpv9N24JhgBZmUG5q60iLgUo=
|
||||
modernc.org/ccgo/v3 v3.12.11/go.mod h1:0jVcmyDwDKDGWbcrzQ+xwJjbhZruHtouiBEvDfoIsdg=
|
||||
modernc.org/ccgo/v3 v3.12.14/go.mod h1:GhTu1k0YCpJSuWwtRAEHAol5W7g1/RRfS4/9hc9vF5I=
|
||||
modernc.org/ccgo/v3 v3.12.18/go.mod h1:jvg/xVdWWmZACSgOiAhpWpwHWylbJaSzayCqNOJKIhs=
|
||||
modernc.org/ccgo/v3 v3.12.20/go.mod h1:aKEdssiu7gVgSy/jjMastnv/q6wWGRbszbheXgWRHc8=
|
||||
modernc.org/ccgo/v3 v3.12.21/go.mod h1:ydgg2tEprnyMn159ZO/N4pLBqpL7NOkJ88GT5zNU2dE=
|
||||
modernc.org/ccgo/v3 v3.12.22/go.mod h1:nyDVFMmMWhMsgQw+5JH6B6o4MnZ+UQNw1pp52XYFPRk=
|
||||
modernc.org/ccgo/v3 v3.12.25/go.mod h1:UaLyWI26TwyIT4+ZFNjkyTbsPsY3plAEB6E7L/vZV3w=
|
||||
modernc.org/ccgo/v3 v3.12.29/go.mod h1:FXVjG7YLf9FetsS2OOYcwNhcdOLGt8S9bQ48+OP75cE=
|
||||
modernc.org/ccgo/v3 v3.12.36/go.mod h1:uP3/Fiezp/Ga8onfvMLpREq+KUjUmYMxXPO8tETHtA8=
|
||||
modernc.org/ccgo/v3 v3.12.38/go.mod h1:93O0G7baRST1vNj4wnZ49b1kLxt0xCW5Hsa2qRaZPqc=
|
||||
modernc.org/ccgo/v3 v3.12.43/go.mod h1:k+DqGXd3o7W+inNujK15S5ZYuPoWYLpF5PYougCmthU=
|
||||
modernc.org/ccgo/v3 v3.12.46/go.mod h1:UZe6EvMSqOxaJ4sznY7b23/k13R8XNlyWsO5bAmSgOE=
|
||||
modernc.org/ccgo/v3 v3.12.47/go.mod h1:m8d6p0zNps187fhBwzY/ii6gxfjob1VxWb919Nk1HUk=
|
||||
modernc.org/ccgo/v3 v3.12.50/go.mod h1:bu9YIwtg+HXQxBhsRDE+cJjQRuINuT9PUK4orOco/JI=
|
||||
modernc.org/ccgo/v3 v3.12.51/go.mod h1:gaIIlx4YpmGO2bLye04/yeblmvWEmE4BBBls4aJXFiE=
|
||||
modernc.org/ccgo/v3 v3.12.53/go.mod h1:8xWGGTFkdFEWBEsUmi+DBjwu/WLy3SSOrqEmKUjMeEg=
|
||||
modernc.org/ccgo/v3 v3.12.54/go.mod h1:yANKFTm9llTFVX1FqNKHE0aMcQb1fuPJx6p8AcUx+74=
|
||||
modernc.org/ccgo/v3 v3.12.55/go.mod h1:rsXiIyJi9psOwiBkplOaHye5L4MOOaCjHg1Fxkj7IeU=
|
||||
modernc.org/ccgo/v3 v3.12.56/go.mod h1:ljeFks3faDseCkr60JMpeDb2GSO3TKAmrzm7q9YOcMU=
|
||||
modernc.org/ccgo/v3 v3.12.57/go.mod h1:hNSF4DNVgBl8wYHpMvPqQWDQx8luqxDnNGCMM4NFNMc=
|
||||
modernc.org/ccgo/v3 v3.12.60/go.mod h1:k/Nn0zdO1xHVWjPYVshDeWKqbRWIfif5dtsIOCUVMqM=
|
||||
modernc.org/ccgo/v3 v3.12.66/go.mod h1:jUuxlCFZTUZLMV08s7B1ekHX5+LIAurKTTaugUr/EhQ=
|
||||
modernc.org/ccgo/v3 v3.12.67/go.mod h1:Bll3KwKvGROizP2Xj17GEGOTrlvB1XcVaBrC90ORO84=
|
||||
modernc.org/ccgo/v3 v3.12.73/go.mod h1:hngkB+nUUqzOf3iqsM48Gf1FZhY599qzVg1iX+BT3cQ=
|
||||
modernc.org/ccgo/v3 v3.12.81/go.mod h1:p2A1duHoBBg1mFtYvnhAnQyI6vL0uw5PGYLSIgF6rYY=
|
||||
modernc.org/ccgo/v3 v3.12.84/go.mod h1:ApbflUfa5BKadjHynCficldU1ghjen84tuM5jRynB7w=
|
||||
modernc.org/ccgo/v3 v3.12.86/go.mod h1:dN7S26DLTgVSni1PVA3KxxHTcykyDurf3OgUzNqTSrU=
|
||||
modernc.org/ccgo/v3 v3.12.90/go.mod h1:obhSc3CdivCRpYZmrvO88TXlW0NvoSVvdh/ccRjJYko=
|
||||
modernc.org/ccgo/v3 v3.12.92/go.mod h1:5yDdN7ti9KWPi5bRVWPl8UNhpEAtCjuEE7ayQnzzqHA=
|
||||
modernc.org/ccgo/v3 v3.13.1/go.mod h1:aBYVOUfIlcSnrsRVU8VRS35y2DIfpgkmVkYZ0tpIXi4=
|
||||
modernc.org/ccgo/v3 v3.15.1/go.mod h1:md59wBwDT2LznX/OTCPoVS6KIsdRgY8xqQwBV+hkTH0=
|
||||
modernc.org/ccgo/v3 v3.15.9/go.mod h1:md59wBwDT2LznX/OTCPoVS6KIsdRgY8xqQwBV+hkTH0=
|
||||
modernc.org/ccgo/v3 v3.15.10/go.mod h1:wQKxoFn0ynxMuCLfFD09c8XPUCc8obfchoVR9Cn0fI8=
|
||||
modernc.org/ccgo/v3 v3.15.12/go.mod h1:VFePOWoCd8uDGRJpq/zfJ29D0EVzMSyID8LCMWYbX6I=
|
||||
modernc.org/ccgo/v3 v3.15.13 h1:hqlCzNJTXLrhS70y1PqWckrF9x1btSQRC7JFuQcBg5c=
|
||||
modernc.org/ccgo/v3 v3.15.13/go.mod h1:QHtvdpeODlXjdK3tsbpyK+7U9JV4PQsrPGIbtmc0KfY=
|
||||
modernc.org/ccorpus v1.11.1/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
|
||||
modernc.org/ccorpus v1.11.4 h1:YOmQBBzE8GC/puUx76D5j/gJYIZQsydrh6VMJVfXF0M=
|
||||
modernc.org/ccorpus v1.11.4/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
|
||||
modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM=
|
||||
modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM=
|
||||
modernc.org/libc v1.9.8/go.mod h1:U1eq8YWr/Kc1RWCMFUWEdkTg8OTcfLw2kY8EDwl039w=
|
||||
modernc.org/libc v1.9.11/go.mod h1:NyF3tsA5ArIjJ83XB0JlqhjTabTCHm9aX4XMPHyQn0Q=
|
||||
modernc.org/libc v1.11.0/go.mod h1:2lOfPmj7cz+g1MrPNmX65QCzVxgNq2C5o0jdLY2gAYg=
|
||||
modernc.org/libc v1.11.2/go.mod h1:ioIyrl3ETkugDO3SGZ+6EOKvlP3zSOycUETe4XM4n8M=
|
||||
modernc.org/libc v1.11.5/go.mod h1:k3HDCP95A6U111Q5TmG3nAyUcp3kR5YFZTeDS9v8vSU=
|
||||
modernc.org/libc v1.11.6/go.mod h1:ddqmzR6p5i4jIGK1d/EiSw97LBcE3dK24QEwCFvgNgE=
|
||||
modernc.org/libc v1.11.11/go.mod h1:lXEp9QOOk4qAYOtL3BmMve99S5Owz7Qyowzvg6LiZso=
|
||||
modernc.org/libc v1.11.13/go.mod h1:ZYawJWlXIzXy2Pzghaf7YfM8OKacP3eZQI81PDLFdY8=
|
||||
modernc.org/libc v1.11.16/go.mod h1:+DJquzYi+DMRUtWI1YNxrlQO6TcA5+dRRiq8HWBWRC8=
|
||||
modernc.org/libc v1.11.19/go.mod h1:e0dgEame6mkydy19KKaVPBeEnyJB4LGNb0bBH1EtQ3I=
|
||||
modernc.org/libc v1.11.24/go.mod h1:FOSzE0UwookyT1TtCJrRkvsOrX2k38HoInhw+cSCUGk=
|
||||
modernc.org/libc v1.11.26/go.mod h1:SFjnYi9OSd2W7f4ct622o/PAYqk7KHv6GS8NZULIjKY=
|
||||
modernc.org/libc v1.11.27/go.mod h1:zmWm6kcFXt/jpzeCgfvUNswM0qke8qVwxqZrnddlDiE=
|
||||
modernc.org/libc v1.11.28/go.mod h1:Ii4V0fTFcbq3qrv3CNn+OGHAvzqMBvC7dBNyC4vHZlg=
|
||||
modernc.org/libc v1.11.31/go.mod h1:FpBncUkEAtopRNJj8aRo29qUiyx5AvAlAxzlx9GNaVM=
|
||||
modernc.org/libc v1.11.34/go.mod h1:+Tzc4hnb1iaX/SKAutJmfzES6awxfU1BPvrrJO0pYLg=
|
||||
modernc.org/libc v1.11.37/go.mod h1:dCQebOwoO1046yTrfUE5nX1f3YpGZQKNcITUYWlrAWo=
|
||||
modernc.org/libc v1.11.39/go.mod h1:mV8lJMo2S5A31uD0k1cMu7vrJbSA3J3waQJxpV4iqx8=
|
||||
modernc.org/libc v1.11.42/go.mod h1:yzrLDU+sSjLE+D4bIhS7q1L5UwXDOw99PLSX0BlZvSQ=
|
||||
modernc.org/libc v1.11.44/go.mod h1:KFq33jsma7F5WXiYelU8quMJasCCTnHK0mkri4yPHgA=
|
||||
modernc.org/libc v1.11.45/go.mod h1:Y192orvfVQQYFzCNsn+Xt0Hxt4DiO4USpLNXBlXg/tM=
|
||||
modernc.org/libc v1.11.47/go.mod h1:tPkE4PzCTW27E6AIKIR5IwHAQKCAtudEIeAV1/SiyBg=
|
||||
modernc.org/libc v1.11.49/go.mod h1:9JrJuK5WTtoTWIFQ7QjX2Mb/bagYdZdscI3xrvHbXjE=
|
||||
modernc.org/libc v1.11.51/go.mod h1:R9I8u9TS+meaWLdbfQhq2kFknTW0O3aw3kEMqDDxMaM=
|
||||
modernc.org/libc v1.11.53/go.mod h1:5ip5vWYPAoMulkQ5XlSJTy12Sz5U6blOQiYasilVPsU=
|
||||
modernc.org/libc v1.11.54/go.mod h1:S/FVnskbzVUrjfBqlGFIPA5m7UwB3n9fojHhCNfSsnw=
|
||||
modernc.org/libc v1.11.55/go.mod h1:j2A5YBRm6HjNkoSs/fzZrSxCuwWqcMYTDPLNx0URn3M=
|
||||
modernc.org/libc v1.11.56/go.mod h1:pakHkg5JdMLt2OgRadpPOTnyRXm/uzu+Yyg/LSLdi18=
|
||||
modernc.org/libc v1.11.58/go.mod h1:ns94Rxv0OWyoQrDqMFfWwka2BcaF6/61CqJRK9LP7S8=
|
||||
modernc.org/libc v1.11.71/go.mod h1:DUOmMYe+IvKi9n6Mycyx3DbjfzSKrdr/0Vgt3j7P5gw=
|
||||
modernc.org/libc v1.11.75/go.mod h1:dGRVugT6edz361wmD9gk6ax1AbDSe0x5vji0dGJiPT0=
|
||||
modernc.org/libc v1.11.82/go.mod h1:NF+Ek1BOl2jeC7lw3a7Jj5PWyHPwWD4aq3wVKxqV1fI=
|
||||
modernc.org/libc v1.11.86/go.mod h1:ePuYgoQLmvxdNT06RpGnaDKJmDNEkV7ZPKI2jnsvZoE=
|
||||
modernc.org/libc v1.11.87/go.mod h1:Qvd5iXTeLhI5PS0XSyqMY99282y+3euapQFxM7jYnpY=
|
||||
modernc.org/libc v1.11.88/go.mod h1:h3oIVe8dxmTcchcFuCcJ4nAWaoiwzKCdv82MM0oiIdQ=
|
||||
modernc.org/libc v1.11.98/go.mod h1:ynK5sbjsU77AP+nn61+k+wxUGRx9rOFcIqWYYMaDZ4c=
|
||||
modernc.org/libc v1.11.101/go.mod h1:wLLYgEiY2D17NbBOEp+mIJJJBGSiy7fLL4ZrGGZ+8jI=
|
||||
modernc.org/libc v1.12.0/go.mod h1:2MH3DaF/gCU8i/UBiVE1VFRos4o523M7zipmwH8SIgQ=
|
||||
modernc.org/libc v1.14.1/go.mod h1:npFeGWjmZTjFeWALQLrvklVmAxv4m80jnG3+xI8FdJk=
|
||||
modernc.org/libc v1.14.2/go.mod h1:MX1GBLnRLNdvmK9azU9LCxZ5lMyhrbEMK8rG3X/Fe34=
|
||||
modernc.org/libc v1.14.3/go.mod h1:GPIvQVOVPizzlqyRX3l756/3ppsAgg1QgPxjr5Q4agQ=
|
||||
modernc.org/libc v1.14.5 h1:DAHvwGoVRDZs5iJXnX9RJrgXSsorupCWmJ2ac964Owk=
|
||||
modernc.org/libc v1.14.5/go.mod h1:2PJHINagVxO4QW/5OQdRrvMYo+bm5ClpUFfyXCYl9ak=
|
||||
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/mathutil v1.4.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
|
||||
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
|
||||
modernc.org/memory v1.0.4/go.mod h1:nV2OApxradM3/OVbs2/0OsP6nPfakXpi50C7dcoHXlc=
|
||||
modernc.org/memory v1.0.5 h1:XRch8trV7GgvTec2i7jc33YlUI0RKVDBvZ5eZ5m8y14=
|
||||
modernc.org/memory v1.0.5/go.mod h1:B7OYswTRnfGg+4tDH1t1OeUNnsy2viGTdME4tzd+IjM=
|
||||
modernc.org/opt v0.1.1 h1:/0RX92k9vwVeDXj+Xn23DKp2VJubL7k8qNffND6qn3A=
|
||||
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/sqlite v1.14.6 h1:Jt5P3k80EtDBWaq1beAxnWW+5MdHXbZITujnRS7+zWg=
|
||||
modernc.org/sqlite v1.14.6/go.mod h1:yiCvMv3HblGmzENNIaNtFhfaNIwcla4u2JQEwJPzfEc=
|
||||
modernc.org/strutil v1.1.1 h1:xv+J1BXY3Opl2ALrBwyfEikFAj8pmqcpnfmuwUwcozs=
|
||||
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
|
||||
modernc.org/tcl v1.11.0 h1:B/zzEYjINeaki38KcIqdQRQx7W3WE7TkrlTwGnbm2II=
|
||||
modernc.org/tcl v1.11.0/go.mod h1:zsTUpbQ+NxQEjOjCUlImDLPv1sG8Ww0qp66ZvyOxCgw=
|
||||
modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk=
|
||||
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
modernc.org/z v1.3.0 h1:4RWULo1Nvaq5ZBhbLe74u8p6tV4Mmm0ZrPBXYPm/xjM=
|
||||
modernc.org/z v1.3.0/go.mod h1:+mvgLH814oDjtATDdT3rs84JnUIpkvAF5B8AVkNlE2g=
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -44,13 +43,20 @@ var (
|
||||
type Listener struct {
|
||||
doneWg *sync.WaitGroup
|
||||
client FlamencoClient
|
||||
buffer UpstreamBuffer
|
||||
}
|
||||
|
||||
// UpstreamBuffer can buffer up-stream task updates, in case the Manager cannot be reached.
|
||||
type UpstreamBuffer interface {
|
||||
SendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error
|
||||
}
|
||||
|
||||
// NewListener creates a new Listener that will send updates to the API client.
|
||||
func NewListener(client FlamencoClient) *Listener {
|
||||
func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener {
|
||||
l := &Listener{
|
||||
doneWg: new(sync.WaitGroup),
|
||||
client: client,
|
||||
buffer: buffer,
|
||||
}
|
||||
l.doneWg.Add(1)
|
||||
return l
|
||||
@ -66,7 +72,7 @@ func (l *Listener) Run(ctx context.Context) {
|
||||
case <-time.After(10 * time.Second):
|
||||
// This is just a dummy thing.
|
||||
}
|
||||
log.Debug().Msg("listener is still running")
|
||||
log.Trace().Msg("listener is still running")
|
||||
}
|
||||
|
||||
log.Debug().Msg("listener shutting down")
|
||||
@ -120,17 +126,5 @@ func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLoca
|
||||
}
|
||||
|
||||
func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
|
||||
resp, err := l.client.TaskUpdateWithResponse(ctx, string(taskID), update)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error sending task update: %w", err)
|
||||
}
|
||||
|
||||
switch resp.StatusCode() {
|
||||
case http.StatusNoContent:
|
||||
return nil
|
||||
case http.StatusConflict:
|
||||
return ErrTaskReassigned
|
||||
default:
|
||||
return fmt.Errorf("unknown error from Manager, code %d: %v", resp.StatusCode(), resp.JSONDefault)
|
||||
}
|
||||
return l.buffer.SendTaskUpdate(ctx, taskID, update)
|
||||
}
|
||||
|
352
internal/worker/upstream_buffer.go
Normal file
352
internal/worker/upstream_buffer.go
Normal file
@ -0,0 +1,352 @@
|
||||
package worker
|
||||
|
||||
/* ***** BEGIN GPL LICENSE BLOCK *****
|
||||
*
|
||||
* Original Code Copyright (C) 2022 Blender Foundation.
|
||||
*
|
||||
* This file is part of Flamenco.
|
||||
*
|
||||
* Flamenco is free software: you can redistribute it and/or modify it under
|
||||
* the terms of the GNU General Public License as published by the Free Software
|
||||
* Foundation, either version 3 of the License, or (at your option) any later
|
||||
* version.
|
||||
*
|
||||
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
* ***** END GPL LICENSE BLOCK ***** */
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// TODO: pull the SQLite stuff out of this file into a more global place, so
|
||||
// that other areas of Flamenco Worker can also use it.
|
||||
|
||||
// UpstreamBufferDB implements the UpstreamBuffer interface using a database as backend.
|
||||
type UpstreamBufferDB struct {
|
||||
db *sql.DB
|
||||
dbMutex *sync.Mutex // Protects from "database locked" errors
|
||||
|
||||
client FlamencoClient
|
||||
clock TimeService
|
||||
flushInterval time.Duration
|
||||
|
||||
done chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
const defaultUpstreamFlushInterval = 30 * time.Second
|
||||
|
||||
var _ UpstreamBuffer = (*UpstreamBufferDB)(nil)
|
||||
|
||||
func NewUpstreamBuffer(client FlamencoClient, clock TimeService) (*UpstreamBufferDB, error) {
|
||||
ub := UpstreamBufferDB{
|
||||
db: nil,
|
||||
dbMutex: new(sync.Mutex),
|
||||
|
||||
client: client,
|
||||
clock: clock,
|
||||
flushInterval: defaultUpstreamFlushInterval,
|
||||
|
||||
done: make(chan struct{}),
|
||||
wg: new(sync.WaitGroup),
|
||||
}
|
||||
return &ub, nil
|
||||
}
|
||||
|
||||
// OpenDB opens the database. Must be called once before using.
|
||||
func (ub *UpstreamBufferDB) OpenDB(ctx context.Context, databaseFilename string) error {
|
||||
if ub.db != nil {
|
||||
return errors.New("upstream buffer database already opened")
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", databaseFilename)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %w", databaseFilename, err)
|
||||
}
|
||||
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
return fmt.Errorf("error accessing %s: %w", databaseFilename, err)
|
||||
}
|
||||
|
||||
ub.db = db
|
||||
|
||||
if err := ub.prepareDatabase(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ub.wg.Add(1)
|
||||
go ub.periodicFlushLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
|
||||
ub.dbMutex.Lock()
|
||||
defer ub.dbMutex.Unlock()
|
||||
|
||||
queueSize, err := ub.queueSize(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine upstream queue size: %w", err)
|
||||
}
|
||||
|
||||
// Immediately queue if there is already stuff queued, to ensure the order of updates is maintained.
|
||||
if queueSize > 0 {
|
||||
log.Debug().Int("queueSize", queueSize).
|
||||
Msg("task updates already queued, immediately queueing new update")
|
||||
return ub.queueTaskUpdate(ctx, taskID, update)
|
||||
}
|
||||
|
||||
// Try to deliver the update.
|
||||
resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("task", taskID).
|
||||
Msg("error communicating with Manager, going to queue task update for sending later")
|
||||
return ub.queueTaskUpdate(ctx, taskID, update)
|
||||
}
|
||||
|
||||
// The Manager responded, so no need to queue this update, even when there was an error.
|
||||
switch resp.StatusCode() {
|
||||
case http.StatusNoContent:
|
||||
return nil
|
||||
case http.StatusConflict:
|
||||
return ErrTaskReassigned
|
||||
default:
|
||||
return fmt.Errorf("unknown error from Manager, code %d: %v",
|
||||
resp.StatusCode(), resp.JSONDefault)
|
||||
}
|
||||
}
|
||||
|
||||
// Close releases the database. It does not try to flush any pending items.
|
||||
func (ub *UpstreamBufferDB) Close(ctx context.Context) error {
|
||||
if ub.db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the periodic flush loop.
|
||||
close(ub.done)
|
||||
ub.wg.Wait()
|
||||
|
||||
// Close the database.
|
||||
return ub.db.Close()
|
||||
}
|
||||
|
||||
// prepareDatabase creates the database schema, if necessary.
|
||||
func (ub *UpstreamBufferDB) prepareDatabase(ctx context.Context) error {
|
||||
ub.dbMutex.Lock()
|
||||
defer ub.dbMutex.Unlock()
|
||||
|
||||
tx, err := ub.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("beginning database transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
stmt := `CREATE TABLE IF NOT EXISTS task_update_queue(task_id VARCHAR(36), payload BLOB)`
|
||||
log.Debug().Str("sql", stmt).Msg("creating database table")
|
||||
|
||||
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
||||
return fmt.Errorf("creating database table: %w", err)
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return fmt.Errorf("commiting creation of database table: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) queueSize(ctx context.Context) (int, error) {
|
||||
if ub.db == nil {
|
||||
log.Panic().Msg("no database opened, unable to inspect upstream queue")
|
||||
}
|
||||
|
||||
var queueSize int
|
||||
|
||||
err := ub.db.
|
||||
QueryRowContext(ctx, "SELECT count(*) FROM task_update_queue").
|
||||
Scan(&queueSize)
|
||||
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return 0, nil
|
||||
case err != nil:
|
||||
return 0, err
|
||||
default:
|
||||
return queueSize, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) queueTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
|
||||
if ub.db == nil {
|
||||
log.Panic().Msg("no database opened, unable to queue task updates")
|
||||
}
|
||||
|
||||
tx, err := ub.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("beginning database transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
blob, err := json.Marshal(update)
|
||||
if err != nil {
|
||||
return fmt.Errorf("converting task update to JSON: %w", err)
|
||||
}
|
||||
|
||||
stmt := `INSERT INTO task_update_queue (task_id, payload) VALUES (?, ?)`
|
||||
log.Debug().Str("sql", stmt).Str("task", taskID).Msg("inserting task update")
|
||||
|
||||
if _, err := tx.ExecContext(ctx, stmt, taskID, blob); err != nil {
|
||||
return fmt.Errorf("queueing task update: %w", err)
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return fmt.Errorf("committing queued task update: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
|
||||
ub.dbMutex.Lock()
|
||||
defer ub.dbMutex.Unlock()
|
||||
|
||||
if ub.db == nil {
|
||||
log.Panic().Msg("no database opened, unable to queue task updates")
|
||||
}
|
||||
|
||||
// See if we need to flush at all.
|
||||
queueSize, err := ub.queueSize(ctx)
|
||||
switch {
|
||||
case err != nil:
|
||||
return fmt.Errorf("unable to determine queue size: %w", err)
|
||||
case queueSize == 0:
|
||||
log.Debug().Msg("task update queue empty, nothing to flush")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Keep flushing until the queue is empty or there is an error.
|
||||
var done bool
|
||||
for !done {
|
||||
done, err = ub.flushFirstItem(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) {
|
||||
tx, err := ub.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("beginning database transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
stmt := `SELECT rowid, task_id, payload FROM task_update_queue ORDER BY rowid LIMIT 1`
|
||||
log.Trace().Str("sql", stmt).Msg("fetching queued task updates")
|
||||
|
||||
var rowID int64
|
||||
var taskID string
|
||||
var blob []byte
|
||||
|
||||
err = tx.QueryRowContext(ctx, stmt).Scan(&rowID, &taskID, &blob)
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
// Flush operation is done.
|
||||
log.Debug().Msg("task update queue empty")
|
||||
return true, nil
|
||||
case err != nil:
|
||||
return false, fmt.Errorf("querying task update queue: %w", err)
|
||||
}
|
||||
|
||||
logger := log.With().Str("task", taskID).Logger()
|
||||
|
||||
var update api.TaskUpdateJSONRequestBody
|
||||
if err := json.Unmarshal(blob, &update); err != nil {
|
||||
// If we can't unmarshal the queued task update, there is little else to do
|
||||
// than to discard it and ignore it ever happened.
|
||||
logger.Warn().Err(err).
|
||||
Msg("unable to unmarshal queued task update, discarding")
|
||||
if err := ub.discardRow(ctx, tx, rowID); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, tx.Commit()
|
||||
}
|
||||
|
||||
// actually attempt delivery.
|
||||
resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update)
|
||||
if err != nil {
|
||||
logger.Info().Err(err).Msg("communication with Manager still problematic")
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Regardless of the response, there is little else to do but to discard the
|
||||
// update from the queue.
|
||||
switch resp.StatusCode() {
|
||||
case http.StatusNoContent:
|
||||
logger.Debug().Msg("queued task updated accepted by Manager")
|
||||
case http.StatusConflict:
|
||||
logger.Warn().Msg("queued task update discarded by Manager, task was already reassigned to other Worker")
|
||||
default:
|
||||
logger.Warn().
|
||||
Int("statusCode", resp.StatusCode()).
|
||||
Interface("response", resp.JSONDefault).
|
||||
Msg("queued task update discarded by Manager, unknown reason")
|
||||
}
|
||||
|
||||
if err := ub.discardRow(ctx, tx, rowID); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, tx.Commit()
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) discardRow(ctx context.Context, tx *sql.Tx, rowID int64) error {
|
||||
stmt := `DELETE FROM task_update_queue WHERE rowid = ?`
|
||||
log.Trace().Str("sql", stmt).Int64("rowID", rowID).Msg("un-queueing task update")
|
||||
|
||||
_, err := tx.ExecContext(ctx, stmt, rowID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error un-queueing task update: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ub *UpstreamBufferDB) periodicFlushLoop() {
|
||||
defer ub.wg.Done()
|
||||
defer log.Debug().Msg("periodic task update flush loop stopping")
|
||||
log.Debug().Msg("periodic task update flush loop starting")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ub.done:
|
||||
return
|
||||
case <-ub.clock.After(ub.flushInterval):
|
||||
log.Trace().Msg("task upstream queue: periodic flush")
|
||||
err := ub.Flush(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("error flushing task update queue")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
121
internal/worker/upstream_buffer_test.go
Normal file
121
internal/worker/upstream_buffer_test.go
Normal file
@ -0,0 +1,121 @@
|
||||
package worker
|
||||
|
||||
/* ***** BEGIN GPL LICENSE BLOCK *****
|
||||
*
|
||||
* Original Code Copyright (C) 2022 Blender Foundation.
|
||||
*
|
||||
* This file is part of Flamenco.
|
||||
*
|
||||
* Flamenco is free software: you can redistribute it and/or modify it under
|
||||
* the terms of the GNU General Public License as published by the Free Software
|
||||
* Foundation, either version 3 of the License, or (at your option) any later
|
||||
* version.
|
||||
*
|
||||
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
* ***** END GPL LICENSE BLOCK ***** */
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"gitlab.com/blender/flamenco-ng-poc/internal/worker/mocks"
|
||||
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
const testBufferDBFilename = "test-flamenco-worker.db"
|
||||
|
||||
type UpstreamBufferDBMocks struct {
|
||||
client *mocks.MockFlamencoClient
|
||||
clock *clock.Mock
|
||||
}
|
||||
|
||||
func mockUpstreamBufferDB(t *testing.T, mockCtrl *gomock.Controller) (*UpstreamBufferDB, *UpstreamBufferDBMocks) {
|
||||
mocks := UpstreamBufferDBMocks{
|
||||
client: mocks.NewMockFlamencoClient(mockCtrl),
|
||||
clock: clock.NewMock(),
|
||||
}
|
||||
|
||||
// Always start tests with a fresh database.
|
||||
os.Remove(testBufferDBFilename)
|
||||
|
||||
ub, err := NewUpstreamBuffer(mocks.client, mocks.clock)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create upstream buffer: %v", err)
|
||||
}
|
||||
|
||||
return ub, &mocks
|
||||
}
|
||||
|
||||
func TestUpstreamBufferCloseUnopened(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
ub, _ := mockUpstreamBufferDB(t, mockCtrl)
|
||||
err := ub.Close(context.Background())
|
||||
assert.NoError(t, err, "Closing without opening should be OK")
|
||||
}
|
||||
|
||||
func TestUpstreamBufferManagerUnavailable(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
ub, mocks := mockUpstreamBufferDB(t, mockCtrl)
|
||||
assert.NoError(t, ub.OpenDB(ctx, testBufferDBFilename))
|
||||
|
||||
// Send a task update without Manager available.
|
||||
taskID := "3960dec4-978e-40ab-bede-bfa6428c6ebc"
|
||||
update := api.TaskUpdateJSONRequestBody{
|
||||
Activity: ptr("Testing da ünits"),
|
||||
Log: ptr("¿Unicode logging should work?"),
|
||||
TaskStatus: ptr(api.TaskStatusActive),
|
||||
}
|
||||
|
||||
updateError := errors.New("mock manager unavailable")
|
||||
managerCallFail := mocks.client.EXPECT().
|
||||
TaskUpdateWithResponse(ctx, taskID, update).
|
||||
Return(nil, updateError)
|
||||
|
||||
err := ub.SendTaskUpdate(ctx, taskID, update)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check the queue size, it should have an item queued.
|
||||
queueSize, err := ub.queueSize(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, queueSize)
|
||||
|
||||
// Wait for the flushing with Manager available.
|
||||
resp := &api.TaskUpdateResponse{}
|
||||
mocks.client.EXPECT().
|
||||
TaskUpdateWithResponse(ctx, taskID, update).
|
||||
Return(resp, nil).
|
||||
After(managerCallFail)
|
||||
|
||||
// Only add exactly the flush interval, as that maximises the chances of
|
||||
// getting conflicts on the database level (if we didn't have the
|
||||
// database-protection mutex).
|
||||
mocks.clock.Add(defaultUpstreamFlushInterval)
|
||||
|
||||
// Queue should be empty now.
|
||||
ub.dbMutex.Lock()
|
||||
queueSize, err = ub.queueSize(ctx)
|
||||
ub.dbMutex.Unlock()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, queueSize)
|
||||
|
||||
assert.NoError(t, ub.Close(ctx))
|
||||
}
|
Loading…
Reference in New Issue
Block a user