mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-12 21:37:17 +00:00
432 lines
12 KiB
Go
432 lines
12 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"math/rand"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/rulego/streamsql"
|
||
)
|
||
|
||
func main() {
|
||
fmt.Println("🔧 StreamSQL 复杂嵌套字段访问功能演示")
|
||
fmt.Println("=======================================")
|
||
|
||
// 创建 StreamSQL 实例
|
||
ssql := streamsql.New()
|
||
defer ssql.Stop()
|
||
|
||
// 演示1: 数组索引访问
|
||
fmt.Println("\n📊 演示1: 数组索引访问")
|
||
demonstrateArrayAccess(ssql)
|
||
|
||
// 演示2: Map键访问
|
||
fmt.Println("\n🗝️ 演示2: Map键访问")
|
||
demonstrateMapKeyAccess(ssql)
|
||
|
||
// 演示3: 混合复杂访问
|
||
fmt.Println("\n🔄 演示3: 混合复杂访问")
|
||
demonstrateComplexMixedAccess(ssql)
|
||
|
||
// 演示4: 负数索引访问
|
||
fmt.Println("\n⬅️ 演示4: 负数索引访问")
|
||
demonstrateNegativeIndexAccess(ssql)
|
||
|
||
// 演示5: 数组索引聚合计算
|
||
fmt.Println("\n📈 演示5: 数组索引聚合计算")
|
||
demonstrateArrayIndexAggregation(ssql)
|
||
|
||
fmt.Println("\n✅ 演示完成!")
|
||
}
|
||
|
||
// 演示数组索引访问
|
||
func demonstrateArrayAccess(ssql *streamsql.Streamsql) {
|
||
// SQL查询:提取数组中的特定元素
|
||
rsql := `SELECT device,
|
||
sensors[0].temperature as first_sensor_temp,
|
||
sensors[1].humidity as second_sensor_humidity,
|
||
data[2] as third_data_item
|
||
FROM stream`
|
||
|
||
err := ssql.Execute(rsql)
|
||
if err != nil {
|
||
fmt.Printf("❌ SQL执行失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 准备测试数据
|
||
testData := []map[string]interface{}{
|
||
{
|
||
"device": "工业传感器-001",
|
||
"sensors": []interface{}{
|
||
map[string]interface{}{"temperature": 25.5, "humidity": 60.2},
|
||
map[string]interface{}{"temperature": 26.8, "humidity": 58.7},
|
||
map[string]interface{}{"temperature": 24.1, "humidity": 62.1},
|
||
},
|
||
"data": []interface{}{"status_ok", "battery_95%", "signal_strong", "location_A1"},
|
||
"timestamp": time.Now().Unix(),
|
||
},
|
||
{
|
||
"device": "环境监测器-002",
|
||
"sensors": []interface{}{
|
||
map[string]interface{}{"temperature": 22.3, "humidity": 65.8},
|
||
map[string]interface{}{"temperature": 23.1, "humidity": 63.2},
|
||
},
|
||
"data": []interface{}{"status_warning", "battery_78%", "signal_weak"},
|
||
"timestamp": time.Now().Unix(),
|
||
},
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
|
||
// 设置结果回调
|
||
ssql.AddSink(func(result []map[string]interface{}) {
|
||
defer wg.Done()
|
||
|
||
fmt.Println(" 📋 数组索引访问结果:")
|
||
for i, item := range result {
|
||
fmt.Printf(" 记录 %d:\n", i+1)
|
||
fmt.Printf(" 设备: %v\n", item["device"])
|
||
fmt.Printf(" 第一个传感器温度: %v°C\n", item["first_sensor_temp"])
|
||
fmt.Printf(" 第二个传感器湿度: %v%%\n", item["second_sensor_humidity"])
|
||
fmt.Printf(" 第三个数据项: %v\n", item["third_data_item"])
|
||
fmt.Println()
|
||
}
|
||
})
|
||
|
||
// 添加测试数据
|
||
for _, data := range testData {
|
||
ssql.Emit(data)
|
||
}
|
||
|
||
// 等待结果
|
||
wg.Wait()
|
||
}
|
||
|
||
// 演示Map键访问
|
||
func demonstrateMapKeyAccess(ssql *streamsql.Streamsql) {
|
||
// SQL查询:使用字符串键访问Map数据
|
||
rsql := `SELECT device_id,
|
||
config['host'] as server_host,
|
||
config["port"] as server_port,
|
||
settings['enable_ssl'] as ssl_enabled,
|
||
metadata["version"] as app_version
|
||
FROM stream`
|
||
|
||
err := ssql.Execute(rsql)
|
||
if err != nil {
|
||
fmt.Printf("❌ SQL执行失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 准备测试数据
|
||
testData := []map[string]interface{}{
|
||
{
|
||
"device_id": "gateway-001",
|
||
"config": map[string]interface{}{
|
||
"host": "192.168.1.100",
|
||
"port": 8080,
|
||
"protocol": "https",
|
||
},
|
||
"settings": map[string]interface{}{
|
||
"enable_ssl": true,
|
||
"timeout": 30,
|
||
"max_retries": 3,
|
||
},
|
||
"metadata": map[string]interface{}{
|
||
"version": "v2.1.3",
|
||
"build_date": "2023-12-01",
|
||
"vendor": "TechCorp",
|
||
},
|
||
},
|
||
{
|
||
"device_id": "gateway-002",
|
||
"config": map[string]interface{}{
|
||
"host": "192.168.1.101",
|
||
"port": 8443,
|
||
"protocol": "https",
|
||
},
|
||
"settings": map[string]interface{}{
|
||
"enable_ssl": false,
|
||
"timeout": 60,
|
||
"max_retries": 5,
|
||
},
|
||
"metadata": map[string]interface{}{
|
||
"version": "v2.0.8",
|
||
"build_date": "2023-11-15",
|
||
"vendor": "TechCorp",
|
||
},
|
||
},
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
|
||
// 设置结果回调
|
||
ssql.AddSink(func(result []map[string]interface{}) {
|
||
defer wg.Done()
|
||
|
||
fmt.Println(" 🗝️ Map键访问结果:")
|
||
resultSlice := result
|
||
for i, item := range resultSlice {
|
||
fmt.Printf(" 记录 %d:\n", i+1)
|
||
fmt.Printf(" 设备ID: %v\n", item["device_id"])
|
||
fmt.Printf(" 服务器主机: %v\n", item["server_host"])
|
||
fmt.Printf(" 服务器端口: %v\n", item["server_port"])
|
||
fmt.Printf(" SSL启用: %v\n", item["ssl_enabled"])
|
||
fmt.Printf(" 应用版本: %v\n", item["app_version"])
|
||
fmt.Println()
|
||
}
|
||
})
|
||
|
||
// 添加测试数据
|
||
for _, data := range testData {
|
||
ssql.Emit(data)
|
||
}
|
||
|
||
// 等待结果
|
||
wg.Wait()
|
||
}
|
||
|
||
// 演示混合复杂访问
|
||
func demonstrateComplexMixedAccess(ssql *streamsql.Streamsql) {
|
||
// SQL查询:混合使用数组索引、Map键和嵌套字段访问
|
||
rsql := `SELECT building,
|
||
floors[0].rooms[2]['name'] as first_floor_room3_name,
|
||
floors[1].sensors[0].readings['temperature'] as second_floor_first_sensor_temp,
|
||
metadata.building_info['architect'] as building_architect,
|
||
alerts[-1].message as latest_alert
|
||
FROM stream`
|
||
|
||
err := ssql.Execute(rsql)
|
||
if err != nil {
|
||
fmt.Printf("❌ SQL执行失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 准备复杂嵌套数据
|
||
testData := map[string]interface{}{
|
||
"building": "智能大厦A座",
|
||
"floors": []interface{}{
|
||
// 第一层
|
||
map[string]interface{}{
|
||
"floor_number": 1,
|
||
"rooms": []interface{}{
|
||
map[string]interface{}{"name": "大厅", "type": "public"},
|
||
map[string]interface{}{"name": "接待室", "type": "office"},
|
||
map[string]interface{}{"name": "会议室A", "type": "meeting"},
|
||
map[string]interface{}{"name": "休息区", "type": "lounge"},
|
||
},
|
||
},
|
||
// 第二层
|
||
map[string]interface{}{
|
||
"floor_number": 2,
|
||
"sensors": []interface{}{
|
||
map[string]interface{}{
|
||
"id": "sensor-201",
|
||
"readings": map[string]interface{}{
|
||
"temperature": 23.5,
|
||
"humidity": 58.2,
|
||
"co2": 420,
|
||
},
|
||
},
|
||
map[string]interface{}{
|
||
"id": "sensor-202",
|
||
"readings": map[string]interface{}{
|
||
"temperature": 24.1,
|
||
"humidity": 60.8,
|
||
"co2": 380,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"metadata": map[string]interface{}{
|
||
"building_info": map[string]interface{}{
|
||
"architect": "张建筑师",
|
||
"year_built": 2020,
|
||
"total_floors": 25,
|
||
},
|
||
"owner": "科技园管委会",
|
||
},
|
||
"alerts": []interface{}{
|
||
map[string]interface{}{"level": "info", "message": "系统启动完成"},
|
||
map[string]interface{}{"level": "warning", "message": "传感器信号弱"},
|
||
map[string]interface{}{"level": "info", "message": "定期维护提醒"},
|
||
},
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
|
||
// 设置结果回调
|
||
ssql.AddSink(func(result []map[string]interface{}) {
|
||
defer wg.Done()
|
||
|
||
fmt.Println(" 🔄 混合复杂访问结果:")
|
||
resultSlice := result
|
||
for i, item := range resultSlice {
|
||
fmt.Printf(" 记录 %d:\n", i+1)
|
||
fmt.Printf(" 建筑: %v\n", item["building"])
|
||
fmt.Printf(" 一层第3个房间: %v\n", item["first_floor_room3_name"])
|
||
fmt.Printf(" 二层第1个传感器温度: %v°C\n", item["second_floor_first_sensor_temp"])
|
||
fmt.Printf(" 建筑师: %v\n", item["building_architect"])
|
||
fmt.Printf(" 最新警报: %v\n", item["latest_alert"])
|
||
fmt.Println()
|
||
}
|
||
})
|
||
|
||
// 添加数据
|
||
ssql.Emit(testData)
|
||
|
||
// 等待结果
|
||
wg.Wait()
|
||
}
|
||
|
||
// 演示负数索引访问
|
||
func demonstrateNegativeIndexAccess(ssql *streamsql.Streamsql) {
|
||
// SQL查询:使用负数索引访问数组末尾元素
|
||
rsql := `SELECT device_name,
|
||
readings[-1] as latest_reading,
|
||
history[-2] as second_last_event,
|
||
tags[-1] as last_tag
|
||
FROM stream`
|
||
|
||
err := ssql.Execute(rsql)
|
||
if err != nil {
|
||
fmt.Printf("❌ SQL执行失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
// 准备测试数据
|
||
testData := []map[string]interface{}{
|
||
{
|
||
"device_name": "温度监测器-Alpha",
|
||
"readings": []interface{}{18.5, 19.2, 20.1, 21.3, 22.8, 23.5}, // [-1] = 23.5
|
||
"history": []interface{}{"boot", "calibration", "running", "alert", "resolved"}, // [-2] = "alert"
|
||
"tags": []interface{}{"indoor", "critical", "monitored"}, // [-1] = "monitored"
|
||
},
|
||
{
|
||
"device_name": "湿度传感器-Beta",
|
||
"readings": []interface{}{45.2, 47.8, 52.1, 48.9}, // [-1] = 48.9
|
||
"history": []interface{}{"init", "testing", "deployed"}, // [-2] = "testing"
|
||
"tags": []interface{}{"outdoor", "backup"}, // [-1] = "backup"
|
||
},
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
|
||
// 设置结果回调
|
||
ssql.AddSink(func(result []map[string]interface{}) {
|
||
defer wg.Done()
|
||
|
||
fmt.Println(" ⬅️ 负数索引访问结果:")
|
||
resultSlice := result
|
||
for i, item := range resultSlice {
|
||
fmt.Printf(" 记录 %d:\n", i+1)
|
||
fmt.Printf(" 设备名称: %v\n", item["device_name"])
|
||
fmt.Printf(" 最新读数: %v\n", item["latest_reading"])
|
||
fmt.Printf(" 倒数第二个事件: %v\n", item["second_last_event"])
|
||
fmt.Printf(" 最后一个标签: %v\n", item["last_tag"])
|
||
fmt.Println()
|
||
}
|
||
})
|
||
|
||
// 添加测试数据
|
||
for _, data := range testData {
|
||
ssql.Emit(data)
|
||
}
|
||
|
||
// 等待结果
|
||
wg.Wait()
|
||
}
|
||
|
||
// 演示数组索引聚合计算
|
||
func demonstrateArrayIndexAggregation(ssql *streamsql.Streamsql) {
|
||
// SQL查询:对数组中特定位置的数据进行聚合计算
|
||
rsql := `SELECT location,
|
||
AVG(sensors[0].temperature) as avg_first_sensor_temp,
|
||
MAX(sensors[1].humidity) as max_second_sensor_humidity,
|
||
COUNT(*) as device_count
|
||
FROM stream
|
||
GROUP BY location, TumblingWindow('2s')
|
||
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
|
||
|
||
err := ssql.Execute(rsql)
|
||
if err != nil {
|
||
fmt.Printf("❌ SQL执行失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
var resultCount int
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
|
||
// 设置结果回调
|
||
ssql.AddSink(func(result []map[string]interface{}) {
|
||
defer wg.Done()
|
||
|
||
fmt.Println(" 📈 数组索引聚合计算结果:")
|
||
resultSlice := result
|
||
for i, item := range resultSlice {
|
||
resultCount++
|
||
fmt.Printf(" 聚合结果 %d:\n", i+1)
|
||
fmt.Printf(" 位置: %v\n", item["location"])
|
||
fmt.Printf(" 第一个传感器平均温度: %.2f°C\n", item["avg_first_sensor_temp"])
|
||
fmt.Printf(" 第二个传感器最大湿度: %.1f%%\n", item["max_second_sensor_humidity"])
|
||
fmt.Printf(" 设备数量: %v\n", item["device_count"])
|
||
fmt.Println()
|
||
}
|
||
})
|
||
|
||
// 生成模拟数据
|
||
locations := []string{"车间A", "车间B", "车间C"}
|
||
|
||
go func() {
|
||
for i := 0; i < 12; i++ {
|
||
location := locations[rand.Intn(len(locations))]
|
||
|
||
data := map[string]interface{}{
|
||
"device_id": fmt.Sprintf("device-%03d", i+1),
|
||
"location": location,
|
||
"sensors": []interface{}{
|
||
map[string]interface{}{
|
||
"temperature": 20.0 + rand.Float64()*10.0, // 20-30°C
|
||
"humidity": 50.0 + rand.Float64()*20.0, // 50-70%
|
||
},
|
||
map[string]interface{}{
|
||
"temperature": 18.0 + rand.Float64()*12.0, // 18-30°C
|
||
"humidity": 45.0 + rand.Float64()*25.0, // 45-70%
|
||
},
|
||
},
|
||
"timestamp": time.Now().Unix(),
|
||
}
|
||
|
||
ssql.Emit(data)
|
||
time.Sleep(200 * time.Millisecond) // 每200ms发送一条数据
|
||
}
|
||
}()
|
||
|
||
// 等待聚合结果
|
||
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
|
||
defer cancel()
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
fmt.Println(" ⏰ 聚合计算超时")
|
||
case <-func() chan struct{} {
|
||
done := make(chan struct{})
|
||
go func() {
|
||
wg.Wait()
|
||
close(done)
|
||
}()
|
||
return done
|
||
}():
|
||
fmt.Printf(" ✅ 聚合计算完成,共生成 %d 个窗口结果\n", resultCount)
|
||
}
|
||
}
|