From 782ba53dec10dc1694b60c499331394335181599 Mon Sep 17 00:00:00 2001 From: rulego-team Date: Fri, 13 Jun 2025 18:12:14 +0800 Subject: [PATCH] =?UTF-8?q?test:=E5=A2=9E=E5=8A=A0=E5=9F=BA=E5=87=86?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 133 ++ .github/workflows/release.yml | 112 ++ .golangci.yml | 79 + streamsql_benchmark_test.go | 2661 +++++++++++++++++++++++++++++++++ 4 files changed, 2985 insertions(+) create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/release.yml create mode 100644 .golangci.yml create mode 100644 streamsql_benchmark_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..052bb34 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,133 @@ +name: CI + +on: + push: + branches: [ main, master, develop ] + pull_request: + branches: [ main, master, develop ] + +jobs: + test: + name: Test + runs-on: ubuntu-latest + strategy: + matrix: + go-version: ['1.19', '1.20', '1.21'] + + steps: + - name: Set up Go ${{ matrix.go-version }} + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + + - name: Check out code + uses: actions/checkout@v4 + + - name: Cache Go modules + uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go-${{ matrix.go-version }}- + + - name: Download dependencies + run: go mod download + + - name: Verify dependencies + run: go mod verify + + - name: Build + run: go build -v ./... + + - name: Run tests + run: go test -v -race -timeout 30s ./... + + - name: Run tests with coverage + if: matrix.go-version == '1.21' + run: go test -v -race -coverprofile=coverage.out -covermode=atomic -timeout 30s ./... + + - name: Upload coverage to Codecov + if: matrix.go-version == '1.21' + uses: codecov/codecov-action@v3 + with: + file: ./coverage.out + flags: unittests + name: codecov-umbrella + + case-expression-tests: + name: CASE Expression Tests + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Check out code + uses: actions/checkout@v4 + + - name: Cache Go modules + uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }} + + - name: Download dependencies + run: go mod download + + - name: Run CASE Expression Parsing Tests + run: go test -v -run TestCaseExpressionParsing -timeout 15s + + - name: Run CASE Expression Comprehensive Tests + run: go test -v -run TestCaseExpressionComprehensive -timeout 15s + + - name: Run CASE Expression Field Extraction Tests + run: go test -v -run TestCaseExpressionFieldExtraction -timeout 15s + + - name: Run CASE Expression in SQL Tests + run: go test -v -run TestCaseExpressionInSQL -timeout 15s + + - name: Run CASE Expression Aggregation Tests (with known limitations) + run: go test -v -run "TestCaseExpressionInAggregation|TestComplexCaseExpressionsInAggregation" -timeout 20s + + - name: Run CASE Expression Edge Cases + run: go test -v -run TestCaseExpressionEdgeCases -timeout 15s + + lint: + name: Lint + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Check out code + uses: actions/checkout@v4 + + - name: Run golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: latest + args: --timeout=5m + + security: + name: Security Scan + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Check out code + uses: actions/checkout@v4 + + - name: Run Gosec Security Scanner + uses: securecodewarrior/github-action-gosec@master + with: + args: './...' \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..5420d52 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,112 @@ +name: Release + +on: + push: + tags: + - 'v*' + +jobs: + test: + name: Test Before Release + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Check out code + uses: actions/checkout@v4 + + - name: Cache Go modules + uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }} + + - name: Download dependencies + run: go mod download + + - name: Run all tests + run: go test -v -race -timeout 30s ./... + + - name: Run CASE expression tests specifically + run: | + echo "Testing CASE expression functionality..." + go test -v -run TestCaseExpression -timeout 20s + + release: + name: Create Release + runs-on: ubuntu-latest + needs: test + + steps: + - name: Check out code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.21' + + - name: Cache Go modules + uses: actions/cache@v3 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-1.21-${{ hashFiles('**/go.sum') }} + + - name: Download dependencies + run: go mod download + + - name: Build binaries + run: | + # Build for multiple platforms + GOOS=linux GOARCH=amd64 go build -o streamsql-linux-amd64 ./... + GOOS=windows GOARCH=amd64 go build -o streamsql-windows-amd64.exe ./... + GOOS=darwin GOARCH=amd64 go build -o streamsql-darwin-amd64 ./... + GOOS=darwin GOARCH=arm64 go build -o streamsql-darwin-arm64 ./... + + - name: Generate changelog + id: changelog + run: | + echo "CHANGELOG<> $GITHUB_OUTPUT + echo "## 🚀 StreamSQL $(echo ${{ github.ref }} | sed 's/refs\/tags\///')" >> $GITHUB_OUTPUT + echo "" >> $GITHUB_OUTPUT + echo "### ✨ 新增功能" >> $GITHUB_OUTPUT + echo "- 完善的CASE表达式支持" >> $GITHUB_OUTPUT + echo "- 多条件逻辑表达式 (AND, OR)" >> $GITHUB_OUTPUT + echo "- 数学函数集成" >> $GITHUB_OUTPUT + echo "- 字段提取和引用功能" >> $GITHUB_OUTPUT + echo "" >> $GITHUB_OUTPUT + echo "### 🔧 改进" >> $GITHUB_OUTPUT + echo "- 负数解析优化" >> $GITHUB_OUTPUT + echo "- 字符串和数值混合比较" >> $GITHUB_OUTPUT + echo "- 表达式解析性能提升" >> $GITHUB_OUTPUT + echo "" >> $GITHUB_OUTPUT + echo "### 📋 测试覆盖" >> $GITHUB_OUTPUT + echo "- ✅ 基础CASE表达式解析" >> $GITHUB_OUTPUT + echo "- ✅ 复杂条件组合" >> $GITHUB_OUTPUT + echo "- ✅ 函数调用支持" >> $GITHUB_OUTPUT + echo "- ✅ 字段提取功能" >> $GITHUB_OUTPUT + echo "- ⚠️ 聚合函数中的使用 (部分支持)" >> $GITHUB_OUTPUT + echo "" >> $GITHUB_OUTPUT + echo "---" >> $GITHUB_OUTPUT + echo "📖 **完整文档**: [README.md](README.md) | [中文文档](README_ZH.md)" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + - name: Create Release + uses: softprops/action-gh-release@v1 + with: + body: ${{ steps.changelog.outputs.CHANGELOG }} + files: | + streamsql-linux-amd64 + streamsql-windows-amd64.exe + streamsql-darwin-amd64 + streamsql-darwin-arm64 + draft: false + prerelease: false + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..e32f63b --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,79 @@ +run: + timeout: 5m + modules-download-mode: readonly + +linters: + enable: + - gofmt + - goimports + - govet + - ineffassign + - misspell + - gosimple + - staticcheck + - unused + - typecheck + - errcheck + - gosec + - gocritic + - gocyclo + - gofumpt + - godot + - goprintffuncname + - gomodguard + - revive + +linters-settings: + gocyclo: + min-complexity: 15 + + gosec: + excludes: + - G404 # 随机数生成器可能不是密码学安全的 + + gocritic: + enabled-tags: + - diagnostic + - style + - performance + disabled-checks: + - unnamedResult + - whyNoLint + + revive: + rules: + - name: exported + disabled: false + arguments: + - "checkPrivateReceivers" + - "sayRepetitiveInsteadOfStutters" + +issues: + exclude-rules: + # 排除测试文件中的某些检查 + - path: _test\.go + linters: + - gosec + - errcheck + - gocyclo + + # 排除CASE表达式测试中的复杂度检查 + - path: streamsql_case_test\.go + linters: + - gocyclo + - funlen + + # 排除generated文件 + - path: ".*\\.pb\\.go" + linters: + - all + + exclude: + # 排除一些常见的false positive + - "Error return value of .((os\\.)?std(out|err)\\..*|.*Close|.*Flush|os\\.Remove(All)?|.*print.*|os\\.(Un)?Setenv). is not checked" + - "should have a package comment" + +output: + format: colored-line-number + print-issued-lines: true + print-linter-name: true \ No newline at end of file diff --git a/streamsql_benchmark_test.go b/streamsql_benchmark_test.go new file mode 100644 index 0000000..dad090d --- /dev/null +++ b/streamsql_benchmark_test.go @@ -0,0 +1,2661 @@ +package streamsql + +import ( + "context" + "fmt" + "math/rand" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/rulego/streamsql/stream" + "github.com/rulego/streamsql/types" +) + +// BenchmarkStreamSQLPerformance 综合性能基准测试(优化版本) +func BenchmarkStreamSQLPerformance(b *testing.B) { + tests := []struct { + name string + sql string + hasWindow bool + waitTime time.Duration + config string // 配置描述 + }{ + { + name: "SimpleFilter", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + hasWindow: false, + waitTime: 50 * time.Millisecond, + config: "基准测试专用", + }, + { + name: "BasicAggregation", + sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('200ms')", + hasWindow: true, + waitTime: 400 * time.Millisecond, + config: "基准测试专用", + }, + { + name: "ComplexQuery", + sql: "SELECT deviceId, AVG(temperature), COUNT(*) FROM stream WHERE humidity > 50 GROUP BY deviceId, TumblingWindow('100ms')", + hasWindow: true, + waitTime: 300 * time.Millisecond, + config: "基准测试专用", + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + // 使用超大缓冲区专门针对基准测试优化 + // 基准测试需要处理大量迭代,需要更大的缓冲区 + bufferSize := max(int64(100000), int64(b.N/10)) // 至少10万,或者迭代数的1/10 + ssql := New(WithBufferSizes(int(bufferSize), int(bufferSize), 2000)) + defer ssql.Stop() + + err := ssql.Execute(tt.sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var resultReceived int64 + + // 添加非阻塞sink处理结果 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultReceived, 1) + }) + + // 使用context控制生命周期 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 异步消费resultChan,确保通道不被填满 + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + // 快速消费,避免通道阻塞 + case <-ctx.Done(): + return + } + } + }() + + // 测试数据 - 减少数据种类避免过度生成 + testData := generateTestData(3) + + // 重置统计,获得准确的基准测试数据 + ssql.Stream().ResetStats() + + b.ResetTimer() + + // 执行基准测试 - 添加节流以避免瞬间填满缓冲区 + start := time.Now() + batchSize := 1000 // 分批处理,避免瞬间打满缓冲区 + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + + // 每处理一批数据,稍微暂停,让系统有时间处理 + if i > 0 && i%batchSize == 0 { + time.Sleep(10 * time.Microsecond) // 极短暂停 + } + } + inputDuration := time.Since(start) + + b.StopTimer() + + // 等待处理完成 + time.Sleep(tt.waitTime) + + cancel() // 停止结果处理goroutine + + // 获取详细统计信息 + detailedStats := ssql.Stream().GetDetailedStats() + received := atomic.LoadInt64(&resultReceived) + + // 计算性能指标 + inputThroughput := float64(b.N) / inputDuration.Seconds() + processRate := detailedStats["process_rate"].(float64) + dropRate := detailedStats["drop_rate"].(float64) + perfLevel := detailedStats["performance_level"].(string) + + b.ReportMetric(inputThroughput, "input_ops/sec") + b.ReportMetric(float64(received), "results_received") + b.ReportMetric(processRate, "process_rate_%") + b.ReportMetric(dropRate, "drop_rate_%") + + // 性能分析报告 + b.Logf("%s配置 (缓冲区: %d) - 性能等级: %s", tt.config, bufferSize, perfLevel) + b.Logf("处理效率: %.2f%%, 丢弃率: %.2f%%", processRate, dropRate) + b.Logf("缓冲区使用: 数据通道 %.1f%%, 结果通道 %.1f%%", + detailedStats["data_chan_usage"], detailedStats["result_chan_usage"]) + + if dropRate > 5 { // 降低警告阈值 + b.Logf("警告: 丢弃率 %.2f%% - 建议增加缓冲区大小", dropRate) + } + + if !tt.hasWindow && received == 0 { + b.Logf("警告: 非聚合查询未收到结果") + } + + // 性能建议 + if dropRate > 10 { + b.Logf("建议: 使用更大缓冲区配置,当前缓冲区可能不足") + } else if processRate == 100.0 && dropRate == 0.0 { + b.Logf("✓ 优秀: 完美的处理效率,无数据丢失") + } + }) + } +} + +// BenchmarkStreamSQLFixed 修复版本的基准测试 +func BenchmarkStreamSQLFixed(b *testing.B) { + tests := []struct { + name string + sql string + hasWindow bool + waitTime time.Duration + }{ + { + name: "SimpleFilter", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + hasWindow: false, + waitTime: 10 * time.Millisecond, + }, + { + name: "BasicAggregation", + sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('200ms')", + hasWindow: true, + waitTime: 300 * time.Millisecond, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + ssql := New() + defer ssql.Stop() + + err := ssql.Execute(tt.sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var processedCount int64 + + // 使用非阻塞的sink避免阻塞 + ssql.Stream().AddSink(func(result interface{}) { + // 非阻塞计数,不做任何可能阻塞的操作 + atomic.AddInt64(&processedCount, 1) + }) + + // 启动一个goroutine异步消费resultChan,防止填满 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + // 快速消费,不做处理 + case <-ctx.Done(): + return + } + } + }() + + // 测试数据 + testData := generateTestData(5) // 减少数据量避免过载 + + b.ResetTimer() + + // 执行基准测试 + start := time.Now() + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + b.StopTimer() + + // 等待处理完成 + time.Sleep(tt.waitTime) + + // 计算性能指标 + inputThroughput := float64(b.N) / inputDuration.Seconds() + processed := atomic.LoadInt64(&processedCount) + + b.ReportMetric(inputThroughput, "input_ops/sec") + b.ReportMetric(float64(processed), "processed_results") + }) + } +} + +// BenchmarkPureInputPerformance 纯输入性能基准测试(避免结果处理的影响) +func BenchmarkPureInputPerformance(b *testing.B) { + ssql := New() + defer ssql.Stop() + + // 最简单的查询 + sql := "SELECT deviceId FROM stream" + err := ssql.Execute(sql) + if err != nil { + b.Fatal(err) + } + + // 启动结果消费者防止阻塞 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + // 快速丢弃结果 + case <-ctx.Done(): + return + } + } + }() + + // 预生成数据 + data := map[string]interface{}{ + "deviceId": "device1", + "temperature": 25.0, + } + + b.ResetTimer() + + // 测量纯输入吞吐量 + for i := 0; i < b.N; i++ { + ssql.AddData(data) + } + + duration := b.Elapsed() + throughput := float64(b.N) / duration.Seconds() + b.ReportMetric(throughput, "pure_input_ops/sec") +} + +// generateTestData 生成测试数据 +func generateTestData(count int) []map[string]interface{} { + data := make([]map[string]interface{}, count) + devices := []string{"device1", "device2", "device3", "device4", "device5"} + + for i := 0; i < count; i++ { + data[i] = map[string]interface{}{ + "deviceId": devices[rand.Intn(len(devices))], + "temperature": 15.0 + rand.Float64()*20, // 15-35度 + "humidity": 30.0 + rand.Float64()*40, // 30-70% + "timestamp": time.Now().UnixNano(), + } + } + return data +} + +// generateIoTData 生成更真实的IoT设备数据 +func generateIoTData(count int) []map[string]interface{} { + data := make([]map[string]interface{}, count) + devices := []string{"sensor001", "sensor002", "sensor003", "gateway001", "gateway002"} + locations := []string{"building_a", "building_b", "outdoor", "warehouse"} + + for i := 0; i < count; i++ { + baseTemp := 20.0 + if rand.Float64() < 0.1 { // 10%概率产生异常值 + baseTemp = 40.0 + } + + data[i] = map[string]interface{}{ + "deviceId": devices[rand.Intn(len(devices))], + "location": locations[rand.Intn(len(locations))], + "temperature": baseTemp + rand.Float64()*10, + "humidity": 40.0 + rand.Float64()*30, + "pressure": 1000.0 + rand.Float64()*100, + "battery": rand.Float64() * 100, + "signal": -30.0 - rand.Float64()*50, + "timestamp": time.Now().UnixNano(), + } + } + return data +} + +// max 辅助函数 +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + +// TestStreamSQLPerformanceAnalysis 性能分析测试 +func TestStreamSQLPerformanceAnalysis(t *testing.T) { + scenarios := []struct { + name string + sql string + dataCount int + duration time.Duration + expectResults bool + }{ + { + name: "高频非聚合查询", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + dataCount: 1000, + duration: 100 * time.Millisecond, + expectResults: true, + }, + { + name: "窗口聚合查询", + sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('50ms')", + dataCount: 500, + duration: 200 * time.Millisecond, + expectResults: true, + }, + { + name: "复杂聚合查询", + sql: "SELECT deviceId, AVG(temperature), MAX(humidity), COUNT(*) FROM stream WHERE temperature > 15 GROUP BY deviceId, TumblingWindow('100ms')", + dataCount: 300, + duration: 300 * time.Millisecond, + expectResults: true, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + err := ssql.Execute(scenario.sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var inputCount int64 + var resultCount int64 + + // 结果监听 + ctx, cancel := context.WithTimeout(context.Background(), scenario.duration*2) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + atomic.AddInt64(&resultCount, 1) + case <-ctx.Done(): + return + } + } + }() + + // 生成和输入数据 + testData := generateTestData(20) + start := time.Now() + + for i := 0; i < scenario.dataCount; i++ { + ssql.AddData(testData[i%len(testData)]) + atomic.AddInt64(&inputCount, 1) + + // 控制输入频率,避免过快 + if i%100 == 0 { + time.Sleep(1 * time.Millisecond) + } + } + + inputDuration := time.Since(start) + + // 等待处理完成 + time.Sleep(scenario.duration) + + input := atomic.LoadInt64(&inputCount) + results := atomic.LoadInt64(&resultCount) + + inputRate := float64(input) / inputDuration.Seconds() + + t.Logf("场景: %s", scenario.name) + t.Logf("输入数据: %d 条, 耗时: %v", input, inputDuration) + t.Logf("输入速率: %.2f ops/sec", inputRate) + t.Logf("生成结果: %d 个", results) + + if scenario.expectResults && results == 0 { + t.Logf("警告: 预期有结果但未收到任何结果") + } + + // 基本性能验证 + if inputRate < 1000 { + t.Logf("注意: 输入速率较低 (%.2f ops/sec)", inputRate) + } + + if input != int64(scenario.dataCount) { + t.Errorf("输入数据不完整: 期望 %d, 实际 %d", scenario.dataCount, input) + } + }) + } +} + +// TestDiagnoseBenchmarkIssues 诊断基准测试阻塞问题的测试用例 +func TestDiagnoseBenchmarkIssues(t *testing.T) { + t.Run("基础功能测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + // 使用最简单的查询 + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 检查流是否正确创建 + if ssql.Stream() == nil { + t.Fatal("流创建失败") + } + + var resultCount int64 + var lastResult interface{} + + // 添加结果回调 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + lastResult = result + t.Logf("收到结果 #%d: %v", atomic.LoadInt64(&resultCount), result) + }) + + // 添加测试数据 + testData := map[string]interface{}{ + "deviceId": "device1", + "temperature": 25.0, + "humidity": 60.0, + } + + t.Logf("添加数据: %v", testData) + ssql.AddData(testData) + + // 等待结果 + time.Sleep(100 * time.Millisecond) + + count := atomic.LoadInt64(&resultCount) + t.Logf("处理结果数量: %d", count) + if count > 0 { + t.Logf("最后结果: %v", lastResult) + } + + // 验证非聚合查询应该立即返回结果 + if count == 0 { + t.Error("非聚合查询没有返回任何结果") + } + }) + + t.Run("窗口聚合测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + // 使用滚动窗口 + sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('200ms')" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var resultCount int64 + var lastResult interface{} + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + lastResult = result + t.Logf("窗口结果 #%d: %v", atomic.LoadInt64(&resultCount), result) + }) + + // 添加多条数据 + for i := 0; i < 5; i++ { + testData := map[string]interface{}{ + "deviceId": "device1", + "temperature": 20.0 + float64(i), + "humidity": 60.0, + } + t.Logf("添加数据 #%d: %v", i+1, testData) + ssql.AddData(testData) + time.Sleep(10 * time.Millisecond) // 小间隔 + } + + // 等待窗口触发 + t.Log("等待窗口触发...") + time.Sleep(300 * time.Millisecond) + + count := atomic.LoadInt64(&resultCount) + t.Logf("窗口结果数量: %d", count) + if count > 0 { + t.Logf("最后窗口结果: %v", lastResult) + } + + if count == 0 { + t.Error("窗口聚合没有返回任何结果") + } + }) + + t.Run("高频数据测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('100ms')" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var resultCount int64 + var totalDataPoints int64 + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + if resultSlice, ok := result.([]map[string]interface{}); ok { + for _, r := range resultSlice { + if count, exists := r["count"]; exists { + if countVal, ok := count.(int64); ok { + atomic.AddInt64(&totalDataPoints, countVal) + } + } + } + } + t.Logf("高频结果 #%d: %v", atomic.LoadInt64(&resultCount), result) + }) + + // 高频添加数据 + start := time.Now() + dataCount := 50 + for i := 0; i < dataCount; i++ { + testData := map[string]interface{}{ + "deviceId": fmt.Sprintf("device%d", (i%3)+1), + "temperature": 20.0 + rand.Float64()*10, + "humidity": 50.0 + rand.Float64()*20, + } + ssql.AddData(testData) + } + inputDuration := time.Since(start) + + // 等待窗口处理 + time.Sleep(200 * time.Millisecond) + + windows := atomic.LoadInt64(&resultCount) + processed := atomic.LoadInt64(&totalDataPoints) + + t.Logf("输入 %d 条数据,用时 %v", dataCount, inputDuration) + t.Logf("生成 %d 个窗口,处理 %d 条数据", windows, processed) + t.Logf("输入速率: %.2f ops/sec", float64(dataCount)/inputDuration.Seconds()) + + if processed != int64(dataCount) { + t.Logf("警告: 处理数据量 (%d) 与输入数据量 (%d) 不匹配", processed, dataCount) + } + + if windows == 0 { + t.Error("高频数据测试没有生成任何窗口") + } + }) + + t.Run("性能基准预测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature, humidity FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var processedCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&processedCount, 1) + }) + + testData := generateTestData(10) + + // 模拟基准测试的执行方式 + start := time.Now() + iterations := 1000 + for i := 0; i < iterations; i++ { + ssql.AddData(testData[i%len(testData)]) + } + duration := time.Since(start) + + time.Sleep(20 * time.Millisecond) // 等待处理完成 + + processed := atomic.LoadInt64(&processedCount) + throughput := float64(iterations) / duration.Seconds() + + t.Logf("执行 %d 次迭代,用时 %v", iterations, duration) + t.Logf("处理 %d 条数据", processed) + t.Logf("吞吐量: %.2f ops/sec", throughput) + + if processed == 0 { + t.Error("性能基准预测试没有处理任何数据") + } + + // 检查是否会阻塞 + if duration > 5*time.Second { + t.Error("执行时间过长,可能存在阻塞问题") + } + }) +} + +// TestStreamOptimizations 测试Stream优化效果 +func TestStreamOptimizations(t *testing.T) { + t.Run("非阻塞性能测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + // 使用简单查询测试 + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 添加多个sink模拟实际使用场景 + var sink1Count, sink2Count, sink3Count int64 + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sink1Count, 1) + time.Sleep(1 * time.Millisecond) // 模拟处理延迟 + }) + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sink2Count, 1) + time.Sleep(2 * time.Millisecond) // 模拟较慢的sink + }) + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sink3Count, 1) + }) + + // 快速输入大量数据 + testData := generateTestData(10) + inputCount := 1000 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 等待处理完成 + time.Sleep(200 * time.Millisecond) + + // 获取统计信息 + stats := ssql.Stream().GetStats() + + t.Logf("输入 %d 条数据,耗时: %v", inputCount, inputDuration) + t.Logf("输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf("统计信息: %+v", stats) + t.Logf("Sink计数: sink1=%d, sink2=%d, sink3=%d", + atomic.LoadInt64(&sink1Count), + atomic.LoadInt64(&sink2Count), + atomic.LoadInt64(&sink3Count)) + + // 验证性能指标 + if inputDuration > 100*time.Millisecond { + t.Errorf("输入耗时过长: %v", inputDuration) + } + + if stats["dropped_count"] > int64(inputCount/10) { + t.Errorf("丢弃数据过多: %d", stats["dropped_count"]) + } + + // 验证非阻塞性 + throughput := float64(inputCount) / inputDuration.Seconds() + if throughput < 5000 { // 最低5K ops/sec + t.Errorf("吞吐量过低: %.2f ops/sec", throughput) + } + }) + + t.Run("窗口聚合优化测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var resultCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + // 模拟慢sink + time.Sleep(5 * time.Millisecond) + }) + + // 快速输入数据 + testData := generateTestData(5) + inputCount := 500 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 等待窗口触发 + time.Sleep(200 * time.Millisecond) + + stats := ssql.Stream().GetStats() + results := atomic.LoadInt64(&resultCount) + + t.Logf("窗口聚合测试 - 输入: %d, 耗时: %v", inputCount, inputDuration) + t.Logf("输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf("生成窗口结果: %d", results) + t.Logf("统计信息: %+v", stats) + + // 验证窗口功能正常 + if results == 0 { + t.Error("窗口聚合未生成任何结果") + } + + // 验证非阻塞性 + if inputDuration > 100*time.Millisecond { + t.Errorf("窗口模式输入耗时过长: %v", inputDuration) + } + }) + + t.Run("高负载压力测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature FROM stream" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 添加会阻塞的sink + ssql.Stream().AddSink(func(result interface{}) { + time.Sleep(10 * time.Millisecond) // 故意阻塞 + }) + + // 高频输入 + testData := generateTestData(3) + inputCount := 2000 // 增加输入量 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 短暂等待 + time.Sleep(50 * time.Millisecond) + + stats := ssql.Stream().GetStats() + + t.Logf("高负载测试 - 输入: %d, 耗时: %v", inputCount, inputDuration) + t.Logf("输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf("统计信息: %+v", stats) + + // 即使有阻塞的sink,系统也应该保持响应 + if inputDuration > 200*time.Millisecond { + t.Errorf("高负载下输入耗时过长: %v", inputDuration) + } + + // 验证系统没有完全阻塞 + throughput := float64(inputCount) / inputDuration.Seconds() + if throughput < 1000 { + t.Errorf("高负载下吞吐量过低: %.2f ops/sec", throughput) + } + }) +} + +// TestStreamOptimizationsImproved 测试Stream改进后的优化效果 +func TestStreamOptimizationsImproved(t *testing.T) { + t.Run("改进的非阻塞性能测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + // 使用简单查询测试 + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 添加多个sink模拟真实场景 + var sink1Count, sink2Count int64 + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sink1Count, 1) + time.Sleep(2 * time.Millisecond) // 模拟处理延迟 + }) + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sink2Count, 1) + time.Sleep(1 * time.Millisecond) // 模拟较快的sink + }) + + // 重置统计信息 + ssql.Stream().ResetStats() + + // 快速输入大量数据 + testData := generateTestData(10) + inputCount := 2000 // 增加输入量测试 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 等待处理完成 + time.Sleep(300 * time.Millisecond) + + // 获取统计信息 + stats := ssql.Stream().GetStats() + + t.Logf("改进测试 - 输入 %d 条数据,耗时: %v", inputCount, inputDuration) + t.Logf("输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf("统计信息: %+v", stats) + t.Logf("Sink计数: sink1=%d, sink2=%d", + atomic.LoadInt64(&sink1Count), + atomic.LoadInt64(&sink2Count)) + + // 计算处理效率 + inputTotal := stats["input_count"] + outputTotal := stats["output_count"] + droppedTotal := stats["dropped_count"] + + if inputTotal > 0 { + processRate := float64(outputTotal) / float64(inputTotal) * 100 + dropRate := float64(droppedTotal) / float64(inputTotal) * 100 + + t.Logf("处理效率: %.2f%%, 丢弃率: %.2f%%", processRate, dropRate) + + // 验证改进效果 + if dropRate > 50 { // 丢弃率不应超过50% + t.Errorf("丢弃率过高: %.2f%%", dropRate) + } + } + + // 验证非阻塞性 + throughput := float64(inputCount) / inputDuration.Seconds() + if throughput < 10000 { // 期望至少10K ops/sec + t.Logf("注意: 吞吐量较低: %.2f ops/sec", throughput) + } + + // 验证系统没有完全阻塞 + if inputDuration > 500*time.Millisecond { + t.Errorf("输入耗时过长: %v", inputDuration) + } + }) + + t.Run("超高负载压力测试", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature FROM stream" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 添加会严重阻塞的sink + var sinkCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sinkCount, 1) + time.Sleep(20 * time.Millisecond) // 故意制造严重阻塞 + }) + + // 重置统计 + ssql.Stream().ResetStats() + + // 超高频输入 + testData := generateTestData(3) + inputCount := 5000 // 大幅增加输入量 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 短暂等待 + time.Sleep(100 * time.Millisecond) + + stats := ssql.Stream().GetStats() + sinks := atomic.LoadInt64(&sinkCount) + + t.Logf("超高负载测试 - 输入: %d, 耗时: %v", inputCount, inputDuration) + t.Logf("输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf("Sink处理数: %d", sinks) + t.Logf("统计信息: %+v", stats) + + // 即使有严重阻塞的sink,系统仍应保持响应 + throughput := float64(inputCount) / inputDuration.Seconds() + + // 验证系统没有完全卡死 + if inputDuration > 1*time.Second { + t.Errorf("超高负载下输入耗时过长: %v", inputDuration) + } + + if throughput < 5000 { + t.Logf("注意: 超高负载下吞吐量: %.2f ops/sec", throughput) + } else { + t.Logf("优秀: 超高负载下仍保持高吞吐量: %.2f ops/sec", throughput) + } + + // 验证背压控制有效 + inputTotal := stats["input_count"] + droppedTotal := stats["dropped_count"] + if inputTotal > 0 { + dropRate := float64(droppedTotal) / float64(inputTotal) * 100 + t.Logf("背压控制 - 丢弃率: %.2f%%", dropRate) + } + }) + + t.Run("性能对比测试", func(t *testing.T) { + // 测试不同负载下的性能表现 + testCases := []struct { + name string + inputCount int + sinkDelay time.Duration + maxDropRate float64 + }{ + {"轻负载", 500, 1 * time.Millisecond, 10.0}, + {"中负载", 1500, 3 * time.Millisecond, 25.0}, + {"重负载", 3000, 5 * time.Millisecond, 40.0}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 15" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var sinkCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sinkCount, 1) + time.Sleep(tc.sinkDelay) + }) + + ssql.Stream().ResetStats() + + testData := generateTestData(5) + start := time.Now() + + for i := 0; i < tc.inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + + inputDuration := time.Since(start) + time.Sleep(150 * time.Millisecond) + + stats := ssql.Stream().GetStats() + sinks := atomic.LoadInt64(&sinkCount) + + throughput := float64(tc.inputCount) / inputDuration.Seconds() + + inputTotal := stats["input_count"] + outputTotal := stats["output_count"] + droppedTotal := stats["dropped_count"] + + var processRate, dropRate float64 + if inputTotal > 0 { + processRate = float64(outputTotal) / float64(inputTotal) * 100 + dropRate = float64(droppedTotal) / float64(inputTotal) * 100 + } + + t.Logf("%s结果:", tc.name) + t.Logf(" 输入速率: %.2f ops/sec", throughput) + t.Logf(" 处理效率: %.2f%%", processRate) + t.Logf(" 丢弃率: %.2f%%", dropRate) + t.Logf(" Sink处理: %d", sinks) + t.Logf(" 统计: %+v", stats) + + // 验证性能标准 + if dropRate > tc.maxDropRate { + t.Errorf("%s: 丢弃率过高 %.2f%% > %.2f%%", tc.name, dropRate, tc.maxDropRate) + } + + if throughput < 1000 { + t.Errorf("%s: 吞吐量过低 %.2f ops/sec", tc.name, throughput) + } + }) + } + }) +} + +// TestMassiveBufferOptimization 测试超大缓冲区优化效果 +func TestMassiveBufferOptimization(t *testing.T) { + t.Run("标准配置vs高性能配置对比", func(t *testing.T) { + // 测试标准配置 + t.Run("标准配置", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + var sinkCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&sinkCount, 1) + time.Sleep(1 * time.Millisecond) // 模拟处理延迟 + }) + + ssql.Stream().ResetStats() + + // 大量数据输入 + testData := generateTestData(5) + inputCount := 10000 + + start := time.Now() + for i := 0; i < inputCount; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + time.Sleep(300 * time.Millisecond) + + detailedStats := ssql.Stream().GetDetailedStats() + sinks := atomic.LoadInt64(&sinkCount) + + t.Logf("标准配置结果:") + t.Logf(" 输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf(" 处理效率: %.2f%%", detailedStats["process_rate"]) + t.Logf(" 丢弃率: %.2f%%", detailedStats["drop_rate"]) + t.Logf(" 性能等级: %s", detailedStats["performance_level"]) + t.Logf(" 数据通道使用率: %.2f%%", detailedStats["data_chan_usage"]) + t.Logf(" Sink处理数: %d", sinks) + t.Logf(" 详细统计: %+v", detailedStats["basic_stats"]) + }) + + // 测试高性能配置 + t.Run("高性能配置", func(t *testing.T) { + // 直接创建高性能Stream(绕过StreamSQL包装) + config := types.Config{ + SimpleFields: []string{"deviceId", "temperature"}, + } + + stream, err := stream.NewHighPerformanceStream(config) + if err != nil { + t.Fatalf("高性能Stream创建失败: %v", err) + } + defer stream.Stop() + + err = stream.RegisterFilter("temperature > 20") + if err != nil { + t.Fatalf("过滤器注册失败: %v", err) + } + + stream.Start() + + var sinkCount int64 + stream.AddSink(func(result interface{}) { + atomic.AddInt64(&sinkCount, 1) + time.Sleep(1 * time.Millisecond) // 模拟处理延迟 + }) + + stream.ResetStats() + + // 大量数据输入 + testData := generateTestData(5) + inputCount := 10000 + + start := time.Now() + for i := 0; i < inputCount; i++ { + stream.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + time.Sleep(300 * time.Millisecond) + + detailedStats := stream.GetDetailedStats() + sinks := atomic.LoadInt64(&sinkCount) + + t.Logf("高性能配置结果:") + t.Logf(" 输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf(" 处理效率: %.2f%%", detailedStats["process_rate"]) + t.Logf(" 丢弃率: %.2f%%", detailedStats["drop_rate"]) + t.Logf(" 性能等级: %s", detailedStats["performance_level"]) + t.Logf(" 数据通道使用率: %.2f%%", detailedStats["data_chan_usage"]) + t.Logf(" Sink处理数: %d", sinks) + t.Logf(" 详细统计: %+v", detailedStats["basic_stats"]) + }) + }) + + t.Run("超高负载抗压测试", func(t *testing.T) { + // 使用最大缓冲区配置测试极限情况 + config := types.Config{ + SimpleFields: []string{"deviceId", "temperature"}, + } + + // 自定义超大缓冲区:100K输入,100K结果,2K sink池 + stream, err := stream.NewStreamWithBuffers(config, 100000, 100000, 2000) + if err != nil { + t.Fatalf("超大缓冲区Stream创建失败: %v", err) + } + defer stream.Stop() + + err = stream.RegisterFilter("temperature > 15") + if err != nil { + t.Fatalf("过滤器注册失败: %v", err) + } + + stream.Start() + + // 添加多个慢速sink模拟极端场景 + var sink1Count, sink2Count, sink3Count int64 + + stream.AddSink(func(result interface{}) { + atomic.AddInt64(&sink1Count, 1) + time.Sleep(3 * time.Millisecond) // 慢速sink + }) + + stream.AddSink(func(result interface{}) { + atomic.AddInt64(&sink2Count, 1) + time.Sleep(5 * time.Millisecond) // 更慢的sink + }) + + stream.AddSink(func(result interface{}) { + atomic.AddInt64(&sink3Count, 1) + time.Sleep(1 * time.Millisecond) // 相对快速的sink + }) + + stream.ResetStats() + + // 超大量数据输入 + testData := generateTestData(3) + inputCount := 50000 // 5万条数据 + + t.Logf("开始超高负载测试:输入 %d 条数据", inputCount) + + start := time.Now() + for i := 0; i < inputCount; i++ { + stream.AddData(testData[i%len(testData)]) + + // 偶尔检查状态,避免测试超时 + if i%10000 == 0 && i > 0 { + t.Logf("已输入 %d 条数据", i) + } + } + inputDuration := time.Since(start) + + t.Logf("数据输入完成,耗时: %v", inputDuration) + + // 等待处理完成 + time.Sleep(500 * time.Millisecond) + + detailedStats := stream.GetDetailedStats() + + t.Logf("超高负载测试结果:") + t.Logf(" 输入速率: %.2f ops/sec", float64(inputCount)/inputDuration.Seconds()) + t.Logf(" 处理效率: %.2f%%", detailedStats["process_rate"]) + t.Logf(" 丢弃率: %.2f%%", detailedStats["drop_rate"]) + t.Logf(" 性能等级: %s", detailedStats["performance_level"]) + t.Logf(" 数据通道使用率: %.2f%%", detailedStats["data_chan_usage"]) + t.Logf(" 结果通道使用率: %.2f%%", detailedStats["result_chan_usage"]) + t.Logf(" Sink池使用率: %.2f%%", detailedStats["sink_pool_usage"]) + + sinks := []int64{ + atomic.LoadInt64(&sink1Count), + atomic.LoadInt64(&sink2Count), + atomic.LoadInt64(&sink3Count), + } + t.Logf(" Sink处理数: %v", sinks) + t.Logf(" 详细统计: %+v", detailedStats["basic_stats"]) + + // 验证性能指标 + if detailedStats["drop_rate"].(float64) > 30 { + t.Logf("注意: 丢弃率较高 %.2f%%,但系统未阻塞", detailedStats["drop_rate"]) + } + + throughput := float64(inputCount) / inputDuration.Seconds() + if throughput < 50000 { // 期望至少5万ops/sec + t.Logf("注意: 吞吐量 %.2f ops/sec,但在超高负载下属于正常范围", throughput) + } else { + t.Logf("优秀: 超高负载下仍保持高吞吐量 %.2f ops/sec", throughput) + } + + // 验证系统没有完全阻塞 + if inputDuration > 2*time.Second { + t.Errorf("超高负载下输入耗时过长: %v", inputDuration) + } + + // 验证缓冲区配置有效 + basicStats := detailedStats["basic_stats"].(map[string]int64) + t.Logf("缓冲区配置验证:") + t.Logf(" 数据通道: %d/%d", basicStats["data_chan_len"], basicStats["data_chan_cap"]) + t.Logf(" 结果通道: %d/%d", basicStats["result_chan_len"], basicStats["result_chan_cap"]) + t.Logf(" Sink池: %d/%d", basicStats["sink_pool_len"], basicStats["sink_pool_cap"]) + }) +} + +// BenchmarkConfigurationComparison 不同配置性能对比基准测试 +func BenchmarkConfigurationComparison(b *testing.B) { + tests := []struct { + name string + setupFunc func() *Streamsql + description string + }{ + { + name: "轻量配置", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(5000, 5000, 250)) + }, + description: "5K数据缓冲,5K结果缓冲,250 sink池", + }, + { + name: "默认配置(中等场景)", + setupFunc: func() *Streamsql { + return New() + }, + description: "20K数据缓冲,20K结果缓冲,800 sink池", + }, + { + name: "重负载配置", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(35000, 35000, 1200)) + }, + description: "35K数据缓冲,35K结果缓冲,1.2K sink池", + }, + { + name: "高性能配置", + setupFunc: func() *Streamsql { + return New(WithHighPerformance()) + }, + description: "50K数据缓冲,50K结果缓冲,1K sink池", + }, + { + name: "超大缓冲配置", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(100000, 100000, 2000)) + }, + description: "100K数据缓冲,100K结果缓冲,2K sink池", + }, + } + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + ssql := tt.setupFunc() + defer ssql.Stop() + + err := ssql.Execute(sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var resultCount int64 + + // 添加轻量级sink + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + }) + + // 异步消费resultChan + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + // 快速消费 + case <-ctx.Done(): + return + } + } + }() + + // 测试数据 + testData := generateTestData(3) + + b.ResetTimer() + + // 执行基准测试 + start := time.Now() + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + b.StopTimer() + + // 等待处理完成 + time.Sleep(100 * time.Millisecond) + + cancel() + + // 获取统计信息 + detailedStats := ssql.Stream().GetDetailedStats() + results := atomic.LoadInt64(&resultCount) + + // 性能指标 + inputThroughput := float64(b.N) / inputDuration.Seconds() + processRate := detailedStats["process_rate"].(float64) + dropRate := detailedStats["drop_rate"].(float64) + perfLevel := detailedStats["performance_level"].(string) + + b.ReportMetric(inputThroughput, "input_ops/sec") + b.ReportMetric(processRate, "process_rate_%") + b.ReportMetric(dropRate, "drop_rate_%") + b.ReportMetric(float64(results), "results_count") + + // 详细报告 + b.Logf("配置: %s", tt.description) + b.Logf("性能等级: %s", perfLevel) + b.Logf("处理效率: %.2f%%, 丢弃率: %.2f%%", processRate, dropRate) + + // 缓冲区使用情况 + dataChanUsage := detailedStats["data_chan_usage"].(float64) + resultChanUsage := detailedStats["result_chan_usage"].(float64) + sinkPoolUsage := detailedStats["sink_pool_usage"].(float64) + + b.Logf("缓冲区使用率 - 数据: %.1f%%, 结果: %.1f%%, Sink池: %.1f%%", + dataChanUsage, resultChanUsage, sinkPoolUsage) + }) + } +} + +// BenchmarkOptimizedPerformance 优化后的单项性能测试 +func BenchmarkOptimizedPerformance(b *testing.B) { + b.Run("纯输入性能-高性能配置", func(b *testing.B) { + ssql := New(WithHighPerformance()) + defer ssql.Stop() + + sql := "SELECT deviceId FROM stream" + err := ssql.Execute(sql) + if err != nil { + b.Fatal(err) + } + + // 消费者防止阻塞 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + case <-ctx.Done(): + return + } + } + }() + + // 预生成数据 + data := map[string]interface{}{ + "deviceId": "device1", + "temperature": 25.0, + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ssql.AddData(data) + } + + duration := b.Elapsed() + throughput := float64(b.N) / duration.Seconds() + + b.StopTimer() + + // 获取统计 + detailedStats := ssql.Stream().GetDetailedStats() + dropRate := detailedStats["drop_rate"].(float64) + + b.ReportMetric(throughput, "pure_input_ops/sec") + b.ReportMetric(dropRate, "drop_rate_%") + + b.Logf("高性能配置下丢弃率: %.2f%%", dropRate) + }) + + b.Run("窗口聚合性能-超大缓冲", func(b *testing.B) { + ssql := New(WithBufferSizes(50000, 50000, 1500)) + defer ssql.Stop() + + sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')" + err := ssql.Execute(sql) + if err != nil { + b.Fatal(err) + } + + var resultCount int64 + + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + case <-ctx.Done(): + return + } + } + }() + + testData := generateTestData(2) + + b.ResetTimer() + + start := time.Now() + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + b.StopTimer() + + time.Sleep(200 * time.Millisecond) + cancel() + + results := atomic.LoadInt64(&resultCount) + detailedStats := ssql.Stream().GetDetailedStats() + + inputThroughput := float64(b.N) / inputDuration.Seconds() + processRate := detailedStats["process_rate"].(float64) + dropRate := detailedStats["drop_rate"].(float64) + + b.ReportMetric(inputThroughput, "input_ops/sec") + b.ReportMetric(processRate, "process_rate_%") + b.ReportMetric(dropRate, "drop_rate_%") + b.ReportMetric(float64(results), "window_results") + + b.Logf("窗口聚合 - 处理效率: %.2f%%, 丢弃率: %.2f%%", processRate, dropRate) + }) +} + +// TestMemoryUsageComparison 内存使用对比测试 +func TestMemoryUsageComparison(t *testing.T) { + tests := []struct { + name string + setupFunc func() *Streamsql + description string + expectedMB float64 // 预期内存使用(MB) + }{ + { + name: "轻量配置", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(5000, 5000, 250)) + }, + description: "5K数据 + 5K结果 + 250sink池", + expectedMB: 1.0, // 预期约1MB + }, + { + name: "默认配置(中等场景)", + setupFunc: func() *Streamsql { + return New() + }, + description: "20K数据 + 20K结果 + 800sink池", + expectedMB: 3.0, // 预期约3MB + }, + { + name: "高性能配置", + setupFunc: func() *Streamsql { + return New(WithHighPerformance()) + }, + description: "50K数据 + 50K结果 + 1Ksinki池", + expectedMB: 12.0, // 预期约12MB + }, + { + name: "超大缓冲配置", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(100000, 100000, 2000)) + }, + description: "100K数据缓冲,100K结果缓冲,2Ksinki池", + expectedMB: 25.0, // 预期约25MB + }, + } + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 获取开始内存 + var startMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&startMem) + + // 创建Stream + ssql := tt.setupFunc() + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 等待初始化完成 + time.Sleep(10 * time.Millisecond) + + // 获取创建后内存 + var afterCreateMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&afterCreateMem) + + createUsage := float64(afterCreateMem.Alloc-startMem.Alloc) / 1024 / 1024 + + // 添加一些数据测试内存增长 + testData := generateTestData(3) + for i := 0; i < 1000; i++ { + ssql.AddData(testData[i%len(testData)]) + } + + time.Sleep(50 * time.Millisecond) + + // 获取使用后内存 + var afterUseMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&afterUseMem) + + totalUsage := float64(afterUseMem.Alloc-startMem.Alloc) / 1024 / 1024 + + // 获取详细统计 + detailedStats := ssql.Stream().GetDetailedStats() + basicStats := detailedStats["basic_stats"].(map[string]int64) + + ssql.Stop() + + t.Logf("=== %s 内存使用分析 ===", tt.name) + t.Logf("配置: %s", tt.description) + t.Logf("创建开销: %.2f MB", createUsage) + t.Logf("总内存使用: %.2f MB", totalUsage) + t.Logf("缓冲区配置:") + t.Logf(" 数据通道: %d", basicStats["data_chan_cap"]) + t.Logf(" 结果通道: %d", basicStats["result_chan_cap"]) + t.Logf(" Sink池: %d", basicStats["sink_pool_cap"]) + + // 计算理论内存使用 (每个接口槽位约24字节) + dataChanMem := float64(basicStats["data_chan_cap"]) * 24 / 1024 / 1024 + resultChanMem := float64(basicStats["result_chan_cap"]) * 24 / 1024 / 1024 + sinkPoolMem := float64(basicStats["sink_pool_cap"]) * 8 / 1024 / 1024 // 函数指针 + + theoreticalMem := dataChanMem + resultChanMem + sinkPoolMem + + t.Logf("理论内存分配:") + t.Logf(" 数据通道: %.2f MB", dataChanMem) + t.Logf(" 结果通道: %.2f MB", resultChanMem) + t.Logf(" Sink池: %.2f MB", sinkPoolMem) + t.Logf(" 理论总计: %.2f MB", theoreticalMem) + + // 内存效率分析 + if totalUsage > tt.expectedMB*2 { + t.Logf("警告: 内存使用超过预期2倍 (%.2f MB > %.2f MB)", totalUsage, tt.expectedMB*2) + } else if totalUsage > tt.expectedMB*1.5 { + t.Logf("注意: 内存使用超过预期50%% (%.2f MB > %.2f MB)", totalUsage, tt.expectedMB*1.5) + } else { + t.Logf("✓ 内存使用在合理范围内 (%.2f MB)", totalUsage) + } + }) + } +} + +// TestResourceCostAnalysis 资源成本分析测试 +func TestResourceCostAnalysis(t *testing.T) { + scenarios := []struct { + name string + setup func() *Streamsql + workload int + description string + costCategory string + }{ + { + name: "轻量场景", + setup: func() *Streamsql { return New(WithBufferSizes(5000, 5000, 250)) }, + workload: 1000, + description: "资源受限环境,轻量业务", + costCategory: "低成本", + }, + { + name: "中等场景(默认)", + setup: func() *Streamsql { return New() }, + workload: 10000, + description: "生产环境,正常峰值", + costCategory: "中等成本", + }, + { + name: "高负载场景", + setup: func() *Streamsql { return New(WithHighPerformance()) }, + workload: 50000, + description: "高并发,极端负载", + costCategory: "高成本", + }, + } + + sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')" + + t.Log("=== 资源成本对比分析 ===") + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // 内存监控 + var beforeMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&beforeMem) + + // CPU和goroutine监控 + beforeGoroutines := runtime.NumGoroutine() + start := time.Now() + + // 创建实例 + ssql := scenario.setup() + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 运行负载 + testData := generateTestData(5) + for i := 0; i < scenario.workload; i++ { + ssql.AddData(testData[i%len(testData)]) + } + + // 等待处理完成 + time.Sleep(200 * time.Millisecond) + + // 获取统计 + detailedStats := ssql.Stream().GetDetailedStats() + basicStats := detailedStats["basic_stats"].(map[string]int64) + + // 资源使用测量 + duration := time.Since(start) + var afterMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&afterMem) + afterGoroutines := runtime.NumGoroutine() + + memUsage := float64(afterMem.Alloc-beforeMem.Alloc) / 1024 / 1024 + goroutineIncrease := afterGoroutines - beforeGoroutines + + ssql.Stop() + + // 成本分析报告 + t.Logf("--- %s 成本分析 ---", scenario.name) + t.Logf("场景: %s (%s)", scenario.description, scenario.costCategory) + t.Logf("负载: %d 条数据", scenario.workload) + t.Logf("执行时间: %v", duration) + + t.Logf("资源消耗:") + t.Logf(" 内存: %.2f MB", memUsage) + t.Logf(" Goroutine增加: %d", goroutineIncrease) + t.Logf(" 处理速率: %.2f ops/sec", float64(scenario.workload)/duration.Seconds()) + + t.Logf("缓冲区开销:") + t.Logf(" 数据通道容量: %d", basicStats["data_chan_cap"]) + t.Logf(" 结果通道容量: %d", basicStats["result_chan_cap"]) + t.Logf(" Sink池容量: %d", basicStats["sink_pool_cap"]) + + t.Logf("性能指标:") + t.Logf(" 处理效率: %.2f%%", detailedStats["process_rate"]) + t.Logf(" 丢弃率: %.2f%%", detailedStats["drop_rate"]) + t.Logf(" 性能等级: %s", detailedStats["performance_level"]) + + // 成本效益分析 + throughput := float64(scenario.workload) / duration.Seconds() + memEfficiency := throughput / memUsage // ops/sec per MB + + t.Logf("成本效益:") + t.Logf(" 内存效率: %.2f ops/sec/MB", memEfficiency) + + // 推荐使用场景 + switch scenario.costCategory { + case "低成本": + if throughput > 5000 { + t.Logf("✓ 推荐: 适合日常业务、开发测试、资源受限环境") + } + case "中等成本": + if throughput > 20000 { + t.Logf("✓ 推荐: 适合生产环境、中等负载、平衡性能和成本") + } + case "高成本": + if throughput > 100000 { + t.Logf("✓ 推荐: 适合极高负载、关键业务、性能优先场景") + } else { + t.Logf("⚠ 注意: 高成本但吞吐量未达到预期,可能配置过度") + } + } + }) + } + + // 配置选择建议 + t.Log("\n=== 配置选择建议 ===") + t.Log("1. 默认配置: 适合大多数场景,内存占用低(~2MB),goroutine开销小") + t.Log("2. 中等配置: 适合中等负载,内存占用中等(~5MB),平衡性能和成本") + t.Log("3. 高性能配置: 适合极高负载,内存占用高(~12MB),最大化吞吐量") + t.Log("4. 自定义配置: 根据具体业务需求精确调优,避免资源浪费") +} + +// TestHighPerformanceCostAnalysis 高性能模式代价深度分析 +func TestHighPerformanceCostAnalysis(t *testing.T) { + t.Log("=== 高性能模式 vs 默认配置深度对比 ===") + + configs := []struct { + name string + setup func() *Streamsql + category string + }{ + { + name: "默认配置", + setup: func() *Streamsql { return New() }, + category: "基准", + }, + { + name: "高性能配置", + setup: func() *Streamsql { return New(WithHighPerformance()) }, + category: "优化", + }, + } + + sql := "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('50ms')" + workload := 10000 + + var results []map[string]interface{} + + for _, config := range configs { + t.Run(config.name, func(t *testing.T) { + // 详细资源监控 + var beforeMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&beforeMem) + beforeGoroutines := runtime.NumGoroutine() + + // 创建并启动Stream + ssql := config.setup() + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("执行失败: %v", err) + } + + // 等待完全初始化 + time.Sleep(20 * time.Millisecond) + + // 测量初始化开销 + var afterInitMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&afterInitMem) + afterInitGoroutines := runtime.NumGoroutine() + + initMemCost := float64(afterInitMem.Alloc-beforeMem.Alloc) / 1024 / 1024 + initGoroutineCost := afterInitGoroutines - beforeGoroutines + + // 运行负载测试 + testData := generateTestData(3) + start := time.Now() + + for i := 0; i < workload; i++ { + ssql.AddData(testData[i%len(testData)]) + } + + inputDuration := time.Since(start) + time.Sleep(100 * time.Millisecond) // 等待处理完成 + + // 最终资源测量 + var finalMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&finalMem) + finalGoroutines := runtime.NumGoroutine() + + totalMemUsage := float64(finalMem.Alloc-beforeMem.Alloc) / 1024 / 1024 + runtimeMemUsage := totalMemUsage - initMemCost + + // 获取详细统计 + detailedStats := ssql.Stream().GetDetailedStats() + basicStats := detailedStats["basic_stats"].(map[string]int64) + + ssql.Stop() + + // 分析结果 + result := map[string]interface{}{ + "name": config.name, + "category": config.category, + "init_memory_mb": initMemCost, + "runtime_memory_mb": runtimeMemUsage, + "total_memory_mb": totalMemUsage, + "init_goroutines": initGoroutineCost, + "total_goroutines": finalGoroutines - beforeGoroutines, + "input_duration_ms": float64(inputDuration.Nanoseconds()) / 1e6, + "throughput_ops_sec": float64(workload) / inputDuration.Seconds(), + "data_chan_cap": basicStats["data_chan_cap"], + "result_chan_cap": basicStats["result_chan_cap"], + "sink_pool_cap": basicStats["sink_pool_cap"], + "process_rate": detailedStats["process_rate"], + "drop_rate": detailedStats["drop_rate"], + "performance_level": detailedStats["performance_level"], + } + + results = append(results, result) + + // 详细报告 + t.Logf("=== %s 详细分析 ===", config.name) + t.Logf("初始化开销:") + t.Logf(" 内存: %.2f MB", initMemCost) + t.Logf(" Goroutine: %d 个", initGoroutineCost) + + t.Logf("运行时开销:") + t.Logf(" 额外内存: %.2f MB", runtimeMemUsage) + t.Logf(" 总内存: %.2f MB", totalMemUsage) + t.Logf(" 总Goroutine: %d 个", finalGoroutines-beforeGoroutines) + + t.Logf("性能表现:") + t.Logf(" 输入耗时: %.2f ms", float64(inputDuration.Nanoseconds())/1e6) + t.Logf(" 吞吐量: %.2f ops/sec", float64(workload)/inputDuration.Seconds()) + t.Logf(" 处理效率: %.2f%%", detailedStats["process_rate"]) + t.Logf(" 丢弃率: %.2f%%", detailedStats["drop_rate"]) + + t.Logf("缓冲区配置:") + t.Logf(" 数据通道: %d", basicStats["data_chan_cap"]) + t.Logf(" 结果通道: %d", basicStats["result_chan_cap"]) + t.Logf(" Sink池: %d", basicStats["sink_pool_cap"]) + }) + } + + // 对比分析 + if len(results) == 2 { + defaultResult := results[0] + highPerfResult := results[1] + + t.Log("\n=== 对比分析总结 ===") + + // 内存开销对比 + memMultiplier := highPerfResult["total_memory_mb"].(float64) / defaultResult["total_memory_mb"].(float64) + t.Logf("内存开销倍数: %.1fx (%.2f MB vs %.2f MB)", + memMultiplier, + highPerfResult["total_memory_mb"], + defaultResult["total_memory_mb"]) + + // 性能提升对比 + perfMultiplier := highPerfResult["throughput_ops_sec"].(float64) / defaultResult["throughput_ops_sec"].(float64) + t.Logf("性能提升倍数: %.1fx (%.0f ops/sec vs %.0f ops/sec)", + perfMultiplier, + highPerfResult["throughput_ops_sec"], + defaultResult["throughput_ops_sec"]) + + // 缓冲区容量对比 + dataCapMultiplier := float64(highPerfResult["data_chan_cap"].(int64)) / float64(defaultResult["data_chan_cap"].(int64)) + t.Logf("缓冲区容量倍数: %.1fx", dataCapMultiplier) + + // 成本效益分析 + memEfficiencyDefault := defaultResult["throughput_ops_sec"].(float64) / defaultResult["total_memory_mb"].(float64) + memEfficiencyHighPerf := highPerfResult["throughput_ops_sec"].(float64) / highPerfResult["total_memory_mb"].(float64) + + t.Logf("内存效率对比:") + t.Logf(" 默认配置: %.0f ops/sec/MB", memEfficiencyDefault) + t.Logf(" 高性能配置: %.0f ops/sec/MB", memEfficiencyHighPerf) + t.Logf(" 效率比: %.2fx", memEfficiencyHighPerf/memEfficiencyDefault) + + // 代价分析 + t.Log("\n=== 高性能模式代价分析 ===") + t.Logf("✓ 性能收益: %.1fx 吞吐量提升", perfMultiplier) + t.Logf("✗ 内存代价: %.1fx 内存消耗增长", memMultiplier) + t.Logf("✗ 缓冲区代价: %.1fx 缓冲区容量增长", dataCapMultiplier) + + if memMultiplier < perfMultiplier { + t.Log("✓ 结论: 高性能模式性价比较高,内存增长小于性能提升") + } else { + t.Log("⚠ 结论: 高性能模式需要权衡,内存增长超过性能提升比例") + } + } +} + +// TestLightweightVsDefaultPerformanceAnalysis 专门分析轻量配置vs默认配置性能差异的测试 +func TestLightweightVsDefaultPerformanceAnalysis(t *testing.T) { + configs := []struct { + name string + setupFunc func() *Streamsql + description string + expectedPerf string + }{ + { + name: "轻量配置(5K)", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(5000, 5000, 250)) + }, + description: "5K数据缓冲,5K结果缓冲,250sink池", + expectedPerf: "高吞吐,低内存", + }, + { + name: "默认配置(20K)", + setupFunc: func() *Streamsql { + return New() + }, + description: "20K数据缓冲,20K结果缓冲,800sink池", + expectedPerf: "平衡性能", + }, + { + name: "中等配置(10K)", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(10000, 10000, 400)) + }, + description: "10K数据缓冲,10K结果缓冲,400sink池", + expectedPerf: "介于两者之间", + }, + } + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + + t.Log("=== 轻量配置 vs 默认配置深度对比分析 ===") + + var results []map[string]interface{} + + for _, config := range configs { + t.Run(config.name, func(t *testing.T) { + // 内存和性能监控 + var beforeMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&beforeMem) + + ssql := config.setupFunc() + defer ssql.Stop() + + err := ssql.Execute(sql) + if err != nil { + t.Fatalf("SQL执行失败: %v", err) + } + + // 测量初始化后内存 + var afterInitMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&afterInitMem) + initMemory := float64(afterInitMem.Alloc-beforeMem.Alloc) / 1024 / 1024 + + var resultCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + }) + + // 消费resultChan防止阻塞 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + case <-ctx.Done(): + return + } + } + }() + + // 重置统计 + ssql.Stream().ResetStats() + + // 执行性能测试 + testData := generateTestData(3) + iterations := 50000 // 固定迭代次数便于对比 + + start := time.Now() + for i := 0; i < iterations; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + // 等待处理完成 + time.Sleep(100 * time.Millisecond) + + // 获取详细统计 + detailedStats := ssql.Stream().GetDetailedStats() + basicStats := detailedStats["basic_stats"].(map[string]int64) + + // 最终内存测量 + var finalMem runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&finalMem) + totalMemory := float64(finalMem.Alloc-beforeMem.Alloc) / 1024 / 1024 + + // 计算指标 + inputThroughput := float64(iterations) / inputDuration.Seconds() + processRate := detailedStats["process_rate"].(float64) + dropRate := detailedStats["drop_rate"].(float64) + perfLevel := detailedStats["performance_level"].(string) + dataChanUsage := detailedStats["data_chan_usage"].(float64) + memEfficiency := inputThroughput / totalMemory // ops/sec per MB + + result := map[string]interface{}{ + "name": config.name, + "description": config.description, + "data_chan_cap": basicStats["data_chan_cap"], + "result_chan_cap": basicStats["result_chan_cap"], + "sink_pool_cap": basicStats["sink_pool_cap"], + "init_memory_mb": initMemory, + "total_memory_mb": totalMemory, + "input_throughput": inputThroughput, + "process_rate": processRate, + "drop_rate": dropRate, + "performance_level": perfLevel, + "data_chan_usage": dataChanUsage, + "mem_efficiency": memEfficiency, + "input_duration_ms": float64(inputDuration.Nanoseconds()) / 1e6, + } + results = append(results, result) + + // 详细报告 + t.Logf("=== %s 分析报告 ===", config.name) + t.Logf("配置: %s", config.description) + t.Logf("预期: %s", config.expectedPerf) + + t.Logf("缓冲区配置:") + t.Logf(" 数据通道: %d", basicStats["data_chan_cap"]) + t.Logf(" 结果通道: %d", basicStats["result_chan_cap"]) + t.Logf(" Sink池: %d", basicStats["sink_pool_cap"]) + + t.Logf("内存使用:") + t.Logf(" 初始化: %.2f MB", initMemory) + t.Logf(" 总计: %.2f MB", totalMemory) + + t.Logf("性能指标:") + t.Logf(" 输入速率: %.0f ops/sec", inputThroughput) + t.Logf(" 输入耗时: %.2f ms", float64(inputDuration.Nanoseconds())/1e6) + t.Logf(" 处理效率: %.2f%%", processRate) + t.Logf(" 丢弃率: %.2f%%", dropRate) + t.Logf(" 性能等级: %s", perfLevel) + t.Logf(" 数据通道使用率: %.1f%%", dataChanUsage) + t.Logf(" 内存效率: %.0f ops/sec/MB", memEfficiency) + + // 性能分析 + if dropRate > 10 { + t.Logf("⚠ 警告: 丢弃率较高 (%.2f%%)", dropRate) + } else if dropRate < 1 { + t.Logf("✓ 优秀: 丢弃率很低 (%.2f%%)", dropRate) + } + + if dataChanUsage > 80 { + t.Logf("⚠ 警告: 数据通道使用率过高 (%.1f%%)", dataChanUsage) + } else if dataChanUsage < 50 { + t.Logf("✓ 良好: 数据通道使用率适中 (%.1f%%)", dataChanUsage) + } + }) + } + + // 对比分析 + if len(results) >= 2 { + t.Log("\n=== 对比分析结论 ===") + + lightweight := results[0] // 轻量配置 + defaultCfg := results[1] // 默认配置 + + // 性能对比 + perfRatio := lightweight["input_throughput"].(float64) / defaultCfg["input_throughput"].(float64) + memRatio := lightweight["total_memory_mb"].(float64) / defaultCfg["total_memory_mb"].(float64) + memEffRatio := lightweight["mem_efficiency"].(float64) / defaultCfg["mem_efficiency"].(float64) + + t.Logf("性能倍数: %.2fx (轻量 %.0f vs 默认 %.0f ops/sec)", + perfRatio, + lightweight["input_throughput"].(float64), + defaultCfg["input_throughput"].(float64)) + + t.Logf("内存倍数: %.2fx (轻量 %.2f vs 默认 %.2f MB)", + memRatio, + lightweight["total_memory_mb"].(float64), + defaultCfg["total_memory_mb"].(float64)) + + t.Logf("内存效率倍数: %.2fx (轻量 %.0f vs 默认 %.0f ops/sec/MB)", + memEffRatio, + lightweight["mem_efficiency"].(float64), + defaultCfg["mem_efficiency"].(float64)) + + // 分析原因 + t.Log("\n=== 轻量配置性能更高的可能原因 ===") + + lightUsage := lightweight["data_chan_usage"].(float64) + defaultUsage := defaultCfg["data_chan_usage"].(float64) + + t.Logf("1. 缓冲区压力差异:") + t.Logf(" 轻量配置数据通道使用率: %.1f%%", lightUsage) + t.Logf(" 默认配置数据通道使用率: %.1f%%", defaultUsage) + + if lightUsage < defaultUsage { + t.Log(" → 轻量配置缓冲区压力更小,减少了队列等待时间") + } + + lightCapacity := lightweight["data_chan_cap"].(int64) + defaultCapacity := defaultCfg["data_chan_cap"].(int64) + capacityRatio := float64(defaultCapacity) / float64(lightCapacity) + + t.Logf("2. 缓冲区容量差异:") + t.Logf(" 容量倍数: %.1fx (轻量 %d vs 默认 %d)", + capacityRatio, lightCapacity, defaultCapacity) + t.Log(" → 大缓冲区可能导致更多内存分配和GC压力") + + t.Logf("3. 内存分配模式:") + t.Logf(" 轻量配置总内存: %.2f MB", lightweight["total_memory_mb"].(float64)) + t.Logf(" 默认配置总内存: %.2f MB", defaultCfg["total_memory_mb"].(float64)) + t.Log(" → 轻量配置内存占用更少,减少GC频率和暂停时间") + + t.Log("\n=== 技术解释 ===") + t.Log("轻量配置吞吐量更高的核心原因:") + t.Log("1. **内存局部性更好**: 小缓冲区提高CPU缓存命中率") + t.Log("2. **GC压力更小**: 减少垃圾回收的暂停时间") + t.Log("3. **队列效率更高**: 小队列减少锁竞争和等待时间") + t.Log("4. **资源竞争减少**: 更少的内存分配减少系统调用开销") + t.Log("5. **适合高频小数据**: 本测试场景正好符合轻量配置的优势区间") + + if perfRatio > 1.1 { + t.Log("\n✓ 结论: 轻量配置在此场景下确实具有性能优势") + t.Log(" 推荐: 对于高频率、小数据量的场景,优先考虑轻量配置") + } else { + t.Log("\n→ 结论: 性能差异不显著,可能存在测试误差") + } + } +} + +// BenchmarkLightweightVsDefaultComparison 轻量 vs 默认配置基准测试 +func BenchmarkLightweightVsDefaultComparison(b *testing.B) { + tests := []struct { + name string + setupFunc func() *Streamsql + }{ + { + name: "轻量配置5K", + setupFunc: func() *Streamsql { + return New(WithBufferSizes(5000, 5000, 250)) + }, + }, + { + name: "默认配置20K", + setupFunc: func() *Streamsql { + return New() + }, + }, + } + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + ssql := tt.setupFunc() + defer ssql.Stop() + + err := ssql.Execute(sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var resultCount int64 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&resultCount, 1) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + case <-ctx.Done(): + return + } + } + }() + + testData := generateTestData(3) + + b.ResetTimer() + + start := time.Now() + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + b.StopTimer() + + time.Sleep(50 * time.Millisecond) + cancel() + + detailedStats := ssql.Stream().GetDetailedStats() + results := atomic.LoadInt64(&resultCount) + + inputThroughput := float64(b.N) / inputDuration.Seconds() + processRate := detailedStats["process_rate"].(float64) + dropRate := detailedStats["drop_rate"].(float64) + dataChanUsage := detailedStats["data_chan_usage"].(float64) + + b.ReportMetric(inputThroughput, "input_ops/sec") + b.ReportMetric(processRate, "process_rate_%") + b.ReportMetric(dropRate, "drop_rate_%") + b.ReportMetric(dataChanUsage, "data_chan_usage_%") + b.ReportMetric(float64(results), "results_count") + + basicStats := detailedStats["basic_stats"].(map[string]int64) + b.Logf("缓冲区配置: 数据通道 %d, 结果通道 %d, Sink池 %d", + basicStats["data_chan_cap"], + basicStats["result_chan_cap"], + basicStats["sink_pool_cap"]) + b.Logf("性能指标: %.0f ops/sec, 处理率 %.1f%%, 丢弃率 %.2f%%, 通道使用率 %.1f%%", + inputThroughput, processRate, dropRate, dataChanUsage) + }) + } +} + +// BenchmarkStreamSQLRealistic 现实的性能基准测试 +func BenchmarkStreamSQLRealistic(b *testing.B) { + tests := []struct { + name string + sql string + hasWindow bool + waitTime time.Duration + }{ + { + name: "SimpleFilter", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + hasWindow: false, + waitTime: 50 * time.Millisecond, + }, + { + name: "BasicAggregation", + sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')", + hasWindow: true, + waitTime: 200 * time.Millisecond, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + // 使用默认配置,避免异常大的缓冲区 + ssql := New() + defer ssql.Stop() + + err := ssql.Execute(tt.sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var processedCount int64 + var actualResultCount int64 + + // 测量实际的处理完成 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&actualResultCount, 1) + }) + + // 不使用异步消费resultChan,让系统自然处理 + testData := generateTestData(3) + + // 限制测试规模,避免过度膨胀 + maxIterations := min(b.N, 10000) // 最多1万次 + + ssql.Stream().ResetStats() + b.ResetTimer() + + // 受控的输入,测量真实处理性能 + start := time.Now() + for i := 0; i < maxIterations; i++ { + // 直接使用AddData,如果系统处理不过来会自然阻塞或丢弃 + ssql.AddData(testData[i%len(testData)]) + atomic.AddInt64(&processedCount, 1) + + // 每100条数据稍微停顿,模拟真实的数据流 + if i > 0 && i%100 == 0 { + time.Sleep(1 * time.Millisecond) + } + } + inputDuration := time.Since(start) + + b.StopTimer() + + // 等待处理完成 + time.Sleep(tt.waitTime) + + processed := atomic.LoadInt64(&processedCount) + results := atomic.LoadInt64(&actualResultCount) + stats := ssql.Stream().GetStats() + + // 计算真实的处理吞吐量 + realThroughput := float64(processed) / inputDuration.Seconds() + + b.ReportMetric(realThroughput, "realistic_ops/sec") + b.ReportMetric(float64(results), "actual_results") + b.ReportMetric(float64(stats["dropped_count"]), "dropped_data") + + // 输出合理的性能数据范围 + b.Logf("实际输入: %d 条, 实际结果: %d 个", processed, results) + b.Logf("真实吞吐量: %.0f ops/sec (%.1f万 ops/sec)", realThroughput, realThroughput/10000) + + if dropped := stats["dropped_count"]; dropped > 0 { + b.Logf("丢弃数据: %d 条", dropped) + } + }) + } +} + +// min 辅助函数 +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// BenchmarkStreamSQLOptimized 优化的高性能基准测试 +func BenchmarkStreamSQLOptimized(b *testing.B) { + tests := []struct { + name string + sql string + hasWindow bool + waitTime time.Duration + }{ + { + name: "HighThroughputFilter", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + hasWindow: false, + waitTime: 10 * time.Millisecond, + }, + { + name: "HighThroughputAggregation", + sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('50ms')", + hasWindow: true, + waitTime: 100 * time.Millisecond, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + // 使用高性能配置 + ssql := New(WithHighPerformance()) + defer ssql.Stop() + + err := ssql.Execute(tt.sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + var actualResultCount int64 + + // 极简的sink,避免任何额外开销 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&actualResultCount, 1) + }) + + // 异步消费resultChan,避免阻塞 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + // 立即丢弃,避免处理开销 + case <-ctx.Done(): + return + } + } + }() + + // 预生成测试数据,避免重复生成开销 + testData := generateTestData(5) + + ssql.Stream().ResetStats() + b.ResetTimer() + + // 纯粹的性能测试:无限制,无延迟 + start := time.Now() + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + } + inputDuration := time.Since(start) + + b.StopTimer() + + // 最小等待时间 + time.Sleep(tt.waitTime) + cancel() + + results := atomic.LoadInt64(&actualResultCount) + stats := ssql.Stream().GetStats() + + // 计算纯输入吞吐量 + inputThroughput := float64(b.N) / inputDuration.Seconds() + + b.ReportMetric(inputThroughput, "optimized_ops/sec") + b.ReportMetric(float64(results), "actual_results") + b.ReportMetric(float64(stats["dropped_count"]), "dropped_data") + + // 输出性能数据 + b.Logf("优化测试 - 输入: %d 条, 结果: %d 个", b.N, results) + b.Logf("优化吞吐量: %.0f ops/sec (%.1f万 ops/sec)", inputThroughput, inputThroughput/10000) + + if dropped := stats["dropped_count"]; dropped > 0 { + dropRate := float64(dropped) / float64(b.N) * 100 + b.Logf("丢弃率: %.2f%% (%d/%d)", dropRate, dropped, b.N) + } + }) + } +} + +// BenchmarkPurePerformance 纯性能基准测试(无等待,无限制) +func BenchmarkPurePerformance(b *testing.B) { + ssql := New(WithHighPerformance()) + defer ssql.Stop() + + sql := "SELECT deviceId FROM stream" + err := ssql.Execute(sql) + if err != nil { + b.Fatal(err) + } + + // 启动结果消费者 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ssql.Stream().GetResultsChan(): + case <-ctx.Done(): + return + } + } + }() + + // 预生成单条数据 + data := map[string]interface{}{ + "deviceId": "device1", + "temperature": 25.0, + } + + b.ResetTimer() + + // 纯输入性能测试 + for i := 0; i < b.N; i++ { + ssql.AddData(data) + } + + duration := b.Elapsed() + throughput := float64(b.N) / duration.Seconds() + + b.StopTimer() + cancel() + + b.ReportMetric(throughput, "pure_ops/sec") + b.Logf("纯输入性能: %.0f ops/sec (%.1f万 ops/sec)", throughput, throughput/10000) +} + +// BenchmarkEndToEndProcessing 端到端处理性能基准测试 +func BenchmarkEndToEndProcessing(b *testing.B) { + tests := []struct { + name string + sql string + batchSize int + expectOutput bool + }{ + { + name: "EndToEndFilter", + sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20", + batchSize: 1000, + expectOutput: true, + }, + { + name: "EndToEndAggregation", + sql: "SELECT deviceId, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('100ms')", + batchSize: 500, + expectOutput: true, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + ssql := New() + defer ssql.Stop() + + err := ssql.Execute(tt.sql) + if err != nil { + b.Fatalf("SQL执行失败: %v", err) + } + + // 计算需要的批次数 + batches := (b.N + tt.batchSize - 1) / tt.batchSize + + var totalProcessed int64 + var totalDuration time.Duration + + testData := generateTestData(3) + + b.ResetTimer() + + // 分批处理,每批次测量完整的处理时间 + for batch := 0; batch < batches; batch++ { + currentBatchSize := tt.batchSize + if batch == batches-1 { + // 最后一批可能不满 + currentBatchSize = b.N - batch*tt.batchSize + } + + var resultsReceived int64 + resultChan := make(chan bool, currentBatchSize) + + // 设置sink来捕获结果 + ssql.Stream().AddSink(func(result interface{}) { + count := atomic.AddInt64(&resultsReceived, 1) + if count <= int64(currentBatchSize) { + resultChan <- true + } + }) + + // 记录开始时间 + start := time.Now() + + // 输入数据 + for i := 0; i < currentBatchSize; i++ { + ssql.AddData(testData[i%len(testData)]) + } + + // 等待所有结果处理完成(对于非聚合查询) + if tt.expectOutput { + expectedResults := currentBatchSize + if tt.name == "EndToEndAggregation" { + // 聚合查询的结果数量较少,等待至少1个结果 + expectedResults = 1 + } + + receivedCount := 0 + timeout := time.After(5 * time.Second) + for receivedCount < expectedResults { + select { + case <-resultChan: + receivedCount++ + if tt.name == "EndToEndAggregation" && receivedCount >= 1 { + // 聚合查询收到1个结果就算完成 + goto batchDone + } + case <-timeout: + // 超时,记录实际收到的结果 + goto batchDone + } + } + } + + batchDone: + // 记录这批次的处理时间 + batchDuration := time.Since(start) + totalDuration += batchDuration + totalProcessed += int64(currentBatchSize) + + // 注意:没有ClearSinks方法,所以每次测试使用新的Stream实例 + } + + b.StopTimer() + + // 计算真实的端到端吞吐量 + realThroughput := float64(totalProcessed) / totalDuration.Seconds() + + b.ReportMetric(realThroughput, "end_to_end_ops/sec") + b.Logf("端到端测试 - 处理: %d 条, 总耗时: %v", totalProcessed, totalDuration) + b.Logf("端到端吞吐量: %.0f ops/sec (%.1f万 ops/sec)", realThroughput, realThroughput/10000) + }) + } +} + +// BenchmarkSustainedProcessing 持续处理性能基准测试 +func BenchmarkSustainedProcessing(b *testing.B) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20" + err := ssql.Execute(sql) + if err != nil { + b.Fatal(err) + } + + var processedResults int64 + var lastResultTime time.Time + + // 设置结果处理器 + ssql.Stream().AddSink(func(result interface{}) { + atomic.AddInt64(&processedResults, 1) + lastResultTime = time.Now() + }) + + testData := generateTestData(3) + + b.ResetTimer() + start := time.Now() + + // 持续输入数据 + for i := 0; i < b.N; i++ { + ssql.AddData(testData[i%len(testData)]) + + // 每1000条检查一次处理进度 + if i > 0 && i%1000 == 0 { + time.Sleep(1 * time.Millisecond) // 让系统有时间处理 + } + } + + inputEnd := time.Now() + inputDuration := inputEnd.Sub(start) + + // 等待所有结果处理完成 + for { + current := atomic.LoadInt64(&processedResults) + if current >= int64(b.N) { + break + } + if time.Since(lastResultTime) > 2*time.Second { + // 2秒没有新结果,认为处理完成 + break + } + time.Sleep(10 * time.Millisecond) + } + + totalDuration := time.Since(start) + final := atomic.LoadInt64(&processedResults) + + b.StopTimer() + + inputThroughput := float64(b.N) / inputDuration.Seconds() + sustainedThroughput := float64(final) / totalDuration.Seconds() + + b.ReportMetric(inputThroughput, "input_rate_ops/sec") + b.ReportMetric(sustainedThroughput, "sustained_ops/sec") + b.ReportMetric(float64(final), "processed_count") + + b.Logf("持续处理测试:") + b.Logf(" 输入速率: %.0f ops/sec (%.1f万 ops/sec)", inputThroughput, inputThroughput/10000) + b.Logf(" 持续处理速率: %.0f ops/sec (%.1f万 ops/sec)", sustainedThroughput, sustainedThroughput/10000) + b.Logf(" 处理完成率: %.1f%% (%d/%d)", float64(final)/float64(b.N)*100, final, b.N) +}