mirror of
https://gitee.com/rulego/streamsql.git
synced 2025-07-05 15:49:14 +00:00
fix:修复锁竞争问题
This commit is contained in:
@ -25,6 +25,7 @@ type PersistenceManager struct {
|
||||
pendingData []interface{} // 待写入数据
|
||||
pendingMutex sync.Mutex // 待写入数据互斥锁
|
||||
isRunning bool // 是否运行中
|
||||
runningMutex sync.RWMutex // 保护isRunning字段的读写锁
|
||||
stopChan chan struct{} // 停止通道
|
||||
|
||||
// 统计信息 (新增)
|
||||
@ -76,7 +77,12 @@ func (pm *PersistenceManager) Start() error {
|
||||
pm.writeMutex.Lock()
|
||||
defer pm.writeMutex.Unlock()
|
||||
|
||||
if pm.isRunning {
|
||||
// 检查是否已经在运行
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
pm.runningMutex.RUnlock()
|
||||
|
||||
if running {
|
||||
return fmt.Errorf("persistence manager already running")
|
||||
}
|
||||
|
||||
@ -85,7 +91,10 @@ func (pm *PersistenceManager) Start() error {
|
||||
return fmt.Errorf("failed to create initial file: %w", err)
|
||||
}
|
||||
|
||||
// 设置运行状态
|
||||
pm.runningMutex.Lock()
|
||||
pm.isRunning = true
|
||||
pm.runningMutex.Unlock()
|
||||
|
||||
// 启动定时刷新
|
||||
pm.startFlushTimer()
|
||||
@ -102,11 +111,20 @@ func (pm *PersistenceManager) Stop() error {
|
||||
pm.writeMutex.Lock()
|
||||
defer pm.writeMutex.Unlock()
|
||||
|
||||
if !pm.isRunning {
|
||||
// 检查是否正在运行
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
pm.runningMutex.RUnlock()
|
||||
|
||||
if !running {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 设置停止状态
|
||||
pm.runningMutex.Lock()
|
||||
pm.isRunning = false
|
||||
pm.runningMutex.Unlock()
|
||||
|
||||
close(pm.stopChan)
|
||||
|
||||
// 停止定时器
|
||||
@ -128,7 +146,12 @@ func (pm *PersistenceManager) Stop() error {
|
||||
|
||||
// PersistData 持久化单条数据
|
||||
func (pm *PersistenceManager) PersistData(data interface{}) error {
|
||||
if !pm.isRunning {
|
||||
// 检查是否正在运行
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
pm.runningMutex.RUnlock()
|
||||
|
||||
if !running {
|
||||
return fmt.Errorf("persistence manager not running")
|
||||
}
|
||||
|
||||
@ -179,8 +202,13 @@ func (pm *PersistenceManager) GetStats() map[string]interface{} {
|
||||
fileIndex := pm.fileIndex
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// 安全地读取运行状态
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
pm.runningMutex.RUnlock()
|
||||
|
||||
return map[string]interface{}{
|
||||
"running": pm.isRunning,
|
||||
"running": running,
|
||||
"data_dir": pm.dataDir,
|
||||
"pending_count": pendingCount,
|
||||
"current_file_size": currentFileSize,
|
||||
@ -288,7 +316,12 @@ func (pm *PersistenceManager) flushPendingData() {
|
||||
// startFlushTimer 启动刷新定时器
|
||||
func (pm *PersistenceManager) startFlushTimer() {
|
||||
pm.flushTimer = time.AfterFunc(pm.flushInterval, func() {
|
||||
if pm.isRunning {
|
||||
// 安全地检查运行状态
|
||||
pm.runningMutex.RLock()
|
||||
running := pm.isRunning
|
||||
pm.runningMutex.RUnlock()
|
||||
|
||||
if running {
|
||||
pm.flushPendingData()
|
||||
pm.startFlushTimer() // 重新启动定时器
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ type Stream struct {
|
||||
|
||||
// 新增:线程安全控制
|
||||
dataChanMux sync.RWMutex // 保护dataChan访问的读写锁
|
||||
sinksMux sync.RWMutex // 保护sinks访问的读写锁
|
||||
expansionMux sync.Mutex // 防止并发扩容的互斥锁
|
||||
retryMux sync.Mutex // 控制持久化重试的互斥锁
|
||||
expanding int32 // 扩容状态标记,使用原子操作
|
||||
@ -543,12 +544,20 @@ func (s *Stream) sendResultNonBlocking(results []map[string]interface{}) {
|
||||
|
||||
// callSinksAsync 异步调用所有sink函数
|
||||
func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
// 使用读锁安全地访问sinks切片
|
||||
s.sinksMux.RLock()
|
||||
if len(s.sinks) == 0 {
|
||||
s.sinksMux.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// 复制sinks切片以避免在持有锁的情况下执行耗时操作
|
||||
sinksCopy := make([]func(interface{}), len(s.sinks))
|
||||
copy(sinksCopy, s.sinks)
|
||||
s.sinksMux.RUnlock()
|
||||
|
||||
// 为每个sink创建异步任务
|
||||
for _, sink := range s.sinks {
|
||||
for _, sink := range sinksCopy {
|
||||
// 捕获sink变量,避免闭包问题
|
||||
currentSink := sink
|
||||
|
||||
@ -935,7 +944,9 @@ func (s *Stream) safeSendToDataChan(data interface{}) bool {
|
||||
}
|
||||
|
||||
func (s *Stream) AddSink(sink func(interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
s.sinks = append(s.sinks, sink)
|
||||
s.sinksMux.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) GetResultsChan() <-chan interface{} {
|
||||
|
Reference in New Issue
Block a user