mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-18 01:58:20 +00:00
Compare commits
38 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3464ff5f6e | |||
| 1d9e2a3dab | |||
| b23fdea2cd | |||
| 696b5f7177 | |||
| 90afdead78 | |||
| 4615b7a308 | |||
| de6ca91c01 | |||
| 5b13316754 | |||
| c2c8f86d7f | |||
| c66d974dfc | |||
| 049295b599 | |||
| e388bacde3 | |||
| d6a8778731 | |||
| 9764b95b7a | |||
| 3ad5146654 | |||
| 74efe9e526 | |||
| ff00fd1f31 | |||
| 10319b45a6 | |||
| 790e6c615d | |||
| a066a4df1b | |||
| 05a25619b8 | |||
| 57983f19d7 | |||
| 6f5305ca01 | |||
| a46b833608 | |||
| 4249cef16b | |||
| 6bfb592bd0 | |||
| 691ca41c87 | |||
| a8cf91298a | |||
| f3fe997ce8 | |||
| 98dab93e5b | |||
| 937e8243cf | |||
| a43445ebc7 | |||
| de4d47d87d | |||
| a47748d4c7 | |||
| ed2a063000 | |||
| 8ebd152ec9 | |||
| 343d045554 | |||
| ec0ac04ebf |
@@ -41,91 +41,15 @@ jobs:
|
||||
run: go build -v ./...
|
||||
|
||||
- name: Run tests
|
||||
if: matrix.go-version != '1.21'
|
||||
run: go test -v -race -timeout 300s ./...
|
||||
|
||||
- name: Run tests with coverage
|
||||
if: matrix.go-version == '1.21'
|
||||
run: go test -v -race -coverprofile=coverage.out -covermode=atomic -timeout 300s ./...
|
||||
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...
|
||||
|
||||
- name: Upload coverage to Codecov
|
||||
- name: Upload coverage reports to Codecov
|
||||
if: matrix.go-version == '1.21'
|
||||
uses: codecov/codecov-action@v5
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
|
||||
case-expression-tests:
|
||||
name: CASE Expression Tests
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: '1.21'
|
||||
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Cache Go modules
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
|
||||
|
||||
- name: Download dependencies
|
||||
run: go mod download
|
||||
|
||||
- name: Run CASE Expression Parsing Tests
|
||||
run: go test -v -run TestCaseExpressionParsing -timeout 15s
|
||||
|
||||
- name: Run CASE Expression Comprehensive Tests
|
||||
run: go test -v -run TestCaseExpressionComprehensive -timeout 15s
|
||||
|
||||
- name: Run CASE Expression Field Extraction Tests
|
||||
run: go test -v -run TestCaseExpressionFieldExtraction -timeout 15s
|
||||
|
||||
- name: Run CASE Expression in SQL Tests
|
||||
run: go test -v -run TestCaseExpressionInSQL -timeout 15s
|
||||
|
||||
- name: Run CASE Expression Aggregation Tests (with known limitations)
|
||||
run: go test -v -run "TestCaseExpressionInAggregation|TestComplexCaseExpressionsInAggregation" -timeout 20s
|
||||
|
||||
- name: Run CASE Expression Edge Cases
|
||||
run: go test -v -run TestCaseExpressionEdgeCases -timeout 15s
|
||||
|
||||
# lint:
|
||||
# name: Lint
|
||||
# runs-on: ubuntu-latest
|
||||
#
|
||||
# steps:
|
||||
# - name: Set up Go
|
||||
# uses: actions/setup-go@v4
|
||||
# with:
|
||||
# go-version: '1.21'
|
||||
#
|
||||
# - name: Check out code
|
||||
# uses: actions/checkout@v4
|
||||
#
|
||||
# - name: Run golangci-lint
|
||||
# uses: golangci/golangci-lint-action@v3
|
||||
# with:
|
||||
# version: latest
|
||||
# args: --timeout=5m
|
||||
#
|
||||
# security:
|
||||
# name: Security Scan
|
||||
# runs-on: ubuntu-latest
|
||||
#
|
||||
# steps:
|
||||
# - name: Set up Go
|
||||
# uses: actions/setup-go@v4
|
||||
# with:
|
||||
# go-version: '1.21'
|
||||
#
|
||||
# - name: Check out code
|
||||
# uses: actions/checkout@v4
|
||||
#
|
||||
# - name: Run Gosec Security Scanner
|
||||
# uses: securecodewarrior/github-action-gosec@v1
|
||||
# with:
|
||||
# args: './...'
|
||||
uses: codecov/codecov-action@v3
|
||||
env:
|
||||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
|
||||
|
||||
@@ -5,6 +5,9 @@ on:
|
||||
tags:
|
||||
- 'v*'
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Test Before Release
|
||||
@@ -54,12 +57,4 @@ jobs:
|
||||
key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }}
|
||||
|
||||
- name: Download dependencies
|
||||
run: go mod download
|
||||
|
||||
- name: Create Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
with:
|
||||
draft: false
|
||||
prerelease: false
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: go mod download
|
||||
@@ -3,12 +3,13 @@
|
||||
[](https://goreportcard.com/report/github.com/rulego/streamsql)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
[](https://codecov.io/gh/rulego/streamsql)
|
||||
|
||||
English| [简体中文](README_ZH.md)
|
||||
|
||||
**StreamSQL** is a lightweight, SQL-based stream processing engine for IoT edge, enabling efficient data processing and analysis on unbounded streams.
|
||||
|
||||
Similar to: [Apache Flink](https://flink.apache.org/) and [ekuiper](https://ekuiper.org/)
|
||||
📖 **[Documentation](https://rulego.cc/en/pages/streamsql-overview/)** | Similar to: [Apache Flink](https://flink.apache.org/)
|
||||
|
||||
## Features
|
||||
|
||||
@@ -82,9 +83,9 @@ func main() {
|
||||
}
|
||||
|
||||
// Handle real-time transformation results
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
fmt.Printf("Real-time result: %+v\n", result)
|
||||
})
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
fmt.Printf("Real-time result: %+v\n", results)
|
||||
})
|
||||
|
||||
// Simulate sensor data input
|
||||
sensorData := []map[string]interface{}{
|
||||
@@ -111,6 +112,7 @@ func main() {
|
||||
// Process data one by one, each will output results immediately
|
||||
for _, data := range sensorData {
|
||||
ssql.Emit(data)
|
||||
//changedData,err:=ssql.EmitSync(data) //Synchronize to obtain processing results
|
||||
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
|
||||
}
|
||||
|
||||
@@ -195,8 +197,8 @@ func main() {
|
||||
"humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70%
|
||||
}
|
||||
// Add data to stream, triggering StreamSQL's real-time processing
|
||||
// AddData distributes data to corresponding windows and aggregators
|
||||
ssql.stream.AddData(randomData)
|
||||
// Emit distributes data to corresponding windows and aggregators
|
||||
ssql.Emit(randomData)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
@@ -209,10 +211,10 @@ func main() {
|
||||
// Step 6: Setup Result Processing Pipeline
|
||||
resultChan := make(chan interface{})
|
||||
// Add computation result callback function (Sink)
|
||||
// When window triggers computation, results are output through this callback
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// When window triggers computation, results are output through this callback
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
resultChan <- results
|
||||
})
|
||||
|
||||
// Step 7: Start Result Consumer Goroutine
|
||||
// Count received results for effect verification
|
||||
@@ -273,9 +275,9 @@ func main() {
|
||||
}
|
||||
|
||||
// Handle aggregation results
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
fmt.Printf("Aggregation result: %+v\n", result)
|
||||
})
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
fmt.Printf("Aggregation result: %+v\n", results)
|
||||
})
|
||||
|
||||
// Add nested structured data
|
||||
nestedData := map[string]interface{}{
|
||||
|
||||
+19
-11
@@ -3,12 +3,13 @@
|
||||
[](https://goreportcard.com/report/github.com/rulego/streamsql)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
[](https://codecov.io/gh/rulego/streamsql)
|
||||
|
||||
[English](README.md)| 简体中文
|
||||
|
||||
**StreamSQL** 是一款轻量级的、基于 SQL 的物联网边缘流处理引擎。它能够高效地处理和分析无界数据流。
|
||||
|
||||
类似: [Apache Flink](https://flink.apache.org/) 和 [ekuiper](https://ekuiper.org/)
|
||||
📖 **[官方文档](https://rulego.cc/pages/streamsql-overview/)** | 类似: [Apache Flink](https://flink.apache.org/)
|
||||
|
||||
## 功能特性
|
||||
|
||||
@@ -85,8 +86,8 @@ func main() {
|
||||
}
|
||||
|
||||
// 处理实时转换结果
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
fmt.Printf("实时处理结果: %+v\n", result)
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
fmt.Printf("实时处理结果: %+v\n", results)
|
||||
})
|
||||
|
||||
// 模拟传感器数据输入
|
||||
@@ -114,6 +115,7 @@ func main() {
|
||||
// 逐条处理数据,每条都会立即输出结果
|
||||
for _, data := range sensorData {
|
||||
ssql.Emit(data)
|
||||
//changedData,err:=ssql.EmitSync(data) //同步获得处理结果
|
||||
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
|
||||
}
|
||||
|
||||
@@ -206,7 +208,7 @@ func main() {
|
||||
// - 按deviceId分组
|
||||
// - 将数据分配到对应的时间窗口
|
||||
// - 更新聚合计算状态
|
||||
ssql.stream.AddData(randomData)
|
||||
ssql.Emit(randomData)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
@@ -221,8 +223,10 @@ func main() {
|
||||
// 6. 注册结果回调函数
|
||||
// 当窗口触发时(每5秒),会调用这个回调函数
|
||||
// 传递聚合计算的结果
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
for _, result := range results {
|
||||
resultChan <- result
|
||||
}
|
||||
})
|
||||
|
||||
// 7. 结果消费者 - 处理计算结果
|
||||
@@ -289,18 +293,22 @@ func main() {
|
||||
}
|
||||
|
||||
// 处理聚合结果
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
fmt.Printf("聚合结果: %+v\n", result)
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
fmt.Printf("聚合结果: %+v\n", results)
|
||||
})
|
||||
|
||||
// 添加嵌套结构数据
|
||||
nestedData := map[string]interface{}{
|
||||
"device": map[string]interface{}{
|
||||
"info": map[string]interface{}{
|
||||
"name": "temperature-sensor-001",
|
||||
"type": "temperature",
|
||||
"name": "temperature-sensor-001",
|
||||
"type": "temperature",
|
||||
"status": "active",
|
||||
},
|
||||
"location": map[string]interface{}{
|
||||
"building": "智能温室-A区",
|
||||
"floor": "3F",
|
||||
},
|
||||
"location": "智能温室-A区",
|
||||
},
|
||||
"sensor": map[string]interface{}{
|
||||
"temperature": 25.5,
|
||||
|
||||
+34
-12
@@ -4,12 +4,10 @@ import (
|
||||
"github.com/rulego/streamsql/functions"
|
||||
)
|
||||
|
||||
// 为了向后兼容,重新导出functions模块中的类型和函数
|
||||
|
||||
// AggregateType 聚合类型,重新导出functions.AggregateType
|
||||
// AggregateType aggregate type, re-exports functions.AggregateType
|
||||
type AggregateType = functions.AggregateType
|
||||
|
||||
// 重新导出所有聚合类型常量
|
||||
// Re-export all aggregate type constants
|
||||
const (
|
||||
Sum = functions.Sum
|
||||
Count = functions.Count
|
||||
@@ -22,45 +20,69 @@ const (
|
||||
WindowStart = functions.WindowStart
|
||||
WindowEnd = functions.WindowEnd
|
||||
Collect = functions.Collect
|
||||
FirstValue = functions.FirstValue
|
||||
LastValue = functions.LastValue
|
||||
MergeAgg = functions.MergeAgg
|
||||
StdDevS = functions.StdDevS
|
||||
Deduplicate = functions.Deduplicate
|
||||
Var = functions.Var
|
||||
VarS = functions.VarS
|
||||
// 分析函数
|
||||
// Analytical functions
|
||||
Lag = functions.Lag
|
||||
Latest = functions.Latest
|
||||
ChangedCol = functions.ChangedCol
|
||||
HadChanged = functions.HadChanged
|
||||
// 表达式聚合器,用于处理自定义函数
|
||||
// Expression aggregator for handling custom functions
|
||||
Expression = functions.Expression
|
||||
// Post-aggregation marker
|
||||
PostAggregation = functions.PostAggregation
|
||||
)
|
||||
|
||||
// AggregatorFunction 聚合器函数接口,重新导出functions.LegacyAggregatorFunction
|
||||
// AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
|
||||
type AggregatorFunction = functions.LegacyAggregatorFunction
|
||||
|
||||
// ContextAggregator 支持context机制的聚合器接口,重新导出functions.ContextAggregator
|
||||
// ContextAggregator aggregator interface supporting context mechanism, re-exports functions.ContextAggregator
|
||||
type ContextAggregator = functions.ContextAggregator
|
||||
|
||||
// Register 添加自定义聚合器到全局注册表,重新导出functions.RegisterLegacyAggregator
|
||||
// Register adds custom aggregator to global registry, re-exports functions.RegisterLegacyAggregator
|
||||
func Register(name string, constructor func() AggregatorFunction) {
|
||||
functions.RegisterLegacyAggregator(name, constructor)
|
||||
}
|
||||
|
||||
// CreateBuiltinAggregator 创建内置聚合器,重新导出functions.CreateLegacyAggregator
|
||||
// CreateBuiltinAggregator creates built-in aggregator, re-exports functions.CreateLegacyAggregator
|
||||
func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
|
||||
// 特殊处理expression类型
|
||||
// Special handling for expression type
|
||||
if aggType == "expression" {
|
||||
return &ExpressionAggregatorWrapper{
|
||||
function: functions.NewExpressionAggregatorFunction(),
|
||||
}
|
||||
}
|
||||
|
||||
// Special handling for post-aggregation type (placeholder aggregator)
|
||||
if aggType == "post_aggregation" {
|
||||
return &PostAggregationPlaceholder{}
|
||||
}
|
||||
|
||||
return functions.CreateLegacyAggregator(aggType)
|
||||
}
|
||||
|
||||
// ExpressionAggregatorWrapper 包装表达式聚合器,使其兼容LegacyAggregatorFunction接口
|
||||
// PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields
|
||||
type PostAggregationPlaceholder struct{}
|
||||
|
||||
func (p *PostAggregationPlaceholder) New() AggregatorFunction {
|
||||
return &PostAggregationPlaceholder{}
|
||||
}
|
||||
|
||||
func (p *PostAggregationPlaceholder) Add(value interface{}) {
|
||||
// Do nothing - this is just a placeholder
|
||||
}
|
||||
|
||||
func (p *PostAggregationPlaceholder) Result() interface{} {
|
||||
// Return nil - actual result will be computed in post-processing
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
|
||||
type ExpressionAggregatorWrapper struct {
|
||||
function *functions.ExpressionAggregatorFunction
|
||||
}
|
||||
|
||||
@@ -0,0 +1,148 @@
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestPostAggregationPlaceholder 测试后聚合占位符的完整功能
|
||||
func TestPostAggregationPlaceholder(t *testing.T) {
|
||||
t.Run("测试PostAggregationPlaceholder基本功能", func(t *testing.T) {
|
||||
// 创建PostAggregationPlaceholder实例
|
||||
placeholder := &PostAggregationPlaceholder{}
|
||||
require.NotNil(t, placeholder)
|
||||
|
||||
// 测试New方法
|
||||
newPlaceholder := placeholder.New()
|
||||
require.NotNil(t, newPlaceholder)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
|
||||
|
||||
// 测试Add方法(应该不做任何操作)
|
||||
placeholder.Add(10)
|
||||
placeholder.Add("test")
|
||||
placeholder.Add(nil)
|
||||
placeholder.Add([]int{1, 2, 3})
|
||||
|
||||
// 测试Result方法(应该返回nil)
|
||||
result := placeholder.Result()
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("测试通过CreateBuiltinAggregator创建PostAggregationPlaceholder", func(t *testing.T) {
|
||||
// 使用CreateBuiltinAggregator创建post_aggregation类型的聚合器
|
||||
aggregator := CreateBuiltinAggregator(PostAggregation)
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
|
||||
// 测试创建的聚合器功能
|
||||
newAgg := aggregator.New()
|
||||
require.NotNil(t, newAgg)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
|
||||
|
||||
// 测试添加各种类型的值
|
||||
newAgg.Add(100)
|
||||
newAgg.Add("string_value")
|
||||
newAgg.Add(map[string]interface{}{"key": "value"})
|
||||
|
||||
// 验证结果始终为nil
|
||||
result := newAgg.Result()
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregationPlaceholder的多实例独立性", func(t *testing.T) {
|
||||
// 创建多个实例
|
||||
placeholder1 := &PostAggregationPlaceholder{}
|
||||
placeholder2 := placeholder1.New()
|
||||
placeholder3 := placeholder1.New()
|
||||
|
||||
// 验证实例类型正确
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
|
||||
|
||||
// 每个实例都应该返回nil
|
||||
assert.Nil(t, placeholder1.Result())
|
||||
assert.Nil(t, placeholder2.Result())
|
||||
assert.Nil(t, placeholder3.Result())
|
||||
|
||||
// 验证Add操作不会影响结果(因为是占位符)
|
||||
placeholder1.Add("test1")
|
||||
placeholder2.Add("test2")
|
||||
placeholder3.Add("test3")
|
||||
assert.Nil(t, placeholder1.Result())
|
||||
assert.Nil(t, placeholder2.Result())
|
||||
assert.Nil(t, placeholder3.Result())
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregationPlaceholder在聚合场景中的使用", func(t *testing.T) {
|
||||
// 创建包含PostAggregationPlaceholder的聚合字段
|
||||
groupFields := []string{"category"}
|
||||
aggFields := []AggregationField{
|
||||
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
|
||||
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
|
||||
}
|
||||
|
||||
// 创建分组聚合器
|
||||
agg := NewGroupAggregator(groupFields, aggFields)
|
||||
require.NotNil(t, agg)
|
||||
|
||||
// 添加测试数据
|
||||
testData := []map[string]interface{}{
|
||||
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
|
||||
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
|
||||
{"category": "B", "value": 30, "placeholder_field": 999},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
err := agg.Add(data)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// 获取结果
|
||||
results, err := agg.GetResults()
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, results, 2)
|
||||
|
||||
// 验证PostAggregationPlaceholder字段的结果为nil
|
||||
for _, result := range results {
|
||||
assert.Contains(t, result, "post_agg_field")
|
||||
assert.Nil(t, result["post_agg_field"])
|
||||
// 验证正常聚合字段工作正常
|
||||
assert.Contains(t, result, "sum_value")
|
||||
assert.NotNil(t, result["sum_value"])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
|
||||
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
|
||||
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
|
||||
aggregator := CreateBuiltinAggregator("post_aggregation")
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregation常量", func(t *testing.T) {
|
||||
// 验证PostAggregation常量值
|
||||
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
|
||||
|
||||
// 使用常量创建聚合器
|
||||
aggregator := CreateBuiltinAggregator(PostAggregation)
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
})
|
||||
|
||||
t.Run("测试与其他聚合类型的区别", func(t *testing.T) {
|
||||
// 创建不同类型的聚合器
|
||||
sumAgg := CreateBuiltinAggregator(Sum)
|
||||
countAgg := CreateBuiltinAggregator(Count)
|
||||
postAgg := CreateBuiltinAggregator(PostAggregation)
|
||||
|
||||
// 验证类型不同
|
||||
assert.NotEqual(t, sumAgg, postAgg)
|
||||
assert.NotEqual(t, countAgg, postAgg)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
Package aggregator provides data aggregation functionality for StreamSQL.
|
||||
|
||||
This package implements group-based aggregation operations for stream processing,
|
||||
supporting various aggregation functions and expression evaluation. It provides
|
||||
thread-safe aggregation with support for custom expressions and built-in functions.
|
||||
|
||||
# Core Features
|
||||
|
||||
• Group Aggregation - Group data by specified fields and apply aggregation functions
|
||||
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
|
||||
• Expression Support - Custom expression evaluation within aggregations
|
||||
• Thread Safety - Concurrent aggregation operations with proper synchronization
|
||||
• Type Flexibility - Automatic type conversion and validation
|
||||
• Performance Optimized - Efficient memory usage and processing
|
||||
|
||||
# Aggregation Types
|
||||
|
||||
Supported aggregation functions (re-exported from functions package):
|
||||
|
||||
// Mathematical aggregations
|
||||
Sum, Count, Avg, Max, Min
|
||||
StdDev, StdDevS, Var, VarS
|
||||
Median, Percentile
|
||||
|
||||
// Collection aggregations
|
||||
Collect, LastValue, MergeAgg
|
||||
Deduplicate
|
||||
|
||||
// Window aggregations
|
||||
WindowStart, WindowEnd
|
||||
|
||||
// Analytical functions
|
||||
Lag, Latest, ChangedCol, HadChanged
|
||||
|
||||
// Custom expressions
|
||||
Expression
|
||||
|
||||
# Core Interfaces
|
||||
|
||||
Main aggregation interfaces:
|
||||
|
||||
type Aggregator interface {
|
||||
Add(data interface{}) error
|
||||
Put(key string, val interface{}) error
|
||||
GetResults() ([]map[string]interface{}, error)
|
||||
Reset()
|
||||
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
|
||||
}
|
||||
|
||||
type AggregatorFunction interface {
|
||||
New() AggregatorFunction
|
||||
Add(value interface{})
|
||||
Result() interface{}
|
||||
}
|
||||
|
||||
# Aggregation Configuration
|
||||
|
||||
Field configuration for aggregations:
|
||||
|
||||
type AggregationField struct {
|
||||
InputField string // Source field name
|
||||
AggregateType AggregateType // Aggregation function type
|
||||
OutputAlias string // Result field alias
|
||||
}
|
||||
|
||||
# Usage Examples
|
||||
|
||||
Basic group aggregation:
|
||||
|
||||
// Define aggregation fields
|
||||
aggFields := []AggregationField{
|
||||
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
|
||||
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
|
||||
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
|
||||
}
|
||||
|
||||
// Create group aggregator
|
||||
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
|
||||
|
||||
// Add data
|
||||
data := map[string]interface{}{
|
||||
"location": "room1",
|
||||
"temperature": 25.5,
|
||||
"humidity": 60,
|
||||
"device_id": "sensor001",
|
||||
}
|
||||
aggregator.Add(data)
|
||||
|
||||
// Get results
|
||||
results, err := aggregator.GetResults()
|
||||
|
||||
Expression-based aggregation:
|
||||
|
||||
// Register custom expression
|
||||
aggregator.RegisterExpression(
|
||||
"comfort_index",
|
||||
"temperature * 0.7 + humidity * 0.3",
|
||||
[]string{"temperature", "humidity"},
|
||||
func(data interface{}) (interface{}, error) {
|
||||
// Custom evaluation logic
|
||||
return evaluateComfortIndex(data)
|
||||
},
|
||||
)
|
||||
|
||||
Multiple group aggregation:
|
||||
|
||||
// Group by multiple fields
|
||||
aggregator := NewGroupAggregator(
|
||||
[]string{"location", "device_type"},
|
||||
aggFields,
|
||||
)
|
||||
|
||||
// Results will be grouped by both location and device_type
|
||||
results, err := aggregator.GetResults()
|
||||
|
||||
# Built-in Aggregators
|
||||
|
||||
Create built-in aggregation functions:
|
||||
|
||||
// Create specific aggregator
|
||||
sumAgg := CreateBuiltinAggregator(Sum)
|
||||
avgAgg := CreateBuiltinAggregator(Avg)
|
||||
countAgg := CreateBuiltinAggregator(Count)
|
||||
|
||||
// Use aggregator
|
||||
sumAgg.Add(10)
|
||||
sumAgg.Add(20)
|
||||
result := sumAgg.Result() // returns 30
|
||||
|
||||
# Custom Aggregators
|
||||
|
||||
Register custom aggregation functions:
|
||||
|
||||
Register("custom_avg", func() AggregatorFunction {
|
||||
return &CustomAvgAggregator{}
|
||||
})
|
||||
|
||||
# Integration
|
||||
|
||||
Integrates with other StreamSQL components:
|
||||
|
||||
• Functions package - Built-in aggregation function implementations
|
||||
• Stream package - Real-time data aggregation in streams
|
||||
• Window package - Window-based aggregation operations
|
||||
• Types package - Data type definitions and conversions
|
||||
• RSQL package - SQL GROUP BY and aggregation parsing
|
||||
*/
|
||||
package aggregator
|
||||
@@ -11,20 +11,21 @@ import (
|
||||
"github.com/rulego/streamsql/utils/fieldpath"
|
||||
)
|
||||
|
||||
// Aggregator aggregator interface
|
||||
type Aggregator interface {
|
||||
Add(data interface{}) error
|
||||
Put(key string, val interface{}) error
|
||||
GetResults() ([]map[string]interface{}, error)
|
||||
Reset()
|
||||
// RegisterExpression 注册表达式计算器
|
||||
// RegisterExpression registers expression evaluator
|
||||
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
|
||||
}
|
||||
|
||||
// AggregationField 定义单个聚合字段的配置
|
||||
// AggregationField defines configuration for a single aggregation field
|
||||
type AggregationField struct {
|
||||
InputField string // 输入字段名(如 "temperature")
|
||||
AggregateType AggregateType // 聚合类型(如 Sum, Avg)
|
||||
OutputAlias string // 输出别名(如 "temp_sum")
|
||||
InputField string // Input field name (e.g., "temperature")
|
||||
AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
|
||||
OutputAlias string // Output alias (e.g., "temp_sum")
|
||||
}
|
||||
|
||||
type GroupAggregator struct {
|
||||
@@ -34,26 +35,26 @@ type GroupAggregator struct {
|
||||
groups map[string]map[string]AggregatorFunction
|
||||
mu sync.RWMutex
|
||||
context map[string]interface{}
|
||||
// 表达式计算器
|
||||
// Expression evaluators
|
||||
expressions map[string]*ExpressionEvaluator
|
||||
}
|
||||
|
||||
// ExpressionEvaluator 包装表达式计算功能
|
||||
// ExpressionEvaluator wraps expression evaluation functionality
|
||||
type ExpressionEvaluator struct {
|
||||
Expression string // 完整表达式
|
||||
Field string // 主字段名
|
||||
Fields []string // 表达式中引用的所有字段
|
||||
Expression string // Complete expression
|
||||
Field string // Primary field name
|
||||
Fields []string // All fields referenced in expression
|
||||
evaluateFunc func(data interface{}) (interface{}, error)
|
||||
}
|
||||
|
||||
// NewGroupAggregator 创建新的分组聚合器
|
||||
// NewGroupAggregator creates a new group aggregator
|
||||
func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator {
|
||||
aggregators := make(map[string]AggregatorFunction)
|
||||
|
||||
// 为每个聚合字段创建聚合器
|
||||
// Create aggregator for each aggregation field
|
||||
for i := range aggregationFields {
|
||||
if aggregationFields[i].OutputAlias == "" {
|
||||
// 如果没有指定别名,使用输入字段名
|
||||
// If no alias specified, use input field name
|
||||
aggregationFields[i].OutputAlias = aggregationFields[i].InputField
|
||||
}
|
||||
aggregators[aggregationFields[i].OutputAlias] = CreateBuiltinAggregator(aggregationFields[i].AggregateType)
|
||||
@@ -68,7 +69,7 @@ func NewGroupAggregator(groupFields []string, aggregationFields []AggregationFie
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterExpression 注册表达式计算器
|
||||
// RegisterExpression registers expression evaluator
|
||||
func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error)) {
|
||||
ga.mu.Lock()
|
||||
defer ga.mu.Unlock()
|
||||
@@ -91,26 +92,26 @@ func (ga *GroupAggregator) Put(key string, val interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// isNumericAggregator 检查聚合器是否需要数值类型输入
|
||||
// isNumericAggregator checks if aggregator requires numeric type input
|
||||
func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
|
||||
// 通过functions模块动态检查函数类型
|
||||
// Dynamically check function type through functions module
|
||||
if fn, exists := functions.Get(string(aggType)); exists {
|
||||
switch fn.GetType() {
|
||||
case functions.TypeMath:
|
||||
// 数学函数通常需要数值输入
|
||||
// Math functions usually require numeric input
|
||||
return true
|
||||
case functions.TypeAggregation:
|
||||
// 检查是否是数值聚合函数
|
||||
// Check if it's a numeric aggregation function
|
||||
switch string(aggType) {
|
||||
case functions.SumStr, functions.AvgStr, functions.MinStr, functions.MaxStr, functions.CountStr,
|
||||
functions.StdDevStr, functions.MedianStr, functions.PercentileStr,
|
||||
functions.VarStr, functions.VarSStr, functions.StdDevSStr:
|
||||
return true
|
||||
case functions.CollectStr, functions.MergeAggStr, functions.DeduplicateStr, functions.LastValueStr:
|
||||
// 这些函数可以处理任意类型
|
||||
// These functions can handle any type
|
||||
return false
|
||||
default:
|
||||
// 对于未知的聚合函数,尝试检查函数名称模式
|
||||
// For unknown aggregation functions, try to check function name patterns
|
||||
funcName := string(aggType)
|
||||
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
|
||||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
|
||||
@@ -120,15 +121,15 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
|
||||
return false
|
||||
}
|
||||
case functions.TypeAnalytical:
|
||||
// 分析函数通常可以处理任意类型
|
||||
// Analytical functions can usually handle any type
|
||||
return false
|
||||
default:
|
||||
// 其他类型的函数,保守起见认为不需要数值转换
|
||||
// For other types of functions, conservatively assume no numeric conversion needed
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// 如果函数不存在,根据名称模式判断
|
||||
// If function doesn't exist, judge by name pattern
|
||||
funcName := string(aggType)
|
||||
if strings.Contains(funcName, functions.SumStr) || strings.Contains(funcName, functions.AvgStr) ||
|
||||
strings.Contains(funcName, functions.MinStr) || strings.Contains(funcName, functions.MaxStr) ||
|
||||
@@ -139,9 +140,21 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// shouldAllowNullValues 判断聚合函数是否应该允许NULL值
|
||||
func (ga *GroupAggregator) shouldAllowNullValues(aggType AggregateType) bool {
|
||||
// FIRST_VALUE和LAST_VALUE函数应该允许NULL值,因为它们需要记录第一个/最后一个值,即使是NULL
|
||||
return aggType == FirstValue || aggType == LastValue
|
||||
}
|
||||
|
||||
func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
ga.mu.Lock()
|
||||
defer ga.mu.Unlock()
|
||||
|
||||
// 检查数据是否为nil
|
||||
if data == nil {
|
||||
return fmt.Errorf("data cannot be nil")
|
||||
}
|
||||
|
||||
var v reflect.Value
|
||||
|
||||
switch data.(type) {
|
||||
@@ -153,6 +166,10 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
// 检查是否为支持的数据类型
|
||||
if v.Kind() != reflect.Struct && v.Kind() != reflect.Map {
|
||||
return fmt.Errorf("unsupported data type: %T, expected struct or map", data)
|
||||
}
|
||||
}
|
||||
|
||||
key := ""
|
||||
@@ -160,11 +177,11 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
var fieldVal interface{}
|
||||
var found bool
|
||||
|
||||
// 检查是否是嵌套字段
|
||||
// Check if it's a nested field
|
||||
if fieldpath.IsNestedField(field) {
|
||||
fieldVal, found = fieldpath.GetNestedField(data, field)
|
||||
} else {
|
||||
// 原有的字段访问逻辑
|
||||
// Original field access logic
|
||||
var f reflect.Value
|
||||
if v.Kind() == reflect.Map {
|
||||
keyVal := reflect.ValueOf(field)
|
||||
@@ -198,21 +215,21 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
ga.groups[key] = make(map[string]AggregatorFunction)
|
||||
}
|
||||
|
||||
// 为每个字段创建聚合器实例
|
||||
// Create aggregator instances for each field
|
||||
for outputAlias, agg := range ga.aggregators {
|
||||
if _, exists := ga.groups[key][outputAlias]; !exists {
|
||||
ga.groups[key][outputAlias] = agg.New()
|
||||
}
|
||||
}
|
||||
|
||||
// 处理每个聚合字段
|
||||
// Process each aggregation field
|
||||
for _, aggField := range ga.aggregationFields {
|
||||
outputAlias := aggField.OutputAlias
|
||||
if outputAlias == "" {
|
||||
outputAlias = aggField.InputField
|
||||
}
|
||||
|
||||
// 检查是否有表达式计算器
|
||||
// Check if there's an expression evaluator
|
||||
if expr, hasExpr := ga.expressions[outputAlias]; hasExpr {
|
||||
result, err := expr.evaluateFunc(data)
|
||||
if err != nil {
|
||||
@@ -227,23 +244,23 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
|
||||
inputField := aggField.InputField
|
||||
|
||||
// 特殊处理count(*)的情况
|
||||
// Special handling for count(*) case
|
||||
if inputField == "*" {
|
||||
// 对于count(*),直接添加1,不需要获取具体字段值
|
||||
// For count(*), directly add 1 without getting specific field value
|
||||
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
|
||||
groupAgg.Add(1)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取字段值 - 支持嵌套字段
|
||||
// Get field value - supports nested fields
|
||||
var fieldVal interface{}
|
||||
var found bool
|
||||
|
||||
if fieldpath.IsNestedField(inputField) {
|
||||
fieldVal, found = fieldpath.GetNestedField(data, inputField)
|
||||
} else {
|
||||
// 原有的字段访问逻辑
|
||||
// Original field access logic
|
||||
var f reflect.Value
|
||||
if v.Kind() == reflect.Map {
|
||||
keyVal := reflect.ValueOf(inputField)
|
||||
@@ -259,7 +276,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
}
|
||||
|
||||
if !found {
|
||||
// 尝试从context中获取
|
||||
// Try to get from context
|
||||
if ga.context != nil {
|
||||
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
|
||||
if contextAgg, ok := groupAgg.(ContextAggregator); ok {
|
||||
@@ -275,19 +292,31 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
|
||||
aggType := aggField.AggregateType
|
||||
|
||||
// 动态检查是否需要数值转换
|
||||
if ga.isNumericAggregator(aggType) {
|
||||
// 对于数值聚合函数,尝试转换为数值类型
|
||||
// Skip nil values for most aggregation functions, but allow FIRST_VALUE and LAST_VALUE to handle them
|
||||
if fieldVal == nil && !ga.shouldAllowNullValues(aggType) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Special handling for Count aggregator - it can handle any type
|
||||
if aggType == Count {
|
||||
// Count can handle any non-null value
|
||||
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
|
||||
groupAgg.Add(fieldVal)
|
||||
}
|
||||
} else if ga.isNumericAggregator(aggType) {
|
||||
// For numeric aggregation functions, try to convert to numeric type
|
||||
if numVal, err := cast.ToFloat64E(fieldVal); err == nil {
|
||||
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
|
||||
|
||||
groupAgg.Add(numVal)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot convert field %s value %v to numeric type for aggregator %s", inputField, fieldVal, aggType)
|
||||
}
|
||||
} else {
|
||||
// 对于非数值聚合函数,直接传递原始值
|
||||
// For non-numeric aggregation functions, pass original value directly
|
||||
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
|
||||
|
||||
groupAgg.Add(fieldVal)
|
||||
}
|
||||
}
|
||||
@@ -299,6 +328,15 @@ func (ga *GroupAggregator) Add(data interface{}) error {
|
||||
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
|
||||
ga.mu.RLock()
|
||||
defer ga.mu.RUnlock()
|
||||
|
||||
// 如果既没有分组字段又没有聚合字段,但有数据被添加过,返回一个空的结果行
|
||||
if len(ga.aggregationFields) == 0 && len(ga.groupFields) == 0 {
|
||||
if len(ga.groups) > 0 {
|
||||
return []map[string]interface{}{{}}, nil
|
||||
}
|
||||
return []map[string]interface{}{}, nil
|
||||
}
|
||||
|
||||
result := make([]map[string]interface{}, 0, len(ga.groups))
|
||||
for key, aggregators := range ga.groups {
|
||||
group := make(map[string]interface{})
|
||||
@@ -309,7 +347,12 @@ func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
|
||||
}
|
||||
}
|
||||
for field, agg := range aggregators {
|
||||
group[field] = agg.Result()
|
||||
result := agg.Result()
|
||||
group[field] = result
|
||||
// Debug: log aggregator results (can be removed in production)
|
||||
// if strings.HasPrefix(field, "__") {
|
||||
// fmt.Printf("Aggregator %s result: %v (%T)\n", field, result, result)
|
||||
// }
|
||||
}
|
||||
result = append(result, group)
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
+14
-14
@@ -16,7 +16,7 @@ type ExprCondition struct {
|
||||
}
|
||||
|
||||
func NewExprCondition(expression string) (Condition, error) {
|
||||
// 添加自定义字符串函数支持(startsWith、endsWith、contains是内置操作符)
|
||||
// Add custom string function support (startsWith, endsWith, contains are built-in operators)
|
||||
options := []expr.Option{
|
||||
expr.Function("like_match", func(params ...any) (any, error) {
|
||||
if len(params) != 2 {
|
||||
@@ -60,22 +60,22 @@ func (ec *ExprCondition) Evaluate(env interface{}) bool {
|
||||
return result.(bool)
|
||||
}
|
||||
|
||||
// matchesLikePattern 实现LIKE模式匹配
|
||||
// 支持%(匹配任意字符序列)和_(匹配单个字符)
|
||||
// matchesLikePattern implements LIKE pattern matching
|
||||
// Supports % (matches any character sequence) and _ (matches single character)
|
||||
func matchesLikePattern(text, pattern string) bool {
|
||||
return likeMatch(text, pattern, 0, 0)
|
||||
}
|
||||
|
||||
// likeMatch 递归实现LIKE匹配算法
|
||||
// likeMatch recursively implements LIKE matching algorithm
|
||||
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
// 如果模式已经匹配完成
|
||||
// If pattern has been fully matched
|
||||
if patternIndex >= len(pattern) {
|
||||
return textIndex >= len(text) // 文本也应该匹配完成
|
||||
return textIndex >= len(text) // Text should also be fully matched
|
||||
}
|
||||
|
||||
// 如果文本已经结束,但模式还有非%字符,则不匹配
|
||||
// If text has ended but pattern still has non-% characters, no match
|
||||
if textIndex >= len(text) {
|
||||
// 检查剩余的模式是否都是%
|
||||
// Check if remaining pattern characters are all %
|
||||
for i := patternIndex; i < len(pattern); i++ {
|
||||
if pattern[i] != '%' {
|
||||
return false
|
||||
@@ -84,16 +84,16 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// 处理当前模式字符
|
||||
// Process current pattern character
|
||||
patternChar := pattern[patternIndex]
|
||||
|
||||
if patternChar == '%' {
|
||||
// %可以匹配0个或多个字符
|
||||
// 尝试匹配0个字符(跳过%)
|
||||
// % can match 0 or more characters
|
||||
// Try matching 0 characters (skip %)
|
||||
if likeMatch(text, pattern, textIndex, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
// 尝试匹配1个或多个字符
|
||||
// Try matching 1 or more characters
|
||||
for i := textIndex; i < len(text); i++ {
|
||||
if likeMatch(text, pattern, i+1, patternIndex+1) {
|
||||
return true
|
||||
@@ -101,10 +101,10 @@ func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
}
|
||||
return false
|
||||
} else if patternChar == '_' {
|
||||
// _匹配恰好一个字符
|
||||
// _ matches exactly one character
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
} else {
|
||||
// 普通字符必须精确匹配
|
||||
// Regular characters must match exactly
|
||||
if text[textIndex] == patternChar {
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
Package condition provides condition evaluation functionality for StreamSQL.
|
||||
|
||||
This package implements condition evaluation using the expr-lang library,
|
||||
supporting complex boolean expressions for filtering and conditional logic.
|
||||
It provides custom functions for SQL-like operations including LIKE pattern
|
||||
matching and NULL checking.
|
||||
|
||||
# Core Features
|
||||
|
||||
• Boolean Expression Evaluation - Evaluate complex boolean conditions
|
||||
• LIKE Pattern Matching - SQL-style pattern matching with % and _ wildcards
|
||||
• NULL Checking - Support for IS NULL and IS NOT NULL operations
|
||||
• Custom Functions - Extended function library for SQL compatibility
|
||||
• Type Safety - Automatic type conversion and validation
|
||||
• Performance Optimized - Compiled expressions for fast evaluation
|
||||
|
||||
# Condition Interface
|
||||
|
||||
Unified interface for condition evaluation:
|
||||
|
||||
type Condition interface {
|
||||
Evaluate(env interface{}) bool
|
||||
}
|
||||
|
||||
# Custom Functions
|
||||
|
||||
Built-in SQL-compatible functions:
|
||||
|
||||
// LIKE pattern matching
|
||||
like_match(text, pattern) - SQL LIKE operation with % and _ wildcards
|
||||
|
||||
// NULL checking
|
||||
is_null(value) - Check if value is NULL
|
||||
is_not_null(value) - Check if value is not NULL
|
||||
|
||||
# Usage Examples
|
||||
|
||||
Basic condition evaluation:
|
||||
|
||||
condition, err := NewExprCondition("age >= 18 AND status == 'active'")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
data := map[string]interface{}{
|
||||
"age": 25,
|
||||
"status": "active",
|
||||
}
|
||||
|
||||
result := condition.Evaluate(data) // returns true
|
||||
|
||||
LIKE pattern matching:
|
||||
|
||||
condition, err := NewExprCondition("like_match(name, 'John%')")
|
||||
data := map[string]interface{}{"name": "John Smith"}
|
||||
result := condition.Evaluate(data) // returns true
|
||||
|
||||
NULL checking:
|
||||
|
||||
condition, err := NewExprCondition("is_not_null(email)")
|
||||
data := map[string]interface{}{"email": "user@example.com"}
|
||||
result := condition.Evaluate(data) // returns true
|
||||
|
||||
Complex conditions:
|
||||
|
||||
condition, err := NewExprCondition(`
|
||||
age >= 18 AND
|
||||
like_match(email, '%@company.com') AND
|
||||
is_not_null(department)
|
||||
`)
|
||||
|
||||
# Pattern Matching
|
||||
|
||||
LIKE pattern matching supports:
|
||||
|
||||
% - Matches any sequence of characters (including empty)
|
||||
_ - Matches exactly one character
|
||||
|
||||
Examples:
|
||||
|
||||
'John%' matches 'John', 'John Smith', 'Johnny'
|
||||
'J_hn' matches 'John' but not 'Johan'
|
||||
'%@gmail.com' matches any email ending with @gmail.com
|
||||
|
||||
# Integration
|
||||
|
||||
Integrates with other StreamSQL components:
|
||||
|
||||
• Stream package - Data filtering and conditional processing
|
||||
• RSQL package - WHERE and HAVING clause evaluation
|
||||
• Types package - Data type handling and conversion
|
||||
• Expr package - Expression parsing and evaluation
|
||||
*/
|
||||
package condition
|
||||
@@ -15,23 +15,24 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。
|
||||
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
|
||||
|
||||
StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、
|
||||
自定义函数,以及与 RuleGo 生态的无缝集成。
|
||||
StreamSQL provides efficient unbounded data stream processing and analysis capabilities,
|
||||
supporting multiple window types, aggregate functions, custom functions, and seamless
|
||||
integration with the RuleGo ecosystem.
|
||||
|
||||
# 核心特性
|
||||
# Core Features
|
||||
|
||||
• 轻量级设计 - 纯内存操作,无外部依赖
|
||||
• SQL语法支持 - 使用熟悉的SQL语法处理流数据
|
||||
• 多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口
|
||||
• 丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等
|
||||
• 插件式自定义函数 - 运行时动态注册,支持8种函数类型
|
||||
• RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
|
||||
• Lightweight design - Pure in-memory operations, no external dependencies
|
||||
• SQL syntax support - Process stream data using familiar SQL syntax
|
||||
• Multiple window types - Sliding, tumbling, counting, and session windows
|
||||
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
|
||||
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
|
||||
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
|
||||
|
||||
# 入门示例
|
||||
# Getting Started
|
||||
|
||||
基本的流数据处理:
|
||||
Basic stream data processing:
|
||||
|
||||
package main
|
||||
|
||||
@@ -43,10 +44,10 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 创建StreamSQL实例
|
||||
// Create StreamSQL instance
|
||||
ssql := streamsql.New()
|
||||
|
||||
// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
|
||||
// Define SQL query - Calculate temperature average by device ID every 5 seconds
|
||||
sql := `SELECT deviceId,
|
||||
AVG(temperature) as avg_temp,
|
||||
MIN(humidity) as min_humidity,
|
||||
@@ -56,18 +57,18 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
|
||||
WHERE deviceId != 'device3'
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
// 执行SQL,创建流处理任务
|
||||
// Execute SQL, create stream processing task
|
||||
err := ssql.Execute(sql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 添加结果处理回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
fmt.Printf("聚合结果: %v\n", result)
|
||||
// Add result processing callback
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf("Aggregation result: %v\n", result)
|
||||
})
|
||||
|
||||
// 模拟发送流数据
|
||||
// Simulate sending stream data
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
@@ -75,7 +76,7 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// 生成随机设备数据
|
||||
// Generate random device data
|
||||
data := map[string]interface{}{
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
|
||||
"temperature": 20.0 + rand.Float64()*10,
|
||||
@@ -86,36 +87,36 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
|
||||
}
|
||||
}()
|
||||
|
||||
// 运行30秒
|
||||
// Run for 30 seconds
|
||||
time.Sleep(30 * time.Second)
|
||||
}
|
||||
|
||||
# 窗口函数
|
||||
# Window Functions
|
||||
|
||||
StreamSQL 支持多种窗口类型:
|
||||
StreamSQL supports multiple window types:
|
||||
|
||||
// 滚动窗口 - 每5秒一个独立窗口
|
||||
// Tumbling window - Independent window every 5 seconds
|
||||
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
|
||||
|
||||
// 滑动窗口 - 窗口大小30秒,每10秒滑动一次
|
||||
// Sliding window - 30-second window size, slides every 10 seconds
|
||||
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
|
||||
|
||||
// 计数窗口 - 每100条记录一个窗口
|
||||
// Counting window - One window per 100 records
|
||||
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
|
||||
|
||||
// 会话窗口 - 超时5分钟自动关闭会话
|
||||
// Session window - Automatically closes session after 5-minute timeout
|
||||
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
|
||||
|
||||
# 自定义函数
|
||||
# Custom Functions
|
||||
|
||||
StreamSQL 支持插件式自定义函数,运行时动态注册:
|
||||
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
|
||||
|
||||
// 注册温度转换函数
|
||||
// Register temperature conversion function
|
||||
functions.RegisterCustomFunction(
|
||||
"fahrenheit_to_celsius",
|
||||
functions.TypeConversion,
|
||||
"温度转换",
|
||||
"华氏度转摄氏度",
|
||||
"Temperature conversion",
|
||||
"Fahrenheit to Celsius",
|
||||
1, 1,
|
||||
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
||||
f, _ := functions.ConvertToFloat64(args[0])
|
||||
@@ -123,55 +124,55 @@ StreamSQL 支持插件式自定义函数,运行时动态注册:
|
||||
},
|
||||
)
|
||||
|
||||
// 立即在SQL中使用
|
||||
// Use immediately in SQL
|
||||
sql := `SELECT deviceId,
|
||||
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
|
||||
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
支持的自定义函数类型:
|
||||
• TypeMath - 数学计算函数
|
||||
• TypeString - 字符串处理函数
|
||||
• TypeConversion - 类型转换函数
|
||||
• TypeDateTime - 时间日期函数
|
||||
• TypeAggregation - 聚合函数
|
||||
• TypeAnalytical - 分析函数
|
||||
• TypeWindow - 窗口函数
|
||||
• TypeCustom - 通用自定义函数
|
||||
Supported custom function types:
|
||||
• TypeMath - Mathematical calculation functions
|
||||
• TypeString - String processing functions
|
||||
• TypeConversion - Type conversion functions
|
||||
• TypeDateTime - Date and time functions
|
||||
• TypeAggregation - Aggregate functions
|
||||
• TypeAnalytical - Analytical functions
|
||||
• TypeWindow - Window functions
|
||||
• TypeCustom - General custom functions
|
||||
|
||||
# 日志配置
|
||||
# Log Configuration
|
||||
|
||||
StreamSQL 提供灵活的日志配置选项:
|
||||
StreamSQL provides flexible log configuration options:
|
||||
|
||||
// 设置日志级别
|
||||
// Set log level
|
||||
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
|
||||
|
||||
// 输出到文件
|
||||
// Output to file
|
||||
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
|
||||
|
||||
// 禁用日志(生产环境)
|
||||
// Disable logging (production environment)
|
||||
ssql := streamsql.New(streamsql.WithDiscardLog())
|
||||
|
||||
# 与RuleGo集成
|
||||
# RuleGo Integration
|
||||
|
||||
StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组件实现流式数据处理:
|
||||
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
|
||||
|
||||
• streamTransform (x/streamTransform) - 流转换器,处理非聚合SQL查询
|
||||
• streamAggregator (x/streamAggregator) - 流聚合器,处理聚合SQL查询
|
||||
• streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
|
||||
• streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
|
||||
|
||||
基本集成示例:
|
||||
Basic integration example:
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/rulego/rulego"
|
||||
"github.com/rulego/rulego/api/types"
|
||||
// 注册StreamSQL组件
|
||||
// Register StreamSQL components
|
||||
_ "github.com/rulego/rulego-components/external/streamsql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 规则链配置
|
||||
// Rule chain configuration
|
||||
ruleChainJson := `{
|
||||
"ruleChain": {"id": "rule01"},
|
||||
"metadata": {
|
||||
@@ -196,10 +197,10 @@ StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组
|
||||
}
|
||||
}`
|
||||
|
||||
// 创建规则引擎
|
||||
// Create rule engine
|
||||
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
|
||||
|
||||
// 发送数据
|
||||
// Send data
|
||||
data := `{"deviceId":"sensor01","temperature":25.5}`
|
||||
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
|
||||
ruleEngine.OnMsg(msg)
|
||||
|
||||
@@ -51,7 +51,7 @@ func main() {
|
||||
fmt.Println("✓ SQL执行成功")
|
||||
|
||||
// 5. 添加结果监听器
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf("📊 聚合结果: %v\n", result)
|
||||
})
|
||||
|
||||
|
||||
@@ -83,19 +83,17 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📋 数组索引访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备: %v\n", item["device"])
|
||||
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
|
||||
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
|
||||
fmt.Println()
|
||||
}
|
||||
for i, item := range result {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备: %v\n", item["device"])
|
||||
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
|
||||
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -168,20 +166,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 🗝️ Map键访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备ID: %v\n", item["device_id"])
|
||||
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
|
||||
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
|
||||
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
|
||||
fmt.Printf(" 应用版本: %v\n", item["app_version"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备ID: %v\n", item["device_id"])
|
||||
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
|
||||
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
|
||||
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
|
||||
fmt.Printf(" 应用版本: %v\n", item["app_version"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -266,20 +263,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 🔄 混合复杂访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 建筑: %v\n", item["building"])
|
||||
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
|
||||
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
|
||||
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
|
||||
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 建筑: %v\n", item["building"])
|
||||
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
|
||||
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
|
||||
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
|
||||
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -325,19 +321,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" ⬅️ 负数索引访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
|
||||
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
|
||||
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
|
||||
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
|
||||
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -372,20 +367,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📈 数组索引聚合计算结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["location"])
|
||||
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
|
||||
fmt.Printf(" 设备数量: %v\n", item["device_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["location"])
|
||||
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
|
||||
fmt.Printf(" 设备数量: %v\n", item["device_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -127,7 +127,7 @@ func testBasicFiltering() {
|
||||
}
|
||||
|
||||
// 添加结果处理函数
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 高温告警: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -172,7 +172,7 @@ func testAggregation() {
|
||||
}
|
||||
|
||||
// 处理聚合结果
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 聚合结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -221,7 +221,7 @@ func testSlidingWindow() {
|
||||
return
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 滑动窗口分析: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -262,7 +262,7 @@ func testNestedFields() {
|
||||
return
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 嵌套字段结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -336,7 +336,7 @@ func testCustomFunctions() {
|
||||
return
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 自定义函数结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -396,7 +396,7 @@ func testComplexQuery() {
|
||||
return
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 复杂查询结果: %v\n", result)
|
||||
})
|
||||
|
||||
|
||||
@@ -609,14 +609,14 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{
|
||||
testData := []map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"temperature": 68.0, // 华氏度
|
||||
"radius": 5.0,
|
||||
"x1": 0.0, "y1": 0.0, "x2": 3.0, "y2": 4.0, // 距离=5
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"temperature": 86.0, // 华氏度
|
||||
"radius": 10.0,
|
||||
@@ -625,7 +625,7 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加结果监听器
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 数学函数结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -659,20 +659,20 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{
|
||||
testData := []map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"metadata": `{"version":"1.0","type":"temperature"}`,
|
||||
"level": 3,
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "sensor2",
|
||||
"metadata": `{"version":"2.0","type":"humidity"}`,
|
||||
"level": 5,
|
||||
},
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 字符串函数结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -702,20 +702,20 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{
|
||||
testData := []map[string]interface{}{
|
||||
{
|
||||
"device": "server1",
|
||||
"client_ip": "192.168.1.100",
|
||||
"memory_usage": 1073741824, // 1GB
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "server2",
|
||||
"client_ip": "10.0.0.50",
|
||||
"memory_usage": 2147483648, // 2GB
|
||||
},
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 转换函数结果: %v\n", result)
|
||||
})
|
||||
|
||||
@@ -746,14 +746,14 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "sensor1", "value": 2.0, "category": "A"},
|
||||
map[string]interface{}{"device": "sensor1", "value": 8.0, "category": "A"},
|
||||
map[string]interface{}{"device": "sensor1", "value": 32.0, "category": "B"},
|
||||
map[string]interface{}{"device": "sensor1", "value": 128.0, "category": "A"},
|
||||
testData := []map[string]interface{}{
|
||||
{"device": "sensor1", "value": 2.0, "category": "A"},
|
||||
{"device": "sensor1", "value": 8.0, "category": "A"},
|
||||
{"device": "sensor1", "value": 32.0, "category": "B"},
|
||||
{"device": "sensor1", "value": 128.0, "category": "A"},
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 聚合函数结果: %v\n", result)
|
||||
})
|
||||
|
||||
|
||||
@@ -119,19 +119,18 @@ func demonstrateBasicNestedAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📋 基础嵌套字段访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 设备位置: %v\n", item["device.location"])
|
||||
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
|
||||
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 设备位置: %v\n", item["device.location"])
|
||||
fmt.Printf(" 温度: %v°C\n", item["sensor.temperature"])
|
||||
fmt.Printf(" 湿度: %v%%\n", item["sensor.humidity"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -166,20 +165,19 @@ func demonstrateNestedAggregation(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📈 嵌套字段聚合结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["device.location"])
|
||||
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
|
||||
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
|
||||
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["device.location"])
|
||||
fmt.Printf(" 平均温度: %.2f°C\n", item["avg_temp"])
|
||||
fmt.Printf(" 最大湿度: %.1f%%\n", item["max_humidity"])
|
||||
fmt.Printf(" 传感器数量: %v\n", item["sensor_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -271,19 +269,18 @@ func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📋 数组索引访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备: %v\n", item["device"])
|
||||
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
|
||||
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备: %v\n", item["device"])
|
||||
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
|
||||
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -356,20 +353,19 @@ func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 🗝️ Map键访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备ID: %v\n", item["device_id"])
|
||||
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
|
||||
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
|
||||
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
|
||||
fmt.Printf(" 应用版本: %v\n", item["app_version"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备ID: %v\n", item["device_id"])
|
||||
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
|
||||
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
|
||||
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
|
||||
fmt.Printf(" 应用版本: %v\n", item["app_version"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -454,20 +450,19 @@ func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 🔄 混合复杂访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 建筑: %v\n", item["building"])
|
||||
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
|
||||
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
|
||||
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
|
||||
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 建筑: %v\n", item["building"])
|
||||
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
|
||||
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
|
||||
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
|
||||
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -513,19 +508,18 @@ func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" ⬅️ 负数索引访问结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
|
||||
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
|
||||
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
fmt.Printf(" 记录 %d:\n", i+1)
|
||||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||||
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
|
||||
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
|
||||
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -560,20 +554,19 @@ func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
|
||||
wg.Add(1)
|
||||
|
||||
// 设置结果回调
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Println(" 📈 数组索引聚合计算结果:")
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["location"])
|
||||
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
|
||||
fmt.Printf(" 设备数量: %v\n", item["device_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
resultSlice := result
|
||||
for i, item := range resultSlice {
|
||||
resultCount++
|
||||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||||
fmt.Printf(" 位置: %v\n", item["location"])
|
||||
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
|
||||
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
|
||||
fmt.Printf(" 设备数量: %v\n", item["device_count"])
|
||||
fmt.Println()
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ func demonstrateDataCleaning() {
|
||||
}
|
||||
|
||||
// 结果处理
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 清洗后数据: %+v\n", result)
|
||||
})
|
||||
|
||||
@@ -103,7 +103,7 @@ func demonstrateDataEnrichment() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 富化后数据: %+v\n", result)
|
||||
})
|
||||
|
||||
@@ -147,7 +147,7 @@ func demonstrateRealTimeAlerting() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 🚨 告警事件: %+v\n", result)
|
||||
})
|
||||
|
||||
@@ -191,7 +191,7 @@ func demonstrateDataFormatConversion() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 格式转换结果: %+v\n", result)
|
||||
})
|
||||
|
||||
@@ -230,7 +230,7 @@ func demonstrateDataRouting() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 路由结果: %+v\n", result)
|
||||
})
|
||||
|
||||
@@ -273,7 +273,7 @@ func demonstrateNestedFieldProcessing() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 嵌套字段处理结果: %+v\n", result)
|
||||
})
|
||||
|
||||
|
||||
@@ -35,11 +35,9 @@ func demo1() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
if results, ok := result.([]map[string]interface{}); ok {
|
||||
for _, data := range results {
|
||||
fmt.Printf("发现空值数据: %+v\n", data)
|
||||
}
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
for _, data := range result {
|
||||
fmt.Printf("发现空值数据: %+v\n", data)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -75,11 +73,9 @@ func demo2() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
if results, ok := result.([]map[string]interface{}); ok {
|
||||
for _, data := range results {
|
||||
fmt.Printf("发现有效数据: %+v\n", data)
|
||||
}
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
for _, data := range result {
|
||||
fmt.Printf("发现有效数据: %+v\n", data)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -115,16 +111,14 @@ func demo3() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
if results, ok := result.([]map[string]interface{}); ok {
|
||||
for _, data := range results {
|
||||
status := data["status"]
|
||||
value := data["value"]
|
||||
if status != nil {
|
||||
fmt.Printf("状态非空的数据: %+v\n", data)
|
||||
} else if value == nil {
|
||||
fmt.Printf("值为空的数据: %+v\n", data)
|
||||
}
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
for _, data := range result {
|
||||
status := data["status"]
|
||||
value := data["value"]
|
||||
if status != nil {
|
||||
fmt.Printf("状态非空的数据: %+v\n", data)
|
||||
} else if value == nil {
|
||||
fmt.Printf("值为空的数据: %+v\n", data)
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -162,18 +156,16 @@ func demo4() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
if results, ok := result.([]map[string]interface{}); ok {
|
||||
for _, data := range results {
|
||||
value := data["value"]
|
||||
status := data["status"]
|
||||
priority := data["priority"]
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
for _, data := range result {
|
||||
value := data["value"]
|
||||
status := data["status"]
|
||||
priority := data["priority"]
|
||||
|
||||
if value != nil && value.(float64) > 20 {
|
||||
fmt.Printf("高值数据 (value > 20): %+v\n", data)
|
||||
} else if status == nil && priority != nil {
|
||||
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
|
||||
}
|
||||
if value != nil && value.(float64) > 20 {
|
||||
fmt.Printf("高值数据 (value > 20): %+v\n", data)
|
||||
} else if status == nil && priority != nil {
|
||||
fmt.Printf("状态异常但有优先级的数据: %+v\n", data)
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -212,11 +204,9 @@ func demo5() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
if results, ok := result.([]map[string]interface{}); ok {
|
||||
for _, data := range results {
|
||||
fmt.Printf("有位置信息的设备: %+v\n", data)
|
||||
}
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
for _, data := range result {
|
||||
fmt.Printf("有位置信息的设备: %+v\n", data)
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -1,233 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/stream"
|
||||
"github.com/rulego/streamsql/types"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("=== StreamSQL 持久化功能测试 ===")
|
||||
|
||||
// 清理之前的测试数据
|
||||
cleanupTestData()
|
||||
|
||||
// 测试1: 创建持久化流并模拟数据溢出
|
||||
fmt.Println("📌 测试1: 数据溢出持久化")
|
||||
testDataOverflowPersistence()
|
||||
|
||||
// 测试2: 模拟程序重启和数据恢复
|
||||
fmt.Println("📌 测试2: 程序重启数据恢复")
|
||||
testDataRecovery()
|
||||
|
||||
// 测试3: 查看持久化文件内容
|
||||
fmt.Println("📌 测试3: 持久化文件分析")
|
||||
analyzePersistenceFiles()
|
||||
|
||||
fmt.Println("✅ 真正持久化功能测试完成!")
|
||||
}
|
||||
|
||||
func testDataOverflowPersistence() {
|
||||
config := types.Config{
|
||||
SimpleFields: []string{"id", "value"},
|
||||
}
|
||||
|
||||
// 创建小缓冲区的持久化流处理器
|
||||
stream, err := stream.NewStreamWithLossPolicy(
|
||||
config,
|
||||
100, // 很小的缓冲区,容易溢出
|
||||
100, // 小结果缓冲区
|
||||
50, // 小sink池
|
||||
"persist", // 持久化策略
|
||||
5*time.Second,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf("创建流失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
stream.Start()
|
||||
|
||||
// 快速发送大量数据,触发溢出
|
||||
inputCount := 1000
|
||||
fmt.Printf("快速发送 %d 条数据到小缓冲区 (容量100)...\n", inputCount)
|
||||
|
||||
start := time.Now()
|
||||
for i := 0; i < inputCount; i++ {
|
||||
data := map[string]interface{}{
|
||||
"id": i,
|
||||
"value": fmt.Sprintf("data_%d", i),
|
||||
}
|
||||
stream.Emit(data)
|
||||
}
|
||||
duration := time.Since(start)
|
||||
|
||||
// 等待持久化完成
|
||||
fmt.Println("等待持久化操作完成...")
|
||||
time.Sleep(8 * time.Second)
|
||||
|
||||
// 获取统计信息
|
||||
stats := stream.GetDetailedStats()
|
||||
persistStats := stream.GetPersistenceStats()
|
||||
|
||||
fmt.Printf("⏱️ 发送耗时: %v\n", duration)
|
||||
fmt.Printf("📊 输入数据: %d\n", stats["basic_stats"].(map[string]int64)["input_count"])
|
||||
fmt.Printf("📊 处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
|
||||
fmt.Printf("📊 通道容量: %d\n", stats["basic_stats"].(map[string]int64)["data_chan_cap"])
|
||||
fmt.Printf("📊 持久化启用: %v\n", persistStats["enabled"])
|
||||
fmt.Printf("📊 待写入数据: %v\n", persistStats["pending_count"])
|
||||
fmt.Printf("📊 当前文件大小: %v bytes\n", persistStats["current_file_size"])
|
||||
fmt.Printf("📊 文件索引: %v\n", persistStats["file_index"])
|
||||
|
||||
stream.Stop()
|
||||
}
|
||||
|
||||
func testDataRecovery() {
|
||||
config := types.Config{
|
||||
SimpleFields: []string{"id", "value"},
|
||||
}
|
||||
|
||||
// 创建新的持久化流处理器(模拟程序重启)
|
||||
stream, err := stream.NewStreamWithLossPolicy(
|
||||
config,
|
||||
200, // 更大的缓冲区用于恢复
|
||||
200,
|
||||
100,
|
||||
"persist", // 持久化策略
|
||||
5*time.Second,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Printf("创建流失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
stream.Start()
|
||||
|
||||
// 添加sink来接收恢复的数据
|
||||
recoveredCount := 0
|
||||
stream.AddSink(func(data interface{}) {
|
||||
recoveredCount++
|
||||
if recoveredCount <= 5 {
|
||||
fmt.Printf("恢复数据 %d: %+v\n", recoveredCount, data)
|
||||
}
|
||||
})
|
||||
|
||||
// 尝试加载并重新处理持久化数据
|
||||
fmt.Println("尝试加载持久化数据...")
|
||||
if err := stream.LoadAndReprocessPersistedData(); err != nil {
|
||||
fmt.Printf("数据恢复失败: %v\n", err)
|
||||
}
|
||||
|
||||
// 等待处理完成
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
stats := stream.GetDetailedStats()
|
||||
fmt.Printf("📊 恢复后处理数据: %d\n", stats["basic_stats"].(map[string]int64)["output_count"])
|
||||
fmt.Printf("📊 接收到的恢复数据: %d\n", recoveredCount)
|
||||
|
||||
stream.Stop()
|
||||
}
|
||||
|
||||
func analyzePersistenceFiles() {
|
||||
dataDir := "./streamsql_overflow_data"
|
||||
|
||||
// 检查持久化目录
|
||||
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
|
||||
fmt.Println("持久化目录不存在")
|
||||
return
|
||||
}
|
||||
|
||||
// 列出所有持久化文件
|
||||
files, err := filepath.Glob(filepath.Join(dataDir, "streamsql_overflow_*.log"))
|
||||
if err != nil {
|
||||
fmt.Printf("读取持久化文件失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
fmt.Println("没有找到持久化文件(可能已被恢复过程删除)")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("发现 %d 个持久化文件:\n", len(files))
|
||||
for i, file := range files {
|
||||
info, err := os.Stat(file)
|
||||
if err != nil {
|
||||
fmt.Printf(" %d. %s (无法读取文件信息)\n", i+1, filepath.Base(file))
|
||||
continue
|
||||
}
|
||||
fmt.Printf(" %d. %s (大小: %d bytes, 修改时间: %s)\n",
|
||||
i+1, filepath.Base(file), info.Size(), info.ModTime().Format("15:04:05"))
|
||||
}
|
||||
|
||||
// 读取第一个文件的前几行内容
|
||||
if len(files) > 0 {
|
||||
fmt.Printf("\n第一个文件的前3行内容:\n")
|
||||
showFileContent(files[0], 3)
|
||||
}
|
||||
}
|
||||
|
||||
func showFileContent(filename string, maxLines int) {
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
fmt.Printf("无法打开文件: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
buffer := make([]byte, 1024)
|
||||
n, err := file.Read(buffer)
|
||||
if err != nil {
|
||||
fmt.Printf("无法读取文件: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
content := string(buffer[:n])
|
||||
lines := []rune(content)
|
||||
lineCount := 0
|
||||
currentLine := ""
|
||||
|
||||
for _, char := range lines {
|
||||
if char == '\n' {
|
||||
lineCount++
|
||||
fmt.Printf(" %d: %s\n", lineCount, currentLine)
|
||||
currentLine = ""
|
||||
if lineCount >= maxLines {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
currentLine += string(char)
|
||||
}
|
||||
}
|
||||
|
||||
if currentLine != "" && lineCount < maxLines {
|
||||
fmt.Printf(" %d: %s\n", lineCount+1, currentLine)
|
||||
}
|
||||
}
|
||||
|
||||
func cleanupTestData() {
|
||||
dataDir := "./streamsql_overflow_data"
|
||||
if err := os.RemoveAll(dataDir); err != nil {
|
||||
fmt.Printf("清理测试数据失败: %v\n", err)
|
||||
}
|
||||
}
|
||||
@@ -122,19 +122,19 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加结果监听器
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 简单查询结果: %v\n", result)
|
||||
})
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{
|
||||
testData := []map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"value": 5.0,
|
||||
"temperature": 68.0, // 华氏度
|
||||
"radius": 3.0,
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "sensor2",
|
||||
"value": 10.0,
|
||||
"temperature": 86.0, // 华氏度
|
||||
@@ -171,25 +171,25 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
|
||||
}
|
||||
|
||||
// 添加结果监听器
|
||||
ssql.AddSink(func(result interface{}) {
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
|
||||
})
|
||||
|
||||
// 添加测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{
|
||||
testData := []map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"value": 3.0,
|
||||
"temperature": 32.0, // 0°C
|
||||
"radius": 1.0,
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "sensor1",
|
||||
"value": 4.0,
|
||||
"temperature": 212.0, // 100°C
|
||||
"radius": 2.0,
|
||||
},
|
||||
map[string]interface{}{
|
||||
{
|
||||
"device": "sensor2",
|
||||
"value": 5.0,
|
||||
"temperature": 68.0, // 20°C
|
||||
|
||||
@@ -79,4 +79,4 @@ func main() {
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
fmt.Println("\n=== 示例结束 ===")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,218 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/stream"
|
||||
"github.com/rulego/streamsql/types"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("=== StreamSQL 统一配置系统演示 ===")
|
||||
|
||||
// 1. 使用新的配置API创建默认配置Stream
|
||||
fmt.Println("\n1. 默认配置Stream:")
|
||||
defaultConfig := types.NewConfig()
|
||||
defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
defaultStream, err := stream.NewStream(defaultConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("默认配置", defaultStream)
|
||||
|
||||
// 2. 使用高性能预设配置
|
||||
fmt.Println("\n2. 高性能配置Stream:")
|
||||
highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig())
|
||||
highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("高性能配置", highPerfStream)
|
||||
|
||||
// 3. 使用低延迟预设配置
|
||||
fmt.Println("\n3. 低延迟配置Stream:")
|
||||
lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig())
|
||||
lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("低延迟配置", lowLatencyStream)
|
||||
|
||||
// 4. 使用零数据丢失预设配置
|
||||
fmt.Println("\n4. 零数据丢失配置Stream:")
|
||||
zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig())
|
||||
zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("零数据丢失配置", zeroLossStream)
|
||||
|
||||
// 5. 使用持久化预设配置
|
||||
fmt.Println("\n5. 持久化配置Stream:")
|
||||
persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig())
|
||||
persistConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("持久化配置", persistStream)
|
||||
|
||||
// 6. 创建完全自定义的配置
|
||||
fmt.Println("\n6. 自定义配置Stream:")
|
||||
customPerfConfig := types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
DataChannelSize: 30000,
|
||||
ResultChannelSize: 25000,
|
||||
WindowOutputSize: 3000,
|
||||
EnableDynamicResize: true,
|
||||
MaxBufferSize: 200000,
|
||||
UsageThreshold: 0.85,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: "expand",
|
||||
BlockTimeout: 15 * time.Second,
|
||||
AllowDataLoss: false,
|
||||
ExpansionConfig: types.ExpansionConfig{
|
||||
GrowthFactor: 2.0,
|
||||
MinIncrement: 2000,
|
||||
TriggerThreshold: 0.9,
|
||||
ExpansionTimeout: 3 * time.Second,
|
||||
},
|
||||
},
|
||||
WorkerConfig: types.WorkerConfig{
|
||||
SinkPoolSize: 800,
|
||||
SinkWorkerCount: 12,
|
||||
MaxRetryRoutines: 10,
|
||||
},
|
||||
MonitoringConfig: types.MonitoringConfig{
|
||||
EnableMonitoring: true,
|
||||
StatsUpdateInterval: 500 * time.Millisecond,
|
||||
EnableDetailedStats: true,
|
||||
WarningThresholds: types.WarningThresholds{
|
||||
DropRateWarning: 5.0,
|
||||
DropRateCritical: 15.0,
|
||||
BufferUsageWarning: 75.0,
|
||||
BufferUsageCritical: 90.0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
customConfig := types.NewConfigWithPerformance(customPerfConfig)
|
||||
customConfig.SimpleFields = []string{"temperature", "humidity", "location"}
|
||||
|
||||
customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStreamStats("自定义配置", customStream)
|
||||
|
||||
// 7. 配置比较演示
|
||||
fmt.Println("\n7. 配置比较:")
|
||||
compareConfigurations()
|
||||
|
||||
// 8. 实时数据处理演示
|
||||
fmt.Println("\n8. 实时数据处理演示:")
|
||||
demonstrateRealTimeProcessing(defaultStream)
|
||||
|
||||
// 9. 窗口统一配置演示
|
||||
fmt.Println("\n9. 窗口统一配置演示:")
|
||||
demonstrateWindowConfig()
|
||||
|
||||
// 清理资源
|
||||
fmt.Println("\n10. 清理资源...")
|
||||
defaultStream.Stop()
|
||||
highPerfStream.Stop()
|
||||
lowLatencyStream.Stop()
|
||||
zeroLossStream.Stop()
|
||||
persistStream.Stop()
|
||||
customStream.Stop()
|
||||
|
||||
fmt.Println("\n=== 演示完成 ===")
|
||||
}
|
||||
|
||||
func printStreamStats(name string, s *stream.Stream) {
|
||||
stats := s.GetStats()
|
||||
detailedStats := s.GetDetailedStats()
|
||||
|
||||
fmt.Printf("【%s】统计信息:\n", name)
|
||||
fmt.Printf(" 数据通道: %d/%d (使用率: %.1f%%)\n",
|
||||
stats["data_chan_len"], stats["data_chan_cap"],
|
||||
detailedStats["data_chan_usage"])
|
||||
fmt.Printf(" 结果通道: %d/%d (使用率: %.1f%%)\n",
|
||||
stats["result_chan_len"], stats["result_chan_cap"],
|
||||
detailedStats["result_chan_usage"])
|
||||
fmt.Printf(" 工作池: %d/%d (使用率: %.1f%%)\n",
|
||||
stats["sink_pool_len"], stats["sink_pool_cap"],
|
||||
detailedStats["sink_pool_usage"])
|
||||
fmt.Printf(" 性能等级: %s\n", detailedStats["performance_level"])
|
||||
}
|
||||
|
||||
func compareConfigurations() {
|
||||
configs := map[string]types.PerformanceConfig{
|
||||
"默认配置": types.DefaultPerformanceConfig(),
|
||||
"高性能配置": types.HighPerformanceConfig(),
|
||||
"低延迟配置": types.LowLatencyConfig(),
|
||||
"零丢失配置": types.ZeroDataLossConfig(),
|
||||
"持久化配置": types.PersistencePerformanceConfig(),
|
||||
}
|
||||
|
||||
fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n",
|
||||
"配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略")
|
||||
fmt.Println(strings.Repeat("-", 75))
|
||||
|
||||
for name, config := range configs {
|
||||
fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n",
|
||||
name,
|
||||
config.BufferConfig.DataChannelSize,
|
||||
config.BufferConfig.ResultChannelSize,
|
||||
config.WorkerConfig.SinkPoolSize,
|
||||
config.WorkerConfig.SinkWorkerCount,
|
||||
config.OverflowConfig.Strategy)
|
||||
}
|
||||
}
|
||||
|
||||
func demonstrateRealTimeProcessing(s *stream.Stream) {
|
||||
// 设置数据接收器
|
||||
s.AddSink(func(data interface{}) {
|
||||
fmt.Printf(" 接收到处理结果: %v\n", data)
|
||||
})
|
||||
|
||||
// 启动流处理
|
||||
s.Start()
|
||||
|
||||
// 模拟发送数据
|
||||
for i := 0; i < 3; i++ {
|
||||
data := map[string]interface{}{
|
||||
"temperature": 20.0 + float64(i)*2.5,
|
||||
"humidity": 60.0 + float64(i)*5,
|
||||
"location": fmt.Sprintf("sensor_%d", i+1),
|
||||
"timestamp": time.Now().Unix(),
|
||||
}
|
||||
|
||||
fmt.Printf(" 发送数据: %v\n", data)
|
||||
s.Emit(data)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// 等待处理完成
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// 显示最终统计
|
||||
finalStats := s.GetDetailedStats()
|
||||
fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n",
|
||||
finalStats["basic_stats"].(map[string]int64)["input_count"],
|
||||
finalStats["basic_stats"].(map[string]int64)["output_count"],
|
||||
finalStats["basic_stats"].(map[string]int64)["dropped_count"],
|
||||
finalStats["process_rate"])
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql"
|
||||
"github.com/rulego/streamsql/types"
|
||||
)
|
||||
|
||||
// demonstrateWindowConfig 演示窗口统一配置的使用
|
||||
func demonstrateWindowConfig() {
|
||||
fmt.Println("=== 窗口统一配置演示 ===")
|
||||
|
||||
// 1. 测试默认配置的窗口
|
||||
fmt.Println("\n1. 默认配置窗口测试")
|
||||
testWindowWithConfig("默认配置", streamsql.New())
|
||||
|
||||
// 2. 测试高性能配置的窗口
|
||||
fmt.Println("\n2. 高性能配置窗口测试")
|
||||
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
|
||||
|
||||
// 3. 测试低延迟配置的窗口
|
||||
fmt.Println("\n3. 低延迟配置窗口测试")
|
||||
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
|
||||
|
||||
// 4. 测试自定义配置的窗口
|
||||
fmt.Println("\n4. 自定义配置窗口测试")
|
||||
customConfig := types.DefaultPerformanceConfig()
|
||||
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
|
||||
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
|
||||
|
||||
fmt.Println("\n=== 窗口配置演示完成 ===")
|
||||
}
|
||||
|
||||
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
|
||||
// 执行一个简单的滚动窗口查询
|
||||
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
|
||||
|
||||
err := ssql.Execute(sql)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 添加结果处理器
|
||||
stream := ssql.Stream()
|
||||
if stream != nil {
|
||||
stream.AddSink(func(result interface{}) {
|
||||
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
|
||||
})
|
||||
|
||||
// 发送测试数据
|
||||
for i := 0; i < 5; i++ {
|
||||
data := map[string]interface{}{
|
||||
"deviceId": fmt.Sprintf("device_%d", i%2),
|
||||
"temperature": 20.0 + float64(i),
|
||||
"timestamp": time.Now(),
|
||||
}
|
||||
ssql.Emit(data)
|
||||
}
|
||||
|
||||
// 等待处理完成
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
// 获取统计信息
|
||||
stats := ssql.GetDetailedStats()
|
||||
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
|
||||
}
|
||||
|
||||
// 停止流处理
|
||||
ssql.Stop()
|
||||
fmt.Printf("✅ %s - 测试完成\n", configName)
|
||||
}
|
||||
@@ -0,0 +1,245 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// parseCaseExpression parses CASE expression
|
||||
func parseCaseExpression(tokens []string) (*ExprNode, []string, error) {
|
||||
if len(tokens) == 0 || strings.ToUpper(tokens[0]) != "CASE" {
|
||||
return nil, nil, fmt.Errorf("expected CASE keyword")
|
||||
}
|
||||
|
||||
remaining := tokens[1:]
|
||||
caseExpr := &CaseExpression{}
|
||||
|
||||
// Check if it's a simple CASE expression (CASE expr WHEN value THEN result)
|
||||
if len(remaining) > 0 && strings.ToUpper(remaining[0]) != "WHEN" {
|
||||
// Simple CASE expression
|
||||
value, newRemaining, err := parseOrExpression(remaining)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error parsing CASE expression: %v", err)
|
||||
}
|
||||
caseExpr.Value = value
|
||||
remaining = newRemaining
|
||||
}
|
||||
|
||||
// Parse WHEN clauses
|
||||
for len(remaining) > 0 && strings.ToUpper(remaining[0]) == "WHEN" {
|
||||
remaining = remaining[1:] // Skip WHEN
|
||||
|
||||
// Parse WHEN condition
|
||||
condition, newRemaining, err := parseOrExpression(remaining)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error parsing WHEN condition: %v", err)
|
||||
}
|
||||
remaining = newRemaining
|
||||
|
||||
// Check THEN keyword
|
||||
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "THEN" {
|
||||
return nil, nil, fmt.Errorf("expected THEN after WHEN condition")
|
||||
}
|
||||
remaining = remaining[1:] // Skip THEN
|
||||
|
||||
// Parse THEN result
|
||||
result, newRemaining, err := parseOrExpression(remaining)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error parsing THEN result: %v", err)
|
||||
}
|
||||
remaining = newRemaining
|
||||
|
||||
// Add WHEN clause
|
||||
caseExpr.WhenClauses = append(caseExpr.WhenClauses, WhenClause{
|
||||
Condition: condition,
|
||||
Result: result,
|
||||
})
|
||||
}
|
||||
|
||||
// Parse optional ELSE clause
|
||||
if len(remaining) > 0 && strings.ToUpper(remaining[0]) == "ELSE" {
|
||||
remaining = remaining[1:] // Skip ELSE
|
||||
|
||||
elseExpr, newRemaining, err := parseOrExpression(remaining)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error parsing ELSE expression: %v", err)
|
||||
}
|
||||
caseExpr.ElseResult = elseExpr
|
||||
remaining = newRemaining
|
||||
}
|
||||
|
||||
// Check END keyword
|
||||
if len(remaining) == 0 || strings.ToUpper(remaining[0]) != "END" {
|
||||
return nil, nil, fmt.Errorf("expected END to close CASE expression")
|
||||
}
|
||||
|
||||
// Create ExprNode containing CaseExpression
|
||||
caseNode := &ExprNode{
|
||||
Type: TypeCase,
|
||||
CaseExpr: caseExpr,
|
||||
}
|
||||
|
||||
return caseNode, remaining[1:], nil
|
||||
}
|
||||
|
||||
// evaluateCaseExpression evaluates the value of CASE expression
|
||||
func evaluateCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
|
||||
if node.Type != TypeCase {
|
||||
return 0, fmt.Errorf("not a CASE expression")
|
||||
}
|
||||
|
||||
if node.CaseExpr == nil {
|
||||
return 0, fmt.Errorf("invalid CASE expression")
|
||||
}
|
||||
|
||||
// Simple CASE expression: CASE expr WHEN value THEN result
|
||||
if node.CaseExpr.Value != nil {
|
||||
return evaluateSimpleCaseExpression(node, data)
|
||||
}
|
||||
|
||||
// Search CASE expression: CASE WHEN condition THEN result
|
||||
return evaluateSearchCaseExpression(node, data)
|
||||
}
|
||||
|
||||
// evaluateSimpleCaseExpression evaluates simple CASE expression
|
||||
func evaluateSimpleCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
|
||||
caseExpr := node.CaseExpr
|
||||
if caseExpr == nil {
|
||||
return 0, fmt.Errorf("invalid CASE expression")
|
||||
}
|
||||
|
||||
// Evaluate CASE expression value
|
||||
caseValue, err := evaluateNodeValue(caseExpr.Value, data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Iterate through WHEN clauses
|
||||
for _, whenClause := range caseExpr.WhenClauses {
|
||||
// Evaluate WHEN value
|
||||
whenValue, err := evaluateNodeValue(whenClause.Condition, data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Compare values
|
||||
if compareValuesForEquality(caseValue, whenValue) {
|
||||
// Evaluate and return THEN result
|
||||
return evaluateNode(whenClause.Result, data)
|
||||
}
|
||||
}
|
||||
|
||||
// If no matching WHEN clause, evaluate ELSE expression
|
||||
if caseExpr.ElseResult != nil {
|
||||
return evaluateNode(caseExpr.ElseResult, data)
|
||||
}
|
||||
|
||||
// If no ELSE clause, return NULL (return 0 here)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// evaluateSearchCaseExpression evaluates search CASE expression
|
||||
func evaluateSearchCaseExpression(node *ExprNode, data map[string]interface{}) (float64, error) {
|
||||
caseExpr := node.CaseExpr
|
||||
if caseExpr == nil {
|
||||
return 0, fmt.Errorf("invalid CASE expression")
|
||||
}
|
||||
|
||||
// Iterate through WHEN clauses
|
||||
for _, whenClause := range caseExpr.WhenClauses {
|
||||
// Evaluate WHEN condition - use boolean evaluation to handle logical operators
|
||||
conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// If condition is true, return THEN result
|
||||
if conditionResult {
|
||||
return evaluateNode(whenClause.Result, data)
|
||||
}
|
||||
}
|
||||
|
||||
// If no matching WHEN clause, evaluate ELSE expression
|
||||
if caseExpr.ElseResult != nil {
|
||||
return evaluateNode(caseExpr.ElseResult, data)
|
||||
}
|
||||
|
||||
// If no ELSE clause, return NULL (return 0 here)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// evaluateCaseExpressionWithNull evaluates CASE expression with NULL value support
|
||||
func evaluateCaseExpressionWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
|
||||
if node.Type != TypeCase {
|
||||
return nil, false, fmt.Errorf("not a CASE expression")
|
||||
}
|
||||
|
||||
caseExpr := node.CaseExpr
|
||||
if caseExpr == nil {
|
||||
return nil, false, fmt.Errorf("invalid CASE expression")
|
||||
}
|
||||
|
||||
// Simple CASE expression: CASE expr WHEN value THEN result
|
||||
if caseExpr.Value != nil {
|
||||
return evaluateCaseExpressionValueWithNull(node, data)
|
||||
}
|
||||
|
||||
// Search CASE expression: CASE WHEN condition THEN result
|
||||
for _, whenClause := range caseExpr.WhenClauses {
|
||||
// Evaluate WHEN condition - use boolean evaluation to handle logical operators
|
||||
conditionResult, err := evaluateBoolNode(whenClause.Condition, data)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// If condition is true, return THEN result
|
||||
if conditionResult {
|
||||
return evaluateNodeValueWithNull(whenClause.Result, data)
|
||||
}
|
||||
}
|
||||
|
||||
// If no matching WHEN clause, evaluate ELSE expression
|
||||
if caseExpr.ElseResult != nil {
|
||||
return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
|
||||
}
|
||||
|
||||
// If no ELSE clause, return NULL
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
// evaluateCaseExpressionValueWithNull evaluates simple CASE expression (with NULL support)
|
||||
func evaluateCaseExpressionValueWithNull(node *ExprNode, data map[string]interface{}) (interface{}, bool, error) {
|
||||
caseExpr := node.CaseExpr
|
||||
if caseExpr == nil {
|
||||
return nil, false, fmt.Errorf("invalid CASE expression")
|
||||
}
|
||||
|
||||
// Evaluate CASE expression value
|
||||
caseValue, caseIsNull, err := evaluateNodeValueWithNull(caseExpr.Value, data)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Iterate through WHEN clauses
|
||||
for _, whenClause := range caseExpr.WhenClauses {
|
||||
// Evaluate WHEN value
|
||||
whenValue, whenIsNull, err := evaluateNodeValueWithNull(whenClause.Condition, data)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// Compare values (with NULL comparison support)
|
||||
if compareValuesWithNullForEquality(caseValue, caseIsNull, whenValue, whenIsNull) {
|
||||
// Evaluate and return THEN result
|
||||
return evaluateNodeValueWithNull(whenClause.Result, data)
|
||||
}
|
||||
}
|
||||
|
||||
// If no matching WHEN clause, evaluate ELSE expression
|
||||
if caseExpr.ElseResult != nil {
|
||||
return evaluateNodeValueWithNull(caseExpr.ElseResult, data)
|
||||
}
|
||||
|
||||
// If no ELSE clause, return NULL
|
||||
return nil, true, nil
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
+117
@@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
Package expr provides expression parsing and evaluation capabilities for StreamSQL.
|
||||
|
||||
This package implements a comprehensive expression engine that supports mathematical operations,
|
||||
logical comparisons, function calls, field references, and complex CASE expressions.
|
||||
It serves as the foundation for WHERE clauses, HAVING clauses, and computed fields in SQL queries.
|
||||
|
||||
# Core Features
|
||||
|
||||
• Mathematical Operations - Supports arithmetic operators (+, -, *, /, %, ^) with proper precedence
|
||||
• Logical Operations - Boolean logic with AND, OR operators and comparison operators (=, !=, <, >, <=, >=, LIKE)
|
||||
• Function Integration - Seamless integration with the functions package for built-in and custom functions
|
||||
• Field References - Dynamic field access with dot notation support for nested data structures
|
||||
• CASE Expressions - Full support for both simple and searched CASE expressions
|
||||
• Type Safety - Automatic type conversion and validation during expression evaluation
|
||||
• Fallback Support - Integration with expr-lang/expr library for complex expressions
|
||||
|
||||
# Expression Types
|
||||
|
||||
The package supports various expression node types:
|
||||
|
||||
// Basic types
|
||||
TypeNumber - Numeric constants (integers and floats)
|
||||
TypeString - String literals with proper escaping
|
||||
TypeField - Field references (e.g., "temperature", "device.id")
|
||||
TypeOperator - Binary and unary operators
|
||||
TypeFunction - Function calls with argument validation
|
||||
TypeParenthesis - Grouped expressions for precedence control
|
||||
TypeCase - CASE expressions for conditional logic
|
||||
|
||||
# Usage Examples
|
||||
|
||||
Basic mathematical expression:
|
||||
|
||||
expr, err := NewExpression("temperature * 1.8 + 32")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
result, err := expr.Evaluate(data)
|
||||
|
||||
Logical expression with field references:
|
||||
|
||||
expr, err := NewExpression("temperature > 25 AND humidity < 60")
|
||||
result, err := expr.Evaluate(data)
|
||||
|
||||
Function call expression:
|
||||
|
||||
expr, err := NewExpression("UPPER(device_name) LIKE 'SENSOR%'")
|
||||
result, err := expr.Evaluate(data)
|
||||
|
||||
CASE expression for conditional logic:
|
||||
|
||||
expr, err := NewExpression(`
|
||||
CASE
|
||||
WHEN temperature > 30 THEN 'hot'
|
||||
WHEN temperature > 20 THEN 'warm'
|
||||
ELSE 'cold'
|
||||
END
|
||||
`)
|
||||
result, err := expr.Evaluate(data)
|
||||
|
||||
# Operator Precedence
|
||||
|
||||
The expression parser follows standard mathematical precedence rules:
|
||||
|
||||
1. Parentheses (highest)
|
||||
2. Power (^)
|
||||
3. Multiplication, Division, Modulo (*, /, %)
|
||||
4. Addition, Subtraction (+, -)
|
||||
5. Comparison (>, <, >=, <=, LIKE, IS)
|
||||
6. Equality (=, ==, !=, <>)
|
||||
7. Logical AND
|
||||
8. Logical OR (lowest)
|
||||
|
||||
# Error Handling
|
||||
|
||||
The package provides comprehensive error handling with detailed error messages:
|
||||
|
||||
• Syntax validation during expression creation
|
||||
• Type checking during evaluation
|
||||
• Function argument validation
|
||||
• Graceful fallback to expr-lang for unsupported expressions
|
||||
|
||||
# Performance Considerations
|
||||
|
||||
• Expressions are parsed once and can be evaluated multiple times
|
||||
• Built-in operator optimization for common mathematical operations
|
||||
• Lazy evaluation for logical operators (short-circuiting)
|
||||
• Efficient field access caching for repeated evaluations
|
||||
• Automatic fallback to optimized expr-lang library when needed
|
||||
|
||||
# Integration
|
||||
|
||||
This package integrates seamlessly with other StreamSQL components:
|
||||
|
||||
• Functions package - For built-in and custom function execution
|
||||
• Types package - For data type definitions and conversions
|
||||
• Stream package - For real-time expression evaluation in data streams
|
||||
• RSQL package - For SQL parsing and expression extraction
|
||||
*/
|
||||
package expr
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user