62 Commits

Author SHA1 Message Date
rulego-team ba2cdd5629 fix: extra fields produced when special characters are present 2025-12-16 11:50:34 +08:00
rulego-team 47b7e07c6b fix: special characters wrongly treated as illegal 2025-12-16 11:34:30 +08:00
rulego-team 9739f213e9 fix: Trigger did not clear expired data 2025-12-16 11:15:33 +08:00
rulego-team 4528d1f1a6 fix(window): event-time window not triggering; add debug logging 2025-11-15 13:54:27 +08:00
Whki 6ce76d506c Merge pull request #43 from rulego/dev
Dev
2025-11-15 13:15:05 +08:00
rulego-team 8207e6ba8c fix: late-data handling logic 2025-11-15 13:06:06 +08:00
rulego-team b2c638671a ci: increase test timeout 2025-11-15 00:49:49 +08:00
rulego-team 1744502678 feat: add event-time window processing 2025-11-15 00:27:55 +08:00
Whki 8a6e41ab78 Merge pull request #42 from rulego/dev
Dev
2025-11-14 10:53:22 +08:00
rulego-team 14d9a0874b feat: improve stream shutdown 2025-11-14 10:46:42 +08:00
rulego-team 6f77dc5f7f fix: test case assertions 2025-11-14 08:51:45 +08:00
Whki 2907d7e3a3 Merge pull request #41 from rulego/dev
Dev
2025-11-13 18:47:20 +08:00
rulego-team eb29ed921d fix: remove window time alignment 2025-11-13 18:46:30 +08:00
rulego-team 73f7843996 fix: start the slide timer after the first window 2025-11-13 18:46:29 +08:00
Whki 3ef10b86a9 Merge pull request #40 from rulego/dev
Dev
2025-11-13 11:18:08 +08:00
rulego-team e435ef2040 fix: test failures 2025-11-13 11:11:49 +08:00
rulego-team c39f915808 refactor: window function parameter passing 2025-11-13 11:03:49 +08:00
rulego-team e05213267d chore: add test cases 2025-11-13 11:03:28 +08:00
rulego-team 5f05b0ef61 perf: reduce blocking 2025-11-13 11:02:53 +08:00
rulego-team 3662949105 fix: support grouping by mixed keys 2025-11-13 11:01:52 +08:00
rulego-team c86d22e06a fix: window alignment 2025-11-13 11:01:13 +08:00
rulego-team 645c233762 fix: count per group according to GROUP BY keys 2025-11-13 10:58:10 +08:00
rulego-team 29ed63ea50 chore(ci): add automated tests for the dev branch 2025-11-13 10:56:51 +08:00
rulego-team 5f7076de7b chore: add awesome badge 2025-10-08 12:01:10 +08:00
rulego-team 3464ff5f6e fix:unnest function to properly expand arrays into multiple rows in stream processing 2025-09-08 18:40:26 +08:00
rulego-team 1d9e2a3dab test: add test cases 2025-08-30 00:56:48 +08:00
rulego-team b23fdea2cd perf: increase code coverage 2025-08-29 17:30:58 +08:00
rulego-team 696b5f7177 perf: optimize SQL parsing 2025-08-29 17:30:11 +08:00
rulego-team 90afdead78 perf: refactor aggregate function evaluation 2025-08-29 17:29:27 +08:00
rulego-team 4615b7a308 feat: support post-computation on aggregate functions #37 2025-08-29 13:49:26 +08:00
Whki de6ca91c01 Merge pull request #36 from rulego/dev
Dev
2025-08-08 17:57:53 +08:00
rulego-team 5b13316754 test:add test cases 2025-08-08 17:52:43 +08:00
rulego-team c2c8f86d7f ci: skip regular tests for Go 1.21, only run coverage tests 2025-08-08 09:54:54 +08:00
Whki c66d974dfc Merge pull request #35 from rulego/dev
test:add test cases
2025-08-08 09:50:08 +08:00
rulego-team 049295b599 test:add test cases 2025-08-08 09:40:26 +08:00
Whki e388bacde3 Merge pull request #34 from rulego/dev
test:add coverage test case
2025-08-08 09:07:54 +08:00
rulego-team d6a8778731 test:add coverage test case 2025-08-08 09:00:02 +08:00
Whki 9764b95b7a Merge pull request #33 from rulego/dev
Dev
2025-08-07 19:42:59 +08:00
rulego-team 3ad5146654 update README 2025-08-07 19:41:52 +08:00
rulego-team 74efe9e526 fix(aggregator): enable Count aggregator to handle any data type 2025-08-07 19:36:43 +08:00
rulego-team ff00fd1f31 refactor: format code and improve test cases 2025-08-07 19:23:48 +08:00
rulego-team 10319b45a6 feat: add bool conversion to ToFloat64E 2025-08-07 19:22:22 +08:00
rulego-team 790e6c615d feat: add function aliases 2025-08-07 19:20:14 +08:00
rulego-team a066a4df1b refactor: restructure the expression engine module 2025-08-07 19:18:40 +08:00
Whki 05a25619b8 Merge pull request #32 from rulego/dev
Dev
2025-08-06 18:20:53 +08:00
rulego-team 57983f19d7 refactor: remove persistence strategy 2025-08-06 18:11:44 +08:00
rulego-team 6f5305ca01 feat: improve test cases 2025-08-06 17:18:50 +08:00
rulego-team a46b833608 feat: improve persistence loading 2025-08-06 17:16:47 +08:00
rulego-team 4249cef16b perf: optimize log output 2025-08-06 17:15:31 +08:00
rulego-team 6bfb592bd0 chore: add test cases 2025-08-06 11:05:06 +08:00
rulego-team 691ca41c87 fix: improve SubstringFunction negative-index handling 2025-08-06 10:55:28 +08:00
Whki a8cf91298a Merge pull request #31 from rulego/dev
Dev
2025-08-05 11:33:11 +08:00
rulego-team f3fe997ce8 test: add test cases 2025-08-05 11:25:49 +08:00
rulego-team 98dab93e5b fix: CASE expression returning incorrect string 2025-08-05 00:37:22 +08:00
rulego-team 937e8243cf fix: negative numbers not parsed correctly 2025-08-05 00:36:21 +08:00
rulego-team a43445ebc7 refactor: improve comments 2025-08-04 14:45:43 +08:00
rulego-team de4d47d87d ci: use codecov-action v3 2025-08-04 14:36:51 +08:00
rulego-team a47748d4c7 refactor: translate Chinese comments to English in functions directory
- Convert all Chinese function comments to English in functions package
- Update interface documentation for better international readability
- Maintain original logic and functionality unchanged
- Improve code documentation standards for global development
2025-08-04 12:35:33 +08:00
rulego-team ed2a063000 perf: optimize persistence overflow strategy 2025-08-04 11:27:41 +08:00
rulego-team 8ebd152ec9 refactor: change Emit parameter from interface{} to map[string]interface{}; change AddSink(func(results interface{})) to AddSink(func(results []map[string]interface{})) 2025-08-03 23:41:11 +08:00
rulego-team 343d045554 refactor: restructure the stream module 2025-08-01 18:55:32 +08:00
rulego-team ec0ac04ebf ci: remove create release step from workflow 2025-07-30 19:42:48 +08:00
182 changed files with 52742 additions and 14982 deletions
+9 -85
@@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [ main, master, develop ]
branches: [ main, dev ]
pull_request:
branches: [ main, master, develop ]
branches: [ main, dev ]
jobs:
test:
@@ -41,91 +41,15 @@ jobs:
run: go build -v ./...
- name: Run tests
run: go test -v -race -timeout 300s ./...
if: matrix.go-version != '1.21'
run: go test -v -race -timeout 600s ./...
- name: Run tests with coverage
if: matrix.go-version == '1.21'
run: go test -v -race -coverprofile=coverage.out -covermode=atomic -timeout 300s ./...
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...
- name: Upload coverage to Codecov
- name: Upload coverage reports to Codecov
if: matrix.go-version == '1.21'
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
case-expression-tests:
name: CASE Expression Tests
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.21'
- name: Check out code
uses: actions/checkout@v4
- name: Cache Go modules
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
- name: Download dependencies
run: go mod download
- name: Run CASE Expression Parsing Tests
run: go test -v -run TestCaseExpressionParsing -timeout 15s
- name: Run CASE Expression Comprehensive Tests
run: go test -v -run TestCaseExpressionComprehensive -timeout 15s
- name: Run CASE Expression Field Extraction Tests
run: go test -v -run TestCaseExpressionFieldExtraction -timeout 15s
- name: Run CASE Expression in SQL Tests
run: go test -v -run TestCaseExpressionInSQL -timeout 15s
- name: Run CASE Expression Aggregation Tests (with known limitations)
run: go test -v -run "TestCaseExpressionInAggregation|TestComplexCaseExpressionsInAggregation" -timeout 20s
- name: Run CASE Expression Edge Cases
run: go test -v -run TestCaseExpressionEdgeCases -timeout 15s
# lint:
# name: Lint
# runs-on: ubuntu-latest
#
# steps:
# - name: Set up Go
# uses: actions/setup-go@v4
# with:
# go-version: '1.21'
#
# - name: Check out code
# uses: actions/checkout@v4
#
# - name: Run golangci-lint
# uses: golangci/golangci-lint-action@v3
# with:
# version: latest
# args: --timeout=5m
#
# security:
# name: Security Scan
# runs-on: ubuntu-latest
#
# steps:
# - name: Set up Go
# uses: actions/setup-go@v4
# with:
# go-version: '1.21'
#
# - name: Check out code
# uses: actions/checkout@v4
#
# - name: Run Gosec Security Scanner
# uses: securecodewarrior/github-action-gosec@v1
# with:
# args: './...'
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
+4 -9
@@ -5,6 +5,9 @@ on:
tags:
- 'v*'
permissions:
contents: write
jobs:
test:
name: Test Before Release
@@ -54,12 +57,4 @@ jobs:
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
- name: Download dependencies
run: go mod download
- name: Create Release
uses: softprops/action-gh-release@v1
with:
draft: false
prerelease: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: go mod download
+109 -22
@@ -3,12 +3,14 @@
[![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
English| [简体中文](README_ZH.md)
**StreamSQL** is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.
Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekuiper.org/)
📖 **[Documentation](https://rulego.cc/en/pages/streamsql-overview/)** | Similar to: [Apache Flink](https://flink.apache.org/)
## Features
@@ -82,9 +84,9 @@ func main() {
}
// Handle real-time transformation results
ssql.AddSink(func(result interface{}) {
fmt.Printf("Real-time result: %+v\n", result)
})
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Real-time result: %+v\n", results)
})
// Simulate sensor data input
sensorData := []map[string]interface{}{
@@ -111,6 +113,7 @@ func main() {
// Process data one by one, each will output results immediately
for _, data := range sensorData {
ssql.Emit(data)
// changedData, err := ssql.EmitSync(data) // EmitSync returns the processing result synchronously
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
}
@@ -148,7 +151,7 @@ func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()
// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
@@ -195,8 +198,8 @@ func main() {
"humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70%
}
// Add data to stream, triggering StreamSQL's real-time processing
// AddData distributes data to corresponding windows and aggregators
ssql.stream.AddData(randomData)
// Emit distributes data to corresponding windows and aggregators
ssql.Emit(randomData)
}
case <-ctx.Done():
@@ -209,10 +212,10 @@ func main() {
// Step 6: Setup Result Processing Pipeline
resultChan := make(chan interface{})
// Add computation result callback function (Sink)
// When window triggers computation, results are output through this callback
ssql.stream.AddSink(func(result interface{}) {
resultChan <- result
})
// When window triggers computation, results are output through this callback
ssql.AddSink(func(results []map[string]interface{}) {
resultChan <- results
})
// Step 7: Start Result Consumer Goroutine
// Count received results for effect verification
@@ -264,8 +267,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -273,9 +275,9 @@ func main() {
}
// Handle aggregation results
ssql.AddSink(func(result interface{}) {
fmt.Printf("Aggregation result: %+v\n", result)
})
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Aggregation result: %+v\n", results)
})
// Add nested structured data
nestedData := map[string]interface{}{
@@ -332,6 +334,11 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
- **Session Window**
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.
### Stream
- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
@@ -340,17 +347,97 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
### Time Semantics
- **Event Time**
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
StreamSQL supports two time concepts that determine how windows are divided and triggered:
- **Processing Time**
- **Definition**: The time when the data arrives at the processing system.
#### Event Time
- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
- **Characteristics**:
- Windows are divided based on timestamp field values in the data
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
- Uses Watermark mechanism to handle out-of-order and late data
- Results are accurate but may have delays (need to wait for late data)
- **Use Cases**:
- Scenarios requiring precise temporal analysis
- Scenarios where data may arrive out of order or delayed
- Historical data replay and analysis
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### Processing Time
- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
- **Characteristics**:
- Windows are divided based on the time data arrives at the system (`time.Now()`)
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
- Uses system clock (Timer) to trigger windows
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
- **Use Cases**:
- Real-time monitoring and alerting scenarios
- Scenarios with high latency requirements and relatively low accuracy requirements
- Scenarios where data arrives in order and delay is controllable
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```
#### Event Time vs Processing Time Comparison
| Feature | Event Time | Processing Time |
|---------|------------|-----------------|
| **Time Source** | Timestamp field in data | System current time |
| **Window Division** | Based on event timestamp | Based on data arrival time |
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
| **Result Accuracy** | Accurate | May be inaccurate |
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
#### Window Time
- **Window Start Time**
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
- **Window End Time**
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
#### Watermark Mechanism (Event Time Windows Only)
- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
- **Configuration Parameters**:
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
)
```
## Contribution Guidelines
+115 -21
@@ -3,12 +3,14 @@
[![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
[English](README.md)| 简体中文
**StreamSQL** 是一款轻量级的、基于 SQL 的物联网边缘流处理引擎。它能够高效地处理和分析无界数据流。
类似: [Apache Flink](https://flink.apache.org/) 和 [ekuiper](https://ekuiper.org/)
📖 **[官方文档](https://rulego.cc/pages/streamsql-overview/)** | 类似: [Apache Flink](https://flink.apache.org/)
## 功能特性
@@ -85,8 +87,8 @@ func main() {
}
// 处理实时转换结果
ssql.AddSink(func(result interface{}) {
fmt.Printf("实时处理结果: %+v\n", result)
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("实时处理结果: %+v\n", results)
})
// 模拟传感器数据输入
@@ -114,6 +116,7 @@ func main() {
// 逐条处理数据,每条都会立即输出结果
for _, data := range sensorData {
ssql.Emit(data)
// changedData, err := ssql.EmitSync(data) // 同步获得处理结果
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
}
@@ -148,7 +151,7 @@ import (
func main() {
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义流式SQL查询语句
// 核心概念解析:
// - TumblingWindow('5s'): 滚动窗口,每5秒创建一个新窗口,窗口之间不重叠
@@ -206,7 +209,7 @@ func main() {
// - 按deviceId分组
// - 将数据分配到对应的时间窗口
// - 更新聚合计算状态
ssql.stream.AddData(randomData)
ssql.Emit(randomData)
}
case <-ctx.Done():
@@ -221,8 +224,10 @@ func main() {
// 6. 注册结果回调函数
// 当窗口触发时每5秒会调用这个回调函数
// 传递聚合计算的结果
ssql.stream.AddSink(func(result interface{}) {
resultChan <- result
ssql.AddSink(func(results []map[string]interface{}) {
for _, result := range results {
resultChan <- result
}
})
// 7. 结果消费者 - 处理计算结果
@@ -280,8 +285,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -289,18 +293,22 @@ func main() {
}
// 处理聚合结果
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("聚合结果: %+v\n", results)
})
// 添加嵌套结构数据
nestedData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "temperature-sensor-001",
"type": "temperature",
"name": "temperature-sensor-001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "智能温室-A区",
"floor": "3F",
},
"location": "智能温室-A区",
},
"sensor": map[string]interface{}{
"temperature": 25.5,
@@ -338,6 +346,11 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。
- **会话窗口(Session Window)**
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。
### 流(Stream)
- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
@@ -346,16 +359,97 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
### 时间语义
- **事件时间(Event Time)**
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:
#### 事件时间(Event Time)
- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time``timestamp``order_time` 等)。
- **特点**
- 窗口基于数据中的时间戳字段值来划分
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
- 使用 Watermark 机制来处理乱序和延迟数据
- 结果准确,但可能有延迟(需要等待延迟数据)
- **使用场景**
- 需要精确时序分析的场景
- 数据可能乱序或延迟到达的场景
- 历史数据回放和分析
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### 处理时间(Processing Time)
- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
- **特点**
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
- 使用系统时钟(Timer)来触发窗口
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
- **使用场景**
- 实时监控和告警场景
- 对延迟要求高,对准确性要求相对较低的场景
- 数据顺序到达且延迟可控的场景
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- 不指定 WITH 子句,默认使用处理时间
```
#### 事件时间 vs 处理时间对比
| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|------|---------------------|-------------------------|
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
| **延迟处理** | 支持(Watermark 机制) | 不支持 |
| **乱序处理** | 支持(Watermark 机制) | 不支持 |
| **结果准确性** | 准确 | 可能不准确 |
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认(不指定 WITH 子句) |
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |
#### 窗口时间
- **处理时间(Processing Time)**
- **定义**:数据到达处理系统的时间。
- **窗口开始时间(Window Start Time)**
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳。
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)。
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`,窗口开始时间会对齐到 5 分钟的倍数(如 10:00、10:05、10:10)。
- **窗口结束时间(Window End Time)**
- **定义**:基于事件时间,窗口的结束时间点。通常,窗口结束时间是窗口开始时间加上窗口的持续时间。
- 例如,一个滑动窗口的持续时间为 1 分钟,则窗口结束时间是窗口开始时间加上 1 分钟。
- **事件时间窗口**:窗口的结束时间点,通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间。窗口在系统时钟到达结束时间时触发。
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00,则窗口结束时间是 10:01。
#### Watermark 机制(仅事件时间窗口)
- **定义**Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
- **计算公式**`Watermark = max(event_time) - MaxOutOfOrderness`
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
- **配置参数**
- `MAXOUTOFORDERNESS`:允许的最大乱序时间,用于容忍数据乱序(默认 0,不允许乱序)
- `ALLOWEDLATENESS`:窗口触发后还能接受延迟数据的时间(默认 0,不接受延迟数据)
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间(默认 0,禁用)
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
IDLETIMEOUT='5s' -- 5秒无数据基于处理时间推进watermark
)
```
## 贡献指南
+34 -12
@@ -4,12 +4,10 @@ import (
"github.com/rulego/streamsql/functions"
)
// 为了向后兼容重新导出functions模块中的类型和函数
// AggregateType 聚合类型重新导出functions.AggregateType
// AggregateType aggregate type, re-exports functions.AggregateType
type AggregateType = functions.AggregateType
// 重新导出所有聚合类型常量
// Re-export all aggregate type constants
const (
Sum = functions.Sum
Count = functions.Count
@@ -22,45 +20,69 @@ const (
WindowStart = functions.WindowStart
WindowEnd = functions.WindowEnd
Collect = functions.Collect
FirstValue = functions.FirstValue
LastValue = functions.LastValue
MergeAgg = functions.MergeAgg
StdDevS = functions.StdDevS
Deduplicate = functions.Deduplicate
Var = functions.Var
VarS = functions.VarS
// 分析函数
// Analytical functions
Lag = functions.Lag
Latest = functions.Latest
ChangedCol = functions.ChangedCol
HadChanged = functions.HadChanged
// 表达式聚合器,用于处理自定义函数
// Expression aggregator for handling custom functions
Expression = functions.Expression
// Post-aggregation marker
PostAggregation = functions.PostAggregation
)
// AggregatorFunction 聚合器函数接口,重新导出functions.LegacyAggregatorFunction
// AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
type AggregatorFunction = functions.LegacyAggregatorFunction
// ContextAggregator 支持context机制的聚合器接口重新导出functions.ContextAggregator
// ContextAggregator aggregator interface supporting context mechanism, re-exports functions.ContextAggregator
type ContextAggregator = functions.ContextAggregator
// Register 添加自定义聚合器到全局注册表,重新导出functions.RegisterLegacyAggregator
// Register adds custom aggregator to global registry, re-exports functions.RegisterLegacyAggregator
func Register(name string, constructor func() AggregatorFunction) {
functions.RegisterLegacyAggregator(name, constructor)
}
// CreateBuiltinAggregator 创建内置聚合器,重新导出functions.CreateLegacyAggregator
// CreateBuiltinAggregator creates built-in aggregator, re-exports functions.CreateLegacyAggregator
func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
// 特殊处理expression类型
// Special handling for expression type
if aggType == "expression" {
return &ExpressionAggregatorWrapper{
function: functions.NewExpressionAggregatorFunction(),
}
}
// Special handling for post-aggregation type (placeholder aggregator)
if aggType == "post_aggregation" {
return &PostAggregationPlaceholder{}
}
return functions.CreateLegacyAggregator(aggType)
}
// ExpressionAggregatorWrapper 包装表达式聚合器使其兼容LegacyAggregatorFunction接口
// PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields
type PostAggregationPlaceholder struct{}
func (p *PostAggregationPlaceholder) New() AggregatorFunction {
return &PostAggregationPlaceholder{}
}
func (p *PostAggregationPlaceholder) Add(value interface{}) {
// Do nothing - this is just a placeholder
}
func (p *PostAggregationPlaceholder) Result() interface{} {
// Return nil - actual result will be computed in post-processing
return nil
}
// ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
type ExpressionAggregatorWrapper struct {
function *functions.ExpressionAggregatorFunction
}
+148
@@ -0,0 +1,148 @@
package aggregator
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPostAggregationPlaceholder exercises the full behavior of the post-aggregation placeholder
func TestPostAggregationPlaceholder(t *testing.T) {
t.Run("basic PostAggregationPlaceholder behavior", func(t *testing.T) {
// Create a PostAggregationPlaceholder instance
placeholder := &PostAggregationPlaceholder{}
require.NotNil(t, placeholder)
// Test the New method
newPlaceholder := placeholder.New()
require.NotNil(t, newPlaceholder)
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
// Test the Add method (should be a no-op)
placeholder.Add(10)
placeholder.Add("test")
placeholder.Add(nil)
placeholder.Add([]int{1, 2, 3})
// Test the Result method (should return nil)
result := placeholder.Result()
assert.Nil(t, result)
})
t.Run("create PostAggregationPlaceholder via CreateBuiltinAggregator", func(t *testing.T) {
// Create a post_aggregation aggregator through CreateBuiltinAggregator
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
// Exercise the created aggregator
newAgg := aggregator.New()
require.NotNil(t, newAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
// Add values of various types
newAgg.Add(100)
newAgg.Add("string_value")
newAgg.Add(map[string]interface{}{"key": "value"})
// The result is always nil
result := newAgg.Result()
assert.Nil(t, result)
})
t.Run("independence of multiple PostAggregationPlaceholder instances", func(t *testing.T) {
// Create several instances
placeholder1 := &PostAggregationPlaceholder{}
placeholder2 := placeholder1.New()
placeholder3 := placeholder1.New()
// Verify the instance types
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
// Each instance should return nil
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
// Add must not affect the result, since these are placeholders
placeholder1.Add("test1")
placeholder2.Add("test2")
placeholder3.Add("test3")
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
})
t.Run("PostAggregationPlaceholder in an aggregation scenario", func(t *testing.T) {
// Build aggregation fields that include a PostAggregationPlaceholder
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
}
// Create a group aggregator
agg := NewGroupAggregator(groupFields, aggFields)
require.NotNil(t, agg)
// Add test data
testData := []map[string]interface{}{
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
{"category": "B", "value": 30, "placeholder_field": 999},
}
for _, data := range testData {
err := agg.Add(data)
assert.NoError(t, err)
}
// Fetch the results
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 2)
// The PostAggregationPlaceholder field should yield nil
for _, result := range results {
assert.Contains(t, result, "post_agg_field")
assert.Nil(t, result["post_agg_field"])
// Regular aggregation fields still work
assert.Contains(t, result, "sum_value")
assert.NotNil(t, result["sum_value"])
}
})
}
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
aggregator := CreateBuiltinAggregator("post_aggregation")
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试PostAggregation常量", func(t *testing.T) {
// 验证PostAggregation常量值
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
// Create aggregator using the constant
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("difference from other aggregation types", func(t *testing.T) {
// Create aggregators of different types
sumAgg := CreateBuiltinAggregator(Sum)
countAgg := CreateBuiltinAggregator(Count)
postAgg := CreateBuiltinAggregator(PostAggregation)
// Verify the types differ
assert.NotEqual(t, sumAgg, postAgg)
assert.NotEqual(t, countAgg, postAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
})
}
@@ -0,0 +1,165 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package aggregator provides data aggregation functionality for StreamSQL.
This package implements group-based aggregation operations for stream processing,
supporting various aggregation functions and expression evaluation. It provides
thread-safe aggregation with support for custom expressions and built-in functions.
# Core Features
• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing
# Aggregation Types
Supported aggregation functions (re-exported from functions package):
// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile
// Collection aggregations
Collect, LastValue, MergeAgg
Deduplicate
// Window aggregations
WindowStart, WindowEnd
// Analytical functions
Lag, Latest, ChangedCol, HadChanged
// Custom expressions
Expression
# Core Interfaces
Main aggregation interfaces:
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
type AggregatorFunction interface {
New() AggregatorFunction
Add(value interface{})
Result() interface{}
}
# Aggregation Configuration
Field configuration for aggregations:
type AggregationField struct {
InputField string // Source field name
AggregateType AggregateType // Aggregation function type
OutputAlias string // Result field alias
}
# Usage Examples
Basic group aggregation:
// Define aggregation fields
aggFields := []AggregationField{
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}
// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
// Add data
data := map[string]interface{}{
"location": "room1",
"temperature": 25.5,
"humidity": 60,
"device_id": "sensor001",
}
aggregator.Add(data)
// Get results
results, err := aggregator.GetResults()
Expression-based aggregation:
// Register custom expression
aggregator.RegisterExpression(
"comfort_index",
"temperature * 0.7 + humidity * 0.3",
[]string{"temperature", "humidity"},
func(data interface{}) (interface{}, error) {
// Custom evaluation logic
return evaluateComfortIndex(data)
},
)
Multiple group aggregation:
// Group by multiple fields
aggregator := NewGroupAggregator(
[]string{"location", "device_type"},
aggFields,
)
// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
# Built-in Aggregators
Create built-in aggregation functions:
// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)
// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30
# Custom Aggregators
Register custom aggregation functions:
Register("custom_avg", func() AggregatorFunction {
return &CustomAvgAggregator{}
})
# Integration
Integrates with other StreamSQL components:
• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing
*/
package aggregator
@@ -11,20 +11,21 @@ import (
"github.com/rulego/streamsql/utils/fieldpath"
)
// Aggregator aggregator interface
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
// RegisterExpression registers expression evaluator
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
// AggregationField defines configuration for a single aggregation field
type AggregationField struct {
InputField string // Input field name (e.g., "temperature")
AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
OutputAlias string // Output alias (e.g., "temp_sum")
}
type GroupAggregator struct {
@@ -34,26 +35,26 @@ type GroupAggregator struct {
groups map[string]map[string]AggregatorFunction
mu sync.RWMutex
context map[string]interface{}
// Expression evaluators
expressions map[string]*ExpressionEvaluator
}
// ExpressionEvaluator wraps expression evaluation functionality
type ExpressionEvaluator struct {
Expression string // Complete expression
Field string // Primary field name
Fields []string // All fields referenced in expression
evaluateFunc func(data interface{}) (interface{}, error)
}
// NewGroupAggregator creates a new group aggregator
func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator {
aggregators := make(map[string]AggregatorFunction)
// Create aggregator for each aggregation field
for i := range aggregationFields {
if aggregationFields[i].OutputAlias == "" {
// If no alias specified, use input field name
aggregationFields[i].OutputAlias = aggregationFields[i].InputField
}
aggregators[aggregationFields[i].OutputAlias] = CreateBuiltinAggregator(aggregationFields[i].AggregateType)
@@ -68,7 +69,7 @@ func NewGroupAggregator(groupFields []string, aggregationFields []AggregationFie
}
}
// RegisterExpression registers expression evaluator
func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error)) {
ga.mu.Lock()
defer ga.mu.Unlock()
@@ -91,26 +92,26 @@ func (ga *GroupAggregator) Put(key string, val interface{}) error {
return nil
}
// isNumericAggregator checks if aggregator requires numeric type input
func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
// Dynamically check function type through functions module
if fn, exists := functions.Get(string(aggType)); exists {
switch fn.GetType() {
case functions.TypeMath:
// Math functions usually require numeric input
return true
case functions.TypeAggregation:
// Check if it's a numeric aggregation function
switch string(aggType) {
case functions.SumStr, functions.AvgStr, functions.MinStr, functions.MaxStr, functions.CountStr,
functions.StdDevStr, functions.MedianStr, functions.PercentileStr,
functions.VarStr, functions.VarSStr, functions.StdDevSStr:
return true
case functions.CollectStr, functions.MergeAggStr, functions.DeduplicateStr, functions.LastValueStr:
// These functions can handle any type
return false
default:
// For unknown aggregation functions, try to check function name patterns
funcName := string(aggType)
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -120,15 +121,15 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
return false
}
case functions.TypeAnalytical:
// Analytical functions can usually handle any type
return false
default:
// For other types of functions, conservatively assume no numeric conversion needed
return false
}
}
// If function doesn't exist, judge by name pattern
funcName := string(aggType)
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -139,9 +140,21 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
return false
}
// shouldAllowNullValues determines whether an aggregation function should accept NULL values
func (ga *GroupAggregator) shouldAllowNullValues(aggType AggregateType) bool {
// FIRST_VALUE and LAST_VALUE should accept NULL values because they must record the first/last value, even if it is NULL
return aggType == FirstValue || aggType == LastValue
}
func (ga *GroupAggregator) Add(data interface{}) error {
ga.mu.Lock()
defer ga.mu.Unlock()
// Check whether data is nil
if data == nil {
return fmt.Errorf("data cannot be nil")
}
var v reflect.Value
switch data.(type) {
@@ -153,6 +166,10 @@ func (ga *GroupAggregator) Add(data interface{}) error {
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
// Check whether the data type is supported
if v.Kind() != reflect.Struct && v.Kind() != reflect.Map {
return fmt.Errorf("unsupported data type: %T, expected struct or map", data)
}
}
key := ""
@@ -160,11 +177,11 @@ func (ga *GroupAggregator) Add(data interface{}) error {
var fieldVal interface{}
var found bool
// Check if it's a nested field
if fieldpath.IsNestedField(field) {
fieldVal, found = fieldpath.GetNestedField(data, field)
} else {
// Original field access logic
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(field)
@@ -198,21 +215,21 @@ func (ga *GroupAggregator) Add(data interface{}) error {
ga.groups[key] = make(map[string]AggregatorFunction)
}
// Create aggregator instances for each field
for outputAlias, agg := range ga.aggregators {
if _, exists := ga.groups[key][outputAlias]; !exists {
ga.groups[key][outputAlias] = agg.New()
}
}
// Process each aggregation field
for _, aggField := range ga.aggregationFields {
outputAlias := aggField.OutputAlias
if outputAlias == "" {
outputAlias = aggField.InputField
}
// Check if there's an expression evaluator
if expr, hasExpr := ga.expressions[outputAlias]; hasExpr {
result, err := expr.evaluateFunc(data)
if err != nil {
@@ -227,23 +244,23 @@ func (ga *GroupAggregator) Add(data interface{}) error {
inputField := aggField.InputField
// Special handling for count(*) case
if inputField == "*" {
// For count(*), directly add 1 without getting specific field value
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(1)
}
continue
}
// Get field value - supports nested fields
var fieldVal interface{}
var found bool
if fieldpath.IsNestedField(inputField) {
fieldVal, found = fieldpath.GetNestedField(data, inputField)
} else {
// Original field access logic
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(inputField)
@@ -259,7 +276,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
}
if !found {
// Try to get from context
if ga.context != nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
if contextAgg, ok := groupAgg.(ContextAggregator); ok {
@@ -275,19 +292,31 @@ func (ga *GroupAggregator) Add(data interface{}) error {
aggType := aggField.AggregateType
// Skip nil values for most aggregation functions, but allow FIRST_VALUE and LAST_VALUE to handle them
if fieldVal == nil && !ga.shouldAllowNullValues(aggType) {
continue
}
// Special handling for Count aggregator - it can handle any type
if aggType == Count {
// Count can handle any non-null value
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
} else if ga.isNumericAggregator(aggType) {
// For numeric aggregation functions, try to convert to numeric type
if numVal, err := cast.ToFloat64E(fieldVal); err == nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(numVal)
}
} else {
return fmt.Errorf("cannot convert field %s value %v to numeric type for aggregator %s", inputField, fieldVal, aggType)
}
} else {
// For non-numeric aggregation functions, pass original value directly
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
}
@@ -299,6 +328,15 @@ func (ga *GroupAggregator) Add(data interface{}) error {
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
ga.mu.RLock()
defer ga.mu.RUnlock()
// If there are neither group fields nor aggregation fields but data has been added, return a single empty result row
if len(ga.aggregationFields) == 0 && len(ga.groupFields) == 0 {
if len(ga.groups) > 0 {
return []map[string]interface{}{{}}, nil
}
return []map[string]interface{}{}, nil
}
result := make([]map[string]interface{}, 0, len(ga.groups))
for key, aggregators := range ga.groups {
group := make(map[string]interface{})
@@ -309,7 +347,12 @@ func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
}
}
for field, agg := range aggregators {
group[field] = agg.Result()
result := agg.Result()
group[field] = result
// Debug: log aggregator results (can be removed in production)
// if strings.HasPrefix(field, "__") {
// fmt.Printf("Aggregator %s result: %v (%T)\n", field, result, result)
// }
}
result = append(result, group)
}
@@ -16,7 +16,7 @@ type ExprCondition struct {
}
func NewExprCondition(expression string) (Condition, error) {
// Add custom string function support (startsWith, endsWith, contains are built-in operators)
options := []expr.Option{
expr.Function("like_match", func(params ...any) (any, error) {
if len(params) != 2 {
@@ -60,22 +60,22 @@ func (ec *ExprCondition) Evaluate(env interface{}) bool {
return result.(bool)
}
// matchesLikePattern implements LIKE pattern matching
// Supports % (matches any character sequence) and _ (matches single character)
func matchesLikePattern(text, pattern string) bool {
return likeMatch(text, pattern, 0, 0)
}
// likeMatch recursively implements LIKE matching algorithm
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
// If pattern has been fully matched
if patternIndex >= len(pattern) {
return textIndex >= len(text) // Text should also be fully matched
}
// If text has ended but pattern still has non-% characters, no match
if textIndex >= len(text) {
// Check if remaining pattern characters are all %
for i := patternIndex; i < len(pattern); i++ {
if pattern[i] != '%' {
return false
@@ -84,16 +84,16 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
return true
}
// Process current pattern character
patternChar := pattern[patternIndex]
if patternChar == '%' {
// % can match 0 or more characters
// Try matching 0 characters (skip %)
if likeMatch(text, pattern, textIndex, patternIndex+1) {
return true
}
// Try matching 1 or more characters
for i := textIndex; i < len(text); i++ {
if likeMatch(text, pattern, i+1, patternIndex+1) {
return true
@@ -101,10 +101,10 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
}
return false
} else if patternChar == '_' {
// _ matches exactly one character
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
} else {
// Regular characters must match exactly
if text[textIndex] == patternChar {
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
}
@@ -0,0 +1,111 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package condition provides condition evaluation functionality for StreamSQL.
This package implements condition evaluation using the expr-lang library,
supporting complex boolean expressions for filtering and conditional logic.
It provides custom functions for SQL-like operations including LIKE pattern
matching and NULL checking.
# Core Features
Boolean Expression Evaluation - Evaluate complex boolean conditions
LIKE Pattern Matching - SQL-style pattern matching with % and _ wildcards
NULL Checking - Support for IS NULL and IS NOT NULL operations
Custom Functions - Extended function library for SQL compatibility
Type Safety - Automatic type conversion and validation
Performance Optimized - Compiled expressions for fast evaluation
# Condition Interface
Unified interface for condition evaluation:
type Condition interface {
Evaluate(env interface{}) bool
}
# Custom Functions
Built-in SQL-compatible functions:
// LIKE pattern matching
like_match(text, pattern) - SQL LIKE operation with % and _ wildcards
// NULL checking
is_null(value) - Check if value is NULL
is_not_null(value) - Check if value is not NULL
# Usage Examples
Basic condition evaluation:
condition, err := NewExprCondition("age >= 18 AND status == 'active'")
if err != nil {
log.Fatal(err)
}
data := map[string]interface{}{
"age": 25,
"status": "active",
}
result := condition.Evaluate(data) // returns true
LIKE pattern matching:
condition, err := NewExprCondition("like_match(name, 'John%')")
data := map[string]interface{}{"name": "John Smith"}
result := condition.Evaluate(data) // returns true
NULL checking:
condition, err := NewExprCondition("is_not_null(email)")
data := map[string]interface{}{"email": "user@example.com"}
result := condition.Evaluate(data) // returns true
Complex conditions:
condition, err := NewExprCondition(`
age >= 18 AND
like_match(email, '%@company.com') AND
is_not_null(department)
`)
# Pattern Matching
LIKE pattern matching supports:
% - Matches any sequence of characters (including empty)
_ - Matches exactly one character
Examples:
'John%' matches 'John', 'John Smith', 'Johnny'
'J_hn' matches 'John' but not 'Johan'
'%@gmail.com' matches any email ending with @gmail.com
# Integration
Integrates with other StreamSQL components:
Stream package - Data filtering and conditional processing
RSQL package - WHERE and HAVING clause evaluation
Types package - Data type handling and conversion
Expr package - Expression parsing and evaluation
*/
package condition
@@ -15,23 +15,26 @@
*/
/*
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
StreamSQL provides efficient unbounded data stream processing and analysis capabilities,
supporting multiple window types, aggregate functions, custom functions, and seamless
integration with the RuleGo ecosystem.
# Core Features
Lightweight design - Pure in-memory operations, no external dependencies
SQL syntax support - Process stream data using familiar SQL syntax
Multiple window types - Sliding, tumbling, counting, and session windows
Event time and processing time - Support both time semantics for accurate stream processing
Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
RuleGo ecosystem integration - Extend input/output sources using RuleGo components
# Getting Started
Basic stream data processing:
package main
@@ -43,10 +46,10 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
)
func main() {
// Create StreamSQL instance
ssql := streamsql.New()
// Define SQL query - Calculate temperature average by device ID every 5 seconds
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
@@ -56,18 +59,18 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// Execute SQL, create stream processing task
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// Add result processing callback
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("Aggregation result: %v\n", result)
})
// Simulate sending stream data
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@@ -75,7 +78,7 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
for {
select {
case <-ticker.C:
// Generate random device data
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
@@ -86,36 +89,107 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
}
}()
// Run for 30 seconds
time.Sleep(30 * time.Second)
}
# Window Functions
StreamSQL supports multiple window types:
// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# Event Time vs Processing Time
StreamSQL supports two time semantics for window processing:
## Processing Time (Default)
Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:
// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives
## Event Time
Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
allowing correct handling of out-of-order and late-arriving data:
// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')
// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')
## Watermark and Late Data Handling
Event time windows use watermark mechanism to handle out-of-order and late data:
// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
)
// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
)
// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
)
// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
)
Key concepts:
MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
AllowedLateness: Keeps window open after trigger to accept late data and update results
IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
Watermark: Indicates that no events with timestamp less than watermark are expected
# Custom Functions
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
// Register temperature conversion function
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"Temperature conversion",
"Fahrenheit to Celsius",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
@@ -123,55 +197,55 @@ StreamSQL 支持插件式自定义函数,运行时动态注册:
},
)
// Use immediately in SQL
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
Supported custom function types:
TypeMath - Mathematical calculation functions
TypeString - String processing functions
TypeConversion - Type conversion functions
TypeDateTime - Date and time functions
TypeAggregation - Aggregate functions
TypeAnalytical - Analytical functions
TypeWindow - Window functions
TypeCustom - General custom functions
# Log Configuration
StreamSQL provides flexible log configuration options:
// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// Output to file
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())
# RuleGo Integration
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
Basic integration example:
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
// Register StreamSQL components
_ "github.com/rulego/rulego-components/external/streamsql"
)
func main() {
// Rule chain configuration
ruleChainJson := `{
"ruleChain": {"id": "rule01"},
"metadata": {
@@ -196,10 +270,10 @@ StreamSQL提供了与RuleGo规则引擎的深度集成通过两个专用组
}
}`
// Create rule engine
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
// Send data
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)
@@ -51,7 +51,7 @@ func main() {
fmt.Println("✓ SQL executed successfully")
// 5. Add result listener
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("📊 Aggregation result: %v\n", result)
})
@@ -83,19 +83,17 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 Array index access results:")
for i, item := range result {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device: %v\n", item["device"])
fmt.Printf(" First sensor temperature: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" Second sensor humidity: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" Third data item: %v\n", item["third_data_item"])
fmt.Println()
}
})
@@ -168,20 +166,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map key access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device ID: %v\n", item["device_id"])
fmt.Printf(" Server host: %v\n", item["server_host"])
fmt.Printf(" Server port: %v\n", item["server_port"])
fmt.Printf(" SSL enabled: %v\n", item["ssl_enabled"])
fmt.Printf(" App version: %v\n", item["app_version"])
fmt.Println()
}
})
@@ -266,20 +263,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🔄 Mixed complex access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Building: %v\n", item["building"])
fmt.Printf(" Floor 1, room 3: %v\n", item["first_floor_room3_name"])
fmt.Printf(" Floor 2, first sensor temperature: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" Architect: %v\n", item["building_architect"])
fmt.Printf(" Latest alert: %v\n", item["latest_alert"])
fmt.Println()
}
})
@@ -325,19 +321,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ 负数索引访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
})
@@ -372,20 +367,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📈 数组索引聚合计算结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
})
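The hunks above all apply one migration: `AddSink` callbacks move from `func(result interface{})` plus a runtime type assertion to the typed `func(result []map[string]interface{})`. A minimal sketch of why the typed form is simpler — the sink wiring is elided here, only the callback shapes are compared, and `oldSink`/`newSink` are illustrative names, not streamsql API:

```go
package main

import "fmt"

// oldSink mimics the pre-change callback: it receives interface{} and
// must type-assert before iterating; a failed assertion silently drops
// the batch.
func oldSink(result interface{}) int {
	n := 0
	if rows, ok := result.([]map[string]interface{}); ok {
		for range rows {
			n++
		}
	}
	return n
}

// newSink mimics the post-change callback: the typed parameter removes
// both the assertion and the silent-drop branch.
func newSink(result []map[string]interface{}) int {
	return len(result)
}

func main() {
	rows := []map[string]interface{}{
		{"device_id": "d1"},
		{"device_id": "d2"},
	}
	fmt.Println(oldSink(rows), newSink(rows)) // prints: 2 2
}
```

The typed signature also lets the compiler reject sinks wired to the wrong shape, which the `interface{}` version could only discover at runtime.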
+6 -6
@@ -127,7 +127,7 @@ func testBasicFiltering() {
}
// 添加结果处理函数
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 高温告警: %v\n", result)
})
@@ -172,7 +172,7 @@ func testAggregation() {
}
// 处理聚合结果
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 聚合结果: %v\n", result)
})
@@ -221,7 +221,7 @@ func testSlidingWindow() {
return
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 滑动窗口分析: %v\n", result)
})
@@ -262,7 +262,7 @@ func testNestedFields() {
return
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 嵌套字段结果: %v\n", result)
})
@@ -336,7 +336,7 @@ func testCustomFunctions() {
return
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 自定义函数结果: %v\n", result)
})
@@ -396,7 +396,7 @@ func testComplexQuery() {
return
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 复杂查询结果: %v\n", result)
})
+18 -18
@@ -609,14 +609,14 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
testData := []map[string]interface{}{
{
"device": "sensor1",
"temperature": 68.0, // 华氏度
"radius": 5.0,
"x1": 0.0, "y1": 0.0, "x2": 3.0, "y2": 4.0, // 距离=5
},
map[string]interface{}{
{
"device": "sensor1",
"temperature": 86.0, // 华氏度
"radius": 10.0,
@@ -625,7 +625,7 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 数学函数结果: %v\n", result)
})
@@ -659,20 +659,20 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
testData := []map[string]interface{}{
{
"device": "sensor1",
"metadata": `{"version":"1.0","type":"temperature"}`,
"level": 3,
},
map[string]interface{}{
{
"device": "sensor2",
"metadata": `{"version":"2.0","type":"humidity"}`,
"level": 5,
},
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 字符串函数结果: %v\n", result)
})
@@ -702,20 +702,20 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
testData := []map[string]interface{}{
{
"device": "server1",
"client_ip": "192.168.1.100",
"memory_usage": 1073741824, // 1GB
},
map[string]interface{}{
{
"device": "server2",
"client_ip": "10.0.0.50",
"memory_usage": 2147483648, // 2GB
},
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 转换函数结果: %v\n", result)
})
@@ -746,14 +746,14 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []interface{}{
map[string]interface{}{"device": "sensor1", "value": 2.0, "category": "A"},
map[string]interface{}{"device": "sensor1", "value": 8.0, "category": "A"},
map[string]interface{}{"device": "sensor1", "value": 32.0, "category": "B"},
map[string]interface{}{"device": "sensor1", "value": 128.0, "category": "A"},
testData := []map[string]interface{}{
{"device": "sensor1", "value": 2.0, "category": "A"},
{"device": "sensor1", "value": 8.0, "category": "A"},
{"device": "sensor1", "value": 32.0, "category": "B"},
{"device": "sensor1", "value": 128.0, "category": "A"},
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 聚合函数结果: %v\n", result)
})
+67 -74
@@ -119,19 +119,18 @@ func demonstrateBasicNestedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 基础嵌套字段访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
fmt.Println()
}
})
@@ -166,20 +165,19 @@ func demonstrateNestedAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📈 嵌套字段聚合结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["device.location"])
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["device.location"])
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
fmt.Println()
}
})
@@ -271,19 +269,18 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 数组索引访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
}
})
@@ -356,20 +353,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map键访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
}
})
@@ -454,20 +450,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🔄 混合复杂访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
}
})
@@ -513,19 +508,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ 负数索引访问结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
})
@@ -560,20 +554,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📈 数组索引聚合计算结果:")
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
})
+6 -6
@@ -62,7 +62,7 @@ func demonstrateDataCleaning() {
}
// 结果处理
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 清洗后数据: %+v\n", result)
})
@@ -103,7 +103,7 @@ func demonstrateDataEnrichment() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 富化后数据: %+v\n", result)
})
@@ -147,7 +147,7 @@ func demonstrateRealTimeAlerting() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 🚨 告警事件: %+v\n", result)
})
@@ -191,7 +191,7 @@ func demonstrateDataFormatConversion() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 格式转换结果: %+v\n", result)
})
@@ -230,7 +230,7 @@ func demonstrateDataRouting() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 路由结果: %+v\n", result)
})
@@ -273,7 +273,7 @@ func demonstrateNestedFieldProcessing() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 嵌套字段处理结果: %+v\n", result)
})
+26 -36
@@ -35,11 +35,9 @@ func demo1() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("发现空值数据: %+v\n", data)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("发现空值数据: %+v\n", data)
}
})
@@ -75,11 +73,9 @@ func demo2() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("发现有效数据: %+v\n", data)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("发现有效数据: %+v\n", data)
}
})
@@ -115,16 +111,14 @@ func demo3() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
status := data["status"]
value := data["value"]
if status != nil {
fmt.Printf("状态非空的数据: %+v\n", data)
} else if value == nil {
fmt.Printf("值为空的数据: %+v\n", data)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
status := data["status"]
value := data["value"]
if status != nil {
fmt.Printf("状态非空的数据: %+v\n", data)
} else if value == nil {
fmt.Printf("值为空的数据: %+v\n", data)
}
}
})
@@ -162,18 +156,16 @@ func demo4() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
value := data["value"]
status := data["status"]
priority := data["priority"]
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
value := data["value"]
status := data["status"]
priority := data["priority"]
if value != nil && value.(float64) > 20 {
fmt.Printf("高值数据 (value > 20): %+v\n", data)
} else if status == nil && priority != nil {
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
}
if value != nil && value.(float64) > 20 {
fmt.Printf("高值数据 (value > 20): %+v\n", data)
} else if status == nil && priority != nil {
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
}
}
})
@@ -212,11 +204,9 @@ func demo5() {
panic(err)
}
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("有位置信息的设备: %+v\n", data)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("有位置信息的设备: %+v\n", data)
}
})
-233
@@ -1,233 +0,0 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
func main() {
fmt.Println("=== StreamSQL 持久化功能测试 ===")
// 清理之前的测试数据
cleanupTestData()
// 测试1: 创建持久化流并模拟数据溢出
fmt.Println("📌 测试1: 数据溢出持久化")
testDataOverflowPersistence()
// 测试2: 模拟程序重启和数据恢复
fmt.Println("📌 测试2: 程序重启数据恢复")
testDataRecovery()
// 测试3: 查看持久化文件内容
fmt.Println("📌 测试3: 持久化文件分析")
analyzePersistenceFiles()
fmt.Println("✅ 真正持久化功能测试完成!")
}
func testDataOverflowPersistence() {
config := types.Config{
SimpleFields: []string{"id", "value"},
}
// 创建小缓冲区的持久化流处理器
stream, err := stream.NewStreamWithLossPolicy(
config,
100, // 很小的缓冲区,容易溢出
100, // 小结果缓冲区
50, // 小sink池
"persist", // 持久化策略
5*time.Second,
)
if err != nil {
fmt.Printf("创建流失败: %v\n", err)
return
}
stream.Start()
// 快速发送大量数据,触发溢出
inputCount := 1000
fmt.Printf("快速发送 %d 条数据到小缓冲区 (容量100)...\n", inputCount)
start := time.Now()
for i := 0; i < inputCount; i++ {
data := map[string]interface{}{
"id": i,
"value": fmt.Sprintf("data_%d", i),
}
stream.Emit(data)
}
duration := time.Since(start)
// 等待持久化完成
fmt.Println("等待持久化操作完成...")
time.Sleep(8 * time.Second)
// 获取统计信息
stats := stream.GetDetailedStats()
persistStats := stream.GetPersistenceStats()
fmt.Printf("⏱️ 发送耗时: %v\n", duration)
fmt.Printf("📊 输入数据: %d\n", stats["basic_stats"].(map[string]int64)["input_count"])
fmt.Printf("📊 处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
fmt.Printf("📊 通道容量: %d\n", stats["basic_stats"].(map[string]int64)["data_chan_cap"])
fmt.Printf("📊 持久化启用: %v\n", persistStats["enabled"])
fmt.Printf("📊 待写入数据: %v\n", persistStats["pending_count"])
fmt.Printf("📊 当前文件大小: %v bytes\n", persistStats["current_file_size"])
fmt.Printf("📊 文件索引: %v\n", persistStats["file_index"])
stream.Stop()
}
func testDataRecovery() {
config := types.Config{
SimpleFields: []string{"id", "value"},
}
// 创建新的持久化流处理器(模拟程序重启)
stream, err := stream.NewStreamWithLossPolicy(
config,
200, // 更大的缓冲区用于恢复
200,
100,
"persist", // 持久化策略
5*time.Second,
)
if err != nil {
fmt.Printf("创建流失败: %v\n", err)
return
}
stream.Start()
// 添加sink来接收恢复的数据
recoveredCount := 0
stream.AddSink(func(data interface{}) {
recoveredCount++
if recoveredCount <= 5 {
fmt.Printf("恢复数据 %d: %+v\n", recoveredCount, data)
}
})
// 尝试加载并重新处理持久化数据
fmt.Println("尝试加载持久化数据...")
if err := stream.LoadAndReprocessPersistedData(); err != nil {
fmt.Printf("数据恢复失败: %v\n", err)
}
// 等待处理完成
time.Sleep(3 * time.Second)
stats := stream.GetDetailedStats()
fmt.Printf("📊 恢复后处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
fmt.Printf("📊 接收到的恢复数据: %d\n", recoveredCount)
stream.Stop()
}
func analyzePersistenceFiles() {
dataDir := "./streamsql_overflow_data"
// 检查持久化目录
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
fmt.Println("持久化目录不存在")
return
}
// 列出所有持久化文件
files, err := filepath.Glob(filepath.Join(dataDir, "streamsql_overflow_*.log"))
if err != nil {
fmt.Printf("读取持久化文件失败: %v\n", err)
return
}
if len(files) == 0 {
fmt.Println("没有找到持久化文件(可能已被恢复过程删除)")
return
}
fmt.Printf("发现 %d 个持久化文件:\n", len(files))
for i, file := range files {
info, err := os.Stat(file)
if err != nil {
fmt.Printf(" %d. %s (无法读取文件信息)\n", i+1, filepath.Base(file))
continue
}
fmt.Printf(" %d. %s (大小: %d bytes, 修改时间: %s)\n",
i+1, filepath.Base(file), info.Size(), info.ModTime().Format("15:04:05"))
}
// 读取第一个文件的前几行内容
if len(files) > 0 {
fmt.Printf("\n第一个文件的前3行内容:\n")
showFileContent(files[0], 3)
}
}
func showFileContent(filename string, maxLines int) {
file, err := os.Open(filename)
if err != nil {
fmt.Printf("无法打开文件: %v\n", err)
return
}
defer file.Close()
buffer := make([]byte, 1024)
n, err := file.Read(buffer)
if err != nil {
fmt.Printf("无法读取文件: %v\n", err)
return
}
content := string(buffer[:n])
lines := []rune(content)
lineCount := 0
currentLine := ""
for _, char := range lines {
if char == '\n' {
lineCount++
fmt.Printf(" %d: %s\n", lineCount, currentLine)
currentLine = ""
if lineCount >= maxLines {
break
}
} else {
currentLine += string(char)
}
}
if currentLine != "" && lineCount < maxLines {
fmt.Printf(" %d: %s\n", lineCount+1, currentLine)
}
}
func cleanupTestData() {
dataDir := "./streamsql_overflow_data"
if err := os.RemoveAll(dataDir); err != nil {
fmt.Printf("清理测试数据失败: %v\n", err)
}
}
+9 -9
@@ -122,19 +122,19 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 简单查询结果: %v\n", result)
})
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
testData := []map[string]interface{}{
{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // 华氏度
"radius": 3.0,
},
map[string]interface{}{
{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // 华氏度
@@ -171,25 +171,25 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result interface{}) {
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
})
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
testData := []map[string]interface{}{
{
"device": "sensor1",
"value": 3.0,
"temperature": 32.0, // 0°C
"radius": 1.0,
},
map[string]interface{}{
{
"device": "sensor1",
"value": 4.0,
"temperature": 212.0, // 100°C
"radius": 2.0,
},
map[string]interface{}{
{
"device": "sensor2",
"value": 5.0,
"temperature": 68.0, // 20°C
+1 -1
@@ -79,4 +79,4 @@ func main() {
time.Sleep(3 * time.Second)
fmt.Println("\n=== 示例结束 ===")
}
}
-218
@@ -1,218 +0,0 @@
package main
import (
"fmt"
"log"
"strings"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
func main() {
fmt.Println("=== StreamSQL 统一配置系统演示 ===")
// 1. 使用新的配置API创建默认配置Stream
fmt.Println("\n1. 默认配置Stream:")
defaultConfig := types.NewConfig()
defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"}
defaultStream, err := stream.NewStream(defaultConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("默认配置", defaultStream)
// 2. 使用高性能预设配置
fmt.Println("\n2. 高性能配置Stream:")
highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig())
highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"}
highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("高性能配置", highPerfStream)
// 3. 使用低延迟预设配置
fmt.Println("\n3. 低延迟配置Stream:")
lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig())
lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"}
lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("低延迟配置", lowLatencyStream)
// 4. 使用零数据丢失预设配置
fmt.Println("\n4. 零数据丢失配置Stream:")
zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig())
zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"}
zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("零数据丢失配置", zeroLossStream)
// 5. 使用持久化预设配置
fmt.Println("\n5. 持久化配置Stream:")
persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig())
persistConfig.SimpleFields = []string{"temperature", "humidity", "location"}
persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig())
if err != nil {
log.Fatal(err)
}
printStreamStats("持久化配置", persistStream)
// 6. 创建完全自定义的配置
fmt.Println("\n6. 自定义配置Stream:")
customPerfConfig := types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 30000,
ResultChannelSize: 25000,
WindowOutputSize: 3000,
EnableDynamicResize: true,
MaxBufferSize: 200000,
UsageThreshold: 0.85,
},
OverflowConfig: types.OverflowConfig{
Strategy: "expand",
BlockTimeout: 15 * time.Second,
AllowDataLoss: false,
ExpansionConfig: types.ExpansionConfig{
GrowthFactor: 2.0,
MinIncrement: 2000,
TriggerThreshold: 0.9,
ExpansionTimeout: 3 * time.Second,
},
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 800,
SinkWorkerCount: 12,
MaxRetryRoutines: 10,
},
MonitoringConfig: types.MonitoringConfig{
EnableMonitoring: true,
StatsUpdateInterval: 500 * time.Millisecond,
EnableDetailedStats: true,
WarningThresholds: types.WarningThresholds{
DropRateWarning: 5.0,
DropRateCritical: 15.0,
BufferUsageWarning: 75.0,
BufferUsageCritical: 90.0,
},
},
}
customConfig := types.NewConfigWithPerformance(customPerfConfig)
customConfig.SimpleFields = []string{"temperature", "humidity", "location"}
customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("自定义配置", customStream)
// 7. 配置比较演示
fmt.Println("\n7. 配置比较:")
compareConfigurations()
// 8. 实时数据处理演示
fmt.Println("\n8. 实时数据处理演示:")
demonstrateRealTimeProcessing(defaultStream)
// 9. 窗口统一配置演示
fmt.Println("\n9. 窗口统一配置演示:")
demonstrateWindowConfig()
// 清理资源
fmt.Println("\n10. 清理资源...")
defaultStream.Stop()
highPerfStream.Stop()
lowLatencyStream.Stop()
zeroLossStream.Stop()
persistStream.Stop()
customStream.Stop()
fmt.Println("\n=== 演示完成 ===")
}
func printStreamStats(name string, s *stream.Stream) {
stats := s.GetStats()
detailedStats := s.GetDetailedStats()
fmt.Printf("【%s】统计信息:\n", name)
fmt.Printf(" 数据通道: %d/%d (使用率: %.1f%%)\n",
stats["data_chan_len"], stats["data_chan_cap"],
detailedStats["data_chan_usage"])
fmt.Printf(" 结果通道: %d/%d (使用率: %.1f%%)\n",
stats["result_chan_len"], stats["result_chan_cap"],
detailedStats["result_chan_usage"])
fmt.Printf(" 工作池: %d/%d (使用率: %.1f%%)\n",
stats["sink_pool_len"], stats["sink_pool_cap"],
detailedStats["sink_pool_usage"])
fmt.Printf(" 性能等级: %s\n", detailedStats["performance_level"])
}
func compareConfigurations() {
configs := map[string]types.PerformanceConfig{
"默认配置": types.DefaultPerformanceConfig(),
"高性能配置": types.HighPerformanceConfig(),
"低延迟配置": types.LowLatencyConfig(),
"零丢失配置": types.ZeroDataLossConfig(),
"持久化配置": types.PersistencePerformanceConfig(),
}
fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n",
"配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略")
fmt.Println(strings.Repeat("-", 75))
for name, config := range configs {
fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n",
name,
config.BufferConfig.DataChannelSize,
config.BufferConfig.ResultChannelSize,
config.WorkerConfig.SinkPoolSize,
config.WorkerConfig.SinkWorkerCount,
config.OverflowConfig.Strategy)
}
}
func demonstrateRealTimeProcessing(s *stream.Stream) {
// 设置数据接收器
s.AddSink(func(data interface{}) {
fmt.Printf(" 接收到处理结果: %v\n", data)
})
// 启动流处理
s.Start()
// 模拟发送数据
for i := 0; i < 3; i++ {
data := map[string]interface{}{
"temperature": 20.0 + float64(i)*2.5,
"humidity": 60.0 + float64(i)*5,
"location": fmt.Sprintf("sensor_%d", i+1),
"timestamp": time.Now().Unix(),
}
fmt.Printf(" 发送数据: %v\n", data)
s.Emit(data)
time.Sleep(100 * time.Millisecond)
}
// 等待处理完成
time.Sleep(200 * time.Millisecond)
// 显示最终统计
finalStats := s.GetDetailedStats()
fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n",
finalStats["basic_stats"].(map[string]int64)["input_count"],
finalStats["basic_stats"].(map[string]int64)["output_count"],
finalStats["basic_stats"].(map[string]int64)["dropped_count"],
finalStats["process_rate"])
}
@@ -1,74 +0,0 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// 执行一个简单的滚动窗口查询
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// 添加结果处理器
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// 发送测试数据
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.Emit(data)
}
// 等待处理完成
time.Sleep(3 * time.Second)
// 获取统计信息
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// 停止流处理
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
+245
@@ -0,0 +1,245 @@
package expr
import (
"fmt"
"strings"
)
// parseCaseExpression parses CASE expression
func parseCaseExpression(tokens []string) (*ExprNode, []string, error) {
if len(tokens) == 0 || strings.ToUpper(tokens[0]) != "CASE" {
return nil, nil, fmt.Errorf("expected CASE keyword")
}
remaining := tokens[1:]
caseExpr := &CaseExpression{}
// Check if it's a simple CASE expression (CASE expr WHEN value THEN result)
if len(remaining) > 0 && strings.ToUpper(remaining[0]) != "WHEN" {
// Simple CASE expression
value, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing CASE expression: %v", err)
}
caseExpr.Value = value
remaining = newRemaining
}
// Parse WHEN clauses
for len(remaining) > 0 && strings.ToUpper(remaining[0]) == "WHEN" {
remaining = remaining[1:] // Skip WHEN
// Parse WHEN condition
condition, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing WHEN condition: %v", err)
}
remaining = newRemaining
// Check THEN keyword
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "THEN" {
return nil, nil, fmt.Errorf("expected THEN after WHEN condition")
}
remaining = remaining[1:] // Skip THEN
// Parse THEN result
result, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing THEN result: %v", err)
}
remaining = newRemaining
// Add WHEN clause
caseExpr.WhenClauses = append(caseExpr.WhenClauses, WhenClause{
Condition: condition,
Result: result,
})
}
// Parse optional ELSE clause
if len(remaining) > 0 && strings.ToUpper(remaining[0]) == "ELSE" {
remaining = remaining[1:] // Skip ELSE
elseExpr, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing ELSE expression: %v", err)
}
caseExpr.ElseResult = elseExpr
remaining = newRemaining
}
// Check END keyword
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "END" {
return nil, nil, fmt.Errorf("expected END to close CASE expression")
}
// Create ExprNode containing CaseExpression
caseNode := &ExprNode{
Type: TypeCase,
CaseExpr: caseExpr,
}
return caseNode, remaining[1:], nil
}
// evaluateCaseExpression evaluates the value of a CASE expression
func evaluateCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	if node.Type != TypeCase {
		return 0, fmt.Errorf("not a CASE expression")
	}
	if node.CaseExpr == nil {
		return 0, fmt.Errorf("invalid CASE expression")
	}

	// Simple CASE expression: CASE expr WHEN value THEN result
	if node.CaseExpr.Value != nil {
		return evaluateSimpleCaseExpression(node, data)
	}

	// Searched CASE expression: CASE WHEN condition THEN result
	return evaluateSearchCaseExpression(node, data)
}
// evaluateSimpleCaseExpression evaluates a simple CASE expression
func evaluateSimpleCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return 0, fmt.Errorf("invalid CASE expression")
	}

	// Evaluate the CASE operand value
	caseValue, err := evaluateNodeValue(caseExpr.Value, data)
	if err != nil {
		return 0, err
	}

	// Iterate through WHEN clauses
	for _, whenClause := range caseExpr.WhenClauses {
		// Evaluate the WHEN value
		whenValue, err := evaluateNodeValue(whenClause.Condition, data)
		if err != nil {
			return 0, err
		}
		// Compare values; the first match wins
		if compareValuesForEquality(caseValue, whenValue) {
			// Evaluate and return the THEN result
			return evaluateNode(whenClause.Result, data)
		}
	}

	// If no WHEN clause matched, evaluate the ELSE expression
	if caseExpr.ElseResult != nil {
		return evaluateNode(caseExpr.ElseResult, data)
	}

	// No ELSE clause: return NULL (represented as 0 here)
	return 0, nil
}
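The first-match semantics above (operand evaluated once, compared against each WHEN in order, ELSE as fallback, 0 standing in for NULL) can be shown standalone. `simpleWhen` and `evalSimpleCase` are illustrative names, not part of the package:

```go
package main

import "fmt"

// simpleWhen pairs a WHEN comparison value with its THEN result.
type simpleWhen struct {
	Match  float64
	Result float64
}

// evalSimpleCase mirrors the loop above: the operand is compared
// against each WHEN value in order and the first match wins.
func evalSimpleCase(operand float64, whens []simpleWhen, elseResult *float64) float64 {
	for _, w := range whens {
		if operand == w.Match {
			return w.Result
		}
	}
	if elseResult != nil {
		return *elseResult
	}
	return 0 // no ELSE clause: 0 stands in for SQL NULL here
}

func main() {
	els := -1.0
	// CASE x WHEN 1 THEN 10 WHEN 2 THEN 20 ELSE -1 END, with x = 2
	fmt.Println(evalSimpleCase(2, []simpleWhen{{1, 10}, {2, 20}}, &els)) // → 20
}
```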
// evaluateSearchCaseExpression evaluates a searched CASE expression
func evaluateSearchCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return 0, fmt.Errorf("invalid CASE expression")
	}

	// Iterate through WHEN clauses
	for _, whenClause := range caseExpr.WhenClauses {
		// Evaluate the WHEN condition - use boolean evaluation to handle logical operators
		conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
		if err != nil {
			return 0, err
		}
		// If the condition is true, return the THEN result
		if conditionResult {
			return evaluateNode(whenClause.Result, data)
		}
	}

	// If no WHEN clause matched, evaluate the ELSE expression
	if caseExpr.ElseResult != nil {
		return evaluateNode(caseExpr.ElseResult, data)
	}

	// No ELSE clause: return NULL (represented as 0 here)
	return 0, nil
}
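A searched CASE differs from the simple form in that each WHEN carries its own boolean condition instead of a value compared against one operand. A minimal sketch of that dispatch (with hypothetical `searchedWhen`/`evalSearchedCase` names, conditions reduced to Go predicates):

```go
package main

import "fmt"

// searchedWhen pairs a condition predicate with a result,
// mirroring the WhenClause iteration above in simplified form.
type searchedWhen struct {
	Cond   func(data map[string]interface{}) bool
	Result float64
}

// evalSearchedCase returns the result of the first WHEN whose condition
// holds, the ELSE result otherwise, or 0 standing in for SQL NULL when
// no ELSE clause exists (as in the evaluator above).
func evalSearchedCase(whens []searchedWhen, elseResult *float64, data map[string]interface{}) float64 {
	for _, w := range whens {
		if w.Cond(data) {
			return w.Result
		}
	}
	if elseResult != nil {
		return *elseResult
	}
	return 0
}

func main() {
	// CASE WHEN temperature > 30 THEN 2 WHEN temperature > 20 THEN 1 END
	whens := []searchedWhen{
		{Cond: func(d map[string]interface{}) bool { return d["temperature"].(float64) > 30 }, Result: 2},
		{Cond: func(d map[string]interface{}) bool { return d["temperature"].(float64) > 20 }, Result: 1},
	}
	fmt.Println(evalSearchedCase(whens, nil, map[string]interface{}{"temperature": 25.0})) // → 1
}
```

Because iteration stops at the first true condition, overlapping WHEN ranges behave like an if/else-if chain, which is why the hotter bands must come first.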
// evaluateCaseExpressionWithNull evaluates a CASE expression with NULL value support
func evaluateCaseExpressionWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
	if node.Type != TypeCase {
		return nil, false, fmt.Errorf("not a CASE expression")
	}
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return nil, false, fmt.Errorf("invalid CASE expression")
	}

	// Simple CASE expression: CASE expr WHEN value THEN result
	if caseExpr.Value != nil {
		return evaluateCaseExpressionValueWithNull(node, data)
	}

	// Searched CASE expression: CASE WHEN condition THEN result
	for _, whenClause := range caseExpr.WhenClauses {
		// Evaluate the WHEN condition - use boolean evaluation to handle logical operators
		conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
		if err != nil {
			return nil, false, err
		}
		// If the condition is true, return the THEN result
		if conditionResult {
			return evaluateNodeValueWithNull(whenClause.Result, data)
		}
	}

	// If no WHEN clause matched, evaluate the ELSE expression
	if caseExpr.ElseResult != nil {
		return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
	}

	// No ELSE clause: return NULL
	return nil, true, nil
}
// evaluateCaseExpressionValueWithNull evaluates a simple CASE expression (with NULL support)
func evaluateCaseExpressionValueWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return nil, false, fmt.Errorf("invalid CASE expression")
	}

	// Evaluate the CASE operand value
	caseValue, caseIsNull, err := evaluateNodeValueWithNull(caseExpr.Value, data)
	if err != nil {
		return nil, false, err
	}

	// Iterate through WHEN clauses
	for _, whenClause := range caseExpr.WhenClauses {
		// Evaluate the WHEN value
		whenValue, whenIsNull, err := evaluateNodeValueWithNull(whenClause.Condition, data)
		if err != nil {
			return nil, false, err
		}
		// Compare values (with NULL comparison support)
		if compareValuesWithNullForEquality(caseValue, caseIsNull, whenValue, whenIsNull) {
			// Evaluate and return the THEN result
			return evaluateNodeValueWithNull(whenClause.Result, data)
		}
	}

	// If no WHEN clause matched, evaluate the ELSE expression
	if caseExpr.ElseResult != nil {
		return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
	}

	// No ELSE clause: return NULL
	return nil, true, nil
}
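SQL's three-valued logic makes the NULL path here subtle: under standard SQL semantics a NULL operand or a NULL WHEN value never compares equal, so evaluation falls through to ELSE (or yields NULL). The sketch below assumes those standard semantics; the package's actual `compareValuesWithNullForEquality` may choose differently, and `whenVal`/`simpleCaseWithNull` are illustrative names only:

```go
package main

import "fmt"

// whenVal pairs a WHEN comparison value (plus its NULL flag) with a result.
type whenVal struct {
	Val  interface{}
	Null bool
	Res  interface{}
}

// simpleCaseWithNull evaluates a simple CASE under standard SQL
// three-valued logic: NULL = anything is unknown, never a match.
// The second return value reports whether the CASE itself is NULL.
func simpleCaseWithNull(caseVal interface{}, caseNull bool, whens []whenVal, elseRes interface{}) (interface{}, bool) {
	for _, w := range whens {
		if caseNull || w.Null {
			continue // NULL never compares equal in standard SQL
		}
		if caseVal == w.Val {
			return w.Res, false
		}
	}
	if elseRes != nil {
		return elseRes, false
	}
	return nil, true // no ELSE: the CASE itself yields NULL
}

func main() {
	// CASE x WHEN 'on' THEN 1 END, with x = NULL
	res, isNull := simpleCaseWithNull(nil, true, []whenVal{{Val: "on", Res: 1}}, nil)
	fmt.Println(res, isNull) // → <nil> true
}
```

To match a NULL operand explicitly, SQL uses a searched CASE with `WHEN x IS NULL THEN ...` rather than `WHEN NULL`.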
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package expr provides expression parsing and evaluation capabilities for StreamSQL.
This package implements a comprehensive expression engine that supports mathematical operations,
logical comparisons, function calls, field references, and complex CASE expressions.
It serves as the foundation for WHERE clauses, HAVING clauses, and computed fields in SQL queries.
# Core Features
  - Mathematical Operations - Supports arithmetic operators (+, -, *, /, %, ^) with proper precedence
  - Logical Operations - Boolean logic with AND, OR operators and comparison operators (=, !=, <, >, <=, >=, LIKE)
  - Function Integration - Seamless integration with the functions package for built-in and custom functions
  - Field References - Dynamic field access with dot notation support for nested data structures
  - CASE Expressions - Full support for both simple and searched CASE expressions
  - Type Safety - Automatic type conversion and validation during expression evaluation
  - Fallback Support - Integration with the expr-lang/expr library for complex expressions
# Expression Types
The package supports various expression node types:
	// Basic types
	TypeNumber      - Numeric constants (integers and floats)
	TypeString      - String literals with proper escaping
	TypeField       - Field references (e.g., "temperature", "device.id")
	TypeOperator    - Binary and unary operators
	TypeFunction    - Function calls with argument validation
	TypeParenthesis - Grouped expressions for precedence control
	TypeCase        - CASE expressions for conditional logic
# Usage Examples
Basic mathematical expression:

	expr, err := NewExpression("temperature * 1.8 + 32")
	if err != nil {
		log.Fatal(err)
	}
	result, err := expr.Evaluate(data)

Logical expression with field references:

	expr, err := NewExpression("temperature > 25 AND humidity < 60")
	result, err := expr.Evaluate(data)

Function call expression:

	expr, err := NewExpression("UPPER(device_name) LIKE 'SENSOR%'")
	result, err := expr.Evaluate(data)

CASE expression for conditional logic:

	expr, err := NewExpression(`
		CASE
			WHEN temperature > 30 THEN 'hot'
			WHEN temperature > 20 THEN 'warm'
			ELSE 'cold'
		END
	`)
	result, err := expr.Evaluate(data)
# Operator Precedence
The expression parser follows standard mathematical precedence rules:
1. Parentheses (highest)
2. Power (^)
3. Multiplication, Division, Modulo (*, /, %)
4. Addition, Subtraction (+, -)
5. Comparison (>, <, >=, <=, LIKE, IS)
6. Equality (=, ==, !=, <>)
7. Logical AND
8. Logical OR (lowest)
# Error Handling
The package provides comprehensive error handling with detailed error messages:
  - Syntax validation during expression creation
  - Type checking during evaluation
  - Function argument validation
  - Graceful fallback to expr-lang for unsupported expressions
# Performance Considerations
  - Expressions are parsed once and can be evaluated multiple times
  - Built-in operator optimization for common mathematical operations
  - Lazy evaluation for logical operators (short-circuiting)
  - Efficient field access caching for repeated evaluations
  - Automatic fallback to the optimized expr-lang library when needed
# Integration
This package integrates seamlessly with other StreamSQL components:
  - Functions package - for built-in and custom function execution
  - Types package - for data type definitions and conversions
  - Stream package - for real-time expression evaluation in data streams
  - RSQL package - for SQL parsing and expression extraction
*/
package expr
