From 8475810768819cc3c95674ffa34ba53506450b0d Mon Sep 17 00:00:00 2001 From: rulego-team Date: Wed, 30 Jul 2025 18:52:01 +0800 Subject: [PATCH] fix:fix: resolve window initialization deadlock in tests --- .github/workflows/release.yml | 51 ++--------------------------------- streamsql_is_null_test.go | 46 +++++++++++++++---------------- window/session_window.go | 12 ++++++++- window/sliding_window.go | 12 +++++++-- window/tumbling_window.go | 11 +++++++- 5 files changed, 56 insertions(+), 76 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5420d52..be07b25 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -29,12 +29,7 @@ jobs: run: go mod download - name: Run all tests - run: go test -v -race -timeout 30s ./... - - - name: Run CASE expression tests specifically - run: | - echo "Testing CASE expression functionality..." - go test -v -run TestCaseExpression -timeout 20s + run: go test -v -race -timeout 300s ./... release: name: Create Release @@ -61,52 +56,10 @@ jobs: - name: Download dependencies run: go mod download - - name: Build binaries - run: | - # Build for multiple platforms - GOOS=linux GOARCH=amd64 go build -o streamsql-linux-amd64 ./... - GOOS=windows GOARCH=amd64 go build -o streamsql-windows-amd64.exe ./... - GOOS=darwin GOARCH=amd64 go build -o streamsql-darwin-amd64 ./... - GOOS=darwin GOARCH=arm64 go build -o streamsql-darwin-arm64 ./... - - - name: Generate changelog - id: changelog - run: | - echo "CHANGELOG<> $GITHUB_OUTPUT - echo "## 🚀 StreamSQL $(echo ${{ github.ref }} | sed 's/refs\/tags\///')" >> $GITHUB_OUTPUT - echo "" >> $GITHUB_OUTPUT - echo "### ✨ 新增功能" >> $GITHUB_OUTPUT - echo "- 完善的CASE表达式支持" >> $GITHUB_OUTPUT - echo "- 多条件逻辑表达式 (AND, OR)" >> $GITHUB_OUTPUT - echo "- 数学函数集成" >> $GITHUB_OUTPUT - echo "- 字段提取和引用功能" >> $GITHUB_OUTPUT - echo "" >> $GITHUB_OUTPUT - echo "### 🔧 改进" >> $GITHUB_OUTPUT - echo "- 负数解析优化" >> $GITHUB_OUTPUT - echo "- 字符串和数值混合比较" >> $GITHUB_OUTPUT - echo "- 表达式解析性能提升" >> $GITHUB_OUTPUT - echo "" >> $GITHUB_OUTPUT - echo "### 📋 测试覆盖" >> $GITHUB_OUTPUT - echo "- ✅ 基础CASE表达式解析" >> $GITHUB_OUTPUT - echo "- ✅ 复杂条件组合" >> $GITHUB_OUTPUT - echo "- ✅ 函数调用支持" >> $GITHUB_OUTPUT - echo "- ✅ 字段提取功能" >> $GITHUB_OUTPUT - echo "- ⚠️ 聚合函数中的使用 (部分支持)" >> $GITHUB_OUTPUT - echo "" >> $GITHUB_OUTPUT - echo "---" >> $GITHUB_OUTPUT - echo "📖 **完整文档**: [README.md](README.md) | [中文文档](README_ZH.md)" >> $GITHUB_OUTPUT - echo "EOF" >> $GITHUB_OUTPUT - - name: Create Release uses: softprops/action-gh-release@v1 with: - body: ${{ steps.changelog.outputs.CHANGELOG }} - files: | - streamsql-linux-amd64 - streamsql-windows-amd64.exe - streamsql-darwin-amd64 - streamsql-darwin-arm64 draft: false prerelease: false env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/streamsql_is_null_test.go b/streamsql_is_null_test.go index c31ab54..9334ca7 100644 --- a/streamsql_is_null_test.go +++ b/streamsql_is_null_test.go @@ -2,6 +2,7 @@ package streamsql import ( "fmt" + "sync" "testing" "time" @@ -114,38 +115,33 @@ func TestIsNullOperatorInSQL(t *testing.T) { // 收集结果 var results []map[string]interface{} resultChan := make(chan interface{}, 10) + resultsMutex := sync.Mutex{} ssql.Stream().AddSink(func(result interface{}) { resultChan <- result }) - // 使用一个done channel来同步 - done := make(chan bool, 1) - // 添加测试数据 for _, data := range tc.testData { ssql.Stream().Emit(data) } - // 在另一个goroutine中收集结果 - go func() { - defer func() { done <- true }() - // 等待一段时间收集结果 - timeout := time.After(300 * time.Millisecond) - for { - select { - case result := <-resultChan: - if resultSlice, ok := result.([]map[string]interface{}); ok { - results = append(results, resultSlice...) - } - case <-timeout: - return - } - } - }() + // 使用更短的超时时间,避免在CI环境中长时间等待 + timeout := time.After(500 * time.Millisecond) - // 等待收集完成 - <-done + collecting: + for { + select { + case result := <-resultChan: + resultsMutex.Lock() + if resultSlice, ok := result.([]map[string]interface{}); ok { + results = append(results, resultSlice...) + } + resultsMutex.Unlock() + case <-timeout: + break collecting + } + } // 验证结果数量 assert.Len(t, results, len(tc.expected), "结果数量应该匹配") @@ -156,10 +152,12 @@ func TestIsNullOperatorInSQL(t *testing.T) { expectedDeviceIds[i] = exp["deviceId"].(string) } + resultsMutex.Lock() actualDeviceIds := make([]string, len(results)) for i, result := range results { actualDeviceIds[i] = result["deviceId"].(string) } + resultsMutex.Unlock() // 验证每个期望的设备ID都在结果中 for _, expectedId := range expectedDeviceIds { @@ -167,6 +165,7 @@ func TestIsNullOperatorInSQL(t *testing.T) { } // 验证每个结果的字段值 + resultsMutex.Lock() for _, result := range results { deviceId := result["deviceId"].(string) // 找到对应的期望结果 @@ -186,6 +185,7 @@ func TestIsNullOperatorInSQL(t *testing.T) { } } } + resultsMutex.Unlock() }) } } @@ -424,7 +424,7 @@ func TestIsNullWithOtherOperators(t *testing.T) { // 使用超时方式安全收集结果 var results []map[string]interface{} - timeout := time.After(500 * time.Millisecond) + timeout := time.After(2 * time.Second) collecting: for { @@ -1004,7 +1004,7 @@ func TestMixedNullComparisons(t *testing.T) { // 使用超时方式安全收集结果 var results []map[string]interface{} - timeout := time.After(500 * time.Millisecond) + timeout := time.After(2 * time.Second) collecting: for { diff --git a/window/session_window.go b/window/session_window.go index ce6b0e0..517c877 100644 --- a/window/session_window.go +++ b/window/session_window.go @@ -135,12 +135,22 @@ func (sw *SessionWindow) Add(data interface{}) { } // Start 启动会话窗口的定时检查机制 +// Start 启动会话窗口,开始定期检查过期会话 +// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理 func (sw *SessionWindow) Start() { go func() { - <-sw.initChan // 在函数结束时关闭输出通道 defer close(sw.outputChan) + // 等待初始化完成或上下文取消 + select { + case <-sw.initChan: + // 正常初始化完成,继续处理 + case <-sw.ctx.Done(): + // 上下文被取消,直接退出 + return + } + // 定期检查过期会话 sw.tickerMu.Lock() sw.ticker = time.NewTicker(sw.timeout / 2) diff --git a/window/sliding_window.go b/window/sliding_window.go index d587b34..47b14e1 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -115,13 +115,21 @@ func (sw *SlidingWindow) Add(data interface{}) { } // Start 启动滑动窗口,开始定时触发窗口 +// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理 func (sw *SlidingWindow) Start() { go func() { - // 等待初始化信号 - <-sw.initChan // 在函数结束时关闭输出通道。 defer close(sw.outputChan) + // 等待初始化完成或上下文取消 + select { + case <-sw.initChan: + // 正常初始化完成,继续处理 + case <-sw.ctx.Done(): + // 上下文被取消,直接退出 + return + } + for { // 在每次循环中安全地获取timer sw.timerMu.Lock() diff --git a/window/tumbling_window.go b/window/tumbling_window.go index c916048..e74444f 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -130,12 +130,21 @@ func (tw *TumblingWindow) Stop() { } // Start 启动滚动窗口的定时触发机制。 +// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理 func (tw *TumblingWindow) Start() { go func() { - <-tw.initChan // 在函数结束时关闭输出通道。 defer close(tw.outputChan) + // 等待初始化完成或上下文取消 + select { + case <-tw.initChan: + // 正常初始化完成,继续处理 + case <-tw.ctx.Done(): + // 上下文被取消,直接退出 + return + } + for { // 在每次循环中安全地获取timer tw.timerMu.Lock()