mirror of
https://gitee.com/rulego/streamsql.git
synced 2025-07-01 13:57:38 +00:00
131 lines
3.3 KiB
Go
131 lines
3.3 KiB
Go
package window
|
|
|
|
import (
|
|
"context"
|
|
"github.com/rulego/streamsql/utils/timex"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rulego/streamsql/types"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
type TestResult struct {
|
|
size int
|
|
data []TestDate
|
|
start time.Time
|
|
end time.Time
|
|
}
|
|
|
|
func TestSlidingWindow(t *testing.T) {
|
|
_, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
sw, _ := NewSlidingWindow(types.WindowConfig{
|
|
Params: map[string]interface{}{
|
|
"size": "2s",
|
|
"slide": "1s",
|
|
},
|
|
TsProp: "Ts",
|
|
TimeUnit: time.Second,
|
|
})
|
|
sw.SetCallback(func(results []types.Row) {
|
|
if len(results) == 0 {
|
|
return
|
|
}
|
|
for _, row := range results {
|
|
t.Logf("Slot: %v Received row: %v", row.Slot, row.Data)
|
|
}
|
|
|
|
})
|
|
sw.Start()
|
|
|
|
// 添加数据
|
|
t_3 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 56, 789000000, time.UTC), tag: "1"}
|
|
t_2 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 57, 789000000, time.UTC), tag: "2"}
|
|
t_1 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 58, 789000000, time.UTC), tag: "3"}
|
|
t_0 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 59, 789000000, time.UTC), tag: "4"}
|
|
|
|
sw.Add(t_3)
|
|
sw.Add(t_2)
|
|
sw.Add(t_1)
|
|
sw.Add(t_0)
|
|
|
|
// 验证每个窗口的数据
|
|
expected := []TestResult{
|
|
{size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)},
|
|
{size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)},
|
|
{size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)},
|
|
{size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)},
|
|
}
|
|
// 等待一段时间,触发窗口
|
|
//time.Sleep(3 * time.Second)
|
|
|
|
// 检查结果
|
|
// resultsChan := sw.OutputChan()
|
|
// results := make(chan []types.Row)
|
|
actual := make([]TestResult, 0)
|
|
timeout := time.After(6 * time.Second)
|
|
for {
|
|
select {
|
|
case results := <-sw.OutputChan():
|
|
raw := make([]TestDate, 0)
|
|
for _, row := range results {
|
|
raw = append(raw, row.Data.(TestDate))
|
|
}
|
|
if len(results) == 0 {
|
|
continue
|
|
}
|
|
actual = append(actual, TestResult{
|
|
size: len(results),
|
|
data: raw,
|
|
start: *results[0].Slot.Start,
|
|
end: *results[0].Slot.End})
|
|
case <-timeout:
|
|
goto END
|
|
default:
|
|
}
|
|
}
|
|
|
|
END:
|
|
assert.Equal(t, len(actual), len(expected))
|
|
// 预期结果:保留最近 2 秒内的数据
|
|
for i, exp := range expected {
|
|
assert.Equal(t, actual[i].size, exp.size)
|
|
assert.Equal(t, actual[i].start, exp.start)
|
|
assert.Equal(t, actual[i].end, exp.end)
|
|
for _, d := range exp.data {
|
|
assert.Contains(t, actual[i].data, d)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestDate is the record type the tests in this file feed into the window.
// Ts is exported and is the field the tests name as the timestamp property
// ("Ts"); tag is an unexported label used to tell records apart.
type TestDate struct {
	Ts  time.Time
	tag string
}
|
|
|
|
// TestDate2 is a record whose timestamp lives in an unexported field and is
// only reachable through its GetTimestamp method. TestGetTimestamp passes it
// to the package-level GetTimestamp with an empty property name.
type TestDate2 struct {
	ts time.Time
}

// GetTimestamp returns the timestamp stored in the record.
func (d TestDate2) GetTimestamp() time.Time {
	return d.ts
}
|
|
|
|
func TestGetTimestamp(t *testing.T) {
|
|
t_0 := time.Now()
|
|
data := map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "ts": t_0}
|
|
t_1 := GetTimestamp(data, "ts", time.Millisecond)
|
|
|
|
data_1 := TestDate{Ts: t_0}
|
|
t_2 := GetTimestamp(data_1, "Ts", time.Millisecond)
|
|
|
|
data_2 := TestDate2{ts: t_0}
|
|
t_3 := GetTimestamp(data_2, "", time.Millisecond)
|
|
|
|
assert.Equal(t, t_0, t_1)
|
|
assert.Equal(t, t_0, t_2)
|
|
assert.Equal(t, t_0, t_3)
|
|
}
|