mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-10 06:19:52 +00:00
494 lines
14 KiB
Markdown
494 lines
14 KiB
Markdown
# StreamSQL 自定义函数开发指南
|
|
|
|
## 🚀 概述
|
|
|
|
StreamSQL 提供了强大而灵活的自定义函数系统,支持用户根据业务需求扩展各种类型的函数,包括数学函数、字符串函数、聚合函数、分析函数等。
|
|
|
|
## 📋 函数类型分类
|
|
|
|
### 内置函数类型
|
|
|
|
```go
|
|
const (
|
|
TypeAggregation FunctionType = "aggregation" // 聚合函数
|
|
TypeWindow FunctionType = "window" // 窗口函数
|
|
TypeDateTime FunctionType = "datetime" // 时间日期函数
|
|
TypeConversion FunctionType = "conversion" // 转换函数
|
|
TypeMath FunctionType = "math" // 数学函数
|
|
TypeString FunctionType = "string" // 字符串函数
|
|
TypeAnalytical FunctionType = "analytical" // 分析函数
|
|
TypeCustom FunctionType = "custom" // 用户自定义函数
|
|
)
|
|
```
|
|
|
|
## 🛠️ 自定义函数实现方式
|
|
|
|
### 方式一:快速注册(推荐简单函数)
|
|
|
|
```go
|
|
import "github.com/rulego/streamsql/functions"
|
|
|
|
// 注册一个简单的数学函数
|
|
err := functions.RegisterCustomFunction(
|
|
"double", // 函数名
|
|
functions.TypeMath, // 函数类型
|
|
"数学函数", // 分类描述
|
|
"将数值乘以2", // 函数描述
|
|
1, // 最少参数个数
|
|
1, // 最多参数个数
|
|
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
val, err := functions.ConvertToFloat64(args[0])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return val * 2, nil
|
|
},
|
|
)
|
|
```
|
|
|
|
### 方式二:完整结构体实现(推荐复杂函数)
|
|
|
|
```go
|
|
// 1. 定义函数结构体
|
|
type AdvancedMathFunction struct {
|
|
*functions.BaseFunction
|
|
// 可以添加状态变量
|
|
cache map[string]interface{}
|
|
}
|
|
|
|
// 2. 实现构造函数
|
|
func NewAdvancedMathFunction() *AdvancedMathFunction {
|
|
return &AdvancedMathFunction{
|
|
BaseFunction: functions.NewBaseFunction(
|
|
"advanced_calc", // 函数名
|
|
functions.TypeMath, // 函数类型
|
|
"高级数学函数", // 分类
|
|
"高级数学计算", // 描述
|
|
2, // 最少参数
|
|
3, // 最多参数
|
|
),
|
|
cache: make(map[string]interface{}),
|
|
}
|
|
}
|
|
|
|
// 3. 实现验证方法(可选,如有特殊验证需求)
|
|
func (f *AdvancedMathFunction) Validate(args []interface{}) error {
|
|
if err := f.ValidateArgCount(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 自定义验证逻辑
|
|
if len(args) >= 2 {
|
|
if _, err := functions.ConvertToFloat64(args[0]); err != nil {
|
|
return fmt.Errorf("第一个参数必须是数值")
|
|
}
|
|
if _, err := functions.ConvertToFloat64(args[1]); err != nil {
|
|
return fmt.Errorf("第二个参数必须是数值")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// 4. 实现执行方法
|
|
func (f *AdvancedMathFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
a, _ := functions.ConvertToFloat64(args[0])
|
|
b, _ := functions.ConvertToFloat64(args[1])
|
|
|
|
operation := "add" // 默认操作
|
|
if len(args) > 2 {
|
|
op, err := functions.ConvertToString(args[2])
|
|
if err == nil {
|
|
operation = op
|
|
}
|
|
}
|
|
|
|
switch operation {
|
|
case "add":
|
|
return a + b, nil
|
|
case "multiply":
|
|
return a * b, nil
|
|
case "power":
|
|
return math.Pow(a, b), nil
|
|
default:
|
|
return nil, fmt.Errorf("不支持的操作: %s", operation)
|
|
}
|
|
}
|
|
|
|
// 5. 注册函数
|
|
func init() {
|
|
functions.Register(NewAdvancedMathFunction())
|
|
}
|
|
```
|
|
|
|
## 🎯 各类型函数实现示例
|
|
|
|
### 1. 数学函数示例
|
|
|
|
```go
|
|
// 距离计算函数
|
|
func RegisterDistanceFunction() error {
|
|
return functions.RegisterCustomFunction(
|
|
"distance",
|
|
functions.TypeMath,
|
|
"几何数学",
|
|
"计算两点间距离",
|
|
4, 4,
|
|
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
x1, err := functions.ConvertToFloat64(args[0])
|
|
if err != nil { return nil, err }
|
|
y1, err := functions.ConvertToFloat64(args[1])
|
|
if err != nil { return nil, err }
|
|
x2, err := functions.ConvertToFloat64(args[2])
|
|
if err != nil { return nil, err }
|
|
y2, err := functions.ConvertToFloat64(args[3])
|
|
if err != nil { return nil, err }
|
|
|
|
distance := math.Sqrt(math.Pow(x2-x1, 2) + math.Pow(y2-y1, 2))
|
|
return distance, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
// SQL使用示例:
|
|
// SELECT device, distance(lat1, lon1, lat2, lon2) as dist FROM stream
|
|
```
|
|
|
|
### 2. 字符串函数示例
|
|
|
|
```go
|
|
// JSON提取函数
|
|
func RegisterJsonExtractFunction() error {
|
|
return functions.RegisterCustomFunction(
|
|
"json_extract",
|
|
functions.TypeString,
|
|
"JSON处理",
|
|
"从JSON字符串中提取字段值",
|
|
2, 2,
|
|
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
jsonStr, err := functions.ConvertToString(args[0])
|
|
if err != nil { return nil, err }
|
|
|
|
path, err := functions.ConvertToString(args[1])
|
|
if err != nil { return nil, err }
|
|
|
|
var data map[string]interface{}
|
|
if err := json.Unmarshal([]byte(jsonStr), &data); err != nil {
|
|
return nil, fmt.Errorf("invalid JSON: %v", err)
|
|
}
|
|
|
|
// 简单路径提取(可扩展为复杂JSONPath)
|
|
value, exists := data[path]
|
|
if !exists {
|
|
return nil, nil
|
|
}
|
|
|
|
return value, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
// SQL使用示例:
|
|
// SELECT device, json_extract(metadata, 'version') as version FROM stream
|
|
```
|
|
|
|
### 3. 时间日期函数示例
|
|
|
|
```go
|
|
// 时间格式化函数
|
|
func RegisterDateFormatFunction() error {
|
|
return functions.RegisterCustomFunction(
|
|
"date_format",
|
|
functions.TypeDateTime,
|
|
"时间格式化",
|
|
"格式化时间戳为指定格式",
|
|
2, 2,
|
|
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
timestamp, err := functions.ConvertToInt64(args[0])
|
|
if err != nil { return nil, err }
|
|
|
|
format, err := functions.ConvertToString(args[1])
|
|
if err != nil { return nil, err }
|
|
|
|
t := time.Unix(timestamp, 0)
|
|
|
|
// 支持常见格式
|
|
switch format {
|
|
case "YYYY-MM-DD":
|
|
return t.Format("2006-01-02"), nil
|
|
case "YYYY-MM-DD HH:mm:ss":
|
|
return t.Format("2006-01-02 15:04:05"), nil
|
|
case "RFC3339":
|
|
return t.Format(time.RFC3339), nil
|
|
default:
|
|
return t.Format(format), nil
|
|
}
|
|
},
|
|
)
|
|
}
|
|
|
|
// SQL使用示例:
|
|
// SELECT device, date_format(timestamp, 'YYYY-MM-DD') as date FROM stream
|
|
```
|
|
|
|
### 4. 转换函数示例
|
|
|
|
```go
|
|
// IP地址转换函数
|
|
func RegisterIpToIntFunction() error {
|
|
return functions.RegisterCustomFunction(
|
|
"ip_to_int",
|
|
functions.TypeConversion,
|
|
"网络转换",
|
|
"将IP地址转换为整数",
|
|
1, 1,
|
|
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
ipStr, err := functions.ConvertToString(args[0])
|
|
if err != nil { return nil, err }
|
|
|
|
ip := net.ParseIP(ipStr)
|
|
if ip == nil {
|
|
return nil, fmt.Errorf("invalid IP address: %s", ipStr)
|
|
}
|
|
|
|
// 转换为IPv4
|
|
ip = ip.To4()
|
|
if ip == nil {
|
|
return nil, fmt.Errorf("not an IPv4 address: %s", ipStr)
|
|
}
|
|
|
|
return int64(ip[0])<<24 + int64(ip[1])<<16 + int64(ip[2])<<8 + int64(ip[3]), nil
|
|
},
|
|
)
|
|
}
|
|
|
|
// SQL使用示例:
|
|
// SELECT device, ip_to_int(client_ip) as ip_int FROM stream
|
|
```
|
|
|
|
### 5. 自定义聚合函数示例
|
|
|
|
对于聚合函数,需要同时实现函数和聚合器:
|
|
|
|
```go
|
|
// 1. 实现自定义聚合函数
|
|
type MedianAggFunction struct {
|
|
*functions.BaseFunction
|
|
}
|
|
|
|
func NewMedianAggFunction() *MedianAggFunction {
|
|
return &MedianAggFunction{
|
|
BaseFunction: functions.NewBaseFunction(
|
|
"median_agg",
|
|
functions.TypeAggregation,
|
|
"统计聚合",
|
|
"计算中位数",
|
|
1, -1,
|
|
),
|
|
}
|
|
}
|
|
|
|
func (f *MedianAggFunction) Validate(args []interface{}) error {
|
|
return f.ValidateArgCount(args)
|
|
}
|
|
|
|
func (f *MedianAggFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
// 聚合函数的Execute在这里可能不会被直接调用
|
|
// 实际逻辑在聚合器中实现
|
|
return nil, nil
|
|
}
|
|
|
|
// 2. 实现对应的聚合器
|
|
type MedianCustomAggregator struct {
|
|
values []float64
|
|
}
|
|
|
|
func (m *MedianCustomAggregator) New() aggregator.AggregatorFunction {
|
|
return &MedianCustomAggregator{
|
|
values: make([]float64, 0),
|
|
}
|
|
}
|
|
|
|
func (m *MedianCustomAggregator) Add(value interface{}) {
|
|
if val, err := functions.ConvertToFloat64(value); err == nil {
|
|
m.values = append(m.values, val)
|
|
}
|
|
}
|
|
|
|
func (m *MedianCustomAggregator) Result() interface{} {
|
|
if len(m.values) == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
sort.Float64s(m.values)
|
|
mid := len(m.values) / 2
|
|
|
|
if len(m.values)%2 == 0 {
|
|
return (m.values[mid-1] + m.values[mid]) / 2
|
|
}
|
|
return m.values[mid]
|
|
}
|
|
|
|
// 3. 注册聚合器
|
|
func init() {
|
|
// 注册函数
|
|
functions.Register(NewMedianAggFunction())
|
|
|
|
// 注册聚合器
|
|
aggregator.Register("median_agg", func() aggregator.AggregatorFunction {
|
|
return &MedianCustomAggregator{}
|
|
})
|
|
}
|
|
|
|
// SQL使用示例:
|
|
// SELECT device, median_agg(temperature) as median_temp FROM stream GROUP BY device
|
|
```
|
|
|
|
## 📊 函数管理功能
|
|
|
|
### 查看已注册函数
|
|
|
|
```go
|
|
// 列出所有函数
|
|
allFunctions := functions.ListAll()
|
|
for name, fn := range allFunctions {
|
|
fmt.Printf("函数名: %s, 类型: %s, 描述: %s\n",
|
|
name, fn.GetType(), fn.GetDescription())
|
|
}
|
|
|
|
// 按类型查看函数
|
|
mathFunctions := functions.GetByType(functions.TypeMath)
|
|
for _, fn := range mathFunctions {
|
|
fmt.Printf("数学函数: %s - %s\n", fn.GetName(), fn.GetDescription())
|
|
}
|
|
|
|
// 检查函数是否存在
|
|
if fn, exists := functions.Get("my_function"); exists {
|
|
fmt.Printf("函数存在: %s\n", fn.GetDescription())
|
|
}
|
|
```
|
|
|
|
### 注销函数
|
|
|
|
```go
|
|
// 注销自定义函数
|
|
success := functions.Unregister("my_custom_function")
|
|
if success {
|
|
fmt.Println("函数注销成功")
|
|
}
|
|
```
|
|
|
|
## 🎯 最佳实践
|
|
|
|
### 1. 错误处理
|
|
|
|
```go
|
|
func (f *MyFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
// 1. 参数验证
|
|
if len(args) == 0 {
|
|
return nil, fmt.Errorf("至少需要一个参数")
|
|
}
|
|
|
|
// 2. 类型转换
|
|
val, err := functions.ConvertToFloat64(args[0])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("参数类型错误: %v", err)
|
|
}
|
|
|
|
// 3. 业务逻辑验证
|
|
if val < 0 {
|
|
return nil, fmt.Errorf("参数值必须为正数")
|
|
}
|
|
|
|
// 4. 计算逻辑
|
|
result := math.Sqrt(val)
|
|
|
|
return result, nil
|
|
}
|
|
```
|
|
|
|
### 2. 性能优化
|
|
|
|
```go
|
|
type CachedFunction struct {
|
|
*functions.BaseFunction
|
|
cache map[string]interface{}
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
func (f *CachedFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
// 生成缓存key
|
|
key := fmt.Sprintf("%v", args)
|
|
|
|
// 检查缓存
|
|
f.mutex.RLock()
|
|
if cached, exists := f.cache[key]; exists {
|
|
f.mutex.RUnlock()
|
|
return cached, nil
|
|
}
|
|
f.mutex.RUnlock()
|
|
|
|
// 计算结果
|
|
result := f.calculate(args)
|
|
|
|
// 存储到缓存
|
|
f.mutex.Lock()
|
|
f.cache[key] = result
|
|
f.mutex.Unlock()
|
|
|
|
return result, nil
|
|
}
|
|
```
|
|
|
|
### 3. 状态管理
|
|
|
|
```go
|
|
type StatefulFunction struct {
|
|
*functions.BaseFunction
|
|
counter int64
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
func (f *StatefulFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
|
|
f.mutex.Lock()
|
|
defer f.mutex.Unlock()
|
|
|
|
f.counter++
|
|
return f.counter, nil
|
|
}
|
|
```
|
|
|
|
## 🚨 注意事项
|
|
|
|
1. **线程安全**: 函数可能在多线程环境下并发执行,确保线程安全
|
|
2. **错误处理**: 总是返回有意义的错误信息
|
|
3. **类型转换**: 使用框架提供的转换函数进行类型转换
|
|
4. **性能考虑**: 避免在函数中执行耗时操作,考虑使用缓存
|
|
5. **资源管理**: 注意资源的申请和释放
|
|
6. **命名规范**: 使用清晰、描述性的函数名
|
|
|
|
## 📝 测试你的自定义函数
|
|
|
|
```go
|
|
func TestMyCustomFunction(t *testing.T) {
|
|
// 注册函数
|
|
err := functions.RegisterCustomFunction("test_func", /* ... */)
|
|
assert.NoError(t, err)
|
|
defer functions.Unregister("test_func")
|
|
|
|
// 获取函数
|
|
fn, exists := functions.Get("test_func")
|
|
assert.True(t, exists)
|
|
|
|
// 测试执行
|
|
ctx := &functions.FunctionContext{
|
|
Data: make(map[string]interface{}),
|
|
}
|
|
|
|
result, err := fn.Execute(ctx, []interface{}{10.0})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, expectedResult, result)
|
|
}
|
|
```
|
|
|
|
通过这个指南,你可以轻松扩展StreamSQL的功能,实现各种自定义函数来满足特定的业务需求。 |