diff --git a/stream/persistence.go b/stream/persistence.go
index 0b9287b..7977f13 100644
--- a/stream/persistence.go
+++ b/stream/persistence.go
@@ -12,7 +12,7 @@ import (
 	"github.com/rulego/streamsql/logger"
 )
 
-// PersistenceManager is the persistence manager
+// PersistenceManager is the data persistence manager
 type PersistenceManager struct {
 	dataDir     string // directory for persisted data
 	maxFileSize int64  // maximum size of a single file (bytes)
@@ -26,9 +26,14 @@ type PersistenceManager struct {
 	pendingMutex sync.Mutex    // mutex guarding the pending data
 	isRunning    bool          // whether the manager is running
 	stopChan     chan struct{} // stop channel
+
+	// statistics (new)
+	totalPersisted int64
+	totalLoaded    int64
+	filesCreated   int64
 }
 
-// NewPersistenceManager creates a persistence manager
+// NewPersistenceManager creates a persistence manager with the default configuration
 func NewPersistenceManager(dataDir string) *PersistenceManager {
 	pm := &PersistenceManager{
 		dataDir: dataDir,
@@ -47,7 +52,7 @@ func NewPersistenceManager(dataDir string) *PersistenceManager {
 	return pm
 }
 
-// NewPersistenceManagerWithConfig creates a persistence manager with a custom configuration
+// NewPersistenceManagerWithConfig creates a custom-configured persistence manager
 func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) *PersistenceManager {
 	pm := &PersistenceManager{
 		dataDir: dataDir,
@@ -121,7 +126,7 @@ func (pm *PersistenceManager) Stop() error {
 	return nil
 }
 
-// PersistData persists data
+// PersistData persists a single data record
 func (pm *PersistenceManager) PersistData(data interface{}) error {
 	if !pm.isRunning {
 		return fmt.Errorf("persistence manager not running")
@@ -129,11 +134,70 @@ func (pm *PersistenceManager) PersistData(data interface{}) error {
 
 	pm.pendingMutex.Lock()
 	pm.pendingData = append(pm.pendingData, data)
+	pm.totalPersisted++
 	pm.pendingMutex.Unlock()
 
 	return nil
 }
 
+// LoadPersistedData loads all persisted data and deletes the underlying files
+func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error) {
+	files, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_overflow_*.log"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to glob files: %w", err)
+	}
+
+	var allData []interface{}
+
+	for _, filename := range files {
+		data, err := pm.loadDataFromFile(filename)
+		if err != nil {
+			logger.Error("Failed to load file %s: %v", filename, err)
+			continue
+		}
+		allData = append(allData, data...)
+		pm.pendingMutex.Lock() // guard the counter against concurrent PersistData calls
+		pm.totalLoaded += int64(len(data))
+		pm.pendingMutex.Unlock()
+
+		// delete the file once it has been loaded
+		if err := os.Remove(filename); err != nil {
+			logger.Error("Failed to delete loaded file %s: %v", filename, err)
+		}
+	}
+
+	logger.Info("Loaded %d data records from persistence files", len(allData))
+	return allData, nil
+}
+
+// GetStats returns persistence statistics
+func (pm *PersistenceManager) GetStats() map[string]interface{} {
+	pm.pendingMutex.Lock()
+	pendingCount := len(pm.pendingData)
+	totalPersisted := pm.totalPersisted
+	totalLoaded := pm.totalLoaded
+	pm.pendingMutex.Unlock()
+
+	pm.writeMutex.Lock()
+	currentFileSize := pm.currentSize
+	fileIndex := pm.fileIndex
+	filesCreated := pm.filesCreated
+	pm.writeMutex.Unlock()
+
+	return map[string]interface{}{
+		"running":           pm.isRunning,
+		"data_dir":          pm.dataDir,
+		"pending_count":     pendingCount,
+		"current_file_size": currentFileSize,
+		"file_index":        fileIndex,
+		"max_file_size":     pm.maxFileSize,
+		"flush_interval":    pm.flushInterval.String(),
+		"total_persisted":   totalPersisted,
+		"total_loaded":      totalLoaded,
+		"files_created":     filesCreated,
+	}
+}
+
 // createNewFile creates a new persistence file
 func (pm *PersistenceManager) createNewFile() error {
 	// close the current file
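Reviewer note: the intended round trip for the new counters and the relocated `LoadPersistedData` is persist, flush, then reload on restart. A minimal usage sketch under two assumptions that are not visible in this hunk: a `Start()` method (implied by the `isRunning`/`stopChan` fields) and the import path `github.com/rulego/streamsql/stream`.

```go
package main

import (
	"fmt"

	"github.com/rulego/streamsql/stream" // assumed import path
)

func main() {
	pm := stream.NewPersistenceManager("/tmp/streamsql-data")
	if err := pm.Start(); err != nil { // Start() is assumed, not shown in this diff
		panic(err)
	}
	// overflow data is buffered in pendingData and flushed by the background processor
	_ = pm.PersistData(map[string]interface{}{"deviceId": "sensor-1", "temperature": 21.5})
	_ = pm.Stop()

	// on restart: drain the persisted records; files are deleted after a successful load
	records, err := pm.LoadPersistedData()
	if err != nil {
		panic(err)
	}
	fmt.Printf("recovered %d records, stats: %v\n", len(records), pm.GetStats())
}
```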
@@ -155,6 +219,7 @@ func (pm *PersistenceManager) createNewFile() error {
 	pm.currentFile = file
 	pm.currentSize = 0
 	pm.fileIndex++
+	pm.filesCreated++
 
 	logger.Info("Created new persistence file: %s", filepath)
 	return nil
@@ -259,33 +324,6 @@ func (pm *PersistenceManager) backgroundProcessor() {
 		}
 	}
 }
 
-// LoadPersistedData loads the persisted data
-func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error) {
-	files, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_overflow_*.log"))
-	if err != nil {
-		return nil, fmt.Errorf("failed to glob files: %w", err)
-	}
-
-	var allData []interface{}
-
-	for _, filename := range files {
-		data, err := pm.loadDataFromFile(filename)
-		if err != nil {
-			logger.Error("Failed to load file %s: %v", filename, err)
-			continue
-		}
-		allData = append(allData, data...)
-
-		// delete the file once it has been loaded
-		if err := os.Remove(filename); err != nil {
-			logger.Error("Failed to delete loaded file %s: %v", filename, err)
-		}
-	}
-
-	logger.Info("Loaded %d data records from persistence files", len(allData))
-	return allData, nil
-}
-
 // loadDataFromFile loads data from a file
 func (pm *PersistenceManager) loadDataFromFile(filename string) ([]interface{}, error) {
 	file, err := os.Open(filename)
@@ -318,25 +356,3 @@ func (pm *PersistenceManager) loadDataFromFile(filename string) ([]interface{},
 
 	return data, nil
 }
-
-// GetStats returns persistence statistics
-func (pm *PersistenceManager) GetStats() map[string]interface{} {
-	pm.pendingMutex.Lock()
-	pendingCount := len(pm.pendingData)
-	pm.pendingMutex.Unlock()
-
-	pm.writeMutex.Lock()
-	currentFileSize := pm.currentSize
-	fileIndex := pm.fileIndex
-	pm.writeMutex.Unlock()
-
-	return map[string]interface{}{
-		"running":           pm.isRunning,
-		"data_dir":          pm.dataDir,
-		"pending_count":     pendingCount,
-		"current_file_size": currentFileSize,
-		"file_index":        fileIndex,
-		"max_file_size":     pm.maxFileSize,
-		"flush_interval":    pm.flushInterval.String(),
-	}
-}
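The on-disk record encoding is not visible in this diff; `loadDataFromFile` only shows the `os.Open` call. A sketch of a compatible reader under the assumption that records are stored as newline-delimited JSON, one record per line; both the assumption and the file name below are illustrative.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// readRecords parses one JSON record per line, skipping corrupt lines,
// mirroring how LoadPersistedData tolerates per-file errors.
func readRecords(filename string) ([]interface{}, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var records []interface{}
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		var rec interface{}
		if err := json.Unmarshal(scanner.Bytes(), &rec); err != nil {
			continue // skip a corrupt line rather than failing the whole file
		}
		records = append(records, rec)
	}
	return records, scanner.Err()
}

func main() {
	recs, err := readRecords("/tmp/streamsql-data/streamsql_overflow_0.log") // hypothetical file name
	fmt.Println(len(recs), err)
}
```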
diff --git a/stream/stream.go b/stream/stream.go
index 534d8ce..e5d7462 100644
--- a/stream/stream.go
+++ b/stream/stream.go
@@ -31,6 +31,15 @@ type Stream struct {
 	seenResults    *sync.Map
 	done           chan struct{} // used to shut down the processing goroutine
 	sinkWorkerPool chan func()   // sink worker pool, avoids blocking
+
+	// new: thread-safety controls
+	dataChanMux      sync.RWMutex // read-write lock guarding access to dataChan
+	expansionMux     sync.Mutex   // mutex preventing concurrent expansion
+	retryMux         sync.Mutex   // mutex controlling persistence retries
+	expanding        int32        // expansion flag, accessed atomically
+	activeRetries    int32        // active retry count, accessed atomically
+	maxRetryRoutines int32        // cap on concurrent retry goroutines
+
 	// performance metrics
 	inputCount  int64 // count of input records
 	outputCount int64 // count of output results
@@ -72,6 +81,7 @@
 		allowDataDrop:    false,    // no data loss by default
 		blockingTimeout:  0,        // no timeout by default
 		overflowStrategy: "expand", // dynamic expansion by default
+		maxRetryRoutines: 5,        // cap on concurrent retry goroutines
 	}
 
 	// start the sink worker pool to process sink calls asynchronously
@@ -131,6 +141,7 @@
 		allowDataDrop:    overflowStrategy == "drop",
 		blockingTimeout:  timeout,
 		overflowStrategy: overflowStrategy,
+		maxRetryRoutines: 5, // cap on concurrent retry goroutines
 	}
 
 	// if the overflow strategy is persistence, initialize the persistence manager
@@ -186,6 +197,7 @@
 		allowDataDrop:    overflowStrategy == "drop",
 		blockingTimeout:  timeout,
 		overflowStrategy: overflowStrategy,
+		maxRetryRoutines: 5, // cap on concurrent retry goroutines
 	}
 
 	// if the overflow strategy is persistence, initialize the persistence manager with the custom configuration
@@ -404,8 +416,13 @@ func (s *Stream) process() {
 
 	// main processing loop
 	for {
+		// take the read lock to access dataChan safely
+		s.dataChanMux.RLock()
+		currentDataChan := s.dataChan
+		s.dataChanMux.RUnlock()
+
 		select {
-		case data, ok := <-s.dataChan:
+		case data, ok := <-currentDataChan:
 			if !ok {
 				// the channel has been closed
 				return
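The change above makes `process()` re-read the channel reference under a read lock on every iteration; that is the invariant the later `expandDataChannel` swap relies on. A self-contained sketch of the same drain-and-swap pattern (all names here are illustrative, not from this diff):

```go
package main

import (
	"fmt"
	"sync"
)

// holder owns a swappable channel; consumers must re-fetch the
// reference via get() on every iteration, as process() does above.
type holder struct {
	mu sync.RWMutex
	ch chan int
}

func (h *holder) get() chan int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.ch
}

// swap grows the channel and drains the old buffer into the new one,
// mirroring the drain-and-swap used by expandDataChannel.
func (h *holder) swap(capacity int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	old := h.ch
	h.ch = make(chan int, capacity)
	for {
		select {
		case v := <-old:
			h.ch <- v
		default:
			return // old channel drained
		}
	}
}

func main() {
	h := &holder{ch: make(chan int, 2)}
	h.get() <- 1
	h.get() <- 2
	h.swap(4) // grow from capacity 2 to 4, migrating the buffered items
	fmt.Println(<-h.get(), <-h.get(), cap(h.get())) // 1 2 4
}
```

The caveat, visible in the diff as well, is that a sender already blocked on the old channel is not woken by the swap; the PR mitigates this by having every send path re-fetch the reference first.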
@@ -737,11 +754,12 @@
 	}
 }
 
-// addDataBlocking adds data in blocking mode, guaranteeing zero data loss
+// addDataBlocking adds data in blocking mode, guaranteeing zero data loss (thread-safe version)
 func (s *Stream) addDataBlocking(data interface{}) {
 	if s.blockingTimeout <= 0 {
 		// no timeout: block forever until the send succeeds
-		s.dataChan <- data
+		dataChan := s.safeGetDataChan()
+		dataChan <- data
 		return
 	}
 
@@ -749,152 +767,173 @@ func (s *Stream) addDataBlocking(data interface{}) {
 	// blocking with a timeout, using a timer
 	timer := time.NewTimer(s.blockingTimeout)
 	defer timer.Stop()
 
+	dataChan := s.safeGetDataChan()
 	select {
-	case s.dataChan <- data:
+	case dataChan <- data:
 		// data added successfully
 		return
 	case <-timer.C:
 		// timed out, but do not drop the data: log the error and keep blocking
 		logger.Error("Data addition timeout, but continue waiting to avoid data loss")
-		// keep blocking indefinitely
-		s.dataChan <- data
+		// keep blocking indefinitely, re-fetching the current channel reference
+		finalDataChan := s.safeGetDataChan()
+		finalDataChan <- data
 	}
 }
 
-// addDataWithExpansion dynamic expansion mode
+// addDataWithExpansion dynamic expansion mode (thread-safe version)
 func (s *Stream) addDataWithExpansion(data interface{}) {
-	select {
-	case s.dataChan <- data:
-		// data added successfully
+	// first attempt to add the data
+	if s.safeSendToDataChan(data) {
 		return
-	default:
-		// the channel is full, expand dynamically
-		s.expandDataChannel()
-		// retry after expansion
-		select {
-		case s.dataChan <- data:
-			logger.Info("Successfully added data after data channel expansion")
-			return
-		default:
-			// if it is still full after expansion, block and wait
-			s.dataChan <- data
-		}
 	}
+
+	// the channel is full, expand dynamically
+	s.expandDataChannel()
+
+	// retry after expansion, re-fetching the channel reference
+	if s.safeSendToDataChan(data) {
+		logger.Info("Successfully added data after data channel expansion")
+		return
+	}
+
+	// if it is still full after expansion, block and wait
+	dataChan := s.safeGetDataChan()
+	dataChan <- data
 }
 
-// addDataWithPersistence persistence mode (full implementation)
+// addDataWithPersistence persistence mode (thread-safe full implementation)
 func (s *Stream) addDataWithPersistence(data interface{}) {
-	select {
-	case s.dataChan <- data:
-		// data added successfully
+	// first attempt to add the data
+	if s.safeSendToDataChan(data) {
 		return
-	default:
-		// the channel is full, persist to disk
-		if s.persistenceManager != nil {
-			if err := s.persistenceManager.PersistData(data); err != nil {
-				logger.Error("Failed to persist data: %v", err)
-				atomic.AddInt64(&s.droppedCount, 1)
-			} else {
-				logger.Debug("Data has been persisted to disk")
-			}
-		} else {
-			logger.Error("Persistence manager not initialized, data will be lost")
-			atomic.AddInt64(&s.droppedCount, 1)
-		}
-
-		// start an asynchronous retry
-		go s.persistAndRetryData(data)
 	}
+
+	// the channel is full, persist to disk
+	if s.persistenceManager != nil {
+		if err := s.persistenceManager.PersistData(data); err != nil {
+			logger.Error("Failed to persist data: %v", err)
+			atomic.AddInt64(&s.droppedCount, 1)
+		} else {
+			logger.Debug("Data has been persisted to disk")
+		}
+	} else {
+		logger.Error("Persistence manager not initialized, data will be lost")
+		atomic.AddInt64(&s.droppedCount, 1)
+	}
+
+	// start an asynchronous retry
+	go s.persistAndRetryData(data)
 }
 
-// addDataWithDrop the original drop mode
+// addDataWithDrop the original drop mode (thread-safe version)
 func (s *Stream) addDataWithDrop(data interface{}) {
 	// optimization: smart non-blocking add with tiered backpressure control
-	select {
-	case s.dataChan <- data:
-		// data added successfully
-		return
-	default:
-		// the data channel is full, apply the tiered backpressure strategy
-		chanLen := len(s.dataChan)
-		chanCap := cap(s.dataChan)
-		usage := float64(chanLen) / float64(chanCap)
-
-		// tune the strategy by channel usage and buffer size
-		var waitTime time.Duration
-		var maxRetries int
-
-		switch {
-		case chanCap >= 100000: // very large buffer (benchmark mode)
-			switch {
-			case usage > 0.99:
-				waitTime = 1 * time.Millisecond // longer wait
-				maxRetries = 3
-			case usage > 0.95:
-				waitTime = 500 * time.Microsecond
-				maxRetries = 2
-			case usage > 0.90:
-				waitTime = 100 * time.Microsecond
-				maxRetries = 1
-			default:
-				// drop immediately
-				logger.Warn("Data channel is full, dropping input data")
-				atomic.AddInt64(&s.droppedCount, 1)
-				return
-			}
-
-		case chanCap >= 50000: // high-performance mode
-			switch {
-			case usage > 0.99:
-				waitTime = 500 * time.Microsecond
-				maxRetries = 2
-			case usage > 0.95:
-				waitTime = 200 * time.Microsecond
-				maxRetries = 1
-			case usage > 0.90:
-				waitTime = 50 * time.Microsecond
-				maxRetries = 1
-			default:
-				logger.Warn("Data channel is full, dropping input data")
-				atomic.AddInt64(&s.droppedCount, 1)
-				return
-			}
-
-		default: // default mode
-			switch {
-			case usage > 0.99:
-				waitTime = 100 * time.Microsecond
-				maxRetries = 1
-			case usage > 0.95:
-				waitTime = 50 * time.Microsecond
-				maxRetries = 1
-			default:
-				logger.Warn("Data channel is full, dropping input data")
-				atomic.AddInt64(&s.droppedCount, 1)
-				return
-			}
-		}
-
-		// retry adding the data several times
-		for retry := 0; retry < maxRetries; retry++ {
-			timer := time.NewTimer(waitTime)
-			select {
-			case s.dataChan <- data:
-				// retry succeeded
-				timer.Stop()
-				return
-			case <-timer.C:
-				// timed out: continue with the next retry, or drop
-				if retry == maxRetries-1 {
-					// the last retry failed, record the drop
-					logger.Warn("Data channel is full, dropping input data")
-					atomic.AddInt64(&s.droppedCount, 1)
-				}
-			}
-		}
-	}
+	if s.safeSendToDataChan(data) {
+		return
+	}
+
+	// the data channel is full: apply the tiered backpressure strategy, capturing the channel state
+	s.dataChanMux.RLock()
+	chanLen := len(s.dataChan)
+	chanCap := cap(s.dataChan)
+	currentDataChan := s.dataChan
+	s.dataChanMux.RUnlock()
+	usage := float64(chanLen) / float64(chanCap)
+
+	// tune the strategy by channel usage and buffer size
+	var waitTime time.Duration
+	var maxRetries int
+
+	switch {
+	case chanCap >= 100000: // very large buffer (benchmark mode)
+		switch {
+		case usage > 0.99:
+			waitTime = 1 * time.Millisecond // longer wait
+			maxRetries = 3
+		case usage > 0.95:
+			waitTime = 500 * time.Microsecond
+			maxRetries = 2
+		case usage > 0.90:
+			waitTime = 100 * time.Microsecond
+			maxRetries = 1
+		default:
+			// drop immediately
+			logger.Warn("Data channel is full, dropping input data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		}
+
+	case chanCap >= 50000: // high-performance mode
+		switch {
+		case usage > 0.99:
+			waitTime = 500 * time.Microsecond
+			maxRetries = 2
+		case usage > 0.95:
+			waitTime = 200 * time.Microsecond
+			maxRetries = 1
+		case usage > 0.90:
+			waitTime = 50 * time.Microsecond
+			maxRetries = 1
+		default:
+			logger.Warn("Data channel is full, dropping input data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		}
+
+	default: // default mode
+		switch {
+		case usage > 0.99:
+			waitTime = 100 * time.Microsecond
+			maxRetries = 1
+		case usage > 0.95:
+			waitTime = 50 * time.Microsecond
+			maxRetries = 1
+		default:
+			logger.Warn("Data channel is full, dropping input data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		}
+	}
+
+	// retry adding the data several times, in a thread-safe way
+	for retry := 0; retry < maxRetries; retry++ {
+		timer := time.NewTimer(waitTime)
+		select {
+		case currentDataChan <- data:
+			// retry succeeded
+			timer.Stop()
+			return
+		case <-timer.C:
+			// timed out: continue with the next retry, or drop
+			if retry == maxRetries-1 {
+				// the last retry failed, record the drop
+				logger.Warn("Data channel is full, dropping input data")
+				atomic.AddInt64(&s.droppedCount, 1)
+			}
+		}
+	}
 }
 
+// safeGetDataChan returns the dataChan reference in a thread-safe way
+func (s *Stream) safeGetDataChan() chan interface{} {
+	s.dataChanMux.RLock()
+	defer s.dataChanMux.RUnlock()
+	return s.dataChan
+}
+
+// safeSendToDataChan sends data to dataChan in a thread-safe, non-blocking way
+func (s *Stream) safeSendToDataChan(data interface{}) bool {
+	dataChan := s.safeGetDataChan()
+	select {
+	case dataChan <- data:
+		return true
+	default:
+		return false
+	}
+}
+
 func (s *Stream) AddSink(sink func(interface{})) {
 	s.sinks = append(s.sinks, sink)
 }
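The tiered drop policy above is easier to review as a table; this sketch restates it as a pure function with the thresholds copied from the diff (the function name is illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// backpressure maps (capacity, usage) to (wait, retries) exactly as
// addDataWithDrop does above; a zero retry count means "drop now".
func backpressure(chanCap int, usage float64) (wait time.Duration, maxRetries int) {
	switch {
	case chanCap >= 100000: // very large buffer (benchmark mode)
		switch {
		case usage > 0.99:
			return 1 * time.Millisecond, 3
		case usage > 0.95:
			return 500 * time.Microsecond, 2
		case usage > 0.90:
			return 100 * time.Microsecond, 1
		}
	case chanCap >= 50000: // high-performance mode
		switch {
		case usage > 0.99:
			return 500 * time.Microsecond, 2
		case usage > 0.95:
			return 200 * time.Microsecond, 1
		case usage > 0.90:
			return 50 * time.Microsecond, 1
		}
	default: // default mode
		switch {
		case usage > 0.99:
			return 100 * time.Microsecond, 1
		case usage > 0.95:
			return 50 * time.Microsecond, 1
		}
	}
	return 0, 0 // below the lowest tier: drop immediately
}

func main() {
	fmt.Println(backpressure(100000, 0.96)) // 500µs 2
	fmt.Println(backpressure(1024, 0.97))   // 50µs 1
	fmt.Println(backpressure(1024, 0.50))   // 0s 0 -> drop
}
```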
@@ -919,18 +958,26 @@ func (s *Stream) Stop() {
 	}
 }
 
-// GetStats returns stream processing statistics
+// GetStats returns stream processing statistics (thread-safe version)
 func (s *Stream) GetStats() map[string]int64 {
+	// read the dataChan state under the read lock
+	s.dataChanMux.RLock()
+	dataChanLen := int64(len(s.dataChan))
+	dataChanCap := int64(cap(s.dataChan))
+	s.dataChanMux.RUnlock()
+
 	return map[string]int64{
 		"input_count":     atomic.LoadInt64(&s.inputCount),
 		"output_count":    atomic.LoadInt64(&s.outputCount),
 		"dropped_count":   atomic.LoadInt64(&s.droppedCount),
-		"data_chan_len":   int64(len(s.dataChan)),
-		"data_chan_cap":   int64(cap(s.dataChan)),
+		"data_chan_len":   dataChanLen,
+		"data_chan_cap":   dataChanCap,
 		"result_chan_len": int64(len(s.resultChan)),
 		"result_chan_cap": int64(cap(s.resultChan)),
 		"sink_pool_len":   int64(len(s.sinkWorkerPool)),
 		"sink_pool_cap":   int64(cap(s.sinkWorkerPool)),
+		"active_retries":  int64(atomic.LoadInt32(&s.activeRetries)),
+		"expanding":       int64(atomic.LoadInt32(&s.expanding)),
 	}
 }
 
@@ -988,7 +1035,29 @@ func (s *Stream) ResetStats() {
 
 // expandDataChannel dynamically expands the data channel
 func (s *Stream) expandDataChannel() {
+	// check the expansion flag atomically to prevent concurrent expansion
+	if !atomic.CompareAndSwapInt32(&s.expanding, 0, 1) {
+		logger.Debug("Channel expansion already in progress, skipping")
+		return
+	}
+	defer atomic.StoreInt32(&s.expanding, 0)
+
+	// take the expansion lock so that only one goroutine performs the expansion
+	s.expansionMux.Lock()
+	defer s.expansionMux.Unlock()
+
+	// check again whether expansion is needed (double-checked locking)
+	s.dataChanMux.RLock()
 	oldCap := cap(s.dataChan)
+	currentLen := len(s.dataChan)
+	s.dataChanMux.RUnlock()
+
+	// if current channel usage is below 80%, expansion is unnecessary
+	if float64(currentLen)/float64(oldCap) < 0.8 {
+		logger.Debug("Channel usage below threshold, expansion not needed")
+		return
+	}
+
 	newCap := int(float64(oldCap) * 1.5) // grow by 50%
 	if newCap < oldCap+1000 {
 		newCap = oldCap + 1000 // grow by at least 1000
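Note the guard above is doubled: `CompareAndSwapInt32` alone already serializes expanders, with latecomers skipping rather than queueing, while `expansionMux` plus the usage re-check handles the stragglers that lost the race. A minimal sketch of the CAS-skip half of that pattern:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// guard lets exactly one goroutine at a time run a task;
// concurrent callers skip instead of queueing behind a lock.
type guard struct{ busy int32 }

func (g *guard) tryRun(task func()) bool {
	if !atomic.CompareAndSwapInt32(&g.busy, 0, 1) {
		return false // another goroutine is already running the task
	}
	defer atomic.StoreInt32(&g.busy, 0)
	task()
	return true
}

func main() {
	var g guard
	var runs int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.tryRun(func() { atomic.AddInt32(&runs, 1) })
		}()
	}
	wg.Wait()
	fmt.Println("executions:", runs) // between 1 and 8, never concurrent
}
```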
@@ -999,45 +1068,118 @@ func (s *Stream) expandDataChannel() {
 	// create a new, larger channel
 	newChan := make(chan interface{}, newCap)
 
-	// migrate the data safely: quickly move the old channel's data into the new one
-	// note: the old channel must not be closed here, the process goroutine may still be reading it
+	// migrate the data safely under the write lock
+	s.dataChanMux.Lock()
 	oldChan := s.dataChan
 
+	// quickly move the old channel's data into the new one
+	migrationTimeout := time.NewTimer(5 * time.Second) // 5-second migration timeout
+	defer migrationTimeout.Stop()
+
+	migratedCount := 0
+	for {
+		select {
+		case data := <-oldChan:
+			select {
+			case newChan <- data:
+				migratedCount++
+			case <-migrationTimeout.C:
+				logger.Warn("Data migration timeout, some data may be lost during expansion")
+				goto migration_done
+			}
+		case <-migrationTimeout.C:
+			logger.Warn("Data migration timeout during channel drain")
+			goto migration_done
+		default:
+			// the old channel is drained, migration complete
+			goto migration_done
+		}
+	}
+
+migration_done:
 	// atomically update the channel reference
 	s.dataChan = newChan
+	s.dataChanMux.Unlock()
 
-	// start a goroutine to migrate the old data asynchronously
-	go func() {
-		for {
-			select {
-			case data := <-oldChan:
-				newChan <- data
-			default:
-				// the old channel is drained, migration complete
-				return
-			}
-		}
-	}()
+	logger.Info("Channel expansion completed: migrated %d items", migratedCount)
 }
 
-// persistAndRetryData persists the data and retries
+// persistAndRetryData persists the data and retries (improved version with exponential backoff and resource control)
 func (s *Stream) persistAndRetryData(data interface{}) {
-	// simplified implementation: wait a while, then retry
-	retryInterval := 100 * time.Millisecond
-	maxRetries := 50 // at most 50 retries, 5 seconds in total
+	// check the number of active retry goroutines to prevent resource leaks
+	currentRetries := atomic.LoadInt32(&s.activeRetries)
+	if currentRetries >= s.maxRetryRoutines {
+		logger.Warn("Maximum retry routines reached (%d), dropping data", currentRetries)
+		atomic.AddInt64(&s.droppedCount, 1)
+		return
+	}
 
-	for i := 0; i < maxRetries; i++ {
-		time.Sleep(retryInterval)
+	// increment the active retry count
+	atomic.AddInt32(&s.activeRetries, 1)
+	defer atomic.AddInt32(&s.activeRetries, -1)
+
+	// exponential backoff strategy
+	baseInterval := 50 * time.Millisecond
+	maxInterval := 2 * time.Second
+	maxRetries := 10                 // fewer retries, to avoid blocking for long
+	totalTimeout := 30 * time.Second // overall timeout
+
+	retryTimer := time.NewTimer(totalTimeout)
+	defer retryTimer.Stop()
+
+	for attempt := 0; attempt < maxRetries; attempt++ {
+		// compute the current retry interval (exponential backoff:
+		// baseInterval * 1.5^attempt, capped at maxInterval)
+		currentInterval := baseInterval
+		for i := 0; i < attempt; i++ {
+			currentInterval = time.Duration(float64(currentInterval) * 1.5)
+		}
+		if currentInterval > maxInterval {
+			currentInterval = maxInterval
+		}
+
+		// wait for the retry interval
+		waitTimer := time.NewTimer(currentInterval)
+		select {
+		case <-waitTimer.C:
+			// proceed with the retry
+		case <-retryTimer.C:
+			waitTimer.Stop()
+			logger.Warn("Persistence retry timeout reached, dropping data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		case <-s.done:
+			waitTimer.Stop()
+			logger.Debug("Stream stopped during retry, dropping data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		}
+		waitTimer.Stop()
+
+		// attempt the send in a thread-safe way
+		s.dataChanMux.RLock()
+		currentDataChan := s.dataChan
+		s.dataChanMux.RUnlock()
 		select {
-		case s.dataChan <- data:
-			logger.Info("Persistence data retry successful: attempt %d", i+1)
+		case currentDataChan <- data:
+			logger.Debug("Persistence data retry successful: attempt %d", attempt+1)
+			return
+		case <-retryTimer.C:
+			logger.Warn("Persistence retry timeout during send, dropping data")
+			atomic.AddInt64(&s.droppedCount, 1)
+			return
+		case <-s.done:
+			logger.Debug("Stream stopped during retry send, dropping data")
+			atomic.AddInt64(&s.droppedCount, 1)
 			return
 		default:
-			// keep retrying
-			if i == maxRetries-1 {
-				logger.Error("Persistence data retry failed, maximum retry attempts reached")
+			// the channel is still full, continue with the next retry
+			if attempt == maxRetries-1 {
+				logger.Error("Persistence data retry failed after %d attempts, dropping data", maxRetries)
 				atomic.AddInt64(&s.droppedCount, 1)
+			} else {
+				logger.Debug("Persistence retry attempt %d/%d failed, will retry with interval %v",
+					attempt+1, maxRetries, currentInterval)
 			}
 		}
 	}
 }
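Exponential backoff here means `baseInterval * 1.5^attempt` capped at `maxInterval`; a linear form such as `baseInterval * (1.5 * attempt)` would wait zero on the first attempt and grow far too slowly afterwards. A sketch printing the schedule used above (50 ms base, 1.5 factor, 2 s cap, as in the diff):

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base * 1.5^attempt, capped at max, matching the
// interval computed inside persistAndRetryData above.
func backoff(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d = time.Duration(float64(d) * 1.5)
		if d >= max {
			return max
		}
	}
	return d
}

func main() {
	for a := 0; a < 10; a++ {
		fmt.Printf("attempt %2d: wait %v\n", a+1, backoff(50*time.Millisecond, 2*time.Second, a))
	}
	// attempt 1: 50ms, attempt 2: 75ms, attempt 3: 112.5ms, ... attempt 10: ~1.92s
}
```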
@@ -1062,21 +1204,21 @@ func (s *Stream) LoadAndReprocessPersistedData() error {
 
 	logger.Info("Start reprocessing %d persistent data records", len(persistedData))
 
-	// reprocess each record
+	// reprocess each record (thread-safe version)
 	successCount := 0
 	for i, data := range persistedData {
-		select {
-		case s.dataChan <- data:
+		// attempt the send in a thread-safe way
+		if s.safeSendToDataChan(data) {
 			successCount++
-		default:
-			// if the channel is still full, wait briefly and try again
-			time.Sleep(10 * time.Millisecond)
-			select {
-			case s.dataChan <- data:
-				successCount++
-			default:
-				logger.Warn("Failed to recover data record %d, channel still full", i+1)
-			}
+			continue
+		}
+
+		// if the channel is still full, wait briefly and try again
+		time.Sleep(10 * time.Millisecond)
+		if s.safeSendToDataChan(data) {
+			successCount++
+		} else {
+			logger.Warn("Failed to recover data record %d, channel still full", i+1)
 		}
 	}
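With `active_retries` and `expanding` exposed, `Stream.GetStats` can double as a lightweight health probe. A sketch of one way a caller might interpret the map; the `healthCheck` helper and its thresholds are illustrative, not part of this diff:

```go
package main

import "fmt"

// healthCheck interprets the map returned by Stream.GetStats;
// the keys match the GetStats implementation in this diff.
func healthCheck(stats map[string]int64) string {
	if c := stats["data_chan_cap"]; c > 0 {
		usage := float64(stats["data_chan_len"]) / float64(c)
		if usage > 0.9 {
			return fmt.Sprintf("backpressure: channel %.0f%% full, %d retries in flight, expanding=%d",
				usage*100, stats["active_retries"], stats["expanding"])
		}
	}
	if stats["dropped_count"] > 0 {
		return fmt.Sprintf("degraded: %d records dropped", stats["dropped_count"])
	}
	return "ok"
}

func main() {
	// example values only; in real use, pass the output of stream.GetStats()
	fmt.Println(healthCheck(map[string]int64{
		"data_chan_len": 950, "data_chan_cap": 1000,
		"active_retries": 2, "expanding": 1, "dropped_count": 0,
	}))
}
```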