diff --git a/examples/persistence/README.md b/examples/persistence/README.md
deleted file mode 100644
index ccacf2b..0000000
--- a/examples/persistence/README.md
+++ /dev/null
@@ -1,85 +0,0 @@
-# StreamSQL Persistence Examples
-
-This example demonstrates StreamSQL's persistence features, including the ordered persistence mechanism, data recovery, and a zero-data-loss configuration.
-
-## Features
-
-### 1. Ordered persistence mechanism
-- A global sequence number guarantees temporal ordering
-- When the in-memory channel fills up, data is persisted to disk in order
-- Avoids the out-of-order data that conventional persistence can produce
-- Achieves true first-in, first-out (FIFO) processing
-
-### 2. Overflow persistence
-- Demonstrates file rotation under a small max-file-size setting
-- Sends a burst of data to trigger the rotation mechanism
-- Shows the persistence statistics
-
-### 3. Data recovery after restart
-- Simulates a program restart
-- Loads previously persisted data from disk
-- Replays the data in its original order
-- Verifies data integrity
-
-### 4. Zero-data-loss configuration
-- High-frequency flushing keeps data safe
-- Demonstrates persisting critical data
-- Verifies data integrity
-
-### 5. Persistence file analysis
-- Scans the persistence directories automatically
-- Shows file information (size, modification time)
-- Reads and prints file contents
-
-## Running the example
-
-```bash
-cd examples/persistence
-go run main.go
-```
-
-## Output
-
-After the example runs, the following directories are created under the current directory:
-- `./persistence_data` - basic persistence data
-- `./streamsql_overflow_data` - overflow persistence data
-- `./zero_loss_data` - zero-data-loss configuration data
-
-Each directory contains log files whose names start with `streamsql_ordered_`; their contents are the persisted records in JSON format.
-
-## Core API
-
-### PersistenceManager
-
-```go
-// Create a persistence manager
-pm := stream.NewPersistenceManager(dataDir)
-
-// Create a persistence manager with custom configuration
-pm := stream.NewPersistenceManagerWithConfig(dataDir, maxFileSize, flushInterval)
-
-// Start the manager
-err := pm.Start()
-
-// Persist data
-err := pm.PersistData(data)
-
-// Load and recover data
-err := pm.LoadAndRecoverData()
-
-// Fetch recovered data
-data, hasMore := pm.GetRecoveryData()
-
-// Get statistics
-stats := pm.GetStats()
-
-// Stop the manager
-pm.Stop()
-```
-
-## Notes
-
-1. Make sure there is enough disk space for the persisted data
-2. The persistence directories are created automatically; no manual setup is needed
-3. Resources are cleaned up automatically when the program exits
-4. In production, tune the file size and flush interval to your actual workload
\ No newline at end of file
diff --git a/examples/persistence/main.go b/examples/persistence/main.go
deleted file mode 100644
index f9005a8..0000000
--- a/examples/persistence/main.go
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Copyright 2025 The RuleGo Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
-	"fmt"
-	"os"
-	"path/filepath"
-	"time"
-
-	"github.com/rulego/streamsql/stream"
-)
-
-// main demonstrates StreamSQL's persistence features end to end:
-// the ordered persistence mechanism, data recovery, and the zero-data-loss configuration.
-func main() {
-	fmt.Println("=== StreamSQL 持久化功能完整示例 ===")
-	fmt.Println("演示有序持久化机制、数据恢复和零数据丢失配置")
-	fmt.Println()
-
-	// Clean up data left over from previous runs
-	cleanupTestData()
-
-	// Example 1: the ordered persistence mechanism
-	fmt.Println("📌 示例1: 有序持久化机制演示")
-	orderedPersistenceExample()
-
-	// Example 2: overflow persistence
-	fmt.Println("\n📌 示例2: 数据溢出持久化测试")
-	testDataOverflowPersistence()
-
-	// Example 3: data recovery after a restart
-	fmt.Println("\n📌 示例3: 程序重启数据恢复测试")
-	testDataRecovery()
-
-	// Example 4: zero-data-loss configuration
-	fmt.Println("\n📌 示例4: 零数据丢失配置")
-	createZeroDataLossExample()
-
-	// Example 5: persistence file analysis
-	fmt.Println("\n📌 示例5: 持久化文件分析")
-	analyzePersistenceFiles()
-
-	fmt.Println("\n✅ 持久化功能完整示例演示完成!")
-}
-
-// orderedPersistenceExample shows how to configure and use ordered
-// persistence to guarantee the temporal ordering of data.
-func orderedPersistenceExample() {
-	fmt.Println("演示如何使用有序持久化机制保证数据时序性")
-	fmt.Println()
-
-	// Create a temporary directory
-	tempDir := "./persistence_data"
-	os.RemoveAll(tempDir) // clean up data from previous runs
-
-	// 1. Create the persistence manager
-	pm := stream.NewPersistenceManager(tempDir)
-	if pm == nil {
-		fmt.Println("创建持久化管理器失败")
-		return
-	}
-
-	// 2. 
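Before the rest of this deleted example, a minimal lifecycle sketch assembled only from the API the README above lists; the directory name is illustrative and error handling is reduced to panics:

```go
package main

import (
	"fmt"
	"time"

	"github.com/rulego/streamsql/stream"
)

func main() {
	pm := stream.NewPersistenceManager("./persistence_data")
	if err := pm.Start(); err != nil {
		panic(err)
	}
	defer pm.Stop()

	// Every record persisted here receives a global sequence number.
	for i := 1; i <= 3; i++ {
		if err := pm.PersistData(map[string]interface{}{"id": i}); err != nil {
			panic(err)
		}
	}

	// Give the default 2s flush interval time to write to disk.
	time.Sleep(3 * time.Second)
	fmt.Println("persisted:", pm.GetStats()["total_persisted"])
}
```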
启动管理器 - err := pm.Start() - if err != nil { - fmt.Printf("启动持久化管理器失败: %v\n", err) - return - } - defer pm.Stop() - - fmt.Println("持久化管理器已启动,开始持久化测试数据...") - - // 3. 持久化测试数据 - testData := []map[string]interface{}{ - {"message": "test_data_1", "id": 1, "timestamp": time.Now().UnixNano()}, - {"message": "test_data_2", "id": 2, "timestamp": time.Now().UnixNano()}, - {"message": "test_data_3", "id": 3, "timestamp": time.Now().UnixNano()}, - {"message": "test_data_4", "id": 4, "timestamp": time.Now().UnixNano()}, - {"message": "test_data_5", "id": 5, "timestamp": time.Now().UnixNano()}, - } - - for i, data := range testData { - err := pm.PersistData(data) - if err != nil { - fmt.Printf("持久化数据失败: %v\n", err) - return - } - fmt.Printf("已持久化数据 %d: %v\n", i+1, data["message"]) - } - - // 4. 等待数据刷新到磁盘 - fmt.Println("等待数据刷新到磁盘...") - time.Sleep(3 * time.Second) - - // 5. 显示统计信息 - stats := pm.GetStats() - fmt.Println("\n=== 持久化统计信息 ===") - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - fmt.Printf("总持久化数据: %d 条\n", totalPersisted) - } - if filesCreated, ok := stats["files_created"].(int64); ok { - fmt.Printf("创建文件数: %d 个\n", filesCreated) - } - if sequenceCounter, ok := stats["sequence_counter"].(int64); ok { - fmt.Printf("序列号计数器: %d\n", sequenceCounter) - } - if running, ok := stats["running"].(bool); ok { - fmt.Printf("运行状态: %v\n", running) - } - - fmt.Println("\n=== 有序持久化机制说明 ===") - fmt.Println("1. 有序持久化机制通过全局序列号保证数据时序性") - fmt.Println("2. 当内存通道满时,数据按序持久化到磁盘") - fmt.Println("3. 系统恢复时,数据按原始顺序从磁盘加载并处理") - fmt.Println("4. 避免了传统持久化中可能出现的数据乱序问题") - fmt.Println("5. 实现了真正的先进先出(FIFO)数据处理") -} - -// testDataOverflowPersistence 测试数据溢出时的持久化功能 -// 通过创建小缓冲区并快速发送大量数据来触发溢出和持久化 -func testDataOverflowPersistence() { - // 创建临时目录 - tempDir := "./streamsql_overflow_data" - os.RemoveAll(tempDir) // 清理之前的数据 - - // 使用较小的文件大小以触发轮转 - pm := stream.NewPersistenceManagerWithConfig(tempDir, 100, 50*time.Millisecond) - if pm == nil { - fmt.Println("创建持久化管理器失败") - return - } - - err := pm.Start() - if err != nil { - fmt.Printf("启动持久化管理器失败: %v\n", err) - return - } - defer pm.Stop() - - // 快速发送大量数据,触发文件轮转 - inputCount := 20 - fmt.Printf("快速发送 %d 条数据以触发文件轮转...\n", inputCount) - - start := time.Now() - for i := 0; i < inputCount; i++ { - longData := map[string]interface{}{ - "message": fmt.Sprintf("this_is_a_long_data_string_to_trigger_file_rotation_%d", i), - "id": i, - "extra": "some extra data to make it longer", - } - err := pm.PersistData(longData) - if err != nil { - fmt.Printf("持久化数据失败: %v\n", err) - return - } - if i%5 == 0 { - fmt.Printf("已发送 %d 条数据\n", i+1) - } - } - duration := time.Since(start) - - // 等待数据刷新 - fmt.Println("等待数据刷新...") - time.Sleep(200 * time.Millisecond) - - // 获取统计信息 - stats := pm.GetStats() - fmt.Printf("\n=== 溢出持久化统计 ===\n") - fmt.Printf("⏱️ 发送耗时: %v\n", duration) - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - fmt.Printf("📊 总持久化数据: %d 条\n", totalPersisted) - } - if filesCreated, ok := stats["files_created"].(int64); ok { - fmt.Printf("📊 创建文件数: %d 个\n", filesCreated) - if filesCreated > 1 { - fmt.Println("✅ 文件轮转成功") - } - } - if sequenceCounter, ok := stats["sequence_counter"].(int64); ok { - fmt.Printf("📊 序列号计数器: %d\n", sequenceCounter) - } -} - -// testDataRecovery 测试程序重启后的数据恢复功能 -// 模拟程序重启,加载之前持久化的数据并重新处理 -func testDataRecovery() { - // 使用与第一个示例相同的目录 - tempDir := "./persistence_data" - - // 第一阶段:持久化数据 - fmt.Println("第一阶段:持久化数据") - pm1 := stream.NewPersistenceManager(tempDir) - if pm1 == nil { - fmt.Println("创建持久化管理器失败") - return - } - - err := pm1.Start() - if 
err != nil { - fmt.Printf("启动持久化管理器失败: %v\n", err) - return - } - - // 持久化测试数据 - testData := []map[string]interface{}{ - {"message": "recovery_data_1", "id": 1}, - {"message": "recovery_data_2", "id": 2}, - {"message": "recovery_data_3", "id": 3}, - {"message": "recovery_data_4", "id": 4}, - {"message": "recovery_data_5", "id": 5}, - } - - for i, data := range testData { - err := pm1.PersistData(data) - if err != nil { - fmt.Printf("持久化数据失败: %v\n", err) - return - } - fmt.Printf("已持久化恢复数据 %d: %v\n", i+1, data["message"]) - } - - // 等待数据刷新到磁盘 - fmt.Println("等待数据刷新到磁盘...") - time.Sleep(3 * time.Second) - - pm1.Stop() - fmt.Println("第一阶段完成,模拟程序重启...") - - // 第二阶段:恢复数据 - fmt.Println("\n第二阶段:恢复数据") - pm2 := stream.NewPersistenceManager(tempDir) - if pm2 == nil { - fmt.Println("创建恢复管理器失败") - return - } - - err = pm2.Start() - if err != nil { - fmt.Printf("启动恢复管理器失败: %v\n", err) - return - } - defer pm2.Stop() - - // 加载并恢复数据 - err = pm2.LoadAndRecoverData() - if err != nil { - fmt.Printf("加载恢复数据失败: %v\n", err) - return - } - - // 等待恢复数据填充到队列中 - time.Sleep(200 * time.Millisecond) - - // 按序获取恢复数据 - fmt.Println("\n=== 恢复的数据 ===") - recoveredData := make([]map[string]interface{}, 0) - for i := 0; i < len(testData); i++ { - data, hasMore := pm2.GetRecoveryData() - if hasMore && data != nil { - recoveredData = append(recoveredData, data) - fmt.Printf("恢复数据 %d: %v\n", i+1, data["message"]) - } else { - break - } - } - - // 验证数据完整性 - fmt.Printf("\n原始数据数量: %d\n", len(testData)) - fmt.Printf("恢复数据数量: %d\n", len(recoveredData)) - if len(testData) == len(recoveredData) { - fmt.Println("✅ 数据恢复完整") - } else { - fmt.Println("❌ 数据恢复不完整") - } -} - -// createZeroDataLossExample 创建零数据丢失配置的示例 -// 使用持久化管理器演示零数据丢失配置 -func createZeroDataLossExample() { - fmt.Println("演示零数据丢失配置") - - // 创建专用目录 - tempDir := "./zero_loss_data" - os.RemoveAll(tempDir) // 清理之前的数据 - - // 使用更频繁的刷新间隔以确保数据安全 - pm := stream.NewPersistenceManagerWithConfig(tempDir, 5*1024*1024, 1*time.Second) - if pm == nil { - fmt.Println("创建持久化管理器失败") - return - } - - err := pm.Start() - if err != nil { - fmt.Printf("启动持久化管理器失败: %v\n", err) - return - } - defer pm.Stop() - - // 发送关键数据 - criticalData := []map[string]interface{}{ - {"id": 1, "transaction": "critical_transaction_1", "amount": 1000.50}, - {"id": 2, "transaction": "critical_transaction_2", "amount": 2500.75}, - {"id": 3, "transaction": "critical_transaction_3", "amount": 750.25}, - {"id": 4, "transaction": "critical_transaction_4", "amount": 3200.00}, - {"id": 5, "transaction": "critical_transaction_5", "amount": 1800.90}, - } - - fmt.Println("发送关键数据(零数据丢失模式)...") - for i, data := range criticalData { - err := pm.PersistData(data) - if err != nil { - fmt.Printf("持久化关键数据失败: %v\n", err) - return - } - fmt.Printf("已持久化关键数据 %d: %v (金额: %.2f)\n", i+1, data["transaction"], data["amount"]) - time.Sleep(100 * time.Millisecond) // 模拟实际处理间隔 - } - - // 等待所有数据刷新到磁盘 - fmt.Println("等待所有关键数据刷新到磁盘...") - time.Sleep(3 * time.Second) - - // 获取统计信息 - stats := pm.GetStats() - - fmt.Printf("\n=== 零数据丢失统计 ===\n") - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - fmt.Printf("📊 总持久化数据: %d 条\n", totalPersisted) - if totalPersisted == int64(len(criticalData)) { - fmt.Println("✅ 零数据丢失验证成功") - } else { - fmt.Println("❌ 检测到数据丢失") - } - } - if filesCreated, ok := stats["files_created"].(int64); ok { - fmt.Printf("📊 创建文件数: %d 个\n", filesCreated) - } - if sequenceCounter, ok := stats["sequence_counter"].(int64); ok { - fmt.Printf("📊 序列号计数器: %d\n", sequenceCounter) - } - fmt.Printf("📊 刷新间隔: 1秒(高频刷新确保数据安全)\n") - 
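Examples 2 and 4 above differ only in how NewPersistenceManagerWithConfig is tuned: a tiny maxFileSize forces constant rotation, while a short flushInterval keeps the window of unflushed data small. A side-by-side sketch using the same values as the examples:

```go
package examples

import (
	"time"

	"github.com/rulego/streamsql/stream"
)

func newDemoManagers() (rotating, zeroLoss *stream.PersistenceManager) {
	// Overflow demo: 100-byte files rotate almost immediately, flushed every 50ms.
	rotating = stream.NewPersistenceManagerWithConfig("./streamsql_overflow_data", 100, 50*time.Millisecond)
	// Zero-loss demo: roomy 5MB files, flushed every second so little data sits unflushed in memory.
	zeroLoss = stream.NewPersistenceManagerWithConfig("./zero_loss_data", 5*1024*1024, 1*time.Second)
	return rotating, zeroLoss
}
```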
fmt.Printf("📊 最大文件大小: 5MB\n") -} - -// analyzePersistenceFiles 分析持久化文件的内容和统计信息 -// 检查持久化目录中的文件,显示文件大小和内容预览 -func analyzePersistenceFiles() { - dataDirs := []string{"./streamsql_overflow_data", "./persistence_data", "./zero_loss_data"} - - for _, dataDir := range dataDirs { - fmt.Printf("\n检查目录: %s\n", dataDir) - - // 检查持久化目录 - if _, err := os.Stat(dataDir); os.IsNotExist(err) { - fmt.Println("目录不存在") - continue - } - - // 列出所有持久化文件 - files, err := filepath.Glob(filepath.Join(dataDir, "streamsql_*.log")) - if err != nil { - fmt.Printf("读取持久化文件失败: %v\n", err) - continue - } - - if len(files) == 0 { - fmt.Println("没有找到持久化文件(可能已被恢复过程删除)") - continue - } - - fmt.Printf("发现 %d 个持久化文件:\n", len(files)) - for i, file := range files { - info, err := os.Stat(file) - if err != nil { - fmt.Printf(" %d. %s (无法读取文件信息)\n", i+1, filepath.Base(file)) - continue - } - fmt.Printf(" %d. %s (大小: %d bytes, 修改时间: %s)\n", - i+1, filepath.Base(file), info.Size(), info.ModTime().Format("15:04:05")) - } - - // 读取第一个文件的前几行内容 - if len(files) > 0 { - fmt.Printf("\n第一个文件的前3行内容:\n") - showFileContent(files[0], 3) - } - } -} - -// showFileContent 显示指定文件的前几行内容 -// filename: 要读取的文件路径 -// maxLines: 最大显示行数 -func showFileContent(filename string, maxLines int) { - file, err := os.Open(filename) - if err != nil { - fmt.Printf("无法打开文件: %v\n", err) - return - } - defer file.Close() - - buffer := make([]byte, 1024) - n, err := file.Read(buffer) - if err != nil { - fmt.Printf("无法读取文件: %v\n", err) - return - } - - content := string(buffer[:n]) - lines := []rune(content) - lineCount := 0 - currentLine := "" - - for _, char := range lines { - if char == '\n' { - lineCount++ - fmt.Printf(" %d: %s\n", lineCount, currentLine) - currentLine = "" - if lineCount >= maxLines { - break - } - } else { - currentLine += string(char) - } - } - - if currentLine != "" && lineCount < maxLines { - fmt.Printf(" %d: %s\n", lineCount+1, currentLine) - } -} - -// cleanupTestData 清理测试产生的持久化数据 -// 删除测试目录及其所有内容,为新的测试做准备 -func cleanupTestData() { - dataDirs := []string{"./streamsql_overflow_data", "./persistence_data", "./zero_loss_data"} - for _, dataDir := range dataDirs { - if err := os.RemoveAll(dataDir); err != nil { - fmt.Printf("清理测试数据失败 (%s): %v\n", dataDir, err) - } - } - fmt.Println("测试数据清理完成") -} diff --git a/examples/unified_config/demo.go b/examples/unified_config/demo.go deleted file mode 100644 index 618e31c..0000000 --- a/examples/unified_config/demo.go +++ /dev/null @@ -1,218 +0,0 @@ -package main - -import ( - "fmt" - "log" - "strings" - "time" - - "github.com/rulego/streamsql/stream" - "github.com/rulego/streamsql/types" -) - -func main() { - fmt.Println("=== StreamSQL 统一配置系统演示 ===") - - // 1. 使用新的配置API创建默认配置Stream - fmt.Println("\n1. 默认配置Stream:") - defaultConfig := types.NewConfig() - defaultConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - defaultStream, err := stream.NewStream(defaultConfig) - if err != nil { - log.Fatal(err) - } - printStreamStats("默认配置", defaultStream) - - // 2. 使用高性能预设配置 - fmt.Println("\n2. 高性能配置Stream:") - highPerfConfig := types.NewConfigWithPerformance(types.HighPerformanceConfig()) - highPerfConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - highPerfStream, err := stream.NewStreamWithHighPerformance(highPerfConfig) - if err != nil { - log.Fatal(err) - } - printStreamStats("高性能配置", highPerfStream) - - // 3. 使用低延迟预设配置 - fmt.Println("\n3. 
低延迟配置Stream:") - lowLatencyConfig := types.NewConfigWithPerformance(types.LowLatencyConfig()) - lowLatencyConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - lowLatencyStream, err := stream.NewStreamWithLowLatency(lowLatencyConfig) - if err != nil { - log.Fatal(err) - } - printStreamStats("低延迟配置", lowLatencyStream) - - // 4. 使用零数据丢失预设配置 - fmt.Println("\n4. 零数据丢失配置Stream:") - zeroLossConfig := types.NewConfigWithPerformance(types.ZeroDataLossConfig()) - zeroLossConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - zeroLossStream, err := stream.NewStreamWithZeroDataLoss(zeroLossConfig) - if err != nil { - log.Fatal(err) - } - printStreamStats("零数据丢失配置", zeroLossStream) - - // 5. 使用持久化预设配置 - fmt.Println("\n5. 持久化配置Stream:") - persistConfig := types.NewConfigWithPerformance(types.PersistencePerformanceConfig()) - persistConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - persistStream, err := stream.NewStreamWithCustomPerformance(persistConfig, types.PersistencePerformanceConfig()) - if err != nil { - log.Fatal(err) - } - printStreamStats("持久化配置", persistStream) - - // 6. 创建完全自定义的配置 - fmt.Println("\n6. 自定义配置Stream:") - customPerfConfig := types.PerformanceConfig{ - BufferConfig: types.BufferConfig{ - DataChannelSize: 30000, - ResultChannelSize: 25000, - WindowOutputSize: 3000, - EnableDynamicResize: true, - MaxBufferSize: 200000, - UsageThreshold: 0.85, - }, - OverflowConfig: types.OverflowConfig{ - Strategy: "expand", - BlockTimeout: 15 * time.Second, - AllowDataLoss: false, - ExpansionConfig: types.ExpansionConfig{ - GrowthFactor: 2.0, - MinIncrement: 2000, - TriggerThreshold: 0.9, - ExpansionTimeout: 3 * time.Second, - }, - }, - WorkerConfig: types.WorkerConfig{ - SinkPoolSize: 800, - SinkWorkerCount: 12, - MaxRetryRoutines: 10, - }, - MonitoringConfig: types.MonitoringConfig{ - EnableMonitoring: true, - StatsUpdateInterval: 500 * time.Millisecond, - EnableDetailedStats: true, - WarningThresholds: types.WarningThresholds{ - DropRateWarning: 5.0, - DropRateCritical: 15.0, - BufferUsageWarning: 75.0, - BufferUsageCritical: 90.0, - }, - }, - } - - customConfig := types.NewConfigWithPerformance(customPerfConfig) - customConfig.SimpleFields = []string{"temperature", "humidity", "location"} - - customStream, err := stream.NewStreamWithCustomPerformance(customConfig, customPerfConfig) - if err != nil { - log.Fatal(err) - } - printStreamStats("自定义配置", customStream) - - // 7. 配置比较演示 - fmt.Println("\n7. 配置比较:") - compareConfigurations() - - // 8. 实时数据处理演示 - fmt.Println("\n8. 实时数据处理演示:") - demonstrateRealTimeProcessing(defaultStream) - - // 9. 窗口统一配置演示 - fmt.Println("\n9. 窗口统一配置演示:") - demonstrateWindowConfig() - - // 清理资源 - fmt.Println("\n10. 
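All five configurations exercised above come from preset constructors in the types package; one convenient way to pick between them at startup is a small switch like this sketch (the profile names are invented here):

```go
package examples

import "github.com/rulego/streamsql/types"

// choosePreset maps a deployment profile to one of the presets
// demonstrated above; unknown profiles fall back to the default.
func choosePreset(profile string) types.PerformanceConfig {
	switch profile {
	case "high-throughput":
		return types.HighPerformanceConfig()
	case "low-latency":
		return types.LowLatencyConfig()
	case "zero-loss":
		return types.ZeroDataLossConfig()
	case "persistence":
		return types.PersistencePerformanceConfig()
	default:
		return types.DefaultPerformanceConfig()
	}
}
```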
清理资源...") - defaultStream.Stop() - highPerfStream.Stop() - lowLatencyStream.Stop() - zeroLossStream.Stop() - persistStream.Stop() - customStream.Stop() - - fmt.Println("\n=== 演示完成 ===") -} - -func printStreamStats(name string, s *stream.Stream) { - stats := s.GetStats() - detailedStats := s.GetDetailedStats() - - fmt.Printf("【%s】统计信息:\n", name) - fmt.Printf(" 数据通道: %d/%d (使用率: %.1f%%)\n", - stats["data_chan_len"], stats["data_chan_cap"], - detailedStats["data_chan_usage"]) - fmt.Printf(" 结果通道: %d/%d (使用率: %.1f%%)\n", - stats["result_chan_len"], stats["result_chan_cap"], - detailedStats["result_chan_usage"]) - fmt.Printf(" 工作池: %d/%d (使用率: %.1f%%)\n", - stats["sink_pool_len"], stats["sink_pool_cap"], - detailedStats["sink_pool_usage"]) - fmt.Printf(" 性能等级: %s\n", detailedStats["performance_level"]) -} - -func compareConfigurations() { - configs := map[string]types.PerformanceConfig{ - "默认配置": types.DefaultPerformanceConfig(), - "高性能配置": types.HighPerformanceConfig(), - "低延迟配置": types.LowLatencyConfig(), - "零丢失配置": types.ZeroDataLossConfig(), - "持久化配置": types.PersistencePerformanceConfig(), - } - - fmt.Printf("%-12s %-10s %-10s %-10s %-10s %-15s\n", - "配置类型", "数据缓冲", "结果缓冲", "工作池", "工作线程", "溢出策略") - fmt.Println(strings.Repeat("-", 75)) - - for name, config := range configs { - fmt.Printf("%-12s %-10d %-10d %-10d %-10d %-15s\n", - name, - config.BufferConfig.DataChannelSize, - config.BufferConfig.ResultChannelSize, - config.WorkerConfig.SinkPoolSize, - config.WorkerConfig.SinkWorkerCount, - config.OverflowConfig.Strategy) - } -} - -func demonstrateRealTimeProcessing(s *stream.Stream) { - // 设置数据接收器 - s.AddSink(func(data []map[string]interface{}) { - fmt.Printf(" 接收到处理结果: %v\n", data) - }) - - // 启动流处理 - s.Start() - - // 模拟发送数据 - for i := 0; i < 3; i++ { - data := map[string]interface{}{ - "temperature": 20.0 + float64(i)*2.5, - "humidity": 60.0 + float64(i)*5, - "location": fmt.Sprintf("sensor_%d", i+1), - "timestamp": time.Now().Unix(), - } - - fmt.Printf(" 发送数据: %v\n", data) - s.Emit(data) - time.Sleep(100 * time.Millisecond) - } - - // 等待处理完成 - time.Sleep(200 * time.Millisecond) - - // 显示最终统计 - finalStats := s.GetDetailedStats() - fmt.Printf(" 最终统计 - 输入: %d, 输出: %d, 丢弃: %d, 处理率: %.1f%%\n", - finalStats["basic_stats"].(map[string]int64)["input_count"], - finalStats["basic_stats"].(map[string]int64)["output_count"], - finalStats["basic_stats"].(map[string]int64)["dropped_count"], - finalStats["process_rate"]) -} diff --git a/examples/unified_config/window_config_demo.go b/examples/unified_config/window_config_demo.go deleted file mode 100644 index 271638a..0000000 --- a/examples/unified_config/window_config_demo.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "time" - - "github.com/rulego/streamsql" - "github.com/rulego/streamsql/types" -) - -// demonstrateWindowConfig 演示窗口统一配置的使用 -func demonstrateWindowConfig() { - fmt.Println("=== 窗口统一配置演示 ===") - - // 1. 测试默认配置的窗口 - fmt.Println("\n1. 默认配置窗口测试") - testWindowWithConfig("默认配置", streamsql.New()) - - // 2. 测试高性能配置的窗口 - fmt.Println("\n2. 高性能配置窗口测试") - testWindowWithConfig("高性能配置", streamsql.New(streamsql.WithHighPerformance())) - - // 3. 测试低延迟配置的窗口 - fmt.Println("\n3. 低延迟配置窗口测试") - testWindowWithConfig("低延迟配置", streamsql.New(streamsql.WithLowLatency())) - - // 4. 测试自定义配置的窗口 - fmt.Println("\n4. 
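demonstrateRealTimeProcessing above is the canonical produce/consume loop; stripped to its essentials it reduces to the sketch below, assuming a Stream built as in the earlier examples:

```go
package examples

import (
	"fmt"
	"time"

	"github.com/rulego/streamsql/stream"
)

func runOnce(s *stream.Stream) {
	// Register the sink before starting so no early results are missed.
	s.AddSink(func(results []map[string]interface{}) {
		fmt.Println("result:", results)
	})
	s.Start()
	defer s.Stop()

	s.Emit(map[string]interface{}{
		"temperature": 21.5,
		"humidity":    60.0,
		"location":    "sensor_1",
	})
	time.Sleep(200 * time.Millisecond) // let the pipeline flush
}
```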
自定义配置窗口测试") - customConfig := types.DefaultPerformanceConfig() - customConfig.BufferConfig.WindowOutputSize = 2000 // 自定义窗口输出缓冲区大小 - testWindowWithConfig("自定义配置", streamsql.New(streamsql.WithCustomPerformance(customConfig))) - - fmt.Println("\n=== 窗口配置演示完成 ===") -} - -func testWindowWithConfig(configName string, ssql *streamsql.Streamsql) { - // 执行一个简单的滚动窗口查询 - sql := "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('2s')" - - err := ssql.Execute(sql) - if err != nil { - fmt.Printf("❌ %s - 执行SQL失败: %v\n", configName, err) - return - } - - // 添加结果处理器 - stream := ssql.Stream() - if stream != nil { - stream.AddSink(func(result []map[string]interface{}) { - fmt.Printf("📊 %s - 窗口结果: %v\n", configName, result) - }) - - // 发送测试数据 - for i := 0; i < 5; i++ { - data := map[string]interface{}{ - "deviceId": fmt.Sprintf("device_%d", i%2), - "temperature": 20.0 + float64(i), - "timestamp": time.Now(), - } - ssql.Emit(data) - } - - // 等待处理完成 - time.Sleep(3 * time.Second) - - // 获取统计信息 - stats := ssql.GetDetailedStats() - fmt.Printf("📈 %s - 统计信息: %v\n", configName, stats) - } - - // 停止流处理 - ssql.Stop() - fmt.Printf("✅ %s - 测试完成\n", configName) -} diff --git a/options.go b/options.go index 1dd3408..77351d8 100644 --- a/options.go +++ b/options.go @@ -72,32 +72,6 @@ func WithCustomPerformance(config types.PerformanceConfig) Option { } } -// WithPersistence uses persistence configuration preset -func WithPersistence() Option { - return func(s *Streamsql) { - s.performanceMode = "custom" - persistConfig := types.PersistencePerformanceConfig() - s.customConfig = &persistConfig - } -} - -// WithCustomPersistence uses custom persistence configuration -func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option { - return func(s *Streamsql) { - s.performanceMode = "custom" - config := types.DefaultPerformanceConfig() - config.OverflowConfig.Strategy = "persist" - config.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{ - DataDir: dataDir, - MaxFileSize: maxFileSize, - FlushInterval: flushInterval, - MaxRetries: 3, - RetryInterval: 2 * time.Second, - } - s.customConfig = &config - } -} - // WithBufferSizes sets custom buffer sizes func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option { return func(s *Streamsql) { diff --git a/options_test.go b/options_test.go index 815d0d4..229a182 100644 --- a/options_test.go +++ b/options_test.go @@ -114,39 +114,6 @@ func TestWithCustomPerformance(t *testing.T) { }) } -// TestWithPersistence 测试持久化配置选项 -func TestWithPersistence(t *testing.T) { - t.Run("设置持久化模式", func(t *testing.T) { - s := New(WithPersistence()) - - assert.Equal(t, "custom", s.performanceMode) - assert.NotNil(t, s.customConfig) - // 验证持久化配置是否正确设置 - assert.Equal(t, "persist", s.customConfig.OverflowConfig.Strategy) - }) -} - -// TestWithCustomPersistence 测试自定义持久化配置选项 -func TestWithCustomPersistence(t *testing.T) { - t.Run("设置自定义持久化配置", func(t *testing.T) { - dataDir := "./test_data" - maxFileSize := int64(50 * 1024 * 1024) // 50MB - flushInterval := 10 * time.Second - - s := New(WithCustomPersistence(dataDir, maxFileSize, flushInterval)) - - assert.Equal(t, "custom", s.performanceMode) - assert.NotNil(t, s.customConfig) - assert.Equal(t, "persist", s.customConfig.OverflowConfig.Strategy) - assert.NotNil(t, s.customConfig.OverflowConfig.PersistenceConfig) - assert.Equal(t, dataDir, s.customConfig.OverflowConfig.PersistenceConfig.DataDir) - assert.Equal(t, maxFileSize, 
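Since this PR removes WithPersistence and WithCustomPersistence, a caller that still wants the persist overflow strategy has to spell the configuration out through the surviving WithCustomPerformance option. Below is a sketch of the mechanical equivalent of the old WithCustomPersistence call, assuming types.PersistenceConfig itself stays in the tree; note that stream/persistence.go is also deleted later in this diff, so whether the "persist" strategy still has a backing implementation depends on the replacement data processing strategy:

```go
package examples

import (
	"time"

	"github.com/rulego/streamsql"
	"github.com/rulego/streamsql/types"
)

func newPersistingStream() *streamsql.Streamsql {
	cfg := types.DefaultPerformanceConfig()
	cfg.OverflowConfig.Strategy = "persist"
	cfg.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{
		DataDir:       "./data",
		MaxFileSize:   50 * 1024 * 1024, // 50MB
		FlushInterval: 10 * time.Second,
		MaxRetries:    3,
		RetryInterval: 2 * time.Second,
	}
	return streamsql.New(streamsql.WithCustomPerformance(cfg))
}
```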
s.customConfig.OverflowConfig.PersistenceConfig.MaxFileSize) - assert.Equal(t, flushInterval, s.customConfig.OverflowConfig.PersistenceConfig.FlushInterval) - assert.Equal(t, 3, s.customConfig.OverflowConfig.PersistenceConfig.MaxRetries) - assert.Equal(t, 2*time.Second, s.customConfig.OverflowConfig.PersistenceConfig.RetryInterval) - }) -} - // TestWithBufferSizes 测试缓冲区大小配置选项 func TestWithBufferSizes(t *testing.T) { t.Run("设置自定义缓冲区大小", func(t *testing.T) { diff --git a/stream/doc.go b/stream/doc.go index 6b7f012..6d93864 100644 --- a/stream/doc.go +++ b/stream/doc.go @@ -44,7 +44,7 @@ The stream processing pipeline consists of several key components: config types.Config // Stream configuration sinks []func([]map[string]interface{}) // Result processors resultChan chan []map[string]interface{} // Result channel - persistenceManager *PersistenceManager // Data persistence + dataStrategy DataProcessingStrategy // Data processing strategy } @@ -176,25 +176,7 @@ Comprehensive performance monitoring: fmt.Printf("Throughput: %.2f records/sec\n", detailed["throughput"]) fmt.Printf("Memory Usage: %d bytes\n", detailed["memory_usage"]) -# Persistence and Reliability -Optional data persistence for enhanced reliability: - - type PersistenceManager struct { - enabled bool - storageType string // "memory", "file", "database" - batchSize int // Persistence batch size - flushInterval time.Duration // Automatic flush interval - recoveryMode string // Recovery strategy - } - - // Enable persistence - stream.EnablePersistence(PersistenceConfig{ - StorageType: "file", - BatchSize: 100, - FlushInterval: 5 * time.Second, - RecoveryMode: "automatic", - }) # Backpressure Management diff --git a/stream/handler_data.go b/stream/handler_data.go index bffa023..10981d5 100644 --- a/stream/handler_data.go +++ b/stream/handler_data.go @@ -121,104 +121,3 @@ migration_done: logger.Debug("Channel expansion completed: migrated %d items", migratedCount) } - -// checkAndProcessRecoveryData recovery data processing -// Solves overflow leakage issues, implements exponential backoff and retry limits -func (s *Stream) checkAndProcessRecoveryData() { - // Prevent duplicate recovery goroutines - if atomic.LoadInt32(&s.activeRetries) >= s.maxRetryRoutines { - return - } - - atomic.AddInt32(&s.activeRetries, 1) - defer atomic.AddInt32(&s.activeRetries, -1) - - // Check if persistence manager exists - if s.persistenceManager == nil { - return - } - - // Backoff strategy parameters - baseBackoff := 100 * time.Millisecond - maxBackoff := 5 * time.Second - currentBackoff := baseBackoff - consecutiveFailures := 0 - maxConsecutiveFailures := 10 - - // Continuously check recovery data until no more data or Stream stops - ticker := time.NewTicker(currentBackoff) - defer ticker.Stop() - - maxProcessTime := 30 * time.Second // Maximum processing time - timeout := time.NewTimer(maxProcessTime) - defer timeout.Stop() - - processedCount := 0 - droppedCount := 0 - - for { - select { - case <-ticker.C: - // Try to get recovery data - if recoveredData, hasData := s.persistenceManager.GetRecoveryData(); hasData { - // Try to send recovery data to processing channel - if s.safeSendToDataChan(recoveredData) { - processedCount++ - consecutiveFailures = 0 - // Reset backoff time - currentBackoff = baseBackoff - ticker.Reset(currentBackoff) - logger.Debug("Successfully processed recovered data item %d", processedCount) - } else { - consecutiveFailures++ - - // Check if this data should be retried - if 
!s.persistenceManager.ShouldRetryRecoveredData(recoveredData) { - // Exceeded retry limit, move to dead letter queue - logger.Warn("Recovered data exceeded retry limit, moving to dead letter queue") - s.persistenceManager.MoveToDeadLetterQueue(recoveredData) - droppedCount++ - } else { - // Re-persist this data (increment retry count) - if err := s.persistenceManager.RePersistRecoveredData(recoveredData); err != nil { - logger.Error("Failed to re-persist recovered data: %v", err) - atomic.AddInt64(&s.droppedCount, 1) - droppedCount++ - } - } - - // Implement exponential backoff - if consecutiveFailures >= maxConsecutiveFailures { - logger.Warn("Too many consecutive failures (%d), stopping recovery processing", consecutiveFailures) - return - } - - // Increase backoff time - currentBackoff = time.Duration(float64(currentBackoff) * 1.5) - if currentBackoff > maxBackoff { - currentBackoff = maxBackoff - } - ticker.Reset(currentBackoff) - logger.Debug("Channel full, backing off for %v (failure #%d)", currentBackoff, consecutiveFailures) - } - } else { - // No more recovery data, check if still in recovery mode - if !s.persistenceManager.IsInRecoveryMode() { - logger.Info("Recovery completed: processed %d items, dropped %d items", processedCount, droppedCount) - return - } - // Reset backoff time when no data - currentBackoff = baseBackoff - ticker.Reset(currentBackoff) - } - - case <-timeout.C: - logger.Warn("Recovery processing timeout reached: processed %d items, dropped %d items", processedCount, droppedCount) - return - - case <-s.done: - logger.Info("Stream stopped during recovery processing: processed %d items, dropped %d items", processedCount, droppedCount) - return - } - } -} diff --git a/stream/manager_metrics.go b/stream/manager_metrics.go index 87835f6..a6bc530 100644 --- a/stream/manager_metrics.go +++ b/stream/manager_metrics.go @@ -85,11 +85,6 @@ func (s *Stream) GetDetailedStats() map[string]interface{} { PerformanceLevel: AssessPerformanceLevel(dataUsage, dropRate), } - // Add persistence statistics - if s.persistenceManager != nil { - result["Persistence"] = s.persistenceManager.GetStats() - } - return result } diff --git a/stream/metrics_test.go b/stream/metrics_test.go index cf87b45..42a6d1f 100644 --- a/stream/metrics_test.go +++ b/stream/metrics_test.go @@ -803,44 +803,6 @@ func TestStream_GetDetailedStats_ZeroInput(t *testing.T) { assert.Equal(t, PerformanceLevelOptimal, perfLevel) } -// TestStream_GetDetailedStats_WithPersistence 测试带持久化的详细统计 -func TestStream_GetDetailedStats_WithPersistence(t *testing.T) { - // 创建临时目录用于持久化 - tempDir := t.TempDir() - - config := types.Config{ - SimpleFields: []string{"name", "age"}, - PerformanceConfig: types.PerformanceConfig{ - OverflowConfig: types.OverflowConfig{ - Strategy: "persist", - PersistenceConfig: &types.PersistenceConfig{ - DataDir: tempDir, - MaxFileSize: 1024 * 1024, // 1MB - FlushInterval: 100 * time.Millisecond, - }, - }, - }, - } - stream, err := NewStream(config) - require.NoError(t, err) - defer func() { - if stream != nil { - if stream.persistenceManager != nil { - stream.persistenceManager.Stop() - } - close(stream.done) - } - }() - - detailedStats := stream.GetDetailedStats() - - // 验证持久化统计信息存在 - assert.Contains(t, detailedStats, "Persistence") - persistenceStats, ok := detailedStats["Persistence"].(map[string]interface{}) - assert.True(t, ok) - assert.NotNil(t, persistenceStats) -} - // TestStream_ResetStats 测试重置统计信息 func TestStream_ResetStats(t *testing.T) { config := types.Config{ diff --git 
a/stream/persistence.go b/stream/persistence.go deleted file mode 100644 index c3b2126..0000000 --- a/stream/persistence.go +++ /dev/null @@ -1,930 +0,0 @@ -/* - * Copyright 2025 The RuleGo Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package stream - -import ( - "bufio" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/rulego/streamsql/logger" -) - -// OrderedDataItem ordered data item with sequence number and timestamp -type OrderedDataItem struct { - SequenceID int64 `json:"sequence_id"` // Global incremental sequence number - Timestamp int64 `json:"timestamp"` // Data reception timestamp - Data map[string]interface{} `json:"data"` // Actual data - RetryCount int `json:"retry_count"` // Retry count - LastRetry int64 `json:"last_retry"` // Last retry timestamp -} - -// DeadLetterItem dead letter queue item -type DeadLetterItem struct { - OriginalData OrderedDataItem `json:"original_data"` // Original data - FailureTime int64 `json:"failure_time"` // Failure time - Reason string `json:"reason"` // Failure reason -} - -// PersistenceManager persistence manager -// Solves data timing issues, ensures first-in-first-out (FIFO) processing -// Optimized version: adds retry limits, dead letter queue and backoff strategy -type PersistenceManager struct { - // Basic configuration - dataDir string // Persistence data directory - maxFileSize int64 // Maximum size per file (bytes) - flushInterval time.Duration // Flush interval - - // Sequence number management - sequenceCounter int64 // Global sequence counter, using atomic operations - - // File management - currentFile *os.File // Current write file - currentSize int64 // Current file size - fileIndex int // File index - - // Concurrency control - writeMutex sync.Mutex // Write mutex - pendingMutex sync.Mutex // Pending data mutex - runningMutex sync.RWMutex // Read-write lock protecting isRunning field - - // Data buffering - pendingData []OrderedDataItem // Pending data to write, sorted by sequence number - - // State management - isRunning bool // Whether running - stopChan chan struct{} // Stop channel - flushTimer *time.Timer // Flush timer - - // Recovery management - recoveryQueue chan OrderedDataItem // Recovery data queue - recoveryMode bool // Whether in recovery mode - recoveryMutex sync.RWMutex // Recovery mode protection lock - - // Retry and dead letter queue management - maxRetryCount int // Maximum retry count - deadLetterQueue []DeadLetterItem // Dead letter queue - deadLetterMutex sync.Mutex // Dead letter queue protection lock - retryDataMap map[int64]*OrderedDataItem // Retry data mapping (indexed by sequence number) - retryMapMutex sync.RWMutex // Retry mapping protection lock - - // Statistics - totalPersisted int64 // Total persisted data count - totalLoaded int64 // Total loaded data count - filesCreated int64 // Number of files created - totalRecovered int64 // Total recovered data count - totalDropped int64 // Total dropped 
data count (entered dead letter queue) - totalRetried int64 // Total retried data count -} - -// NewPersistenceManager creates a persistence manager -// Parameters: -// - dataDir: data storage directory -// -// Returns: -// - *PersistenceManager: persistence manager instance -func NewPersistenceManager(dataDir string) *PersistenceManager { - pm := &PersistenceManager{ - dataDir: dataDir, - maxFileSize: 10 * 1024 * 1024, // 10MB per file - flushInterval: 2 * time.Second, // Flush every 2 seconds, more frequent to ensure timing - fileIndex: 0, - pendingData: make([]OrderedDataItem, 0, 1000), // Pre-allocate capacity - stopChan: make(chan struct{}), - recoveryQueue: make(chan OrderedDataItem, 10000), // Recovery queue - sequenceCounter: 0, - // Retry and dead letter queue configuration - maxRetryCount: 3, // Default maximum 3 retries - deadLetterQueue: make([]DeadLetterItem, 0, 1000), // Dead letter queue - retryDataMap: make(map[int64]*OrderedDataItem), // Retry data mapping - } - - // Ensure data directory exists - if err := os.MkdirAll(dataDir, 0755); err != nil { - logger.Error("Failed to create persistence directory: %v", err) - } - - return pm -} - -// NewPersistenceManagerWithConfig creates a persistence manager with custom configuration -// Parameters: -// - dataDir: data storage directory -// - maxFileSize: maximum size per file -// - flushInterval: flush interval -// -// Returns: -// - *PersistenceManager: persistence manager instance -func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) *PersistenceManager { - pm := &PersistenceManager{ - dataDir: dataDir, - maxFileSize: maxFileSize, - flushInterval: flushInterval, - fileIndex: 0, - pendingData: make([]OrderedDataItem, 0, 1000), - stopChan: make(chan struct{}), - recoveryQueue: make(chan OrderedDataItem, 10000), - sequenceCounter: 0, - // Retry and dead letter queue configuration - maxRetryCount: 3, // Default maximum 3 retries - deadLetterQueue: make([]DeadLetterItem, 0, 1000), // Dead letter queue - retryDataMap: make(map[int64]*OrderedDataItem), // Retry data mapping - } - - // Ensure data directory exists - if err := os.MkdirAll(dataDir, 0755); err != nil { - logger.Error("Failed to create persistence directory: %v", err) - } - - return pm -} - -// Start starts the persistence manager -// Returns: -// - error: error during startup process -func (pm *PersistenceManager) Start() error { - // Check if already running - pm.runningMutex.RLock() - running := pm.isRunning - pm.runningMutex.RUnlock() - - if running { - return fmt.Errorf("ordered persistence manager already running") - } - - // Reinitialize channels if they were closed - pm.stopChan = make(chan struct{}) - pm.recoveryQueue = make(chan OrderedDataItem, 10000) - - // Create initial file - pm.writeMutex.Lock() - if err := pm.createNewFile(); err != nil { - pm.writeMutex.Unlock() - return fmt.Errorf("failed to create initial file: %w", err) - } - pm.writeMutex.Unlock() - - // Set running state - pm.runningMutex.Lock() - pm.isRunning = true - pm.runningMutex.Unlock() - - // Start timed flush - pm.startFlushTimer() - - // Start background processing goroutines - go pm.backgroundProcessor() - go pm.recoveryProcessor() - - // Load and recover existing data - if err := pm.LoadAndRecoverData(); err != nil { - logger.Error("Failed to load and recover data: %v", err) - // Don't return error, continue running - } - - logger.Info("Ordered persistence manager started successfully, data directory: %s", pm.dataDir) - return nil -} - -// 
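For reference, each line of a streamsql_ordered_*.log file is one OrderedDataItem serialized with the struct tags defined above. A sketch with invented values, written as if inside package stream:

```go
package stream

import (
	"encoding/json"
	"fmt"
)

func exampleLogLine() {
	item := OrderedDataItem{
		SequenceID: 42,
		Timestamp:  1735689600000000000, // nanoseconds
		Data:       map[string]interface{}{"id": 1, "message": "test_data_1"},
	}
	line, _ := json.Marshal(item)
	// {"sequence_id":42,"timestamp":1735689600000000000,"data":{"id":1,"message":"test_data_1"},"retry_count":0,"last_retry":0}
	fmt.Println(string(line))
}
```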
Stop stops the persistence manager -// Returns: -// - error: error during stop process -func (pm *PersistenceManager) Stop() error { - // Check if running - pm.runningMutex.RLock() - running := pm.isRunning - pm.runningMutex.RUnlock() - - if !running { - return nil - } - - // Set stop state - pm.runningMutex.Lock() - pm.isRunning = false - pm.runningMutex.Unlock() - - // Close stop channel safely - select { - case <-pm.stopChan: - // Channel already closed - default: - close(pm.stopChan) - } - - // Stop timer - pm.writeMutex.Lock() - if pm.flushTimer != nil { - pm.flushTimer.Stop() - } - pm.writeMutex.Unlock() - - // Flush remaining data - pm.flushPendingData() - - // Close current file with proper synchronization - pm.writeMutex.Lock() - if pm.currentFile != nil { - pm.currentFile.Close() - pm.currentFile = nil - } - pm.writeMutex.Unlock() - - // Close recovery queue safely - go func() { - // Drain the recovery queue in a separate goroutine - for { - select { - case <-pm.recoveryQueue: - // Continue draining - default: - // Queue is empty, safe to close - close(pm.recoveryQueue) - return - } - } - }() - - // Give some time for the goroutine to drain the queue - time.Sleep(100 * time.Millisecond) - - logger.Info("Ordered persistence manager stopped") - return nil -} - -// PersistData persists data ensuring timing order (compatibility method) -// Parameters: -// - data: data to persist, must be map[string]interface{} type -// -// Returns: -// - error: error during persistence process -func (pm *PersistenceManager) PersistData(data map[string]interface{}) error { - return pm.PersistDataWithRetryLimit(data, 0) -} - -// PersistDataWithRetryLimit persists data with retry limit support -// Parameters: -// - data: data to persist, must be map[string]interface{} type -// - retryCount: current retry count -// -// Returns: -// - error: error during persistence process -func (pm *PersistenceManager) PersistDataWithRetryLimit(data map[string]interface{}, retryCount int) error { - // Check if running - pm.runningMutex.RLock() - running := pm.isRunning - pm.runningMutex.RUnlock() - - if !running { - return fmt.Errorf("ordered persistence manager not running") - } - - // Assign globally unique sequence number to ensure timing order - sequenceID := atomic.AddInt64(&pm.sequenceCounter, 1) - - // Create ordered data item - item := OrderedDataItem{ - SequenceID: sequenceID, - Timestamp: time.Now().UnixNano(), // Use nanosecond timestamp - Data: data, - RetryCount: retryCount, - LastRetry: time.Now().UnixNano(), - } - - // If retry data, update retry mapping - if retryCount > 0 { - pm.retryMapMutex.Lock() - pm.retryDataMap[sequenceID] = &item - pm.retryMapMutex.Unlock() - atomic.AddInt64(&pm.totalRetried, 1) - } - - // Add to pending write queue - pm.pendingMutex.Lock() - pm.pendingData = append(pm.pendingData, item) - pm.pendingMutex.Unlock() - - return nil -} - -// LoadAndRecoverData loads persisted data and starts ordered recovery -// Returns: -// - error: error during loading process -func (pm *PersistenceManager) LoadAndRecoverData() error { - // 只加载未处理的文件(排除.processed文件) - allFiles, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_ordered_*.log")) - if err != nil { - return fmt.Errorf("failed to glob files: %w", err) - } - - // 过滤掉已处理的文件(.processed后缀的文件) - var files []string - for _, file := range allFiles { - if !strings.HasSuffix(file, ".processed") { - files = append(files, file) - } - } - - if len(files) == 0 { - logger.Info("No persistence files found for recovery") - return nil - } - - // 
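These retry counters were consumed by the recovery loop deleted from stream/handler_data.go earlier in this diff, which wrapped GetRecoveryData in multiplicative backoff: 100ms base, 1.5x growth, a 5s cap, and a bail-out after 10 consecutive failures. The bare pattern, as a standalone sketch:

```go
package examples

import "time"

// retryWithBackoff keeps calling op until it succeeds, backing off
// multiplicatively on failure with the same constants as the deleted
// recovery loop, and gives up after 10 consecutive failures.
func retryWithBackoff(op func() bool) bool {
	backoff := 100 * time.Millisecond
	const maxBackoff = 5 * time.Second
	for failures := 0; failures < 10; failures++ {
		if op() {
			return true
		}
		time.Sleep(backoff)
		backoff = time.Duration(float64(backoff) * 1.5)
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return false
}
```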
Collect all data items - var allItems []OrderedDataItem - - for _, filename := range files { - items, err := pm.loadItemsFromFile(filename) - if err != nil { - logger.Error("Failed to load file %s: %v", filename, err) - continue - } - allItems = append(allItems, items...) - - // 加载后直接删除文件 - if deleteErr := os.Remove(filename); deleteErr != nil { - logger.Error("Failed to delete file %s: %v", filename, deleteErr) - } else { - logger.Info("File %s processed and deleted", filename) - } - } - - // 按序列号排序,确保时序性 - sort.Slice(allItems, func(i, j int) bool { - return allItems[i].SequenceID < allItems[j].SequenceID - }) - - // 更新序列号计数器,确保新数据的序列号不会冲突 - if len(allItems) > 0 { - lastSequenceID := allItems[len(allItems)-1].SequenceID - atomic.StoreInt64(&pm.sequenceCounter, lastSequenceID) - } - - // 启动恢复模式 - pm.recoveryMutex.Lock() - pm.recoveryMode = true - pm.recoveryMutex.Unlock() - - // 如果没有数据需要恢复,立即退出恢复模式 - if len(allItems) == 0 { - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - logger.Info("No data to recover, exiting recovery mode") - return nil - } - - // 将数据放入恢复队列 - for _, item := range allItems { - select { - case pm.recoveryQueue <- item: - // 数据已放入恢复队列 - case <-pm.stopChan: - return nil - } - } - - logger.Info("Data recovery completed, %d items recovered in order", len(allItems)) - - // 启动一个goroutine来监控恢复队列,当队列为空时退出恢复模式 - go func() { - ticker := time.NewTicker(1 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if len(pm.recoveryQueue) == 0 { - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - return - } - case <-pm.stopChan: - return - } - } - }() - - atomic.AddInt64(&pm.totalLoaded, int64(len(allItems))) - logger.Info("Started ordered recovery of %d data items", len(allItems)) - return nil -} - -// IsInRecoveryMode 检查是否处于恢复模式 -// 返回值: -// - bool: 是否处于恢复模式 -func (pm *PersistenceManager) IsInRecoveryMode() bool { - pm.recoveryMutex.RLock() - defer pm.recoveryMutex.RUnlock() - return pm.recoveryMode -} - -// GetRecoveryData 获取一条恢复数据(非阻塞) -// 返回值: -// - map[string]interface{}: 恢复的数据,如果没有数据则返回nil -// - bool: 是否成功获取到数据 -func (pm *PersistenceManager) GetRecoveryData() (map[string]interface{}, bool) { - select { - case item := <-pm.recoveryQueue: - atomic.AddInt64(&pm.totalRecovered, 1) - - // 检查队列是否为空,如果为空则退出恢复模式 - if len(pm.recoveryQueue) == 0 { - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - } - - return item.Data, true - default: - // 队列为空,退出恢复模式 - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - return nil, false - } -} - -// GetStats 获取持久化统计信息 -// 返回值: -// - map[string]interface{}: 统计信息映射 -func (pm *PersistenceManager) GetStats() map[string]interface{} { - pm.pendingMutex.Lock() - pendingCount := len(pm.pendingData) - pm.pendingMutex.Unlock() - - pm.writeMutex.Lock() - currentFileSize := pm.currentSize - fileIndex := pm.fileIndex - totalPersisted := pm.totalPersisted - totalLoaded := pm.totalLoaded - filesCreated := pm.filesCreated - pm.writeMutex.Unlock() - - pm.runningMutex.RLock() - running := pm.isRunning - pm.runningMutex.RUnlock() - - pm.recoveryMutex.RLock() - recoveryMode := pm.recoveryMode - pm.recoveryMutex.RUnlock() - - sequenceCounter := atomic.LoadInt64(&pm.sequenceCounter) - totalRecovered := atomic.LoadInt64(&pm.totalRecovered) - recoveryQueueLen := len(pm.recoveryQueue) - - // 获取死信队列和重试统计 - pm.deadLetterMutex.Lock() - deadLetterCount := len(pm.deadLetterQueue) - pm.deadLetterMutex.Unlock() - - 
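LoadAndRecoverData above only fills the recovery queue; consumers drain it through the non-blocking GetRecoveryData, which reports whether an item was available. A sketch of a full drain from outside the package, with the same short wait the examples use after loading:

```go
package examples

import (
	"time"

	"github.com/rulego/streamsql/stream"
)

// drainRecovered replays persisted records in their original order.
func drainRecovered(pm *stream.PersistenceManager) []map[string]interface{} {
	if err := pm.LoadAndRecoverData(); err != nil {
		return nil
	}
	time.Sleep(200 * time.Millisecond)

	var out []map[string]interface{}
	for {
		data, ok := pm.GetRecoveryData()
		if !ok {
			break
		}
		out = append(out, data)
	}
	return out
}
```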
pm.retryMapMutex.RLock() - retryMapCount := len(pm.retryDataMap) - pm.retryMapMutex.RUnlock() - - totalDropped := atomic.LoadInt64(&pm.totalDropped) - totalRetried := atomic.LoadInt64(&pm.totalRetried) - - return map[string]interface{}{ - "running": running, - "recovery_mode": recoveryMode, - "data_dir": pm.dataDir, - "pending_count": pendingCount, - "current_file_size": currentFileSize, - "file_index": fileIndex, - "max_file_size": pm.maxFileSize, - "flush_interval": pm.flushInterval.String(), - "total_persisted": totalPersisted, - "total_loaded": totalLoaded, - "total_recovered": totalRecovered, - "files_created": filesCreated, - "sequence_counter": sequenceCounter, - "recovery_queue_len": recoveryQueueLen, - "max_retry_count": pm.maxRetryCount, - "dead_letter_count": deadLetterCount, - "retry_map_count": retryMapCount, - "total_dropped": totalDropped, - "total_retried": totalRetried, - } -} - -// createNewFile 创建新的持久化文件 -// 返回值: -// - error: 创建过程中的错误 -func (pm *PersistenceManager) createNewFile() error { - // 关闭当前文件 - if pm.currentFile != nil { - pm.currentFile.Close() - } - - // 生成新文件名,使用ordered前缀区分 - filename := fmt.Sprintf("streamsql_ordered_%d_%d.log", - time.Now().Unix(), pm.fileIndex) - filepath := filepath.Join(pm.dataDir, filename) - - // 创建新文件 - file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - return fmt.Errorf("failed to create file %s: %w", filepath, err) - } - - pm.currentFile = file - pm.currentSize = 0 - pm.fileIndex++ - pm.filesCreated++ - - return nil -} - -// writeItemToFile 将有序数据项写入文件 -// 注意:此方法应该在writeMutex锁保护下调用 -// 参数: -// - item: 要写入的有序数据项 -// -// 返回值: -// - error: 写入过程中的错误 -func (pm *PersistenceManager) writeItemToFile(item OrderedDataItem) error { - if pm.currentFile == nil { - return fmt.Errorf("no current file") - } - - // 序列化数据项 - jsonData, err := json.Marshal(item) - if err != nil { - return fmt.Errorf("failed to marshal item: %w", err) - } - - // 添加换行符 - jsonData = append(jsonData, '\n') - - // 检查文件大小 - if pm.currentSize+int64(len(jsonData)) > pm.maxFileSize { - if err := pm.createNewFile(); err != nil { - return fmt.Errorf("failed to create new file: %w", err) - } - } - - // 写入数据 - n, err := pm.currentFile.Write(jsonData) - if err != nil { - return fmt.Errorf("failed to write data: %w", err) - } - - pm.currentSize += int64(n) - pm.totalPersisted++ - return nil -} - -// flushPendingData 刷新待写入数据,按序列号排序后写入 -func (pm *PersistenceManager) flushPendingData() { - pm.pendingMutex.Lock() - if len(pm.pendingData) == 0 { - pm.pendingMutex.Unlock() - return - } - - // 复制数据并按序列号排序 - dataToWrite := make([]OrderedDataItem, len(pm.pendingData)) - copy(dataToWrite, pm.pendingData) - pm.pendingData = pm.pendingData[:0] // 清空切片 - pm.pendingMutex.Unlock() - - // 按序列号排序,确保写入顺序正确 - sort.Slice(dataToWrite, func(i, j int) bool { - return dataToWrite[i].SequenceID < dataToWrite[j].SequenceID - }) - - pm.writeMutex.Lock() - defer pm.writeMutex.Unlock() - - // 按序写入数据 - for _, item := range dataToWrite { - if err := pm.writeItemToFile(item); err != nil { - logger.Error("Failed to write persistence item: %v", err) - } - } - - // 同步到磁盘 - if pm.currentFile != nil { - _ = pm.currentFile.Sync() - } -} - -// startFlushTimer 启动刷新定时器 -func (pm *PersistenceManager) startFlushTimer() { - pm.writeMutex.Lock() - pm.flushTimer = time.AfterFunc(pm.flushInterval, func() { - // 安全地检查运行状态 - pm.runningMutex.RLock() - running := pm.isRunning - pm.runningMutex.RUnlock() - - if running { - pm.flushPendingData() - pm.startFlushTimer() // 重新启动定时器 - } - }) - 
pm.writeMutex.Unlock() -} - -// backgroundProcessor 后台处理协程 -func (pm *PersistenceManager) backgroundProcessor() { - ticker := time.NewTicker(500 * time.Millisecond) // 更频繁的检查 - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // 定期检查并处理 - pm.pendingMutex.Lock() - pendingCount := len(pm.pendingData) - pm.pendingMutex.Unlock() - - // 如果有待写入数据,立即刷新以保证时序性 - if pendingCount > 50 { // 降低阈值,更快响应 - pm.flushPendingData() - } - - case <-pm.stopChan: - return - } - } -} - -// recoveryProcessor 恢复处理协程 -func (pm *PersistenceManager) recoveryProcessor() { - // 这个协程主要用于监控恢复状态,实际恢复数据由GetRecoveryData方法提供 - ticker := time.NewTicker(100 * time.Millisecond) // 更频繁地检查 - defer ticker.Stop() - - for { - select { - case <-ticker.C: - pm.recoveryMutex.RLock() - recoveryMode := pm.recoveryMode - pm.recoveryMutex.RUnlock() - - if recoveryMode { - queueLen := len(pm.recoveryQueue) - if queueLen == 0 { - // 队列为空,退出恢复模式 - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - logger.Debug("Recovery queue empty, exiting recovery mode") - return - } else { - logger.Debug("Recovery in progress, %d items remaining in queue", queueLen) - } - } else { - // 不在恢复模式,退出协程 - return - } - - case <-pm.stopChan: - return - } - } -} - -// loadItemsFromFile 从文件加载有序数据项 -// 参数: -// - filename: 要加载的文件名 -// -// 返回值: -// - []OrderedDataItem: 加载的有序数据项列表 -// - error: 加载过程中的错误 -func (pm *PersistenceManager) loadItemsFromFile(filename string) ([]OrderedDataItem, error) { - file, err := os.Open(filename) - if err != nil { - return nil, fmt.Errorf("failed to open file %s: %w", filename, err) - } - defer file.Close() - - var items []OrderedDataItem - scanner := bufio.NewScanner(file) - - for scanner.Scan() { - line := scanner.Text() - var item OrderedDataItem - - if err := json.Unmarshal([]byte(line), &item); err != nil { - logger.Error("Failed to parse data line: %v", err) - continue - } - - items = append(items, item) - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("failed to scan file: %w", err) - } - - return items, nil -} - -// RetryFailedData 重试失败的数据 -// 参数: -// - sequenceID: 要重试的数据序列号 -// - reason: 失败原因 -// -// 返回值: -// - error: 重试过程中的错误 -func (pm *PersistenceManager) RetryFailedData(sequenceID int64, reason string) error { - pm.retryMapMutex.RLock() - item, exists := pm.retryDataMap[sequenceID] - pm.retryMapMutex.RUnlock() - - if !exists { - return fmt.Errorf("data with sequence ID %d not found in retry map", sequenceID) - } - - // 检查重试次数 - if item.RetryCount >= pm.maxRetryCount { - // 移动到死信队列 - return pm.moveToDeadLetterQueue(*item, reason) - } - - // 增加重试次数并重新持久化 - return pm.PersistDataWithRetryLimit(item.Data, item.RetryCount+1) -} - -// moveToDeadLetterQueue 将数据移动到死信队列 -// 参数: -// - item: 要移动的数据项 -// - reason: 失败原因 -// -// 返回值: -// - error: 移动过程中的错误 -func (pm *PersistenceManager) moveToDeadLetterQueue(item OrderedDataItem, reason string) error { - deadLetterItem := DeadLetterItem{ - OriginalData: item, - FailureTime: time.Now().UnixNano(), - Reason: reason, - } - - pm.deadLetterMutex.Lock() - pm.deadLetterQueue = append(pm.deadLetterQueue, deadLetterItem) - pm.deadLetterMutex.Unlock() - - // 从重试映射中移除 - pm.retryMapMutex.Lock() - delete(pm.retryDataMap, item.SequenceID) - pm.retryMapMutex.Unlock() - - atomic.AddInt64(&pm.totalDropped, 1) - logger.Warn("Data moved to dead letter queue, sequence ID: %d, reason: %s", item.SequenceID, reason) - return nil -} - -// GetDeadLetterQueue 获取死信队列数据 -// 返回值: -// - []DeadLetterItem: 死信队列中的所有数据 -func (pm *PersistenceManager) 
GetDeadLetterQueue() []DeadLetterItem { - pm.deadLetterMutex.Lock() - defer pm.deadLetterMutex.Unlock() - - // 返回副本以避免并发问题 - result := make([]DeadLetterItem, len(pm.deadLetterQueue)) - copy(result, pm.deadLetterQueue) - return result -} - -// ClearDeadLetterQueue 清空死信队列 -// 返回值: -// - int: 清空的数据项数量 -func (pm *PersistenceManager) ClearDeadLetterQueue() int { - pm.deadLetterMutex.Lock() - defer pm.deadLetterMutex.Unlock() - - count := len(pm.deadLetterQueue) - pm.deadLetterQueue = pm.deadLetterQueue[:0] - return count -} - -// SetMaxRetryCount 设置最大重试次数 -// 参数: -// - maxRetryCount: 最大重试次数 -func (pm *PersistenceManager) SetMaxRetryCount(maxRetryCount int) { - pm.maxRetryCount = maxRetryCount -} - -// ShouldRetryRecoveredData 检查恢复数据是否应该重试 -// 参数: -// - data: 恢复的数据 -// -// 返回值: -// - bool: 是否应该重试 -func (pm *PersistenceManager) ShouldRetryRecoveredData(data map[string]interface{}) bool { - // 检查数据中的重试次数(支持retry和_retry_count字段) - if retryCountFloat, exists := data["retry"]; exists { - if retryCount, ok := retryCountFloat.(float64); ok { - if int(retryCount) >= pm.maxRetryCount { - return false - } - } - if retryCount, ok := retryCountFloat.(int); ok { - if retryCount >= pm.maxRetryCount { - return false - } - } - } - - // 兼容性检查:也检查_retry_count字段 - if retryCountFloat, exists := data["_retry_count"]; exists { - if retryCount, ok := retryCountFloat.(float64); ok { - if int(retryCount) >= pm.maxRetryCount { - return false - } - } - if retryCount, ok := retryCountFloat.(int); ok { - if retryCount >= pm.maxRetryCount { - return false - } - } - } - - // 尝试从数据中获取序列号和重试次数 - if sequenceIDFloat, exists := data["_sequence_id"]; exists { - if sequenceID, ok := sequenceIDFloat.(float64); ok { - pm.retryMapMutex.RLock() - item, exists := pm.retryDataMap[int64(sequenceID)] - pm.retryMapMutex.RUnlock() - - if exists && item.RetryCount >= pm.maxRetryCount { - return false - } - } - } - - // 如果没有找到重试信息,允许重试(可能是第一次失败) - return true -} - -// MoveToDeadLetterQueue 将数据移动到死信队列(公共方法) -// 参数: -// - data: 要移动的数据 -func (pm *PersistenceManager) MoveToDeadLetterQueue(data map[string]interface{}) { - // 创建一个临时的OrderedDataItem - item := OrderedDataItem{ - SequenceID: atomic.AddInt64(&pm.sequenceCounter, 1), - Timestamp: time.Now().UnixNano(), - Data: data, - RetryCount: pm.maxRetryCount + 1, // 标记为超过重试限制 - LastRetry: time.Now().UnixNano(), - } - - pm.moveToDeadLetterQueue(item, "exceeded retry limit during recovery") -} - -// RePersistRecoveredData 重新持久化恢复数据(增加重试计数) -// 参数: -// - data: 要重新持久化的数据 -// -// 返回值: -// - error: 重新持久化过程中的错误 -func (pm *PersistenceManager) RePersistRecoveredData(data map[string]interface{}) error { - // 尝试从数据中获取序列号和重试次数 - retryCount := 1 // 默认重试次数 - - if sequenceIDFloat, exists := data["_sequence_id"]; exists { - if sequenceID, ok := sequenceIDFloat.(float64); ok { - pm.retryMapMutex.RLock() - item, exists := pm.retryDataMap[int64(sequenceID)] - pm.retryMapMutex.RUnlock() - - if exists { - retryCount = item.RetryCount + 1 - } - } - } - - // 在数据中添加序列号信息以便后续跟踪 - data["_sequence_id"] = atomic.LoadInt64(&pm.sequenceCounter) - data["_retry_count"] = retryCount - data["_last_retry"] = time.Now().UnixNano() - - return pm.PersistDataWithRetryLimit(data, retryCount) -} diff --git a/stream/persistence_test.go b/stream/persistence_test.go deleted file mode 100644 index 8b4db77..0000000 --- a/stream/persistence_test.go +++ /dev/null @@ -1,1182 +0,0 @@ -package stream - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - 
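GetDeadLetterQueue and ClearDeadLetterQueue above give operators a window onto records that exhausted their retries. A sketch of a periodic inspection pass:

```go
package examples

import (
	"fmt"

	"github.com/rulego/streamsql/stream"
)

// inspectDeadLetters logs every dead-lettered record, then clears the queue.
func inspectDeadLetters(pm *stream.PersistenceManager) {
	for _, item := range pm.GetDeadLetterQueue() {
		fmt.Printf("dropped seq=%d after %d retries: %s\n",
			item.OriginalData.SequenceID, item.OriginalData.RetryCount, item.Reason)
	}
	fmt.Printf("cleared %d dead letters\n", pm.ClearDeadLetterQueue())
}
```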
"github.com/stretchr/testify/require" -) - -// cleanupPersistenceFiles 清理持久化文件,避免测试间的文件冲突 -func cleanupPersistenceFiles(dir string) { - if _, err := os.Stat(dir); os.IsNotExist(err) { - return - } - - // 完全删除目录并重新创建 - os.RemoveAll(dir) - os.MkdirAll(dir, 0755) -} - -// TestPersistenceManager_BasicOperations 测试持久化管理器的基本操作 -func TestPersistenceManager_BasicOperations(t *testing.T) { - // 创建临时目录 - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("persistence_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - // 创建持久化管理器 - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - // 启动管理器 - err := pm.Start() - require.NoError(t, err) - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 测试数据持久化 - testData := []map[string]interface{}{ - {"message": "test_data_1", "id": 1}, - {"message": "test_data_2", "id": 2}, - {"message": "test_data_3", "id": 3}, - } - - for _, data := range testData { - err := pm.PersistData(data) - assert.NoError(t, err) - } - - // 等待数据刷新到磁盘 - time.Sleep(3 * time.Second) - - // 验证统计信息 - stats := pm.GetStats() - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - assert.Equal(t, int64(3), totalPersisted) - } else { - t.Errorf("total_persisted field is missing or not int64: %v", stats["total_persisted"]) - } - if filesCreated, ok := stats["files_created"].(int64); ok { - assert.True(t, filesCreated > 0) - } else { - t.Errorf("files_created field is missing or not int64: %v", stats["files_created"]) - } -} - -// TestPersistenceManager_DataRecovery 测试数据恢复功能 -func TestPersistenceManager_DataRecovery(t *testing.T) { - // 创建临时目录 - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("recovery_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - // 第一阶段:持久化数据 - pm1 := NewPersistenceManager(tempDir) - err := pm1.Start() - require.NoError(t, err) - - // 持久化测试数据 - testData := []map[string]interface{}{ - {"message": "data_1", "id": 1}, - {"message": "data_2", "id": 2}, - {"message": "data_3", "id": 3}, - {"message": "data_4", "id": 4}, - {"message": "data_5", "id": 5}, - } - for _, data := range testData { - err := pm1.PersistData(data) - require.NoError(t, err) - } - - // 等待数据刷新到磁盘 - time.Sleep(3 * time.Second) - - if pm1 != nil { - pm1.Stop() - } - - // 第二阶段:恢复数据 - pm2 := NewPersistenceManager(tempDir) - err = pm2.Start() - require.NoError(t, err) - defer func() { - if pm2 != nil { - pm2.Stop() - } - }() - - // 加载并恢复数据 - err = pm2.LoadAndRecoverData() - require.NoError(t, err) - - // 等待恢复数据填充到队列中 - time.Sleep(200 * time.Millisecond) - - // 按序获取恢复数据 - recoveredData := make([]map[string]interface{}, 0) - for i := 0; i < len(testData); i++ { - data, hasMore := pm2.GetRecoveryData() - if hasMore && data != nil { - recoveredData = append(recoveredData, data) - } else { - break - } - } - - // 验证数据顺序和完整性 - assert.Equal(t, len(testData), len(recoveredData)) - for i, expected := range testData { - assert.Equal(t, expected["message"], recoveredData[i]["message"], "数据顺序不正确") - // JSON反序列化会将数字转换为float64,需要类型转换 - expectedID := float64(expected["id"].(int)) - assert.Equal(t, expectedID, recoveredData[i]["id"], "数据内容不正确") - } -} - -// TestPersistenceManager_SequenceNumbering 测试序列号管理 -func TestPersistenceManager_SequenceNumbering(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("sequence_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - pm := 
NewPersistenceManager(tempDir) - err := pm.Start() - require.NoError(t, err) - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 持久化足够的数据以触发序列号递增 - for i := 0; i < 10; i++ { - data := map[string]interface{}{"message": fmt.Sprintf("data_%d", i), "id": i} - err := pm.PersistData(data) - require.NoError(t, err) - } - - // 等待数据刷新到磁盘 - time.Sleep(3 * time.Second) - - // 验证统计信息 - stats := pm.GetStats() - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - assert.Equal(t, int64(10), totalPersisted) - } else { - t.Logf("Stats: %+v", stats) - t.Fatalf("total_persisted not found or wrong type") - } - if sequenceCounter, ok := stats["sequence_counter"].(int64); ok { - assert.Equal(t, int64(10), sequenceCounter) - } -} - -// TestPersistenceManager_FileRotation 测试文件轮转 -func TestPersistenceManager_FileRotation(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("rotation_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - // 使用较小的文件大小以触发轮转 - pm := NewPersistenceManagerWithConfig(tempDir, 100, 50*time.Millisecond) - err := pm.Start() - require.NoError(t, err) - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 持久化足够的数据以触发文件轮转 - for i := 0; i < 20; i++ { - longData := map[string]interface{}{ - "message": fmt.Sprintf("this_is_a_long_data_string_to_trigger_file_rotation_%d", i), - "id": i, - "extra": "some extra data to make it longer", - } - err := pm.PersistData(longData) - require.NoError(t, err) - } - - // 等待数据刷新 - time.Sleep(200 * time.Millisecond) - - // 验证创建了多个文件 - stats := pm.GetStats() - if filesCreated, ok := stats["files_created"].(int64); ok { - assert.True(t, filesCreated > 1, "应该创建多个文件") - } -} - -// TestPersistenceManager_ConcurrentAccess 测试并发访问 -func TestPersistenceManager_ConcurrentAccess(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("concurrent_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - err := pm.Start() - require.NoError(t, err) - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 并发持久化数据 - const numGoroutines = 10 - const itemsPerGoroutine = 10 - - done := make(chan bool, numGoroutines) - - for g := 0; g < numGoroutines; g++ { - go func(goroutineID int) { - defer func() { done <- true }() - for i := 0; i < itemsPerGoroutine; i++ { - data := map[string]interface{}{ - "message": fmt.Sprintf("goroutine_%d_item_%d", goroutineID, i), - "goroutine_id": goroutineID, - "item_id": i, - } - err := pm.PersistData(data) - assert.NoError(t, err) - } - }(g) - } - - // 等待所有goroutine完成 - for i := 0; i < numGoroutines; i++ { - <-done - } - - // 等待数据刷新到磁盘 - time.Sleep(3 * time.Second) - - // 验证所有数据都被持久化 - stats := pm.GetStats() - expectedTotal := int64(numGoroutines * itemsPerGoroutine) - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - assert.Equal(t, expectedTotal, totalPersisted) - } else { - t.Logf("Stats: %+v", stats) - t.Fatalf("total_persisted not found or wrong type") - } -} - -// TestPersistenceManager_ErrorHandling 测试错误处理 -func TestPersistenceManager_ErrorHandling(t *testing.T) { - // 测试无效目录 - 使用一个包含无效字符的路径 - invalidDir := "\x00invalid\x00path" - pm := NewPersistenceManager(invalidDir) - err := pm.Start() - assert.Error(t, err, "应该返回错误") - - // 测试重复启动 - tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("error_test_%d", time.Now().UnixNano())) - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - pm2 
:= NewPersistenceManager(tempDir) - err = pm2.Start() - require.NoError(t, err) - - // 重复启动应该返回错误 - err = pm2.Start() - assert.Error(t, err, "重复启动应该返回错误") - - pm2.Stop() -} - -// TestPersistenceManager_RetryAndDeadLetter 测试重试限制和死信队列功能 -// 验证重试限制、死信队列和退避策略是否正常工作 -func TestPersistenceManager_RetryAndDeadLetter(t *testing.T) { - // 创建临时目录 - tempDir := filepath.Join(os.TempDir(), "streamsql_test_retry") - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - // 创建持久化管理器 - pm := NewPersistenceManager(tempDir) - pm.SetMaxRetryCount(2) // 设置最大重试2次 - - // 启动持久化管理器 - if err := pm.Start(); err != nil { - t.Fatalf("Failed to start persistence manager: %v", err) - } - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 测试数据 - testData := map[string]interface{}{ - "id": 1, - "message": "test retry", - "value": 100.5, - } - - // 测试正常持久化 - t.Run("Normal Persistence", func(t *testing.T) { - err := pm.PersistDataWithRetryLimit(testData, 0) - if err != nil { - t.Errorf("Failed to persist data: %v", err) - } - }) - - // 测试重试限制 - t.Run("Retry Limit", func(t *testing.T) { - // 模拟重试数据 - retryData := map[string]interface{}{ - "id": 2, - "message": "retry test", - "_sequence_id": float64(123), - "_retry_count": 3, // 超过最大重试次数 - } - - // 检查是否应该重试 - shouldRetry := pm.ShouldRetryRecoveredData(retryData) - if shouldRetry { - t.Error("Should not retry data that exceeded retry limit") - } - }) - - // 测试死信队列 - t.Run("Dead Letter Queue", func(t *testing.T) { - // 获取初始死信队列大小 - initialSize := len(pm.GetDeadLetterQueue()) - - // 移动数据到死信队列 - deadData := map[string]interface{}{ - "id": 3, - "message": "dead letter test", - } - pm.MoveToDeadLetterQueue(deadData) - - // 检查死信队列大小 - deadLetterQueue := pm.GetDeadLetterQueue() - if len(deadLetterQueue) != initialSize+1 { - t.Errorf("Expected dead letter queue size %d, got %d", initialSize+1, len(deadLetterQueue)) - } - - // 验证死信队列中的数据 - if len(deadLetterQueue) > 0 { - lastItem := deadLetterQueue[len(deadLetterQueue)-1] - if lastItem.Reason != "exceeded retry limit during recovery" { - t.Errorf("Expected reason 'exceeded retry limit during recovery', got '%s'", lastItem.Reason) - } - } - }) - - // 测试重新持久化恢复数据 - t.Run("Re-persist Recovered Data", func(t *testing.T) { - recoveryData := map[string]interface{}{ - "id": 4, - "message": "recovery test", - } - - err := pm.RePersistRecoveredData(recoveryData) - if err != nil { - t.Errorf("Failed to re-persist recovered data: %v", err) - } - - // 验证数据中添加了跟踪信息 - if _, exists := recoveryData["_sequence_id"]; !exists { - t.Error("Expected _sequence_id to be added to recovery data") - } - if _, exists := recoveryData["_retry_count"]; !exists { - t.Error("Expected _retry_count to be added to recovery data") - } - }) - - // 测试统计信息 - t.Run("Statistics", func(t *testing.T) { - stats := pm.GetStats() - - // 检查新增的统计字段 - if _, exists := stats["max_retry_count"]; !exists { - t.Error("Expected max_retry_count in statistics") - } - if _, exists := stats["dead_letter_count"]; !exists { - t.Error("Expected dead_letter_count in statistics") - } - if _, exists := stats["total_dropped"]; !exists { - t.Error("Expected total_dropped in statistics") - } - if _, exists := stats["total_retried"]; !exists { - t.Error("Expected total_retried in statistics") - } - }) -} - -// TestPersistenceManager_RecoveryProcessing 测试恢复处理逻辑 -// 验证指数退避和重试限制是否正常工作 -func TestPersistenceManager_RecoveryProcessing(t *testing.T) { - // 创建临时目录 - tempDir := filepath.Join(os.TempDir(), "streamsql_test_recovery") - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - 
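Reviewer note on the dead-letter subtests above: a compact sketch of the retry/dead-letter surface they exercise, built only from the signatures deleted in this diff (directory and payloads are illustrative):

```go
package main

import (
	"fmt"
	"log"

	"github.com/rulego/streamsql/stream"
)

func main() {
	pm := stream.NewPersistenceManager("./dlq_demo") // hypothetical directory
	if err := pm.Start(); err != nil {
		log.Fatal(err)
	}
	defer pm.Stop()

	pm.SetMaxRetryCount(2) // give up after two retries

	rec := map[string]interface{}{"id": 3, "message": "flaky record", "_retry_count": 3}
	if !pm.ShouldRetryRecoveredData(rec) {
		// Over the limit: park the record instead of retrying forever.
		pm.MoveToDeadLetterQueue(rec)
	}

	// Inspect, then drain, the dead letters (GetDeadLetterQueue returns a copy,
	// so iterating it is safe against concurrent writers).
	for _, item := range pm.GetDeadLetterQueue() {
		fmt.Printf("dead letter: %s\n", item.Reason)
	}
	fmt.Printf("cleared %d dead letters\n", pm.ClearDeadLetterQueue())
}
```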
defer os.RemoveAll(tempDir) - - // 创建持久化管理器 - pm := NewPersistenceManager(tempDir) - pm.SetMaxRetryCount(2) - - // 启动持久化管理器 - if err := pm.Start(); err != nil { - t.Fatalf("Failed to start persistence manager: %v", err) - } - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 测试添加数据时的持久化行为 - testData := map[string]interface{}{ - "id": 1, - "message": "recovery test", - "value": 200.5, - } - - // 直接测试持久化功能 - err := pm.PersistDataWithRetryLimit(testData, 0) - if err != nil { - t.Errorf("Failed to persist data: %v", err) - } - - // 等待一段时间让持久化完成 - time.Sleep(200 * time.Millisecond) - - // 强制刷新持久化数据 - pm.flushPendingData() - - // 检查统计信息 - stats := pm.GetStats() - totalPersisted := stats["total_persisted"].(int64) - pendingCount := stats["pending_count"].(int) - - // 检查是否有数据被持久化或正在等待持久化 - if totalPersisted == 0 && pendingCount == 0 { - t.Error("Expected data to be persisted or pending") - } -} - -// TestPersistenceManager_ConcurrentRetry 测试并发场景下的重试机制 -// 验证在高并发情况下重试限制和死信队列是否正常工作 -func TestPersistenceManager_ConcurrentRetry(t *testing.T) { - // 创建临时目录 - tempDir := filepath.Join(os.TempDir(), "streamsql_test_concurrent") - // 清理旧文件 - cleanupPersistenceFiles(tempDir) - defer os.RemoveAll(tempDir) - - // 创建持久化管理器 - pm := NewPersistenceManager(tempDir) - pm.SetMaxRetryCount(1) // 设置较低的重试次数以便测试 - - // 启动持久化管理器 - if err := pm.Start(); err != nil { - t.Fatalf("Failed to start persistence manager: %v", err) - } - defer func() { - if pm != nil { - pm.Stop() - } - }() - - // 并发测试参数 - concurrentCount := 50 - var wg sync.WaitGroup - - // 并发添加数据 - for i := 0; i < concurrentCount; i++ { - wg.Add(1) - go func(index int) { - defer wg.Done() - - testData := map[string]interface{}{ - "id": index, - "message": fmt.Sprintf("concurrent test %d", index), - "value": float64(index * 10), - } - - // 随机重试次数 - retryCount := index % 3 - err := pm.PersistDataWithRetryLimit(testData, retryCount) - if err != nil { - t.Errorf("Failed to persist data %d: %v", index, err) - } - }(i) - } - - // 等待所有协程完成 - wg.Wait() - - // 等待持久化完成 - time.Sleep(500 * time.Millisecond) - - // 强制刷新持久化数据 - pm.flushPendingData() - - // 检查统计信息 - stats := pm.GetStats() - totalPersisted := stats["total_persisted"].(int64) - totalRetried := stats["total_retried"].(int64) - pendingCount := stats["pending_count"].(int) - - // 检查是否有数据被处理(持久化、重试或等待中) - if totalPersisted == 0 && totalRetried == 0 && pendingCount == 0 { - t.Error("Expected some data to be processed (persisted, retried, or pending)") - } -} - -// TestPersistenceManagerIsInRecoveryMode 测试恢复模式检查功能 -func TestPersistenceManagerIsInRecoveryMode(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "recovery_mode_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - // 启动前不应该在恢复模式 - assert.False(t, pm.IsInRecoveryMode()) - - err := pm.Start() - require.NoError(t, err) - - // 启动后如果没有恢复数据,不应该在恢复模式 - assert.False(t, pm.IsInRecoveryMode()) - - // 持久化一些数据 - testData := map[string]interface{}{ - "message": "test_recovery_mode", - "id": 789, - } - - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待数据写入磁盘 - time.Sleep(200 * time.Millisecond) - - // 停止持久化管理器 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - // 重新启动,这时应该进入恢复模式 - err = pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 现在应该在恢复模式 - assert.True(t, pm.IsInRecoveryMode()) - - // 处理恢复数据直到完成 - for pm.IsInRecoveryMode() { - if _, hasData := pm.GetRecoveryData(); !hasData { - break - } - time.Sleep(10 * time.Millisecond) - } - - // 恢复完成后应该退出恢复模式 - 
assert.False(t, pm.IsInRecoveryMode()) -} - -// TestPersistenceManagerGetRecoveryData 测试获取恢复数据功能 -func TestPersistenceManagerGetRecoveryData(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "get_recovery_data_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 初始状态应该没有恢复数据 - _, hasData := pm.GetRecoveryData() - assert.False(t, hasData) - - // 持久化多个数据项 - testData := []map[string]interface{}{ - {"message": "recovery_item_1", "id": 1}, - {"message": "recovery_item_2", "id": 2}, - {"message": "recovery_item_3", "id": 3}, - } - - for _, data := range testData { - err = pm.PersistData(data) - require.NoError(t, err) - } - - // 等待数据写入 - time.Sleep(300 * time.Millisecond) - - // 停止并重新启动以触发恢复 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - err = pm.Start() - require.NoError(t, err) - - // 现在应该能够获取恢复数据 - recoveredCount := 0 - for { - recoveredData, hasData := pm.GetRecoveryData() - if !hasData { - break - } - recoveredCount++ - assert.NotNil(t, recoveredData) - assert.Contains(t, recoveredData, "message") - assert.Contains(t, recoveredData, "id") - t.Logf("Recovered data %d: %v", recoveredCount, recoveredData) - } - - assert.True(t, recoveredCount > 0, "Should have recovered some data") -} - -// TestPersistenceManagerRetryFailedData 测试重试失败数据功能 -func TestPersistenceManagerRetryFailedData(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "retry_failed_data_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 先持久化一些数据以获得序列号 - testData := map[string]interface{}{ - "message": "failed_data", - "retry": 0, - } - - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待数据写入 - time.Sleep(100 * time.Millisecond) - - // 测试重试失败数据(使用序列号和原因) - err = pm.RetryFailedData(1, "test failure reason") - // 这个调用可能会失败,因为序列号可能不在重试映射中,这是正常的 - - // 添加一些数据到死信队列 - pm.MoveToDeadLetterQueue(testData) - deadLetterQueue := pm.GetDeadLetterQueue() - assert.Len(t, deadLetterQueue, 1) - - // 测试使用无效序列号的重试 - err = pm.RetryFailedData(999, "invalid sequence test") - // 应该返回错误,因为序列号不存在 - - // 验证死信队列状态 - deadLetterQueue = pm.GetDeadLetterQueue() - assert.NotNil(t, deadLetterQueue) -} - -// TestPersistenceManagerClearDeadLetterQueue 测试清空死信队列功能 -func TestPersistenceManagerClearDeadLetterQueue(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "clear_dead_letter_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 添加多个项目到死信队列 - testData := []map[string]interface{}{ - {"message": "dead_letter_1", "id": 1}, - {"message": "dead_letter_2", "id": 2}, - {"message": "dead_letter_3", "id": 3}, - } - - for _, data := range testData { - pm.MoveToDeadLetterQueue(data) - } - - // 验证死信队列有数据 - deadLetterQueue := pm.GetDeadLetterQueue() - assert.Len(t, deadLetterQueue, 3) - - // 清空死信队列 - pm.ClearDeadLetterQueue() - - // 验证死信队列已清空 - deadLetterQueue = pm.GetDeadLetterQueue() - assert.Len(t, deadLetterQueue, 0) -} - -// TestPersistenceManagerShouldRetryRecoveredData 测试是否应该重试恢复数据 -func TestPersistenceManagerShouldRetryRecoveredData(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "should_retry_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManagerWithConfig(tempDir, 1024, 1*time.Second) - require.NotNil(t, pm) - - err := pm.Start() - 
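Reviewer note: the recovery tests above all drain GetRecoveryData in a bounded loop after a stop/restart cycle. A helper capturing that contract, as a sketch assuming the package layout in this diff (`drainRecovered` is a hypothetical name):

```go
package recovery

import "github.com/rulego/streamsql/stream"

// drainRecovered pulls every record queued by LoadAndRecoverData, in its
// original persisted order, and hands each one to the caller. It returns
// the count once GetRecoveryData reports the queue is empty.
func drainRecovered(pm *stream.PersistenceManager, handle func(map[string]interface{})) int {
	n := 0
	for {
		data, hasMore := pm.GetRecoveryData()
		if !hasMore {
			return n
		}
		handle(data)
		n++
	}
}
```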
require.NoError(t, err) - defer pm.Stop() - - // 测试重试逻辑 - testData := map[string]interface{}{ - "message": "retry_test", - "id": 1, - "retry": 2, // 设置重试次数 - } - - // 测试是否应该重试(重试次数小于最大值) - shouldRetry := pm.ShouldRetryRecoveredData(testData) - assert.True(t, shouldRetry) - - // 测试重试次数过多的情况 - testData["retry"] = 10 // 设置较大的重试次数 - shouldRetry = pm.ShouldRetryRecoveredData(testData) - assert.False(t, shouldRetry) - - // 验证统计信息 - stats := pm.GetStats() - assert.NotNil(t, stats) -} - -// TestPersistenceManagerWithConfigVariations 测试不同配置的持久化管理器 -func TestPersistenceManagerWithConfigVariations(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "config_variations_test") - defer os.RemoveAll(tempDir) - - tests := []struct { - name string - maxFileSize int64 - flushInterval time.Duration - }{ - { - name: "最小配置", - maxFileSize: 1024, - flushInterval: 1 * time.Second, - }, - { - name: "自定义重试配置", - maxFileSize: 1024, - flushInterval: 200 * time.Millisecond, - }, - { - name: "自定义文件大小配置", - maxFileSize: 2048, - flushInterval: 1 * time.Second, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // 为每个测试创建子目录 - subDir := filepath.Join(tempDir, tt.name) - defer os.RemoveAll(subDir) - - pm := NewPersistenceManagerWithConfig(subDir, tt.maxFileSize, tt.flushInterval) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 测试基本功能 - testData := map[string]interface{}{ - "message": "config_test", - "config": tt.name, - } - - err = pm.PersistData(testData) - assert.NoError(t, err) - - // 验证统计信息 - stats := pm.GetStats() - assert.NotNil(t, stats) - }) - } -} - -// TestPersistenceManagerConcurrentOperations 测试持久化管理器的并发操作 -func TestPersistenceManagerConcurrentOperations(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "concurrent_ops_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 并发持久化数据 - const numGoroutines = 10 - const itemsPerGoroutine = 5 - - done := make(chan bool, numGoroutines) - - for i := 0; i < numGoroutines; i++ { - go func(goroutineID int) { - for j := 0; j < itemsPerGoroutine; j++ { - testData := map[string]interface{}{ - "message": "concurrent_test", - "goroutine_id": goroutineID, - "item_id": j, - } - - err := pm.PersistData(testData) - assert.NoError(t, err) - } - done <- true - }(i) - } - - // 等待所有goroutine完成 - for i := 0; i < numGoroutines; i++ { - <-done - } - - // 等待数据写入完成 - time.Sleep(500 * time.Millisecond) - - // 验证统计信息 - stats := pm.GetStats() - assert.NotNil(t, stats) - t.Logf("Final stats: %+v", stats) -} - -// TestPersistenceManagerSetMaxRetryCount 测试设置最大重试次数 -func TestPersistenceManagerSetMaxRetryCount(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "set_max_retry_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 设置新的最大重试次数 - newMaxRetryCount := 10 - pm.SetMaxRetryCount(newMaxRetryCount) - - // 测试重试逻辑是否使用新的最大重试次数 - testData := map[string]interface{}{ - "message": "max_retry_test", - "retry": 5, // 小于新的最大重试次数 - } - - shouldRetry := pm.ShouldRetryRecoveredData(testData) - assert.True(t, shouldRetry, "Should retry when retry count is less than max") - - // 测试超过最大重试次数的情况 - testData["retry"] = 15 // 大于新的最大重试次数 - shouldRetry = pm.ShouldRetryRecoveredData(testData) - assert.False(t, shouldRetry, "Should not retry when retry count exceeds max") -} - -// 
TestPersistenceManagerWriteItemToFile 测试写入项目到文件功能 -func TestPersistenceManagerWriteItemToFile(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "write_item_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 测试写入大量数据以触发文件写入逻辑 - for i := 0; i < 100; i++ { - testData := map[string]interface{}{ - "message": "write_test", - "id": i, - "data": make([]byte, 100), // 添加一些数据量 - } - err = pm.PersistData(testData) - assert.NoError(t, err) - } - - // 等待数据写入(增加等待时间) - time.Sleep(500 * time.Millisecond) - - // 验证统计信息 - stats := pm.GetStats() - assert.NotNil(t, stats) - - // 检查pending_count或total_persisted - if pendingCount, ok := stats["pending_count"].(int); ok { - assert.True(t, pendingCount >= 0) - } - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - assert.True(t, totalPersisted >= 0) - } -} - -// TestPersistenceManagerFlushPendingData 测试刷新待处理数据功能 -func TestPersistenceManagerFlushPendingData(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "flush_pending_test") - defer os.RemoveAll(tempDir) - - // 使用较短的刷新间隔来测试刷新功能 - pm := NewPersistenceManagerWithConfig(tempDir, 1024, 100*time.Millisecond) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 持久化一些数据 - testData := map[string]interface{}{ - "message": "flush_test", - "timestamp": time.Now().Unix(), - } - - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待刷新定时器触发 - time.Sleep(200 * time.Millisecond) - - // 验证数据已被刷新 - stats := pm.GetStats() - assert.NotNil(t, stats) -} - -// TestPersistenceManagerRecoveryProcessor 测试恢复处理器功能 -func TestPersistenceManagerRecoveryProcessor(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "recovery_processor_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - - // 持久化一些数据 - testData := []map[string]interface{}{ - {"message": "recovery_1", "id": 1}, - {"message": "recovery_2", "id": 2}, - {"message": "recovery_3", "id": 3}, - } - - for _, data := range testData { - err = pm.PersistData(data) - require.NoError(t, err) - } - - // 等待数据写入 - time.Sleep(200 * time.Millisecond) - - // 停止并重新启动以触发恢复处理器 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - err = pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 验证恢复处理器正在工作 - assert.True(t, pm.IsInRecoveryMode()) - - // 等待恢复完成(增加等待时间) - for i := 0; i < 100 && pm.IsInRecoveryMode(); i++ { - time.Sleep(50 * time.Millisecond) - } - - // 验证恢复完成(允许在恢复模式中,因为可能有延迟) - // 只要不一直处于恢复模式即可 - time.Sleep(1 * time.Second) - // 最终检查:如果还在恢复模式,说明有问题 - if pm.IsInRecoveryMode() { - t.Log("Warning: Still in recovery mode after timeout") - } -} - -// TestPersistenceManagerLoadItemsFromFile 测试从文件加载项目功能 -func TestPersistenceManagerLoadItemsFromFile(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "load_items_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - - // 持久化多个数据项 - testData := []map[string]interface{}{ - {"message": "load_test_1", "id": 1, "type": "test"}, - {"message": "load_test_2", "id": 2, "type": "test"}, - {"message": "load_test_3", "id": 3, "type": "test"}, - {"message": "load_test_4", "id": 4, "type": "test"}, - {"message": "load_test_5", "id": 5, "type": "test"}, - } - - for _, data := range testData { - err = pm.PersistData(data) - require.NoError(t, err) 
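Reviewer note: TestPersistenceManagerFlushPendingData above shortens FlushInterval to 100 ms to force the flush timer, and the rotation tests use a 100-byte MaxFileSize to force rollover. A sketch of the same constructor with production-leaning values; the directory and numbers are illustrative, only the signature comes from this diff:

```go
package main

import (
	"log"
	"time"

	"github.com/rulego/streamsql/stream"
)

func main() {
	// A smaller MaxFileSize rotates files sooner; a shorter FlushInterval
	// narrows the window of unflushed data at the cost of more disk writes.
	pm := stream.NewPersistenceManagerWithConfig(
		"./flush_demo",       // data directory (hypothetical)
		10*1024*1024,         // rotate after ~10 MB
		500*time.Millisecond, // flush pending records twice a second
	)
	if err := pm.Start(); err != nil {
		log.Fatal(err)
	}
	defer pm.Stop()

	if err := pm.PersistData(map[string]interface{}{"message": "hello"}); err != nil {
		log.Fatal(err)
	}
}
```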
- } - - // 等待数据写入完成 - time.Sleep(300 * time.Millisecond) - - // 停止持久化管理器 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - // 重新启动以触发文件加载 - err = pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 验证数据被正确加载 - recoveredCount := 0 - for { - _, hasData := pm.GetRecoveryData() - if !hasData { - break - } - recoveredCount++ - if recoveredCount > 10 { // 防止无限循环 - break - } - } - - assert.True(t, recoveredCount > 0, "Should have loaded some items from file") -} - -// TestPersistenceManagerPersistDataWithRetryLimit 测试带重试限制的数据持久化 -func TestPersistenceManagerPersistDataWithRetryLimit(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "persist_retry_limit_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 测试正常的持久化 - testData := map[string]interface{}{ - "message": "retry_limit_test", - "id": 123, - } - - err = pm.PersistDataWithRetryLimit(testData, 3) - assert.NoError(t, err) - - // 测试带重试计数的数据 - testDataWithRetry := map[string]interface{}{ - "message": "retry_test", - "id": 456, - "_retry_count": 2, - } - - err = pm.PersistDataWithRetryLimit(testDataWithRetry, 5) - assert.NoError(t, err) - - // 等待数据写入(增加等待时间) - time.Sleep(500 * time.Millisecond) - - // 验证统计信息 - stats := pm.GetStats() - assert.NotNil(t, stats) - - // 检查pending_count或total_persisted - if pendingCount, ok := stats["pending_count"].(int); ok { - assert.True(t, pendingCount >= 0) - } - if totalPersisted, ok := stats["total_persisted"].(int64); ok { - assert.True(t, totalPersisted >= 0) - } -} - -// TestPersistenceManagerLoadAndRecoverData 测试加载和恢复数据功能 -func TestPersistenceManagerLoadAndRecoverData(t *testing.T) { - tempDir := filepath.Join(os.TempDir(), "load_recover_test") - defer os.RemoveAll(tempDir) - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - - // 持久化一些数据 - testData := []map[string]interface{}{ - {"message": "recover_data_1", "id": 1, "priority": "high"}, - {"message": "recover_data_2", "id": 2, "priority": "medium"}, - {"message": "recover_data_3", "id": 3, "priority": "low"}, - } - - for _, data := range testData { - err = pm.PersistData(data) - require.NoError(t, err) - } - - // 等待数据写入 - time.Sleep(300 * time.Millisecond) - - // 停止持久化管理器 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - // 重新启动以触发LoadAndRecoverData - err = pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 验证恢复模式 - assert.True(t, pm.IsInRecoveryMode()) - - // 处理所有恢复数据 - recoveredItems := make([]map[string]interface{}, 0) - for { - recoveredData, hasData := pm.GetRecoveryData() - if !hasData { - break - } - recoveredItems = append(recoveredItems, recoveredData) - if len(recoveredItems) > 10 { // 防止无限循环 - break - } - } - - assert.True(t, len(recoveredItems) > 0, "Should have recovered some data") - - // 验证恢复的数据包含预期字段 - for _, item := range recoveredItems { - assert.Contains(t, item, "message") - assert.Contains(t, item, "id") - assert.Contains(t, item, "priority") - } -} diff --git a/stream/strategy.go b/stream/strategy.go index 999dfe3..c0e968e 100644 --- a/stream/strategy.go +++ b/stream/strategy.go @@ -28,10 +28,9 @@ import ( // Overflow strategy constants const ( - StrategyDrop = "drop" // Drop strategy - StrategyBlock = "block" // Blocking strategy - StrategyExpand = "expand" // Dynamic strategy - StrategyPersist = "persist" // Persistence strategy (todo: incomplete) + StrategyDrop = "drop" // Drop strategy + StrategyBlock = "block" // 
Blocking strategy + StrategyExpand = "expand" // Dynamic strategy ) // DataProcessingStrategy data processing strategy interface @@ -155,95 +154,7 @@ func (es *ExpansionStrategy) Stop() error { return nil } -// PersistenceStrategy 持久化策略实现 -type PersistenceStrategy struct { - stream *Stream -} -// NewPersistenceStrategy 创建持久化策略实例 -func NewPersistenceStrategy() *PersistenceStrategy { - return &PersistenceStrategy{} -} - -// ProcessData 实现持久化模式数据处理 -func (ps *PersistenceStrategy) ProcessData(data map[string]interface{}) { - // 检查是否处于恢复模式,如果是则优先处理恢复数据 - if ps.stream.persistenceManager != nil && ps.stream.persistenceManager.IsInRecoveryMode() { - // 恢复模式下,先尝试处理恢复数据 - if recoveredData, hasData := ps.stream.persistenceManager.GetRecoveryData(); hasData { - // 优先处理恢复的数据 - if ps.stream.safeSendToDataChan(recoveredData) { - // 恢复数据处理成功,现在处理新数据 - if ps.stream.safeSendToDataChan(data) { - return - } - // 新数据无法处理,持久化(带重试限制) - if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil { - logger.Error("Failed to persist new data during recovery: %v", err) - atomic.AddInt64(&ps.stream.droppedCount, 1) - } - return - } else { - // 恢复数据也无法处理,检查重试次数避免无限循环 - if !ps.stream.persistenceManager.ShouldRetryRecoveredData(recoveredData) { - // 超过重试限制,移入死信队列 - logger.Warn("Recovered data exceeded retry limit, moving to dead letter queue") - ps.stream.persistenceManager.MoveToDeadLetterQueue(recoveredData) - } else { - // 重新持久化恢复数据(增加重试计数) - if err := ps.stream.persistenceManager.RePersistRecoveredData(recoveredData); err != nil { - logger.Error("Failed to re-persist recovered data: %v", err) - } - } - // 持久化新数据 - if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil { - logger.Error("Failed to persist new data: %v", err) - atomic.AddInt64(&ps.stream.droppedCount, 1) - } - return - } - } - } - - // 正常模式或非恢复模式,首次尝试添加数据 - if ps.stream.safeSendToDataChan(data) { - return - } - - // 通道满了,使用持久化(带重试限制) - if ps.stream.persistenceManager != nil { - if err := ps.stream.persistenceManager.PersistDataWithRetryLimit(data, 0); err != nil { - logger.Error("Failed to persist data with persistence: %v", err) - atomic.AddInt64(&ps.stream.droppedCount, 1) - } else { - logger.Debug("Data has been persisted to disk with sequence ordering") - } - } else { - logger.Error("Persistence manager not initialized, data will be lost") - atomic.AddInt64(&ps.stream.droppedCount, 1) - } - - // 启动异步恢复检查(防止重复启动) - if atomic.LoadInt32(&ps.stream.activeRetries) < ps.stream.maxRetryRoutines { - go ps.stream.checkAndProcessRecoveryData() - } -} - -// GetStrategyName 获取策略名称 -func (ps *PersistenceStrategy) GetStrategyName() string { - return StrategyPersist -} - -// Init 初始化持久化策略 -func (ps *PersistenceStrategy) Init(stream *Stream, config types.PerformanceConfig) error { - ps.stream = stream - return nil -} - -// Stop 停止并清理持久化策略资源 -func (ps *PersistenceStrategy) Stop() error { - return nil -} // DropStrategy 丢弃策略实现 type DropStrategy struct { @@ -379,7 +290,6 @@ func NewStrategyFactory() *StrategyFactory { // 注册内置策略 factory.RegisterStrategy(StrategyBlock, func() DataProcessingStrategy { return NewBlockingStrategy() }) factory.RegisterStrategy(StrategyExpand, func() DataProcessingStrategy { return NewExpansionStrategy() }) - factory.RegisterStrategy(StrategyPersist, func() DataProcessingStrategy { return NewPersistenceStrategy() }) factory.RegisterStrategy(StrategyDrop, func() DataProcessingStrategy { return NewDropStrategy() }) return factory diff --git a/stream/strategy_test.go 
b/stream/strategy_test.go index 8a89c6e..03307dc 100644 --- a/stream/strategy_test.go +++ b/stream/strategy_test.go @@ -28,11 +28,6 @@ func TestStrategyFactory(t *testing.T) { strategyName: StrategyExpand, expectedType: StrategyExpand, }, - { - name: "Persistence Strategy", - strategyName: StrategyPersist, - expectedType: StrategyPersist, - }, { name: "Drop Strategy", strategyName: StrategyDrop, @@ -102,18 +97,6 @@ func TestExpansionStrategy_ProcessData(t *testing.T) { } -// TestPersistenceStrategy_ProcessData 测试持久化策略数据处理 -func TestPersistenceStrategy_ProcessData(t *testing.T) { - config := types.Config{ - SimpleFields: []string{"name", "age"}, - } - stream, err := NewStream(config) - require.NoError(t, err) - defer func() { - if stream != nil { - close(stream.done) - } - }() -} + // TestDropStrategy_ProcessData 测试丢弃策略数据处理 func TestDropStrategy_ProcessData(t *testing.T) { @@ -353,7 +336,7 @@ func TestStrategyRegistration(t *testing.T) { // 测试内置策略是否已注册 registeredStrategies := factory.GetRegisteredStrategies() - expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyPersist, StrategyDrop} + expectedStrategies := []string{StrategyBlock, StrategyExpand, StrategyDrop} for _, expected := range expectedStrategies { found := false diff --git a/stream/stream.go b/stream/stream.go index 855ff4a..8b84fc2 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -46,18 +46,13 @@ const ( PerformanceLevelOptimal = "OPTIMAL" ) -// Persistence related constants -const ( - PersistenceEnabled = "enabled" - PersistenceMessage = "message" - PersistenceNotEnabledMsg = "persistence not enabled" - PerformanceConfigKey = "performanceConfig" -) - // SQL keyword constants const ( SQLKeywordCase = "CASE" ) +const ( + PerformanceConfigKey = "performanceConfig" +) type Stream struct { dataChan chan map[string]interface{} @@ -91,10 +86,9 @@ type Stream struct { dropLogCount int64 // Count of drops since last log // Data loss strategy configuration - allowDataDrop bool // Whether to allow data loss - blockingTimeout time.Duration // Blocking timeout duration - overflowStrategy string // Overflow strategy: "drop", "block", "expand", "persist" - persistenceManager *PersistenceManager // Persistence manager + allowDataDrop bool // Whether to allow data loss + blockingTimeout time.Duration // Blocking timeout duration + overflowStrategy string // Overflow strategy: "drop", "block", "expand" // Data processing strategy using strategy pattern for better extensibility dataStrategy DataProcessingStrategy // Data processing strategy instance @@ -123,12 +117,6 @@ func NewStreamWithLowLatency(config types.Config) (*Stream, error) { return factory.CreateLowLatencyStream(config) } -// NewStreamWithZeroDataLoss 创建零数据丢失Stream -func NewStreamWithZeroDataLoss(config types.Config) (*Stream, error) { - factory := NewStreamFactory() - return factory.CreateZeroDataLossStream(config) } - // NewStreamWithCustomPerformance 创建自定义性能配置的Stream func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error) { factory := NewStreamFactory() @@ -233,54 +221,6 @@ func (s *Stream) Stop() { logger.Error("Failed to stop data strategy: %v", err) } } - - // 停止持久化管理器 - if s.persistenceManager != nil { - if err := s.persistenceManager.Stop(); err != nil { - logger.Error("Failed to stop persistence manager: %v", err) - } - } -} - -// LoadAndReprocessPersistedData 加载并重新处理持久化数据 -func (s *Stream) LoadAndReprocessPersistedData() error { - if s.persistenceManager == nil { - return
fmt.Errorf("persistence manager not initialized") - } - - // 加载持久化数据 - err := s.persistenceManager.LoadAndRecoverData() - if err != nil { - return fmt.Errorf("failed to load persisted data: %w", err) - } - - // 检查是否有恢复数据 - if !s.persistenceManager.IsInRecoveryMode() { - logger.Info("No persistent data to recover") - return nil - } - - logger.Info("Starting persistent data recovery process") - - // 启动恢复处理协程 - go s.checkAndProcessRecoveryData() - - logger.Info("Persistent data recovery process started") - return nil -} - -// GetPersistenceStats 获取持久化统计信息 -func (s *Stream) GetPersistenceStats() map[string]interface{} { - if s.persistenceManager == nil { - return map[string]interface{}{ - PersistenceEnabled: false, - PersistenceMessage: PersistenceNotEnabledMsg, - } - } - - stats := s.persistenceManager.GetStats() - stats[PersistenceEnabled] = true - return stats } // IsAggregationQuery 检查当前流是否为聚合查询 diff --git a/stream/stream_factory.go b/stream/stream_factory.go index 8aa1714..7db2abc 100644 --- a/stream/stream_factory.go +++ b/stream/stream_factory.go @@ -54,12 +54,6 @@ func (sf *StreamFactory) CreateLowLatencyStream(config types.Config) (*Stream, e return sf.createStreamWithUnifiedConfig(config) } -// CreateZeroDataLossStream creates zero data loss Stream -func (sf *StreamFactory) CreateZeroDataLossStream(config types.Config) (*Stream, error) { - config.PerformanceConfig = types.ZeroDataLossConfig() - return sf.createStreamWithUnifiedConfig(config) -} - // CreateCustomPerformanceStream creates Stream with custom performance configuration func (sf *StreamFactory) CreateCustomPerformanceStream(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error) { config.PerformanceConfig = perfConfig @@ -87,11 +81,6 @@ func (sf *StreamFactory) createStreamWithUnifiedConfig(config types.Config) (*St // Create Stream instance stream := sf.createStreamInstance(config, win) - // Initialize persistence manager - if err := sf.initializePersistenceManager(stream, config.PerformanceConfig); err != nil { - return nil, err - } - // Setup data processing strategy if err := sf.setupDataProcessingStrategy(stream, config.PerformanceConfig); err != nil { return nil, fmt.Errorf("failed to setup data processing strategy: %w", err) @@ -137,29 +126,6 @@ func (sf *StreamFactory) createStreamInstance(config types.Config, win window.Wi } } -// initializePersistenceManager 初始化持久化管理器 -// 当溢出策略设置为持久化时,检查并初始化持久化配置 -func (sf *StreamFactory) initializePersistenceManager(stream *Stream, perfConfig types.PerformanceConfig) error { - if perfConfig.OverflowConfig.Strategy == StrategyPersist { - if perfConfig.OverflowConfig.PersistenceConfig == nil { - return fmt.Errorf("persistence strategy is enabled but PersistenceConfig is not provided. Please configure PersistenceConfig with DataDir, MaxFileSize, and FlushInterval. 
Example: perfConfig.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{DataDir: \"./data\", MaxFileSize: 10*1024*1024, FlushInterval: 5*time.Second}") - } - persistConfig := perfConfig.OverflowConfig.PersistenceConfig - stream.persistenceManager = NewPersistenceManagerWithConfig( - persistConfig.DataDir, - persistConfig.MaxFileSize, - persistConfig.FlushInterval, - ) - err := stream.persistenceManager.Start() - if err != nil { - return fmt.Errorf("failed to start persistence manager: %w", err) - } - // 尝试加载和恢复持久化数据 - return stream.persistenceManager.LoadAndRecoverData() - } - return nil -} - // setupDataProcessingStrategy 设置数据处理策略 // 使用策略模式替代函数指针,提供更好的扩展性和可维护性 func (sf *StreamFactory) setupDataProcessingStrategy(stream *Stream, perfConfig types.PerformanceConfig) error { diff --git a/stream/stream_test.go b/stream/stream_test.go index a286081..44f5969 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -377,7 +377,6 @@ func TestStreamFactory(t *testing.T) { {"CreateStream", func() (*Stream, error) { return factory.CreateStream(config) }}, {"CreateHighPerformanceStream", func() (*Stream, error) { return factory.CreateHighPerformanceStream(config) }}, {"CreateLowLatencyStream", func() (*Stream, error) { return factory.CreateLowLatencyStream(config) }}, - {"CreateZeroDataLossStream", func() (*Stream, error) { return factory.CreateZeroDataLossStream(config) }}, } for _, tt := range tests { @@ -440,16 +439,6 @@ func TestStreamConstructors(t *testing.T) { } }() - // 测试零数据丢失Stream - zeroLossStream, err := NewStreamWithZeroDataLoss(config) - require.NoError(t, err) - require.NotNil(t, zeroLossStream) - defer func() { - if zeroLossStream != nil { - close(zeroLossStream.done) - } - }() - // 测试自定义性能配置Stream perfConfig := types.PerformanceConfig{ WorkerConfig: types.WorkerConfig{ @@ -511,36 +500,6 @@ func TestStreamFilterRegistration(t *testing.T) { require.NoError(t, err) } -// TestStreamPersistence 测试持久化功能 -func TestStreamPersistence(t *testing.T) { - config := types.Config{ - SimpleFields: []string{"name", "age"}, - PerformanceConfig: types.PerformanceConfig{ - OverflowConfig: types.OverflowConfig{ - PersistenceConfig: &types.PersistenceConfig{ - DataDir: "./test_persistence", - }, - }, - }, - } - stream, err := NewStream(config) - require.NoError(t, err) - defer func() { - if stream != nil { - close(stream.done) - } - }() - - // 测试加载和重处理持久化数据 - err = stream.LoadAndReprocessPersistedData() - // 这个可能会失败,因为测试目录可能不存在,这是正常的 - // require.NoError(t, err) - - // 测试获取持久化统计信息 - stats := stream.GetPersistenceStats() - require.NotNil(t, stats) -} - // TestStreamAggregationQuery 测试聚合查询功能 func TestStreamAggregationQuery(t *testing.T) { config := types.Config{ @@ -853,58 +812,6 @@ func TestDataChannelExpansion(t *testing.T) { assert.True(t, stats[InputCount] > 0) } -// TestRecoveryDataProcessing 测试恢复数据处理功能 -func TestRecoveryDataProcessing(t *testing.T) { - tempDir := t.TempDir() - - config := types.Config{ - SimpleFields: []string{"message", "id"}, - PerformanceConfig: types.PerformanceConfig{ - BufferConfig: types.BufferConfig{ - DataChannelSize: 5, // 小容量便于测试 - ResultChannelSize: 10, - }, - WorkerConfig: types.WorkerConfig{ - SinkPoolSize: 2, - MaxRetryRoutines: 1, - }, - OverflowConfig: types.OverflowConfig{ - PersistenceConfig: &types.PersistenceConfig{ - DataDir: tempDir, - FlushInterval: 100 * time.Millisecond, - MaxFileSize: 1024, - MaxRetries: 3, - RetryInterval: 50 * time.Millisecond, - }, - }, - }, - } - - stream, err := NewStream(config) - require.NoError(t, err) - defer 
stream.Stop() - - // 启动流 - stream.Start() - - // 模拟数据处理 - testData := map[string]interface{}{ - "message": "recovery_test", - "id": 123, - } - - // 发送测试数据 - stream.Emit(testData) - - // 等待数据处理 - time.Sleep(200 * time.Millisecond) - - // 验证数据处理 - stats := stream.GetStats() - assert.NotNil(t, stats) - assert.True(t, stats[InputCount] > 0) -} - // TestStatsCollectorDetailedFunctions 测试统计收集器的详细功能 func TestStatsCollectorDetailedFunctions(t *testing.T) { collector := NewStatsCollector() @@ -975,83 +882,6 @@ func TestResultHandlerGetResultsChan(t *testing.T) { assert.NotNil(t, resultsChan) } -// TestPersistenceManagerRecoveryMode 测试持久化管理器的恢复模式功能 -func TestPersistenceManagerRecoveryMode(t *testing.T) { - tempDir := t.TempDir() - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 初始状态不应该在恢复模式 - assert.False(t, pm.IsInRecoveryMode()) - - // 持久化一些数据 - testData := map[string]interface{}{ - "test": "recovery_mode", - "id": 456, - } - - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待数据写入 - time.Sleep(200 * time.Millisecond) - - // 停止并重新启动以触发恢复模式 - pm.Stop() - time.Sleep(100 * time.Millisecond) - - err = pm.Start() - require.NoError(t, err) - - // 现在应该在恢复模式 - assert.True(t, pm.IsInRecoveryMode()) - - // 获取恢复数据 - recoveredData, hasData := pm.GetRecoveryData() - if hasData { - assert.NotNil(t, recoveredData) - assert.Equal(t, "recovery_mode", recoveredData["test"]) - } -} - -// TestPersistenceManagerDeadLetterQueue 测试死信队列功能 -func TestPersistenceManagerDeadLetterQueue(t *testing.T) { - tempDir := t.TempDir() - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 测试数据 - testData := map[string]interface{}{ - "message": "dead_letter_test", - "retry": 0, - } - - // 移动到死信队列 - pm.MoveToDeadLetterQueue(testData) - - // 获取死信队列 - deadLetterQueue := pm.GetDeadLetterQueue() - assert.Len(t, deadLetterQueue, 1) - if len(deadLetterQueue) > 0 { - assert.Equal(t, "dead_letter_test", deadLetterQueue[0].OriginalData.Data["message"]) - } - - // 清空死信队列 - pm.ClearDeadLetterQueue() - deadLetterQueue = pm.GetDeadLetterQueue() - assert.Len(t, deadLetterQueue, 0) -} - // TestConcurrentDataChannelExpansion 测试并发数据通道扩容 func TestConcurrentDataChannelExpansion(t *testing.T) { // 创建流 @@ -1096,54 +926,6 @@ func TestConcurrentDataChannelExpansion(t *testing.T) { assert.NotNil(t, stream) } -// TestStreamOverflowHandling 测试流溢出处理 -func TestStreamOverflowHandling(t *testing.T) { - config := types.Config{ - SimpleFields: []string{"data"}, - PerformanceConfig: types.PerformanceConfig{ - BufferConfig: types.BufferConfig{ - DataChannelSize: 3, // 非常小的容量 - ResultChannelSize: 3, - }, - WorkerConfig: types.WorkerConfig{ - SinkPoolSize: 1, - MaxRetryRoutines: 1, - }, - OverflowConfig: types.OverflowConfig{ - Strategy: "drop", - AllowDataLoss: true, - PersistenceConfig: &types.PersistenceConfig{ - DataDir: "./test_overflow", - MaxFileSize: 1024, - MaxRetries: 3, - }, - }, - }, - } - stream, err := NewStream(config) - require.NoError(t, err) - defer stream.Stop() - - // 启动流 - stream.Start() - - // 快速发送大量数据以触发溢出 - for i := 0; i < 100; i++ { - data := map[string]interface{}{ - "data": i, - } - stream.Emit(data) - } - - // 等待处理 - time.Sleep(500 * time.Millisecond) - - // 验证溢出处理 - stats := stream.GetStats() - assert.NotNil(t, stats) - assert.True(t, stats[InputCount] > 0) -} - // TestExpandDataChannelDirectly 直接测试expandDataChannel函数 // 提高expandDataChannel函数的覆盖率 func 
TestExpandDataChannelDirectly(t *testing.T) { @@ -1227,226 +1009,6 @@ func TestExpandDataChannelBelowThreshold(t *testing.T) { assert.Equal(t, originalCap, newCap, "Channel capacity should not change when below threshold") } -// TestCheckAndProcessRecoveryDataDirectly 直接测试checkAndProcessRecoveryData函数 -// 提高checkAndProcessRecoveryData函数的覆盖率 -func TestCheckAndProcessRecoveryDataDirectly(t *testing.T) { - tempDir := t.TempDir() - - // 创建持久化管理器 - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 创建Stream实例 - config := types.Config{ - SimpleFields: []string{"message"}, - PerformanceConfig: types.PerformanceConfig{ - BufferConfig: types.BufferConfig{ - DataChannelSize: 100, - ResultChannelSize: 100, - }, - WorkerConfig: types.WorkerConfig{ - MaxRetryRoutines: 5, - }, - }, - } - - stream, err := NewStream(config) - require.NoError(t, err) - defer stream.Stop() - - // 设置持久化管理器 - stream.persistenceManager = pm - - // 添加一些测试数据到持久化管理器 - testData := map[string]interface{}{ - "message": "recovery_test_data", - "id": 123, - } - - // 持久化数据 - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待数据写入 - time.Sleep(200 * time.Millisecond) - - // 停止并重新启动以进入恢复模式 - pm.Stop() - time.Sleep(100 * time.Millisecond) - err = pm.Start() - require.NoError(t, err) - - // 验证进入恢复模式 - assert.True(t, pm.IsInRecoveryMode(), "Should be in recovery mode") - - // 启动恢复数据处理 - go stream.checkAndProcessRecoveryData() - - // 等待恢复处理 - time.Sleep(500 * time.Millisecond) - - // 验证数据是否被恢复处理 - select { - case recoveredData := <-stream.dataChan: - assert.Equal(t, "recovery_test_data", recoveredData["message"]) - case <-time.After(1 * time.Second): - // 可能数据已经被处理完毕,这也是正常的 - t.Log("No data in channel, recovery might be completed") - } -} - -// TestShouldRetryRecoveredDataVariousCases 测试各种重试判断场景 -// 提高ShouldRetryRecoveredData函数覆盖率 -func TestShouldRetryRecoveredDataVariousCases(t *testing.T) { - tempDir := t.TempDir() - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 设置最大重试次数 - pm.SetMaxRetryCount(3) - - // 测试用例1: 重试次数未达到限制(float64类型) - data1 := map[string]interface{}{ - "_retry_count": float64(2), - "data": "test1", - } - assert.True(t, pm.ShouldRetryRecoveredData(data1), "Should allow retry for data with retry count 2") - - // 测试用例2: 重试次数达到限制(float64类型) - data2 := map[string]interface{}{ - "_retry_count": float64(3), - "data": "test2", - } - assert.False(t, pm.ShouldRetryRecoveredData(data2), "Should deny retry for data with retry count 3") - - // 测试用例3: 重试次数未达到限制(int类型) - data3 := map[string]interface{}{ - "_retry_count": 1, - "data": "test3", - } - assert.True(t, pm.ShouldRetryRecoveredData(data3), "Should allow retry for data with retry count 1 (int)") - - // 测试用例4: 重试次数达到限制(int类型) - data4 := map[string]interface{}{ - "_retry_count": 3, - "data": "test4", - } - assert.False(t, pm.ShouldRetryRecoveredData(data4), "Should deny retry for data with retry count 3 (int)") - - // 测试用例5: 没有重试信息的新数据(应该允许重试) - data5 := map[string]interface{}{ - "data": "test5", - } - assert.True(t, pm.ShouldRetryRecoveredData(data5), "Should allow retry for new data without retry info") - - // 测试用例6: 通过sequence_id查找重试信息 - testData6 := map[string]interface{}{ - "data": "test6", - } - // 先持久化数据以创建sequence_id - err = pm.PersistData(testData6) - require.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - // 模拟带sequence_id的恢复数据 - data6 := map[string]interface{}{ - "_sequence_id": 
float64(1), - "data": "test6", - } - // 这个测试可能需要更复杂的设置,暂时验证函数不会崩溃 - result := pm.ShouldRetryRecoveredData(data6) - assert.IsType(t, true, result, "Function should return boolean") -} - -// TestRetryFailedDataFunction 测试RetryFailedData函数 -// 提高RetryFailedData函数覆盖率 -func TestRetryFailedDataFunction(t *testing.T) { - tempDir := t.TempDir() - - pm := NewPersistenceManager(tempDir) - require.NotNil(t, pm) - - err := pm.Start() - require.NoError(t, err) - defer pm.Stop() - - // 持久化一些测试数据 - testData := map[string]interface{}{ - "message": "retry_test", - "id": 789, - } - - err = pm.PersistData(testData) - require.NoError(t, err) - - // 等待数据写入 - time.Sleep(200 * time.Millisecond) - - // 调用重试失败数据函数(使用序列号1和测试原因) - err = pm.RetryFailedData(1, "test retry reason") - // 这个调用可能会失败,因为序列号可能不在重试映射中,这是正常的 - - // 验证函数执行不会崩溃 - assert.NotNil(t, pm, "PersistenceManager should still be valid after RetryFailedData") -} - -// TestMaxRetryRoutinesLimit 测试最大重试协程数限制 -func TestMaxRetryRoutinesLimit(t *testing.T) { - config := types.Config{ - SimpleFields: []string{"test"}, - PerformanceConfig: types.PerformanceConfig{ - WorkerConfig: types.WorkerConfig{ - MaxRetryRoutines: 1, // 设置很小的限制 - }, - }, - } - - stream, err := NewStream(config) - require.NoError(t, err) - defer stream.Stop() - - // 模拟已达到最大重试协程数 - stream.activeRetries = 1 - - // 尝试启动恢复处理,应该立即返回 - originalActiveRetries := stream.activeRetries - stream.checkAndProcessRecoveryData() - newActiveRetries := stream.activeRetries - - // 验证activeRetries没有增加(因为达到限制) - assert.Equal(t, originalActiveRetries, newActiveRetries, "activeRetries should not increase when limit reached") -} - -// TestPersistenceManagerWithNilRecoveryData 测试持久化管理器处理空恢复数据 -func TestPersistenceManagerWithNilRecoveryData(t *testing.T) { - config := types.Config{ - SimpleFields: []string{"test"}, - } - - stream, err := NewStream(config) - require.NoError(t, err) - defer stream.Stop() - - // 设置为nil的持久化管理器 - stream.persistenceManager = nil - - // 调用恢复数据处理,应该立即返回而不崩溃 - stream.checkAndProcessRecoveryData() - - // 验证函数执行完毕 - assert.Nil(t, stream.persistenceManager, "PersistenceManager should remain nil") -} - // TestStreamConfigErrorHandlingEnhanced 测试流配置错误处理增强版 func TestStreamConfigErrorHandlingEnhanced(t *testing.T) { tests := []struct { @@ -1729,11 +1291,6 @@ func TestStreamUnifiedConfigIntegration(t *testing.T) { performanceConfig: types.LowLatencyConfig(), expectedWindowBufferSize: 100, }, - { - name: "零数据丢失配置", - performanceConfig: types.ZeroDataLossConfig(), - expectedWindowBufferSize: 2000, - }, } for _, tc := range testCases { @@ -2106,7 +1663,6 @@ func TestPerformanceConfigurationsEnhanced(t *testing.T) { "Default": types.DefaultPerformanceConfig(), "HighPerformance": types.HighPerformanceConfig(), "LowLatency": types.LowLatencyConfig(), - "ZeroDataLoss": types.ZeroDataLossConfig(), } for name, perfConfig := range configs { @@ -2205,27 +1761,6 @@ func TestStreamFactory_CreateLowLatencyStream(t *testing.T) { assert.Equal(t, expectedConfig, stream.config.PerformanceConfig) } -// TestStreamFactory_CreateZeroDataLossStream 测试创建零数据丢失流 -func TestStreamFactory_CreateZeroDataLossStream(t *testing.T) { - factory := NewStreamFactory() - config := types.Config{ - SimpleFields: []string{"name", "age"}, - } - - stream, err := factory.CreateZeroDataLossStream(config) - require.NoError(t, err) - assert.NotNil(t, stream) - defer func() { - if stream != nil { - close(stream.done) - } - }() - - // 验证零数据丢失配置 - expectedConfig := types.ZeroDataLossConfig() - assert.Equal(t, expectedConfig, 
stream.config.PerformanceConfig) -} - // TestStreamFactory_CreateCustomPerformanceStream 测试创建自定义性能配置流 func TestStreamFactory_CreateCustomPerformanceStream(t *testing.T) { factory := NewStreamFactory() @@ -2288,62 +1823,6 @@ func TestStreamFactory_CreateStreamWithWindow(t *testing.T) { }() } -// TestStreamFactory_CreateStreamWithPersistence 测试创建带持久化的流 -func TestStreamFactory_CreateStreamWithPersistence(t *testing.T) { - factory := NewStreamFactory() - config := types.Config{ - SimpleFields: []string{"name", "age"}, - PerformanceConfig: types.PerformanceConfig{ - BufferConfig: types.BufferConfig{ - DataChannelSize: 100, - ResultChannelSize: 50, - }, - OverflowConfig: types.OverflowConfig{ - Strategy: StrategyPersist, - PersistenceConfig: &types.PersistenceConfig{ - DataDir: "./test_data", - MaxFileSize: 1024 * 1024, - FlushInterval: 5 * time.Second, - }, - }, - WorkerConfig: types.WorkerConfig{ - SinkWorkerCount: 2, - SinkPoolSize: 50, - MaxRetryRoutines: 1, - }, - }, - } - - stream, err := factory.CreateStream(config) - require.NoError(t, err) - assert.NotNil(t, stream) - assert.NotNil(t, stream.persistenceManager) - defer func() { - // 清理测试数据 - if stream.persistenceManager != nil { - stream.persistenceManager.Stop() - } - }() -} - -// TestStreamFactory_CreateStreamWithInvalidPersistence 测试创建无效持久化配置的流 -func TestStreamFactory_CreateStreamWithInvalidPersistence(t *testing.T) { - factory := NewStreamFactory() - config := types.Config{ - SimpleFields: []string{"name", "age"}, - PerformanceConfig: types.PerformanceConfig{ - OverflowConfig: types.OverflowConfig{ - Strategy: StrategyPersist, - // 缺少PersistenceConfig - }, - }, - } - - _, err := factory.CreateStream(config) - assert.Error(t, err) - assert.Contains(t, err.Error(), "PersistenceConfig is not provided") -} - // TestStreamFactory_CreateStreamWithInvalidStrategy 测试创建无效策略的流 func TestStreamFactory_CreateStreamWithInvalidStrategy(t *testing.T) { factory := NewStreamFactory() @@ -2397,79 +1876,6 @@ func TestStreamFactory_CreateStreamInstance(t *testing.T) { assert.Equal(t, config, stream.config) } -// TestStreamFactory_SetupDataProcessingStrategy 测试数据处理策略设置 -func TestStreamFactory_SetupDataProcessingStrategy(t *testing.T) { - factory := NewStreamFactory() - stream := &Stream{} - - tests := []struct { - name string - strategy string - wantErr bool - }{ - {"Drop strategy", StrategyDrop, false}, - {"Block strategy", StrategyBlock, false}, - {"Expand strategy", StrategyExpand, false}, - {"Persist strategy", StrategyPersist, false}, - {"Invalid strategy", "invalid", false}, // 应该使用默认策略 - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - perfConfig := types.PerformanceConfig{ - OverflowConfig: types.OverflowConfig{ - Strategy: tt.strategy, - }, - } - - err := factory.setupDataProcessingStrategy(stream, perfConfig) - if tt.wantErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, stream.dataStrategy) - } - }) - } -} - -// TestStreamFactory_InitializePersistenceManager 测试持久化管理器初始化 -func TestStreamFactory_InitializePersistenceManager(t *testing.T) { - factory := NewStreamFactory() - stream := &Stream{} - - // 测试非持久化策略 - perfConfig := types.PerformanceConfig{ - OverflowConfig: types.OverflowConfig{ - Strategy: StrategyDrop, - }, - } - err := factory.initializePersistenceManager(stream, perfConfig) - assert.NoError(t, err) - assert.Nil(t, stream.persistenceManager) - - // 测试持久化策略但缺少配置 - perfConfig.OverflowConfig.Strategy = StrategyPersist - err = factory.initializePersistenceManager(stream, 
perfConfig) - assert.Error(t, err) - assert.Contains(t, err.Error(), "PersistenceConfig is not provided") - - // 测试有效的持久化配置 - perfConfig.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{ - DataDir: "./test_data", - MaxFileSize: 1024 * 1024, - FlushInterval: 5 * time.Second, - } - err = factory.initializePersistenceManager(stream, perfConfig) - assert.NoError(t, err) - assert.NotNil(t, stream.persistenceManager) - - // 清理 - if stream.persistenceManager != nil { - stream.persistenceManager.Stop() - } -} - // TestStreamFactory_Performance 测试工厂性能 func TestStreamFactory_Performance(t *testing.T) { factory := NewStreamFactory() diff --git a/streamsql.go b/streamsql.go index f3971ce..a96afc4 100644 --- a/streamsql.go +++ b/streamsql.go @@ -148,8 +148,6 @@ func (s *Streamsql) Execute(sql string) error { streamInstance, err = stream.NewStreamWithHighPerformance(*config) case "low_latency": streamInstance, err = stream.NewStreamWithLowLatency(*config) - case "zero_data_loss": - streamInstance, err = stream.NewStreamWithZeroDataLoss(*config) case "custom": if s.customConfig != nil { streamInstance, err = stream.NewStreamWithCustomPerformance(*config, *s.customConfig) diff --git a/types/config.go b/types/config.go index a70a7b0..644ea92 100644 --- a/types/config.go +++ b/types/config.go @@ -83,21 +83,13 @@ type BufferConfig struct { // OverflowConfig overflow strategy configuration type OverflowConfig struct { - Strategy string `json:"strategy"` // Overflow strategy: "drop", "block", "expand", "persist" - BlockTimeout time.Duration `json:"blockTimeout"` // Block timeout duration - AllowDataLoss bool `json:"allowDataLoss"` // Allow data loss - PersistenceConfig *PersistenceConfig `json:"persistenceConfig"` // Persistence configuration - ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration + Strategy string `json:"strategy"` // Overflow strategy: "drop", "block", "expand" + BlockTimeout time.Duration `json:"blockTimeout"` // Block timeout duration + AllowDataLoss bool `json:"allowDataLoss"` // Allow data loss + ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration } -// PersistenceConfig persistence configuration -type PersistenceConfig struct { - DataDir string `json:"dataDir"` // Persistence data directory - MaxFileSize int64 `json:"maxFileSize"` // Maximum file size - FlushInterval time.Duration `json:"flushInterval"` // Flush interval - MaxRetries int `json:"maxRetries"` // Maximum retry count - RetryInterval time.Duration `json:"retryInterval"` // Retry interval -} + // ExpansionConfig expansion configuration type ExpansionConfig struct { @@ -216,40 +208,3 @@ func LowLatencyConfig() PerformanceConfig { config.MonitoringConfig.StatsUpdateInterval = 1 * time.Second return config } - -// ZeroDataLossConfig returns zero data loss configuration preset -// Provides maximum data protection using persistence strategy to prevent data loss -func ZeroDataLossConfig() PerformanceConfig { - config := DefaultPerformanceConfig() - config.BufferConfig.DataChannelSize = 2000 - config.BufferConfig.ResultChannelSize = 200 - config.BufferConfig.WindowOutputSize = 2000 - config.BufferConfig.EnableDynamicResize = true - config.OverflowConfig.Strategy = "persist" - config.OverflowConfig.AllowDataLoss = false - config.OverflowConfig.PersistenceConfig = &PersistenceConfig{ - DataDir: "./data", - MaxFileSize: 100 * 1024 * 1024, // 100MB - FlushInterval: 5 * time.Second, - MaxRetries: 3, - RetryInterval: 2 * time.Second, - } - return config -} - -// 
PersistencePerformanceConfig returns persistence performance configuration preset -// Provides persistent storage functionality balancing performance and data durability -func PersistencePerformanceConfig() PerformanceConfig { - config := DefaultPerformanceConfig() - config.BufferConfig.DataChannelSize = 1500 - config.BufferConfig.ResultChannelSize = 150 - config.OverflowConfig.Strategy = "persist" - config.OverflowConfig.PersistenceConfig = &PersistenceConfig{ - DataDir: "./persistence_data", - MaxFileSize: 10 * 1024 * 1024, // 10MB - FlushInterval: 5 * time.Second, - MaxRetries: 3, - RetryInterval: 2 * time.Second, - } - return config -} diff --git a/types/config_test.go b/types/config_test.go index ba9bc27..77186f9 100644 --- a/types/config_test.go +++ b/types/config_test.go @@ -378,34 +378,6 @@ func TestLowLatencyConfig(t *testing.T) { assert.Equal(t, 1*time.Second, config.MonitoringConfig.StatsUpdateInterval) } -// TestZeroDataLossConfig 测试ZeroDataLossConfig函数 -func TestZeroDataLossConfig(t *testing.T) { - config := ZeroDataLossConfig() - - // 验证零数据丢失配置 - assert.Equal(t, 2000, config.BufferConfig.DataChannelSize) - assert.Equal(t, 200, config.BufferConfig.ResultChannelSize) - assert.Equal(t, "persist", config.OverflowConfig.Strategy) - assert.False(t, config.OverflowConfig.AllowDataLoss) - assert.NotNil(t, config.OverflowConfig.PersistenceConfig) - assert.Equal(t, "./data", config.OverflowConfig.PersistenceConfig.DataDir) - assert.Equal(t, int64(100*1024*1024), config.OverflowConfig.PersistenceConfig.MaxFileSize) -} - -// TestPersistencePerformanceConfig 测试PersistencePerformanceConfig函数 -func TestPersistencePerformanceConfig(t *testing.T) { - config := PersistencePerformanceConfig() - - // 验证持久化性能配置 - assert.Equal(t, 1500, config.BufferConfig.DataChannelSize) - assert.Equal(t, 150, config.BufferConfig.ResultChannelSize) - assert.Equal(t, "persist", config.OverflowConfig.Strategy) - assert.NotNil(t, config.OverflowConfig.PersistenceConfig) - assert.Equal(t, "./persistence_data", config.OverflowConfig.PersistenceConfig.DataDir) - assert.Equal(t, 5*time.Second, config.OverflowConfig.PersistenceConfig.FlushInterval) - assert.Equal(t, 3, config.OverflowConfig.PersistenceConfig.MaxRetries) -} - // TestBufferConfig 测试BufferConfig结构体 func TestBufferConfig(t *testing.T) { config := BufferConfig{ @@ -425,41 +397,6 @@ func TestBufferConfig(t *testing.T) { assert.Equal(t, 0.75, config.UsageThreshold) } -// TestOverflowConfig 测试OverflowConfig结构体 -func TestOverflowConfig(t *testing.T) { - persistenceConfig := &PersistenceConfig{ - DataDir: "/tmp/data", - MaxFileSize: 1024 * 1024, - FlushInterval: 10 * time.Second, - MaxRetries: 5, - RetryInterval: 2 * time.Second, - } - - expansionConfig := ExpansionConfig{ - GrowthFactor: 2.0, - MinIncrement: 100, - TriggerThreshold: 0.9, - ExpansionTimeout: 30 * time.Second, - } - - config := OverflowConfig{ - Strategy: "persist", - BlockTimeout: 5 * time.Second, - AllowDataLoss: false, - PersistenceConfig: persistenceConfig, - ExpansionConfig: expansionConfig, - } - - assert.Equal(t, "persist", config.Strategy) - assert.Equal(t, 5*time.Second, config.BlockTimeout) - assert.False(t, config.AllowDataLoss) - assert.NotNil(t, config.PersistenceConfig) - assert.Equal(t, "/tmp/data", config.PersistenceConfig.DataDir) - assert.Equal(t, int64(1024*1024), config.PersistenceConfig.MaxFileSize) - assert.Equal(t, 2.0, config.ExpansionConfig.GrowthFactor) - assert.Equal(t, 100, config.ExpansionConfig.MinIncrement) -} - // TestWorkerConfig 测试WorkerConfig结构体 func 
TestWorkerConfig(t *testing.T) { config := WorkerConfig{ diff --git a/window/window_test.go b/window/window_test.go index eff530d..32759b9 100644 --- a/window/window_test.go +++ b/window/window_test.go @@ -644,13 +644,6 @@ func TestWindowWithPerformanceConfig(t *testing.T) { expectedBufferSize: 20, // 200 / 10 extraParams: map[string]interface{}{"count": 10}, }, - { - name: "会话窗口-零数据丢失配置", - windowType: TypeSession, - performanceConfig: types.ZeroDataLossConfig(), - expectedBufferSize: 200, // 2000 / 10 - extraParams: map[string]interface{}{"timeout": "30s"}, - }, { name: "自定义性能配置", windowType: TypeTumbling,