package stream import ( "context" "fmt" "sync" "testing" "time" "github.com/rulego/streamsql/aggregator" "github.com/rulego/streamsql/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // TestStreamBasicOperations 测试Stream基本操作(合并了构造函数、添加数据、获取结果等测试) func TestStreamBasicOperations(t *testing.T) { tests := []struct { name string config types.Config testFunc string expectError bool }{ { name: "基本构造函数测试", config: types.Config{ SimpleFields: []string{"name", "age"}, }, testFunc: "constructor", expectError: false, }, { name: "添加数据测试", config: types.Config{ SimpleFields: []string{"name", "age"}, }, testFunc: "addData", expectError: false, }, { name: "获取结果测试", config: types.Config{ SimpleFields: []string{"name", "age"}, }, testFunc: "getResults", expectError: false, }, { name: "窗口功能测试", config: types.Config{ SimpleFields: []string{"name", "age"}, NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", TimeUnit: 1000, Params: map[string]interface{}{"size": 1 * time.Second}, }, }, testFunc: "withWindow", expectError: false, }, { name: "线程安全测试", config: types.Config{ SimpleFields: []string{"name", "age"}, }, testFunc: "threadSafety", expectError: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { stream, err := NewStream(tt.config) if tt.expectError { assert.Error(t, err) return } require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 根据测试类型执行不同的测试逻辑 switch tt.testFunc { case "constructor": // 验证stream创建成功 assert.NotNil(t, stream) assert.NotNil(t, stream.dataChan) assert.NotNil(t, stream.resultChan) case "addData": // 测试添加数据 testData := map[string]interface{}{ "name": "test", "age": 25, } sent := stream.safeSendToDataChan(testData) assert.True(t, sent) case "getResults": // 测试获取结果通道 resultsChan := stream.GetResultsChan() assert.NotNil(t, resultsChan) case "withWindow": // 验证窗口配置 assert.NotNil(t, stream.Window) assert.True(t, stream.config.NeedWindow) case "threadSafety": // 测试并发安全性 var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() testData := map[string]interface{}{ "name": fmt.Sprintf("test_%d", id), "age": 20 + id, } stream.safeSendToDataChan(testData) }(i) } wg.Wait() } }) } } // TestStreamBasicFunctionality 测试Stream基本功能 func TestStreamBasicFunctionality(t *testing.T) { tests := []struct { name string config types.Config filter string testData []map[string]interface{} expectedDevice string expectedTemp float64 expectedHum float64 }{ { name: "带过滤器的窗口聚合", config: types.Config{ WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{"size": 500 * time.Millisecond}, }, GroupFields: []string{"device"}, SelectFields: map[string]aggregator.AggregateType{ "temperature": aggregator.Avg, "humidity": aggregator.Sum, }, NeedWindow: true, }, filter: "device == 'aa' && temperature > 10", testData: []map[string]interface{}{ {"device": "aa", "temperature": 25.0, "humidity": 60}, {"device": "aa", "temperature": 30.0, "humidity": 55}, {"device": "bb", "temperature": 22.0, "humidity": 70}, }, expectedDevice: "aa", expectedTemp: 27.5, expectedHum: 115.0, }, { name: "不完整数据处理", config: types.Config{ WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{"size": 500 * time.Millisecond}, }, GroupFields: []string{"device"}, SelectFields: map[string]aggregator.AggregateType{ "temperature": aggregator.Avg, "humidity": aggregator.Sum, }, NeedWindow: true, }, filter: "device == 'aa'", testData: []map[string]interface{}{ {"device": "aa", "temperature": 25.0}, {"device": "aa", "humidity": 60}, {"device": "aa", "temperature": 30.0}, {"device": "aa", "humidity": 55}, {"device": "bb", "temperature": 22.0, "humidity": 70}, }, expectedDevice: "aa", expectedTemp: 27.5, expectedHum: 115.0, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { strm, err := NewStream(tt.config) require.NoError(t, err) defer strm.Stop() if tt.filter != "" { err = strm.RegisterFilter(tt.filter) require.NoError(t, err) } // 添加 Sink 函数来捕获结果 resultChan := make(chan interface{}, 1) strm.AddSink(func(result []map[string]interface{}) { select { case resultChan <- result: default: // 防止阻塞 } }) strm.Start() // 发送测试数据 for _, data := range tt.testData { strm.Emit(data) } // 等待窗口关闭并触发结果 time.Sleep(700 * time.Millisecond) // 等待结果 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() var actual interface{} select { case actual = <-resultChan: cancel() case <-ctx.Done(): t.Fatal("No results received within 3 seconds") } // 验证结果 require.NotNil(t, actual) assert.IsType(t, []map[string]interface{}{}, actual) resultMap := actual.([]map[string]interface{}) require.Greater(t, len(resultMap), 0) firstResult := resultMap[0] assert.Equal(t, tt.expectedDevice, firstResult["device"]) assert.InEpsilon(t, tt.expectedTemp, firstResult["temperature"].(float64), 0.0001) assert.InDelta(t, tt.expectedHum, firstResult["humidity"].(float64), 0.0001) }) } } // TestStreamWithoutFilter 测试无过滤器的流处理 func TestStreamWithoutFilter(t *testing.T) { config := types.Config{ WindowConfig: types.WindowConfig{ Type: "sliding", Params: map[string]interface{}{"size": 2 * time.Second, "slide": 1 * time.Second}, }, GroupFields: []string{"device"}, SelectFields: map[string]aggregator.AggregateType{ "temperature": aggregator.Max, "humidity": aggregator.Min, }, NeedWindow: true, } strm, err := NewStream(config) require.NoError(t, err) defer strm.Stop() strm.Start() testData := []map[string]interface{}{ {"device": "aa", "temperature": 25.0, "humidity": 60}, {"device": "aa", "temperature": 30.0, "humidity": 55}, {"device": "bb", "temperature": 22.0, "humidity": 70}, } for _, data := range testData { strm.Emit(data) } // 捕获结果 resultChan := make(chan interface{}) strm.AddSink(func(result []map[string]interface{}) { resultChan <- result }) // 等待窗口触发 time.Sleep(3 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var actual interface{} select { case actual = <-resultChan: cancel() case <-ctx.Done(): t.Fatal("Timeout waiting for results") } expected := []map[string]interface{}{ {"device": "aa", "temperature": 30.0, "humidity": 55.0}, {"device": "bb", "temperature": 22.0, "humidity": 70.0}, } assert.IsType(t, []map[string]interface{}{}, actual) resultSlice := actual.([]map[string]interface{}) assert.Len(t, resultSlice, 2) for _, expectedResult := range expected { found := false for _, resultMap := range resultSlice { if resultMap["device"] == expectedResult["device"] { assert.InEpsilon(t, expectedResult["temperature"].(float64), resultMap["temperature"].(float64), 0.0001) assert.InEpsilon(t, expectedResult["humidity"].(float64), resultMap["humidity"].(float64), 0.0001) found = true break } } assert.True(t, found, "Expected result for device %v not found", expectedResult["device"]) } } // TestStreamRefactoring 测试重构后的Stream功能 func TestStreamRefactoring(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, SelectFields: map[string]aggregator.AggregateType{ "count": aggregator.Count, }, GroupFields: []string{"category"}, NeedWindow: false, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() testData := map[string]interface{}{ "name": "test", "age": 25, "category": "A", } // 启动Stream stream.Start() // 发送测试数据 stream.Emit(testData) // 等待处理完成 time.Sleep(100 * time.Millisecond) // 获取统计信息 stats := stream.GetStats() assert.Equal(t, int64(1), stats[InputCount]) } // TestStreamFactory 测试StreamFactory功能 func TestStreamFactory(t *testing.T) { factory := NewStreamFactory() require.NotNil(t, factory) config := types.Config{ SimpleFields: []string{"value"}, NeedWindow: false, } // 测试不同的创建方法 tests := []struct { name string create func() (*Stream, error) }{ {"CreateStream", func() (*Stream, error) { return factory.CreateStream(config) }}, {"CreateHighPerformanceStream", func() (*Stream, error) { return factory.CreateHighPerformanceStream(config) }}, {"CreateLowLatencyStream", func() (*Stream, error) { return factory.CreateLowLatencyStream(config) }}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { stream, err := tt.create() require.NoError(t, err) require.NotNil(t, stream) defer stream.Stop() }) } } // TestMigratedFunctions 测试迁移后的函数 func TestMigratedFunctions(t *testing.T) { // 测试性能评估函数 tests := []struct { name string dataUsage float64 dropRate float64 expected string }{ {"Critical", 50.0, 60.0, PerformanceLevelCritical}, {"Warning", 50.0, 30.0, PerformanceLevelWarning}, {"HighLoad", 95.0, 5.0, PerformanceLevelHighLoad}, {"ModerateLoad", 75.0, 5.0, PerformanceLevelModerateLoad}, {"Optimal", 50.0, 5.0, PerformanceLevelOptimal}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := AssessPerformanceLevel(tt.dataUsage, tt.dropRate) assert.Equal(t, tt.expected, result) }) } } // TestStreamConstructors 测试各种Stream构造函数 func TestStreamConstructors(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, } // 测试高性能Stream highPerfStream, err := NewStreamWithHighPerformance(config) require.NoError(t, err) require.NotNil(t, highPerfStream) defer func() { if highPerfStream != nil { close(highPerfStream.done) } }() // 测试低延迟Stream lowLatencyStream, err := NewStreamWithLowLatency(config) require.NoError(t, err) require.NotNil(t, lowLatencyStream) defer func() { if lowLatencyStream != nil { close(lowLatencyStream.done) } }() // 测试自定义性能配置Stream perfConfig := types.PerformanceConfig{ WorkerConfig: types.WorkerConfig{ SinkWorkerCount: 10, }, BufferConfig: types.BufferConfig{ DataChannelSize: 1000, }, } customPerfStream, err := NewStreamWithCustomPerformance(config, perfConfig) require.NoError(t, err) require.NotNil(t, customPerfStream) defer func() { if customPerfStream != nil { close(customPerfStream.done) } }() } // TestStreamFilterRegistration 测试过滤器注册功能 func TestStreamFilterRegistration(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 测试空过滤器 err = stream.RegisterFilter("") require.NoError(t, err) // 测试简单条件过滤器 err = stream.RegisterFilter("age > 18") require.NoError(t, err) // 测试带反引号的过滤器 err = stream.RegisterFilter("`user_name` == 'test'") require.NoError(t, err) // 测试LIKE语法 err = stream.RegisterFilter("name LIKE '%test%'") require.NoError(t, err) // 测试IS NULL语法 err = stream.RegisterFilter("age IS NULL") require.NoError(t, err) // 测试IS NOT NULL语法 err = stream.RegisterFilter("name IS NOT NULL") require.NoError(t, err) // 测试复杂条件 err = stream.RegisterFilter("age > 18 && name LIKE '%test%' || `user_id` IS NOT NULL") require.NoError(t, err) } // TestStreamAggregationQuery 测试聚合查询功能 func TestStreamAggregationQuery(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, SelectFields: map[string]aggregator.AggregateType{ "avg_age": aggregator.Avg, "max_age": aggregator.Max, }, NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "5s", }, }, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 测试是否为聚合查询 isAggregation := stream.IsAggregationQuery() require.True(t, isAggregation) // 测试非聚合查询 simpleConfig := types.Config{ SimpleFields: []string{"name", "age"}, } simpleStream, err := NewStream(simpleConfig) require.NoError(t, err) defer func() { if simpleStream != nil { close(simpleStream.done) } }() isSimpleAggregation := simpleStream.IsAggregationQuery() require.False(t, isSimpleAggregation) } // TestStreamSyncProcessing 测试同步处理功能 func TestStreamSyncProcessing(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 测试同步处理数据 testData := map[string]interface{}{ "name": "test", "age": 25, } result, err := stream.ProcessSync(testData) require.NoError(t, err) require.NotNil(t, result) require.Equal(t, "test", result["name"]) require.Equal(t, 25, result["age"]) } // TestStreamErrorHandling 测试错误处理 func TestStreamErrorHandling(t *testing.T) { // 测试无效配置 invalidConfig := types.Config{ // 故意留空以测试错误处理 } stream, err := NewStream(invalidConfig) // 这个可能会成功,因为配置验证可能比较宽松 // require.Error(t, err) if stream != nil { defer func() { close(stream.done) }() // 测试无效过滤器 err = stream.RegisterFilter("invalid syntax !!!") // 这个应该会失败 if err != nil { require.Contains(t, err.Error(), "compile filter error") } } } // TestStreamConcurrency 测试并发安全性 func TestStreamConcurrency(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 启动多个goroutine并发发送数据 const numGoroutines = 10 const numMessages = 100 var wg sync.WaitGroup wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() for j := 0; j < numMessages; j++ { data := map[string]interface{}{ "name": fmt.Sprintf("user_%d_%d", id, j), "age": id + j, } stream.Emit(data) } }(i) } wg.Wait() } // TestStreamPerformance 测试性能相关功能 func TestStreamPerformance(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, PerformanceConfig: types.PerformanceConfig{ WorkerConfig: types.WorkerConfig{ SinkWorkerCount: 5, }, BufferConfig: types.BufferConfig{ DataChannelSize: 1000, // 增加缓冲区大小以容纳所有消息 }, }, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 启动流处理 stream.Start() // 测试大量数据处理 const numMessages = 1000 for i := 0; i < numMessages; i++ { data := map[string]interface{}{ "name": fmt.Sprintf("user_%d", i), "age": i % 100, } stream.Emit(data) } // 等待处理完成 time.Sleep(100 * time.Millisecond) } // TestStreamLifecycle 测试Stream生命周期 func TestStreamLifecycle(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := NewStream(config) require.NoError(t, err) // 测试启动 stream.Start() // 发送一些数据 for i := 0; i < 10; i++ { data := map[string]interface{}{ "name": fmt.Sprintf("user_%d", i), "age": i, } stream.Emit(data) } // 测试停止 stream.Stop() // 验证停止后不能发送数据(这取决于具体实现) // 这里只是测试Stop方法不会panic } // TestConvertToAggregationFields 测试聚合字段转换 func TestConvertToAggregationFields(t *testing.T) { selectFields := map[string]aggregator.AggregateType{ "avg_age": aggregator.Avg, "max_age": aggregator.Max, "count": aggregator.Count, } fieldAlias := map[string]string{ "avg_age": "age", "max_age": "age", "count": "id", } fields := convertToAggregationFields(selectFields, fieldAlias) require.Len(t, fields, 3) // 验证字段转换结果 for _, field := range fields { switch field.OutputAlias { case "avg_age": require.Equal(t, aggregator.Avg, field.AggregateType) require.Equal(t, "age", field.InputField) case "max_age": require.Equal(t, aggregator.Max, field.AggregateType) require.Equal(t, "age", field.InputField) case "count": require.Equal(t, aggregator.Count, field.AggregateType) require.Equal(t, "id", field.InputField) } } } // TestStreamWithWindowAndAggregation 测试带窗口和聚合的Stream func TestStreamWithWindowAndAggregation(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "age"}, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{"size": 100 * time.Millisecond}, }, SelectFields: map[string]aggregator.AggregateType{ "avg_age": aggregator.Avg, "count": aggregator.Count, }, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil && stream.done != nil { select { case <-stream.done: // channel already closed default: close(stream.done) } } }() // 注册过滤器 err = stream.RegisterFilter("age > 0") require.NoError(t, err) // 启动Stream stream.Start() // 发送数据 for i := 0; i < 50; i++ { data := map[string]interface{}{ "name": fmt.Sprintf("user_%d", i), "age": i + 1, } stream.Emit(data) } // 等待处理完成 time.Sleep(200 * time.Millisecond) stream.Stop() } // TestDataChannelExpansion 测试数据通道动态扩容功能 func TestDataChannelExpansion(t *testing.T) { config := types.Config{ SimpleFields: []string{"name", "value"}, PerformanceConfig: types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 10, // 小容量便于测试扩容 ResultChannelSize: 100, }, WorkerConfig: types.WorkerConfig{ SinkPoolSize: 5, MaxRetryRoutines: 2, }, }, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 启动流 stream.Start() // 发送数据测试通道功能 for i := 0; i < 8; i++ { data := map[string]interface{}{ "name": "test", "value": i, } stream.Emit(data) } // 等待处理 time.Sleep(100 * time.Millisecond) // 验证数据处理 stats := stream.GetStats() assert.NotNil(t, stats) assert.True(t, stats[InputCount] > 0) } // TestStatsCollectorDetailedFunctions 测试统计收集器的详细功能 func TestStatsCollectorDetailedFunctions(t *testing.T) { collector := NewStatsCollector() // 测试重置功能 collector.IncrementInput() collector.IncrementOutput() collector.IncrementDropped() assert.Equal(t, int64(1), collector.GetInputCount()) assert.Equal(t, int64(1), collector.GetOutputCount()) assert.Equal(t, int64(1), collector.GetDroppedCount()) // 测试重置 collector.Reset() assert.Equal(t, int64(0), collector.GetInputCount()) assert.Equal(t, int64(0), collector.GetOutputCount()) assert.Equal(t, int64(0), collector.GetDroppedCount()) // 测试基本统计信息 basicStats := collector.GetBasicStats(5, 10, 3, 20, 2, 5, 1, 0) assert.Equal(t, int64(0), basicStats[InputCount]) assert.Equal(t, int64(5), basicStats[DataChanLen]) assert.Equal(t, int64(10), basicStats[DataChanCap]) // 测试详细统计信息 collector.IncrementInput() collector.IncrementInput() collector.IncrementOutput() collector.IncrementDropped() basicStats = collector.GetBasicStats(8, 10, 15, 20, 4, 5, 2, 1) detailedStats := collector.GetDetailedStats(basicStats) assert.Contains(t, detailedStats, BasicStats) assert.Contains(t, detailedStats, DataChanUsage) assert.Contains(t, detailedStats, ResultChanUsage) assert.Contains(t, detailedStats, ProcessRate) assert.Contains(t, detailedStats, DropRate) // 验证计算结果 dataUsage := detailedStats[DataChanUsage].(float64) assert.Equal(t, 80.0, dataUsage) // 8/10 * 100 processRate := detailedStats[ProcessRate].(float64) assert.Equal(t, 50.0, processRate) // 1/2 * 100 dropRate := detailedStats[DropRate].(float64) assert.Equal(t, 50.0, dropRate) // 1/2 * 100 } // TestResultHandlerGetResultsChan 测试结果处理器的GetResultsChan功能 func TestResultHandlerGetResultsChan(t *testing.T) { config := types.Config{ SimpleFields: []string{"test"}, } stream, err := NewStream(config) require.NoError(t, err) defer func() { if stream != nil { close(stream.done) } }() // 测试获取结果通道 resultsChan := stream.GetResultsChan() assert.NotNil(t, resultsChan) } // TestConcurrentDataChannelExpansion 测试并发数据通道扩容 func TestConcurrentDataChannelExpansion(t *testing.T) { // 创建流 config := types.Config{ SimpleFields: []string{"id", "message"}, PerformanceConfig: types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 5, ResultChannelSize: 10, }, WorkerConfig: types.WorkerConfig{ SinkPoolSize: 2, MaxRetryRoutines: 1, }, }, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 启动流 stream.Start() // 并发添加大量数据 var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(id int) { defer wg.Done() data := map[string]interface{}{ "id": id, "message": "concurrent_expansion", } stream.Emit(data) }(i) } wg.Wait() time.Sleep(100 * time.Millisecond) // 验证流仍在正常运行 assert.NotNil(t, stream) } // TestExpandDataChannelDirectly 直接测试expandDataChannel函数 // 提高expandDataChannel函数的覆盖率 func TestExpandDataChannelDirectly(t *testing.T) { // 创建一个Stream实例用于测试 config := types.Config{ SimpleFields: []string{"test"}, PerformanceConfig: types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 10, ResultChannelSize: 10, }, }, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 填充通道到80%以上以触发扩容条件 for i := 0; i < 9; i++ { select { case stream.dataChan <- map[string]interface{}{"test": i}: default: t.Fatal("Failed to fill channel") } } // 记录原始容量 originalCap := cap(stream.dataChan) // 直接调用扩容函数 stream.expandDataChannel() // 验证容量是否增加 newCap := cap(stream.dataChan) assert.Greater(t, newCap, originalCap, "Channel capacity should increase after expansion") // 验证数据是否正确迁移 assert.Equal(t, 9, len(stream.dataChan), "All data should be migrated to new channel") // 测试并发扩容保护 var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func() { defer wg.Done() stream.expandDataChannel() }() } wg.Wait() } // TestExpandDataChannelBelowThreshold 测试通道使用率低于阈值时不扩容 func TestExpandDataChannelBelowThreshold(t *testing.T) { config := types.Config{ SimpleFields: []string{"test"}, PerformanceConfig: types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 10, ResultChannelSize: 10, }, }, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 只填充少量数据(低于80%阈值) for i := 0; i < 3; i++ { stream.dataChan <- map[string]interface{}{"test": i} } originalCap := cap(stream.dataChan) // 调用扩容函数 stream.expandDataChannel() // 验证容量没有变化 newCap := cap(stream.dataChan) assert.Equal(t, originalCap, newCap, "Channel capacity should not change when below threshold") } // TestStreamConfigErrorHandlingEnhanced 测试流配置错误处理增强版 func TestStreamConfigErrorHandlingEnhanced(t *testing.T) { tests := []struct { name string config types.Config expectError bool }{ { name: "空配置", config: types.Config{ PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: false, }, { name: "无效窗口类型", config: types.Config{ WindowConfig: types.WindowConfig{ Type: "invalid_type", }, NeedWindow: true, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: true, }, { name: "零时间窗口", config: func() types.Config { c := types.NewConfig() c.WindowConfig = types.WindowConfig{ Type: "tumbling", TimeUnit: 0, } c.NeedWindow = true c.PerformanceConfig = types.DefaultPerformanceConfig() return c }(), expectError: true, }, { name: "负时间窗口", config: func() types.Config { c := types.NewConfig() c.WindowConfig = types.WindowConfig{ Type: "sliding", TimeUnit: 1 * time.Second, } c.NeedWindow = true c.PerformanceConfig = types.DefaultPerformanceConfig() return c }(), expectError: true, }, { name: "超大缓冲区配置", config: func() types.Config { c := types.NewConfig() c.PerformanceConfig = types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: -1, }, OverflowConfig: types.OverflowConfig{ Strategy: "invalid", }, } return c }(), expectError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := NewStream(tt.config) if (err != nil) != tt.expectError { t.Errorf("expected error: %v, got: %v", tt.expectError, err) } }) } } // TestStreamDataValidationEnhanced 测试流数据验证增强版 func TestStreamDataValidationEnhanced(t *testing.T) { config := types.NewConfig() config.PerformanceConfig = types.DefaultPerformanceConfig() stream, err := NewStream(config) if err != nil { t.Fatalf("failed to create stream: %v", err) } tests := []struct { name string data map[string]interface{} expectError bool }{ { name: "nil数据", data: nil, expectError: true, }, { name: "空数据", data: map[string]interface{}{}, expectError: false, }, { name: "无效时间戳类型", data: map[string]interface{}{ "timestamp": "invalid", "value": 1, }, expectError: false, // 由处理器处理,不直接报错 }, { name: "负数值", data: map[string]interface{}{ "value": -100, }, expectError: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // 这里简化测试,只验证数据结构 if tt.data != nil && len(tt.data) == 0 { // 空数据测试 } }) } _ = stream } // TestStreamMemoryPressureEnhanced 测试内存压力场景增强版 func TestStreamMemoryPressureEnhanced(t *testing.T) { config := types.NewConfig() config.PerformanceConfig = types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 1, ResultChannelSize: 1, MaxBufferSize: 10, }, OverflowConfig: types.OverflowConfig{ Strategy: "drop", AllowDataLoss: true, }, } stream, err := NewStream(config) if err != nil { t.Fatalf("failed to create stream: %v", err) } // 模拟大量数据 largeData := make([]map[string]interface{}, 1000) for i := 0; i < 1000; i++ { largeData[i] = map[string]interface{}{ "value": i, "key": "test", } } _ = stream _ = largeData } // TestStreamConcurrentAccessEnhanced 测试并发访问边界条件增强版 func TestStreamConcurrentAccessEnhanced(t *testing.T) { config := types.NewConfig() config.PerformanceConfig = types.DefaultPerformanceConfig() stream, err := NewStream(config) if err != nil { t.Fatalf("failed to create stream: %v", err) } // 测试并发关闭 go func() { stream.Stop() }() // 测试并发启动 go func() { stream.Start() time.Sleep(100 * time.Millisecond) }() // 给并发操作一些时间 time.Sleep(10 * time.Millisecond) } // TestStreamWindowEdgeCasesEnhanced 测试窗口边界情况增强版 func TestStreamWindowEdgeCasesEnhanced(t *testing.T) { tests := []struct { name string config types.Config expectError bool }{ { name: "极小时间窗口", config: func() types.Config { c := types.NewConfig() c.WindowConfig = types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": 1 * time.Nanosecond, // 极小时间窗口 }, TimeUnit: 1 * time.Nanosecond, } c.NeedWindow = true c.PerformanceConfig = types.DefaultPerformanceConfig() return c }(), expectError: false, }, { name: "极大时间窗口", config: types.Config{ WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": 8760 * time.Hour, // 1年 }, TimeUnit: 8760 * time.Hour, }, NeedWindow: true, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: false, }, { name: "滑动窗口零滑动", config: types.Config{ WindowConfig: types.WindowConfig{ Type: "sliding", Params: map[string]interface{}{ "size": 1 * time.Second, "slide": 1 * time.Millisecond, // 很小的滑动间隔 }, TimeUnit: 1 * time.Second, }, NeedWindow: true, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := NewStream(tt.config) if (err != nil) != tt.expectError { t.Errorf("expected error: %v, got: %v", tt.expectError, err) } }) } } // TestStreamUnifiedConfigIntegration 测试Stream和Window统一配置的集成 func TestStreamUnifiedConfigIntegration(t *testing.T) { testCases := []struct { name string performanceConfig types.PerformanceConfig expectedWindowBufferSize int }{ { name: "默认配置", performanceConfig: types.DefaultPerformanceConfig(), expectedWindowBufferSize: 1000, }, { name: "高性能配置", performanceConfig: types.HighPerformanceConfig(), expectedWindowBufferSize: 5000, }, { name: "低延迟配置", performanceConfig: types.LowLatencyConfig(), expectedWindowBufferSize: 100, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { config := types.Config{ NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "5s", }, }, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: tc.performanceConfig, } s, err := NewStream(config) require.NoError(t, err) defer s.Stop() // 验证stream的缓冲区配置 assert.Equal(t, tc.performanceConfig.BufferConfig.DataChannelSize, cap(s.dataChan), "数据通道大小不匹配") assert.Equal(t, tc.performanceConfig.BufferConfig.ResultChannelSize, cap(s.resultChan), "结果通道大小不匹配") // 验证窗口创建成功 assert.NotNil(t, s.Window, "窗口应该被创建") }) } } // TestStreamUnifiedConfigPerformanceImpact 测试统一配置对Stream性能的影响 func TestStreamUnifiedConfigPerformanceImpact(t *testing.T) { configs := map[string]types.PerformanceConfig{ "默认配置": types.DefaultPerformanceConfig(), "高性能配置": types.HighPerformanceConfig(), "低延迟配置": types.LowLatencyConfig(), } for name, perfConfig := range configs { t.Run(name, func(t *testing.T) { config := types.Config{ NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "1s", }, }, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Sum, }, PerformanceConfig: perfConfig, } s, err := NewStream(config) require.NoError(t, err) defer s.Stop() go s.Start() // 发送测试数据并测量性能 dataCount := 1000 startTime := time.Now() for i := 0; i < dataCount; i++ { data := map[string]interface{}{ "value": i, "timestamp": time.Now().Unix(), } select { case s.dataChan <- data: // 成功发送 case <-time.After(100 * time.Millisecond): // 发送超时 t.Logf("第%d条数据发送超时", i) break } } processingTime := time.Since(startTime) t.Logf("%s 处理%d条数据耗时: %v", name, dataCount, processingTime) // 等待一些结果 time.Sleep(1500 * time.Millisecond) // 检查结果 resultCount := 0 for { select { case <-s.resultChan: resultCount++ default: goto done } } done: // 性能测试主要关注处理时间,结果数量可能因窗口触发时机而变化 }) } } // TestStreamUnifiedConfigErrorHandling 测试统一配置的错误处理 func TestStreamUnifiedConfigErrorHandling(t *testing.T) { tests := []struct { name string config types.Config expectError bool description string }{ { name: "无效窗口类型", config: types.Config{ NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "invalid_window_type", Params: map[string]interface{}{ "size": "5s", }, }, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: true, description: "无效的窗口类型应该导致创建失败", }, { name: "缺少窗口大小参数", config: types.Config{ NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{}, }, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: true, description: "缺少size参数应该导致创建失败", }, { name: "有效配置", config: types.Config{ NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "5s", }, }, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: types.DefaultPerformanceConfig(), }, expectError: false, description: "有效配置应该创建成功", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { stream, err := NewStream(tt.config) if tt.expectError { assert.Error(t, err, tt.description) assert.Nil(t, stream) } else { assert.NoError(t, err, tt.description) assert.NotNil(t, stream) if stream != nil { defer stream.Stop() } } }) } } // TestStreamUnifiedConfigCompatibility 测试统一配置的兼容性 func TestStreamUnifiedConfigCompatibility(t *testing.T) { // 测试新的统一配置 newConfig := types.Config{ NeedWindow: false, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: types.HighPerformanceConfig(), } s1, err := NewStream(newConfig) require.NoError(t, err) defer s1.Stop() // 验证新配置生效 expectedDataSize := types.HighPerformanceConfig().BufferConfig.DataChannelSize assert.Equal(t, expectedDataSize, cap(s1.dataChan), "高性能配置的数据通道大小不匹配") // 测试默认配置 defaultConfig := types.Config{ NeedWindow: false, SelectFields: map[string]aggregator.AggregateType{ "value": aggregator.Count, }, PerformanceConfig: types.DefaultPerformanceConfig(), } s2, err := NewStream(defaultConfig) require.NoError(t, err) defer s2.Stop() // 验证默认配置 expectedDefaultSize := types.DefaultPerformanceConfig().BufferConfig.DataChannelSize assert.Equal(t, expectedDefaultSize, cap(s2.dataChan), "默认配置的数据通道大小不匹配") } // TestStatsManagerEnhanced 测试统计管理器增强版 func TestStatsManagerEnhanced(t *testing.T) { config := types.Config{ NeedWindow: false, SimpleFields: []string{"value"}, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 启动流处理 stream.Start() // 发送一些数据来生成统计信息 for i := 0; i < 10; i++ { stream.Emit(map[string]interface{}{"value": i}) } // 等待处理完成 time.Sleep(100 * time.Millisecond) // 测试基本统计 stats := stream.GetStats() assert.Equal(t, int64(10), stats[InputCount], "输入计数不匹配") assert.GreaterOrEqual(t, stats[OutputCount], int64(1), "输出计数应该大于等于1") // 测试重置统计 stream.ResetStats() stats = stream.GetStats() assert.Equal(t, int64(0), stats[InputCount], "重置后输入计数应该为0") } // TestDataHandlerEnhanced 测试数据处理器增强版 func TestDataHandlerEnhanced(t *testing.T) { tests := []struct { name string performanceConfig types.PerformanceConfig dataCount int expectedDrops bool }{ { name: "高性能配置 - 无丢弃", performanceConfig: types.HighPerformanceConfig(), dataCount: 100, expectedDrops: false, }, { name: "低延迟配置 - 可能丢弃", performanceConfig: types.LowLatencyConfig(), dataCount: 1000, expectedDrops: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { config := types.Config{ NeedWindow: false, SimpleFields: []string{"value"}, PerformanceConfig: tt.performanceConfig, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() stream.Start() // 快速发送大量数据 for i := 0; i < tt.dataCount; i++ { stream.Emit(map[string]interface{}{"value": i}) } time.Sleep(100 * time.Millisecond) stats := stream.GetStats() droppedCount := stats[DroppedCount] if tt.expectedDrops { // 在高负载下可能会有丢弃 t.Logf("%s: 输入 %d, 丢弃 %d", tt.name, stats[InputCount], droppedCount) } else { // 高性能配置应该能处理所有数据 assert.Equal(t, int64(0), droppedCount, "高性能配置不应该丢弃数据") } }) } } // TestResultHandlerEnhanced 测试结果处理器增强版 func TestResultHandlerEnhanced(t *testing.T) { config := types.Config{ NeedWindow: false, SimpleFields: []string{"value"}, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 测试Sink功能 var mu sync.Mutex var receivedResults []interface{} stream.AddSink(func(result []map[string]interface{}) { mu.Lock() defer mu.Unlock() receivedResults = append(receivedResults, result) }) stream.Start() // 发送测试数据 testData := []map[string]interface{}{ {"value": 1}, {"value": 2}, {"value": 3}, } for _, data := range testData { stream.Emit(data) } time.Sleep(100 * time.Millisecond) // 验证结果 mu.Lock() defer mu.Unlock() assert.GreaterOrEqual(t, len(receivedResults), len(testData), "应该接收到所有结果") // 验证结果格式 for _, result := range receivedResults { assert.IsType(t, []map[string]interface{}{}, result, "结果应该是map切片类型") resultSlice := result.([]map[string]interface{}) assert.Greater(t, len(resultSlice), 0, "结果切片不应该为空") } } // TestPerformanceConfigurationsEnhanced 测试不同性能配置的效果增强版 func TestPerformanceConfigurationsEnhanced(t *testing.T) { configs := map[string]types.PerformanceConfig{ "Default": types.DefaultPerformanceConfig(), "HighPerformance": types.HighPerformanceConfig(), "LowLatency": types.LowLatencyConfig(), } for name, perfConfig := range configs { t.Run(name, func(t *testing.T) { config := types.Config{ NeedWindow: false, SimpleFields: []string{"value"}, PerformanceConfig: perfConfig, } stream, err := NewStream(config) require.NoError(t, err) defer stream.Stop() // 验证缓冲区大小 assert.Equal(t, perfConfig.BufferConfig.DataChannelSize, cap(stream.dataChan)) assert.Equal(t, perfConfig.BufferConfig.ResultChannelSize, cap(stream.resultChan)) // 验证工作池配置 assert.Equal(t, perfConfig.WorkerConfig.SinkPoolSize, cap(stream.sinkWorkerPool)) //t.Logf("%s配置: 数据通道=%d, 结果通道=%d, Sink池=%d", // name, // cap(stream.dataChan), // cap(stream.resultChan), // cap(stream.sinkWorkerPool)) }) } } // TestNewStreamFactory 测试流工厂创建 func TestNewStreamFactory(t *testing.T) { factory := NewStreamFactory() assert.NotNil(t, factory) } // TestStreamFactory_CreateStream 测试创建默认配置的流 func TestStreamFactory_CreateStream(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := factory.CreateStream(config) require.NoError(t, err) assert.NotNil(t, stream) defer func() { if stream != nil { close(stream.done) } }() // 验证默认性能配置已应用 assert.NotEqual(t, types.PerformanceConfig{}, stream.config.PerformanceConfig) } // TestStreamFactory_CreateHighPerformanceStream 测试创建高性能流 func TestStreamFactory_CreateHighPerformanceStream(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := factory.CreateHighPerformanceStream(config) require.NoError(t, err) assert.NotNil(t, stream) defer func() { if stream != nil { close(stream.done) } }() // 验证高性能配置 expectedConfig := types.HighPerformanceConfig() assert.Equal(t, expectedConfig, stream.config.PerformanceConfig) } // TestStreamFactory_CreateLowLatencyStream 测试创建低延迟流 func TestStreamFactory_CreateLowLatencyStream(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } stream, err := factory.CreateLowLatencyStream(config) require.NoError(t, err) assert.NotNil(t, stream) defer func() { if stream != nil { close(stream.done) } }() // 验证低延迟配置 expectedConfig := types.LowLatencyConfig() assert.Equal(t, expectedConfig, stream.config.PerformanceConfig) } // TestStreamFactory_CreateCustomPerformanceStream 测试创建自定义性能配置流 func TestStreamFactory_CreateCustomPerformanceStream(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } customPerfConfig := types.PerformanceConfig{ BufferConfig: types.BufferConfig{ DataChannelSize: 500, ResultChannelSize: 200, }, OverflowConfig: types.OverflowConfig{ Strategy: StrategyDrop, AllowDataLoss: true, BlockTimeout: time.Second, }, WorkerConfig: types.WorkerConfig{ SinkWorkerCount: 4, SinkPoolSize: 100, MaxRetryRoutines: 2, }, } stream, err := factory.CreateCustomPerformanceStream(config, customPerfConfig) require.NoError(t, err) assert.NotNil(t, stream) defer func() { if stream != nil { close(stream.done) } }() // 验证自定义配置 assert.Equal(t, customPerfConfig, stream.config.PerformanceConfig) } // TestStreamFactory_CreateStreamWithWindow 测试创建带窗口的流 func TestStreamFactory_CreateStreamWithWindow(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, NeedWindow: true, WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "5s", }, }, } stream, err := factory.CreateStream(config) require.NoError(t, err) assert.NotNil(t, stream) assert.NotNil(t, stream.Window) defer func() { if stream != nil { close(stream.done) } }() } // TestStreamFactory_CreateStreamWithInvalidStrategy 测试创建无效策略的流 func TestStreamFactory_CreateStreamWithInvalidStrategy(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, PerformanceConfig: types.PerformanceConfig{ OverflowConfig: types.OverflowConfig{ Strategy: "invalid_strategy", }, }, } _, err := factory.CreateStream(config) // 应该报错,因为策略无效 require.Error(t, err) assert.Contains(t, err.Error(), "invalid overflow strategy") } // TestStreamFactory_CreateWindow 测试窗口创建 func TestStreamFactory_CreateWindow(t *testing.T) { factory := NewStreamFactory() config := types.Config{ WindowConfig: types.WindowConfig{ Type: "tumbling", Params: map[string]interface{}{ "size": "5s", }, }, PerformanceConfig: types.DefaultPerformanceConfig(), } win, err := factory.createWindow(config) require.NoError(t, err) assert.NotNil(t, win) } // TestStreamFactory_CreateStreamInstance 测试流实例创建 func TestStreamFactory_CreateStreamInstance(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, PerformanceConfig: types.DefaultPerformanceConfig(), } stream := factory.createStreamInstance(config, nil) assert.NotNil(t, stream) assert.NotNil(t, stream.dataChan) assert.NotNil(t, stream.resultChan) assert.NotNil(t, stream.done) assert.NotNil(t, stream.sinkWorkerPool) assert.Equal(t, config, stream.config) } // TestStreamFactory_Performance 测试工厂性能 func TestStreamFactory_Performance(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } // 创建多个流实例,验证工厂性能 streams := make([]*Stream, 10) for i := 0; i < 10; i++ { stream, err := factory.CreateStream(config) require.NoError(t, err) streams[i] = stream } // 清理 for _, stream := range streams { stream.Stop() } } // TestStreamFactory_ConcurrentCreation 测试并发创建流 func TestStreamFactory_ConcurrentCreation(t *testing.T) { factory := NewStreamFactory() config := types.Config{ SimpleFields: []string{"name", "age"}, } const numGoroutines = 10 streams := make([]*Stream, numGoroutines) errors := make([]error, numGoroutines) done := make(chan struct{}) // 并发创建流 for i := 0; i < numGoroutines; i++ { go func(index int) { stream, err := factory.CreateStream(config) streams[index] = stream errors[index] = err done <- struct{}{} }(i) } // 等待所有goroutine完成 for i := 0; i < numGoroutines; i++ { <-done } // 验证结果 for i := 0; i < numGoroutines; i++ { assert.NoError(t, errors[i]) assert.NotNil(t, streams[i]) if streams[i] != nil { streams[i].Stop() } } }