refactor:优化注释

This commit is contained in:
rulego-team
2025-08-04 14:38:30 +08:00
parent de4d47d87d
commit a43445ebc7
44 changed files with 2608 additions and 804 deletions
+57 -56
View File
@@ -15,23 +15,24 @@
*/
/*
Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、
自定义函数,以及与 RuleGo 生态的无缝集成。
StreamSQL provides efficient unbounded data stream processing and analysis capabilities,
supporting multiple window types, aggregate functions, custom functions, and seamless
integration with the RuleGo ecosystem.
# 核心特性
# Core Features
轻量级设计 - 纯内存操作,无外部依赖
• SQL语法支持 - 使用熟悉的SQL语法处理流数据
多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口
丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE
插件式自定义函数 - 运行时动态注册,支持8种函数类型
• RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
Lightweight design - Pure in-memory operations, no external dependencies
• SQL syntax support - Process stream data using familiar SQL syntax
Multiple window types - Sliding, tumbling, counting, and session windows
Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
# 入门示例
# Getting Started
基本的流数据处理:
Basic stream data processing:
package main
@@ -43,10 +44,10 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
)
func main() {
// 创建StreamSQL实例
// Create StreamSQL instance
ssql := streamsql.New()
// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
// Define SQL query - Calculate temperature average by device ID every 5 seconds
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
@@ -56,18 +57,18 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// 执行SQL,创建流处理任务
// Execute SQL, create stream processing task
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 添加结果处理回调
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
// Add result processing callback
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf("Aggregation result: %v\n", result)
})
// 模拟发送流数据
// Simulate sending stream data
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@@ -75,7 +76,7 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
for {
select {
case <-ticker.C:
// 生成随机设备数据
// Generate random device data
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
@@ -86,36 +87,36 @@ StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种
}
}()
// 运行30秒
// Run for 30 seconds
time.Sleep(30 * time.Second)
}
# 窗口函数
# Window Functions
StreamSQL 支持多种窗口类型:
StreamSQL supports multiple window types:
// 滚动窗口 - 每5秒一个独立窗口
// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// 滑动窗口 - 窗口大小30秒,每10秒滑动一次
// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// 计数窗口 - 每100条记录一个窗口
// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// 会话窗口 - 超时5分钟自动关闭会话
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# 自定义函数
# Custom Functions
StreamSQL 支持插件式自定义函数,运行时动态注册:
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
// 注册温度转换函数
// Register temperature conversion function
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
"Temperature conversion",
"Fahrenheit to Celsius",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
@@ -123,55 +124,55 @@ StreamSQL 支持插件式自定义函数,运行时动态注册:
},
)
// 立即在SQL中使用
// Use immediately in SQL
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
支持的自定义函数类型:
• TypeMath - 数学计算函数
• TypeString - 字符串处理函数
• TypeConversion - 类型转换函数
• TypeDateTime - 时间日期函数
• TypeAggregation - 聚合函数
• TypeAnalytical - 分析函数
• TypeWindow - 窗口函数
• TypeCustom - 通用自定义函数
Supported custom function types:
• TypeMath - Mathematical calculation functions
• TypeString - String processing functions
• TypeConversion - Type conversion functions
• TypeDateTime - Date and time functions
• TypeAggregation - Aggregate functions
• TypeAnalytical - Analytical functions
• TypeWindow - Window functions
• TypeCustom - General custom functions
# 日志配置
# Log Configuration
StreamSQL 提供灵活的日志配置选项:
StreamSQL provides flexible log configuration options:
// 设置日志级别
// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// 输出到文件
// Output to file
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// 禁用日志(生产环境)
// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())
# RuleGo集成
# RuleGo Integration
StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组件实现流式数据处理:
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
• streamTransform (x/streamTransform) - 流转换器,处理非聚合SQL查询
• streamAggregator (x/streamAggregator) - 流聚合器,处理聚合SQL查询
• streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
• streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
基本集成示例:
Basic integration example:
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
// 注册StreamSQL组件
// Register StreamSQL components
_ "github.com/rulego/rulego-components/external/streamsql"
)
func main() {
// 规则链配置
// Rule chain configuration
ruleChainJson := `{
"ruleChain": {"id": "rule01"},
"metadata": {
@@ -196,10 +197,10 @@ StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组
}
}`
// 创建规则引擎
// Create rule engine
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
// 发送数据
// Send data
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)