package stream

import (
	"encoding/json"
	"fmt"
	"math"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rulego/streamsql/condition"
	"github.com/rulego/streamsql/utils/fieldpath"

	"github.com/rulego/streamsql/aggregator"
	"github.com/rulego/streamsql/expr"
	"github.com/rulego/streamsql/functions"
	"github.com/rulego/streamsql/logger"
	"github.com/rulego/streamsql/types"
	"github.com/rulego/streamsql/window"
)

type Stream struct {
	dataChan       chan interface{}
	filter         condition.Condition
	Window         window.Window
	aggregator     aggregator.Aggregator
	config         types.Config
	sinks          []func(interface{})
	resultChan     chan interface{} // result channel
	seenResults    *sync.Map
	done           chan struct{} // signals processing goroutines to stop
	sinkWorkerPool chan func()   // sink worker pool; keeps sinks from blocking the hot path

	// Thread-safety controls
	dataChanMux      sync.RWMutex // guards access to dataChan
	sinksMux         sync.RWMutex // guards access to sinks
	expansionMux     sync.Mutex   // prevents concurrent channel expansion
	retryMux         sync.Mutex   // serializes persistence retries
	expanding        int32        // expansion-in-progress flag, accessed atomically
	activeRetries    int32        // active retry goroutine count, accessed atomically
	maxRetryRoutines int32        // upper bound on retry goroutines

	// Performance metrics
	inputCount   int64 // number of inputs received
	outputCount  int64 // number of results emitted
	droppedCount int64 // number of items dropped

	// Data-loss policy
	allowDataDrop      bool                // whether data loss is acceptable
	blockingTimeout    time.Duration       // how long to block before giving up
	overflowStrategy   string              // overflow strategy: "drop", "block", "expand", "persist"
	persistenceManager *PersistenceManager // persistence manager
}

// NewStream creates a Stream from a unified configuration.
func NewStream(config types.Config) (*Stream, error) {
	// Fall back to the default performance configuration when none is given.
	if (config.PerformanceConfig == types.PerformanceConfig{}) {
		config.PerformanceConfig = types.DefaultPerformanceConfig()
	}

	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithHighPerformance creates a Stream tuned for throughput.
func NewStreamWithHighPerformance(config types.Config) (*Stream, error) {
	config.PerformanceConfig = types.HighPerformanceConfig()
	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithLowLatency creates a Stream tuned for low latency.
func NewStreamWithLowLatency(config types.Config) (*Stream, error) {
	config.PerformanceConfig = types.LowLatencyConfig()
	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithZeroDataLoss creates a Stream that never drops data.
func NewStreamWithZeroDataLoss(config types.Config) (*Stream, error) {
	config.PerformanceConfig = types.ZeroDataLossConfig()
	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithCustomPerformance creates a Stream with a caller-supplied performance configuration.
func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error) {
	config.PerformanceConfig = perfConfig
	return newStreamWithUnifiedConfig(config)
}

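// Usage sketch for the constructor family (illustrative only; consult
// types.Config for the full schema — NeedWindow shown here is a real field,
// the data values are made up):
//
//	cfg := types.Config{NeedWindow: false}
//	s, err := NewStreamWithHighPerformance(cfg)
//	if err != nil {
//		panic(err)
//	}
//	s.Start()
//	defer s.Stop()
//	s.AddData(map[string]interface{}{"device": "sensor-1", "temperature": 21.5})
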
// newStreamWithUnifiedConfig is the internal constructor shared by all factory functions.
func newStreamWithUnifiedConfig(config types.Config) (*Stream, error) {
	var win window.Window
	var err error

	// Only create a window when one is required.
	if config.NeedWindow {
		// Pass the unified performance configuration through to the window.
		windowConfig := config.WindowConfig
		if windowConfig.Params == nil {
			windowConfig.Params = make(map[string]interface{})
		}
		windowConfig.Params["performanceConfig"] = config.PerformanceConfig

		win, err = window.CreateWindow(windowConfig)
		if err != nil {
			return nil, err
		}
	}

	// Build the Stream from the unified configuration.
	perfConfig := config.PerformanceConfig
	stream := &Stream{
		dataChan:         make(chan interface{}, perfConfig.BufferConfig.DataChannelSize),
		config:           config,
		Window:           win,
		resultChan:       make(chan interface{}, perfConfig.BufferConfig.ResultChannelSize),
		seenResults:      &sync.Map{},
		done:             make(chan struct{}),
		sinkWorkerPool:   make(chan func(), perfConfig.WorkerConfig.SinkPoolSize),
		allowDataDrop:    perfConfig.OverflowConfig.AllowDataLoss,
		blockingTimeout:  perfConfig.OverflowConfig.BlockTimeout,
		overflowStrategy: perfConfig.OverflowConfig.Strategy,
		maxRetryRoutines: int32(perfConfig.WorkerConfig.MaxRetryRoutines),
	}

	// For the persistence strategy, initialize the persistence manager.
	if perfConfig.OverflowConfig.Strategy == "persist" && perfConfig.OverflowConfig.PersistenceConfig != nil {
		persistConfig := perfConfig.OverflowConfig.PersistenceConfig
		stream.persistenceManager = NewPersistenceManagerWithConfig(
			persistConfig.DataDir,
			persistConfig.MaxFileSize,
			persistConfig.FlushInterval,
		)
		if err := stream.persistenceManager.Start(); err != nil {
			return nil, fmt.Errorf("failed to start persistence manager: %w", err)
		}
	}

	// Start the worker goroutines with the configured worker counts.
	go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
	go stream.startResultConsumer()

	return stream, nil
}

// startSinkWorkerPool starts the sink worker pool with a configurable worker count.
func (s *Stream) startSinkWorkerPool(workerCount int) {
	if workerCount <= 0 {
		workerCount = 8 // default
	}

	for i := 0; i < workerCount; i++ {
		go func(workerID int) {
			for {
				select {
				case task := <-s.sinkWorkerPool:
					// Run the sink task.
					func() {
						defer func() {
							// Recover from panics so one bad task cannot kill the worker.
							if r := recover(); r != nil {
								logger.Error("Sink worker %d panic recovered: %v", workerID, r)
							}
						}()
						task()
					}()
				case <-s.done:
					return
				}
			}
		}(i)
	}
}

// startResultConsumer drains resultChan automatically so it can never block the pipeline.
func (s *Stream) startResultConsumer() {
	for {
		select {
		case <-s.resultChan:
			// Consume results automatically to keep the channel from filling up.
			// This is a safety net: even without an external consumer the system never stalls.
		case <-s.done:
			return
		}
	}
}

func (s *Stream) RegisterFilter(conditionStr string) error {
	if strings.TrimSpace(conditionStr) == "" {
		return nil
	}

	// Preprocess LIKE syntax into a form expr-lang understands.
	processedCondition := conditionStr
	bridge := functions.GetExprBridge()
	if bridge.ContainsLikeOperator(conditionStr) {
		if processed, err := bridge.PreprocessLikeExpression(conditionStr); err == nil {
			processedCondition = processed
		}
	}

	// Preprocess IS NULL and IS NOT NULL syntax.
	if bridge.ContainsIsNullOperator(processedCondition) {
		if processed, err := bridge.PreprocessIsNullExpression(processedCondition); err == nil {
			processedCondition = processed
		}
	}

	filter, err := condition.NewExprCondition(processedCondition)
	if err != nil {
		return fmt.Errorf("compile filter error: %w", err)
	}
	s.filter = filter
	return nil
}

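// Filter sketch (illustrative; exact operator support comes from expr-lang plus
// the bridge's LIKE / IS NULL preprocessing above — field names are made up):
//
//	_ = s.RegisterFilter("temperature > 20 && deviceName LIKE 'sensor-%'")
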
// convertToAggregationFields converts the legacy configuration format into AggregationField values.
func convertToAggregationFields(selectFields map[string]aggregator.AggregateType, fieldAlias map[string]string) []aggregator.AggregationField {
	var fields []aggregator.AggregationField

	for outputAlias, aggType := range selectFields {
		field := aggregator.AggregationField{
			AggregateType: aggType,
			OutputAlias:   outputAlias,
		}

		// Look up the corresponding input field name.
		if inputField, exists := fieldAlias[outputAlias]; exists {
			field.InputField = inputField
		} else {
			// With no alias mapping, assume the input field name equals the output alias.
			field.InputField = outputAlias
		}

		fields = append(fields, field)
	}

	return fields
}

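// Conversion sketch (illustrative; "sumType" stands in for whatever
// aggregator.AggregateType constant applies — the constant names are not
// shown in this file):
//
//	// SELECT SUM(temp) AS total maps to:
//	//   selectFields: {"total": sumType}, fieldAlias: {"total": "temp"}
//	// and converts to:
//	//   AggregationField{AggregateType: sumType, InputField: "temp", OutputAlias: "total"}
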
func (s *Stream) Start() {
	// Launch the processing goroutine.
	go s.process()
}

func (s *Stream) process() {
	// Initialize the aggregator for window mode.
	if s.config.NeedWindow {
		// Convert to the new AggregationField format.
		aggregationFields := convertToAggregationFields(s.config.SelectFields, s.config.FieldAlias)
		s.aggregator = aggregator.NewGroupAggregator(s.config.GroupFields, aggregationFields)

		// Register a calculator for each expression field.
		for field, fieldExpr := range s.config.FieldExpressions {
			// Local copies to avoid the loop-variable capture problem.
			currentField := field
			currentFieldExpr := fieldExpr

			s.aggregator.RegisterExpression(
				currentField,
				currentFieldExpr.Expression,
				currentFieldExpr.Fields,
				func(data interface{}) (interface{}, error) {
					// Convert the input to map[string]interface{} for evaluation.
					var dataMap map[string]interface{}
					switch d := data.(type) {
					case map[string]interface{}:
						dataMap = d
					default:
						// Not a map; try a reflection-based conversion.
						v := reflect.ValueOf(data)
						if v.Kind() == reflect.Ptr {
							v = v.Elem()
						}

						if v.Kind() == reflect.Struct {
							// Convert the struct to a map.
							dataMap = make(map[string]interface{})
							t := v.Type()
							for i := 0; i < t.NumField(); i++ {
								field := t.Field(i)
								dataMap[field.Name] = v.Field(i).Interface()
							}
						} else {
							return nil, fmt.Errorf("unsupported data type for expression: %T", data)
						}
					}

					// If the expression references nested fields, go straight to the
					// custom expression engine, which supports them along with NULL values.
					hasNestedFields := strings.Contains(currentFieldExpr.Expression, ".")

					if hasNestedFields {
						expression, parseErr := expr.NewExpression(currentFieldExpr.Expression)
						if parseErr != nil {
							return nil, fmt.Errorf("expression parse failed: %w", parseErr)
						}

						// Evaluate with NULL support.
						numResult, isNull, err := expression.EvaluateWithNull(dataMap)
						if err != nil {
							return nil, fmt.Errorf("expression evaluation failed: %w", err)
						}
						if isNull {
							return nil, nil // nil represents a NULL value
						}
						return numResult, nil
					}

					// Check for a CASE expression.
					trimmedExpr := strings.TrimSpace(currentFieldExpr.Expression)
					upperExpr := strings.ToUpper(trimmedExpr)
					if strings.HasPrefix(upperExpr, "CASE") {
						// CASE expressions are evaluated with NULL support.
						expression, parseErr := expr.NewExpression(currentFieldExpr.Expression)
						if parseErr != nil {
							return nil, fmt.Errorf("CASE expression parse failed: %w", parseErr)
						}

						numResult, isNull, err := expression.EvaluateWithNull(dataMap)
						if err != nil {
							return nil, fmt.Errorf("CASE expression evaluation failed: %w", err)
						}
						if isNull {
							return nil, nil // nil represents a NULL value
						}
						return numResult, nil
					}

					// Otherwise evaluate through the bridge, which supports string
					// concatenation, IS NULL, and similar syntax.
					bridge := functions.GetExprBridge()

					// Preprocess IS NULL and LIKE syntax in the expression.
					processedExpr := currentFieldExpr.Expression
					if bridge.ContainsIsNullOperator(processedExpr) {
						if processed, err := bridge.PreprocessIsNullExpression(processedExpr); err == nil {
							processedExpr = processed
						}
					}
					if bridge.ContainsLikeOperator(processedExpr) {
						if processed, err := bridge.PreprocessLikeExpression(processedExpr); err == nil {
							processedExpr = processed
						}
					}

					result, err := bridge.EvaluateExpression(processedExpr, dataMap)
					if err != nil {
						// If the bridge fails, fall back to the custom expression engine
						// (on the original expression, not the preprocessed one).
						expression, parseErr := expr.NewExpression(currentFieldExpr.Expression)
						if parseErr != nil {
							return nil, fmt.Errorf("expression parse failed: %w", parseErr)
						}

						// Evaluate with NULL support.
						numResult, isNull, evalErr := expression.EvaluateWithNull(dataMap)
						if evalErr != nil {
							return nil, fmt.Errorf("expression evaluation failed: %w", evalErr)
						}
						if isNull {
							return nil, nil // nil represents a NULL value
						}
						return numResult, nil
					}

					return result, nil
				},
			)
		}

		// Start the window goroutine.
		s.Window.Start()

		// Window mode: consume window batches.
		go func() {
			for batch := range s.Window.OutputChan() {
				// Feed the batch into the aggregator.
				for _, item := range batch {
					_ = s.aggregator.Put("window_start", item.Slot.WindowStart())
					_ = s.aggregator.Put("window_end", item.Slot.WindowEnd())
					if err := s.aggregator.Add(item.Data); err != nil {
						logger.Error("aggregate error: %v", err)
					}
				}

				// Fetch and emit the aggregation results.
				if results, err := s.aggregator.GetResults(); err == nil {
					var finalResults []map[string]interface{}
					if s.config.Distinct {
						seenResults := make(map[string]bool)
						for _, result := range results {
							serializedResult, jsonErr := json.Marshal(result)
							if jsonErr != nil {
								logger.Error("Error serializing result for distinct check: %v", jsonErr)
								finalResults = append(finalResults, result)
								continue
							}
							if !seenResults[string(serializedResult)] {
								finalResults = append(finalResults, result)
								seenResults[string(serializedResult)] = true
							}
						}
					} else {
						finalResults = results
					}

					// Apply the HAVING filter.
					if s.config.Having != "" {
						// Does the HAVING condition contain a CASE expression?
						hasCaseExpression := strings.Contains(strings.ToUpper(s.config.Having), "CASE")

						var filteredResults []map[string]interface{}

						if hasCaseExpression {
							// HAVING contains a CASE expression; use the custom expression parser.
							expression, err := expr.NewExpression(s.config.Having)
							if err != nil {
								logger.Error("having filter error (CASE expression): %v", err)
							} else {
								// Apply HAVING using the CASE expression evaluator.
								for _, result := range finalResults {
									// EvaluateWithNull handles NULL values.
									havingResult, isNull, err := expression.EvaluateWithNull(result)
									if err != nil {
										logger.Error("having filter evaluation error: %v", err)
										continue
									}

									// A NULL result fails the condition (standard SQL behavior).
									if isNull {
										continue
									}

									// A numeric result greater than zero counts as true.
									if havingResult > 0 {
										filteredResults = append(filteredResults, result)
									}
								}
							}
						} else {
							// No CASE expression; use the existing expr-lang path.
							// Preprocess LIKE syntax in the HAVING condition.
							processedHaving := s.config.Having
							bridge := functions.GetExprBridge()
							if bridge.ContainsLikeOperator(s.config.Having) {
								if processed, err := bridge.PreprocessLikeExpression(s.config.Having); err == nil {
									processedHaving = processed
								}
							}

							// Preprocess IS NULL syntax in the HAVING condition.
							if bridge.ContainsIsNullOperator(processedHaving) {
								if processed, err := bridge.PreprocessIsNullExpression(processedHaving); err == nil {
									processedHaving = processed
								}
							}

							// Build the HAVING condition.
							havingFilter, err := condition.NewExprCondition(processedHaving)
							if err != nil {
								logger.Error("having filter error: %v", err)
							} else {
								// Apply the HAVING filter.
								for _, result := range finalResults {
									if havingFilter.Evaluate(result) {
										filteredResults = append(filteredResults, result)
									}
								}
							}
						}

						finalResults = filteredResults
					}

					// Apply the LIMIT clause.
					if s.config.Limit > 0 && len(finalResults) > s.config.Limit {
						finalResults = finalResults[:s.config.Limit]
					}

					// Send the results to the result channel and the sink functions.
					if len(finalResults) > 0 {
						// Non-blocking send to the result channel.
						s.sendResultNonBlocking(finalResults)

						// Invoke all sinks asynchronously.
						s.callSinksAsync(finalResults)
					}
					s.aggregator.Reset()
				}
			}
		}()
	}

	// A single shared ticker avoids leaking a timer per iteration.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop() // stop the ticker when the function exits

	// Main processing loop.
	for {
		// Take the read lock to access dataChan safely.
		s.dataChanMux.RLock()
		currentDataChan := s.dataChan
		s.dataChanMux.RUnlock()

		select {
		case data, ok := <-currentDataChan:
			if !ok {
				// Channel closed.
				return
			}
			// Apply the filter.
			if s.filter == nil || s.filter.Evaluate(data) {
				if s.config.NeedWindow {
					// Window mode: add the data to the window.
					s.Window.Add(data)
				} else {
					// Non-window mode: process and emit directly.
					s.processDirectData(data)
				}
			}
		case <-s.done:
			// Shutdown signal received.
			return
		case <-ticker.C:
			// Ticker fired: nothing to do, but looping back re-reads the
			// (possibly expanded) dataChan reference.
		}
	}
}

// processDirectData handles non-window data directly (optimized path).
func (s *Stream) processDirectData(data interface{}) {
	// Count the input.
	atomic.AddInt64(&s.inputCount, 1)

	// Simplification: treat the data as a map.
	dataMap, ok := data.(map[string]interface{})
	if !ok {
		logger.Error("Unsupported data type: %T", data)
		atomic.AddInt64(&s.droppedCount, 1)
		return
	}

	// Build the result map.
	result := make(map[string]interface{})

	// Evaluate expression fields.
	for fieldName, fieldExpr := range s.config.FieldExpressions {

		// Evaluate through the bridge, which supports IS NULL and similar syntax.
		bridge := functions.GetExprBridge()

		// Preprocess IS NULL and LIKE syntax in the expression.
		processedExpr := fieldExpr.Expression
		if bridge.ContainsIsNullOperator(processedExpr) {
			if processed, err := bridge.PreprocessIsNullExpression(processedExpr); err == nil {
				processedExpr = processed
			}
		}
		if bridge.ContainsLikeOperator(processedExpr) {
			if processed, err := bridge.PreprocessLikeExpression(processedExpr); err == nil {
				processedExpr = processed
			}
		}

		// Is the expression a function call (contains parentheses)?
		isFunctionCall := strings.Contains(fieldExpr.Expression, "(") && strings.Contains(fieldExpr.Expression, ")")

		// Does the expression reference nested fields (excluding dots inside function calls)?
		hasNestedFields := false
		if !isFunctionCall && strings.Contains(fieldExpr.Expression, ".") {
			hasNestedFields = true
		}

		var evalResult interface{}

		if isFunctionCall {
			// Function calls go through the bridge first, which preserves the original types.
			exprResult, err := bridge.EvaluateExpression(processedExpr, dataMap)
			if err != nil {
				logger.Error("Function call evaluation failed for field %s: %v", fieldName, err)
				result[fieldName] = nil
				continue
			}
			evalResult = exprResult
		} else if hasNestedFields {
			// Nested fields (and no function call): use the custom expression engine.
			expression, parseErr := expr.NewExpression(fieldExpr.Expression)
			if parseErr != nil {
				logger.Error("Expression parse failed for field %s: %v", fieldName, parseErr)
				result[fieldName] = nil
				continue
			}

			numResult, err := expression.Evaluate(dataMap)
			if err != nil {
				logger.Error("Expression evaluation failed for field %s: %v", fieldName, err)
				result[fieldName] = nil
				continue
			}
			evalResult = numResult
		} else {
			// Try the bridge for everything else.
			exprResult, err := bridge.EvaluateExpression(processedExpr, dataMap)
			if err != nil {
				// If the bridge fails, fall back to the custom expression engine
				// (on the original expression, not the preprocessed one).
				expression, parseErr := expr.NewExpression(fieldExpr.Expression)
				if parseErr != nil {
					logger.Error("Expression parse failed for field %s: %v", fieldName, parseErr)
					result[fieldName] = nil
					continue
				}

				// Evaluate the expression.
				numResult, evalErr := expression.Evaluate(dataMap)
				if evalErr != nil {
					logger.Error("Expression evaluation failed for field %s: %v", fieldName, evalErr)
					result[fieldName] = nil
					continue
				}
				evalResult = numResult
			} else {
				evalResult = exprResult
			}
		}

		result[fieldName] = evalResult
	}

	// When specific fields are selected, keep only those fields.
	if len(s.config.SimpleFields) > 0 {
		for _, fieldSpec := range s.config.SimpleFields {
			// Special case: SELECT *.
			if fieldSpec == "*" {
				// SELECT *: copy every field, but skip those already produced by
				// expression fields so the computed values are preserved.
				for k, v := range dataMap {
					if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
						result[k] = v
					}
				}
				continue
			}

			// Handle aliases ("field:alias").
			parts := strings.Split(fieldSpec, ":")
			fieldName := parts[0]
			outputName := fieldName
			if len(parts) > 1 {
				outputName = parts[1]
			}

			// Skip fields already produced by expression fields.
			if _, isExpression := s.config.FieldExpressions[outputName]; isExpression {
				continue
			}

			// Is this a function call?
			if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") {
				// Execute the function call.
				if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil {
					result[outputName] = funcResult
				} else {
					logger.Error("Function execution error %s: %v", fieldName, err)
					result[outputName] = nil
				}
			} else {
				// Plain field, with nested-field support.
				var value interface{}
				var exists bool

				if fieldpath.IsNestedField(fieldName) {
					value, exists = fieldpath.GetNestedField(data, fieldName)
				} else {
					value, exists = dataMap[fieldName]
				}

				if exists {
					result[outputName] = value
				} else {
					result[outputName] = nil
				}
			}
		}
	} else if len(s.config.FieldExpressions) == 0 {
		// No selected fields and no expression fields: keep everything.
		for k, v := range dataMap {
			result[k] = v
		}
	}

	// Wrap the result as a slice.
	results := []map[string]interface{}{result}

	// Non-blocking send to resultChan.
	s.sendResultNonBlocking(results)

	// Invoke all sinks asynchronously to avoid blocking.
	s.callSinksAsync(results)
}

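// Field-selection sketch (illustrative; "upper" stands in for any function
// registered in the functions package):
//
//	// SELECT temperature AS temp, upper(deviceName) FROM stream
//	// is carried in the config as:
//	cfg.SimpleFields = []string{"temperature:temp", "upper(deviceName)"}
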
// sendResultNonBlocking sends results to resultChan without blocking (smart backpressure control).
func (s *Stream) sendResultNonBlocking(results []map[string]interface{}) {
	select {
	case s.resultChan <- results:
		// Sent successfully.
		atomic.AddInt64(&s.outputCount, 1)
	default:
		// The result channel is full; apply the backpressure strategy.
		chanLen := len(s.resultChan)
		chanCap := cap(s.resultChan)

		// Above 90% utilization, enter backpressure mode.
		if float64(chanLen)/float64(chanCap) > 0.9 {
			// Evict one old result to make room for the new one.
			select {
			case <-s.resultChan:
				// Evicted an old result; try to enqueue the new one.
				select {
				case s.resultChan <- results:
					atomic.AddInt64(&s.outputCount, 1)
				default:
					logger.Warn("Result channel is full, dropping result data")
					atomic.AddInt64(&s.droppedCount, 1)
				}
			default:
				logger.Warn("Result channel is full, dropping result data")
				atomic.AddInt64(&s.droppedCount, 1)
			}
		} else {
			logger.Warn("Result channel is full, dropping result data")
			atomic.AddInt64(&s.droppedCount, 1)
		}
	}
}

// callSinksAsync invokes every sink function asynchronously.
func (s *Stream) callSinksAsync(results []map[string]interface{}) {
	// Take the read lock to access the sinks slice safely.
	s.sinksMux.RLock()
	if len(s.sinks) == 0 {
		s.sinksMux.RUnlock()
		return
	}

	// Copy the slice so no slow work happens while the lock is held.
	sinksCopy := make([]func(interface{}), len(s.sinks))
	copy(sinksCopy, s.sinks)
	s.sinksMux.RUnlock()

	// Create one asynchronous task per sink.
	for _, sink := range sinksCopy {
		// Capture the loop variable to avoid the closure pitfall.
		currentSink := sink

		// The task to submit to the worker pool.
		task := func() {
			defer func() {
				// Recover from panics so one bad sink cannot take down the system.
				if r := recover(); r != nil {
					logger.Error("Sink execution exception: %v", r)
				}
			}()
			currentSink(results)
		}

		// Submit without blocking.
		select {
		case s.sinkWorkerPool <- task:
			// Submitted.
		default:
			// Worker pool full: degrade to running in a fresh goroutine.
			go task()
		}
	}
}

// executeFunction evaluates a function-call expression.
func (s *Stream) executeFunction(funcExpr string, data map[string]interface{}) (interface{}, error) {
	// Check whether this is a registered custom function.
	funcName := extractFunctionName(funcExpr)
	if funcName != "" {
		// Use the function system directly.
		fn, exists := functions.Get(funcName)
		if exists {
			// Parse the arguments.
			args, err := s.parseFunctionArgs(funcExpr, data)
			if err != nil {
				return nil, err
			}

			// Build the function context.
			ctx := &functions.FunctionContext{Data: data}

			// Execute the function.
			return fn.Execute(ctx, args)
		}
	}

	// For complex nested calls, go straight to the ExprBridge;
	// this avoids Expression.Evaluate's float64-only limitation.
	bridge := functions.GetExprBridge()
	result, err := bridge.EvaluateExpression(funcExpr, data)
	if err != nil {
		return nil, fmt.Errorf("evaluate function expression failed: %w", err)
	}

	return result, nil
}

// extractFunctionName pulls the function name out of an expression.
func extractFunctionName(expr string) string {
	parenIndex := strings.Index(expr, "(")
	if parenIndex == -1 {
		return ""
	}
	funcName := strings.TrimSpace(expr[:parenIndex])
	// Reject anything containing operators or spaces: not a bare function name.
	if strings.ContainsAny(funcName, " +-*/=<>!&|") {
		return ""
	}
	return funcName
}

// parseFunctionArgs parses function arguments, including nested function calls.
func (s *Stream) parseFunctionArgs(funcExpr string, data map[string]interface{}) ([]interface{}, error) {
	// Extract the argument list inside the parentheses.
	start := strings.Index(funcExpr, "(")
	end := strings.LastIndex(funcExpr, ")")
	if start == -1 || end == -1 || end <= start {
		return nil, fmt.Errorf("invalid function expression: %s", funcExpr)
	}

	argsStr := strings.TrimSpace(funcExpr[start+1 : end])
	if argsStr == "" {
		return []interface{}{}, nil
	}

	// Split the arguments, respecting nested calls and quotes.
	argParts, err := s.smartSplitArgs(argsStr)
	if err != nil {
		return nil, err
	}

	args := make([]interface{}, len(argParts))

	for i, arg := range argParts {
		arg = strings.TrimSpace(arg)

		// String literal in single quotes.
		if strings.HasPrefix(arg, "'") && strings.HasSuffix(arg, "'") {
			args[i] = strings.Trim(arg, "'")
		} else if strings.HasPrefix(arg, "\"") && strings.HasSuffix(arg, "\"") {
			// String literal in double quotes.
			args[i] = strings.Trim(arg, "\"")
		} else if strings.Contains(arg, "(") {
			// The argument is itself a function call: evaluate it recursively.
			result, err := s.executeFunction(arg, data)
			if err != nil {
				return nil, fmt.Errorf("failed to execute nested function '%s': %v", arg, err)
			}
			args[i] = result
		} else if value, exists := data[arg]; exists {
			// The argument names a data field.
			args[i] = value
		} else {
			// Try to parse it as a number; otherwise keep the raw string.
			if val, err := strconv.ParseFloat(arg, 64); err == nil {
				args[i] = val
			} else {
				args[i] = arg
			}
		}
	}

	return args, nil
}

// smartSplitArgs splits an argument list on commas, honoring nested parentheses and quotes.
func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) {
	var args []string
	var current strings.Builder
	parenDepth := 0
	inQuotes := false
	quoteChar := byte(0)

	for i := 0; i < len(argsStr); i++ {
		ch := argsStr[i]

		switch ch {
		case '\'', '"':
			if !inQuotes {
				inQuotes = true
				quoteChar = ch
			} else if quoteChar == ch {
				inQuotes = false
				quoteChar = 0
			}
			current.WriteByte(ch)
		case '(':
			if !inQuotes {
				parenDepth++
			}
			current.WriteByte(ch)
		case ')':
			if !inQuotes {
				parenDepth--
			}
			current.WriteByte(ch)
		case ',':
			if !inQuotes && parenDepth == 0 {
				// Top-level comma: argument boundary.
				args = append(args, strings.TrimSpace(current.String()))
				current.Reset()
			} else {
				current.WriteByte(ch)
			}
		default:
			current.WriteByte(ch)
		}
	}

	// Append the final argument.
	if current.Len() > 0 {
		args = append(args, strings.TrimSpace(current.String()))
	}

	return args, nil
}

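// Splitting sketch (illustrative input and output):
//
//	s.smartSplitArgs(`concat(a, b), 'x,y', 3.14`)
//	// => []string{"concat(a, b)", "'x,y'", "3.14"}
//	// Commas inside concat(...) and inside the quoted string are not separators.
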
func (s *Stream) AddData(data interface{}) {
	atomic.AddInt64(&s.inputCount, 1)

	// Dispatch on the configured overflow strategy.
	switch s.overflowStrategy {
	case "block":
		// Blocking mode: guarantees no data loss.
		s.addDataBlocking(data)
	case "expand":
		// Dynamic-expansion mode: grows the buffer automatically.
		s.addDataWithExpansion(data)
	case "persist":
		// Persistence mode: overflow is written to disk.
		s.addDataWithPersistence(data)
	default:
		// Default drop mode: the original behavior.
		s.addDataWithDrop(data)
	}
}

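// Strategy-selection sketch (illustrative; the OverflowConfig field names
// match those used by the constructors in this file):
//
//	perf := types.DefaultPerformanceConfig()
//	perf.OverflowConfig.Strategy = "block" // or "expand", "persist", "drop"
//	perf.OverflowConfig.BlockTimeout = 5 * time.Second
//	s, err := NewStreamWithCustomPerformance(cfg, perf)
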
// addDataBlocking adds data in blocking mode, guaranteeing zero data loss (thread-safe).
func (s *Stream) addDataBlocking(data interface{}) {
	if s.blockingTimeout <= 0 {
		// No timeout configured: block until the send succeeds.
		dataChan := s.safeGetDataChan()
		dataChan <- data
		return
	}

	// Blocking send with a timeout.
	timer := time.NewTimer(s.blockingTimeout)
	defer timer.Stop()

	dataChan := s.safeGetDataChan()
	select {
	case dataChan <- data:
		// Data added successfully.
		return
	case <-timer.C:
		// Timed out, but do not drop the data: log the error and keep blocking.
		logger.Error("Data addition timeout, but continue waiting to avoid data loss")
		// Block indefinitely on a freshly fetched channel reference.
		finalDataChan := s.safeGetDataChan()
		finalDataChan <- data
	}
}

// addDataWithExpansion adds data in dynamic-expansion mode (thread-safe).
func (s *Stream) addDataWithExpansion(data interface{}) {
	// First attempt.
	if s.safeSendToDataChan(data) {
		return
	}

	// Channel full: expand it.
	s.expandDataChannel()

	// Retry after expansion with a fresh channel reference.
	if s.safeSendToDataChan(data) {
		logger.Info("Successfully added data after data channel expansion")
		return
	}

	// Still full after expansion: block until the send succeeds.
	dataChan := s.safeGetDataChan()
	dataChan <- data
}

// addDataWithPersistence adds data in persistence mode (thread-safe, full implementation).
func (s *Stream) addDataWithPersistence(data interface{}) {
	// First attempt.
	if s.safeSendToDataChan(data) {
		return
	}

	// Channel full: persist the data to disk.
	if s.persistenceManager != nil {
		if err := s.persistenceManager.PersistData(data); err != nil {
			logger.Error("Failed to persist data: %v", err)
			atomic.AddInt64(&s.droppedCount, 1)
		} else {
			logger.Debug("Data has been persisted to disk")
		}
	} else {
		logger.Error("Persistence manager not initialized, data will be lost")
		atomic.AddInt64(&s.droppedCount, 1)
	}

	// Kick off an asynchronous retry.
	go s.persistAndRetryData(data)
}

// addDataWithDrop adds data in the original drop mode (thread-safe).
func (s *Stream) addDataWithDrop(data interface{}) {
	// Smart non-blocking add with tiered backpressure control.
	if s.safeSendToDataChan(data) {
		return
	}

	// The data channel is full; read its state for the tiered backpressure strategy.
	s.dataChanMux.RLock()
	chanLen := len(s.dataChan)
	chanCap := cap(s.dataChan)
	currentDataChan := s.dataChan
	s.dataChanMux.RUnlock()

	usage := float64(chanLen) / float64(chanCap)

	// Pick the wait time and retry count based on utilization and buffer size.
	var waitTime time.Duration
	var maxRetries int

	switch {
	case chanCap >= 100000: // very large buffer (benchmark mode)
		switch {
		case usage > 0.99:
			waitTime = 1 * time.Millisecond // longest wait
			maxRetries = 3
		case usage > 0.95:
			waitTime = 500 * time.Microsecond
			maxRetries = 2
		case usage > 0.90:
			waitTime = 100 * time.Microsecond
			maxRetries = 1
		default:
			// Drop immediately.
			logger.Warn("Data channel is full, dropping input data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		}

	case chanCap >= 50000: // high-performance mode
		switch {
		case usage > 0.99:
			waitTime = 500 * time.Microsecond
			maxRetries = 2
		case usage > 0.95:
			waitTime = 200 * time.Microsecond
			maxRetries = 1
		case usage > 0.90:
			waitTime = 50 * time.Microsecond
			maxRetries = 1
		default:
			logger.Warn("Data channel is full, dropping input data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		}

	default: // default mode
		switch {
		case usage > 0.99:
			waitTime = 100 * time.Microsecond
			maxRetries = 1
		case usage > 0.95:
			waitTime = 50 * time.Microsecond
			maxRetries = 1
		default:
			logger.Warn("Data channel is full, dropping input data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		}
	}

	// Retry the send a bounded number of times, thread-safely.
	for retry := 0; retry < maxRetries; retry++ {
		timer := time.NewTimer(waitTime)
		select {
		case currentDataChan <- data:
			// Retry succeeded.
			timer.Stop()
			return
		case <-timer.C:
			// Timed out; retry again or drop.
			if retry == maxRetries-1 {
				// Final retry failed: record the drop.
				logger.Warn("Data channel is full, dropping input data")
				atomic.AddInt64(&s.droppedCount, 1)
			}
		}
	}
}

// safeGetDataChan returns the current dataChan reference under the read lock.
func (s *Stream) safeGetDataChan() chan interface{} {
	s.dataChanMux.RLock()
	defer s.dataChanMux.RUnlock()
	return s.dataChan
}

// safeSendToDataChan attempts a non-blocking, thread-safe send to dataChan.
func (s *Stream) safeSendToDataChan(data interface{}) bool {
	dataChan := s.safeGetDataChan()
	select {
	case dataChan <- data:
		return true
	default:
		return false
	}
}

func (s *Stream) AddSink(sink func(interface{})) {
	s.sinksMux.Lock()
	s.sinks = append(s.sinks, sink)
	s.sinksMux.Unlock()
}

func (s *Stream) GetResultsChan() <-chan interface{} {
	return s.resultChan
}

func NewStreamProcessor() (*Stream, error) {
	return NewStream(types.Config{})
}

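// Consumption sketch (illustrative): results arrive as []map[string]interface{}.
// Note that startResultConsumer also drains resultChan as a safety net, so a
// sink is the reliable way to observe every result:
//
//	s.AddSink(func(result interface{}) {
//		rows, _ := result.([]map[string]interface{})
//		for _, row := range rows {
//			fmt.Println(row)
//		}
//	})
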
// Stop shuts down stream processing.
func (s *Stream) Stop() {
	close(s.done)

	// Stop the persistence manager.
	if s.persistenceManager != nil {
		if err := s.persistenceManager.Stop(); err != nil {
			logger.Error("Failed to stop persistence manager: %v", err)
		}
	}
}

// GetStats returns stream-processing statistics (thread-safe).
func (s *Stream) GetStats() map[string]int64 {
	// Read dataChan state under the read lock.
	s.dataChanMux.RLock()
	dataChanLen := int64(len(s.dataChan))
	dataChanCap := int64(cap(s.dataChan))
	s.dataChanMux.RUnlock()

	return map[string]int64{
		"input_count":     atomic.LoadInt64(&s.inputCount),
		"output_count":    atomic.LoadInt64(&s.outputCount),
		"dropped_count":   atomic.LoadInt64(&s.droppedCount),
		"data_chan_len":   dataChanLen,
		"data_chan_cap":   dataChanCap,
		"result_chan_len": int64(len(s.resultChan)),
		"result_chan_cap": int64(cap(s.resultChan)),
		"sink_pool_len":   int64(len(s.sinkWorkerPool)),
		"sink_pool_cap":   int64(cap(s.sinkWorkerPool)),
		"active_retries":  int64(atomic.LoadInt32(&s.activeRetries)),
		"expanding":       int64(atomic.LoadInt32(&s.expanding)),
	}
}

// GetDetailedStats returns detailed performance statistics.
func (s *Stream) GetDetailedStats() map[string]interface{} {
	stats := s.GetStats()

	// Channel utilization percentages.
	dataUsage := float64(stats["data_chan_len"]) / float64(stats["data_chan_cap"]) * 100
	resultUsage := float64(stats["result_chan_len"]) / float64(stats["result_chan_cap"]) * 100
	sinkUsage := float64(stats["sink_pool_len"]) / float64(stats["sink_pool_cap"]) * 100

	// Efficiency metrics.
	var processRate float64 = 100.0
	var dropRate float64 = 0.0

	if stats["input_count"] > 0 {
		processRate = float64(stats["output_count"]) / float64(stats["input_count"]) * 100
		dropRate = float64(stats["dropped_count"]) / float64(stats["input_count"]) * 100
	}

	return map[string]interface{}{
		"basic_stats":       stats,
		"data_chan_usage":   dataUsage,
		"result_chan_usage": resultUsage,
		"sink_pool_usage":   sinkUsage,
		"process_rate":      processRate,
		"drop_rate":         dropRate,
		"performance_level": s.assessPerformanceLevel(dataUsage, dropRate),
	}
}

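// Monitoring sketch (illustrative; the map keys match GetDetailedStats above):
//
//	stats := s.GetDetailedStats()
//	if stats["performance_level"] == "CRITICAL" {
//		logger.Warn("drop rate %v%% - consider the \"block\" or \"persist\" strategy", stats["drop_rate"])
//	}
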
// assessPerformanceLevel classifies the current performance level.
func (s *Stream) assessPerformanceLevel(dataUsage, dropRate float64) string {
	switch {
	case dropRate > 50:
		return "CRITICAL" // severe performance problem
	case dropRate > 20:
		return "WARNING" // performance warning
	case dataUsage > 90:
		return "HIGH_LOAD" // high load
	case dataUsage > 70:
		return "MODERATE_LOAD" // moderate load
	default:
		return "OPTIMAL" // healthy
	}
}

// ResetStats resets the statistics counters.
func (s *Stream) ResetStats() {
	atomic.StoreInt64(&s.inputCount, 0)
	atomic.StoreInt64(&s.outputCount, 0)
	atomic.StoreInt64(&s.droppedCount, 0)
}

// expandDataChannel grows the data channel dynamically.
func (s *Stream) expandDataChannel() {
	// Atomically check the expansion flag to prevent concurrent expansion.
	if !atomic.CompareAndSwapInt32(&s.expanding, 0, 1) {
		logger.Debug("Channel expansion already in progress, skipping")
		return
	}
	defer atomic.StoreInt32(&s.expanding, 0)

	// Take the expansion lock so only one goroutine expands at a time.
	s.expansionMux.Lock()
	defer s.expansionMux.Unlock()

	// Re-check whether expansion is still needed (double-checked locking).
	s.dataChanMux.RLock()
	oldCap := cap(s.dataChan)
	currentLen := len(s.dataChan)
	s.dataChanMux.RUnlock()

	// Below 80% utilization, expansion is unnecessary.
	if float64(currentLen)/float64(oldCap) < 0.8 {
		logger.Debug("Channel usage below threshold, expansion not needed")
		return
	}

	newCap := int(float64(oldCap) * 1.5) // grow by 50%
	if newCap < oldCap+1000 {
		newCap = oldCap + 1000 // grow by at least 1000
	}

	logger.Info("Dynamic expansion of data channel: %d -> %d", oldCap, newCap)

	// Create the larger channel.
	newChan := make(chan interface{}, newCap)

	// Migrate the data under the write lock.
	s.dataChanMux.Lock()
	oldChan := s.dataChan

	// Move data from the old channel to the new one, with a migration deadline.
	migrationTimeout := time.NewTimer(5 * time.Second) // 5-second migration timeout
	defer migrationTimeout.Stop()

	migratedCount := 0
	for {
		select {
		case data := <-oldChan:
			select {
			case newChan <- data:
				migratedCount++
			case <-migrationTimeout.C:
				logger.Warn("Data migration timeout, some data may be lost during expansion")
				goto migration_done
			}
		case <-migrationTimeout.C:
			logger.Warn("Data migration timeout during channel drain")
			goto migration_done
		default:
			// Old channel drained; migration complete.
			goto migration_done
		}
	}

migration_done:
	// Swap in the new channel reference.
	s.dataChan = newChan
	s.dataChanMux.Unlock()

	logger.Info("Channel expansion completed: migrated %d items", migratedCount)
}

// persistAndRetryData persists data and retries delivery (with exponential backoff and resource limits).
func (s *Stream) persistAndRetryData(data interface{}) {
	// Cap the number of active retry goroutines to prevent a resource leak.
	currentRetries := atomic.LoadInt32(&s.activeRetries)
	if currentRetries >= s.maxRetryRoutines {
		logger.Warn("Maximum retry routines reached (%d), dropping data", currentRetries)
		atomic.AddInt64(&s.droppedCount, 1)
		return
	}

	// Track this retry goroutine.
	atomic.AddInt32(&s.activeRetries, 1)
	defer atomic.AddInt32(&s.activeRetries, -1)

	// Exponential backoff parameters.
	baseInterval := 50 * time.Millisecond
	maxInterval := 2 * time.Second
	maxRetries := 10                 // bounded retries prevent long blocking
	totalTimeout := 30 * time.Second // overall deadline

	retryTimer := time.NewTimer(totalTimeout)
	defer retryTimer.Stop()

	for attempt := 0; attempt < maxRetries; attempt++ {
		// Exponential backoff: baseInterval * 1.5^attempt, capped at maxInterval.
		currentInterval := time.Duration(float64(baseInterval) * math.Pow(1.5, float64(attempt)))
		if currentInterval > maxInterval {
			currentInterval = maxInterval
		}

		// Wait out the backoff interval.
		waitTimer := time.NewTimer(currentInterval)
		select {
		case <-waitTimer.C:
			// Proceed with the retry.
		case <-retryTimer.C:
			waitTimer.Stop()
			logger.Warn("Persistence retry timeout reached, dropping data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		case <-s.done:
			waitTimer.Stop()
			logger.Debug("Stream stopped during retry, dropping data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		}
		waitTimer.Stop()

		// Thread-safely fetch the current channel and attempt the send.
		s.dataChanMux.RLock()
		currentDataChan := s.dataChan
		s.dataChanMux.RUnlock()

		select {
		case currentDataChan <- data:
			logger.Debug("Persistence data retry successful: attempt %d", attempt+1)
			return
		case <-retryTimer.C:
			logger.Warn("Persistence retry timeout during send, dropping data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		case <-s.done:
			logger.Debug("Stream stopped during retry send, dropping data")
			atomic.AddInt64(&s.droppedCount, 1)
			return
		default:
			// Channel still full; move on to the next attempt.
			if attempt == maxRetries-1 {
				logger.Error("Persistence data retry failed after %d attempts, dropping data", maxRetries)
				atomic.AddInt64(&s.droppedCount, 1)
			} else {
				logger.Debug("Persistence retry attempt %d/%d failed, will retry with interval %v",
					attempt+1, maxRetries, currentInterval)
			}
		}
	}
}

// LoadAndReprocessPersistedData loads persisted data and feeds it back into the stream.
func (s *Stream) LoadAndReprocessPersistedData() error {
	if s.persistenceManager == nil {
		return fmt.Errorf("persistence manager not initialized")
	}

	// Load the persisted data.
	persistedData, err := s.persistenceManager.LoadPersistedData()
	if err != nil {
		return fmt.Errorf("failed to load persisted data: %w", err)
	}

	if len(persistedData) == 0 {
		logger.Info("No persistent data to recover")
		return nil
	}

	logger.Info("Start reprocessing %d persistent data records", len(persistedData))

	// Re-inject each record (thread-safe).
	successCount := 0
	for i, data := range persistedData {
		// Try a thread-safe, non-blocking send.
		if s.safeSendToDataChan(data) {
			successCount++
			continue
		}

		// Channel still full: wait briefly and try once more.
		time.Sleep(10 * time.Millisecond)
		if s.safeSendToDataChan(data) {
			successCount++
		} else {
			logger.Warn("Failed to recover data record %d, channel still full", i+1)
		}
	}

	logger.Info("Persistent data recovery completed: successful %d/%d records", successCount, len(persistedData))
	return nil
}

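// Recovery sketch (illustrative): replay any overflow that was spilled to disk
// under the "persist" strategy, e.g. after a restart:
//
//	if err := s.LoadAndReprocessPersistedData(); err != nil {
//		logger.Error("recovery failed: %v", err)
//	}
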
// GetPersistenceStats returns persistence statistics.
func (s *Stream) GetPersistenceStats() map[string]interface{} {
	if s.persistenceManager == nil {
		return map[string]interface{}{
			"enabled": false,
			"message": "persistence not enabled",
		}
	}

	stats := s.persistenceManager.GetStats()
	stats["enabled"] = true
	return stats
}

// Backward-compatibility functions

// NewStreamWithBuffers creates a Stream with custom buffer sizes.
// Deprecated: use NewStreamWithCustomPerformance instead.
func NewStreamWithBuffers(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int) (*Stream, error) {
	perfConfig := types.DefaultPerformanceConfig()
	perfConfig.BufferConfig.DataChannelSize = dataBufSize
	perfConfig.BufferConfig.ResultChannelSize = resultBufSize
	perfConfig.WorkerConfig.SinkPoolSize = sinkPoolSize

	config.PerformanceConfig = perfConfig
	return newStreamWithUnifiedConfig(config)
}

// NewHighPerformanceStream creates a Stream with the high-performance configuration.
// Deprecated: use NewStreamWithHighPerformance instead.
func NewHighPerformanceStream(config types.Config) (*Stream, error) {
	return NewStreamWithHighPerformance(config)
}

// NewStreamWithoutDataLoss creates a zero-data-loss stream processor.
// Deprecated: use NewStreamWithZeroDataLoss instead.
func NewStreamWithoutDataLoss(config types.Config, strategy string) (*Stream, error) {
	perfConfig := types.ZeroDataLossConfig()

	// Apply the caller-specified strategy.
	validStrategies := map[string]bool{
		"drop":    true,
		"block":   true,
		"expand":  true,
		"persist": true,
	}

	if validStrategies[strategy] {
		perfConfig.OverflowConfig.Strategy = strategy
		if strategy == "drop" {
			perfConfig.OverflowConfig.AllowDataLoss = true
		}
	}

	config.PerformanceConfig = perfConfig
	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithLossPolicy creates a stream processor with a data-loss policy.
// Deprecated: use NewStreamWithCustomPerformance instead.
func NewStreamWithLossPolicy(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int,
	overflowStrategy string, timeout time.Duration) (*Stream, error) {

	perfConfig := types.DefaultPerformanceConfig()
	perfConfig.BufferConfig.DataChannelSize = dataBufSize
	perfConfig.BufferConfig.ResultChannelSize = resultBufSize
	perfConfig.WorkerConfig.SinkPoolSize = sinkPoolSize
	perfConfig.OverflowConfig.Strategy = overflowStrategy
	perfConfig.OverflowConfig.BlockTimeout = timeout
	perfConfig.OverflowConfig.AllowDataLoss = (overflowStrategy == "drop")

	config.PerformanceConfig = perfConfig
	return newStreamWithUnifiedConfig(config)
}

// NewStreamWithLossPolicyAndPersistence creates a stream processor with a data-loss
// policy and persistence configuration.
// Deprecated: use NewStreamWithCustomPerformance instead.
func NewStreamWithLossPolicyAndPersistence(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int,
	overflowStrategy string, timeout time.Duration, persistDataDir string, persistMaxFileSize int64, persistFlushInterval time.Duration) (*Stream, error) {

	perfConfig := types.DefaultPerformanceConfig()
	perfConfig.BufferConfig.DataChannelSize = dataBufSize
	perfConfig.BufferConfig.ResultChannelSize = resultBufSize
	perfConfig.WorkerConfig.SinkPoolSize = sinkPoolSize
	perfConfig.OverflowConfig.Strategy = overflowStrategy
	perfConfig.OverflowConfig.BlockTimeout = timeout
	perfConfig.OverflowConfig.AllowDataLoss = (overflowStrategy == "drop")

	// Configure persistence.
	if overflowStrategy == "persist" {
		perfConfig.OverflowConfig.PersistenceConfig = &types.PersistenceConfig{
			DataDir:       persistDataDir,
			MaxFileSize:   persistMaxFileSize,
			FlushInterval: persistFlushInterval,
			MaxRetries:    5,
			RetryInterval: 1 * time.Second,
		}
	}

	config.PerformanceConfig = perfConfig
	return newStreamWithUnifiedConfig(config)
}