Files
streamsql/window/sliding_window_test.go
2025-05-25 18:02:37 +08:00

131 lines
3.3 KiB
Go

package window
import (
"context"
"github.com/rulego/streamsql/utils/timex"
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/assert"
)
type TestResult struct {
size int
data []TestDate
start time.Time
end time.Time
}
func TestSlidingWindow(t *testing.T) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
sw, _ := NewSlidingWindow(types.WindowConfig{
Params: map[string]interface{}{
"size": "2s",
"slide": "1s",
},
TsProp: "Ts",
TimeUnit: time.Second,
})
sw.SetCallback(func(results []types.Row) {
if len(results) == 0 {
return
}
for _, row := range results {
t.Logf("Slot: %v Received row: %v", row.Slot, row.Data)
}
})
sw.Start()
// 添加数据
t_3 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 56, 789000000, time.UTC), tag: "1"}
t_2 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 57, 789000000, time.UTC), tag: "2"}
t_1 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 58, 789000000, time.UTC), tag: "3"}
t_0 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 59, 789000000, time.UTC), tag: "4"}
sw.Add(t_3)
sw.Add(t_2)
sw.Add(t_1)
sw.Add(t_0)
// 验证每个窗口的数据
expected := []TestResult{
{size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)},
{size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)},
{size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)},
{size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)},
}
// 等待一段时间,触发窗口
//time.Sleep(3 * time.Second)
// 检查结果
// resultsChan := sw.OutputChan()
// results := make(chan []types.Row)
actual := make([]TestResult, 0)
timeout := time.After(6 * time.Second)
for {
select {
case results := <-sw.OutputChan():
raw := make([]TestDate, 0)
for _, row := range results {
raw = append(raw, row.Data.(TestDate))
}
if len(results) == 0 {
continue
}
actual = append(actual, TestResult{
size: len(results),
data: raw,
start: *results[0].Slot.Start,
end: *results[0].Slot.End})
case <-timeout:
goto END
default:
}
}
END:
assert.Equal(t, len(actual), len(expected))
// 预期结果:保留最近 2 秒内的数据
for i, exp := range expected {
assert.Equal(t, actual[i].size, exp.size)
assert.Equal(t, actual[i].start, exp.start)
assert.Equal(t, actual[i].end, exp.end)
for _, d := range exp.data {
assert.Contains(t, actual[i].data, d)
}
}
}
type TestDate struct {
Ts time.Time
tag string
}
type TestDate2 struct {
ts time.Time
}
func (d TestDate2) GetTimestamp() time.Time {
return d.ts
}
func TestGetTimestamp(t *testing.T) {
t_0 := time.Now()
data := map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "ts": t_0}
t_1 := GetTimestamp(data, "ts", time.Millisecond)
data_1 := TestDate{Ts: t_0}
t_2 := GetTimestamp(data_1, "Ts", time.Millisecond)
data_2 := TestDate2{ts: t_0}
t_3 := GetTimestamp(data_2, "", time.Millisecond)
assert.Equal(t, t_0, t_1)
assert.Equal(t, t_0, t_2)
assert.Equal(t, t_0, t_3)
}