From 9ab28cb5fb837a16edfefefc7d475d127ca4c633 Mon Sep 17 00:00:00 2001 From: rulego-team Date: Fri, 13 Jun 2025 21:57:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E9=94=81=E7=AB=9E?= =?UTF-8?q?=E4=BA=89=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{test_persistence.go => main.go} | 0 stream/persistence.go | 43 ++++++++++++++++--- stream/stream.go | 13 +++++- 3 files changed, 50 insertions(+), 6 deletions(-) rename examples/persistence/{test_persistence.go => main.go} (100%) diff --git a/examples/persistence/test_persistence.go b/examples/persistence/main.go similarity index 100% rename from examples/persistence/test_persistence.go rename to examples/persistence/main.go diff --git a/stream/persistence.go b/stream/persistence.go index 5301c4c..2657daa 100644 --- a/stream/persistence.go +++ b/stream/persistence.go @@ -25,6 +25,7 @@ type PersistenceManager struct { pendingData []interface{} // 待写入数据 pendingMutex sync.Mutex // 待写入数据互斥锁 isRunning bool // 是否运行中 + runningMutex sync.RWMutex // 保护isRunning字段的读写锁 stopChan chan struct{} // 停止通道 // 统计信息 (新增) @@ -76,7 +77,12 @@ func (pm *PersistenceManager) Start() error { pm.writeMutex.Lock() defer pm.writeMutex.Unlock() - if pm.isRunning { + // 检查是否已经在运行 + pm.runningMutex.RLock() + running := pm.isRunning + pm.runningMutex.RUnlock() + + if running { return fmt.Errorf("persistence manager already running") } @@ -85,7 +91,10 @@ func (pm *PersistenceManager) Start() error { return fmt.Errorf("failed to create initial file: %w", err) } + // 设置运行状态 + pm.runningMutex.Lock() pm.isRunning = true + pm.runningMutex.Unlock() // 启动定时刷新 pm.startFlushTimer() @@ -102,11 +111,20 @@ func (pm *PersistenceManager) Stop() error { pm.writeMutex.Lock() defer pm.writeMutex.Unlock() - if !pm.isRunning { + // 检查是否正在运行 + pm.runningMutex.RLock() + running := pm.isRunning + pm.runningMutex.RUnlock() + + if !running { return nil } + // 设置停止状态 + pm.runningMutex.Lock() pm.isRunning = false + pm.runningMutex.Unlock() + close(pm.stopChan) // 停止定时器 @@ -128,7 +146,12 @@ func (pm *PersistenceManager) Stop() error { // PersistData 持久化单条数据 func (pm *PersistenceManager) PersistData(data interface{}) error { - if !pm.isRunning { + // 检查是否正在运行 + pm.runningMutex.RLock() + running := pm.isRunning + pm.runningMutex.RUnlock() + + if !running { return fmt.Errorf("persistence manager not running") } @@ -179,8 +202,13 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} { fileIndex := pm.fileIndex pm.writeMutex.Unlock() + // 安全地读取运行状态 + pm.runningMutex.RLock() + running := pm.isRunning + pm.runningMutex.RUnlock() + return map[string]interface{}{ - "running": pm.isRunning, + "running": running, "data_dir": pm.dataDir, "pending_count": pendingCount, "current_file_size": currentFileSize, @@ -288,7 +316,12 @@ func (pm *PersistenceManager) flushPendingData() { // startFlushTimer 启动刷新定时器 func (pm *PersistenceManager) startFlushTimer() { pm.flushTimer = time.AfterFunc(pm.flushInterval, func() { - if pm.isRunning { + // 安全地检查运行状态 + pm.runningMutex.RLock() + running := pm.isRunning + pm.runningMutex.RUnlock() + + if running { pm.flushPendingData() pm.startFlushTimer() // 重新启动定时器 } diff --git a/stream/stream.go b/stream/stream.go index c78f886..7954ba8 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -34,6 +34,7 @@ type Stream struct { // 新增:线程安全控制 dataChanMux sync.RWMutex // 保护dataChan访问的读写锁 + sinksMux sync.RWMutex // 保护sinks访问的读写锁 expansionMux sync.Mutex // 防止并发扩容的互斥锁 retryMux sync.Mutex // 控制持久化重试的互斥锁 expanding int32 // 扩容状态标记,使用原子操作 @@ -543,12 +544,20 @@ func (s *Stream) sendResultNonBlocking(results []map[string]interface{}) { // callSinksAsync 异步调用所有sink函数 func (s *Stream) callSinksAsync(results []map[string]interface{}) { + // 使用读锁安全地访问sinks切片 + s.sinksMux.RLock() if len(s.sinks) == 0 { + s.sinksMux.RUnlock() return } + // 复制sinks切片以避免在持有锁的情况下执行耗时操作 + sinksCopy := make([]func(interface{}), len(s.sinks)) + copy(sinksCopy, s.sinks) + s.sinksMux.RUnlock() + // 为每个sink创建异步任务 - for _, sink := range s.sinks { + for _, sink := range sinksCopy { // 捕获sink变量,避免闭包问题 currentSink := sink @@ -935,7 +944,9 @@ func (s *Stream) safeSendToDataChan(data interface{}) bool { } func (s *Stream) AddSink(sink func(interface{})) { + s.sinksMux.Lock() s.sinks = append(s.sinks, sink) + s.sinksMux.Unlock() } func (s *Stream) GetResultsChan() <-chan interface{} {