forked from GiteaTest2015/streamsql
1470 lines
32 KiB
Go
1470 lines
32 KiB
Go
package window
|
||
|
||
import (
|
||
"reflect"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/rulego/streamsql/types"
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
)
|
||
|
||
// getTypeString 获取对象的类型字符串表示
|
||
func getTypeString(obj interface{}) string {
|
||
if obj == nil {
|
||
return ""
|
||
}
|
||
return reflect.TypeOf(obj).String()
|
||
}
|
||
|
||
// TestWindowEdgeCases 测试窗口的边界条件
|
||
func TestWindowEdgeCases(t *testing.T) {
|
||
t.Run("tumbling window with zero duration", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Duration(0),
|
||
},
|
||
}
|
||
_, err := NewTumblingWindow(config)
|
||
// 零持续时间可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
|
||
t.Run("tumbling window with negative duration", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": -time.Second,
|
||
},
|
||
}
|
||
_, err := NewTumblingWindow(config)
|
||
// 负持续时间可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
|
||
t.Run("sliding window with zero window size", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Duration(0),
|
||
"slide": time.Second,
|
||
},
|
||
}
|
||
_, err := NewSlidingWindow(config)
|
||
// 零滑动间隔可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
|
||
t.Run("sliding window with zero slide interval", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Minute,
|
||
"slide": time.Duration(0),
|
||
},
|
||
}
|
||
_, err := NewSlidingWindow(config)
|
||
// 零滑动间隔可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
|
||
t.Run("sliding window with slide larger than window", func(t *testing.T) {
|
||
// 这种情况可能是有效的,取决于实现
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
"slide": time.Minute,
|
||
},
|
||
}
|
||
window, err := NewSlidingWindow(config)
|
||
_ = window
|
||
_ = err
|
||
})
|
||
|
||
t.Run("counting window with zero count", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"count": 0,
|
||
},
|
||
}
|
||
_, err := NewCountingWindow(config)
|
||
require.NotNil(t, err)
|
||
})
|
||
|
||
t.Run("counting window with negative count", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"count": -10,
|
||
},
|
||
}
|
||
_, err := NewCountingWindow(config)
|
||
require.NotNil(t, err)
|
||
})
|
||
|
||
t.Run("session window with zero timeout", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"timeout": time.Duration(0),
|
||
},
|
||
}
|
||
_, err := NewSessionWindow(config)
|
||
// 零超时可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
|
||
t.Run("session window with negative timeout", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"timeout": -time.Second,
|
||
},
|
||
}
|
||
_, err := NewSessionWindow(config)
|
||
// 负超时可能是有效的,取决于实现
|
||
_ = err
|
||
})
|
||
}
|
||
|
||
// TestWindowWithNilCallback 测试窗口使用nil回调函数
|
||
func TestWindowWithNilCallback(t *testing.T) {
|
||
t.Run("tumbling window with nil callback", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
require.NotNil(t, window)
|
||
window.Start()
|
||
|
||
// 添加数据不应该panic
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
})
|
||
|
||
t.Run("sliding window with nil callback", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Minute,
|
||
"slide": time.Second,
|
||
},
|
||
}
|
||
window, err := NewSlidingWindow(config)
|
||
if err == nil {
|
||
require.NotNil(t, window)
|
||
window.Start()
|
||
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
})
|
||
|
||
t.Run("counting window with nil callback", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"count": 10,
|
||
},
|
||
}
|
||
window, err := NewCountingWindow(config)
|
||
if err == nil {
|
||
require.NotNil(t, window)
|
||
window.Start()
|
||
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
})
|
||
|
||
t.Run("session window with nil callback", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"timeout": time.Minute,
|
||
},
|
||
}
|
||
window, err := NewSessionWindow(config)
|
||
if err == nil {
|
||
require.NotNil(t, window)
|
||
window.Start()
|
||
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
})
|
||
}
|
||
|
||
// TestWindowConcurrency 测试窗口的并发安全性
|
||
func TestWindowConcurrency(t *testing.T) {
|
||
t.Run("concurrent add to tumbling window", func(t *testing.T) {
|
||
var receivedData [][]types.Row
|
||
var mu sync.Mutex
|
||
|
||
callback := func(rows []types.Row) {
|
||
mu.Lock()
|
||
receivedData = append(receivedData, rows)
|
||
mu.Unlock()
|
||
}
|
||
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Millisecond * 100,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(callback)
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
var wg sync.WaitGroup
|
||
numGoroutines := 10
|
||
numRowsPerGoroutine := 50
|
||
|
||
for i := 0; i < numGoroutines; i++ {
|
||
wg.Add(1)
|
||
go func(goroutineID int) {
|
||
defer wg.Done()
|
||
for j := 0; j < numRowsPerGoroutine; j++ {
|
||
row := types.Row{
|
||
Data: map[string]interface{}{
|
||
"id": goroutineID*1000 + j,
|
||
"value": float64(j),
|
||
},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
|
||
// 等待窗口处理完成
|
||
time.Sleep(time.Millisecond * 200)
|
||
})
|
||
|
||
t.Run("concurrent start stop", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
var wg sync.WaitGroup
|
||
numGoroutines := 5
|
||
|
||
for i := 0; i < numGoroutines; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
window.Start()
|
||
time.Sleep(time.Millisecond * 10)
|
||
|
||
}()
|
||
}
|
||
|
||
wg.Wait()
|
||
})
|
||
|
||
t.Run("concurrent add and stop", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 一个goroutine添加数据
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for i := 0; i < 100; i++ {
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": i},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
time.Sleep(time.Millisecond)
|
||
}
|
||
}()
|
||
|
||
// 另一个goroutine停止窗口
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
time.Sleep(time.Millisecond * 50)
|
||
|
||
}()
|
||
|
||
wg.Wait()
|
||
})
|
||
}
|
||
|
||
// TestWindowMemoryManagement 测试窗口的内存管理
|
||
func TestWindowMemoryManagement(t *testing.T) {
|
||
t.Run("large data in tumbling window", func(t *testing.T) {
|
||
var processedCount int
|
||
callback := func(rows []types.Row) {
|
||
processedCount += len(rows)
|
||
}
|
||
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Millisecond * 50,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(callback)
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加大量数据
|
||
largeData := make([]byte, 1024*1024) // 1MB
|
||
for i := range largeData {
|
||
largeData[i] = byte(i % 256)
|
||
}
|
||
|
||
for i := 0; i < 10; i++ {
|
||
row := types.Row{
|
||
Data: map[string]interface{}{
|
||
"id": i,
|
||
"data": string(largeData),
|
||
},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(time.Millisecond * 200)
|
||
})
|
||
|
||
t.Run("rapid data addition", func(t *testing.T) {
|
||
var processedCount int
|
||
var mu sync.Mutex
|
||
|
||
callback := func(rows []types.Row) {
|
||
mu.Lock()
|
||
processedCount += len(rows)
|
||
mu.Unlock()
|
||
}
|
||
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Millisecond * 10,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(callback)
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 快速添加大量小数据
|
||
for i := 0; i < 1000; i++ {
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": i},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(time.Millisecond * 100)
|
||
})
|
||
}
|
||
|
||
// TestWindowErrorConditions 测试窗口的错误条件
|
||
func TestWindowErrorConditions(t *testing.T) {
|
||
t.Run("add to stopped window", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 向已停止的窗口添加数据不应该panic
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
})
|
||
|
||
t.Run("add invalid data types", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加包含不可序列化数据的行
|
||
row := types.Row{
|
||
Data: map[string]interface{}{
|
||
"id": 1,
|
||
"channel": make(chan int),
|
||
"func": func() {},
|
||
},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
})
|
||
|
||
t.Run("add row with zero timestamp", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加时间戳为零值的行
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Time{},
|
||
}
|
||
window.Add(row)
|
||
})
|
||
|
||
t.Run("add row with future timestamp", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加未来时间戳的行
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now().Add(time.Hour),
|
||
}
|
||
window.Add(row)
|
||
})
|
||
|
||
t.Run("add row with very old timestamp", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
require.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加很久以前的时间戳的行
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now().Add(-time.Hour * 24),
|
||
}
|
||
window.Add(row)
|
||
})
|
||
}
|
||
|
||
// TestWindowStatsAndMetrics 测试窗口的统计和指标
|
||
func TestWindowStatsAndMetrics(t *testing.T) {
|
||
t.Run("get stats from tumbling window", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
assert.Nil(t, err)
|
||
|
||
// 获取统计信息不应该panic
|
||
stats := window.GetStats()
|
||
_ = stats
|
||
})
|
||
|
||
t.Run("reset stats", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
assert.Nil(t, err)
|
||
|
||
window.Start()
|
||
|
||
// 添加一些数据
|
||
row := types.Row{
|
||
Data: map[string]interface{}{"id": 1},
|
||
Timestamp: time.Now(),
|
||
}
|
||
window.Add(row)
|
||
|
||
// 重置统计信息不应该panic
|
||
window.ResetStats()
|
||
})
|
||
|
||
t.Run("get output channel", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
window.SetCallback(func(results []types.Row) {})
|
||
}
|
||
assert.Nil(t, err)
|
||
|
||
// 获取输出通道不应该panic
|
||
outputChan := window.OutputChan()
|
||
_ = outputChan
|
||
})
|
||
|
||
t.Run("set callback", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Params: map[string]interface{}{
|
||
"size": time.Second,
|
||
},
|
||
}
|
||
window, err := NewTumblingWindow(config)
|
||
if err == nil {
|
||
// 设置新的回调函数不应该panic
|
||
newCallback := func(rows []types.Row) {
|
||
// 新的回调逻辑
|
||
}
|
||
window.SetCallback(newCallback)
|
||
}
|
||
})
|
||
}
|
||
|
||
// TestWindowWithPerformanceConfig 测试窗口性能配置
|
||
func TestWindowWithPerformanceConfig(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
windowType string
|
||
performanceConfig types.PerformanceConfig
|
||
expectedBufferSize int
|
||
extraParams map[string]interface{}
|
||
}{
|
||
{
|
||
name: "滚动窗口-默认配置",
|
||
windowType: TypeTumbling,
|
||
performanceConfig: types.DefaultPerformanceConfig(),
|
||
expectedBufferSize: 50,
|
||
extraParams: map[string]interface{}{"size": "2s"},
|
||
},
|
||
{
|
||
name: "滚动窗口-高性能配置",
|
||
windowType: TypeTumbling,
|
||
performanceConfig: types.HighPerformanceConfig(),
|
||
expectedBufferSize: 200,
|
||
extraParams: map[string]interface{}{"size": "2s"},
|
||
},
|
||
{
|
||
name: "滚动窗口-低延迟配置",
|
||
windowType: TypeTumbling,
|
||
performanceConfig: types.LowLatencyConfig(),
|
||
expectedBufferSize: 20,
|
||
extraParams: map[string]interface{}{"size": "2s"},
|
||
},
|
||
{
|
||
name: "滑动窗口-高性能配置",
|
||
windowType: TypeSliding,
|
||
performanceConfig: types.HighPerformanceConfig(),
|
||
expectedBufferSize: 200,
|
||
extraParams: map[string]interface{}{"size": "10s", "slide": "5s"},
|
||
},
|
||
{
|
||
name: "计数窗口-高性能配置",
|
||
windowType: TypeCounting,
|
||
performanceConfig: types.HighPerformanceConfig(),
|
||
expectedBufferSize: 20, // 200 / 10
|
||
extraParams: map[string]interface{}{"count": 10},
|
||
},
|
||
{
|
||
name: "自定义性能配置",
|
||
windowType: TypeTumbling,
|
||
performanceConfig: types.PerformanceConfig{BufferConfig: types.BufferConfig{WindowOutputSize: 500}},
|
||
expectedBufferSize: 500,
|
||
extraParams: map[string]interface{}{"size": "2s"},
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: tt.windowType,
|
||
Params: make(map[string]interface{}),
|
||
}
|
||
|
||
// 合并参数
|
||
for k, v := range tt.extraParams {
|
||
config.Params[k] = v
|
||
}
|
||
config.Params["performanceConfig"] = tt.performanceConfig
|
||
|
||
var window Window
|
||
var err error
|
||
|
||
switch tt.windowType {
|
||
case TypeTumbling:
|
||
window, err = NewTumblingWindow(config)
|
||
case TypeSliding:
|
||
window, err = NewSlidingWindow(config)
|
||
case TypeCounting:
|
||
window, err = NewCountingWindow(config)
|
||
case TypeSession:
|
||
window, err = NewSessionWindow(config)
|
||
}
|
||
|
||
assert.NoError(t, err)
|
||
assert.Equal(t, tt.expectedBufferSize, cap(window.OutputChan()))
|
||
|
||
if closer, ok := window.(interface{ Stop() }); ok {
|
||
closer.Stop()
|
||
}
|
||
})
|
||
}
|
||
|
||
t.Run("无性能配置-使用默认值", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "3s",
|
||
},
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(config)
|
||
assert.NoError(t, err)
|
||
assert.Equal(t, 1000, cap(tw.outputChan))
|
||
tw.Stop()
|
||
})
|
||
}
|
||
|
||
// TestGetTimestampEdgeCases 测试GetTimestamp函数的边缘情况
|
||
func TestGetTimestampEdgeCases(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
data interface{}
|
||
tsProp string
|
||
timeUnit time.Duration
|
||
checkNow bool
|
||
}{
|
||
{
|
||
name: "空字符串时间戳属性",
|
||
data: map[string]interface{}{"value": 42},
|
||
tsProp: "",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
{
|
||
name: "结构体中不存在的字段",
|
||
data: struct {
|
||
Value int
|
||
}{Value: 42},
|
||
tsProp: "NonExistentField",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
{
|
||
name: "map中不存在的键",
|
||
data: map[string]interface{}{
|
||
"value": 42,
|
||
},
|
||
tsProp: "nonexistent",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
{
|
||
name: "map中非时间类型的值",
|
||
data: map[string]interface{}{
|
||
"timestamp": "not a time",
|
||
},
|
||
tsProp: "timestamp",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
{
|
||
name: "非字符串键的map",
|
||
data: map[int]interface{}{
|
||
1: time.Now(),
|
||
},
|
||
tsProp: "timestamp",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
{
|
||
name: "nil数据",
|
||
data: nil,
|
||
tsProp: "timestamp",
|
||
timeUnit: time.Second,
|
||
checkNow: true,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
result := GetTimestamp(tt.data, tt.tsProp, tt.timeUnit)
|
||
if tt.checkNow {
|
||
// 检查返回的时间是否接近当前时间(允许1秒误差)
|
||
assert.WithinDuration(t, time.Now(), result, time.Second)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestSessionWindowSessionKey 测试会话窗口的会话键提取
|
||
func TestSessionWindowSessionKey(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSession,
|
||
Params: map[string]interface{}{
|
||
"timeout": "5s",
|
||
},
|
||
GroupByKey: "user_id",
|
||
}
|
||
|
||
sw, err := NewSessionWindow(config)
|
||
assert.NoError(t, err)
|
||
|
||
// 测试不同类型的数据
|
||
tests := []struct {
|
||
name string
|
||
data interface{}
|
||
}{
|
||
{
|
||
name: "map数据",
|
||
data: map[string]interface{}{
|
||
"user_id": "user123",
|
||
"value": 100,
|
||
},
|
||
},
|
||
{
|
||
name: "结构体数据",
|
||
data: struct {
|
||
UserID string `json:"user_id"`
|
||
Value int
|
||
}{UserID: "user456", Value: 200},
|
||
},
|
||
{
|
||
name: "无效数据",
|
||
data: "invalid",
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
// 这里只是测试Add方法不会panic
|
||
assert.NotPanics(t, func() {
|
||
sw.Add(tt.data)
|
||
})
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestWindowStopBeforeStart 测试在启动前停止窗口
|
||
func TestWindowStopBeforeStart(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.WindowConfig
|
||
}{
|
||
{
|
||
name: "滚动窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{"size": "1s"},
|
||
},
|
||
},
|
||
{
|
||
name: "滑动窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeSliding,
|
||
Params: map[string]interface{}{
|
||
"size": "2s",
|
||
"slide": "1s",
|
||
},
|
||
},
|
||
},
|
||
{
|
||
name: "计数窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeCounting,
|
||
Params: map[string]interface{}{"count": 10},
|
||
},
|
||
},
|
||
{
|
||
name: "会话窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeSession,
|
||
Params: map[string]interface{}{"timeout": "5s"},
|
||
},
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
window, err := CreateWindow(tt.config)
|
||
assert.NoError(t, err)
|
||
|
||
// 在启动前停止窗口应该不会panic
|
||
assert.NotPanics(t, func() {
|
||
if tw, ok := window.(*TumblingWindow); ok {
|
||
tw.Stop()
|
||
} else if sw, ok := window.(*SlidingWindow); ok {
|
||
sw.Stop()
|
||
} else if cw, ok := window.(*CountingWindow); ok {
|
||
// CountingWindow doesn't have Stop method
|
||
_ = cw
|
||
} else if sesw, ok := window.(*SessionWindow); ok {
|
||
sesw.Stop()
|
||
}
|
||
})
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestWindowMultipleStops 测试多次停止窗口
|
||
func TestWindowMultipleStops(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{"size": "1s"},
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(config)
|
||
assert.NoError(t, err)
|
||
|
||
tw.Start()
|
||
|
||
// 多次停止应该不会panic
|
||
assert.NotPanics(t, func() {
|
||
tw.Stop()
|
||
tw.Stop()
|
||
tw.Stop()
|
||
})
|
||
}
|
||
|
||
// TestWindowAddAfterStop 测试停止后添加数据
|
||
func TestWindowAddAfterStop(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{"size": "1s"},
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(config)
|
||
assert.NoError(t, err)
|
||
|
||
tw.Start()
|
||
tw.Stop()
|
||
|
||
// 停止后添加数据应该不会panic
|
||
assert.NotPanics(t, func() {
|
||
tw.Add(map[string]interface{}{"value": 42})
|
||
})
|
||
}
|
||
|
||
// TestCountingWindowWithCallback 测试计数窗口的回调功能
|
||
func TestCountingWindowWithCallback(t *testing.T) {
|
||
var mu sync.Mutex
|
||
callbackData := make([][]types.Row, 0)
|
||
callback := func(results []types.Row) {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
callbackData = append(callbackData, results)
|
||
}
|
||
|
||
config := types.WindowConfig{
|
||
Type: TypeCounting,
|
||
Params: map[string]interface{}{
|
||
"count": 2,
|
||
"callback": callback,
|
||
},
|
||
}
|
||
|
||
cw, err := NewCountingWindow(config)
|
||
assert.NoError(t, err)
|
||
|
||
cw.Start()
|
||
// CountingWindow doesn't have Stop method, will be handled by context cancellation
|
||
|
||
// 添加数据
|
||
cw.Add(map[string]interface{}{"value": 1})
|
||
cw.Add(map[string]interface{}{"value": 2})
|
||
|
||
// 等待处理
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 检查回调是否被调用
|
||
assert.Eventually(t, func() bool {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
return len(callbackData) > 0
|
||
}, time.Second, 10*time.Millisecond)
|
||
}
|
||
|
||
// TestSlidingWindowInvalidParams 测试滑动窗口的无效参数
|
||
func TestSlidingWindowInvalidParams(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
params map[string]interface{}
|
||
}{
|
||
{
|
||
name: "无效的slide参数",
|
||
params: map[string]interface{}{
|
||
"size": "10s",
|
||
"slide": "invalid",
|
||
},
|
||
},
|
||
{
|
||
name: "缺少slide参数",
|
||
params: map[string]interface{}{
|
||
"size": "10s",
|
||
},
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSliding,
|
||
Params: tt.params,
|
||
}
|
||
_, err := NewSlidingWindow(config)
|
||
assert.Error(t, err)
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestWindowUnifiedConfigIntegration 集成测试:验证窗口配置与实际数据处理的集成
|
||
func TestWindowUnifiedConfigIntegration(t *testing.T) {
|
||
t.Run("性能配置集成测试", func(t *testing.T) {
|
||
performanceConfig := types.HighPerformanceConfig()
|
||
|
||
windowConfig := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "1s",
|
||
"performanceConfig": performanceConfig,
|
||
},
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(windowConfig)
|
||
assert.NoError(t, err)
|
||
defer tw.Stop()
|
||
|
||
// 验证缓冲区大小
|
||
assert.Equal(t, 200, cap(tw.outputChan))
|
||
|
||
// 启动窗口
|
||
tw.Start()
|
||
|
||
// 发送测试数据
|
||
for i := 0; i < 10; i++ {
|
||
tw.Add(map[string]interface{}{
|
||
"id": i,
|
||
"value": i * 10,
|
||
})
|
||
}
|
||
|
||
// 等待窗口触发
|
||
time.Sleep(1200 * time.Millisecond)
|
||
|
||
// 验证窗口能正常工作
|
||
select {
|
||
case data := <-tw.OutputChan():
|
||
assert.Greater(t, len(data), 0)
|
||
assert.LessOrEqual(t, len(data), 10)
|
||
case <-time.After(500 * time.Millisecond):
|
||
t.Error("超时未接收到窗口输出")
|
||
}
|
||
})
|
||
|
||
t.Run("缓冲区溢出处理", func(t *testing.T) {
|
||
// 创建一个小缓冲区的窗口
|
||
smallBufferConfig := types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
WindowOutputSize: 1, // 非常小的缓冲区
|
||
},
|
||
}
|
||
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "100ms",
|
||
"performanceConfig": smallBufferConfig,
|
||
},
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(config)
|
||
assert.NoError(t, err)
|
||
|
||
tw.Start()
|
||
defer tw.Stop()
|
||
|
||
// 快速添加大量数据,可能导致缓冲区溢出
|
||
for i := 0; i < 10; i++ {
|
||
tw.Add(map[string]interface{}{"value": i})
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 检查统计信息
|
||
stats := tw.GetStats()
|
||
assert.Contains(t, stats, "dropped_count")
|
||
assert.Contains(t, stats, "sent_count")
|
||
})
|
||
}
|
||
|
||
// TestCreateWindow 测试窗口工厂函数
|
||
func TestCreateWindow(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.WindowConfig
|
||
expectError bool
|
||
expectedType string
|
||
}{
|
||
{
|
||
name: "创建滚动窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
expectError: false,
|
||
expectedType: "*window.TumblingWindow",
|
||
},
|
||
{
|
||
name: "创建滑动窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeSliding,
|
||
Params: map[string]interface{}{
|
||
"size": "10s",
|
||
"slide": "5s",
|
||
},
|
||
},
|
||
expectError: false,
|
||
expectedType: "*window.SlidingWindow",
|
||
},
|
||
{
|
||
name: "创建计数窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeCounting,
|
||
Params: map[string]interface{}{
|
||
"count": 100,
|
||
},
|
||
},
|
||
expectError: false,
|
||
expectedType: "*window.CountingWindow",
|
||
},
|
||
{
|
||
name: "创建会话窗口",
|
||
config: types.WindowConfig{
|
||
Type: TypeSession,
|
||
Params: map[string]interface{}{
|
||
"timeout": "30s",
|
||
},
|
||
},
|
||
expectError: false,
|
||
expectedType: "*window.SessionWindow",
|
||
},
|
||
{
|
||
name: "窗口工厂与统一配置集成",
|
||
config: types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
"performanceConfig": types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
WindowOutputSize: 1500,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
expectError: false,
|
||
expectedType: "*window.TumblingWindow",
|
||
},
|
||
{
|
||
name: "无效的窗口类型",
|
||
config: types.WindowConfig{
|
||
Type: "invalid",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
expectError: true,
|
||
expectedType: "",
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
window, err := CreateWindow(tt.config)
|
||
|
||
if tt.expectError {
|
||
assert.Error(t, err)
|
||
return
|
||
}
|
||
|
||
assert.NoError(t, err)
|
||
assert.NotNil(t, window)
|
||
assert.Equal(t, tt.expectedType, getTypeString(window))
|
||
|
||
// 验证窗口能正常工作
|
||
if closer, ok := window.(interface{ Stop() }); ok {
|
||
closer.Stop()
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestGetTimestampCoverage 测试时间戳提取函数
|
||
func TestGetTimestampCoverage(t *testing.T) {
|
||
testTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
|
||
|
||
tests := []struct {
|
||
name string
|
||
data interface{}
|
||
tsProp string
|
||
timeUnit time.Duration
|
||
expected time.Time
|
||
}{
|
||
{
|
||
name: "使用GetTimestamp接口",
|
||
data: TestDate2{ts: testTime},
|
||
tsProp: "",
|
||
timeUnit: time.Second,
|
||
expected: testTime,
|
||
},
|
||
{
|
||
name: "从结构体字段提取时间戳",
|
||
data: struct {
|
||
Timestamp time.Time
|
||
Value int
|
||
}{Timestamp: testTime, Value: 42},
|
||
tsProp: "Timestamp",
|
||
timeUnit: time.Second,
|
||
expected: testTime,
|
||
},
|
||
{
|
||
name: "从map中提取时间戳",
|
||
data: map[string]interface{}{
|
||
"timestamp": testTime,
|
||
"value": 42,
|
||
},
|
||
tsProp: "timestamp",
|
||
timeUnit: time.Second,
|
||
expected: testTime,
|
||
},
|
||
{
|
||
name: "从map中提取int64时间戳",
|
||
data: map[string]interface{}{
|
||
"timestamp": testTime.Unix(),
|
||
},
|
||
tsProp: "timestamp",
|
||
timeUnit: time.Second,
|
||
expected: time.Unix(testTime.Unix(), 0),
|
||
},
|
||
{
|
||
name: "无法提取时间戳,使用当前时间",
|
||
data: "invalid data",
|
||
tsProp: "nonexistent",
|
||
timeUnit: time.Second,
|
||
// expected will be checked with time tolerance
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
result := GetTimestamp(tt.data, tt.tsProp, tt.timeUnit)
|
||
if tt.name == "无法提取时间戳,使用当前时间" {
|
||
// 检查返回的时间是否接近当前时间(允许1秒误差)
|
||
assert.WithinDuration(t, time.Now(), result, time.Second)
|
||
} else {
|
||
assert.Equal(t, tt.expected, result)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestWindowErrorHandling 测试窗口错误处理
|
||
func TestWindowErrorHandling(t *testing.T) {
|
||
t.Run("滚动窗口无效大小", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "invalid",
|
||
},
|
||
}
|
||
_, err := NewTumblingWindow(config)
|
||
assert.Error(t, err)
|
||
})
|
||
|
||
t.Run("滑动窗口无效参数", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSliding,
|
||
Params: map[string]interface{}{
|
||
"size": "invalid",
|
||
"slide": "5s",
|
||
},
|
||
}
|
||
_, err := NewSlidingWindow(config)
|
||
assert.Error(t, err)
|
||
})
|
||
|
||
t.Run("计数窗口无效计数", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeCounting,
|
||
Params: map[string]interface{}{
|
||
"count": 0,
|
||
},
|
||
}
|
||
_, err := NewCountingWindow(config)
|
||
assert.Error(t, err)
|
||
})
|
||
|
||
t.Run("会话窗口无效超时", func(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSession,
|
||
Params: map[string]interface{}{
|
||
"timeout": "invalid",
|
||
},
|
||
}
|
||
_, err := NewSessionWindow(config)
|
||
assert.Error(t, err)
|
||
})
|
||
}
|
||
|
||
// TestSessionWindowAdvanced 测试会话窗口的高级功能
|
||
func TestSessionWindowAdvanced(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSession,
|
||
Params: map[string]interface{}{
|
||
"timeout": "1s",
|
||
},
|
||
GroupByKey: "user_id",
|
||
}
|
||
|
||
sw, err := NewSessionWindow(config)
|
||
assert.NoError(t, err)
|
||
assert.NotNil(t, sw)
|
||
|
||
// 测试设置回调函数
|
||
sw.SetCallback(func(results []types.Row) {
|
||
// Callback executed
|
||
})
|
||
|
||
// 启动窗口
|
||
sw.Start()
|
||
defer sw.Stop()
|
||
|
||
// 添加不同用户的数据
|
||
sw.Add(map[string]interface{}{
|
||
"user_id": "user1",
|
||
"value": 100,
|
||
})
|
||
|
||
sw.Add(map[string]interface{}{
|
||
"user_id": "user2",
|
||
"value": 200,
|
||
})
|
||
|
||
// 等待会话超时
|
||
time.Sleep(1500 * time.Millisecond)
|
||
|
||
// 检查输出通道
|
||
select {
|
||
case data := <-sw.OutputChan():
|
||
assert.NotEmpty(t, data)
|
||
case <-time.After(500 * time.Millisecond):
|
||
// 可能没有数据输出,这也是正常的
|
||
}
|
||
|
||
// 测试重置功能
|
||
sw.Reset()
|
||
|
||
// 测试手动触发
|
||
sw.Trigger()
|
||
}
|
||
|
||
// TestSlidingWindowAdvanced 测试滑动窗口的高级功能
|
||
func TestSlidingWindowAdvanced(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeSliding,
|
||
Params: map[string]interface{}{
|
||
"size": "2s",
|
||
"slide": "1s",
|
||
},
|
||
TsProp: "timestamp",
|
||
TimeUnit: time.Second,
|
||
}
|
||
|
||
sw, err := NewSlidingWindow(config)
|
||
assert.NoError(t, err)
|
||
assert.NotNil(t, sw)
|
||
|
||
// 测试获取输出通道
|
||
outputChan := sw.OutputChan()
|
||
assert.NotNil(t, outputChan)
|
||
|
||
// 测试重置功能
|
||
sw.Reset()
|
||
|
||
// 测试手动触发
|
||
sw.Trigger()
|
||
}
|
||
|
||
// TestCountingWindowAdvanced 测试计数窗口的高级功能
|
||
func TestCountingWindowAdvanced(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeCounting,
|
||
Params: map[string]interface{}{
|
||
"count": 3,
|
||
},
|
||
TsProp: "timestamp",
|
||
TimeUnit: time.Second,
|
||
}
|
||
|
||
cw, err := NewCountingWindow(config)
|
||
assert.NoError(t, err)
|
||
assert.NotNil(t, cw)
|
||
|
||
// 测试设置回调函数
|
||
cw.SetCallback(func(results []types.Row) {
|
||
// Callback executed
|
||
})
|
||
|
||
// 启动窗口
|
||
cw.Start()
|
||
// CountingWindow doesn't have Stop method
|
||
|
||
// 添加数据直到达到阈值
|
||
for i := 0; i < 3; i++ {
|
||
cw.Add(map[string]interface{}{
|
||
"timestamp": time.Now().Unix(),
|
||
"value": i,
|
||
})
|
||
}
|
||
|
||
// 等待一段时间让窗口处理数据
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 检查输出通道
|
||
select {
|
||
case data := <-cw.OutputChan():
|
||
assert.Len(t, data, 3)
|
||
case <-time.After(500 * time.Millisecond):
|
||
// 可能没有数据输出,这也是正常的
|
||
}
|
||
|
||
// 测试重置功能
|
||
cw.Reset()
|
||
|
||
// 测试手动触发
|
||
cw.Trigger()
|
||
}
|
||
|
||
// TestTumblingWindowAdvanced 测试滚动窗口的高级功能
|
||
func TestTumblingWindowAdvanced(t *testing.T) {
|
||
config := types.WindowConfig{
|
||
Type: TypeTumbling,
|
||
Params: map[string]interface{}{
|
||
"size": "1s",
|
||
},
|
||
TsProp: "timestamp",
|
||
TimeUnit: time.Second,
|
||
}
|
||
|
||
tw, err := NewTumblingWindow(config)
|
||
assert.NoError(t, err)
|
||
assert.NotNil(t, tw)
|
||
|
||
// 检查统计信息
|
||
stats := tw.GetStats()
|
||
assert.Contains(t, stats, "sent_count")
|
||
assert.Contains(t, stats, "dropped_count")
|
||
|
||
// 测试重置统计信息
|
||
tw.ResetStats()
|
||
stats = tw.GetStats()
|
||
assert.Equal(t, int64(0), stats["droppedCount"])
|
||
assert.Equal(t, int64(0), stats["sentCount"])
|
||
|
||
// 测试设置回调函数
|
||
tw.SetCallback(func(results []types.Row) {
|
||
// Callback executed
|
||
})
|
||
|
||
// 测试获取输出通道
|
||
outputChan := tw.OutputChan()
|
||
assert.NotNil(t, outputChan)
|
||
|
||
// 测试重置功能
|
||
tw.Reset()
|
||
|
||
// 测试手动触发
|
||
tw.Trigger()
|
||
}
|