Merge pull request #9 from rulego/dev

feat:增强函数系统,实现大量的函数
This commit is contained in:
Whki
2025-05-25 18:13:07 +08:00
committed by GitHub
74 changed files with 16339 additions and 686 deletions
+5
View File
@@ -103,6 +103,11 @@ func main() {
wg.Wait()
}
```
## Functions
StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, window, and more. [Documentation](docs/FUNCTIONS_USAGE_GUIDE.md)
## Concepts
### Windows
+15
View File
@@ -19,6 +19,9 @@
- 支持过滤条件
- 高可扩展性
- 提供灵活的函数扩展
- **完整的自定义函数系统**:支持数学、字符串、转换、聚合、分析等8种函数类型
- **简单易用的函数注册**:一行代码即可注册自定义函数
- **运行时动态扩展**:支持在运行时添加、移除和管理函数
- 接入`RuleGo`生态,利用`RuleGo`组件方式扩展输出和输入源
- 与[RuleGo](https://gitee.com/rulego/rulego) 集成
- 利用`RuleGo`丰富灵活的输入、输出、处理等组件,实现数据源接入以及和第三方系统联动
@@ -103,6 +106,18 @@ func main() {
}
```
## 函数
StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合、分析、窗口等。[文档](docs/FUNCTIONS_USAGE_GUIDE.md)
### 🎨 支持的函数类型
- **📊 数学函数** - sqrt, power, abs, 三角函数等
- **📝 字符串函数** - concat, upper, lower, trim等
- **🔄 转换函数** - cast, hex2dec, encode/decode等
- **📈 聚合函数** - 自定义聚合逻辑
- **🔍 分析函数** - lag, latest, 变化检测等
## 概念
### 窗口
+62 -276
View File
File diff suppressed because it is too large Load Diff
-45
View File
@@ -1,45 +0,0 @@
package aggregator
type ContextAggregator interface {
GetContextKey() string
}
type WindowStartAggregator struct {
val interface{}
}
func (w *WindowStartAggregator) New() AggregatorFunction {
return &WindowStartAggregator{}
}
func (w *WindowStartAggregator) Add(val interface{}) {
w.val = val
}
func (w *WindowStartAggregator) Result() interface{} {
return w.val
}
func (w *WindowStartAggregator) GetContextKey() string {
return "window_start"
}
type WindowEndAggregator struct {
val interface{}
}
func (w *WindowEndAggregator) New() AggregatorFunction {
return &WindowEndAggregator{}
}
func (w *WindowEndAggregator) Add(val interface{}) {
w.val = val
}
func (w *WindowEndAggregator) Result() interface{} {
return w.val
}
func (w *WindowEndAggregator) GetContextKey() string {
return "window_end"
}
File diff suppressed because it is too large Load Diff
@@ -1,4 +1,4 @@
package parser
package condition
import (
"github.com/expr-lang/expr"
File diff suppressed because it is too large Load Diff
+175
View File
@@ -0,0 +1,175 @@
/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。
StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、
自定义函数,以及与 RuleGo 生态的无缝集成。
# 核心特性
• 轻量级设计 - 纯内存操作,无外部依赖
• SQL语法支持 - 使用熟悉的SQL语法处理流数据
• 多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口
• 丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等
• 插件式自定义函数 - 运行时动态注册,支持8种函数类型
• RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
# 入门示例
基本的流数据处理:
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 创建StreamSQL实例
ssql := streamsql.New()
// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
window_start() as start,
window_end() as end
FROM stream
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// 执行SQL,创建流处理任务
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 添加结果处理回调
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
})
// 模拟发送流数据
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 生成随机设备数据
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
"humidity": 50.0 + rand.Float64()*20,
}
ssql.AddData(data)
}
}
}()
// 运行30秒
time.Sleep(30 * time.Second)
}
# 窗口函数
StreamSQL 支持多种窗口类型:
// 滚动窗口 - 每5秒一个独立窗口
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// 滑动窗口 - 窗口大小30秒,每10秒滑动一次
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// 计数窗口 - 每100条记录一个窗口
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// 会话窗口 - 超时5分钟自动关闭会话
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# 自定义函数
StreamSQL 支持插件式自定义函数,运行时动态注册:
// 注册温度转换函数
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
return (f - 32) * 5 / 9, nil
},
)
// 立即在SQL中使用
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
支持的自定义函数类型:
• TypeMath - 数学计算函数
• TypeString - 字符串处理函数
• TypeConversion - 类型转换函数
• TypeDateTime - 时间日期函数
• TypeAggregation - 聚合函数
• TypeAnalytical - 分析函数
• TypeWindow - 窗口函数
• TypeCustom - 通用自定义函数
# 日志配置
StreamSQL 提供灵活的日志配置选项:
// 设置日志级别
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// 输出到文件
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// 禁用日志(生产环境)
ssql := streamsql.New(streamsql.WithDiscardLog())
# 性能配置
对于生产环境,建议进行以下配置:
ssql := streamsql.New(
streamsql.WithDiscardLog(), // 禁用日志提升性能
// 其他配置选项...
)
# 与RuleGo集成
StreamSQL可以与RuleGo规则引擎无缝集成,利用RuleGo丰富的组件生态:
// TODO: 提供RuleGo集成示例
更多详细信息和高级用法,请参阅:
• 自定义函数开发指南: docs/CUSTOM_FUNCTIONS_GUIDE.md
• 快速入门指南: docs/FUNCTION_QUICK_START.md
• 完整示例: examples/
*/
package streamsql
File diff suppressed because it is too large Load Diff
+221
View File
@@ -0,0 +1,221 @@
# StreamSQL 函数系统
StreamSQL 现已支持强大的函数系统,允许在 SQL 查询中使用各种内置函数和自定义函数。
## 🚀 主要特性
### 1. 模块化函数架构
- **函数注册器**:统一的函数注册和管理系统
- **类型安全**:强类型参数验证和转换
- **可扩展性**:支持运行时注册自定义函数
- **分类管理**:按功能类型组织函数
### 2. 内置函数类别
#### 数学函数 (TypeMath)
- `ABS(x)` - 绝对值
- `SQRT(x)` - 平方根
#### 字符串函数 (TypeString)
- `CONCAT(str1, str2, ...)` - 字符串连接
- `LENGTH(str)` - 字符串长度
- `UPPER(str)` - 转大写
- `LOWER(str)` - 转小写
#### 转换函数 (TypeConversion)
- `CAST(value, type)` - 类型转换
- `HEX2DEC(hexStr)` - 十六进制转十进制
- `DEC2HEX(number)` - 十进制转十六进制
#### 时间日期函数 (TypeDateTime)
- `NOW()` - 当前时间戳
### 3. 表达式引擎增强
- 支持函数调用的复杂表达式
- 运算符优先级处理
- 括号分组支持
- 自动类型转换
## 📝 使用示例
### 基本函数使用
```sql
-- 数学函数
SELECT device, ABS(temperature - 20) as deviation
FROM stream;
-- 字符串函数
SELECT CONCAT(device, '_processed') as processed_name
FROM stream;
-- 表达式中的函数
SELECT device, AVG(ABS(temperature - 20)) as avg_deviation
FROM stream
GROUP BY device, TumblingWindow('1s');
```
### 自定义函数注册
```go
import "github.com/rulego/streamsql/functions"
// 注册华氏度转摄氏度函数
err := functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeCustom,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, err
}
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
})
// 在 SQL 中使用
sql := `
SELECT device, AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream
GROUP BY device, TumblingWindow('2s')
`
```
### 复合表达式
```sql
-- 复杂的数学表达式
SELECT
device,
AVG(ABS(temperature - 20) * 1.8 + 32) as complex_calc
FROM stream
GROUP BY device, TumblingWindow('1s');
```
## 🛠️ 函数开发
### 实现自定义函数
```go
// 1. 定义函数结构
type MyCustomFunction struct {
*functions.BaseFunction
}
// 2. 实现构造函数
func NewMyCustomFunction() *MyCustomFunction {
return &MyCustomFunction{
BaseFunction: functions.NewBaseFunction(
"my_func",
functions.TypeCustom,
"自定义分类",
"函数描述",
1, 3, // 最少1个参数,最多3个参数
),
}
}
// 3. 实现验证方法
func (f *MyCustomFunction) Validate(args []interface{}) error {
return f.ValidateArgCount(args)
}
// 4. 实现执行方法
func (f *MyCustomFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 实现具体逻辑
return result, nil
}
// 5. 注册函数
functions.Register(NewMyCustomFunction())
```
### 便捷注册方式
```go
// 使用便捷方法注册函数
err := functions.RegisterCustomFunction(
"double",
functions.TypeCustom,
"数学运算",
"将数值乘以2",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, err
}
return val * 2, nil
})
```
## 🧪 测试
### 运行函数系统测试
```bash
go test ./functions -v
```
### 运行集成测试
```bash
go test -v -run TestExpressionInAggregation
```
## 📋 支持的数据类型
函数系统支持以下数据类型的自动转换:
- **数值类型**: `int`, `int32`, `int64`, `uint`, `uint32`, `uint64`, `float32`, `float64`
- **字符串类型**: `string`
- **布尔类型**: `bool`
- **自动转换**: 字符串数值自动转换为相应的数值类型
## 🔧 类型转换工具
```go
// 使用内置转换函数
val, err := functions.ConvertToFloat64(someValue)
str, err := functions.ConvertToString(someValue)
num, err := functions.ConvertToInt64(someValue)
flag, err := functions.ConvertToBool(someValue)
```
## 📈 性能考虑
- **函数注册**: 一次性注册,运行时无开销
- **类型转换**: 高效的类型检查和转换
- **表达式缓存**: 表达式解析结果可复用
- **并发安全**: 函数注册器支持并发访问
## 🌟 路线图
已实现的功能:
- ✅ SELECT DISTINCT
- ✅ LIMIT 子句
- ✅ HAVING 子句
- ✅ SESSION 窗口
- ✅ 函数参数支持表达式运算
- ✅ 统一函数注册系统
待实现的功能:
- 🔄 更多聚合函数(MEDIAN、STDDEV 等)
- 🔄 窗口函数(ROW_NUMBER、RANK 等)
- 🔄 更多时间日期函数
- 🔄 正则表达式函数
- 🔄 JSON 处理函数
## 🤝 贡献
欢迎提交新的函数实现!请遵循以下步骤:
1.`functions/` 目录中实现函数
2. 添加相应的测试用例
3. 更新文档
4. 提交 Pull Request
---
*StreamSQL 函数系统让流处理更加强大和灵活!* 🚀
File diff suppressed because it is too large Load Diff
+267
View File
@@ -0,0 +1,267 @@
# StreamSQL 函数系统整合指南
本文档说明 StreamSQL 如何整合自定义函数系统与 expr-lang/expr 库,以提供更强大和丰富的表达式计算能力。
## 🏗️ 架构概述
### 双引擎架构
StreamSQL 现在支持两套表达式引擎:
1. **自定义 expr 引擎** (`expr/expression.go`)
- 专门针对数值计算优化
- 支持基本数学运算和函数
- 轻量级,高性能
2. **expr-lang/expr 引擎**
- 功能强大的通用表达式语言
- 支持复杂数据类型(数组、对象、字符串等)
- 丰富的内置函数库
### 桥接系统
`functions/expr_bridge.go` 提供了统一的接口,自动选择最合适的引擎并整合两套函数系统。
## 📚 可用函数
### StreamSQL 内置函数
#### 数学函数 (TypeMath)
| 函数 | 描述 | 示例 |
|---------------|--------|------------------------|
| `abs(x)` | 绝对值 | `abs(-5)``5` |
| `sqrt(x)` | 平方根 | `sqrt(16)``4` |
| `acos(x)` | 反余弦 | `acos(0.5)``1.047` |
| `asin(x)` | 反正弦 | `asin(0.5)``0.524` |
| `atan(x)` | 反正切 | `atan(1)``0.785` |
| `atan2(y,x)` | 双参数反正切 | `atan2(1,1)``0.785` |
| `bitand(a,b)` | 按位与 | `bitand(5,3)``1` |
| `bitor(a,b)` | 按位或 | `bitor(5,3)``7` |
| `bitxor(a,b)` | 按位异或 | `bitxor(5,3)``6` |
| `bitnot(x)` | 按位非 | `bitnot(5)``-6` |
| `ceiling(x)` | 向上取整 | `ceiling(3.2)``4` |
| `cos(x)` | 余弦 | `cos(0)``1` |
| `cosh(x)` | 双曲余弦 | `cosh(0)``1` |
| `exp(x)` | e的x次幂 | `exp(1)``2.718` |
| `floor(x)` | 向下取整 | `floor(3.8)``3` |
| `ln(x)` | 自然对数 | `ln(2.718)``1` |
| `power(x,y)` | x的y次幂 | `power(2,3)``8` |
#### 字符串函数 (TypeString)
| 函数 | 描述 | 示例 |
|---------------------|-------|-------------------------------------------------|
| `concat(s1,s2,...)` | 字符串连接 | `concat("hello"," ","world")``"hello world"` |
| `length(s)` | 字符串长度 | `length("hello")``5` |
| `upper(s)` | 转大写 | `upper("hello")``"HELLO"` |
| `lower(s)` | 转小写 | `lower("HELLO")``"hello"` |
#### 转换函数 (TypeConversion)
| 函数 | 描述 | 示例 |
|------------------------|----------|--------------------------------------------|
| `cast(value, type)` | 类型转换 | `cast("123", "int64")``123` |
| `hex2dec(hex)` | 十六进制转十进制 | `hex2dec("ff")``255` |
| `dec2hex(num)` | 十进制转十六进制 | `dec2hex(255)``"ff"` |
| `encode(data, format)` | 编码 | `encode("hello", "base64")``"aGVsbG8="` |
| `decode(data, format)` | 解码 | `decode("aGVsbG8=", "base64")``"hello"` |
#### 时间日期函数 (TypeDateTime)
| 函数 | 描述 | 示例 |
|------------------|------------------|-----------------------------------|
| `now()` | 当前时间戳 | `now()``1640995200` |
| `current_time()` | 当前时间(HH:MM:SS) | `current_time()``"14:30:25"` |
| `current_date()` | 当前日期(YYYY-MM-DD) | `current_date()``"2025-01-01"` |
#### 聚合函数 (TypeAggregation)
| 函数 | 描述 | 示例 |
|---------------|-----|---------------------------|
| `sum(...)` | 求和 | `sum(1,2,3)``6` |
| `avg(...)` | 平均值 | `avg(1,2,3)``2` |
| `min(...)` | 最小值 | `min(1,2,3)``1` |
| `max(...)` | 最大值 | `max(1,2,3)``3` |
| `count(...)` | 计数 | `count(1,2,3)``3` |
| `stddev(...)` | 标准差 | `stddev(1,2,3)``0.816` |
| `median(...)` | 中位数 | `median(1,2,3)``2` |
### expr-lang/expr 内置函数
#### 数学函数
| 函数 | 描述 | 示例 |
|------------|------|--------------------|
| `abs(x)` | 绝对值 | `abs(-5)``5` |
| `ceil(x)` | 向上取整 | `ceil(3.2)``4` |
| `floor(x)` | 向下取整 | `floor(3.8)``3` |
| `round(x)` | 四舍五入 | `round(3.6)``4` |
| `max(a,b)` | 最大值 | `max(5,3)``5` |
| `min(a,b)` | 最小值 | `min(5,3)``3` |
#### 字符串函数
| 函数 | 描述 | 示例 |
|------------------------|--------|------------------------------------------|
| `trim(s)` | 去除首尾空格 | `trim(" hello ")``"hello"` |
| `upper(s)` | 转大写 | `upper("hello")``"HELLO"` |
| `lower(s)` | 转小写 | `lower("HELLO")``"hello"` |
| `split(s, delimiter)` | 分割字符串 | `split("a,b,c", ",")``["a","b","c"]` |
| `replace(s, old, new)` | 替换字符串 | `replace("hello", "l", "x")``"hexxo"` |
| `indexOf(s, sub)` | 查找子串位置 | `indexOf("hello", "ll")``2` |
| `hasPrefix(s, prefix)` | 检查前缀 | `hasPrefix("hello", "he")``true` |
| `hasSuffix(s, suffix)` | 检查后缀 | `hasSuffix("hello", "lo")``true` |
#### 数组/集合函数
| 函数 | 描述 | 示例 |
|----------------------------|-----------|----------------------------------------|
| `all(array, predicate)` | 所有元素满足条件 | `all([2,4,6], # % 2 == 0)``true` |
| `any(array, predicate)` | 任一元素满足条件 | `any([1,3,4], # % 2 == 0)``true` |
| `filter(array, predicate)` | 过滤元素 | `filter([1,2,3,4], # > 2)``[3,4]` |
| `map(array, expression)` | 转换元素 | `map([1,2,3], # * 2)``[2,4,6]` |
| `find(array, predicate)` | 查找元素 | `find([1,2,3], # > 2)``3` |
| `count(array, predicate)` | 计数满足条件的元素 | `count([1,2,3,4], # > 2)``2` |
| `concat(array1, array2)` | 连接数组 | `concat([1,2], [3,4])``[1,2,3,4]` |
| `flatten(array)` | 展平数组 | `flatten([[1,2],[3,4]])``[1,2,3,4]` |
| `len(value)` | 获取长度 | `len([1,2,3])``3` |
#### 时间函数
| 函数 | 描述 | 示例 |
|---------------|-------|-------------------------------|
| `now()` | 当前时间 | `now()``时间对象` |
| `duration(s)` | 解析时间段 | `duration("1h30m")``时间段对象` |
| `date(s)` | 解析日期 | `date("2023-12-01")``日期对象` |
#### 类型转换函数
| 函数 | 描述 | 示例 |
|------|------|------|
| `int(x)` | 转整数 | `int("123")``123` |
| `float(x)` | 转浮点数 | `float("123.45")``123.45` |
| `string(x)` | 转字符串 | `string(123)``"123"` |
| `type(x)` | 获取类型 | `type(123)``"int"` |
#### JSON/编码函数
| 函数 | 描述 | 示例 |
|-----------------|----------|--------------------------------------|
| `toJSON(x)` | 转JSON | `toJSON({"a":1})``'{"a":1}'` |
| `fromJSON(s)` | 解析JSON | `fromJSON('{"a":1}')``{"a":1}` |
| `toBase64(s)` | Base64编码 | `toBase64("hello")``"aGVsbG8="` |
| `fromBase64(s)` | Base64解码 | `fromBase64("aGVsbG8=")``"hello"` |
## 🔧 使用方法
### 基本使用
```go
import "github.com/rulego/streamsql/functions"
// 直接使用桥接器评估表达式
result, err := functions.EvaluateWithBridge("abs(-5) + len([1,2,3])", map[string]interface{}{})
// result: 8 (5 + 3)
```
### 在 SQL 查询中使用
```sql
-- 使用 StreamSQL 函数
SELECT device, abs(temperature - 20) as deviation
FROM stream;
-- 使用 expr-lang 函数
SELECT device, filter(measurements, # > 10) as high_values
FROM stream;
-- 混合使用
SELECT device, encode(concat(device, "_", string(now())), "base64") as device_id
FROM stream;
```
### 表达式引擎选择
表达式引擎会自动选择:
1. **简单数值表达式** → 使用自定义 expr 引擎(更快)
2. **复杂表达式或使用高级函数** → 使用 expr-lang/expr(更强大)
### 函数冲突解决
当两个系统有同名函数时:
1. **默认优先级**expr-lang/expr > StreamSQL
2. **访问 StreamSQL 版本**:使用 `streamsql_` 前缀,如 `streamsql_abs(-5)`
3. **明确指定**:通过函数解析器手动选择
## 🛠️ 高级用法
### 获取所有可用函数
```go
info := functions.GetAllAvailableFunctions()
streamSQLFuncs := info["streamsql"]
exprLangFuncs := info["expr-lang"]
```
### 自定义函数注册
```go
// 注册到 StreamSQL 系统
err := functions.RegisterCustomFunction("celsius_to_fahrenheit",
functions.TypeMath, "温度转换", "摄氏度转华氏度", 1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
celsius, _ := functions.ConvertToFloat64(args[0])
return celsius*1.8 + 32, nil
})
// 函数会自动在两个引擎中可用
```
### 表达式编译和缓存
```go
bridge := functions.GetExprBridge()
// 编译表达式(可缓存)
program, err := bridge.CompileExpressionWithStreamSQLFunctions(
"abs(temperature - 20) > 5",
map[string]interface{}{"temperature": 0.0})
// 重复执行(高性能)
result, err := expr.Run(program, map[string]interface{}{"temperature": 25.5})
```
## 🔍 性能考虑
### 选择合适的引擎
1. **纯数值计算**:优先使用自定义 expr 引擎
2. **字符串/数组操作**:使用 expr-lang/expr
3. **复杂逻辑表达式**:使用 expr-lang/expr
### 优化建议
1. **预编译表达式**:对于重复使用的表达式,预编译以提高性能
2. **函数选择**:优先使用性能更好的版本
3. **数据类型**:避免不必要的类型转换
## 📝 示例
### 温度监控
```sql
SELECT
device,
temperature,
abs(temperature - 20) as deviation,
CASE
WHEN temperature > 30 THEN "hot"
WHEN temperature < 10 THEN "cold"
ELSE "normal"
END as status,
encode(concat(device, "_", current_date()), "base64") as device_key
FROM temperature_stream
WHERE abs(temperature - 20) > 5;
```
### 数据处理
```sql
SELECT
sensor_id,
filter(readings, # > avg(readings)) as above_average,
map(readings, round(#, 2)) as rounded_readings,
len(readings) as reading_count
FROM sensor_data
WHERE len(readings) > 10;
```
File diff suppressed because it is too large Load Diff
+214
View File
@@ -0,0 +1,214 @@
# StreamSQL 插件式自定义函数快速示例
## 🚀 5分钟上手插件式扩展
### 1️⃣ 注册自定义函数
```go
package main
import (
"fmt"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
)
func main() {
// 🔌 插件式注册 - 数据脱敏函数
functions.RegisterCustomFunction(
"mask_email", // 函数名
functions.TypeString, // 函数类型
"数据脱敏", // 分类
"邮箱地址脱敏", // 描述
1, 1, // 参数数量
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
email, _ := functions.ConvertToString(args[0])
parts := strings.Split(email, "@")
if len(parts) != 2 {
return email, nil
}
user := parts[0]
domain := parts[1]
if len(user) > 2 {
masked := user[:2] + "***" + user[len(user)-1:]
return masked + "@" + domain, nil
}
return email, nil
},
)
// 🔌 插件式注册 - 业务计算函数
functions.RegisterCustomFunction(
"calculate_score",
functions.TypeMath,
"业务计算",
"计算用户评分",
2, 2,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
base, _ := functions.ConvertToFloat64(args[0])
bonus, _ := functions.ConvertToFloat64(args[1])
return base + bonus*0.1, nil
},
)
// 🔌 插件式注册 - 状态转换函数
functions.RegisterCustomFunction(
"format_status",
functions.TypeConversion,
"状态转换",
"格式化状态显示",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
status, _ := functions.ConvertToString(args[0])
switch status {
case "1": return "✅ 活跃", nil
case "0": return "❌ 非活跃", nil
default: return "❓ 未知", nil
}
},
)
}
```
### 2️⃣ 立即在SQL中使用
```go
func demonstrateUsage() {
ssql := streamsql.New()
defer ssql.Stop()
// 🎯 直接在SQL中使用新注册的函数 - 无需修改任何核心代码!
sql := `
SELECT
user_id,
mask_email(email) as safe_email,
format_status(status) as status_display,
AVG(calculate_score(base_score, performance)) as avg_score
FROM stream
GROUP BY user_id, TumblingWindow('5s')
`
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 添加结果监听
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("处理结果: %v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{
"user_id": "U001",
"email": "john.doe@example.com",
"status": "1",
"base_score": 85.0,
"performance": 12.0,
},
{
"user_id": "U001",
"email": "john.doe@example.com",
"status": "1",
"base_score": 90.0,
"performance": 15.0,
},
}
for _, data := range testData {
ssql.AddData(data)
}
// 等待结果
time.Sleep(6 * time.Second)
}
```
### 3️⃣ 运行结果
```json
{
"user_id": "U001",
"safe_email": "jo***e@example.com",
"status_display": "✅ 活跃",
"avg_score": 86.35
}
```
## 🔥 核心优势
### ✅ 完全插件式
- **无需修改SQL解析器** - 新函数自动识别
- **无需重启应用** - 运行时动态注册
- **无需额外配置** - 注册后立即可用
### ✅ 智能处理
- **字符串函数** → 直接处理模式(低延迟)
- **数学函数** → 窗口聚合模式(支持统计)
- **转换函数** → 直接处理模式(实时转换)
### ✅ 灵活管理
```go
// 运行时管理
fn, exists := functions.Get("mask_email") // 查询函数
mathFuncs := functions.GetByType(functions.TypeMath) // 按类型查询
allFuncs := functions.ListAll() // 列出所有函数
success := functions.Unregister("old_function") // 注销函数
```
## 🎯 实际应用场景
### 📊 数据脱敏
```sql
SELECT
mask_email(email) as safe_email,
mask_phone(phone) as safe_phone
FROM user_stream
```
### 💼 业务计算
```sql
SELECT
user_id,
AVG(calculate_commission(sales, rate)) as avg_commission,
SUM(calculate_bonus(performance, level)) as total_bonus
FROM sales_stream
GROUP BY user_id, TumblingWindow('1h')
```
### 🔄 状态转换
```sql
SELECT
order_id,
format_status(status_code) as readable_status,
format_priority(priority_level) as priority_display
FROM order_stream
```
### 🌐 多语言支持
```go
// 注册多语言函数
functions.RegisterCustomFunction("translate", functions.TypeString, ...,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
text := args[0].(string)
lang := args[1].(string)
return translateService.Translate(text, lang), nil
})
// SQL中使用
// SELECT translate(message, 'zh-CN') as chinese_message FROM stream
```
## 🏁 总结
StreamSQL 的插件式自定义函数系统让你能够:
1. **🔌 即插即用** - 注册函数后立即在SQL中使用
2. **🚀 零停机扩展** - 运行时动态增加功能
3. **🎯 类型智能** - 根据函数类型自动选择最优处理模式
4. **📈 无限可能** - 支持任意复杂的业务逻辑
**真正实现了"写一个函数,SQL立即可用"的插件式体验!**
+91
View File
@@ -0,0 +1,91 @@
# 高级自定义函数示例
## 简介
展示StreamSQL自定义函数系统的高级特性,包括状态管理、缓存机制、性能优化等。
## 功能演示
- 🏗️ **结构体方式实现**:完整的函数生命周期管理
- 💾 **状态管理**:有状态函数的实现和使用
-**性能优化**:缓存机制和优化策略
- 🛡️ **高级验证**:复杂参数验证和错误处理
- 🧵 **线程安全**:并发环境下的安全实现
## 运行方式
```bash
cd examples/advanced-functions
go run main.go
```
## 代码亮点
### 1. 完整结构体实现
```go
type AdvancedFunction struct {
*functions.BaseFunction
cache map[string]interface{}
mutex sync.RWMutex
counter int64
}
func (f *AdvancedFunction) Validate(args []interface{}) error {
// 自定义验证逻辑
return f.ValidateArgCount(args)
}
func (f *AdvancedFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 复杂的执行逻辑
}
```
### 2. 状态管理
```go
type StatefulFunction struct {
*functions.BaseFunction
history []float64
mutex sync.Mutex
}
// 维护历史数据状态
func (f *StatefulFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
// 更新状态
f.history = append(f.history, value)
return f.calculate(), nil
}
```
### 3. 缓存优化
```go
func (f *CachedFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
key := f.generateKey(args)
// 检查缓存
if result, exists := f.getFromCache(key); exists {
return result, nil
}
// 计算并缓存
result := f.compute(args)
f.setCache(key, result)
return result, nil
}
```
## 高级特性
- **内存管理**:合理的资源分配和释放
- **错误恢复**:异常情况的处理和恢复
- **性能监控**:执行时间和资源使用统计
- **热重载**:运行时函数更新和替换
## 适用场景
- 🎯 **高性能应用**:需要极致性能优化的场景
- 🔄 **状态跟踪**:需要维护历史状态的计算
- 📈 **复杂算法**:机器学习、统计分析等
- 🏢 **企业级系统**:生产环境的稳定性要求
+119
View File
@@ -0,0 +1,119 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils/cast"
)
func main() {
fmt.Println("=== StreamSQL 高级函数示例 ===")
// 1. 注册自定义函数:温度华氏度转摄氏度
err := functions.RegisterCustomFunction("fahrenheit_to_celsius", functions.TypeCustom, "温度转换", "华氏度转摄氏度", 1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit, err := cast.ToFloat64E(args[0])
if err != nil {
return nil, err
}
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
})
if err != nil {
panic(fmt.Sprintf("注册自定义函数失败: %v", err))
}
fmt.Println("✓ 注册自定义函数:fahrenheit_to_celsius")
// 2. 创建 StreamSQL 实例
ssql := streamsql.New()
defer ssql.Stop()
// 3. 定义包含高级函数的 SQL
sql := `
SELECT
device,
AVG(abs(temperature - 20)) as avg_deviation,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius,
MAX(sqrt(humidity)) as max_sqrt_humidity
FROM stream
GROUP BY device, TumblingWindow('2s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')
`
// 4. 执行 SQL
err = ssql.Execute(sql)
if err != nil {
panic(fmt.Sprintf("执行SQL失败: %v", err))
}
fmt.Println("✓ SQL执行成功")
// 5. 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("📊 聚合结果: %v\n", result)
})
// 6. 模拟传感器数据
baseTime := time.Now()
sensorData := []map[string]interface{}{
{"device": "sensor1", "temperature": 68.0, "humidity": 25.0, "ts": baseTime.UnixMicro()}, // 20°C, 湿度25%
{"device": "sensor1", "temperature": 86.0, "humidity": 36.0, "ts": baseTime.Unix()}, // 30°C, 湿度36%
{"device": "sensor2", "temperature": 32.0, "humidity": 49.0, "ts": baseTime.Unix()}, // 0°C, 湿度49%
{"device": "sensor2", "temperature": 104.0, "humidity": 64.0, "ts": baseTime.Unix()}, // 40°C, 湿度64%
{"device": "temperature_probe", "temperature": 212.0, "humidity": 81.0, "ts": baseTime.Unix()}, // 100°C, 湿度81%
}
fmt.Println("\n🌡️ 发送传感器数据:")
for _, data := range sensorData {
fmt.Printf(" 设备: %s, 温度: %.1f°F, 湿度: %.1f%%\n",
data["device"], data["temperature"], data["humidity"])
ssql.AddData(data)
}
// 7. 等待处理完成
fmt.Println("\n⏳ 等待窗口处理...")
time.Sleep(3 * time.Second)
// 8. 演示内置函数
fmt.Println("\n🔧 内置函数演示:")
// 数学函数
fmt.Printf(" abs(-15.5) = %.1f\n", callFunction("abs", -15.5))
fmt.Printf(" sqrt(16) = %.1f\n", callFunction("sqrt", 16.0))
// 字符串函数
fmt.Printf(" concat('Hello', ' ', 'World') = %s\n", callFunction("concat", "Hello", " ", "World"))
fmt.Printf(" upper('streamsql') = %s\n", callFunction("upper", "streamsql"))
fmt.Printf(" length('StreamSQL') = %d\n", callFunction("length", "StreamSQL"))
// 转换函数
fmt.Printf(" hex2dec('ff') = %d\n", callFunction("hex2dec", "ff"))
fmt.Printf(" dec2hex(255) = %s\n", callFunction("dec2hex", 255))
// 时间函数
fmt.Printf(" now() = %d\n", callFunction("now"))
// 9. 显示已注册的函数
fmt.Println("\n📋 已注册的函数:")
allFunctions := functions.ListAll()
for name, fn := range allFunctions {
fmt.Printf(" %s (%s): %s\n", name, fn.GetType(), fn.GetDescription())
}
fmt.Println("\n✅ 示例完成!")
}
// 辅助函数:调用函数并返回结果
func callFunction(name string, args ...interface{}) interface{} {
ctx := &functions.FunctionContext{
Data: make(map[string]interface{}),
}
result, err := functions.Execute(name, ctx, args)
if err != nil {
return fmt.Sprintf("Error: %v", err)
}
return result
}
+71
View File
@@ -0,0 +1,71 @@
# 自定义函数完整演示
## 简介
这是StreamSQL自定义函数系统的完整功能演示,涵盖了所有函数类型和高级用法。
## 功能演示
- 🔢 **数学函数**:距离计算、温度转换、圆面积计算
- 📝 **字符串函数**:JSON提取、字符串反转、字符串重复
- 🔄 **转换函数**IP地址转换、字节大小格式化
- 📅 **时间日期函数**:时间格式化、时间差计算
- 📊 **聚合函数**:几何平均数、众数计算
- 🔍 **分析函数**:移动平均值
- 🛠️ **函数管理**:注册、查询、分类、注销
## 运行方式
```bash
cd examples/custom-functions-demo
go run main.go
```
## 代码亮点
### 1. 完整函数类型覆盖
```go
// 数学函数:距离计算
functions.RegisterCustomFunction("distance", functions.TypeMath, ...)
// 字符串函数:JSON提取
functions.RegisterCustomFunction("json_extract", functions.TypeString, ...)
// 转换函数:IP转换
functions.RegisterCustomFunction("ip_to_int", functions.TypeConversion, ...)
```
### 2. 自定义聚合函数
```go
type GeometricMeanFunction struct {
*functions.BaseFunction
}
// 配合聚合器使用
aggregator.Register("geometric_mean", func() aggregator.AggregatorFunction {
return &GeometricMeanAggregator{}
})
```
### 3. 复杂SQL查询
```sql
SELECT
device,
AVG(distance(x1, y1, x2, y2)) as avg_distance,
json_extract(metadata, 'version') as version,
format_bytes(memory_usage) as formatted_memory
FROM stream
GROUP BY device, TumblingWindow('1s')
```
## 演示流程
1. **函数注册阶段** - 注册各类型函数
2. **SQL测试阶段** - 在不同模式下测试函数
3. **管理功能演示** - 展示函数发现和管理功能
## 适用场景
- 🏢 **企业级应用**:了解完整功能特性
- 🔬 **功能验证**:测试复杂函数组合
- **学习参考**:最佳实践和使用模式
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,62 @@
# 函数集成演示
## 简介
展示自定义函数与StreamSQL各种特性的集成使用,包括窗口聚合、表达式计算、条件过滤等。
## 功能演示
- 🪟 **窗口集成**:自定义函数在不同窗口类型中的使用
- 🧮 **表达式集成**:函数与算术表达式的组合使用
- 🔍 **条件集成**:在WHERE、HAVING子句中使用自定义函数
- 📊 **聚合集成**:自定义函数与内置聚合函数的协同工作
## 运行方式
```bash
cd examples/function-integration-demo
go run main.go
```
## 代码亮点
### 1. 窗口函数集成
```sql
SELECT
device,
AVG(custom_calc(temperature, pressure)) as avg_result,
window_start() as start_time
FROM stream
GROUP BY device, SlidingWindow('30s', '10s')
```
### 2. 复杂表达式集成
```sql
SELECT
device,
custom_function(value * 1.8 + 32) as processed_value,
SUM(another_function(field1, field2)) as total
FROM stream
GROUP BY device
```
### 3. 条件过滤集成
```sql
SELECT device, AVG(temperature)
FROM stream
WHERE custom_validator(status) = true
HAVING custom_threshold(AVG(temperature)) > 0
```
## 演示场景
1. **传感器数据处理** - 温度、湿度、压力的综合计算
2. **业务指标计算** - 自定义评分和分级函数
3. **数据清洗** - 自定义验证和转换函数
4. **实时监控** - 阈值检查和告警函数
## 适用场景
- 🏭 **工业物联网**:复杂传感器数据处理
- 💼 **业务分析**:自定义业务逻辑计算
- 🔧 **系统集成**:已有函数库的整合使用
+195
View File
@@ -0,0 +1,195 @@
package main
import (
"fmt"
"strings"
"github.com/rulego/streamsql/functions"
)
func main() {
fmt.Println("🔧 StreamSQL 函数系统整合演示")
fmt.Println(strings.Repeat("=", 50))
// 1. 获取桥接器
bridge := functions.GetExprBridge()
// 2. 准备测试数据
data := map[string]interface{}{
"temperature": -15.5,
"humidity": 65.8,
"device": "sensor_001",
"values": []float64{1.2, -3.4, 5.6, -7.8, 9.0},
"tags": []string{"outdoor", "weather", "monitoring"},
"metadata": map[string]interface{}{
"location": "北京",
"type": "温湿度传感器",
},
}
fmt.Printf("📊 测试数据: %+v\n\n", data)
// 3. 演示 StreamSQL 函数
fmt.Println("🎯 StreamSQL 内置函数演示:")
testStreamSQLFunctions(bridge, data)
// 4. 演示 expr-lang 函数
fmt.Println("\n🚀 expr-lang 内置函数演示:")
testExprLangFunctions(bridge, data)
// 5. 演示混合使用
fmt.Println("\n🔀 混合函数使用演示:")
testMixedFunctions(bridge, data)
// 6. 演示函数冲突解决
fmt.Println("\n⚖️ 函数冲突解决演示:")
testFunctionConflicts(bridge, data)
// 7. 显示所有可用函数
fmt.Println("\n📋 所有可用函数:")
showAllFunctions()
}
func testStreamSQLFunctions(bridge *functions.ExprBridge, data map[string]interface{}) {
tests := []struct {
name string
expression string
expected string
}{
{"绝对值", "abs(temperature)", "15.5"},
{"平方根", "sqrt(64)", "8"},
{"字符串长度", "length(device)", "10"},
{"字符串连接", "concat(device, \"_processed\")", "sensor_001_processed"},
{"转大写", "upper(device)", "SENSOR_001"},
{"当前时间戳", "now()", "时间戳"},
{"编码", "encode(\"hello\", \"base64\")", "aGVsbG8="},
{"解码", "decode(\"aGVsbG8=\", \"base64\")", "hello"},
{"十六进制转换", "hex2dec(\"ff\")", "255"},
{"数学计算", "power(2, 3)", "8"},
{"三角函数", "cos(0)", "1"},
}
for _, test := range tests {
result, err := bridge.EvaluateExpression(test.expression, data)
if err != nil {
fmt.Printf(" ❌ %s: %s -> 错误: %v\n", test.name, test.expression, err)
} else {
fmt.Printf(" ✅ %s: %s -> %v\n", test.name, test.expression, result)
}
}
}
func testExprLangFunctions(bridge *functions.ExprBridge, data map[string]interface{}) {
tests := []struct {
name string
expression string
}{
{"数组长度", "len(values)"},
{"数组过滤", "filter(values, # > 0)"},
{"数组映射", "map(values, abs(#))"},
{"字符串处理", "trim(\" hello world \")"},
{"字符串分割", "split(device, \"_\")"},
{"类型转换", "int(humidity)"},
{"最大值", "max(values)"},
{"最小值", "min(values)"},
{"字符串包含", "\"sensor\" in device"},
{"条件表达式", "temperature < 0 ? \"冷\" : \"热\""},
}
for _, test := range tests {
result, err := bridge.EvaluateExpression(test.expression, data)
if err != nil {
fmt.Printf(" ❌ %s: %s -> 错误: %v\n", test.name, test.expression, err)
} else {
fmt.Printf(" ✅ %s: %s -> %v\n", test.name, test.expression, result)
}
}
}
func testMixedFunctions(bridge *functions.ExprBridge, data map[string]interface{}) {
tests := []struct {
name string
expression string
}{
{"混合计算1", "abs(temperature) + len(device)"},
{"混合计算2", "upper(concat(device, \"_\", string(int(humidity))))"},
{"复杂条件", "len(filter(values, abs(#) > 5)) > 0"},
{"字符串处理", "length(trim(upper(device)))"},
{"数值处理", "sqrt(abs(temperature)) + max(values)"},
}
for _, test := range tests {
result, err := bridge.EvaluateExpression(test.expression, data)
if err != nil {
fmt.Printf(" ❌ %s: %s -> 错误: %v\n", test.name, test.expression, err)
} else {
fmt.Printf(" ✅ %s: %s -> %v\n", test.name, test.expression, result)
}
}
}
func testFunctionConflicts(bridge *functions.ExprBridge, data map[string]interface{}) {
// 测试冲突函数的解析
conflictFunctions := []string{"abs", "max", "min", "upper", "lower"}
for _, funcName := range conflictFunctions {
_, exists, source := bridge.ResolveFunction(funcName)
if exists {
fmt.Printf(" 🔍 函数 '%s' 来源: %s\n", funcName, source)
}
}
// 测试使用别名访问StreamSQL版本
fmt.Println("\n 📝 使用别名访问StreamSQL函数:")
env := bridge.CreateEnhancedExprEnvironment(data)
if _, exists := env["streamsql_abs"]; exists {
fmt.Println(" ✅ streamsql_abs 别名可用")
}
if _, exists := env["streamsql_max"]; exists {
fmt.Println(" ✅ streamsql_max 别名可用")
}
}
func showAllFunctions() {
info := functions.GetAllAvailableFunctions()
// StreamSQL 函数
if streamSQLFuncs, ok := info["streamsql"].(map[string]interface{}); ok {
fmt.Printf(" 📦 StreamSQL 函数 (%d个):\n", len(streamSQLFuncs))
categories := make(map[string][]string)
for name, funcInfo := range streamSQLFuncs {
if info, ok := funcInfo.(map[string]interface{}); ok {
if category, ok := info["type"].(functions.FunctionType); ok {
categories[string(category)] = append(categories[string(category)], name)
}
}
}
for category, funcs := range categories {
fmt.Printf(" %s: %v\n", category, funcs)
}
}
// expr-lang 函数
if exprLangFuncs, ok := info["expr-lang"].(map[string]interface{}); ok {
fmt.Printf("\n 🚀 expr-lang 函数 (%d个):\n", len(exprLangFuncs))
categories := make(map[string][]string)
for name, funcInfo := range exprLangFuncs {
if info, ok := funcInfo.(map[string]interface{}); ok {
if category, ok := info["category"].(string); ok {
categories[category] = append(categories[category], name)
}
}
}
for category, funcs := range categories {
fmt.Printf(" %s: %v\n", category, funcs)
}
}
fmt.Printf("\n 📊 总计: StreamSQL %d个 + expr-lang %d个 函数\n",
len(info["streamsql"].(map[string]interface{})),
len(info["expr-lang"].(map[string]interface{})))
}
@@ -0,0 +1,51 @@
# 简单自定义函数示例
## 简介
这个示例展示了如何使用StreamSQL的插件式自定义函数系统注册和使用简单的自定义函数。
## 功能演示
- ✅ 数学函数:平方计算、华氏度转摄氏度、圆面积计算
- ✅ 直接SQL查询模式和聚合查询模式
- ✅ 函数管理功能:查询、分类、统计
## 运行方式
```bash
cd examples/simple-custom-functions
go run main.go
```
## 代码亮点
### 1. 简单函数注册
```go
functions.RegisterCustomFunction(
"square", // 函数名
functions.TypeMath, // 函数类型
"数学函数", // 分类
"计算平方", // 描述
1, 1, // 参数数量
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val, _ := functions.ConvertToFloat64(args[0])
return val * val, nil
},
)
```
### 2. SQL中直接使用
```sql
SELECT square(value) as squared_value FROM stream
```
### 3. 聚合查询
```sql
SELECT AVG(square(value)) as avg_squared FROM stream GROUP BY device, TumblingWindow('1s')
```
## 适用场景
- 🔰 初学者入门StreamSQL自定义函数
- 📚 学习插件式函数注册机制
- 🧪 快速验证函数功能
+257
View File
@@ -0,0 +1,257 @@
package main
import (
"fmt"
"github.com/rulego/streamsql/utils/cast"
"math"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
)
func main() {
fmt.Println("🚀 StreamSQL 简单自定义函数演示")
fmt.Println("=================================")
// 注册一些简单的自定义函数
registerSimpleFunctions()
// 演示函数在SQL中的使用
demonstrateFunctions()
fmt.Println("\n✅ 演示完成!")
}
// 注册简单的自定义函数
func registerSimpleFunctions() {
fmt.Println("\n📋 注册自定义函数...")
// 1. 数学函数:平方
err := functions.RegisterCustomFunction(
"square",
functions.TypeMath,
"数学函数",
"计算平方",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val := cast.ToFloat64(args[0])
return val * val, nil
},
)
if err != nil {
fmt.Printf("❌ 注册square函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册数学函数: square")
}
// 2. 华氏度转摄氏度函数
err = functions.RegisterCustomFunction(
"f_to_c",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit := cast.ToFloat64(args[0])
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
},
)
if err != nil {
fmt.Printf("❌ 注册f_to_c函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册转换函数: f_to_c")
}
// 3. 圆面积计算函数
err = functions.RegisterCustomFunction(
"circle_area",
functions.TypeMath,
"几何计算",
"计算圆的面积",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
radius := cast.ToFloat64(args[0])
if radius < 0 {
return nil, fmt.Errorf("半径必须为正数")
}
area := math.Pi * radius * radius
return area, nil
},
)
if err != nil {
fmt.Printf("❌ 注册circle_area函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册几何函数: circle_area")
}
}
// 演示自定义函数的使用
func demonstrateFunctions() {
fmt.Println("\n🎯 演示自定义函数在SQL中的使用")
fmt.Println("================================")
// 创建StreamSQL实例
ssql := streamsql.New()
defer ssql.Stop()
// 1. 测试简单查询(不使用窗口)
testSimpleQuery(ssql)
// 2. 测试聚合查询(使用窗口)
testAggregateQuery(ssql)
}
func testSimpleQuery(ssql *streamsql.Streamsql) {
fmt.Println("\n📝 测试简单查询...")
sql := `
SELECT
device,
square(value) as squared_value,
f_to_c(temperature) as celsius,
circle_area(radius) as area
FROM stream
`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf(" 📊 简单查询结果: %v\n", result)
})
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // 华氏度
"radius": 3.0,
},
map[string]interface{}{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // 华氏度
"radius": 2.5,
},
}
for _, data := range testData {
ssql.AddData(data)
time.Sleep(200 * time.Millisecond) // 稍微延迟
}
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 简单查询测试完成")
}
func testAggregateQuery(ssql *streamsql.Streamsql) {
fmt.Println("\n📈 测试聚合查询...")
sql := `
SELECT
device,
AVG(square(value)) as avg_squared,
AVG(f_to_c(temperature)) as avg_celsius,
MAX(circle_area(radius)) as max_area
FROM stream
GROUP BY device, TumblingWindow('1s')
`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
})
// 添加测试数据
testData := []interface{}{
map[string]interface{}{
"device": "sensor1",
"value": 3.0,
"temperature": 32.0, // 0°C
"radius": 1.0,
},
map[string]interface{}{
"device": "sensor1",
"value": 4.0,
"temperature": 212.0, // 100°C
"radius": 2.0,
},
map[string]interface{}{
"device": "sensor2",
"value": 5.0,
"temperature": 68.0, // 20°C
"radius": 1.5,
},
}
for _, data := range testData {
ssql.AddData(data)
}
// 等待窗口触发
time.Sleep(1 * time.Second)
ssql.Stream().Window.Trigger()
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 聚合查询测试完成")
// 展示函数管理功能
showFunctionManagement()
}
func showFunctionManagement() {
fmt.Println("\n🔧 函数管理功能演示")
fmt.Println("==================")
// 列出所有数学函数
fmt.Println("\n📊 数学函数:")
mathFunctions := functions.GetByType(functions.TypeMath)
for _, fn := range mathFunctions {
fmt.Printf(" • %s - %s\n", fn.GetName(), fn.GetDescription())
}
// 列出所有字符串函数
fmt.Println("\n📝 字符串函数:")
stringFunctions := functions.GetByType(functions.TypeString)
for _, fn := range stringFunctions {
fmt.Printf(" • %s - %s\n", fn.GetName(), fn.GetDescription())
}
// 检查特定函数是否存在
fmt.Println("\n🔍 函数查找:")
if fn, exists := functions.Get("square"); exists {
fmt.Printf(" ✓ 找到函数: %s (%s)\n", fn.GetName(), fn.GetDescription())
}
if fn, exists := functions.Get("f_to_c"); exists {
fmt.Printf(" ✓ 找到函数: %s (%s)\n", fn.GetName(), fn.GetDescription())
}
// 统计函数数量
allFunctions := functions.ListAll()
fmt.Printf("\n📈 统计信息:\n")
fmt.Printf(" • 总函数数量: %d\n", len(allFunctions))
// 按类型统计
typeCount := make(map[functions.FunctionType]int)
for _, fn := range allFunctions {
typeCount[fn.GetType()]++
}
for fnType, count := range typeCount {
fmt.Printf(" • %s: %d个\n", fnType, count)
}
}
+731
View File
File diff suppressed because it is too large Load Diff
+117
View File
@@ -0,0 +1,117 @@
package expr
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestExpressionEvaluation(t *testing.T) {
tests := []struct {
name string
expr string
data map[string]interface{}
expected float64
hasError bool
}{
// 基本运算测试
{"Simple Addition", "a + b", map[string]interface{}{"a": 5, "b": 3}, 8, false},
{"Simple Subtraction", "a - b", map[string]interface{}{"a": 5, "b": 3}, 2, false},
{"Simple Multiplication", "a * b", map[string]interface{}{"a": 5, "b": 3}, 15, false},
{"Simple Division", "a / b", map[string]interface{}{"a": 6, "b": 3}, 2, false},
{"Modulo", "a % b", map[string]interface{}{"a": 7, "b": 4}, 3, false},
{"Power", "a ^ b", map[string]interface{}{"a": 2, "b": 3}, 8, false},
// 复合表达式测试
{"Complex Expression", "a + b * c", map[string]interface{}{"a": 5, "b": 3, "c": 2}, 11, false},
{"Complex Expression With Parentheses", "(a + b) * c", map[string]interface{}{"a": 5, "b": 3, "c": 2}, 16, false},
{"Multiple Operations", "a + b * c - d / e", map[string]interface{}{"a": 5, "b": 3, "c": 2, "d": 8, "e": 4}, 9, false},
// 函数调用测试
{"Abs Function", "abs(a - b)", map[string]interface{}{"a": 3, "b": 5}, 2, false},
{"Sqrt Function", "sqrt(a)", map[string]interface{}{"a": 16}, 4, false},
{"Round Function", "round(a)", map[string]interface{}{"a": 3.7}, 4, false},
// 转换测试
{"String to Number", "a + b", map[string]interface{}{"a": "5", "b": 3}, 8, false},
// 复杂表达式测试
{"Temperature Conversion", "temperature * 1.8 + 32", map[string]interface{}{"temperature": 25}, 77, false},
{"Complex Math", "sqrt(abs(a * b - c / d))", map[string]interface{}{"a": 10, "b": 2, "c": 5, "d": 1}, 4.5, false},
// 错误测试
{"Division by Zero", "a / b", map[string]interface{}{"a": 5, "b": 0}, 0, true},
{"Missing Field", "a + b", map[string]interface{}{"a": 5}, 0, true},
{"Invalid Function", "unknown(a)", map[string]interface{}{"a": 5}, 0, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expr, err := NewExpression(tt.expr)
assert.NoError(t, err, "Expression parsing should not fail")
result, err := expr.Evaluate(tt.data)
if tt.hasError {
assert.Error(t, err, "Expected error")
} else {
assert.NoError(t, err, "Evaluation should not fail")
assert.InDelta(t, tt.expected, result, 0.001, "Result should match expected value")
}
})
}
}
func TestGetFields(t *testing.T) {
tests := []struct {
expr string
expectedFields []string
}{
{"a + b", []string{"a", "b"}},
{"a + b * c", []string{"a", "b", "c"}},
{"temperature * 1.8 + 32", []string{"temperature"}},
{"abs(humidity - 50)", []string{"humidity"}},
{"sqrt(x^2 + y^2)", []string{"x", "y"}},
}
for _, tt := range tests {
t.Run(tt.expr, func(t *testing.T) {
expr, err := NewExpression(tt.expr)
assert.NoError(t, err, "Expression parsing should not fail")
fields := expr.GetFields()
// 由于map迭代顺序不确定,我们只检查长度和包含关系
assert.Equal(t, len(tt.expectedFields), len(fields), "Number of fields should match")
for _, field := range tt.expectedFields {
found := false
for _, f := range fields {
if f == field {
found = true
break
}
}
assert.True(t, found, "Field %s should be found", field)
}
})
}
}
func TestParseError(t *testing.T) {
tests := []struct {
name string
expr string
}{
{"Empty Expression", ""},
{"Mismatched Parentheses", "a + (b * c"},
{"Invalid Character", "a # b"},
{"Double Operator", "a + * b"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewExpression(tt.expr)
assert.Error(t, err, "Expression parsing should fail")
})
}
}
+173
View File
@@ -0,0 +1,173 @@
# StreamSQL Functions 模块扩展
## 概述
本次扩展实现了统一的聚合函数和分析函数管理,简化了自定义函数的扩展过程。现在只需要在 `functions` 模块中实现函数,就可以自动在 `aggregator` 模块中使用。
## 主要改进
### 1. 统一的函数接口
- **AggregatorFunction**: 支持增量计算的聚合函数接口
- **AnalyticalFunction**: 支持状态管理的分析函数接口
- **Function**: 基础函数接口
### 2. 自动适配器
- **AggregatorAdapter**: 将 functions 模块的聚合函数适配到 aggregator 模块
- **AnalyticalAdapter**: 将 functions 模块的分析函数适配到 aggregator 模块
### 3. 简化的扩展流程
现在添加自定义函数只需要:
1. 在 functions 模块中实现函数
2. 注册函数和适配器
3. 无需修改 aggregator 模块
## 使用方法
### 创建自定义聚合函数
```go
// 1. 定义函数结构
type CustomSumFunction struct {
*BaseFunction
sum float64
}
// 2. 实现基础接口
func (f *CustomSumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 实现函数逻辑
}
// 3. 实现AggregatorFunction接口
func (f *CustomSumFunction) New() AggregatorFunction {
return &CustomSumFunction{BaseFunction: f.BaseFunction}
}
func (f *CustomSumFunction) Add(value interface{}) {
// 增量计算逻辑
}
func (f *CustomSumFunction) Result() interface{} {
return f.sum
}
func (f *CustomSumFunction) Reset() {
f.sum = 0
}
func (f *CustomSumFunction) Clone() AggregatorFunction {
return &CustomSumFunction{BaseFunction: f.BaseFunction, sum: f.sum}
}
// 4. 注册函数
func init() {
Register(NewCustomSumFunction())
RegisterAggregatorAdapter("custom_sum")
}
```
### 创建自定义分析函数
```go
// 1. 定义函数结构
type CustomAnalyticalFunction struct {
*BaseFunction
state interface{}
}
// 2. 实现基础接口
func (f *CustomAnalyticalFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 实现分析逻辑
}
// 3. 实现AnalyticalFunction接口
func (f *CustomAnalyticalFunction) Reset() {
f.state = nil
}
func (f *CustomAnalyticalFunction) Clone() AnalyticalFunction {
return &CustomAnalyticalFunction{BaseFunction: f.BaseFunction, state: f.state}
}
// 4. 注册函数
func init() {
Register(NewCustomAnalyticalFunction())
RegisterAnalyticalAdapter("custom_analytical")
}
```
### 使用简化的注册方式
```go
// 注册简单的自定义函数
RegisterCustomFunction("double", TypeAggregation, "数学函数", "将值乘以2", 1, 1,
func(ctx *FunctionContext, args []interface{}) (interface{}, error) {
val, err := cast.ToFloat64E(args[0])
if err != nil {
return nil, err
}
return val * 2, nil
})
```
## 内置函数
### 聚合函数
- `sum`: 求和
- `avg`: 平均值
- `min`: 最小值
- `max`: 最大值
- `count`: 计数
- `stddev`: 标准差
- `median`: 中位数
- `percentile`: 百分位数
- `collect`: 收集所有值
- `last_value`: 最后一个值
- `merge_agg`: 合并聚合
- `stddevs`: 样本标准差
- `deduplicate`: 去重
- `var`: 总体方差
- `vars`: 样本方差
### 分析函数
- `lag`: 滞后函数
- `latest`: 最新值
- `changed_col`: 变化列
- `had_changed`: 是否变化
## 自定义函数示例
参考 `custom_example.go` 文件中的示例:
- `CustomProductFunction`: 乘积聚合函数
- `CustomGeometricMeanFunction`: 几何平均聚合函数
- `CustomMovingAverageFunction`: 移动平均分析函数
## 兼容性
- 完全兼容现有的 aggregator 模块接口
- 现有的聚合器和分析函数继续正常工作
- 新的函数会优先使用 functions 模块的实现
## SQL 解析调整
SQL 解析器需要调整以支持新的函数注册机制:
1. 在解析聚合函数时,优先查找 functions 模块中的注册函数
2. 支持动态函数发现和验证
3. 提供更好的错误信息和函数提示
## 性能优化
- 增量计算减少重复计算
- 函数注册表提供快速查找
- 适配器模式保持接口兼容性
- 状态管理支持复杂分析场景
## 扩展建议
1. **窗口函数**: 可以基于 AnalyticalFunction 实现更复杂的窗口函数
2. **用户定义函数**: 支持运行时动态加载函数
3. **函数组合**: 支持函数的组合和链式调用
4. **性能监控**: 添加函数执行性能监控和优化
+175
View File
@@ -0,0 +1,175 @@
# StreamSQL Functions 模块重构总结
## 重构目标
将所有函数计算相关的逻辑都迁移到 `functions` 模块,让 `aggregator` 模块只负责调用 `functions` 模块,简化自定义函数的扩展过程。
## 重构成果
### 1. 统一的函数管理
- **所有聚合函数和分析函数都在 `functions` 模块中实现**
- **`aggregator` 模块只保留接口定义和适配器逻辑**
- **新增自定义函数只需要在 `functions` 模块中添加,无需修改多个模块**
### 2. 支持增量计算的聚合函数
所有聚合函数都实现了 `AggregatorFunction` 接口,支持:
- `New()`: 创建新实例
- `Add(value)`: 增量添加值
- `Result()`: 获取聚合结果
- `Reset()`: 重置状态
- `Clone()`: 克隆实例
### 3. 支持状态管理的分析函数
所有分析函数都实现了 `AnalyticalFunction` 接口,支持:
- `Reset()`: 重置函数状态
- `Clone()`: 克隆函数实例
- 状态保持和历史数据管理
### 4. 自动适配器机制
- **AggregatorAdapter**: 将 functions 模块的聚合函数适配到 aggregator 模块
- **AnalyticalAdapter**: 将 functions 模块的分析函数适配到 aggregator 模块
- **AnalyticalAggregatorAdapter**: 将分析函数适配为聚合器接口
## 已实现的函数
### 聚合函数 (支持增量计算)
- `sum`: 求和
- `avg`: 平均值
- `min`: 最小值
- `max`: 最大值
- `count`: 计数
- `stddev`: 标准差
- `median`: 中位数
- `percentile`: 百分位数
- `collect`: 收集所有值
- `last_value`: 最后一个值
- `merge_agg`: 合并聚合
- `stddevs`: 样本标准差
- `deduplicate`: 去重
- `var`: 总体方差
- `vars`: 样本方差
### 分析函数 (支持状态管理)
- `lag`: 滞后函数
- `latest`: 最新值
- `changed_col`: 变化列
- `had_changed`: 是否变化
### 窗口函数
- `window_start`: 窗口开始时间
- `window_end`: 窗口结束时间
- `expression`: 表达式函数
## 使用方法
### 1. 创建聚合器实例
```go
// 通过 aggregator 模块(推荐)
agg := aggregator.CreateBuiltinAggregator(aggregator.Sum)
// 直接通过 functions 模块
sumFunc := functions.NewSumFunction()
aggInstance := sumFunc.New()
```
### 2. 增量计算
```go
agg.Add(10.0)
agg.Add(20.0)
agg.Add(30.0)
result := agg.Result() // 60.0
```
### 3. 分析函数使用
```go
lagFunc := functions.NewLagFunction()
ctx := &functions.FunctionContext{
Data: make(map[string]interface{}),
}
// 第一个值返回默认值 nil
result1, _ := lagFunc.Execute(ctx, []interface{}{10})
// 第二个值返回第一个值 10
result2, _ := lagFunc.Execute(ctx, []interface{}{20})
```
### 4. 添加自定义函数
```go
// 1. 实现聚合函数
type CustomSumFunction struct {
*functions.BaseFunction
sum float64
}
// 2. 实现必要的接口方法
func (f *CustomSumFunction) New() functions.AggregatorFunction { ... }
func (f *CustomSumFunction) Add(value interface{}) { ... }
func (f *CustomSumFunction) Result() interface{} { ... }
// ... 其他方法
// 3. 注册函数
functions.Register(NewCustomSumFunction())
functions.RegisterAggregatorAdapter("custom_sum")
```
## 兼容性
- **完全兼容现有的 aggregator 模块接口**
- **现有代码无需修改**
- **新的函数会优先使用 functions 模块的实现**
- **保留了原有的注册机制作为后备**
## 性能优化
- **增量计算减少重复计算开销**
- **函数注册表提供快速查找**
- **适配器模式保持接口兼容性**
- **状态管理支持复杂分析场景**
## 测试覆盖
所有重构后的功能都有完整的测试覆盖:
- `TestFunctionsAggregatorIntegration`: 聚合函数集成测试
- `TestAnalyticalFunctionsIntegration`: 分析函数集成测试
- `TestComplexAggregators`: 复杂聚合器测试
- `TestWindowFunctions`: 窗口函数测试
- `TestAdapterFunctions`: 适配器功能测试
## 扩展建议
1. **SQL 解析器调整**: 在解析聚合函数时,优先查找 functions 模块中的注册函数
2. **动态函数发现**: 支持运行时动态加载函数
3. **函数组合**: 支持函数的组合和链式调用
4. **性能监控**: 添加函数执行性能监控和优化
5. **更多内置函数**: 基于新的架构添加更多统计和分析函数
## 文件结构
```
functions/
├── aggregator_interface.go # 聚合器和分析函数接口定义
├── aggregator_adapter.go # 适配器实现
├── analytical_aggregator_adapter.go # 分析函数聚合器适配器
├── functions_aggregation.go # 聚合函数实现
├── functions_analytical.go # 分析函数实现
├── functions_window.go # 窗口函数实现
├── init.go # 函数注册
├── integration_test.go # 集成测试
├── custom_example.go # 自定义函数示例
└── README.md # 使用文档
aggregator/
├── builtin.go # 简化的聚合器接口和适配逻辑
└── analytical_aggregators.go # 简化的分析聚合器占位符
```
这次重构成功实现了将所有函数计算逻辑统一到 `functions` 模块的目标,大大简化了自定义函数的扩展过程。
+168
View File
@@ -0,0 +1,168 @@
package functions
import (
"sync"
)
// AggregatorAdapter 聚合器适配器,兼容原有的aggregator接口
type AggregatorAdapter struct {
aggFunc AggregatorFunction
}
// NewAggregatorAdapter 创建聚合器适配器
func NewAggregatorAdapter(name string) (*AggregatorAdapter, error) {
aggFunc, err := CreateAggregator(name)
if err != nil {
return nil, err
}
return &AggregatorAdapter{
aggFunc: aggFunc,
}, nil
}
// New 创建新的聚合器实例
func (a *AggregatorAdapter) New() interface{} {
return &AggregatorAdapter{
aggFunc: a.aggFunc.New(),
}
}
// Add 添加值
func (a *AggregatorAdapter) Add(value interface{}) {
a.aggFunc.Add(value)
}
// Result 获取结果
func (a *AggregatorAdapter) Result() interface{} {
return a.aggFunc.Result()
}
// GetFunctionName 获取底层函数名称,用于支持context机制
func (a *AggregatorAdapter) GetFunctionName() string {
if a.aggFunc != nil {
return a.aggFunc.GetName()
}
return ""
}
// AnalyticalAdapter 分析函数适配器
type AnalyticalAdapter struct {
analFunc AnalyticalFunction
}
// NewAnalyticalAdapter 创建分析函数适配器
func NewAnalyticalAdapter(name string) (*AnalyticalAdapter, error) {
analFunc, err := CreateAnalytical(name)
if err != nil {
return nil, err
}
return &AnalyticalAdapter{
analFunc: analFunc,
}, nil
}
// Execute 执行分析函数
func (a *AnalyticalAdapter) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
return a.analFunc.Execute(ctx, args)
}
// Reset 重置状态
func (a *AnalyticalAdapter) Reset() {
a.analFunc.Reset()
}
// Clone 克隆实例
func (a *AnalyticalAdapter) Clone() *AnalyticalAdapter {
return &AnalyticalAdapter{
analFunc: a.analFunc.Clone(),
}
}
// 全局适配器注册表
var (
aggregatorAdapters = make(map[string]func() interface{})
analyticalAdapters = make(map[string]func() *AnalyticalAdapter)
adapterMutex sync.RWMutex
)
// RegisterAggregatorAdapter 注册聚合器适配器
func RegisterAggregatorAdapter(name string) error {
adapterMutex.Lock()
defer adapterMutex.Unlock()
aggregatorAdapters[name] = func() interface{} {
adapter, err := NewAggregatorAdapter(name)
if err != nil {
return nil
}
return adapter
}
return nil
}
// RegisterAnalyticalAdapter 注册分析函数适配器
func RegisterAnalyticalAdapter(name string) error {
adapterMutex.Lock()
defer adapterMutex.Unlock()
analyticalAdapters[name] = func() *AnalyticalAdapter {
adapter, err := NewAnalyticalAdapter(name)
if err != nil {
return nil
}
return adapter
}
return nil
}
// GetAggregatorAdapter 获取聚合器适配器
func GetAggregatorAdapter(name string) (func() interface{}, bool) {
adapterMutex.RLock()
defer adapterMutex.RUnlock()
constructor, exists := aggregatorAdapters[name]
return constructor, exists
}
// GetAnalyticalAdapter 获取分析函数适配器
func GetAnalyticalAdapter(name string) (func() *AnalyticalAdapter, bool) {
adapterMutex.RLock()
defer adapterMutex.RUnlock()
constructor, exists := analyticalAdapters[name]
return constructor, exists
}
// CreateBuiltinAggregatorFromFunctions 从functions模块创建聚合器
func CreateBuiltinAggregatorFromFunctions(aggType string) interface{} {
// 首先尝试从适配器注册表获取
if constructor, exists := GetAggregatorAdapter(aggType); exists {
return constructor()
}
// 如果没有找到,尝试直接创建
adapter, err := NewAggregatorAdapter(aggType)
if err != nil {
return nil
}
return adapter
}
// CreateAnalyticalFromFunctions 从functions模块创建分析函数
func CreateAnalyticalFromFunctions(funcType string) *AnalyticalAdapter {
// 首先尝试从适配器注册表获取
if constructor, exists := GetAnalyticalAdapter(funcType); exists {
return constructor()
}
// 如果没有找到,尝试直接创建
adapter, err := NewAnalyticalAdapter(funcType)
if err != nil {
return nil
}
return adapter
}
+52
View File
@@ -0,0 +1,52 @@
package functions
import "fmt"
// AggregatorFunction 聚合器函数接口,支持增量计算
type AggregatorFunction interface {
Function
// New 创建新的聚合器实例
New() AggregatorFunction
// Add 添加值进行增量计算
Add(value interface{})
// Result 获取聚合结果
Result() interface{}
// Reset 重置聚合器状态
Reset()
// Clone 克隆聚合器(用于窗口函数等场景)
Clone() AggregatorFunction
}
// AnalyticalFunction 分析函数接口,支持状态管理
// 现在继承自AggregatorFunction,支持增量计算
type AnalyticalFunction interface {
AggregatorFunction
}
// CreateAggregator 创建聚合器实例
func CreateAggregator(name string) (AggregatorFunction, error) {
fn, exists := Get(name)
if !exists {
return nil, fmt.Errorf("aggregator function %s not found", name)
}
if aggFn, ok := fn.(AggregatorFunction); ok {
return aggFn.New(), nil
}
return nil, fmt.Errorf("function %s is not an aggregator function", name)
}
// CreateAnalytical 创建分析函数实例
func CreateAnalytical(name string) (AnalyticalFunction, error) {
fn, exists := Get(name)
if !exists {
return nil, fmt.Errorf("analytical function %s not found", name)
}
if analFn, ok := fn.(AnalyticalFunction); ok {
return analFn.New().(AnalyticalFunction), nil
}
return nil, fmt.Errorf("function %s is not an analytical function", name)
}
+166
View File
@@ -0,0 +1,166 @@
package functions
import (
"sync"
)
// AggregateType 聚合类型,从 aggregator.AggregateType 迁移而来
type AggregateType string
const (
Sum AggregateType = "sum"
Count AggregateType = "count"
Avg AggregateType = "avg"
Max AggregateType = "max"
Min AggregateType = "min"
StdDev AggregateType = "stddev"
Median AggregateType = "median"
Percentile AggregateType = "percentile"
WindowStart AggregateType = "window_start"
WindowEnd AggregateType = "window_end"
Collect AggregateType = "collect"
LastValue AggregateType = "last_value"
MergeAgg AggregateType = "merge_agg"
StdDevS AggregateType = "stddevs"
Deduplicate AggregateType = "deduplicate"
Var AggregateType = "var"
VarS AggregateType = "vars"
// 分析函数
Lag AggregateType = "lag"
Latest AggregateType = "latest"
ChangedCol AggregateType = "changed_col"
HadChanged AggregateType = "had_changed"
// 表达式聚合器,用于处理自定义函数
Expression AggregateType = "expression"
)
// 为了方便使用,提供字符串常量版本
const (
SumStr = string(Sum)
CountStr = string(Count)
AvgStr = string(Avg)
MaxStr = string(Max)
MinStr = string(Min)
StdDevStr = string(StdDev)
MedianStr = string(Median)
PercentileStr = string(Percentile)
WindowStartStr = string(WindowStart)
WindowEndStr = string(WindowEnd)
CollectStr = string(Collect)
LastValueStr = string(LastValue)
MergeAggStr = string(MergeAgg)
StdDevSStr = string(StdDevS)
DeduplicateStr = string(Deduplicate)
VarStr = string(Var)
VarSStr = string(VarS)
// 分析函数
LagStr = string(Lag)
LatestStr = string(Latest)
ChangedColStr = string(ChangedCol)
HadChangedStr = string(HadChanged)
// 表达式聚合器
ExpressionStr = string(Expression)
)
// LegacyAggregatorFunction 兼容原有aggregator接口的聚合器函数接口
// 保持与原有接口兼容,用于向后兼容
type LegacyAggregatorFunction interface {
New() LegacyAggregatorFunction
Add(value interface{})
Result() interface{}
}
// ContextAggregator 支持context机制的聚合器接口
type ContextAggregator interface {
GetContextKey() string
}
var (
legacyAggregatorRegistry = make(map[string]func() LegacyAggregatorFunction)
legacyRegistryMutex sync.RWMutex
)
// RegisterLegacyAggregator 注册传统聚合器到全局注册表
func RegisterLegacyAggregator(name string, constructor func() LegacyAggregatorFunction) {
legacyRegistryMutex.Lock()
defer legacyRegistryMutex.Unlock()
legacyAggregatorRegistry[name] = constructor
}
// CreateLegacyAggregator 创建传统聚合器,优先使用functions模块
func CreateLegacyAggregator(aggType AggregateType) LegacyAggregatorFunction {
// 首先尝试从functions模块创建聚合器
if aggFunc := CreateBuiltinAggregatorFromFunctions(string(aggType)); aggFunc != nil {
if adapter, ok := aggFunc.(*AggregatorAdapter); ok {
return &FunctionAggregatorWrapper{adapter: adapter}
}
}
// 尝试从functions模块创建分析函数聚合器
if analFunc := CreateAnalyticalAggregatorFromFunctions(string(aggType)); analFunc != nil {
if adapter, ok := analFunc.(*AnalyticalAggregatorAdapter); ok {
return &AnalyticalAggregatorWrapper{adapter: adapter}
}
}
// 检查自定义注册表
legacyRegistryMutex.RLock()
constructor, exists := legacyAggregatorRegistry[string(aggType)]
legacyRegistryMutex.RUnlock()
if exists {
return constructor()
}
// 如果都没有找到,抛出错误
panic("unsupported aggregator type: " + aggType)
}
// FunctionAggregatorWrapper 包装functions模块的聚合器,使其兼容原有接口
type FunctionAggregatorWrapper struct {
adapter *AggregatorAdapter
}
func (w *FunctionAggregatorWrapper) New() LegacyAggregatorFunction {
newAdapter := w.adapter.New().(*AggregatorAdapter)
return &FunctionAggregatorWrapper{adapter: newAdapter}
}
func (w *FunctionAggregatorWrapper) Add(value interface{}) {
w.adapter.Add(value)
}
func (w *FunctionAggregatorWrapper) Result() interface{} {
return w.adapter.Result()
}
// 实现ContextAggregator接口,支持窗口函数的context机制
func (w *FunctionAggregatorWrapper) GetContextKey() string {
// 检查底层函数是否是窗口函数
if w.adapter != nil {
switch w.adapter.GetFunctionName() {
case "window_start":
return "window_start"
case "window_end":
return "window_end"
}
}
return ""
}
// AnalyticalAggregatorWrapper 包装functions模块的分析函数聚合器,使其兼容原有接口
type AnalyticalAggregatorWrapper struct {
adapter *AnalyticalAggregatorAdapter
}
func (w *AnalyticalAggregatorWrapper) New() LegacyAggregatorFunction {
newAdapter := w.adapter.New().(*AnalyticalAggregatorAdapter)
return &AnalyticalAggregatorWrapper{adapter: newAdapter}
}
func (w *AnalyticalAggregatorWrapper) Add(value interface{}) {
w.adapter.Add(value)
}
func (w *AnalyticalAggregatorWrapper) Result() interface{} {
return w.adapter.Result()
}
@@ -0,0 +1,81 @@
package functions
// AnalyticalAggregatorAdapter 分析函数到聚合器的适配器
type AnalyticalAggregatorAdapter struct {
analFunc AnalyticalFunction
ctx *FunctionContext
}
// NewAnalyticalAggregatorAdapter 创建分析函数聚合器适配器
func NewAnalyticalAggregatorAdapter(name string) (*AnalyticalAggregatorAdapter, error) {
analFunc, err := CreateAnalytical(name)
if err != nil {
return nil, err
}
return &AnalyticalAggregatorAdapter{
analFunc: analFunc,
ctx: &FunctionContext{
Data: make(map[string]interface{}),
},
}, nil
}
// New 创建新的适配器实例
func (a *AnalyticalAggregatorAdapter) New() interface{} {
return &AnalyticalAggregatorAdapter{
analFunc: a.analFunc.Clone(),
ctx: &FunctionContext{
Data: make(map[string]interface{}),
},
}
}
// Add 添加值
func (a *AnalyticalAggregatorAdapter) Add(value interface{}) {
// 执行分析函数
args := []interface{}{value}
a.analFunc.Execute(a.ctx, args)
}
// Result 获取结果
func (a *AnalyticalAggregatorAdapter) Result() interface{} {
// 对于LatestFunction,直接返回LatestValue
if latestFunc, ok := a.analFunc.(*LatestFunction); ok {
return latestFunc.LatestValue
}
// 对于HadChangedFunction,返回当前状态
if hadChangedFunc, ok := a.analFunc.(*HadChangedFunction); ok {
return hadChangedFunc.IsSet
}
// 对于其他分析函数,尝试执行一次来获取当前状态的结果
// 这里传入nil作为参数,表示获取当前状态
result, _ := a.analFunc.Execute(a.ctx, []interface{}{nil})
return result
}
// CreateAnalyticalAggregatorFromFunctions 从functions模块创建分析函数聚合器
func CreateAnalyticalAggregatorFromFunctions(funcType string) interface{} {
// 首先尝试从适配器注册表获取
if constructor, exists := GetAnalyticalAdapter(funcType); exists {
adapter := constructor()
if adapter != nil {
return &AnalyticalAggregatorAdapter{
analFunc: adapter.analFunc,
ctx: &FunctionContext{
Data: make(map[string]interface{}),
},
}
}
}
// 如果没有找到,尝试直接创建
adapter, err := NewAnalyticalAggregatorAdapter(funcType)
if err != nil {
return nil
}
return adapter
}

Some files were not shown because too many files have changed in this diff Show More