mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-14 16:18:20 +00:00
@@ -37,7 +37,6 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"math/rand"
|
||||
@@ -45,62 +44,98 @@ import (
|
||||
"github.com/rulego/streamsql"
|
||||
)
|
||||
|
||||
// StreamSQL Usage Example
|
||||
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
|
||||
func main() {
|
||||
// Step 1: Create StreamSQL Instance
|
||||
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
|
||||
ssql := streamsql.New()
|
||||
// Define the SQL statement. Every 5 seconds, group by deviceId and output the average temperature and minimum humidity of the device.
|
||||
|
||||
// Step 2: Define Stream SQL Query Statement
|
||||
// This SQL statement showcases StreamSQL's core capabilities:
|
||||
// - SELECT: Choose output fields and aggregation functions
|
||||
// - FROM stream: Specify the data source as stream data
|
||||
// - WHERE: Filter condition, excluding device3 data
|
||||
// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
|
||||
// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
|
||||
// - avg(), min(): Aggregation functions for calculating average and minimum values
|
||||
// - window_start(), window_end(): Window functions to get window start and end times
|
||||
rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
|
||||
"window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
|
||||
// Create a stream processing task based on the SQL statement.
|
||||
|
||||
// Step 3: Execute SQL Statement and Start Stream Analysis Task
|
||||
// The Execute method parses the SQL, builds the execution plan, and initializes the window manager and aggregators
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Step 4: Setup Test Environment and Concurrency Control
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// Set a 30-second test timeout
|
||||
// Set a 30-second test timeout so the run cannot continue indefinitely
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
// Add test data
|
||||
|
||||
// Step 5: Start Data Producer Goroutine
|
||||
// Simulate real-time data stream, continuously feeding data into StreamSQL
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Create ticker to trigger data generation every second
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// Generate random test data, generating 10 data points per second
|
||||
// Generate 10 random test data points per second, simulating a high-frequency data stream
|
||||
// This data density tests StreamSQL's real-time processing capability
|
||||
for i := 0; i < 10; i++ {
|
||||
// Construct device data containing deviceId, temperature, and humidity
|
||||
randomData := map[string]interface{}{
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1),
|
||||
"temperature": 20.0 + rand.Float64()*10, // Temperature between 20-30 degrees
|
||||
"humidity": 50.0 + rand.Float64()*20, // Humidity between 50-70%
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
|
||||
"temperature": 20.0 + rand.Float64()*10, // Temperature range: 20-30 degrees
|
||||
"humidity": 50.0 + rand.Float64()*20, // Humidity range: 50-70%
|
||||
}
|
||||
// Add data to the stream
|
||||
// Add data to stream, triggering StreamSQL's real-time processing
|
||||
// AddData distributes data to corresponding windows and aggregators
|
||||
ssql.stream.AddData(randomData)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
// Timeout or cancellation signal, stop data generation
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Step 6: Setup Result Processing Pipeline
|
||||
resultChan := make(chan interface{})
|
||||
// Add a result callback
|
||||
// Add computation result callback function (Sink)
|
||||
// When a window triggers computation, the results are delivered through this callback
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// Count the number of results received
|
||||
|
||||
// Step 7: Start Result Consumer Goroutine
|
||||
// Count the results received so the outcome of the run can be verified
|
||||
resultCount := 0
|
||||
go func() {
|
||||
for result := range resultChan {
|
||||
// Print results every 5 seconds
|
||||
fmt.Printf("Print result: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
// Print results when window computation is triggered (every 5 seconds)
|
||||
// This demonstrates StreamSQL's window-based aggregation results
|
||||
fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
resultCount++
|
||||
}
|
||||
}()
|
||||
// End of test
|
||||
|
||||
// Step 8: Wait for Processing Completion
|
||||
// Wait for data producer goroutine to finish (30-second timeout or manual cancellation)
|
||||
wg.Wait()
|
||||
|
||||
// Step 9: Display Final Statistics
|
||||
// Show total number of window results received during the test period
|
||||
fmt.Printf("\nTotal window results received: %d\n", resultCount)
|
||||
fmt.Println("StreamSQL processing completed successfully!")
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
+63
-14
@@ -40,7 +40,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"math/rand"
|
||||
@@ -48,21 +48,42 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
|
||||
ssql := streamsql.New()
|
||||
// 定义SQL语句。含义:每隔5秒按deviceId分组输出设备的温度平均值和湿度最小值。
|
||||
|
||||
// 2. 定义流式SQL查询语句
|
||||
// 核心概念解析:
|
||||
// - TumblingWindow('5s'): 滚动窗口,每5秒创建一个新窗口,窗口之间不重叠
|
||||
// - GROUP BY deviceId: 按设备ID分组,每个设备独立计算
|
||||
// - avg(temperature): 聚合函数,计算窗口内温度的平均值
|
||||
// - min(humidity): 聚合函数,计算窗口内湿度的最小值
|
||||
// - window_start()/window_end(): 窗口函数,获取当前窗口的开始和结束时间
|
||||
rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
|
||||
"window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
|
||||
// 根据SQL语句,创建流式分析任务。
|
||||
|
||||
// 3. 解析并执行SQL语句,创建流式分析任务
|
||||
// 这一步会:
|
||||
// - 解析SQL语句,构建执行计划
|
||||
// - 创建窗口管理器(每5秒触发一次计算)
|
||||
// - 设置数据过滤条件(排除device3)
|
||||
// - 配置聚合计算逻辑
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// 设置30秒测试超时时间
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
// 添加测试数据
|
||||
|
||||
// 4. 数据生产者 - 模拟实时数据流输入
|
||||
// 在实际应用中,这可能是:
|
||||
// - IoT设备传感器数据
|
||||
// - 用户行为事件
|
||||
// - 系统监控指标
|
||||
// - 消息队列数据等
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
@@ -70,14 +91,23 @@ func main() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// 生成随机测试数据,每秒生成10条数据
|
||||
// 每秒生成10条随机数据,模拟高频数据流
|
||||
// 数据特点:
|
||||
// - 只有device1和device2(device3被SQL过滤掉)
|
||||
// - 温度范围:20-30度
|
||||
// - 湿度范围:50-70%
|
||||
for i := 0; i < 10; i++ {
|
||||
randomData := map[string]interface{}{
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1),
|
||||
"temperature": 20.0 + rand.Float64()*10, // 20-30度之间
|
||||
"humidity": 50.0 + rand.Float64()*20, // 50-70%湿度
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(2)+1), // 随机生成device1或device2
|
||||
"temperature": 20.0 + rand.Float64()*10, // 20-30度之间的随机温度
|
||||
"humidity": 50.0 + rand.Float64()*20, // 50-70%之间的随机湿度
|
||||
}
|
||||
// 将数据添加到流中
|
||||
// 将数据推送到流处理引擎
|
||||
// 引擎会自动:
|
||||
// - 应用WHERE过滤条件
|
||||
// - 按deviceId分组
|
||||
// - 将数据分配到对应的时间窗口
|
||||
// - 更新聚合计算状态
|
||||
ssql.stream.AddData(randomData)
|
||||
}
|
||||
|
||||
@@ -87,22 +117,41 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
// 5. 结果处理管道 - 接收窗口计算结果
|
||||
resultChan := make(chan interface{})
|
||||
// 添加计算结果回调
|
||||
|
||||
// 6. 注册结果回调函数
|
||||
// 当窗口触发时(每5秒),会调用这个回调函数
|
||||
// 传递聚合计算的结果
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// 记录收到的结果数量
|
||||
|
||||
// 7. 结果消费者 - 处理计算结果
|
||||
// 在实际应用中,这里可能是:
|
||||
// - 发送告警通知
|
||||
// - 存储到数据库
|
||||
// - 推送到仪表板
|
||||
// - 触发下游业务逻辑
|
||||
resultCount := 0
|
||||
go func() {
|
||||
for result := range resultChan {
|
||||
//每隔5秒打印一次结果
|
||||
fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
// 每当5秒窗口结束时,会收到该窗口的聚合结果
|
||||
// 结果包含:
|
||||
// - deviceId: 设备ID
|
||||
// - avg_temp: 该设备在窗口内的平均温度
|
||||
// - min_humidity: 该设备在窗口内的最小湿度
|
||||
// - start/end: 窗口的时间范围
|
||||
fmt.Printf("窗口计算结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
resultCount++
|
||||
}
|
||||
}()
|
||||
//测试结束
|
||||
|
||||
// 8. 等待测试完成
|
||||
// 整个流程展示了StreamSQL的核心工作原理:
|
||||
// 数据输入 -> 过滤 -> 分组 -> 窗口聚合 -> 结果输出
|
||||
wg.Wait()
|
||||
fmt.Printf("\n测试完成,共收到 %d 个窗口结果\n", resultCount)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
+66
-174
File diff suppressed because it is too large
Load Diff
+47
-14
@@ -17,64 +17,97 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestStreamData 测试 StreamSQL 的流式数据处理功能
|
||||
// 这个测试演示了 StreamSQL 的完整工作流程:从创建实例到数据处理再到结果验证
|
||||
func TestStreamData(t *testing.T) {
|
||||
// 步骤1: 创建 StreamSQL 实例
|
||||
// StreamSQL 是流式 SQL 处理引擎的核心组件,负责管理整个流处理生命周期
|
||||
ssql := New()
|
||||
// 定义SQL语句。TumblingWindow 滚动窗口,5秒滚动一次
|
||||
|
||||
// 步骤2: 定义流式 SQL 查询语句
|
||||
// 这个 SQL 语句展示了 StreamSQL 的核心功能:
|
||||
// - SELECT: 选择要输出的字段和聚合函数
|
||||
// - FROM stream: 指定数据源为流数据
|
||||
// - WHERE: 过滤条件,排除 device3 的数据
|
||||
// - GROUP BY: 按设备ID分组,配合滚动窗口进行聚合
|
||||
// - TumblingWindow('5s'): 5秒滚动窗口,每5秒触发一次计算
|
||||
// - avg(), min(): 聚合函数,计算平均值和最小值
|
||||
// - window_start(), window_end(): 窗口函数,获取窗口的开始和结束时间
|
||||
rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
|
||||
"window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
|
||||
// 根据SQL语句,创建流式分析任务。
|
||||
|
||||
// 步骤3: 执行 SQL 语句,启动流式分析任务
|
||||
// Execute 方法会解析 SQL、构建执行计划、初始化窗口管理器和聚合器
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 步骤4: 设置测试环境和并发控制
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// 设置30秒测试超时时间
|
||||
// 设置30秒测试超时时间,防止测试无限运行
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
// 添加测试数据
|
||||
|
||||
// 步骤5: 启动数据生产者协程
|
||||
// 模拟实时数据流,持续向 StreamSQL 输入数据
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// 创建定时器,每秒触发一次数据生成
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// 生成随机测试数据,每秒生成10条数据
|
||||
// 每秒生成10条随机测试数据,模拟高频数据流
|
||||
// 这种数据密度可以测试 StreamSQL 的实时处理能力
|
||||
for i := 0; i < 10; i++ {
|
||||
// 构造设备数据,包含设备ID、温度和湿度
|
||||
randomData := map[string]interface{}{
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
|
||||
"temperature": 20.0 + rand.Float64()*10, // 20-30度之间
|
||||
"humidity": 50.0 + rand.Float64()*20, // 50-70%湿度
|
||||
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1), // 随机选择 device1, device2, device3
|
||||
"temperature": 20.0 + rand.Float64()*10, // 温度范围: 20-30度
|
||||
"humidity": 50.0 + rand.Float64()*20, // 湿度范围: 50-70%
|
||||
}
|
||||
// 将数据添加到流中
|
||||
// 将数据添加到流中,触发 StreamSQL 的实时处理
|
||||
// AddData 会将数据分发到相应的窗口和聚合器中
|
||||
ssql.stream.AddData(randomData)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
// 超时或取消信号,停止数据生成
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 步骤6: 设置结果处理管道
|
||||
resultChan := make(chan interface{})
|
||||
// 添加计算结果回调
|
||||
// 添加计算结果回调函数(Sink)
|
||||
// 当窗口触发计算时,结果会通过这个回调函数输出
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// 记录收到的结果数量
|
||||
|
||||
// 步骤7: 启动结果消费者协程
|
||||
// 记录收到的结果数量,用于验证测试效果
|
||||
resultCount := 0
|
||||
go func() {
|
||||
for range resultChan {
|
||||
//每隔5秒打印一次结果
|
||||
// 每当收到一个窗口的计算结果时,计数器加1
|
||||
// 注释掉的代码可以用于调试,打印每个结果的详细信息
|
||||
//fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
resultCount++
|
||||
}
|
||||
}()
|
||||
//测试结束
|
||||
|
||||
// 步骤8: 等待测试完成
|
||||
// 等待数据生产者协程结束(30秒超时或手动取消)
|
||||
wg.Wait()
|
||||
|
||||
// 验证是否收到了结果
|
||||
// 步骤9: 验证测试结果
|
||||
// 预期在30秒内应该收到5个窗口的计算结果(每5秒一个窗口)
|
||||
// 这验证了 StreamSQL 的窗口触发机制是否正常工作
|
||||
assert.Equal(t, resultCount, 5)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user