fix:fix: resolve window initialization deadlock in tests

This commit is contained in:
rulego-team
2025-07-30 18:52:01 +08:00
parent ffb050dc51
commit 8cb4152449
5 changed files with 56 additions and 76 deletions
+2 -49
View File
@@ -29,12 +29,7 @@ jobs:
run: go mod download
- name: Run all tests
run: go test -v -race -timeout 30s ./...
- name: Run CASE expression tests specifically
run: |
echo "Testing CASE expression functionality..."
go test -v -run TestCaseExpression -timeout 20s
run: go test -v -race -timeout 300s ./...
release:
name: Create Release
@@ -61,52 +56,10 @@ jobs:
- name: Download dependencies
run: go mod download
- name: Build binaries
run: |
# Build for multiple platforms
GOOS=linux GOARCH=amd64 go build -o streamsql-linux-amd64 ./...
GOOS=windows GOARCH=amd64 go build -o streamsql-windows-amd64.exe ./...
GOOS=darwin GOARCH=amd64 go build -o streamsql-darwin-amd64 ./...
GOOS=darwin GOARCH=arm64 go build -o streamsql-darwin-arm64 ./...
- name: Generate changelog
id: changelog
run: |
echo "CHANGELOG<<EOF" >> $GITHUB_OUTPUT
echo "## 🚀 StreamSQL $(echo ${{ github.ref }} | sed 's/refs\/tags\///')" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### ✨ 新增功能" >> $GITHUB_OUTPUT
echo "- 完善的CASE表达式支持" >> $GITHUB_OUTPUT
echo "- 多条件逻辑表达式 (AND, OR)" >> $GITHUB_OUTPUT
echo "- 数学函数集成" >> $GITHUB_OUTPUT
echo "- 字段提取和引用功能" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### 🔧 改进" >> $GITHUB_OUTPUT
echo "- 负数解析优化" >> $GITHUB_OUTPUT
echo "- 字符串和数值混合比较" >> $GITHUB_OUTPUT
echo "- 表达式解析性能提升" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "### 📋 测试覆盖" >> $GITHUB_OUTPUT
echo "- ✅ 基础CASE表达式解析" >> $GITHUB_OUTPUT
echo "- ✅ 复杂条件组合" >> $GITHUB_OUTPUT
echo "- ✅ 函数调用支持" >> $GITHUB_OUTPUT
echo "- ✅ 字段提取功能" >> $GITHUB_OUTPUT
echo "- ⚠️ 聚合函数中的使用 (部分支持)" >> $GITHUB_OUTPUT
echo "" >> $GITHUB_OUTPUT
echo "---" >> $GITHUB_OUTPUT
echo "📖 **完整文档**: [README.md](README.md) | [中文文档](README_ZH.md)" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Create Release
uses: softprops/action-gh-release@v1
with:
body: ${{ steps.changelog.outputs.CHANGELOG }}
files: |
streamsql-linux-amd64
streamsql-windows-amd64.exe
streamsql-darwin-amd64
streamsql-darwin-arm64
draft: false
prerelease: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+23 -23
View File
@@ -2,6 +2,7 @@ package streamsql
import (
"fmt"
"sync"
"testing"
"time"
@@ -114,38 +115,33 @@ func TestIsNullOperatorInSQL(t *testing.T) {
// 收集结果
var results []map[string]interface{}
resultChan := make(chan interface{}, 10)
resultsMutex := sync.Mutex{}
ssql.Stream().AddSink(func(result interface{}) {
resultChan <- result
})
// 使用一个done channel来同步
done := make(chan bool, 1)
// 添加测试数据
for _, data := range tc.testData {
ssql.Stream().Emit(data)
}
// 在另一个goroutine中收集结果
go func() {
defer func() { done <- true }()
// 等待一段时间收集结果
timeout := time.After(300 * time.Millisecond)
for {
select {
case result := <-resultChan:
if resultSlice, ok := result.([]map[string]interface{}); ok {
results = append(results, resultSlice...)
}
case <-timeout:
return
}
}
}()
// 使用更短的超时时间,避免在CI环境中长时间等待
timeout := time.After(500 * time.Millisecond)
// 等待收集完成
<-done
collecting:
for {
select {
case result := <-resultChan:
resultsMutex.Lock()
if resultSlice, ok := result.([]map[string]interface{}); ok {
results = append(results, resultSlice...)
}
resultsMutex.Unlock()
case <-timeout:
break collecting
}
}
// 验证结果数量
assert.Len(t, results, len(tc.expected), "结果数量应该匹配")
@@ -156,10 +152,12 @@ func TestIsNullOperatorInSQL(t *testing.T) {
expectedDeviceIds[i] = exp["deviceId"].(string)
}
resultsMutex.Lock()
actualDeviceIds := make([]string, len(results))
for i, result := range results {
actualDeviceIds[i] = result["deviceId"].(string)
}
resultsMutex.Unlock()
// 验证每个期望的设备ID都在结果中
for _, expectedId := range expectedDeviceIds {
@@ -167,6 +165,7 @@ func TestIsNullOperatorInSQL(t *testing.T) {
}
// 验证每个结果的字段值
resultsMutex.Lock()
for _, result := range results {
deviceId := result["deviceId"].(string)
// 找到对应的期望结果
@@ -186,6 +185,7 @@ func TestIsNullOperatorInSQL(t *testing.T) {
}
}
}
resultsMutex.Unlock()
})
}
}
@@ -424,7 +424,7 @@ func TestIsNullWithOtherOperators(t *testing.T) {
// 使用超时方式安全收集结果
var results []map[string]interface{}
timeout := time.After(500 * time.Millisecond)
timeout := time.After(2 * time.Second)
collecting:
for {
@@ -1004,7 +1004,7 @@ func TestMixedNullComparisons(t *testing.T) {
// 使用超时方式安全收集结果
var results []map[string]interface{}
timeout := time.After(500 * time.Millisecond)
timeout := time.After(2 * time.Second)
collecting:
for {
+11 -1
View File
@@ -135,12 +135,22 @@ func (sw *SessionWindow) Add(data interface{}) {
}
// Start 启动会话窗口的定时检查机制
// Start 启动会话窗口,开始定期检查过期会话
// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理
func (sw *SessionWindow) Start() {
go func() {
<-sw.initChan
// 在函数结束时关闭输出通道
defer close(sw.outputChan)
// 等待初始化完成或上下文取消
select {
case <-sw.initChan:
// 正常初始化完成,继续处理
case <-sw.ctx.Done():
// 上下文被取消,直接退出
return
}
// 定期检查过期会话
sw.tickerMu.Lock()
sw.ticker = time.NewTicker(sw.timeout / 2)
+10 -2
View File
@@ -115,13 +115,21 @@ func (sw *SlidingWindow) Add(data interface{}) {
}
// Start 启动滑动窗口,开始定时触发窗口
// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理
func (sw *SlidingWindow) Start() {
go func() {
// 等待初始化信号
<-sw.initChan
// 在函数结束时关闭输出通道。
defer close(sw.outputChan)
// 等待初始化完成或上下文取消
select {
case <-sw.initChan:
// 正常初始化完成,继续处理
case <-sw.ctx.Done():
// 上下文被取消,直接退出
return
}
for {
// 在每次循环中安全地获取timer
sw.timerMu.Lock()
+10 -1
View File
@@ -130,12 +130,21 @@ func (tw *TumblingWindow) Stop() {
}
// Start 启动滚动窗口的定时触发机制。
// 采用延迟初始化模式,避免在没有数据时无限等待,同时确保后续数据能正常处理
func (tw *TumblingWindow) Start() {
go func() {
<-tw.initChan
// 在函数结束时关闭输出通道。
defer close(tw.outputChan)
// 等待初始化完成或上下文取消
select {
case <-tw.initChan:
// 正常初始化完成,继续处理
case <-tw.ctx.Done():
// 上下文被取消,直接退出
return
}
for {
// 在每次循环中安全地获取timer
tw.timerMu.Lock()