Files
streamsql/streamsql_nested_field_test.go
T
2025-08-07 19:23:48 +08:00

315 lines
8.6 KiB
Go

package streamsql
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestComprehensiveNestedFieldAccess 全面测试嵌套字段访问功能
func TestComprehensiveNestedFieldAccess(t *testing.T) {
t.Run("多层嵌套字段访问", func(t *testing.T) {
streamsql := New()
defer func() {
if streamsql != nil {
streamsql.Stop()
}
}()
// 测试多层嵌套字段访问
var rsql = "SELECT device.info.name as device_name, device.location.building as building, sensor.data.temperature as temp FROM stream"
err := streamsql.Execute(rsql)
assert.Nil(t, err, "多层嵌套字段SQL应该能够执行")
require.NoError(t, err, "多层嵌套字段访问不应该出错")
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果接收器
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加带多层嵌套字段的测试数据
testData := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "温度传感器001",
"type": "temperature",
"status": "active",
},
"location": map[string]interface{}{
"building": "A栋",
"floor": "3F",
"room": "301",
},
},
"sensor": map[string]interface{}{
"data": map[string]interface{}{
"temperature": 28.5,
"humidity": 65.0,
"pressure": 1013.25,
},
"status": "online",
},
}
// 发送数据
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case result := <-resultChan:
// 验证结果
resultSlice, ok := result.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
require.Len(t, resultSlice, 1, "应该只有一条结果")
item := resultSlice[0]
// 检查各个嵌套字段的提取情况
deviceName, deviceNameExists := item["device_name"]
assert.True(t, deviceNameExists, "device.info.name字段应该存在")
assert.Equal(t, "温度传感器001", deviceName, "device.info.name应该被正确提取")
building, buildingExists := item["building"]
assert.True(t, buildingExists, "device.location.building字段应该存在")
assert.Equal(t, "A栋", building, "device.location.building应该被正确提取")
temp, tempExists := item["temp"]
assert.True(t, tempExists, "sensor.data.temperature字段应该存在")
assert.Equal(t, 28.5, temp, "sensor.data.temperature应该被正确提取")
case <-ctx.Done():
t.Fatal("多层嵌套测试超时")
}
})
t.Run("嵌套字段聚合查询", func(t *testing.T) {
streamsql := New()
defer func() {
if streamsql != nil {
streamsql.Stop()
}
}()
// 测试聚合查询中的嵌套字段
var rsql = "SELECT device.type, AVG(sensor.temperature) as avg_temp, COUNT(*) as cnt FROM stream GROUP BY device.type, TumblingWindow('1s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err, "嵌套字段聚合SQL应该能够执行")
require.NoError(t, err, "嵌套字段聚合查询不应该出错")
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据
testData := []map[string]interface{}{
{
"device": map[string]interface{}{
"type": "temperature",
"id": "sensor001",
},
"sensor": map[string]interface{}{
"temperature": 25.0,
},
},
{
"device": map[string]interface{}{
"type": "temperature",
"id": "sensor002",
},
"sensor": map[string]interface{}{
"temperature": 35.0,
},
},
{
"device": map[string]interface{}{
"type": "humidity",
"id": "sensor003",
},
"sensor": map[string]interface{}{
"temperature": 20.0,
},
},
}
// 添加数据
for _, data := range testData {
strm.Emit(data)
}
// 等待窗口触发
time.Sleep(1200 * time.Millisecond)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
select {
case result := <-resultChan:
// 验证聚合结果
resultSlice, ok := result.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// 聚合查询可能返回空结果,这是正常的
if len(resultSlice) > 0 {
// 如果有结果,验证结果结构
for _, item := range resultSlice {
// 检查是否包含任何聚合字段
hasAnyField := false
if _, exists := item["type"]; exists {
hasAnyField = true
}
if _, exists := item["avg_temp"]; exists {
hasAnyField = true
}
if _, exists := item["cnt"]; exists {
hasAnyField = true
}
assert.True(t, hasAnyField, "聚合结果应该包含至少一个预期字段")
}
}
case <-ctx.Done():
t.Fatal("嵌套字段聚合查询测试超时")
}
})
t.Run("复杂嵌套字段WHERE条件", func(t *testing.T) {
streamsql := New()
defer func() {
if streamsql != nil {
streamsql.Stop()
}
}()
// 测试复杂的WHERE条件
var rsql = "SELECT * FROM stream WHERE device.info.status = 'active' AND sensor.data.temperature > 25 AND device.location.building = 'A栋'"
err := streamsql.Execute(rsql)
assert.Nil(t, err, "复杂嵌套字段WHERE SQL应该能够执行")
require.NoError(t, err, "复杂嵌套字段WHERE条件不应该出错")
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果接收器
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据:一条满足所有条件,一条不满足
testData1 := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "传感器A",
"status": "active", // 满足条件
},
"location": map[string]interface{}{
"building": "A栋", // 满足条件
},
},
"sensor": map[string]interface{}{
"data": map[string]interface{}{
"temperature": 30.0, // 满足条件 > 25
},
},
}
testData2 := map[string]interface{}{
"device": map[string]interface{}{
"info": map[string]interface{}{
"name": "传感器B",
"status": "inactive", // 不满足条件
},
"location": map[string]interface{}{
"building": "B栋", // 不满足条件
},
},
"sensor": map[string]interface{}{
"data": map[string]interface{}{
"temperature": 20.0, // 不满足条件 <= 25
},
},
}
// 发送数据
strm.Emit(testData1)
strm.Emit(testData2)
// 等待结果
var results []interface{}
timeout := time.After(3 * time.Second)
done := false
for !done {
select {
case result := <-resultChan:
results = append(results, result)
if len(results) >= 1 {
done = true
}
case <-timeout:
done = true
}
}
// 验证结果
assert.Greater(t, len(results), 0, "复杂WHERE条件应该返回结果")
for _, result := range results {
resultSlice, ok := result.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
for _, item := range resultSlice {
// 验证通过过滤的数据确实满足所有条件
device, deviceOk := item["device"].(map[string]interface{})
assert.True(t, deviceOk, "device字段应该存在且为map类型")
info, infoOk := device["info"].(map[string]interface{})
assert.True(t, infoOk, "device.info字段应该存在且为map类型")
status, statusOk := info["status"].(string)
assert.True(t, statusOk, "device.info.status字段应该存在且为string类型")
assert.Equal(t, "active", status, "status应该是active")
location, locationOk := device["location"].(map[string]interface{})
assert.True(t, locationOk, "device.location字段应该存在且为map类型")
building, buildingOk := location["building"].(string)
assert.True(t, buildingOk, "device.location.building字段应该存在且为string类型")
assert.Equal(t, "A栋", building, "building应该是A栋")
sensor, sensorOk := item["sensor"].(map[string]interface{})
assert.True(t, sensorOk, "sensor字段应该存在且为map类型")
data, dataOk := sensor["data"].(map[string]interface{})
assert.True(t, dataOk, "sensor.data字段应该存在且为map类型")
temp, tempOk := data["temperature"].(float64)
assert.True(t, tempOk, "sensor.data.temperature字段应该存在且为float64类型")
assert.Greater(t, temp, 25.0, "temperature应该大于25")
}
}
})
}