forked from GiteaTest2015/streamsql
432 lines
11 KiB
Go
432 lines
11 KiB
Go
package streamsql
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
)
|
||
|
||
// TestStreamSQLErrorHandling 测试StreamSQL的错误处理机制
|
||
func TestStreamSQLErrorHandling(t *testing.T) {
|
||
t.Run("invalid SQL syntax", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("INVALID SQL STATEMENT")
|
||
require.NotNil(t, err)
|
||
require.Contains(t, err.Error(), "SQL parsing failed")
|
||
})
|
||
|
||
t.Run("missing SELECT keyword", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("FROM stream WHERE id > 1")
|
||
// 修改后的解析器会对缺少SELECT关键字进行严格检查
|
||
require.NotNil(t, err)
|
||
require.Contains(t, err.Error(), "Expected SELECT")
|
||
})
|
||
|
||
t.Run("invalid function name", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT INVALID_FUNCTION(id) FROM stream")
|
||
require.NotNil(t, err)
|
||
})
|
||
|
||
t.Run("invalid window function", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream GROUP BY InvalidWindow('5s')")
|
||
// TODO InvalidWindow 在SQL解析阶段被当作普通字段处理,而不是窗口函数
|
||
// 因此不会在stream创建阶段报错,这是当前解析器的设计行为
|
||
require.Nil(t, err)
|
||
})
|
||
|
||
t.Run("EmitSync without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
_, err := ssql.EmitSync(map[string]interface{}{"id": 1})
|
||
require.NotNil(t, err)
|
||
require.Contains(t, err.Error(), "stream not initialized")
|
||
})
|
||
|
||
t.Run("EmitSync with aggregation query", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT COUNT(*) FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
_, err = ssql.EmitSync(map[string]interface{}{"id": 1})
|
||
require.NotNil(t, err)
|
||
require.Contains(t, err.Error(), "synchronous mode only supports non-aggregation queries")
|
||
})
|
||
|
||
t.Run("Emit without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
// 这不应该引发panic,但也不会有任何效果
|
||
ssql.Emit(map[string]interface{}{"id": 1})
|
||
})
|
||
|
||
t.Run("Stop without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
// 这不应该引发panic
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("GetStats without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
stats := ssql.GetStats()
|
||
require.NotNil(t, stats)
|
||
require.Equal(t, 0, len(stats))
|
||
})
|
||
|
||
t.Run("GetDetailedStats without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
stats := ssql.GetDetailedStats()
|
||
require.NotNil(t, stats)
|
||
require.Equal(t, 0, len(stats))
|
||
})
|
||
|
||
t.Run("ToChannel without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
ch := ssql.ToChannel()
|
||
require.Nil(t, ch)
|
||
})
|
||
|
||
t.Run("Stream without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
stream := ssql.Stream()
|
||
require.Nil(t, stream)
|
||
})
|
||
|
||
t.Run("IsAggregationQuery without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
isAgg := ssql.IsAggregationQuery()
|
||
require.False(t, isAgg)
|
||
})
|
||
|
||
t.Run("AddSink without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
// 这不应该引发panic
|
||
ssql.AddSink(func(results []map[string]interface{}) {})
|
||
})
|
||
|
||
t.Run("PrintTable without Execute", func(t *testing.T) {
|
||
ssql := New()
|
||
// 这不应该引发panic
|
||
ssql.PrintTable()
|
||
})
|
||
}
|
||
|
||
// TestStreamSQLEdgeCases 测试边界条件和特殊情况
|
||
func TestStreamSQLEdgeCases(t *testing.T) {
|
||
t.Run("empty SQL string", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("")
|
||
require.NotNil(t, err)
|
||
})
|
||
|
||
t.Run("whitespace only SQL", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute(" \n\t ")
|
||
require.NotNil(t, err)
|
||
})
|
||
|
||
t.Run("SQL with comments", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("-- This is a comment\nSELECT id FROM stream")
|
||
// 根据实际的SQL解析器行为,这可能成功或失败
|
||
// 这里我们只是确保不会panic
|
||
_ = err
|
||
})
|
||
|
||
t.Run("very long SQL statement", func(t *testing.T) {
|
||
ssql := New()
|
||
longSQL := "SELECT "
|
||
for i := 0; i < 1000; i++ {
|
||
if i > 0 {
|
||
longSQL += ", "
|
||
}
|
||
longSQL += "field" + string(rune('0'+i%10))
|
||
}
|
||
longSQL += " FROM stream"
|
||
err := ssql.Execute(longSQL)
|
||
// 应该能够处理长SQL语句
|
||
_ = err
|
||
})
|
||
|
||
t.Run("multiple Execute calls", func(t *testing.T) {
|
||
ssql := New()
|
||
err1 := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err1)
|
||
|
||
// 第二次Execute应该失败,因为已经执行过了
|
||
err2 := ssql.Execute("SELECT name FROM stream")
|
||
require.Error(t, err2)
|
||
require.Contains(t, err2.Error(), "Execute() has already been called")
|
||
})
|
||
|
||
t.Run("Execute after Stop", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Stop()
|
||
|
||
// 停止后再次Execute应该失败,因为已经执行过了
|
||
err = ssql.Execute("SELECT name FROM stream")
|
||
require.Error(t, err)
|
||
require.Contains(t, err.Error(), "Execute() has already been called")
|
||
})
|
||
|
||
t.Run("concurrent Execute calls", func(t *testing.T) {
|
||
ssql := New()
|
||
|
||
done := make(chan bool, 2)
|
||
var successCount int32
|
||
var errorCount int32
|
||
|
||
go func() {
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
done <- true
|
||
}()
|
||
|
||
go func() {
|
||
err := ssql.Execute("SELECT name FROM stream")
|
||
if err == nil {
|
||
atomic.AddInt32(&successCount, 1)
|
||
} else {
|
||
atomic.AddInt32(&errorCount, 1)
|
||
}
|
||
done <- true
|
||
}()
|
||
|
||
// 等待两个goroutine完成
|
||
<-done
|
||
<-done
|
||
|
||
// 验证只有一个成功,一个失败
|
||
assert.Equal(t, int32(1), atomic.LoadInt32(&successCount))
|
||
assert.Equal(t, int32(1), atomic.LoadInt32(&errorCount))
|
||
|
||
// 确保最终有一个有效的stream
|
||
require.NotNil(t, ssql.Stream())
|
||
})
|
||
}
|
||
|
||
// TestStreamSQLNilHandling 测试nil值处理
|
||
func TestStreamSQLNilHandling(t *testing.T) {
|
||
t.Run("emit nil map", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
// 发送nil数据不应该panic
|
||
ssql.Emit(nil)
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("emit map with nil values", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id, name FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
// 发送包含nil值的数据
|
||
ssql.Emit(map[string]interface{}{
|
||
"id": 1,
|
||
"name": nil,
|
||
})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("EmitSync with nil data", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
// EmitSync with nil data
|
||
_, err = ssql.EmitSync(nil)
|
||
// 根据实现,这可能返回错误或处理nil值
|
||
_ = err
|
||
ssql.Stop()
|
||
})
|
||
}
|
||
|
||
// TestStreamSQLComplexQueries 测试复杂查询
|
||
func TestStreamSQLComplexQueries(t *testing.T) {
|
||
t.Run("query with multiple fields", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id, name, value, timestamp FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{
|
||
"id": 1,
|
||
"name": "test",
|
||
"value": 100.5,
|
||
"timestamp": time.Now(),
|
||
})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("query with WHERE clause", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id, value FROM stream WHERE value > 50")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{"id": 1, "value": 100})
|
||
ssql.Emit(map[string]interface{}{"id": 2, "value": 25})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("query with aggregation functions", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT COUNT(*), SUM(value), AVG(value) FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
for i := 0; i < 5; i++ {
|
||
ssql.Emit(map[string]interface{}{"id": i, "value": i * 10})
|
||
}
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("query with window functions", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id, value FROM stream GROUP BY TumblingWindow('5s')")
|
||
// 根据实际实现,这可能成功或失败
|
||
_ = err
|
||
if err == nil {
|
||
ssql.Stop()
|
||
}
|
||
})
|
||
}
|
||
|
||
// TestStreamSQLDataTypes 测试不同数据类型
|
||
func TestStreamSQLDataTypes(t *testing.T) {
|
||
t.Run("string data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT name FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{"name": "test string"})
|
||
ssql.Emit(map[string]interface{}{"name": ""})
|
||
ssql.Emit(map[string]interface{}{"name": "unicode测试🚀"})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("numeric data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT value FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{"value": 42})
|
||
ssql.Emit(map[string]interface{}{"value": 3.14159})
|
||
ssql.Emit(map[string]interface{}{"value": int64(9223372036854775807)})
|
||
ssql.Emit(map[string]interface{}{"value": float32(1.23)})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("boolean data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT active FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{"active": true})
|
||
ssql.Emit(map[string]interface{}{"active": false})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("time data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT timestamp FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
now := time.Now()
|
||
ssql.Emit(map[string]interface{}{"timestamp": now})
|
||
ssql.Emit(map[string]interface{}{"timestamp": now.Unix()})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("array and slice data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT data FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{"data": []int{1, 2, 3, 4, 5}})
|
||
ssql.Emit(map[string]interface{}{"data": []string{"a", "b", "c"}})
|
||
ssql.Emit(map[string]interface{}{"data": []interface{}{1, "test", true}})
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("map data types", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT metadata FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
ssql.Emit(map[string]interface{}{
|
||
"metadata": map[string]interface{}{
|
||
"key1": "value1",
|
||
"key2": 42,
|
||
"key3": true,
|
||
},
|
||
})
|
||
ssql.Stop()
|
||
})
|
||
}
|
||
|
||
// TestStreamSQLStressTest 压力测试
|
||
func TestStreamSQLStressTest(t *testing.T) {
|
||
t.Run("high frequency emissions", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
// 高频率发送数据
|
||
for i := 0; i < 1000; i++ {
|
||
ssql.Emit(map[string]interface{}{"id": i})
|
||
}
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("large data payloads", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT data FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
// 发送大数据负载
|
||
largeString := make([]byte, 10*1024) // 10KB
|
||
for i := range largeString {
|
||
largeString[i] = byte('A' + (i % 26))
|
||
}
|
||
|
||
for i := 0; i < 10; i++ {
|
||
ssql.Emit(map[string]interface{}{"data": string(largeString)})
|
||
}
|
||
ssql.Stop()
|
||
})
|
||
|
||
t.Run("concurrent operations", func(t *testing.T) {
|
||
ssql := New()
|
||
err := ssql.Execute("SELECT id FROM stream")
|
||
require.Nil(t, err)
|
||
|
||
var wg sync.WaitGroup
|
||
numWorkers := 5
|
||
numEmissions := 100
|
||
|
||
for i := 0; i < numWorkers; i++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
for j := 0; j < numEmissions; j++ {
|
||
ssql.Emit(map[string]interface{}{"id": workerID*1000 + j})
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
ssql.Stop()
|
||
})
|
||
}
|