refactor:Rename AddData to Emit

This commit is contained in:
rulego-team
2025-07-25 12:05:21 +08:00
parent 340577f385
commit 1c7e94b167
25 changed files with 4478 additions and 4407 deletions
+4 -4
View File
@@ -82,7 +82,7 @@ func main() {
}
// Handle real-time transformation results
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("Real-time result: %+v\n", result)
})
@@ -110,7 +110,7 @@ func main() {
// Process data one by one, each will output results immediately
for _, data := range sensorData {
ssql.Stream().AddData(data)
ssql.Emit(data)
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
}
@@ -273,7 +273,7 @@ func main() {
}
// Handle aggregation results
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("Aggregation result: %+v\n", result)
})
@@ -293,7 +293,7 @@ func main() {
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
ssql.Emit(nestedData)
}
```
+4 -4
View File
@@ -85,7 +85,7 @@ func main() {
}
// 处理实时转换结果
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("实时处理结果: %+v\n", result)
})
@@ -113,7 +113,7 @@ func main() {
// 逐条处理数据,每条都会立即输出结果
for _, data := range sensorData {
ssql.Stream().AddData(data)
ssql.Emit(data)
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
}
@@ -289,7 +289,7 @@ func main() {
}
// 处理聚合结果
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %+v\n", result)
})
@@ -309,7 +309,7 @@ func main() {
"timestamp": time.Now().Unix(),
}
ssql.Stream().AddData(nestedData)
ssql.Emit(nestedData)
}
```
+175 -175
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -51,7 +51,7 @@ func main() {
fmt.Println("✓ SQL执行成功")
// 5. 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf("📊 聚合结果: %v\n", result)
})
@@ -69,7 +69,7 @@ func main() {
for _, data := range sensorData {
fmt.Printf(" 设备: %s, 温度: %.1f°F, 湿度: %.1f%%\n",
data["device"], data["temperature"], data["humidity"])
ssql.AddData(data)
ssql.Emit(data)
}
// 7. 等待处理完成
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+8 -8
View File
@@ -625,12 +625,12 @@ func testMathFunctions(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 数学函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(1 * time.Second)
@@ -672,12 +672,12 @@ func testStringFunctions(ssql *streamsql.Streamsql) {
},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 字符串函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(500 * time.Millisecond)
@@ -715,12 +715,12 @@ func testConversionFunctions(ssql *streamsql.Streamsql) {
},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 转换函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(500 * time.Millisecond)
@@ -753,12 +753,12 @@ func testAggregateFunctions(ssql *streamsql.Streamsql) {
map[string]interface{}{"device": "sensor1", "value": 128.0, "category": "A"},
}
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合函数结果: %v\n", result)
})
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
time.Sleep(1 * time.Second)
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -78,7 +78,7 @@ func testDataOverflowPersistence() {
"id": i,
"value": fmt.Sprintf("data_%d", i),
}
stream.AddData(data)
stream.Emit(data)
}
duration := time.Since(start)
+4 -4
View File
@@ -122,7 +122,7 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 简单查询结果: %v\n", result)
})
@@ -143,7 +143,7 @@ func testSimpleQuery(ssql *streamsql.Streamsql) {
}
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
time.Sleep(200 * time.Millisecond) // 稍微延迟
}
@@ -171,7 +171,7 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
// 添加结果监听器
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
fmt.Printf(" 📊 聚合查询结果: %v\n", result)
})
@@ -198,7 +198,7 @@ func testAggregateQuery(ssql *streamsql.Streamsql) {
}
for _, data := range testData {
ssql.AddData(data)
ssql.Emit(data)
}
// 等待窗口触发
File diff suppressed because it is too large Load Diff
+74 -74
View File
@@ -1,74 +1,74 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// 执行一个简单的滚动窗口查询
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// 添加结果处理器
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// 发送测试数据
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.AddData(data)
}
// 等待处理完成
time.Sleep(3 * time.Second)
// 获取统计信息
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// 停止流处理
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
// 执行一个简单的滚动窗口查询
sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"
err := ssql.Execute(sql)
if err != nil {
fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
return
}
// 添加结果处理器
stream := ssql.Stream()
if stream != nil {
stream.AddSink(func(result interface{}) {
fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
})
// 发送测试数据
for i := 0; i < 5; i++ {
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device_%d", i%2),
"temperature": 20.0 + float64(i),
"timestamp": time.Now(),
}
ssql.Emit(data)
}
// 等待处理完成
time.Sleep(3 * time.Second)
// 获取统计信息
stats := ssql.GetDetailedStats()
fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats)
}
// 停止流处理
ssql.Stop()
fmt.Printf("✅ %s - 测试完成\n", configName)
}
+1 -1
View File
@@ -1053,7 +1053,7 @@ func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) {
return args, nil
}
func (s *Stream) AddData(data interface{}) {
func (s *Stream) Emit(data interface{}) {
atomic.AddInt64(&s.inputCount, 1)
// 性能优化:直接调用预编译的函数指针,避免switch判断
s.addDataFunc(data)
+10 -10
View File
@@ -55,7 +55,7 @@ func TestStreamProcess(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口关闭并触发结果
@@ -139,7 +139,7 @@ func TestStreamWithoutFilter(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
@@ -235,7 +235,7 @@ func TestIncompleteStreamProcess(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口关闭并触发结果
@@ -323,7 +323,7 @@ func TestWindowSlotAgg(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
@@ -492,7 +492,7 @@ func TestStreamWithPersistenceStrategy(t *testing.T) {
"temperature": float64(20 + i),
"timestamp": time.Now(),
}
stream.AddData(data)
stream.Emit(data)
}
// 等待处理完成
@@ -546,7 +546,7 @@ func TestStreamPersistenceRecovery(t *testing.T) {
}
for _, data := range testData {
stream1.AddData(data)
stream1.Emit(data)
}
// 等待数据持久化
@@ -695,7 +695,7 @@ func TestStreamPersistencePerformance(t *testing.T) {
"value": i,
"data": fmt.Sprintf("performance_test_data_%d", i),
}
stream.AddData(data)
stream.Emit(data)
}
elapsed := time.Since(start)
@@ -795,7 +795,7 @@ func TestSelectStarWithExpressionFields(t *testing.T) {
"age": 25,
}
stream.AddData(testData)
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
@@ -873,7 +873,7 @@ func TestSelectStarWithExpressionFieldsOverride(t *testing.T) {
"status": "active",
}
stream.AddData(testData)
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
@@ -939,7 +939,7 @@ func TestSelectStarWithoutExpressionFields(t *testing.T) {
"status": "inactive",
}
stream.AddData(testData)
stream.Emit(testData)
// 等待处理完成
time.Sleep(100 * time.Millisecond)
+79 -6
View File
@@ -31,7 +31,7 @@ import (
//
// ssql := streamsql.New()
// err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
// ssql.AddData(map[string]interface{}{"temperature": 25.5})
// ssql.Emit(map[string]interface{}{"temperature": 25.5})
type Streamsql struct {
stream *stream.Stream
@@ -158,7 +158,7 @@ func (s *Streamsql) Execute(sql string) error {
return nil
}
// AddData 向流中添加一条数据记录。
// Emit 向流中添加一条数据记录。
// 数据会根据已配置的SQL查询进行处理和聚合。
//
// 支持的数据格式:
@@ -171,7 +171,7 @@ func (s *Streamsql) Execute(sql string) error {
// 示例:
//
// // 添加设备数据
// ssql.AddData(map[string]interface{}{
// ssql.Emit(map[string]interface{}{
// "deviceId": "sensor001",
// "temperature": 25.5,
// "humidity": 60.0,
@@ -179,14 +179,14 @@ func (s *Streamsql) Execute(sql string) error {
// })
//
// // 添加用户行为数据
// ssql.AddData(map[string]interface{}{
// ssql.Emit(map[string]interface{}{
// "userId": "user123",
// "action": "click",
// "page": "/home",
// })
func (s *Streamsql) AddData(data interface{}) {
func (s *Streamsql) Emit(data interface{}) {
if s.stream != nil {
s.stream.AddData(data)
s.stream.Emit(data)
}
}
@@ -248,3 +248,76 @@ func (s *Streamsql) Stop() {
s.stream.Stop()
}
}
// AddSink 直接添加结果处理回调函数。
// 这是对 Stream().AddSink() 的便捷封装,使API调用更简洁。
//
// 参数:
// - sink: 结果处理函数,接收处理结果作为参数
//
// 示例:
//
// // 直接添加结果处理
// ssql.AddSink(func(result interface{}) {
// fmt.Printf("处理结果: %v\n", result)
// })
//
// // 添加多个处理器
// ssql.AddSink(func(result interface{}) {
// // 保存到数据库
// saveToDatabase(result)
// })
// ssql.AddSink(func(result interface{}) {
// // 发送到消息队列
// sendToQueue(result)
// })
func (s *Streamsql) AddSink(sink func(interface{})) {
if s.stream != nil {
s.stream.AddSink(sink)
}
}
// Print 打印结果到控制台。
// 这是一个便捷方法,自动添加一个打印结果的sink函数。
//
// 示例:
//
// // 简单打印结果
// ssql.Print()
//
// // 等价于:
// ssql.AddSink(func(result interface{}) {
// fmt.Printf("Ressult: %v\n", result)
// })
func (s *Streamsql) Print() {
s.AddSink(func(result interface{}) {
fmt.Printf("Ressult: %v\n", result)
})
}
// ToChannel 返回结果通道,用于异步获取处理结果。
// 通过此通道可以以非阻塞方式获取流处理结果。
//
// 返回值:
// - <-chan interface{}: 只读的结果通道,如果未执行SQL则返回nil
//
// 示例:
//
// // 获取结果通道
// resultChan := ssql.ToChannel()
// if resultChan != nil {
// go func() {
// for result := range resultChan {
// fmt.Printf("异步结果: %v\n", result)
// }
// }()
// }
//
// 注意:
// - 必须有消费者持续从通道读取数据,否则可能导致流处理阻塞
func (s *Streamsql) ToChannel() <-chan interface{} {
if s.stream != nil {
return s.stream.GetResultsChan()
}
return nil
}
File diff suppressed because it is too large Load Diff
+9 -9
View File
@@ -50,7 +50,7 @@ func TestCaseExpressionInSQL(t *testing.T) {
})
for _, data := range testData {
streamSQL.stream.AddData(data)
streamSQL.Emit(data)
}
// 等待处理
@@ -102,7 +102,7 @@ func TestCaseExpressionInAggregation(t *testing.T) {
})
for _, data := range testData {
streamSQL.stream.AddData(data)
streamSQL.Emit(data)
}
// 等待窗口触发
@@ -274,7 +274,7 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) {
})
for _, data := range tc.data {
streamSQL.stream.AddData(data)
streamSQL.Emit(data)
}
// 等待窗口触发
@@ -377,7 +377,7 @@ func TestCaseExpressionNonAggregated(t *testing.T) {
// 添加测试数据
for _, data := range tt.testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
@@ -482,7 +482,7 @@ func TestCaseExpressionAggregated(t *testing.T) {
})
for _, data := range tt.testData {
strm.AddData(data)
strm.Emit(data)
}
// 使用带超时的等待机制
@@ -606,13 +606,13 @@ func TestCaseExpressionNullHandlingInAggregation(t *testing.T) {
var results []map[string]interface{}
resultChan := make(chan interface{}, 10)
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
resultChan <- result
})
// 添加测试数据
for _, data := range tc.testData {
ssql.Stream().AddData(data)
ssql.Stream().Emit(data)
}
// 等待窗口触发
@@ -777,7 +777,7 @@ func TestHavingWithCaseExpressionFunctional(t *testing.T) {
})
for _, data := range testData {
streamSQL.stream.AddData(data)
streamSQL.Emit(data)
}
// 等待窗口触发
@@ -866,7 +866,7 @@ func TestNegativeNumberInSQL(t *testing.T) {
// 添加测试数据
for _, data := range testData {
streamSQL.stream.AddData(data)
streamSQL.Emit(data)
}
// 等待处理
+8 -8
View File
@@ -72,7 +72,7 @@ func TestCustomMathFunctions(t *testing.T) {
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
streamsql.Stream().AddSink(func(result interface{}) {
streamsql.AddSink(func(result interface{}) {
resultChan <- result
})
@@ -86,7 +86,7 @@ func TestCustomMathFunctions(t *testing.T) {
"y2": 4.0, // 距离应该是5
}
streamsql.AddData(testData)
streamsql.Emit(testData)
// 等待窗口触发
time.Sleep(1 * time.Second)
@@ -179,7 +179,7 @@ func TestCustomStringFunctions(t *testing.T) {
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
streamsql.Stream().AddSink(func(result interface{}) {
streamsql.AddSink(func(result interface{}) {
resultChan <- result
})
@@ -189,7 +189,7 @@ func TestCustomStringFunctions(t *testing.T) {
"metadata": `{"version":"1.0","type":"temperature"}`,
}
streamsql.AddData(testData)
streamsql.Emit(testData)
time.Sleep(200 * time.Millisecond)
// 验证结果
@@ -318,7 +318,7 @@ func TestCustomAggregateFunctions(t *testing.T) {
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
streamsql.Stream().AddSink(func(result interface{}) {
streamsql.AddSink(func(result interface{}) {
resultChan <- result
})
@@ -331,7 +331,7 @@ func TestCustomAggregateFunctions(t *testing.T) {
}
for _, data := range testData {
streamsql.AddData(data)
streamsql.Emit(data)
}
time.Sleep(1 * time.Second)
@@ -557,7 +557,7 @@ func TestCustomFunctionWithAggregation(t *testing.T) {
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
streamsql.Stream().AddSink(func(result interface{}) {
streamsql.AddSink(func(result interface{}) {
resultChan <- result
})
@@ -568,7 +568,7 @@ func TestCustomFunctionWithAggregation(t *testing.T) {
}
for _, data := range testData {
streamsql.AddData(data)
streamsql.Emit(data)
}
time.Sleep(1 * time.Second)
+20 -20
View File
@@ -32,7 +32,7 @@ func TestFunctionIntegrationNonAggregation(t *testing.T) {
"temperature": -25.5,
"humidity": 64.0,
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -77,7 +77,7 @@ func TestFunctionIntegrationNonAggregation(t *testing.T) {
"device": "sensor01",
"location": "ROOM_A",
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -120,7 +120,7 @@ func TestFunctionIntegrationNonAggregation(t *testing.T) {
"temperature": 25.7,
"humidity": 65.0,
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -162,7 +162,7 @@ func TestFunctionIntegrationNonAggregation(t *testing.T) {
"device": "test-device",
"timestamp": testTime,
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -204,7 +204,7 @@ func TestFunctionIntegrationNonAggregation(t *testing.T) {
"device": "test-device",
"metadata": `{"type": "temperature_sensor", "version": "1.0"}`,
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -253,7 +253,7 @@ func TestFunctionIntegrationAggregation(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -317,7 +317,7 @@ func TestFunctionIntegrationAggregation(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -368,7 +368,7 @@ func TestFunctionIntegrationAggregation(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -430,7 +430,7 @@ func TestFunctionIntegrationMixed(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -497,7 +497,7 @@ func TestFunctionIntegrationMixed(t *testing.T) {
"device": "sensor1",
"temperature": 25.7,
}
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -540,7 +540,7 @@ func TestFunctionIntegrationMixed(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -594,7 +594,7 @@ func TestNestedFunctionSupport(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -659,7 +659,7 @@ func TestNestedFunctionSupport(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -728,7 +728,7 @@ func TestNestedFunctionSupport(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -794,7 +794,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
})
// 添加测试数据
strm.AddData(map[string]interface{}{"device": "sensor1", "temperature": 25.67})
strm.Emit(map[string]interface{}{"device": "sensor1", "temperature": 25.67})
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -834,7 +834,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
})
// 添加测试数据
strm.AddData(map[string]interface{}{"device": "sensor1"})
strm.Emit(map[string]interface{}{"device": "sensor1"})
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -874,7 +874,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
})
// 添加测试数据
strm.AddData(map[string]interface{}{"device": "sensor1", "temperature": 16.0})
strm.Emit(map[string]interface{}{"device": "sensor1", "temperature": 16.0})
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -921,7 +921,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -966,7 +966,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
})
// 添加测试数据
strm.AddData(map[string]interface{}{"device": "sensor1", "created_at": "2023-12-25 15:30:45"})
strm.Emit(map[string]interface{}{"device": "sensor1", "created_at": "2023-12-25 15:30:45"})
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -1006,7 +1006,7 @@ func TestNestedFunctionExecutionOrder(t *testing.T) {
})
// 添加测试数据(不包含invalid_field
strm.AddData(map[string]interface{}{"device": "sensor1", "temperature": 25.0})
strm.Emit(map[string]interface{}{"device": "sensor1", "temperature": 25.0})
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+1033 -1033
View File
File diff suppressed because it is too large Load Diff
+551 -551
View File
File diff suppressed because it is too large Load Diff
+5 -5
View File
@@ -104,7 +104,7 @@ func testStringFunctionsOnly(t *testing.T) {
"phone": "13812345678",
}
streamsql.AddData(testData)
streamsql.Emit(testData)
time.Sleep(300 * time.Millisecond)
select {
@@ -149,7 +149,7 @@ func testConversionFunctionsOnly(t *testing.T) {
"user_id": "12345",
}
streamsql.AddData(testData)
streamsql.Emit(testData)
time.Sleep(300 * time.Millisecond)
select {
@@ -205,7 +205,7 @@ func testMathFunctionsInAggregate(t *testing.T) {
}
for _, data := range testData {
streamsql.AddData(data)
streamsql.Emit(data)
}
time.Sleep(1 * time.Second)
@@ -269,7 +269,7 @@ func TestRuntimeFunctionManagement(t *testing.T) {
resultChan <- result
})
streamsql.AddData(map[string]interface{}{"value": "test"})
streamsql.Emit(map[string]interface{}{"value": "test"})
time.Sleep(300 * time.Millisecond)
select {
@@ -390,7 +390,7 @@ func TestCompleteSQLIntegration(t *testing.T) {
"amount": 100.0,
}
streamsql.AddData(testData)
streamsql.Emit(testData)
time.Sleep(300 * time.Millisecond)
select {
+36 -36
View File
@@ -70,8 +70,8 @@ func TestStreamData(t *testing.T) {
"humidity": 50.0 + rand.Float64()*20, // 湿度范围: 50-70%
}
// 将数据添加到流中,触发 StreamSQL 的实时处理
// AddData 会将数据分发到相应的窗口和聚合器中
ssql.stream.AddData(randomData)
// Emit 会将数据分发到相应的窗口和聚合器中
ssql.Emit(randomData)
}
case <-ctx.Done():
@@ -131,7 +131,7 @@ func TestStreamsql(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
resultChan := make(chan interface{})
@@ -201,7 +201,7 @@ func TestStreamsqlWithoutGroupBy(t *testing.T) {
}
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 捕获结果
resultChan := make(chan interface{})
@@ -272,7 +272,7 @@ func TestStreamsqlDistinct(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -374,7 +374,7 @@ func TestStreamsqlLimit(t *testing.T) {
// 实时验证:添加一条数据,立即验证一条结果
for i, data := range testData {
// 添加数据
strm.AddData(data)
strm.Emit(data)
// 立即等待并验证结果
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@@ -438,7 +438,7 @@ func TestStreamsqlLimit(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待聚合
@@ -513,7 +513,7 @@ func TestStreamsqlLimit(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口触发
@@ -586,7 +586,7 @@ func TestStreamsqlLimit(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待聚合
@@ -657,7 +657,7 @@ func TestSimpleQuery(t *testing.T) {
// 发送数据
//fmt.Println("添加数据...")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待结果
@@ -713,7 +713,7 @@ func TestHavingClause(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -796,7 +796,7 @@ func TestSessionWindow(t *testing.T) {
if item.wait > 0 {
time.Sleep(item.wait)
}
strm.AddData(item.data)
strm.Emit(item.data)
}
// 等待会话超时,使最后一个会话触发
@@ -889,7 +889,7 @@ func TestExpressionInAggregation(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -974,7 +974,7 @@ func TestAdvancedFunctionsInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1073,7 +1073,7 @@ func TestCustomFunctionInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1158,7 +1158,7 @@ func TestNewAggregateFunctionsInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1268,7 +1268,7 @@ func TestStatisticalAggregateFunctionsInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1370,7 +1370,7 @@ func TestDeduplicateAggregateInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1480,7 +1480,7 @@ func TestExprAggregationFunctions(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1637,7 +1637,7 @@ func TestAnalyticalFunctionsInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1734,7 +1734,7 @@ func TestLagFunctionInSQL(t *testing.T) {
//fmt.Println("添加测试数据:", testData)
for _, data := range testData {
//fmt.Printf("添加第%d个数据: temperature=%.1f\n", i+1, data.(map[string]interface{})["temperature"])
strm.AddData(data)
strm.Emit(data)
time.Sleep(100 * time.Millisecond) // 稍微延迟确保顺序
}
@@ -1832,7 +1832,7 @@ func TestHadChangedFunctionInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -1912,7 +1912,7 @@ func TestLatestFunctionInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -2004,7 +2004,7 @@ func TestChangedColFunctionInSQL(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -2085,7 +2085,7 @@ func TestAnalyticalFunctionsIncrementalComputation(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -2184,7 +2184,7 @@ func TestIncrementalComputationBasic(t *testing.T) {
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 创建结果接收通道
@@ -2288,7 +2288,7 @@ func TestExprFunctions(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待结果
@@ -2374,7 +2374,7 @@ func TestExprFunctionsInAggregation(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待窗口初始化
@@ -2450,7 +2450,7 @@ func TestNestedExprFunctions(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待结果
@@ -2540,7 +2540,7 @@ func TestExprFunctionsWithStreamSQLFunctions(t *testing.T) {
// 添加数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
}
// 等待结果
@@ -2622,7 +2622,7 @@ func TestSelectAllFeature(t *testing.T) {
}
// 发送数据
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -2682,7 +2682,7 @@ func TestSelectAllFeature(t *testing.T) {
// 发送数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
// 立即检查结果
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
@@ -2767,7 +2767,7 @@ func TestSelectAllFeature(t *testing.T) {
// 发送数据
for _, data := range testData {
strm.AddData(data)
strm.Emit(data)
// 立即检查结果
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
@@ -2840,7 +2840,7 @@ func TestSelectAllFeature(t *testing.T) {
}
// 发送数据
strm.AddData(testData)
strm.Emit(testData)
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -2896,7 +2896,7 @@ func TestCaseNullValueHandlingInAggregation(t *testing.T) {
var results []map[string]interface{}
resultChan := make(chan interface{}, 10)
ssql.Stream().AddSink(func(result interface{}) {
ssql.AddSink(func(result interface{}) {
resultChan <- result
})
@@ -2910,7 +2910,7 @@ func TestCaseNullValueHandlingInAggregation(t *testing.T) {
}
for _, data := range testData {
ssql.Stream().AddData(data)
ssql.Emit(data)
}
// 等待窗口触发