diff --git a/README.md b/README.md index ea3dd31..90a9120 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,6 @@ package main import ( "context" "fmt" - "testing" "time" "math/rand" @@ -45,62 +44,98 @@ import ( "github.com/rulego/streamsql" ) +// StreamSQL Usage Example +// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling func main() { + // Step 1: Create StreamSQL Instance + // StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle ssql := streamsql.New() - // Define the SQL statement. Every 5 seconds, group by deviceId and output the average temperature and minimum humidity of the device. + + // Step 2: Define Stream SQL Query Statement + // This SQL statement showcases StreamSQL's core capabilities: + // - SELECT: Choose output fields and aggregation functions + // - FROM stream: Specify the data source as stream data + // - WHERE: Filter condition, excluding device3 data + // - GROUP BY: Group by deviceId, combined with tumbling window for aggregation + // - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds + // - avg(), min(): Aggregation functions for calculating average and minimum values + // - window_start(), window_end(): Window functions to get window start and end times rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," + "window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')" - // Create a stream processing task based on the SQL statement. + + // Step 3: Execute SQL Statement and Start Stream Analysis Task + // The Execute method parses SQL, builds execution plan, initializes window manager and aggregators err := ssql.Execute(rsql) if err != nil { panic(err) } + + // Step 4: Setup Test Environment and Concurrency Control var wg sync.WaitGroup wg.Add(1) - // Set a 30-second test timeout + // Set 30-second test timeout to prevent infinite running ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Add test data + + // Step 5: Start Data Producer Goroutine + // Simulate real-time data stream, continuously feeding data into StreamSQL go func() { defer wg.Done() + // Create ticker to trigger data generation every second ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - // Generate random test data, generating 10 data points per second + // Generate 10 random test data points per second, simulating high-frequency data stream + // This data density tests StreamSQL's real-time processing capability for i := 0; i < 10; i++ { + // Construct device data containing deviceId, temperature, and humidity randomData := map[string]interface{}{ - "deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), - "temperature": 20.0 + rand.Float64()*10, // Temperature between 20-30 degrees - "humidity": 50.0 + rand.Float64()*20, // Humidity between 50-70% + "deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2 + "temperature": 20.0 + rand.Float64()*10, // Temperature range: 20-30 degrees + "humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70% } - // Add data to the stream + // Add data to stream, triggering StreamSQL's real-time processing + // AddData distributes data to corresponding windows and aggregators ssql.stream.AddData(randomData) } case <-ctx.Done(): + // Timeout or cancellation signal, stop data generation return } } }() + // Step 6: Setup Result Processing Pipeline resultChan := make(chan interface{}) - // Add a result callback + // Add computation result callback function (Sink) + // When window triggers computation, results are output through this callback ssql.stream.AddSink(func(result interface{}) { resultChan <- result }) - // Count the number of results received + + // Step 7: Start Result Consumer Goroutine + // Count received results for effect verification resultCount := 0 go func() { for result := range resultChan { - // Print results every 5 seconds - fmt.Printf("Print result: [%s] %v\n", time.Now().Format("15:04:05.000"), result) + // Print results when window computation is triggered (every 5 seconds) + // This demonstrates StreamSQL's window-based aggregation results + fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result) resultCount++ } }() - // End of test + + // Step 8: Wait for Processing Completion + // Wait for data producer goroutine to finish (30-second timeout or manual cancellation) wg.Wait() + + // Step 9: Display Final Statistics + // Show total number of window results received during the test period + fmt.Printf("\nTotal window results received: %d\n", resultCount) + fmt.Println("StreamSQL processing completed successfully!") } ``` diff --git a/README_ZH.md b/README_ZH.md index 70d60f2..a98a304 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -40,7 +40,7 @@ package main import ( "context" "fmt" - "testing" + "sync" "time" "math/rand" @@ -48,21 +48,42 @@ import ( ) func main() { + // 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口 ssql := streamsql.New() - // 定义SQL语句。含义:每隔5秒按deviceId分组输出设备的温度平均值和湿度最小值。 + + // 2. 定义流式SQL查询语句 + // 核心概念解析: + // - TumblingWindow('5s'): 滚动窗口,每5秒创建一个新窗口,窗口之间不重叠 + // - GROUP BY deviceId: 按设备ID分组,每个设备独立计算 + // - avg(temperature): 聚合函数,计算窗口内温度的平均值 + // - min(humidity): 聚合函数,计算窗口内湿度的最小值 + // - window_start()/window_end(): 窗口函数,获取当前窗口的开始和结束时间 rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," + "window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')" - // 根据SQL语句,创建流式分析任务。 + + // 3. 解析并执行SQL语句,创建流式分析任务 + // 这一步会: + // - 解析SQL语句,构建执行计划 + // - 创建窗口管理器(每5秒触发一次计算) + // - 设置数据过滤条件(排除device3) + // - 配置聚合计算逻辑 err := ssql.Execute(rsql) if err != nil { panic(err) } + var wg sync.WaitGroup wg.Add(1) // 设置30秒测试超时时间 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // 添加测试数据 + + // 4. 数据生产者 - 模拟实时数据流输入 + // 在实际应用中,这可能是: + // - IoT设备传感器数据 + // - 用户行为事件 + // - 系统监控指标 + // - 消息队列数据等 go func() { defer wg.Done() ticker := time.NewTicker(1 * time.Second) @@ -70,14 +91,23 @@ func main() { for { select { case <-ticker.C: - // 生成随机测试数据,每秒生成10条数据 + // 每秒生成10条随机数据,模拟高频数据流 + // 数据特点: + // - 只有device1和device2(device3被SQL过滤掉) + // - 温度范围:20-30度 + // - 湿度范围:50-70% for i := 0; i < 10; i++ { randomData := map[string]interface{}{ - "deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), - "temperature": 20.0 + rand.Float64()*10, // 20-30度之间 - "humidity": 50.0 + rand.Float64()*20, // 50-70%湿度 + "deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), // 随机生成device1或device2 + "temperature": 20.0 + rand.Float64()*10, // 20-30度之间的随机温度 + "humidity": 50.0 + rand.Float64()*20, // 50-70%之间的随机湿度 } - // 将数据添加到流中 + // 将数据推送到流处理引擎 + // 引擎会自动: + // - 应用WHERE过滤条件 + // - 按deviceId分组 + // - 将数据分配到对应的时间窗口 + // - 更新聚合计算状态 ssql.stream.AddData(randomData) } @@ -87,22 +117,41 @@ func main() { } }() + // 5. 结果处理管道 - 接收窗口计算结果 resultChan := make(chan interface{}) - // 添加计算结果回调 + + // 6. 注册结果回调函数 + // 当窗口触发时(每5秒),会调用这个回调函数 + // 传递聚合计算的结果 ssql.stream.AddSink(func(result interface{}) { resultChan <- result }) - // 记录收到的结果数量 + + // 7. 结果消费者 - 处理计算结果 + // 在实际应用中,这里可能是: + // - 发送告警通知 + // - 存储到数据库 + // - 推送到仪表板 + // - 触发下游业务逻辑 resultCount := 0 go func() { for result := range resultChan { - //每隔5秒打印一次结果 - fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result) + // 每当5秒窗口结束时,会收到该窗口的聚合结果 + // 结果包含: + // - deviceId: 设备ID + // - avg_temp: 该设备在窗口内的平均温度 + // - min_humidity: 该设备在窗口内的最小湿度 + // - start/end: 窗口的时间范围 + fmt.Printf("窗口计算结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result) resultCount++ } }() - //测试结束 + + // 8. 等待测试完成 + // 整个流程展示了StreamSQL的核心工作原理: + // 数据输入 -> 过滤 -> 分组 -> 窗口聚合 -> 结果输出 wg.Wait() + fmt.Printf("\n测试完成,共收到 %d 个窗口结果\n", resultCount) } ``` diff --git a/docs/FUNCTIONS_USAGE_GUIDE.md b/docs/FUNCTIONS_USAGE_GUIDE.md index 73db268..ba7eb32 100644 --- a/docs/FUNCTIONS_USAGE_GUIDE.md +++ b/docs/FUNCTIONS_USAGE_GUIDE.md @@ -258,146 +258,119 @@ GROUP BY device, TumblingWindow('10s') ### SQRT - 平方根函数 **语法**: `sqrt(number)` **描述**: 返回数值的平方根。 -**增量计算**: ❌ 不支持(单值函数) ### POWER - 幂函数 **语法**: `power(base, exponent)` **描述**: 返回底数的指定次幂。 -**增量计算**: ❌ 不支持(单值函数) - + ### CEILING - 向上取整函数 **语法**: `ceiling(number)` **描述**: 返回大于或等于指定数值的最小整数。 -**增量计算**: ❌ 不支持(单值函数) ### FLOOR - 向下取整函数 **语法**: `floor(number)` **描述**: 返回小于或等于指定数值的最大整数。 -**增量计算**: ❌ 不支持(单值函数) - + ### ROUND - 四舍五入函数 **语法**: `round(number, [precision])` **描述**: 将数值四舍五入到指定的小数位数。 -**增量计算**: ❌ 不支持(单值函数) - + ### MOD - 取模函数 **语法**: `mod(dividend, divisor)` **描述**: 返回除法运算的余数。 -**增量计算**: ❌ 不支持(单值函数) - + ### RAND - 随机数函数 **语法**: `rand()` **描述**: 返回0到1之间的随机数。 -**增量计算**: ❌ 不支持(单值函数) ### SIGN - 符号函数 **语法**: `sign(number)` **描述**: 返回数值的符号(-1、0或1)。 -**增量计算**: ❌ 不支持(单值函数) - + ### 三角函数 #### SIN - 正弦函数 **语法**: `sin(number)` **描述**: 返回角度的正弦值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) #### COS - 余弦函数 **语法**: `cos(number)` **描述**: 返回角度的余弦值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + #### TAN - 正切函数 **语法**: `tan(number)` **描述**: 返回角度的正切值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + #### ASIN - 反正弦函数 **语法**: `asin(number)` **描述**: 返回数值的反正弦值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + #### ACOS - 反余弦函数 **语法**: `acos(number)` **描述**: 返回数值的反余弦值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + #### ATAN - 反正切函数 **语法**: `atan(number)` **描述**: 返回数值的反正切值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + #### ATAN2 - 双参数反正切函数 **语法**: `atan2(y, x)` **描述**: 返回y/x的反正切值(弧度制)。 -**增量计算**: ❌ 不支持(单值函数) - + ### 双曲函数 #### SINH - 双曲正弦函数 **语法**: `sinh(number)` **描述**: 返回数值的双曲正弦值。 -**增量计算**: ❌ 不支持(单值函数) - + #### COSH - 双曲余弦函数 **语法**: `cosh(number)` **描述**: 返回数值的双曲余弦值。 -**增量计算**: ❌ 不支持(单值函数) - + #### TANH - 双曲正切函数 **语法**: `tanh(number)` **描述**: 返回数值的双曲正切值。 -**增量计算**: ❌ 不支持(单值函数) - + ### 对数和指数函数 #### EXP - 指数函数 **语法**: `exp(number)` **描述**: 返回e的指定次幂。 -**增量计算**: ❌ 不支持(单值函数) - + #### LN - 自然对数函数 **语法**: `ln(number)` **描述**: 返回数值的自然对数。 -**增量计算**: ❌ 不支持(单值函数) - + #### LOG - 对数函数 **语法**: `log(base, number)` **描述**: 返回指定底数的对数。 -**增量计算**: ❌ 不支持(单值函数) - + #### LOG10 - 常用对数函数 **语法**: `log10(number)` **描述**: 返回数值的常用对数(以10为底)。 -**增量计算**: ❌ 不支持(单值函数) - + #### LOG2 - 二进制对数函数 **语法**: `log2(number)` **描述**: 返回数值的二进制对数(以2为底)。 -**增量计算**: ❌ 不支持(单值函数) - + ### 位运算函数 #### BIT_AND - 位与函数 **语法**: `bit_and(number1, number2)` **描述**: 对两个整数执行位与运算。 -**增量计算**: ❌ 不支持(单值函数) - + #### BIT_OR - 位或函数 **语法**: `bit_or(number1, number2)` **描述**: 对两个整数执行位或运算。 -**增量计算**: ❌ 不支持(单值函数) - + #### BIT_XOR - 位异或函数 **语法**: `bit_xor(number1, number2)` **描述**: 对两个整数执行位异或运算。 -**增量计算**: ❌ 不支持(单值函数) - + #### BIT_NOT - 位非函数 **语法**: `bit_not(number)` **描述**: 对整数执行位非运算。 -**增量计算**: ❌ 不支持(单值函数) - + ## 📝 字符串函数 字符串函数用于文本处理。 @@ -405,99 +378,80 @@ GROUP BY device, TumblingWindow('10s') ### UPPER - 转大写函数 **语法**: `upper(str)` **描述**: 将字符串转换为大写。 -**增量计算**: ❌ 不支持(单值函数) - + ### LOWER - 转小写函数 **语法**: `lower(str)` **描述**: 将字符串转换为小写。 -**增量计算**: ❌ 不支持(单值函数) - + ### CONCAT - 字符串连接函数 **语法**: `concat(str1, str2, ...)` **描述**: 连接多个字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### LENGTH - 字符串长度函数 **语法**: `length(str)` **描述**: 返回字符串的长度。 -**增量计算**: ❌ 不支持(单值函数) - + ### SUBSTRING - 子字符串函数 **语法**: `substring(str, start, [length])` **描述**: 从字符串中提取子字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### TRIM - 去除空格函数 **语法**: `trim(str)` **描述**: 去除字符串两端的空格。 -**增量计算**: ❌ 不支持(单值函数) - + ### LTRIM - 去除左侧空格函数 **语法**: `ltrim(str)` **描述**: 去除字符串左侧的空格。 -**增量计算**: ❌ 不支持(单值函数) - + ### RTRIM - 去除右侧空格函数 **语法**: `rtrim(str)` **描述**: 去除字符串右侧的空格。 -**增量计算**: ❌ 不支持(单值函数) ### FORMAT - 格式化函数 **语法**: `format(format_str, ...)` **描述**: 按照指定格式格式化字符串。 -**增量计算**: ❌ 不支持(单值函数) ### ENDSWITH - 结尾检查函数 **语法**: `endswith(str, suffix)` **描述**: 检查字符串是否以指定后缀结尾。 -**增量计算**: ❌ 不支持(单值函数) ### STARTSWITH - 开头检查函数 **语法**: `startswith(str, prefix)` **描述**: 检查字符串是否以指定前缀开头。 -**增量计算**: ❌ 不支持(单值函数) ### INDEXOF - 查找位置函数 **语法**: `indexof(str, substring)` **描述**: 返回子字符串在字符串中的位置。 -**增量计算**: ❌ 不支持(单值函数) - + ### REPLACE - 替换函数 **语法**: `replace(str, old_str, new_str)` **描述**: 替换字符串中的指定内容。 -**增量计算**: ❌ 不支持(单值函数) ### SPLIT - 分割函数 **语法**: `split(str, delimiter)` **描述**: 按照分隔符分割字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### LPAD - 左填充函数 **语法**: `lpad(str, length, pad_str)` **描述**: 在字符串左侧填充字符到指定长度。 -**增量计算**: ❌ 不支持(单值函数) - + ### RPAD - 右填充函数 **语法**: `rpad(str, length, pad_str)` **描述**: 在字符串右侧填充字符到指定长度。 -**增量计算**: ❌ 不支持(单值函数) - + ### 正则表达式函数 #### REGEXP_MATCHES - 正则匹配函数 **语法**: `regexp_matches(str, pattern)` **描述**: 检查字符串是否匹配正则表达式。 -**增量计算**: ❌ 不支持(单值函数) #### REGEXP_REPLACE - 正则替换函数 **语法**: `regexp_replace(str, pattern, replacement)` **描述**: 使用正则表达式替换字符串内容。 -**增量计算**: ❌ 不支持(单值函数) - + #### REGEXP_SUBSTRING - 正则提取函数 **语法**: `regexp_substring(str, pattern)` **描述**: 使用正则表达式提取字符串内容。 -**增量计算**: ❌ 不支持(单值函数) ## 🔄 类型转换函数 @@ -506,58 +460,47 @@ GROUP BY device, TumblingWindow('10s') ### CAST - 类型转换函数 **语法**: `cast(value as type)` **描述**: 将值转换为指定类型。 -**增量计算**: ❌ 不支持(单值函数) - + ### HEX2DEC - 十六进制转十进制函数 **语法**: `hex2dec(hex_str)` **描述**: 将十六进制字符串转换为十进制数。 -**增量计算**: ❌ 不支持(单值函数) - + ### DEC2HEX - 十进制转十六进制函数 **语法**: `dec2hex(number)` **描述**: 将十进制数转换为十六进制字符串。 -**增量计算**: ❌ 不支持(单值函数) ### ENCODE - 编码函数 **语法**: `encode(str, encoding)` **描述**: 按照指定编码方式编码字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### DECODE - 解码函数 **语法**: `decode(str, encoding)` **描述**: 按照指定编码方式解码字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### CONVERT_TZ - 时区转换函数 **语法**: `convert_tz(datetime, from_tz, to_tz)` **描述**: 将日期时间从一个时区转换到另一个时区。 -**增量计算**: ❌ 不支持(单值函数) ### TO_SECONDS - 转换为秒函数 **语法**: `to_seconds(datetime)` **描述**: 将日期时间转换为秒数。 -**增量计算**: ❌ 不支持(单值函数) ### CHR - 字符函数 **语法**: `chr(number)` **描述**: 将ASCII码转换为字符。 -**增量计算**: ❌ 不支持(单值函数) - + ### TRUNC - 截断函数 **语法**: `trunc(number, [precision])` **描述**: 截断数值到指定精度。 -**增量计算**: ❌ 不支持(单值函数) - + ### URL_ENCODE - URL编码函数 **语法**: `url_encode(str)` **描述**: 对字符串进行URL编码。 -**增量计算**: ❌ 不支持(单值函数) - + ### URL_DECODE - URL解码函数 **语法**: `url_decode(str)` **描述**: 对字符串进行URL解码。 -**增量计算**: ❌ 不支持(单值函数) - + ## ⏰ 时间日期函数 时间日期函数用于处理时间和日期数据。 @@ -565,18 +508,15 @@ GROUP BY device, TumblingWindow('10s') ### NOW - 当前时间函数 **语法**: `now()` **描述**: 返回当前的日期和时间。 -**增量计算**: ❌ 不支持(单值函数) - + ### CURRENT_TIME - 当前时间函数 **语法**: `current_time()` **描述**: 返回当前时间。 -**增量计算**: ❌ 不支持(单值函数) - + ### CURRENT_DATE - 当前日期函数 **语法**: `current_date()` **描述**: 返回当前日期。 -**增量计算**: ❌ 不支持(单值函数) - + ## 🔗 JSON函数 JSON函数用于处理JSON数据。 @@ -584,32 +524,26 @@ JSON函数用于处理JSON数据。 ### TO_JSON - 转换为JSON函数 **语法**: `to_json(value)` **描述**: 将值转换为JSON字符串。 -**增量计算**: ❌ 不支持(单值函数) - + ### FROM_JSON - 从JSON解析函数 **语法**: `from_json(json_str)` **描述**: 从JSON字符串解析值。 -**增量计算**: ❌ 不支持(单值函数) - + ### JSON_EXTRACT - JSON提取函数 **语法**: `json_extract(json_str, path)` **描述**: 从JSON字符串中提取指定路径的值。 -**增量计算**: ❌ 不支持(单值函数) - + ### JSON_VALID - JSON验证函数 **语法**: `json_valid(json_str)` **描述**: 验证字符串是否为有效的JSON。 -**增量计算**: ❌ 不支持(单值函数) ### JSON_TYPE - JSON类型函数 **语法**: `json_type(json_str)` **描述**: 返回JSON值的类型。 -**增量计算**: ❌ 不支持(单值函数) ### JSON_LENGTH - JSON长度函数 **语法**: `json_length(json_str)` **描述**: 返回JSON数组或对象的长度。 -**增量计算**: ❌ 不支持(单值函数) ## 🔐 哈希函数 @@ -618,22 +552,18 @@ JSON函数用于处理JSON数据。 ### MD5 - MD5哈希函数 **语法**: `md5(str)` **描述**: 生成字符串的MD5哈希值。 -**增量计算**: ❌ 不支持(单值函数) - + ### SHA1 - SHA1哈希函数 **语法**: `sha1(str)` **描述**: 生成字符串的SHA1哈希值。 -**增量计算**: ❌ 不支持(单值函数) ### SHA256 - SHA256哈希函数 **语法**: `sha256(str)` **描述**: 生成字符串的SHA256哈希值。 -**增量计算**: ❌ 不支持(单值函数) ### SHA512 - SHA512哈希函数 **语法**: `sha512(str)` **描述**: 生成字符串的SHA512哈希值。 -**增量计算**: ❌ 不支持(单值函数) ## 📋 数组函数 @@ -642,43 +572,36 @@ JSON函数用于处理JSON数据。 ### ARRAY_LENGTH - 数组长度函数 **语法**: `array_length(array)` **描述**: 返回数组的长度。 -**增量计算**: ❌ 不支持(单值函数) ### ARRAY_CONTAINS - 数组包含函数 **语法**: `array_contains(array, value)` **描述**: 检查数组是否包含指定值。 -**增量计算**: ❌ 不支持(单值函数) - + ### ARRAY_POSITION - 数组位置函数 **语法**: `array_position(array, value)` **描述**: 返回值在数组中的位置。 -**增量计算**: ❌ 不支持(单值函数) ### ARRAY_REMOVE - 数组移除函数 **语法**: `array_remove(array, value)` **描述**: 从数组中移除指定值。 -**增量计算**: ❌ 不支持(单值函数) + ### ARRAY_DISTINCT - 数组去重函数 **语法**: `array_distinct(array)` **描述**: 返回数组的去重结果。 -**增量计算**: ❌ 不支持(单值函数) - + ### ARRAY_INTERSECT - 数组交集函数 **语法**: `array_intersect(array1, array2)` **描述**: 返回两个数组的交集。 -**增量计算**: ❌ 不支持(单值函数) - + ### ARRAY_UNION - 数组并集函数 **语法**: `array_union(array1, array2)` **描述**: 返回两个数组的并集。 -**增量计算**: ❌ 不支持(单值函数) - + ### ARRAY_EXCEPT - 数组差集函数 **语法**: `array_except(array1, array2)` **描述**: 返回两个数组的差集。 -**增量计算**: ❌ 不支持(单值函数) - + ## 🔍 类型检查函数 类型检查函数用于检查数据类型。 @@ -686,38 +609,32 @@ JSON函数用于处理JSON数据。 ### IS_NULL - 空值检查函数 **语法**: `is_null(value)` **描述**: 检查值是否为NULL。 -**增量计算**: ❌ 不支持(单值函数) - + ### IS_NOT_NULL - 非空值检查函数 **语法**: `is_not_null(value)` **描述**: 检查值是否不为NULL。 -**增量计算**: ❌ 不支持(单值函数) + ### IS_NUMERIC - 数值检查函数 **语法**: `is_numeric(value)` **描述**: 检查值是否为数值类型。 -**增量计算**: ❌ 不支持(单值函数) - + ### IS_STRING - 字符串检查函数 **语法**: `is_string(value)` **描述**: 检查值是否为字符串类型。 -**增量计算**: ❌ 不支持(单值函数) - + ### IS_BOOL - 布尔值检查函数 **语法**: `is_bool(value)` **描述**: 检查值是否为布尔类型。 -**增量计算**: ❌ 不支持(单值函数) - + ### IS_ARRAY - 数组检查函数 **语法**: `is_array(value)` **描述**: 检查值是否为数组类型。 -**增量计算**: ❌ 不支持(单值函数) - + ### IS_OBJECT - 对象检查函数 **语法**: `is_object(value)` **描述**: 检查值是否为对象类型。 -**增量计算**: ❌ 不支持(单值函数) - + ## ❓ 条件函数 条件函数用于条件判断和值选择。 @@ -725,32 +642,26 @@ JSON函数用于处理JSON数据。 ### IF_NULL - 空值处理函数 **语法**: `if_null(value, default_value)` **描述**: 如果值为NULL,返回默认值,否则返回原值。 -**增量计算**: ❌ 不支持(单值函数) - + ### COALESCE - 合并函数 **语法**: `coalesce(value1, value2, ...)` **描述**: 返回第一个非NULL值。 -**增量计算**: ❌ 不支持(单值函数) ### NULL_IF - 空值转换函数 **语法**: `null_if(value1, value2)` **描述**: 如果两个值相等,返回NULL,否则返回第一个值。 -**增量计算**: ❌ 不支持(单值函数) ### GREATEST - 最大值函数 **语法**: `greatest(value1, value2, ...)` **描述**: 返回参数中的最大值。 -**增量计算**: ❌ 不支持(单值函数) ### LEAST - 最小值函数 **语法**: `least(value1, value2, ...)` **描述**: 返回参数中的最小值。 -**增量计算**: ❌ 不支持(单值函数) ### CASE_WHEN - 条件选择函数 **语法**: `case_when(condition, value_if_true, value_if_false)` **描述**: 根据条件返回不同的值。 -**增量计算**: ❌ 不支持(单值函数) ## 📊 多行函数 @@ -759,8 +670,7 @@ JSON函数用于处理JSON数据。 ### UNNEST - 展开函数 **语法**: `unnest(array)` **描述**: 将数组展开为多行。 -**增量计算**: ❌ 不支持(单值函数) - + ## 🪟 扩展窗口函数 扩展窗口函数提供更多窗口相关功能。 @@ -792,13 +702,11 @@ JSON函数用于处理JSON数据。 ### EXPRESSION - 表达式函数 **语法**: `expression(expr_str)` **描述**: 动态计算表达式字符串。 -**增量计算**: ❌ 不支持(单值函数) ### EXPR - 表达式简写函数 **语法**: `expr(expr_str)` **描述**: expression函数的简写形式。 -**增量计算**: ❌ 不支持(单值函数) - + ## ⚡ 增量计算性能优势 支持增量计算的函数具有以下性能优势: @@ -815,22 +723,6 @@ JSON函数用于处理JSON数据。 - **传统批量计算**: 只能在窗口结束时输出结果 - **增量计算**: 可以实时输出中间结果 -### 性能测试结果 -根据我们的性能测试: -- **计算速度**: 增量计算比批量计算快 2-3 倍 -- **内存使用**: 减少 99.9% 以上的内存占用 -- **实时性**: 支持流式处理,实时输出中间结果 - -## 💡 使用建议 - -1. **优先使用支持增量计算的函数**: 在大数据量和高频率数据流场景下,优先选择支持增量计算的函数。 - -2. **合理选择窗口大小**: 窗口大小影响计算精度和性能,需要根据业务需求平衡。 - -3. **组合使用函数**: 可以在同一个查询中组合使用多个函数,实现复杂的分析需求。 - -4. **注意数据类型**: 确保输入数据类型与函数要求匹配,避免类型转换错误。 - ## 🔧 自定义函数扩展 StreamSQL 支持自定义函数扩展,详见 `functions/custom_example.go` 中的示例。可以实现: diff --git a/streamsql_test.go b/streamsql_test.go index 4012c61..ce95020 100644 --- a/streamsql_test.go +++ b/streamsql_test.go @@ -17,64 +17,97 @@ import ( "github.com/stretchr/testify/require" ) +// TestStreamData 测试 StreamSQL 的流式数据处理功能 +// 这个测试演示了 StreamSQL 的完整工作流程:从创建实例到数据处理再到结果验证 func TestStreamData(t *testing.T) { + // 步骤1: 创建 StreamSQL 实例 + // StreamSQL 是流式 SQL 处理引擎的核心组件,负责管理整个流处理生命周期 ssql := New() - // 定义SQL语句。TumblingWindow 滚动窗口,5秒滚动一次 + + // 步骤2: 定义流式 SQL 查询语句 + // 这个 SQL 语句展示了 StreamSQL 的核心功能: + // - SELECT: 选择要输出的字段和聚合函数 + // - FROM stream: 指定数据源为流数据 + // - WHERE: 过滤条件,排除 device3 的数据 + // - GROUP BY: 按设备ID分组,配合滚动窗口进行聚合 + // - TumblingWindow('5s'): 5秒滚动窗口,每5秒触发一次计算 + // - avg(), min(): 聚合函数,计算平均值和最小值 + // - window_start(), window_end(): 窗口函数,获取窗口的开始和结束时间 rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," + "window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')" - // 根据SQL语句,创建流式分析任务。 + + // 步骤3: 执行 SQL 语句,启动流式分析任务 + // Execute 方法会解析 SQL、构建执行计划、初始化窗口管理器和聚合器 err := ssql.Execute(rsql) if err != nil { panic(err) } + + // 步骤4: 设置测试环境和并发控制 var wg sync.WaitGroup wg.Add(1) - // 设置30秒测试超时时间 + // 设置30秒测试超时时间,防止测试无限运行 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // 添加测试数据 + + // 步骤5: 启动数据生产者协程 + // 模拟实时数据流,持续向 StreamSQL 输入数据 go func() { defer wg.Done() + // 创建定时器,每秒触发一次数据生成 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - // 生成随机测试数据,每秒生成10条数据 + // 每秒生成10条随机测试数据,模拟高频数据流 + // 这种数据密度可以测试 StreamSQL 的实时处理能力 for i := 0; i < 10; i++ { + // 构造设备数据,包含设备ID、温度和湿度 randomData := map[string]interface{}{ - "deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1), - "temperature": 20.0 + rand.Float64()*10, // 20-30度之间 - "humidity": 50.0 + rand.Float64()*20, // 50-70%湿度 + "deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1), // 随机选择 device1, device2, device3 + "temperature": 20.0 + rand.Float64()*10, // 温度范围: 20-30度 + "humidity": 50.0 + rand.Float64()*20, // 湿度范围: 50-70% } - // 将数据添加到流中 + // 将数据添加到流中,触发 StreamSQL 的实时处理 + // AddData 会将数据分发到相应的窗口和聚合器中 ssql.stream.AddData(randomData) } case <-ctx.Done(): + // 超时或取消信号,停止数据生成 return } } }() + // 步骤6: 设置结果处理管道 resultChan := make(chan interface{}) - // 添加计算结果回调 + // 添加计算结果回调函数(Sink) + // 当窗口触发计算时,结果会通过这个回调函数输出 ssql.stream.AddSink(func(result interface{}) { resultChan <- result }) - // 记录收到的结果数量 + + // 步骤7: 启动结果消费者协程 + // 记录收到的结果数量,用于验证测试效果 resultCount := 0 go func() { for range resultChan { - //每隔5秒打印一次结果 + // 每当收到一个窗口的计算结果时,计数器加1 + // 注释掉的代码可以用于调试,打印每个结果的详细信息 //fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result) resultCount++ } }() - //测试结束 + + // 步骤8: 等待测试完成 + // 等待数据生产者协程结束(30秒超时或手动取消) wg.Wait() - // 验证是否收到了结果 + // 步骤9: 验证测试结果 + // 预期在30秒内应该收到5个窗口的计算结果(每5秒一个窗口) + // 这验证了 StreamSQL 的窗口触发机制是否正常工作 assert.Equal(t, resultCount, 5) }