diff --git a/stream/persistence.go b/stream/persistence.go index 2657daa..8181977 100644 --- a/stream/persistence.go +++ b/stream/persistence.go @@ -74,9 +74,6 @@ func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInt // Start 启动持久化管理器 func (pm *PersistenceManager) Start() error { - pm.writeMutex.Lock() - defer pm.writeMutex.Unlock() - // 检查是否已经在运行 pm.runningMutex.RLock() running := pm.isRunning @@ -87,9 +84,12 @@ func (pm *PersistenceManager) Start() error { } // 创建初始文件 + pm.writeMutex.Lock() if err := pm.createNewFile(); err != nil { + pm.writeMutex.Unlock() return fmt.Errorf("failed to create initial file: %w", err) } + pm.writeMutex.Unlock() // 设置运行状态 pm.runningMutex.Lock() @@ -108,9 +108,6 @@ func (pm *PersistenceManager) Start() error { // Stop 停止持久化管理器 func (pm *PersistenceManager) Stop() error { - pm.writeMutex.Lock() - defer pm.writeMutex.Unlock() - // 检查是否正在运行 pm.runningMutex.RLock() running := pm.isRunning @@ -128,9 +125,22 @@ func (pm *PersistenceManager) Stop() error { close(pm.stopChan) // 停止定时器 + pm.writeMutex.Lock() if pm.flushTimer != nil { pm.flushTimer.Stop() } + pm.writeMutex.Unlock() + + // 刷新剩余数据 + pm.flushPendingData() + + // 关闭当前文件 + pm.writeMutex.Lock() + if pm.currentFile != nil { + pm.currentFile.Close() + pm.currentFile = nil + } + pm.writeMutex.Unlock() // 刷新剩余数据 pm.flushPendingData() @@ -157,7 +167,6 @@ func (pm *PersistenceManager) PersistData(data interface{}) error { pm.pendingMutex.Lock() pm.pendingData = append(pm.pendingData, data) - pm.totalPersisted++ pm.pendingMutex.Unlock() return nil @@ -179,7 +188,11 @@ func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error) { continue } allData = append(allData, data...) + + // 在锁保护下更新统计信息 + pm.writeMutex.Lock() pm.totalLoaded += int64(len(data)) + pm.writeMutex.Unlock() // 加载后删除文件 if err := os.Remove(filename); err != nil { @@ -200,6 +213,9 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} { pm.writeMutex.Lock() currentFileSize := pm.currentSize fileIndex := pm.fileIndex + totalPersisted := pm.totalPersisted + totalLoaded := pm.totalLoaded + filesCreated := pm.filesCreated pm.writeMutex.Unlock() // 安全地读取运行状态 @@ -215,9 +231,9 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} { "file_index": fileIndex, "max_file_size": pm.maxFileSize, "flush_interval": pm.flushInterval.String(), - "total_persisted": pm.totalPersisted, - "total_loaded": pm.totalLoaded, - "files_created": pm.filesCreated, + "total_persisted": totalPersisted, + "total_loaded": totalLoaded, + "files_created": filesCreated, } } @@ -244,11 +260,12 @@ func (pm *PersistenceManager) createNewFile() error { pm.fileIndex++ pm.filesCreated++ - logger.Info("Created new persistence file: %s", filepath) + // logger.Info("Created new persistence file: %s", filepath) return nil } // writeDataToFile 将数据写入文件 +// 注意:此方法应该在writeMutex锁保护下调用 func (pm *PersistenceManager) writeDataToFile(data interface{}) error { if pm.currentFile == nil { return fmt.Errorf("no current file") @@ -280,6 +297,7 @@ func (pm *PersistenceManager) writeDataToFile(data interface{}) error { } pm.currentSize += int64(n) + pm.totalPersisted++ return nil } @@ -310,11 +328,12 @@ func (pm *PersistenceManager) flushPendingData() { _ = pm.currentFile.Sync() } - logger.Info("Flushed %d pending data records to disk", len(dataToWrite)) + // logger.Info("Flushed %d pending data records to disk", len(dataToWrite)) } // startFlushTimer 启动刷新定时器 func (pm *PersistenceManager) startFlushTimer() { + pm.writeMutex.Lock() pm.flushTimer = time.AfterFunc(pm.flushInterval, func() { // 安全地检查运行状态 pm.runningMutex.RLock() @@ -326,6 +345,7 @@ func (pm *PersistenceManager) startFlushTimer() { pm.startFlushTimer() // 重新启动定时器 } }) + pm.writeMutex.Unlock() } // backgroundProcessor 后台处理协程