forked from GiteaTest2015/streamsql
410 lines
9.6 KiB
Go
410 lines
9.6 KiB
Go
package stream
|
||
|
||
import (
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/rulego/streamsql/logger"
|
||
"github.com/rulego/streamsql/types"
|
||
"github.com/stretchr/testify/require"
|
||
)
|
||
|
||
// TestStrategyFactory 测试策略工厂
|
||
func TestStrategyFactory(t *testing.T) {
|
||
factory := NewStrategyFactory()
|
||
|
||
tests := []struct {
|
||
name string
|
||
strategyName string
|
||
expectedType string
|
||
}{
|
||
{
|
||
name: "Blocking Strategy",
|
||
strategyName: StrategyBlock,
|
||
expectedType: StrategyBlock,
|
||
},
|
||
{
|
||
name: "Expansion Strategy",
|
||
strategyName: StrategyExpand,
|
||
expectedType: StrategyExpand,
|
||
},
|
||
{
|
||
name: "Drop Strategy",
|
||
strategyName: StrategyDrop,
|
||
expectedType: StrategyDrop,
|
||
},
|
||
{
|
||
name: "Unknown Strategy (Default to Drop)",
|
||
strategyName: "unknown",
|
||
expectedType: StrategyDrop,
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
strategy, err := factory.CreateStrategy(tt.strategyName)
|
||
if err != nil {
|
||
t.Fatalf("Failed to create strategy: %v", err)
|
||
}
|
||
|
||
if strategy.GetStrategyName() != tt.expectedType {
|
||
t.Errorf("Expected strategy name %s, got %s", tt.expectedType, strategy.GetStrategyName())
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStrategy_Constructor 测试策略构造函数
|
||
func TestStrategy_Constructor(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestBlockingStrategy_ProcessData 测试阻塞策略数据处理
|
||
func TestBlockingStrategy_ProcessData(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestExpansionStrategy_ProcessData 测试扩容策略数据处理
|
||
func TestExpansionStrategy_ProcessData(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestPersistenceStrategy_ProcessData 测试持久化策略数据处理
|
||
|
||
// TestDropStrategy_ProcessData 测试丢弃策略数据处理
|
||
func TestDropStrategy_ProcessData(t *testing.T) {
|
||
config := types.Config{
|
||
SimpleFields: []string{"name", "age"},
|
||
}
|
||
stream, err := NewStream(config)
|
||
require.NoError(t, err)
|
||
defer func() {
|
||
if stream != nil {
|
||
close(stream.done)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TestStrategyInitialization 测试策略初始化
|
||
func TestStrategyInitialization(t *testing.T) {
|
||
// 创建测试配置
|
||
config := types.Config{
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 100,
|
||
ResultChannelSize: 50,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: StrategyBlock,
|
||
AllowDataLoss: false,
|
||
BlockTimeout: time.Second * 5,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 10,
|
||
SinkWorkerCount: 3,
|
||
MaxRetryRoutines: 5,
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建Stream实例
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("Failed to create stream: %v", err)
|
||
}
|
||
defer stream.Stop()
|
||
|
||
// 验证策略是否正确设置
|
||
if stream.dataStrategy == nil {
|
||
t.Fatal("Data strategy not set")
|
||
}
|
||
|
||
if stream.dataStrategy.GetStrategyName() != StrategyBlock {
|
||
t.Errorf("Expected blocking strategy, got %s", stream.dataStrategy.GetStrategyName())
|
||
}
|
||
}
|
||
|
||
// TestStrategyProcessData 测试策略数据处理
|
||
func TestStrategyProcessData(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
strategy string
|
||
}{
|
||
{"Blocking Strategy", StrategyBlock},
|
||
{"Expansion Strategy", StrategyExpand},
|
||
{"Drop Strategy", StrategyDrop},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
// 创建测试配置
|
||
config := types.Config{
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 10,
|
||
ResultChannelSize: 5,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: tt.strategy,
|
||
AllowDataLoss: true,
|
||
BlockTimeout: time.Millisecond * 100,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 5,
|
||
SinkWorkerCount: 2,
|
||
MaxRetryRoutines: 3,
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建Stream实例
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("Failed to create stream: %v", err)
|
||
}
|
||
defer stream.Stop()
|
||
|
||
// 测试数据处理
|
||
testData := map[string]interface{}{
|
||
"test": "data",
|
||
"id": 1,
|
||
}
|
||
|
||
// 这里主要测试策略能够正常调用,不会panic
|
||
stream.Emit(testData)
|
||
|
||
// 验证输入计数增加
|
||
if stream.inputCount != 1 {
|
||
t.Errorf("Expected input count 1, got %d", stream.inputCount)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// TestStrategyCleanup 测试策略清理
|
||
func TestStrategyCleanup(t *testing.T) {
|
||
config := types.Config{
|
||
PerformanceConfig: types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 10,
|
||
ResultChannelSize: 5,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: StrategyBlock,
|
||
AllowDataLoss: false,
|
||
BlockTimeout: time.Second,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 5,
|
||
SinkWorkerCount: 2,
|
||
MaxRetryRoutines: 3,
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建Stream实例
|
||
stream, err := NewStream(config)
|
||
if err != nil {
|
||
t.Fatalf("Failed to create stream: %v", err)
|
||
}
|
||
|
||
// 验证策略存在
|
||
if stream.dataStrategy == nil {
|
||
t.Fatal("Data strategy not set")
|
||
}
|
||
|
||
// 测试停止和清理
|
||
stream.Stop()
|
||
|
||
// 这里主要验证Stop方法能够正常执行,不会panic
|
||
// 实际的清理逻辑在各个策略的Cleanup方法中实现
|
||
}
|
||
|
||
// MockStrategy 模拟策略,用于测试扩展性
|
||
type MockStrategy struct {
|
||
stream *Stream
|
||
processed int
|
||
initialized bool
|
||
cleaned bool
|
||
}
|
||
|
||
// NewMockStrategy 创建模拟策略实例
|
||
func NewMockStrategy() *MockStrategy {
|
||
return &MockStrategy{}
|
||
}
|
||
|
||
// ProcessData 模拟数据处理
|
||
func (ms *MockStrategy) ProcessData(data map[string]interface{}) {
|
||
ms.processed++
|
||
// 模拟处理逻辑
|
||
}
|
||
|
||
// GetStrategyName 获取策略名称
|
||
func (ms *MockStrategy) GetStrategyName() string {
|
||
return "mock"
|
||
}
|
||
|
||
// Init 初始化模拟策略
|
||
func (ms *MockStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
|
||
ms.stream = stream
|
||
ms.initialized = true
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止并清理模拟策略资源
|
||
func (ms *MockStrategy) Stop() error {
|
||
ms.cleaned = true
|
||
return nil
|
||
}
|
||
|
||
// TestCustomStrategy 测试自定义策略的扩展性
|
||
func TestCustomStrategy(t *testing.T) {
|
||
// 创建自定义策略
|
||
mockStrategy := NewMockStrategy()
|
||
|
||
// 创建基本的Stream实例(不通过工厂)
|
||
stream := &Stream{
|
||
dataChan: make(chan map[string]interface{}, 10),
|
||
done: make(chan struct{}),
|
||
inputCount: 0,
|
||
outputCount: 0,
|
||
}
|
||
|
||
// 手动设置策略
|
||
config := types.PerformanceConfig{}
|
||
err := mockStrategy.Init(stream, config)
|
||
if err != nil {
|
||
t.Fatalf("Failed to init mock strategy: %v", err)
|
||
}
|
||
|
||
stream.dataStrategy = mockStrategy
|
||
|
||
// 测试策略功能
|
||
if !mockStrategy.initialized {
|
||
t.Error("Mock strategy not initialized")
|
||
}
|
||
|
||
if mockStrategy.GetStrategyName() != "mock" {
|
||
t.Errorf("Expected strategy name 'mock', got %s", mockStrategy.GetStrategyName())
|
||
}
|
||
|
||
// 测试数据处理
|
||
testData := map[string]interface{}{"test": "data"}
|
||
stream.Emit(testData)
|
||
|
||
if mockStrategy.processed != 1 {
|
||
t.Errorf("Expected processed count 1, got %d", mockStrategy.processed)
|
||
}
|
||
|
||
// 测试清理
|
||
stream.Stop()
|
||
if !mockStrategy.cleaned {
|
||
t.Error("Mock strategy not cleaned")
|
||
}
|
||
}
|
||
|
||
// TestStrategyRegistration 测试策略注册机制
|
||
func TestStrategyRegistration(t *testing.T) {
|
||
factory := NewStrategyFactory()
|
||
|
||
// 测试内置策略是否已注册
|
||
registeredStrategies := factory.GetRegisteredStrategies()
|
||
expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyDrop}
|
||
|
||
for _, expected := range expectedStrategies {
|
||
found := false
|
||
for _, registered := range registeredStrategies {
|
||
if registered == expected {
|
||
found = true
|
||
break
|
||
}
|
||
}
|
||
if !found {
|
||
t.Errorf("Expected strategy %s to be registered", expected)
|
||
}
|
||
}
|
||
|
||
// 测试注册自定义策略
|
||
factory.RegisterStrategy("kafka", func() DataProcessingStrategy {
|
||
return &KafkaStrategy{}
|
||
})
|
||
|
||
// 验证自定义策略已注册
|
||
strategy, err := factory.CreateStrategy("kafka")
|
||
if err != nil {
|
||
t.Fatalf("Failed to create kafka strategy: %v", err)
|
||
}
|
||
|
||
if strategy.GetStrategyName() != "kafka" {
|
||
t.Errorf("Expected strategy name 'kafka', got %s", strategy.GetStrategyName())
|
||
}
|
||
|
||
// 测试注销策略
|
||
factory.UnregisterStrategy("kafka")
|
||
strategy, err = factory.CreateStrategy("kafka")
|
||
if err != nil {
|
||
t.Fatalf("Failed to create default strategy: %v", err)
|
||
}
|
||
// 注销后应该返回默认的丢弃策略
|
||
if strategy.GetStrategyName() != StrategyDrop {
|
||
t.Errorf("Expected default strategy '%s', got %s", StrategyDrop, strategy.GetStrategyName())
|
||
}
|
||
}
|
||
|
||
// KafkaStrategy Kafka削峰策略示例
|
||
type KafkaStrategy struct {
|
||
stream *Stream
|
||
}
|
||
|
||
// ProcessData 实现Kafka削峰数据处理
|
||
func (ks *KafkaStrategy) ProcessData(data map[string]interface{}) {
|
||
// 模拟Kafka削峰逻辑
|
||
// 实际实现中会将数据发送到Kafka队列
|
||
logger.Debug("Data sent to Kafka for peak shaving")
|
||
}
|
||
|
||
// GetStrategyName 获取策略名称
|
||
func (ks *KafkaStrategy) GetStrategyName() string {
|
||
return "kafka"
|
||
}
|
||
|
||
// Init 初始化Kafka策略
|
||
func (ks *KafkaStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
|
||
ks.stream = stream
|
||
// 这里可以初始化Kafka连接等
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止并清理Kafka策略资源
|
||
func (ks *KafkaStrategy) Stop() error {
|
||
// 这里可以关闭Kafka连接等
|
||
return nil
|
||
}
|