refactor:删除持久化策略

This commit is contained in:
rulego-team
2025-08-06 18:11:44 +08:00
parent 6f5305ca01
commit 57983f19d7
21 changed files with 17 additions and 4095 deletions
-85
View File
@@ -1,85 +0,0 @@
# StreamSQL 持久化功能示例
本示例演示了 StreamSQL 的持久化功能,包括有序持久化机制、数据恢复和零数据丢失配置。
## 功能特性
### 1. 有序持久化机制
- 通过全局序列号保证数据时序性
- 当内存通道满时,数据按序持久化到磁盘
- 避免传统持久化中可能出现的数据乱序问题
- 实现真正的先进先出(FIFO)数据处理
### 2. 数据溢出持久化
- 演示小文件大小配置下的文件轮转
- 快速发送大量数据触发文件轮转机制
- 展示持久化统计信息
### 3. 程序重启数据恢复
- 模拟程序重启场景
- 从磁盘加载之前持久化的数据
- 按原始顺序恢复数据处理
- 验证数据完整性
### 4. 零数据丢失配置
- 高频刷新确保数据安全
- 关键数据持久化演示
- 数据完整性验证
### 5. 持久化文件分析
- 自动扫描持久化目录
- 显示文件信息(大小、修改时间)
- 读取并展示文件内容
## 运行示例
```bash
cd examples/persistence
go run main.go
```
## 输出说明
示例运行后会在当前目录下创建以下目录:
- `./persistence_data` - 基本持久化数据
- `./streamsql_overflow_data` - 溢出持久化数据
- `./zero_loss_data` - 零数据丢失配置数据
每个目录包含以 `streamsql_ordered_` 开头的日志文件,文件内容为 JSON 格式的持久化数据。
## 核心 API
### PersistenceManager
```go
// 创建持久化管理器
pm := stream.NewPersistenceManager(dataDir)
// 创建带配置的持久化管理器
pm := stream.NewPersistenceManagerWithConfig(dataDir, maxFileSize, flushInterval)
// 启动管理器
err := pm.Start()
// 持久化数据
err := pm.PersistData(data)
// 加载并恢复数据
err := pm.LoadAndRecoverData()
// 获取恢复数据
data, hasMore := pm.GetRecoveryData()
// 获取统计信息
stats := pm.GetStats()
// 停止管理器
pm.Stop()
```
## 注意事项
1. 确保有足够的磁盘空间用于持久化数据
2. 持久化目录会自动创建,无需手动创建
3. 程序退出时会自动清理资源
4. 建议在生产环境中根据实际需求调整文件大小和刷新间隔
File diff suppressed because it is too large Load Diff
-218
View File
@@ -1,218 +0,0 @@
package main
import (
"fmt"
"log"
"strings"
"time"
"github.com/rulego/streamsql/stream"
"github.com/rulego/streamsql/types"
)
// main walks through the unified configuration API: it builds one Stream per
// preset (default, high performance, low latency, zero data loss, persistence)
// plus one fully custom configuration, prints each stream's statistics, runs
// the comparison / real-time / window demos, and finally stops every stream.
// Any stream construction error terminates the program via log.Fatal.
func main() {
fmt.Println("=== StreamSQL 统一配置系统演示 ===")
// 1. Create a Stream with the default configuration through the new config API
fmt.Println("\n1. 默认配置Stream:")
defaultConfig := types.NewConfig()
defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"}
defaultStream, err := stream.NewStream(defaultConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("默认配置", defaultStream)
// 2. Use the high-performance preset
fmt.Println("\n2. 高性能配置Stream:")
highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig())
highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"}
highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("高性能配置", highPerfStream)
// 3. Use the low-latency preset
fmt.Println("\n3. 低延迟配置Stream:")
lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig())
lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"}
lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("低延迟配置", lowLatencyStream)
// 4. Use the zero-data-loss preset
fmt.Println("\n4. 零数据丢失配置Stream:")
zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig())
zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"}
zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("零数据丢失配置", zeroLossStream)
// 5. Use the persistence preset (passed both in the Config and as the custom performance config)
fmt.Println("\n5. 持久化配置Stream:")
persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig())
persistConfig.SimpleFields = []string{"temperature", "humidity", "location"}
persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig())
if err != nil {
log.Fatal(err)
}
printStreamStats("持久化配置", persistStream)
// 6. Build a fully custom configuration
fmt.Println("\n6. 自定义配置Stream:")
customPerfConfig := types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 30000,
ResultChannelSize: 25000,
WindowOutputSize: 3000,
EnableDynamicResize: true,
MaxBufferSize: 200000,
UsageThreshold: 0.85,
},
OverflowConfig: types.OverflowConfig{
Strategy: "expand",
BlockTimeout: 15 * time.Second,
AllowDataLoss: false,
ExpansionConfig: types.ExpansionConfig{
GrowthFactor: 2.0,
MinIncrement: 2000,
TriggerThreshold: 0.9,
ExpansionTimeout: 3 * time.Second,
},
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 800,
SinkWorkerCount: 12,
MaxRetryRoutines: 10,
},
MonitoringConfig: types.MonitoringConfig{
EnableMonitoring: true,
StatsUpdateInterval: 500 * time.Millisecond,
EnableDetailedStats: true,
WarningThresholds: types.WarningThresholds{
DropRateWarning: 5.0,
DropRateCritical: 15.0,
BufferUsageWarning: 75.0,
BufferUsageCritical: 90.0,
},
},
}
customConfig := types.NewConfigWithPerformance(customPerfConfig)
customConfig.SimpleFields = []string{"temperature", "humidity", "location"}
customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig)
if err != nil {
log.Fatal(err)
}
printStreamStats("自定义配置", customStream)
// 7. Side-by-side comparison of the presets
fmt.Println("\n7. 配置比较:")
compareConfigurations()
// 8. Real-time data processing demo (reuses the default stream)
fmt.Println("\n8. 实时数据处理演示:")
demonstrateRealTimeProcessing(defaultStream)
// 9. Unified window configuration demo
fmt.Println("\n9. 窗口统一配置演示:")
demonstrateWindowConfig()
// Release resources: stop every stream created above
fmt.Println("\n10. 清理资源...")
defaultStream.Stop()
highPerfStream.Stop()
lowLatencyStream.Stop()
zeroLossStream.Stop()
persistStream.Stop()
customStream.Stop()
fmt.Println("\n=== 演示完成 ===")
}
// printStreamStats prints, under the given label, the length/capacity of the
// data channel, result channel and sink pool of s together with their usage
// percentages and the reported performance level.
func printStreamStats(name string, s *stream.Stream) {
	basic, detail := s.GetStats(), s.GetDetailedStats()

	fmt.Printf("【%s】统计信息:\n", name)

	dataLen, dataCap := basic["data_chan_len"], basic["data_chan_cap"]
	fmt.Printf(" 数据通道: %d/%d (使用率: %.1f%%)\n", dataLen, dataCap, detail["data_chan_usage"])

	resultLen, resultCap := basic["result_chan_len"], basic["result_chan_cap"]
	fmt.Printf(" 结果通道: %d/%d (使用率: %.1f%%)\n", resultLen, resultCap, detail["result_chan_usage"])

	poolLen, poolCap := basic["sink_pool_len"], basic["sink_pool_cap"]
	fmt.Printf(" 工作池: %d/%d (使用率: %.1f%%)\n", poolLen, poolCap, detail["sink_pool_usage"])

	level := detail["performance_level"]
	fmt.Printf(" 性能等级: %s\n", level)
}
// compareConfigurations prints a table comparing the built-in performance
// presets: data buffer size, result buffer size, sink pool size, sink worker
// count and overflow strategy.
//
// The presets are kept in an ordered slice rather than a map: Go does not
// define map iteration order, so ranging over a map would shuffle the table
// rows from one run to the next.
func compareConfigurations() {
	presets := []struct {
		name   string
		config types.PerformanceConfig
	}{
		{"默认配置", types.DefaultPerformanceConfig()},
		{"高性能配置", types.HighPerformanceConfig()},
		{"低延迟配置", types.LowLatencyConfig()},
		{"零丢失配置", types.ZeroDataLossConfig()},
		{"持久化配置", types.PersistencePerformanceConfig()},
	}

	fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n",
		"配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略")
	fmt.Println(strings.Repeat("-", 75))

	for _, p := range presets {
		fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n",
			p.name,
			p.config.BufferConfig.DataChannelSize,
			p.config.BufferConfig.ResultChannelSize,
			p.config.WorkerConfig.SinkPoolSize,
			p.config.WorkerConfig.SinkWorkerCount,
			p.config.OverflowConfig.Strategy)
	}
}
// demonstrateRealTimeProcessing registers a printing sink on s, starts it,
// emits three sample sensor records 100ms apart, waits briefly for them to be
// processed and then prints the final input/output/dropped counters.
//
// The "basic_stats" entry is extracted with a single checked type assertion:
// an unchecked assertion would panic the whole demo if the entry were missing
// or had a different type.
func demonstrateRealTimeProcessing(s *stream.Stream) {
	// Register the result sink before starting the stream.
	s.AddSink(func(data []map[string]interface{}) {
		fmt.Printf(" 接收到处理结果: %v\n", data)
	})

	// Start stream processing.
	s.Start()

	// Emit simulated sensor data.
	for i := 0; i < 3; i++ {
		data := map[string]interface{}{
			"temperature": 20.0 + float64(i)*2.5,
			"humidity":    60.0 + float64(i)*5,
			"location":    fmt.Sprintf("sensor_%d", i+1),
			"timestamp":   time.Now().Unix(),
		}
		fmt.Printf(" 发送数据: %v\n", data)
		s.Emit(data)
		time.Sleep(100 * time.Millisecond)
	}

	// Give the pipeline time to finish processing.
	time.Sleep(200 * time.Millisecond)

	// Print the final statistics.
	finalStats := s.GetDetailedStats()
	basic, ok := finalStats["basic_stats"].(map[string]int64)
	if !ok {
		fmt.Printf(" 最终统计不可用: basic_stats 类型为 %T\n", finalStats["basic_stats"])
		return
	}
	fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n",
		basic["input_count"],
		basic["output_count"],
		basic["dropped_count"],
		finalStats["process_rate"])
}
@@ -1,74 +0,0 @@
package main
import (
"fmt"
"time"
"github.com/rulego/streamsql"
"github.com/rulego/streamsql/types"
)
// demonstrateWindowConfig 演示窗口统一配置的使用
func demonstrateWindowConfig() {
fmt.Println("=== 窗口统一配置演示 ===")
// 1. 测试默认配置的窗口
fmt.Println("\n1. 默认配置窗口测试")
testWindowWithConfig("默认配置", streamsql.New())
// 2. 测试高性能配置的窗口
fmt.Println("\n2. 高性能配置窗口测试")
testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance()))
// 3. 测试低延迟配置的窗口
fmt.Println("\n3. 低延迟配置窗口测试")
testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency()))
// 4. 测试自定义配置的窗口
fmt.Println("\n4. 自定义配置窗口测试")
customConfig := types.DefaultPerformanceConfig()
customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小
testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig)))
fmt.Println("\n=== 窗口配置演示完成 ===")
}
// testWindowWithConfig executes a 2-second tumbling-window aggregation on
// ssql, feeds it five sample records, waits three seconds for the window to
// fire, prints the detailed statistics and then stops the engine. If the SQL
// cannot be executed the failure is printed and the function returns without
// stopping the engine.
func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) {
	const query = "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')"

	if err := ssql.Execute(query); err != nil {
		fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err)
		return
	}

	if pipeline := ssql.Stream(); pipeline != nil {
		// Print every window result as it arrives.
		pipeline.AddSink(func(result []map[string]interface{}) {
			fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result)
		})

		// Feed sample records alternating between two device ids.
		for n := 0; n < 5; n++ {
			record := map[string]interface{}{
				"deviceId":    fmt.Sprintf("device_%d", n%2),
				"temperature": 20.0 + float64(n),
				"timestamp":   time.Now(),
			}
			ssql.Emit(record)
		}

		// Wait longer than the window length so a result can be emitted.
		time.Sleep(3 * time.Second)

		detailed := ssql.GetDetailedStats()
		fmt.Printf("📈 %s - 统计信息: %v\n", configName, detailed)
	}

	// Stop stream processing.
	ssql.Stop()
	fmt.Printf("✅ %s - 测试完成\n", configName)
}
-26
View File
@@ -72,32 +72,6 @@ func WithCustomPerformance(config types.PerformanceConfig) Option {
}
}
// WithPersistence returns an Option that switches the engine to "custom"
// performance mode and installs the persistence performance preset as its
// custom configuration.
func WithPersistence() Option {
	return func(s *Streamsql) {
		preset := types.PersistencePerformanceConfig()
		s.customConfig = &preset
		s.performanceMode = "custom"
	}
}
// WithCustomPersistence returns an Option that switches the engine to
// "custom" performance mode using the default performance configuration with
// the overflow strategy set to "persist". dataDir, maxFileSize and
// flushInterval are copied into the persistence configuration; the retry
// policy is fixed at 3 retries with a 2-second interval.
func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option {
	return func(s *Streamsql) {
		perf := types.DefaultPerformanceConfig()

		overflow := &perf.OverflowConfig
		overflow.Strategy = "persist"
		overflow.PersistenceConfig = &types.PersistenceConfig{
			DataDir:       dataDir,
			MaxFileSize:   maxFileSize,
			FlushInterval: flushInterval,
			MaxRetries:    3,
			RetryInterval: 2 * time.Second,
		}

		s.customConfig = &perf
		s.performanceMode = "custom"
	}
}
// WithBufferSizes sets custom buffer sizes
func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option {
return func(s *Streamsql) {
-33
View File
@@ -114,39 +114,6 @@ func TestWithCustomPerformance(t *testing.T) {
})
}
// TestWithPersistence verifies that the WithPersistence option selects the
// "custom" performance mode and installs a configuration whose overflow
// strategy is "persist".
func TestWithPersistence(t *testing.T) {
	t.Run("设置持久化模式", func(t *testing.T) {
		engine := New(WithPersistence())

		assert.Equal(t, "custom", engine.performanceMode)
		assert.NotNil(t, engine.customConfig)

		// The installed preset must use the persistence overflow strategy.
		strategy := engine.customConfig.OverflowConfig.Strategy
		assert.Equal(t, "persist", strategy)
	})
}
// TestWithCustomPersistence verifies that WithCustomPersistence copies its
// arguments into the persistence configuration and applies the fixed retry
// policy (3 retries, 2-second interval).
func TestWithCustomPersistence(t *testing.T) {
	t.Run("设置自定义持久化配置", func(t *testing.T) {
		var (
			wantDir      = "./test_data"
			wantSize     = int64(50 * 1024 * 1024) // 50MB
			wantInterval = 10 * time.Second
		)

		engine := New(WithCustomPersistence(wantDir, wantSize, wantInterval))

		assert.Equal(t, "custom", engine.performanceMode)
		assert.NotNil(t, engine.customConfig)
		assert.Equal(t, "persist", engine.customConfig.OverflowConfig.Strategy)
		assert.NotNil(t, engine.customConfig.OverflowConfig.PersistenceConfig)

		persisted := engine.customConfig.OverflowConfig.PersistenceConfig
		assert.Equal(t, wantDir, persisted.DataDir)
		assert.Equal(t, wantSize, persisted.MaxFileSize)
		assert.Equal(t, wantInterval, persisted.FlushInterval)
		assert.Equal(t, 3, persisted.MaxRetries)
		assert.Equal(t, 2*time.Second, persisted.RetryInterval)
	})
}
// TestWithBufferSizes 测试缓冲区大小配置选项
func TestWithBufferSizes(t *testing.T) {
t.Run("设置自定义缓冲区大小", func(t *testing.T) {
+1 -19
View File
@@ -44,7 +44,7 @@ The stream processing pipeline consists of several key components:
config types.Config // Stream configuration
sinks []func([]map[string]interface{}) // Result processors
resultChan chan []map[string]interface{} // Result channel
persistenceManager *PersistenceManager // Data persistence
dataStrategy DataProcessingStrategy // Data processing strategy
}
@@ -176,25 +176,7 @@ Comprehensive performance monitoring:
fmt.Printf("Throughput: %.2f records/sec\n", detailed["throughput"])
fmt.Printf("Memory Usage: %d bytes\n", detailed["memory_usage"])
# Persistence and Reliability
Optional data persistence for enhanced reliability:
type PersistenceManager struct {
enabled bool
storageType string // "memory", "file", "database"
batchSize int // Persistence batch size
flushInterval time.Duration // Automatic flush interval
recoveryMode string // Recovery strategy
}
// Enable persistence
stream.EnablePersistence(PersistenceConfig{
StorageType: "file",
BatchSize: 100,
FlushInterval: 5 * time.Second,
RecoveryMode: "automatic",
})
# Backpressure Management
-101
View File
@@ -121,104 +121,3 @@ migration_done:
logger.Debug("Channel expansion completed: migrated %d items", migratedCount)
}
// checkAndProcessRecoveryData feeds recovered items from the persistence
// manager back into the data channel.
//
// It polls on a ticker whose period starts at 100ms. Every failed send grows
// the period by a factor of 1.5, capped at 5s; every successful send (or an
// empty poll) resets it. A failed item is either re-persisted or, once the
// persistence manager reports it should no longer be retried, moved to the
// dead letter queue. The loop ends when the manager leaves recovery mode,
// after 10 consecutive failed sends, after 30s of total processing time, or
// when the stream's done channel fires.
//
// At most maxRetryRoutines instances run concurrently. The slot is reserved
// with a single atomic add and rolled back on overflow; a separate load
// followed by an add would let several goroutines pass the check at once and
// exceed the limit.
func (s *Stream) checkAndProcessRecoveryData() {
	// Nothing to recover without a persistence manager.
	if s.persistenceManager == nil {
		return
	}

	// Atomically reserve a recovery slot; release it again if the limit is exceeded.
	if atomic.AddInt32(&s.activeRetries, 1) > s.maxRetryRoutines {
		atomic.AddInt32(&s.activeRetries, -1)
		return
	}
	defer atomic.AddInt32(&s.activeRetries, -1)

	// Backoff strategy parameters.
	const (
		baseBackoff            = 100 * time.Millisecond
		maxBackoff             = 5 * time.Second
		maxConsecutiveFailures = 10
		maxProcessTime         = 30 * time.Second // Maximum processing time
	)
	currentBackoff := baseBackoff
	consecutiveFailures := 0

	// Poll for recovery data until none is left or the stream stops.
	ticker := time.NewTicker(currentBackoff)
	defer ticker.Stop()

	timeout := time.NewTimer(maxProcessTime)
	defer timeout.Stop()

	processedCount := 0
	droppedCount := 0

	for {
		select {
		case <-ticker.C:
			recoveredData, hasData := s.persistenceManager.GetRecoveryData()
			if !hasData {
				// No more recovery data; finish once recovery mode has ended.
				if !s.persistenceManager.IsInRecoveryMode() {
					logger.Info("Recovery completed: processed %d items, dropped %d items", processedCount, droppedCount)
					return
				}
				// Still recovering but nothing ready yet: poll at the base rate.
				currentBackoff = baseBackoff
				ticker.Reset(currentBackoff)
				continue
			}

			// Try to hand the recovered item to the processing channel.
			if s.safeSendToDataChan(recoveredData) {
				processedCount++
				consecutiveFailures = 0
				// Success: reset the backoff.
				currentBackoff = baseBackoff
				ticker.Reset(currentBackoff)
				logger.Debug("Successfully processed recovered data item %d", processedCount)
				continue
			}

			consecutiveFailures++

			if !s.persistenceManager.ShouldRetryRecoveredData(recoveredData) {
				// Exceeded retry limit, move to dead letter queue.
				logger.Warn("Recovered data exceeded retry limit, moving to dead letter queue")
				s.persistenceManager.MoveToDeadLetterQueue(recoveredData)
				droppedCount++
			} else if err := s.persistenceManager.RePersistRecoveredData(recoveredData); err != nil {
				// Re-persisting (which increments the retry count) failed: the item is lost.
				logger.Error("Failed to re-persist recovered data: %v", err)
				atomic.AddInt64(&s.droppedCount, 1)
				droppedCount++
			}

			if consecutiveFailures >= maxConsecutiveFailures {
				logger.Warn("Too many consecutive failures (%d), stopping recovery processing", consecutiveFailures)
				return
			}

			// Exponential backoff, capped at maxBackoff.
			currentBackoff = time.Duration(float64(currentBackoff) * 1.5)
			if currentBackoff > maxBackoff {
				currentBackoff = maxBackoff
			}
			ticker.Reset(currentBackoff)
			logger.Debug("Channel full, backing off for %v (failure #%d)", currentBackoff, consecutiveFailures)

		case <-timeout.C:
			logger.Warn("Recovery processing timeout reached: processed %d items, dropped %d items", processedCount, droppedCount)
			return

		case <-s.done:
			logger.Info("Stream stopped during recovery processing: processed %d items, dropped %d items", processedCount, droppedCount)
			return
		}
	}
}
-5
View File
@@ -85,11 +85,6 @@ func (s *Stream) GetDetailedStats() map[string]interface{} {
PerformanceLevel: AssessPerformanceLevel(dataUsage, dropRate),
}
// Add persistence statistics
if s.persistenceManager != nil {
result["Persistence"] = s.persistenceManager.GetStats()
}
return result
}
-38
View File
@@ -803,44 +803,6 @@ func TestStream_GetDetailedStats_ZeroInput(t *testing.T) {
assert.Equal(t, PerformanceLevelOptimal, perfLevel)
}
// TestStream_GetDetailedStats_WithPersistence checks that a stream created
// with the "persist" overflow strategy exposes a "Persistence" entry of type
// map[string]interface{} in its detailed statistics.
func TestStream_GetDetailedStats_WithPersistence(t *testing.T) {
	// Persist into a per-test temporary directory.
	dir := t.TempDir()

	overflow := types.OverflowConfig{
		Strategy: "persist",
		PersistenceConfig: &types.PersistenceConfig{
			DataDir:       dir,
			MaxFileSize:   1024 * 1024, // 1MB
			FlushInterval: 100 * time.Millisecond,
		},
	}
	cfg := types.Config{
		SimpleFields:      []string{"name", "age"},
		PerformanceConfig: types.PerformanceConfig{OverflowConfig: overflow},
	}

	st, err := NewStream(cfg)
	require.NoError(t, err)
	defer func() {
		if st == nil {
			return
		}
		if st.persistenceManager != nil {
			st.persistenceManager.Stop()
		}
		close(st.done)
	}()

	detailed := st.GetDetailedStats()

	// The persistence statistics must be present and have the expected type.
	assert.Contains(t, detailed, "Persistence")
	persistenceStats, isMap := detailed["Persistence"].(map[string]interface{})
	assert.True(t, isMap)
	assert.NotNil(t, persistenceStats)
}
// TestStream_ResetStats 测试重置统计信息
func TestStream_ResetStats(t *testing.T) {
config := types.Config{
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+3 -93
View File
@@ -28,10 +28,9 @@ import (
// Overflow strategy constants
const (
StrategyDrop = "drop" // Drop strategy
StrategyBlock = "block" // Blocking strategy
StrategyExpand = "expand" // Dynamic strategy
StrategyPersist = "persist" // Persistence strategy (todo: incomplete)
StrategyDrop = "drop" // Drop strategy
StrategyBlock = "block" // Blocking strategy
StrategyExpand = "expand" // Dynamic strategy
)
// DataProcessingStrategy data processing strategy interface
@@ -155,95 +154,7 @@ func (es *ExpansionStrategy) Stop() error {
return nil
}
// PersistenceStrategy is the data processing strategy registered under
// StrategyPersist. When a record cannot be sent to the data channel it is
// handed to the stream's persistence manager instead of being dropped.
type PersistenceStrategy struct {
stream *Stream
}
// NewPersistenceStrategy creates a PersistenceStrategy that is not yet bound
// to a stream; Init must be called before ProcessData.
func NewPersistenceStrategy() *PersistenceStrategy {
return &PersistenceStrategy{}
}
// ProcessData handles one incoming record in persistence mode.
//
// While the persistence manager is in recovery mode and has a recovered item
// available, that item is sent first, then the new record; whatever cannot be
// sent is persisted (or, for a recovered item past its retry limit, moved to
// the dead letter queue). Otherwise the record is sent directly and persisted
// only if the send fails, after which an asynchronous recovery check is
// started. Records that can be neither sent nor persisted increment the
// stream's droppedCount.
func (ps *PersistenceStrategy) ProcessData(data map[string]interface{}) {
// In recovery mode, recovered data takes priority over new data
if ps.stream.persistenceManager != nil && ps.stream.persistenceManager.IsInRecoveryMode() {
// Try to fetch one recovered item first
if recoveredData, hasData := ps.stream.persistenceManager.GetRecoveryData(); hasData {
// Send the recovered item before the new one
if ps.stream.safeSendToDataChan(recoveredData) {
// Recovered item was accepted; now handle the new record
if ps.stream.safeSendToDataChan(data) {
return
}
// New record could not be sent: persist it (with retry limit)
if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil {
logger.Error("Failed to persist new data during recovery: %v", err)
atomic.AddInt64(&ps.stream.droppedCount, 1)
}
return
} else {
// Recovered item could not be sent either; consult the retry limit to avoid looping forever
if !ps.stream.persistenceManager.ShouldRetryRecoveredData(recoveredData) {
// Retry limit exceeded: move to the dead letter queue
logger.Warn("Recovered data exceeded retry limit, moving to dead letter queue")
ps.stream.persistenceManager.MoveToDeadLetterQueue(recoveredData)
} else {
// Re-persist the recovered item (presumably increments its retry count; verify in PersistenceManager)
if err := ps.stream.persistenceManager.RePersistRecoveredData(recoveredData); err != nil {
logger.Error("Failed to re-persist recovered data: %v", err)
}
}
// Persist the new record
if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil {
logger.Error("Failed to persist new data: %v", err)
atomic.AddInt64(&ps.stream.droppedCount, 1)
}
return
}
}
}
// Normal (non-recovery) path: first try to enqueue the record directly
if ps.stream.safeSendToDataChan(data) {
return
}
// Send failed (presumably the channel is full): fall back to persistence (with retry limit)
if ps.stream.persistenceManager != nil {
if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil {
logger.Error("Failed to persist data with persistence: %v", err)
atomic.AddInt64(&ps.stream.droppedCount, 1)
} else {
logger.Debug("Data has been persisted to disk with sequence ordering")
}
} else {
logger.Error("Persistence manager not initialized, data will be lost")
atomic.AddInt64(&ps.stream.droppedCount, 1)
}
// Kick off an asynchronous recovery check unless the retry-goroutine limit is already reached
if atomic.LoadInt32(&ps.stream.activeRetries) < ps.stream.maxRetryRoutines {
go ps.stream.checkAndProcessRecoveryData()
}
}
// GetStrategyName returns the strategy name, StrategyPersist.
func (ps *PersistenceStrategy) GetStrategyName() string {
return StrategyPersist
}
// Init binds the strategy to stream. The config argument is not used and the
// returned error is always nil.
func (ps *PersistenceStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
ps.stream = stream
return nil
}
// Stop is a no-op for this strategy and always returns nil.
func (ps *PersistenceStrategy) Stop() error {
return nil
}
// DropStrategy 丢弃策略实现
type DropStrategy struct {
@@ -379,7 +290,6 @@ func NewStrategyFactory() *StrategyFactory {
// 注册内置策略
factory.RegisterStrategy(StrategyBlock, func() DataProcessingStrategy { return NewBlockingStrategy() })
factory.RegisterStrategy(StrategyExpand, func() DataProcessingStrategy { return NewExpansionStrategy() })
factory.RegisterStrategy(StrategyPersist, func() DataProcessingStrategy { return NewPersistenceStrategy() })
factory.RegisterStrategy(StrategyDrop, func() DataProcessingStrategy { return NewDropStrategy() })
return factory
+2 -18
View File
@@ -28,11 +28,6 @@ func TestStrategyFactory(t *testing.T) {
strategyName: StrategyExpand,
expectedType: StrategyExpand,
},
{
name: "Persistence Strategy",
strategyName: StrategyPersist,
expectedType: StrategyPersist,
},
{
name: "Drop Strategy",
strategyName: StrategyDrop,
@@ -102,18 +97,7 @@ func TestExpansionStrategy_ProcessData(t *testing.T) {
}
// TestPersistenceStrategy_ProcessData only checks that a stream with a
// minimal configuration can be created; it closes the stream's done channel
// on exit and makes no further assertions.
func TestPersistenceStrategy_ProcessData(t *testing.T) {
	cfg := types.Config{SimpleFields: []string{"name", "age"}}

	stream, err := NewStream(cfg)
	require.NoError(t, err)

	defer func() {
		if stream == nil {
			return
		}
		close(stream.done)
	}()
}
// TestDropStrategy_ProcessData 测试丢弃策略数据处理
func TestDropStrategy_ProcessData(t *testing.T) {
@@ -353,7 +337,7 @@ func TestStrategyRegistration(t *testing.T) {
// 测试内置策略是否已注册
registeredStrategies := factory.GetRegisteredStrategies()
expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyPersist, StrategyDrop}
expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyDrop}
for _, expected := range expectedStrategies {
found := false
+6 -66
View File
@@ -46,18 +46,13 @@ const (
PerformanceLevelOptimal = "OPTIMAL"
)
// Persistence related constants
const (
PersistenceEnabled = "enabled"
PersistenceMessage = "message"
PersistenceNotEnabledMsg = "persistence not enabled"
PerformanceConfigKey = "performanceConfig"
)
// SQL keyword constants
const (
SQLKeywordCase = "CASE"
)
const (
PerformanceConfigKey = "performanceConfig"
)
type Stream struct {
dataChan chan map[string]interface{}
@@ -91,10 +86,9 @@ type Stream struct {
dropLogCount int64 // Count of drops since last log
// Data loss strategy configuration
allowDataDrop bool // Whether to allow data loss
blockingTimeout time.Duration // Blocking timeout duration
overflowStrategy string // Overflow strategy: "drop", "block", "expand", "persist"
persistenceManager *PersistenceManager // Persistence manager
allowDataDrop bool // Whether to allow data loss
blockingTimeout time.Duration // Blocking timeout duration
overflowStrategy string // Overflow strategy: "drop", "block", "expand"
// Data processing strategy using strategy pattern for better extensibility
dataStrategy DataProcessingStrategy // Data processing strategy instance
@@ -123,12 +117,6 @@ func NewStreamWithLowLatency(config types.Config) (*Stream, error) {
return factory.CreateLowLatencyStream(config)
}
// NewStreamWithZeroDataLoss creates a Stream through a new StreamFactory's
// CreateZeroDataLossStream method.
func NewStreamWithZeroDataLoss(config types.Config) (*Stream, error) {
	return NewStreamFactory().CreateZeroDataLossStream(config)
}
// NewStreamWithCustomPerformance 创建自定义性能配置的Stream
func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error) {
factory := NewStreamFactory()
@@ -233,54 +221,6 @@ func (s *Stream) Stop() {
logger.Error("Failed to stop data strategy: %v", err)
}
}
// 停止持久化管理器
if s.persistenceManager != nil {
if err := s.persistenceManager.Stop(); err != nil {
logger.Error("Failed to stop persistence manager: %v", err)
}
}
}
// LoadAndReprocessPersistedData loads previously persisted data and, if the
// persistence manager then reports recovery mode, starts a background
// goroutine that reprocesses it. It returns an error when no persistence
// manager is configured or when loading fails.
func (s *Stream) LoadAndReprocessPersistedData() error {
	manager := s.persistenceManager
	if manager == nil {
		return fmt.Errorf("persistence manager not initialized")
	}

	// Load the persisted data from storage.
	if err := manager.LoadAndRecoverData(); err != nil {
		return fmt.Errorf("failed to load persisted data: %w", err)
	}

	// Only start recovery when there is something to recover.
	if manager.IsInRecoveryMode() {
		logger.Info("Starting persistent data recovery process")
		go s.checkAndProcessRecoveryData()
		logger.Info("Persistent data recovery process started")
		return nil
	}

	logger.Info("No persistent data to recover")
	return nil
}
// GetPersistenceStats returns the persistence manager's statistics with the
// PersistenceEnabled key set to true. Without a persistence manager it
// returns a map that marks persistence as disabled and carries the
// PersistenceNotEnabledMsg message.
func (s *Stream) GetPersistenceStats() map[string]interface{} {
	if s.persistenceManager != nil {
		report := s.persistenceManager.GetStats()
		report[PersistenceEnabled] = true
		return report
	}

	return map[string]interface{}{
		PersistenceEnabled: false,
		PersistenceMessage: PersistenceNotEnabledMsg,
	}
}
// IsAggregationQuery 检查当前流是否为聚合查询
-34
View File
@@ -54,12 +54,6 @@ func (sf *StreamFactory) CreateLowLatencyStream(config types.Config) (*Stream, e
return sf.createStreamWithUnifiedConfig(config)
}
// CreateZeroDataLossStream builds a Stream from config after replacing its
// PerformanceConfig with the zero-data-loss preset.
func (sf *StreamFactory) CreateZeroDataLossStream(config types.Config) (*Stream, error) {
	preset := types.ZeroDataLossConfig()
	config.PerformanceConfig = preset

	return sf.createStreamWithUnifiedConfig(config)
}
// CreateCustomPerformanceStream creates Stream with custom performance configuration
func (sf *StreamFactory) CreateCustomPerformanceStream(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error) {
config.PerformanceConfig = perfConfig
@@ -87,11 +81,6 @@ func (sf *StreamFactory) createStreamWithUnifiedConfig(config types.Config) (*St
// Create Stream instance
stream := sf.createStreamInstance(config, win)
// Initialize persistence manager
if err := sf.initializePersistenceManager(stream, config.PerformanceConfig); err != nil {
return nil, err
}
// Setup data processing strategy
if err := sf.setupDataProcessingStrategy(stream, config.PerformanceConfig); err != nil {
return nil, fmt.Errorf("failed to setup data processing strategy: %w", err)
@@ -137,29 +126,6 @@ func (sf *StreamFactory) createStreamInstance(config types.Config, win window.Wi
}
}
// initializePersistenceManager creates, starts and attaches a persistence
// manager to stream when the overflow strategy is StrategyPersist; for any
// other strategy it does nothing and returns nil.
//
// It returns an error when the persist strategy is selected without a
// PersistenceConfig, when the manager fails to start, or when previously
// persisted data cannot be loaded. In the last case the already started
// manager is stopped again so that a failed stream construction does not
// leave it running, and the manager is attached to stream only after every
// step has succeeded.
func (sf *StreamFactory) initializePersistenceManager(stream *Stream, perfConfig types.PerformanceConfig) error {
	if perfConfig.OverflowConfig.Strategy != StrategyPersist {
		return nil
	}

	persistConfig := perfConfig.OverflowConfig.PersistenceConfig
	if persistConfig == nil {
		return fmt.Errorf("persistence strategy is enabled but PersistenceConfig is not provided. Please configure PersistenceConfig with DataDir, MaxFileSize, and FlushInterval. Example: perfConfig.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{DataDir: \"./data\", MaxFileSize: 10*1024*1024, FlushInterval: 5*time.Second}")
	}

	manager := NewPersistenceManagerWithConfig(
		persistConfig.DataDir,
		persistConfig.MaxFileSize,
		persistConfig.FlushInterval,
	)
	if err := manager.Start(); err != nil {
		return fmt.Errorf("failed to start persistence manager: %w", err)
	}

	// Try to load and recover previously persisted data.
	if err := manager.LoadAndRecoverData(); err != nil {
		// The caller discards the stream on error, so shut the manager down here.
		if stopErr := manager.Stop(); stopErr != nil {
			return fmt.Errorf("failed to load persisted data: %w (stopping persistence manager also failed: %v)", err, stopErr)
		}
		return fmt.Errorf("failed to load persisted data: %w", err)
	}

	stream.persistenceManager = manager
	return nil
}
// setupDataProcessingStrategy 设置数据处理策略
// 使用策略模式替代函数指针,提供更好的扩展性和可维护性
func (sf *StreamFactory) setupDataProcessingStrategy(stream *Stream, perfConfig types.PerformanceConfig) error {
File diff suppressed because it is too large Load Diff
-2
View File
@@ -148,8 +148,6 @@ func (s *Streamsql) Execute(sql string) error {
streamInstance, err = stream.NewStreamWithHighPerformance(*config)
case "low_latency":
streamInstance, err = stream.NewStreamWithLowLatency(*config)
case "zero_data_loss":
streamInstance, err = stream.NewStreamWithZeroDataLoss(*config)
case "custom":
if s.customConfig != nil {
streamInstance, err = stream.NewStreamWithCustomPerformance(*config, *s.customConfig)
+5 -50
View File
@@ -83,21 +83,13 @@ type BufferConfig struct {
// OverflowConfig overflow strategy configuration
type OverflowConfig struct {
Strategy string `json:"strategy"` // Overflow strategy: "drop", "block", "expand", "persist"
BlockTimeout time.Duration `json:"blockTimeout"` // Block timeout duration
AllowDataLoss bool `json:"allowDataLoss"` // Allow data loss
PersistenceConfig *PersistenceConfig `json:"persistenceConfig"` // Persistence configuration
ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration
Strategy string `json:"strategy"` // Overflow strategy: "drop", "block", "expand"
BlockTimeout time.Duration `json:"blockTimeout"` // Block timeout duration
AllowDataLoss bool `json:"allowDataLoss"` // Allow data loss
ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration
}
// PersistenceConfig holds the settings used by the "persist" overflow
// strategy. It is referenced from OverflowConfig.PersistenceConfig.
type PersistenceConfig struct {
DataDir string `json:"dataDir"` // Directory where persisted data is stored
MaxFileSize int64 `json:"maxFileSize"` // Maximum size of a persistence file (presets use byte counts, e.g. 100*1024*1024 for 100MB)
FlushInterval time.Duration `json:"flushInterval"` // Interval between flushes
MaxRetries int `json:"maxRetries"` // Maximum retry count
RetryInterval time.Duration `json:"retryInterval"` // Delay between retries
}
// ExpansionConfig expansion configuration
type ExpansionConfig struct {
@@ -216,40 +208,3 @@ func LowLatencyConfig() PerformanceConfig {
config.MonitoringConfig.StatsUpdateInterval = 1 * time.Second
return config
}
// ZeroDataLossConfig returns the zero-data-loss preset: the default
// performance configuration with larger, dynamically resizable buffers, data
// loss disallowed, and the "persist" overflow strategy writing to "./data".
func ZeroDataLossConfig() PerformanceConfig {
	preset := DefaultPerformanceConfig()

	buffers := &preset.BufferConfig
	buffers.DataChannelSize = 2000
	buffers.ResultChannelSize = 200
	buffers.WindowOutputSize = 2000
	buffers.EnableDynamicResize = true

	overflow := &preset.OverflowConfig
	overflow.Strategy = "persist"
	overflow.AllowDataLoss = false
	overflow.PersistenceConfig = &PersistenceConfig{
		DataDir:       "./data",
		MaxFileSize:   100 * 1024 * 1024, // 100MB
		FlushInterval: 5 * time.Second,
		MaxRetries:    3,
		RetryInterval: 2 * time.Second,
	}

	return preset
}
// PersistencePerformanceConfig returns persistence performance configuration preset
// Provides persistent storage functionality balancing performance and data durability
func PersistencePerformanceConfig() PerformanceConfig {
config := DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = 1500
config.BufferConfig.ResultChannelSize = 150
config.OverflowConfig.Strategy = "persist"
config.OverflowConfig.PersistenceConfig = &PersistenceConfig{
DataDir: "./persistence_data",
MaxFileSize: 10 * 1024 * 1024, // 10MB
FlushInterval: 5 * time.Second,
MaxRetries: 3,
RetryInterval: 2 * time.Second,
}
return config
}
-63
View File
@@ -378,34 +378,6 @@ func TestLowLatencyConfig(t *testing.T) {
assert.Equal(t, 1*time.Second, config.MonitoringConfig.StatsUpdateInterval)
}
// TestZeroDataLossConfig verifies the buffer sizes, overflow strategy and
// persistence settings of the ZeroDataLossConfig preset.
func TestZeroDataLossConfig(t *testing.T) {
	preset := ZeroDataLossConfig()
	buffers, overflow := preset.BufferConfig, preset.OverflowConfig

	// Buffer sizing.
	assert.Equal(t, 2000, buffers.DataChannelSize)
	assert.Equal(t, 200, buffers.ResultChannelSize)

	// Overflow handling and persistence settings.
	assert.Equal(t, "persist", overflow.Strategy)
	assert.False(t, overflow.AllowDataLoss)
	assert.NotNil(t, overflow.PersistenceConfig)
	assert.Equal(t, "./data", overflow.PersistenceConfig.DataDir)
	assert.Equal(t, int64(100*1024*1024), overflow.PersistenceConfig.MaxFileSize)
}
// TestPersistencePerformanceConfig verifies the buffer sizes, overflow
// strategy and persistence settings of the PersistencePerformanceConfig preset.
func TestPersistencePerformanceConfig(t *testing.T) {
	preset := PersistencePerformanceConfig()
	buffers, overflow := preset.BufferConfig, preset.OverflowConfig

	// Buffer sizing.
	assert.Equal(t, 1500, buffers.DataChannelSize)
	assert.Equal(t, 150, buffers.ResultChannelSize)

	// Overflow handling and persistence settings.
	assert.Equal(t, "persist", overflow.Strategy)
	assert.NotNil(t, overflow.PersistenceConfig)
	assert.Equal(t, "./persistence_data", overflow.PersistenceConfig.DataDir)
	assert.Equal(t, 5*time.Second, overflow.PersistenceConfig.FlushInterval)
	assert.Equal(t, 3, overflow.PersistenceConfig.MaxRetries)
}
// TestBufferConfig 测试BufferConfig结构体
func TestBufferConfig(t *testing.T) {
config := BufferConfig{
@@ -425,41 +397,6 @@ func TestBufferConfig(t *testing.T) {
assert.Equal(t, 0.75, config.UsageThreshold)
}
// TestOverflowConfig 测试OverflowConfig结构体
func TestOverflowConfig(t *testing.T) {
persistenceConfig := &PersistenceConfig{
DataDir: "/tmp/data",
MaxFileSize: 1024 * 1024,
FlushInterval: 10 * time.Second,
MaxRetries: 5,
RetryInterval: 2 * time.Second,
}
expansionConfig := ExpansionConfig{
GrowthFactor: 2.0,
MinIncrement: 100,
TriggerThreshold: 0.9,
ExpansionTimeout: 30 * time.Second,
}
config := OverflowConfig{
Strategy: "persist",
BlockTimeout: 5 * time.Second,
AllowDataLoss: false,
PersistenceConfig: persistenceConfig,
ExpansionConfig: expansionConfig,
}
assert.Equal(t, "persist", config.Strategy)
assert.Equal(t, 5*time.Second, config.BlockTimeout)
assert.False(t, config.AllowDataLoss)
assert.NotNil(t, config.PersistenceConfig)
assert.Equal(t, "/tmp/data", config.PersistenceConfig.DataDir)
assert.Equal(t, int64(1024*1024), config.PersistenceConfig.MaxFileSize)
assert.Equal(t, 2.0, config.ExpansionConfig.GrowthFactor)
assert.Equal(t, 100, config.ExpansionConfig.MinIncrement)
}
// TestWorkerConfig 测试WorkerConfig结构体
func TestWorkerConfig(t *testing.T) {
config := WorkerConfig{
-7
View File
@@ -644,13 +644,6 @@ func TestWindowWithPerformanceConfig(t *testing.T) {
expectedBufferSize: 20, // 200 / 10
extraParams: map[string]interface{}{"count": 10},
},
{
name: "会话窗口-零数据丢失配置",
windowType: TypeSession,
performanceConfig: types.ZeroDataLossConfig(),
expectedBufferSize: 200, // 2000 / 10
extraParams: map[string]interface{}{"timeout": "30s"},
},
{
name: "自定义性能配置",
windowType: TypeTumbling,