Files
streamsql/window/window_test.go
T
2025-08-06 18:11:44 +08:00

1470 lines
32 KiB
Go

package window
import (
"reflect"
"sync"
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// getTypeString 获取对象的类型字符串表示
func getTypeString(obj interface{}) string {
if obj == nil {
return ""
}
return reflect.TypeOf(obj).String()
}
// TestWindowEdgeCases 测试窗口的边界条件
func TestWindowEdgeCases(t *testing.T) {
t.Run("tumbling window with zero duration", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Duration(0),
},
}
_, err := NewTumblingWindow(config)
// 零持续时间可能是有效的,取决于实现
_ = err
})
t.Run("tumbling window with negative duration", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": -time.Second,
},
}
_, err := NewTumblingWindow(config)
// 负持续时间可能是有效的,取决于实现
_ = err
})
t.Run("sliding window with zero window size", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Duration(0),
"slide": time.Second,
},
}
_, err := NewSlidingWindow(config)
// 零滑动间隔可能是有效的,取决于实现
_ = err
})
t.Run("sliding window with zero slide interval", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Minute,
"slide": time.Duration(0),
},
}
_, err := NewSlidingWindow(config)
// 零滑动间隔可能是有效的,取决于实现
_ = err
})
t.Run("sliding window with slide larger than window", func(t *testing.T) {
// 这种情况可能是有效的,取决于实现
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
"slide": time.Minute,
},
}
window, err := NewSlidingWindow(config)
_ = window
_ = err
})
t.Run("counting window with zero count", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"count": 0,
},
}
_, err := NewCountingWindow(config)
require.NotNil(t, err)
})
t.Run("counting window with negative count", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"count": -10,
},
}
_, err := NewCountingWindow(config)
require.NotNil(t, err)
})
t.Run("session window with zero timeout", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"timeout": time.Duration(0),
},
}
_, err := NewSessionWindow(config)
// 零超时可能是有效的,取决于实现
_ = err
})
t.Run("session window with negative timeout", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"timeout": -time.Second,
},
}
_, err := NewSessionWindow(config)
// 负超时可能是有效的,取决于实现
_ = err
})
}
// TestWindowWithNilCallback 测试窗口使用nil回调函数
func TestWindowWithNilCallback(t *testing.T) {
t.Run("tumbling window with nil callback", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
require.NotNil(t, window)
window.Start()
// 添加数据不应该panic
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
}
})
t.Run("sliding window with nil callback", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Minute,
"slide": time.Second,
},
}
window, err := NewSlidingWindow(config)
if err == nil {
require.NotNil(t, window)
window.Start()
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
}
})
t.Run("counting window with nil callback", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"count": 10,
},
}
window, err := NewCountingWindow(config)
if err == nil {
require.NotNil(t, window)
window.Start()
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
}
})
t.Run("session window with nil callback", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"timeout": time.Minute,
},
}
window, err := NewSessionWindow(config)
if err == nil {
require.NotNil(t, window)
window.Start()
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
}
})
}
// TestWindowConcurrency 测试窗口的并发安全性
func TestWindowConcurrency(t *testing.T) {
t.Run("concurrent add to tumbling window", func(t *testing.T) {
var receivedData [][]types.Row
var mu sync.Mutex
callback := func(rows []types.Row) {
mu.Lock()
receivedData = append(receivedData, rows)
mu.Unlock()
}
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Millisecond * 100,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(callback)
}
require.Nil(t, err)
window.Start()
var wg sync.WaitGroup
numGoroutines := 10
numRowsPerGoroutine := 50
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numRowsPerGoroutine; j++ {
row := types.Row{
Data: map[string]interface{}{
"id": goroutineID*1000 + j,
"value": float64(j),
},
Timestamp: time.Now(),
}
window.Add(row)
}
}(i)
}
wg.Wait()
// 等待窗口处理完成
time.Sleep(time.Millisecond * 200)
})
t.Run("concurrent start stop", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
var wg sync.WaitGroup
numGoroutines := 5
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
window.Start()
time.Sleep(time.Millisecond * 10)
}()
}
wg.Wait()
})
t.Run("concurrent add and stop", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
var wg sync.WaitGroup
// 一个goroutine添加数据
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
row := types.Row{
Data: map[string]interface{}{"id": i},
Timestamp: time.Now(),
}
window.Add(row)
time.Sleep(time.Millisecond)
}
}()
// 另一个goroutine停止窗口
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 50)
}()
wg.Wait()
})
}
// TestWindowMemoryManagement 测试窗口的内存管理
func TestWindowMemoryManagement(t *testing.T) {
t.Run("large data in tumbling window", func(t *testing.T) {
var processedCount int
callback := func(rows []types.Row) {
processedCount += len(rows)
}
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Millisecond * 50,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(callback)
}
require.Nil(t, err)
window.Start()
// 添加大量数据
largeData := make([]byte, 1024*1024) // 1MB
for i := range largeData {
largeData[i] = byte(i % 256)
}
for i := 0; i < 10; i++ {
row := types.Row{
Data: map[string]interface{}{
"id": i,
"data": string(largeData),
},
Timestamp: time.Now(),
}
window.Add(row)
}
// 等待处理完成
time.Sleep(time.Millisecond * 200)
})
t.Run("rapid data addition", func(t *testing.T) {
var processedCount int
var mu sync.Mutex
callback := func(rows []types.Row) {
mu.Lock()
processedCount += len(rows)
mu.Unlock()
}
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Millisecond * 10,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(callback)
}
require.Nil(t, err)
window.Start()
// 快速添加大量小数据
for i := 0; i < 1000; i++ {
row := types.Row{
Data: map[string]interface{}{"id": i},
Timestamp: time.Now(),
}
window.Add(row)
}
// 等待处理完成
time.Sleep(time.Millisecond * 100)
})
}
// TestWindowErrorConditions 测试窗口的错误条件
func TestWindowErrorConditions(t *testing.T) {
t.Run("add to stopped window", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
// 向已停止的窗口添加数据不应该panic
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
})
t.Run("add invalid data types", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
// 添加包含不可序列化数据的行
row := types.Row{
Data: map[string]interface{}{
"id": 1,
"channel": make(chan int),
"func": func() {},
},
Timestamp: time.Now(),
}
window.Add(row)
})
t.Run("add row with zero timestamp", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
// 添加时间戳为零值的行
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Time{},
}
window.Add(row)
})
t.Run("add row with future timestamp", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
// 添加未来时间戳的行
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now().Add(time.Hour),
}
window.Add(row)
})
t.Run("add row with very old timestamp", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
// 添加很久以前的时间戳的行
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now().Add(-time.Hour * 24),
}
window.Add(row)
})
}
// TestWindowStatsAndMetrics 测试窗口的统计和指标
func TestWindowStatsAndMetrics(t *testing.T) {
t.Run("get stats from tumbling window", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
assert.Nil(t, err)
// 获取统计信息不应该panic
stats := window.GetStats()
_ = stats
})
t.Run("reset stats", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
assert.Nil(t, err)
window.Start()
// 添加一些数据
row := types.Row{
Data: map[string]interface{}{"id": 1},
Timestamp: time.Now(),
}
window.Add(row)
// 重置统计信息不应该panic
window.ResetStats()
})
t.Run("get output channel", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
assert.Nil(t, err)
// 获取输出通道不应该panic
outputChan := window.OutputChan()
_ = outputChan
})
t.Run("set callback", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
// 设置新的回调函数不应该panic
newCallback := func(rows []types.Row) {
// 新的回调逻辑
}
window.SetCallback(newCallback)
}
})
}
// TestWindowWithPerformanceConfig 测试窗口性能配置
func TestWindowWithPerformanceConfig(t *testing.T) {
tests := []struct {
name string
windowType string
performanceConfig types.PerformanceConfig
expectedBufferSize int
extraParams map[string]interface{}
}{
{
name: "滚动窗口-默认配置",
windowType: TypeTumbling,
performanceConfig: types.DefaultPerformanceConfig(),
expectedBufferSize: 50,
extraParams: map[string]interface{}{"size": "2s"},
},
{
name: "滚动窗口-高性能配置",
windowType: TypeTumbling,
performanceConfig: types.HighPerformanceConfig(),
expectedBufferSize: 200,
extraParams: map[string]interface{}{"size": "2s"},
},
{
name: "滚动窗口-低延迟配置",
windowType: TypeTumbling,
performanceConfig: types.LowLatencyConfig(),
expectedBufferSize: 20,
extraParams: map[string]interface{}{"size": "2s"},
},
{
name: "滑动窗口-高性能配置",
windowType: TypeSliding,
performanceConfig: types.HighPerformanceConfig(),
expectedBufferSize: 200,
extraParams: map[string]interface{}{"size": "10s", "slide": "5s"},
},
{
name: "计数窗口-高性能配置",
windowType: TypeCounting,
performanceConfig: types.HighPerformanceConfig(),
expectedBufferSize: 20, // 200 / 10
extraParams: map[string]interface{}{"count": 10},
},
{
name: "自定义性能配置",
windowType: TypeTumbling,
performanceConfig: types.PerformanceConfig{BufferConfig: types.BufferConfig{WindowOutputSize: 500}},
expectedBufferSize: 500,
extraParams: map[string]interface{}{"size": "2s"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := types.WindowConfig{
Type: tt.windowType,
Params: make(map[string]interface{}),
}
// 合并参数
for k, v := range tt.extraParams {
config.Params[k] = v
}
config.Params["performanceConfig"] = tt.performanceConfig
var window Window
var err error
switch tt.windowType {
case TypeTumbling:
window, err = NewTumblingWindow(config)
case TypeSliding:
window, err = NewSlidingWindow(config)
case TypeCounting:
window, err = NewCountingWindow(config)
case TypeSession:
window, err = NewSessionWindow(config)
}
assert.NoError(t, err)
assert.Equal(t, tt.expectedBufferSize, cap(window.OutputChan()))
if closer, ok := window.(interface{ Stop() }); ok {
closer.Stop()
}
})
}
t.Run("无性能配置-使用默认值", func(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "3s",
},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
assert.Equal(t, 1000, cap(tw.outputChan))
tw.Stop()
})
}
// TestGetTimestampEdgeCases 测试GetTimestamp函数的边缘情况
func TestGetTimestampEdgeCases(t *testing.T) {
tests := []struct {
name string
data interface{}
tsProp string
timeUnit time.Duration
checkNow bool
}{
{
name: "空字符串时间戳属性",
data: map[string]interface{}{"value": 42},
tsProp: "",
timeUnit: time.Second,
checkNow: true,
},
{
name: "结构体中不存在的字段",
data: struct {
Value int
}{Value: 42},
tsProp: "NonExistentField",
timeUnit: time.Second,
checkNow: true,
},
{
name: "map中不存在的键",
data: map[string]interface{}{
"value": 42,
},
tsProp: "nonexistent",
timeUnit: time.Second,
checkNow: true,
},
{
name: "map中非时间类型的值",
data: map[string]interface{}{
"timestamp": "not a time",
},
tsProp: "timestamp",
timeUnit: time.Second,
checkNow: true,
},
{
name: "非字符串键的map",
data: map[int]interface{}{
1: time.Now(),
},
tsProp: "timestamp",
timeUnit: time.Second,
checkNow: true,
},
{
name: "nil数据",
data: nil,
tsProp: "timestamp",
timeUnit: time.Second,
checkNow: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetTimestamp(tt.data, tt.tsProp, tt.timeUnit)
if tt.checkNow {
// 检查返回的时间是否接近当前时间(允许1秒误差)
assert.WithinDuration(t, time.Now(), result, time.Second)
}
})
}
}
// TestSessionWindowSessionKey 测试会话窗口的会话键提取
func TestSessionWindowSessionKey(t *testing.T) {
config := types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{
"timeout": "5s",
},
GroupByKey: "user_id",
}
sw, err := NewSessionWindow(config)
assert.NoError(t, err)
// 测试不同类型的数据
tests := []struct {
name string
data interface{}
}{
{
name: "map数据",
data: map[string]interface{}{
"user_id": "user123",
"value": 100,
},
},
{
name: "结构体数据",
data: struct {
UserID string `json:"user_id"`
Value int
}{UserID: "user456", Value: 200},
},
{
name: "无效数据",
data: "invalid",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 这里只是测试Add方法不会panic
assert.NotPanics(t, func() {
sw.Add(tt.data)
})
})
}
}
// TestWindowStopBeforeStart 测试在启动前停止窗口
func TestWindowStopBeforeStart(t *testing.T) {
tests := []struct {
name string
config types.WindowConfig
}{
{
name: "滚动窗口",
config: types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{"size": "1s"},
},
},
{
name: "滑动窗口",
config: types.WindowConfig{
Type: TypeSliding,
Params: map[string]interface{}{
"size": "2s",
"slide": "1s",
},
},
},
{
name: "计数窗口",
config: types.WindowConfig{
Type: TypeCounting,
Params: map[string]interface{}{"count": 10},
},
},
{
name: "会话窗口",
config: types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{"timeout": "5s"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
window, err := CreateWindow(tt.config)
assert.NoError(t, err)
// 在启动前停止窗口应该不会panic
assert.NotPanics(t, func() {
if tw, ok := window.(*TumblingWindow); ok {
tw.Stop()
} else if sw, ok := window.(*SlidingWindow); ok {
sw.Stop()
} else if cw, ok := window.(*CountingWindow); ok {
// CountingWindow doesn't have Stop method
_ = cw
} else if sesw, ok := window.(*SessionWindow); ok {
sesw.Stop()
}
})
})
}
}
// TestWindowMultipleStops 测试多次停止窗口
func TestWindowMultipleStops(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{"size": "1s"},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
tw.Start()
// 多次停止应该不会panic
assert.NotPanics(t, func() {
tw.Stop()
tw.Stop()
tw.Stop()
})
}
// TestWindowAddAfterStop 测试停止后添加数据
func TestWindowAddAfterStop(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{"size": "1s"},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
tw.Start()
tw.Stop()
// 停止后添加数据应该不会panic
assert.NotPanics(t, func() {
tw.Add(map[string]interface{}{"value": 42})
})
}
// TestCountingWindowWithCallback 测试计数窗口的回调功能
func TestCountingWindowWithCallback(t *testing.T) {
var mu sync.Mutex
callbackData := make([][]types.Row, 0)
callback := func(results []types.Row) {
mu.Lock()
defer mu.Unlock()
callbackData = append(callbackData, results)
}
config := types.WindowConfig{
Type: TypeCounting,
Params: map[string]interface{}{
"count": 2,
"callback": callback,
},
}
cw, err := NewCountingWindow(config)
assert.NoError(t, err)
cw.Start()
// CountingWindow doesn't have Stop method, will be handled by context cancellation
// 添加数据
cw.Add(map[string]interface{}{"value": 1})
cw.Add(map[string]interface{}{"value": 2})
// 等待处理
time.Sleep(100 * time.Millisecond)
// 检查回调是否被调用
assert.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(callbackData) > 0
}, time.Second, 10*time.Millisecond)
}
// TestSlidingWindowInvalidParams 测试滑动窗口的无效参数
func TestSlidingWindowInvalidParams(t *testing.T) {
tests := []struct {
name string
params map[string]interface{}
}{
{
name: "无效的slide参数",
params: map[string]interface{}{
"size": "10s",
"slide": "invalid",
},
},
{
name: "缺少slide参数",
params: map[string]interface{}{
"size": "10s",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := types.WindowConfig{
Type: TypeSliding,
Params: tt.params,
}
_, err := NewSlidingWindow(config)
assert.Error(t, err)
})
}
}
// TestWindowUnifiedConfigIntegration 集成测试:验证窗口配置与实际数据处理的集成
func TestWindowUnifiedConfigIntegration(t *testing.T) {
t.Run("性能配置集成测试", func(t *testing.T) {
performanceConfig := types.HighPerformanceConfig()
windowConfig := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "1s",
"performanceConfig": performanceConfig,
},
}
tw, err := NewTumblingWindow(windowConfig)
assert.NoError(t, err)
defer tw.Stop()
// 验证缓冲区大小
assert.Equal(t, 200, cap(tw.outputChan))
// 启动窗口
tw.Start()
// 发送测试数据
for i := 0; i < 10; i++ {
tw.Add(map[string]interface{}{
"id": i,
"value": i * 10,
})
}
// 等待窗口触发
time.Sleep(1200 * time.Millisecond)
// 验证窗口能正常工作
select {
case data := <-tw.OutputChan():
assert.Greater(t, len(data), 0)
assert.LessOrEqual(t, len(data), 10)
case <-time.After(500 * time.Millisecond):
t.Error("超时未接收到窗口输出")
}
})
t.Run("缓冲区溢出处理", func(t *testing.T) {
// 创建一个小缓冲区的窗口
smallBufferConfig := types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1, // 非常小的缓冲区
},
}
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "100ms",
"performanceConfig": smallBufferConfig,
},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
tw.Start()
defer tw.Stop()
// 快速添加大量数据,可能导致缓冲区溢出
for i := 0; i < 10; i++ {
tw.Add(map[string]interface{}{"value": i})
}
// 等待处理
time.Sleep(200 * time.Millisecond)
// 检查统计信息
stats := tw.GetStats()
assert.Contains(t, stats, "dropped_count")
assert.Contains(t, stats, "sent_count")
})
}
// TestCreateWindow 测试窗口工厂函数
func TestCreateWindow(t *testing.T) {
tests := []struct {
name string
config types.WindowConfig
expectError bool
expectedType string
}{
{
name: "创建滚动窗口",
config: types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "5s",
},
},
expectError: false,
expectedType: "*window.TumblingWindow",
},
{
name: "创建滑动窗口",
config: types.WindowConfig{
Type: TypeSliding,
Params: map[string]interface{}{
"size": "10s",
"slide": "5s",
},
},
expectError: false,
expectedType: "*window.SlidingWindow",
},
{
name: "创建计数窗口",
config: types.WindowConfig{
Type: TypeCounting,
Params: map[string]interface{}{
"count": 100,
},
},
expectError: false,
expectedType: "*window.CountingWindow",
},
{
name: "创建会话窗口",
config: types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{
"timeout": "30s",
},
},
expectError: false,
expectedType: "*window.SessionWindow",
},
{
name: "窗口工厂与统一配置集成",
config: types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "5s",
"performanceConfig": types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1500,
},
},
},
},
expectError: false,
expectedType: "*window.TumblingWindow",
},
{
name: "无效的窗口类型",
config: types.WindowConfig{
Type: "invalid",
Params: map[string]interface{}{
"size": "5s",
},
},
expectError: true,
expectedType: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
window, err := CreateWindow(tt.config)
if tt.expectError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.NotNil(t, window)
assert.Equal(t, tt.expectedType, getTypeString(window))
// 验证窗口能正常工作
if closer, ok := window.(interface{ Stop() }); ok {
closer.Stop()
}
})
}
}
// TestGetTimestampCoverage 测试时间戳提取函数
func TestGetTimestampCoverage(t *testing.T) {
testTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
tests := []struct {
name string
data interface{}
tsProp string
timeUnit time.Duration
expected time.Time
}{
{
name: "使用GetTimestamp接口",
data: TestDate2{ts: testTime},
tsProp: "",
timeUnit: time.Second,
expected: testTime,
},
{
name: "从结构体字段提取时间戳",
data: struct {
Timestamp time.Time
Value int
}{Timestamp: testTime, Value: 42},
tsProp: "Timestamp",
timeUnit: time.Second,
expected: testTime,
},
{
name: "从map中提取时间戳",
data: map[string]interface{}{
"timestamp": testTime,
"value": 42,
},
tsProp: "timestamp",
timeUnit: time.Second,
expected: testTime,
},
{
name: "从map中提取int64时间戳",
data: map[string]interface{}{
"timestamp": testTime.Unix(),
},
tsProp: "timestamp",
timeUnit: time.Second,
expected: time.Unix(testTime.Unix(), 0),
},
{
name: "无法提取时间戳,使用当前时间",
data: "invalid data",
tsProp: "nonexistent",
timeUnit: time.Second,
// expected will be checked with time tolerance
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := GetTimestamp(tt.data, tt.tsProp, tt.timeUnit)
if tt.name == "无法提取时间戳,使用当前时间" {
// 检查返回的时间是否接近当前时间(允许1秒误差)
assert.WithinDuration(t, time.Now(), result, time.Second)
} else {
assert.Equal(t, tt.expected, result)
}
})
}
}
// TestWindowErrorHandling 测试窗口错误处理
func TestWindowErrorHandling(t *testing.T) {
t.Run("滚动窗口无效大小", func(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "invalid",
},
}
_, err := NewTumblingWindow(config)
assert.Error(t, err)
})
t.Run("滑动窗口无效参数", func(t *testing.T) {
config := types.WindowConfig{
Type: TypeSliding,
Params: map[string]interface{}{
"size": "invalid",
"slide": "5s",
},
}
_, err := NewSlidingWindow(config)
assert.Error(t, err)
})
t.Run("计数窗口无效计数", func(t *testing.T) {
config := types.WindowConfig{
Type: TypeCounting,
Params: map[string]interface{}{
"count": 0,
},
}
_, err := NewCountingWindow(config)
assert.Error(t, err)
})
t.Run("会话窗口无效超时", func(t *testing.T) {
config := types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{
"timeout": "invalid",
},
}
_, err := NewSessionWindow(config)
assert.Error(t, err)
})
}
// TestSessionWindowAdvanced 测试会话窗口的高级功能
func TestSessionWindowAdvanced(t *testing.T) {
config := types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{
"timeout": "1s",
},
GroupByKey: "user_id",
}
sw, err := NewSessionWindow(config)
assert.NoError(t, err)
assert.NotNil(t, sw)
// 测试设置回调函数
sw.SetCallback(func(results []types.Row) {
// Callback executed
})
// 启动窗口
sw.Start()
defer sw.Stop()
// 添加不同用户的数据
sw.Add(map[string]interface{}{
"user_id": "user1",
"value": 100,
})
sw.Add(map[string]interface{}{
"user_id": "user2",
"value": 200,
})
// 等待会话超时
time.Sleep(1500 * time.Millisecond)
// 检查输出通道
select {
case data := <-sw.OutputChan():
assert.NotEmpty(t, data)
case <-time.After(500 * time.Millisecond):
// 可能没有数据输出,这也是正常的
}
// 测试重置功能
sw.Reset()
// 测试手动触发
sw.Trigger()
}
// TestSlidingWindowAdvanced 测试滑动窗口的高级功能
func TestSlidingWindowAdvanced(t *testing.T) {
config := types.WindowConfig{
Type: TypeSliding,
Params: map[string]interface{}{
"size": "2s",
"slide": "1s",
},
TsProp: "timestamp",
TimeUnit: time.Second,
}
sw, err := NewSlidingWindow(config)
assert.NoError(t, err)
assert.NotNil(t, sw)
// 测试获取输出通道
outputChan := sw.OutputChan()
assert.NotNil(t, outputChan)
// 测试重置功能
sw.Reset()
// 测试手动触发
sw.Trigger()
}
// TestCountingWindowAdvanced 测试计数窗口的高级功能
func TestCountingWindowAdvanced(t *testing.T) {
config := types.WindowConfig{
Type: TypeCounting,
Params: map[string]interface{}{
"count": 3,
},
TsProp: "timestamp",
TimeUnit: time.Second,
}
cw, err := NewCountingWindow(config)
assert.NoError(t, err)
assert.NotNil(t, cw)
// 测试设置回调函数
cw.SetCallback(func(results []types.Row) {
// Callback executed
})
// 启动窗口
cw.Start()
// CountingWindow doesn't have Stop method
// 添加数据直到达到阈值
for i := 0; i < 3; i++ {
cw.Add(map[string]interface{}{
"timestamp": time.Now().Unix(),
"value": i,
})
}
// 等待一段时间让窗口处理数据
time.Sleep(100 * time.Millisecond)
// 检查输出通道
select {
case data := <-cw.OutputChan():
assert.Len(t, data, 3)
case <-time.After(500 * time.Millisecond):
// 可能没有数据输出,这也是正常的
}
// 测试重置功能
cw.Reset()
// 测试手动触发
cw.Trigger()
}
// TestTumblingWindowAdvanced 测试滚动窗口的高级功能
func TestTumblingWindowAdvanced(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{
"size": "1s",
},
TsProp: "timestamp",
TimeUnit: time.Second,
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
assert.NotNil(t, tw)
// 检查统计信息
stats := tw.GetStats()
assert.Contains(t, stats, "sent_count")
assert.Contains(t, stats, "dropped_count")
// 测试重置统计信息
tw.ResetStats()
stats = tw.GetStats()
assert.Equal(t, int64(0), stats["droppedCount"])
assert.Equal(t, int64(0), stats["sentCount"])
// 测试设置回调函数
tw.SetCallback(func(results []types.Row) {
// Callback executed
})
// 测试获取输出通道
outputChan := tw.OutputChan()
assert.NotNil(t, outputChan)
// 测试重置功能
tw.Reset()
// 测试手动触发
tw.Trigger()
}