// Forked from GiteaTest2015/streamsql (Go source, 344 lines, 10 KiB).
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/rulego/streamsql"
|
|
)
|
|
|
|
// 非聚合场景使用示例
|
|
// 展示StreamSQL在实时数据转换、过滤、清洗等场景中的应用
|
|
func main() {
|
|
fmt.Println("=== StreamSQL 非聚合场景演示 ===")
|
|
|
|
// 场景1: 实时数据清洗和标准化
|
|
fmt.Println("\n1. 实时数据清洗和标准化")
|
|
demonstrateDataCleaning()
|
|
|
|
// 场景2: 数据富化和计算字段
|
|
fmt.Println("\n2. 数据富化和计算字段")
|
|
demonstrateDataEnrichment()
|
|
|
|
// 场景3: 实时告警和事件过滤
|
|
fmt.Println("\n3. 实时告警和事件过滤")
|
|
demonstrateRealTimeAlerting()
|
|
|
|
// 场景4: 数据格式转换
|
|
fmt.Println("\n4. 数据格式转换")
|
|
demonstrateDataFormatConversion()
|
|
|
|
// 场景5: 基于条件的数据路由
|
|
fmt.Println("\n5. 基于条件的数据路由")
|
|
demonstrateDataRouting()
|
|
|
|
// 场景6: 嵌套字段处理
|
|
fmt.Println("\n6. 嵌套字段处理")
|
|
demonstrateNestedFieldProcessing()
|
|
|
|
fmt.Println("\n=== 演示完成 ===")
|
|
}
|
|
|
|
// 场景1: 实时数据清洗和标准化
|
|
func demonstrateDataCleaning() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 清洗和标准化SQL
|
|
rsql := `SELECT deviceId,
|
|
UPPER(TRIM(deviceType)) as device_type,
|
|
ROUND(temperature, 2) as temperature,
|
|
COALESCE(location, 'unknown') as location,
|
|
CASE WHEN status = 1 THEN 'active'
|
|
WHEN status = 0 THEN 'inactive'
|
|
ELSE 'unknown' END as status_text
|
|
FROM stream
|
|
WHERE deviceId != '' AND temperature > -999`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// 结果处理
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 清洗后数据: %+v\n", result)
|
|
})
|
|
|
|
// 模拟脏数据输入
|
|
dirtyData := []map[string]interface{}{
|
|
{"deviceId": "sensor001", "deviceType": " temperature ", "temperature": 25.456789, "location": "room1", "status": 1},
|
|
{"deviceId": "sensor002", "deviceType": "humidity", "temperature": 60.123, "location": nil, "status": 0},
|
|
{"deviceId": "", "deviceType": "pressure", "temperature": nil, "location": "room2", "status": 2}, // 应被过滤
|
|
{"deviceId": "sensor003", "deviceType": "TEMPERATURE", "temperature": 22.7, "location": "room3", "status": 1},
|
|
}
|
|
|
|
for _, data := range dirtyData {
|
|
ssql.Emit(data)
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// 场景2: 数据富化和计算字段
|
|
func demonstrateDataEnrichment() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 数据富化SQL
|
|
rsql := `SELECT *,
|
|
temperature * 1.8 + 32 as temp_fahrenheit,
|
|
CASE WHEN temperature > 30 THEN 'hot'
|
|
WHEN temperature < 15 THEN 'cold'
|
|
ELSE 'normal' END as temp_category,
|
|
CONCAT(location, '-', deviceId) as full_identifier,
|
|
NOW() as processed_timestamp,
|
|
ROUND(humidity / 100.0, 4) as humidity_ratio
|
|
FROM stream`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 富化后数据: %+v\n", result)
|
|
})
|
|
|
|
// 原始数据
|
|
rawData := []map[string]interface{}{
|
|
{"deviceId": "sensor001", "temperature": 32.5, "humidity": 65, "location": "greenhouse"},
|
|
{"deviceId": "sensor002", "temperature": 12.0, "humidity": 45, "location": "warehouse"},
|
|
{"deviceId": "sensor003", "temperature": 22.8, "humidity": 70, "location": "office"},
|
|
}
|
|
|
|
for _, data := range rawData {
|
|
ssql.Emit(data)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// 场景3: 实时告警和事件过滤
|
|
func demonstrateRealTimeAlerting() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 告警过滤SQL
|
|
rsql := `SELECT deviceId,
|
|
temperature,
|
|
humidity,
|
|
location,
|
|
'CRITICAL' as alert_level,
|
|
CASE WHEN temperature > 40 THEN 'High Temperature Alert'
|
|
WHEN temperature < 5 THEN 'Low Temperature Alert'
|
|
WHEN humidity > 90 THEN 'High Humidity Alert'
|
|
WHEN humidity < 20 THEN 'Low Humidity Alert'
|
|
ELSE 'Unknown Alert' END as alert_message,
|
|
NOW() as alert_time
|
|
FROM stream
|
|
WHERE temperature > 40 OR temperature < 5 OR humidity > 90 OR humidity < 20`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 🚨 告警事件: %+v\n", result)
|
|
})
|
|
|
|
// 模拟传感器数据(包含异常值)
|
|
sensorData := []map[string]interface{}{
|
|
{"deviceId": "sensor001", "temperature": 25.0, "humidity": 60, "location": "room1"}, // 正常
|
|
{"deviceId": "sensor002", "temperature": 45.0, "humidity": 50, "location": "room2"}, // 高温告警
|
|
{"deviceId": "sensor003", "temperature": 20.0, "humidity": 95, "location": "room3"}, // 高湿度告警
|
|
{"deviceId": "sensor004", "temperature": 2.0, "humidity": 30, "location": "room4"}, // 低温告警
|
|
{"deviceId": "sensor005", "temperature": 22.0, "humidity": 15, "location": "room5"}, // 低湿度告警
|
|
{"deviceId": "sensor006", "temperature": 24.0, "humidity": 55, "location": "room6"}, // 正常
|
|
}
|
|
|
|
for _, data := range sensorData {
|
|
ssql.Emit(data)
|
|
time.Sleep(150 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// 场景4: 数据格式转换
|
|
func demonstrateDataFormatConversion() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 格式转换SQL
|
|
rsql := `SELECT deviceId,
|
|
CONCAT('{"device_id":"', deviceId, '","metrics":{"temp":',
|
|
CAST(temperature AS STRING), ',"hum":',
|
|
CAST(humidity AS STRING), '},"location":"',
|
|
location, '","timestamp":',
|
|
CAST(NOW() AS STRING), '}') as json_format,
|
|
CONCAT(deviceId, '|', location, '|',
|
|
CAST(temperature AS STRING), '|',
|
|
CAST(humidity AS STRING)) as csv_format
|
|
FROM stream`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 格式转换结果: %+v\n", result)
|
|
})
|
|
|
|
// 输入数据
|
|
inputData := []map[string]interface{}{
|
|
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60, "location": "warehouse-A"},
|
|
{"deviceId": "sensor002", "temperature": 22.0, "humidity": 55, "location": "warehouse-B"},
|
|
}
|
|
|
|
for _, data := range inputData {
|
|
ssql.Emit(data)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// 场景5: 基于条件的数据路由
|
|
func demonstrateDataRouting() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 数据路由SQL
|
|
rsql := `SELECT *,
|
|
CASE WHEN deviceType = 'temperature' AND temperature > 30 THEN 'high_temp_topic'
|
|
WHEN deviceType = 'humidity' AND humidity > 80 THEN 'high_humidity_topic'
|
|
WHEN deviceType = 'pressure' THEN 'pressure_topic'
|
|
ELSE 'default_topic' END as routing_topic,
|
|
CASE WHEN temperature > 35 OR humidity > 85 THEN 'urgent'
|
|
WHEN temperature > 25 OR humidity > 70 THEN 'normal'
|
|
ELSE 'low' END as priority
|
|
FROM stream`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 路由结果: %+v\n", result)
|
|
})
|
|
|
|
// 不同类型的设备数据
|
|
deviceData := []map[string]interface{}{
|
|
{"deviceId": "temp001", "deviceType": "temperature", "temperature": 35.0, "humidity": 60},
|
|
{"deviceId": "hum001", "deviceType": "humidity", "temperature": 25.0, "humidity": 85},
|
|
{"deviceId": "press001", "deviceType": "pressure", "temperature": 22.0, "pressure": 1013.25},
|
|
{"deviceId": "temp002", "deviceType": "temperature", "temperature": 20.0, "humidity": 50},
|
|
}
|
|
|
|
for _, data := range deviceData {
|
|
ssql.Emit(data)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// 场景6: 嵌套字段处理
|
|
func demonstrateNestedFieldProcessing() {
|
|
ssql := streamsql.New()
|
|
defer ssql.Stop()
|
|
|
|
// 嵌套字段处理SQL
|
|
rsql := `SELECT device.info.id as device_id,
|
|
device.info.name as device_name,
|
|
device.location.building as building,
|
|
device.location.room as room,
|
|
metrics.temperature as temp,
|
|
metrics.humidity as humidity,
|
|
CONCAT(device.location.building, '-', device.location.room, '-', device.info.id) as full_path,
|
|
CASE WHEN metrics.temperature > device.config.max_temp THEN 'OVER_LIMIT'
|
|
ELSE 'NORMAL' END as temp_status
|
|
FROM stream
|
|
WHERE device.info.type = 'sensor'`
|
|
|
|
err := ssql.Execute(rsql)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ssql.AddSink(func(result []map[string]interface{}) {
|
|
fmt.Printf(" 嵌套字段处理结果: %+v\n", result)
|
|
})
|
|
|
|
// 嵌套结构数据
|
|
nestedData := []map[string]interface{}{
|
|
{
|
|
"device": map[string]interface{}{
|
|
"info": map[string]interface{}{
|
|
"id": "sensor001",
|
|
"name": "Temperature Sensor 1",
|
|
"type": "sensor",
|
|
},
|
|
"location": map[string]interface{}{
|
|
"building": "Building-A",
|
|
"room": "Room-101",
|
|
},
|
|
"config": map[string]interface{}{
|
|
"max_temp": 30.0,
|
|
"min_temp": 10.0,
|
|
},
|
|
},
|
|
"metrics": map[string]interface{}{
|
|
"temperature": 32.5,
|
|
"humidity": 65,
|
|
},
|
|
},
|
|
{
|
|
"device": map[string]interface{}{
|
|
"info": map[string]interface{}{
|
|
"id": "sensor002",
|
|
"name": "Humidity Sensor 1",
|
|
"type": "sensor",
|
|
},
|
|
"location": map[string]interface{}{
|
|
"building": "Building-B",
|
|
"room": "Room-201",
|
|
},
|
|
"config": map[string]interface{}{
|
|
"max_temp": 25.0,
|
|
"min_temp": 15.0,
|
|
},
|
|
},
|
|
"metrics": map[string]interface{}{
|
|
"temperature": 22.0,
|
|
"humidity": 70,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, data := range nestedData {
|
|
ssql.Emit(data)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
// generateRandomSensorData is a helper that builds one random test reading
// for the given deviceId.
//
// The returned map has five entries:
//   - "deviceId":    the deviceId argument, unchanged (string)
//   - "temperature": 15.0 + rand.Float64()*25.0, i.e. about 15-40 degrees (float64)
//   - "humidity":    30.0 + rand.Float64()*40.0, i.e. about 30-70 percent (float64)
//   - "location":    "room" followed by a number from 1 to 10 (string)
//   - "timestamp":   the current Unix time in seconds (int64)
//
// Values come from the package-level math/rand source, so they differ from
// call to call.
func generateRandomSensorData(deviceId string) map[string]interface{} {
	return map[string]interface{}{
		"deviceId": deviceId,
		// About 15-40 degrees.
		"temperature": 15.0 + rand.Float64()*25.0,
		// About 30-70 percent.
		"humidity": 30.0 + rand.Float64()*40.0,
		// rand.Intn(10) yields 0..9, so the room number is 1..10.
		"location":  fmt.Sprintf("room%d", rand.Intn(10)+1),
		"timestamp": time.Now().Unix(),
	}
}
|