Files
streamsql/streamsql_benchmark_test.go
2025-08-29 13:49:26 +08:00

447 lines
11 KiB
Go

package streamsql
import (
"context"
"math/rand"
"sync/atomic"
"testing"
"time"
)
// BenchmarkStreamSQL StreamSQL基准测试
func BenchmarkStreamSQL(b *testing.B) {
tests := []struct {
name string
sql string
waitTime time.Duration
}{
{
name: "SimpleFilter",
sql: "SELECT deviceId, temperature FROM stream WHERE temperature > 20",
waitTime: 50 * time.Millisecond,
},
{
name: "BasicAggregation",
sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')",
waitTime: 150 * time.Millisecond,
},
{
name: "ComplexQuery",
sql: "SELECT deviceId, AVG(temperature), COUNT(*) FROM stream WHERE humidity > 50 GROUP BY deviceId, TumblingWindow('100ms')",
waitTime: 200 * time.Millisecond,
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
// 使用默认配置
ssql := New()
defer ssql.Stop()
err := ssql.Execute(tt.sql)
if err != nil {
b.Fatalf("SQL执行失败: %v", err)
}
var resultCount int64
ssql.AddSink(func(result []map[string]interface{}) {
atomic.AddInt64(&resultCount, 1)
})
// 异步消费结果防止阻塞
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ssql.Stream().GetResultsChan():
case <-ctx.Done():
return
}
}
}()
// 生成测试数据
testData := generateOptimizedTestData(5)
ssql.Stream().ResetStats()
b.ResetTimer()
// 执行基准测试
start := time.Now()
for i := 0; i < b.N; i++ {
ssql.Emit(testData[i%len(testData)])
}
inputDuration := time.Since(start)
b.StopTimer()
// 等待处理完成
time.Sleep(tt.waitTime)
cancel()
// 获取统计信息
stats := ssql.Stream().GetStats()
results := atomic.LoadInt64(&resultCount)
// 计算核心性能指标
throughput := float64(b.N) / inputDuration.Seconds()
processedCount := stats["output_count"]
droppedCount := stats["dropped_count"]
processRate := float64(processedCount) / float64(b.N) * 100
dropRate := float64(droppedCount) / float64(b.N) * 100
// 报告指标
b.ReportMetric(throughput, "ops/sec")
b.ReportMetric(processRate, "process_rate_%")
b.ReportMetric(dropRate, "drop_rate_%")
b.ReportMetric(float64(results), "results")
// 输出可读的性能报告
b.Logf("性能报告 - %s:", tt.name)
b.Logf(" 吞吐量: %.0f ops/sec (%.1f万 ops/sec)", throughput, throughput/10000)
b.Logf(" 处理率: %.1f%%, 丢弃率: %.2f%%", processRate, dropRate)
b.Logf(" 结果数: %d", results)
})
}
}
// BenchmarkConfigurationOptimized 优化后的配置对比基准测试
func BenchmarkConfigurationOptimized(b *testing.B) {
configs := []struct {
name string
setupFunc func() *Streamsql
}{
{
name: "Lightweight",
setupFunc: func() *Streamsql {
return New(WithBufferSizes(5000, 5000, 250))
},
},
{
name: "Default",
setupFunc: func() *Streamsql {
return New()
},
},
{
name: "HighPerformance",
setupFunc: func() *Streamsql {
return New(WithHighPerformance())
},
},
}
sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20"
for _, config := range configs {
b.Run(config.name, func(b *testing.B) {
ssql := config.setupFunc()
defer ssql.Stop()
err := ssql.Execute(sql)
if err != nil {
b.Fatalf("SQL执行失败: %v", err)
}
var resultCount int64
ssql.AddSink(func(result []map[string]interface{}) {
atomic.AddInt64(&resultCount, 1)
})
// 异步消费结果
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ssql.Stream().GetResultsChan():
case <-ctx.Done():
return
}
}
}()
testData := generateOptimizedTestData(3)
ssql.Stream().ResetStats()
b.ResetTimer()
start := time.Now()
for i := 0; i < b.N; i++ {
ssql.Emit(testData[i%len(testData)])
}
inputDuration := time.Since(start)
b.StopTimer()
time.Sleep(100 * time.Millisecond)
cancel()
// 获取详细统计
detailedStats := ssql.Stream().GetDetailedStats()
results := atomic.LoadInt64(&resultCount)
throughput := float64(b.N) / inputDuration.Seconds()
processRate := detailedStats["process_rate"].(float64)
dropRate := detailedStats["drop_rate"].(float64)
b.ReportMetric(throughput, "ops/sec")
b.ReportMetric(processRate, "process_rate_%")
b.ReportMetric(dropRate, "drop_rate_%")
b.Logf("%s配置性能:", config.name)
b.Logf(" 吞吐量: %.0f ops/sec (%.1f万 ops/sec)", throughput, throughput/10000)
b.Logf(" 处理率: %.1f%%, 丢弃率: %.2f%%", processRate, dropRate)
b.Logf(" 结果数: %d", results)
})
}
}
// BenchmarkPureInputOptimized 优化后的纯输入性能测试
func BenchmarkPureInputOptimized(b *testing.B) {
ssql := New(WithHighPerformance())
defer ssql.Stop()
sql := "SELECT deviceId FROM stream"
err := ssql.Execute(sql)
if err != nil {
b.Fatal(err)
}
// 启动结果消费者
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ssql.Stream().GetResultsChan():
case <-ctx.Done():
return
}
}
}()
// 预生成数据
data := map[string]interface{}{
"deviceId": "device1",
"temperature": 25.0,
}
b.ResetTimer()
start := time.Now()
for i := 0; i < b.N; i++ {
ssql.Emit(data)
}
b.StopTimer()
duration := time.Since(start)
throughput := float64(b.N) / duration.Seconds()
b.ReportMetric(throughput, "pure_ops/sec")
b.Logf("纯输入性能: %.0f ops/sec (%.1f万 ops/sec)", throughput, throughput/10000)
}
// BenchmarkPostAggregationPerformance 后聚合性能基准测试
func BenchmarkPostAggregationPerformance(b *testing.B) {
tests := []struct {
name string
sql string
}{
{
name: "SimpleAggregation",
sql: "SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('100ms')",
},
{
name: "PostAggregationSimple",
sql: "SELECT deviceId, AVG(temperature) + 10 as adjusted_temp FROM stream GROUP BY deviceId, TumblingWindow('100ms')",
},
{
name: "PostAggregationComplex",
sql: "SELECT deviceId, CEIL(AVG(temperature) * 1.8 + 32) as fahrenheit FROM stream GROUP BY deviceId, TumblingWindow('100ms')",
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
ssql := New()
defer ssql.Stop()
err := ssql.Execute(tt.sql)
if err != nil {
b.Fatalf("SQL执行失败: %v", err)
}
var resultCount int64
ssql.AddSink(func(result []map[string]interface{}) {
atomic.AddInt64(&resultCount, 1)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ssql.Stream().GetResultsChan():
case <-ctx.Done():
return
}
}
}()
testData := generateOptimizedTestData(5)
ssql.Stream().ResetStats()
b.ResetTimer()
start := time.Now()
for i := 0; i < b.N; i++ {
ssql.Emit(testData[i%len(testData)])
}
inputDuration := time.Since(start)
b.StopTimer()
time.Sleep(200 * time.Millisecond)
cancel()
stats := ssql.Stream().GetStats()
results := atomic.LoadInt64(&resultCount)
throughput := float64(b.N) / inputDuration.Seconds()
processedCount := stats["output_count"]
droppedCount := stats["dropped_count"]
processRate := float64(processedCount) / float64(b.N) * 100
dropRate := float64(droppedCount) / float64(b.N) * 100
b.ReportMetric(throughput, "ops/sec")
b.ReportMetric(processRate, "process_rate_%")
b.ReportMetric(dropRate, "drop_rate_%")
b.ReportMetric(float64(results), "results")
b.Logf("%s性能:", tt.name)
b.Logf(" 吞吐量: %.0f ops/sec (%.1f万 ops/sec)", throughput, throughput/10000)
b.Logf(" 处理率: %.1f%%, 丢弃率: %.2f%%", processRate, dropRate)
b.Logf(" 结果数: %d", results)
})
}
}
// generateOptimizedTestData 生成优化的测试数据
func generateOptimizedTestData(count int) []map[string]interface{} {
data := make([]map[string]interface{}, count)
devices := []string{"device1", "device2", "device3", "device4", "device5"}
for i := 0; i < count; i++ {
data[i] = map[string]interface{}{
"deviceId": devices[rand.Intn(len(devices))],
"temperature": 15.0 + rand.Float64()*20, // 15-35度
"humidity": 30.0 + rand.Float64()*40, // 30-70%
"timestamp": time.Now().UnixNano(),
}
}
return data
}
// BenchmarkMemoryEfficiency 内存效率基准测试
func BenchmarkMemoryEfficiency(b *testing.B) {
configs := []struct {
name string
setupFunc func() *Streamsql
}{
{
name: "Lightweight5K",
setupFunc: func() *Streamsql {
return New(WithBufferSizes(5000, 5000, 250))
},
},
{
name: "Default20K",
setupFunc: func() *Streamsql {
return New()
},
},
{
name: "HighPerf50K",
setupFunc: func() *Streamsql {
return New(WithHighPerformance())
},
},
}
sql := "SELECT deviceId, temperature FROM stream WHERE temperature > 20"
for _, config := range configs {
b.Run(config.name, func(b *testing.B) {
ssql := config.setupFunc()
defer ssql.Stop()
err := ssql.Execute(sql)
if err != nil {
b.Fatalf("SQL执行失败: %v", err)
}
var resultCount int64
ssql.AddSink(func(result []map[string]interface{}) {
atomic.AddInt64(&resultCount, 1)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ssql.Stream().GetResultsChan():
case <-ctx.Done():
return
}
}
}()
testData := generateOptimizedTestData(3)
b.ResetTimer()
start := time.Now()
for i := 0; i < b.N; i++ {
ssql.Emit(testData[i%len(testData)])
}
inputDuration := time.Since(start)
b.StopTimer()
time.Sleep(50 * time.Millisecond)
cancel()
detailedStats := ssql.Stream().GetDetailedStats()
results := atomic.LoadInt64(&resultCount)
throughput := float64(b.N) / inputDuration.Seconds()
processRate := detailedStats["process_rate"].(float64)
dataChanUsage := detailedStats["data_chan_usage"].(float64)
resultChanUsage := detailedStats["result_chan_usage"].(float64)
b.ReportMetric(throughput, "ops/sec")
b.ReportMetric(processRate, "process_rate_%")
b.ReportMetric(dataChanUsage, "data_chan_usage_%")
b.ReportMetric(resultChanUsage, "result_chan_usage_%")
b.ReportMetric(float64(results), "results")
basicStats := detailedStats["basic_stats"].(map[string]int64)
b.Logf("%s配置效率:", config.name)
b.Logf(" 缓冲区: 数据%d/结果%d/Sink%d",
basicStats["data_chan_cap"],
basicStats["result_chan_cap"],
basicStats["sink_pool_cap"])
b.Logf(" 吞吐量: %.0f ops/sec, 处理率: %.1f%%, 结果数: %d", throughput, processRate, results)
b.Logf(" 通道使用率: 数据%.1f%%, 结果%.1f%%", dataChanUsage, resultChanUsage)
})
}
}