Files
2025-08-06 18:11:44 +08:00

1470 lines
32 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package window
import (
"reflect"
"sync"
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// getTypeString returns the string form of obj's dynamic type as reported by
// the reflect package. A nil interface value has no dynamic type, in which
// case the empty string is returned.
func getTypeString(obj interface{}) string {
	if typ := reflect.TypeOf(obj); typ != nil {
		return typ.String()
	}
	return ""
}
// TestWindowEdgeCases runs the window constructors with boundary parameter
// values: zero and negative sizes, slides, counts and timeouts.
//
// Only the counting-window cases assert an outcome (the constructor must
// return an error). For every other case the constructor is merely invoked
// and its result discarded, because whether such parameters are accepted is
// left to the implementation.
func TestWindowEdgeCases(t *testing.T) {
	// Each adapter hides the constructor's concrete return type so that all
	// window kinds can share a single table.
	tumbling := func(cfg types.WindowConfig) error {
		_, err := NewTumblingWindow(cfg)
		return err
	}
	sliding := func(cfg types.WindowConfig) error {
		_, err := NewSlidingWindow(cfg)
		return err
	}
	counting := func(cfg types.WindowConfig) error {
		_, err := NewCountingWindow(cfg)
		return err
	}
	session := func(cfg types.WindowConfig) error {
		_, err := NewSessionWindow(cfg)
		return err
	}

	cases := []struct {
		name      string
		construct func(types.WindowConfig) error
		params    map[string]interface{}
		mustFail  bool // when true the constructor is required to return an error
	}{
		{
			name:      "tumbling window with zero duration",
			construct: tumbling,
			params:    map[string]interface{}{"size": time.Duration(0)},
		},
		{
			name:      "tumbling window with negative duration",
			construct: tumbling,
			params:    map[string]interface{}{"size": -time.Second},
		},
		{
			// Zero window size combined with a valid slide interval.
			name:      "sliding window with zero window size",
			construct: sliding,
			params: map[string]interface{}{
				"size":  time.Duration(0),
				"slide": time.Second,
			},
		},
		{
			name:      "sliding window with zero slide interval",
			construct: sliding,
			params: map[string]interface{}{
				"size":  time.Minute,
				"slide": time.Duration(0),
			},
		},
		{
			name:      "sliding window with slide larger than window",
			construct: sliding,
			params: map[string]interface{}{
				"size":  time.Second,
				"slide": time.Minute,
			},
		},
		{
			name:      "counting window with zero count",
			construct: counting,
			params:    map[string]interface{}{"count": 0},
			mustFail:  true,
		},
		{
			name:      "counting window with negative count",
			construct: counting,
			params:    map[string]interface{}{"count": -10},
			mustFail:  true,
		},
		{
			name:      "session window with zero timeout",
			construct: session,
			params:    map[string]interface{}{"timeout": time.Duration(0)},
		},
		{
			name:      "session window with negative timeout",
			construct: session,
			params:    map[string]interface{}{"timeout": -time.Second},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.construct(types.WindowConfig{Params: tc.params})
			if tc.mustFail {
				require.NotNil(t, err)
			}
			// Otherwise the result is intentionally ignored: acceptance of
			// these values depends on the implementation.
		})
	}
}
// TestWindowWithNilCallback starts each window kind without registering a
// callback and adds a single row to it. There are no assertions on output;
// the test passes as long as nothing panics. A subtest whose constructor
// returns an error ends early without failing.
func TestWindowWithNilCallback(t *testing.T) {
	// sampleRow builds the one row every subtest feeds to its window.
	sampleRow := func() types.Row {
		return types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Now(),
		}
	}

	t.Run("tumbling window with nil callback", func(t *testing.T) {
		w, err := NewTumblingWindow(types.WindowConfig{
			Params: map[string]interface{}{
				"size": time.Second,
			},
		})
		if err != nil {
			return
		}
		require.NotNil(t, w)
		w.Start()
		// Adding data must not panic.
		w.Add(sampleRow())
	})

	t.Run("sliding window with nil callback", func(t *testing.T) {
		w, err := NewSlidingWindow(types.WindowConfig{
			Params: map[string]interface{}{
				"size":  time.Minute,
				"slide": time.Second,
			},
		})
		if err != nil {
			return
		}
		require.NotNil(t, w)
		w.Start()
		w.Add(sampleRow())
	})

	t.Run("counting window with nil callback", func(t *testing.T) {
		w, err := NewCountingWindow(types.WindowConfig{
			Params: map[string]interface{}{
				"count": 10,
			},
		})
		if err != nil {
			return
		}
		require.NotNil(t, w)
		w.Start()
		w.Add(sampleRow())
	})

	t.Run("session window with nil callback", func(t *testing.T) {
		w, err := NewSessionWindow(types.WindowConfig{
			Params: map[string]interface{}{
				"timeout": time.Minute,
			},
		})
		if err != nil {
			return
		}
		require.NotNil(t, w)
		w.Start()
		w.Add(sampleRow())
	})
}
// TestWindowConcurrency exercises a tumbling window from several goroutines
// at once. None of the subtests assert on the window's output; they pass as
// long as construction succeeds and nothing panics or deadlocks.
func TestWindowConcurrency(t *testing.T) {
t.Run("concurrent add to tumbling window", func(t *testing.T) {
// receivedData collects every batch handed to the callback; mu guards it
// because the callback and the test body may run on different goroutines.
// NOTE(review): receivedData is never inspected after collection.
var receivedData [][]types.Row
var mu sync.Mutex
callback := func(rows []types.Row) {
mu.Lock()
receivedData = append(receivedData, rows)
mu.Unlock()
}
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Millisecond * 100,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(callback)
}
require.Nil(t, err)
window.Start()
var wg sync.WaitGroup
// 10 goroutines each add 50 rows, for 500 concurrent Add calls in total.
numGoroutines := 10
numRowsPerGoroutine := 50
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numRowsPerGoroutine; j++ {
row := types.Row{
Data: map[string]interface{}{
// goroutineID*1000 + j keeps ids distinct across goroutines.
"id": goroutineID*1000 + j,
"value": float64(j),
},
Timestamp: time.Now(),
}
window.Add(row)
}
}(i)
}
wg.Wait()
// Wait two window lengths (200ms vs. a 100ms size) so the window has time
// to finish processing.
time.Sleep(time.Millisecond * 200)
})
t.Run("concurrent start stop", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
var wg sync.WaitGroup
numGoroutines := 5
// Five goroutines call Start on the same window concurrently.
// NOTE(review): despite the subtest name, Stop is never called here.
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
window.Start()
time.Sleep(time.Millisecond * 10)
}()
}
wg.Wait()
})
t.Run("concurrent add and stop", func(t *testing.T) {
config := types.WindowConfig{
Params: map[string]interface{}{
"size": time.Second,
},
}
window, err := NewTumblingWindow(config)
if err == nil {
window.SetCallback(func(results []types.Row) {})
}
require.Nil(t, err)
window.Start()
var wg sync.WaitGroup
// One goroutine adds 100 rows, pausing 1ms between them.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
row := types.Row{
Data: map[string]interface{}{"id": i},
Timestamp: time.Now(),
}
window.Add(row)
time.Sleep(time.Millisecond)
}
}()
// The second goroutine is meant to stop the window.
// NOTE(review): it only sleeps for 50ms and never calls Stop, so the
// add/stop interleaving named by this subtest is not actually exercised.
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 50)
}()
wg.Wait()
})
}
// TestWindowMemoryManagement feeds a tumbling window with memory-heavy and
// high-rate input. It makes no assertions on the delivered rows; it passes as
// long as construction succeeds and nothing panics.
//
// In both subtests the callback updates a counter under a mutex, since the
// callback may run on a different goroutine than the test body, and the
// window is stopped when the subtest returns so its background work does not
// outlive the test.
func TestWindowMemoryManagement(t *testing.T) {
	t.Run("large data in tumbling window", func(t *testing.T) {
		var (
			mu             sync.Mutex
			processedCount int
		)
		callback := func(rows []types.Row) {
			mu.Lock()
			processedCount += len(rows)
			mu.Unlock()
		}
		config := types.WindowConfig{
			Params: map[string]interface{}{
				"size": time.Millisecond * 50,
			},
		}
		window, err := NewTumblingWindow(config)
		require.NoError(t, err)
		window.SetCallback(callback)
		window.Start()
		defer window.Stop()

		// Build a 1 MiB payload with a repeating 0..255 byte pattern.
		largeData := make([]byte, 1024*1024)
		for i := range largeData {
			largeData[i] = byte(i % 256)
		}
		// Each row gets its own copy of the payload (string(largeData)
		// allocates), so ten rows hold roughly 10 MiB in total.
		for i := 0; i < 10; i++ {
			row := types.Row{
				Data: map[string]interface{}{
					"id":   i,
					"data": string(largeData),
				},
				Timestamp: time.Now(),
			}
			window.Add(row)
		}
		// Wait several window lengths for processing to complete.
		time.Sleep(time.Millisecond * 200)
	})

	t.Run("rapid data addition", func(t *testing.T) {
		var (
			mu             sync.Mutex
			processedCount int
		)
		callback := func(rows []types.Row) {
			mu.Lock()
			processedCount += len(rows)
			mu.Unlock()
		}
		config := types.WindowConfig{
			Params: map[string]interface{}{
				"size": time.Millisecond * 10,
			},
		}
		window, err := NewTumblingWindow(config)
		require.NoError(t, err)
		window.SetCallback(callback)
		window.Start()
		defer window.Stop()

		// Add many small rows back to back, with no pause between them.
		for i := 0; i < 1000; i++ {
			row := types.Row{
				Data:      map[string]interface{}{"id": i},
				Timestamp: time.Now(),
			}
			window.Add(row)
		}
		// Wait several window lengths for processing to complete.
		time.Sleep(time.Millisecond * 100)
	})
}
// TestWindowErrorConditions adds unusual rows to a one-second tumbling window
// and checks that nothing panics: a row added after Stop, a row holding
// non-serializable values, and rows with zero, future and very old
// timestamps.
//
// The "add to stopped window" subtest stops the window before adding, which
// is the condition its name describes. Every other subtest stops its window
// on return so no background work leaks between subtests.
func TestWindowErrorConditions(t *testing.T) {
	// oneSecondConfig returns a fresh config per call so subtests never share
	// a Params map.
	oneSecondConfig := func() types.WindowConfig {
		return types.WindowConfig{
			Params: map[string]interface{}{
				"size": time.Second,
			},
		}
	}
	noop := func(results []types.Row) {}

	t.Run("add to stopped window", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		window.Stop()
		// Adding to a window that has already been stopped must not panic.
		row := types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Now(),
		}
		window.Add(row)
	})

	t.Run("add invalid data types", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		defer window.Stop()
		// The row carries values that cannot be serialized (a channel and a
		// function).
		row := types.Row{
			Data: map[string]interface{}{
				"id":      1,
				"channel": make(chan int),
				"func":    func() {},
			},
			Timestamp: time.Now(),
		}
		window.Add(row)
	})

	t.Run("add row with zero timestamp", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		defer window.Stop()
		// The row's timestamp is the zero time.Time value.
		row := types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Time{},
		}
		window.Add(row)
	})

	t.Run("add row with future timestamp", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		defer window.Stop()
		// The row's timestamp lies one hour in the future.
		row := types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Now().Add(time.Hour),
		}
		window.Add(row)
	})

	t.Run("add row with very old timestamp", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		defer window.Stop()
		// The row's timestamp lies 24 hours in the past.
		row := types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Now().Add(-time.Hour * 24),
		}
		window.Add(row)
	})
}
// TestWindowStatsAndMetrics calls the statistics and accessor methods of a
// tumbling window (GetStats, ResetStats, OutputChan, SetCallback) and checks
// that none of them panic.
//
// Construction errors abort the subtest via require, so a failed constructor
// is reported as a test failure rather than as a nil-pointer panic on the
// following method call.
func TestWindowStatsAndMetrics(t *testing.T) {
	// oneSecondConfig returns a fresh config per call so subtests never share
	// a Params map.
	oneSecondConfig := func() types.WindowConfig {
		return types.WindowConfig{
			Params: map[string]interface{}{
				"size": time.Second,
			},
		}
	}
	noop := func(results []types.Row) {}

	t.Run("get stats from tumbling window", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		// Reading the statistics must not panic.
		stats := window.GetStats()
		_ = stats
	})

	t.Run("reset stats", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		window.Start()
		defer window.Stop()
		// Add a row first so the reset runs after the window has seen input.
		row := types.Row{
			Data:      map[string]interface{}{"id": 1},
			Timestamp: time.Now(),
		}
		window.Add(row)
		// Resetting the statistics must not panic.
		window.ResetStats()
	})

	t.Run("get output channel", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		window.SetCallback(noop)
		// Fetching the output channel must not panic.
		outputChan := window.OutputChan()
		_ = outputChan
	})

	t.Run("set callback", func(t *testing.T) {
		window, err := NewTumblingWindow(oneSecondConfig())
		require.NoError(t, err)
		// Installing a new callback must not panic.
		newCallback := func(rows []types.Row) {
			// Replacement callback; intentionally empty.
		}
		window.SetCallback(newCallback)
	})
}
// TestWindowWithPerformanceConfig verifies that the "performanceConfig"
// parameter determines the capacity of a window's output channel, and that a
// tumbling window built without that parameter gets a capacity of 1000.
//
// Constructor failures abort the subtest via require: the capacity check
// dereferences the window, so continuing after an error would panic instead
// of reporting a clean failure.
func TestWindowWithPerformanceConfig(t *testing.T) {
	tests := []struct {
		name               string
		windowType         string
		performanceConfig  types.PerformanceConfig
		expectedBufferSize int
		extraParams        map[string]interface{}
	}{
		{
			name:               "滚动窗口-默认配置",
			windowType:         TypeTumbling,
			performanceConfig:  types.DefaultPerformanceConfig(),
			expectedBufferSize: 50,
			extraParams:        map[string]interface{}{"size": "2s"},
		},
		{
			name:               "滚动窗口-高性能配置",
			windowType:         TypeTumbling,
			performanceConfig:  types.HighPerformanceConfig(),
			expectedBufferSize: 200,
			extraParams:        map[string]interface{}{"size": "2s"},
		},
		{
			name:               "滚动窗口-低延迟配置",
			windowType:         TypeTumbling,
			performanceConfig:  types.LowLatencyConfig(),
			expectedBufferSize: 20,
			extraParams:        map[string]interface{}{"size": "2s"},
		},
		{
			name:               "滑动窗口-高性能配置",
			windowType:         TypeSliding,
			performanceConfig:  types.HighPerformanceConfig(),
			expectedBufferSize: 200,
			extraParams:        map[string]interface{}{"size": "10s", "slide": "5s"},
		},
		{
			name:               "计数窗口-高性能配置",
			windowType:         TypeCounting,
			performanceConfig:  types.HighPerformanceConfig(),
			expectedBufferSize: 20, // 200 / 10
			extraParams:        map[string]interface{}{"count": 10},
		},
		{
			name:               "自定义性能配置",
			windowType:         TypeTumbling,
			performanceConfig:  types.PerformanceConfig{BufferConfig: types.BufferConfig{WindowOutputSize: 500}},
			expectedBufferSize: 500,
			extraParams:        map[string]interface{}{"size": "2s"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			config := types.WindowConfig{
				Type:   tt.windowType,
				Params: make(map[string]interface{}),
			}
			// Merge the case-specific parameters, then attach the
			// performance configuration under its well-known key.
			for k, v := range tt.extraParams {
				config.Params[k] = v
			}
			config.Params["performanceConfig"] = tt.performanceConfig

			var window Window
			var err error
			switch tt.windowType {
			case TypeTumbling:
				window, err = NewTumblingWindow(config)
			case TypeSliding:
				window, err = NewSlidingWindow(config)
			case TypeCounting:
				window, err = NewCountingWindow(config)
			case TypeSession:
				window, err = NewSessionWindow(config)
			}
			require.NoError(t, err)
			// Also guards against a windowType that matched no case above.
			require.NotNil(t, window)
			assert.Equal(t, tt.expectedBufferSize, cap(window.OutputChan()))

			// Not every window kind exposes Stop; call it only when present.
			if closer, ok := window.(interface{ Stop() }); ok {
				closer.Stop()
			}
		})
	}

	t.Run("无性能配置-使用默认值", func(t *testing.T) {
		config := types.WindowConfig{
			Type: TypeTumbling,
			Params: map[string]interface{}{
				"size": "3s",
			},
		}
		tw, err := NewTumblingWindow(config)
		require.NoError(t, err)
		assert.Equal(t, 1000, cap(tw.outputChan))
		tw.Stop()
	})
}
// TestGetTimestampEdgeCases calls GetTimestamp with inputs from which no
// timestamp can be extracted (empty property name, missing field or key,
// non-time value, non-string map keys, nil data) and expects a result within
// one second of the current time in each case.
func TestGetTimestampEdgeCases(t *testing.T) {
	cases := []struct {
		name     string
		data     interface{}
		tsProp   string
		timeUnit time.Duration
		wantNow  bool // expect a result close to time.Now()
	}{
		{
			name:     "空字符串时间戳属性",
			data:     map[string]interface{}{"value": 42},
			tsProp:   "",
			timeUnit: time.Second,
			wantNow:  true,
		},
		{
			name: "结构体中不存在的字段",
			data: struct {
				Value int
			}{Value: 42},
			tsProp:   "NonExistentField",
			timeUnit: time.Second,
			wantNow:  true,
		},
		{
			name:     "map中不存在的键",
			data:     map[string]interface{}{"value": 42},
			tsProp:   "nonexistent",
			timeUnit: time.Second,
			wantNow:  true,
		},
		{
			name:     "map中非时间类型的值",
			data:     map[string]interface{}{"timestamp": "not a time"},
			tsProp:   "timestamp",
			timeUnit: time.Second,
			wantNow:  true,
		},
		{
			name:     "非字符串键的map",
			data:     map[int]interface{}{1: time.Now()},
			tsProp:   "timestamp",
			timeUnit: time.Second,
			wantNow:  true,
		},
		{
			name:     "nil数据",
			data:     nil,
			tsProp:   "timestamp",
			timeUnit: time.Second,
			wantNow:  true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := GetTimestamp(tc.data, tc.tsProp, tc.timeUnit)
			if !tc.wantNow {
				return
			}
			// The returned time must be within one second of the current time.
			assert.WithinDuration(t, time.Now(), got, time.Second)
		})
	}
}
// TestSessionWindowSessionKey 测试会话窗口的会话键提取
func TestSessionWindowSessionKey(t *testing.T) {
config := types.WindowConfig{
Type: TypeSession,
Params: map[string]interface{}{
"timeout": "5s",
},
GroupByKey: "user_id",
}
sw, err := NewSessionWindow(config)
assert.NoError(t, err)
// 测试不同类型的数据
tests := []struct {
name string
data interface{}
}{
{
name: "map数据",
data: map[string]interface{}{
"user_id": "user123",
"value": 100,
},
},
{
name: "结构体数据",
data: struct {
UserID string `json:"user_id"`
Value int
}{UserID: "user456", Value: 200},
},
{
name: "无效数据",
data: "invalid",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 这里只是测试Add方法不会panic
assert.NotPanics(t, func() {
sw.Add(tt.data)
})
})
}
}
// TestWindowStopBeforeStart creates each window kind through CreateWindow and
// stops it without ever starting it, expecting no panic.
func TestWindowStopBeforeStart(t *testing.T) {
	cases := []struct {
		name   string
		config types.WindowConfig
	}{
		{
			name: "滚动窗口",
			config: types.WindowConfig{
				Type:   TypeTumbling,
				Params: map[string]interface{}{"size": "1s"},
			},
		},
		{
			name: "滑动窗口",
			config: types.WindowConfig{
				Type: TypeSliding,
				Params: map[string]interface{}{
					"size":  "2s",
					"slide": "1s",
				},
			},
		},
		{
			name: "计数窗口",
			config: types.WindowConfig{
				Type:   TypeCounting,
				Params: map[string]interface{}{"count": 10},
			},
		},
		{
			name: "会话窗口",
			config: types.WindowConfig{
				Type:   TypeSession,
				Params: map[string]interface{}{"timeout": "5s"},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			window, err := CreateWindow(tc.config)
			assert.NoError(t, err)

			// Stopping a window that was never started must not panic.
			assert.NotPanics(t, func() {
				switch w := window.(type) {
				case *TumblingWindow:
					w.Stop()
				case *SlidingWindow:
					w.Stop()
				case *CountingWindow:
					// CountingWindow doesn't have a Stop method.
					_ = w
				case *SessionWindow:
					w.Stop()
				}
			})
		})
	}
}
// TestWindowMultipleStops 测试多次停止窗口
func TestWindowMultipleStops(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{"size": "1s"},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
tw.Start()
// 多次停止应该不会panic
assert.NotPanics(t, func() {
tw.Stop()
tw.Stop()
tw.Stop()
})
}
// TestWindowAddAfterStop 测试停止后添加数据
func TestWindowAddAfterStop(t *testing.T) {
config := types.WindowConfig{
Type: TypeTumbling,
Params: map[string]interface{}{"size": "1s"},
}
tw, err := NewTumblingWindow(config)
assert.NoError(t, err)
tw.Start()
tw.Stop()
// 停止后添加数据应该不会panic
assert.NotPanics(t, func() {
tw.Add(map[string]interface{}{"value": 42})
})
}
// TestCountingWindowWithCallback supplies a callback through the "callback"
// entry of the window parameters, fills a counting window of size 2, and
// expects the callback to be invoked at least once within one second.
//
// The result is polled with assert.Eventually, so no fixed sleep is needed
// before the check. A constructor error aborts the test via require instead
// of leading to a nil-pointer panic on Start.
func TestCountingWindowWithCallback(t *testing.T) {
	var mu sync.Mutex
	callbackData := make([][]types.Row, 0)
	// mu guards callbackData: the callback and the polling closure below may
	// run on different goroutines.
	callback := func(results []types.Row) {
		mu.Lock()
		defer mu.Unlock()
		callbackData = append(callbackData, results)
	}

	config := types.WindowConfig{
		Type: TypeCounting,
		Params: map[string]interface{}{
			"count":    2,
			"callback": callback,
		},
	}
	cw, err := NewCountingWindow(config)
	require.NoError(t, err)
	cw.Start()
	// CountingWindow doesn't have a Stop method, so there is no explicit
	// teardown here.

	// Add exactly "count" items so the window fills once.
	cw.Add(map[string]interface{}{"value": 1})
	cw.Add(map[string]interface{}{"value": 2})

	// Poll every 10ms, for up to one second, until the callback has fired.
	assert.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(callbackData) > 0
	}, time.Second, 10*time.Millisecond)
}
// TestSlidingWindowInvalidParams 测试滑动窗口的无效参数
func TestSlidingWindowInvalidParams(t *testing.T) {
tests := []struct {
name string
params map[string]interface{}
}{
{
name: "无效的slide参数",
params: map[string]interface{}{
"size": "10s",
"slide": "invalid",
},
},
{
name: "缺少slide参数",
params: map[string]interface{}{
"size": "10s",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := types.WindowConfig{
Type: TypeSliding,
Params: tt.params,
}
_, err := NewSlidingWindow(config)
assert.Error(t, err)
})
}
}
// TestWindowUnifiedConfigIntegration is an integration test that checks a
// window's performance configuration together with real data flow.
//
// The first subtest verifies the output-channel capacity under the
// high-performance configuration and that a batch of between 1 and 10 rows
// arrives after the one-second window elapses. The second uses an output
// buffer of size 1 and verifies that the statistics expose the "sent_count"
// and "dropped_count" keys.
//
// Constructor errors abort the subtest via require, because the deferred
// Stop and the field access that follow would otherwise dereference a nil
// window.
func TestWindowUnifiedConfigIntegration(t *testing.T) {
	t.Run("性能配置集成测试", func(t *testing.T) {
		performanceConfig := types.HighPerformanceConfig()
		windowConfig := types.WindowConfig{
			Type: TypeTumbling,
			Params: map[string]interface{}{
				"size":              "1s",
				"performanceConfig": performanceConfig,
			},
		}
		tw, err := NewTumblingWindow(windowConfig)
		require.NoError(t, err)
		defer tw.Stop()

		// Verify the buffer size.
		assert.Equal(t, 200, cap(tw.outputChan))

		// Start the window.
		tw.Start()

		// Send the test data.
		for i := 0; i < 10; i++ {
			tw.Add(map[string]interface{}{
				"id":    i,
				"value": i * 10,
			})
		}

		// Wait slightly longer than the 1s window so that it fires.
		time.Sleep(1200 * time.Millisecond)

		// Verify that the window produced output.
		select {
		case data := <-tw.OutputChan():
			assert.Greater(t, len(data), 0)
			assert.LessOrEqual(t, len(data), 10)
		case <-time.After(500 * time.Millisecond):
			t.Error("超时未接收到窗口输出")
		}
	})

	t.Run("缓冲区溢出处理", func(t *testing.T) {
		// Build a window with a very small output buffer.
		smallBufferConfig := types.PerformanceConfig{
			BufferConfig: types.BufferConfig{
				WindowOutputSize: 1,
			},
		}
		config := types.WindowConfig{
			Type: TypeTumbling,
			Params: map[string]interface{}{
				"size":              "100ms",
				"performanceConfig": smallBufferConfig,
			},
		}
		tw, err := NewTumblingWindow(config)
		require.NoError(t, err)
		tw.Start()
		defer tw.Stop()

		// Add data quickly; with a buffer of one this may overflow it.
		for i := 0; i < 10; i++ {
			tw.Add(map[string]interface{}{"value": i})
		}

		// Wait two window lengths for processing.
		time.Sleep(200 * time.Millisecond)

		// The statistics must expose both counters.
		stats := tw.GetStats()
		assert.Contains(t, stats, "dropped_count")
		assert.Contains(t, stats, "sent_count")
	})
}
// TestCreateWindow checks the CreateWindow factory: every known window type
// must yield a non-nil window of the expected concrete type, and an unknown
// type must yield an error.
func TestCreateWindow(t *testing.T) {
	cases := []struct {
		name         string
		config       types.WindowConfig
		expectError  bool
		expectedType string
	}{
		{
			name: "创建滚动窗口",
			config: types.WindowConfig{
				Type:   TypeTumbling,
				Params: map[string]interface{}{"size": "5s"},
			},
			expectedType: "*window.TumblingWindow",
		},
		{
			name: "创建滑动窗口",
			config: types.WindowConfig{
				Type: TypeSliding,
				Params: map[string]interface{}{
					"size":  "10s",
					"slide": "5s",
				},
			},
			expectedType: "*window.SlidingWindow",
		},
		{
			name: "创建计数窗口",
			config: types.WindowConfig{
				Type:   TypeCounting,
				Params: map[string]interface{}{"count": 100},
			},
			expectedType: "*window.CountingWindow",
		},
		{
			name: "创建会话窗口",
			config: types.WindowConfig{
				Type:   TypeSession,
				Params: map[string]interface{}{"timeout": "30s"},
			},
			expectedType: "*window.SessionWindow",
		},
		{
			name: "窗口工厂与统一配置集成",
			config: types.WindowConfig{
				Type: TypeTumbling,
				Params: map[string]interface{}{
					"size": "5s",
					"performanceConfig": types.PerformanceConfig{
						BufferConfig: types.BufferConfig{
							WindowOutputSize: 1500,
						},
					},
				},
			},
			expectedType: "*window.TumblingWindow",
		},
		{
			name: "无效的窗口类型",
			config: types.WindowConfig{
				Type:   "invalid",
				Params: map[string]interface{}{"size": "5s"},
			},
			expectError: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			created, err := CreateWindow(tc.config)
			if tc.expectError {
				assert.Error(t, err)
				return
			}

			assert.NoError(t, err)
			assert.NotNil(t, created)
			assert.Equal(t, tc.expectedType, getTypeString(created))

			// Release the window when its type offers a Stop method.
			stopper, canStop := created.(interface{ Stop() })
			if canStop {
				stopper.Stop()
			}
		})
	}
}
// TestGetTimestampCoverage checks GetTimestamp against several timestamp
// sources: a TestDate2 value, a struct field, a map entry holding a
// time.Time, and a map entry holding Unix seconds as int64. The final case
// passes data with no extractable timestamp and expects a result within one
// second of the current time.
func TestGetTimestampCoverage(t *testing.T) {
	testTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)

	cases := []struct {
		name     string
		data     interface{}
		tsProp   string
		timeUnit time.Duration
		expected time.Time
		// fallbackToNow marks the case whose result is compared with the
		// current time (with tolerance) instead of with expected.
		fallbackToNow bool
	}{
		{
			name:     "使用GetTimestamp接口",
			data:     TestDate2{ts: testTime},
			tsProp:   "",
			timeUnit: time.Second,
			expected: testTime,
		},
		{
			name: "从结构体字段提取时间戳",
			data: struct {
				Timestamp time.Time
				Value     int
			}{Timestamp: testTime, Value: 42},
			tsProp:   "Timestamp",
			timeUnit: time.Second,
			expected: testTime,
		},
		{
			name: "从map中提取时间戳",
			data: map[string]interface{}{
				"timestamp": testTime,
				"value":     42,
			},
			tsProp:   "timestamp",
			timeUnit: time.Second,
			expected: testTime,
		},
		{
			name: "从map中提取int64时间戳",
			data: map[string]interface{}{
				"timestamp": testTime.Unix(),
			},
			tsProp:   "timestamp",
			timeUnit: time.Second,
			expected: time.Unix(testTime.Unix(), 0),
		},
		{
			name:          "无法提取时间戳,使用当前时间",
			data:          "invalid data",
			tsProp:        "nonexistent",
			timeUnit:      time.Second,
			fallbackToNow: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := GetTimestamp(tc.data, tc.tsProp, tc.timeUnit)
			if tc.fallbackToNow {
				// The returned time must be within one second of now.
				assert.WithinDuration(t, time.Now(), got, time.Second)
				return
			}
			assert.Equal(t, tc.expected, got)
		})
	}
}
// TestWindowErrorHandling expects every window constructor to return an error
// for an invalid primary parameter: an unparsable size or timeout string, or
// a count of zero.
func TestWindowErrorHandling(t *testing.T) {
	cases := []struct {
		name      string
		config    types.WindowConfig
		construct func(types.WindowConfig) error
	}{
		{
			name: "滚动窗口无效大小",
			config: types.WindowConfig{
				Type:   TypeTumbling,
				Params: map[string]interface{}{"size": "invalid"},
			},
			construct: func(cfg types.WindowConfig) error {
				_, err := NewTumblingWindow(cfg)
				return err
			},
		},
		{
			name: "滑动窗口无效参数",
			config: types.WindowConfig{
				Type: TypeSliding,
				Params: map[string]interface{}{
					"size":  "invalid",
					"slide": "5s",
				},
			},
			construct: func(cfg types.WindowConfig) error {
				_, err := NewSlidingWindow(cfg)
				return err
			},
		},
		{
			name: "计数窗口无效计数",
			config: types.WindowConfig{
				Type:   TypeCounting,
				Params: map[string]interface{}{"count": 0},
			},
			construct: func(cfg types.WindowConfig) error {
				_, err := NewCountingWindow(cfg)
				return err
			},
		},
		{
			name: "会话窗口无效超时",
			config: types.WindowConfig{
				Type:   TypeSession,
				Params: map[string]interface{}{"timeout": "invalid"},
			},
			construct: func(cfg types.WindowConfig) error {
				_, err := NewSessionWindow(cfg)
				return err
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Error(t, tc.construct(tc.config))
		})
	}
}
// TestSessionWindowAdvanced drives a session window grouped by "user_id"
// through its lifecycle: set a callback, start, add rows for two users, wait
// past the one-second timeout, optionally read a batch from the output
// channel, then Reset and Trigger.
//
// Construction is checked with require so that a failed constructor stops the
// test instead of causing a nil-pointer panic on the first method call.
func TestSessionWindowAdvanced(t *testing.T) {
	config := types.WindowConfig{
		Type: TypeSession,
		Params: map[string]interface{}{
			"timeout": "1s",
		},
		GroupByKey: "user_id",
	}
	sw, err := NewSessionWindow(config)
	require.NoError(t, err)
	require.NotNil(t, sw)

	// Install a callback; its body is intentionally empty.
	sw.SetCallback(func(results []types.Row) {
		// Callback executed
	})

	// Start the window.
	sw.Start()
	defer sw.Stop()

	// Add data for two different users.
	sw.Add(map[string]interface{}{
		"user_id": "user1",
		"value":   100,
	})
	sw.Add(map[string]interface{}{
		"user_id": "user2",
		"value":   200,
	})

	// Wait past the 1s session timeout.
	time.Sleep(1500 * time.Millisecond)

	// Inspect the output channel.
	select {
	case data := <-sw.OutputChan():
		assert.NotEmpty(t, data)
	case <-time.After(500 * time.Millisecond):
		// No output within the grace period is tolerated as well.
	}

	// Exercise Reset.
	sw.Reset()
	// Exercise a manual Trigger.
	sw.Trigger()
}
// TestSlidingWindowAdvanced builds a sliding window (size 2s, slide 1s) and,
// without starting it, checks that OutputChan returns a non-nil channel and
// then calls Reset and Trigger.
//
// Construction is checked with require so that a failed constructor stops the
// test instead of causing a nil-pointer panic on OutputChan.
func TestSlidingWindowAdvanced(t *testing.T) {
	config := types.WindowConfig{
		Type: TypeSliding,
		Params: map[string]interface{}{
			"size":  "2s",
			"slide": "1s",
		},
		TsProp:   "timestamp",
		TimeUnit: time.Second,
	}
	sw, err := NewSlidingWindow(config)
	require.NoError(t, err)
	require.NotNil(t, sw)

	// The output channel must be available.
	outputChan := sw.OutputChan()
	assert.NotNil(t, outputChan)

	// Exercise Reset.
	sw.Reset()
	// Exercise a manual Trigger.
	sw.Trigger()
}
// TestCountingWindowAdvanced fills a counting window of size 3 and, if a
// batch is emitted on the output channel, expects it to contain exactly three
// rows. It then calls Reset and Trigger.
//
// Construction is checked with require so that a failed constructor stops the
// test instead of causing a nil-pointer panic on the first method call.
func TestCountingWindowAdvanced(t *testing.T) {
	config := types.WindowConfig{
		Type: TypeCounting,
		Params: map[string]interface{}{
			"count": 3,
		},
		TsProp:   "timestamp",
		TimeUnit: time.Second,
	}
	cw, err := NewCountingWindow(config)
	require.NoError(t, err)
	require.NotNil(t, cw)

	// Install a callback; its body is intentionally empty.
	cw.SetCallback(func(results []types.Row) {
		// Callback executed
	})

	// Start the window.
	cw.Start()
	// CountingWindow doesn't have a Stop method, so there is no teardown.

	// Add rows until the count threshold is reached.
	for i := 0; i < 3; i++ {
		cw.Add(map[string]interface{}{
			"timestamp": time.Now().Unix(),
			"value":     i,
		})
	}

	// Give the window a moment to process the rows.
	time.Sleep(100 * time.Millisecond)

	// Inspect the output channel.
	select {
	case data := <-cw.OutputChan():
		assert.Len(t, data, 3)
	case <-time.After(500 * time.Millisecond):
		// No output within the grace period is tolerated as well.
	}

	// Exercise Reset.
	cw.Reset()
	// Exercise a manual Trigger.
	cw.Trigger()
}
// TestTumblingWindowAdvanced checks the statistics API of a tumbling window
// that is never started: GetStats must expose "sent_count" and
// "dropped_count", and both must read zero after ResetStats. It also calls
// SetCallback, OutputChan, Reset and Trigger.
//
// The post-reset checks read the same keys that are asserted to exist.
// Reading differently spelled keys would return the map's zero value for a
// missing entry, letting the zero assertions pass without checking anything.
func TestTumblingWindowAdvanced(t *testing.T) {
	config := types.WindowConfig{
		Type: TypeTumbling,
		Params: map[string]interface{}{
			"size": "1s",
		},
		TsProp:   "timestamp",
		TimeUnit: time.Second,
	}
	tw, err := NewTumblingWindow(config)
	require.NoError(t, err)
	require.NotNil(t, tw)

	// The statistics must expose both counters.
	stats := tw.GetStats()
	assert.Contains(t, stats, "sent_count")
	assert.Contains(t, stats, "dropped_count")

	// After a reset both counters must read zero.
	tw.ResetStats()
	stats = tw.GetStats()
	assert.Equal(t, int64(0), stats["dropped_count"])
	assert.Equal(t, int64(0), stats["sent_count"])

	// Install a callback; its body is intentionally empty.
	tw.SetCallback(func(results []types.Row) {
		// Callback executed
	})

	// The output channel must be available.
	outputChan := tw.OutputChan()
	assert.NotNil(t, outputChan)

	// Exercise Reset.
	tw.Reset()
	// Exercise a manual Trigger.
	tw.Trigger()
}