Files
streamsql/streamsql_post_aggregation_test.go
2025-08-29 13:49:26 +08:00

563 lines
21 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package streamsql
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// 辅助函数:创建测试环境
func createTestEnvironment(t *testing.T, rsql string) (*Streamsql, chan interface{}) {
ssql := New()
t.Cleanup(func() { ssql.Stop() })
err := ssql.Execute(rsql)
require.NoError(t, err)
resultChan := make(chan interface{}, 10)
ssql.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
return ssql, resultChan
}
// 辅助函数:发送测试数据并收集结果
func sendDataAndCollectResults(t *testing.T, ssql *Streamsql, resultChan chan interface{}, testData []map[string]interface{}, windowSizeSeconds int) []map[string]interface{} {
for _, data := range testData {
ssql.Emit(data)
}
// 等待窗口触发
time.Sleep(time.Duration(windowSizeSeconds+1) * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var results []map[string]interface{}
collecting:
for {
select {
case result := <-resultChan:
if resultSlice, ok := result.([]map[string]interface{}); ok {
results = append(results, resultSlice...)
}
case <-time.After(1 * time.Second):
break collecting
case <-ctx.Done():
break collecting
}
}
return results
}
// TestPostAggregationExpressions 测试分阶段聚合功能
func TestPostAggregationExpressions(t *testing.T) {
t.Run("基础聚合函数复杂运算", func(t *testing.T) {
rsql := `SELECT deviceId,
FIRST_VALUE(value) as firstVal,
LAST_VALUE(value) as lastVal,
(LAST_VALUE(value) - FIRST_VALUE(value)) as diffVal,
SUM(value) as sumVal,
AVG(value) as avgVal,
(SUM(value) / COUNT(*)) as calcAvg,
(SUM(value) + AVG(value)) as sumPlusAvg
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "dev1", "value": 10.0, "ts": baseTime},
{"deviceId": "dev1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "dev1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证基础聚合函数运算
assert.Equal(t, "dev1", result["deviceId"])
assert.Equal(t, 10.0, result["firstVal"])
assert.Equal(t, 30.0, result["lastVal"])
assert.Equal(t, 20.0, result["diffVal"]) // LAST_VALUE - FIRST_VALUE
assert.Equal(t, 60.0, result["sumVal"])
assert.Equal(t, 20.0, result["avgVal"])
assert.Equal(t, 20.0, result["calcAvg"]) // SUM / COUNT
assert.Equal(t, 80.0, result["sumPlusAvg"]) // SUM + AVG
})
// IF_NULL 基础功能:在 IF_NULL 中包裹聚合/分析函数
t.Run("验证IF_NULL 基础功能", func(t *testing.T) {
rsql := `SELECT deviceId,
IF_NULL(FIRST_VALUE(value), 0) as firstOrZero,
IF_NULL(LAST_VALUE(value), 0) as lastOrZero,
IF_NULL(AVG(value), 0) as avgOrZero
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": nil, "ts": baseTime},
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": nil, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
assert.Equal(t, "sensor1", result["deviceId"])
// FIRST_VALUE(value) 为 nil => IF_NULL(...,0) = 0
assert.Equal(t, 0.0, result["firstOrZero"])
// LAST_VALUE(value) 为 30 => IF_NULL(...,0) = 30
assert.Equal(t, 30.0, result["lastOrZero"])
// AVG(value) 仅计算非空 => (10+30)/2 = 20 => IF_NULL(...,0) = 20
assert.Equal(t, 20.0, result["avgOrZero"])
})
// 聚合函数参数中嵌套 IF_NULL如 SUM(IF_NULL(value,0))
t.Run("验证:聚合函数嵌套 IF_NULL", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(IF_NULL(value, 0)) as sumVal,
AVG(IF_NULL(value, 0)) as avgVal,
MAX(IF_NULL(value, 0)) as maxVal,
MIN(IF_NULL(value, 0)) as minVal
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": nil, "ts": baseTime},
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": nil, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
assert.Equal(t, "sensor1", result["deviceId"])
// SUM(IF_NULL(value,0)) = 0 + 10 + 0 + 30 = 40
assert.Equal(t, 40.0, result["sumVal"])
// AVG(IF_NULL(value,0)) = (0 + 10 + 0 + 30)/4 = 10
assert.Equal(t, 10.0, result["avgVal"])
// MAX(IF_NULL(value,0)) = max(0,10,0,30) = 30
assert.Equal(t, 30.0, result["maxVal"])
// MIN(IF_NULL(value,0)) = min(0,10,0,30) = 0
assert.Equal(t, 0.0, result["minVal"])
})
t.Run("分析函数与聚合函数复杂运算", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(value) as total,
AVG(value) as average,
LATEST(value) as latest,
(SUM(value) + LATEST(value)) as totalPlusLatest,
(AVG(value) * LATEST(value)) as avgTimesLatest
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证分析函数与聚合函数的复杂运算
assert.Equal(t, "sensor1", result["deviceId"])
assert.Equal(t, 60.0, result["total"]) // 10+20+30
assert.Equal(t, 20.0, result["average"]) // 60/3
assert.Equal(t, 30.0, result["latest"]) // 最新值
assert.Equal(t, 90.0, result["totalPlusLatest"]) // 60 + 30
assert.Equal(t, 600.0, result["avgTimesLatest"]) // 20 * 30
})
t.Run("最外层嵌套普通函数验证", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(value) as total,
COUNT(*) as count,
AVG(value) as average,
MAX(value) as maxVal,
(COUNT(*) * AVG(value)) as countTimesAvg,
(SUM(value) / MAX(value)) as sumDivideMax,
((COUNT(*) + SUM(value)) * AVG(value)) as complexNested,
FLOOR((SUM(value) / MAX(value))) as floorResult,
CEIL((AVG(value) / COUNT(*))) as ceilResult,
ROUND((SUM(value) * AVG(value) / 1000), 2) as roundResult
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证基础函数
assert.Equal(t, "sensor1", result["deviceId"])
assert.Equal(t, 100.0, result["total"]) // 10+20+30+40
assert.Equal(t, 4.0, result["count"]) // 4 records
assert.Equal(t, 25.0, result["average"]) // 100/4
assert.Equal(t, 40.0, result["maxVal"]) // max value
// 验证最外层嵌套普通函数
// (COUNT(*) * AVG(value)) = 4 * 25 = 100
assert.Equal(t, 100.0, result["countTimesAvg"], "最外层嵌套函数计算错误")
// (SUM(value) / MAX(value)) = 100 / 40 = 2.5
assert.Equal(t, 2.5, result["sumDivideMax"], "最外层嵌套函数计算错误")
// ((COUNT(*) + SUM(value)) * AVG(value)) = (4 + 100) * 25 = 2600
assert.Equal(t, 2600.0, result["complexNested"], "最外层复杂嵌套函数计算错误")
// 验证最外层嵌套普通函数
// FLOOR((SUM(value) / MAX(value))) = FLOOR(100/40) = FLOOR(2.5) = 2
if floorResult, ok := result["floorResult"].(float64); ok {
assert.Equal(t, 2.0, floorResult, "FLOOR函数嵌套计算错误")
}
// CEIL((AVG(value) / COUNT(*))) = CEIL(25/4) = CEIL(6.25) = 7
if ceilResult, ok := result["ceilResult"].(float64); ok {
assert.Equal(t, 7.0, ceilResult, "CEIL函数嵌套计算错误")
}
// ROUND((SUM(value) * AVG(value) / 1000), 2) = ROUND(100*25/1000, 2) = ROUND(2.5, 2) = 2.5
if roundResult, ok := result["roundResult"].(float64); ok {
assert.Equal(t, 2.5, roundResult, "ROUND函数嵌套计算错误")
}
// 验证最外层嵌套普通函数的正确性
assert.Equal(t, 100.0, result["countTimesAvg"], "COUNT(*) * AVG(value) 计算错误")
assert.Equal(t, 2.5, result["sumDivideMax"], "SUM(value) / MAX(value) 计算错误")
assert.Equal(t, 2600.0, result["complexNested"], "复杂嵌套表达式计算错误")
assert.Equal(t, 2.0, result["floorResult"], "FLOOR函数嵌套计算错误")
assert.Equal(t, 7.0, result["ceilResult"], "CEIL函数嵌套计算错误")
assert.Equal(t, 2.5, result["roundResult"], "ROUND函数嵌套计算错误")
})
t.Run("电表读数差值计算", func(t *testing.T) {
rsql := `SELECT deviceId,
(LAST_VALUE(displayNum) - FIRST_VALUE(displayNum)) as diffVal,
window_start() as start,
window_end() as end
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
// 设备1的数据
{"deviceId": "meter001", "displayNum": 100.0, "ts": baseTime},
{"deviceId": "meter001", "displayNum": 115.0, "ts": baseTime.Add(3 * time.Second)},
// 设备2的数据
{"deviceId": "meter002", "displayNum": 200.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "meter002", "displayNum": 206.0, "ts": baseTime.Add(4 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.GreaterOrEqual(t, len(results), 1, "应该至少有一个窗口的结果")
// 预期结果
expectedDiffs := map[string]float64{
"meter001": 15.0, // 115.0 - 100.0 = 15.0 kWh
"meter002": 6.0, // 206.0 - 200.0 = 6.0 kWh
}
// 验证每个设备的计算结果
deviceResults := make(map[string]map[string]interface{})
for _, result := range results {
deviceId, ok := result["deviceId"].(string)
require.True(t, ok, "deviceId应该是字符串类型")
deviceResults[deviceId] = result
}
for deviceId, expectedDiff := range expectedDiffs {
result, exists := deviceResults[deviceId]
assert.True(t, exists, "应该有设备 %s 的结果", deviceId)
if exists {
diffVal, ok := result["diffVal"].(float64)
assert.True(t, ok, "diffVal应该是float64类型")
assert.InEpsilon(t, expectedDiff, diffVal, 0.001,
"设备 %s 的用电量计算应该正确: 期望 %.1f, 实际 %.1f",
deviceId, expectedDiff, diffVal)
// 验证窗口时间字段存在
assert.Contains(t, result, "start", "结果应包含窗口开始时间")
assert.Contains(t, result, "end", "结果应包含窗口结束时间")
}
}
// 原始问题验证成功:电表读数差值计算正确
})
t.Run("综合功能验证", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(value) as total,
AVG(value) as average,
FIRST_VALUE(value) as first,
LAST_VALUE(value) as last,
LATEST(value) as latest,
COUNT(*) as count,
MAX(value) as maxVal,
MIN(value) as minVal,
((SUM(value) + FIRST_VALUE(value)) / COUNT(*)) as complexCalc1,
(LAST_VALUE(value) * AVG(value) - FIRST_VALUE(value)) as complexCalc2,
((LATEST(value) + SUM(value)) / COUNT(*)) as complexCalc3,
(MAX(value) + MIN(value) - AVG(value)) as complexCalc4,
ROUND(SQRT(ABS(AVG(value) - MIN(value))), 2) as nestedMathFunc,
UPPER(CONCAT('RESULT_', CAST(ROUND(SUM(value), 0) as STRING))) as nestedStrMathFunc
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证基础函数
assert.Equal(t, "sensor1", result["deviceId"])
assert.Equal(t, 100.0, result["total"]) // 10+20+30+40
assert.Equal(t, 25.0, result["average"]) // 100/4
assert.Equal(t, 10.0, result["first"]) // 第一个值
assert.Equal(t, 40.0, result["last"]) // 最后一个值
assert.Equal(t, 40.0, result["latest"]) // 最新值
assert.Equal(t, 4.0, result["count"]) // 4条记录
assert.Equal(t, 40.0, result["maxVal"]) // 最大值
assert.Equal(t, 10.0, result["minVal"]) // 最小值
// 验证复杂表达式计算
assert.Equal(t, 27.5, result["complexCalc1"]) // (100 + 10) / 4 = 27.5
assert.Equal(t, 990.0, result["complexCalc2"]) // 40 * 25 - 10 = 990
assert.Equal(t, 35.0, result["complexCalc3"]) // (40 + 100) / 4 = 35
assert.Equal(t, 25.0, result["complexCalc4"]) // 40 + 10 - 25 = 25
// 验证多层嵌套数学函数
// ROUND(SQRT(ABS(AVG(value) - MIN(value))), 2) = ROUND(SQRT(ABS(25-10)), 2) = ROUND(SQRT(15), 2) ≈ 3.87
if nestedMathFunc, ok := result["nestedMathFunc"].(float64); ok {
assert.InEpsilon(t, 3.87, nestedMathFunc, 0.01, "多层嵌套数学函数计算错误")
}
// 验证多层嵌套字符串和数学函数
// UPPER(CONCAT('RESULT_', CAST(ROUND(SUM(value), 0) as STRING))) = UPPER(CONCAT('RESULT_', '100')) = 'RESULT_100'
if nestedStrMathFunc, ok := result["nestedStrMathFunc"].(string); ok {
assert.Equal(t, "RESULT_100", nestedStrMathFunc, "多层嵌套字符串和数学函数计算错误")
}
})
t.Run("嵌套聚合函数运算测试", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(value) as total,
AVG(value) as average,
COUNT(*) as count,
MAX(value) as maxVal,
MIN(value) as minVal,
ROUND(AVG(ABS(value)), 2) as avgAbs,
MAX(ROUND(value, 1)) as maxRounded,
MIN(CEIL(value / 10)) as minCeiled,
AVG(SQRT(value)) as avgSqrt,
SUM(POWER(value, 2)) as sumSquares,
CEIL(AVG(FLOOR(SQRT(value)))) as tripleNested2,
ABS(MIN(ROUND(value / 5, 2))) as tripleNested3
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 16.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 25.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 36.0, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 49.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证基础聚合函数
assert.Equal(t, "sensor1", result["deviceId"])
assert.Equal(t, 126.0, result["total"]) // 16+25+36+49
assert.Equal(t, 31.5, result["average"]) // 126/4
assert.Equal(t, 4.0, result["count"]) // 4 records
assert.Equal(t, 49.0, result["maxVal"]) // max value
assert.Equal(t, 16.0, result["minVal"]) // min value
// 验证嵌套聚合函数运算
// ROUND(AVG(ABS(value)), 2) = ROUND(AVG(16,25,36,49), 2) = ROUND(31.5, 2) = 31.5
if avgAbs, ok := result["avgAbs"].(float64); ok {
assert.Equal(t, 31.5, avgAbs, "AVG(ABS(value))计算错误")
}
// MAX(ROUND(value, 1)) = MAX(16.0, 25.0, 36.0, 49.0) = 49.0
if maxRounded, ok := result["maxRounded"].(float64); ok {
assert.Equal(t, 49.0, maxRounded, "MAX(ROUND(value, 1))计算错误")
}
// MIN(CEIL(value / 10)) = MIN(CEIL(1.6), CEIL(2.5), CEIL(3.6), CEIL(4.9)) = MIN(2, 3, 4, 5) = 2
if minCeiled, ok := result["minCeiled"].(float64); ok {
assert.Equal(t, 2.0, minCeiled, "MIN(CEIL(value / 10))计算错误")
}
// AVG(SQRT(value)) = AVG(SQRT(16), SQRT(25), SQRT(36), SQRT(49)) = AVG(4, 5, 6, 7) = 5.5
if avgSqrt, ok := result["avgSqrt"].(float64); ok {
assert.Equal(t, 5.5, avgSqrt, "AVG(SQRT(value))计算错误")
}
// SUM(POWER(value, 2)) = SUM(16^2, 25^2, 36^2, 49^2) = SUM(256, 625, 1296, 2401) = 4578
if sumSquares, ok := result["sumSquares"].(float64); ok {
assert.Equal(t, 4578.0, sumSquares, "SUM(POWER(value, 2))计算错误")
}
// CEIL(AVG(FLOOR(SQRT(value))))
// = CEIL(AVG(FLOOR(4), FLOOR(5), FLOOR(6), FLOOR(7))) = CEIL(AVG(4, 5, 6, 7)) = CEIL(5.5) = 6
if tripleNested2, ok := result["tripleNested2"].(float64); ok {
assert.Equal(t, 6.0, tripleNested2, "三层嵌套聚合2计算错误")
}
// ABS(MIN(ROUND(value / 5, 2)))
// = ABS(MIN(ROUND(3.2, 2), ROUND(5, 2), ROUND(7.2, 2), ROUND(9.8, 2)))
// = ABS(MIN(3.2, 5.0, 7.2, 9.8)) = ABS(3.2) = 3.2
if tripleNested3, ok := result["tripleNested3"].(float64); ok {
assert.Equal(t, 3.2, tripleNested3, "三层嵌套聚合3计算错误")
}
})
t.Run("验证NTH_VALUE和LEAD函数", func(t *testing.T) {
rsql := `SELECT deviceId,
SUM(value) as total,
COUNT(*) as count,
NTH_VALUE(value, 2) as secondValue,
LEAD(value, 1) as leadValue,
(COUNT(*) * NTH_VALUE(value, 2)) as countTimesSecond,
(SUM(value) + LEAD(value, 1)) as sumPlusLead
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证基础函数
assert.Equal(t, "sensor1", result["deviceId"])
assert.Equal(t, 100.0, result["total"]) // 10+20+30+40
assert.Equal(t, 4.0, result["count"]) // 4 records
// 验证窗口函数基础功能
assert.NotNil(t, result["countTimesSecond"], "COUNT(*) * NTH_VALUE(value, 2) 应该有计算结果")
})
t.Run("验证NTH_VALUE基础功能", func(t *testing.T) {
rsql := `SELECT deviceId,
NTH_VALUE(value, 1) as firstValue,
NTH_VALUE(value, 2) as secondValue,
NTH_VALUE(value, 3) as thirdValue,
NTH_VALUE(value, 4) as fourthValue
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
ssql, resultChan := createTestEnvironment(t, rsql)
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
testData := []map[string]interface{}{
{"deviceId": "sensor1", "value": 100.0, "ts": baseTime},
{"deviceId": "sensor1", "value": 200.0, "ts": baseTime.Add(1 * time.Second)},
{"deviceId": "sensor1", "value": 300.0, "ts": baseTime.Add(2 * time.Second)},
{"deviceId": "sensor1", "value": 400.0, "ts": baseTime.Add(3 * time.Second)},
}
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
require.Len(t, results, 1)
result := results[0]
// 验证 NTH_VALUE 函数的返回值
// 期望结果:按添加顺序
// 第1个值: 100, 第2个值: 200, 第3个值: 300, 第4个值: 400
if firstValue, ok := result["firstValue"].(float64); ok {
assert.Equal(t, 100.0, firstValue, "第1个值应该是100")
} else {
assert.Error(t, errors.New("firstValue 为空"))
}
if secondValue, ok := result["secondValue"].(float64); ok {
assert.Equal(t, 200.0, secondValue, "第2个值应该是200")
} else {
assert.Error(t, errors.New("secondValue 为空"))
}
if thirdValue, ok := result["thirdValue"].(float64); ok {
assert.Equal(t, 300.0, thirdValue, "第3个值应该是300")
} else {
assert.Error(t, errors.New("thirdValue 为空"))
}
if fourthValue, ok := result["fourthValue"].(float64); ok {
assert.Equal(t, 400.0, fourthValue, "第4个值应该是400")
} else {
assert.Error(t, errors.New("fourthValue 为空"))
}
})
}