mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-30 17:26:42 +00:00
1933 lines
47 KiB
Go
1933 lines
47 KiB
Go
package stream
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/rulego/streamsql/aggregator"
|
||
"github.com/rulego/streamsql/types"
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
)
|
||
|
||
// TestStreamBasicOperations 测试Stream基本操作(合并了构造函数、添加数据、获取结果等测试)
|
||
func TestStreamBasicOperations(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.Config
|
||
testFunc string
|
||
expectError bool
|
||
}{
|
||
{
|
||
name: "基本构造函数测试",
|
||
config: types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
},
|
||
testFunc: "constructor",
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "添加数据测试",
|
||
config: types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
},
|
||
testFunc: "addData",
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "获取结果测试",
|
||
config: types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
},
|
||
testFunc: "getResults",
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "窗口功能测试",
|
||
config: types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
TimeUnit: 1000,
|
||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||
},
|
||
},
|
||
testFunc: "withWindow",
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "线程安全测试",
|
||
config: types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
},
|
||
testFunc: "threadSafety",
|
||
expectError: false,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
stream, err := NewStream(tt.config)
|
||
if tt.expectError {
|
||
assert.Error(t, err)
|
||
return
|
||
}
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 根据测试类型执行不同的测试逻辑
|
||
switch tt.testFunc {
|
||
case "constructor":
|
||
// 验证stream创建成功
|
||
assert.NotNil(t, stream)
|
||
assert.NotNil(t, stream.dataChan)
|
||
assert.NotNil(t, stream.resultChan)
|
||
|
||
case "addData":
|
||
// 测试添加数据
|
||
testData := map[string]interface{}{
|
||
"name": "test",
|
||
"age": 25,
|
||
}
|
||
sent := stream.safeSendToDataChan(testData)
|
||
assert.True(t, sent)
|
||
|
||
case "getResults":
|
||
// 测试获取结果通道
|
||
resultsChan := stream.GetResultsChan()
|
||
assert.NotNil(t, resultsChan)
|
||
|
||
case "withWindow":
|
||
// 验证窗口配置
|
||
assert.NotNil(t, stream.Window)
|
||
assert.True(t, stream.config.NeedWindow)
|
||
|
||
case "threadSafety":
|
||
// 测试并发安全性
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < 10; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
testData := map[string]interface{}{
|
||
"name": fmt.Sprintf("test_%d", id),
|
||
"age": 20 + id,
|
||
}
|
||
stream.safeSendToDataChan(testData)
|
||
}(i)
|
||
}
|
||
wg.Wait()
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamBasicFunctionality 测试Stream基本功能
|
||
func TestStreamBasicFunctionality(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.Config
|
||
filter string
|
||
testData []map[string]interface{}
|
||
expectedDevice string
|
||
expectedTemp float64
|
||
expectedHum float64
|
||
}{
|
||
{
|
||
name: "带过滤器的窗口聚合",
|
||
config: types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{"size": 500 * time.Millisecond},
|
||
},
|
||
GroupFields: []string{"device"},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"temperature": aggregator.Avg,
|
||
"humidity": aggregator.Sum,
|
||
},
|
||
NeedWindow: true,
|
||
},
|
||
filter: "device == 'aa' && temperature > 10",
|
||
testData: []map[string]interface{}{
|
||
{"device": "aa", "temperature": 25.0, "humidity": 60},
|
||
{"device": "aa", "temperature": 30.0, "humidity": 55},
|
||
{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||
},
|
||
expectedDevice: "aa",
|
||
expectedTemp: 27.5,
|
||
expectedHum: 115.0,
|
||
},
|
||
{
|
||
name: "不完整数据处理",
|
||
config: types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{"size": 500 * time.Millisecond},
|
||
},
|
||
GroupFields: []string{"device"},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"temperature": aggregator.Avg,
|
||
"humidity": aggregator.Sum,
|
||
},
|
||
NeedWindow: true,
|
||
},
|
||
filter: "device == 'aa'",
|
||
testData: []map[string]interface{}{
|
||
{"device": "aa", "temperature": 25.0},
|
||
{"device": "aa", "humidity": 60},
|
||
{"device": "aa", "temperature": 30.0},
|
||
{"device": "aa", "humidity": 55},
|
||
{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||
},
|
||
expectedDevice: "aa",
|
||
expectedTemp: 27.5,
|
||
expectedHum: 115.0,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
strm, err := NewStream(tt.config)
|
||
require.NoError(t, err)
|
||
defer strm.Stop()
|
||
|
||
if tt.filter != "" {
|
||
err = strm.RegisterFilter(tt.filter)
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// 添加 Sink 函数来捕获结果
|
||
resultChan := make(chan interface{}, 1)
|
||
strm.AddSink(func(result []map[string]interface{}) {
|
||
select {
|
||
case resultChan <- result:
|
||
default:
|
||
// 防止阻塞
|
||
}
|
||
})
|
||
|
||
strm.Start()
|
||
|
||
// 发送测试数据
|
||
for _, data := range tt.testData {
|
||
strm.Emit(data)
|
||
}
|
||
|
||
// 等待窗口关闭并触发结果
|
||
time.Sleep(700 * time.Millisecond)
|
||
|
||
// 等待结果
|
||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||
defer cancel()
|
||
|
||
var actual interface{}
|
||
select {
|
||
case actual = <-resultChan:
|
||
cancel()
|
||
case <-ctx.Done():
|
||
t.Fatal("No results received within 3 seconds")
|
||
}
|
||
|
||
// 验证结果
|
||
require.NotNil(t, actual)
|
||
assert.IsType(t, []map[string]interface{}{}, actual)
|
||
resultMap := actual.([]map[string]interface{})
|
||
require.Greater(t, len(resultMap), 0)
|
||
|
||
firstResult := resultMap[0]
|
||
assert.Equal(t, tt.expectedDevice, firstResult["device"])
|
||
assert.InEpsilon(t, tt.expectedTemp, firstResult["temperature"].(float64), 0.0001)
|
||
assert.InDelta(t, tt.expectedHum, firstResult["humidity"].(float64), 0.0001)
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamWithoutFilter 测试无过滤器的流处理
|
||
func TestStreamWithoutFilter(t *testing.T) {
|
||
config := types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "sliding",
|
||
Params: map[string]interface{}{"size": 2 * time.Second, "slide": 1 * time.Second},
|
||
},
|
||
GroupFields: []string{"device"},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"temperature": aggregator.Max,
|
||
"humidity": aggregator.Min,
|
||
},
|
||
NeedWindow: true,
|
||
}
|
||
|
||
strm, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer strm.Stop()
|
||
|
||
strm.Start()
|
||
|
||
testData := []map[string]interface{}{
|
||
{"device": "aa", "temperature": 25.0, "humidity": 60},
|
||
{"device": "aa", "temperature": 30.0, "humidity": 55},
|
||
{"device": "bb", "temperature": 22.0, "humidity": 70},
|
||
}
|
||
|
||
for _, data := range testData {
|
||
strm.Emit(data)
|
||
}
|
||
|
||
// 捕获结果
|
||
resultChan := make(chan interface{})
|
||
strm.AddSink(func(result []map[string]interface{}) {
|
||
resultChan <- result
|
||
})
|
||
|
||
// 等待窗口触发
|
||
time.Sleep(3 * time.Second)
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
|
||
var actual interface{}
|
||
select {
|
||
case actual = <-resultChan:
|
||
cancel()
|
||
case <-ctx.Done():
|
||
t.Fatal("Timeout waiting for results")
|
||
}
|
||
|
||
expected := []map[string]interface{}{
|
||
{"device": "aa", "temperature": 30.0, "humidity": 55.0},
|
||
{"device": "bb", "temperature": 22.0, "humidity": 70.0},
|
||
}
|
||
|
||
assert.IsType(t, []map[string]interface{}{}, actual)
|
||
resultSlice := actual.([]map[string]interface{})
|
||
assert.Len(t, resultSlice, 2)
|
||
|
||
for _, expectedResult := range expected {
|
||
found := false
|
||
for _, resultMap := range resultSlice {
|
||
if resultMap["device"] == expectedResult["device"] {
|
||
assert.InEpsilon(t, expectedResult["temperature"].(float64), resultMap["temperature"].(float64), 0.0001)
|
||
assert.InEpsilon(t, expectedResult["humidity"].(float64), resultMap["humidity"].(float64), 0.0001)
|
||
found = true
|
||
break
|
||
}
|
||
}
|
||
assert.True(t, found, "Expected result for device %v not found", expectedResult["device"])
|
||
}
|
||
}
|
||
|
||
// TestStreamRefactoring 测试重构后的Stream功能
|
||
func TestStreamRefactoring(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"count": aggregator.Count,
|
||
},
|
||
GroupFields: []string{"category"},
|
||
NeedWindow: false,
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
testData := map[string]interface{}{
|
||
"name": "test",
|
||
"age": 25,
|
||
"category": "A",
|
||
}
|
||
|
||
// 启动Stream
|
||
stream.Start()
|
||
|
||
// 发送测试数据
|
||
stream.Emit(testData)
|
||
|
||
// 等待处理完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 获取统计信息
|
||
stats := stream.GetStats()
|
||
assert.Equal(t, int64(1), stats[InputCount])
|
||
}
|
||
|
||
// TestStreamFactory 测试StreamFactory功能
|
||
func TestStreamFactory(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
require.NotNil(t, factory)
|
||
|
||
config := types.Config{
|
||
SimpleFields: []string{"value"},
|
||
NeedWindow: false,
|
||
}
|
||
|
||
// 测试不同的创建方法
|
||
tests := []struct {
|
||
name string
|
||
create func() (*Stream, error)
|
||
}{
|
||
{"CreateStream", func() (*Stream, error) { return factory.CreateStream(config) }},
|
||
{"CreateHighPerformanceStream", func() (*Stream, error) { return factory.CreateHighPerformanceStream(config) }},
|
||
{"CreateLowLatencyStream", func() (*Stream, error) { return factory.CreateLowLatencyStream(config) }},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
stream, err := tt.create()
|
||
require.NoError(t, err)
|
||
require.NotNil(t, stream)
|
||
defer stream.Stop()
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestMigratedFunctions 测试迁移后的函数
|
||
func TestMigratedFunctions(t *testing.T) {
|
||
// 测试性能评估函数
|
||
tests := []struct {
|
||
name string
|
||
dataUsage float64
|
||
dropRate float64
|
||
expected string
|
||
}{
|
||
{"Critical", 50.0, 60.0, PerformanceLevelCritical},
|
||
{"Warning", 50.0, 30.0, PerformanceLevelWarning},
|
||
{"HighLoad", 95.0, 5.0, PerformanceLevelHighLoad},
|
||
{"ModerateLoad", 75.0, 5.0, PerformanceLevelModerateLoad},
|
||
{"Optimal", 50.0, 5.0, PerformanceLevelOptimal},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
result := AssessPerformanceLevel(tt.dataUsage, tt.dropRate)
|
||
assert.Equal(t, tt.expected, result)
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamConstructors 测试各种Stream构造函数
|
||
func TestStreamConstructors(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
// 测试高性能Stream
|
||
highPerfStream, err := NewStreamWithHighPerformance(config)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, highPerfStream)
|
||
defer func() {
|
||
if highPerfStream != nil {
|
||
close(highPerfStream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试低延迟Stream
|
||
lowLatencyStream, err := NewStreamWithLowLatency(config)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, lowLatencyStream)
|
||
defer func() {
|
||
if lowLatencyStream != nil {
|
||
close(lowLatencyStream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试自定义性能配置Stream
|
||
perfConfig := types.PerformanceConfig{
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkWorkerCount: 10,
|
||
},
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 1000,
|
||
},
|
||
}
|
||
customPerfStream, err := NewStreamWithCustomPerformance(config, perfConfig)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, customPerfStream)
|
||
defer func() {
|
||
if customPerfStream != nil {
|
||
close(customPerfStream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestStreamFilterRegistration 测试过滤器注册功能
|
||
func TestStreamFilterRegistration(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试空过滤器
|
||
err = stream.RegisterFilter("")
|
||
require.NoError(t, err)
|
||
|
||
// 测试简单条件过滤器
|
||
err = stream.RegisterFilter("age > 18")
|
||
require.NoError(t, err)
|
||
|
||
// 测试带反引号的过滤器
|
||
err = stream.RegisterFilter("`user_name` == 'test'")
|
||
require.NoError(t, err)
|
||
|
||
// 测试LIKE语法
|
||
err = stream.RegisterFilter("name LIKE '%test%'")
|
||
require.NoError(t, err)
|
||
|
||
// 测试IS NULL语法
|
||
err = stream.RegisterFilter("age IS NULL")
|
||
require.NoError(t, err)
|
||
|
||
// 测试IS NOT NULL语法
|
||
err = stream.RegisterFilter("name IS NOT NULL")
|
||
require.NoError(t, err)
|
||
|
||
// 测试复杂条件
|
||
err = stream.RegisterFilter("age > 18 && name LIKE '%test%' || `user_id` IS NOT NULL")
|
||
require.NoError(t, err)
|
||
}
|
||
|
||
// TestStreamAggregationQuery 测试聚合查询功能
|
||
func TestStreamAggregationQuery(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"avg_age": aggregator.Avg,
|
||
"max_age": aggregator.Max,
|
||
},
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试是否为聚合查询
|
||
isAggregation := stream.IsAggregationQuery()
|
||
require.True(t, isAggregation)
|
||
|
||
// 测试非聚合查询
|
||
simpleConfig := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
simpleStream, err := NewStream(simpleConfig)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if simpleStream != nil {
|
||
close(simpleStream.done)
|
||
}
|
||
}()
|
||
|
||
isSimpleAggregation := simpleStream.IsAggregationQuery()
|
||
require.False(t, isSimpleAggregation)
|
||
}
|
||
|
||
// TestStreamSyncProcessing 测试同步处理功能
|
||
func TestStreamSyncProcessing(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试同步处理数据
|
||
testData := map[string]interface{}{
|
||
"name": "test",
|
||
"age": 25,
|
||
}
|
||
|
||
result, err := stream.ProcessSync(testData)
|
||
require.NoError(t, err)
|
||
require.NotNil(t, result)
|
||
require.Equal(t, "test", result["name"])
|
||
require.Equal(t, 25, result["age"])
|
||
}
|
||
|
||
// TestStreamErrorHandling 测试错误处理
|
||
func TestStreamErrorHandling(t *testing.T) {
|
||
// 测试无效配置
|
||
invalidConfig := types.Config{
|
||
// 故意留空以测试错误处理
|
||
}
|
||
stream, err := NewStream(invalidConfig)
|
||
// 这个可能会成功,因为配置验证可能比较宽松
|
||
// require.Error(t, err)
|
||
|
||
if stream != nil {
|
||
defer func() {
|
||
close(stream.done)
|
||
}()
|
||
|
||
// 测试无效过滤器
|
||
err = stream.RegisterFilter("invalid syntax !!!")
|
||
// 这个应该会失败
|
||
if err != nil {
|
||
require.Contains(t, err.Error(), "compile filter error")
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestStreamConcurrency 测试并发安全性
|
||
func TestStreamConcurrency(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 启动多个goroutine并发发送数据
|
||
const numGoroutines = 10
|
||
const numMessages = 100
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(numGoroutines)
|
||
|
||
for i := 0; i < numGoroutines; i++ {
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
for j := 0; j < numMessages; j++ {
|
||
data := map[string]interface{}{
|
||
"name": fmt.Sprintf("user_%d_%d", id, j),
|
||
"age": id + j,
|
||
}
|
||
stream.Emit(data)
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
}
|
||
|
||
// TestStreamPerformance 测试性能相关功能
|
||
func TestStreamPerformance(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkWorkerCount: 5,
|
||
},
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 1000, // 增加缓冲区大小以容纳所有消息
|
||
},
|
||
},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 启动流处理
|
||
stream.Start()
|
||
|
||
// 测试大量数据处理
|
||
const numMessages = 1000
|
||
for i := 0; i < numMessages; i++ {
|
||
data := map[string]interface{}{
|
||
"name": fmt.Sprintf("user_%d", i),
|
||
"age": i % 100,
|
||
}
|
||
stream.Emit(data)
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
// TestStreamLifecycle 测试Stream生命周期
|
||
func TestStreamLifecycle(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
|
||
// 测试启动
|
||
stream.Start()
|
||
|
||
// 发送一些数据
|
||
for i := 0; i < 10; i++ {
|
||
data := map[string]interface{}{
|
||
"name": fmt.Sprintf("user_%d", i),
|
||
"age": i,
|
||
}
|
||
stream.Emit(data)
|
||
}
|
||
|
||
// 测试停止
|
||
stream.Stop()
|
||
|
||
// 验证停止后不能发送数据(这取决于具体实现)
|
||
// 这里只是测试Stop方法不会panic
|
||
}
|
||
|
||
// TestConvertToAggregationFields 测试聚合字段转换
|
||
func TestConvertToAggregationFields(t *testing.T) {
|
||
selectFields := map[string]aggregator.AggregateType{
|
||
"avg_age": aggregator.Avg,
|
||
"max_age": aggregator.Max,
|
||
"count": aggregator.Count,
|
||
}
|
||
|
||
fieldAlias := map[string]string{
|
||
"avg_age": "age",
|
||
"max_age": "age",
|
||
"count": "id",
|
||
}
|
||
|
||
fields := convertToAggregationFields(selectFields, fieldAlias)
|
||
require.Len(t, fields, 3)
|
||
|
||
// 验证字段转换结果
|
||
for _, field := range fields {
|
||
switch field.OutputAlias {
|
||
case "avg_age":
|
||
require.Equal(t, aggregator.Avg, field.AggregateType)
|
||
require.Equal(t, "age", field.InputField)
|
||
case "max_age":
|
||
require.Equal(t, aggregator.Max, field.AggregateType)
|
||
require.Equal(t, "age", field.InputField)
|
||
case "count":
|
||
require.Equal(t, aggregator.Count, field.AggregateType)
|
||
require.Equal(t, "id", field.InputField)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TestStreamWithWindowAndAggregation 测试带窗口和聚合的Stream
|
||
func TestStreamWithWindowAndAggregation(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{"size": 100 * time.Millisecond},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"avg_age": aggregator.Avg,
|
||
"count": aggregator.Count,
|
||
},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil && stream.done != nil {
|
||
select {
|
||
case <-stream.done:
|
||
// channel already closed
|
||
default:
|
||
close(stream.done)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 注册过滤器
|
||
err = stream.RegisterFilter("age > 0")
|
||
require.NoError(t, err)
|
||
|
||
// 启动Stream
|
||
stream.Start()
|
||
|
||
// 发送数据
|
||
for i := 0; i < 50; i++ {
|
||
data := map[string]interface{}{
|
||
"name": fmt.Sprintf("user_%d", i),
|
||
"age": i + 1,
|
||
}
|
||
stream.Emit(data)
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(200 * time.Millisecond)
|
||
stream.Stop()
|
||
}
|
||
|
||
// TestDataChannelExpansion 测试数据通道动态扩容功能
|
||
func TestDataChannelExpansion(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "value"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 10, // 小容量便于测试扩容
|
||
ResultChannelSize: 100,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 5,
|
||
MaxRetryRoutines: 2,
|
||
},
|
||
},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 启动流
|
||
stream.Start()
|
||
|
||
// 发送数据测试通道功能
|
||
for i := 0; i < 8; i++ {
|
||
data := map[string]interface{}{
|
||
"name": "test",
|
||
"value": i,
|
||
}
|
||
stream.Emit(data)
|
||
}
|
||
|
||
// 等待处理
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 验证数据处理
|
||
stats := stream.GetStats()
|
||
assert.NotNil(t, stats)
|
||
assert.True(t, stats[InputCount] > 0)
|
||
}
|
||
|
||
// TestStatsCollectorDetailedFunctions 测试统计收集器的详细功能
|
||
func TestStatsCollectorDetailedFunctions(t *testing.T) {
|
||
collector := NewStatsCollector()
|
||
|
||
// 测试重置功能
|
||
collector.IncrementInput()
|
||
collector.IncrementOutput()
|
||
collector.IncrementDropped()
|
||
|
||
assert.Equal(t, int64(1), collector.GetInputCount())
|
||
assert.Equal(t, int64(1), collector.GetOutputCount())
|
||
assert.Equal(t, int64(1), collector.GetDroppedCount())
|
||
|
||
// 测试重置
|
||
collector.Reset()
|
||
assert.Equal(t, int64(0), collector.GetInputCount())
|
||
assert.Equal(t, int64(0), collector.GetOutputCount())
|
||
assert.Equal(t, int64(0), collector.GetDroppedCount())
|
||
|
||
// 测试基本统计信息
|
||
basicStats := collector.GetBasicStats(5, 10, 3, 20, 2, 5, 1, 0)
|
||
assert.Equal(t, int64(0), basicStats[InputCount])
|
||
assert.Equal(t, int64(5), basicStats[DataChanLen])
|
||
assert.Equal(t, int64(10), basicStats[DataChanCap])
|
||
|
||
// 测试详细统计信息
|
||
collector.IncrementInput()
|
||
collector.IncrementInput()
|
||
collector.IncrementOutput()
|
||
collector.IncrementDropped()
|
||
|
||
basicStats = collector.GetBasicStats(8, 10, 15, 20, 4, 5, 2, 1)
|
||
detailedStats := collector.GetDetailedStats(basicStats)
|
||
|
||
assert.Contains(t, detailedStats, BasicStats)
|
||
assert.Contains(t, detailedStats, DataChanUsage)
|
||
assert.Contains(t, detailedStats, ResultChanUsage)
|
||
assert.Contains(t, detailedStats, ProcessRate)
|
||
assert.Contains(t, detailedStats, DropRate)
|
||
|
||
// 验证计算结果
|
||
dataUsage := detailedStats[DataChanUsage].(float64)
|
||
assert.Equal(t, 80.0, dataUsage) // 8/10 * 100
|
||
|
||
processRate := detailedStats[ProcessRate].(float64)
|
||
assert.Equal(t, 50.0, processRate) // 1/2 * 100
|
||
|
||
dropRate := detailedStats[DropRate].(float64)
|
||
assert.Equal(t, 50.0, dropRate) // 1/2 * 100
|
||
}
|
||
|
||
// TestResultHandlerGetResultsChan 测试结果处理器的GetResultsChan功能
|
||
func TestResultHandlerGetResultsChan(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"test"},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 测试获取结果通道
|
||
resultsChan := stream.GetResultsChan()
|
||
assert.NotNil(t, resultsChan)
|
||
}
|
||
|
||
// TestConcurrentDataChannelExpansion 测试并发数据通道扩容
|
||
func TestConcurrentDataChannelExpansion(t *testing.T) {
|
||
// 创建流
|
||
config := types.Config{
|
||
SimpleFields: []string{"id", "message"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 5,
|
||
ResultChannelSize: 10,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 2,
|
||
MaxRetryRoutines: 1,
|
||
},
|
||
},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 启动流
|
||
stream.Start()
|
||
|
||
// 并发添加大量数据
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < 100; i++ {
|
||
wg.Add(1)
|
||
go func(id int) {
|
||
defer wg.Done()
|
||
data := map[string]interface{}{
|
||
"id": id,
|
||
"message": "concurrent_expansion",
|
||
}
|
||
stream.Emit(data)
|
||
}(i)
|
||
}
|
||
|
||
wg.Wait()
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 验证流仍在正常运行
|
||
assert.NotNil(t, stream)
|
||
}
|
||
|
||
// TestExpandDataChannelDirectly 直接测试expandDataChannel函数
|
||
// 提高expandDataChannel函数的覆盖率
|
||
func TestExpandDataChannelDirectly(t *testing.T) {
|
||
// 创建一个Stream实例用于测试
|
||
config := types.Config{
|
||
SimpleFields: []string{"test"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 10,
|
||
ResultChannelSize: 10,
|
||
},
|
||
},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 填充通道到80%以上以触发扩容条件
|
||
for i := 0; i < 9; i++ {
|
||
select {
|
||
case stream.dataChan <- map[string]interface{}{"test": i}:
|
||
default:
|
||
t.Fatal("Failed to fill channel")
|
||
}
|
||
}
|
||
|
||
// 记录原始容量
|
||
originalCap := cap(stream.dataChan)
|
||
|
||
// 直接调用扩容函数
|
||
stream.expandDataChannel()
|
||
|
||
// 验证容量是否增加
|
||
newCap := cap(stream.dataChan)
|
||
assert.Greater(t, newCap, originalCap, "Channel capacity should increase after expansion")
|
||
|
||
// 验证数据是否正确迁移
|
||
assert.Equal(t, 9, len(stream.dataChan), "All data should be migrated to new channel")
|
||
|
||
// 测试并发扩容保护
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < 3; i++ {
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
stream.expandDataChannel()
|
||
}()
|
||
}
|
||
wg.Wait()
|
||
}
|
||
|
||
// TestExpandDataChannelBelowThreshold 测试通道使用率低于阈值时不扩容
|
||
func TestExpandDataChannelBelowThreshold(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"test"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 10,
|
||
ResultChannelSize: 10,
|
||
},
|
||
},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 只填充少量数据(低于80%阈值)
|
||
for i := 0; i < 3; i++ {
|
||
stream.dataChan <- map[string]interface{}{"test": i}
|
||
}
|
||
|
||
originalCap := cap(stream.dataChan)
|
||
|
||
// 调用扩容函数
|
||
stream.expandDataChannel()
|
||
|
||
// 验证容量没有变化
|
||
newCap := cap(stream.dataChan)
|
||
assert.Equal(t, originalCap, newCap, "Channel capacity should not change when below threshold")
|
||
}
|
||
|
||
// TestStreamConfigErrorHandlingEnhanced 测试流配置错误处理增强版
|
||
func TestStreamConfigErrorHandlingEnhanced(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.Config
|
||
expectError bool
|
||
}{
|
||
{
|
||
name: "空配置",
|
||
config: types.Config{
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "无效窗口类型",
|
||
config: types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "invalid_type",
|
||
},
|
||
NeedWindow: true,
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: true,
|
||
},
|
||
{
|
||
name: "零时间窗口",
|
||
config: func() types.Config {
|
||
c := types.NewConfig()
|
||
c.WindowConfig = types.WindowConfig{
|
||
Type: "tumbling",
|
||
TimeUnit: 0,
|
||
}
|
||
c.NeedWindow = true
|
||
c.PerformanceConfig = types.DefaultPerformanceConfig()
|
||
return c
|
||
}(),
|
||
expectError: true,
|
||
},
|
||
{
|
||
name: "负时间窗口",
|
||
config: func() types.Config {
|
||
c := types.NewConfig()
|
||
c.WindowConfig = types.WindowConfig{
|
||
Type: "sliding",
|
||
TimeUnit: 1 * time.Second,
|
||
}
|
||
c.NeedWindow = true
|
||
c.PerformanceConfig = types.DefaultPerformanceConfig()
|
||
return c
|
||
}(),
|
||
expectError: true,
|
||
},
|
||
{
|
||
name: "超大缓冲区配置",
|
||
config: func() types.Config {
|
||
c := types.NewConfig()
|
||
c.PerformanceConfig = types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: -1,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: "invalid",
|
||
},
|
||
}
|
||
return c
|
||
}(),
|
||
expectError: true,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
_, err := NewStream(tt.config)
|
||
if (err != nil) != tt.expectError {
|
||
t.Errorf("expected error: %v, got: %v", tt.expectError, err)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamDataValidationEnhanced 测试流数据验证增强版
|
||
func TestStreamDataValidationEnhanced(t *testing.T) {
|
||
config := types.NewConfig()
|
||
config.PerformanceConfig = types.DefaultPerformanceConfig()
|
||
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("failed to create stream: %v", err)
|
||
}
|
||
|
||
tests := []struct {
|
||
name string
|
||
data map[string]interface{}
|
||
expectError bool
|
||
}{
|
||
{
|
||
name: "nil数据",
|
||
data: nil,
|
||
expectError: true,
|
||
},
|
||
{
|
||
name: "空数据",
|
||
data: map[string]interface{}{},
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "无效时间戳类型",
|
||
data: map[string]interface{}{
|
||
"timestamp": "invalid",
|
||
"value": 1,
|
||
},
|
||
expectError: false, // 由处理器处理,不直接报错
|
||
},
|
||
{
|
||
name: "负数值",
|
||
data: map[string]interface{}{
|
||
"value": -100,
|
||
},
|
||
expectError: false,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
// 这里简化测试,只验证数据结构
|
||
if tt.data != nil && len(tt.data) == 0 {
|
||
// 空数据测试
|
||
}
|
||
})
|
||
}
|
||
|
||
_ = stream
|
||
}
|
||
|
||
// TestStreamMemoryPressureEnhanced 测试内存压力场景增强版
|
||
func TestStreamMemoryPressureEnhanced(t *testing.T) {
|
||
config := types.NewConfig()
|
||
config.PerformanceConfig = types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 1,
|
||
ResultChannelSize: 1,
|
||
MaxBufferSize: 10,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: "drop",
|
||
AllowDataLoss: true,
|
||
},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("failed to create stream: %v", err)
|
||
}
|
||
|
||
// 模拟大量数据
|
||
largeData := make([]map[string]interface{}, 1000)
|
||
for i := 0; i < 1000; i++ {
|
||
largeData[i] = map[string]interface{}{
|
||
"value": i,
|
||
"key": "test",
|
||
}
|
||
}
|
||
|
||
_ = stream
|
||
_ = largeData
|
||
}
|
||
|
||
// TestStreamConcurrentAccessEnhanced 测试并发访问边界条件增强版
|
||
func TestStreamConcurrentAccessEnhanced(t *testing.T) {
|
||
config := types.NewConfig()
|
||
config.PerformanceConfig = types.DefaultPerformanceConfig()
|
||
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("failed to create stream: %v", err)
|
||
}
|
||
|
||
// 测试并发关闭
|
||
go func() {
|
||
stream.Stop()
|
||
}()
|
||
|
||
// 测试并发启动
|
||
go func() {
|
||
stream.Start()
|
||
time.Sleep(100 * time.Millisecond)
|
||
}()
|
||
|
||
// 给并发操作一些时间
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
|
||
// TestStreamWindowEdgeCasesEnhanced 测试窗口边界情况增强版
|
||
func TestStreamWindowEdgeCasesEnhanced(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.Config
|
||
expectError bool
|
||
}{
|
||
{
|
||
name: "极小时间窗口",
|
||
config: func() types.Config {
|
||
c := types.NewConfig()
|
||
c.WindowConfig = types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": 1 * time.Nanosecond, // 极小时间窗口
|
||
},
|
||
TimeUnit: 1 * time.Nanosecond,
|
||
}
|
||
c.NeedWindow = true
|
||
c.PerformanceConfig = types.DefaultPerformanceConfig()
|
||
return c
|
||
}(),
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "极大时间窗口",
|
||
config: types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": 8760 * time.Hour, // 1年
|
||
},
|
||
TimeUnit: 8760 * time.Hour,
|
||
},
|
||
NeedWindow: true,
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: false,
|
||
},
|
||
{
|
||
name: "滑动窗口零滑动",
|
||
config: types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "sliding",
|
||
Params: map[string]interface{}{
|
||
"size": 1 * time.Second,
|
||
"slide": 1 * time.Millisecond, // 很小的滑动间隔
|
||
},
|
||
TimeUnit: 1 * time.Second,
|
||
},
|
||
NeedWindow: true,
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: false,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
_, err := NewStream(tt.config)
|
||
if (err != nil) != tt.expectError {
|
||
t.Errorf("expected error: %v, got: %v", tt.expectError, err)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamUnifiedConfigIntegration 测试Stream和Window统一配置的集成
|
||
func TestStreamUnifiedConfigIntegration(t *testing.T) {
|
||
testCases := []struct {
|
||
name string
|
||
performanceConfig types.PerformanceConfig
|
||
expectedWindowBufferSize int
|
||
}{
|
||
{
|
||
name: "默认配置",
|
||
performanceConfig: types.DefaultPerformanceConfig(),
|
||
expectedWindowBufferSize: 1000,
|
||
},
|
||
{
|
||
name: "高性能配置",
|
||
performanceConfig: types.HighPerformanceConfig(),
|
||
expectedWindowBufferSize: 5000,
|
||
},
|
||
{
|
||
name: "低延迟配置",
|
||
performanceConfig: types.LowLatencyConfig(),
|
||
expectedWindowBufferSize: 100,
|
||
},
|
||
}
|
||
|
||
for _, tc := range testCases {
|
||
t.Run(tc.name, func(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: tc.performanceConfig,
|
||
}
|
||
|
||
s, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer s.Stop()
|
||
|
||
// 验证stream的缓冲区配置
|
||
assert.Equal(t, tc.performanceConfig.BufferConfig.DataChannelSize, cap(s.dataChan),
|
||
"数据通道大小不匹配")
|
||
assert.Equal(t, tc.performanceConfig.BufferConfig.ResultChannelSize, cap(s.resultChan),
|
||
"结果通道大小不匹配")
|
||
|
||
// 验证窗口创建成功
|
||
assert.NotNil(t, s.Window, "窗口应该被创建")
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamUnifiedConfigPerformanceImpact 测试统一配置对Stream性能的影响
|
||
func TestStreamUnifiedConfigPerformanceImpact(t *testing.T) {
|
||
configs := map[string]types.PerformanceConfig{
|
||
"默认配置": types.DefaultPerformanceConfig(),
|
||
"高性能配置": types.HighPerformanceConfig(),
|
||
"低延迟配置": types.LowLatencyConfig(),
|
||
}
|
||
|
||
for name, perfConfig := range configs {
|
||
t.Run(name, func(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "1s",
|
||
},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Sum,
|
||
},
|
||
PerformanceConfig: perfConfig,
|
||
}
|
||
|
||
s, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer s.Stop()
|
||
|
||
go s.Start()
|
||
|
||
// 发送测试数据并测量性能
|
||
dataCount := 1000
|
||
startTime := time.Now()
|
||
|
||
for i := 0; i < dataCount; i++ {
|
||
data := map[string]interface{}{
|
||
"value": i,
|
||
"timestamp": time.Now().Unix(),
|
||
}
|
||
|
||
select {
|
||
case s.dataChan <- data:
|
||
// 成功发送
|
||
case <-time.After(100 * time.Millisecond):
|
||
// 发送超时
|
||
t.Logf("第%d条数据发送超时", i)
|
||
break
|
||
}
|
||
}
|
||
|
||
processingTime := time.Since(startTime)
|
||
t.Logf("%s 处理%d条数据耗时: %v", name, dataCount, processingTime)
|
||
|
||
// 等待一些结果
|
||
time.Sleep(1500 * time.Millisecond)
|
||
|
||
// 检查结果
|
||
resultCount := 0
|
||
for {
|
||
select {
|
||
case <-s.resultChan:
|
||
resultCount++
|
||
default:
|
||
goto done
|
||
}
|
||
}
|
||
done:
|
||
// 性能测试主要关注处理时间,结果数量可能因窗口触发时机而变化
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamUnifiedConfigErrorHandling 测试统一配置的错误处理
|
||
func TestStreamUnifiedConfigErrorHandling(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
config types.Config
|
||
expectError bool
|
||
description string
|
||
}{
|
||
{
|
||
name: "无效窗口类型",
|
||
config: types.Config{
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "invalid_window_type",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: true,
|
||
description: "无效的窗口类型应该导致创建失败",
|
||
},
|
||
{
|
||
name: "缺少窗口大小参数",
|
||
config: types.Config{
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: true,
|
||
description: "缺少size参数应该导致创建失败",
|
||
},
|
||
{
|
||
name: "有效配置",
|
||
config: types.Config{
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
},
|
||
expectError: false,
|
||
description: "有效配置应该创建成功",
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
stream, err := NewStream(tt.config)
|
||
if tt.expectError {
|
||
assert.Error(t, err, tt.description)
|
||
assert.Nil(t, stream)
|
||
} else {
|
||
assert.NoError(t, err, tt.description)
|
||
assert.NotNil(t, stream)
|
||
if stream != nil {
|
||
defer stream.Stop()
|
||
}
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStreamUnifiedConfigCompatibility 测试统一配置的兼容性
|
||
func TestStreamUnifiedConfigCompatibility(t *testing.T) {
|
||
// 测试新的统一配置
|
||
newConfig := types.Config{
|
||
NeedWindow: false,
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: types.HighPerformanceConfig(),
|
||
}
|
||
|
||
s1, err := NewStream(newConfig)
|
||
require.NoError(t, err)
|
||
defer s1.Stop()
|
||
|
||
// 验证新配置生效
|
||
expectedDataSize := types.HighPerformanceConfig().BufferConfig.DataChannelSize
|
||
assert.Equal(t, expectedDataSize, cap(s1.dataChan), "高性能配置的数据通道大小不匹配")
|
||
|
||
// 测试默认配置
|
||
defaultConfig := types.Config{
|
||
NeedWindow: false,
|
||
SelectFields: map[string]aggregator.AggregateType{
|
||
"value": aggregator.Count,
|
||
},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
}
|
||
|
||
s2, err := NewStream(defaultConfig)
|
||
require.NoError(t, err)
|
||
defer s2.Stop()
|
||
|
||
// 验证默认配置
|
||
expectedDefaultSize := types.DefaultPerformanceConfig().BufferConfig.DataChannelSize
|
||
assert.Equal(t, expectedDefaultSize, cap(s2.dataChan), "默认配置的数据通道大小不匹配")
|
||
|
||
}
|
||
|
||
// TestStatsManagerEnhanced 测试统计管理器增强版
|
||
func TestStatsManagerEnhanced(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: false,
|
||
SimpleFields: []string{"value"},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 启动流处理
|
||
stream.Start()
|
||
|
||
// 发送一些数据来生成统计信息
|
||
for i := 0; i < 10; i++ {
|
||
stream.Emit(map[string]interface{}{"value": i})
|
||
}
|
||
|
||
// 等待处理完成
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 测试基本统计
|
||
stats := stream.GetStats()
|
||
assert.Equal(t, int64(10), stats[InputCount], "输入计数不匹配")
|
||
assert.GreaterOrEqual(t, stats[OutputCount], int64(1), "输出计数应该大于等于1")
|
||
|
||
// 测试重置统计
|
||
stream.ResetStats()
|
||
stats = stream.GetStats()
|
||
assert.Equal(t, int64(0), stats[InputCount], "重置后输入计数应该为0")
|
||
}
|
||
|
||
// TestDataHandlerEnhanced 测试数据处理器增强版
|
||
func TestDataHandlerEnhanced(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
performanceConfig types.PerformanceConfig
|
||
dataCount int
|
||
expectedDrops bool
|
||
}{
|
||
{
|
||
name: "高性能配置 - 无丢弃",
|
||
performanceConfig: types.HighPerformanceConfig(),
|
||
dataCount: 100,
|
||
expectedDrops: false,
|
||
},
|
||
{
|
||
name: "低延迟配置 - 可能丢弃",
|
||
performanceConfig: types.LowLatencyConfig(),
|
||
dataCount: 1000,
|
||
expectedDrops: true,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: false,
|
||
SimpleFields: []string{"value"},
|
||
PerformanceConfig: tt.performanceConfig,
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
stream.Start()
|
||
|
||
// 快速发送大量数据
|
||
for i := 0; i < tt.dataCount; i++ {
|
||
stream.Emit(map[string]interface{}{"value": i})
|
||
}
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
stats := stream.GetStats()
|
||
droppedCount := stats[DroppedCount]
|
||
|
||
if tt.expectedDrops {
|
||
// 在高负载下可能会有丢弃
|
||
t.Logf("%s: 输入 %d, 丢弃 %d", tt.name, stats[InputCount], droppedCount)
|
||
} else {
|
||
// 高性能配置应该能处理所有数据
|
||
assert.Equal(t, int64(0), droppedCount, "高性能配置不应该丢弃数据")
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestResultHandlerEnhanced 测试结果处理器增强版
|
||
func TestResultHandlerEnhanced(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: false,
|
||
SimpleFields: []string{"value"},
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 测试Sink功能
|
||
var mu sync.Mutex
|
||
var receivedResults []interface{}
|
||
|
||
stream.AddSink(func(result []map[string]interface{}) {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
receivedResults = append(receivedResults, result)
|
||
})
|
||
|
||
stream.Start()
|
||
|
||
// 发送测试数据
|
||
testData := []map[string]interface{}{
|
||
{"value": 1},
|
||
{"value": 2},
|
||
{"value": 3},
|
||
}
|
||
|
||
for _, data := range testData {
|
||
stream.Emit(data)
|
||
}
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
// 验证结果
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
|
||
assert.GreaterOrEqual(t, len(receivedResults), len(testData), "应该接收到所有结果")
|
||
|
||
// 验证结果格式
|
||
for _, result := range receivedResults {
|
||
assert.IsType(t, []map[string]interface{}{}, result, "结果应该是map切片类型")
|
||
resultSlice := result.([]map[string]interface{})
|
||
assert.Greater(t, len(resultSlice), 0, "结果切片不应该为空")
|
||
}
|
||
}
|
||
|
||
// TestPerformanceConfigurationsEnhanced 测试不同性能配置的效果增强版
|
||
func TestPerformanceConfigurationsEnhanced(t *testing.T) {
|
||
configs := map[string]types.PerformanceConfig{
|
||
"Default": types.DefaultPerformanceConfig(),
|
||
"HighPerformance": types.HighPerformanceConfig(),
|
||
"LowLatency": types.LowLatencyConfig(),
|
||
}
|
||
|
||
for name, perfConfig := range configs {
|
||
t.Run(name, func(t *testing.T) {
|
||
config := types.Config{
|
||
NeedWindow: false,
|
||
SimpleFields: []string{"value"},
|
||
PerformanceConfig: perfConfig,
|
||
}
|
||
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer stream.Stop()
|
||
|
||
// 验证缓冲区大小
|
||
assert.Equal(t, perfConfig.BufferConfig.DataChannelSize, cap(stream.dataChan))
|
||
assert.Equal(t, perfConfig.BufferConfig.ResultChannelSize, cap(stream.resultChan))
|
||
|
||
// 验证工作池配置
|
||
assert.Equal(t, perfConfig.WorkerConfig.SinkPoolSize, cap(stream.sinkWorkerPool))
|
||
|
||
//t.Logf("%s配置: 数据通道=%d, 结果通道=%d, Sink池=%d",
|
||
// name,
|
||
// cap(stream.dataChan),
|
||
// cap(stream.resultChan),
|
||
// cap(stream.sinkWorkerPool))
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestNewStreamFactory 测试流工厂创建
|
||
func TestNewStreamFactory(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
assert.NotNil(t, factory)
|
||
}
|
||
|
||
// TestStreamFactory_CreateStream 测试创建默认配置的流
|
||
func TestStreamFactory_CreateStream(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
stream, err := factory.CreateStream(config)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, stream)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 验证默认性能配置已应用
|
||
assert.NotEqual(t, types.PerformanceConfig{}, stream.config.PerformanceConfig)
|
||
}
|
||
|
||
// TestStreamFactory_CreateHighPerformanceStream 测试创建高性能流
|
||
func TestStreamFactory_CreateHighPerformanceStream(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
stream, err := factory.CreateHighPerformanceStream(config)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, stream)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 验证高性能配置
|
||
expectedConfig := types.HighPerformanceConfig()
|
||
assert.Equal(t, expectedConfig, stream.config.PerformanceConfig)
|
||
}
|
||
|
||
// TestStreamFactory_CreateLowLatencyStream 测试创建低延迟流
|
||
func TestStreamFactory_CreateLowLatencyStream(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
stream, err := factory.CreateLowLatencyStream(config)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, stream)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 验证低延迟配置
|
||
expectedConfig := types.LowLatencyConfig()
|
||
assert.Equal(t, expectedConfig, stream.config.PerformanceConfig)
|
||
}
|
||
|
||
// TestStreamFactory_CreateCustomPerformanceStream 测试创建自定义性能配置流
|
||
func TestStreamFactory_CreateCustomPerformanceStream(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
customPerfConfig := types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 500,
|
||
ResultChannelSize: 200,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: StrategyDrop,
|
||
AllowDataLoss: true,
|
||
BlockTimeout: time.Second,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkWorkerCount: 4,
|
||
SinkPoolSize: 100,
|
||
MaxRetryRoutines: 2,
|
||
},
|
||
}
|
||
|
||
stream, err := factory.CreateCustomPerformanceStream(config, customPerfConfig)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, stream)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
|
||
// 验证自定义配置
|
||
assert.Equal(t, customPerfConfig, stream.config.PerformanceConfig)
|
||
}
|
||
|
||
// TestStreamFactory_CreateStreamWithWindow 测试创建带窗口的流
|
||
func TestStreamFactory_CreateStreamWithWindow(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
NeedWindow: true,
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
}
|
||
|
||
stream, err := factory.CreateStream(config)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, stream)
|
||
assert.NotNil(t, stream.Window)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestStreamFactory_CreateStreamWithInvalidStrategy 测试创建无效策略的流
|
||
func TestStreamFactory_CreateStreamWithInvalidStrategy(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: "invalid_strategy",
|
||
},
|
||
},
|
||
}
|
||
|
||
_, err := factory.CreateStream(config)
|
||
// 应该报错,因为策略无效
|
||
require.Error(t, err)
|
||
assert.Contains(t, err.Error(), "invalid overflow strategy")
|
||
}
|
||
|
||
// TestStreamFactory_CreateWindow 测试窗口创建
|
||
func TestStreamFactory_CreateWindow(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
WindowConfig: types.WindowConfig{
|
||
Type: "tumbling",
|
||
Params: map[string]interface{}{
|
||
"size": "5s",
|
||
},
|
||
},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
}
|
||
|
||
win, err := factory.createWindow(config)
|
||
require.NoError(t, err)
|
||
assert.NotNil(t, win)
|
||
}
|
||
|
||
// TestStreamFactory_CreateStreamInstance 测试流实例创建
|
||
func TestStreamFactory_CreateStreamInstance(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||
}
|
||
|
||
stream := factory.createStreamInstance(config, nil)
|
||
assert.NotNil(t, stream)
|
||
assert.NotNil(t, stream.dataChan)
|
||
assert.NotNil(t, stream.resultChan)
|
||
assert.NotNil(t, stream.done)
|
||
assert.NotNil(t, stream.sinkWorkerPool)
|
||
assert.Equal(t, config, stream.config)
|
||
}
|
||
|
||
// TestStreamFactory_Performance 测试工厂性能
|
||
func TestStreamFactory_Performance(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
// 创建多个流实例,验证工厂性能
|
||
streams := make([]*Stream, 10)
|
||
for i := 0; i < 10; i++ {
|
||
stream, err := factory.CreateStream(config)
|
||
require.NoError(t, err)
|
||
streams[i] = stream
|
||
}
|
||
|
||
// 清理
|
||
for _, stream := range streams {
|
||
stream.Stop()
|
||
}
|
||
}
|
||
|
||
// TestStreamFactory_ConcurrentCreation 测试并发创建流
|
||
func TestStreamFactory_ConcurrentCreation(t *testing.T) {
|
||
factory := NewStreamFactory()
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
|
||
const numGoroutines = 10
|
||
streams := make([]*Stream, numGoroutines)
|
||
errors := make([]error, numGoroutines)
|
||
done := make(chan struct{})
|
||
|
||
// 并发创建流
|
||
for i := 0; i < numGoroutines; i++ {
|
||
go func(index int) {
|
||
stream, err := factory.CreateStream(config)
|
||
streams[index] = stream
|
||
errors[index] = err
|
||
done <- struct{}{}
|
||
}(i)
|
||
}
|
||
|
||
// 等待所有goroutine完成
|
||
for i := 0; i < numGoroutines; i++ {
|
||
<-done
|
||
}
|
||
|
||
// 验证结果
|
||
for i := 0; i < numGoroutines; i++ {
|
||
assert.NoError(t, errors[i])
|
||
assert.NotNil(t, streams[i])
|
||
if streams[i] != nil {
|
||
streams[i].Stop()
|
||
}
|
||
}
|
||
}
|