mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-12 21:37:17 +00:00
feat:优化持久加载机制
This commit is contained in:
+126
-21
@@ -23,6 +23,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -145,6 +146,10 @@ func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInt
|
||||
stopChan: make(chan struct{}),
|
||||
recoveryQueue: make(chan OrderedDataItem, 10000),
|
||||
sequenceCounter: 0,
|
||||
// Retry and dead letter queue configuration
|
||||
maxRetryCount: 3, // Default maximum 3 retries
|
||||
deadLetterQueue: make([]DeadLetterItem, 0, 1000), // Dead letter queue
|
||||
retryDataMap: make(map[int64]*OrderedDataItem), // Retry data mapping
|
||||
}
|
||||
|
||||
// Ensure data directory exists
|
||||
@@ -168,6 +173,10 @@ func (pm *PersistenceManager) Start() error {
|
||||
return fmt.Errorf("ordered persistence manager already running")
|
||||
}
|
||||
|
||||
// Reinitialize channels if they were closed
|
||||
pm.stopChan = make(chan struct{})
|
||||
pm.recoveryQueue = make(chan OrderedDataItem, 10000)
|
||||
|
||||
// Create initial file
|
||||
pm.writeMutex.Lock()
|
||||
if err := pm.createNewFile(); err != nil {
|
||||
@@ -188,6 +197,12 @@ func (pm *PersistenceManager) Start() error {
|
||||
go pm.backgroundProcessor()
|
||||
go pm.recoveryProcessor()
|
||||
|
||||
// Load and recover existing data
|
||||
if err := pm.LoadAndRecoverData(); err != nil {
|
||||
logger.Error("Failed to load and recover data: %v", err)
|
||||
// Don't return error, continue running
|
||||
}
|
||||
|
||||
logger.Info("Ordered persistence manager started successfully, data directory: %s", pm.dataDir)
|
||||
return nil
|
||||
}
|
||||
@@ -210,7 +225,13 @@ func (pm *PersistenceManager) Stop() error {
|
||||
pm.isRunning = false
|
||||
pm.runningMutex.Unlock()
|
||||
|
||||
close(pm.stopChan)
|
||||
// Close stop channel safely
|
||||
select {
|
||||
case <-pm.stopChan:
|
||||
// Channel already closed
|
||||
default:
|
||||
close(pm.stopChan)
|
||||
}
|
||||
|
||||
// Stop timer
|
||||
pm.writeMutex.Lock()
|
||||
@@ -222,7 +243,7 @@ func (pm *PersistenceManager) Stop() error {
|
||||
// Flush remaining data
|
||||
pm.flushPendingData()
|
||||
|
||||
// Close current file
|
||||
// Close current file with proper synchronization
|
||||
pm.writeMutex.Lock()
|
||||
if pm.currentFile != nil {
|
||||
pm.currentFile.Close()
|
||||
@@ -230,8 +251,23 @@ func (pm *PersistenceManager) Stop() error {
|
||||
}
|
||||
pm.writeMutex.Unlock()
|
||||
|
||||
// Close recovery queue
|
||||
close(pm.recoveryQueue)
|
||||
// Close recovery queue safely
|
||||
go func() {
|
||||
// Drain the recovery queue in a separate goroutine
|
||||
for {
|
||||
select {
|
||||
case <-pm.recoveryQueue:
|
||||
// Continue draining
|
||||
default:
|
||||
// Queue is empty, safe to close
|
||||
close(pm.recoveryQueue)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Give some time for the goroutine to drain the queue
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
logger.Info("Ordered persistence manager stopped")
|
||||
return nil
|
||||
@@ -296,11 +332,20 @@ func (pm *PersistenceManager) PersistDataWithRetryLimit(data map[string]interfac
|
||||
// Returns:
|
||||
// - error: error during loading process
|
||||
func (pm *PersistenceManager) LoadAndRecoverData() error {
|
||||
files, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_ordered_*.log"))
|
||||
// 只加载未处理的文件(排除.processed文件)
|
||||
allFiles, err := filepath.Glob(filepath.Join(pm.dataDir, "streamsql_ordered_*.log"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to glob files: %w", err)
|
||||
}
|
||||
|
||||
// 过滤掉已处理的文件(.processed后缀的文件)
|
||||
var files []string
|
||||
for _, file := range allFiles {
|
||||
if !strings.HasSuffix(file, ".processed") {
|
||||
files = append(files, file)
|
||||
}
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
logger.Info("No persistence files found for recovery")
|
||||
return nil
|
||||
@@ -317,9 +362,11 @@ func (pm *PersistenceManager) LoadAndRecoverData() error {
|
||||
}
|
||||
allItems = append(allItems, items...)
|
||||
|
||||
// 加载后删除文件
|
||||
if err := os.Remove(filename); err != nil {
|
||||
logger.Error("Failed to delete loaded file %s: %v", filename, err)
|
||||
// 加载后直接删除文件
|
||||
if deleteErr := os.Remove(filename); deleteErr != nil {
|
||||
logger.Error("Failed to delete file %s: %v", filename, deleteErr)
|
||||
} else {
|
||||
logger.Info("File %s processed and deleted", filename)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,23 +386,45 @@ func (pm *PersistenceManager) LoadAndRecoverData() error {
|
||||
pm.recoveryMode = true
|
||||
pm.recoveryMutex.Unlock()
|
||||
|
||||
// 如果没有数据需要恢复,立即退出恢复模式
|
||||
if len(allItems) == 0 {
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
logger.Info("No data to recover, exiting recovery mode")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 将数据放入恢复队列
|
||||
for _, item := range allItems {
|
||||
select {
|
||||
case pm.recoveryQueue <- item:
|
||||
// 数据已放入恢复队列
|
||||
case <-pm.stopChan:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("Data recovery completed, %d items recovered in order", len(allItems))
|
||||
|
||||
// 启动一个goroutine来监控恢复队列,当队列为空时退出恢复模式
|
||||
go func() {
|
||||
for _, item := range allItems {
|
||||
ticker := time.NewTicker(1 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case pm.recoveryQueue <- item:
|
||||
// 数据已放入恢复队列
|
||||
case <-ticker.C:
|
||||
if len(pm.recoveryQueue) == 0 {
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
return
|
||||
}
|
||||
case <-pm.stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 恢复完成,退出恢复模式
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
|
||||
logger.Info("Data recovery completed, %d items recovered in order", len(allItems))
|
||||
}()
|
||||
|
||||
atomic.AddInt64(&pm.totalLoaded, int64(len(allItems)))
|
||||
@@ -380,8 +449,20 @@ func (pm *PersistenceManager) GetRecoveryData() (map[string]interface{}, bool) {
|
||||
select {
|
||||
case item := <-pm.recoveryQueue:
|
||||
atomic.AddInt64(&pm.totalRecovered, 1)
|
||||
|
||||
// 检查队列是否为空,如果为空则退出恢复模式
|
||||
if len(pm.recoveryQueue) == 0 {
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
}
|
||||
|
||||
return item.Data, true
|
||||
default:
|
||||
// 队列为空,退出恢复模式
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
@@ -595,7 +676,7 @@ func (pm *PersistenceManager) backgroundProcessor() {
|
||||
// recoveryProcessor 恢复处理协程
|
||||
func (pm *PersistenceManager) recoveryProcessor() {
|
||||
// 这个协程主要用于监控恢复状态,实际恢复数据由GetRecoveryData方法提供
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
ticker := time.NewTicker(100 * time.Millisecond) // 更频繁地检查
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
@@ -607,9 +688,19 @@ func (pm *PersistenceManager) recoveryProcessor() {
|
||||
|
||||
if recoveryMode {
|
||||
queueLen := len(pm.recoveryQueue)
|
||||
if queueLen > 0 {
|
||||
if queueLen == 0 {
|
||||
// 队列为空,退出恢复模式
|
||||
pm.recoveryMutex.Lock()
|
||||
pm.recoveryMode = false
|
||||
pm.recoveryMutex.Unlock()
|
||||
logger.Debug("Recovery queue empty, exiting recovery mode")
|
||||
return
|
||||
} else {
|
||||
logger.Debug("Recovery in progress, %d items remaining in queue", queueLen)
|
||||
}
|
||||
} else {
|
||||
// 不在恢复模式,退出协程
|
||||
return
|
||||
}
|
||||
|
||||
case <-pm.stopChan:
|
||||
@@ -747,7 +838,21 @@ func (pm *PersistenceManager) SetMaxRetryCount(maxRetryCount int) {
|
||||
// 返回值:
|
||||
// - bool: 是否应该重试
|
||||
func (pm *PersistenceManager) ShouldRetryRecoveredData(data map[string]interface{}) bool {
|
||||
// 检查数据中的重试次数
|
||||
// 检查数据中的重试次数(支持retry和_retry_count字段)
|
||||
if retryCountFloat, exists := data["retry"]; exists {
|
||||
if retryCount, ok := retryCountFloat.(float64); ok {
|
||||
if int(retryCount) >= pm.maxRetryCount {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if retryCount, ok := retryCountFloat.(int); ok {
|
||||
if retryCount >= pm.maxRetryCount {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 兼容性检查:也检查_retry_count字段
|
||||
if retryCountFloat, exists := data["_retry_count"]; exists {
|
||||
if retryCount, ok := retryCountFloat.(float64); ok {
|
||||
if int(retryCount) >= pm.maxRetryCount {
|
||||
|
||||
Reference in New Issue
Block a user