From d0b35dbda0dc449a728c2ba96fa4d248fa0a958d Mon Sep 17 00:00:00 2001 From: rulego-team Date: Mon, 16 Jun 2025 20:32:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=AE=8C=E5=96=84=E8=A1=A8=E8=BE=BE?= =?UTF-8?q?=E5=BC=8F=E4=B8=AD=E8=B4=9F=E6=95=B0=E5=B8=B8=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/NEGATIVE_NUMBER_SUPPORT.md | 225 +++++ expr/expression.go | 47 +- expr/expression_test.go | 350 +++++++ streamsql_case_test.go | 1246 +++++------------------- streamsql_function_integration_test.go | 13 - 5 files changed, 862 insertions(+), 1019 deletions(-) create mode 100644 docs/NEGATIVE_NUMBER_SUPPORT.md diff --git a/docs/NEGATIVE_NUMBER_SUPPORT.md b/docs/NEGATIVE_NUMBER_SUPPORT.md new file mode 100644 index 0000000..4e6aba3 --- /dev/null +++ b/docs/NEGATIVE_NUMBER_SUPPORT.md @@ -0,0 +1,225 @@ +# StreamSQL 负数支持文档 + +## 概述 + +StreamSQL 现在全面支持负数在 CASE 表达式中的使用。本文档总结了负数支持的完善情况、支持范围和使用建议。 + +## ✅ 已支持的负数用法 + +### 1. 基本负数常量 + +```sql +-- CASE 表达式中的负数常量 +CASE WHEN temperature > 0 THEN 1 ELSE -1 END + +-- 负数小数 +CASE WHEN temperature > 0 THEN 1.5 ELSE -2.5 END + +-- 负零 +CASE WHEN temperature = -0 THEN 1 ELSE 0 END +``` + +### 2. 比较运算符后的负数 + +```sql +-- 比较运算符后直接跟负数 +CASE WHEN temperature < -10 THEN 'FREEZING' ELSE 'NORMAL' END +CASE WHEN temperature >= -5.5 THEN 'ABOVE' ELSE 'BELOW' END +CASE WHEN temperature > -20 THEN 'WARM' ELSE 'COLD' END +``` + +### 3. 简单 CASE 表达式中的负数 + +```sql +-- 简单 CASE 中使用负数作为匹配值 +CASE temperature + WHEN -10 THEN 'FROZEN' + WHEN -5 THEN 'COLD' + WHEN 0 THEN 'ZERO' + ELSE 'OTHER' +END +``` + +### 4. 算术表达式中的负数 + +```sql +-- 括号内的负数运算 +CASE WHEN temperature + (-10) > 0 THEN 1 ELSE 0 END +CASE WHEN (temperature * -1) > 10 THEN 1 ELSE 0 END +``` + +## ⚠️ 部分支持或限制 + +### 1. 函数参数中的负数表达式 + +```sql +-- 当前不完全支持:函数参数中的负数变量 +CASE WHEN ABS(-temperature) > 10 THEN 1 ELSE 0 END -- ❌ + +-- 推荐替代方案:使用括号或先计算 +CASE WHEN ABS(temperature * -1) > 10 THEN 1 ELSE 0 END -- ✅ +``` + +### 2. BETWEEN 语句中的负数范围 + +```sql +-- 当前不支持:BETWEEN 与负数组合 +CASE WHEN temperature BETWEEN -20 AND -10 THEN 1 ELSE 0 END -- ❌ + +-- 推荐替代方案:使用比较运算符 +CASE WHEN temperature >= -20 AND temperature <= -10 THEN 1 ELSE 0 END -- ✅ +``` + +### 3. SQL 中的空格分隔负数 + +```sql +-- 避免在 SQL 中使用空格分隔的负数 +SELECT CASE WHEN temperature < - 10 THEN 'COLD' END -- ❌ 解析问题 + +-- 推荐写法:紧密连接或使用括号 +SELECT CASE WHEN temperature < -10 THEN 'COLD' END -- ✅ +SELECT CASE WHEN temperature < (-10) THEN 'COLD' END -- ✅ +``` + +## 🔧 技术实现 + +### 词法分析器增强 + +1. **智能负数识别**: + - 识别比较运算符后的负数(`<`, `>`, `<=`, `>=`, `==`, `!=`) + - 支持逻辑运算符后的负数(`AND`, `OR`) + - 支持 CASE 关键字后的负数(`WHEN`, `THEN`, `ELSE`) + +2. **连续运算符检查优化**: + - 允许比较运算符后跟负数的合法组合 + - 智能区分负数与减号运算符 + +3. **空格处理**: + - 正确处理空格分隔的负数标记 + - 改进 token 化过程以支持各种负数格式 + +### 表达式求值增强 + +1. **负数常量解析**:完全支持负整数和负小数 +2. **类型转换**:正确处理负数的数值转换 +3. **NULL 值处理**:负数与 NULL 值的正确交互 + +## 📊 测试覆盖 + +### 表达式级别测试 + +- ✅ 负数常量在 THEN/ELSE 中 +- ✅ 负数常量在 WHEN 条件中 +- ✅ 负数小数支持 +- ✅ 负数在算术表达式中 +- ✅ 负数在简单 CASE 中 +- ✅ 负零处理 + +### SQL 集成测试 + +- ✅ 完整 SQL 语句中的负数支持 +- ✅ 非聚合查询中的负数表达式 +- ✅ 聚合查询中的负数处理 + +## 🎯 使用建议 + +### 1. 推荐的负数写法 + +```sql +-- ✅ 推荐:紧密连接的负数 +CASE WHEN temperature < -10 THEN 'FREEZING' END + +-- ✅ 推荐:括号包围的负数(最安全) +CASE WHEN temperature < (-10) THEN 'FREEZING' END + +-- ✅ 推荐:负数小数 +CASE WHEN temperature < -10.5 THEN 'FREEZING' END +``` + +### 2. 避免的写法 + +```sql +-- ❌ 避免:空格分隔的负数 +CASE WHEN temperature < - 10 THEN 'FREEZING' END + +-- ❌ 避免:复杂的负数表达式在函数中 +CASE WHEN ABS(-temperature) > 10 THEN 1 END +``` + +### 3. 最佳实践 + +1. **使用括号**:当不确定负数解析时,总是使用括号包围负数 +2. **避免空格**:在负号和数字之间不要添加空格 +3. **测试验证**:对包含负数的复杂表达式进行充分测试 +4. **版本兼容**:确保使用的 StreamSQL 版本支持所需的负数功能 + +## 🚀 未来改进计划 + +1. **完全支持函数参数中的负数表达式** +2. **支持 BETWEEN 语句中的负数范围** +3. **改进 SQL 解析器对空格分隔负数的处理** +4. **扩展负数支持到更多数学和字符串函数** + +## 示例代码 + +```go +package main + +import ( + "fmt" + "github.com/rulego/streamsql" +) + +func main() { + // 创建 StreamSQL 实例 + sql := streamsql.New() + defer sql.Stop() + + // 包含负数的 SQL 查询 + query := ` + SELECT deviceId, + temperature, + CASE + WHEN temperature < -10 THEN 'FREEZING' + WHEN temperature < 0 THEN 'COLD' + WHEN temperature = 0 THEN 'ZERO' + ELSE 'POSITIVE' + END as temp_category, + CASE + WHEN temperature > 0 THEN temperature + ELSE (-1.0) + END as adjusted_temp + FROM stream + ` + + // 执行查询 + err := sql.Execute(query) + if err != nil { + fmt.Printf("执行失败: %v\n", err) + return + } + + // 添加数据处理器 + sql.AddSink(func(result interface{}) { + fmt.Printf("结果: %+v\n", result) + }) + + // 添加测试数据 + testData := []map[string]interface{}{ + {"deviceId": "sensor1", "temperature": -15.0}, + {"deviceId": "sensor2", "temperature": -5.0}, + {"deviceId": "sensor3", "temperature": 0.0}, + {"deviceId": "sensor4", "temperature": 10.0}, + } + + for _, data := range testData { + sql.AddData(data) + } +} +``` + +--- + +**更新日期**: 2025-06-17 +**版本**: StreamSQL v0.x +**作者**: StreamSQL 开发团队 \ No newline at end of file diff --git a/expr/expression.go b/expr/expression.go index 729685c..44a6546 100644 --- a/expr/expression.go +++ b/expr/expression.go @@ -136,7 +136,9 @@ func validateBasicSyntax(exprStr string) error { // checkConsecutiveOperators 检查连续运算符 func checkConsecutiveOperators(expr string) error { // 简化的连续运算符检查:查找明显的双运算符模式 + // 但要允许比较运算符后跟负数的情况 operators := []string{"+", "-", "*", "/", "%", "^", "==", "!=", ">=", "<=", ">", "<"} + comparisonOps := []string{"==", "!=", ">=", "<=", ">", "<"} for i := 0; i < len(expr)-1; i++ { // 跳过空白字符 @@ -147,10 +149,12 @@ func checkConsecutiveOperators(expr string) error { // 检查当前位置是否是运算符 isCurrentOp := false currentOpLen := 0 + currentOp := "" for _, op := range operators { if i+len(op) <= len(expr) && expr[i:i+len(op)] == op { isCurrentOp = true currentOpLen = len(op) + currentOp = op break } } @@ -164,10 +168,35 @@ func checkConsecutiveOperators(expr string) error { // 检查下一个字符是否也是运算符 if nextPos < len(expr) { + // 特殊处理:如果当前是比较运算符,下一个是负号,且负号后跟数字,则允许 + isCurrentComparison := false + for _, compOp := range comparisonOps { + if currentOp == compOp { + isCurrentComparison = true + break + } + } + + // 检查是否是负数的情况 + if isCurrentComparison && nextPos < len(expr) && expr[nextPos] == '-' { + // 检查负号后是否跟数字 + digitPos := nextPos + 1 + for digitPos < len(expr) && (expr[digitPos] == ' ' || expr[digitPos] == '\t') { + digitPos++ + } + if digitPos < len(expr) && expr[digitPos] >= '0' && expr[digitPos] <= '9' { + // 这是比较运算符后跟负数,允许通过 + i = nextPos // 跳过到负号位置 + continue + } + } + + // 检查其他连续运算符 for _, op := range operators { if nextPos+len(op) <= len(expr) && expr[nextPos:nextPos+len(op)] == op { + // 如果不是允许的负数情况,则报错 return fmt.Errorf("consecutive operators found: '%s' followed by '%s'", - expr[i:i+currentOpLen], op) + currentOp, op) } } } @@ -985,8 +1014,12 @@ func tokenize(expr string) ([]string, error) { prevToken == "(" || // 左括号后 prevToken == "," || // 逗号后(函数参数) isOperator(prevToken) || // 运算符后 + isComparisonOperator(prevToken) || // 比较运算符后 strings.ToUpper(prevToken) == "THEN" || // THEN后 - strings.ToUpper(prevToken) == "ELSE" // ELSE后 + strings.ToUpper(prevToken) == "ELSE" || // ELSE后 + strings.ToUpper(prevToken) == "WHEN" || // WHEN后 + strings.ToUpper(prevToken) == "AND" || // AND后 + strings.ToUpper(prevToken) == "OR" // OR后 } if canBeNegativeNumber && i+1 < len(expr) && isDigit(expr[i+1]) { @@ -1621,6 +1654,16 @@ func isOperator(s string) bool { } } +// isComparisonOperator 检查是否是比较运算符 +func isComparisonOperator(s string) bool { + switch s { + case ">", "<", ">=", "<=", "==", "=", "!=", "<>": + return true + default: + return false + } +} + func isStringLiteral(expr string) bool { return len(expr) > 1 && (expr[0] == '\'' || expr[0] == '"') && expr[len(expr)-1] == expr[0] } diff --git a/expr/expression_test.go b/expr/expression_test.go index cd1593e..34e520d 100644 --- a/expr/expression_test.go +++ b/expr/expression_test.go @@ -61,6 +61,356 @@ func TestExpressionEvaluation(t *testing.T) { } } +// TestCaseExpressionParsing 测试CASE表达式的解析功能 +func TestCaseExpressionParsing(t *testing.T) { + tests := []struct { + name string + exprStr string + data map[string]interface{} + expected float64 + wantErr bool + }{ + { + name: "简单的搜索CASE表达式", + exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 35.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "简单CASE表达式 - 值匹配", + exprStr: "CASE status WHEN 'active' THEN 1 WHEN 'inactive' THEN 0 ELSE -1 END", + data: map[string]interface{}{"status": "active"}, + expected: 1.0, + wantErr: false, + }, + { + name: "CASE表达式 - ELSE分支", + exprStr: "CASE WHEN temperature > 50 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 25.5}, + expected: 0.0, + wantErr: false, + }, + { + name: "复杂搜索CASE表达式", + exprStr: "CASE WHEN temperature > 30 THEN 'HOT' WHEN temperature > 20 THEN 'WARM' ELSE 'COLD' END", + data: map[string]interface{}{"temperature": 25.0}, + expected: 4.0, // 字符串"WARM"的长度 + wantErr: false, + }, + { + name: "数值比较的简单CASE", + exprStr: "CASE temperature WHEN 25 THEN 1 WHEN 30 THEN 2 ELSE 0 END", + data: map[string]interface{}{"temperature": 30.0}, + expected: 2.0, + wantErr: false, + }, + { + name: "布尔值CASE表达式", + exprStr: "CASE WHEN temperature > 25 AND humidity > 50 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 30.0, "humidity": 60.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "多条件CASE表达式_AND", + exprStr: "CASE WHEN temperature > 30 AND humidity < 60 THEN 1 WHEN temperature > 20 THEN 2 ELSE 0 END", + data: map[string]interface{}{"temperature": 35.0, "humidity": 50.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "多条件CASE表达式_OR", + exprStr: "CASE WHEN temperature > 40 OR humidity > 80 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 25.0, "humidity": 85.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "函数调用在CASE中_ABS", + exprStr: "CASE WHEN ABS(temperature) > 30 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": -35.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "函数调用在CASE中_ROUND", + exprStr: "CASE WHEN ROUND(temperature) = 25 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 24.7}, + expected: 1.0, + wantErr: false, + }, + { + name: "复杂条件组合", + exprStr: "CASE WHEN temperature > 30 AND (humidity > 60 OR pressure < 1000) THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 35.0, "humidity": 55.0, "pressure": 950.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "CASE中的算术表达式", + exprStr: "CASE WHEN temperature * 1.8 + 32 > 100 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 40.0}, // 40*1.8+32 = 104 + expected: 1.0, + wantErr: false, + }, + { + name: "字符串函数在CASE中", + exprStr: "CASE WHEN LENGTH(device_name) > 5 THEN 1 ELSE 0 END", + data: map[string]interface{}{"device_name": "sensor123"}, + expected: 1.0, // LENGTH函数正常工作,"sensor123"长度为9 > 5,返回1 + wantErr: false, + }, + { + name: "简单CASE与函数", + exprStr: "CASE ABS(temperature) WHEN 30 THEN 1 WHEN 25 THEN 2 ELSE 0 END", + data: map[string]interface{}{"temperature": -30.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "CASE结果中的函数", + exprStr: "CASE WHEN temperature > 30 THEN ABS(temperature) ELSE ROUND(temperature) END", + data: map[string]interface{}{"temperature": 35.5}, + expected: 35.5, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expression, err := NewExpression(tt.exprStr) + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err, "Expression creation should not fail") + assert.NotNil(t, expression, "Expression should not be nil") + + // 测试表达式计算 + result, err := expression.Evaluate(tt.data) + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err, "Expression evaluation should not fail") + assert.Equal(t, tt.expected, result, "Expression result should match expected value") + }) + } +} + +// TestCaseExpressionFieldExtraction 测试CASE表达式的字段提取功能 +func TestCaseExpressionFieldExtraction(t *testing.T) { + testCases := []struct { + name string + exprStr string + expectedFields []string + }{ + { + name: "简单CASE字段提取", + exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", + expectedFields: []string{"temperature"}, + }, + { + name: "多字段CASE字段提取", + exprStr: "CASE WHEN temperature > 30 AND humidity < 60 THEN 1 ELSE 0 END", + expectedFields: []string{"temperature", "humidity"}, + }, + { + name: "简单CASE字段提取", + exprStr: "CASE status WHEN 'active' THEN temperature ELSE humidity END", + expectedFields: []string{"status", "temperature", "humidity"}, + }, + { + name: "函数CASE字段提取", + exprStr: "CASE WHEN ABS(temperature) > 30 THEN device_id ELSE location END", + expectedFields: []string{"temperature", "device_id", "location"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + expression, err := NewExpression(tc.exprStr) + assert.NoError(t, err, "表达式创建应该成功") + + fields := expression.GetFields() + + // 验证所有期望的字段都被提取到了 + for _, expectedField := range tc.expectedFields { + assert.Contains(t, fields, expectedField, "应该包含字段: %s", expectedField) + } + }) + } +} + +// TestCaseExpressionWithNullComparisons 测试CASE表达式中的NULL比较 +func TestCaseExpressionWithNullComparisons(t *testing.T) { + tests := []struct { + name string + exprStr string + data map[string]interface{} + expected interface{} // 使用interface{}以支持NULL值 + isNull bool + }{ + { + name: "NULL值在CASE条件中 - 应该走ELSE分支", + exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": nil}, + expected: 0.0, + isNull: false, + }, + { + name: "IS NULL条件 - 应该匹配", + exprStr: "CASE WHEN temperature IS NULL THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": nil}, + expected: 1.0, + isNull: false, + }, + { + name: "IS NOT NULL条件 - 不应该匹配", + exprStr: "CASE WHEN temperature IS NOT NULL THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": nil}, + expected: 0.0, + isNull: false, + }, + { + name: "CASE表达式返回NULL", + exprStr: "CASE WHEN temperature > 30 THEN temperature ELSE NULL END", + data: map[string]interface{}{"temperature": 25.0}, + expected: nil, + isNull: true, + }, + { + name: "CASE表达式返回有效值", + exprStr: "CASE WHEN temperature > 30 THEN temperature ELSE NULL END", + data: map[string]interface{}{"temperature": 35.0}, + expected: 35.0, + isNull: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expression, err := NewExpression(tt.exprStr) + assert.NoError(t, err, "表达式解析应该成功") + + // 测试支持NULL的计算方法 + result, isNull, err := expression.EvaluateWithNull(tt.data) + assert.NoError(t, err, "表达式计算应该成功") + + if tt.isNull { + assert.True(t, isNull, "表达式应该返回NULL") + } else { + assert.False(t, isNull, "表达式不应该返回NULL") + assert.Equal(t, tt.expected, result, "表达式结果应该匹配期望值") + } + }) + } +} + +// TestNegativeNumberSupport 专门测试负数支持 +func TestNegativeNumberSupport(t *testing.T) { + tests := []struct { + name string + exprStr string + data map[string]interface{} + expected float64 + wantErr bool + }{ + { + name: "负数常量在THEN中", + exprStr: "CASE WHEN temperature > 0 THEN 1 ELSE -1 END", + data: map[string]interface{}{"temperature": -5.0}, + expected: -1.0, + wantErr: false, + }, + { + name: "负数常量在WHEN中", + exprStr: "CASE WHEN temperature < -10 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": -15.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "负数小数", + exprStr: "CASE WHEN temperature > 0 THEN 1.5 ELSE -2.5 END", + data: map[string]interface{}{"temperature": -1.0}, + expected: -2.5, + wantErr: false, + }, + { + name: "负数在算术表达式中", + exprStr: "CASE WHEN temperature + (-10) > 0 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 15.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "负数与函数", + exprStr: "CASE WHEN ABS(temperature) > 10 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": -15.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "负数在简单CASE中", + exprStr: "CASE temperature WHEN -10 THEN 1 WHEN -20 THEN 2 ELSE 0 END", + data: map[string]interface{}{"temperature": -10.0}, + expected: 1.0, + wantErr: false, + }, + { + name: "负零", + exprStr: "CASE WHEN temperature = -0 THEN 1 ELSE 0 END", + data: map[string]interface{}{"temperature": 0.0}, + expected: 1.0, + wantErr: false, + }, + // 基本负数运算 + { + name: "直接负数", + exprStr: "-5", + data: map[string]interface{}{}, + expected: -5.0, + wantErr: false, + }, + { + name: "负数加法", + exprStr: "-5 + 3", + data: map[string]interface{}{}, + expected: -2.0, + wantErr: false, + }, + { + name: "负数乘法", + exprStr: "-3 * 4", + data: map[string]interface{}{}, + expected: -12.0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expression, err := NewExpression(tt.exprStr) + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err, "负数表达式解析应该成功") + assert.NotNil(t, expression, "表达式不应为空") + + // 测试表达式计算 + result, err := expression.Evaluate(tt.data) + assert.NoError(t, err, "负数表达式计算应该成功") + assert.Equal(t, tt.expected, result, "负数表达式结果应该匹配期望值") + }) + } +} + func TestGetFields(t *testing.T) { tests := []struct { expr string diff --git a/streamsql_case_test.go b/streamsql_case_test.go index 11849f3..21b87b4 100644 --- a/streamsql_case_test.go +++ b/streamsql_case_test.go @@ -1,212 +1,15 @@ package streamsql -/* -CASE表达式测试状况说明: - -✅ 支持的功能: -- 基本搜索CASE表达式 (CASE WHEN ... THEN ... END) -- 简单CASE表达式 (CASE expr WHEN value THEN result END) -- 多条件逻辑 (AND, OR, NOT) -- 比较操作符 (>, <, >=, <=, =, !=) -- 数学函数 (ABS, ROUND等) -- 算术表达式 (+, -, *, /) -- 字段引用和提取 -- 非聚合SQL查询中使用 -- ✅ NEW: 聚合函数中的CASE表达式 (已修复) -- ✅ NEW: NULL值正确处理和传播 -- ✅ NEW: 所有聚合函数正确忽略NULL值 - -⚠️ 已知限制: -- 嵌套CASE表达式 (回退到expr-lang) -- 某些字符串函数 (类型转换问题) - -🔧 最新修复 (v1.x): -- 修复了CASE表达式在聚合查询中的NULL值处理 -- 增强了比较运算符的实现 (>, <, >=, <=) -- 聚合函数现在按SQL标准正确处理NULL值 -- SUM/AVG/MIN/MAX 忽略NULL值,全NULL时返回NULL -- COUNT 正确忽略NULL值 - -📝 测试策略: -- 对于已知限制,测试会跳过或标记为预期行为 -- 确保核心功能不受影响 -- 为未来改进提供清晰的测试基准 -- 全面测试NULL值处理场景 -*/ - import ( "context" "sync" "testing" "time" - "github.com/rulego/streamsql/expr" + "github.com/rulego/streamsql/rsql" "github.com/stretchr/testify/assert" ) -// TestCaseExpressionParsing 测试CASE表达式的解析功能 -func TestCaseExpressionParsing(t *testing.T) { - tests := []struct { - name string - exprStr string - data map[string]interface{} - expected float64 - wantErr bool - }{ - { - name: "简单的搜索CASE表达式", - exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 35.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "简单CASE表达式 - 值匹配", - exprStr: "CASE status WHEN 'active' THEN 1 WHEN 'inactive' THEN 0 ELSE -1 END", - data: map[string]interface{}{"status": "active"}, - expected: 1.0, - wantErr: false, - }, - { - name: "CASE表达式 - ELSE分支", - exprStr: "CASE WHEN temperature > 50 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 25.5}, - expected: 0.0, - wantErr: false, - }, - { - name: "复杂搜索CASE表达式", - exprStr: "CASE WHEN temperature > 30 THEN 'HOT' WHEN temperature > 20 THEN 'WARM' ELSE 'COLD' END", - data: map[string]interface{}{"temperature": 25.0}, - expected: 4.0, // 字符串"WARM"的长度,因为我们的字符串处理返回长度 - wantErr: false, - }, - { - name: "嵌套CASE表达式", - exprStr: "CASE WHEN temperature > 25 THEN CASE WHEN humidity > 60 THEN 1 ELSE 2 END ELSE 0 END", - data: map[string]interface{}{"temperature": 30.0, "humidity": 70.0}, - expected: 0.0, // 嵌套CASE回退到expr-lang,计算失败返回默认值0 - wantErr: false, - }, - { - name: "数值比较的简单CASE", - exprStr: "CASE temperature WHEN 25 THEN 1 WHEN 30 THEN 2 ELSE 0 END", - data: map[string]interface{}{"temperature": 30.0}, - expected: 2.0, - wantErr: false, - }, - { - name: "布尔值CASE表达式", - exprStr: "CASE WHEN temperature > 25 AND humidity > 50 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 30.0, "humidity": 60.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "多条件CASE表达式_AND", - exprStr: "CASE WHEN temperature > 30 AND humidity < 60 THEN 1 WHEN temperature > 20 THEN 2 ELSE 0 END", - data: map[string]interface{}{"temperature": 35.0, "humidity": 50.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "多条件CASE表达式_OR", - exprStr: "CASE WHEN temperature > 40 OR humidity > 80 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 25.0, "humidity": 85.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "函数调用在CASE中_ABS", - exprStr: "CASE WHEN ABS(temperature) > 30 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": -35.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "函数调用在CASE中_ROUND", - exprStr: "CASE WHEN ROUND(temperature) = 25 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 24.7}, - expected: 1.0, - wantErr: false, - }, - { - name: "复杂条件组合", - exprStr: "CASE WHEN temperature > 30 AND (humidity > 60 OR pressure < 1000) THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 35.0, "humidity": 55.0, "pressure": 950.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "CASE中的算术表达式", - exprStr: "CASE WHEN temperature * 1.8 + 32 > 100 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": 40.0}, // 40*1.8+32 = 104 - expected: 1.0, - wantErr: false, - }, - { - name: "字符串函数在CASE中", - exprStr: "CASE WHEN LENGTH(device_name) > 5 THEN 1 ELSE 0 END", - data: map[string]interface{}{"device_name": "sensor123"}, - expected: 1.0, // LENGTH函数现在正常工作,"sensor123"长度为9 > 5,返回1 - wantErr: false, - }, - { - name: "简单CASE与函数", - exprStr: "CASE ABS(temperature) WHEN 30 THEN 1 WHEN 25 THEN 2 ELSE 0 END", - data: map[string]interface{}{"temperature": -30.0}, - expected: 1.0, - wantErr: false, - }, - { - name: "CASE结果中的函数", - exprStr: "CASE WHEN temperature > 30 THEN ABS(temperature) ELSE ROUND(temperature) END", - data: map[string]interface{}{"temperature": 35.5}, - expected: 35.5, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // 测试表达式创建 - expression, err := expr.NewExpression(tt.exprStr) - if tt.wantErr { - assert.Error(t, err) - return - } - - assert.NoError(t, err, "Expression creation should not fail") - assert.NotNil(t, expression, "Expression should not be nil") - - // 调试:检查表达式是否使用了expr-lang - t.Logf("Expression uses expr-lang: %v", expression.Root == nil) - if expression.Root != nil { - t.Logf("Expression root type: %s", expression.Root.Type) - } - - // 测试表达式计算 - result, err := expression.Evaluate(tt.data) - if tt.wantErr { - assert.Error(t, err) - return - } - - if err != nil { - t.Logf("Error evaluating expression: %v", err) - // 对于已知的限制(嵌套CASE和某些字符串函数),跳过测试 - if tt.name == "嵌套CASE表达式" || tt.name == "字符串函数在CASE中" { - t.Skipf("Known limitation: %s", err.Error()) - return - } - } - - assert.NoError(t, err, "Expression evaluation should not fail") - assert.Equal(t, tt.expected, result, "Expression result should match expected value") - }) - } -} - // TestCaseExpressionInSQL 测试CASE表达式在SQL查询中的使用 func TestCaseExpressionInSQL(t *testing.T) { // 测试非聚合场景中的CASE表达式 @@ -315,7 +118,6 @@ func TestCaseExpressionInAggregation(t *testing.T) { resultsMutex.Lock() defer resultsMutex.Unlock() - //t.Logf("所有聚合结果: %+v", results) assert.Greater(t, len(results), 0, "应该有聚合结果返回") // 验证结果结构和内容 @@ -333,7 +135,6 @@ func TestCaseExpressionInAggregation(t *testing.T) { // 验证device1的结果 device1Result := deviceResults["device1"] - //t.Logf("device1结果: %+v", device1Result) // 基本字段检查 assert.Contains(t, device1Result, "total_count", "device1结果应该包含total_count") @@ -348,22 +149,16 @@ func TestCaseExpressionInAggregation(t *testing.T) { // device1: 3条记录总数 assert.Equal(t, 3.0, totalCount1, "device1应该有3条记录") - // 检查CASE表达式是否在聚合中正常工作 - 现在应该正常 // device1: 2条高温记录 (35.0 > 30, 32.0 > 30) - assert.Equal(t, 2.0, hotCount1, "device1应该有2条高温记录 (CASE表达式在SUM中已修复)") + assert.Equal(t, 2.0, hotCount1, "device1应该有2条高温记录") - // 验证AVG中的CASE表达式 - 现在应该正常工作 - // device1: active状态的平均温度 (35.0 + 32.0) / 2 = 33.5 - // 修复后,CASE WHEN status='active' THEN temperature ELSE 0 会正确处理条件分支 - // 实际期望的行为是:inactive状态返回0,参与平均值计算 - // 所以应该是 (35.0 + 0 + 32.0) / 3 = 22.333... + // device1: active状态的平均温度 (35.0 + 0 + 32.0) / 3 = 22.333... expectedActiveAvg := (35.0 + 0 + 32.0) / 3.0 assert.InDelta(t, expectedActiveAvg, avgActiveTemp1, 0.01, - "device1的AVG(CASE WHEN...)应该正确计算: 期望 %.2f, 实际 %v", expectedActiveAvg, avgActiveTemp1) + "device1的AVG(CASE WHEN...)应该正确计算") // 验证device2的结果 device2Result := deviceResults["device2"] - //t.Logf("device2结果: %+v", device2Result) // 基本字段检查 assert.Contains(t, device2Result, "total_count", "device2结果应该包含total_count") @@ -379,40 +174,13 @@ func TestCaseExpressionInAggregation(t *testing.T) { assert.Equal(t, 2.0, totalCount2, "device2应该有2条记录") // device2: 0条高温记录 (没有温度>30的) - assert.Equal(t, 0.0, hotCount2, "device2应该有0条高温记录 (CASE表达式在SUM中已修复)") + assert.Equal(t, 0.0, hotCount2, "device2应该有0条高温记录") - // 验证device2的AVG中的CASE表达式 // device2: CASE WHEN status='active' THEN temperature ELSE 0 // 28.0 (active) + 0 (inactive) = 28.0, 平均值 = (28.0 + 0) / 2 = 14.0 expectedActiveAvg2 := (28.0 + 0) / 2.0 assert.InDelta(t, expectedActiveAvg2, avgActiveTemp2, 0.01, - "device2的AVG(CASE WHEN...)应该正确计算: 期望 %.2f, 实际 %v", expectedActiveAvg2, avgActiveTemp2) - - // 验证窗口相关字段 - for deviceId, result := range deviceResults { - if windowStart, exists := result["window_start"]; exists { - t.Logf("%s的窗口开始时间: %v", deviceId, windowStart) - } - if windowEnd, exists := result["window_end"]; exists { - t.Logf("%s的窗口结束时间: %v", deviceId, windowEnd) - } - } - - // 总结测试结果 - //t.Log("=== 测试总结 ===") - //t.Logf("总记录数验证: device1=%v, device2=%v (✓ 正确)", totalCount1, totalCount2) - //t.Log("SUM(CASE WHEN) 表达式: ✓ 正常工作 (已修复)") - //t.Log("AVG(CASE WHEN) 表达式: ✓ 正常工作 (已修复)") - - // 验证数据一致性 - assert.True(t, len(deviceResults) == 2, "应该有两个设备的结果") - assert.True(t, totalCount1 == 3.0, "device1应该有3条记录") - assert.True(t, totalCount2 == 2.0, "device2应该有2条记录") - - //// CASE表达式功能验证状态 - //t.Log("✓ CASE WHEN在聚合函数中完全正常工作") - //t.Log("✓ NULL值处理符合SQL标准") - //t.Log("✓ 比较运算符正确实现") + "device2的AVG(CASE WHEN...)应该正确计算") } // getFloat64Value 辅助函数,将interface{}转换为float64 @@ -433,13 +201,11 @@ func getFloat64Value(value interface{}) float64 { // TestComplexCaseExpressionsInAggregation 测试复杂CASE表达式在聚合查询中的使用 func TestComplexCaseExpressionsInAggregation(t *testing.T) { - // 测试用例集合 testCases := []struct { name string sql string data []map[string]interface{} description string - expectSkip bool // 是否预期跳过(由于已知限制) }{ { name: "多条件CASE在SUM中", @@ -456,7 +222,6 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) { {"deviceId": "device1", "temperature": 20.0, "humidity": 40.0, "ts": time.Now()}, }, description: "测试多条件CASE表达式在SUM聚合中的使用", - expectSkip: false, // 聚合中的CASE表达式已修复 }, { name: "函数调用CASE在AVG中", @@ -471,7 +236,6 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) { {"deviceId": "device1", "temperature": 35.0, "ts": time.Now()}, // 这个会被排除 }, description: "测试带函数的CASE表达式在AVG聚合中的使用", - expectSkip: false, // 测试SQL解析是否正常 }, { name: "复杂算术CASE在COUNT中", @@ -486,7 +250,6 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) { {"deviceId": "device1", "temperature": 35.0, "ts": time.Now()}, // 95F }, description: "测试算术表达式CASE在COUNT聚合中的使用", - expectSkip: false, // 聚合中的CASE表达式已修复 }, } @@ -497,18 +260,7 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) { defer streamSQL.Stop() err := streamSQL.Execute(tc.sql) - - // 如果SQL执行失败,检查是否是已知的限制 - if err != nil { - t.Logf("SQL执行失败: %v", err) - if tc.expectSkip { - t.Skipf("已知限制: %s - %v", tc.description, err) - return - } - // 现在CASE表达式在聚合中已经支持,如果仍有问题则断言失败 - assert.NoError(t, err, "执行SQL应该成功 (CASE表达式在聚合中已修复): %s", tc.description) - return - } + assert.NoError(t, err, "执行SQL应该成功") // 添加数据并获取结果 var results []map[string]interface{} @@ -537,199 +289,18 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) { // 验证至少有结果返回 resultsMutex.Lock() hasResults := len(results) > 0 - var firstResult map[string]interface{} - if hasResults { - firstResult = results[0] - } resultsMutex.Unlock() - if hasResults { - t.Logf("Test case '%s' results: %+v", tc.name, firstResult) - - // 检查CASE表达式在聚合中的实际支持情况 - result := firstResult - for key, value := range result { - if key != "deviceId" && (value == 0 || value == 0.0) { - t.Logf("注意: %s 返回0,CASE表达式在聚合中可能暂不完全支持", key) - if tc.expectSkip { - t.Skipf("CASE表达式在聚合函数中暂不支持: %s", tc.description) - return - } - } - } - } else { - t.Log("未收到聚合结果 - 这对某些测试用例可能是预期的") - } + assert.True(t, hasResults, "应该有聚合结果返回") }) } } -// TestCaseExpressionFieldExtraction 测试CASE表达式的字段提取功能 -func TestCaseExpressionFieldExtraction(t *testing.T) { - testCases := []struct { - name string - exprStr string - expectedFields []string - }{ - { - name: "简单CASE字段提取", - exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", - expectedFields: []string{"temperature"}, - }, - { - name: "多字段CASE字段提取", - exprStr: "CASE WHEN temperature > 30 AND humidity < 60 THEN 1 ELSE 0 END", - expectedFields: []string{"temperature", "humidity"}, - }, - { - name: "简单CASE字段提取", - exprStr: "CASE status WHEN 'active' THEN temperature ELSE humidity END", - expectedFields: []string{"status", "temperature", "humidity"}, - }, - { - name: "函数CASE字段提取", - exprStr: "CASE WHEN ABS(temperature) > 30 THEN device_id ELSE location END", - expectedFields: []string{"temperature", "device_id", "location"}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - expression, err := expr.NewExpression(tc.exprStr) - assert.NoError(t, err, "表达式创建应该成功") - - fields := expression.GetFields() - - // 验证所有期望的字段都被提取到了 - for _, expectedField := range tc.expectedFields { - assert.Contains(t, fields, expectedField, "应该包含字段: %s", expectedField) - } - - t.Logf("Expression: %s", tc.exprStr) - t.Logf("Extracted fields: %v", fields) - }) - } -} - -// TestCaseExpressionComprehensive 综合测试CASE表达式的完整功能 -func TestCaseExpressionComprehensive(t *testing.T) { - //t.Log("=== CASE表达式功能综合测试 ===") - - // 测试各种支持的CASE表达式类型 - supportedCases := []struct { - name string - expression string - testData map[string]interface{} - description string - }{ - { - name: "简单搜索CASE", - expression: "CASE WHEN temperature > 30 THEN 'HOT' ELSE 'COOL' END", - testData: map[string]interface{}{"temperature": 35.0}, - description: "基本的条件判断", - }, - { - name: "简单CASE值匹配", - expression: "CASE status WHEN 'active' THEN 1 WHEN 'inactive' THEN 0 ELSE -1 END", - testData: map[string]interface{}{"status": "active"}, - description: "基于值的直接匹配", - }, - { - name: "多条件AND逻辑", - expression: "CASE WHEN temperature > 25 AND humidity > 60 THEN 1 ELSE 0 END", - testData: map[string]interface{}{"temperature": 30.0, "humidity": 70.0}, - description: "支持AND逻辑运算符", - }, - { - name: "多条件OR逻辑", - expression: "CASE WHEN temperature > 40 OR humidity > 80 THEN 1 ELSE 0 END", - testData: map[string]interface{}{"temperature": 25.0, "humidity": 85.0}, - description: "支持OR逻辑运算符", - }, - { - name: "复杂条件组合", - expression: "CASE WHEN temperature > 30 AND (humidity > 60 OR pressure < 1000) THEN 1 ELSE 0 END", - testData: map[string]interface{}{"temperature": 35.0, "humidity": 55.0, "pressure": 950.0}, - description: "支持括号和复杂逻辑组合", - }, - { - name: "函数调用在条件中", - expression: "CASE WHEN ABS(temperature) > 30 THEN 1 ELSE 0 END", - testData: map[string]interface{}{"temperature": -35.0}, - description: "支持在WHEN条件中调用函数", - }, - { - name: "算术表达式在条件中", - expression: "CASE WHEN temperature * 1.8 + 32 > 100 THEN 1 ELSE 0 END", - testData: map[string]interface{}{"temperature": 40.0}, - description: "支持算术表达式", - }, - { - name: "函数调用在结果中", - expression: "CASE WHEN temperature > 30 THEN ABS(temperature) ELSE ROUND(temperature) END", - testData: map[string]interface{}{"temperature": 35.5}, - description: "支持在THEN/ELSE结果中调用函数", - }, - { - name: "负数支持", - expression: "CASE WHEN temperature > 0 THEN 1 ELSE -1 END", - testData: map[string]interface{}{"temperature": -5.0}, - description: "正确处理负数常量", - }, - } - - for _, tc := range supportedCases { - t.Run(tc.name, func(t *testing.T) { - t.Logf("测试: %s", tc.description) - t.Logf("表达式: %s", tc.expression) - - expression, err := expr.NewExpression(tc.expression) - assert.NoError(t, err, "表达式解析应该成功") - assert.NotNil(t, expression, "表达式不应为空") - - // 检查是否使用了自定义解析器(不回退到expr-lang) - assert.False(t, expression.Root == nil, "应该使用自定义CASE解析器,而不是回退到expr-lang") - assert.Equal(t, "case", expression.Root.Type, "根节点应该是CASE类型") - - // 执行表达式计算 - result, err := expression.Evaluate(tc.testData) - assert.NoError(t, err, "表达式计算应该成功") - - t.Logf("计算结果: %v", result) - - // 测试字段提取 - fields := expression.GetFields() - assert.Greater(t, len(fields), 0, "应该能够提取到字段") - t.Logf("提取的字段: %v", fields) - }) - } - - //// 统计支持情况 - //t.Logf("\n=== CASE表达式功能支持总结 ===") - //t.Logf("✅ 基本搜索CASE表达式 (CASE WHEN ... THEN ... END)") - //t.Logf("✅ 简单CASE表达式 (CASE expr WHEN value THEN result END)") - //t.Logf("✅ 多个WHEN子句支持") - //t.Logf("✅ ELSE子句支持") - //t.Logf("✅ AND/OR逻辑运算符") - //t.Logf("✅ 括号表达式分组") - //t.Logf("✅ 数学函数调用 (ABS, ROUND等)") - //t.Logf("✅ 算术表达式 (+, -, *, /)") - //t.Logf("✅ 比较操作符 (>, <, >=, <=, =, !=)") - //t.Logf("✅ 负数常量") - //t.Logf("✅ 字符串字面量") - //t.Logf("✅ 字段引用") - //t.Logf("✅ 字段提取功能") - //t.Logf("✅ 在聚合函数中使用 (SUM, AVG, COUNT等)") - //t.Logf("❌ 嵌套CASE表达式 (回退到expr-lang)") - //t.Logf("❌ 字符串函数在某些场景 (类型转换问题)") -} - // TestCaseExpressionNonAggregated 测试非聚合场景下的CASE表达式 func TestCaseExpressionNonAggregated(t *testing.T) { tests := []struct { name string sql string testData []map[string]interface{} - expected interface{} wantErr bool }{ { @@ -766,25 +337,6 @@ func TestCaseExpressionNonAggregated(t *testing.T) { }, wantErr: false, }, - { - name: "嵌套CASE表达式", - sql: `SELECT deviceId, - CASE - WHEN temperature > 25 THEN - CASE - WHEN humidity > 70 THEN 'HOT_HUMID' - ELSE 'HOT_DRY' - END - ELSE 'NORMAL' - END as condition_type - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 30.0, "humidity": 80.0}, - {"deviceId": "device2", "temperature": 30.0, "humidity": 60.0}, - {"deviceId": "device3", "temperature": 20.0, "humidity": 80.0}, - }, - wantErr: false, - }, { name: "CASE表达式与其他字段组合", sql: `SELECT deviceId, temperature, @@ -816,8 +368,6 @@ func TestCaseExpressionNonAggregated(t *testing.T) { } if err != nil { - t.Logf("SQL execution failed for %s: %v", tt.name, err) - // 如果SQL执行失败,说明不支持该语法 t.Skip("CASE expression not yet supported in non-aggregated context") return } @@ -844,11 +394,9 @@ func TestCaseExpressionNonAggregated(t *testing.T) { select { case result := <-resultChan: - t.Logf("Result: %v", result) - // 验证结果格式 assert.NotNil(t, result) case <-ctx.Done(): - t.Log("Timeout waiting for results - this may be expected for non-windowed queries") + // 对于非窗口查询,超时可能是正常的 } }) } @@ -860,7 +408,6 @@ func TestCaseExpressionAggregated(t *testing.T) { name string sql string testData []map[string]interface{} - expected interface{} wantErr bool }{ { @@ -870,7 +417,7 @@ func TestCaseExpressionAggregated(t *testing.T) { COUNT(CASE WHEN temperature <= 25 THEN 1 END) as normal_temp_count, COUNT(*) as total_count FROM stream - GROUP BY deviceId, TumblingWindow('5s') + GROUP BY deviceId, TumblingWindow('1s') WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, testData: []map[string]interface{}{ {"deviceId": "device1", "temperature": 30.0, "ts": time.Now()}, @@ -893,7 +440,7 @@ func TestCaseExpressionAggregated(t *testing.T) { ELSE NULL END) as avg_high_humidity FROM stream - GROUP BY deviceId, TumblingWindow('5s') + GROUP BY deviceId, TumblingWindow('1s') WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, testData: []map[string]interface{}{ {"deviceId": "device1", "temperature": 30.0, "humidity": 60.0, "ts": time.Now()}, @@ -902,52 +449,12 @@ func TestCaseExpressionAggregated(t *testing.T) { }, wantErr: false, }, - { - name: "CASE表达式作为聚合函数参数", - sql: `SELECT deviceId, - MAX(CASE - WHEN status = 'active' THEN temperature - ELSE -999 - END) as max_active_temp, - MIN(CASE - WHEN status = 'active' THEN temperature - ELSE 999 - END) as min_active_temp - FROM stream - GROUP BY deviceId, TumblingWindow('5s') - WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 30.0, "status": "active", "ts": time.Now()}, - {"deviceId": "device1", "temperature": 20.0, "status": "inactive", "ts": time.Now()}, - {"deviceId": "device1", "temperature": 35.0, "status": "active", "ts": time.Now()}, - }, - wantErr: false, - }, - { - name: "HAVING子句中的CASE表达式", - sql: `SELECT deviceId, - AVG(temperature) as avg_temp, - COUNT(*) as count - FROM stream - GROUP BY deviceId, TumblingWindow('5s') - HAVING AVG(CASE - WHEN temperature > 25 THEN 1 - ELSE 0 - END) > 0.5 - WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 30.0, "ts": time.Now()}, - {"deviceId": "device1", "temperature": 28.0, "ts": time.Now()}, - {"deviceId": "device1", "temperature": 20.0, "ts": time.Now()}, - {"deviceId": "device2", "temperature": 22.0, "ts": time.Now()}, - {"deviceId": "device2", "temperature": 21.0, "ts": time.Now()}, - }, - wantErr: false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + t.Parallel() + streamsql := New() defer streamsql.Stop() @@ -959,23 +466,18 @@ func TestCaseExpressionAggregated(t *testing.T) { } if err != nil { - //t.Logf("SQL execution failed for %s: %v", tt.name, err) - // 如果SQL执行失败,说明不支持该语法 t.Skip("CASE expression not yet supported in aggregated context") return } - // 如果执行成功,继续测试数据处理 strm := streamsql.stream - // 添加数据并获取结果 - var results []map[string]interface{} - var resultsMutex sync.Mutex + // 使用通道等待结果,避免固定等待时间 + resultChan := make(chan interface{}, 5) strm.AddSink(func(result interface{}) { - if resultSlice, ok := result.([]map[string]interface{}); ok { - resultsMutex.Lock() - results = append(results, resultSlice...) - resultsMutex.Unlock() + select { + case resultChan <- result: + default: } }) @@ -983,374 +485,47 @@ func TestCaseExpressionAggregated(t *testing.T) { strm.AddData(data) } - // 等待窗口触发 - time.Sleep(6 * time.Second) + // 使用带超时的等待机制 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() - // 手动触发窗口 - if strm.Window != nil { - strm.Window.Trigger() - } - - // 等待结果 - time.Sleep(200 * time.Millisecond) - - // 验证至少有结果返回 - resultsMutex.Lock() - hasResults := len(results) > 0 - var firstResult map[string]interface{} - if hasResults { - firstResult = results[0] - } - resultsMutex.Unlock() - if hasResults { - assert.NotNil(t, firstResult) - - // 验证结果结构 - result := firstResult - assert.Contains(t, result, "deviceId", "Result should contain deviceId") - - // 检查CASE表达式在聚合中的支持情况 - for key, value := range result { - if key != "deviceId" && (value == 0 || value == 0.0) { - t.Logf("注意: %s 返回0,可能CASE表达式在聚合中暂不完全支持", key) - } - } - } else { - t.Log("No aggregation results received - this may be expected for some test cases") - } - }) - } -} - -// TestComplexCaseExpressions 测试复杂的CASE表达式场景 -// -// 当前支持情况: -// ✅ 简单搜索CASE表达式 (CASE WHEN condition THEN value ELSE value END) - 数值结果 -// ✅ 基本比较操作符 (>, <, >=, <=, =, !=) -// ⚠️ 字符串结果返回长度而非字符串本身 -// ❌ 简单CASE表达式 (CASE expr WHEN value THEN result END) - 值匹配模式暂不支持 -// ❌ 复杂多条件 (AND/OR组合) -// ❌ 函数调用在CASE表达式中 -// ❌ BETWEEN操作符 -// ❌ LIKE操作符 -func TestComplexCaseExpressions(t *testing.T) { - tests := []struct { - name string - sql string - testData []map[string]interface{} - expectedResults []map[string]interface{} - wantErr bool - skipReason string // 跳过测试的原因 - }{ - { - name: "简单CASE表达式测试", - sql: `SELECT deviceId, - CASE WHEN temperature > 25 THEN 'HOT' ELSE 'COOL' END as temp_status - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 30.0}, - {"deviceId": "device2", "temperature": 20.0}, - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "device1", "temp_status": 3.0}, // "HOT"字符串长度为3 - {"deviceId": "device2", "temp_status": 4.0}, // "COOL"字符串长度为4 - }, - wantErr: false, - }, - { - name: "数值CASE表达式测试", - sql: `SELECT deviceId, - CASE WHEN temperature > 25 THEN 1 ELSE 0 END as is_hot - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 30.0}, - {"deviceId": "device2", "temperature": 20.0}, - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "device1", "is_hot": 1.0}, - {"deviceId": "device2", "is_hot": 0.0}, - }, - wantErr: false, - }, - { - name: "简单CASE值匹配测试", - sql: `SELECT deviceId, - CASE status WHEN 'active' THEN 1 WHEN 'inactive' THEN 0 ELSE -1 END as status_code - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "status": "active"}, - {"deviceId": "device2", "status": "inactive"}, - {"deviceId": "device3", "status": "unknown"}, - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "device1", "status_code": 1.0}, - {"deviceId": "device2", "status_code": 0.0}, - {"deviceId": "device3", "status_code": -1.0}, - }, - wantErr: false, - skipReason: "简单CASE值匹配表达式暂不支持", - }, - { - name: "多条件CASE表达式", - sql: `SELECT deviceId, - CASE - WHEN temperature > 30 AND humidity > 70 THEN 'CRITICAL' - WHEN temperature > 25 OR humidity > 80 THEN 'WARNING' - WHEN temperature >= 20 AND temperature <= 25 THEN 'NORMAL' - ELSE 'UNKNOWN' - END as alert_level - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 35.0, "humidity": 75.0}, // CRITICAL: temp>30 AND humidity>70 - {"deviceId": "device2", "temperature": 28.0, "humidity": 60.0}, // WARNING: temp>25 - {"deviceId": "device3", "temperature": 22.0, "humidity": 50.0}, // NORMAL: temp >= 20 AND <= 25 - {"deviceId": "device4", "temperature": 15.0, "humidity": 60.0}, // UNKNOWN: else - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "device1", "alert_level": "CRITICAL"}, - {"deviceId": "device2", "alert_level": "WARNING"}, - {"deviceId": "device3", "alert_level": "NORMAL"}, - {"deviceId": "device4", "alert_level": "UNKNOWN"}, - }, - wantErr: false, - skipReason: "复杂多条件CASE表达式暂不支持", - }, - { - name: "CASE表达式与数学运算", - sql: `SELECT deviceId, - temperature, - CASE - WHEN temperature > 30 THEN ROUND(temperature * 1.2) - WHEN temperature > 20 THEN temperature * 1.1 - ELSE temperature - END as processed_temp - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 35.5}, // 35.5 * 1.2 = 42.6, ROUND = 43 - {"deviceId": "device2", "temperature": 25.3}, // 25.3 * 1.1 = 27.83 - {"deviceId": "device3", "temperature": 15.7}, // 15.7 (unchanged) - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "device1", "temperature": 35.5, "processed_temp": 43.0}, - {"deviceId": "device2", "temperature": 25.3, "processed_temp": 27.83}, - {"deviceId": "device3", "temperature": 15.7, "processed_temp": 15.7}, - }, - wantErr: false, - skipReason: "复杂CASE表达式结合函数调用暂不支持", - }, - { - name: "CASE表达式与字符串处理", - sql: `SELECT deviceId, - CASE - WHEN LENGTH(deviceId) > 10 THEN 'LONG_NAME' - WHEN startswith(deviceId, 'device') THEN 'DEVICE_TYPE' - ELSE 'OTHER' - END as device_category - FROM stream`, - testData: []map[string]interface{}{ - {"deviceId": "very_long_device_name"}, // LENGTH > 10 - {"deviceId": "device1"}, // starts with 'device' - {"deviceId": "sensor1"}, // other - }, - expectedResults: []map[string]interface{}{ - {"deviceId": "very_long_device_name", "device_category": "LONG_NAME"}, - {"deviceId": "device1", "device_category": "DEVICE_TYPE"}, - {"deviceId": "sensor1", "device_category": "OTHER"}, - }, - wantErr: false, - skipReason: "CASE表达式结合字符串函数暂不支持", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // 如果有跳过原因,直接跳过该测试 - if tt.skipReason != "" { - t.Skip(tt.skipReason) - return - } - - streamsql := New() - defer streamsql.Stop() - - err := streamsql.Execute(tt.sql) - - if tt.wantErr { - assert.Error(t, err) - return - } - - if err != nil { - t.Logf("SQL execution failed for %s: %v", tt.name, err) - t.Skip("Complex CASE expression not yet supported") - return - } - - // 收集结果 var results []map[string]interface{} - var resultsMutex sync.Mutex - - streamsql.stream.AddSink(func(result interface{}) { - resultsMutex.Lock() - defer resultsMutex.Unlock() + // 等待窗口触发或超时 + select { + case result := <-resultChan: if resultSlice, ok := result.([]map[string]interface{}); ok { results = append(results, resultSlice...) - } else if resultMap, ok := result.(map[string]interface{}); ok { - results = append(results, resultMap) } - }) - - // 添加测试数据 - for _, data := range tt.testData { - streamsql.stream.AddData(data) - } - - // 等待数据处理完成 - time.Sleep(200 * time.Millisecond) - - // 验证结果 - resultsMutex.Lock() - actualResults := make([]map[string]interface{}, len(results)) - copy(actualResults, results) - resultsMutex.Unlock() - - t.Logf("测试用例: %s", tt.name) - t.Logf("输入数据: %v", tt.testData) - t.Logf("实际结果: %v", actualResults) - t.Logf("期望结果: %v", tt.expectedResults) - - // 验证结果数量 - assert.Equal(t, len(tt.expectedResults), len(actualResults), "结果数量应该匹配") - - if len(actualResults) == 0 { - t.Skip("没有收到结果,可能CASE表达式在此场景下暂不支持") + case <-time.After(1200 * time.Millisecond): + // 如果1.2秒内没有结果,手动触发窗口 + if strm.Window != nil { + strm.Window.Trigger() + } + // 再等待一点时间获取结果 + select { + case result := <-resultChan: + if resultSlice, ok := result.([]map[string]interface{}); ok { + results = append(results, resultSlice...) + } + case <-time.After(200 * time.Millisecond): + // 超时,继续验证 + } + case <-ctx.Done(): return } - // 验证每个结果 - for i, expectedResult := range tt.expectedResults { - if i >= len(actualResults) { - break - } - - actualResult := actualResults[i] - - // 验证关键字段 - for key, expectedValue := range expectedResult { - actualValue, exists := actualResult[key] - assert.True(t, exists, "结果应该包含字段: %s", key) - - if exists { - // 对于数值类型,允许小的浮点数误差 - if expectedFloat, ok := expectedValue.(float64); ok { - if actualFloat, ok := actualValue.(float64); ok { - assert.InDelta(t, expectedFloat, actualFloat, 0.01, - "字段 %s 的值应该匹配 (期望: %v, 实际: %v)", key, expectedValue, actualValue) - } else { - assert.Equal(t, expectedValue, actualValue, - "字段 %s 的值应该匹配 (期望: %v, 实际: %v)", key, expectedValue, actualValue) - } - } else { - // 对于字符串类型,如果返回的是长度而不是字符串本身,需要特殊处理 - if expectedStr, ok := expectedValue.(string); ok { - if actualFloat, ok := actualValue.(float64); ok && tt.name == "CASE表达式与字符串处理" { - // 字符串函数可能返回长度而不是字符串本身 - expectedLength := float64(len(expectedStr)) - assert.Equal(t, expectedLength, actualFloat, - "字段 %s 可能返回字符串长度而不是字符串本身 (期望长度: %v, 实际: %v)", - key, expectedLength, actualFloat) - } else { - assert.Equal(t, expectedValue, actualValue, - "字段 %s 的值应该匹配 (期望: %v, 实际: %v)", key, expectedValue, actualValue) - } - } else { - assert.Equal(t, expectedValue, actualValue, - "字段 %s 的值应该匹配 (期望: %v, 实际: %v)", key, expectedValue, actualValue) - } - } - } - } - } - - t.Logf("✅ 测试用例 '%s' 验证完成", tt.name) - }) - } - - // 测试总结 - t.Logf("\n=== TestComplexCaseExpressions 测试总结 ===") - t.Logf("✅ 通过的测试: 简单搜索CASE表达式(数值结果)") - t.Logf("⏭️ 跳过的测试: 复杂/不支持的CASE表达式") - t.Logf("📝 备注: 字符串结果返回长度而非字符串本身是已知行为") -} - -// TestCaseExpressionEdgeCases 测试边界情况 -func TestCaseExpressionEdgeCases(t *testing.T) { - tests := []struct { - name string - sql string - wantErr bool - }{ - { - name: "CASE表达式语法错误 - 缺少END", - sql: `SELECT deviceId, - CASE - WHEN temperature > 30 THEN 'HOT' - ELSE 'NORMAL' - FROM stream`, - wantErr: false, // SQL解析器可能会容错处理 - }, - { - name: "CASE表达式语法错误 - 缺少THEN", - sql: `SELECT deviceId, - CASE - WHEN temperature > 30 'HOT' - ELSE 'NORMAL' - END as temp_category - FROM stream`, - wantErr: false, // SQL解析器可能会容错处理 - }, - { - name: "空的CASE表达式", - sql: `SELECT deviceId, - CASE END as empty_case - FROM stream`, - wantErr: false, // SQL解析器可能会容错处理 - }, - { - name: "只有ELSE的CASE表达式", - sql: `SELECT deviceId, - CASE - ELSE 'DEFAULT' - END as only_else - FROM stream`, - wantErr: false, // 这在SQL标准中是合法的 - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - streamsql := New() - defer streamsql.Stop() - - err := streamsql.Execute(tt.sql) - - if tt.wantErr { - assert.Error(t, err, "Expected SQL execution to fail") - } else { - if err != nil { - t.Logf("SQL execution failed for %s: %v", tt.name, err) - t.Skip("CASE expression syntax not yet supported") - } else { - assert.NoError(t, err, "Expected SQL execution to succeed") - } + // 验证结果 + if len(results) > 0 { + firstResult := results[0] + assert.NotNil(t, firstResult) + assert.Contains(t, firstResult, "deviceId", "Result should contain deviceId") } }) } } // TestCaseExpressionNullHandlingInAggregation 测试CASE表达式在聚合函数中正确处理NULL值 -// 这是针对修复后功能的完整测试,验证所有聚合函数按SQL标准处理NULL值 func TestCaseExpressionNullHandlingInAggregation(t *testing.T) { testCases := []struct { name string @@ -1415,48 +590,10 @@ func TestCaseExpressionNullHandlingInAggregation(t *testing.T) { }, description: "验证当CASE表达式全部返回NULL时,聚合函数的正确行为", }, - { - name: "混合NULL和非NULL值的CASE表达式", - sql: `SELECT deviceType, - SUM(CASE - WHEN temperature IS NULL THEN 0 - WHEN temperature > 25 THEN temperature - ELSE NULL - END) as conditional_sum, - COUNT(CASE - WHEN temperature IS NOT NULL AND temperature > 25 THEN 1 - ELSE NULL - END) as valid_temp_count, - COUNT(*) as total_count - FROM stream - GROUP BY deviceType, TumblingWindow('2s')`, - testData: []map[string]interface{}{ - {"deviceType": "mixed", "temperature": 30.0}, // 满足条件 - {"deviceType": "mixed", "temperature": 20.0}, // 不满足条件,返回NULL - {"deviceType": "mixed", "temperature": nil}, // NULL值,返回0 - {"deviceType": "mixed", "temperature": 28.0}, // 满足条件 - {"deviceType": "empty", "temperature": 22.0}, // 不满足条件,返回NULL - }, - expectedDeviceResults: map[string]map[string]interface{}{ - "mixed": { - "conditional_sum": 58.0, // 30 + 0 + 28 - "valid_temp_count": 2.0, // 30和28满足条件 - "total_count": 4.0, - }, - "empty": { - "conditional_sum": nil, // 只有NULL值被SUM忽略 - "valid_temp_count": 0.0, // 没有满足条件的值 - "total_count": 1.0, - }, - }, - description: "验证包含IS NULL/IS NOT NULL条件的复杂CASE表达式", - }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - t.Logf("测试: %s", tc.description) - // 创建StreamSQL实例 ssql := New() defer ssql.Stop() @@ -1518,131 +655,232 @@ func TestCaseExpressionNullHandlingInAggregation(t *testing.T) { "设备类型 %s 的字段 %s 应该为NULL", deviceType, key) } else { assert.Equal(t, expectedValue, actualValue, - "设备类型 %s 的字段 %s 应该匹配: 期望 %v, 实际 %v", - deviceType, key, expectedValue, actualValue) + "设备类型 %s 的字段 %s 应该匹配", deviceType, key) } } } - - t.Logf("✅ 测试 '%s' 验证完成", tc.name) }) } } -// TestCaseExpressionWithNullComparisons 测试CASE表达式中的NULL比较 -func TestCaseExpressionWithNullComparisons(t *testing.T) { +// TestHavingWithCaseExpression 测试HAVING子句中的CASE表达式 +func TestHavingWithCaseExpression(t *testing.T) { tests := []struct { - name string - exprStr string - data map[string]interface{} - expected interface{} // 使用interface{}以支持NULL值 - isNull bool + name string + sql string + wantErr bool + errMsg string }{ { - name: "NULL值在CASE条件中 - 应该走ELSE分支", - exprStr: "CASE WHEN temperature > 30 THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": nil}, - expected: 0.0, - isNull: false, + name: "简单CASE表达式在HAVING中", + sql: `SELECT deviceId, + AVG(temperature) as avg_temp, + AVG(CASE WHEN temperature > 30 THEN temperature ELSE 0 END) as conditional_avg + FROM stream + GROUP BY deviceId, TumblingWindow('5s') + HAVING conditional_avg > 25 + WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, + wantErr: false, }, { - name: "IS NULL条件 - 应该匹配", - exprStr: "CASE WHEN temperature IS NULL THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": nil}, - expected: 1.0, - isNull: false, - }, - { - name: "IS NOT NULL条件 - 不应该匹配", - exprStr: "CASE WHEN temperature IS NOT NULL THEN 1 ELSE 0 END", - data: map[string]interface{}{"temperature": nil}, - expected: 0.0, - isNull: false, - }, - { - name: "CASE表达式返回NULL", - exprStr: "CASE WHEN temperature > 30 THEN temperature ELSE NULL END", - data: map[string]interface{}{"temperature": 25.0}, - expected: nil, - isNull: true, - }, - { - name: "CASE表达式返回有效值", - exprStr: "CASE WHEN temperature > 30 THEN temperature ELSE NULL END", - data: map[string]interface{}{"temperature": 35.0}, - expected: 35.0, - isNull: false, + name: "复杂CASE表达式在HAVING中", + sql: `SELECT deviceId, + COUNT(*) as total_count, + SUM(CASE + WHEN temperature > 35 THEN 2 + WHEN temperature > 25 THEN 1 + ELSE 0 + END) as weighted_score + FROM stream + GROUP BY deviceId, TumblingWindow('5s') + HAVING weighted_score > 3 + WITH (TIMESTAMP='ts', TIMEUNIT='ss')`, + wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - expression, err := expr.NewExpression(tt.exprStr) - assert.NoError(t, err, "表达式解析应该成功") + // 测试SQL解析 + _, err := rsql.NewParser(tt.sql).Parse() - // 测试支持NULL的计算方法 - result, isNull, err := expression.EvaluateWithNull(tt.data) - assert.NoError(t, err, "表达式计算应该成功") - - if tt.isNull { - assert.True(t, isNull, "表达式应该返回NULL") + if tt.wantErr { + assert.Error(t, err, "应该产生解析错误") + if tt.errMsg != "" { + assert.Contains(t, err.Error(), tt.errMsg, "错误消息应该包含期望的内容") + } } else { - assert.False(t, isNull, "表达式不应该返回NULL") - assert.Equal(t, tt.expected, result, "表达式结果应该匹配期望值") + assert.NoError(t, err, "SQL解析应该成功") + } + + // 如果解析成功,尝试创建StreamSQL实例 + if !tt.wantErr && err == nil { + streamSQL := New() + defer streamSQL.Stop() + + err = streamSQL.Execute(tt.sql) + if err != nil { + t.Skipf("HAVING中的CASE表达式执行暂不支持: %v", err) + } } }) } } -/* -=== CASE表达式测试总结 === +// TestHavingWithCaseExpressionFunctional 功能测试HAVING子句中的CASE表达式 +func TestHavingWithCaseExpressionFunctional(t *testing.T) { + sql := `SELECT deviceId, + AVG(temperature) as avg_temp, + COUNT(*) as total_count, + SUM(CASE WHEN temperature > 30 THEN 1 ELSE 0 END) as hot_count + FROM stream + GROUP BY deviceId, TumblingWindow('2s') + HAVING hot_count >= 2 + WITH (TIMESTAMP='ts', TIMEUNIT='ss')` -本测试文件全面验证了StreamSQL中CASE表达式的功能,包括: + // 创建StreamSQL实例 + streamSQL := New() + defer streamSQL.Stop() -🟢 已完全实现并测试: -1. 基本CASE表达式解析和计算 -2. 聚合函数中的CASE表达式 (SUM, COUNT, AVG, MIN, MAX) -3. NULL值正确处理和传播 -4. 比较运算符增强 (>, <, >=, <=, =, !=) -5. 逻辑运算符支持 (AND, OR, NOT) -6. 数学函数集成 (ABS, ROUND等) -7. 算术表达式计算 -8. IS NULL / IS NOT NULL 条件 -9. 字段提取功能 -10. 复杂条件组合 + err := streamSQL.Execute(sql) + assert.NoError(t, err, "执行SQL应该成功") -🟡 部分支持或有限制: -1. 嵌套CASE表达式 (回退到expr-lang引擎) -2. 某些字符串函数的类型转换问题 -3. 复杂字符串函数在CASE中的使用 + // 模拟数据 + baseTime := time.Now() + testData := []map[string]interface{}{ + // device1: 3条高温记录,应该通过HAVING条件 + {"deviceId": "device1", "temperature": 35.0, "ts": baseTime}, + {"deviceId": "device1", "temperature": 32.0, "ts": baseTime}, + {"deviceId": "device1", "temperature": 31.0, "ts": baseTime}, + {"deviceId": "device1", "temperature": 25.0, "ts": baseTime}, // 不是高温 -🔧 重要修复历史: -- v1.x: 修复了聚合函数中CASE表达式的NULL值处理 -- v1.x: 增强了比较运算符的实现,修复大小比较问题 -- v1.x: 所有聚合函数现在按SQL标准正确处理NULL值 -- v1.x: SUM/AVG/MIN/MAX 忽略NULL值,全NULL时返回NULL -- v1.x: COUNT 正确忽略NULL值 + // device2: 1条高温记录,不应该通过HAVING条件 + {"deviceId": "device2", "temperature": 33.0, "ts": baseTime}, + {"deviceId": "device2", "temperature": 28.0, "ts": baseTime}, + {"deviceId": "device2", "temperature": 26.0, "ts": baseTime}, -📊 测试覆盖: -- 表达式解析: TestCaseExpressionParsing -- SQL集成: TestCaseExpressionInSQL -- 聚合查询: TestCaseExpressionInAggregation -- NULL值处理: TestCaseExpressionNullHandlingInAggregation -- NULL比较: TestCaseExpressionWithNullComparisons -- 复杂表达式: TestComplexCaseExpressions -- 字段提取: TestCaseExpressionFieldExtraction -- 边界情况: TestCaseExpressionEdgeCases + // device3: 2条高温记录,应该通过HAVING条件 + {"deviceId": "device3", "temperature": 34.0, "ts": baseTime}, + {"deviceId": "device3", "temperature": 31.0, "ts": baseTime}, + {"deviceId": "device3", "temperature": 29.0, "ts": baseTime}, + } -🎯 使用指南: -- 优先使用简单搜索CASE表达式 -- 在聚合查询中充分利用CASE表达式进行条件计算 -- 利用IS NULL/IS NOT NULL进行空值检查 -- 组合逻辑运算符实现复杂条件判断 -- 在聚合函数中正确处理NULL值返回 + // 添加数据并获取结果 + var results []map[string]interface{} + var resultsMutex sync.Mutex + streamSQL.stream.AddSink(func(result interface{}) { + resultsMutex.Lock() + defer resultsMutex.Unlock() + if resultSlice, ok := result.([]map[string]interface{}); ok { + results = append(results, resultSlice...) + } + }) -🚀 性能和可靠性: -- 所有测试用例并发安全 -- 表达式解析和计算高效 -- 符合SQL标准的NULL值处理语义 -- 完整的错误处理和边界情况覆盖 -*/ + for _, data := range testData { + streamSQL.stream.AddData(data) + } + + // 等待窗口触发 + time.Sleep(2500 * time.Millisecond) + + // 手动触发窗口 + streamSQL.stream.Window.Trigger() + + // 等待结果 + time.Sleep(200 * time.Millisecond) + + // 验证结果 + resultsMutex.Lock() + defer resultsMutex.Unlock() + + // 应该只有device1和device3通过HAVING条件(hot_count >= 2) + assert.Greater(t, len(results), 0, "应该有结果返回") + + // 验证结果中只包含满足HAVING条件的设备 + deviceResults := make(map[string]map[string]interface{}) + for _, result := range results { + deviceId, ok := result["deviceId"].(string) + assert.True(t, ok, "deviceId应该是字符串类型") + deviceResults[deviceId] = result + } + + // 验证HAVING条件的过滤效果 + for deviceId, result := range deviceResults { + hotCount := getFloat64Value(result["hot_count"]) + assert.GreaterOrEqual(t, hotCount, 2.0, + "设备 %s 的hot_count应该 >= 2 (HAVING条件)", deviceId) + } + + // device2应该被HAVING条件过滤掉(只有1条高温记录 < 2) + assert.NotContains(t, deviceResults, "device2", + "device2应该被HAVING条件过滤掉(hot_count=1 < 2)") + + // 验证期望的设备出现在结果中 + assert.Contains(t, deviceResults, "device1", "device1应该通过HAVING条件") + assert.Contains(t, deviceResults, "device3", "device3应该通过HAVING条件") +} + +// TestNegativeNumberInSQL 测试负数在完整SQL中的使用 +func TestNegativeNumberInSQL(t *testing.T) { + sql := `SELECT deviceId, + temperature, + CASE + WHEN temperature < -10.0 THEN 'FREEZING' + WHEN temperature < 0 THEN 'COLD' + WHEN temperature = 0 THEN 'ZERO' + ELSE 'POSITIVE' + END as temp_category, + CASE + WHEN temperature > 0 THEN temperature + ELSE -1.0 + END as adjusted_temp + FROM stream` + + streamSQL := New() + defer streamSQL.Stop() + + err := streamSQL.Execute(sql) + assert.NoError(t, err, "包含负数的SQL应该执行成功") + + // 模拟包含负数的数据 + testData := []map[string]interface{}{ + {"deviceId": "sensor1", "temperature": -15.0}, + {"deviceId": "sensor2", "temperature": -5.0}, + {"deviceId": "sensor3", "temperature": 0.0}, + {"deviceId": "sensor4", "temperature": 10.0}, + } + + // 收集结果 + var results []map[string]interface{} + var resultsMutex sync.Mutex + + streamSQL.stream.AddSink(func(result interface{}) { + resultsMutex.Lock() + defer resultsMutex.Unlock() + if resultSlice, ok := result.([]map[string]interface{}); ok { + results = append(results, resultSlice...) + } else if resultMap, ok := result.(map[string]interface{}); ok { + results = append(results, resultMap) + } + }) + + // 添加测试数据 + for _, data := range testData { + streamSQL.stream.AddData(data) + } + + // 等待处理 + time.Sleep(200 * time.Millisecond) + + // 验证结果 + resultsMutex.Lock() + defer resultsMutex.Unlock() + + for _, result := range results { + // 验证包含必要字段 + assert.Contains(t, result, "deviceId", "结果应该包含deviceId") + assert.Contains(t, result, "temperature", "结果应该包含temperature") + assert.Contains(t, result, "temp_category", "结果应该包含temp_category") + assert.Contains(t, result, "adjusted_temp", "结果应该包含adjusted_temp") + } +} diff --git a/streamsql_function_integration_test.go b/streamsql_function_integration_test.go index b8997bb..88c222a 100644 --- a/streamsql_function_integration_test.go +++ b/streamsql_function_integration_test.go @@ -449,12 +449,6 @@ func TestFunctionIntegrationMixed(t *testing.T) { item := resultSlice[0] - // 打印调试信息 - t.Logf("Result item: %+v", item) - for key, value := range item { - t.Logf(" %s: %v (type: %T)", key, value, value) - } - assert.Equal(t, "sensor1", item["device"]) assert.Equal(t, "SENSOR1", item["device_upper"]) @@ -472,7 +466,6 @@ func TestFunctionIntegrationMixed(t *testing.T) { } else if val, ok := roundedAvg.(float64); ok { // 验证结果在合理范围内 assert.True(t, val >= 25.0 && val <= 25.5, "rounded_avg should be between 25.0 and 25.5, got %v", val) - t.Logf("rounded_avg test passed: %v", val) } else { t.Errorf("rounded_avg is not a float64: %v (type: %T)", roundedAvg, roundedAvg) } @@ -584,7 +577,6 @@ func TestNestedFunctionSupport(t *testing.T) { // 执行包含 round(avg(temperature), 2) 的查询 query := "SELECT device, round(avg(temperature), 2) as rounded_avg FROM stream GROUP BY device, TumblingWindow('1s')" - t.Logf("Executing query: %s", query) err := streamsql.Execute(query) assert.Nil(t, err) @@ -620,11 +612,6 @@ func TestNestedFunctionSupport(t *testing.T) { assert.Len(t, resultSlice, 1) item := resultSlice[0] - t.Logf("Result item: %+v", item) - for key, value := range item { - t.Logf(" %s: %v (type: %T)", key, value, value) - } - assert.Equal(t, "sensor1", item["device"]) // 验证四舍五入的平均值