26 Commits

Author SHA1 Message Date
rulego-team 5b13316754 test:add test cases 2025-08-08 17:52:43 +08:00
rulego-team c2c8f86d7f ci: skip regular tests for Go 1.21, only run coverage tests 2025-08-08 09:54:54 +08:00
rulego-team 049295b599 test:add test cases 2025-08-08 09:40:26 +08:00
rulego-team d6a8778731 test:add coverage test case 2025-08-08 09:00:02 +08:00
rulego-team 3ad5146654 update readme 2025-08-07 19:41:52 +08:00
rulego-team 74efe9e526 fix(aggregator): enable Count aggregator to handle any data type 2025-08-07 19:36:43 +08:00
rulego-team ff00fd1f31 refactor: format code and improve test cases 2025-08-07 19:23:48 +08:00
rulego-team 10319b45a6 feat: add bool type conversion to ToFloat64E 2025-08-07 19:22:22 +08:00
rulego-team 790e6c615d feat: add aliases for functions 2025-08-07 19:20:14 +08:00
rulego-team a066a4df1b refactor: restructure the expression engine module 2025-08-07 19:18:40 +08:00
rulego-team 57983f19d7 refactor: remove persistence strategy 2025-08-06 18:11:44 +08:00
rulego-team 6f5305ca01 feat: improve test cases 2025-08-06 17:18:50 +08:00
rulego-team a46b833608 feat: optimize the persistence loading mechanism 2025-08-06 17:16:47 +08:00
rulego-team 4249cef16b perf: optimize log output 2025-08-06 17:15:31 +08:00
rulego-team 6bfb592bd0 chore: add test cases 2025-08-06 11:05:06 +08:00
rulego-team 691ca41c87 fix: improve SubstringFunction negative-number handling 2025-08-06 10:55:28 +08:00
rulego-team f3fe997ce8 test: add test cases 2025-08-05 11:25:49 +08:00
rulego-team 98dab93e5b fix: CASE statements returning incorrect strings 2025-08-05 00:37:22 +08:00
rulego-team 937e8243cf fix: negative numbers failing to parse correctly 2025-08-05 00:36:21 +08:00
rulego-team a43445ebc7 refactor: improve comments 2025-08-04 14:45:43 +08:00
rulego-team de4d47d87d ci: use codecov-action v3 2025-08-04 14:36:51 +08:00
rulego-team a47748d4c7 refactor: translate Chinese comments to English in functions directory
- Convert all Chinese function comments to English in functions package
- Update interface documentation for better international readability
- Maintain original logic and functionality unchanged
- Improve code documentation standards for global development
2025-08-04 12:35:33 +08:00
rulego-team ed2a063000 perf: optimize the persistence overflow strategy 2025-08-04 11:27:41 +08:00
rulego-team 8ebd152ec9 refactor: change Emit's parameter from interface{} to map[string]interface{}; change AddSink(func(results interface{})) to AddSink(func(results []map[string]interface{})) 2025-08-03 23:41:11 +08:00
rulego-team 343d045554 refactor: restructure the stream module 2025-08-01 18:55:32 +08:00
rulego-team ec0ac04ebf ci: remove create release step from workflow 2025-07-30 19:42:48 +08:00
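The signature change in commit 8ebd152ec9 (Emit taking `map[string]interface{}`, sinks receiving `[]map[string]interface{}`) can be sketched with a stand-in stream type. `MiniStream` and its single-row batching are illustrative only, not the real streamsql implementation:

```go
package main

import "fmt"

// MiniStream is a stand-in for the streamsql entry type, illustrating
// only the post-refactor method signatures.
type MiniStream struct {
	sinks []func(results []map[string]interface{})
}

// AddSink registers a callback that now receives a batch of result rows
// ([]map[string]interface{}) instead of a bare interface{}.
func (s *MiniStream) AddSink(fn func(results []map[string]interface{})) {
	s.sinks = append(s.sinks, fn)
}

// Emit now accepts a typed row (map[string]interface{}) instead of interface{}.
func (s *MiniStream) Emit(data map[string]interface{}) {
	// In this sketch every emitted row is forwarded to the sinks as a
	// one-element batch; the real engine batches per window trigger.
	for _, fn := range s.sinks {
		fn([]map[string]interface{}{data})
	}
}

func main() {
	s := &MiniStream{}
	s.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("got %d row(s)\n", len(results))
	})
	s.Emit(map[string]interface{}{"temperature": 25.5})
}
```

The slice-typed callback lets one window trigger deliver all of its aggregated rows in a single call, which the old `interface{}` signature obscured.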
169 changed files with 40005 additions and 13201 deletions
+6 -82
@@ -41,91 +41,15 @@ jobs:
         run: go build -v ./...

       - name: Run tests
+        if: matrix.go-version != '1.21'
         run: go test -v -race -timeout 300s ./...

       - name: Run tests with coverage
         if: matrix.go-version == '1.21'
-        run: go test -v -race -coverprofile=coverage.out -covermode=atomic -timeout 300s ./...
+        run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...

-      - name: Upload coverage to Codecov
+      - name: Upload coverage reports to Codecov
         if: matrix.go-version == '1.21'
-        uses: codecov/codecov-action@v5
-        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
+        uses: codecov/codecov-action@v3
+        env:
+          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

-  case-expression-tests:
-    name: CASE Expression Tests
-    runs-on: ubuntu-latest
-    steps:
-      - name: Set up Go
-        uses: actions/setup-go@v4
-        with:
-          go-version: '1.21'
-      - name: Check out code
-        uses: actions/checkout@v4
-      - name: Cache Go modules
-        uses: actions/cache@v3
-        with:
-          path: ~/go/pkg/mod
-          key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
-      - name: Download dependencies
-        run: go mod download
-      - name: Run CASE Expression Parsing Tests
-        run: go test -v -run TestCaseExpressionParsing -timeout 15s
-      - name: Run CASE Expression Comprehensive Tests
-        run: go test -v -run TestCaseExpressionComprehensive -timeout 15s
-      - name: Run CASE Expression Field Extraction Tests
-        run: go test -v -run TestCaseExpressionFieldExtraction -timeout 15s
-      - name: Run CASE Expression in SQL Tests
-        run: go test -v -run TestCaseExpressionInSQL -timeout 15s
-      - name: Run CASE Expression Aggregation Tests (with known limitations)
-        run: go test -v -run "TestCaseExpressionInAggregation|TestComplexCaseExpressionsInAggregation" -timeout 20s
-      - name: Run CASE Expression Edge Cases
-        run: go test -v -run TestCaseExpressionEdgeCases -timeout 15s

-#  lint:
-#    name: Lint
-#    runs-on: ubuntu-latest
-#
-#    steps:
-#      - name: Set up Go
-#        uses: actions/setup-go@v4
-#        with:
-#          go-version: '1.21'
-#
-#      - name: Check out code
-#        uses: actions/checkout@v4
-#
-#      - name: Run golangci-lint
-#        uses: golangci/golangci-lint-action@v3
-#        with:
-#          version: latest
-#          args: --timeout=5m
-#
-#  security:
-#    name: Security Scan
-#    runs-on: ubuntu-latest
-#
-#    steps:
-#      - name: Set up Go
-#        uses: actions/setup-go@v4
-#        with:
-#          go-version: '1.21'
-#
-#      - name: Check out code
-#        uses: actions/checkout@v4
-#
-#      - name: Run Gosec Security Scanner
-#        uses: securecodewarrior/github-action-gosec@v1
-#        with:
-#          args: './...'
+3 -8
@@ -5,6 +5,9 @@ on:
     tags:
       - 'v*'

+permissions:
+  contents: write
+
 jobs:
   test:
     name: Test Before Release
@@ -55,11 +58,3 @@ jobs:
       - name: Download dependencies
         run: go mod download
-
-      - name: Create Release
-        uses: softprops/action-gh-release@v1
-        with:
-          draft: false
-          prerelease: false
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+15 -13
@@ -3,12 +3,13 @@
 [![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
 [![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
 [![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
+[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)

 English| [简体中文](README_ZH.md)

 **StreamSQL** is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.

-Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekuiper.org/)
+📖 **[Documentation](https://rulego.cc/en/pages/streamsql-overview/)** | Similar to: [Apache Flink](https://flink.apache.org/)

 ## Features
@@ -82,9 +83,9 @@ func main() {
 	}

 	// Handle real-time transformation results
-	ssql.AddSink(func(result interface{}) {
-		fmt.Printf("Real-time result: %+v\n", result)
+	ssql.AddSink(func(results []map[string]interface{}) {
+		fmt.Printf("Real-time result: %+v\n", results)
 	})

 	// Simulate sensor data input
 	sensorData := []map[string]interface{}{
@@ -111,6 +112,7 @@ func main() {
 	// Process data one by one, each will output results immediately
 	for _, data := range sensorData {
 		ssql.Emit(data)
+		//changedData,err:=ssql.EmitSync(data) // Synchronously obtain processing results
 		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
 	}
@@ -195,8 +197,8 @@ func main() {
 				"humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70%
 			}
 			// Add data to stream, triggering StreamSQL's real-time processing
-			// AddData distributes data to corresponding windows and aggregators
-			ssql.stream.AddData(randomData)
+			// Emit distributes data to corresponding windows and aggregators
+			ssql.Emit(randomData)
 		}
 	case <-ctx.Done():
@@ -209,10 +211,10 @@ func main() {
 	// Step 6: Setup Result Processing Pipeline
 	resultChan := make(chan interface{})

 	// Add computation result callback function (Sink)
 	// When window triggers computation, results are output through this callback
-	ssql.stream.AddSink(func(result interface{}) {
-		resultChan <- result
+	ssql.AddSink(func(results []map[string]interface{}) {
+		resultChan <- results
 	})

 	// Step 7: Start Result Consumer Goroutine
 	// Count received results for effect verification
@@ -273,9 +275,9 @@ func main() {
 	}

 	// Handle aggregation results
-	ssql.AddSink(func(result interface{}) {
-		fmt.Printf("Aggregation result: %+v\n", result)
+	ssql.AddSink(func(results []map[string]interface{}) {
+		fmt.Printf("Aggregation result: %+v\n", results)
 	})

 	// Add nested structured data
 	nestedData := map[string]interface{}{
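The commented-out `EmitSync` line added to the quick start hints at a synchronous variant that returns the processed row to the caller. A rough sketch of the asynchronous/synchronous contrast, using hypothetical stand-in functions (the real streamsql signatures may differ):

```go
package main

import "fmt"

// row is shorthand for a single data record.
type row = map[string]interface{}

// fakeProcess stands in for whatever transformation the SQL applies.
func fakeProcess(data row) row {
	out := row{}
	for k, v := range data {
		out[k] = v
	}
	out["processed"] = true
	return out
}

// Emit is fire-and-forget: results arrive later via registered sinks.
func Emit(data row, sink func([]row)) {
	sink([]row{fakeProcess(data)})
}

// EmitSync returns the processed result directly to the caller.
func EmitSync(data row) (row, error) {
	return fakeProcess(data), nil
}

func main() {
	Emit(row{"temperature": 30.0}, func(rs []row) {
		fmt.Println("async result:", rs[0]["processed"])
	})
	changed, err := EmitSync(row{"temperature": 30.0})
	fmt.Println("sync result:", changed["processed"], err)
}
```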
+19 -11
@@ -3,12 +3,13 @@
 [![Go Report](https://goreportcard.com/badge/github.com/rulego/streamsql)](https://goreportcard.com/report/github.com/rulego/streamsql)
 [![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
 [![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
+[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)

 [English](README.md)| 简体中文

 **StreamSQL** is a lightweight, SQL-based stream processing engine for IoT edge that efficiently processes and analyzes unbounded data streams.

-Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekuiper.org/)
+📖 **[Official Documentation](https://rulego.cc/pages/streamsql-overview/)** | Similar to: [Apache Flink](https://flink.apache.org/)

 ## Features
@@ -85,8 +86,8 @@ func main() {
 	}

 	// Handle real-time transformation results
-	ssql.AddSink(func(result interface{}) {
-		fmt.Printf("实时处理结果: %+v\n", result)
+	ssql.AddSink(func(results []map[string]interface{}) {
+		fmt.Printf("实时处理结果: %+v\n", results)
 	})

 	// Simulate sensor data input
@@ -114,6 +115,7 @@ func main() {
 	// Process data one by one; each record outputs a result immediately
 	for _, data := range sensorData {
 		ssql.Emit(data)
+		//changedData,err:=ssql.EmitSync(data) // Synchronously obtain processing results
 		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
 	}
@@ -206,7 +208,7 @@ func main() {
 			// - Group by deviceId
 			// - Assign data to the corresponding time window
 			// - Update aggregation state
-			ssql.stream.AddData(randomData)
+			ssql.Emit(randomData)
 		}
 	case <-ctx.Done():
@@ -221,8 +223,10 @@ func main() {
 	// 6. Register the result callback
 	// When the window fires (every 5 seconds), this callback is invoked
 	// with the aggregation results
-	ssql.stream.AddSink(func(result interface{}) {
-		resultChan <- result
+	ssql.AddSink(func(results []map[string]interface{}) {
+		for _, result := range results {
+			resultChan <- result
+		}
 	})

 	// 7. Result consumer - handle computation results
@@ -289,18 +293,22 @@ func main() {
 	}

 	// Handle aggregation results
-	ssql.AddSink(func(result interface{}) {
-		fmt.Printf("聚合结果: %+v\n", result)
+	ssql.AddSink(func(results []map[string]interface{}) {
+		fmt.Printf("聚合结果: %+v\n", results)
 	})

 	// Add nested structured data
 	nestedData := map[string]interface{}{
 		"device": map[string]interface{}{
 			"info": map[string]interface{}{
 				"name": "temperature-sensor-001",
 				"type": "temperature",
+				"status": "active",
+			},
+			"location": map[string]interface{}{
+				"building": "智能温室-A区",
+				"floor": "3F",
 			},
-			"location": "智能温室-A区",
 		},
 		"sensor": map[string]interface{}{
 			"temperature": 25.5,
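The updated example above fans the sink's result slice out into a channel one row at a time, since a single window trigger can now carry several aggregated rows. That pattern is self-contained enough to sketch on its own (names are illustrative):

```go
package main

import "fmt"

func main() {
	resultChan := make(chan map[string]interface{}, 8)

	// A sink receives a whole window batch; fan each row out individually
	// so downstream consumers see one result per channel message.
	sink := func(results []map[string]interface{}) {
		for _, result := range results {
			resultChan <- result
		}
	}

	// Simulate one window trigger delivering two aggregated rows.
	sink([]map[string]interface{}{
		{"deviceId": "sensor001", "avg_temp": 24.8},
		{"deviceId": "sensor002", "avg_temp": 26.1},
	})
	close(resultChan)

	for r := range resultChan {
		fmt.Println(r["deviceId"], r["avg_temp"])
	}
}
```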
+10 -12
@@ -4,12 +4,10 @@ import (
 	"github.com/rulego/streamsql/functions"
 )

-// 为了向后兼容,重新导出functions模块中的类型和函数
-// AggregateType 聚合类型,重新导出functions.AggregateType
+// AggregateType aggregate type, re-exports functions.AggregateType
 type AggregateType = functions.AggregateType

-// 重新导出所有聚合类型常量
+// Re-export all aggregate type constants
 const (
 	Sum   = functions.Sum
 	Count = functions.Count
@@ -28,29 +26,29 @@ const (
 	Deduplicate = functions.Deduplicate
 	Var         = functions.Var
 	VarS        = functions.VarS

-	// 分析函数
+	// Analytical functions
 	Lag        = functions.Lag
 	Latest     = functions.Latest
 	ChangedCol = functions.ChangedCol
 	HadChanged = functions.HadChanged

-	// 表达式聚合器,用于处理自定义函数
+	// Expression aggregator for handling custom functions
 	Expression = functions.Expression
 )

-// AggregatorFunction 聚合器函数接口,重新导出functions.LegacyAggregatorFunction
+// AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
 type AggregatorFunction = functions.LegacyAggregatorFunction

-// ContextAggregator 支持context机制的聚合器接口,重新导出functions.ContextAggregator
+// ContextAggregator aggregator interface supporting context mechanism, re-exports functions.ContextAggregator
 type ContextAggregator = functions.ContextAggregator

-// Register 添加自定义聚合器到全局注册表,重新导出functions.RegisterLegacyAggregator
+// Register adds custom aggregator to global registry, re-exports functions.RegisterLegacyAggregator
 func Register(name string, constructor func() AggregatorFunction) {
 	functions.RegisterLegacyAggregator(name, constructor)
 }

-// CreateBuiltinAggregator 创建内置聚合器,重新导出functions.CreateLegacyAggregator
+// CreateBuiltinAggregator creates built-in aggregator, re-exports functions.CreateLegacyAggregator
 func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
-	// 特殊处理expression类型
+	// Special handling for expression type
 	if aggType == "expression" {
 		return &ExpressionAggregatorWrapper{
 			function: functions.NewExpressionAggregatorFunction(),
@@ -60,7 +58,7 @@ func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
 	return functions.CreateLegacyAggregator(aggType)
 }

-// ExpressionAggregatorWrapper 包装表达式聚合器,使其兼容LegacyAggregatorFunction接口
+// ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
 type ExpressionAggregatorWrapper struct {
 	function *functions.ExpressionAggregatorFunction
 }
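The file above relies on Go type aliases (`type AggregateType = functions.AggregateType`) so that existing import paths keep compiling after the code moved into the functions package. A minimal, library-independent illustration of why an alias (declared with `=`) is freely interchangeable while a defined type is not:

```go
package main

import "fmt"

type Original struct{ Name string }

// Alias declaration: Aliased and Original are the *same* type, so values
// flow in both directions without conversion - this is what lets the
// aggregator package re-export types from the functions package.
type Aliased = Original

// Defined type: same underlying structure but a distinct type, which
// would break callers that expect Original.
type Defined Original

func main() {
	var a Aliased = Original{Name: "sum"}
	var o Original = a // legal: identical types
	fmt.Println(o.Name)

	d := Defined{Name: "count"}
	o2 := Original(d) // a defined type needs an explicit conversion
	fmt.Println(o2.Name)
}
```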
+165
@@ -0,0 +1,165 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package aggregator provides data aggregation functionality for StreamSQL.
This package implements group-based aggregation operations for stream processing,
supporting various aggregation functions and expression evaluation. It provides
thread-safe aggregation with support for custom expressions and built-in functions.
# Core Features
• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing
# Aggregation Types
Supported aggregation functions (re-exported from functions package):
// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile
// Collection aggregations
Collect, LastValue, MergeAgg
Deduplicate
// Window aggregations
WindowStart, WindowEnd
// Analytical functions
Lag, Latest, ChangedCol, HadChanged
// Custom expressions
Expression
# Core Interfaces
Main aggregation interfaces:
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
type AggregatorFunction interface {
New() AggregatorFunction
Add(value interface{})
Result() interface{}
}
# Aggregation Configuration
Field configuration for aggregations:
type AggregationField struct {
InputField string // Source field name
AggregateType AggregateType // Aggregation function type
OutputAlias string // Result field alias
}
# Usage Examples
Basic group aggregation:
// Define aggregation fields
aggFields := []AggregationField{
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}
// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
// Add data
data := map[string]interface{}{
"location": "room1",
"temperature": 25.5,
"humidity": 60,
"device_id": "sensor001",
}
aggregator.Add(data)
// Get results
results, err := aggregator.GetResults()
Expression-based aggregation:
// Register custom expression
aggregator.RegisterExpression(
"comfort_index",
"temperature * 0.7 + humidity * 0.3",
[]string{"temperature", "humidity"},
func(data interface{}) (interface{}, error) {
// Custom evaluation logic
return evaluateComfortIndex(data)
},
)
Multiple group aggregation:
// Group by multiple fields
aggregator := NewGroupAggregator(
[]string{"location", "device_type"},
aggFields,
)
// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
# Built-in Aggregators
Create built-in aggregation functions:
// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)
// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30
# Custom Aggregators
Register custom aggregation functions:
Register("custom_avg", func() AggregatorFunction {
return &CustomAvgAggregator{}
})
# Integration
Integrates with other StreamSQL components:
• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing
*/
package aggregator
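The doc comment above references a `CustomAvgAggregator` without showing it. A sketch of what such an implementation could look like against the documented `AggregatorFunction` interface — the interface is copied from the doc, the struct itself is hypothetical:

```go
package main

import "fmt"

// AggregatorFunction mirrors the interface documented in the package doc.
type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// CustomAvgAggregator is a hypothetical incremental average aggregator.
type CustomAvgAggregator struct {
	sum   float64
	count int
}

// New returns a fresh instance, as the engine does once per group.
func (a *CustomAvgAggregator) New() AggregatorFunction {
	return &CustomAvgAggregator{}
}

// Add accumulates numeric values; non-float64 inputs are ignored in this
// sketch (the real engine converts inputs via cast.ToFloat64E first).
func (a *CustomAvgAggregator) Add(value interface{}) {
	if v, ok := value.(float64); ok {
		a.sum += v
		a.count++
	}
}

// Result returns the running mean, or nil if nothing was added.
func (a *CustomAvgAggregator) Result() interface{} {
	if a.count == 0 {
		return nil
	}
	return a.sum / float64(a.count)
}

func main() {
	agg := (&CustomAvgAggregator{}).New()
	agg.Add(10.0)
	agg.Add(20.0)
	fmt.Println(agg.Result()) // 15
}
```

Registered via `Register("custom_avg", ...)` as shown in the doc, each group would get its own instance through `New()`.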
+64 -37
@@ -11,20 +11,21 @@ import (
 	"github.com/rulego/streamsql/utils/fieldpath"
 )

+// Aggregator aggregator interface
 type Aggregator interface {
 	Add(data interface{}) error
 	Put(key string, val interface{}) error
 	GetResults() ([]map[string]interface{}, error)
 	Reset()
-	// RegisterExpression 注册表达式计算器
+	// RegisterExpression registers expression evaluator
 	RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
 }

-// AggregationField 定义单个聚合字段的配置
+// AggregationField defines configuration for a single aggregation field
 type AggregationField struct {
-	InputField    string        // 输入字段名(如 "temperature")
-	AggregateType AggregateType // 聚合类型(如 Sum, Avg)
-	OutputAlias   string        // 输出别名(如 "temp_sum")
+	InputField    string        // Input field name (e.g., "temperature")
+	AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
+	OutputAlias   string        // Output alias (e.g., "temp_sum")
 }

 type GroupAggregator struct {
@@ -34,26 +35,26 @@ type GroupAggregator struct {
 	groups  map[string]map[string]AggregatorFunction
 	mu      sync.RWMutex
 	context map[string]interface{}
-	// 表达式计算器
+	// Expression evaluators
 	expressions map[string]*ExpressionEvaluator
 }

-// ExpressionEvaluator 包装表达式计算功能
+// ExpressionEvaluator wraps expression evaluation functionality
 type ExpressionEvaluator struct {
-	Expression   string   // 完整表达式
-	Field        string   // 主字段名
-	Fields       []string // 表达式中引用的所有字段
+	Expression   string   // Complete expression
+	Field        string   // Primary field name
+	Fields       []string // All fields referenced in expression
 	evaluateFunc func(data interface{}) (interface{}, error)
 }

-// NewGroupAggregator 创建新的分组聚合器
+// NewGroupAggregator creates a new group aggregator
 func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator {
 	aggregators := make(map[string]AggregatorFunction)
-	// 为每个聚合字段创建聚合器
+	// Create aggregator for each aggregation field
 	for i := range aggregationFields {
 		if aggregationFields[i].OutputAlias == "" {
-			// 如果没有指定别名,使用输入字段名
+			// If no alias specified, use input field name
 			aggregationFields[i].OutputAlias = aggregationFields[i].InputField
 		}
 		aggregators[aggregationFields[i].OutputAlias] = CreateBuiltinAggregator(aggregationFields[i].AggregateType)
@@ -68,7 +69,7 @@ func NewGroupAggregator(groupFields []string, aggregationFields []AggregationFie
 	}
 }

-// RegisterExpression 注册表达式计算器
+// RegisterExpression registers expression evaluator
 func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error)) {
 	ga.mu.Lock()
 	defer ga.mu.Unlock()
@@ -91,26 +92,26 @@ func (ga *GroupAggregator) Put(key string, val interface{}) error {
 	return nil
 }

-// isNumericAggregator 检查聚合器是否需要数值类型输入
+// isNumericAggregator checks if aggregator requires numeric type input
 func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
-	// 通过functions模块动态检查函数类型
+	// Dynamically check function type through functions module
 	if fn, exists := functions.Get(string(aggType)); exists {
 		switch fn.GetType() {
 		case functions.TypeMath:
-			// 数学函数通常需要数值输入
+			// Math functions usually require numeric input
 			return true
 		case functions.TypeAggregation:
-			// 检查是否是数值聚合函数
+			// Check if it's a numeric aggregation function
 			switch string(aggType) {
 			case functions.SumStr, functions.AvgStr, functions.MinStr, functions.MaxStr, functions.CountStr,
 				functions.StdDevStr, functions.MedianStr, functions.PercentileStr,
 				functions.VarStr, functions.VarSStr, functions.StdDevSStr:
 				return true
 			case functions.CollectStr, functions.MergeAggStr, functions.DeduplicateStr, functions.LastValueStr:
-				// 这些函数可以处理任意类型
+				// These functions can handle any type
 				return false
 			default:
-				// 对于未知的聚合函数,尝试检查函数名称模式
+				// For unknown aggregation functions, try to check function name patterns
 				funcName := string(aggType)
 				if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
 					strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -120,15 +121,15 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
 				return false
 			}
 		case functions.TypeAnalytical:
-			// 分析函数通常可以处理任意类型
+			// Analytical functions can usually handle any type
 			return false
 		default:
-			// 其他类型的函数,保守起见认为不需要数值转换
+			// For other types of functions, conservatively assume no numeric conversion needed
 			return false
 		}
 	}

-	// 如果函数不存在,根据名称模式判断
+	// If function doesn't exist, judge by name pattern
 	funcName := string(aggType)
 	if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
 		strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
@@ -142,6 +143,12 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
 func (ga *GroupAggregator) Add(data interface{}) error {
 	ga.mu.Lock()
 	defer ga.mu.Unlock()

+	// Check if data is nil
+	if data == nil {
+		return fmt.Errorf("data cannot be nil")
+	}
+
 	var v reflect.Value
 	switch data.(type) {
@@ -153,6 +160,10 @@ func (ga *GroupAggregator) Add(data interface{}) error {
 		if v.Kind() == reflect.Ptr {
 			v = v.Elem()
 		}
+		// Check if it's a supported data type
+		if v.Kind() != reflect.Struct && v.Kind() != reflect.Map {
+			return fmt.Errorf("unsupported data type: %T, expected struct or map", data)
+		}
 	}

 	key := ""
@@ -160,11 +171,11 @@ func (ga *GroupAggregator) Add(data interface{}) error {
 		var fieldVal interface{}
 		var found bool

-		// 检查是否是嵌套字段
+		// Check if it's a nested field
 		if fieldpath.IsNestedField(field) {
 			fieldVal, found = fieldpath.GetNestedField(data, field)
 		} else {
-			// 原有的字段访问逻辑
+			// Original field access logic
 			var f reflect.Value
 			if v.Kind() == reflect.Map {
 				keyVal := reflect.ValueOf(field)
@@ -198,21 +209,21 @@ func (ga *GroupAggregator) Add(data interface{}) error {
 		ga.groups[key] = make(map[string]AggregatorFunction)
 	}

-	// 为每个字段创建聚合器实例
+	// Create aggregator instances for each field
 	for outputAlias, agg := range ga.aggregators {
 		if _, exists := ga.groups[key][outputAlias]; !exists {
 			ga.groups[key][outputAlias] = agg.New()
 		}
 	}

-	// 处理每个聚合字段
+	// Process each aggregation field
 	for _, aggField := range ga.aggregationFields {
 		outputAlias := aggField.OutputAlias
 		if outputAlias == "" {
 			outputAlias = aggField.InputField
 		}

-		// 检查是否有表达式计算器
+		// Check if there's an expression evaluator
 		if expr, hasExpr := ga.expressions[outputAlias]; hasExpr {
 			result, err := expr.evaluateFunc(data)
 			if err != nil {
@@ -227,23 +238,23 @@ func (ga *GroupAggregator) Add(data interface{}) error {
 		inputField := aggField.InputField

-		// 特殊处理count(*)的情况
+		// Special handling for count(*) case
 		if inputField == "*" {
-			// 对于count(*),直接添加1,不需要获取具体字段值
+			// For count(*), directly add 1 without getting specific field value
 			if groupAgg, exists := ga.groups[key][outputAlias]; exists {
 				groupAgg.Add(1)
 			}
 			continue
 		}

-		// 获取字段值 - 支持嵌套字段
+		// Get field value - supports nested fields
 		var fieldVal interface{}
 		var found bool
 		if fieldpath.IsNestedField(inputField) {
 			fieldVal, found = fieldpath.GetNestedField(data, inputField)
 		} else {
-			// 原有的字段访问逻辑
+			// Original field access logic
 			var f reflect.Value
 			if v.Kind() == reflect.Map {
 				keyVal := reflect.ValueOf(inputField)
@@ -259,7 +270,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
 		}

 		if !found {
-			// 尝试从context中获取
+			// Try to get from context
 			if ga.context != nil {
 				if groupAgg, exists := ga.groups[key][outputAlias]; exists {
 					if contextAgg, ok := groupAgg.(ContextAggregator); ok {
@@ -275,9 +286,19 @@ func (ga *GroupAggregator) Add(data interface{}) error {
aggType := aggField.AggregateType
// Skip nil values for aggregation
if fieldVal == nil {
continue
}
// Special handling for Count aggregator - it can handle any type
if aggType == Count {
// Count can handle any non-null value
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
} else if ga.isNumericAggregator(aggType) {
// For numeric aggregation functions, try to convert to numeric type
if numVal, err := cast.ToFloat64E(fieldVal); err == nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(numVal)
@@ -286,7 +307,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
return fmt.Errorf("cannot convert field %s value %v to numeric type for aggregator %s", inputField, fieldVal, aggType)
}
} else {
// For non-numeric aggregation functions, pass original value directly
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
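The dispatch above — skip nil values, let Count accept any value, require a numeric conversion for numeric aggregators, and pass everything else through unchanged — can be sketched in isolation. Here `toFloat64` is a simplified stand-in for `cast.ToFloat64E` (including the bool support added in this change), and the string aggregator names are illustrative, not the package's actual constants:

```go
package main

import (
	"fmt"
	"strconv"
)

// toFloat64 is a stand-in for cast.ToFloat64E: it converts common
// types to float64 and reports whether the conversion succeeded.
func toFloat64(v interface{}) (float64, bool) {
	switch x := v.(type) {
	case float64:
		return x, true
	case int:
		return float64(x), true
	case bool: // mirrors the bool support added to ToFloat64E
		if x {
			return 1, true
		}
		return 0, true
	case string:
		f, err := strconv.ParseFloat(x, 64)
		return f, err == nil
	default:
		return 0, false
	}
}

// addValue mirrors the dispatch: nil is skipped, count accepts any
// non-nil value, numeric aggregators require a successful conversion,
// and all other aggregators receive the raw value.
func addValue(aggType string, fieldVal interface{}, add func(interface{})) error {
	if fieldVal == nil {
		return nil // nil values are skipped entirely
	}
	switch aggType {
	case "count":
		add(fieldVal)
	case "sum", "avg", "min", "max":
		num, ok := toFloat64(fieldVal)
		if !ok {
			return fmt.Errorf("cannot convert %v to numeric type for %s", fieldVal, aggType)
		}
		add(num)
	default:
		add(fieldVal)
	}
	return nil
}

func main() {
	sum := 0.0
	_ = addValue("sum", true, func(v interface{}) { sum += v.(float64) })
	_ = addValue("sum", "2.5", func(v interface{}) { sum += v.(float64) })
	count := 0
	_ = addValue("count", "any-type", func(v interface{}) { count++ })
	fmt.Println(sum, count) // prints: 3.5 1
}
```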
@@ -299,6 +320,12 @@ func (ga *GroupAggregator) Add(data interface{}) error {
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
ga.mu.RLock()
defer ga.mu.RUnlock()
// If there are neither group fields nor aggregation fields, return an empty result
if len(ga.aggregationFields) == 0 && len(ga.groupFields) == 0 {
return []map[string]interface{}{}, nil
}
result := make([]map[string]interface{}, 0, len(ga.groups))
for key, aggregators := range ga.groups {
group := make(map[string]interface{})
File diff suppressed because it is too large
+14 -14
@@ -16,7 +16,7 @@ type ExprCondition struct {
}
func NewExprCondition(expression string) (Condition, error) {
// Add custom string function support (startsWith, endsWith, contains are built-in operators)
options := []expr.Option{
expr.Function("like_match", func(params ...any) (any, error) {
if len(params) != 2 {
@@ -60,22 +60,22 @@ func (ec *ExprCondition) Evaluate(env interface{}) bool {
return result.(bool)
}
// matchesLikePattern implements LIKE pattern matching
// Supports % (matches any character sequence) and _ (matches single character)
func matchesLikePattern(text, pattern string) bool {
return likeMatch(text, pattern, 0, 0)
}
// likeMatch recursively implements LIKE matching algorithm
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
// If pattern has been fully matched
if patternIndex >= len(pattern) {
return textIndex >= len(text) // Text should also be fully matched
}
// If text has ended but pattern still has non-% characters, no match
if textIndex >= len(text) {
// Check if remaining pattern characters are all %
for i := patternIndex; i < len(pattern); i++ {
if pattern[i] != '%' {
return false
@@ -84,16 +84,16 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
return true
}
// Process current pattern character
patternChar := pattern[patternIndex]
if patternChar == '%' {
// % can match 0 or more characters
// Try matching 0 characters (skip %)
if likeMatch(text, pattern, textIndex, patternIndex+1) {
return true
}
// Try matching 1 or more characters
for i := textIndex; i < len(text); i++ {
if likeMatch(text, pattern, i+1, patternIndex+1) {
return true
@@ -101,10 +101,10 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
}
return false
} else if patternChar == '_' {
// _ matches exactly one character
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
} else {
// Regular characters must match exactly
if text[textIndex] == patternChar {
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
}
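Assembled into a self-contained program, the recursive matcher above behaves like this (same algorithm, condensed into a switch):

```go
package main

import "fmt"

// likeMatch reports whether text matches a SQL LIKE pattern, where
// '%' matches any run of characters (including none) and '_' matches
// exactly one character.
func likeMatch(text, pattern string, ti, pi int) bool {
	if pi >= len(pattern) {
		return ti >= len(text) // pattern exhausted: text must be too
	}
	if ti >= len(text) {
		// Remaining pattern must be all '%' to match empty text.
		for i := pi; i < len(pattern); i++ {
			if pattern[i] != '%' {
				return false
			}
		}
		return true
	}
	switch pattern[pi] {
	case '%':
		if likeMatch(text, pattern, ti, pi+1) { // match zero characters
			return true
		}
		for i := ti; i < len(text); i++ { // match one or more characters
			if likeMatch(text, pattern, i+1, pi+1) {
				return true
			}
		}
		return false
	case '_':
		return likeMatch(text, pattern, ti+1, pi+1)
	default:
		if text[ti] == pattern[pi] {
			return likeMatch(text, pattern, ti+1, pi+1)
		}
		return false
	}
}

func matchesLikePattern(text, pattern string) bool {
	return likeMatch(text, pattern, 0, 0)
}

func main() {
	fmt.Println(matchesLikePattern("John Smith", "John%")) // true
	fmt.Println(matchesLikePattern("John", "J_hn"))        // true
	fmt.Println(matchesLikePattern("Johan", "J_hn"))       // false
}
```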
File diff suppressed because it is too large
+111
@@ -0,0 +1,111 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package condition provides condition evaluation functionality for StreamSQL.
This package implements condition evaluation using the expr-lang library,
supporting complex boolean expressions for filtering and conditional logic.
It provides custom functions for SQL-like operations including LIKE pattern
matching and NULL checking.
# Core Features
Boolean Expression Evaluation - Evaluate complex boolean conditions
LIKE Pattern Matching - SQL-style pattern matching with % and _ wildcards
NULL Checking - Support for IS NULL and IS NOT NULL operations
Custom Functions - Extended function library for SQL compatibility
Type Safety - Automatic type conversion and validation
Performance Optimized - Compiled expressions for fast evaluation
# Condition Interface
Unified interface for condition evaluation:
type Condition interface {
Evaluate(env interface{}) bool
}
# Custom Functions
Built-in SQL-compatible functions:
// LIKE pattern matching
like_match(text, pattern) - SQL LIKE operation with % and _ wildcards
// NULL checking
is_null(value) - Check if value is NULL
is_not_null(value) - Check if value is not NULL
# Usage Examples
Basic condition evaluation:
condition, err := NewExprCondition("age >= 18 AND status == 'active'")
if err != nil {
log.Fatal(err)
}
data := map[string]interface{}{
"age": 25,
"status": "active",
}
result := condition.Evaluate(data) // returns true
LIKE pattern matching:
condition, err := NewExprCondition("like_match(name, 'John%')")
data := map[string]interface{}{"name": "John Smith"}
result := condition.Evaluate(data) // returns true
NULL checking:
condition, err := NewExprCondition("is_not_null(email)")
data := map[string]interface{}{"email": "user@example.com"}
result := condition.Evaluate(data) // returns true
Complex conditions:
condition, err := NewExprCondition(`
age >= 18 AND
like_match(email, '%@company.com') AND
is_not_null(department)
`)
# Pattern Matching
LIKE pattern matching supports:
% - Matches any sequence of characters (including empty)
_ - Matches exactly one character
Examples:
'John%' matches 'John', 'John Smith', 'Johnny'
'J_hn' matches 'John' but not 'Johan'
'%@gmail.com' matches any email ending with @gmail.com
# Integration
Integrates with other StreamSQL components:
Stream package - Data filtering and conditional processing
RSQL package - WHERE and HAVING clause evaluation
Types package - Data type handling and conversion
Expr package - Expression parsing and evaluation
*/
package condition
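A minimal sketch of the `Condition` interface in use. The `funcCondition` type below is illustrative only — it wraps a hand-written Go predicate where the real package compiles an expr-lang expression — but it shows how callers consume the interface:

```go
package main

import (
	"fmt"
	"strings"
)

// Condition mirrors the interface documented above.
type Condition interface {
	Evaluate(env interface{}) bool
}

// funcCondition is an illustrative stand-in for the expr-lang backed
// condition: it wraps a plain Go predicate over a map environment.
type funcCondition struct {
	fn func(env map[string]interface{}) bool
}

func (c funcCondition) Evaluate(env interface{}) bool {
	m, ok := env.(map[string]interface{})
	if !ok {
		return false // non-map environments never match in this sketch
	}
	return c.fn(m)
}

func main() {
	// Roughly: age >= 18 AND like_match(email, '%@company.com')
	cond := funcCondition{fn: func(m map[string]interface{}) bool {
		age, _ := m["age"].(int)
		email, _ := m["email"].(string)
		return age >= 18 && strings.HasSuffix(email, "@company.com")
	}}
	fmt.Println(cond.Evaluate(map[string]interface{}{
		"age": 25, "email": "alice@company.com",
	})) // prints: true
}
```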
+57 -56
@@ -15,23 +15,24 @@
*/
/*
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
StreamSQL provides efficient unbounded data stream processing and analysis capabilities,
supporting multiple window types, aggregate functions, custom functions, and seamless
integration with the RuleGo ecosystem.
# Core Features
Lightweight design - Pure in-memory operations, no external dependencies
SQL syntax support - Process stream data using familiar SQL syntax
Multiple window types - Sliding, tumbling, counting, and session windows
Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
RuleGo ecosystem integration - Extend input/output sources using RuleGo components
# Getting Started
Basic stream data processing:
package main
@@ -43,10 +44,10 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
)
func main() {
// Create StreamSQL instance
ssql := streamsql.New()
// Define SQL query - Calculate temperature average by device ID every 5 seconds
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
@@ -56,18 +57,18 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// Execute SQL, create stream processing task
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// Add result processing callback
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("Aggregation result: %v\n", result)
})
// Simulate sending stream data
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@@ -75,7 +76,7 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
for {
select {
case <-ticker.C:
// Generate random device data
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
@@ -86,36 +87,36 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
}
}()
// Run for 30 seconds
time.Sleep(30 * time.Second)
}
# Window Functions
StreamSQL supports multiple window types:
// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# Custom Functions
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
// Register temperature conversion function
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"Temperature conversion",
"Fahrenheit to Celsius",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
@@ -123,55 +124,55 @@ StreamSQL 支持插件式自定义函数,运行时动态注册:
},
)
// Use immediately in SQL
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
Supported custom function types:
TypeMath - Mathematical calculation functions
TypeString - String processing functions
TypeConversion - Type conversion functions
TypeDateTime - Date and time functions
TypeAggregation - Aggregate functions
TypeAnalytical - Analytical functions
TypeWindow - Window functions
TypeCustom - General custom functions
# Log Configuration
StreamSQL provides flexible log configuration options:
// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// Output to file
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())
# RuleGo Integration
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
Basic integration example:
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
// Register StreamSQL components
_ "github.com/rulego/rulego-components/external/streamsql"
)
func main() {
// Rule chain configuration
ruleChainJson := `{
"ruleChain": {"id": "rule01"},
"metadata": {
@@ -196,10 +197,10 @@ StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组
}
}`
// Create rule engine
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
// Send data
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)
+1 -1
@@ -51,7 +51,7 @@ func main() {
fmt.Println("✓ SQL executed successfully")
// 5. Add result listener
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("📊 Aggregation result: %v\n", result)
})
+47 -53
@@ -83,19 +83,17 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 Array index access results:")
for i, item := range result {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device: %v\n", item["device"])
fmt.Printf(" First sensor temperature: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" Second sensor humidity: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" Third data item: %v\n", item["third_data_item"])
fmt.Println()
}
})
@@ -168,20 +166,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map key access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device ID: %v\n", item["device_id"])
fmt.Printf(" Server host: %v\n", item["server_host"])
fmt.Printf(" Server port: %v\n", item["server_port"])
fmt.Printf(" SSL enabled: %v\n", item["ssl_enabled"])
fmt.Printf(" App version: %v\n", item["app_version"])
fmt.Println()
}
})
@@ -266,20 +263,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🔄 Mixed complex access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Building: %v\n", item["building"])
fmt.Printf(" Third room on the first floor: %v\n", item["first_floor_room3_name"])
fmt.Printf(" First sensor temperature on the second floor: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" Architect: %v\n", item["building_architect"])
fmt.Printf(" Latest alert: %v\n", item["latest_alert"])
fmt.Println()
}
})
@@ -325,19 +321,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ Negative index access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device name: %v\n", item["device_name"])
fmt.Printf(" Latest reading: %v\n", item["latest_reading"])
fmt.Printf(" Second-to-last event: %v\n", item["second_last_event"])
fmt.Printf(" Last tag: %v\n", item["last_tag"])
fmt.Println()
}
})
@@ -372,20 +367,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📈 Array index aggregation results:")
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" Aggregation result %d:\n", i+1)
fmt.Printf(" Location: %v\n", item["location"])
fmt.Printf(" First sensor average temperature: %.2f°C\n", item["avg_first_sensor_temp"])
fmt.Printf(" Second sensor max humidity: %.1f%%\n", item["max_second_sensor_humidity"])
fmt.Printf(" Device count: %v\n", item["device_count"])
fmt.Println()
}
})
+6 -6
@@ -127,7 +127,7 @@ func testBasicFiltering() {
}
// Add result handler
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 High temperature alert: %v\n", result)
})
@@ -172,7 +172,7 @@ func testAggregation() {
}
// Process aggregation results
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Aggregation result: %v\n", result)
})
@@ -221,7 +221,7 @@ func testSlidingWindow() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Sliding window analysis: %v\n", result)
})
@@ -262,7 +262,7 @@ func testNestedFields() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Nested field result: %v\n", result)
})
@@ -336,7 +336,7 @@ func testCustomFunctions() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Custom function result: %v\n", result)
})
@@ -396,7 +396,7 @@ func testComplexQuery() {
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Complex query result: %v\n", result)
})
+18 -18
@@ -609,14 +609,14 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// Add test data
testData := []map[string]interface{}{
{
"device": "sensor1",
"temperature": 68.0, // Fahrenheit
"radius": 5.0,
"x1": 0.0, "y1": 0.0, "x2": 3.0, "y2": 4.0, // distance = 5
},
{
"device": "sensor1",
"temperature": 86.0, // Fahrenheit
"radius": 10.0,
@@ -625,7 +625,7 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// Add result listener
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Math function results: %v\n", result)
})
@@ -659,20 +659,20 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
}
// Add test data
testData := []map[string]interface{}{
{
"device": "sensor1",
"metadata": `{"version":"1.0","type":"temperature"}`,
"level": 3,
},
{
"device": "sensor2",
"metadata": `{"version":"2.0","type":"humidity"}`,
"level": 5,
},
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 String function results: %v\n", result)
})
@@ -702,20 +702,20 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
}
// Add test data
testData := []map[string]interface{}{
{
"device": "server1",
"client_ip": "192.168.1.100",
"memory_usage": 1073741824, // 1GB
},
{
"device": "server2",
"client_ip": "10.0.0.50",
"memory_usage": 2147483648, // 2GB
},
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Conversion function results: %v\n", result)
})
@@ -746,14 +746,14 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
}
// Add test data
testData := []map[string]interface{}{
{"device": "sensor1", "value": 2.0, "category": "A"},
{"device": "sensor1", "value": 8.0, "category": "A"},
{"device": "sensor1", "value": 32.0, "category": "B"},
{"device": "sensor1", "value": 128.0, "category": "A"},
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 Aggregate function results: %v\n", result)
})
+67 -74
@@ -119,19 +119,18 @@ func demonstrateBasicNestedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 Basic nested field access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device name: %v\n", item["device_name"])
fmt.Printf(" Device location: %v\n", item["device.location"])
fmt.Printf(" Temperature: %v°C\n", item["sensor.temperature"])
fmt.Printf(" Humidity: %v%%\n", item["sensor.humidity"])
fmt.Println()
}
})
@@ -166,20 +165,19 @@ func demonstrateNestedAggregation(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📈 Nested field aggregation results:")
resultSlice := result
for i, item := range resultSlice {
resultCount++
fmt.Printf(" Aggregation result %d:\n", i+1)
fmt.Printf(" Location: %v\n", item["device.location"])
fmt.Printf(" Average temperature: %.2f°C\n", item["avg_temp"])
fmt.Printf(" Max humidity: %.1f%%\n", item["max_humidity"])
fmt.Printf(" Sensor count: %v\n", item["sensor_count"])
fmt.Println()
}
})
@@ -271,19 +269,18 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 📋 Array index access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device: %v\n", item["device"])
fmt.Printf(" First sensor temperature: %v°C\n", item["first_sensor_temp"])
fmt.Printf(" Second sensor humidity: %v%%\n", item["second_sensor_humidity"])
fmt.Printf(" Third data item: %v\n", item["third_data_item"])
fmt.Println()
}
})
@@ -356,20 +353,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🗝️ Map key access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device ID: %v\n", item["device_id"])
fmt.Printf(" Server host: %v\n", item["server_host"])
fmt.Printf(" Server port: %v\n", item["server_port"])
fmt.Printf(" SSL enabled: %v\n", item["ssl_enabled"])
fmt.Printf(" App version: %v\n", item["app_version"])
fmt.Println()
}
})
@@ -454,20 +450,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" 🔄 Mixed complex access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Building: %v\n", item["building"])
fmt.Printf(" Third room on the first floor: %v\n", item["first_floor_room3_name"])
fmt.Printf(" First sensor temperature on the second floor: %v°C\n", item["second_floor_first_sensor_temp"])
fmt.Printf(" Architect: %v\n", item["building_architect"])
fmt.Printf(" Latest alert: %v\n", item["latest_alert"])
fmt.Println()
}
})
@@ -513,19 +508,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
wg.Add(1)
// Set result callback
ssql.AddSink(func(result []map[string]interface{}) {
defer wg.Done()
fmt.Println(" ⬅️ Negative index access results:")
resultSlice := result
for i, item := range resultSlice {
fmt.Printf(" Record %d:\n", i+1)
fmt.Printf(" Device name: %v\n", item["device_name"])
fmt.Printf(" Latest reading: %v\n", item["latest_reading"])
fmt.Printf(" Second-to-last event: %v\n", item["second_last_event"])
fmt.Printf(" Last tag: %v\n", item["last_tag"])
fmt.Println()
}
})
@@ -560,20 +554,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
 	wg.Add(1)
 	// Set up the result callback
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		defer wg.Done()
 		fmt.Println(" 📈 数组索引聚合计算结果:")
-		if resultSlice, ok := result.([]map[string]interface{}); ok {
+		resultSlice := result
 		for i, item := range resultSlice {
 			resultCount++
 			fmt.Printf("   聚合结果 %d:\n", i+1)
 			fmt.Printf("     位置: %v\n", item["location"])
 			fmt.Printf("     第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
 			fmt.Printf("     第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
 			fmt.Printf("     设备数量: %v\n", item["device_count"])
 			fmt.Println()
-		}
 		}
 	})
+6 -6
@@ -62,7 +62,7 @@ func demonstrateDataCleaning() {
 	}
 	// Handle results
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  清洗后数据: %+v\n", result)
 	})
@@ -103,7 +103,7 @@ func demonstrateDataEnrichment() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  富化后数据: %+v\n", result)
 	})
@@ -147,7 +147,7 @@ func demonstrateRealTimeAlerting() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  🚨 告警事件: %+v\n", result)
 	})
@@ -191,7 +191,7 @@ func demonstrateDataFormatConversion() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  格式转换结果: %+v\n", result)
 	})
@@ -230,7 +230,7 @@ func demonstrateDataRouting() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  路由结果: %+v\n", result)
 	})
@@ -273,7 +273,7 @@ func demonstrateNestedFieldProcessing() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  嵌套字段处理结果: %+v\n", result)
 	})
+26 -36
@@ -35,11 +35,9 @@ func demo1() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
-		if results, ok := result.([]map[string]interface{}); ok {
-			for _, data := range results {
-				fmt.Printf("发现空值数据: %+v\n", data)
-			}
+	ssql.AddSink(func(result []map[string]interface{}) {
+		for _, data := range result {
+			fmt.Printf("发现空值数据: %+v\n", data)
 		}
 	})
@@ -75,11 +73,9 @@ func demo2() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
-		if results, ok := result.([]map[string]interface{}); ok {
-			for _, data := range results {
-				fmt.Printf("发现有效数据: %+v\n", data)
-			}
+	ssql.AddSink(func(result []map[string]interface{}) {
+		for _, data := range result {
+			fmt.Printf("发现有效数据: %+v\n", data)
 		}
 	})
@@ -115,16 +111,14 @@ func demo3() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
-		if results, ok := result.([]map[string]interface{}); ok {
-			for _, data := range results {
-				status := data["status"]
-				value := data["value"]
-				if status != nil {
-					fmt.Printf("状态非空的数据: %+v\n", data)
-				} else if value == nil {
-					fmt.Printf("值为空的数据: %+v\n", data)
-				}
+	ssql.AddSink(func(result []map[string]interface{}) {
+		for _, data := range result {
+			status := data["status"]
+			value := data["value"]
+			if status != nil {
+				fmt.Printf("状态非空的数据: %+v\n", data)
+			} else if value == nil {
+				fmt.Printf("值为空的数据: %+v\n", data)
 			}
 		}
 	})
@@ -162,18 +156,16 @@ func demo4() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
-		if results, ok := result.([]map[string]interface{}); ok {
-			for _, data := range results {
-				value := data["value"]
-				status := data["status"]
-				priority := data["priority"]
+	ssql.AddSink(func(result []map[string]interface{}) {
+		for _, data := range result {
+			value := data["value"]
+			status := data["status"]
+			priority := data["priority"]
 			if value != nil && value.(float64) > 20 {
 				fmt.Printf("高值数据 (value > 20): %+v\n", data)
 			} else if status == nil && priority != nil {
 				fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
-			}
 			}
 		}
 	})
@@ -212,11 +204,9 @@ func demo5() {
 		panic(err)
 	}
-	ssql.AddSink(func(result interface{}) {
-		if results, ok := result.([]map[string]interface{}); ok {
-			for _, data := range results {
-				fmt.Printf("有位置信息的设备: %+v\n", data)
-			}
+	ssql.AddSink(func(result []map[string]interface{}) {
+		for _, data := range result {
+			fmt.Printf("有位置信息的设备: %+v\n", data)
 		}
 	})
-233
@@ -1,233 +0,0 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
func main() {
fmt.Println("=== StreamSQL 持久化功能测试 ===")
// Clean up test data from previous runs
cleanupTestData()
// Test 1: create a persistence-enabled stream and simulate data overflow
fmt.Println("📌 测试1: 数据溢出持久化")
testDataOverflowPersistence()
// Test 2: simulate a program restart and recover the data
fmt.Println("📌 测试2: 程序重启数据恢复")
testDataRecovery()
// Test 3: inspect the persisted file contents
fmt.Println("📌 测试3: 持久化文件分析")
analyzePersistenceFiles()
fmt.Println("✅ 真正持久化功能测试完成!")
}
func testDataOverflowPersistence() {
config := types.Config{
SimpleFields: []string{"id", "value"},
}
// Create a persistence-enabled stream processor with a small buffer
stream, err := stream.NewStreamWithLossPolicy(
config,
100, // tiny buffer that overflows easily
100, // small result buffer
50, // small sink pool
"persist", // persistence strategy
5*time.Second,
)
if err != nil {
fmt.Printf("创建流失败: %v\n", err)
return
}
stream.Start()
// Rapidly send a large batch of data to trigger overflow
inputCount := 1000
fmt.Printf("快速发送 %d 条数据到小缓冲区 (容量100)...\n", inputCount)
start := time.Now()
for i := 0; i < inputCount; i++ {
data := map[string]interface{}{
"id": i,
"value": fmt.Sprintf("data_%d", i),
}
stream.Emit(data)
}
duration := time.Since(start)
// Wait for persistence to complete
fmt.Println("等待持久化操作完成...")
time.Sleep(8 * time.Second)
// Fetch statistics
stats := stream.GetDetailedStats()
persistStats := stream.GetPersistenceStats()
fmt.Printf("⏱️ 发送耗时: %v\n", duration)
fmt.Printf("📊 输入数据: %d\n", stats["basic_stats"].(map[string]int64)["input_count"])
fmt.Printf("📊 处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
fmt.Printf("📊 通道容量: %d\n", stats["basic_stats"].(map[string]int64)["data_chan_cap"])
fmt.Printf("📊 持久化启用: %v\n", persistStats["enabled"])
fmt.Printf("📊 待写入数据: %v\n", persistStats["pending_count"])
fmt.Printf("📊 当前文件大小: %v bytes\n", persistStats["current_file_size"])
fmt.Printf("📊 文件索引: %v\n", persistStats["file_index"])
stream.Stop()
}
func testDataRecovery() {
config := types.Config{
SimpleFields: []string{"id", "value"},
}
// Create a new persistence-enabled stream processor (simulating a program restart)
stream, err := stream.NewStreamWithLossPolicy(
config,
200, // larger buffer for recovery
200,
100,
"persist", // 持久化策略
5*time.Second,
)
if err != nil {
fmt.Printf("创建流失败: %v\n", err)
return
}
stream.Start()
// Add a sink to receive the recovered data
recoveredCount := 0
stream.AddSink(func(data interface{}) {
recoveredCount++
if recoveredCount <= 5 {
fmt.Printf("恢复数据 %d: %+v\n", recoveredCount, data)
}
})
// Try to load and reprocess the persisted data
fmt.Println("尝试加载持久化数据...")
if err := stream.LoadAndReprocessPersistedData(); err != nil {
fmt.Printf("数据恢复失败: %v\n", err)
}
// Wait for processing to complete
time.Sleep(3 * time.Second)
stats := stream.GetDetailedStats()
fmt.Printf("📊 恢复后处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
fmt.Printf("📊 接收到的恢复数据: %d\n", recoveredCount)
stream.Stop()
}
func analyzePersistenceFiles() {
dataDir := "./streamsql_overflow_data"
// Check the persistence directory
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
fmt.Println("持久化目录不存在")
return
}
// List all persisted files
files, err := filepath.Glob(filepath.Join(dataDir, "streamsql_overflow_*.log"))
if err != nil {
fmt.Printf("读取持久化文件失败: %v\n", err)
return
}
if len(files) == 0 {
fmt.Println("没有找到持久化文件(可能已被恢复过程删除)")
return
}
fmt.Printf("发现 %d 个持久化文件:\n", len(files))
for i, file := range files {
info, err := os.Stat(file)
if err != nil {
fmt.Printf(" %d. %s (无法读取文件信息)\n", i+1, filepath.Base(file))
continue
}
fmt.Printf(" %d. %s (大小: %d bytes, 修改时间: %s)\n",
i+1, filepath.Base(file), info.Size(), info.ModTime().Format("15:04:05"))
}
// Read the first few lines of the first file
if len(files) > 0 {
fmt.Printf("\n第一个文件的前3行内容:\n")
showFileContent(files[0], 3)
}
}
func showFileContent(filename string, maxLines int) {
file, err := os.Open(filename)
if err != nil {
fmt.Printf("无法打开文件: %v\n", err)
return
}
defer file.Close()
buffer := make([]byte, 1024)
n, err := file.Read(buffer)
if err != nil {
fmt.Printf("无法读取文件: %v\n", err)
return
}
content := string(buffer[:n])
lines := []rune(content)
lineCount := 0
currentLine := ""
for _, char := range lines {
if char == '\n' {
lineCount++
fmt.Printf(" %d: %s\n", lineCount, currentLine)
currentLine = ""
if lineCount >= maxLines {
break
}
} else {
currentLine += string(char)
}
}
if currentLine != "" && lineCount < maxLines {
fmt.Printf(" %d: %s\n", lineCount+1, currentLine)
}
}
func cleanupTestData() {
dataDir := "./streamsql_overflow_data"
if err := os.RemoveAll(dataDir); err != nil {
fmt.Printf("清理测试数据失败: %v\n", err)
}
}
+9 -9
@@ -122,19 +122,19 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
 	}
 	// Add a result listener
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  📊 简单查询结果: %v\n", result)
 	})
 	// Add test data
-	testData := []interface{}{
-		map[string]interface{}{
+	testData := []map[string]interface{}{
+		{
 			"device":      "sensor1",
 			"value":       5.0,
 			"temperature": 68.0, // Fahrenheit
 			"radius":      3.0,
 		},
-		map[string]interface{}{
+		{
 			"device":      "sensor2",
 			"value":       10.0,
 			"temperature": 86.0, // Fahrenheit
@@ -171,25 +171,25 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
 	}
 	// Add a result listener
-	ssql.AddSink(func(result interface{}) {
+	ssql.AddSink(func(result []map[string]interface{}) {
 		fmt.Printf("  📊 聚合查询结果: %v\n", result)
 	})
 	// Add test data
-	testData := []interface{}{
-		map[string]interface{}{
+	testData := []map[string]interface{}{
+		{
 			"device":      "sensor1",
 			"value":       3.0,
 			"temperature": 32.0, // 0°C
 			"radius":      1.0,
 		},
-		map[string]interface{}{
+		{
 			"device":      "sensor1",
 			"value":       4.0,
 			"temperature": 212.0, // 100°C
 			"radius":      2.0,
 		},
-		map[string]interface{}{
+		{
 			"device":      "sensor2",
 			"value":       5.0,
 			"temperature": 68.0, // 20°C
-218
@@ -1,218 +0,0 @@
package main
import (
"fmt"
"log"
"strings"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
func main() {
fmt.Println("=== StreamSQL 统一配置系统演示 ===")
// 1. Create a Stream with the default configuration via the new config API
fmt.Println("\n1. 默认配置Stream:")
defaultConfig := types.NewConfig()
defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"}
defaultStream, err := stream.NewStream(defaultConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("默认配置", defaultStream)
// 2. Use the high-performance preset configuration
fmt.Println("\n2. 高性能配置Stream:")
highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig())
highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"}
highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("高性能配置", highPerfStream)
// 3. Use the low-latency preset configuration
fmt.Println("\n3. 低延迟配置Stream:")
lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig())
lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"}
lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("低延迟配置", lowLatencyStream)
// 4. Use the zero-data-loss preset configuration
fmt.Println("\n4. 零数据丢失配置Stream:")
zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig())
zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"}
zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("零数据丢失配置", zeroLossStream)
// 5. Use the persistence preset configuration
fmt.Println("\n5. 持久化配置Stream:")
persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig())
persistConfig.SimpleFields = []string{"temperature", "humidity", "location"}
persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig())
if err != nil {
log.Fatal(err)
}
printStreamStats("持久化配置", persistStream)
// 6. Build a fully custom configuration
fmt.Println("\n6. 自定义配置Stream:")
customPerfConfig := types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 30000,
ResultChannelSize: 25000,
WindowOutputSize: 3000,
EnableDynamicResize: true,
MaxBufferSize: 200000,
UsageThreshold: 0.85,
},
OverflowConfig: types.OverflowConfig{
Strategy: "expand",
BlockTimeout: 15 * time.Second,
AllowDataLoss: false,
ExpansionConfig: types.ExpansionConfig{
GrowthFactor: 2.0,
MinIncrement: 2000,
TriggerThreshold: 0.9,
ExpansionTimeout: 3 * time.Second,
},
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 800,
SinkWorkerCount: 12,
MaxRetryRoutines: 10,
},
MonitoringConfig: types.MonitoringConfig{
EnableMonitoring: true,
StatsUpdateInterval: 500 * time.Millisecond,
EnableDetailedStats: true,
WarningThresholds: types.WarningThresholds{
DropRateWarning: 5.0,
DropRateCritical: 15.0,
BufferUsageWarning: 75.0,
BufferUsageCritical: 90.0,
},
},
}
customConfig := types.NewConfigWithPerformance(customPerfConfig)
customConfig.SimpleFields = []string{"temperature", "humidity", "location"}
customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("自定义配置", customStream)
// 7. Configuration comparison demo
fmt.Println("\n7. 配置比较:")
compareConfigurations()
// 8. Real-time data processing demo
fmt.Println("\n8. 实时数据处理演示:")
demonstrateRealTimeProcessing(defaultStream)
// 9. Unified window configuration demo
fmt.Println("\n9. 窗口统一配置演示:")
demonstrateWindowConfig()
// Clean up resources
fmt.Println("\n10. 清理资源...")
defaultStream.Stop()
highPerfStream.Stop()
lowLatencyStream.Stop()
zeroLossStream.Stop()
persistStream.Stop()
customStream.Stop()
fmt.Println("\n=== 演示完成 ===")
}
func printStreamStats(name string, s *stream.Stream) {
stats := s.GetStats()
detailedStats := s.GetDetailedStats()
fmt.Printf("【%s】统计信息:\n", name)
fmt.Printf(" 数据通道: %d/%d (使用率: %.1f%%)\n",
stats["data_chan_len"], stats["data_chan_cap"],
detailedStats["data_chan_usage"])
fmt.Printf(" 结果通道: %d/%d (使用率: %.1f%%)\n",
stats["result_chan_len"], stats["result_chan_cap"],
detailedStats["result_chan_usage"])
fmt.Printf(" 工作池: %d/%d (使用率: %.1f%%)\n",
stats["sink_pool_len"], stats["sink_pool_cap"],
detailedStats["sink_pool_usage"])
fmt.Printf(" 性能等级: %s\n", detailedStats["performance_level"])
}
func compareConfigurations() {
configs := map[string]types.PerformanceConfig{
"默认配置": types.DefaultPerformanceConfig(),
"高性能配置": types.HighPerformanceConfig(),
"低延迟配置": types.LowLatencyConfig(),
"零丢失配置": types.ZeroDataLossConfig(),
"持久化配置": types.PersistencePerformanceConfig(),
}
fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n",
"配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略")
fmt.Println(strings.Repeat("-", 75))
for name, config := range configs {
fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n",
name,
config.BufferConfig.DataChannelSize,
config.BufferConfig.ResultChannelSize,
config.WorkerConfig.SinkPoolSize,
config.WorkerConfig.SinkWorkerCount,
config.OverflowConfig.Strategy)
}
}
func demonstrateRealTimeProcessing(s *stream.Stream) {
// Set up the data sink
s.AddSink(func(data interface{}) {
fmt.Printf(" 接收到处理结果: %v\n", data)
})
// Start stream processing
s.Start()
// Simulate sending data
for i := 0; i < 3; i++ {
data := map[string]interface{}{
"temperature": 20.0 + float64(i)*2.5,
"humidity": 60.0 + float64(i)*5,
"location": fmt.Sprintf("sensor_%d", i+1),
"timestamp": time.Now().Unix(),
}
fmt.Printf(" 发送数据: %v\n", data)
s.Emit(data)
time.Sleep(100 * time.Millisecond)
}
// Wait for processing to complete
time.Sleep(200 * time.Millisecond)
// Show final statistics
finalStats := s.GetDetailedStats()
fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n",
finalStats["basic_stats"].(map[string]int64)["input_count"],
finalStats["basic_stats"].(map[string]int64)["output_count"],
finalStats["basic_stats"].(map[string]int64)["dropped_count"],
finalStats["process_rate"])
}
@@ -1,74 +0,0 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig demonstrates how to use the unified window configuration
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. Test windows with the default configuration
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. Test windows with the high-performance configuration
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. Test windows with the low-latency configuration
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. Test windows with a custom configuration
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // custom window output buffer size
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// Run a simple tumbling-window query
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// Add a result handler
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// Send test data
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.Emit(data)
}
// Wait for processing to complete
time.Sleep(3 * time.Second)
// Fetch statistics
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// Stop stream processing
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
+245
@@ -0,0 +1,245 @@
package expr
import (
"fmt"
"strings"
)
// parseCaseExpression parses CASE expression
func parseCaseExpression(tokens []string) (*ExprNode, []string, error) {
if len(tokens) == 0 || strings.ToUpper(tokens[0]) != "CASE" {
return nil, nil, fmt.Errorf("expected CASE keyword")
}
remaining := tokens[1:]
caseExpr := &CaseExpression{}
// Check if it's a simple CASE expression (CASE expr WHEN value THEN result)
if len(remaining) > 0 && strings.ToUpper(remaining[0]) != "WHEN" {
// Simple CASE expression
value, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing CASE expression: %v", err)
}
caseExpr.Value = value
remaining = newRemaining
}
// Parse WHEN clauses
for len(remaining) > 0 && strings.ToUpper(remaining[0]) == "WHEN" {
remaining = remaining[1:] // Skip WHEN
// Parse WHEN condition
condition, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing WHEN condition: %v", err)
}
remaining = newRemaining
// Check THEN keyword
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "THEN" {
return nil, nil, fmt.Errorf("expected THEN after WHEN condition")
}
remaining = remaining[1:] // Skip THEN
// Parse THEN result
result, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing THEN result: %v", err)
}
remaining = newRemaining
// Add WHEN clause
caseExpr.WhenClauses = append(caseExpr.WhenClauses, WhenClause{
Condition: condition,
Result: result,
})
}
// Parse optional ELSE clause
if len(remaining) > 0 && strings.ToUpper(remaining[0]) == "ELSE" {
remaining = remaining[1:] // Skip ELSE
elseExpr, newRemaining, err := parseOrExpression(remaining)
if err != nil {
return nil, nil, fmt.Errorf("error parsing ELSE expression: %v", err)
}
caseExpr.ElseResult = elseExpr
remaining = newRemaining
}
// Check END keyword
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "END" {
return nil, nil, fmt.Errorf("expected END to close CASE expression")
}
// Create ExprNode containing CaseExpression
caseNode := &ExprNode{
Type: TypeCase,
CaseExpr: caseExpr,
}
return caseNode, remaining[1:], nil
}
// evaluateCaseExpression evaluates the value of CASE expression
func evaluateCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
if node.Type != TypeCase {
return 0, fmt.Errorf("not a CASE expression")
}
if node.CaseExpr == nil {
return 0, fmt.Errorf("invalid CASE expression")
}
// Simple CASE expression: CASE expr WHEN value THEN result
if node.CaseExpr.Value != nil {
return evaluateSimpleCaseExpression(node, data)
}
// Searched CASE expression: CASE WHEN condition THEN result
return evaluateSearchCaseExpression(node, data)
}
// evaluateSimpleCaseExpression evaluates simple CASE expression
func evaluateSimpleCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
caseExpr := node.CaseExpr
if caseExpr == nil {
return 0, fmt.Errorf("invalid CASE expression")
}
// Evaluate CASE expression value
caseValue, err := evaluateNodeValue(caseExpr.Value, data)
if err != nil {
return 0, err
}
// Iterate through WHEN clauses
for _, whenClause := range caseExpr.WhenClauses {
// Evaluate WHEN value
whenValue, err := evaluateNodeValue(whenClause.Condition, data)
if err != nil {
return 0, err
}
// Compare values
if compareValuesForEquality(caseValue, whenValue) {
// Evaluate and return THEN result
return evaluateNode(whenClause.Result, data)
}
}
// If no matching WHEN clause, evaluate ELSE expression
if caseExpr.ElseResult != nil {
return evaluateNode(caseExpr.ElseResult, data)
}
// If no ELSE clause, return NULL (return 0 here)
return 0, nil
}
// evaluateSearchCaseExpression evaluates a searched CASE expression
func evaluateSearchCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
caseExpr := node.CaseExpr
if caseExpr == nil {
return 0, fmt.Errorf("invalid CASE expression")
}
// Iterate through WHEN clauses
for _, whenClause := range caseExpr.WhenClauses {
// Evaluate WHEN condition - use boolean evaluation to handle logical operators
conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
if err != nil {
return 0, err
}
// If condition is true, return THEN result
if conditionResult {
return evaluateNode(whenClause.Result, data)
}
}
// If no matching WHEN clause, evaluate ELSE expression
if caseExpr.ElseResult != nil {
return evaluateNode(caseExpr.ElseResult, data)
}
// If no ELSE clause, return NULL (return 0 here)
return 0, nil
}
// evaluateCaseExpressionWithNull evaluates CASE expression with NULL value support
func evaluateCaseExpressionWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
if node.Type != TypeCase {
return nil, false, fmt.Errorf("not a CASE expression")
}
caseExpr := node.CaseExpr
if caseExpr == nil {
return nil, false, fmt.Errorf("invalid CASE expression")
}
// Simple CASE expression: CASE expr WHEN value THEN result
if caseExpr.Value != nil {
return evaluateCaseExpressionValueWithNull(node, data)
}
// Searched CASE expression: CASE WHEN condition THEN result
for _, whenClause := range caseExpr.WhenClauses {
// Evaluate WHEN condition - use boolean evaluation to handle logical operators
conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
if err != nil {
return nil, false, err
}
// If condition is true, return THEN result
if conditionResult {
return evaluateNodeValueWithNull(whenClause.Result, data)
}
}
// If no matching WHEN clause, evaluate ELSE expression
if caseExpr.ElseResult != nil {
return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
}
// If no ELSE clause, return NULL
return nil, true, nil
}
// evaluateCaseExpressionValueWithNull evaluates simple CASE expression (with NULL support)
func evaluateCaseExpressionValueWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
caseExpr := node.CaseExpr
if caseExpr == nil {
return nil, false, fmt.Errorf("invalid CASE expression")
}
// Evaluate CASE expression value
caseValue, caseIsNull, err := evaluateNodeValueWithNull(caseExpr.Value, data)
if err != nil {
return nil, false, err
}
// Iterate through WHEN clauses
for _, whenClause := range caseExpr.WhenClauses {
// Evaluate WHEN value
whenValue, whenIsNull, err := evaluateNodeValueWithNull(whenClause.Condition, data)
if err != nil {
return nil, false, err
}
// Compare values (with NULL comparison support)
if compareValuesWithNullForEquality(caseValue, caseIsNull, whenValue, whenIsNull) {
// Evaluate and return THEN result
return evaluateNodeValueWithNull(whenClause.Result, data)
}
}
// If no matching WHEN clause, evaluate ELSE expression
if caseExpr.ElseResult != nil {
return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
}
// If no ELSE clause, return NULL
return nil, true, nil
}
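The evaluation order these functions implement — the first matching WHEN wins, then the optional ELSE, otherwise SQL NULL — can be sketched independently of the parser. In this sketch `WhenArm`, `evalCase`, and `classify` are illustrative names, not types from the expr package:

```go
package main

import "fmt"

// WhenArm is one WHEN ... THEN ... arm of a CASE expression.
type WhenArm struct {
	Match  func(data map[string]interface{}) bool
	Result interface{}
}

// evalCase mirrors the fallthrough order above: the first matching WHEN
// wins, then the optional ELSE; with no match and no ELSE the result is
// SQL NULL, modeled here as (nil, false).
func evalCase(arms []WhenArm, elseResult interface{}, hasElse bool,
	data map[string]interface{}) (interface{}, bool) {
	for _, arm := range arms {
		if arm.Match(data) {
			return arm.Result, true
		}
	}
	if hasElse {
		return elseResult, true
	}
	return nil, false
}

// classify models: CASE WHEN t > 30 THEN 'hot' WHEN t > 20 THEN 'warm' ELSE 'cold' END
func classify(t float64) string {
	arms := []WhenArm{
		{Match: func(d map[string]interface{}) bool { return d["temperature"].(float64) > 30 }, Result: "hot"},
		{Match: func(d map[string]interface{}) bool { return d["temperature"].(float64) > 20 }, Result: "warm"},
	}
	v, _ := evalCase(arms, "cold", true, map[string]interface{}{"temperature": t})
	return v.(string)
}

func main() {
	fmt.Println(classify(35), classify(25), classify(10)) // hot warm cold
}
```

Note that 35 matches both WHEN arms, but only "hot" is returned: CASE evaluation stops at the first true condition, which is exactly why the loops above `return` inside the match branch.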
File diff suppressed because it is too large
+117
@@ -0,0 +1,117 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package expr provides expression parsing and evaluation capabilities for StreamSQL.
This package implements a comprehensive expression engine that supports mathematical operations,
logical comparisons, function calls, field references, and complex CASE expressions.
It serves as the foundation for WHERE clauses, HAVING clauses, and computed fields in SQL queries.
# Core Features
Mathematical Operations - Supports arithmetic operators (+, -, *, /, %, ^) with proper precedence
Logical Operations - Boolean logic with AND, OR operators and comparison operators (=, !=, <, >, <=, >=, LIKE)
Function Integration - Seamless integration with the functions package for built-in and custom functions
Field References - Dynamic field access with dot notation support for nested data structures
CASE Expressions - Full support for both simple and searched CASE expressions
Type Safety - Automatic type conversion and validation during expression evaluation
Fallback Support - Integration with expr-lang/expr library for complex expressions
# Expression Types
The package supports various expression node types:
// Basic types
TypeNumber - Numeric constants (integers and floats)
TypeString - String literals with proper escaping
TypeField - Field references (e.g., "temperature", "device.id")
TypeOperator - Binary and unary operators
TypeFunction - Function calls with argument validation
TypeParenthesis - Grouped expressions for precedence control
TypeCase - CASE expressions for conditional logic
# Usage Examples
Basic mathematical expression:
expr, err := NewExpression("temperature * 1.8 + 32")
if err != nil {
log.Fatal(err)
}
result, err := expr.Evaluate(data)
Logical expression with field references:
expr, err := NewExpression("temperature > 25 AND humidity < 60")
result, err := expr.Evaluate(data)
Function call expression:
expr, err := NewExpression("UPPER(device_name) LIKE 'SENSOR%'")
result, err := expr.Evaluate(data)
CASE expression for conditional logic:
expr, err := NewExpression(`
CASE
WHEN temperature > 30 THEN 'hot'
WHEN temperature > 20 THEN 'warm'
ELSE 'cold'
END
`)
result, err := expr.Evaluate(data)
# Operator Precedence
The expression parser follows standard mathematical precedence rules:
1. Parentheses (highest)
2. Power (^)
3. Multiplication, Division, Modulo (*, /, %)
4. Addition, Subtraction (+, -)
5. Comparison (>, <, >=, <=, LIKE, IS)
6. Equality (=, ==, !=, <>)
7. Logical AND
8. Logical OR (lowest)
# Error Handling
The package provides comprehensive error handling with detailed error messages:
Syntax validation during expression creation
Type checking during evaluation
Function argument validation
Graceful fallback to expr-lang for unsupported expressions
# Performance Considerations
Expressions are parsed once and can be evaluated multiple times
Built-in operator optimization for common mathematical operations
Lazy evaluation for logical operators (short-circuiting)
Efficient field access caching for repeated evaluations
Automatic fallback to optimized expr-lang library when needed
# Integration
This package integrates seamlessly with other StreamSQL components:
Functions package - For built-in and custom function execution
Types package - For data type definitions and conversions
Stream package - For real-time expression evaluation in data streams
RSQL package - For SQL parsing and expression extraction
*/
package expr
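Per the precedence table in the doc comment above, AND binds tighter than OR, so a predicate like `temperature > 25 AND humidity < 60 OR override` groups as `((temperature > 25) AND (humidity < 60)) OR override`. A small Go sketch of that grouping (the `matches` function is illustrative, not part of the package):

```go
package main

import "fmt"

// matches encodes "temperature > 25 AND humidity < 60 OR override"
// with the grouping the precedence table implies: AND before OR.
func matches(temperature, humidity float64, override bool) bool {
	return (temperature > 25 && humidity < 60) || override
}

func main() {
	fmt.Println(matches(30, 50, false)) // true: both comparisons hold
	fmt.Println(matches(30, 90, false)) // false: the AND arm fails, no override
	fmt.Println(matches(10, 90, true))  // true: override alone satisfies the OR
}
```

Short-circuiting follows the same grouping: when the AND arm is already true, `override` is never evaluated, matching the lazy evaluation noted under Performance Considerations.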
+962
File diff suppressed because it is too large
File diff suppressed because it is too large
+123 -1954
File diff suppressed because it is too large
+763 -25
File diff suppressed because it is too large
Some files were not shown because too many files have changed in this diff.