Merge pull request #30 from rulego/dev

Dev
This commit is contained in:
Whki
2025-07-30 18:22:43 +08:00
committed by GitHub
45 changed files with 9370 additions and 6732 deletions
+4 -4
View File
@@ -82,7 +82,7 @@ func main() {
}
// Handle real-time transformation results
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("Real-time result: %+v\n", result)
})
@@ -110,7 +110,7 @@ func main() {
// Process data one by one, each will output results immediately
for _, data := range sensorData {
ssql.Stream().AddData(data)
ssql.Emit(data)
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
}
@@ -273,7 +273,7 @@ func main() {
}
// Handle aggregation results
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("Aggregation result: %+v\n", result)
})
@@ -293,7 +293,7 @@ func main() {
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
ssql.Emit(nestedData)
}
```
+4 -12
View File
@@ -85,7 +85,7 @@ func main() {
}
// 处理实时转换结果
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("实时处理结果: %+v\n", result)
})
@@ -113,7 +113,7 @@ func main() {
// 逐条处理数据,每条都会立即输出结果
for _, data := range sensorData {
ssql.Stream().AddData(data)
ssql.Emit(data)
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
}
@@ -289,7 +289,7 @@ func main() {
}
// 处理聚合结果
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
})
@@ -309,7 +309,7 @@ func main() {
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
ssql.Emit(nestedData)
}
```
@@ -317,14 +317,6 @@ func main() {
StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合、分析、窗口等上百个函数。[文档](docs/FUNCTIONS_USAGE_GUIDE.md)
### 🎨 支持的函数类型
- **📊 数学函数** - sqrt, power, abs, 三角函数等
- **📝 字符串函数** - concat, upper, lower, trim等
- **🔄 转换函数** - cast, hex2dec, encode/decode等
- **📈 聚合函数** - 自定义聚合逻辑
- **🔍 分析函数** - lag, latest, 变化检测等
## 概念
### 窗口
+208 -175
View File
File diff suppressed because it is too large Load Diff
+225
View File
@@ -0,0 +1,225 @@
# StreamSQL 负数支持文档
## 概述
StreamSQL 现在全面支持负数在 CASE 表达式中的使用。本文档总结了负数支持的完善情况、支持范围和使用建议。
## ✅ 已支持的负数用法
### 1. 基本负数常量
```sql
-- CASE 表达式中的负数常量
CASE WHEN temperature > 0 THEN 1 ELSE -1 END
-- 负数小数
CASE WHEN temperature > 0 THEN 1.5 ELSE -2.5 END
-- 负零
CASE WHEN temperature = -0 THEN 1 ELSE 0 END
```
### 2. 比较运算符后的负数
```sql
-- 比较运算符后直接跟负数
CASE WHEN temperature < -10 THEN 'FREEZING' ELSE 'NORMAL' END
CASE WHEN temperature >= -5.5 THEN 'ABOVE' ELSE 'BELOW' END
CASE WHEN temperature > -20 THEN 'WARM' ELSE 'COLD' END
```
### 3. 简单 CASE 表达式中的负数
```sql
-- 简单 CASE 中使用负数作为匹配值
CASE temperature
WHEN -10 THEN 'FROZEN'
WHEN -5 THEN 'COLD'
WHEN 0 THEN 'ZERO'
ELSE 'OTHER'
END
```
### 4. 算术表达式中的负数
```sql
-- 括号内的负数运算
CASE WHEN temperature + (-10) > 0 THEN 1 ELSE 0 END
CASE WHEN (temperature * -1) > 10 THEN 1 ELSE 0 END
```
## ⚠️ 部分支持或限制
### 1. 函数参数中的负数表达式
```sql
-- 当前不完全支持:函数参数中的负数变量
CASE WHEN ABS(-temperature) > 10 THEN 1 ELSE 0 END -- ❌
-- 推荐替代方案:使用括号或先计算
CASE WHEN ABS(temperature * -1) > 10 THEN 1 ELSE 0 END -- ✅
```
### 2. BETWEEN 语句中的负数范围
```sql
-- 当前不支持:BETWEEN 与负数组合
CASE WHEN temperature BETWEEN -20 AND -10 THEN 1 ELSE 0 END -- ❌
-- 推荐替代方案:使用比较运算符
CASE WHEN temperature >= -20 AND temperature <= -10 THEN 1 ELSE 0 END -- ✅
```
### 3. SQL 中的空格分隔负数
```sql
-- 避免在 SQL 中使用空格分隔的负数
SELECT CASE WHEN temperature < - 10 THEN 'COLD' END -- ❌ 解析问题
-- 推荐写法:紧密连接或使用括号
SELECT CASE WHEN temperature < -10 THEN 'COLD' END -- ✅
SELECT CASE WHEN temperature < (-10) THEN 'COLD' END -- ✅
```
## 🔧 技术实现
### 词法分析器增强
1. **智能负数识别**
- 识别比较运算符后的负数(`<`, `>`, `<=`, `>=`, `==`, `!=`
- 支持逻辑运算符后的负数(`AND`, `OR`
- 支持 CASE 关键字后的负数(`WHEN`, `THEN`, `ELSE`
2. **连续运算符检查优化**
- 允许比较运算符后跟负数的合法组合
- 智能区分负数与减号运算符
3. **空格处理**
- 正确处理空格分隔的负数标记
- 改进 token 化过程以支持各种负数格式
### 表达式求值增强
1. **负数常量解析**:完全支持负整数和负小数
2. **类型转换**:正确处理负数的数值转换
3. **NULL 值处理**:负数与 NULL 值的正确交互
## 📊 测试覆盖
### 表达式级别测试
- ✅ 负数常量在 THEN/ELSE 中
- ✅ 负数常量在 WHEN 条件中
- ✅ 负数小数支持
- ✅ 负数在算术表达式中
- ✅ 负数在简单 CASE 中
- ✅ 负零处理
### SQL 集成测试
- ✅ 完整 SQL 语句中的负数支持
- ✅ 非聚合查询中的负数表达式
- ✅ 聚合查询中的负数处理
## 🎯 使用建议
### 1. 推荐的负数写法
```sql
-- ✅ 推荐:紧密连接的负数
CASE WHEN temperature < -10 THEN 'FREEZING' END
-- ✅ 推荐:括号包围的负数(最安全)
CASE WHEN temperature < (-10) THEN 'FREEZING' END
-- ✅ 推荐:负数小数
CASE WHEN temperature < -10.5 THEN 'FREEZING' END
```
### 2. 避免的写法
```sql
-- ❌ 避免:空格分隔的负数
CASE WHEN temperature < - 10 THEN 'FREEZING' END
-- ❌ 避免:复杂的负数表达式在函数中
CASE WHEN ABS(-temperature) > 10 THEN 1 END
```
### 3. 最佳实践
1. **使用括号**:当不确定负数解析时,总是使用括号包围负数
2. **避免空格**:在负号和数字之间不要添加空格
3. **测试验证**:对包含负数的复杂表达式进行充分测试
4. **版本兼容**:确保使用的 StreamSQL 版本支持所需的负数功能
## 🚀 未来改进计划
1. **完全支持函数参数中的负数表达式**
2. **支持 BETWEEN 语句中的负数范围**
3. **改进 SQL 解析器对空格分隔负数的处理**
4. **扩展负数支持到更多数学和字符串函数**
## 示例代码
```go
package main
import (
"fmt"
"github.com/rulego/streamsql"
)
func main() {
// 创建 StreamSQL 实例
sql := streamsql.New()
defer sql.Stop()
// 包含负数的 SQL 查询
query := `
SELECT deviceId,
temperature,
CASE
WHEN temperature < -10 THEN 'FREEZING'
WHEN temperature < 0 THEN 'COLD'
WHEN temperature = 0 THEN 'ZERO'
ELSE 'POSITIVE'
END as temp_category,
CASE
WHEN temperature > 0 THEN temperature
ELSE (-1.0)
END as adjusted_temp
FROM stream
`
// 执行查询
err := sql.Execute(query)
if err != nil {
fmt.Printf("执行失败: %v\n", err)
return
}
// 添加数据处理器
sql.AddSink(func(result interface{}) {
fmt.Printf("结果: %+v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{"deviceId": "sensor1", "temperature": -15.0},
{"deviceId": "sensor2", "temperature": -5.0},
{"deviceId": "sensor3", "temperature": 0.0},
{"deviceId": "sensor4", "temperature": 10.0},
}
for _, data := range testData {
sql.AddData(data)
}
}
```
---
**更新日期**: 2025-06-17
**版本**: StreamSQL v0.x
**作者**: StreamSQL 开发团队
+2 -2
View File
@@ -51,7 +51,7 @@ func main() {
fmt.Println("✓ SQL执行成功")
// 5. 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("📊 聚合结果: %v\n", result)
})
@@ -69,7 +69,7 @@ func main() {
for _, data := range sensorData {
fmt.Printf(" 设备: %s, 温度: %.1f°F, 湿度: %.1f%%\n",
data["device"], data["temperature"], data["humidity"])
ssql.AddData(data)
ssql.Emit(data)
}
// 7. 等待处理完成
File diff suppressed because it is too large Load Diff
+92
View File
@@ -0,0 +1,92 @@
# StreamSQL 综合测试演示
这个示例提供了一个统一的入口来测试和验证StreamSQL的各种功能特性。
## 功能覆盖
### 1. 基础数据过滤
- 简单的WHERE条件过滤
- 实时数据流处理
- 结果回调处理
### 2. 聚合分析
- 滚动窗口聚合(TumblingWindow
- 多种聚合函数:AVG、COUNT、MAX、MIN
- 按字段分组
### 3. 滑动窗口
- 滑动窗口分析(SlidingWindow
- 窗口大小和滑动间隔配置
- 连续数据流处理
### 4. 嵌套字段访问
- 多层嵌套对象访问
- 复杂数据结构处理
- 嵌套字段条件过滤
### 5. 自定义函数
- 数学函数(square、circle_area
- 转换函数(f_to_c
- 函数注册和使用
### 6. 复杂查询
- 多种功能组合使用
- 嵌套字段 + 自定义函数 + 聚合
- 复杂业务场景模拟
## 运行方式
```bash
cd examples\comprehensive-test
go run main.go
```
## 预期输出
程序会依次执行6个测试场景,每个场景都会输出相应的结果:
1. **基础过滤测试**:显示温度大于25度的设备告警
2. **聚合分析测试**:显示每个设备的温度统计信息
3. **滑动窗口测试**:显示滑动窗口内的温度分析
4. **嵌套字段测试**:显示复杂数据结构的字段提取
5. **自定义函数测试**:显示自定义函数的计算结果
6. **复杂查询测试**:显示综合功能的查询结果
## 测试数据
- **传感器数据**:包含设备ID、温度、湿度等信息
- **嵌套结构**:设备信息、位置信息、传感器数据的多层嵌套
- **随机数据**:使用随机数生成模拟真实的传感器数据流
## 自定义函数说明
### square(x)
- **功能**:计算数值的平方
- **参数**:数值
- **返回**:平方值
### f_to_c(fahrenheit)
- **功能**:华氏度转摄氏度
- **参数**:华氏度温度值
- **返回**:摄氏度温度值
- **公式**(F - 32) × 5/9
### circle_area(radius)
- **功能**:计算圆的面积
- **参数**:半径
- **返回**:圆的面积
- **公式**:π ×
## 注意事项
1. **窗口触发**:聚合查询需要等待窗口时间到达或手动触发
2. **数据格式**:确保输入数据格式正确,特别是嵌套字段的结构
3. **函数注册**:自定义函数需要在使用前注册
4. **资源清理**:使用defer确保StreamSQL实例正确关闭
## 扩展建议
- 可以添加更多的自定义函数
- 可以测试更复杂的窗口配置
- 可以添加错误处理和异常数据测试
- 可以集成性能测试和压力测试
File diff suppressed because it is too large Load Diff
+8 -8
View File
@@ -625,12 +625,12 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 数学函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(1 * time.Second)
@@ -672,12 +672,12 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 字符串函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(500 * time.Millisecond)
@@ -715,12 +715,12 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 转换函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(500 * time.Millisecond)
@@ -753,12 +753,12 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
map[string]interface{}{"device": "sensor1", "value": 128.0, "category": "A"},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(1 * time.Second)
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -78,7 +78,7 @@ func testDataOverflowPersistence() {
"id": i,
"value": fmt.Sprintf("data_%d", i),
}
stream.AddData(data)
stream.Emit(data)
}
duration := time.Since(start)
+4 -4
View File
@@ -122,7 +122,7 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 简单查询结果: %v\n", result)
})
@@ -143,7 +143,7 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
time.Sleep(200 * time.Millisecond) // 稍微延迟
}
@@ -171,7 +171,7 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
})
@@ -198,7 +198,7 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
// 等待窗口触发
+82
View File
@@ -0,0 +1,82 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
// main 演示PrintTable方法的使用
func main() {
fmt.Println("=== StreamSQL PrintTable 示例 ===")
// 创建StreamSQL实例
ssql := streamsql.New()
// 示例1: 聚合查询 - 按设备分组统计温度
fmt.Println("\n示例1: 聚合查询结果")
err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp, MAX(temperature) as max_temp FROM stream GROUP BY device, TumblingWindow('3s')")
if err != nil {
fmt.Printf("执行SQL失败: %v\n", err)
return
}
// 使用PrintTable方法以表格形式输出结果
ssql.PrintTable()
// 发送测试数据
testData := []map[string]interface{}{
{"device": "sensor1", "temperature": 25.5, "timestamp": time.Now()},
{"device": "sensor1", "temperature": 26.0, "timestamp": time.Now()},
{"device": "sensor2", "temperature": 23.8, "timestamp": time.Now()},
{"device": "sensor2", "temperature": 24.2, "timestamp": time.Now()},
{"device": "sensor1", "temperature": 27.1, "timestamp": time.Now()},
}
for _, data := range testData {
ssql.Emit(data)
}
// 等待窗口触发
time.Sleep(4 * time.Second)
// 示例2: 非聚合查询
fmt.Println("\n示例2: 非聚合查询结果")
ssql2 := streamsql.New()
err = ssql2.Execute("SELECT device, temperature, temperature * 1.8 + 32 as fahrenheit FROM stream WHERE temperature > 24")
if err != nil {
fmt.Printf("执行SQL失败: %v\n", err)
return
}
ssql2.PrintTable()
// 发送测试数据
for _, data := range testData {
ssql2.Emit(data)
}
// 等待处理完成
time.Sleep(1 * time.Second)
// 示例3: 对比原始Print方法
fmt.Println("\n示例3: 原始Print方法输出对比")
ssql3 := streamsql.New()
err = ssql3.Execute("SELECT device, COUNT(*) as count FROM stream GROUP BY device, TumblingWindow('2s')")
if err != nil {
fmt.Printf("执行SQL失败: %v\n", err)
return
}
fmt.Println("原始PrintTable方法:")
ssql3.PrintTable()
// 发送数据
for i := 0; i < 3; i++ {
ssql3.Emit(map[string]interface{}{"device": "test_device", "value": i})
}
time.Sleep(3 * time.Second)
fmt.Println("\n=== 示例结束 ===")
}
File diff suppressed because it is too large Load Diff
+74 -74
View File
@@ -1,74 +1,74 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// 执行一个简单的滚动窗口查询
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// 添加结果处理器
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// 发送测试数据
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.AddData(data)
}
// 等待处理完成
time.Sleep(3 * time.Second)
// 获取统计信息
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// 停止流处理
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// 执行一个简单的滚动窗口查询
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// 添加结果处理器
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// 发送测试数据
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.Emit(data)
}
// 等待处理完成
time.Sleep(3 * time.Second)
// 获取统计信息
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// 停止流处理
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
+575 -12
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+37 -3
View File
@@ -151,7 +151,15 @@ func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression str
// EvaluateExpression 评估表达式,自动选择最合适的引擎
func (bridge *ExprBridge) EvaluateExpression(expression string, data map[string]interface{}) (interface{}, error) {
// 首先检查是否包含LIKE操作符,如果有则进行预处理
// 首先预处理反引号标识符
if bridge.ContainsBacktickIdentifiers(expression) {
processedExpr, err := bridge.PreprocessBacktickIdentifiers(expression)
if err == nil {
expression = processedExpr
}
}
// 检查是否包含LIKE操作符,如果有则进行预处理
if bridge.ContainsLikeOperator(expression) {
processedExpr, err := bridge.PreprocessLikeExpression(expression)
if err == nil {
@@ -407,8 +415,9 @@ func (bridge *ExprBridge) isFunctionCall(expression string) bool {
// PreprocessLikeExpression 预处理LIKE表达式,转换为expr-lang可理解的函数调用
func (bridge *ExprBridge) PreprocessLikeExpression(expression string) (string, error) {
// 使用正则表达式匹配LIKE模式
// 匹配: field LIKE 'pattern' (允许空模式)
likePattern := `(\w+(?:\.\w+)*)\s+LIKE\s+'([^']*)'`
// 匹配: field LIKE 'pattern' 或 `field` LIKE 'pattern' (允许空模式)
// 支持反引号标识符和普通标识符
likePattern := `((?:` + "`" + `[^` + "`" + `]+` + "`" + `|\w+)(?:\.(?:` + "`" + `[^` + "`" + `]+` + "`" + `|\w+))*)\s+LIKE\s+'([^']*)'`
re, err := regexp.Compile(likePattern)
if err != nil {
return expression, err
@@ -424,6 +433,11 @@ func (bridge *ExprBridge) PreprocessLikeExpression(expression string) (string, e
field := submatches[1]
pattern := submatches[2]
// 处理反引号标识符,去除反引号
if len(field) >= 2 && field[0] == '`' && field[len(field)-1] == '`' {
field = field[1 : len(field)-1] // 去掉反引号
}
// 将LIKE模式转换为相应的函数调用
return bridge.convertLikeToFunction(field, pattern)
})
@@ -476,6 +490,26 @@ func (bridge *ExprBridge) PreprocessIsNullExpression(expression string) (string,
return result, nil
}
// ContainsBacktickIdentifiers 检查表达式是否包含反引号标识符
func (bridge *ExprBridge) ContainsBacktickIdentifiers(expression string) bool {
return strings.Contains(expression, "`")
}
// PreprocessBacktickIdentifiers 预处理反引号标识符,去除反引号
func (bridge *ExprBridge) PreprocessBacktickIdentifiers(expression string) (string, error) {
// 使用正则表达式匹配反引号标识符
// 匹配: `identifier` 或 `nested.field`
backtickPattern := "`([^`]+)`"
re, err := regexp.Compile(backtickPattern)
if err != nil {
return expression, err
}
// 替换所有反引号标识符,去除反引号
result := re.ReplaceAllString(expression, "$1")
return result, nil
}
// convertLikeToFunction 将LIKE模式转换为expr-lang操作符
func (bridge *ExprBridge) convertLikeToFunction(field, pattern string) string {
// 处理空模式
+2 -2
View File
@@ -27,8 +27,8 @@ func TestAggregatorFunctionInterface(t *testing.T) {
// 测试重置
aggInstance.Reset()
result = aggInstance.Result()
if result != 0.0 {
t.Errorf("Expected 0.0 after reset, got %v", result)
if result != nil {
t.Errorf("Expected nil after reset (SQL standard: SUM with no rows returns NULL), got %v", result)
}
// 测试克隆
+62 -6
View File
@@ -12,12 +12,14 @@ import (
// SumFunction 求和函数
type SumFunction struct {
*BaseFunction
value float64
value float64
hasValues bool // 标记是否有非NULL值
}
func NewSumFunction() *SumFunction {
return &SumFunction{
BaseFunction: NewBaseFunction("sum", TypeAggregation, "聚合函数", "计算数值总和", 1, -1),
hasValues: false,
}
}
@@ -27,12 +29,20 @@ func (f *SumFunction) Validate(args []interface{}) error {
func (f *SumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
sum := 0.0
hasValues := false
for _, arg := range args {
if arg == nil {
continue // 忽略NULL值
}
val, err := cast.ToFloat64E(arg)
if err != nil {
return nil, err
continue // 忽略无法转换的值
}
sum += val
hasValues = true
}
if !hasValues {
return nil, nil // 当没有有效值时返回NULL
}
return sum, nil
}
@@ -42,27 +52,40 @@ func (f *SumFunction) New() AggregatorFunction {
return &SumFunction{
BaseFunction: f.BaseFunction,
value: 0,
hasValues: false,
}
}
func (f *SumFunction) Add(value interface{}) {
// 增强的Add方法:忽略NULL值
if value == nil {
return // 忽略NULL值
}
if val, err := cast.ToFloat64E(value); err == nil {
f.value += val
f.hasValues = true
}
// 如果转换失败,也忽略该值
}
func (f *SumFunction) Result() interface{} {
if !f.hasValues {
return nil // 当没有有效值时返回NULL而不是0.0
}
return f.value
}
func (f *SumFunction) Reset() {
f.value = 0
f.hasValues = false
}
func (f *SumFunction) Clone() AggregatorFunction {
return &SumFunction{
BaseFunction: f.BaseFunction,
value: f.value,
hasValues: f.hasValues,
}
}
@@ -85,14 +108,22 @@ func (f *AvgFunction) Validate(args []interface{}) error {
func (f *AvgFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
sum := 0.0
count := 0
for _, arg := range args {
if arg == nil {
continue // 忽略NULL值
}
val, err := cast.ToFloat64E(arg)
if err != nil {
return nil, err
continue // 忽略无法转换的值
}
sum += val
count++
}
return sum / float64(len(args)), nil
if count == 0 {
return nil, nil // 当没有有效值时返回nil
}
return sum / float64(count), nil
}
// 实现AggregatorFunction接口
@@ -105,10 +136,16 @@ func (f *AvgFunction) New() AggregatorFunction {
}
func (f *AvgFunction) Add(value interface{}) {
// 增强的Add方法:忽略NULL值
if value == nil {
return // 忽略NULL值
}
if val, err := cast.ToFloat64E(value); err == nil {
f.sum += val
f.count++
}
// 如果转换失败,也忽略该值
}
func (f *AvgFunction) Result() interface{} {
@@ -172,6 +209,11 @@ func (f *MinFunction) New() AggregatorFunction {
}
func (f *MinFunction) Add(value interface{}) {
// 增强的Add方法:忽略NULL值
if value == nil {
return // 忽略NULL值
}
if val, err := cast.ToFloat64E(value); err == nil {
if f.first || val < f.value {
f.value = val
@@ -241,6 +283,11 @@ func (f *MaxFunction) New() AggregatorFunction {
}
func (f *MaxFunction) Add(value interface{}) {
// 增强的Add方法:忽略NULL值
if value == nil {
return // 忽略NULL值
}
if val, err := cast.ToFloat64E(value); err == nil {
if f.first || val > f.value {
f.value = val
@@ -286,7 +333,13 @@ func (f *CountFunction) Validate(args []interface{}) error {
}
func (f *CountFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
return int64(len(args)), nil
count := 0
for _, arg := range args {
if arg != nil {
count++
}
}
return int64(count), nil
}
// 实现AggregatorFunction接口
@@ -298,7 +351,10 @@ func (f *CountFunction) New() AggregatorFunction {
}
func (f *CountFunction) Add(value interface{}) {
f.count++
// 增强的Add方法:忽略NULL值
if value != nil {
f.count++
}
}
func (f *CountFunction) Result() interface{} {
+59 -8
View File
@@ -17,6 +17,7 @@ import (
type SelectStatement struct {
Fields []Field
Distinct bool
SelectAll bool // 新增:标识是否是SELECT *查询
Source string
Condition string
Window WindowDefinition
@@ -92,13 +93,26 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
// 如果没有聚合函数,收集简单字段
if !hasAggregation {
for _, field := range s.Fields {
fieldName := field.Expression
if field.Alias != "" {
// 如果有别名,用别名作为字段名
simpleFields = append(simpleFields, fieldName+":"+field.Alias)
} else {
simpleFields = append(simpleFields, fieldName)
// 如果是SELECT *查询,设置特殊标记
if s.SelectAll {
simpleFields = append(simpleFields, "*")
} else {
for _, field := range s.Fields {
fieldName := field.Expression
if field.Alias != "" {
// 如果有别名,用别名作为字段名
simpleFields = append(simpleFields, fieldName+":"+field.Alias)
} else {
// 对于没有别名的字段,检查是否为字符串字面量
_, n, _, _ := ParseAggregateTypeWithExpression(fieldName)
if n != "" {
// 如果是字符串字面量,使用解析出的字段名(去掉引号)
simpleFields = append(simpleFields, n)
} else {
// 否则使用原始表达式
simpleFields = append(simpleFields, fieldName)
}
}
}
}
logger.Debug("收集简单字段: %v", simpleFields)
@@ -107,6 +121,9 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
// 构建字段映射和表达式信息
aggs, fields, expressions := buildSelectFieldsWithExpressions(s.Fields)
// 提取字段顺序信息
fieldOrder := extractFieldOrder(s.Fields)
// 构建Stream配置
config := types.Config{
WindowConfig: types.WindowConfig{
@@ -125,6 +142,7 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
SimpleFields: simpleFields,
Having: s.Having,
FieldExpressions: expressions,
FieldOrder: fieldOrder,
}
return &config, s.Condition, nil
@@ -169,10 +187,33 @@ func isAggregationFunction(expr string) bool {
if strings.Contains(expr, "(") && strings.Contains(expr, ")") {
return true
}
return false
}
// extractFieldOrder 从Fields切片中提取字段的原始顺序
// 返回按SELECT语句中出现顺序排列的字段名列表
func extractFieldOrder(fields []Field) []string {
var fieldOrder []string
for _, field := range fields {
// 如果有别名,使用别名作为字段名
if field.Alias != "" {
fieldOrder = append(fieldOrder, field.Alias)
} else {
// 没有别名时,尝试解析表达式获取字段名
_, fieldName, _, _ := ParseAggregateTypeWithExpression(field.Expression)
if fieldName != "" {
// 如果解析出字段名(如字符串字面量),使用解析出的名称
fieldOrder = append(fieldOrder, fieldName)
} else {
// 否则使用原始表达式作为字段名
fieldOrder = append(fieldOrder, field.Expression)
}
}
}
return fieldOrder
}
func extractGroupFields(s *SelectStatement) []string {
var fields []string
for _, f := range s.GroupBy {
@@ -261,6 +302,15 @@ func ParseAggregateTypeWithExpression(exprStr string) (aggType aggregator.Aggreg
// 提取函数名
funcName := extractFunctionName(exprStr)
if funcName == "" {
// 检查是否是字符串字面量
trimmed := strings.TrimSpace(exprStr)
if (strings.HasPrefix(trimmed, "'") && strings.HasSuffix(trimmed, "'")) ||
(strings.HasPrefix(trimmed, "\"") && strings.HasSuffix(trimmed, "\"")) {
// 字符串字面量:使用去掉引号的内容作为字段名
fieldName := trimmed[1 : len(trimmed)-1]
return "expression", fieldName, exprStr, nil
}
// 如果不是函数调用,但包含运算符或关键字,可能是表达式
if strings.ContainsAny(exprStr, "+-*/<>=!&|") ||
strings.Contains(strings.ToUpper(exprStr), "AND") ||
@@ -638,6 +688,7 @@ func buildSelectFieldsWithExpressions(fields []Field) (
// 没有别名的情况,使用表达式本身作为字段名
t, n, expression, allFields := ParseAggregateTypeWithExpression(f.Expression)
if t != "" && n != "" {
// 对于字符串字面量,使用解析出的字段名(去掉引号)作为键
selectFields[n] = t
fieldMap[n] = n
+39
View File
@@ -12,6 +12,7 @@ const (
TokenIdent
TokenNumber
TokenString
TokenQuotedIdent // 反引号标识符
TokenComma
TokenLParen
TokenRParen
@@ -176,6 +177,8 @@ func (l *Lexer) NextToken() Token {
return l.readStringToken(tokenPos, tokenLine, tokenColumn)
case '"':
return l.readStringToken(tokenPos, tokenLine, tokenColumn)
case '`':
return l.readQuotedIdentToken(tokenPos, tokenLine, tokenColumn)
}
if isLetter(l.ch) {
@@ -439,6 +442,42 @@ func (l *Lexer) readStringToken(pos, line, column int) Token {
return Token{Type: TokenString, Value: value, Pos: pos, Line: line, Column: column}
}
// readQuotedIdentToken 读取反引号标识符token并处理错误
func (l *Lexer) readQuotedIdentToken(pos, line, column int) Token {
startPos := l.pos
l.readChar() // 跳过开头反引号
for l.ch != '`' && l.ch != 0 {
l.readChar()
}
if l.ch == 0 {
// 未闭合的反引号标识符
if l.errorRecovery != nil {
err := &ParseError{
Type: ErrorTypeUnterminatedString,
Message: "Unterminated quoted identifier",
Position: startPos,
Line: line,
Column: column,
Token: "`",
Suggestions: []string{"Add closing backtick '`'"},
Recoverable: true,
}
l.errorRecovery.AddError(err)
}
value := l.input[startPos:l.pos]
return Token{Type: TokenQuotedIdent, Value: value, Pos: pos, Line: line, Column: column}
}
if l.ch == '`' {
l.readChar() // 跳过结尾反引号
}
value := l.input[startPos:l.pos]
return Token{Type: TokenQuotedIdent, Value: value, Pos: pos, Line: line, Column: column}
}
// isValidNumber 验证数字格式
func (l *Lexer) isValidNumber(number string) bool {
if number == "" {
+126
View File
@@ -0,0 +1,126 @@
package rsql
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestQuotedIdentifiers 测试反引号标识符的词法分析
func TestQuotedIdentifiers(t *testing.T) {
t.Run("基本反引号标识符", func(t *testing.T) {
lexer := NewLexer("`deviceId`")
token := lexer.NextToken()
assert.Equal(t, TokenQuotedIdent, token.Type)
assert.Equal(t, "`deviceId`", token.Value)
})
t.Run("包含特殊字符的反引号标识符", func(t *testing.T) {
lexer := NewLexer("`device-id`")
token := lexer.NextToken()
assert.Equal(t, TokenQuotedIdent, token.Type)
assert.Equal(t, "`device-id`", token.Value)
})
t.Run("包含空格的反引号标识符", func(t *testing.T) {
lexer := NewLexer("`device id`")
token := lexer.NextToken()
assert.Equal(t, TokenQuotedIdent, token.Type)
assert.Equal(t, "`device id`", token.Value)
})
t.Run("未闭合的反引号标识符", func(t *testing.T) {
lexer := NewLexer("`deviceId")
errorRecovery := NewErrorRecovery(nil)
lexer.SetErrorRecovery(errorRecovery)
token := lexer.NextToken()
assert.Equal(t, TokenQuotedIdent, token.Type)
assert.True(t, errorRecovery.HasErrors())
errors := errorRecovery.GetErrors()
assert.Equal(t, 1, len(errors))
assert.Equal(t, ErrorTypeUnterminatedString, errors[0].Type)
})
}
// TestStringLiterals 测试字符串常量的词法分析
func TestStringLiterals(t *testing.T) {
t.Run("单引号字符串", func(t *testing.T) {
lexer := NewLexer("'hello world'")
token := lexer.NextToken()
assert.Equal(t, TokenString, token.Type)
assert.Equal(t, "'hello world'", token.Value)
})
t.Run("双引号字符串", func(t *testing.T) {
lexer := NewLexer(`"hello world"`)
token := lexer.NextToken()
assert.Equal(t, TokenString, token.Type)
assert.Equal(t, `"hello world"`, token.Value)
})
t.Run("包含特殊字符的字符串", func(t *testing.T) {
lexer := NewLexer("'test-value_123'")
token := lexer.NextToken()
assert.Equal(t, TokenString, token.Type)
assert.Equal(t, "'test-value_123'", token.Value)
})
t.Run("空字符串", func(t *testing.T) {
lexer := NewLexer("''")
token := lexer.NextToken()
assert.Equal(t, TokenString, token.Type)
assert.Equal(t, "''", token.Value)
})
}
// TestComplexSQL 测试复杂SQL语句的词法分析
func TestComplexSQL(t *testing.T) {
t.Run("包含反引号标识符和字符串常量的SQL", func(t *testing.T) {
sql := "SELECT `deviceId`, deviceType, 'aa' as test FROM stream WHERE `deviceId` LIKE 'sensor%'"
lexer := NewLexer(sql)
// 验证token序列
expectedTokens := []struct {
Type TokenType
Value string
}{
{TokenSELECT, "SELECT"},
{TokenQuotedIdent, "`deviceId`"},
{TokenComma, ","},
{TokenIdent, "deviceType"},
{TokenComma, ","},
{TokenString, "'aa'"},
{TokenAS, "as"},
{TokenIdent, "test"},
{TokenFROM, "FROM"},
{TokenIdent, "stream"},
{TokenWHERE, "WHERE"},
{TokenQuotedIdent, "`deviceId`"},
{TokenLIKE, "LIKE"},
{TokenString, "'sensor%'"},
{TokenEOF, ""},
}
for i, expected := range expectedTokens {
token := lexer.NextToken()
assert.Equal(t, expected.Type, token.Type, "Token %d type mismatch", i)
if expected.Value != "" {
assert.Equal(t, expected.Value, token.Value, "Token %d value mismatch", i)
}
}
})
t.Run("双引号字符串常量", func(t *testing.T) {
sql := `SELECT deviceId, "test value" as name FROM stream`
lexer := NewLexer(sql)
// 跳过前面的token直到字符串
lexer.NextToken() // SELECT
lexer.NextToken() // deviceId
lexer.NextToken() // ,
token := lexer.NextToken() // "test value"
assert.Equal(t, TokenString, token.Type)
assert.Equal(t, `"test value"`, token.Value)
})
}
+25 -6
View File
@@ -81,6 +81,8 @@ func (p *Parser) getTokenTypeName(tokenType TokenType) string {
return ")"
case TokenIdent:
return "identifier"
case TokenQuotedIdent:
return "quoted identifier"
case TokenNumber:
return "number"
case TokenString:
@@ -212,6 +214,23 @@ func (p *Parser) parseSelect(stmt *SelectStatement) error {
currentToken = p.lexer.NextToken() // 消费 DISTINCT,移动到下一个 token
}
// 检查是否是SELECT *查询
if currentToken.Type == TokenIdent && currentToken.Value == "*" {
stmt.SelectAll = true
// 添加一个特殊的字段标记SELECT *
stmt.Fields = append(stmt.Fields, Field{Expression: "*"})
// 消费*token并检查下一个token
currentToken = p.lexer.NextToken()
// 如果下一个token是FROM或EOF,则完成SELECT *解析
if currentToken.Type == TokenFROM || currentToken.Type == TokenEOF {
return nil
}
// 如果不是FROM/EOF,继续正常的字段解析流程
}
// 设置最大字段数量限制,防止无限循环
maxFields := 100
fieldCount := 0
@@ -289,12 +308,12 @@ func (p *Parser) parseSelect(stmt *SelectStatement) error {
shouldAddSpace = false
}
}
} else if len(exprStr) > 0 && currentToken.Type == TokenIdent {
// 检查前一个字符是否是数字,且前面没有空格
if (lastChar[0] >= '0' && lastChar[0] <= '9') && !strings.HasSuffix(exprStr, " ") {
shouldAddSpace = false
}
} else if len(exprStr) > 0 && (currentToken.Type == TokenIdent || currentToken.Type == TokenQuotedIdent) {
// 检查前一个字符是否是数字,且前面没有空格
if (lastChar[0] >= '0' && lastChar[0] <= '9') && !strings.HasSuffix(exprStr, " ") {
shouldAddSpace = false
}
}
if shouldAddSpace {
expr.WriteString(" ")
@@ -368,7 +387,7 @@ func (p *Parser) parseWhere(stmt *SelectStatement) error {
break
}
switch tok.Type {
case TokenIdent, TokenNumber:
case TokenIdent, TokenNumber, TokenQuotedIdent:
conditions = append(conditions, tok.Value)
case TokenString:
conditions = append(conditions, tok.Value)
+864 -192
View File
File diff suppressed because it is too large Load Diff
+223 -7
View File
@@ -55,7 +55,7 @@ func TestStreamProcess(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口关闭并触发结果
@@ -139,7 +139,7 @@ func TestStreamWithoutFilter(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
@@ -235,7 +235,7 @@ func TestIncompleteStreamProcess(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口关闭并触发结果
@@ -323,7 +323,7 @@ func TestWindowSlotAgg(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
@@ -492,7 +492,7 @@ func TestStreamWithPersistenceStrategy(t *testing.T) {
"temperature": float64(20 + i),
"timestamp": time.Now(),
}
stream.AddData(data)
stream.Emit(data)
}
// 等待处理完成
@@ -546,7 +546,7 @@ func TestStreamPersistenceRecovery(t *testing.T) {
}
for _, data := range testData {
stream1.AddData(data)
stream1.Emit(data)
}
// 等待数据持久化
@@ -695,7 +695,7 @@ func TestStreamPersistencePerformance(t *testing.T) {
"value": i,
"data": fmt.Sprintf("performance_test_data_%d", i),
}
stream.AddData(data)
stream.Emit(data)
}
elapsed := time.Since(start)
@@ -754,3 +754,219 @@ func TestStreamsqlPersistenceConfigPassing(t *testing.T) {
t.Logf("持久化配置验证通过: %+v", stats)
}
func TestSelectStarWithExpressionFields(t *testing.T) {
config := types.Config{
NeedWindow: false,
SimpleFields: []string{"*"}, // SELECT *
FieldExpressions: map[string]types.FieldExpression{
"name": {
Expression: "UPPER(name)",
Fields: []string{"name"},
},
"full_info": {
Expression: "CONCAT(name, ' - ', status)",
Fields: []string{"name", "status"},
},
},
}
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
defer stream.Stop()
// 收集结果 - 使用sync.Mutex防止数据竞争
var mu sync.Mutex
var results []interface{}
stream.AddSink(func(result interface{}) {
mu.Lock()
defer mu.Unlock()
results = append(results, result)
})
stream.Start()
// 添加测试数据
testData := map[string]interface{}{
"name": "john",
"status": "active",
"age": 25,
}
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
// 验证结果 - 使用互斥锁保护读取
mu.Lock()
resultsLen := len(results)
var resultData map[string]interface{}
if resultsLen > 0 {
resultData = results[0].([]map[string]interface{})[0]
}
mu.Unlock()
if resultsLen != 1 {
t.Fatalf("Expected 1 result, got %d", resultsLen)
}
// 验证表达式字段的结果没有被覆盖
if resultData["name"] != "JOHN" {
t.Errorf("Expected name to be 'JOHN' (uppercase), got %v", resultData["name"])
}
if resultData["full_info"] != "john - active" {
t.Errorf("Expected full_info to be 'john - active', got %v", resultData["full_info"])
}
// 验证原始字段仍然存在
if resultData["status"] != "active" {
t.Errorf("Expected status to be 'active', got %v", resultData["status"])
}
if resultData["age"] != 25 {
t.Errorf("Expected age to be 25, got %v", resultData["age"])
}
}
func TestSelectStarWithExpressionFieldsOverride(t *testing.T) {
// 测试表达式字段名与原始字段名相同的情况
config := types.Config{
NeedWindow: false,
SimpleFields: []string{"*"}, // SELECT *
FieldExpressions: map[string]types.FieldExpression{
"name": {
Expression: "UPPER(name)",
Fields: []string{"name"},
},
"age": {
Expression: "age * 2",
Fields: []string{"age"},
},
},
}
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
defer stream.Stop()
// 收集结果 - 使用sync.Mutex防止数据竞争
var mu sync.Mutex
var results []interface{}
stream.AddSink(func(result interface{}) {
mu.Lock()
defer mu.Unlock()
results = append(results, result)
})
stream.Start()
// 添加测试数据
testData := map[string]interface{}{
"name": "alice",
"age": 30,
"status": "active",
}
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
// 验证结果 - 使用互斥锁保护读取
mu.Lock()
resultsLen := len(results)
var resultData map[string]interface{}
if resultsLen > 0 {
resultData = results[0].([]map[string]interface{})[0]
}
mu.Unlock()
if resultsLen != 1 {
t.Fatalf("Expected 1 result, got %d", resultsLen)
}
// 验证表达式字段的结果覆盖了原始字段
if resultData["name"] != "ALICE" {
t.Errorf("Expected name to be 'ALICE' (expression result), got %v", resultData["name"])
}
// 检查age表达式的结果(可能是int或float64类型)
ageResult := resultData["age"]
if ageResult != 60 && ageResult != 60.0 {
t.Errorf("Expected age to be 60 (expression result), got %v (type: %T)", resultData["age"], resultData["age"])
}
// 验证没有表达式的字段保持原值
if resultData["status"] != "active" {
t.Errorf("Expected status to be 'active', got %v", resultData["status"])
}
}
func TestSelectStarWithoutExpressionFields(t *testing.T) {
// 测试没有表达式字段时SELECT *的行为
config := types.Config{
NeedWindow: false,
SimpleFields: []string{"*"}, // SELECT *
}
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
defer stream.Stop()
// 收集结果 - 使用sync.Mutex防止数据竞争
var mu sync.Mutex
var results []interface{}
stream.AddSink(func(result interface{}) {
mu.Lock()
defer mu.Unlock()
results = append(results, result)
})
stream.Start()
// 添加测试数据
testData := map[string]interface{}{
"name": "bob",
"age": 35,
"status": "inactive",
}
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
// 验证结果 - 使用互斥锁保护读取
mu.Lock()
resultsLen := len(results)
var resultData map[string]interface{}
if resultsLen > 0 {
resultData = results[0].([]map[string]interface{})[0]
}
mu.Unlock()
if resultsLen != 1 {
t.Fatalf("Expected 1 result, got %d", resultsLen)
}
// 验证所有原始字段都被保留
if resultData["name"] != "bob" {
t.Errorf("Expected name to be 'bob', got %v", resultData["name"])
}
if resultData["age"] != 35 {
t.Errorf("Expected age to be 35, got %v", resultData["age"])
}
if resultData["status"] != "inactive" {
t.Errorf("Expected status to be 'inactive', got %v", resultData["status"])
}
}
+160 -6
View File
@@ -22,6 +22,7 @@ import (
"github.com/rulego/streamsql/rsql"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils/table"
)
// Streamsql 是StreamSQL流处理引擎的主要接口。
@@ -31,13 +32,19 @@ import (
//
// ssql := streamsql.New()
// err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
// ssql.AddData(map[string]interface{}{"temperature": 25.5})
// ssql.Emit(map[string]interface{}{"temperature": 25.5})
type Streamsql struct {
stream *stream.Stream
// 性能配置模式
performanceMode string // "default", "high_performance", "low_latency", "zero_data_loss", "custom"
customConfig *types.PerformanceConfig
// 新增:同步处理模式配置
enableSyncMode bool // 是否启用同步模式(用于非聚合查询)
// 保存原始SELECT字段顺序,用于表格输出时保持字段顺序
fieldOrder []string
}
// New 创建一个新的StreamSQL实例。
@@ -122,6 +129,9 @@ func (s *Streamsql) Execute(sql string) error {
return fmt.Errorf("SQL解析失败: %w", err)
}
// 从解析结果中获取字段顺序信息
s.fieldOrder = config.FieldOrder
// 根据性能模式创建流处理器
var streamInstance *stream.Stream
@@ -158,7 +168,7 @@ func (s *Streamsql) Execute(sql string) error {
return nil
}
// AddData 向流中添加一条数据记录。
// Emit 向流中添加一条数据记录。
// 数据会根据已配置的SQL查询进行处理和聚合。
//
// 支持的数据格式:
@@ -171,7 +181,7 @@ func (s *Streamsql) Execute(sql string) error {
// 示例:
//
// // 添加设备数据
// ssql.AddData(map[string]interface{}{
// ssql.Emit(map[string]interface{}{
// "deviceId": "sensor001",
// "temperature": 25.5,
// "humidity": 60.0,
@@ -179,17 +189,75 @@ func (s *Streamsql) Execute(sql string) error {
// })
//
// // 添加用户行为数据
// ssql.AddData(map[string]interface{}{
// ssql.Emit(map[string]interface{}{
// "userId": "user123",
// "action": "click",
// "page": "/home",
// })
func (s *Streamsql) AddData(data interface{}) {
func (s *Streamsql) Emit(data interface{}) {
if s.stream != nil {
s.stream.AddData(data)
s.stream.Emit(data)
}
}
// EmitSync 同步处理数据,立即返回处理结果。
// 仅适用于非聚合查询(如过滤、转换等),聚合查询会返回错误。
//
// 对于非聚合查询,此方法提供同步的数据处理能力,同时:
// 1. 立即返回处理结果(同步)
// 2. 触发已注册的AddSink回调(异步)
//
// 这确保了同步和异步模式的一致性,用户可以同时获得:
// - 立即可用的处理结果
// - 异步回调处理(用于日志、监控、持久化等)
//
// 参数:
// - data: 要处理的数据
//
// 返回值:
// - interface{}: 处理后的结果,如果不匹配过滤条件返回nil
// - error: 处理错误,如果是聚合查询会返回错误
//
// 示例:
//
// // 添加日志回调
// ssql.AddSink(func(result interface{}) {
// fmt.Printf("异步日志: %v\n", result)
// })
//
// // 同步处理并立即获取结果
// result, err := ssql.EmitSync(map[string]interface{}{
// "temperature": 25.5,
// "humidity": 60.0,
// })
// if err != nil {
// // 处理错误
// } else if result != nil {
// // 立即使用处理结果
// fmt.Printf("同步结果: %v\n", result)
// // 同时异步回调也会被触发
// }
func (s *Streamsql) EmitSync(data interface{}) (interface{}, error) {
if s.stream == nil {
return nil, fmt.Errorf("stream未初始化")
}
// 检查是否为非聚合查询
if s.stream.IsAggregationQuery() {
return nil, fmt.Errorf("同步模式仅支持非聚合查询,聚合查询请使用Emit()方法")
}
return s.stream.ProcessSync(data)
}
// IsAggregationQuery 检查当前查询是否为聚合查询
func (s *Streamsql) IsAggregationQuery() bool {
if s.stream == nil {
return false
}
return s.stream.IsAggregationQuery()
}
// Stream 返回底层的流处理器实例。
// 通过此方法可以访问更底层的流处理功能。
//
@@ -248,3 +316,89 @@ func (s *Streamsql) Stop() {
s.stream.Stop()
}
}
// AddSink 直接添加结果处理回调函数。
// 这是对 Stream().AddSink() 的便捷封装,使API调用更简洁。
//
// 参数:
// - sink: 结果处理函数,接收处理结果作为参数
//
// 示例:
//
// // 直接添加结果处理
// ssql.AddSink(func(result interface{}) {
// fmt.Printf("处理结果: %v\n", result)
// })
//
// // 添加多个处理器
// ssql.AddSink(func(result interface{}) {
// // 保存到数据库
// saveToDatabase(result)
// })
// ssql.AddSink(func(result interface{}) {
// // 发送到消息队列
// sendToQueue(result)
// })
func (s *Streamsql) AddSink(sink func(interface{})) {
if s.stream != nil {
s.stream.AddSink(sink)
}
}
// PrintTable 以表格形式打印结果到控制台,类似数据库输出格式。
// 首先显示列名,然后逐行显示数据。
//
// 支持的数据格式:
// - []map[string]interface{}: 多行记录
// - map[string]interface{}: 单行记录
// - 其他类型: 直接打印
//
// 示例:
//
// // 表格式打印结果
// ssql.PrintTable()
//
// // 输出格式:
// // +--------+----------+
// // | device | max_temp |
// // +--------+----------+
// // | aa | 30.0 |
// // | bb | 22.0 |
// // +--------+----------+
func (s *Streamsql) PrintTable() {
s.AddSink(func(result interface{}) {
s.printTableFormat(result)
})
}
// printTableFormat 格式化打印表格数据
func (s *Streamsql) printTableFormat(result interface{}) {
table.FormatTableData(result, s.fieldOrder)
}
// ToChannel 返回结果通道,用于异步获取处理结果。
// 通过此通道可以以非阻塞方式获取流处理结果。
//
// 返回值:
// - <-chan interface{}: 只读的结果通道,如果未执行SQL则返回nil
//
// 示例:
//
// // 获取结果通道
// resultChan := ssql.ToChannel()
// if resultChan != nil {
// go func() {
// for result := range resultChan {
// fmt.Printf("异步结果: %v\n", result)
// }
// }()
// }
//
// 注意:
// - 必须有消费者持续从通道读取数据,否则可能导致流处理阻塞
func (s *Streamsql) ToChannel() <-chan interface{} {
if s.stream != nil {
return s.stream.GetResultsChan()
}
return nil
}
+259 -1996
View File
File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More