Files

344 lines
10 KiB
Go

package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
// 非聚合场景使用示例
// 展示StreamSQL在实时数据转换、过滤、清洗等场景中的应用
func main() {
fmt.Println("=== StreamSQL 非聚合场景演示 ===")
// 场景1: 实时数据清洗和标准化
fmt.Println("\n1. 实时数据清洗和标准化")
demonstrateDataCleaning()
// 场景2: 数据富化和计算字段
fmt.Println("\n2. 数据富化和计算字段")
demonstrateDataEnrichment()
// 场景3: 实时告警和事件过滤
fmt.Println("\n3. 实时告警和事件过滤")
demonstrateRealTimeAlerting()
// 场景4: 数据格式转换
fmt.Println("\n4. 数据格式转换")
demonstrateDataFormatConversion()
// 场景5: 基于条件的数据路由
fmt.Println("\n5. 基于条件的数据路由")
demonstrateDataRouting()
// 场景6: 嵌套字段处理
fmt.Println("\n6. 嵌套字段处理")
demonstrateNestedFieldProcessing()
fmt.Println("\n=== 演示完成 ===")
}
// 场景1: 实时数据清洗和标准化
func demonstrateDataCleaning() {
ssql := streamsql.New()
defer ssql.Stop()
// 清洗和标准化SQL
rsql := `SELECT deviceId,
UPPER(TRIM(deviceType)) as device_type,
ROUND(temperature, 2) as temperature,
COALESCE(location, 'unknown') as location,
CASE WHEN status = 1 THEN 'active'
WHEN status = 0 THEN 'inactive'
ELSE 'unknown' END as status_text
FROM stream
WHERE deviceId != '' AND temperature > -999`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
// 结果处理
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 清洗后数据: %+v\n", result)
})
// 模拟脏数据输入
dirtyData := []map[string]interface{}{
{"deviceId": "sensor001", "deviceType": " temperature ", "temperature": 25.456789, "location": "room1", "status": 1},
{"deviceId": "sensor002", "deviceType": "humidity", "temperature": 60.123, "location": nil, "status": 0},
{"deviceId": "", "deviceType": "pressure", "temperature": nil, "location": "room2", "status": 2}, // 应被过滤
{"deviceId": "sensor003", "deviceType": "TEMPERATURE", "temperature": 22.7, "location": "room3", "status": 1},
}
for _, data := range dirtyData {
ssql.Emit(data)
time.Sleep(50 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 场景2: 数据富化和计算字段
func demonstrateDataEnrichment() {
ssql := streamsql.New()
defer ssql.Stop()
// 数据富化SQL
rsql := `SELECT *,
temperature * 1.8 + 32 as temp_fahrenheit,
CASE WHEN temperature > 30 THEN 'hot'
WHEN temperature < 15 THEN 'cold'
ELSE 'normal' END as temp_category,
CONCAT(location, '-', deviceId) as full_identifier,
NOW() as processed_timestamp,
ROUND(humidity / 100.0, 4) as humidity_ratio
FROM stream`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 富化后数据: %+v\n", result)
})
// 原始数据
rawData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 32.5, "humidity": 65, "location": "greenhouse"},
{"deviceId": "sensor002", "temperature": 12.0, "humidity": 45, "location": "warehouse"},
{"deviceId": "sensor003", "temperature": 22.8, "humidity": 70, "location": "office"},
}
for _, data := range rawData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 场景3: 实时告警和事件过滤
func demonstrateRealTimeAlerting() {
ssql := streamsql.New()
defer ssql.Stop()
// 告警过滤SQL
rsql := `SELECT deviceId,
temperature,
humidity,
location,
'CRITICAL' as alert_level,
CASE WHEN temperature > 40 THEN 'High Temperature Alert'
WHEN temperature < 5 THEN 'Low Temperature Alert'
WHEN humidity > 90 THEN 'High Humidity Alert'
WHEN humidity < 20 THEN 'Low Humidity Alert'
ELSE 'Unknown Alert' END as alert_message,
NOW() as alert_time
FROM stream
WHERE temperature > 40 OR temperature < 5 OR humidity > 90 OR humidity < 20`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 🚨 告警事件: %+v\n", result)
})
// 模拟传感器数据(包含异常值)
sensorData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 25.0, "humidity": 60, "location": "room1"}, // 正常
{"deviceId": "sensor002", "temperature": 45.0, "humidity": 50, "location": "room2"}, // 高温告警
{"deviceId": "sensor003", "temperature": 20.0, "humidity": 95, "location": "room3"}, // 高湿度告警
{"deviceId": "sensor004", "temperature": 2.0, "humidity": 30, "location": "room4"}, // 低温告警
{"deviceId": "sensor005", "temperature": 22.0, "humidity": 15, "location": "room5"}, // 低湿度告警
{"deviceId": "sensor006", "temperature": 24.0, "humidity": 55, "location": "room6"}, // 正常
}
for _, data := range sensorData {
ssql.Emit(data)
time.Sleep(150 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 场景4: 数据格式转换
func demonstrateDataFormatConversion() {
ssql := streamsql.New()
defer ssql.Stop()
// 格式转换SQL
rsql := `SELECT deviceId,
CONCAT('{"device_id":"', deviceId, '","metrics":{"temp":',
CAST(temperature AS STRING), ',"hum":',
CAST(humidity AS STRING), '},"location":"',
location, '","timestamp":',
CAST(NOW() AS STRING), '}') as json_format,
CONCAT(deviceId, '|', location, '|',
CAST(temperature AS STRING), '|',
CAST(humidity AS STRING)) as csv_format
FROM stream`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 格式转换结果: %+v\n", result)
})
// 输入数据
inputData := []map[string]interface{}{
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60, "location": "warehouse-A"},
{"deviceId": "sensor002", "temperature": 22.0, "humidity": 55, "location": "warehouse-B"},
}
for _, data := range inputData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 场景5: 基于条件的数据路由
func demonstrateDataRouting() {
ssql := streamsql.New()
defer ssql.Stop()
// 数据路由SQL
rsql := `SELECT *,
CASE WHEN deviceType = 'temperature' AND temperature > 30 THEN 'high_temp_topic'
WHEN deviceType = 'humidity' AND humidity > 80 THEN 'high_humidity_topic'
WHEN deviceType = 'pressure' THEN 'pressure_topic'
ELSE 'default_topic' END as routing_topic,
CASE WHEN temperature > 35 OR humidity > 85 THEN 'urgent'
WHEN temperature > 25 OR humidity > 70 THEN 'normal'
ELSE 'low' END as priority
FROM stream`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 路由结果: %+v\n", result)
})
// 不同类型的设备数据
deviceData := []map[string]interface{}{
{"deviceId": "temp001", "deviceType": "temperature", "temperature": 35.0, "humidity": 60},
{"deviceId": "hum001", "deviceType": "humidity", "temperature": 25.0, "humidity": 85},
{"deviceId": "press001", "deviceType": "pressure", "temperature": 22.0, "pressure": 1013.25},
{"deviceId": "temp002", "deviceType": "temperature", "temperature": 20.0, "humidity": 50},
}
for _, data := range deviceData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 场景6: 嵌套字段处理
func demonstrateNestedFieldProcessing() {
ssql := streamsql.New()
defer ssql.Stop()
// 嵌套字段处理SQL
rsql := `SELECT device.info.id as device_id,
device.info.name as device_name,
device.location.building as building,
device.location.room as room,
metrics.temperature as temp,
metrics.humidity as humidity,
CONCAT(device.location.building, '-', device.location.room, '-', device.info.id) as full_path,
CASE WHEN metrics.temperature > device.config.max_temp THEN 'OVER_LIMIT'
ELSE 'NORMAL' END as temp_status
FROM stream
WHERE device.info.type = 'sensor'`
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
ssql.AddSink(func(result []map[string]interface{}) {
fmt.Printf(" 嵌套字段处理结果: %+v\n", result)
})
// 嵌套结构数据
nestedData := []map[string]interface{}{
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"id": "sensor001",
"name": "Temperature Sensor 1",
"type": "sensor",
},
"location": map[string]interface{}{
"building": "Building-A",
"room": "Room-101",
},
"config": map[string]interface{}{
"max_temp": 30.0,
"min_temp": 10.0,
},
},
"metrics": map[string]interface{}{
"temperature": 32.5,
"humidity": 65,
},
},
{
"device": map[string]interface{}{
"info": map[string]interface{}{
"id": "sensor002",
"name": "Humidity Sensor 1",
"type": "sensor",
},
"location": map[string]interface{}{
"building": "Building-B",
"room": "Room-201",
},
"config": map[string]interface{}{
"max_temp": 25.0,
"min_temp": 15.0,
},
},
"metrics": map[string]interface{}{
"temperature": 22.0,
"humidity": 70,
},
},
}
for _, data := range nestedData {
ssql.Emit(data)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(200 * time.Millisecond)
}
// 生成随机测试数据的辅助函数
func generateRandomSensorData(deviceId string) map[string]interface{} {
return map[string]interface{}{
"deviceId": deviceId,
"temperature": 15.0 + rand.Float64()*25.0, // 15-40度
"humidity": 30.0 + rand.Float64()*40.0, // 30-70%
"location": fmt.Sprintf("room%d", rand.Intn(10)+1),
"timestamp": time.Now().Unix(),
}
}