2 Commits

Author SHA1 Message Date
Whki e02413c242 Merge pull request #28 from rulego/dev
Dev
2025-07-27 13:20:52 +08:00
Whki 990d240e84 Merge pull request #27 from rulego/dev
refactor:Rename AddData to Emit
2025-07-25 12:40:33 +08:00
184 changed files with 14606 additions and 54248 deletions
+85 -9
View File
@@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [ main, dev ]
branches: [ main, master, develop ]
pull_request:
branches: [ main, dev ]
branches: [ main, master, develop ]
jobs:
test:
@@ -41,15 +41,91 @@ jobs:
run: go build -v ./...
- name: Run tests
if: matrix.go-version != '1.21'
run: go test -v -race -timeout 600s ./...
run: go test -v -race -timeout 300s ./...
- name: Run tests with coverage
if: matrix.go-version == '1.21'
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...
run: go test -v -race -coverprofile=coverage.out -covermode=atomic -timeout 300s ./...
- name: Upload coverage reports to Codecov
- name: Upload coverage to Codecov
if: matrix.go-version == '1.21'
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
case-expression-tests:
name: CASE Expression Tests
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.21'
- name: Check out code
uses: actions/checkout@v4
- name: Cache Go modules
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
- name: Download dependencies
run: go mod download
- name: Run CASE Expression Parsing Tests
run: go test -v -run TestCaseExpressionParsing -timeout 15s
- name: Run CASE Expression Comprehensive Tests
run: go test -v -run TestCaseExpressionComprehensive -timeout 15s
- name: Run CASE Expression Field Extraction Tests
run: go test -v -run TestCaseExpressionFieldExtraction -timeout 15s
- name: Run CASE Expression in SQL Tests
run: go test -v -run TestCaseExpressionInSQL -timeout 15s
- name: Run CASE Expression Aggregation Tests (with known limitations)
run: go test -v -run "TestCaseExpressionInAggregation|TestComplexCaseExpressionsInAggregation" -timeout 20s
- name: Run CASE Expression Edge Cases
run: go test -v -run TestCaseExpressionEdgeCases -timeout 15s
# lint:
# name: Lint
# runs-on: ubuntu-latest
#
# steps:
# - name: Set up Go
# uses: actions/setup-go@v4
# with:
# go-version: '1.21'
#
# - name: Check out code
# uses: actions/checkout@v4
#
# - name: Run golangci-lint
# uses: golangci/golangci-lint-action@v3
# with:
# version: latest
# args: --timeout=5m
#
# security:
# name: Security Scan
# runs-on: ubuntu-latest
#
# steps:
# - name: Set up Go
# uses: actions/setup-go@v4
# with:
# go-version: '1.21'
#
# - name: Check out code
# uses: actions/checkout@v4
#
# - name: Run Gosec Security Scanner
# uses: securecodewarrior/github-action-gosec@v1
# with:
# args: './...'
+57 -5
View File
@@ -5,9 +5,6 @@ on:
tags:
- 'v*'
permissions:
contents: write
jobs:
test:
name: Test Before Release
@@ -32,7 +29,12 @@ jobs:
run: go mod download
- name: Run all tests
run: go test -v -race -timeout 600s ./...
run: go test -v -race -timeout 30s ./...
- name: Run CASE expression tests specifically
run: |
echo "Testing CASE expression functionality..."
go test -v -run TestCaseExpression -timeout 20s
release:
name: Create Release
@@ -57,4 +59,54 @@ jobs:
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
- name: Download dependencies
run: go mod download
run: go mod download
- name: Build binaries
run: |
# Build for multiple platforms
GOOS=linux GOARCH=amd64 go build -o streamsql-linux-amd64 ./...
GOOS=windows GOARCH=amd64 go build -o streamsql-windows-amd64.exe ./...
GOOS=darwin GOARCH=amd64 go build -o streamsql-darwin-amd64 ./...
GOOS=darwin GOARCH=arm64 go build -o streamsql-darwin-arm64 ./...
- name: Generate changelog
id: changelog
run: |
echo "CHANGELOG<<EOF" >> $GITHUB_OUTPUT
echo "## 🚀 StreamSQL $(echo ${{ github.ref }} | sed 's/refs\/tags\///')" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### ✨ 新增功能" >> $GITHUB_OUTPUT
echo "- 完善的CASE表达式支持" >> $GITHUB_OUTPUT
echo "- 多条件逻辑表达式 (AND, OR)" >> $GITHUB_OUTPUT
echo "- 数学函数集成" >> $GITHUB_OUTPUT
echo "- 字段提取和引用功能" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### 🔧 改进" >> $GITHUB_OUTPUT
echo "- 负数解析优化" >> $GITHUB_OUTPUT
echo "- 字符串和数值混合比较" >> $GITHUB_OUTPUT
echo "- 表达式解析性能提升" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### 📋 测试覆盖" >> $GITHUB_OUTPUT
echo "- ✅ 基础CASE表达式解析" >> $GITHUB_OUTPUT
echo "- ✅ 复杂条件组合" >> $GITHUB_OUTPUT
echo "- ✅ 函数调用支持" >> $GITHUB_OUTPUT
echo "- ✅ 字段提取功能" >> $GITHUB_OUTPUT
echo "- ⚠️ 聚合函数中的使用 (部分支持)" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "---" >> $GITHUB_OUTPUT
echo "📖 **完整文档**: [README.md](README.md) | [中文文档](README_ZH.md)" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Create Release
uses: softprops/action-gh-release@v1
with:
body: ${{ steps.changelog.outputs.CHANGELOG }}
files: |
streamsql-linux-amd64
streamsql-windows-amd64.exe
streamsql-darwin-amd64
streamsql-darwin-arm64
draft: false
prerelease: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+22 -109
View File
@@ -3,14 +3,12 @@
[![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
English| [简体中文](README_ZH.md)
**StreamSQL** is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.
📖 **[Documentation](https://rulego.cc/en/pages/streamsql-overview/)** | Similar to: [Apache Flink](https://flink.apache.org/)
Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekuiper.org/)
## Features
@@ -84,9 +82,9 @@ func main() {
}
// Handle real-time transformation results
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Real-time result: %+v\n", results)
})
ssql.AddSink(func(result interface{}) {
fmt.Printf("Real-time result: %+v\n", result)
})
// Simulate sensor data input
sensorData := []map[string]interface{}{
@@ -113,7 +111,6 @@ func main() {
// Process data one by one, each will output results immediately
for _, data := range sensorData {
ssql.Emit(data)
//changedData,err:=ssql.EmitSync(data) //Synchronize to obtain processing results
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
}
@@ -151,7 +148,7 @@ func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()
// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
@@ -198,8 +195,8 @@ func main() {
"humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70%
}
// Add data to stream, triggering StreamSQL's real-time processing
// Emit distributes data to corresponding windows and aggregators
ssql.Emit(randomData)
// AddData distributes data to corresponding windows and aggregators
ssql.stream.AddData(randomData)
}
case <-ctx.Done():
@@ -212,10 +209,10 @@ func main() {
// Step 6: Setup Result Processing Pipeline
resultChan := make(chan interface{})
// Add computation result callback function (Sink)
// When window triggers computation, results are output through this callback
ssql.AddSink(func(results []map[string]interface{}) {
resultChan <- results
})
// When window triggers computation, results are output through this callback
ssql.stream.AddSink(func(result interface{}) {
resultChan <- result
})
// Step 7: Start Result Consumer Goroutine
// Count received results for effect verification
@@ -267,7 +264,8 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')`
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
@@ -275,9 +273,9 @@ func main() {
}
// Handle aggregation results
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Aggregation result: %+v\n", results)
})
ssql.AddSink(func(result interface{}) {
fmt.Printf("Aggregation result: %+v\n", result)
})
// Add nested structured data
nestedData := map[string]interface{}{
@@ -334,11 +332,6 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
- **Session Window**
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.
### Stream
- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
@@ -347,97 +340,17 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
### Time Semantics
StreamSQL supports two time concepts that determine how windows are divided and triggered:
- **Event Time**
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
#### Event Time
- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
- **Characteristics**:
- Windows are divided based on timestamp field values in the data
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
- Uses Watermark mechanism to handle out-of-order and late data
- Results are accurate but may have delays (need to wait for late data)
- **Use Cases**:
- Scenarios requiring precise temporal analysis
- Scenarios where data may arrive out of order or delayed
- Historical data replay and analysis
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### Processing Time
- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
- **Characteristics**:
- Windows are divided based on the time data arrives at the system (`time.Now()`)
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
- Uses system clock (Timer) to trigger windows
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
- **Use Cases**:
- Real-time monitoring and alerting scenarios
- Scenarios with high latency requirements and relatively low accuracy requirements
- Scenarios where data arrives in order and delay is controllable
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```
#### Event Time vs Processing Time Comparison
| Feature | Event Time | Processing Time |
|---------|------------|-----------------|
| **Time Source** | Timestamp field in data | System current time |
| **Window Division** | Based on event timestamp | Based on data arrival time |
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
| **Result Accuracy** | Accurate | May be inaccurate |
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
#### Window Time
- **Processing Time**
- **Definition**: The time when the data arrives at the processing system.
- **Window Start Time**
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
- **Window End Time**
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
#### Watermark Mechanism (Event Time Windows Only)
- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
- **Configuration Parameters**:
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
)
```
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
## Contribution Guidelines
+21 -115
View File
@@ -3,14 +3,12 @@
[![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
[English](README.md)| 简体中文
**StreamSQL** 是一款轻量级的、基于 SQL 的物联网边缘流处理引擎。它能够高效地处理和分析无界数据流。
📖 **[官方文档](https://rulego.cc/pages/streamsql-overview/)** | 类似: [Apache Flink](https://flink.apache.org/)
类似: [Apache Flink](https://flink.apache.org/) 和 [ekuiper](https://ekuiper.org/)
## 功能特性
@@ -87,8 +85,8 @@ func main() {
}
// 处理实时转换结果
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("实时处理结果: %+v\n", results)
ssql.AddSink(func(result interface{}) {
fmt.Printf("实时处理结果: %+v\n", result)
})
// 模拟传感器数据输入
@@ -116,7 +114,6 @@ func main() {
// 逐条处理数据,每条都会立即输出结果
for _, data := range sensorData {
ssql.Emit(data)
//changedData,err:=ssql.EmitSync(data) //同步获得处理结果
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
}
@@ -151,7 +148,7 @@ import (
func main() {
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义流式SQL查询语句
// 核心概念解析:
// - TumblingWindow('5s'): 滚动窗口每5秒创建一个新窗口窗口之间不重叠
@@ -209,7 +206,7 @@ func main() {
// - 按deviceId分组
// - 将数据分配到对应的时间窗口
// - 更新聚合计算状态
ssql.Emit(randomData)
ssql.stream.AddData(randomData)
}
case <-ctx.Done():
@@ -224,10 +221,8 @@ func main() {
// 6. 注册结果回调函数
// 当窗口触发时每5秒会调用这个回调函数
// 传递聚合计算的结果
ssql.AddSink(func(results []map[string]interface{}) {
for _, result := range results {
resultChan <- result
}
ssql.stream.AddSink(func(result interface{}) {
resultChan <- result
})
// 7. 结果消费者 - 处理计算结果
@@ -285,7 +280,8 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')`
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
err := ssql.Execute(rsql)
if err != nil {
@@ -293,22 +289,18 @@ func main() {
}
// 处理聚合结果
ssql.AddSink(func(results []map[string]interface{}) {
fmt.Printf("聚合结果: %+v\n", results)
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
})
// 添加嵌套结构数据
nestedData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "temperature-sensor-001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "智能温室-A区",
"floor": "3F",
"name": "temperature-sensor-001",
"type": "temperature",
},
"location": "智能温室-A区",
},
"sensor": map[string]interface{}{
"temperature": 25.5,
@@ -346,11 +338,6 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。
- **会话窗口Session Window**
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。
### 流Stream
- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
@@ -359,97 +346,16 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
### 时间语义
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:
#### 事件时间Event Time
- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time``timestamp``order_time` 等)。
- **特点**
- 窗口基于数据中的时间戳字段值来划分
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
- 使用 Watermark 机制来处理乱序和延迟数据
- 结果准确,但可能有延迟(需要等待延迟数据)
- **使用场景**
- 需要精确时序分析的场景
- 数据可能乱序或延迟到达的场景
- 历史数据回放和分析
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### 处理时间Processing Time
- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
- **特点**
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
- 使用系统时钟Timer来触发窗口
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
- **使用场景**
- 实时监控和告警场景
- 对延迟要求高,对准确性要求相对较低的场景
- 数据顺序到达且延迟可控的场景
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- 不指定 WITH 子句,默认使用处理时间
```
#### 事件时间 vs 处理时间对比
| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|------|---------------------|-------------------------|
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
| **延迟处理** | 支持Watermark机制 | 不支持 |
| **乱序处理** | 支持Watermark机制 | 不支持 |
| **结果准确性** | 准确 | 可能不准确 |
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认不指定WITH |
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |
#### 窗口时间
- **事件时间Event Time**
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
- **处理时间Processing Time**
- **定义**:数据到达处理系统的时间。
- **窗口开始时间Window Start Time**
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`窗口开始时间会对齐到5分钟的倍数如 10:00、10:05、10:10
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳
- **窗口结束时间Window End Time**
- **事件时间窗口**窗口的结束时间点通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间。窗口在系统时钟到达结束时间时触发
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00则窗口结束时间是 10:01。
#### Watermark 机制(仅事件时间窗口)
- **定义**Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
- **计算公式**`Watermark = max(event_time) - MaxOutOfOrderness`
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
- **配置参数**
- `MAXOUTOFORDERNESS`允许的最大乱序时间用于容忍数据乱序默认0不允许乱序
- `ALLOWEDLATENESS`窗口触发后还能接受延迟数据的时间默认0不接受延迟数据
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间默认0禁用
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
IDLETIMEOUT='5s' -- 5秒无数据基于处理时间推进watermark
)
```
- **定义**:基于事件时间,窗口的结束时间点通常窗口结束时间是窗口开始时间加上窗口的持续时间。
- 例如,一个滑动窗口持续时间为 1 分钟,则窗口结束时间是窗口开始时间加上 1 分钟
## 贡献指南
+12 -34
View File
@@ -4,10 +4,12 @@ import (
"github.com/rulego/streamsql/functions"
)
// AggregateType aggregate type, re-exports functions.AggregateType
// 为了向后兼容重新导出functions模块中的类型和函数
// AggregateType 聚合类型重新导出functions.AggregateType
type AggregateType = functions.AggregateType
// Re-export all aggregate type constants
// 重新导出所有聚合类型常量
const (
Sum = functions.Sum
Count = functions.Count
@@ -20,69 +22,45 @@ const (
WindowStart = functions.WindowStart
WindowEnd = functions.WindowEnd
Collect = functions.Collect
FirstValue = functions.FirstValue
LastValue = functions.LastValue
MergeAgg = functions.MergeAgg
StdDevS = functions.StdDevS
Deduplicate = functions.Deduplicate
Var = functions.Var
VarS = functions.VarS
// Analytical functions
// 分析函数
Lag = functions.Lag
Latest = functions.Latest
ChangedCol = functions.ChangedCol
HadChanged = functions.HadChanged
// Expression aggregator for handling custom functions
// 表达式聚合器,用于处理自定义函数
Expression = functions.Expression
// Post-aggregation marker
PostAggregation = functions.PostAggregation
)
// AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
// AggregatorFunction 聚合器函数接口,重新导出functions.LegacyAggregatorFunction
type AggregatorFunction = functions.LegacyAggregatorFunction
// ContextAggregator aggregator interface supporting context mechanism, re-exports functions.ContextAggregator
// ContextAggregator 支持context机制的聚合器接口重新导出functions.ContextAggregator
type ContextAggregator = functions.ContextAggregator
// Register adds custom aggregator to global registry, re-exports functions.RegisterLegacyAggregator
// Register 添加自定义聚合器到全局注册表,重新导出functions.RegisterLegacyAggregator
func Register(name string, constructor func() AggregatorFunction) {
functions.RegisterLegacyAggregator(name, constructor)
}
// CreateBuiltinAggregator creates built-in aggregator, re-exports functions.CreateLegacyAggregator
// CreateBuiltinAggregator 创建内置聚合器,重新导出functions.CreateLegacyAggregator
func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
// Special handling for expression type
// 特殊处理expression类型
if aggType == "expression" {
return &ExpressionAggregatorWrapper{
function: functions.NewExpressionAggregatorFunction(),
}
}
// Special handling for post-aggregation type (placeholder aggregator)
if aggType == "post_aggregation" {
return &PostAggregationPlaceholder{}
}
return functions.CreateLegacyAggregator(aggType)
}
// PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields
type PostAggregationPlaceholder struct{}
func (p *PostAggregationPlaceholder) New() AggregatorFunction {
return &PostAggregationPlaceholder{}
}
func (p *PostAggregationPlaceholder) Add(value interface{}) {
// Do nothing - this is just a placeholder
}
func (p *PostAggregationPlaceholder) Result() interface{} {
// Return nil - actual result will be computed in post-processing
return nil
}
// ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
// ExpressionAggregatorWrapper 包装表达式聚合器使其兼容LegacyAggregatorFunction接口
type ExpressionAggregatorWrapper struct {
function *functions.ExpressionAggregatorFunction
}
-148
View File
@@ -1,148 +0,0 @@
package aggregator
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPostAggregationPlaceholder 测试后聚合占位符的完整功能
func TestPostAggregationPlaceholder(t *testing.T) {
t.Run("测试PostAggregationPlaceholder基本功能", func(t *testing.T) {
// 创建PostAggregationPlaceholder实例
placeholder := &PostAggregationPlaceholder{}
require.NotNil(t, placeholder)
// 测试New方法
newPlaceholder := placeholder.New()
require.NotNil(t, newPlaceholder)
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
// 测试Add方法应该不做任何操作
placeholder.Add(10)
placeholder.Add("test")
placeholder.Add(nil)
placeholder.Add([]int{1, 2, 3})
// 测试Result方法应该返回nil
result := placeholder.Result()
assert.Nil(t, result)
})
t.Run("测试通过CreateBuiltinAggregator创建PostAggregationPlaceholder", func(t *testing.T) {
// 使用CreateBuiltinAggregator创建post_aggregation类型的聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
// 测试创建的聚合器功能
newAgg := aggregator.New()
require.NotNil(t, newAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
// 测试添加各种类型的值
newAgg.Add(100)
newAgg.Add("string_value")
newAgg.Add(map[string]interface{}{"key": "value"})
// 验证结果始终为nil
result := newAgg.Result()
assert.Nil(t, result)
})
t.Run("测试PostAggregationPlaceholder的多实例独立性", func(t *testing.T) {
// 创建多个实例
placeholder1 := &PostAggregationPlaceholder{}
placeholder2 := placeholder1.New()
placeholder3 := placeholder1.New()
// 验证实例类型正确
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
// 每个实例都应该返回nil
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
// 验证Add操作不会影响结果因为是占位符
placeholder1.Add("test1")
placeholder2.Add("test2")
placeholder3.Add("test3")
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
})
t.Run("测试PostAggregationPlaceholder在聚合场景中的使用", func(t *testing.T) {
// 创建包含PostAggregationPlaceholder的聚合字段
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
}
// 创建分组聚合器
agg := NewGroupAggregator(groupFields, aggFields)
require.NotNil(t, agg)
// 添加测试数据
testData := []map[string]interface{}{
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
{"category": "B", "value": 30, "placeholder_field": 999},
}
for _, data := range testData {
err := agg.Add(data)
assert.NoError(t, err)
}
// 获取结果
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 2)
// 验证PostAggregationPlaceholder字段的结果为nil
for _, result := range results {
assert.Contains(t, result, "post_agg_field")
assert.Nil(t, result["post_agg_field"])
// 验证正常聚合字段工作正常
assert.Contains(t, result, "sum_value")
assert.NotNil(t, result["sum_value"])
}
})
}
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
aggregator := CreateBuiltinAggregator("post_aggregation")
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试PostAggregation常量", func(t *testing.T) {
// 验证PostAggregation常量值
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
// 使用常量创建聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试与其他聚合类型的区别", func(t *testing.T) {
// 创建不同类型的聚合器
sumAgg := CreateBuiltinAggregator(Sum)
countAgg := CreateBuiltinAggregator(Count)
postAgg := CreateBuiltinAggregator(PostAggregation)
// 验证类型不同
assert.NotEqual(t, sumAgg, postAgg)
assert.NotEqual(t, countAgg, postAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
})
}
-165
View File
@@ -1,165 +0,0 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package aggregator provides data aggregation functionality for StreamSQL.
This package implements group-based aggregation operations for stream processing,
supporting various aggregation functions and expression evaluation. It provides
thread-safe aggregation with support for custom expressions and built-in functions.
# Core Features
• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing
# Aggregation Types
Supported aggregation functions (re-exported from functions package):
// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile
// Collection aggregations
Collect, LastValue, MergeAgg
Deduplicate
// Window aggregations
WindowStart, WindowEnd
// Analytical functions
Lag, Latest, ChangedCol, HadChanged
// Custom expressions
Expression
# Core Interfaces
Main aggregation interfaces:
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
type AggregatorFunction interface {
New() AggregatorFunction
Add(value interface{})
Result() interface{}
}
# Aggregation Configuration
Field configuration for aggregations:
type AggregationField struct {
InputField string // Source field name
AggregateType AggregateType // Aggregation function type
OutputAlias string // Result field alias
}
# Usage Examples
Basic group aggregation:
// Define aggregation fields
aggFields := []AggregationField{
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}
// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
// Add data
data := map[string]interface{}{
"location": "room1",
"temperature": 25.5,
"humidity": 60,
"device_id": "sensor001",
}
aggregator.Add(data)
// Get results
results, err := aggregator.GetResults()
Expression-based aggregation:
// Register custom expression
aggregator.RegisterExpression(
"comfort_index",
"temperature * 0.7 + humidity * 0.3",
[]string{"temperature", "humidity"},
func(data interface{}) (interface{}, error) {
// Custom evaluation logic
return evaluateComfortIndex(data)
},
)
Multiple group aggregation:
// Group by multiple fields
aggregator := NewGroupAggregator(
[]string{"location", "device_type"},
aggFields,
)
// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
# Built-in Aggregators
Create built-in aggregation functions:
// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)
// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30
# Custom Aggregators
Register custom aggregation functions:
Register("custom_avg", func() AggregatorFunction {
return &CustomAvgAggregator{}
})
# Integration
Integrates with other StreamSQL components:
• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing
*/
package aggregator
+38 -81
View File
@@ -11,21 +11,20 @@ import (
"github.com/rulego/streamsql/utils/fieldpath"
)
// Aggregator aggregator interface
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
// RegisterExpression registers expression evaluator
// RegisterExpression 注册表达式计算器
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
// AggregationField defines configuration for a single aggregation field
// AggregationField 定义单个聚合字段的配置
type AggregationField struct {
InputField string // Input field name (e.g., "temperature")
AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
OutputAlias string // Output alias (e.g., "temp_sum")
InputField string // 输入字段名(如 "temperature"
AggregateType AggregateType // 聚合类型(如 Sum, Avg
OutputAlias string // 输出别名(如 "temp_sum"
}
type GroupAggregator struct {
@@ -35,26 +34,26 @@ type GroupAggregator struct {
groups map[string]map[string]AggregatorFunction
mu sync.RWMutex
context map[string]interface{}
// Expression evaluators
// 表达式计算器
expressions map[string]*ExpressionEvaluator
}
// ExpressionEvaluator wraps expression evaluation functionality
// ExpressionEvaluator 包装表达式计算功能
type ExpressionEvaluator struct {
Expression string // Complete expression
Field string // Primary field name
Fields []string // All fields referenced in expression
Expression string // 完整表达式
Field string // 主字段名
Fields []string // 表达式中引用的所有字段
evaluateFunc func(data interface{}) (interface{}, error)
}
// NewGroupAggregator creates a new group aggregator
// NewGroupAggregator 创建新的分组聚合器
func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator {
aggregators := make(map[string]AggregatorFunction)
// Create aggregator for each aggregation field
// 为每个聚合字段创建聚合器
for i := range aggregationFields {
if aggregationFields[i].OutputAlias == "" {
// If no alias specified, use input field name
// 如果没有指定别名,使用输入字段名
aggregationFields[i].OutputAlias = aggregationFields[i].InputField
}
aggregators[aggregationFields[i].OutputAlias] = CreateBuiltinAggregator(aggregationFields[i].AggregateType)
@@ -69,7 +68,7 @@ func NewGroupAggregator(groupFields []string, aggregationFields []AggregationFie
}
}
// RegisterExpression registers expression evaluator
// RegisterExpression 注册表达式计算器
func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error)) {
ga.mu.Lock()
defer ga.mu.Unlock()
@@ -92,26 +91,26 @@ func (ga *GroupAggregator) Put(key string, val interface{}) error {
return nil
}
// isNumericAggregator checks if aggregator requires numeric type input
// isNumericAggregator 检查聚合器是否需要数值类型输入
func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
// Dynamically check function type through functions module
// 通过functions模块动态检查函数类型
if fn, exists := functions.Get(string(aggType)); exists {
switch fn.GetType() {
case functions.TypeMath:
// Math functions usually require numeric input
// 数学函数通常需要数值输入
return true
case functions.TypeAggregation:
// Check if it's a numeric aggregation function
// 检查是否是数值聚合函数
switch string(aggType) {
case functions.SumStr, functions.AvgStr, functions.MinStr, functions.MaxStr, functions.CountStr,
functions.StdDevStr, functions.MedianStr, functions.PercentileStr,
functions.VarStr, functions.VarSStr, functions.StdDevSStr:
return true
case functions.CollectStr, functions.MergeAggStr, functions.DeduplicateStr, functions.LastValueStr:
// These functions can handle any type
// 这些函数可以处理任意类型
return false
default:
// For unknown aggregation functions, try to check function name patterns
// 对于未知的聚合函数,尝试检查函数名称模式
funcName := string(aggType)
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -121,15 +120,15 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
return false
}
case functions.TypeAnalytical:
// Analytical functions can usually handle any type
// 分析函数通常可以处理任意类型
return false
default:
// For other types of functions, conservatively assume no numeric conversion needed
// 其他类型的函数,保守起见认为不需要数值转换
return false
}
}
// If function doesn't exist, judge by name pattern
// 如果函数不存在,根据名称模式判断
funcName := string(aggType)
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -140,21 +139,9 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
return false
}
// shouldAllowNullValues 判断聚合函数是否应该允许NULL值
func (ga *GroupAggregator) shouldAllowNullValues(aggType AggregateType) bool {
// FIRST_VALUE和LAST_VALUE函数应该允许NULL值因为它们需要记录第一个/最后一个值即使是NULL
return aggType == FirstValue || aggType == LastValue
}
func (ga *GroupAggregator) Add(data interface{}) error {
ga.mu.Lock()
defer ga.mu.Unlock()
// 检查数据是否为nil
if data == nil {
return fmt.Errorf("data cannot be nil")
}
var v reflect.Value
switch data.(type) {
@@ -166,10 +153,6 @@ func (ga *GroupAggregator) Add(data interface{}) error {
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
// 检查是否为支持的数据类型
if v.Kind() != reflect.Struct && v.Kind() != reflect.Map {
return fmt.Errorf("unsupported data type: %T, expected struct or map", data)
}
}
key := ""
@@ -177,11 +160,11 @@ func (ga *GroupAggregator) Add(data interface{}) error {
var fieldVal interface{}
var found bool
// Check if it's a nested field
// 检查是否是嵌套字段
if fieldpath.IsNestedField(field) {
fieldVal, found = fieldpath.GetNestedField(data, field)
} else {
// Original field access logic
// 原有的字段访问逻辑
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(field)
@@ -215,21 +198,21 @@ func (ga *GroupAggregator) Add(data interface{}) error {
ga.groups[key] = make(map[string]AggregatorFunction)
}
// Create aggregator instances for each field
// 为每个字段创建聚合器实例
for outputAlias, agg := range ga.aggregators {
if _, exists := ga.groups[key][outputAlias]; !exists {
ga.groups[key][outputAlias] = agg.New()
}
}
// Process each aggregation field
// 处理每个聚合字段
for _, aggField := range ga.aggregationFields {
outputAlias := aggField.OutputAlias
if outputAlias == "" {
outputAlias = aggField.InputField
}
// Check if there's an expression evaluator
// 检查是否有表达式计算器
if expr, hasExpr := ga.expressions[outputAlias]; hasExpr {
result, err := expr.evaluateFunc(data)
if err != nil {
@@ -244,23 +227,23 @@ func (ga *GroupAggregator) Add(data interface{}) error {
inputField := aggField.InputField
// Special handling for count(*) case
// 特殊处理count(*)的情况
if inputField == "*" {
// For count(*), directly add 1 without getting specific field value
// 对于count(*)直接添加1不需要获取具体字段值
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(1)
}
continue
}
// Get field value - supports nested fields
// 获取字段值 - 支持嵌套字段
var fieldVal interface{}
var found bool
if fieldpath.IsNestedField(inputField) {
fieldVal, found = fieldpath.GetNestedField(data, inputField)
} else {
// Original field access logic
// 原有的字段访问逻辑
var f reflect.Value
if v.Kind() == reflect.Map {
keyVal := reflect.ValueOf(inputField)
@@ -276,7 +259,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
}
if !found {
// Try to get from context
// 尝试从context中获取
if ga.context != nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
if contextAgg, ok := groupAgg.(ContextAggregator); ok {
@@ -292,31 +275,19 @@ func (ga *GroupAggregator) Add(data interface{}) error {
aggType := aggField.AggregateType
// Skip nil values for most aggregation functions, but allow FIRST_VALUE and LAST_VALUE to handle them
if fieldVal == nil && !ga.shouldAllowNullValues(aggType) {
continue
}
// Special handling for Count aggregator - it can handle any type
if aggType == Count {
// Count can handle any non-null value
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
} else if ga.isNumericAggregator(aggType) {
// For numeric aggregation functions, try to convert to numeric type
// 动态检查是否需要数值转换
if ga.isNumericAggregator(aggType) {
// 对于数值聚合函数,尝试转换为数值类型
if numVal, err := cast.ToFloat64E(fieldVal); err == nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(numVal)
}
} else {
return fmt.Errorf("cannot convert field %s value %v to numeric type for aggregator %s", inputField, fieldVal, aggType)
}
} else {
// For non-numeric aggregation functions, pass original value directly
// 对于非数值聚合函数,直接传递原始值
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
}
@@ -328,15 +299,6 @@ func (ga *GroupAggregator) Add(data interface{}) error {
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
ga.mu.RLock()
defer ga.mu.RUnlock()
// 如果既没有分组字段又没有聚合字段,但有数据被添加过,返回一个空的结果行
if len(ga.aggregationFields) == 0 && len(ga.groupFields) == 0 {
if len(ga.groups) > 0 {
return []map[string]interface{}{{}}, nil
}
return []map[string]interface{}{}, nil
}
result := make([]map[string]interface{}, 0, len(ga.groups))
for key, aggregators := range ga.groups {
group := make(map[string]interface{})
@@ -347,12 +309,7 @@ func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
}
}
for field, agg := range aggregators {
result := agg.Result()
group[field] = result
// Debug: log aggregator results (can be removed in production)
// if strings.HasPrefix(field, "__") {
// fmt.Printf("Aggregator %s result: %v (%T)\n", field, result, result)
// }
group[field] = agg.Result()
}
result = append(result, group)
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+14 -14
View File
@@ -16,7 +16,7 @@ type ExprCondition struct {
}
func NewExprCondition(expression string) (Condition, error) {
// Add custom string function support (startsWith, endsWith, contains are built-in operators)
// 添加自定义字符串函数支持(startsWithendsWithcontains是内置操作符)
options := []expr.Option{
expr.Function("like_match", func(params ...any) (any, error) {
if len(params) != 2 {
@@ -60,22 +60,22 @@ func (ec *ExprCondition) Evaluate(env interface{}) bool {
return result.(bool)
}
// matchesLikePattern implements LIKE pattern matching
// Supports % (matches any character sequence) and _ (matches single character)
// matchesLikePattern 实现LIKE模式匹配
// 支持%匹配任意字符序列和_匹配单个字符
func matchesLikePattern(text, pattern string) bool {
return likeMatch(text, pattern, 0, 0)
}
// likeMatch recursively implements LIKE matching algorithm
// likeMatch 递归实现LIKE匹配算法
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
// If pattern has been fully matched
// 如果模式已经匹配完成
if patternIndex >= len(pattern) {
return textIndex >= len(text) // Text should also be fully matched
return textIndex >= len(text) // 文本也应该匹配完成
}
// If text has ended but pattern still has non-% characters, no match
// 如果文本已经结束,但模式还有非%字符,则不匹配
if textIndex >= len(text) {
// Check if remaining pattern characters are all %
// 检查剩余的模式是否都是%
for i := patternIndex; i < len(pattern); i++ {
if pattern[i] != '%' {
return false
@@ -84,16 +84,16 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
return true
}
// Process current pattern character
// 处理当前模式字符
patternChar := pattern[patternIndex]
if patternChar == '%' {
// % can match 0 or more characters
// Try matching 0 characters (skip %)
// %可以匹配0个或多个字符
// 尝试匹配0个字符跳过%
if likeMatch(text, pattern, textIndex, patternIndex+1) {
return true
}
// Try matching 1 or more characters
// 尝试匹配1个或多个字符
for i := textIndex; i < len(text); i++ {
if likeMatch(text, pattern, i+1, patternIndex+1) {
return true
@@ -101,10 +101,10 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
}
return false
} else if patternChar == '_' {
// _ matches exactly one character
// _匹配恰好一个字符
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
} else {
// Regular characters must match exactly
// 普通字符必须精确匹配
if text[textIndex] == patternChar {
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
}
File diff suppressed because it is too large Load Diff
-111
View File
@@ -1,111 +0,0 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package condition provides condition evaluation functionality for StreamSQL.
This package implements condition evaluation using the expr-lang library,
supporting complex boolean expressions for filtering and conditional logic.
It provides custom functions for SQL-like operations including LIKE pattern
matching and NULL checking.
# Core Features
Boolean Expression Evaluation - Evaluate complex boolean conditions
LIKE Pattern Matching - SQL-style pattern matching with % and _ wildcards
NULL Checking - Support for IS NULL and IS NOT NULL operations
Custom Functions - Extended function library for SQL compatibility
Type Safety - Automatic type conversion and validation
Performance Optimized - Compiled expressions for fast evaluation
# Condition Interface
Unified interface for condition evaluation:
type Condition interface {
Evaluate(env interface{}) bool
}
# Custom Functions
Built-in SQL-compatible functions:
// LIKE pattern matching
like_match(text, pattern) - SQL LIKE operation with % and _ wildcards
// NULL checking
is_null(value) - Check if value is NULL
is_not_null(value) - Check if value is not NULL
# Usage Examples
Basic condition evaluation:
condition, err := NewExprCondition("age >= 18 AND status == 'active'")
if err != nil {
log.Fatal(err)
}
data := map[string]interface{}{
"age": 25,
"status": "active",
}
result := condition.Evaluate(data) // returns true
LIKE pattern matching:
condition, err := NewExprCondition("like_match(name, 'John%')")
data := map[string]interface{}{"name": "John Smith"}
result := condition.Evaluate(data) // returns true
NULL checking:
condition, err := NewExprCondition("is_not_null(email)")
data := map[string]interface{}{"email": "user@example.com"}
result := condition.Evaluate(data) // returns true
Complex conditions:
condition, err := NewExprCondition(`
age >= 18 AND
like_match(email, '%@company.com') AND
is_not_null(department)
`)
# Pattern Matching
LIKE pattern matching supports:
% - Matches any sequence of characters (including empty)
_ - Matches exactly one character
Examples:
'John%' matches 'John', 'John Smith', 'Johnny'
'J_hn' matches 'John' but not 'Johan'
'%@gmail.com' matches any email ending with @gmail.com
# Integration
Integrates with other StreamSQL components:
Stream package - Data filtering and conditional processing
RSQL package - WHERE and HAVING clause evaluation
Types package - Data type handling and conversion
Expr package - Expression parsing and evaluation
*/
package condition
+56 -130
View File
@@ -15,26 +15,23 @@
*/
/*
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
Package streamsql 是一个轻量级的基于 SQL 的物联网边缘流处理引擎
StreamSQL provides efficient unbounded data stream processing and analysis capabilities,
supporting multiple window types, aggregate functions, custom functions, and seamless
integration with the RuleGo ecosystem.
StreamSQL 提供了高效的无界数据流处理和分析能力支持多种窗口类型聚合函数
自定义函数以及与 RuleGo 生态的无缝集成
# Core Features
# 核心特性
Lightweight design - Pure in-memory operations, no external dependencies
SQL syntax support - Process stream data using familiar SQL syntax
Multiple window types - Sliding, tumbling, counting, and session windows
Event time and processing time - Support both time semantics for accurate stream processing
Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
RuleGo ecosystem integration - Extend input/output sources using RuleGo components
轻量级设计 - 纯内存操作无外部依赖
SQL语法支持 - 使用熟悉的SQL语法处理流数据
多种窗口类型 - 滑动窗口滚动窗口计数窗口会话窗口
丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等
插件式自定义函数 - 运行时动态注册支持8种函数类型
RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
# Getting Started
# 入门示例
Basic stream data processing:
基本的流数据处理
package main
@@ -46,10 +43,10 @@ Basic stream data processing:
)
func main() {
// Create StreamSQL instance
// 创建StreamSQL实例
ssql := streamsql.New()
// Define SQL query - Calculate temperature average by device ID every 5 seconds
// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
@@ -59,18 +56,18 @@ Basic stream data processing:
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// Execute SQL, create stream processing task
// 执行SQL创建流处理任务
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// Add result processing callback
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("Aggregation result: %v\n", result)
// 添加结果处理回调
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
})
// Simulate sending stream data
// 模拟发送流数据
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@@ -78,7 +75,7 @@ Basic stream data processing:
for {
select {
case <-ticker.C:
// Generate random device data
// 生成随机设备数据
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
@@ -89,107 +86,36 @@ Basic stream data processing:
}
}()
// Run for 30 seconds
// 运行30秒
time.Sleep(30 * time.Second)
}
# Window Functions
# 窗口函数
StreamSQL supports multiple window types:
StreamSQL 支持多种窗口类型
// Tumbling window - Independent window every 5 seconds
// 滚动窗口 - 每5秒一个独立窗口
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// Sliding window - 30-second window size, slides every 10 seconds
// 滑动窗口 - 窗口大小30秒每10秒滑动一次
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// Counting window - One window per 100 records
// 计数窗口 - 每100条记录一个窗口
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// Session window - Automatically closes session after 5-minute timeout
// 会话窗口 - 超时5分钟自动关闭会话
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# Event Time vs Processing Time
# 自定义函数
StreamSQL supports two time semantics for window processing:
StreamSQL 支持插件式自定义函数运行时动态注册
## Processing Time (Default)
Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:
// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives
## Event Time
Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
allowing correct handling of out-of-order and late-arriving data:
// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')
// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')
## Watermark and Late Data Handling
Event time windows use watermark mechanism to handle out-of-order and late data:
// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
)
// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
)
// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
)
// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
)
Key concepts:
MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
AllowedLateness: Keeps window open after trigger to accept late data and update results
IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
Watermark: Indicates that no events with timestamp less than watermark are expected
# Custom Functions
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
// Register temperature conversion function
// 注册温度转换函数
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"Temperature conversion",
"Fahrenheit to Celsius",
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
@@ -197,55 +123,55 @@ StreamSQL supports plugin-based custom functions with runtime dynamic registrati
},
)
// Use immediately in SQL
// 立即在SQL中使用
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
Supported custom function types:
TypeMath - Mathematical calculation functions
TypeString - String processing functions
TypeConversion - Type conversion functions
TypeDateTime - Date and time functions
TypeAggregation - Aggregate functions
TypeAnalytical - Analytical functions
TypeWindow - Window functions
TypeCustom - General custom functions
支持的自定义函数类型
TypeMath - 数学计算函数
TypeString - 字符串处理函数
TypeConversion - 类型转换函数
TypeDateTime - 时间日期函数
TypeAggregation - 聚合函数
TypeAnalytical - 分析函数
TypeWindow - 窗口函数
TypeCustom - 通用自定义函数
# Log Configuration
# 日志配置
StreamSQL provides flexible log configuration options:
StreamSQL 提供灵活的日志配置选项
// Set log level
// 设置日志级别
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// Output to file
// 输出到文件
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// Disable logging (production environment)
// 禁用日志(生产环境)
ssql := streamsql.New(streamsql.WithDiscardLog())
# RuleGo Integration
# 与RuleGo集成
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
StreamSQL提供了与RuleGo规则引擎的深度集成通过两个专用组件实现流式数据处理
streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
streamTransform (x/streamTransform) - 流转换器处理非聚合SQL查询
streamAggregator (x/streamAggregator) - 流聚合器处理聚合SQL查询
Basic integration example:
基本集成示例
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
// Register StreamSQL components
// 注册StreamSQL组件
_ "github.com/rulego/rulego-components/external/streamsql"
)
func main() {
// Rule chain configuration
// 规则链配置
ruleChainJson := `{
"ruleChain": {"id": "rule01"},
"metadata": {
@@ -270,10 +196,10 @@ Basic integration example:
}
}`
// Create rule engine
// 创建规则引擎
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
// Send data
// 发送数据
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)
+1 -1
View File
@@ -51,7 +51,7 @@ func main() {
fmt.Println("✓ SQL执行成功")
// 5. 添加结果监听器
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("📊 聚合结果: %v\n", result)
})
+53 -47
View File
@@ -83,17 +83,19 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📋 数组索引访问结果:")
for i, item := range result {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf("      第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf("        第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
}
}
})
@@ -166,19 +168,20 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map键访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
}
}
})
@@ -263,19 +266,20 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 🔄 混合复杂访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
}
}
})
@@ -321,18 +325,19 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ 负数索引访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
}
})
@@ -367,19 +372,20 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📈 数组索引聚合计算结果:")
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
}
})
+6 -6
View File
@@ -127,7 +127,7 @@ func testBasicFiltering() {
}
// 添加结果处理函数
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 高温告警: %v\n", result)
})
@@ -172,7 +172,7 @@ func testAggregation() {
}
// 处理聚合结果
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合结果: %v\n", result)
})
@@ -221,7 +221,7 @@ func testSlidingWindow() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 滑动窗口分析: %v\n", result)
})
@@ -262,7 +262,7 @@ func testNestedFields() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 嵌套字段结果: %v\n", result)
})
@@ -336,7 +336,7 @@ func testCustomFunctions() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 自定义函数结果: %v\n", result)
})
@@ -396,7 +396,7 @@ func testComplexQuery() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 复杂查询结果: %v\n", result)
})
+18 -18
View File
@@ -609,14 +609,14 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []map[string]interface{}{
{
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"temperature": 68.0, // 华氏度
"radius": 5.0,
"x1": 0.0, "y1": 0.0, "x2": 3.0, "y2": 4.0, // 距离=5
},
{
map[string]interface{}{
"device": "sensor1",
"temperature": 86.0, // 华氏度
"radius": 10.0,
@@ -625,7 +625,7 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 数学函数结果: %v\n", result)
})
@@ -659,20 +659,20 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []map[string]interface{}{
{
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"metadata": `{"version":"1.0","type":"temperature"}`,
"level": 3,
},
{
map[string]interface{}{
"device": "sensor2",
"metadata": `{"version":"2.0","type":"humidity"}`,
"level": 5,
},
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 字符串函数结果: %v\n", result)
})
@@ -702,20 +702,20 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []map[string]interface{}{
{
testData := []interface{}{
map[string]interface{}{
"device": "server1",
"client_ip": "192.168.1.100",
"memory_usage": 1073741824, // 1GB
},
{
map[string]interface{}{
"device": "server2",
"client_ip": "10.0.0.50",
"memory_usage": 2147483648, // 2GB
},
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 转换函数结果: %v\n", result)
})
@@ -746,14 +746,14 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
}
// 添加测试数据
testData := []map[string]interface{}{
{"device": "sensor1", "value": 2.0, "category": "A"},
{"device": "sensor1", "value": 8.0, "category": "A"},
{"device": "sensor1", "value": 32.0, "category": "B"},
{"device": "sensor1", "value": 128.0, "category": "A"},
testData := []interface{}{
map[string]interface{}{"device": "sensor1", "value": 2.0, "category": "A"},
map[string]interface{}{"device": "sensor1", "value": 8.0, "category": "A"},
map[string]interface{}{"device": "sensor1", "value": 32.0, "category": "B"},
map[string]interface{}{"device": "sensor1", "value": 128.0, "category": "A"},
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合函数结果: %v\n", result)
})
+74 -67
View File
@@ -119,18 +119,19 @@ func demonstrateBasicNestedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📋 基础嵌套字段访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 设备位置: %v\n", item["device.location"])
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
fmt.Println()
}
}
})
@@ -165,19 +166,20 @@ func demonstrateNestedAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📈 嵌套字段聚合结果:")
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["device.location"])
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["device.location"])
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
fmt.Println()
}
}
})
@@ -269,18 +271,19 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📋 数组索引访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备: %v\n", item["device"])
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
fmt.Println()
}
}
})
@@ -353,19 +356,20 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map键访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备ID: %v\n", item["device_id"])
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
fmt.Printf(" 应用版本: %v\n", item["app_version"])
fmt.Println()
}
}
})
@@ -450,19 +454,20 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 🔄 混合复杂访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 建筑: %v\n", item["building"])
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
fmt.Println()
}
}
})
@@ -508,18 +513,19 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ 负数索引访问结果:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
fmt.Printf(" 记录 %d:\n", i+1)
fmt.Printf(" 设备名称: %v\n", item["device_name"])
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
fmt.Println()
}
}
})
@@ -554,19 +560,20 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// 设置结果回调
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
defer wg.Done()
fmt.Println(" 📈 数组索引聚合计算结果:")
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
if resultSlice, ok := result.([]map[string]interface{}); ok {
for i, item := range resultSlice {
resultCount++
fmt.Printf(" 聚合结果 %d:\n", i+1)
fmt.Printf(" 位置: %v\n", item["location"])
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" 设备数量: %v\n", item["device_count"])
fmt.Println()
}
}
})
+6 -6
View File
@@ -62,7 +62,7 @@ func demonstrateDataCleaning() {
}
// 结果处理
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 清洗后数据: %+v\n", result)
})
@@ -103,7 +103,7 @@ func demonstrateDataEnrichment() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 富化后数据: %+v\n", result)
})
@@ -147,7 +147,7 @@ func demonstrateRealTimeAlerting() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 🚨 告警事件: %+v\n", result)
})
@@ -191,7 +191,7 @@ func demonstrateDataFormatConversion() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 格式转换结果: %+v\n", result)
})
@@ -230,7 +230,7 @@ func demonstrateDataRouting() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 路由结果: %+v\n", result)
})
@@ -273,7 +273,7 @@ func demonstrateNestedFieldProcessing() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 嵌套字段处理结果: %+v\n", result)
})
+36 -26
View File
@@ -35,9 +35,11 @@ func demo1() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("发现空值数据: %+v\n", data)
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("发现空值数据: %+v\n", data)
}
}
})
@@ -73,9 +75,11 @@ func demo2() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("发现有效数据: %+v\n", data)
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("发现有效数据: %+v\n", data)
}
}
})
@@ -111,14 +115,16 @@ func demo3() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
status := data["status"]
value := data["value"]
if status != nil {
fmt.Printf("状态非空的数据: %+v\n", data)
} else if value == nil {
fmt.Printf("值为空的数据: %+v\n", data)
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
status := data["status"]
value := data["value"]
if status != nil {
fmt.Printf("状态非空的数据: %+v\n", data)
} else if value == nil {
fmt.Printf("值为空的数据: %+v\n", data)
}
}
}
})
@@ -156,16 +162,18 @@ func demo4() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
value := data["value"]
status := data["status"]
priority := data["priority"]
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
value := data["value"]
status := data["status"]
priority := data["priority"]
if value != nil && value.(float64) > 20 {
fmt.Printf("高值数据 (value > 20): %+v\n", data)
} else if status == nil && priority != nil {
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
if value != nil && value.(float64) > 20 {
fmt.Printf("高值数据 (value > 20): %+v\n", data)
} else if status == nil && priority != nil {
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
}
}
}
})
@@ -204,9 +212,11 @@ func demo5() {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
for _, data := range result {
fmt.Printf("有位置信息的设备: %+v\n", data)
ssql.AddSink(func(result interface{}) {
if results, ok := result.([]map[string]interface{}); ok {
for _, data := range results {
fmt.Printf("有位置信息的设备: %+v\n", data)
}
}
})
+233
View File
@@ -0,0 +1,233 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
// main drives the persistence demo end to end: it wipes any data left over
// from a previous run, then executes the three test stages in order.
func main() {
	fmt.Println("=== StreamSQL 持久化功能测试 ===")

	// Start from a clean slate so earlier runs cannot skew the results.
	cleanupTestData()

	// The stages must run in this order: stage 2 recovers the data that
	// stage 1 spills to disk, and stage 3 inspects whatever files remain.
	stages := []struct {
		banner string
		run    func()
	}{
		{"📌 测试1: 数据溢出持久化", testDataOverflowPersistence},
		{"📌 测试2: 程序重启数据恢复", testDataRecovery},
		{"📌 测试3: 持久化文件分析", analyzePersistenceFiles},
	}
	for _, stage := range stages {
		fmt.Println(stage.banner)
		stage.run()
	}

	fmt.Println("✅ 真正持久化功能测试完成!")
}
// testDataOverflowPersistence creates a stream with deliberately tiny buffers,
// floods it with more events than the buffers can hold, and then prints the
// stream and persistence statistics so the overflow-to-disk behavior can be
// observed on the console.
func testDataOverflowPersistence() {
	config := types.Config{
		SimpleFields: []string{"id", "value"},
	}
	// Create a persistence-enabled stream processor with small buffers.
	// NOTE(review): this local variable shadows the imported "stream"
	// package; it works because no further package-level calls follow,
	// but a different name would be clearer.
	stream, err := stream.NewStreamWithLossPolicy(
		config,
		100,       // very small data buffer — easy to overflow
		100,       // small result buffer
		50,        // small sink pool
		"persist", // overflow policy: spill dropped data to disk
		5*time.Second,
	)
	if err != nil {
		fmt.Printf("创建流失败: %v\n", err)
		return
	}
	stream.Start()
	// Send a rapid burst of data to trigger buffer overflow.
	inputCount := 1000
	fmt.Printf("快速发送 %d 条数据到小缓冲区 (容量100)...\n", inputCount)
	start := time.Now()
	for i := 0; i < inputCount; i++ {
		data := map[string]interface{}{
			"id":    i,
			"value": fmt.Sprintf("data_%d", i),
		}
		stream.Emit(data)
	}
	duration := time.Since(start)
	// Give the background persistence writer time to flush.
	fmt.Println("等待持久化操作完成...")
	time.Sleep(8 * time.Second)
	// Collect statistics. The "basic_stats" entry is asserted to be a
	// map[string]int64 — the prints below rely on that concrete type.
	stats := stream.GetDetailedStats()
	persistStats := stream.GetPersistenceStats()
	fmt.Printf("⏱️ 发送耗时: %v\n", duration)
	fmt.Printf("📊 输入数据: %d\n", stats["basic_stats"].(map[string]int64)["input_count"])
	fmt.Printf("📊 处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
	fmt.Printf("📊 通道容量: %d\n", stats["basic_stats"].(map[string]int64)["data_chan_cap"])
	fmt.Printf("📊 持久化启用: %v\n", persistStats["enabled"])
	fmt.Printf("📊 待写入数据: %v\n", persistStats["pending_count"])
	fmt.Printf("📊 当前文件大小: %v bytes\n", persistStats["current_file_size"])
	fmt.Printf("📊 文件索引: %v\n", persistStats["file_index"])
	stream.Stop()
}
// testDataRecovery simulates a process restart: it builds a fresh
// persistence-enabled stream, registers a sink to observe replayed events,
// asks the stream to reload whatever the previous run spilled to disk, and
// reports how much data came back.
func testDataRecovery() {
	config := types.Config{
		SimpleFields: []string{"id", "value"},
	}
	// Create a new persistence-enabled stream processor (simulating a
	// restart). Named "s" so it does not shadow the imported "stream"
	// package, unlike the overflow test above.
	s, err := stream.NewStreamWithLossPolicy(
		config,
		200, // larger buffer for recovery
		200,
		100,
		"persist", // persistence policy
		5*time.Second,
	)
	if err != nil {
		fmt.Printf("创建流失败: %v\n", err)
		return
	}
	s.Start()
	// The sink callback is presumably invoked from the stream's worker
	// goroutine(s) while the final total is read from this goroutine, so a
	// plain int counter would be a data race. Use atomic operations.
	var recoveredCount int64
	s.AddSink(func(data interface{}) {
		n := atomic.AddInt64(&recoveredCount, 1)
		// Only echo the first few recovered records to keep output short.
		if n <= 5 {
			fmt.Printf("恢复数据 %d: %+v\n", n, data)
		}
	})
	// Try to load and re-process the persisted data from the previous run.
	fmt.Println("尝试加载持久化数据...")
	if err := s.LoadAndReprocessPersistedData(); err != nil {
		fmt.Printf("数据恢复失败: %v\n", err)
	}
	// Allow the replay to finish.
	time.Sleep(3 * time.Second)
	stats := s.GetDetailedStats()
	fmt.Printf("📊 恢复后处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
	fmt.Printf("📊 接收到的恢复数据: %d\n", atomic.LoadInt64(&recoveredCount))
	s.Stop()
}
// analyzePersistenceFiles inspects the overflow data directory, lists every
// persisted log file with its size and modification time, and previews the
// first few lines of the first file found.
func analyzePersistenceFiles() {
	const dir = "./streamsql_overflow_data"

	// The directory may legitimately be absent (nothing ever overflowed).
	if _, statErr := os.Stat(dir); os.IsNotExist(statErr) {
		fmt.Println("持久化目录不存在")
		return
	}

	// Collect every overflow log written by the persistence layer.
	pattern := filepath.Join(dir, "streamsql_overflow_*.log")
	matches, globErr := filepath.Glob(pattern)
	if globErr != nil {
		fmt.Printf("读取持久化文件失败: %v\n", globErr)
		return
	}
	if len(matches) == 0 {
		fmt.Println("没有找到持久化文件(可能已被恢复过程删除)")
		return
	}

	fmt.Printf("发现 %d 个持久化文件:\n", len(matches))
	for idx, path := range matches {
		info, statErr := os.Stat(path)
		if statErr != nil {
			fmt.Printf(" %d. %s (无法读取文件信息)\n", idx+1, filepath.Base(path))
			continue
		}
		fmt.Printf(" %d. %s (大小: %d bytes, 修改时间: %s)\n",
			idx+1, filepath.Base(path), info.Size(), info.ModTime().Format("15:04:05"))
	}

	// Preview the head of the first persisted file (matches is non-empty
	// here — the empty case returned above).
	fmt.Printf("\n第一个文件的前3行内容:\n")
	showFileContent(matches[0], 3)
}
// showFileContent prints up to maxLines numbered lines from the beginning of
// the given file. Only the first 1KB is read, so very long lines and large
// files are truncated — this is a preview helper, not a full dump.
func showFileContent(filename string, maxLines int) {
	file, err := os.Open(filename)
	if err != nil {
		fmt.Printf("无法打开文件: %v\n", err)
		return
	}
	defer file.Close()
	buffer := make([]byte, 1024)
	n, err := file.Read(buffer)
	if n == 0 {
		// Nothing was read (empty file yields io.EOF with n == 0).
		if err != nil {
			fmt.Printf("无法读取文件: %v\n", err)
		}
		return
	}
	// Split on the '\n' byte directly: a newline is a single byte in UTF-8,
	// so byte-wise scanning is safe even if the 1KB read ends in the middle
	// of a multi-byte character. (The original converted the chunk to
	// []rune, which would turn such a truncated tail into U+FFFD.)
	lineCount := 0
	start := 0
	for i := 0; i < n && lineCount < maxLines; i++ {
		if buffer[i] == '\n' {
			lineCount++
			fmt.Printf(" %d: %s\n", lineCount, string(buffer[start:i]))
			start = i + 1
		}
	}
	// Print a trailing partial line, mirroring the original behavior.
	if start < n && lineCount < maxLines {
		fmt.Printf(" %d: %s\n", lineCount+1, string(buffer[start:n]))
	}
}
func cleanupTestData() {
dataDir := "./streamsql_overflow_data"
if err := os.RemoveAll(dataDir); err != nil {
fmt.Printf("清理测试数据失败: %v\n", err)
}
}
+9 -9
View File
@@ -122,19 +122,19 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 简单查询结果: %v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // 华氏度
"radius": 3.0,
},
{
map[string]interface{}{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // 华氏度
@@ -171,25 +171,25 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.AddSink(func(result []map[string]interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"value": 3.0,
"temperature": 32.0, // 0°C
"radius": 1.0,
},
{
map[string]interface{}{
"device": "sensor1",
"value": 4.0,
"temperature": 212.0, // 100°C
"radius": 2.0,
},
{
map[string]interface{}{
"device": "sensor2",
"value": 5.0,
"temperature": 68.0, // 20°C
-82
View File
@@ -1,82 +0,0 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
)
// main demonstrates the PrintTable method: it runs an aggregate query, a
// non-aggregate filter query, and a windowed count query over the same test
// data, rendering each result set as a console table.
func main() {
	fmt.Println("=== StreamSQL PrintTable 示例 ===")
	// Create a StreamSQL instance
	ssql := streamsql.New()
	// Example 1: aggregate query — per-device temperature stats over a
	// 3-second tumbling window
	fmt.Println("\n示例1: 聚合查询结果")
	err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp, MAX(temperature) as max_temp FROM stream GROUP BY device, TumblingWindow('3s')")
	if err != nil {
		fmt.Printf("执行SQL失败: %v\n", err)
		return
	}
	// Render results in table form
	ssql.PrintTable()
	// Feed test data (reused by example 2 below)
	testData := []map[string]interface{}{
		{"device": "sensor1", "temperature": 25.5, "timestamp": time.Now()},
		{"device": "sensor1", "temperature": 26.0, "timestamp": time.Now()},
		{"device": "sensor2", "temperature": 23.8, "timestamp": time.Now()},
		{"device": "sensor2", "temperature": 24.2, "timestamp": time.Now()},
		{"device": "sensor1", "temperature": 27.1, "timestamp": time.Now()},
	}
	for _, data := range testData {
		ssql.Emit(data)
	}
	// Sleep past the 3s window so it fires
	time.Sleep(4 * time.Second)
	// Example 2: non-aggregate query (filter + computed column)
	fmt.Println("\n示例2: 非聚合查询结果")
	ssql2 := streamsql.New()
	err = ssql2.Execute("SELECT device, temperature, temperature * 1.8 + 32 as fahrenheit FROM stream WHERE temperature > 24")
	if err != nil {
		fmt.Printf("执行SQL失败: %v\n", err)
		return
	}
	ssql2.PrintTable()
	// Feed the same test data
	for _, data := range testData {
		ssql2.Emit(data)
	}
	// Wait for processing to finish
	time.Sleep(1 * time.Second)
	// Example 3: comparison against the original Print method
	fmt.Println("\n示例3: 原始Print方法输出对比")
	ssql3 := streamsql.New()
	err = ssql3.Execute("SELECT device, COUNT(*) as count FROM stream GROUP BY device, TumblingWindow('2s')")
	if err != nil {
		fmt.Printf("执行SQL失败: %v\n", err)
		return
	}
	fmt.Println("原始PrintTable方法:")
	ssql3.PrintTable()
	// Feed data
	for i := 0; i < 3; i++ {
		ssql3.Emit(map[string]interface{}{"device": "test_device", "value": i})
	}
	time.Sleep(3 * time.Second)
	fmt.Println("\n=== 示例结束 ===")
}
+218
View File
@@ -0,0 +1,218 @@
package main
import (
"fmt"
"log"
"strings"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
// main walks through the unified configuration system: it builds streams from
// every built-in performance preset plus one fully custom PerformanceConfig,
// prints each stream's utilization stats, compares the presets side by side,
// runs a short live-processing demo, and finally stops every stream.
func main() {
	fmt.Println("=== StreamSQL 统一配置系统演示 ===")
	// 1. Default configuration via the new config API
	fmt.Println("\n1. 默认配置Stream:")
	defaultConfig := types.NewConfig()
	defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	defaultStream, err := stream.NewStream(defaultConfig)
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("默认配置", defaultStream)
	// 2. High-performance preset
	fmt.Println("\n2. 高性能配置Stream:")
	highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig())
	highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig)
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("高性能配置", highPerfStream)
	// 3. Low-latency preset
	fmt.Println("\n3. 低延迟配置Stream:")
	lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig())
	lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig)
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("低延迟配置", lowLatencyStream)
	// 4. Zero-data-loss preset
	fmt.Println("\n4. 零数据丢失配置Stream:")
	zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig())
	zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig)
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("零数据丢失配置", zeroLossStream)
	// 5. Persistence preset
	fmt.Println("\n5. 持久化配置Stream:")
	persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig())
	persistConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig())
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("持久化配置", persistStream)
	// 6. Fully custom configuration, built field by field
	fmt.Println("\n6. 自定义配置Stream:")
	customPerfConfig := types.PerformanceConfig{
		BufferConfig: types.BufferConfig{
			DataChannelSize:     30000,
			ResultChannelSize:   25000,
			WindowOutputSize:    3000,
			EnableDynamicResize: true,
			MaxBufferSize:       200000,
			UsageThreshold:      0.85,
		},
		OverflowConfig: types.OverflowConfig{
			Strategy:      "expand",
			BlockTimeout:  15 * time.Second,
			AllowDataLoss: false,
			ExpansionConfig: types.ExpansionConfig{
				GrowthFactor:     2.0,
				MinIncrement:     2000,
				TriggerThreshold: 0.9,
				ExpansionTimeout: 3 * time.Second,
			},
		},
		WorkerConfig: types.WorkerConfig{
			SinkPoolSize:     800,
			SinkWorkerCount:  12,
			MaxRetryRoutines: 10,
		},
		MonitoringConfig: types.MonitoringConfig{
			EnableMonitoring:    true,
			StatsUpdateInterval: 500 * time.Millisecond,
			EnableDetailedStats: true,
			WarningThresholds: types.WarningThresholds{
				DropRateWarning:     5.0,
				DropRateCritical:    15.0,
				BufferUsageWarning:  75.0,
				BufferUsageCritical: 90.0,
			},
		},
	}
	customConfig := types.NewConfigWithPerformance(customPerfConfig)
	customConfig.SimpleFields = []string{"temperature", "humidity", "location"}
	customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig)
	if err != nil {
		log.Fatal(err)
	}
	printStreamStats("自定义配置", customStream)
	// 7. Preset comparison table
	fmt.Println("\n7. 配置比较:")
	compareConfigurations()
	// 8. Live data processing demo (reuses the default-config stream)
	fmt.Println("\n8. 实时数据处理演示:")
	demonstrateRealTimeProcessing(defaultStream)
	// 9. Unified window configuration demo
	fmt.Println("\n9. 窗口统一配置演示:")
	demonstrateWindowConfig()
	// Release the resources of every stream created above
	fmt.Println("\n10. 清理资源...")
	defaultStream.Stop()
	highPerfStream.Stop()
	lowLatencyStream.Stop()
	zeroLossStream.Stop()
	persistStream.Stop()
	customStream.Stop()
	fmt.Println("\n=== 演示完成 ===")
}
// printStreamStats dumps the channel/pool utilization and the performance
// level of the given stream, labeled with a human-readable config name.
// Length/capacity figures come from GetStats, usage percentages and the
// performance level from GetDetailedStats.
func printStreamStats(name string, s *stream.Stream) {
	basic := s.GetStats()
	detailed := s.GetDetailedStats()
	fmt.Printf("【%s】统计信息:\n", name)
	// One row per resource: the printf format plus the keys to look up.
	rows := []struct {
		format   string
		lenKey   string
		capKey   string
		usageKey string
	}{
		{" 数据通道: %d/%d (使用率: %.1f%%)\n", "data_chan_len", "data_chan_cap", "data_chan_usage"},
		{" 结果通道: %d/%d (使用率: %.1f%%)\n", "result_chan_len", "result_chan_cap", "result_chan_usage"},
		{" 工作池: %d/%d (使用率: %.1f%%)\n", "sink_pool_len", "sink_pool_cap", "sink_pool_usage"},
	}
	for _, row := range rows {
		fmt.Printf(row.format, basic[row.lenKey], basic[row.capKey], detailed[row.usageKey])
	}
	fmt.Printf(" 性能等级: %s\n", detailed["performance_level"])
}
// compareConfigurations prints a side-by-side table of the built-in
// performance presets so their buffer, worker, and overflow settings can be
// compared at a glance.
func compareConfigurations() {
	// Use an ordered slice instead of a map: Go randomizes map iteration
	// order, which made the table rows appear in a different order on every
	// run. A slice keeps the output deterministic and diff-friendly.
	configs := []struct {
		name   string
		config types.PerformanceConfig
	}{
		{"默认配置", types.DefaultPerformanceConfig()},
		{"高性能配置", types.HighPerformanceConfig()},
		{"低延迟配置", types.LowLatencyConfig()},
		{"零丢失配置", types.ZeroDataLossConfig()},
		{"持久化配置", types.PersistencePerformanceConfig()},
	}
	fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n",
		"配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略")
	fmt.Println(strings.Repeat("-", 75))
	for _, entry := range configs {
		cfg := entry.config
		fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n",
			entry.name,
			cfg.BufferConfig.DataChannelSize,
			cfg.BufferConfig.ResultChannelSize,
			cfg.WorkerConfig.SinkPoolSize,
			cfg.WorkerConfig.SinkWorkerCount,
			cfg.OverflowConfig.Strategy)
	}
}
// demonstrateRealTimeProcessing pushes a few synthetic sensor readings
// through the given stream and prints both the sink output and the final
// throughput statistics.
func demonstrateRealTimeProcessing(s *stream.Stream) {
	// Register a sink to receive processed results
	s.AddSink(func(data interface{}) {
		fmt.Printf(" 接收到处理结果: %v\n", data)
	})
	// Start stream processing
	s.Start()
	// Emit a small batch of simulated readings, paced 100ms apart
	for i := 0; i < 3; i++ {
		data := map[string]interface{}{
			"temperature": 20.0 + float64(i)*2.5,
			"humidity":    60.0 + float64(i)*5,
			"location":    fmt.Sprintf("sensor_%d", i+1),
			"timestamp":   time.Now().Unix(),
		}
		fmt.Printf(" 发送数据: %v\n", data)
		s.Emit(data)
		time.Sleep(100 * time.Millisecond)
	}
	// Wait for processing to complete
	time.Sleep(200 * time.Millisecond)
	// Show the final counters; "basic_stats" is asserted to be a
	// map[string]int64 — the prints below rely on that concrete type.
	finalStats := s.GetDetailedStats()
	fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n",
		finalStats["basic_stats"].(map[string]int64)["input_count"],
		finalStats["basic_stats"].(map[string]int64)["output_count"],
		finalStats["basic_stats"].(map[string]int64)["dropped_count"],
		finalStats["process_rate"])
}
@@ -0,0 +1,74 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig shows how the unified window configuration applies
// across presets: the same tumbling-window query is executed under the
// default, high-performance, low-latency, and a hand-tuned custom
// configuration. Instances are created one at a time, right before use.
func demonstrateWindowConfig() {
	fmt.Println("=== 窗口统一配置演示 ===")
	// 1. Window on the default configuration
	fmt.Println("\n1. 默认配置窗口测试")
	testWindowWithConfig("默认配置", streamsql.New())
	// 2. Window on the high-performance configuration
	fmt.Println("\n2. 高性能配置窗口测试")
	testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
	// 3. Window on the low-latency configuration
	fmt.Println("\n3. 低延迟配置窗口测试")
	testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
	// 4. Window on a custom configuration
	fmt.Println("\n4. 自定义配置窗口测试")
	customConfig := types.DefaultPerformanceConfig()
	customConfig.BufferConfig.WindowOutputSize = 2000 // custom window output buffer size
	testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
	fmt.Println("\n=== 窗口配置演示完成 ===")
}
// testWindowWithConfig runs one tumbling-window aggregation on the given
// StreamSQL instance: it registers a sink, emits five readings across two
// devices, waits past the 2s window, prints the stats, and stops the stream.
// configName is only used to label the console output.
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
	// Execute a simple tumbling-window query
	sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
	err := ssql.Execute(sql)
	if err != nil {
		fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
		return
	}
	// Attach a result handler; guard against a nil underlying stream
	stream := ssql.Stream()
	if stream != nil {
		stream.AddSink(func(result interface{}) {
			fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
		})
		// Emit test data: i%2 alternates the reading between two devices
		for i := 0; i < 5; i++ {
			data := map[string]interface{}{
				"deviceId":    fmt.Sprintf("device_%d", i%2),
				"temperature": 20.0 + float64(i),
				"timestamp":   time.Now(),
			}
			ssql.Emit(data)
		}
		// Sleep past the 2s window so it fires
		time.Sleep(3 * time.Second)
		// Report statistics
		stats := ssql.GetDetailedStats()
		fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
	}
	// Stop stream processing (done even when the stream was nil)
	ssql.Stop()
	fmt.Printf("✅ %s - 测试完成\n", configName)
}
-245
View File
@@ -1,245 +0,0 @@
package expr
import (
"fmt"
"strings"
)
// parseCaseExpression parses a CASE expression from the token stream and
// returns the resulting node plus the unconsumed tokens.
//
// Two SQL forms are supported:
//   - simple:   CASE <expr> WHEN <value> THEN <result> ... [ELSE <result>] END
//   - searched: CASE WHEN <condition> THEN <result> ...    [ELSE <result>] END
//
// tokens must begin with the CASE keyword; an error is returned for any
// malformed clause (missing THEN, missing END, sub-expression parse failure).
func parseCaseExpression(tokens []string) (*ExprNode, []string, error) {
	if len(tokens) == 0 || strings.ToUpper(tokens[0]) != "CASE" {
		return nil, nil, fmt.Errorf("expected CASE keyword")
	}
	remaining := tokens[1:]
	caseExpr := &CaseExpression{}
	// Check if it's a simple CASE expression (CASE expr WHEN value THEN
	// result): anything other than WHEN right after CASE is the operand.
	if len(remaining) > 0 && strings.ToUpper(remaining[0]) != "WHEN" {
		// Simple CASE expression — parse the operand
		value, newRemaining, err := parseOrExpression(remaining)
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing CASE expression: %v", err)
		}
		caseExpr.Value = value
		remaining = newRemaining
	}
	// Parse WHEN clauses (zero or more; each is WHEN <cond> THEN <result>)
	for len(remaining) > 0 && strings.ToUpper(remaining[0]) == "WHEN" {
		remaining = remaining[1:] // Skip WHEN
		// Parse WHEN condition (a comparison value, in the simple form)
		condition, newRemaining, err := parseOrExpression(remaining)
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing WHEN condition: %v", err)
		}
		remaining = newRemaining
		// Check THEN keyword
		if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "THEN" {
			return nil, nil, fmt.Errorf("expected THEN after WHEN condition")
		}
		remaining = remaining[1:] // Skip THEN
		// Parse THEN result
		result, newRemaining, err := parseOrExpression(remaining)
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing THEN result: %v", err)
		}
		remaining = newRemaining
		// Add the parsed WHEN clause, preserving declaration order
		caseExpr.WhenClauses = append(caseExpr.WhenClauses, WhenClause{
			Condition: condition,
			Result:    result,
		})
	}
	// Parse optional ELSE clause
	if len(remaining) > 0 && strings.ToUpper(remaining[0]) == "ELSE" {
		remaining = remaining[1:] // Skip ELSE
		elseExpr, newRemaining, err := parseOrExpression(remaining)
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing ELSE expression: %v", err)
		}
		caseExpr.ElseResult = elseExpr
		remaining = newRemaining
	}
	// Check the mandatory END terminator
	if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "END" {
		return nil, nil, fmt.Errorf("expected END to close CASE expression")
	}
	// Create an ExprNode wrapping the CaseExpression
	caseNode := &ExprNode{
		Type:     TypeCase,
		CaseExpr: caseExpr,
	}
	return caseNode, remaining[1:], nil
}
// evaluateCaseExpression evaluates a CASE expression node against the given
// data row and returns its numeric result. It validates the node, then
// dispatches: nodes carrying an operand (CASE expr WHEN ...) go to the
// simple-form evaluator, all others to the searched-form evaluator
// (CASE WHEN cond ...).
func evaluateCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	switch {
	case node.Type != TypeCase:
		return 0, fmt.Errorf("not a CASE expression")
	case node.CaseExpr == nil:
		return 0, fmt.Errorf("invalid CASE expression")
	case node.CaseExpr.Value != nil:
		// Simple CASE expression: CASE expr WHEN value THEN result
		return evaluateSimpleCaseExpression(node, data)
	default:
		// Searched CASE expression: CASE WHEN condition THEN result
		return evaluateSearchCaseExpression(node, data)
	}
}
// evaluateSimpleCaseExpression evaluates the simple CASE form
// (CASE <expr> WHEN <value> THEN <result> ... [ELSE <result>] END):
// the operand is computed once, then compared for equality against each
// WHEN value in order; the first match wins. With no match and no ELSE
// the function returns 0 as a stand-in for SQL NULL.
func evaluateSimpleCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return 0, fmt.Errorf("invalid CASE expression")
	}
	// Evaluate the CASE operand once, up front
	caseValue, err := evaluateNodeValue(caseExpr.Value, data)
	if err != nil {
		return 0, err
	}
	// Iterate through WHEN clauses in declaration order
	for _, whenClause := range caseExpr.WhenClauses {
		// Evaluate the WHEN comparison value
		whenValue, err := evaluateNodeValue(whenClause.Condition, data)
		if err != nil {
			return 0, err
		}
		// Compare values; the first equal WHEN short-circuits
		if compareValuesForEquality(caseValue, whenValue) {
			// Evaluate and return the THEN result
			return evaluateNode(whenClause.Result, data)
		}
	}
	// No matching WHEN clause — evaluate the ELSE expression, if any
	if caseExpr.ElseResult != nil {
		return evaluateNode(caseExpr.ElseResult, data)
	}
	// No ELSE clause: return NULL (represented as 0 here)
	return 0, nil
}
// evaluateSearchCaseExpression evaluates the searched CASE form
// (CASE WHEN <condition> THEN <result> ... [ELSE <result>] END): conditions
// are tested in declaration order and the first true one decides the result.
// Conditions are evaluated as booleans so logical operators (AND/OR/NOT)
// work. With no match and no ELSE, 0 is returned as a stand-in for NULL.
func evaluateSearchCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
	caseExpr := node.CaseExpr
	if caseExpr == nil {
		return 0, fmt.Errorf("invalid CASE expression")
	}
	// First true WHEN condition wins.
	for _, clause := range caseExpr.WhenClauses {
		matched, err := evaluateBoolNode(clause.Condition, data)
		if err != nil {
			return 0, err
		}
		if matched {
			return evaluateNode(clause.Result, data)
		}
	}
	// Nothing matched — fall back to ELSE when present.
	if caseExpr.ElseResult != nil {
		return evaluateNode(caseExpr.ElseResult, data)
	}
	// No ELSE clause: NULL, represented as 0.
	return 0, nil
}
// evaluateCaseExpressionWithNull evaluates a CASE expression while
// preserving SQL NULL semantics. It returns (value, isNull, error); isNull
// is true when the expression yields NULL, i.e. no WHEN clause matched and
// no ELSE clause is present. The simple form is delegated to
// evaluateCaseExpressionValueWithNull; the searched form is handled inline.
func evaluateCaseExpressionWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
	if node.Type != TypeCase {
		return nil, false, fmt.Errorf("not a CASE expression")
	}
	ce := node.CaseExpr
	if ce == nil {
		return nil, false, fmt.Errorf("invalid CASE expression")
	}
	// Simple CASE (an operand is present): value-matching variant.
	if ce.Value != nil {
		return evaluateCaseExpressionValueWithNull(node, data)
	}
	// Searched CASE: evaluate each condition as a boolean so logical
	// operators are handled correctly; first true condition wins.
	for _, clause := range ce.WhenClauses {
		matched, err := evaluateBoolNode(clause.Condition, data)
		if err != nil {
			return nil, false, err
		}
		if matched {
			return evaluateNodeValueWithNull(clause.Result, data)
		}
	}
	if ce.ElseResult != nil {
		return evaluateNodeValueWithNull(ce.ElseResult, data)
	}
	// No clause matched and no ELSE: the result is NULL.
	return nil, true, nil
}
// evaluateCaseExpressionValueWithNull evaluates a simple CASE expression
// (CASE <expr> WHEN <value> THEN <result> ...) with SQL NULL support.
// It returns (value, isNull, error). The operand and each WHEN value are
// evaluated with NULL tracking, and equality is decided by the NULL-aware
// comparator; with no match and no ELSE clause the result is NULL.
func evaluateCaseExpressionValueWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
	ce := node.CaseExpr
	if ce == nil {
		return nil, false, fmt.Errorf("invalid CASE expression")
	}
	// Evaluate the CASE operand once, tracking whether it is NULL.
	operand, operandIsNull, err := evaluateNodeValueWithNull(ce.Value, data)
	if err != nil {
		return nil, false, err
	}
	for _, clause := range ce.WhenClauses {
		candidate, candidateIsNull, evalErr := evaluateNodeValueWithNull(clause.Condition, data)
		if evalErr != nil {
			return nil, false, evalErr
		}
		// NULL-aware equality: first matching WHEN yields its THEN result.
		if compareValuesWithNullForEquality(operand, operandIsNull, candidate, candidateIsNull) {
			return evaluateNodeValueWithNull(clause.Result, data)
		}
	}
	if ce.ElseResult != nil {
		return evaluateNodeValueWithNull(ce.ElseResult, data)
	}
	// No match and no ELSE clause: the result is NULL.
	return nil, true, nil
}
File diff suppressed because it is too large Load Diff
-117
View File
@@ -1,117 +0,0 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package expr provides expression parsing and evaluation capabilities for StreamSQL.
This package implements a comprehensive expression engine that supports mathematical operations,
logical comparisons, function calls, field references, and complex CASE expressions.
It serves as the foundation for WHERE clauses, HAVING clauses, and computed fields in SQL queries.
# Core Features
Mathematical Operations - Supports arithmetic operators (+, -, *, /, %, ^) with proper precedence
Logical Operations - Boolean logic with AND, OR operators and comparison operators (=, !=, <, >, <=, >=, LIKE)
Function Integration - Seamless integration with the functions package for built-in and custom functions
Field References - Dynamic field access with dot notation support for nested data structures
CASE Expressions - Full support for both simple and searched CASE expressions
Type Safety - Automatic type conversion and validation during expression evaluation
Fallback Support - Integration with expr-lang/expr library for complex expressions
# Expression Types
The package supports various expression node types:
// Basic types
TypeNumber - Numeric constants (integers and floats)
TypeString - String literals with proper escaping
TypeField - Field references (e.g., "temperature", "device.id")
TypeOperator - Binary and unary operators
TypeFunction - Function calls with argument validation
TypeParenthesis - Grouped expressions for precedence control
TypeCase - CASE expressions for conditional logic
# Usage Examples
Basic mathematical expression:
expr, err := NewExpression("temperature * 1.8 + 32")
if err != nil {
log.Fatal(err)
}
result, err := expr.Evaluate(data)
Logical expression with field references:
expr, err := NewExpression("temperature > 25 AND humidity < 60")
result, err := expr.Evaluate(data)
Function call expression:
expr, err := NewExpression("UPPER(device_name) LIKE 'SENSOR%'")
result, err := expr.Evaluate(data)
CASE expression for conditional logic:
expr, err := NewExpression(`
CASE
WHEN temperature > 30 THEN 'hot'
WHEN temperature > 20 THEN 'warm'
ELSE 'cold'
END
`)
result, err := expr.Evaluate(data)
# Operator Precedence
The expression parser follows standard mathematical precedence rules:
1. Parentheses (highest)
2. Power (^)
3. Multiplication, Division, Modulo (*, /, %)
4. Addition, Subtraction (+, -)
5. Comparison (>, <, >=, <=, LIKE, IS)
6. Equality (=, ==, !=, <>)
7. Logical AND
8. Logical OR (lowest)
# Error Handling
The package provides comprehensive error handling with detailed error messages:
Syntax validation during expression creation
Type checking during evaluation
Function argument validation
Graceful fallback to expr-lang for unsupported expressions
# Performance Considerations
Expressions are parsed once and can be evaluated multiple times
Built-in operator optimization for common mathematical operations
Lazy evaluation for logical operators (short-circuiting)
Efficient field access caching for repeated evaluations
Automatic fallback to optimized expr-lang library when needed
# Integration
This package integrates seamlessly with other StreamSQL components:
Functions package - For built-in and custom function execution
Types package - For data type definitions and conversions
Stream package - For real-time expression evaluation in data streams
RSQL package - For SQL parsing and expression extraction
*/
package expr

Some files were not shown because too many files have changed in this diff Show More