mirror of
https://gitee.com/rulego/streamsql.git
synced 2025-09-04 04:44:00 +00:00
16 KiB
16 KiB
StreamSQL 函数系统整合指南
本文档说明 StreamSQL 如何整合自定义函数系统,以提供更强大和丰富的表达式计算能力,包括强大的 CASE 条件表达式支持。
🏗️ 架构概述
双引擎架构
StreamSQL 现在支持两套表达式引擎:
-
自定义 expr 引擎 (
expr/expression.go
)- 专门针对数值计算优化
- 支持基本数学运算和函数
- 轻量级,高性能
-
expr-lang/expr 引擎
- 功能强大的通用表达式语言
- 支持复杂数据类型(数组、对象、字符串等)
- 丰富的内置函数库
桥接系统
functions/expr_bridge.go
提供了统一的接口,自动选择最合适的引擎并整合两套函数系统。
条件表达式系统
StreamSQL 内置了强大的 CASE 表达式支持,能够智能选择表达式引擎:
- 简单条件 → 自定义 expr 引擎(高性能)
- 复杂嵌套 → expr-lang/expr 引擎(功能完整)
📚 可用函数
StreamSQL 内置函数
数学函数 (TypeMath)
函数 | 描述 | 示例 |
---|---|---|
abs(x) |
绝对值 | abs(-5) → 5 |
sqrt(x) |
平方根 | sqrt(16) → 4 |
acos(x) |
反余弦 | acos(0.5) → 1.047 |
asin(x) |
反正弦 | asin(0.5) → 0.524 |
atan(x) |
反正切 | atan(1) → 0.785 |
atan2(y,x) |
双参数反正切 | atan2(1,1) → 0.785 |
bitand(a,b) |
按位与 | bitand(5,3) → 1 |
bitor(a,b) |
按位或 | bitor(5,3) → 7 |
bitxor(a,b) |
按位异或 | bitxor(5,3) → 6 |
bitnot(x) |
按位非 | bitnot(5) → -6 |
ceiling(x) |
向上取整 | ceiling(3.2) → 4 |
cos(x) |
余弦 | cos(0) → 1 |
cosh(x) |
双曲余弦 | cosh(0) → 1 |
exp(x) |
e的x次幂 | exp(1) → 2.718 |
floor(x) |
向下取整 | floor(3.8) → 3 |
ln(x) |
自然对数 | ln(2.718) → 1 |
power(x,y) |
x的y次幂 | power(2,3) → 8 |
字符串函数 (TypeString)
函数 | 描述 | 示例 |
---|---|---|
concat(s1,s2,...) |
字符串连接 | concat("hello"," ","world") → "hello world" |
length(s) |
字符串长度 | length("hello") → 5 |
upper(s) |
转大写 | upper("hello") → "HELLO" |
lower(s) |
转小写 | lower("HELLO") → "hello" |
转换函数 (TypeConversion)
函数 | 描述 | 示例 |
---|---|---|
cast(value, type) |
类型转换 | cast("123", "int64") → 123 |
hex2dec(hex) |
十六进制转十进制 | hex2dec("ff") → 255 |
dec2hex(num) |
十进制转十六进制 | dec2hex(255) → "ff" |
encode(data, format) |
编码 | encode("hello", "base64") → "aGVsbG8=" |
decode(data, format) |
解码 | decode("aGVsbG8=", "base64") → "hello" |
时间日期函数 (TypeDateTime)
函数 | 描述 | 示例 |
---|---|---|
now() |
当前时间戳 | now() → 1640995200 |
current_time() |
当前时间(HH:MM:SS) | current_time() → "14:30:25" |
current_date() |
当前日期(YYYY-MM-DD) | current_date() → "2025-01-01" |
聚合函数 (TypeAggregation)
函数 | 描述 | 示例 |
---|---|---|
sum(...) |
求和 | sum(1,2,3) → 6 |
avg(...) |
平均值 | avg(1,2,3) → 2 |
min(...) |
最小值 | min(1,2,3) → 1 |
max(...) |
最大值 | max(1,2,3) → 3 |
count(...) |
计数 | count(1,2,3) → 3 |
stddev(...) |
标准差 | stddev(1,2,3) → 0.816 |
median(...) |
中位数 | median(1,2,3) → 2 |
expr-lang/expr 内置函数
数学函数
函数 | 描述 | 示例 |
---|---|---|
abs(x) |
绝对值 | abs(-5) → 5 |
ceil(x) |
向上取整 | ceil(3.2) → 4 |
floor(x) |
向下取整 | floor(3.8) → 3 |
round(x) |
四舍五入 | round(3.6) → 4 |
max(a,b) |
最大值 | max(5,3) → 5 |
min(a,b) |
最小值 | min(5,3) → 3 |
字符串函数
函数 | 描述 | 示例 |
---|---|---|
trim(s) |
去除首尾空格 | trim(" hello ") → "hello" |
upper(s) |
转大写 | upper("hello") → "HELLO" |
lower(s) |
转小写 | lower("HELLO") → "hello" |
split(s, delimiter) |
分割字符串 | split("a,b,c", ",") → ["a","b","c"] |
replace(s, old, new) |
替换字符串 | replace("hello", "l", "x") → "hexxo" |
indexOf(s, sub) |
查找子串位置 | indexOf("hello", "ll") → 2 |
hasPrefix(s, prefix) |
检查前缀 | hasPrefix("hello", "he") → true |
hasSuffix(s, suffix) |
检查后缀 | hasSuffix("hello", "lo") → true |
数组/集合函数
函数 | 描述 | 示例 |
---|---|---|
all(array, predicate) |
所有元素满足条件 | all([2,4,6], # % 2 == 0) → true |
any(array, predicate) |
任一元素满足条件 | any([1,3,4], # % 2 == 0) → true |
filter(array, predicate) |
过滤元素 | filter([1,2,3,4], # > 2) → [3,4] |
map(array, expression) |
转换元素 | map([1,2,3], # * 2) → [2,4,6] |
find(array, predicate) |
查找元素 | find([1,2,3], # > 2) → 3 |
count(array, predicate) |
计数满足条件的元素 | count([1,2,3,4], # > 2) → 2 |
concat(array1, array2) |
连接数组 | concat([1,2], [3,4]) → [1,2,3,4] |
flatten(array) |
展平数组 | flatten([[1,2],[3,4]]) → [1,2,3,4] |
len(value) |
获取长度 | len([1,2,3]) → 3 |
时间函数
函数 | 描述 | 示例 |
---|---|---|
now() |
当前时间 | now() → 时间对象 |
duration(s) |
解析时间段 | duration("1h30m") → 时间段对象 |
date(s) |
解析日期 | date("2023-12-01") → 日期对象 |
类型转换函数
函数 | 描述 | 示例 |
---|---|---|
int(x) |
转整数 | int("123") → 123 |
float(x) |
转浮点数 | float("123.45") → 123.45 |
string(x) |
转字符串 | string(123) → "123" |
type(x) |
获取类型 | type(123) → "int" |
JSON/编码函数
函数 | 描述 | 示例 |
---|---|---|
toJSON(x) |
转JSON | toJSON({"a":1}) → '{"a":1}' |
fromJSON(s) |
解析JSON | fromJSON('{"a":1}') → {"a":1} |
toBase64(s) |
Base64编码 | toBase64("hello") → "aGVsbG8=" |
fromBase64(s) |
Base64解码 | fromBase64("aGVsbG8=") → "hello" |
🎯 条件表达式
CASE表达式
StreamSQL 支持强大的 CASE 条件表达式,用于实现复杂的条件逻辑判断。
语法支持
搜索CASE表达式:
CASE
WHEN condition1 THEN result1
WHEN condition2 THEN result2
...
ELSE default_result
END
简单CASE表达式:
CASE expression
WHEN value1 THEN result1
WHEN value2 THEN result2
...
ELSE default_result
END
功能特性
特性 | 支持状态 | 描述 |
---|---|---|
基本条件判断 | ✅ | 支持 WHEN/THEN/ELSE 逻辑 |
多重条件 | ✅ | 支持多个 WHEN 子句 |
逻辑运算符 | ✅ | 支持 AND、OR、NOT 操作 |
比较操作符 | ✅ | 支持 >、<、>=、<=、=、!= 等 |
数学函数 | ✅ | 支持 ABS、ROUND、CEIL 等函数调用 |
算术表达式 | ✅ | 支持 +、-、*、/ 运算 |
字符串操作 | ✅ | 支持字符串字面量和函数 |
聚合集成 | ✅ | 可在 SUM、AVG、COUNT 等聚合函数中使用 |
字段引用 | ✅ | 支持动态字段提取和计算 |
嵌套CASE | ⚠️ | 部分支持(回退到 expr-lang) |
使用示例
设备状态分类:
SELECT deviceId,
CASE
WHEN temperature > 30 AND humidity > 70 THEN 'CRITICAL'
WHEN temperature > 25 OR humidity > 80 THEN 'WARNING'
ELSE 'NORMAL'
END as alert_level
FROM stream
条件聚合统计:
SELECT deviceId,
COUNT(CASE WHEN temperature > 25 THEN 1 END) as high_temp_count,
SUM(CASE WHEN status = 'active' THEN temperature ELSE 0 END) as active_temp_sum,
AVG(CASE WHEN humidity > 50 THEN humidity END) as avg_high_humidity
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
数学函数和算术表达式:
SELECT deviceId,
CASE
WHEN ABS(temperature - 25) < 5 THEN 'NORMAL'
WHEN temperature * 1.8 + 32 > 100 THEN 'HOT_F'
WHEN ROUND(temperature) = 20 THEN 'EXACT_20'
ELSE 'OTHER'
END as temp_classification
FROM stream
状态码映射:
SELECT deviceId,
CASE status
WHEN 'active' THEN 1
WHEN 'inactive' THEN 0
WHEN 'maintenance' THEN -1
ELSE -999
END as status_code
FROM stream
表达式引擎选择
CASE表达式的处理遵循以下规则:
- 简单条件 → 使用自定义 expr 引擎(高性能)
- 嵌套CASE或复杂表达式 → 自动回退到 expr-lang/expr(功能完整)
- 混合函数调用 → 智能选择最合适的引擎
性能优化
- 条件顺序:将最常见的条件放在前面
- 函数调用:避免在条件中重复调用相同函数
- 类型一致性:保持THEN子句返回相同类型以避免转换开销
🔧 使用方法
基本使用
import "github.com/rulego/streamsql/functions"
// 直接使用桥接器评估表达式
result, err := functions.EvaluateWithBridge("abs(-5) + len([1,2,3])", map[string]interface{}{})
// result: 8 (5 + 3)
// CASE表达式示例
caseResult, err := functions.EvaluateWithBridge(
"CASE WHEN temperature > 30 THEN 'HOT' ELSE 'NORMAL' END",
map[string]interface{}{"temperature": 35.0})
// caseResult: "HOT"
在 SQL 查询中使用
-- 使用 StreamSQL 函数
SELECT device, abs(temperature - 20) as deviation
FROM stream;
-- 使用 expr-lang 函数
SELECT device, filter(measurements, # > 10) as high_values
FROM stream;
-- 混合使用
SELECT device, encode(concat(device, "_", string(now())), "base64") as device_id
FROM stream;
表达式引擎选择
表达式引擎会自动选择:
- 简单数值表达式 → 使用自定义 expr 引擎(更快)
- 复杂表达式或使用高级函数 → 使用 expr-lang/expr(更强大)
函数冲突解决
当两个系统有同名函数时:
- 默认优先级:expr-lang/expr > StreamSQL
- 访问 StreamSQL 版本:使用
streamsql_
前缀,如streamsql_abs(-5)
- 明确指定:通过函数解析器手动选择
🛠️ 高级用法
获取所有可用函数
info := functions.GetAllAvailableFunctions()
streamSQLFuncs := info["streamsql"]
exprLangFuncs := info["expr-lang"]
自定义函数注册
// 注册到 StreamSQL 系统
err := functions.RegisterCustomFunction("celsius_to_fahrenheit",
functions.TypeMath, "温度转换", "摄氏度转华氏度", 1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
celsius, _ := functions.ConvertToFloat64(args[0])
return celsius*1.8 + 32, nil
})
// 函数会自动在两个引擎中可用
表达式编译和缓存
bridge := functions.GetExprBridge()
// 编译表达式(可缓存)
program, err := bridge.CompileExpressionWithStreamSQLFunctions(
"abs(temperature - 20) > 5",
map[string]interface{}{"temperature": 0.0})
// 重复执行(高性能)
result, err := expr.Run(program, map[string]interface{}{"temperature": 25.5})
🔍 性能考虑
选择合适的引擎
- 纯数值计算:优先使用自定义 expr 引擎
- 字符串/数组操作:使用 expr-lang/expr
- 复杂逻辑表达式:使用 expr-lang/expr
优化建议
- 预编译表达式:对于重复使用的表达式,预编译以提高性能
- 函数选择:优先使用性能更好的版本
- 数据类型:避免不必要的类型转换
📝 示例
温度监控
SELECT
device,
temperature,
abs(temperature - 20) as deviation,
CASE
WHEN temperature > 30 THEN "hot"
WHEN temperature < 10 THEN "cold"
ELSE "normal"
END as status,
encode(concat(device, "_", current_date()), "base64") as device_key
FROM temperature_stream
WHERE abs(temperature - 20) > 5;
智能告警系统
SELECT
device_id,
timestamp,
temperature,
humidity,
pressure,
-- 多级告警判断
CASE
WHEN temperature > 40 AND humidity > 80 THEN 'CRITICAL_HEAT_HUMID'
WHEN temperature > 35 OR humidity > 90 THEN 'WARNING_HIGH'
WHEN temperature < 5 AND pressure < 950 THEN 'CRITICAL_COLD_LOW_PRESSURE'
WHEN ABS(temperature - 25) < 2 AND humidity BETWEEN 40 AND 60 THEN 'OPTIMAL'
ELSE 'NORMAL'
END as alert_level,
-- 设备状态映射
CASE device_status
WHEN 'online' THEN 1
WHEN 'offline' THEN 0
WHEN 'maintenance' THEN -1
ELSE -999
END as status_code,
-- 条件计算
CASE
WHEN temperature > 0 THEN ROUND(temperature * 1.8 + 32, 1)
ELSE NULL
END as fahrenheit_temp
FROM sensor_stream
WHERE device_id IS NOT NULL;
条件聚合分析
SELECT
device_type,
location,
-- 条件计数
COUNT(CASE WHEN temperature > 30 THEN 1 END) as hot_readings,
COUNT(CASE WHEN temperature < 10 THEN 1 END) as cold_readings,
COUNT(CASE WHEN humidity > 70 THEN 1 END) as humid_readings,
-- 条件求和
SUM(CASE WHEN status = 'active' THEN power_consumption ELSE 0 END) as active_power_sum,
-- 条件平均值
AVG(CASE WHEN temperature BETWEEN 20 AND 30 THEN temperature END) as normal_temp_avg,
-- 复杂条件统计
COUNT(CASE
WHEN temperature > 25 AND humidity < 60 AND status = 'active'
THEN 1
END) as optimal_active_count
FROM device_stream
GROUP BY device_type, location, TumblingWindow('10m')
HAVING COUNT(*) > 100;
数据处理
SELECT
sensor_id,
filter(readings, # > avg(readings)) as above_average,
map(readings, round(#, 2)) as rounded_readings,
len(readings) as reading_count
FROM sensor_data
WHERE len(readings) > 10;