package aggregator import ( "errors" "fmt" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) type testData struct { Device string temperature float64 humidity float64 } // TestGetResultsErrorCases 测试GetResults函数的错误情况 func TestGetResultsErrorCases(t *testing.T) { groupFields := []string{"category"} aggFields := []AggregationField{ {InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"}, } agg := NewEnhancedGroupAggregator(groupFields, aggFields) // 添加一个无效的后聚合表达式 requiredFields := []AggregationFieldInfo{ {FuncName: "invalid", InputField: "value", AggType: Sum}, } err := agg.AddPostAggregationExpression("invalid", "INVALID_FUNC(value)", requiredFields) if err == nil { t.Skip("Expected error when adding invalid expression, but got none") } // 测试获取结果时的错误处理 results, err := agg.GetResults() if err != nil { t.Errorf("Unexpected error: %v", err) } if results == nil { t.Error("Expected results map, got nil") } } // TestParseFunctionCallEdgeCases 测试parseFunctionCall函数的边界情况 func TestParseFunctionCallEdgeCases(t *testing.T) { groupFields := []string{"category"} aggFields := []AggregationField{ {InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"}, } agg := NewEnhancedGroupAggregator(groupFields, aggFields) tests := []struct { name string expr string expectError bool }{ { name: "Function with nested parentheses", expr: "SUM(CASE WHEN (value > 0) THEN value ELSE 0 END)", expectError: false, }, { name: "Function with string literals", expr: "CONCAT('Hello', 'World')", expectError: false, }, { name: "Function with quoted identifiers", expr: "SUM(`column name`)", expectError: false, }, { name: "Unmatched parentheses", expr: "SUM(value", expectError: true, }, { name: "Empty function call", expr: "()", expectError: true, }, { name: "Function with arithmetic", expr: "SUM(value * 2 + 1)", expectError: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, _ = agg.parseFunctionCall(tt.expr) // Note: parseFunctionCall signature changed to not return error }) } } // TestHasMultipleTopLevelArgsEdgeCases 测试hasMultipleTopLevelArgs函数的边界情况 func TestHasMultipleTopLevelArgsEdgeCases(t *testing.T) { tests := []struct { name string args string expected bool }{ { name: "Single argument", args: "value", expected: false, }, { name: "Multiple arguments", args: "value1, value2", expected: true, }, { name: "Arguments with nested function", args: "SUM(value), COUNT(*)", expected: true, }, { name: "Arguments with parentheses", args: "(value1 + value2), value3", expected: true, }, { name: "Single complex argument", args: "(value1, value2)", expected: false, }, { name: "Empty arguments", args: "", expected: false, }, { name: "Arguments with string literals", args: "'hello, world', value", expected: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := hasMultipleTopLevelArgs(tt.args) if result != tt.expected { t.Errorf("hasMultipleTopLevelArgs(%q) = %v, want %v", tt.args, result, tt.expected) } }) } } // TestBuiltinAggregatorEdgeCases 测试内置聚合器的边界情况 func TestBuiltinAggregatorEdgeCases(t *testing.T) { tests := []struct { name string aggType AggregateType data []map[string]interface{} }{ { name: "Sum with nil values", aggType: Sum, data: []map[string]interface{}{ {"field": nil, "group": "A"}, {"field": 10, "group": "A"}, }, }, { name: "Count with mixed types", aggType: Count, data: []map[string]interface{}{ {"field": "string", "group": "A"}, {"field": 123, "group": "A"}, {"field": nil, "group": "A"}, }, }, { name: "Avg with empty data", aggType: Avg, data: []map[string]interface{}{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { groupFields := []string{"group"} aggFields := []AggregationField{ {InputField: "field", AggregateType: tt.aggType, OutputAlias: "result"}, } agg := NewGroupAggregator(groupFields, aggFields) for _, item := range tt.data { agg.Add(item) } results, err := agg.GetResults() assert.NoError(t, err) assert.NotNil(t, results) }) } } func TestGroupAggregator_MultiFieldSum(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, { InputField: "humidity", AggregateType: Sum, OutputAlias: "humidity_sum", }, }, ) testData := []map[string]interface{}{ {"Device": "aa", "temperature": 25.5, "humidity": 60.0}, {"Device": "aa", "temperature": 26.8, "humidity": 55.0}, {"Device": "bb", "temperature": 22.3, "humidity": 65.0}, {"Device": "bb", "temperature": 23.5, "humidity": 70.0}, } for _, d := range testData { agg.Add(d) } expected := []map[string]interface{}{ {"Device": "aa", "temperature_sum": 52.3, "humidity_sum": 115.0}, {"Device": "bb", "temperature_sum": 45.8, "humidity_sum": 135.0}, } results, _ := agg.GetResults() assert.ElementsMatch(t, expected, results) } // TestGroupAggregator_Put 测试Put方法 func TestGroupAggregator_Put(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 测试Put方法 err := agg.Put("test_key", "test_value") assert.NoError(t, err) // 测试多次Put err = agg.Put("key1", 123) assert.NoError(t, err) err = agg.Put("key2", 456.78) assert.NoError(t, err) } // TestGroupAggregator_RegisterExpression 测试表达式注册 func TestGroupAggregator_RegisterExpression(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 注册表达式 evaluator := func(data interface{}) (interface{}, error) { if dataMap, ok := data.(map[string]interface{}); ok { if temp, exists := dataMap["temperature"]; exists { if tempFloat, ok := temp.(float64); ok { return tempFloat*1.8 + 32, nil // 摄氏度转华氏度 } } } return nil, errors.New("invalid data") } agg.RegisterExpression("fahrenheit", "temperature * 1.8 + 32", []string{"temperature"}, evaluator) // 验证表达式已注册 assert.NotNil(t, agg.expressions["fahrenheit"]) assert.Equal(t, "fahrenheit", agg.expressions["fahrenheit"].Field) assert.Equal(t, "temperature * 1.8 + 32", agg.expressions["fahrenheit"].Expression) assert.Equal(t, []string{"temperature"}, agg.expressions["fahrenheit"].Fields) } // TestGroupAggregator_Reset 测试Reset方法 func TestGroupAggregator_Reset(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 添加一些数据 testData := []map[string]interface{}{ {"Device": "test", "temperature": 25.5}, {"Device": "test", "temperature": 26.8}, } for _, d := range testData { agg.Add(d) } // 验证有数据 results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 重置 agg.Reset() // 验证数据已清空 results, _ = agg.GetResults() assert.Len(t, results, 0) } // TestGroupAggregator_ErrorHandling 测试错误处理 func TestGroupAggregator_ErrorHandling(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 测试添加无效数据 err := agg.Add(nil) assert.Error(t, err) // 测试添加非map类型数据 err = agg.Add("invalid data") assert.Error(t, err) // 测试添加缺少分组字段的数据 err = agg.Add(map[string]interface{}{"temperature": 25.5}) assert.Error(t, err) } // TestGroupAggregator_DifferentAggregateTypes 测试不同聚合类型 func TestGroupAggregator_DifferentAggregateTypes(t *testing.T) { agg := NewGroupAggregator( []string{"category"}, []AggregationField{ { InputField: "value", AggregateType: Count, OutputAlias: "count", }, { InputField: "score", AggregateType: Avg, OutputAlias: "avg_score", }, { InputField: "score", AggregateType: Max, OutputAlias: "max_score", }, { InputField: "score", AggregateType: Min, OutputAlias: "min_score", }, }, ) testData := []map[string]interface{}{ {"category": "A", "value": 1, "score": 85.5}, {"category": "A", "value": 2, "score": 92.0}, {"category": "A", "value": 3, "score": 78.5}, {"category": "B", "value": 4, "score": 88.0}, {"category": "B", "value": 5, "score": 95.5}, } for _, d := range testData { err := agg.Add(d) assert.NoError(t, err) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 2) // 验证结果 for _, result := range results { category := result["category"] if category == "A" { assert.Equal(t, float64(3), result["count"]) assert.InDelta(t, 85.33, result["avg_score"], 0.1) assert.Equal(t, 92.0, result["max_score"]) assert.Equal(t, 78.5, result["min_score"]) } else if category == "B" { assert.Equal(t, float64(2), result["count"]) assert.InDelta(t, 91.75, result["avg_score"], 0.1) assert.Equal(t, 95.5, result["max_score"]) assert.Equal(t, 88.0, result["min_score"]) } } } // TestGroupAggregator_MultipleGroupFields 测试多个分组字段 func TestGroupAggregator_MultipleGroupFields(t *testing.T) { agg := NewGroupAggregator( []string{"region", "category"}, []AggregationField{ { InputField: "sales", AggregateType: Sum, OutputAlias: "total_sales", }, }, ) testData := []map[string]interface{}{ {"region": "North", "category": "A", "sales": 100.0}, {"region": "North", "category": "A", "sales": 150.0}, {"region": "North", "category": "B", "sales": 200.0}, {"region": "South", "category": "A", "sales": 120.0}, {"region": "South", "category": "B", "sales": 180.0}, } for _, d := range testData { err := agg.Add(d) assert.NoError(t, err) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 4) // 验证每个组合的结果 expected := map[string]float64{ "North-A": 250.0, "North-B": 200.0, "South-A": 120.0, "South-B": 180.0, } for _, result := range results { key := result["region"].(string) + "-" + result["category"].(string) expectedSales, exists := expected[key] assert.True(t, exists, "Unexpected group key: %s", key) assert.Equal(t, expectedSales, result["total_sales"]) } } // TestGroupAggregator_EmptyData 测试空数据处理 func TestGroupAggregator_EmptyData(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 不添加任何数据,直接获取结果 results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 0) } // TestGroupAggregator_NilValues 测试空值处理 func TestGroupAggregator_NilValues(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) testData := []map[string]interface{}{ {"Device": "test", "temperature": 25.5}, {"Device": "test", "temperature": nil}, // 空值 {"Device": "test", "temperature": 30.0}, } for _, d := range testData { err := agg.Add(d) assert.NoError(t, err) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 空值应该被忽略,只计算非空值 expected := 55.5 // 25.5 + 30.0 assert.Equal(t, expected, results[0]["temperature_sum"]) } // TestGroupAggregator_ConcurrentAccess 测试并发访问 func TestGroupAggregator_ConcurrentAccess(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) // 并发添加数据 go func() { for i := 0; i < 10; i++ { agg.Add(map[string]interface{}{"Device": "A", "temperature": float64(i)}) } }() go func() { for i := 0; i < 10; i++ { agg.Add(map[string]interface{}{"Device": "B", "temperature": float64(i * 2)}) } }() // 并发注册表达式 go func() { evaluator := func(data interface{}) (interface{}, error) { return 1.0, nil } agg.RegisterExpression("test_expr", "1", []string{}, evaluator) }() // 并发Put操作 go func() { for i := 0; i < 5; i++ { agg.Put("key"+string(rune(i)), i) } }() // 等待一段时间确保所有goroutine完成 // 注意:这不是最佳的同步方式,但对于测试来说足够了 // 在实际应用中应该使用sync.WaitGroup for i := 0; i < 100; i++ { // 尝试获取结果,测试并发读取 _, _ = agg.GetResults() } } // TestCreateBuiltinAggregator 测试内置聚合器创建 func TestCreateBuiltinAggregator(t *testing.T) { tests := []struct { name string aggType AggregateType }{ {"Sum聚合器", Sum}, {"Count聚合器", Count}, {"Avg聚合器", Avg}, {"Max聚合器", Max}, {"Min聚合器", Min}, {"Expression聚合器", Expression}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { aggregator := CreateBuiltinAggregator(tt.aggType) assert.NotNil(t, aggregator) // 测试New方法 newAgg := aggregator.New() assert.NotNil(t, newAgg) }) } } // TestExpressionAggregatorWrapper 测试表达式聚合器包装器 func TestExpressionAggregatorWrapper(t *testing.T) { wrapper := CreateBuiltinAggregator(Expression) require.NotNil(t, wrapper) // 测试类型断言 exprWrapper, ok := wrapper.(*ExpressionAggregatorWrapper) assert.True(t, ok) assert.NotNil(t, exprWrapper.function) // 测试New方法 newWrapper := wrapper.New() assert.NotNil(t, newWrapper) // 测试Add和Result方法 wrapper.Add(10.0) wrapper.Add(20.0) result := wrapper.Result() assert.NotNil(t, result) } func TestGroupAggregator_SingleField(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) testData := []map[string]interface{}{ {"Device": "cc", "temperature": 24.5}, {"Device": "cc", "temperature": 27.8}, } for _, d := range testData { agg.Add(d) } expected := []map[string]interface{}{ {"Device": "cc", "temperature_sum": 52.3}, } results, _ := agg.GetResults() assert.ElementsMatch(t, expected, results) } func TestGroupAggregator_MultipleAggregators(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, { InputField: "humidity", AggregateType: Avg, OutputAlias: "humidity_avg", }, { InputField: "presure", AggregateType: Max, OutputAlias: "presure_max", }, { InputField: "PM10", AggregateType: Min, OutputAlias: "PM10_min", }, }, ) testData := []map[string]interface{}{ {"Device": "cc", "temperature": 25.5, "humidity": 65.5, "presure": 1008, "PM10": 35}, {"Device": "cc", "temperature": 27.8, "humidity": 60.5, "presure": 1012, "PM10": 28}, } for _, d := range testData { agg.Add(d) } expected := []map[string]interface{}{ { "Device": "cc", "temperature_sum": 53.3, "humidity_avg": 63.0, "presure_max": 1012.0, "PM10_min": 28.0, }, } results, _ := agg.GetResults() assert.ElementsMatch(t, expected, results) } func TestGroupAggregator_NoAlias(t *testing.T) { // Test case where no alias is specified, should use input field name as output field name agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, // OutputAlias left empty, should use InputField }, }, ) testData := []map[string]interface{}{ {"Device": "dd", "temperature": 10.0}, {"Device": "dd", "temperature": 15.0}, } for _, d := range testData { agg.Add(d) } expected := []map[string]interface{}{ {"Device": "dd", "temperature": 25.0}, } results, _ := agg.GetResults() assert.ElementsMatch(t, expected, results) } // TestGroupAggregatorAdvancedFeatures 测试聚合器高级功能 func TestGroupAggregatorAdvancedFeatures(t *testing.T) { // 测试复杂聚合表达式 t.Run("Complex Aggregation Expressions", func(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp", }, { InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity", }, { InputField: "pressure", AggregateType: Min, OutputAlias: "min_pressure", }, }, ) testData := []map[string]interface{}{ {"Device": "sensor1", "temperature": 25.5, "humidity": 60.0, "pressure": 1013.25}, {"Device": "sensor1", "temperature": 26.8, "humidity": 65.0, "pressure": 1012.50}, {"Device": "sensor2", "temperature": 22.3, "humidity": 55.0, "pressure": 1014.75}, {"Device": "sensor2", "temperature": 23.5, "humidity": 70.0, "pressure": 1013.00}, } for _, d := range testData { agg.Add(d) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 2) // 验证结果 for _, result := range results { device := result["Device"].(string) if device == "sensor1" { assert.InDelta(t, 26.15, result["avg_temp"], 0.01) assert.Equal(t, 65.0, result["max_humidity"]) assert.Equal(t, 1012.50, result["min_pressure"]) } else if device == "sensor2" { assert.InDelta(t, 22.9, result["avg_temp"], 0.01) assert.Equal(t, 70.0, result["max_humidity"]) assert.Equal(t, 1013.00, result["min_pressure"]) } } }) // 测试统计聚合函数 t.Run("Statistical Aggregation Functions", func(t *testing.T) { tests := []struct { name string aggType AggregateType data []map[string]interface{} }{ {"StdDev", StdDev, []map[string]interface{}{ {"group": "A", "value": 1.0}, {"group": "A", "value": 2.0}, {"group": "A", "value": 3.0}, }}, {"Var", Var, []map[string]interface{}{ {"group": "A", "value": 1.0}, {"group": "A", "value": 2.0}, {"group": "A", "value": 3.0}, }}, {"Median", Median, []map[string]interface{}{ {"group": "A", "value": 1.0}, {"group": "A", "value": 2.0}, {"group": "A", "value": 3.0}, }}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { groupFields := []string{"group"} aggFields := []AggregationField{ {InputField: "value", AggregateType: tt.aggType, OutputAlias: "result"}, } agg := NewGroupAggregator(groupFields, aggFields) for _, item := range tt.data { agg.Add(item) } results, _ := agg.GetResults() assert.NotNil(t, results) }) } }) } // TestGroupAggregatorDataTypes 测试不同数据类型的聚合 func TestGroupAggregatorDataTypes(t *testing.T) { tests := []struct { name string aggType AggregateType inputData []map[string]interface{} expectedKey string expectedVal interface{} }{ { name: "String Count", aggType: Count, inputData: []map[string]interface{}{ {"group": "A", "value": "hello"}, {"group": "A", "value": "world"}, {"group": "B", "value": "test"}, }, expectedKey: "count", expectedVal: 2.0, // Count聚合器计算所有非空值 }, { name: "Boolean Count", aggType: Count, inputData: []map[string]interface{}{ {"group": "A", "value": true}, {"group": "A", "value": false}, {"group": "A", "value": true}, {"group": "B", "value": false}, }, expectedKey: "count", expectedVal: 3.0, // Count聚合器计算所有非空值 }, { name: "Mixed Types Count", aggType: Count, inputData: []map[string]interface{}{ {"group": "A", "value": 123}, {"group": "A", "value": "string"}, {"group": "A", "value": true}, {"group": "B", "value": 456}, }, expectedKey: "count", expectedVal: 3.0, // Count聚合器计算所有非空值 }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { agg := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: tt.aggType, OutputAlias: tt.expectedKey, }, }, ) for _, d := range tt.inputData { agg.Add(d) } results, err := agg.GetResults() assert.NoError(t, err) // 找到A组的结果 var groupAResult map[string]interface{} for _, result := range results { if result["group"] == "A" { groupAResult = result break } } assert.NotNil(t, groupAResult) assert.Equal(t, tt.expectedVal, groupAResult[tt.expectedKey]) }) } } // TestGroupAggregatorEdgeCases 测试聚合器边界情况 func TestGroupAggregatorEdgeCases(t *testing.T) { // 测试空数据 t.Run("Empty Data", func(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) results, err := agg.GetResults() assert.NoError(t, err) assert.Empty(t, results) }) // 测试空分组字段 t.Run("Empty Group Fields", func(t *testing.T) { agg := NewGroupAggregator( []string{}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) testData := []map[string]interface{}{ {"temperature": 25.5}, {"temperature": 26.8}, } for _, d := range testData { agg.Add(d) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) assert.InDelta(t, 52.3, results[0]["temperature_sum"], 0.01) }) // 测试空聚合字段 t.Run("Empty Aggregation Fields", func(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{}, ) testData := []map[string]interface{}{ {"Device": "sensor1", "temperature": 25.5}, {"Device": "sensor2", "temperature": 26.8}, } for _, d := range testData { agg.Add(d) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 2) // 只应该包含分组字段,没有聚合字段 for _, result := range results { assert.Contains(t, result, "Device") assert.Len(t, result, 1) } }) // 测试缺失字段 t.Run("Missing Fields", func(t *testing.T) { agg := NewGroupAggregator( []string{"Device"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "temperature_sum", }, }, ) testData := []map[string]interface{}{ {"Device": "sensor1", "temperature": 25.5}, {"Device": "sensor2"}, // 缺少temperature字段 {"Device": "sensor3", "temperature": 26.8}, } for _, d := range testData { agg.Add(d) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, 3) // 验证结果 for _, result := range results { device := result["Device"].(string) if device == "sensor1" { assert.Equal(t, 25.5, result["temperature_sum"]) } else if device == "sensor2" { assert.Nil(t, result["temperature_sum"]) // 缺失字段应该为nil } else if device == "sensor3" { assert.Equal(t, 26.8, result["temperature_sum"]) } } }) } // TestGroupAggregatorPerformance 测试聚合器性能 func TestGroupAggregatorPerformance(t *testing.T) { // 测试大量数据处理性能 t.Run("Large Dataset Performance", func(t *testing.T) { agg := NewGroupAggregator( []string{"category"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum", }, { InputField: "value", AggregateType: Avg, OutputAlias: "avg", }, { InputField: "value", AggregateType: Max, OutputAlias: "max", }, { InputField: "value", AggregateType: Min, OutputAlias: "min", }, }, ) // 生成大量测试数据 const numRecords = 10000 const numCategories = 100 for i := 0; i < numRecords; i++ { category := i % numCategories value := float64(i % 1000) data := map[string]interface{}{ "category": fmt.Sprintf("cat_%d", category), "value": value, } agg.Add(data) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, numCategories) // 验证结果 for _, result := range results { assert.Contains(t, result, "sum") assert.Contains(t, result, "avg") assert.Contains(t, result, "max") assert.Contains(t, result, "min") } }) // 测试并发性能 t.Run("Concurrent Performance", func(t *testing.T) { agg := NewGroupAggregator( []string{"category"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum", }, }, ) const numGoroutines = 10 const recordsPerGoroutine = 1000 var wg sync.WaitGroup wg.Add(numGoroutines) // 启动多个goroutine并发添加数据 for i := 0; i < numGoroutines; i++ { go func(goroutineID int) { defer wg.Done() for j := 0; j < recordsPerGoroutine; j++ { category := (goroutineID + j) % 100 value := float64(j) data := map[string]interface{}{ "category": fmt.Sprintf("cat_%d", category), "value": value, } agg.Add(data) } }(i) } wg.Wait() results, err := agg.GetResults() assert.NoError(t, err) assert.NotEmpty(t, results) }) } // TestGroupAggregatorMemoryUsage 测试聚合器内存使用 func TestGroupAggregatorMemoryUsage(t *testing.T) { // 测试大量分组的内存使用 t.Run("Many Groups Memory Usage", func(t *testing.T) { agg := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum", }, }, ) // 创建大量不同的分组 const numGroups = 10000 for i := 0; i < numGroups; i++ { data := map[string]interface{}{ "group": fmt.Sprintf("group_%d", i), "value": float64(i), } agg.Add(data) } results, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results, numGroups) // 验证每个分组都有正确的结果 for i := 0; i < numGroups; i++ { expectedGroup := fmt.Sprintf("group_%d", i) found := false for _, result := range results { if result["group"] == expectedGroup { assert.Equal(t, float64(i), result["sum"]) found = true break } } assert.True(t, found, "Group %s should be found in results", expectedGroup) } }) } // TestGroupAggregatorResetAndReuse 测试聚合器重置和重用 func TestGroupAggregatorResetAndReuse(t *testing.T) { agg := NewGroupAggregator( []string{"category"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum", }, }, ) // 第一轮数据 testData1 := []map[string]interface{}{ {"category": "A", "value": 10.0}, {"category": "B", "value": 20.0}, } for _, d := range testData1 { agg.Add(d) } results1, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results1, 2) // 重置聚合器 agg.Reset() // 第二轮数据 testData2 := []map[string]interface{}{ {"category": "A", "value": 15.0}, {"category": "C", "value": 25.0}, } for _, d := range testData2 { agg.Add(d) } results2, err := agg.GetResults() assert.NoError(t, err) assert.Len(t, results2, 2) // 验证第二轮结果 for _, result := range results2 { category := result["category"].(string) if category == "A" { assert.Equal(t, 15.0, result["sum"]) } else if category == "C" { assert.Equal(t, 25.0, result["sum"]) } } } // TestGroupAggregatorBasic 测试基本聚合器功能 func TestGroupAggregatorBasic(t *testing.T) { // 创建聚合字段配置 aggFields := []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, } // 创建分组聚合器 ga := NewGroupAggregator([]string{"group"}, aggFields) // 测试数据 data := []map[string]interface{}{ {"group": "A", "value": 10}, {"group": "A", "value": 20}, {"group": "B", "value": 30}, } // 添加数据 for _, item := range data { err := ga.Add(item) if err != nil { t.Errorf("unexpected error: %v", err) } } // 获取结果 results, err := ga.GetResults() if err != nil { t.Errorf("unexpected error: %v", err) } // 验证结果 if len(results) != 2 { t.Errorf("expected 2 groups, got %d", len(results)) } } // TestGroupAggregatorErrorHandling 测试错误处理 func TestGroupAggregatorErrorHandling(t *testing.T) { // 测试空配置 ga := NewGroupAggregator([]string{}, []AggregationField{}) // 添加数据应该不会出错 err := ga.Add(map[string]interface{}{"field": "value"}) if err != nil { t.Errorf("unexpected error: %v", err) } // 获取结果 results, err := ga.GetResults() if err != nil { t.Errorf("unexpected error: %v", err) } // 空配置应该返回空结果 if len(results) != 1 { t.Errorf("expected 1 result, got %d", len(results)) } } // TestGroupAggregatorConcurrency 测试并发安全 func TestGroupAggregatorConcurrency(t *testing.T) { aggFields := []AggregationField{ { InputField: "value", AggregateType: Count, OutputAlias: "count_value", }, } ga := NewGroupAggregator([]string{"group"}, aggFields) // 并发添加数据 for i := 0; i < 100; i++ { go func(id int) { data := map[string]interface{}{ "group": "test", "value": id, } err := ga.Add(data) if err != nil { t.Errorf("unexpected error: %v", err) } }(i) } // 等待一段时间确保所有goroutine完成 time.Sleep(100 * time.Millisecond) // 获取结果 results, err := ga.GetResults() if err != nil { t.Errorf("unexpected error: %v", err) } // 验证结果存在 if len(results) == 0 { t.Error("expected results, got none") } } // TestRegisterFunction 测试 Register 函数 func TestRegisterFunction(t *testing.T) { // 创建自定义聚合器 customAggregator := func() AggregatorFunction { return &testCustomAggregator{} } // 注册自定义聚合器 Register("custom_test", customAggregator) // 验证注册成功(通过创建聚合器来验证) agg := CreateBuiltinAggregator("custom_test") assert.NotNil(t, agg) } // testCustomAggregator 测试用的自定义聚合器 type testCustomAggregator struct { sum float64 } func (t *testCustomAggregator) New() AggregatorFunction { return &testCustomAggregator{} } func (t *testCustomAggregator) Add(value interface{}) { if v, ok := value.(float64); ok { t.sum += v } } func (t *testCustomAggregator) Result() interface{} { return t.sum } // TestIsNumericAggregator 测试 isNumericAggregator 方法的各种分支 func TestIsNumericAggregator(t *testing.T) { ga := NewGroupAggregator([]string{"group"}, []AggregationField{}) // 测试数值聚合器 assert.True(t, ga.isNumericAggregator(Sum)) assert.True(t, ga.isNumericAggregator(Avg)) assert.True(t, ga.isNumericAggregator(Max)) assert.True(t, ga.isNumericAggregator(Min)) assert.True(t, ga.isNumericAggregator(Count)) assert.True(t, ga.isNumericAggregator(StdDev)) assert.True(t, ga.isNumericAggregator(Median)) assert.True(t, ga.isNumericAggregator(Percentile)) assert.True(t, ga.isNumericAggregator(Var)) assert.True(t, ga.isNumericAggregator(VarS)) assert.True(t, ga.isNumericAggregator(StdDevS)) // 测试非数值聚合器 assert.False(t, ga.isNumericAggregator(Collect)) assert.False(t, ga.isNumericAggregator(MergeAgg)) assert.False(t, ga.isNumericAggregator(Deduplicate)) assert.False(t, ga.isNumericAggregator(LastValue)) // 测试分析函数 assert.False(t, ga.isNumericAggregator(Lag)) assert.False(t, ga.isNumericAggregator(Latest)) assert.False(t, ga.isNumericAggregator(ChangedCol)) assert.False(t, ga.isNumericAggregator(HadChanged)) // 测试未知聚合器(通过名称模式匹配) assert.True(t, ga.isNumericAggregator("custom_sum")) assert.True(t, ga.isNumericAggregator("custom_avg")) assert.True(t, ga.isNumericAggregator("custom_min")) assert.True(t, ga.isNumericAggregator("custom_max")) assert.True(t, ga.isNumericAggregator("custom_count")) assert.True(t, ga.isNumericAggregator("custom_std")) assert.True(t, ga.isNumericAggregator("custom_var")) // 测试不匹配模式的未知聚合器 assert.False(t, ga.isNumericAggregator("custom_collect")) assert.False(t, ga.isNumericAggregator("unknown_function")) } // TestExpressionAggregator 测试表达式聚合器 func TestExpressionAggregator(t *testing.T) { // 创建表达式聚合器 agg := CreateBuiltinAggregator(Expression) assert.NotNil(t, agg) // 测试创建新实例 newAgg := agg.New() assert.NotNil(t, newAgg) // 测试添加值和获取结果 newAgg.Add("test_value") result := newAgg.Result() assert.NotNil(t, result) } // TestGroupAggregatorContextAggregator 测试 ContextAggregator 功能 func TestGroupAggregatorContextAggregator(t *testing.T) { // 创建一个共享的values切片来跟踪所有添加的值 sharedValues := &[]interface{}{} // 注册模拟聚合器 Register("mock_context", func() AggregatorFunction { return &mockContextAggregator{ contextKey: "test_context_key", values: sharedValues, } }) ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "missing_field", // 故意使用不存在的字段 AggregateType: "mock_context", OutputAlias: "context_result", }, }, ) // 设置上下文值 err := ga.Put("test_context_key", "context_value") assert.NoError(t, err) // 添加数据(字段不存在,应该从上下文获取) err = ga.Add(map[string]interface{}{ "group": "test_group", // 故意不包含 missing_field }) assert.NoError(t, err) // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 验证上下文值被使用 assert.Contains(t, *sharedValues, "context_value") } // mockContextAggregator 模拟的 ContextAggregator type mockContextAggregator struct { contextKey string values *[]interface{} } func (m *mockContextAggregator) New() AggregatorFunction { return &mockContextAggregator{ contextKey: m.contextKey, values: m.values, // 共享同一个values切片 } } func (m *mockContextAggregator) Add(value interface{}) { *m.values = append(*m.values, value) } func (m *mockContextAggregator) Result() interface{} { return len(*m.values) } func (m *mockContextAggregator) GetContextKey() string { return m.contextKey } // TestGroupAggregatorNumericConversionError 测试数值转换错误 func TestGroupAggregatorNumericConversionError(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, // Sum 需要数值类型 OutputAlias: "sum_value", }, }, ) // 添加无法转换为数值的数据 err := ga.Add(map[string]interface{}{ "group": "test_group", "value": "not_a_number", // 无法转换为数值 }) // 应该返回转换错误 assert.Error(t, err) assert.Contains(t, err.Error(), "cannot convert field value") } // TestGroupAggregatorWithExpressionEvaluator 测试表达式求值器 func TestGroupAggregatorWithExpressionEvaluator(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "temperature", AggregateType: Sum, OutputAlias: "fahrenheit_sum", }, }, ) // 注册表达式求值器(摄氏度转华氏度) evaluator := func(data interface{}) (interface{}, error) { if dataMap, ok := data.(map[string]interface{}); ok { if temp, exists := dataMap["temperature"]; exists { if tempFloat, ok := temp.(float64); ok { return tempFloat*1.8 + 32, nil } } } return nil, errors.New("invalid temperature data") } ga.RegisterExpression("fahrenheit_sum", "temperature * 1.8 + 32", []string{"temperature"}, evaluator) // 添加测试数据 testData := []map[string]interface{}{ {"group": "sensor1", "temperature": 0.0}, // 32°F {"group": "sensor1", "temperature": 100.0}, // 212°F } for _, data := range testData { err := ga.Add(data) assert.NoError(t, err) } // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 验证表达式求值结果 (32 + 212 = 244) assert.Equal(t, "sensor1", results[0]["group"]) assert.Equal(t, 244.0, results[0]["fahrenheit_sum"]) } // TestGroupAggregatorExpressionEvaluatorError 测试表达式求值器错误处理 func TestGroupAggregatorExpressionEvaluatorError(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "processed_value", }, }, ) // 注册会出错的表达式求值器 errorEvaluator := func(data interface{}) (interface{}, error) { return nil, errors.New("expression evaluation failed") } ga.RegisterExpression("processed_value", "error_expression", []string{"value"}, errorEvaluator) // 添加测试数据 err := ga.Add(map[string]interface{}{ "group": "test_group", "value": 10.0, }) // 应该没有错误,因为表达式错误会被忽略 assert.NoError(t, err) // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 由于表达式求值失败,聚合器应该没有值 assert.Equal(t, "test_group", results[0]["group"]) // processed_value 应该是聚合器的默认结果(通常是 nil 或 0) } // TestGroupAggregatorCountStarField 测试 count(*) 功能 func TestGroupAggregatorCountStarField(t *testing.T) { ga := NewGroupAggregator( []string{"category"}, []AggregationField{ { InputField: "*", // count(*) 语法 AggregateType: Count, OutputAlias: "total_count", }, }, ) // 添加测试数据 testData := []map[string]interface{}{ {"category": "A", "value": 10}, {"category": "A", "value": 20}, {"category": "A"}, // 没有 value 字段 {"category": "B", "value": 30}, } for _, data := range testData { err := ga.Add(data) assert.NoError(t, err) } // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 2) // 验证 count(*) 结果 for _, result := range results { category := result["category"].(string) if category == "A" { assert.Equal(t, float64(3), result["total_count"]) // A 类别有 3 条记录 } else if category == "B" { assert.Equal(t, float64(1), result["total_count"]) // B 类别有 1 条记录 } } } // TestGroupAggregatorNilFieldValue 测试 nil 字段值处理 func TestGroupAggregatorNilFieldValue(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加包含 nil 值的数据 testData := []map[string]interface{}{ {"group": "test", "value": 10.0}, {"group": "test", "value": nil}, // nil 值应该被跳过 {"group": "test", "value": 20.0}, } for _, data := range testData { err := ga.Add(data) assert.NoError(t, err) } // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // nil 值应该被跳过,只计算 10.0 + 20.0 = 30.0 assert.Equal(t, "test", results[0]["group"]) assert.Equal(t, 30.0, results[0]["sum_value"]) } // TestGroupAggregatorStructData 测试结构体数据 func TestGroupAggregatorStructData(t *testing.T) { // 定义测试结构体 type TestStruct struct { Group string Value float64 } ga := NewGroupAggregator( []string{"Group"}, []AggregationField{ { InputField: "Value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加结构体数据 testData := []TestStruct{ {Group: "A", Value: 10.0}, {Group: "A", Value: 20.0}, {Group: "B", Value: 30.0}, } for _, data := range testData { err := ga.Add(data) assert.NoError(t, err) } // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 2) // 验证结果 for _, result := range results { group := result["Group"].(string) if group == "A" { assert.Equal(t, 30.0, result["sum_value"]) } else if group == "B" { assert.Equal(t, 30.0, result["sum_value"]) } } } // TestGroupAggregatorPointerData 测试指针数据 func TestGroupAggregatorPointerData(t *testing.T) { type TestStruct struct { Group string Value float64 } ga := NewGroupAggregator( []string{"Group"}, []AggregationField{ { InputField: "Value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加指针数据 testData := &TestStruct{Group: "test", Value: 42.0} err := ga.Add(testData) assert.NoError(t, err) // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) assert.Equal(t, "test", results[0]["Group"]) assert.Equal(t, 42.0, results[0]["sum_value"]) } // TestGroupAggregatorUnsupportedDataType 测试不支持的数据类型 func TestGroupAggregatorUnsupportedDataType(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 测试不支持的数据类型 err := ga.Add(123) // int 类型不是 struct 或 map assert.Error(t, err) assert.Contains(t, err.Error(), "unsupported data type") err = ga.Add("string") // string 类型不支持 assert.Error(t, err) assert.Contains(t, err.Error(), "unsupported data type") err = ga.Add([]int{1, 2, 3}) // slice 类型不支持 assert.Error(t, err) assert.Contains(t, err.Error(), "unsupported data type") } // TestGroupAggregatorGroupFieldNilValue 测试分组字段为 nil 的情况 func TestGroupAggregatorGroupFieldNilValue(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加分组字段为 nil 的数据 err := ga.Add(map[string]interface{}{ "group": nil, // 分组字段为 nil "value": 10.0, }) // 应该返回错误 assert.Error(t, err) assert.Contains(t, err.Error(), "field group has nil value") } // TestIsNumericAggregatorAdvanced 测试 isNumericAggregator 的更多分支 func TestIsNumericAggregatorAdvanced(t *testing.T) { ga := NewGroupAggregator([]string{"group"}, []AggregationField{}) // 测试 TypeAnalytical 类型 result := ga.isNumericAggregator("analytical_func") assert.False(t, result) // 测试不存在的函数,但名称包含数值聚合关键字 result = ga.isNumericAggregator("custom_sum_func") assert.True(t, result) result = ga.isNumericAggregator("custom_avg_func") assert.True(t, result) result = ga.isNumericAggregator("custom_min_func") assert.True(t, result) result = ga.isNumericAggregator("custom_max_func") assert.True(t, result) result = ga.isNumericAggregator("custom_count_func") assert.True(t, result) result = ga.isNumericAggregator("custom_std_func") assert.True(t, result) result = ga.isNumericAggregator("custom_var_func") assert.True(t, result) // 测试不包含数值关键字的函数 result = ga.isNumericAggregator("custom_text_func") assert.False(t, result) } // TestGroupAggregatorNilData 测试 nil 数据 func TestGroupAggregatorNilData(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 测试 nil 数据 err := ga.Add(nil) assert.Error(t, err) assert.Contains(t, err.Error(), "data cannot be nil") } // TestGroupAggregatorMissingGroupField 测试缺少分组字段 func TestGroupAggregatorMissingGroupField(t *testing.T) { ga := NewGroupAggregator( []string{"missing_group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加缺少分组字段的数据 err := ga.Add(map[string]interface{}{ "value": 10, // 缺少 missing_group 字段 }) assert.Error(t, err) assert.Contains(t, err.Error(), "field missing_group not found") } // TestGroupAggregatorMissingAggregationField 测试缺少聚合字段但有上下文 func TestGroupAggregatorMissingAggregationField(t *testing.T) { // 创建一个不会从上下文获取值的聚合器 ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "missing_field", AggregateType: Sum, OutputAlias: "sum_value", }, }, ) // 添加缺少聚合字段的数据(没有上下文) err := ga.Add(map[string]interface{}{ "group": "test", // 缺少 missing_field }) assert.NoError(t, err) // 应该成功,因为会跳过缺少的字段 // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // 由于没有添加任何值,聚合器应该返回默认值 } // TestGroupAggregatorExpressionEvaluationError 测试表达式求值错误但继续处理 func TestGroupAggregatorExpressionEvaluationError(t *testing.T) { ga := NewGroupAggregator( []string{"group"}, []AggregationField{ { InputField: "value", AggregateType: Sum, OutputAlias: "sum_value", }, { InputField: "other", AggregateType: Sum, OutputAlias: "expr_result", }, }, ) // 注册一个会出错的表达式求值器 ga.RegisterExpression("expr_result", "error_expr", []string{"other"}, func(data interface{}) (interface{}, error) { return nil, fmt.Errorf("evaluation error") }) // 添加数据 err := ga.Add(map[string]interface{}{ "group": "test", "value": 10, "other": 20, }) assert.NoError(t, err) // 应该成功,因为表达式错误会被跳过 // 获取结果 results, err := ga.GetResults() assert.NoError(t, err) assert.Len(t, results, 1) // sum_value 应该有值,expr_result 应该没有值或为默认值 assert.Equal(t, float64(10), results[0]["sum_value"]) }