Files
2025-08-07 19:23:48 +08:00

410 lines
9.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package stream
import (
"testing"
"time"
"github.com/rulego/streamsql/logger"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/require"
)
// TestStrategyFactory 测试策略工厂
func TestStrategyFactory(t *testing.T) {
factory := NewStrategyFactory()
tests := []struct {
name string
strategyName string
expectedType string
}{
{
name: "Blocking Strategy",
strategyName: StrategyBlock,
expectedType: StrategyBlock,
},
{
name: "Expansion Strategy",
strategyName: StrategyExpand,
expectedType: StrategyExpand,
},
{
name: "Drop Strategy",
strategyName: StrategyDrop,
expectedType: StrategyDrop,
},
{
name: "Unknown Strategy (Default to Drop)",
strategyName: "unknown",
expectedType: StrategyDrop,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
strategy, err := factory.CreateStrategy(tt.strategyName)
if err != nil {
t.Fatalf("Failed to create strategy: %v", err)
}
if strategy.GetStrategyName() != tt.expectedType {
t.Errorf("Expected strategy name %s, got %s", tt.expectedType, strategy.GetStrategyName())
}
})
}
}
// TestStrategy_Constructor 测试策略构造函数
func TestStrategy_Constructor(t *testing.T) {
config := types.Config{
SimpleFields: []string{"name", "age"},
}
stream, err := NewStream(config)
require.NoError(t, err)
defer func() {
if stream != nil {
close(stream.done)
}
}()
}
// TestBlockingStrategy_ProcessData 测试阻塞策略数据处理
func TestBlockingStrategy_ProcessData(t *testing.T) {
config := types.Config{
SimpleFields: []string{"name", "age"},
}
stream, err := NewStream(config)
require.NoError(t, err)
defer func() {
if stream != nil {
close(stream.done)
}
}()
}
// TestExpansionStrategy_ProcessData 测试扩容策略数据处理
func TestExpansionStrategy_ProcessData(t *testing.T) {
config := types.Config{
SimpleFields: []string{"name", "age"},
}
stream, err := NewStream(config)
require.NoError(t, err)
defer func() {
if stream != nil {
close(stream.done)
}
}()
}
// TestPersistenceStrategy_ProcessData 测试持久化策略数据处理
// TestDropStrategy_ProcessData 测试丢弃策略数据处理
func TestDropStrategy_ProcessData(t *testing.T) {
config := types.Config{
SimpleFields: []string{"name", "age"},
}
stream, err := NewStream(config)
require.NoError(t, err)
defer func() {
if stream != nil {
close(stream.done)
}
}()
}
// TestStrategyInitialization 测试策略初始化
func TestStrategyInitialization(t *testing.T) {
// 创建测试配置
config := types.Config{
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 100,
ResultChannelSize: 50,
},
OverflowConfig: types.OverflowConfig{
Strategy: StrategyBlock,
AllowDataLoss: false,
BlockTimeout: time.Second * 5,
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 10,
SinkWorkerCount: 3,
MaxRetryRoutines: 5,
},
},
}
// 创建Stream实例
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
defer stream.Stop()
// 验证策略是否正确设置
if stream.dataStrategy == nil {
t.Fatal("Data strategy not set")
}
if stream.dataStrategy.GetStrategyName() != StrategyBlock {
t.Errorf("Expected blocking strategy, got %s", stream.dataStrategy.GetStrategyName())
}
}
// TestStrategyProcessData 测试策略数据处理
func TestStrategyProcessData(t *testing.T) {
tests := []struct {
name string
strategy string
}{
{"Blocking Strategy", StrategyBlock},
{"Expansion Strategy", StrategyExpand},
{"Drop Strategy", StrategyDrop},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 创建测试配置
config := types.Config{
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 10,
ResultChannelSize: 5,
},
OverflowConfig: types.OverflowConfig{
Strategy: tt.strategy,
AllowDataLoss: true,
BlockTimeout: time.Millisecond * 100,
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 5,
SinkWorkerCount: 2,
MaxRetryRoutines: 3,
},
},
}
// 创建Stream实例
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
defer stream.Stop()
// 测试数据处理
testData := map[string]interface{}{
"test": "data",
"id": 1,
}
// 这里主要测试策略能够正常调用不会panic
stream.Emit(testData)
// 验证输入计数增加
if stream.inputCount != 1 {
t.Errorf("Expected input count 1, got %d", stream.inputCount)
}
})
}
}
// TestStrategyCleanup 测试策略清理
func TestStrategyCleanup(t *testing.T) {
config := types.Config{
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 10,
ResultChannelSize: 5,
},
OverflowConfig: types.OverflowConfig{
Strategy: StrategyBlock,
AllowDataLoss: false,
BlockTimeout: time.Second,
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 5,
SinkWorkerCount: 2,
MaxRetryRoutines: 3,
},
},
}
// 创建Stream实例
stream, err := NewStream(config)
if err != nil {
t.Fatalf("Failed to create stream: %v", err)
}
// 验证策略存在
if stream.dataStrategy == nil {
t.Fatal("Data strategy not set")
}
// 测试停止和清理
stream.Stop()
// 这里主要验证Stop方法能够正常执行不会panic
// 实际的清理逻辑在各个策略的Cleanup方法中实现
}
// MockStrategy 模拟策略,用于测试扩展性
type MockStrategy struct {
stream *Stream
processed int
initialized bool
cleaned bool
}
// NewMockStrategy 创建模拟策略实例
func NewMockStrategy() *MockStrategy {
return &MockStrategy{}
}
// ProcessData 模拟数据处理
func (ms *MockStrategy) ProcessData(data map[string]interface{}) {
ms.processed++
// 模拟处理逻辑
}
// GetStrategyName 获取策略名称
func (ms *MockStrategy) GetStrategyName() string {
return "mock"
}
// Init 初始化模拟策略
func (ms *MockStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
ms.stream = stream
ms.initialized = true
return nil
}
// Stop 停止并清理模拟策略资源
func (ms *MockStrategy) Stop() error {
ms.cleaned = true
return nil
}
// TestCustomStrategy 测试自定义策略的扩展性
func TestCustomStrategy(t *testing.T) {
// 创建自定义策略
mockStrategy := NewMockStrategy()
// 创建基本的Stream实例不通过工厂
stream := &Stream{
dataChan: make(chan map[string]interface{}, 10),
done: make(chan struct{}),
inputCount: 0,
outputCount: 0,
}
// 手动设置策略
config := types.PerformanceConfig{}
err := mockStrategy.Init(stream, config)
if err != nil {
t.Fatalf("Failed to init mock strategy: %v", err)
}
stream.dataStrategy = mockStrategy
// 测试策略功能
if !mockStrategy.initialized {
t.Error("Mock strategy not initialized")
}
if mockStrategy.GetStrategyName() != "mock" {
t.Errorf("Expected strategy name 'mock', got %s", mockStrategy.GetStrategyName())
}
// 测试数据处理
testData := map[string]interface{}{"test": "data"}
stream.Emit(testData)
if mockStrategy.processed != 1 {
t.Errorf("Expected processed count 1, got %d", mockStrategy.processed)
}
// 测试清理
stream.Stop()
if !mockStrategy.cleaned {
t.Error("Mock strategy not cleaned")
}
}
// TestStrategyRegistration 测试策略注册机制
func TestStrategyRegistration(t *testing.T) {
factory := NewStrategyFactory()
// 测试内置策略是否已注册
registeredStrategies := factory.GetRegisteredStrategies()
expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyDrop}
for _, expected := range expectedStrategies {
found := false
for _, registered := range registeredStrategies {
if registered == expected {
found = true
break
}
}
if !found {
t.Errorf("Expected strategy %s to be registered", expected)
}
}
// 测试注册自定义策略
factory.RegisterStrategy("kafka", func() DataProcessingStrategy {
return &KafkaStrategy{}
})
// 验证自定义策略已注册
strategy, err := factory.CreateStrategy("kafka")
if err != nil {
t.Fatalf("Failed to create kafka strategy: %v", err)
}
if strategy.GetStrategyName() != "kafka" {
t.Errorf("Expected strategy name 'kafka', got %s", strategy.GetStrategyName())
}
// 测试注销策略
factory.UnregisterStrategy("kafka")
strategy, err = factory.CreateStrategy("kafka")
if err != nil {
t.Fatalf("Failed to create default strategy: %v", err)
}
// 注销后应该返回默认的丢弃策略
if strategy.GetStrategyName() != StrategyDrop {
t.Errorf("Expected default strategy '%s', got %s", StrategyDrop, strategy.GetStrategyName())
}
}
// KafkaStrategy Kafka削峰策略示例
type KafkaStrategy struct {
stream *Stream
}
// ProcessData 实现Kafka削峰数据处理
func (ks *KafkaStrategy) ProcessData(data map[string]interface{}) {
// 模拟Kafka削峰逻辑
// 实际实现中会将数据发送到Kafka队列
logger.Debug("Data sent to Kafka for peak shaving")
}
// GetStrategyName 获取策略名称
func (ks *KafkaStrategy) GetStrategyName() string {
return "kafka"
}
// Init 初始化Kafka策略
func (ks *KafkaStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
ks.stream = stream
// 这里可以初始化Kafka连接等
return nil
}
// Stop 停止并清理Kafka策略资源
func (ks *KafkaStrategy) Stop() error {
// 这里可以关闭Kafka连接等
return nil
}