mirror of
https://gitee.com/rulego/streamsql.git
synced 2025-07-10 01:32:34 +00:00
fix:data race
This commit is contained in:
@ -74,9 +74,6 @@ func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInt
|
||||
|
||||
// Start 启动持久化管理器
|
||||
func (pm *PersistenceManager) Start() error {
|
||||
pm.writeMutex.Lock()
|
||||
defer pm.writeMutex.Unlock()
|
||||
|
||||
// 检查是否已经在运行
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
@ -87,9 +84,12 @@ func (pm *PersistenceManager) Start() error {
|
||||
}
|
||||
|
||||
// 创建初始文件
|
||||
pm.writeMutex.Lock()
|
||||
if err := pm.createNewFile(); err != nil {
|
||||
pm.writeMutex.Unlock()
|
||||
return fmt.Errorf("failed to create initial file: %w", err)
|
||||
}
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 设置运行状态
|
||||
pm.runningMutex.Lock()
|
||||
@ -108,9 +108,6 @@ func (pm *PersistenceManager) Start() error {
|
||||
|
||||
// Stop 停止持久化管理器
|
||||
func (pm *PersistenceManager) Stop() error {
|
||||
pm.writeMutex.Lock()
|
||||
defer pm.writeMutex.Unlock()
|
||||
|
||||
// 检查是否正在运行
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
@ -128,9 +125,22 @@ func (pm *PersistenceManager) Stop() error {
|
||||
close(pm.stopChan)
|
||||
|
||||
// 停止定时器
|
||||
pm.writeMutex.Lock()
|
||||
if pm.flushTimer != nil {
|
||||
pm.flushTimer.Stop()
|
||||
}
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 刷新剩余数据
|
||||
pm.flushPendingData()
|
||||
|
||||
// 关闭当前文件
|
||||
pm.writeMutex.Lock()
|
||||
if pm.currentFile != nil {
|
||||
pm.currentFile.Close()
|
||||
pm.currentFile = nil
|
||||
}
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 刷新剩余数据
|
||||
pm.flushPendingData()
|
||||
@ -157,7 +167,6 @@ func (pm *PersistenceManager) PersistData(data interface{}) error {
|
||||
|
||||
pm.pendingMutex.Lock()
|
||||
pm.pendingData = append(pm.pendingData, data)
|
||||
pm.totalPersisted++
|
||||
pm.pendingMutex.Unlock()
|
||||
|
||||
return nil
|
||||
@ -179,7 +188,11 @@ func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error) {
|
||||
continue
|
||||
}
|
||||
allData = append(allData, data...)
|
||||
|
||||
// 在锁保护下更新统计信息
|
||||
pm.writeMutex.Lock()
|
||||
pm.totalLoaded += int64(len(data))
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 加载后删除文件
|
||||
if err := os.Remove(filename); err != nil {
|
||||
@ -200,6 +213,9 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} {
|
||||
pm.writeMutex.Lock()
|
||||
currentFileSize := pm.currentSize
|
||||
fileIndex := pm.fileIndex
|
||||
totalPersisted := pm.totalPersisted
|
||||
totalLoaded := pm.totalLoaded
|
||||
filesCreated := pm.filesCreated
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 安全地读取运行状态
|
||||
@ -215,9 +231,9 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} {
|
||||
"file_index": fileIndex,
|
||||
"max_file_size": pm.maxFileSize,
|
||||
"flush_interval": pm.flushInterval.String(),
|
||||
"total_persisted": pm.totalPersisted,
|
||||
"total_loaded": pm.totalLoaded,
|
||||
"files_created": pm.filesCreated,
|
||||
"total_persisted": totalPersisted,
|
||||
"total_loaded": totalLoaded,
|
||||
"files_created": filesCreated,
|
||||
}
|
||||
}
|
||||
|
||||
@ -244,11 +260,12 @@ func (pm *PersistenceManager) createNewFile() error {
|
||||
pm.fileIndex++
|
||||
pm.filesCreated++
|
||||
|
||||
logger.Info("Created new persistence file: %s", filepath)
|
||||
// logger.Info("Created new persistence file: %s", filepath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeDataToFile 将数据写入文件
|
||||
// 注意:此方法应该在writeMutex锁保护下调用
|
||||
func (pm *PersistenceManager) writeDataToFile(data interface{}) error {
|
||||
if pm.currentFile == nil {
|
||||
return fmt.Errorf("no current file")
|
||||
@ -280,6 +297,7 @@ func (pm *PersistenceManager) writeDataToFile(data interface{}) error {
|
||||
}
|
||||
|
||||
pm.currentSize += int64(n)
|
||||
pm.totalPersisted++
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -310,11 +328,12 @@ func (pm *PersistenceManager) flushPendingData() {
|
||||
_ = pm.currentFile.Sync()
|
||||
}
|
||||
|
||||
logger.Info("Flushed %d pending data records to disk", len(dataToWrite))
|
||||
// logger.Info("Flushed %d pending data records to disk", len(dataToWrite))
|
||||
}
|
||||
|
||||
// startFlushTimer 启动刷新定时器
|
||||
func (pm *PersistenceManager) startFlushTimer() {
|
||||
pm.writeMutex.Lock()
|
||||
pm.flushTimer = time.AfterFunc(pm.flushInterval, func() {
|
||||
// 安全地检查运行状态
|
||||
pm.runningMutex.RLock()
|
||||
@ -326,6 +345,7 @@ func (pm *PersistenceManager) startFlushTimer() {
|
||||
pm.startFlushTimer() // 重新启动定时器
|
||||
}
|
||||
})
|
||||
pm.writeMutex.Unlock()
|
||||
}
|
||||
|
||||
// backgroundProcessor 后台处理协程
|
||||
|
Reference in New Issue
Block a user