mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-17 17:48:26 +00:00
@@ -7,29 +7,29 @@ import (
|
||||
)
|
||||
|
||||
type testData struct {
|
||||
Device string
|
||||
Data1 float64
|
||||
Data2 float64
|
||||
Device string
|
||||
temperature float64
|
||||
humidity float64
|
||||
}
|
||||
|
||||
func TestGroupAggregator_MultiFieldSum(t *testing.T) {
|
||||
agg := NewGroupAggregator(
|
||||
[]string{"Device"},
|
||||
map[string]AggregateType{
|
||||
"Data1": Sum,
|
||||
"Data2": Sum,
|
||||
"temperature": Sum,
|
||||
"humidity": Sum,
|
||||
},
|
||||
map[string]string{
|
||||
"Data1": "Data1_sum",
|
||||
"Data2": "Data2_sum",
|
||||
"temperature": "temperature_sum",
|
||||
"humidity": "humidity_sum",
|
||||
},
|
||||
)
|
||||
|
||||
testData := []map[string]interface{}{
|
||||
{"Device": "aa", "Data1": 20, "Data2": 30},
|
||||
{"Device": "aa", "Data1": 21, "Data2": 0},
|
||||
{"Device": "bb", "Data1": 15, "Data2": 20},
|
||||
{"Device": "bb", "Data1": 16, "Data2": 20},
|
||||
{"Device": "aa", "temperature": 25.5, "humidity": 60.0},
|
||||
{"Device": "aa", "temperature": 26.8, "humidity": 55.0},
|
||||
{"Device": "bb", "temperature": 22.3, "humidity": 65.0},
|
||||
{"Device": "bb", "temperature": 23.5, "humidity": 70.0},
|
||||
}
|
||||
|
||||
for _, d := range testData {
|
||||
@@ -37,8 +37,8 @@ func TestGroupAggregator_MultiFieldSum(t *testing.T) {
|
||||
}
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{"Device": "aa", "Data1_sum": 41.0, "Data2_sum": 30.0},
|
||||
{"Device": "bb", "Data1_sum": 31.0, "Data2_sum": 40.0},
|
||||
{"Device": "aa", "temperature_sum": 52.3, "humidity_sum": 115.0},
|
||||
{"Device": "bb", "temperature_sum": 45.8, "humidity_sum": 135.0},
|
||||
}
|
||||
|
||||
results, _ := agg.GetResults()
|
||||
@@ -49,16 +49,16 @@ func TestGroupAggregator_SingleField(t *testing.T) {
|
||||
agg := NewGroupAggregator(
|
||||
[]string{"Device"},
|
||||
map[string]AggregateType{
|
||||
"Data1": Sum,
|
||||
"temperature": Sum,
|
||||
},
|
||||
map[string]string{
|
||||
"Data1": "Data1_sum",
|
||||
"temperature": "temperature_sum",
|
||||
},
|
||||
)
|
||||
|
||||
testData := []map[string]interface{}{
|
||||
{"Device": "cc", "Data1": 10},
|
||||
{"Device": "cc", "Data1": 20},
|
||||
{"Device": "cc", "temperature": 24.5},
|
||||
{"Device": "cc", "temperature": 27.8},
|
||||
}
|
||||
|
||||
for _, d := range testData {
|
||||
@@ -66,7 +66,7 @@ func TestGroupAggregator_SingleField(t *testing.T) {
|
||||
}
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{"Device": "cc", "Data1_sum": 30.0},
|
||||
{"Device": "cc", "temperature_sum": 52.3},
|
||||
}
|
||||
|
||||
results, _ := agg.GetResults()
|
||||
@@ -77,22 +77,22 @@ func TestGroupAggregator_MultipleAggregators(t *testing.T) {
|
||||
agg := NewGroupAggregator(
|
||||
[]string{"Device"},
|
||||
map[string]AggregateType{
|
||||
"Data1": Sum,
|
||||
"Data2": Avg,
|
||||
"Data3": Max,
|
||||
"Data4": Min,
|
||||
"temperature": Sum,
|
||||
"humidity": Avg,
|
||||
"presure": Max,
|
||||
"PM10": Min,
|
||||
},
|
||||
map[string]string{
|
||||
"Data1": "Data1_sum",
|
||||
"Data2": "Data2_avg",
|
||||
"Data3": "Data3_max",
|
||||
"Data4": "Data4_min",
|
||||
"temperature": "temperature_sum",
|
||||
"humidity": "humidity_avg",
|
||||
"presure": "presure_max",
|
||||
"PM10": "PM10_min",
|
||||
},
|
||||
)
|
||||
|
||||
testData := []map[string]interface{}{
|
||||
{"Device": "cc", "Data1": 10, "Data2": 5.5, "Data3": 8, "Data4": 3},
|
||||
{"Device": "cc", "Data1": 20, "Data2": 4.5, "Data3": 12, "Data4": 2},
|
||||
{"Device": "cc", "temperature": 25.5, "humidity": 65.5, "presure": 1008, "PM10": 35},
|
||||
{"Device": "cc", "temperature": 27.8, "humidity": 60.5, "presure": 1012, "PM10": 28},
|
||||
}
|
||||
|
||||
for _, d := range testData {
|
||||
@@ -101,11 +101,11 @@ func TestGroupAggregator_MultipleAggregators(t *testing.T) {
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{
|
||||
"Device": "cc",
|
||||
"Data1_sum": 30.0,
|
||||
"Data2_avg": 5.0,
|
||||
"Data3_max": 12.0,
|
||||
"Data4_min": 2.0,
|
||||
"Device": "cc",
|
||||
"temperature_sum": 53.3,
|
||||
"humidity_avg": 63.0,
|
||||
"presure_max": 1012.0,
|
||||
"PM10_min": 28.0,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
+5
-5
@@ -36,7 +36,7 @@ func TestParseSQL(t *testing.T) {
|
||||
condition: "deviceId == 'aa'",
|
||||
},
|
||||
{
|
||||
sql: "select max(score) as max_score, min(age) as min_age from Sensor group by type, SlidingWindow('20s', '5s')",
|
||||
sql: "select max(humidity) as max_humidity, min(temperature) as min_temp from Sensor group by type, SlidingWindow('20s', '5s')",
|
||||
expected: &model.Config{
|
||||
WindowConfig: model.WindowConfig{
|
||||
Type: "sliding",
|
||||
@@ -47,8 +47,8 @@ func TestParseSQL(t *testing.T) {
|
||||
},
|
||||
GroupFields: []string{"type"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"score": "max",
|
||||
"age": "min",
|
||||
"humidity": "max",
|
||||
"temperature": "min",
|
||||
},
|
||||
},
|
||||
condition: "",
|
||||
@@ -121,8 +121,8 @@ func TestWindowParamParsing(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConditionParsing(t *testing.T) {
|
||||
sql := "select * from metrics where cpu > 80 || (mem < 20 && disk == '/dev/sda')"
|
||||
expected := "cpu > 80 || (mem < 20 && disk == '/dev/sda')"
|
||||
sql := "select cpu,mem from metrics where cpu > 80 or (mem < 20 and disk == '/dev/sda')"
|
||||
expected := "cpu > 80 || ( mem < 20 && disk == '/dev/sda' )"
|
||||
|
||||
parser := NewParser(sql)
|
||||
stmt, err := parser.Parse()
|
||||
|
||||
+58
-62
@@ -20,15 +20,15 @@ func TestStreamProcess(t *testing.T) {
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"age": aggregator.Avg,
|
||||
"score": aggregator.Sum,
|
||||
"temperature": aggregator.Avg,
|
||||
"humidity": aggregator.Sum,
|
||||
},
|
||||
}
|
||||
|
||||
strm, err := NewStream(config)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = strm.RegisterFilter("device == 'aa' && age > 10")
|
||||
err = strm.RegisterFilter("device == 'aa' && temperature > 10")
|
||||
require.NoError(t, err)
|
||||
|
||||
// 添加 Sink 函数来捕获结果
|
||||
@@ -41,9 +41,9 @@ func TestStreamProcess(t *testing.T) {
|
||||
|
||||
// 准备测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 15.0, "score": 100},
|
||||
map[string]interface{}{"device": "aa", "age": 20.0, "score": 200},
|
||||
map[string]interface{}{"device": "bb", "age": 25.0, "score": 300},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0, "humidity": 55},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -62,18 +62,18 @@ func TestStreamProcess(t *testing.T) {
|
||||
t.Fatal("No results received within 5 seconds")
|
||||
}
|
||||
|
||||
// 预期结果:只有 device='aa' 且 age>10 的数据会被聚合
|
||||
// 预期结果:只有 device='aa' 且 temperature>10 的数据会被聚合
|
||||
expected := map[string]interface{}{
|
||||
"device": "aa",
|
||||
"age_avg": 17.5, // (15+20)/2
|
||||
"score_sum": 300.0, // 100+200
|
||||
"device": "aa",
|
||||
"temperature_avg": 27.5, // (25+30)/2
|
||||
"humidity_sum": 115.0, // 60+55
|
||||
}
|
||||
|
||||
// 验证结果
|
||||
assert.IsType(t, []map[string]interface{}{}, actual)
|
||||
resultMap := actual.([]map[string]interface{})
|
||||
assert.InEpsilon(t, expected["age_avg"].(float64), resultMap[0]["age_avg"].(float64), 0.0001)
|
||||
assert.InDelta(t, expected["score_sum"].(float64), resultMap[0]["score_sum"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expected["temperature_avg"].(float64), resultMap[0]["temperature_avg"].(float64), 0.0001)
|
||||
assert.InDelta(t, expected["humidity_sum"].(float64), resultMap[0]["humidity_sum"].(float64), 0.0001)
|
||||
}
|
||||
|
||||
// 不设置过滤器
|
||||
@@ -85,8 +85,8 @@ func TestStreamWithoutFilter(t *testing.T) {
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"age": aggregator.Max,
|
||||
"score": aggregator.Min,
|
||||
"temperature": aggregator.Max,
|
||||
"humidity": aggregator.Min,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -96,9 +96,9 @@ func TestStreamWithoutFilter(t *testing.T) {
|
||||
strm.Start()
|
||||
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 5.0, "score": 100},
|
||||
map[string]interface{}{"device": "aa", "age": 10.0, "score": 200},
|
||||
map[string]interface{}{"device": "bb", "age": 3.0, "score": 300},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0, "humidity": 55},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -126,14 +126,14 @@ func TestStreamWithoutFilter(t *testing.T) {
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{
|
||||
"device": "aa",
|
||||
"age_max": 10.0,
|
||||
"score_min": 100.0,
|
||||
"device": "aa",
|
||||
"temperature_max": 30.0,
|
||||
"humidity_min": 55.0,
|
||||
},
|
||||
{
|
||||
"device": "bb",
|
||||
"age_max": 3.0,
|
||||
"score_min": 300.0,
|
||||
"device": "bb",
|
||||
"temperature_max": 22.0,
|
||||
"humidity_min": 70.0,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -145,14 +145,12 @@ func TestStreamWithoutFilter(t *testing.T) {
|
||||
for _, expectedResult := range expected {
|
||||
found := false
|
||||
for _, resultMap := range resultSlice {
|
||||
//if resultMap, ok := result.(map[string]interface{}); ok {
|
||||
if resultMap["device"] == expectedResult["device"] {
|
||||
assert.InEpsilon(t, expectedResult["age_max"].(float64), resultMap["age_max"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["score_min"].(float64), resultMap["score_min"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["temperature_max"].(float64), resultMap["temperature_max"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["humidity_min"].(float64), resultMap["humidity_min"].(float64), 0.0001)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
//}
|
||||
}
|
||||
assert.True(t, found, fmt.Sprintf("Expected result for device %v not found", expectedResult["device"]))
|
||||
}
|
||||
@@ -166,15 +164,15 @@ func TestIncompleteStreamProcess(t *testing.T) {
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"age": aggregator.Avg,
|
||||
"score": aggregator.Sum,
|
||||
"temperature": aggregator.Avg,
|
||||
"humidity": aggregator.Sum,
|
||||
},
|
||||
}
|
||||
|
||||
strm, err := NewStream(config)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = strm.RegisterFilter("device == 'aa' && age > 10")
|
||||
err = strm.RegisterFilter("device == 'aa' ")
|
||||
require.NoError(t, err)
|
||||
|
||||
// 添加 Sink 函数来捕获结果
|
||||
@@ -187,11 +185,11 @@ func TestIncompleteStreamProcess(t *testing.T) {
|
||||
|
||||
// 准备测试数据
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 15.0},
|
||||
map[string]interface{}{"device": "aa", "score": 100},
|
||||
map[string]interface{}{"device": "aa", "age": 20.0},
|
||||
map[string]interface{}{"device": "aa", "score": 200},
|
||||
map[string]interface{}{"device": "bb", "age": 25.0, "score": 300},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0},
|
||||
map[string]interface{}{"device": "aa", "humidity": 60},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0},
|
||||
map[string]interface{}{"device": "aa", "humidity": 55},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -210,18 +208,18 @@ func TestIncompleteStreamProcess(t *testing.T) {
|
||||
t.Fatal("No results received within 5 seconds")
|
||||
}
|
||||
|
||||
// 预期结果:只有 device='aa' 且 age>10 的数据会被聚合
|
||||
// 预期结果:只有 device='aa' 且 temperature>10 的数据会被聚合
|
||||
expected := map[string]interface{}{
|
||||
"device": "aa",
|
||||
"age_avg": 17.5, // (15+20)/2
|
||||
"score_sum": 300.0, // 100+200
|
||||
"device": "aa",
|
||||
"temperature_avg": 27.5, // (25+30)/2
|
||||
"humidity_sum": 115.0, // 60+55
|
||||
}
|
||||
|
||||
// 验证结果
|
||||
assert.IsType(t, []map[string]interface{}{}, actual)
|
||||
resultMap := actual.([]map[string]interface{})
|
||||
assert.InEpsilon(t, expected["age_avg"].(float64), resultMap[0]["age_avg"].(float64), 0.0001)
|
||||
assert.InDelta(t, expected["score_sum"].(float64), resultMap[0]["score_sum"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expected["temperature_avg"].(float64), resultMap[0]["temperature_avg"].(float64), 0.0001)
|
||||
assert.InDelta(t, expected["humidity_sum"].(float64), resultMap[0]["humidity_sum"].(float64), 0.0001)
|
||||
}
|
||||
|
||||
func TestWindowSlotAgg(t *testing.T) {
|
||||
@@ -233,10 +231,10 @@ func TestWindowSlotAgg(t *testing.T) {
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"age": aggregator.Max,
|
||||
"score": aggregator.Min,
|
||||
"start": aggregator.WindowStart,
|
||||
"end": aggregator.WindowEnd,
|
||||
"temperature": aggregator.Max,
|
||||
"humidity": aggregator.Min,
|
||||
"start": aggregator.WindowStart,
|
||||
"end": aggregator.WindowEnd,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -248,9 +246,9 @@ func TestWindowSlotAgg(t *testing.T) {
|
||||
baseTime := time.Date(2025, 4, 7, 16, 46, 0, 0, time.UTC)
|
||||
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 5.0, "score": 100, "ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "age": 10.0, "score": 200, "ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "age": 3.0, "score": 300, "ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0, "humidity": 55, "ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70, "ts": baseTime},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -278,18 +276,18 @@ func TestWindowSlotAgg(t *testing.T) {
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{
|
||||
"device": "aa",
|
||||
"age_max": 10.0,
|
||||
"score_min": 100.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
"device": "aa",
|
||||
"temperature_max": 30.0,
|
||||
"humidity_min": 55.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
},
|
||||
{
|
||||
"device": "bb",
|
||||
"age_max": 3.0,
|
||||
"score_min": 300.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
"device": "bb",
|
||||
"temperature_max": 22.0,
|
||||
"humidity_min": 70.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -301,16 +299,14 @@ func TestWindowSlotAgg(t *testing.T) {
|
||||
for _, expectedResult := range expected {
|
||||
found := false
|
||||
for _, resultMap := range resultSlice {
|
||||
//if resultMap, ok := result.(map[string]interface{}); ok {
|
||||
if resultMap["device"] == expectedResult["device"] {
|
||||
assert.InEpsilon(t, expectedResult["age_max"].(float64), resultMap["age_max"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["score_min"].(float64), resultMap["score_min"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["temperature_max"].(float64), resultMap["temperature_max"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["humidity_min"].(float64), resultMap["humidity_min"].(float64), 0.0001)
|
||||
assert.Equal(t, expectedResult["start"].(int64), resultMap["start"].(int64))
|
||||
assert.Equal(t, expectedResult["end"].(int64), resultMap["end"].(int64))
|
||||
found = true
|
||||
break
|
||||
}
|
||||
//}
|
||||
}
|
||||
assert.True(t, found, fmt.Sprintf("Expected result for device %v not found", expectedResult["device"]))
|
||||
}
|
||||
|
||||
+26
-28
@@ -76,15 +76,15 @@ func TestStreamData(t *testing.T) {
|
||||
|
||||
func TestStreamsql(t *testing.T) {
|
||||
streamsql := New()
|
||||
var rsql = "SELECT device,max(age) as max_age,min(score) as min_score,window_start() as start,window_end() as end FROM stream group by device,SlidingWindow('2s','1s') with (TIMESTAMP='Ts',TIMEUNIT='ss')"
|
||||
var rsql = "SELECT device,max(temperature) as max_temp,min(humidity) as min_humidity,window_start() as start,window_end() as end FROM stream group by device,SlidingWindow('2s','1s') with (TIMESTAMP='Ts',TIMEUNIT='ss')"
|
||||
err := streamsql.Execute(rsql)
|
||||
assert.Nil(t, err)
|
||||
strm := streamsql.stream
|
||||
baseTime := time.Date(2025, 4, 7, 16, 46, 0, 0, time.UTC)
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 5.0, "score": 100, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "age": 10.0, "score": 200, "Ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "age": 3.0, "score": 300, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0, "humidity": 55, "Ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70, "Ts": baseTime},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -109,18 +109,18 @@ func TestStreamsql(t *testing.T) {
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{
|
||||
"device": "aa",
|
||||
"max_age": 10.0,
|
||||
"min_score": 100.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
"device": "aa",
|
||||
"max_temp": 30.0,
|
||||
"min_humidity": 55.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
},
|
||||
{
|
||||
"device": "bb",
|
||||
"max_age": 3.0,
|
||||
"min_score": 300.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
"device": "bb",
|
||||
"max_temp": 22.0,
|
||||
"min_humidity": 70.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -131,16 +131,14 @@ func TestStreamsql(t *testing.T) {
|
||||
for _, expectedResult := range expected {
|
||||
found := false
|
||||
for _, resultMap := range resultSlice {
|
||||
//if resultMap, ok := result.(map[string]interface{}); ok {
|
||||
if resultMap["device"] == expectedResult["device"] {
|
||||
assert.InEpsilon(t, expectedResult["max_age"].(float64), resultMap["max_age"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["min_score"].(float64), resultMap["min_score"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["max_temp"].(float64), resultMap["max_temp"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["min_humidity"].(float64), resultMap["min_humidity"].(float64), 0.0001)
|
||||
assert.Equal(t, expectedResult["start"].(int64), resultMap["start"].(int64))
|
||||
assert.Equal(t, expectedResult["end"].(int64), resultMap["end"].(int64))
|
||||
found = true
|
||||
break
|
||||
}
|
||||
//}
|
||||
}
|
||||
assert.True(t, found, fmt.Sprintf("Expected result for device %v not found", expectedResult["device"]))
|
||||
}
|
||||
@@ -148,15 +146,15 @@ func TestStreamsql(t *testing.T) {
|
||||
|
||||
func TestStreamsqlWithoutGroupBy(t *testing.T) {
|
||||
streamsql := New()
|
||||
var rsql = "SELECT max(age) as max_age,min(score) as min_score,window_start() as start,window_end() as end FROM stream SlidingWindow('2s','1s') with (TIMESTAMP='Ts',TIMEUNIT='ss')"
|
||||
var rsql = "SELECT max(temperature) as max_temp,min(humidity) as min_humidity,window_start() as start,window_end() as end FROM stream SlidingWindow('2s','1s') with (TIMESTAMP='Ts',TIMEUNIT='ss')"
|
||||
err := streamsql.Execute(rsql)
|
||||
assert.Nil(t, err)
|
||||
strm := streamsql.stream
|
||||
baseTime := time.Date(2025, 4, 7, 16, 46, 0, 0, time.UTC)
|
||||
testData := []interface{}{
|
||||
map[string]interface{}{"device": "aa", "age": 5.0, "score": 100, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "age": 10.0, "score": 200, "Ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "age": 3.0, "score": 300, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "Ts": baseTime},
|
||||
map[string]interface{}{"device": "aa", "temperature": 30.0, "humidity": 55, "Ts": baseTime.Add(1 * time.Second)},
|
||||
map[string]interface{}{"device": "bb", "temperature": 22.0, "humidity": 70, "Ts": baseTime},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
@@ -181,10 +179,10 @@ func TestStreamsqlWithoutGroupBy(t *testing.T) {
|
||||
|
||||
expected := []map[string]interface{}{
|
||||
{
|
||||
"max_age": 10.0,
|
||||
"min_score": 100.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
"max_temp": 30.0,
|
||||
"min_humidity": 55.0,
|
||||
"start": baseTime.UnixNano(),
|
||||
"end": baseTime.Add(2 * time.Second).UnixNano(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -195,8 +193,8 @@ func TestStreamsqlWithoutGroupBy(t *testing.T) {
|
||||
for _, expectedResult := range expected {
|
||||
//found := false
|
||||
for _, resultMap := range resultSlice {
|
||||
assert.InEpsilon(t, expectedResult["max_age"].(float64), resultMap["max_age"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["min_score"].(float64), resultMap["min_score"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["max_temp"].(float64), resultMap["max_temp"].(float64), 0.0001)
|
||||
assert.InEpsilon(t, expectedResult["min_humidity"].(float64), resultMap["min_humidity"].(float64), 0.0001)
|
||||
assert.Equal(t, expectedResult["start"].(int64), resultMap["start"].(int64))
|
||||
assert.Equal(t, expectedResult["end"].(int64), resultMap["end"].(int64))
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ func (d TestDate2) GetTimestamp() time.Time {
|
||||
|
||||
func TestGetTimestamp(t *testing.T) {
|
||||
t_0 := time.Now()
|
||||
data := map[string]interface{}{"device": "aa", "age": 15.0, "score": 100, "ts": t_0}
|
||||
data := map[string]interface{}{"device": "aa", "temperature": 25.0, "humidity": 60, "ts": t_0}
|
||||
t_1 := GetTimestamp(data, "ts")
|
||||
|
||||
data_1 := TestDate{Ts: t_0}
|
||||
|
||||
Reference in New Issue
Block a user