Files
streamsql/docs/FUNCTIONS_USAGE_GUIDE.md
2025-06-12 18:58:37 +08:00

21 KiB
Raw Permalink Blame History

StreamSQL 函数使用指南

StreamSQL 具有丰富的内置函数,可以对数据执行各种计算。所有函数都支持在流式处理环境中使用,部分函数支持增量计算以提高性能。

📊 聚合函数

聚合函数对一组值执行计算并返回单个值。聚合函数只能用在以下表达式中:

  • SELECT 语句的 SELECT 列表(子查询或外部查询)
  • HAVING 子句

SUM - 求和函数

语法: sum(col)
描述: 返回组中数值的总和。空值不参与计算。
增量计算: 支持
示例:

SELECT device, sum(temperature) as total_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

AVG - 平均值函数

语法: avg(col)
描述: 返回组中数值的平均值。空值不参与计算。
增量计算: 支持
示例:

SELECT device, avg(temperature) as avg_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

COUNT - 计数函数

语法: count(*)
描述: 返回组中的行数。
增量计算: 支持
示例:

SELECT device, count(*) as record_count 
FROM stream 
GROUP BY device, TumblingWindow('10s')

MIN - 最小值函数

语法: min(col)
描述: 返回组中数值的最小值。空值不参与计算。
增量计算: 支持
示例:

SELECT device, min(temperature) as min_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

MAX - 最大值函数

语法: max(col)
描述: 返回组中数值的最大值。空值不参与计算。
增量计算: 支持
示例:

SELECT device, max(temperature) as max_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

COLLECT - 收集函数

语法: collect(col)
描述: 获取当前窗口所有消息的列值组成的数组。
增量计算: 支持
示例:

SELECT device, collect(temperature) as temp_values 
FROM stream 
GROUP BY device, TumblingWindow('10s')

LAST_VALUE - 最后值函数

语法: last_value(col)
描述: 返回组中最后一行的值。
增量计算: 支持
示例:

SELECT device, last_value(temperature) as last_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

MERGE_AGG - 合并聚合函数

语法: merge_agg(col)
描述: 将组中的值合并为单个值。对于对象类型,合并所有键值对;对于其他类型,用逗号连接。
增量计算: 支持
示例:

SELECT device, merge_agg(status) as all_status 
FROM stream 
GROUP BY device, TumblingWindow('10s')

DEDUPLICATE - 去重函数

语法: deduplicate(col, false)
描述: 返回当前组去重的结果,通常用在窗口中。第二个参数指定是否返回全部结果。
增量计算: 支持
示例:

SELECT device, deduplicate(temperature, true) as unique_temps 
FROM stream 
GROUP BY device, TumblingWindow('10s')

STDDEV - 标准差函数

语法: stddev(col)
描述: 返回组中所有值的总体标准差。空值不参与计算。
增量计算: 支持(使用韦尔福德算法优化)
示例:

SELECT device, stddev(temperature) as temp_stddev 
FROM stream 
GROUP BY device, TumblingWindow('10s')

STDDEVS - 样本标准差函数

语法: stddevs(col)
描述: 返回组中所有值的样本标准差。空值不参与计算。
增量计算: 支持(使用韦尔福德算法优化)
示例:

SELECT device, stddevs(temperature) as temp_sample_stddev 
FROM stream 
GROUP BY device, TumblingWindow('10s')

VAR - 方差函数

语法: var(col)
描述: 返回组中所有值的总体方差。空值不参与计算。
增量计算: 支持(使用韦尔福德算法优化)
示例:

SELECT device, var(temperature) as temp_variance 
FROM stream 
GROUP BY device, TumblingWindow('10s')

VARS - 样本方差函数

语法: vars(col)
描述: 返回组中所有值的样本方差。空值不参与计算。
增量计算: 支持(使用韦尔福德算法优化)
示例:

SELECT device, vars(temperature) as temp_sample_variance 
FROM stream 
GROUP BY device, TumblingWindow('10s')

MEDIAN - 中位数函数

语法: median(col)
描述: 返回组中所有值的中位数。空值不参与计算。
增量计算: 支持
示例:

SELECT device, median(temperature) as temp_median 
FROM stream 
GROUP BY device, TumblingWindow('10s')

PERCENTILE - 百分位数函数

语法: percentile(col, 0.5)
描述: 返回组中所有值的指定百分位数。第二个参数指定百分位数的值,取值范围为 0.0 ~ 1.0。
增量计算: 支持
示例:

SELECT device, percentile(temperature, 0.95) as temp_p95 
FROM stream 
GROUP BY device, TumblingWindow('10s')

🔍 分析函数

分析函数用于在数据流中进行复杂的分析计算,支持状态管理和历史数据访问。

LAG - 滞后函数

语法: lag(col, offset, default_value)
描述: 返回当前行之前的第N行的值。offset指定偏移量default_value为默认值。
增量计算: 支持
示例:

SELECT device, temperature, lag(temperature, 1) as prev_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

LATEST - 最新值函数

语法: latest(col)
描述: 返回指定列的最新值。
增量计算: 支持
示例:

SELECT device, latest(temperature) as current_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

CHANGED_COL - 变化列函数

语法: changed_col(row_data)
描述: 返回发生变化的列名数组。
增量计算: 支持
示例:

SELECT device, changed_col(*) as changed_columns 
FROM stream 
GROUP BY device, TumblingWindow('10s')

HAD_CHANGED - 变化检测函数

语法: had_changed(col)
描述: 判断指定列的值是否发生变化,返回布尔值。
增量计算: 支持
示例:

SELECT device, had_changed(status) as status_changed 
FROM stream 
GROUP BY device, TumblingWindow('10s')

🪟 窗口函数

窗口函数提供窗口相关的信息。

WINDOW_START - 窗口开始时间

语法: window_start()
描述: 返回当前窗口的开始时间。
增量计算: 支持
示例:

SELECT device, window_start() as window_begin, avg(temperature) as avg_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

WINDOW_END - 窗口结束时间

语法: window_end()
描述: 返回当前窗口的结束时间。
增量计算: 支持
示例:

SELECT device, window_end() as window_finish, avg(temperature) as avg_temp 
FROM stream 
GROUP BY device, TumblingWindow('10s')

🧮 数学函数

数学函数用于数值计算。

ABS - 绝对值函数

语法: abs(number)
描述: 返回数值的绝对值。

SQRT - 平方根函数

语法: sqrt(number)
描述: 返回数值的平方根。

POWER - 幂函数

语法: power(base, exponent)
描述: 返回底数的指定次幂。

CEILING - 向上取整函数

语法: ceiling(number)
描述: 返回大于或等于指定数值的最小整数。

FLOOR - 向下取整函数

语法: floor(number)
描述: 返回小于或等于指定数值的最大整数。

ROUND - 四舍五入函数

语法: round(number, [precision])
描述: 将数值四舍五入到指定的小数位数。

MOD - 取模函数

语法: mod(dividend, divisor)
描述: 返回除法运算的余数。

RAND - 随机数函数

语法: rand()
描述: 返回0到1之间的随机数。

SIGN - 符号函数

语法: sign(number)
描述: 返回数值的符号(-1、0或1

三角函数

SIN - 正弦函数

语法: sin(number)
描述: 返回角度的正弦值(弧度制)。

COS - 余弦函数

语法: cos(number)
描述: 返回角度的余弦值(弧度制)。

TAN - 正切函数

语法: tan(number)
描述: 返回角度的正切值(弧度制)。

ASIN - 反正弦函数

语法: asin(number)
描述: 返回数值的反正弦值(弧度制)。

ACOS - 反余弦函数

语法: acos(number)
描述: 返回数值的反余弦值(弧度制)。

ATAN - 反正切函数

语法: atan(number)
描述: 返回数值的反正切值(弧度制)。

ATAN2 - 双参数反正切函数

语法: atan2(y, x)
描述: 返回y/x的反正切值弧度制

双曲函数

SINH - 双曲正弦函数

语法: sinh(number)
描述: 返回数值的双曲正弦值。

COSH - 双曲余弦函数

语法: cosh(number)
描述: 返回数值的双曲余弦值。

TANH - 双曲正切函数

语法: tanh(number)
描述: 返回数值的双曲正切值。

对数和指数函数

EXP - 指数函数

语法: exp(number)
描述: 返回e的指定次幂。

LN - 自然对数函数

语法: ln(number)
描述: 返回数值的自然对数。

LOG - 对数函数

语法: log(base, number)
描述: 返回指定底数的对数。

LOG10 - 常用对数函数

语法: log10(number)
描述: 返回数值的常用对数以10为底

LOG2 - 二进制对数函数

语法: log2(number)
描述: 返回数值的二进制对数以2为底

位运算函数

BIT_AND - 位与函数

语法: bit_and(number1, number2)
描述: 对两个整数执行位与运算。

BIT_OR - 位或函数

语法: bit_or(number1, number2)
描述: 对两个整数执行位或运算。

BIT_XOR - 位异或函数

语法: bit_xor(number1, number2)
描述: 对两个整数执行位异或运算。

BIT_NOT - 位非函数

语法: bit_not(number)
描述: 对整数执行位非运算。

📝 字符串函数

字符串函数用于文本处理。

UPPER - 转大写函数

语法: upper(str)
描述: 将字符串转换为大写。

LOWER - 转小写函数

语法: lower(str)
描述: 将字符串转换为小写。

CONCAT - 字符串连接函数

语法: concat(str1, str2, ...)
描述: 连接多个字符串。

LENGTH - 字符串长度函数

语法: length(str)
描述: 返回字符串的长度。

SUBSTRING - 子字符串函数

语法: substring(str, start, [length])
描述: 从字符串中提取子字符串。

TRIM - 去除空格函数

语法: trim(str)
描述: 去除字符串两端的空格。

LTRIM - 去除左侧空格函数

语法: ltrim(str)
描述: 去除字符串左侧的空格。

RTRIM - 去除右侧空格函数

语法: rtrim(str)
描述: 去除字符串右侧的空格。

FORMAT - 格式化函数

语法: format(format_str, ...)
描述: 按照指定格式格式化字符串。

ENDSWITH - 结尾检查函数

语法: endswith(str, suffix)
描述: 检查字符串是否以指定后缀结尾。

STARTSWITH - 开头检查函数

语法: startswith(str, prefix)
描述: 检查字符串是否以指定前缀开头。

INDEXOF - 查找位置函数

语法: indexof(str, substring)
描述: 返回子字符串在字符串中的位置。

REPLACE - 替换函数

语法: replace(str, old_str, new_str)
描述: 替换字符串中的指定内容。

SPLIT - 分割函数

语法: split(str, delimiter)
描述: 按照分隔符分割字符串。

LPAD - 左填充函数

语法: lpad(str, length, pad_str)
描述: 在字符串左侧填充字符到指定长度。

RPAD - 右填充函数

语法: rpad(str, length, pad_str)
描述: 在字符串右侧填充字符到指定长度。

正则表达式函数

REGEXP_MATCHES - 正则匹配函数

语法: regexp_matches(str, pattern)
描述: 检查字符串是否匹配正则表达式。

REGEXP_REPLACE - 正则替换函数

语法: regexp_replace(str, pattern, replacement)
描述: 使用正则表达式替换字符串内容。

REGEXP_SUBSTRING - 正则提取函数

语法: regexp_substring(str, pattern)
描述: 使用正则表达式提取字符串内容。

🔄 类型转换函数

类型转换函数用于数据类型转换。

CAST - 类型转换函数

语法: cast(value as type)
描述: 将值转换为指定类型。

HEX2DEC - 十六进制转十进制函数

语法: hex2dec(hex_str)
描述: 将十六进制字符串转换为十进制数。

DEC2HEX - 十进制转十六进制函数

语法: dec2hex(number)
描述: 将十进制数转换为十六进制字符串。

ENCODE - 编码函数

语法: encode(str, encoding)
描述: 按照指定编码方式编码字符串。

DECODE - 解码函数

语法: decode(str, encoding)
描述: 按照指定编码方式解码字符串。

CONVERT_TZ - 时区转换函数

语法: convert_tz(datetime, from_tz, to_tz)
描述: 将日期时间从一个时区转换到另一个时区。

TO_SECONDS - 转换为秒函数

语法: to_seconds(datetime)
描述: 将日期时间转换为秒数。

CHR - 字符函数

语法: chr(number)
描述: 将ASCII码转换为字符。

TRUNC - 截断函数

语法: trunc(number, [precision])
描述: 截断数值到指定精度。

URL_ENCODE - URL编码函数

语法: url_encode(str)
描述: 对字符串进行URL编码。

URL_DECODE - URL解码函数

语法: url_decode(str)
描述: 对字符串进行URL解码。

时间日期函数

时间日期函数用于处理时间和日期数据。

NOW - 当前时间函数

语法: now()
描述: 返回当前的日期和时间。

CURRENT_TIME - 当前时间函数

语法: current_time()
描述: 返回当前时间。

CURRENT_DATE - 当前日期函数

语法: current_date()
描述: 返回当前日期。

🔗 JSON函数

JSON函数用于处理JSON数据。

TO_JSON - 转换为JSON函数

语法: to_json(value)
描述: 将值转换为JSON字符串。

FROM_JSON - 从JSON解析函数

语法: from_json(json_str)
描述: 从JSON字符串解析值。

JSON_EXTRACT - JSON提取函数

语法: json_extract(json_str, path)
描述: 从JSON字符串中提取指定路径的值。

JSON_VALID - JSON验证函数

语法: json_valid(json_str)
描述: 验证字符串是否为有效的JSON。

JSON_TYPE - JSON类型函数

语法: json_type(json_str)
描述: 返回JSON值的类型。

JSON_LENGTH - JSON长度函数

语法: json_length(json_str)
描述: 返回JSON数组或对象的长度。

🔐 哈希函数

哈希函数用于生成数据的哈希值。

MD5 - MD5哈希函数

语法: md5(str)
描述: 生成字符串的MD5哈希值。

SHA1 - SHA1哈希函数

语法: sha1(str)
描述: 生成字符串的SHA1哈希值。

SHA256 - SHA256哈希函数

语法: sha256(str)
描述: 生成字符串的SHA256哈希值。

SHA512 - SHA512哈希函数

语法: sha512(str)
描述: 生成字符串的SHA512哈希值。

📋 数组函数

数组函数用于处理数组数据。

ARRAY_LENGTH - 数组长度函数

语法: array_length(array)
描述: 返回数组的长度。

ARRAY_CONTAINS - 数组包含函数

语法: array_contains(array, value)
描述: 检查数组是否包含指定值。

ARRAY_POSITION - 数组位置函数

语法: array_position(array, value)
描述: 返回值在数组中的位置。

ARRAY_REMOVE - 数组移除函数

语法: array_remove(array, value)
描述: 从数组中移除指定值。

ARRAY_DISTINCT - 数组去重函数

语法: array_distinct(array)
描述: 返回数组的去重结果。

ARRAY_INTERSECT - 数组交集函数

语法: array_intersect(array1, array2)
描述: 返回两个数组的交集。

ARRAY_UNION - 数组并集函数

语法: array_union(array1, array2)
描述: 返回两个数组的并集。

ARRAY_EXCEPT - 数组差集函数

语法: array_except(array1, array2)
描述: 返回两个数组的差集。

🔍 类型检查函数

类型检查函数用于检查数据类型。

IS_NULL - 空值检查函数

语法: is_null(value)
描述: 检查值是否为NULL。

IS_NOT_NULL - 非空值检查函数

语法: is_not_null(value)
描述: 检查值是否不为NULL。

IS_NUMERIC - 数值检查函数

语法: is_numeric(value)
描述: 检查值是否为数值类型。

IS_STRING - 字符串检查函数

语法: is_string(value)
描述: 检查值是否为字符串类型。

IS_BOOL - 布尔值检查函数

语法: is_bool(value)
描述: 检查值是否为布尔类型。

IS_ARRAY - 数组检查函数

语法: is_array(value)
描述: 检查值是否为数组类型。

IS_OBJECT - 对象检查函数

语法: is_object(value)
描述: 检查值是否为对象类型。

条件函数

条件函数用于条件判断和值选择。

IF_NULL - 空值处理函数

语法: if_null(value, default_value)
描述: 如果值为NULL返回默认值否则返回原值。

COALESCE - 合并函数

语法: coalesce(value1, value2, ...)
描述: 返回第一个非NULL值。

NULL_IF - 空值转换函数

语法: null_if(value1, value2)
描述: 如果两个值相等返回NULL否则返回第一个值。

GREATEST - 最大值函数

语法: greatest(value1, value2, ...)
描述: 返回参数中的最大值。

LEAST - 最小值函数

语法: least(value1, value2, ...)
描述: 返回参数中的最小值。

CASE_WHEN - 条件选择函数

语法: case_when(condition, value_if_true, value_if_false)
描述: 根据条件返回不同的值。

📊 多行函数

多行函数用于处理多行数据。

UNNEST - 展开函数

语法: unnest(array)
描述: 将数组展开为多行。

🪟 扩展窗口函数

扩展窗口函数提供更多窗口相关功能。

ROW_NUMBER - 行号函数

语法: row_number() OVER (ORDER BY col)
描述: 为结果集中的每一行分配一个唯一的行号。
增量计算: 支持

FIRST_VALUE - 首值函数

语法: first_value(col) OVER (ORDER BY col)
描述: 返回窗口中第一行的值。
增量计算: 支持

LEAD - 前导函数

语法: lead(col, offset, default_value) OVER (ORDER BY col)
描述: 返回当前行之后第N行的值。
增量计算: 支持

NTH_VALUE - 第N个值函数

语法: nth_value(col, n) OVER (ORDER BY col)
描述: 返回窗口中第N行的值。
增量计算: 支持

🔧 表达式函数

表达式函数用于动态表达式计算。

EXPRESSION - 表达式函数

语法: expression(expr_str)
描述: 动态计算表达式字符串。

EXPR - 表达式简写函数

语法: expr(expr_str)
描述: expression函数的简写形式。

增量计算性能优势

支持增量计算的函数具有以下性能优势:

内存效率

  • 传统批量计算: 需要存储窗口内所有数据,内存使用 O(n)
  • 增量计算: 只存储必要的状态信息,内存使用 O(1) 或 O(log n)

计算效率

  • 传统批量计算: 每次窗口触发都重新计算所有数据,时间复杂度 O(n)
  • 增量计算: 只处理新增数据,时间复杂度 O(1)

实时性

  • 传统批量计算: 只能在窗口结束时输出结果
  • 增量计算: 可以实时输出中间结果

🔧 自定义函数扩展

StreamSQL 支持自定义函数扩展,详见 functions/custom_example.go 中的示例。可以实现:

  • 自定义聚合函数(支持增量计算)
  • 自定义分析函数(支持状态管理)
  • 自定义数学函数
  • 自定义字符串函数

通过实现相应的接口,自定义函数可以无缝集成到 StreamSQL 的函数体系中。