forked from GiteaTest2015/streamsql
14 KiB
14 KiB
StreamSQL 自定义函数开发指南
🚀 概述
StreamSQL 提供了强大而灵活的自定义函数系统,支持用户根据业务需求扩展各种类型的函数,包括数学函数、字符串函数、聚合函数、分析函数等。
📋 函数类型分类
内置函数类型
const (
TypeAggregation FunctionType = "aggregation" // 聚合函数
TypeWindow FunctionType = "window" // 窗口函数
TypeDateTime FunctionType = "datetime" // 时间日期函数
TypeConversion FunctionType = "conversion" // 转换函数
TypeMath FunctionType = "math" // 数学函数
TypeString FunctionType = "string" // 字符串函数
TypeAnalytical FunctionType = "analytical" // 分析函数
TypeCustom FunctionType = "custom" // 用户自定义函数
)
🛠️ 自定义函数实现方式
方式一:快速注册(推荐简单函数)
import "github.com/rulego/streamsql/functions"
// 注册一个简单的数学函数
err := functions.RegisterCustomFunction(
"double", // 函数名
functions.TypeMath, // 函数类型
"数学函数", // 分类描述
"将数值乘以2", // 函数描述
1, // 最少参数个数
1, // 最多参数个数
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, err
}
return val * 2, nil
},
)
方式二:完整结构体实现(推荐复杂函数)
// 1. 定义函数结构体
type AdvancedMathFunction struct {
*functions.BaseFunction
// 可以添加状态变量
cache map[string]interface{}
}
// 2. 实现构造函数
func NewAdvancedMathFunction() *AdvancedMathFunction {
return &AdvancedMathFunction{
BaseFunction: functions.NewBaseFunction(
"advanced_calc", // 函数名
functions.TypeMath, // 函数类型
"高级数学函数", // 分类
"高级数学计算", // 描述
2, // 最少参数
3, // 最多参数
),
cache: make(map[string]interface{}),
}
}
// 3. 实现验证方法(可选,如有特殊验证需求)
func (f *AdvancedMathFunction) Validate(args []interface{}) error {
if err := f.ValidateArgCount(args); err != nil {
return err
}
// 自定义验证逻辑
if len(args) >= 2 {
if _, err := functions.ConvertToFloat64(args[0]); err != nil {
return fmt.Errorf("第一个参数必须是数值")
}
if _, err := functions.ConvertToFloat64(args[1]); err != nil {
return fmt.Errorf("第二个参数必须是数值")
}
}
return nil
}
// 4. 实现执行方法
func (f *AdvancedMathFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
a, _ := functions.ConvertToFloat64(args[0])
b, _ := functions.ConvertToFloat64(args[1])
operation := "add" // 默认操作
if len(args) > 2 {
op, err := functions.ConvertToString(args[2])
if err == nil {
operation = op
}
}
switch operation {
case "add":
return a + b, nil
case "multiply":
return a * b, nil
case "power":
return math.Pow(a, b), nil
default:
return nil, fmt.Errorf("不支持的操作: %s", operation)
}
}
// 5. 注册函数
func init() {
functions.Register(NewAdvancedMathFunction())
}
🎯 各类型函数实现示例
1. 数学函数示例
// 距离计算函数
func RegisterDistanceFunction() error {
return functions.RegisterCustomFunction(
"distance",
functions.TypeMath,
"几何数学",
"计算两点间距离",
4, 4,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
x1, err := functions.ConvertToFloat64(args[0])
if err != nil { return nil, err }
y1, err := functions.ConvertToFloat64(args[1])
if err != nil { return nil, err }
x2, err := functions.ConvertToFloat64(args[2])
if err != nil { return nil, err }
y2, err := functions.ConvertToFloat64(args[3])
if err != nil { return nil, err }
distance := math.Sqrt(math.Pow(x2-x1, 2) + math.Pow(y2-y1, 2))
return distance, nil
},
)
}
// SQL使用示例:
// SELECT device, distance(lat1, lon1, lat2, lon2) as dist FROM stream
2. 字符串函数示例
// JSON提取函数
func RegisterJsonExtractFunction() error {
return functions.RegisterCustomFunction(
"json_extract",
functions.TypeString,
"JSON处理",
"从JSON字符串中提取字段值",
2, 2,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
jsonStr, err := functions.ConvertToString(args[0])
if err != nil { return nil, err }
path, err := functions.ConvertToString(args[1])
if err != nil { return nil, err }
var data map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &data); err != nil {
return nil, fmt.Errorf("invalid JSON: %v", err)
}
// 简单路径提取(可扩展为复杂JSONPath)
value, exists := data[path]
if !exists {
return nil, nil
}
return value, nil
},
)
}
// SQL使用示例:
// SELECT device, json_extract(metadata, 'version') as version FROM stream
3. 时间日期函数示例
// 时间格式化函数
func RegisterDateFormatFunction() error {
return functions.RegisterCustomFunction(
"date_format",
functions.TypeDateTime,
"时间格式化",
"格式化时间戳为指定格式",
2, 2,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
timestamp, err := functions.ConvertToInt64(args[0])
if err != nil { return nil, err }
format, err := functions.ConvertToString(args[1])
if err != nil { return nil, err }
t := time.Unix(timestamp, 0)
// 支持常见格式
switch format {
case "YYYY-MM-DD":
return t.Format("2006-01-02"), nil
case "YYYY-MM-DD HH:mm:ss":
return t.Format("2006-01-02 15:04:05"), nil
case "RFC3339":
return t.Format(time.RFC3339), nil
default:
return t.Format(format), nil
}
},
)
}
// SQL使用示例:
// SELECT device, date_format(timestamp, 'YYYY-MM-DD') as date FROM stream
4. 转换函数示例
// IP地址转换函数
func RegisterIpToIntFunction() error {
return functions.RegisterCustomFunction(
"ip_to_int",
functions.TypeConversion,
"网络转换",
"将IP地址转换为整数",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
ipStr, err := functions.ConvertToString(args[0])
if err != nil { return nil, err }
ip := net.ParseIP(ipStr)
if ip == nil {
return nil, fmt.Errorf("invalid IP address: %s", ipStr)
}
// 转换为IPv4
ip = ip.To4()
if ip == nil {
return nil, fmt.Errorf("not an IPv4 address: %s", ipStr)
}
return int64(ip[0])<<24 + int64(ip[1])<<16 + int64(ip[2])<<8 + int64(ip[3]), nil
},
)
}
// SQL使用示例:
// SELECT device, ip_to_int(client_ip) as ip_int FROM stream
5. 自定义聚合函数示例
对于聚合函数,需要同时实现函数和聚合器:
// 1. 实现自定义聚合函数
type MedianAggFunction struct {
*functions.BaseFunction
}
func NewMedianAggFunction() *MedianAggFunction {
return &MedianAggFunction{
BaseFunction: functions.NewBaseFunction(
"median_agg",
functions.TypeAggregation,
"统计聚合",
"计算中位数",
1, -1,
),
}
}
func (f *MedianAggFunction) Validate(args []interface{}) error {
return f.ValidateArgCount(args)
}
func (f *MedianAggFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 聚合函数的Execute在这里可能不会被直接调用
// 实际逻辑在聚合器中实现
return nil, nil
}
// 2. 实现对应的聚合器
type MedianCustomAggregator struct {
values []float64
}
func (m *MedianCustomAggregator) New() aggregator.AggregatorFunction {
return &MedianCustomAggregator{
values: make([]float64, 0),
}
}
func (m *MedianCustomAggregator) Add(value interface{}) {
if val, err := functions.ConvertToFloat64(value); err == nil {
m.values = append(m.values, val)
}
}
func (m *MedianCustomAggregator) Result() interface{} {
if len(m.values) == 0 {
return 0.0
}
sort.Float64s(m.values)
mid := len(m.values) / 2
if len(m.values)%2 == 0 {
return (m.values[mid-1] + m.values[mid]) / 2
}
return m.values[mid]
}
// 3. 注册聚合器
func init() {
// 注册函数
functions.Register(NewMedianAggFunction())
// 注册聚合器
aggregator.Register("median_agg", func() aggregator.AggregatorFunction {
return &MedianCustomAggregator{}
})
}
// SQL使用示例:
// SELECT device, median_agg(temperature) as median_temp FROM stream GROUP BY device
📊 函数管理功能
查看已注册函数
// 列出所有函数
allFunctions := functions.ListAll()
for name, fn := range allFunctions {
fmt.Printf("函数名: %s, 类型: %s, 描述: %s\n",
name, fn.GetType(), fn.GetDescription())
}
// 按类型查看函数
mathFunctions := functions.GetByType(functions.TypeMath)
for _, fn := range mathFunctions {
fmt.Printf("数学函数: %s - %s\n", fn.GetName(), fn.GetDescription())
}
// 检查函数是否存在
if fn, exists := functions.Get("my_function"); exists {
fmt.Printf("函数存在: %s\n", fn.GetDescription())
}
注销函数
// 注销自定义函数
success := functions.Unregister("my_custom_function")
if success {
fmt.Println("函数注销成功")
}
🎯 最佳实践
1. 错误处理
func (f *MyFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 1. 参数验证
if len(args) == 0 {
return nil, fmt.Errorf("至少需要一个参数")
}
// 2. 类型转换
val, err := functions.ConvertToFloat64(args[0])
if err != nil {
return nil, fmt.Errorf("参数类型错误: %v", err)
}
// 3. 业务逻辑验证
if val < 0 {
return nil, fmt.Errorf("参数值必须为正数")
}
// 4. 计算逻辑
result := math.Sqrt(val)
return result, nil
}
2. 性能优化
type CachedFunction struct {
*functions.BaseFunction
cache map[string]interface{}
mutex sync.RWMutex
}
func (f *CachedFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
// 生成缓存key
key := fmt.Sprintf("%v", args)
// 检查缓存
f.mutex.RLock()
if cached, exists := f.cache[key]; exists {
f.mutex.RUnlock()
return cached, nil
}
f.mutex.RUnlock()
// 计算结果
result := f.calculate(args)
// 存储到缓存
f.mutex.Lock()
f.cache[key] = result
f.mutex.Unlock()
return result, nil
}
3. 状态管理
type StatefulFunction struct {
*functions.BaseFunction
counter int64
mutex sync.Mutex
}
func (f *StatefulFunction) Execute(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
f.counter++
return f.counter, nil
}
🚨 注意事项
- 线程安全: 函数可能在多线程环境下并发执行,确保线程安全
- 错误处理: 总是返回有意义的错误信息
- 类型转换: 使用框架提供的转换函数进行类型转换
- 性能考虑: 避免在函数中执行耗时操作,考虑使用缓存
- 资源管理: 注意资源的申请和释放
- 命名规范: 使用清晰、描述性的函数名
📝 测试你的自定义函数
func TestMyCustomFunction(t *testing.T) {
// 注册函数
err := functions.RegisterCustomFunction("test_func", /* ... */)
assert.NoError(t, err)
defer functions.Unregister("test_func")
// 获取函数
fn, exists := functions.Get("test_func")
assert.True(t, exists)
// 测试执行
ctx := &functions.FunctionContext{
Data: make(map[string]interface{}),
}
result, err := fn.Execute(ctx, []interface{}{10.0})
assert.NoError(t, err)
assert.Equal(t, expectedResult, result)
}
通过这个指南,你可以轻松扩展StreamSQL的功能,实现各种自定义函数来满足特定的业务需求。