feat:SQL supports dot syntax to access fields

This commit is contained in:
rulego-team
2025-06-16 12:41:19 +08:00
parent 8d12c2860f
commit a05f4ace98
12 changed files with 1283 additions and 327 deletions
+67
View File
@@ -16,6 +16,7 @@ Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekui
- Pure in-memory operations
- No dependencies
- Data processing with SQL syntax
- **Nested field access**: Support dot notation syntax (`device.info.name`) for accessing nested structured data
- Data analysis
- Built-in multiple window types: sliding window, tumbling window, counting window
- Built-in aggregate functions: MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
@@ -143,6 +144,72 @@ func main() {
}
```
### Nested Field Access
StreamSQL supports querying nested structured data using dot notation (`.`) syntax to access nested fields:
```go
// Nested field access example
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// SQL query using nested fields - supports dot notation syntax for accessing nested structures
rsql := `SELECT device.info.name as device_name,
device.location,
AVG(sensor.temperature) as avg_temp,
COUNT(*) as sensor_count,
window_start() as start,
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
// Handle aggregation results
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("Aggregation result: %+v\n", result)
})
// Add nested structured data
nestedData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "temperature-sensor-001",
"type": "temperature",
},
"location": "smart-greenhouse-A",
},
"sensor": map[string]interface{}{
"temperature": 25.5,
"humidity": 60.2,
},
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
}
```
**Nested Field Access Features:**
- Support dot notation syntax: `device.info.name`, `sensor.temperature`
- Can be used in all SQL clauses: SELECT, WHERE, GROUP BY
- Support aggregate functions: `AVG(sensor.temperature)`, `MAX(device.status.uptime)`
- Backward compatible: existing flat field access methods remain unchanged
## Functions
StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, window, and more. [Documentation](docs/FUNCTIONS_USAGE_GUIDE.md)
+67
View File
@@ -16,6 +16,7 @@
- 纯内存操作
- 无依赖
- SQL语法处理数据
- **嵌套字段访问**:支持点号语法(`device.info.name`)访问嵌套结构数据
- 数据分析
- 内置多种窗口类型:滑动窗口、滚动窗口、计数窗口
- 内置聚合函数MAX, MIN, AVG, SUM, STDDEV,MEDIAN,PERCENTILE等
@@ -159,6 +160,72 @@ func main() {
}
```
### 嵌套字段访问
StreamSQL 还支持对嵌套结构数据进行查询,可以使用点号(`.`)语法访问嵌套字段:
```go
// 嵌套字段访问示例
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
ssql := streamsql.New()
defer ssql.Stop()
// 使用嵌套字段的SQL查询 - 支持点号语法访问嵌套结构
rsql := `SELECT device.info.name as device_name,
device.location,
AVG(sensor.temperature) as avg_temp,
COUNT(*) as sensor_count,
window_start() as start,
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
// 处理聚合结果
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
})
// 添加嵌套结构数据
nestedData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "temperature-sensor-001",
"type": "temperature",
},
"location": "智能温室-A区",
},
"sensor": map[string]interface{}{
"temperature": 25.5,
"humidity": 60.2,
},
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
}
```
**嵌套字段访问特性:**
- 支持点号语法:`device.info.name``sensor.temperature`
- 可用于 SELECT、WHERE、GROUP BY 等所有 SQL 子句
- 支持聚合函数:`AVG(sensor.temperature)``MAX(device.status.uptime)`
- 向后兼容:现有平坦字段访问方式保持不变
## 函数
StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合、分析、窗口等上百个函数。[文档](docs/FUNCTIONS_USAGE_GUIDE.md)
+43 -18
View File
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils"
"github.com/rulego/streamsql/utils/cast"
)
@@ -156,28 +157,40 @@ func (ga *GroupAggregator) Add(data interface{}) error {
key := ""
for _, field := range ga.groupFields {
var f reflect.Value
var fieldVal interface{}
var found bool
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(field)
f = v.MapIndex(keyVal)
// 检查是否是嵌套字段
if utils.IsNestedField(field) {
fieldVal, found = utils.GetNestedField(data, field)
} else {
f = v.FieldByName(field)
// 原有的字段访问逻辑
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(field)
f = v.MapIndex(keyVal)
} else {
f = v.FieldByName(field)
}
if f.IsValid() {
fieldVal = f.Interface()
found = true
}
}
if !f.IsValid() {
if !found {
return fmt.Errorf("field %s not found", field)
}
keyVal := f.Interface()
if keyVal == nil {
if fieldVal == nil {
return fmt.Errorf("field %s has nil value", field)
}
if str, ok := keyVal.(string); ok {
if str, ok := fieldVal.(string); ok {
key += fmt.Sprintf("%s|", str)
} else {
key += fmt.Sprintf("%v|", keyVal)
key += fmt.Sprintf("%v|", fieldVal)
}
}
@@ -223,16 +236,29 @@ func (ga *GroupAggregator) Add(data interface{}) error {
continue
}
// 获取字段值
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(inputField)
f = v.MapIndex(keyVal)
// 获取字段值 - 支持嵌套字段
var fieldVal interface{}
var found bool
if utils.IsNestedField(inputField) {
fieldVal, found = utils.GetNestedField(data, inputField)
} else {
f = v.FieldByName(inputField)
// 原有的字段访问逻辑
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(inputField)
f = v.MapIndex(keyVal)
} else {
f = v.FieldByName(inputField)
}
if f.IsValid() {
fieldVal = f.Interface()
found = true
}
}
if !f.IsValid() {
if !found {
// 尝试从context中获取
if ga.context != nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
@@ -247,7 +273,6 @@ func (ga *GroupAggregator) Add(data interface{}) error {
continue
}
fieldVal := f.Interface()
aggType := aggField.AggregateType
// 动态检查是否需要数值转换
+167
View File
@@ -0,0 +1,167 @@
# 嵌套字段访问功能
StreamSQL 支持对嵌套结构数据进行查询和聚合操作,可以使用点号(`.`)语法访问嵌套字段。
## 功能特性
- **点号语法访问**:支持 `field.subfield.property` 的访问方式
- **完整 SQL 支持**SELECT、WHERE、GROUP BY、聚合函数中都可以使用嵌套字段
- **类型兼容**:支持 `map[string]interface{}` 和结构体类型的嵌套访问
- **向后兼容**:现有的平坦字段访问方式保持不变
## 支持的数据格式
```json
{
"device": {
"info": {
"name": "sensor-001",
"type": "temperature"
},
"location": "room-A"
},
"sensor": {
"temperature": 25.5,
"humidity": 60.2
},
"timestamp": "2023-01-01T10:00:00Z"
}
```
## 使用示例
### 1. 基本查询
```sql
-- 查询设备信息和传感器数据
SELECT device.info.name as device_name,
device.location,
sensor.temperature
FROM stream
```
### 2. 条件过滤
```sql
-- 筛选特定房间的数据
SELECT device.info.name, sensor.temperature
FROM stream
WHERE device.location = 'room-A'
```
### 3. 聚合查询
```sql
-- 按房间统计平均温度
SELECT device.location,
AVG(sensor.temperature) as avg_temp,
COUNT(*) as sensor_count
FROM stream
GROUP BY device.location, TumblingWindow('1s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')
```
### 4. 复杂嵌套访问
```sql
-- 访问深层嵌套字段
SELECT device.info.name,
device.info.type,
sensor.temperature,
sensor.humidity
FROM stream
WHERE device.info.type = 'temperature'
```
## 实际应用示例
```go
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 创建 StreamSQL 实例
ssql := streamsql.New()
defer ssql.Stop()
// 执行嵌套字段查询
rsql := `SELECT device.info.name as device_name,
device.location,
AVG(sensor.temperature) as avg_temp
FROM stream
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
// 添加数据处理回调
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
})
// 添加嵌套结构数据
testData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "sensor-001",
"type": "temperature",
},
"location": "room-A",
},
"sensor": map[string]interface{}{
"temperature": 25.5,
"humidity": 60.2,
},
"timestamp": time.Now().Unix(),
}
// 推送数据到流
ssql.Stream().AddData(testData)
}
```
## 输出结果示例
```json
[
{
"device.location": "room-A",
"avg_temp": 23.9
},
{
"device.location": "room-B",
"avg_temp": 28.1
}
]
```
## 技术实现
嵌套字段访问功能通过以下核心模块实现:
1. **词法分析器**:修改标识符解析,支持点号作为标识符的一部分
2. **工具函数**:提供 `GetNestedField()``SetNestedField()` 函数
3. **表达式引擎**:在字段访问时检查是否为嵌套字段并使用相应的访问方法
4. **聚合器**:支持嵌套字段作为分组键和聚合目标
5. **流处理器**:在数据处理过程中支持嵌套字段访问
## 性能考虑
- 嵌套字段访问比平坦字段略慢,因为需要逐层解析
- 建议在高频查询中避免过深的嵌套层级
- 对于频繁访问的嵌套字段,可以考虑在数据预处理阶段展平结构
## 注意事项
1. 字段路径区分大小写
2. 如果嵌套路径中某个层级不存在,该字段值将为 `null`
3. 嵌套字段名在结果中会保持完整路径(如 `device.location`),除非使用 `AS` 别名
4. 支持任意深度的嵌套,但建议控制在合理范围内以保证性能
+116
View File
@@ -0,0 +1,116 @@
# 嵌套字段访问演示
这个演示展示了 StreamSQL 的嵌套字段访问功能,包括基本查询和聚合操作。
## 功能特点
- **点号语法访问**:使用 `device.info.name` 的方式访问嵌套字段
- **完整 SQL 支持**:在 SELECT、WHERE、GROUP BY 中都支持嵌套字段
- **聚合计算**:支持对嵌套字段进行聚合计算
## 运行演示
```bash
cd examples/nested-field-access
go run main.go
```
## 演示内容
### 演示1基本嵌套字段查询
展示如何使用点号语法查询嵌套结构中的字段:
```sql
SELECT device.info.name as device_name,
device.location,
sensor.temperature,
sensor.humidity
FROM stream
```
### 演示2嵌套字段聚合查询
展示如何对嵌套字段进行分组聚合计算:
```sql
SELECT device.location,
device.info.type,
AVG(sensor.temperature) as avg_temp,
AVG(sensor.humidity) as avg_humidity,
COUNT(*) as sensor_count
FROM stream
GROUP BY device.location, device.info.type, TumblingWindow('3s')
```
## 数据结构
演示使用的嵌套数据结构:
```json
{
"device": {
"info": {
"name": "temperature-sensor-001",
"type": "temperature",
"model": "TempSense-Pro"
},
"location": "智能温室-A区",
"status": "online"
},
"sensor": {
"temperature": 24.5,
"humidity": 62.3
},
"timestamp": 1672531200
}
```
## 预期输出
### 基本查询输出
```
📊 演示1: 基本嵌套字段查询
📋 查询结果:
记录 1:
设备名称: temperature-sensor-001
设备位置: 智能温室-A区
温度: 24.5°C
湿度: 62.3%
记录 2:
设备名称: humidity-sensor-002
设备位置: 智能温室-B区
温度: 26.8°C
湿度: 58.7%
```
### 聚合查询输出
```
📈 演示2: 嵌套字段聚合查询
📊 聚合结果:
聚合组 1:
设备位置: 智能温室-A区
传感器类型: temperature
平均温度: 27.45°C
平均湿度: 62.31%
传感器数量: 4
窗口开始: 10:30:15
窗口结束: 10:30:18
```
## 技术要点
1. **嵌套字段访问**:使用点号分隔符访问多层嵌套的字段
2. **别名支持**:可以为嵌套字段设置简洁的别名
3. **聚合分组**:支持以嵌套字段作为分组键
4. **窗口计算**:在时间窗口内对嵌套字段进行聚合计算
## 应用场景
- **IoT 设备管理**:处理设备的多层级信息结构
- **传感器数据分析**:分析来自不同位置和类型的传感器数据
- **智能监控**:对复杂设备状态进行实时聚合分析
- **数据仓库ETL**处理和转换嵌套的JSON数据流
+209
View File
@@ -0,0 +1,209 @@
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/rulego/streamsql"
)
func main() {
fmt.Println("🔍 StreamSQL 嵌套字段访问功能演示")
fmt.Println("=====================================")
// 创建 StreamSQL 实例
ssql := streamsql.New()
defer ssql.Stop()
// 演示1: 基本嵌套字段查询
fmt.Println("\n📊 演示1: 基本嵌套字段查询")
demonstrateBasicNestedQuery(ssql)
// 演示2: 嵌套字段聚合查询
fmt.Println("\n📈 演示2: 嵌套字段聚合查询")
demonstrateNestedAggregation(ssql)
fmt.Println("\n✅ 演示完成!")
}
// 演示基本嵌套字段查询
func demonstrateBasicNestedQuery(ssql *streamsql.Streamsql) {
// SQL查询提取设备信息和传感器数据
rsql := `SELECT device.info.name as device_name,
device.location,
sensor.temperature,
sensor.humidity
FROM stream`
err := ssql.Execute(rsql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
// 准备测试数据
testData := []map[string]interface{}{
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "temperature-sensor-001",
"type": "temperature",
"model": "TempSense-Pro",
},
"location": "智能温室-A区",
"status": "online",
},
"sensor": map[string]interface{}{
"temperature": 24.5,
"humidity": 62.3,
},
"timestamp": time.Now().Unix(),
},
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "humidity-sensor-002",
"type": "humidity",
"model": "HumiSense-X1",
},
"location": "智能温室-B区",
"status": "online",
},
"sensor": map[string]interface{}{
"temperature": 26.8,
"humidity": 58.7,
},
"timestamp": time.Now().Unix(),
},
}
var wg sync.WaitGroup
wg.Add(1)
// 设置结果回调
ssql.Stream().AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📋 查询结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
fmt.Println()
}
}
})
// 添加测试数据
for _, data := range testData {
ssql.Stream().AddData(data)
}
// 等待结果
wg.Wait()
}
// 演示嵌套字段聚合查询
func demonstrateNestedAggregation(ssql *streamsql.Streamsql) {
// SQL查询按设备位置统计平均温度和湿度
rsql := `SELECT device.location,
device.info.type,
AVG(sensor.temperature) as avg_temp,
AVG(sensor.humidity) as avg_humidity,
COUNT(*) as sensor_count,
window_start() as window_start,
window_end() as window_end
FROM stream
GROUP BY device.location, device.info.type, TumblingWindow('3s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 设置结果回调
ssql.Stream().AddSink(func(result interface{}) {
fmt.Println(" 📊 聚合结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 聚合组 %d:\n", i+1)
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 传感器类型: %v\n", item["device.info.type"])
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
fmt.Printf(" 平均湿度: %.2f%%\n", item["avg_humidity"])
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
fmt.Printf(" 窗口开始: %v\n", formatTime(item["window_start"]))
fmt.Printf(" 窗口结束: %v\n", formatTime(item["window_end"]))
fmt.Println()
}
}
})
// 数据生成器:模拟多个区域的传感器数据
go func() {
defer wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
locations := []string{"智能温室-A区", "智能温室-B区", "智能温室-C区"}
sensorTypes := []string{"temperature", "humidity", "combo"}
for {
select {
case <-ticker.C:
// 每500ms生成3条不同位置的传感器数据
for i := 0; i < 3; i++ {
location := locations[rand.Intn(len(locations))]
sensorType := sensorTypes[rand.Intn(len(sensorTypes))]
data := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": fmt.Sprintf("%s-sensor-%03d", sensorType, rand.Intn(999)+1),
"type": sensorType,
"model": fmt.Sprintf("Model-%s", sensorType),
},
"location": location,
"status": "online",
},
"sensor": map[string]interface{}{
"temperature": 20.0 + rand.Float64()*15, // 20-35度
"humidity": 45.0 + rand.Float64()*30, // 45-75%
},
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(data)
}
case <-ctx.Done():
return
}
}
}()
// 等待聚合结果
wg.Wait()
}
// 格式化时间戳
func formatTime(timestamp interface{}) string {
if ts, ok := timestamp.(int64); ok {
return time.Unix(ts, 0).Format("15:04:05")
}
return "N/A"
}
+31 -7
View File
@@ -7,6 +7,7 @@ import (
"strings"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils"
)
// 表达式类型
@@ -267,13 +268,13 @@ func extractFieldsFromExprLang(expression string) []string {
// 暂时使用正则表达式或简单的字符串解析
fields := make(map[string]bool)
// 简单的字段提取:查找标识符模式
// 简单的字段提取:查找标识符模式,支持点号分隔的嵌套字段
tokens := strings.FieldsFunc(expression, func(c rune) bool {
return !(c >= 'a' && c <= 'z') && !(c >= 'A' && c <= 'Z') && !(c >= '0' && c <= '9') && c != '_'
return !(c >= 'a' && c <= 'z') && !(c >= 'A' && c <= 'Z') && !(c >= '0' && c <= '9') && c != '_' && c != '.'
})
for _, token := range tokens {
if isIdentifier(token) && !isNumber(token) && !isFunctionOrKeyword(token) {
if isValidFieldIdentifier(token) && !isNumber(token) && !isFunctionOrKeyword(token) {
fields[token] = true
}
}
@@ -285,6 +286,23 @@ func extractFieldsFromExprLang(expression string) []string {
return result
}
// isValidFieldIdentifier 检查是否是有效的字段标识符(支持点号分隔的嵌套字段)
func isValidFieldIdentifier(s string) bool {
if len(s) == 0 {
return false
}
// 分割点号分隔的字段
parts := strings.Split(s, ".")
for _, part := range parts {
if !isIdentifier(part) {
return false
}
}
return true
}
// isFunctionOrKeyword 检查是否是函数名或关键字
func isFunctionOrKeyword(token string) bool {
// 检查是否是已知函数或关键字
@@ -708,11 +726,17 @@ func evaluateNodeValue(node *ExprNode, data map[string]interface{}) (interface{}
return value, nil
case TypeField:
val, ok := data[node.Value]
if !ok {
return nil, fmt.Errorf("field %s not found in data", node.Value)
// 支持嵌套字段访问
if utils.IsNestedField(node.Value) {
if val, found := utils.GetNestedField(data, node.Value); found {
return val, nil
}
} else {
if val, ok := data[node.Value]; ok {
return val, nil
}
}
return val, nil
return nil, fmt.Errorf("field %s not found in data", node.Value)
default:
// 对于其他类型,回退到数值计算
+3 -3
View File
@@ -225,11 +225,11 @@ func (l *Lexer) peekChar() byte {
}
func (l *Lexer) readIdentifier() string {
pos := l.pos
for isLetter(l.ch) {
position := l.pos
for isLetter(l.ch) || isDigit(l.ch) || l.ch == '.' {
l.readChar()
}
return l.input[pos:l.pos]
return l.input[position:l.pos]
}
func (l *Lexer) readPreviousIdentifier() string {
+164 -36
View File
@@ -489,17 +489,29 @@ func (p *Parser) parseFrom(stmt *SelectStatement) error {
func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
tok := p.lexer.lookupIdent(p.lexer.readPreviousIdentifier())
hasWindowFunction := false
if tok.Type == TokenTumbling || tok.Type == TokenSliding || tok.Type == TokenCounting || tok.Type == TokenSession {
hasWindowFunction = true
_ = p.parseWindowFunction(stmt, tok.Value)
}
hasGroupBy := false
if tok.Type == TokenGROUP {
hasGroupBy = true
p.lexer.NextToken() // 跳过BY
}
// 如果没有GROUP BY子句且没有窗口函数直接返回
if !hasGroupBy && !hasWindowFunction {
return nil
}
// 设置最大次数限制,防止无限循环
maxIterations := 100
iterations := 0
var limitToken *Token // 保存LIMIT token以便后续处理
for {
iterations++
// 安全检查:防止无限循环
@@ -510,6 +522,10 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
tok := p.lexer.NextToken()
if tok.Type == TokenWITH || tok.Type == TokenOrder || tok.Type == TokenEOF ||
tok.Type == TokenHAVING || tok.Type == TokenLIMIT {
// 如果是LIMIT token保存它以便parseLimit处理
if tok.Type == TokenLIMIT {
limitToken = &tok
}
break
}
if tok.Type == TokenComma {
@@ -520,7 +536,15 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
continue
}
stmt.GroupBy = append(stmt.GroupBy, tok.Value)
// 只有在有GROUP BY时才添加到GroupBy中
if hasGroupBy {
stmt.GroupBy = append(stmt.GroupBy, tok.Value)
}
}
// 如果遇到了LIMIT token直接在这里处理
if limitToken != nil {
return p.handleLimitToken(stmt, *limitToken)
}
return nil
}
@@ -607,42 +631,54 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
return nil
}
// parseLimit 解析LIMIT子句
func (p *Parser) parseLimit(stmt *SelectStatement) error {
// 查看当前token
if p.lexer.lookupIdent(p.lexer.readPreviousIdentifier()).Type == TokenLIMIT {
// 获取下一个token应该是一个数字
tok := p.lexer.NextToken()
if tok.Type == TokenNumber {
// 将数字字符串转换为整数
limit, err := strconv.Atoi(tok.Value)
if err != nil {
parseErr := CreateSyntaxError(
"LIMIT value must be a valid integer",
tok.Pos,
tok.Value,
[]string{"positive_integer"},
)
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Use a positive integer, e.g., LIMIT 10",
"Ensure the number format is correct",
}
p.errorRecovery.AddError(parseErr)
return parseErr
// handleLimitToken 处理在parseGroupBy中遇到的LIMIT token
func (p *Parser) handleLimitToken(stmt *SelectStatement, limitToken Token) error {
// 获取下一个token应该是一个数字
tok := p.lexer.NextToken()
if tok.Type == TokenNumber {
// 将数字字符串转换为整数
limit, err := strconv.Atoi(tok.Value)
if err != nil {
parseErr := CreateSyntaxError(
"LIMIT value must be a valid integer",
tok.Pos,
tok.Value,
[]string{"positive_integer"},
)
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Use a positive integer, e.g., LIMIT 10",
"Ensure the number format is correct",
}
if limit < 0 {
parseErr := CreateSyntaxError(
"LIMIT value must be positive",
tok.Pos,
tok.Value,
[]string{"positive_integer"},
)
parseErr.Suggestions = []string{"Use a positive integer, e.g., LIMIT 10"}
p.errorRecovery.AddError(parseErr)
return parseErr
}
stmt.Limit = limit
p.errorRecovery.AddError(parseErr)
return parseErr
}
if limit < 0 {
parseErr := CreateSyntaxError(
"LIMIT value must be positive",
tok.Pos,
tok.Value,
[]string{"positive_integer"},
)
parseErr.Suggestions = []string{"Use a positive integer, e.g., LIMIT 10"}
p.errorRecovery.AddError(parseErr)
return parseErr
}
stmt.Limit = limit
} else if tok.Type == TokenMinus {
// 处理负数情况:"-5"
nextTok := p.lexer.NextToken()
if nextTok.Type == TokenNumber {
parseErr := CreateSyntaxError(
"LIMIT value must be positive",
nextTok.Pos,
"-"+nextTok.Value,
[]string{"positive_integer"},
)
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{"Use a positive integer, e.g., LIMIT 10"}
p.errorRecovery.AddError(parseErr)
return parseErr
} else {
parseErr := CreateMissingTokenError("number", tok.Pos)
parseErr.Message = "LIMIT must be followed by an integer"
@@ -654,10 +690,102 @@ func (p *Parser) parseLimit(stmt *SelectStatement) error {
p.errorRecovery.AddError(parseErr)
return parseErr
}
} else {
// 处理非数字情况:如 "abc"
parseErr := CreateMissingTokenError("number", tok.Pos)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Add a number after LIMIT, e.g., LIMIT 10",
"Ensure LIMIT syntax is correct",
}
p.errorRecovery.AddError(parseErr)
return parseErr
}
return nil
}
// parseLimit 解析LIMIT子句
func (p *Parser) parseLimit(stmt *SelectStatement) error {
// 如果LIMIT已经被设置可能在parseGroupBy中已处理则跳过
if stmt.Limit > 0 {
return nil
}
// 直接解析输入字符串中的LIMIT子句
input := strings.ToUpper(p.input)
limitIndex := strings.LastIndex(input, "LIMIT")
if limitIndex == -1 {
return nil
}
// 找到LIMIT后面的内容
afterLimit := strings.TrimSpace(p.input[limitIndex+5:]) // 跳过"LIMIT"
if afterLimit == "" {
parseErr := CreateMissingTokenError("number", limitIndex+5)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Add a number after LIMIT, e.g., LIMIT 10",
"Ensure LIMIT syntax is correct",
}
p.errorRecovery.AddError(parseErr)
return parseErr
}
// 分割出第一个单词(应该是数字)
parts := strings.Fields(afterLimit)
if len(parts) == 0 {
parseErr := CreateMissingTokenError("number", limitIndex+5)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Add a number after LIMIT, e.g., LIMIT 10",
"Ensure LIMIT syntax is correct",
}
p.errorRecovery.AddError(parseErr)
return parseErr
}
limitValue := parts[0]
// 处理负数情况
if strings.HasPrefix(limitValue, "-") {
parseErr := CreateMissingTokenError("number", limitIndex+6)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{"Use a positive integer, e.g., LIMIT 10"}
p.errorRecovery.AddError(parseErr)
return parseErr
}
// 尝试转换为整数
limit, err := strconv.Atoi(limitValue)
if err != nil {
parseErr := CreateMissingTokenError("number", limitIndex+6)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{
"Add a number after LIMIT, e.g., LIMIT 10",
"Ensure LIMIT syntax is correct",
}
p.errorRecovery.AddError(parseErr)
return parseErr
}
if limit < 0 {
parseErr := CreateMissingTokenError("number", limitIndex+6)
parseErr.Message = "LIMIT must be followed by an integer"
parseErr.Context = "LIMIT clause"
parseErr.Suggestions = []string{"Use a positive integer, e.g., LIMIT 10"}
p.errorRecovery.AddError(parseErr)
return parseErr
}
stmt.Limit = limit
return nil
}
// parseHaving 解析HAVING子句
func (p *Parser) parseHaving(stmt *SelectStatement) error {
// 查看当前token
+14 -2
View File
@@ -17,6 +17,7 @@ import (
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/logger"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils"
"github.com/rulego/streamsql/window"
)
@@ -441,9 +442,20 @@ func (s *Stream) processDirectData(data interface{}) {
result[outputName] = nil
}
} else {
// 普通字段
if value, exists := dataMap[fieldName]; exists {
// 普通字段 - 支持嵌套字段
var value interface{}
var exists bool
if utils.IsNestedField(fieldName) {
value, exists = utils.GetNestedField(data, fieldName)
} else {
value, exists = dataMap[fieldName]
}
if exists {
result[outputName] = value
} else {
result[outputName] = nil
}
}
}
File diff suppressed because it is too large Load Diff
+141
View File
@@ -0,0 +1,141 @@
package utils
import (
"reflect"
"strings"
)
// GetNestedField 从嵌套的map或结构体中获取字段值
// 支持点号分隔的字段路径,如 "device.info.name"
func GetNestedField(data interface{}, fieldPath string) (interface{}, bool) {
if fieldPath == "" {
return nil, false
}
// 分割字段路径
fields := strings.Split(fieldPath, ".")
current := data
for _, field := range fields {
val, found := getFieldValue(current, field)
if !found {
return nil, false
}
current = val
}
return current, true
}
// getFieldValue 从单个层级获取字段值
func getFieldValue(data interface{}, fieldName string) (interface{}, bool) {
if data == nil {
return nil, false
}
v := reflect.ValueOf(data)
// 如果是指针,解引用
if v.Kind() == reflect.Ptr {
if v.IsNil() {
return nil, false
}
v = v.Elem()
}
switch v.Kind() {
case reflect.Map:
// 处理 map[string]interface{}
if v.Type().Key().Kind() == reflect.String {
mapVal := v.MapIndex(reflect.ValueOf(fieldName))
if mapVal.IsValid() {
return mapVal.Interface(), true
}
}
return nil, false
case reflect.Struct:
// 处理结构体
fieldVal := v.FieldByName(fieldName)
if fieldVal.IsValid() {
return fieldVal.Interface(), true
}
return nil, false
default:
return nil, false
}
}
// SetNestedField 在嵌套的map中设置字段值
// 如果路径中的某些层级不存在,会自动创建
func SetNestedField(data map[string]interface{}, fieldPath string, value interface{}) {
if fieldPath == "" {
return
}
fields := strings.Split(fieldPath, ".")
current := data
// 遍历到倒数第二层,确保路径存在
for i := 0; i < len(fields)-1; i++ {
field := fields[i]
if next, exists := current[field]; exists {
if nextMap, ok := next.(map[string]interface{}); ok {
current = nextMap
} else {
// 如果存在但不是map创建新的map覆盖
newMap := make(map[string]interface{})
current[field] = newMap
current = newMap
}
} else {
// 如果不存在创建新的map
newMap := make(map[string]interface{})
current[field] = newMap
current = newMap
}
}
// 设置最终的值
lastField := fields[len(fields)-1]
current[lastField] = value
}
// IsNestedField 检查字段名是否包含点号(嵌套字段)
func IsNestedField(fieldName string) bool {
return strings.Contains(fieldName, ".")
}
// ExtractTopLevelField 从嵌套字段路径中提取顶级字段名
// 例如:"device.info.name" 返回 "device"
func ExtractTopLevelField(fieldPath string) string {
if fieldPath == "" {
return ""
}
parts := strings.Split(fieldPath, ".")
return parts[0]
}
// GetAllReferencedFields 获取嵌套字段路径中引用的所有顶级字段
// 例如:["device.info.name", "sensor.temperature"] 返回 ["device", "sensor"]
func GetAllReferencedFields(fieldPaths []string) []string {
topLevelFields := make(map[string]bool)
for _, path := range fieldPaths {
if path != "" {
topField := ExtractTopLevelField(path)
if topField != "" {
topLevelFields[topField] = true
}
}
}
result := make([]string, 0, len(topLevelFields))
for field := range topLevelFields {
result = append(result, field)
}
return result
}