fix:修复测试用例断言

This commit is contained in:
rulego-team
2025-11-13 18:54:35 +08:00
parent eb29ed921d
commit 6f77dc5f7f
2 changed files with 96 additions and 43 deletions
+78 -32
View File
@@ -1,6 +1,7 @@
package streamsql
import (
"sync"
"testing"
"time"
@@ -23,9 +24,12 @@ func TestSQLSlidingWindow_Basic(t *testing.T) {
ch := make(chan []map[string]interface{}, 10)
windowResults := make([][]map[string]interface{}, 0)
var windowResultsMu sync.Mutex
ssql.AddSink(func(results []map[string]interface{}) {
if len(results) > 0 {
windowResultsMu.Lock()
windowResults = append(windowResults, results)
windowResultsMu.Unlock()
ch <- results
}
})
@@ -62,10 +66,15 @@ func TestSQLSlidingWindow_Basic(t *testing.T) {
END:
assert.Greater(t, len(results), 0, "应该至少触发一个窗口")
require.Greater(t, len(windowResults), 0, "应该至少有一个窗口结果")
windowResultsMu.Lock()
windowResultsLen := len(windowResults)
windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
copy(windowResultsCopy, windowResults)
windowResultsMu.Unlock()
require.Greater(t, windowResultsLen, 0, "应该至少有一个窗口结果")
if len(windowResults) > 0 {
firstWindow := windowResults[0]
if windowResultsLen > 0 {
firstWindow := windowResultsCopy[0]
require.Len(t, firstWindow, 1, "第一个窗口应该只有一行结果")
cnt := firstWindow[0]["cnt"].(float64)
@@ -90,12 +99,12 @@ END:
}
// 验证有多个窗口被触发(滑动窗口应该每2秒触发一次)
if len(windowResults) > 1 {
t.Logf("总共触发了 %d 个窗口", len(windowResults))
if windowResultsLen > 1 {
t.Logf("总共触发了 %d 个窗口", windowResultsLen)
// 验证后续窗口也包含数据
for i := 1; i < len(windowResults) && i < 5; i++ {
if len(windowResults[i]) > 0 {
cnt := windowResults[i][0]["cnt"].(float64)
for i := 1; i < windowResultsLen && i < 5; i++ {
if len(windowResultsCopy[i]) > 0 {
cnt := windowResultsCopy[i][0]["cnt"].(float64)
assert.Greater(t, cnt, 0.0, "窗口 %d 应该包含数据", i+1)
}
}
@@ -203,6 +212,7 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) {
ch := make(chan []map[string]interface{}, 20)
windowResults := make([][]map[string]interface{}, 0)
var windowResultsMu sync.Mutex
ssql.AddSink(func(results []map[string]interface{}) {
ch <- results
})
@@ -223,7 +233,9 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) {
select {
case res := <-ch:
if len(res) > 0 {
windowResultsMu.Lock()
windowResults = append(windowResults, res)
windowResultsMu.Unlock()
}
case <-timeout:
goto END
@@ -231,9 +243,14 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) {
}
END:
require.Greater(t, len(windowResults), 0, "应该至少触发一个窗口")
windowResultsMu.Lock()
windowResultsLen := len(windowResults)
windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
copy(windowResultsCopy, windowResults)
windowResultsMu.Unlock()
require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口")
for i, res := range windowResults {
for i, res := range windowResultsCopy {
require.Len(t, res, 1, "窗口 %d 应该只有一行聚合结果", i+1)
row := res[0]
@@ -259,9 +276,9 @@ END:
assert.LessOrEqual(t, cnt, 15.0, "窗口 %d 计数不应该超过15", i+1)
}
if len(windowResults) > 1 {
firstWindow := windowResults[0]
lastWindow := windowResults[len(windowResults)-1]
if windowResultsLen > 1 {
firstWindow := windowResultsCopy[0]
lastWindow := windowResultsCopy[windowResultsLen-1]
firstCnt := firstWindow[0]["cnt"].(float64)
lastCnt := lastWindow[0]["cnt"].(float64)
@@ -274,8 +291,8 @@ END:
"第一个窗口的最小值应该小于等于最后一个窗口的最小值")
}
allCounts := make([]float64, len(windowResults))
for i, res := range windowResults {
allCounts := make([]float64, windowResultsLen)
for i, res := range windowResultsCopy {
allCounts[i] = res[0]["cnt"].(float64)
}
@@ -407,9 +424,12 @@ func TestSQLSlidingWindow_FirstWindowTiming(t *testing.T) {
ch := make(chan []map[string]interface{}, 20)
windowTimings := make([]time.Time, 0)
var windowTimingsMu sync.Mutex
ssql.AddSink(func(results []map[string]interface{}) {
if len(results) > 0 {
windowTimingsMu.Lock()
windowTimings = append(windowTimings, time.Now())
windowTimingsMu.Unlock()
ch <- results
}
})
@@ -435,7 +455,9 @@ func TestSQLSlidingWindow_FirstWindowTiming(t *testing.T) {
case res := <-ch:
if len(res) > 0 && !firstWindowReceived {
firstWindowReceived = true
windowTimingsMu.Lock()
firstWindowTime := windowTimings[0]
windowTimingsMu.Unlock()
elapsed := firstWindowTime.Sub(firstDataTime)
// 第一个窗口应该在窗口大小时间(10秒)后触发
@@ -482,9 +504,12 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) {
ch := make(chan []map[string]interface{}, 20)
windowResults := make([][]map[string]interface{}, 0)
var windowResultsMu sync.Mutex
ssql.AddSink(func(results []map[string]interface{}) {
if len(results) > 0 {
windowResultsMu.Lock()
windowResults = append(windowResults, results)
windowResultsMu.Unlock()
ch <- results
}
})
@@ -501,9 +526,12 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) {
}
// 等待足够的时间让多个窗口触发
time.Sleep(12 * time.Second)
// 第一个窗口在10秒后触发,后续窗口每2秒触发一次
// 等待足够的时间让至少3个窗口触发:10秒(第一个窗口)+ 2秒(第二个窗口)+ 2秒(第三个窗口)= 14秒
time.Sleep(15 * time.Second)
timeout := time.After(3 * time.Second)
// 收集所有已到达的结果,设置合理的超时时间
timeout := time.After(2 * time.Second)
for {
select {
case <-ch:
@@ -514,11 +542,16 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) {
}
END:
require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口")
windowResultsMu.Lock()
windowResultsLen := len(windowResults)
windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
copy(windowResultsCopy, windowResults)
windowResultsMu.Unlock()
require.GreaterOrEqual(t, windowResultsLen, 3, "应该至少触发3个窗口")
// 验证第一个窗口包含数据0-9
if len(windowResults) > 0 {
firstWindow := windowResults[0]
if windowResultsLen > 0 {
firstWindow := windowResultsCopy[0]
require.Len(t, firstWindow, 1)
firstRow := firstWindow[0]
firstCnt := firstRow["cnt"].(float64)
@@ -543,8 +576,8 @@ END:
}
// 验证第二个窗口与第一个窗口有重叠
if len(windowResults) > 1 {
secondWindow := windowResults[1]
if windowResultsLen > 1 {
secondWindow := windowResultsCopy[1]
require.Len(t, secondWindow, 1)
secondRow := secondWindow[0]
secondCnt := secondRow["cnt"].(float64)
@@ -558,8 +591,8 @@ END:
// 验证重叠:第二个窗口的最小值应该大于第一个窗口的最小值
// 因为窗口滑动,第二个窗口应该从数据2开始
if len(windowResults) > 0 {
firstMin := windowResults[0][0]["min_temp"].(float64)
if windowResultsLen > 0 {
firstMin := windowResultsCopy[0][0]["min_temp"].(float64)
assert.GreaterOrEqual(t, secondMin, firstMin,
"第二个窗口的最小值应该大于等于第一个窗口的最小值,说明窗口正确滑动")
}
@@ -569,7 +602,7 @@ END:
// 验证窗口数据不会过早丢失
// 检查是否有窗口的数据量异常少(可能是数据被过早清理)
for i, res := range windowResults {
for i, res := range windowResultsCopy {
if len(res) > 0 {
cnt := res[0]["cnt"].(float64)
// 对于前几个窗口,数据量不应该异常少
@@ -601,9 +634,12 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) {
ch := make(chan []map[string]interface{}, 20)
windowResults := make([][]map[string]interface{}, 0)
var windowResultsMu sync.Mutex
ssql.AddSink(func(results []map[string]interface{}) {
if len(results) > 0 {
windowResultsMu.Lock()
windowResults = append(windowResults, results)
windowResultsMu.Unlock()
ch <- results
}
})
@@ -620,9 +656,14 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) {
}
// 等待多个窗口触发
time.Sleep(12 * time.Second)
// 第一个窗口在10秒后触发,后续窗口每2秒触发一次
// 等待足够的时间让至少3个窗口触发:10秒(第一个窗口)+ 2秒(第二个窗口)+ 2秒(第三个窗口)= 14秒
time.Sleep(15 * time.Second)
timeout := time.After(3 * time.Second)
// 收集所有已到达的结果,设置合理的超时时间
// 由于已经等待了15秒,大部分窗口应该已经触发
// 这里只需要等待一小段时间收集剩余的结果
timeout := time.After(2 * time.Second)
for {
select {
case <-ch:
@@ -633,13 +674,18 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) {
}
END:
require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口")
windowResultsMu.Lock()
windowResultsLen := len(windowResults)
windowResultsCopy := make([][]map[string]interface{}, len(windowResults))
copy(windowResultsCopy, windowResults)
windowResultsMu.Unlock()
require.GreaterOrEqual(t, windowResultsLen, 3, "应该至少触发3个窗口")
// 验证数据保留:检查最小值的变化趋势
// 由于窗口滑动,后续窗口的最小值应该逐渐增大
// 但如果数据保留逻辑正确,不应该突然跳跃
minTemps := make([]float64, 0)
for _, res := range windowResults {
for _, res := range windowResultsCopy {
if len(res) > 0 {
minTemp := res[0]["min_temp"].(float64)
minTemps = append(minTemps, minTemp)
@@ -659,9 +705,9 @@ END:
// 验证窗口数据量:前几个窗口的数据量应该足够
// 使用处理时间时,如果数据保留逻辑正确,窗口数据量应该逐渐减少(因为旧数据逐渐过期)
// 但减少应该是平滑的,不应该突然大幅减少
for i := 0; i < len(windowResults) && i < 5; i++ {
if len(windowResults[i]) > 0 {
cnt := windowResults[i][0]["cnt"].(float64)
for i := 0; i < windowResultsLen && i < 5; i++ {
if len(windowResultsCopy[i]) > 0 {
cnt := windowResultsCopy[i][0]["cnt"].(float64)
// 前几个窗口应该包含足够的数据(使用处理时间)
// 窗口大小10秒,每秒1条数据,应该包含约10条数据
if i < 3 {
+18 -11
View File
@@ -2,7 +2,6 @@ package window
import (
"context"
"github.com/rulego/streamsql/utils/timex"
"testing"
"time"
@@ -22,7 +21,7 @@ func TestSlidingWindow(t *testing.T) {
defer cancel()
sw, _ := NewSlidingWindow(types.WindowConfig{
Params: []interface{}{2 * time.Second, time.Second},
Params: []interface{}{2 * time.Second, time.Second},
TsProp: "Ts",
TimeUnit: time.Second,
})
@@ -49,11 +48,17 @@ func TestSlidingWindow(t *testing.T) {
sw.Add(t_0)
// 验证每个窗口的数据
// 移除对齐后,窗口从第一个数据的时间开始
// 窗口大小2秒,滑动步长1秒
// 第一个窗口: [t_3, t_3 + 2秒) = [16:46:56.789, 16:46:58.789)
// 第二个窗口: [t_3 + 1秒, t_3 + 3秒) = [16:46:57.789, 16:46:59.789)
// 第三个窗口: [t_3 + 2秒, t_3 + 4秒) = [16:46:58.789, 16:47:00.789)
// 第四个窗口: [t_3 + 3秒, t_3 + 5秒) = [16:46:59.789, 16:47:01.789)
expected := []TestResult{
{size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)},
{size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)},
{size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)},
{size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)},
{size: 2, data: []TestDate{t_3, t_2}, start: t_3.Ts, end: t_3.Ts.Add(sw.size)},
{size: 2, data: []TestDate{t_2, t_1}, start: t_3.Ts.Add(sw.slide), end: t_3.Ts.Add(sw.slide).Add(sw.size)},
{size: 2, data: []TestDate{t_1, t_0}, start: t_3.Ts.Add(2 * sw.slide), end: t_3.Ts.Add(2 * sw.slide).Add(sw.size)},
{size: 1, data: []TestDate{t_0}, start: t_3.Ts.Add(3 * sw.slide), end: t_3.Ts.Add(3 * sw.slide).Add(sw.size)},
}
// 等待一段时间,触发窗口
//time.Sleep(3 * time.Second)
@@ -88,12 +93,14 @@ END:
assert.Equal(t, len(actual), len(expected))
// 预期结果:保留最近 2 秒内的数据
for i, exp := range expected {
assert.Equal(t, actual[i].size, exp.size)
// 允许时间有1秒的误差
assert.WithinDuration(t, exp.start, actual[i].start, time.Second)
assert.WithinDuration(t, exp.end, actual[i].end, time.Second)
assert.Equal(t, actual[i].size, exp.size, "窗口 %d 的数据量应该匹配", i+1)
// 移除对齐后,窗口时间应该精确匹配(允许微小的纳秒级误差
assert.WithinDuration(t, exp.start, actual[i].start, 100*time.Millisecond,
"窗口 %d 的开始时间应该匹配,预期: %v, 实际: %v", i+1, exp.start, actual[i].start)
assert.WithinDuration(t, exp.end, actual[i].end, 100*time.Millisecond,
"窗口 %d 的结束时间应该匹配,预期: %v, 实际: %v", i+1, exp.end, actual[i].end)
for _, d := range exp.data {
assert.Contains(t, actual[i].data, d)
assert.Contains(t, actual[i].data, d, "窗口 %d 应该包含数据 %v", i+1, d)
}
}
}