mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-18 10:08:28 +00:00
refactor(stream): 移除stream定时触发器并优化测试逻辑
This commit is contained in:
+34
-3
@@ -3,6 +3,7 @@ package streamsql
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -20,6 +21,13 @@ func TestStreamData(t *testing.T) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// 添加测试数据
|
||||
wg.Add(1)
|
||||
// 创建上下文和取消函数,设置测试超时时间
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
// 添加测试数据
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
@@ -34,6 +42,8 @@ func TestStreamData(t *testing.T) {
|
||||
"humidity": 50.0 + rand.Float64()*20, // 50-70%湿度
|
||||
}
|
||||
ssql.stream.AddData(randomData)
|
||||
case <-ctx.Done():
|
||||
return // 上下文取消时退出
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -43,14 +53,35 @@ func TestStreamData(t *testing.T) {
|
||||
ssql.stream.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// 计数器,用于记录收到的结果数量
|
||||
resultCount := 0
|
||||
maxResults := 5 // 设置期望接收的结果数量
|
||||
//打印结果
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for result := range resultChan {
|
||||
fmt.Printf("打印结果: %v\n", result)
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
|
||||
resultCount++
|
||||
// 收到足够的结果后取消上下文
|
||||
if resultCount >= maxResults {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
ssql.stream.Window.Reset()
|
||||
return // 上下文取消时退出
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(30 * time.Second)
|
||||
// 等待所有 goroutine 完成
|
||||
wg.Wait()
|
||||
|
||||
// 验证是否收到了结果
|
||||
assert.Greater(t, resultCount, 5, "应该至少收到一个结果")
|
||||
}
|
||||
func TestStreamsql(t *testing.T) {
|
||||
streamsql := New()
|
||||
|
||||
Reference in New Issue
Block a user