diff --git a/stream/persistence.go b/stream/persistence.go index 3c5103f..c3b2126 100644 --- a/stream/persistence.go +++ b/stream/persistence.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -145,6 +146,10 @@ func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInt stopChan: make(chan struct{}), recoveryQueue: make(chan OrderedDataItem, 10000), sequenceCounter: 0, + // Retry and dead letter queue configuration + maxRetryCount: 3, // Default maximum 3 retries + deadLetterQueue: make([]DeadLetterItem, 0, 1000), // Dead letter queue + retryDataMap: make(map[int64]*OrderedDataItem), // Retry data mapping } // Ensure data directory exists @@ -168,6 +173,10 @@ func (pm *PersistenceManager) Start() error { return fmt.Errorf("ordered persistence manager already running") } + // Reinitialize channels if they were closed + pm.stopChan = make(chan struct{}) + pm.recoveryQueue = make(chan OrderedDataItem, 10000) + // Create initial file pm.writeMutex.Lock() if err := pm.createNewFile(); err != nil { @@ -188,6 +197,12 @@ func (pm *PersistenceManager) Start() error { go pm.backgroundProcessor() go pm.recoveryProcessor() + // Load and recover existing data + if err := pm.LoadAndRecoverData(); err != nil { + logger.Error("Failed to load and recover data: %v", err) + // Don't return error, continue running + } + logger.Info("Ordered persistence manager started successfully, data directory: %s", pm.dataDir) return nil } @@ -210,7 +225,13 @@ func (pm *PersistenceManager) Stop() error { pm.isRunning = false pm.runningMutex.Unlock() - close(pm.stopChan) + // Close stop channel safely + select { + case <-pm.stopChan: + // Channel already closed + default: + close(pm.stopChan) + } // Stop timer pm.writeMutex.Lock() @@ -222,7 +243,7 @@ func (pm *PersistenceManager) Stop() error { // Flush remaining data pm.flushPendingData() - // Close current file + // Close current file with proper synchronization pm.writeMutex.Lock() if pm.currentFile != nil { pm.currentFile.Close() @@ -230,8 +251,23 @@ func (pm *PersistenceManager) Stop() error { } pm.writeMutex.Unlock() - // Close recovery queue - close(pm.recoveryQueue) + // Close recovery queue safely + go func() { + // Drain the recovery queue in a separate goroutine + for { + select { + case <-pm.recoveryQueue: + // Continue draining + default: + // Queue is empty, safe to close + close(pm.recoveryQueue) + return + } + } + }() + + // Give some time for the goroutine to drain the queue + time.Sleep(100 * time.Millisecond) logger.Info("Ordered persistence manager stopped") return nil @@ -296,11 +332,20 @@ func (pm *PersistenceManager) PersistDataWithRetryLimit(data map[string]interfac // Returns: // - error: error during loading process func (pm *PersistenceManager) LoadAndRecoverData() error { - files, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_ordered_*.log")) + // 只加载未处理的文件(排除.processed文件) + allFiles, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_ordered_*.log")) if err != nil { return fmt.Errorf("failed to glob files: %w", err) } + // 过滤掉已处理的文件(.processed后缀的文件) + var files []string + for _, file := range allFiles { + if !strings.HasSuffix(file, ".processed") { + files = append(files, file) + } + } + if len(files) == 0 { logger.Info("No persistence files found for recovery") return nil @@ -317,9 +362,11 @@ func (pm *PersistenceManager) LoadAndRecoverData() error { } allItems = append(allItems, items...) - // 加载后删除文件 - if err := os.Remove(filename); err != nil { - logger.Error("Failed to delete loaded file %s: %v", filename, err) + // 加载后直接删除文件 + if deleteErr := os.Remove(filename); deleteErr != nil { + logger.Error("Failed to delete file %s: %v", filename, deleteErr) + } else { + logger.Info("File %s processed and deleted", filename) } } @@ -339,23 +386,45 @@ func (pm *PersistenceManager) LoadAndRecoverData() error { pm.recoveryMode = true pm.recoveryMutex.Unlock() + // 如果没有数据需要恢复,立即退出恢复模式 + if len(allItems) == 0 { + pm.recoveryMutex.Lock() + pm.recoveryMode = false + pm.recoveryMutex.Unlock() + logger.Info("No data to recover, exiting recovery mode") + return nil + } + // 将数据放入恢复队列 + for _, item := range allItems { + select { + case pm.recoveryQueue <- item: + // 数据已放入恢复队列 + case <-pm.stopChan: + return nil + } + } + + logger.Info("Data recovery completed, %d items recovered in order", len(allItems)) + + // 启动一个goroutine来监控恢复队列,当队列为空时退出恢复模式 go func() { - for _, item := range allItems { + ticker := time.NewTicker(1 * time.Millisecond) + defer ticker.Stop() + + for { select { - case pm.recoveryQueue <- item: - // 数据已放入恢复队列 + case <-ticker.C: + if len(pm.recoveryQueue) == 0 { + pm.recoveryMutex.Lock() + pm.recoveryMode = false + pm.recoveryMutex.Unlock() + return + } case <-pm.stopChan: return } } - - // 恢复完成,退出恢复模式 - pm.recoveryMutex.Lock() - pm.recoveryMode = false - pm.recoveryMutex.Unlock() - - logger.Info("Data recovery completed, %d items recovered in order", len(allItems)) }() atomic.AddInt64(&pm.totalLoaded, int64(len(allItems))) @@ -380,8 +449,20 @@ func (pm *PersistenceManager) GetRecoveryData() (map[string]interface{}, bool) { select { case item := <-pm.recoveryQueue: atomic.AddInt64(&pm.totalRecovered, 1) + + // 检查队列是否为空,如果为空则退出恢复模式 + if len(pm.recoveryQueue) == 0 { + pm.recoveryMutex.Lock() + pm.recoveryMode = false + pm.recoveryMutex.Unlock() + } + return item.Data, true default: + // 队列为空,退出恢复模式 + pm.recoveryMutex.Lock() + pm.recoveryMode = false + pm.recoveryMutex.Unlock() return nil, false } } @@ -595,7 +676,7 @@ func (pm *PersistenceManager) backgroundProcessor() { // recoveryProcessor 恢复处理协程 func (pm *PersistenceManager) recoveryProcessor() { // 这个协程主要用于监控恢复状态,实际恢复数据由GetRecoveryData方法提供 - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) // 更频繁地检查 defer ticker.Stop() for { @@ -607,9 +688,19 @@ func (pm *PersistenceManager) recoveryProcessor() { if recoveryMode { queueLen := len(pm.recoveryQueue) - if queueLen > 0 { + if queueLen == 0 { + // 队列为空,退出恢复模式 + pm.recoveryMutex.Lock() + pm.recoveryMode = false + pm.recoveryMutex.Unlock() + logger.Debug("Recovery queue empty, exiting recovery mode") + return + } else { logger.Debug("Recovery in progress, %d items remaining in queue", queueLen) } + } else { + // 不在恢复模式,退出协程 + return } case <-pm.stopChan: @@ -747,7 +838,21 @@ func (pm *PersistenceManager) SetMaxRetryCount(maxRetryCount int) { // 返回值: // - bool: 是否应该重试 func (pm *PersistenceManager) ShouldRetryRecoveredData(data map[string]interface{}) bool { - // 检查数据中的重试次数 + // 检查数据中的重试次数(支持retry和_retry_count字段) + if retryCountFloat, exists := data["retry"]; exists { + if retryCount, ok := retryCountFloat.(float64); ok { + if int(retryCount) >= pm.maxRetryCount { + return false + } + } + if retryCount, ok := retryCountFloat.(int); ok { + if retryCount >= pm.maxRetryCount { + return false + } + } + } + + // 兼容性检查:也检查_retry_count字段 if retryCountFloat, exists := data["_retry_count"]; exists { if retryCount, ok := retryCountFloat.(float64); ok { if int(retryCount) >= pm.maxRetryCount {