Files

429 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"fmt"
"math"
"math/rand"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/utils/cast"
)
func main() {
fmt.Println("🚀 StreamSQL 综合测试演示")
fmt.Println("=============================")
// 注册自定义函数
registerCustomFunctions()
// 运行各种测试场景
runAllTests()
fmt.Println("\n✅ 所有测试完成!")
}
// 注册自定义函数
func registerCustomFunctions() {
fmt.Println("\n📋 注册自定义函数...")
// 数学函数:平方
err := functions.RegisterCustomFunction(
"square",
functions.TypeMath,
"数学函数",
"计算平方",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
val := cast.ToFloat64(args[0])
return val * val, nil
},
)
if err != nil {
fmt.Printf("❌ 注册square函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册数学函数: square")
}
// 华氏度转摄氏度函数
err = functions.RegisterCustomFunction(
"f_to_c",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
fahrenheit := cast.ToFloat64(args[0])
celsius := (fahrenheit - 32) * 5 / 9
return celsius, nil
},
)
if err != nil {
fmt.Printf("❌ 注册f_to_c函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册转换函数: f_to_c")
}
// 圆面积计算函数
err = functions.RegisterCustomFunction(
"circle_area",
functions.TypeMath,
"几何计算",
"计算圆的面积",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
radius := cast.ToFloat64(args[0])
if radius < 0 {
return nil, fmt.Errorf("半径必须为正数")
}
area := math.Pi * radius * radius
return area, nil
},
)
if err != nil {
fmt.Printf("❌ 注册circle_area函数失败: %v\n", err)
} else {
fmt.Println(" ✓ 注册几何函数: circle_area")
}
}
// 运行所有测试
func runAllTests() {
// 测试1基础数据过滤
testBasicFiltering()
// 测试2聚合分析
testAggregation()
// 测试3滑动窗口
testSlidingWindow()
// 测试4嵌套字段访问
testNestedFields()
// 测试5自定义函数
testCustomFunctions()
// 测试6复杂查询
testComplexQuery()
}
// 测试1基础数据过滤
func testBasicFiltering() {
fmt.Println("\n🔍 测试1基础数据过滤")
fmt.Println("========================")
ssql := streamsql.New()
defer ssql.Stop()
// 过滤温度大于25度的数据
sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 25"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
// 添加结果处理函数
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 高温告警: %v\n", result)
})
// 发送测试数据
testData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 23.5}, // 不会触发告警
{"deviceId": "sensor002", "temperature": 28.3}, // 会触发告警
{"deviceId": "sensor003", "temperature": 31.2}, // 会触发告警
{"deviceId": "sensor004", "temperature": 22.1}, // 不会触发告警
}
for _, data := range testData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 基础过滤测试完成")
}
// 测试2聚合分析
func testAggregation() {
fmt.Println("\n📈 测试2聚合分析")
fmt.Println("==================")
ssql := streamsql.New()
defer ssql.Stop()
// 每2秒计算一次各设备的平均温度
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
COUNT(*) as sample_count,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp
FROM stream
GROUP BY deviceId, TumblingWindow('2s')`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
// 处理聚合结果
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 聚合结果: %v\n", result)
})
// 模拟传感器数据流
devices := []string{"sensor001", "sensor002", "sensor003"}
for i := 0; i < 8; i++ {
for _, device := range devices {
data := map[string]interface{}{
"deviceId": device,
"temperature": 20.0 + rand.Float64()*15, // 20-35度随机温度
"timestamp": time.Now(),
}
ssql.Emit(data)
}
time.Sleep(300 * time.Millisecond)
}
// 等待窗口触发
time.Sleep(2 * time.Second)
ssql.Stream().Window.Trigger()
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 聚合分析测试完成")
}
// 测试3滑动窗口
func testSlidingWindow() {
fmt.Println("\n🔄 测试3滑动窗口")
fmt.Println("==================")
ssql := streamsql.New()
defer ssql.Stop()
// 6秒滑动窗口每2秒滑动一次
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp,
COUNT(*) as count
FROM stream
WHERE temperature > 0
GROUP BY deviceId, SlidingWindow('6s', '2s')`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 滑动窗口分析: %v\n", result)
})
// 持续发送数据
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"deviceId": "sensor001",
"temperature": 20.0 + rand.Float64()*10,
"timestamp": time.Now(),
}
ssql.Emit(data)
time.Sleep(800 * time.Millisecond)
}
time.Sleep(1 * time.Second)
fmt.Println(" ✅ 滑动窗口测试完成")
}
// 测试4嵌套字段访问
func testNestedFields() {
fmt.Println("\n🔧 测试4嵌套字段访问")
fmt.Println("=======================")
ssql := streamsql.New()
defer ssql.Stop()
// 访问嵌套字段的SQL查询
sql := `SELECT device.info.name as device_name,
device.location.building as building,
sensor.temperature as temp,
UPPER(device.info.type) as device_type
FROM stream
WHERE sensor.temperature > 25 AND device.info.status = 'active'`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 嵌套字段结果: %v\n", result)
})
// 发送嵌套结构数据
complexData := []map[string]interface{}{
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "温度传感器001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "A栋",
"floor": "3F",
},
},
"sensor": map[string]interface{}{
"temperature": 28.5,
"humidity": 65.0,
},
},
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "湿度传感器002",
"type": "humidity",
"status": "inactive", // 不会匹配
},
"location": map[string]interface{}{
"building": "B栋",
"floor": "2F",
},
},
"sensor": map[string]interface{}{
"temperature": 30.0,
"humidity": 70.0,
},
},
}
for _, data := range complexData {
ssql.Emit(data)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 嵌套字段测试完成")
}
// 测试5自定义函数
func testCustomFunctions() {
fmt.Println("\n🎯 测试5自定义函数")
fmt.Println("====================")
ssql := streamsql.New()
defer ssql.Stop()
// 使用自定义函数的SQL查询
sql := `SELECT
device,
square(value) as squared_value,
f_to_c(temperature) as celsius,
circle_area(radius) as area
FROM stream
WHERE value > 0`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 自定义函数结果: %v\n", result)
})
// 添加测试数据
testData := []map[string]interface{}{
{
"device": "sensor1",
"value": 5.0,
"temperature": 68.0, // 华氏度
"radius": 3.0,
},
{
"device": "sensor2",
"value": 10.0,
"temperature": 86.0, // 华氏度
"radius": 2.5,
},
{
"device": "sensor3",
"value": 0.0, // 不会匹配WHERE条件
"temperature": 32.0,
"radius": 1.0,
},
}
for _, data := range testData {
ssql.Emit(data)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 自定义函数测试完成")
}
// 测试6复杂查询
func testComplexQuery() {
fmt.Println("\n🔬 测试6复杂查询")
fmt.Println("==================")
ssql := streamsql.New()
defer ssql.Stop()
// 复杂的聚合查询,结合自定义函数和嵌套字段
sql := `SELECT
device.location as location,
AVG(square(sensor.temperature)) as avg_temp_squared,
MAX(f_to_c(sensor.temperature)) as max_celsius,
COUNT(*) as sample_count,
SUM(circle_area(device.radius)) as total_area
FROM stream
WHERE sensor.temperature > 20 AND device.status = 'online'
GROUP BY device.location, TumblingWindow('3s')`
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ SQL执行失败: %v\n", err)
return
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 📊 复杂查询结果: %v\n", result)
})
// 发送复杂测试数据
locations := []string{"room-A", "room-B", "room-C"}
for i := 0; i < 12; i++ {
location := locations[i%len(locations)]
data := map[string]interface{}{
"device": map[string]interface{}{
"location": location,
"status": "online",
"radius": 1.0 + rand.Float64()*2.0, // 1-3的随机半径
},
"sensor": map[string]interface{}{
"temperature": 25.0 + rand.Float64()*10.0, // 25-35度
"humidity": 50.0 + rand.Float64()*30.0, // 50-80%
},
"timestamp": time.Now(),
}
ssql.Emit(data)
time.Sleep(300 * time.Millisecond)
}
// 等待窗口触发
time.Sleep(3 * time.Second)
ssql.Stream().Window.Trigger()
time.Sleep(500 * time.Millisecond)
fmt.Println(" ✅ 复杂查询测试完成")
}