From 6f77dc5f7f20bfda0fb6e6316bd451d5450fc9aa Mon Sep 17 00:00:00 2001 From: rulego-team Date: Thu, 13 Nov 2025 18:54:35 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=E6=96=AD=E8=A8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamsql_sliding_window_test.go | 110 ++++++++++++++++++++++--------- window/sliding_window_test.go | 29 ++++---- 2 files changed, 96 insertions(+), 43 deletions(-) diff --git a/streamsql_sliding_window_test.go b/streamsql_sliding_window_test.go index 22978aa..2c9b706 100644 --- a/streamsql_sliding_window_test.go +++ b/streamsql_sliding_window_test.go @@ -1,6 +1,7 @@ package streamsql import ( + "sync" "testing" "time" @@ -23,9 +24,12 @@ func TestSQLSlidingWindow_Basic(t *testing.T) { ch := make(chan []map[string]interface{}, 10) windowResults := make([][]map[string]interface{}, 0) + var windowResultsMu sync.Mutex ssql.AddSink(func(results []map[string]interface{}) { if len(results) > 0 { + windowResultsMu.Lock() windowResults = append(windowResults, results) + windowResultsMu.Unlock() ch <- results } }) @@ -62,10 +66,15 @@ func TestSQLSlidingWindow_Basic(t *testing.T) { END: assert.Greater(t, len(results), 0, "应该至少触发一个窗口") - require.Greater(t, len(windowResults), 0, "应该至少有一个窗口结果") + windowResultsMu.Lock() + windowResultsLen := len(windowResults) + windowResultsCopy := make([][]map[string]interface{}, len(windowResults)) + copy(windowResultsCopy, windowResults) + windowResultsMu.Unlock() + require.Greater(t, windowResultsLen, 0, "应该至少有一个窗口结果") - if len(windowResults) > 0 { - firstWindow := windowResults[0] + if windowResultsLen > 0 { + firstWindow := windowResultsCopy[0] require.Len(t, firstWindow, 1, "第一个窗口应该只有一行结果") cnt := firstWindow[0]["cnt"].(float64) @@ -90,12 +99,12 @@ END: } // 验证有多个窗口被触发(滑动窗口应该每2秒触发一次) - if len(windowResults) > 1 { - t.Logf("总共触发了 %d 个窗口", len(windowResults)) + if windowResultsLen > 1 { + t.Logf("总共触发了 %d 个窗口", windowResultsLen) // 验证后续窗口也包含数据 - for i := 1; i < len(windowResults) && i < 5; i++ { - if len(windowResults[i]) > 0 { - cnt := windowResults[i][0]["cnt"].(float64) + for i := 1; i < windowResultsLen && i < 5; i++ { + if len(windowResultsCopy[i]) > 0 { + cnt := windowResultsCopy[i][0]["cnt"].(float64) assert.Greater(t, cnt, 0.0, "窗口 %d 应该包含数据", i+1) } } @@ -203,6 +212,7 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) { ch := make(chan []map[string]interface{}, 20) windowResults := make([][]map[string]interface{}, 0) + var windowResultsMu sync.Mutex ssql.AddSink(func(results []map[string]interface{}) { ch <- results }) @@ -223,7 +233,9 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) { select { case res := <-ch: if len(res) > 0 { + windowResultsMu.Lock() windowResults = append(windowResults, res) + windowResultsMu.Unlock() } case <-timeout: goto END @@ -231,9 +243,14 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) { } END: - require.Greater(t, len(windowResults), 0, "应该至少触发一个窗口") + windowResultsMu.Lock() + windowResultsLen := len(windowResults) + windowResultsCopy := make([][]map[string]interface{}, len(windowResults)) + copy(windowResultsCopy, windowResults) + windowResultsMu.Unlock() + require.Greater(t, windowResultsLen, 0, "应该至少触发一个窗口") - for i, res := range windowResults { + for i, res := range windowResultsCopy { require.Len(t, res, 1, "窗口 %d 应该只有一行聚合结果", i+1) row := res[0] @@ -259,9 +276,9 @@ END: assert.LessOrEqual(t, cnt, 15.0, "窗口 %d 计数不应该超过15", i+1) } - if len(windowResults) > 1 { - firstWindow := windowResults[0] - lastWindow := windowResults[len(windowResults)-1] + if windowResultsLen > 1 { + firstWindow := windowResultsCopy[0] + lastWindow := windowResultsCopy[windowResultsLen-1] firstCnt := firstWindow[0]["cnt"].(float64) lastCnt := lastWindow[0]["cnt"].(float64) @@ -274,8 +291,8 @@ END: "第一个窗口的最小值应该小于等于最后一个窗口的最小值") } - allCounts := make([]float64, len(windowResults)) - for i, res := range windowResults { + allCounts := make([]float64, windowResultsLen) + for i, res := range windowResultsCopy { allCounts[i] = res[0]["cnt"].(float64) } @@ -407,9 +424,12 @@ func TestSQLSlidingWindow_FirstWindowTiming(t *testing.T) { ch := make(chan []map[string]interface{}, 20) windowTimings := make([]time.Time, 0) + var windowTimingsMu sync.Mutex ssql.AddSink(func(results []map[string]interface{}) { if len(results) > 0 { + windowTimingsMu.Lock() windowTimings = append(windowTimings, time.Now()) + windowTimingsMu.Unlock() ch <- results } }) @@ -435,7 +455,9 @@ func TestSQLSlidingWindow_FirstWindowTiming(t *testing.T) { case res := <-ch: if len(res) > 0 && !firstWindowReceived { firstWindowReceived = true + windowTimingsMu.Lock() firstWindowTime := windowTimings[0] + windowTimingsMu.Unlock() elapsed := firstWindowTime.Sub(firstDataTime) // 第一个窗口应该在窗口大小时间(10秒)后触发 @@ -482,9 +504,12 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) { ch := make(chan []map[string]interface{}, 20) windowResults := make([][]map[string]interface{}, 0) + var windowResultsMu sync.Mutex ssql.AddSink(func(results []map[string]interface{}) { if len(results) > 0 { + windowResultsMu.Lock() windowResults = append(windowResults, results) + windowResultsMu.Unlock() ch <- results } }) @@ -501,9 +526,12 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) { } // 等待足够的时间让多个窗口触发 - time.Sleep(12 * time.Second) + // 第一个窗口在10秒后触发,后续窗口每2秒触发一次 + // 等待足够的时间让至少3个窗口触发:10秒(第一个窗口)+ 2秒(第二个窗口)+ 2秒(第三个窗口)= 14秒 + time.Sleep(15 * time.Second) - timeout := time.After(3 * time.Second) + // 收集所有已到达的结果,设置合理的超时时间 + timeout := time.After(2 * time.Second) for { select { case <-ch: @@ -514,11 +542,16 @@ func TestSQLSlidingWindow_DataOverlap(t *testing.T) { } END: - require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口") + windowResultsMu.Lock() + windowResultsLen := len(windowResults) + windowResultsCopy := make([][]map[string]interface{}, len(windowResults)) + copy(windowResultsCopy, windowResults) + windowResultsMu.Unlock() + require.GreaterOrEqual(t, windowResultsLen, 3, "应该至少触发3个窗口") // 验证第一个窗口包含数据0-9 - if len(windowResults) > 0 { - firstWindow := windowResults[0] + if windowResultsLen > 0 { + firstWindow := windowResultsCopy[0] require.Len(t, firstWindow, 1) firstRow := firstWindow[0] firstCnt := firstRow["cnt"].(float64) @@ -543,8 +576,8 @@ END: } // 验证第二个窗口与第一个窗口有重叠 - if len(windowResults) > 1 { - secondWindow := windowResults[1] + if windowResultsLen > 1 { + secondWindow := windowResultsCopy[1] require.Len(t, secondWindow, 1) secondRow := secondWindow[0] secondCnt := secondRow["cnt"].(float64) @@ -558,8 +591,8 @@ END: // 验证重叠:第二个窗口的最小值应该大于第一个窗口的最小值 // 因为窗口滑动,第二个窗口应该从数据2开始 - if len(windowResults) > 0 { - firstMin := windowResults[0][0]["min_temp"].(float64) + if windowResultsLen > 0 { + firstMin := windowResultsCopy[0][0]["min_temp"].(float64) assert.GreaterOrEqual(t, secondMin, firstMin, "第二个窗口的最小值应该大于等于第一个窗口的最小值,说明窗口正确滑动") } @@ -569,7 +602,7 @@ END: // 验证窗口数据不会过早丢失 // 检查是否有窗口的数据量异常少(可能是数据被过早清理) - for i, res := range windowResults { + for i, res := range windowResultsCopy { if len(res) > 0 { cnt := res[0]["cnt"].(float64) // 对于前几个窗口,数据量不应该异常少 @@ -601,9 +634,12 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) { ch := make(chan []map[string]interface{}, 20) windowResults := make([][]map[string]interface{}, 0) + var windowResultsMu sync.Mutex ssql.AddSink(func(results []map[string]interface{}) { if len(results) > 0 { + windowResultsMu.Lock() windowResults = append(windowResults, results) + windowResultsMu.Unlock() ch <- results } }) @@ -620,9 +656,14 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) { } // 等待多个窗口触发 - time.Sleep(12 * time.Second) + // 第一个窗口在10秒后触发,后续窗口每2秒触发一次 + // 等待足够的时间让至少3个窗口触发:10秒(第一个窗口)+ 2秒(第二个窗口)+ 2秒(第三个窗口)= 14秒 + time.Sleep(15 * time.Second) - timeout := time.After(3 * time.Second) + // 收集所有已到达的结果,设置合理的超时时间 + // 由于已经等待了15秒,大部分窗口应该已经触发 + // 这里只需要等待一小段时间收集剩余的结果 + timeout := time.After(2 * time.Second) for { select { case <-ch: @@ -633,13 +674,18 @@ func TestSQLSlidingWindow_DataRetention(t *testing.T) { } END: - require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口") + windowResultsMu.Lock() + windowResultsLen := len(windowResults) + windowResultsCopy := make([][]map[string]interface{}, len(windowResults)) + copy(windowResultsCopy, windowResults) + windowResultsMu.Unlock() + require.GreaterOrEqual(t, windowResultsLen, 3, "应该至少触发3个窗口") // 验证数据保留:检查最小值的变化趋势 // 由于窗口滑动,后续窗口的最小值应该逐渐增大 // 但如果数据保留逻辑正确,不应该突然跳跃 minTemps := make([]float64, 0) - for _, res := range windowResults { + for _, res := range windowResultsCopy { if len(res) > 0 { minTemp := res[0]["min_temp"].(float64) minTemps = append(minTemps, minTemp) @@ -659,9 +705,9 @@ END: // 验证窗口数据量:前几个窗口的数据量应该足够 // 使用处理时间时,如果数据保留逻辑正确,窗口数据量应该逐渐减少(因为旧数据逐渐过期) // 但减少应该是平滑的,不应该突然大幅减少 - for i := 0; i < len(windowResults) && i < 5; i++ { - if len(windowResults[i]) > 0 { - cnt := windowResults[i][0]["cnt"].(float64) + for i := 0; i < windowResultsLen && i < 5; i++ { + if len(windowResultsCopy[i]) > 0 { + cnt := windowResultsCopy[i][0]["cnt"].(float64) // 前几个窗口应该包含足够的数据(使用处理时间) // 窗口大小10秒,每秒1条数据,应该包含约10条数据 if i < 3 { diff --git a/window/sliding_window_test.go b/window/sliding_window_test.go index 9b203e4..586f130 100644 --- a/window/sliding_window_test.go +++ b/window/sliding_window_test.go @@ -2,7 +2,6 @@ package window import ( "context" - "github.com/rulego/streamsql/utils/timex" "testing" "time" @@ -22,7 +21,7 @@ func TestSlidingWindow(t *testing.T) { defer cancel() sw, _ := NewSlidingWindow(types.WindowConfig{ - Params: []interface{}{2 * time.Second, time.Second}, + Params: []interface{}{2 * time.Second, time.Second}, TsProp: "Ts", TimeUnit: time.Second, }) @@ -49,11 +48,17 @@ func TestSlidingWindow(t *testing.T) { sw.Add(t_0) // 验证每个窗口的数据 + // 移除对齐后,窗口从第一个数据的时间开始 + // 窗口大小2秒,滑动步长1秒 + // 第一个窗口: [t_3, t_3 + 2秒) = [16:46:56.789, 16:46:58.789) + // 第二个窗口: [t_3 + 1秒, t_3 + 3秒) = [16:46:57.789, 16:46:59.789) + // 第三个窗口: [t_3 + 2秒, t_3 + 4秒) = [16:46:58.789, 16:47:00.789) + // 第四个窗口: [t_3 + 3秒, t_3 + 5秒) = [16:46:59.789, 16:47:01.789) expected := []TestResult{ - {size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)}, - {size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)}, - {size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)}, - {size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)}, + {size: 2, data: []TestDate{t_3, t_2}, start: t_3.Ts, end: t_3.Ts.Add(sw.size)}, + {size: 2, data: []TestDate{t_2, t_1}, start: t_3.Ts.Add(sw.slide), end: t_3.Ts.Add(sw.slide).Add(sw.size)}, + {size: 2, data: []TestDate{t_1, t_0}, start: t_3.Ts.Add(2 * sw.slide), end: t_3.Ts.Add(2 * sw.slide).Add(sw.size)}, + {size: 1, data: []TestDate{t_0}, start: t_3.Ts.Add(3 * sw.slide), end: t_3.Ts.Add(3 * sw.slide).Add(sw.size)}, } // 等待一段时间,触发窗口 //time.Sleep(3 * time.Second) @@ -88,12 +93,14 @@ END: assert.Equal(t, len(actual), len(expected)) // 预期结果:保留最近 2 秒内的数据 for i, exp := range expected { - assert.Equal(t, actual[i].size, exp.size) - // 允许时间有1秒的误差 - assert.WithinDuration(t, exp.start, actual[i].start, time.Second) - assert.WithinDuration(t, exp.end, actual[i].end, time.Second) + assert.Equal(t, actual[i].size, exp.size, "窗口 %d 的数据量应该匹配", i+1) + // 移除对齐后,窗口时间应该精确匹配(允许微小的纳秒级误差) + assert.WithinDuration(t, exp.start, actual[i].start, 100*time.Millisecond, + "窗口 %d 的开始时间应该匹配,预期: %v, 实际: %v", i+1, exp.start, actual[i].start) + assert.WithinDuration(t, exp.end, actual[i].end, 100*time.Millisecond, + "窗口 %d 的结束时间应该匹配,预期: %v, 实际: %v", i+1, exp.end, actual[i].end) for _, d := range exp.data { - assert.Contains(t, actual[i].data, d) + assert.Contains(t, actual[i].data, d, "窗口 %d 应该包含数据 %v", i+1, d) } } }