Improve testing and try to fix MySQL hanging (#32515)
By some CI fine tunes (`run tests`), SQLite & MSSQL could complete in about 12~13 minutes (before > 14), MySQL could complete in 18 minutes (before: about 23 or even > 30) Major changes: 1. use tmpfs for MySQL storage 1. run `make test-mysql` instead of `make integration-test-coverage` because the code coverage is not really used at the moment. 1. refactor testlogger to make it more reliable and be able to report stuck stacktrace 1. do not requeue failed items when a queue is being flushed (failed items would keep failing and make flush uncompleted) 1. reduce the file sizes for testing 1. use math ChaCha20 random data instead of crypot/rand (for testing purpose only) 1. no need to `DeleteRepository` in `TestLinguist` 1. other related refactoring to make code easier to maintain
This commit is contained in:
@ -86,6 +86,8 @@ type ColoredValue struct {
|
||||
colors []ColorAttribute
|
||||
}
|
||||
|
||||
var _ fmt.Formatter = (*ColoredValue)(nil)
|
||||
|
||||
func (c *ColoredValue) Format(f fmt.State, verb rune) {
|
||||
_, _ = f.Write(ColorBytes(c.colors...))
|
||||
s := fmt.Sprintf(fmt.FormatString(f, verb), c.v)
|
||||
@ -93,6 +95,10 @@ func (c *ColoredValue) Format(f fmt.State, verb rune) {
|
||||
_, _ = f.Write(resetBytes)
|
||||
}
|
||||
|
||||
func (c *ColoredValue) Value() any {
|
||||
return c.v
|
||||
}
|
||||
|
||||
func NewColoredValue(v any, color ...ColorAttribute) *ColoredValue {
|
||||
return &ColoredValue{v: v, colors: color}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
unhandledItemRequeueDuration.Store(int64(5 * time.Second))
|
||||
unhandledItemRequeueDuration.Store(int64(time.Second))
|
||||
}
|
||||
|
||||
// workerGroup is a group of workers to work with a WorkerPoolQueue
|
||||
@ -104,7 +104,12 @@ func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
|
||||
// if none of the items were handled, it should back-off for a few seconds
|
||||
// in this case the handler (eg: document indexer) may have encountered some errors/failures
|
||||
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
|
||||
if q.isFlushing.Load() {
|
||||
return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
|
||||
}
|
||||
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
|
||||
// TODO: ideally it shouldn't "sleep" here (blocks the worker, then blocks flush).
|
||||
// It could debounce the requeue operation, and try to requeue the items in the future.
|
||||
select {
|
||||
case <-q.ctxRun.Done():
|
||||
case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
|
||||
@ -193,6 +198,9 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
|
||||
// doFlush flushes the queue: it tries to read all items from the queue and handles them.
|
||||
// It is for testing purpose only. It's not designed to work for a cluster.
|
||||
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
|
||||
q.isFlushing.Store(true)
|
||||
defer q.isFlushing.Store(false)
|
||||
|
||||
log.Debug("Queue %q starts flushing", q.GetName())
|
||||
defer log.Debug("Queue %q finishes flushing", q.GetName())
|
||||
|
||||
@ -236,6 +244,9 @@ loop:
|
||||
emptyCounter := 0
|
||||
for {
|
||||
select {
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
case data, dataOk := <-wg.popItemChan:
|
||||
if !dataOk {
|
||||
return
|
||||
@ -251,9 +262,6 @@ loop:
|
||||
log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
|
||||
}
|
||||
return
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
case <-time.After(20 * time.Millisecond):
|
||||
// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
|
||||
// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
|
||||
@ -331,6 +339,15 @@ func (q *WorkerPoolQueue[T]) doRun() {
|
||||
var batchDispatchC <-chan time.Time = infiniteTimerC
|
||||
for {
|
||||
select {
|
||||
case flush := <-q.flushChan:
|
||||
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
|
||||
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
|
||||
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
|
||||
q.doDispatchBatchToWorker(wg, skipFlushChan)
|
||||
q.doFlush(wg, flush)
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
case data, dataOk := <-wg.popItemChan:
|
||||
if !dataOk {
|
||||
return
|
||||
@ -349,20 +366,11 @@ func (q *WorkerPoolQueue[T]) doRun() {
|
||||
case <-batchDispatchC:
|
||||
batchDispatchC = infiniteTimerC
|
||||
q.doDispatchBatchToWorker(wg, q.flushChan)
|
||||
case flush := <-q.flushChan:
|
||||
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
|
||||
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
|
||||
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
|
||||
q.doDispatchBatchToWorker(wg, skipFlushChan)
|
||||
q.doFlush(wg, flush)
|
||||
case err := <-wg.popItemErr:
|
||||
if !q.isCtxRunCanceled() {
|
||||
log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
|
||||
}
|
||||
return
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -32,8 +32,9 @@ type WorkerPoolQueue[T any] struct {
|
||||
baseConfig *BaseConfig
|
||||
baseQueue baseQueue
|
||||
|
||||
batchChan chan []T
|
||||
flushChan chan flushType
|
||||
batchChan chan []T
|
||||
flushChan chan flushType
|
||||
isFlushing atomic.Bool
|
||||
|
||||
batchLength int
|
||||
workerNum int
|
||||
|
@ -19,9 +19,10 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
prefix string
|
||||
SlowTest = 10 * time.Second
|
||||
SlowFlush = 5 * time.Second
|
||||
prefix string
|
||||
TestTimeout = 10 * time.Minute
|
||||
TestSlowRun = 10 * time.Second
|
||||
TestSlowFlush = 1 * time.Second
|
||||
)
|
||||
|
||||
var WriterCloser = &testLoggerWriterCloser{}
|
||||
@ -89,79 +90,97 @@ func (w *testLoggerWriterCloser) Reset() {
|
||||
w.Unlock()
|
||||
}
|
||||
|
||||
// Printf takes a format and args and prints the string to os.Stdout
|
||||
func Printf(format string, args ...any) {
|
||||
if !log.CanColorStdout {
|
||||
for i := 0; i < len(args); i++ {
|
||||
if c, ok := args[i].(*log.ColoredValue); ok {
|
||||
args[i] = c.Value()
|
||||
}
|
||||
}
|
||||
}
|
||||
_, _ = fmt.Fprintf(os.Stdout, format, args...)
|
||||
}
|
||||
|
||||
// PrintCurrentTest prints the current test to os.Stdout
|
||||
func PrintCurrentTest(t testing.TB, skip ...int) func() {
|
||||
t.Helper()
|
||||
start := time.Now()
|
||||
runStart := time.Now()
|
||||
actualSkip := util.OptionalArg(skip) + 1
|
||||
_, filename, line, _ := runtime.Caller(actualSkip)
|
||||
|
||||
if log.CanColorStdout {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", fmt.Formatter(log.NewColoredValue(t.Name())), strings.TrimPrefix(filename, prefix), line)
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "=== %s (%s:%d)\n", t.Name(), strings.TrimPrefix(filename, prefix), line)
|
||||
}
|
||||
Printf("=== %s (%s:%d)\n", log.NewColoredValue(t.Name()), strings.TrimPrefix(filename, prefix), line)
|
||||
|
||||
WriterCloser.pushT(t)
|
||||
return func() {
|
||||
took := time.Since(start)
|
||||
if took > SlowTest {
|
||||
if log.CanColorStdout {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgYellow)), fmt.Formatter(log.NewColoredValue(took, log.Bold, log.FgYellow)))
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s is a slow test (took %v)\n", t.Name(), took)
|
||||
timeoutChecker := time.AfterFunc(TestTimeout, func() {
|
||||
l := 128 * 1024
|
||||
var stack []byte
|
||||
for {
|
||||
stack = make([]byte, l)
|
||||
n := runtime.Stack(stack, true)
|
||||
if n <= l {
|
||||
stack = stack[:n]
|
||||
break
|
||||
}
|
||||
l = n
|
||||
}
|
||||
timer := time.AfterFunc(SlowFlush, func() {
|
||||
if log.CanColorStdout {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), SlowFlush)
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush)
|
||||
}
|
||||
Printf("!!! %s ... timeout: %v ... stacktrace:\n%s\n\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestTimeout, string(stack))
|
||||
})
|
||||
return func() {
|
||||
flushStart := time.Now()
|
||||
slowFlushChecker := time.AfterFunc(TestSlowFlush, func() {
|
||||
Printf("+++ %s ... still flushing after %v ...\n", log.NewColoredValue(t.Name(), log.Bold, log.FgRed), TestSlowFlush)
|
||||
})
|
||||
if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil {
|
||||
t.Errorf("Flushing queues failed with error %v", err)
|
||||
}
|
||||
timer.Stop()
|
||||
flushTook := time.Since(start) - took
|
||||
if flushTook > SlowFlush {
|
||||
if log.CanColorStdout {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", fmt.Formatter(log.NewColoredValue(t.Name(), log.Bold, log.FgRed)), fmt.Formatter(log.NewColoredValue(flushTook, log.Bold, log.FgRed)))
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(os.Stdout, "+++ %s had a slow clean-up flush (took %v)\n", t.Name(), flushTook)
|
||||
}
|
||||
slowFlushChecker.Stop()
|
||||
timeoutChecker.Stop()
|
||||
|
||||
runDuration := time.Since(runStart)
|
||||
flushDuration := time.Since(flushStart)
|
||||
if runDuration > TestSlowRun {
|
||||
Printf("+++ %s is a slow test (run: %v, flush: %v)\n", log.NewColoredValue(t.Name(), log.Bold, log.FgYellow), runDuration, flushDuration)
|
||||
}
|
||||
WriterCloser.popT()
|
||||
}
|
||||
}
|
||||
|
||||
// Printf takes a format and args and prints the string to os.Stdout
|
||||
func Printf(format string, args ...any) {
|
||||
if log.CanColorStdout {
|
||||
for i := 0; i < len(args); i++ {
|
||||
args[i] = log.NewColoredValue(args[i])
|
||||
}
|
||||
}
|
||||
_, _ = fmt.Fprintf(os.Stdout, "\t"+format, args...)
|
||||
}
|
||||
|
||||
// TestLogEventWriter is a logger which will write to the testing log
|
||||
type TestLogEventWriter struct {
|
||||
*log.EventWriterBaseImpl
|
||||
}
|
||||
|
||||
// NewTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
|
||||
func NewTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
|
||||
// newTestLoggerWriter creates a TestLogEventWriter as a log.LoggerProvider
|
||||
func newTestLoggerWriter(name string, mode log.WriterMode) log.EventWriter {
|
||||
w := &TestLogEventWriter{}
|
||||
w.EventWriterBaseImpl = log.NewEventWriterBase(name, "test-log-writer", mode)
|
||||
w.OutputWriteCloser = WriterCloser
|
||||
return w
|
||||
}
|
||||
|
||||
func init() {
|
||||
func Init() {
|
||||
const relFilePath = "modules/testlogger/testlogger.go"
|
||||
_, filename, _, _ := runtime.Caller(0)
|
||||
if !strings.HasSuffix(filename, relFilePath) {
|
||||
panic("source code file path doesn't match expected: " + relFilePath)
|
||||
}
|
||||
prefix = strings.TrimSuffix(filename, relFilePath)
|
||||
|
||||
log.RegisterEventWriter("test", newTestLoggerWriter)
|
||||
|
||||
duration, err := time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_RUN"))
|
||||
if err == nil && duration > 0 {
|
||||
TestSlowRun = duration
|
||||
}
|
||||
|
||||
duration, err = time.ParseDuration(os.Getenv("GITEA_TEST_SLOW_FLUSH"))
|
||||
if err == nil && duration > 0 {
|
||||
TestSlowFlush = duration
|
||||
}
|
||||
}
|
||||
|
||||
func Fatalf(format string, args ...any) {
|
||||
Printf(format+"\n", args...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
Reference in New Issue
Block a user