diff --git a/stream/stream.go b/stream/stream.go index 0da0259..5e17396 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -21,6 +21,15 @@ import ( "github.com/rulego/streamsql/window" ) +// fieldProcessInfo 字段处理信息,用于缓存预编译的字段处理逻辑 +type fieldProcessInfo struct { + fieldName string // 原始字段名 + outputName string // 输出字段名 + isFunctionCall bool // 是否为函数调用 + hasNestedField bool // 是否包含嵌套字段 + isSelectAll bool // 是否为SELECT * +} + type Stream struct { dataChan chan interface{} filter condition.Condition @@ -52,6 +61,13 @@ type Stream struct { blockingTimeout time.Duration // 阻塞超时时间 overflowStrategy string // 溢出策略: "drop", "block", "expand", "persist" persistenceManager *PersistenceManager // 持久化管理器 + + // 性能优化:预编译的AddData函数指针,避免每次switch判断 + addDataFunc func(interface{}) // 根据策略预设的函数指针 + + // 性能优化:预编译字段处理信息,避免重复解析 + compiledFieldInfo map[string]*fieldProcessInfo // 字段处理信息缓存 + } // NewStream 使用统一配置创建Stream @@ -138,6 +154,21 @@ func newStreamWithUnifiedConfig(config types.Config) (*Stream, error) { } } + // 性能优化:根据溢出策略预设AddData函数指针,避免运行时switch判断 + switch perfConfig.OverflowConfig.Strategy { + case "block": + stream.addDataFunc = stream.addDataBlocking + case "expand": + stream.addDataFunc = stream.addDataWithExpansion + case "persist": + stream.addDataFunc = stream.addDataWithPersistence + default: + stream.addDataFunc = stream.addDataWithDrop + } + + // 性能优化:预编译字段处理信息 + stream.compileFieldProcessInfo() + // 启动工作协程,使用配置的工作线程数 go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount) go stream.startResultConsumer() @@ -241,6 +272,36 @@ func convertToAggregationFields(selectFields map[string]aggregator.AggregateType return fields } +// compileFieldProcessInfo 预编译字段处理信息,避免运行时重复解析 +func (s *Stream) compileFieldProcessInfo() { + s.compiledFieldInfo = make(map[string]*fieldProcessInfo) + + // 编译SimpleFields信息 + for _, fieldSpec := range s.config.SimpleFields { + info := &fieldProcessInfo{} + + if fieldSpec == "*" { + info.isSelectAll = true + info.fieldName = "*" + info.outputName = "*" + } else { + // 解析别名 + parts := strings.Split(fieldSpec, ":") + info.fieldName = parts[0] + info.outputName = info.fieldName + if len(parts) > 1 { + info.outputName = parts[1] + } + + // 预判断字段特征 + info.isFunctionCall = strings.Contains(info.fieldName, "(") && strings.Contains(info.fieldName, ")") + info.hasNestedField = !info.isFunctionCall && fieldpath.IsNestedField(info.fieldName) + } + + s.compiledFieldInfo[fieldSpec] = info + } +} + func (s *Stream) Start() { // 启动处理协程 go s.process() @@ -543,8 +604,12 @@ func (s *Stream) processDirectData(data interface{}) { return } - // 创建结果map - result := make(map[string]interface{}) + // 创建结果map,预分配合适容量 + estimatedSize := len(s.config.FieldExpressions) + len(s.config.SimpleFields) + if estimatedSize < 8 { + estimatedSize = 8 // 最小容量 + } + result := make(map[string]interface{}, estimatedSize) // 处理表达式字段 for fieldName, fieldExpr := range s.config.FieldExpressions { @@ -629,14 +694,19 @@ func (s *Stream) processDirectData(data interface{}) { result[fieldName] = evalResult } - // 如果指定了字段,只保留这些字段 + // 性能优化:使用预编译的字段信息处理SimpleFields if len(s.config.SimpleFields) > 0 { for _, fieldSpec := range s.config.SimpleFields { - // 处理SELECT *的特殊情况 - if fieldSpec == "*" { - // SELECT *:返回所有字段,但跳过已经通过表达式字段处理的字段 + info := s.compiledFieldInfo[fieldSpec] + if info == nil { + // 如果没有预编译信息,回退到原逻辑(安全性保证) + s.processSingleFieldFallback(fieldSpec, dataMap, data, result) + continue + } + + if info.isSelectAll { + // SELECT *:批量复制所有字段,跳过表达式字段 for k, v := range dataMap { - // 如果该字段已经通过表达式字段处理,则跳过,保持表达式计算结果 if _, isExpression := s.config.FieldExpressions[k]; !isExpression { result[k] = v } @@ -644,43 +714,34 @@ func (s *Stream) processDirectData(data interface{}) { continue } - // 处理别名 - parts := strings.Split(fieldSpec, ":") - fieldName := parts[0] - outputName := fieldName - if len(parts) > 1 { - outputName = parts[1] - } - // 跳过已经通过表达式字段处理的字段 - if _, isExpression := s.config.FieldExpressions[outputName]; isExpression { + if _, isExpression := s.config.FieldExpressions[info.outputName]; isExpression { continue } - // 检查是否是函数调用 - if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") { + if info.isFunctionCall { // 执行函数调用 - if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil { - result[outputName] = funcResult + if funcResult, err := s.executeFunction(info.fieldName, dataMap); err == nil { + result[info.outputName] = funcResult } else { - logger.Error("Function execution error %s: %v", fieldName, err) - result[outputName] = nil + logger.Error("Function execution error %s: %v", info.fieldName, err) + result[info.outputName] = nil } } else { - // 普通字段 - 支持嵌套字段 + // 普通字段处理 var value interface{} var exists bool - if fieldpath.IsNestedField(fieldName) { - value, exists = fieldpath.GetNestedField(data, fieldName) + if info.hasNestedField { + value, exists = fieldpath.GetNestedField(data, info.fieldName) } else { - value, exists = dataMap[fieldName] + value, exists = dataMap[info.fieldName] } if exists { - result[outputName] = value + result[info.outputName] = value } else { - result[outputName] = nil + result[info.outputName] = nil } } } @@ -701,6 +762,61 @@ func (s *Stream) processDirectData(data interface{}) { s.callSinksAsync(results) } +// processSingleFieldFallback 回退处理单个字段(当预编译信息缺失时) +func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) { + // 处理SELECT *的特殊情况 + if fieldSpec == "*" { + // SELECT *:返回所有字段,但跳过已经通过表达式字段处理的字段 + for k, v := range dataMap { + // 如果该字段已经通过表达式字段处理,则跳过,保持表达式计算结果 + if _, isExpression := s.config.FieldExpressions[k]; !isExpression { + result[k] = v + } + } + return + } + + // 处理别名 + parts := strings.Split(fieldSpec, ":") + fieldName := parts[0] + outputName := fieldName + if len(parts) > 1 { + outputName = parts[1] + } + + // 跳过已经通过表达式字段处理的字段 + if _, isExpression := s.config.FieldExpressions[outputName]; isExpression { + return + } + + // 检查是否是函数调用 + if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") { + // 执行函数调用 + if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil { + result[outputName] = funcResult + } else { + logger.Error("Function execution error %s: %v", fieldName, err) + result[outputName] = nil + } + } else { + // 普通字段 - 支持嵌套字段 + var value interface{} + var exists bool + + if fieldpath.IsNestedField(fieldName) { + value, exists = fieldpath.GetNestedField(data, fieldName) + } else { + value, exists = dataMap[fieldName] + } + + if exists { + result[outputName] = value + } else { + result[outputName] = nil + } + } +} + // sendResultNonBlocking 非阻塞方式发送结果到resultChan (智能背压控制) func (s *Stream) sendResultNonBlocking(results []map[string]interface{}) { select { @@ -939,22 +1055,8 @@ func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) { func (s *Stream) AddData(data interface{}) { atomic.AddInt64(&s.inputCount, 1) - - // 根据溢出策略处理数据 - switch s.overflowStrategy { - case "block": - // 阻塞模式:保证数据不丢失 - s.addDataBlocking(data) - case "expand": - // 动态扩容模式:自动扩大缓冲区 - s.addDataWithExpansion(data) - case "persist": - // 持久化模式:溢出数据写入磁盘 - s.addDataWithPersistence(data) - default: - // 默认drop模式:原有逻辑 - s.addDataWithDrop(data) - } + // 性能优化:直接调用预编译的函数指针,避免switch判断 + s.addDataFunc(data) } // addDataBlocking 阻塞模式添加数据,保证零数据丢失 (线程安全版本) @@ -996,7 +1098,7 @@ func (s *Stream) addDataWithExpansion(data interface{}) { // 扩容后重试,重新获取通道引用 if s.safeSendToDataChan(data) { - logger.Info("Successfully added data after data channel expansion") + logger.Debug("Successfully added data after data channel expansion") return } @@ -1268,7 +1370,7 @@ func (s *Stream) expandDataChannel() { newCap = oldCap + 1000 // 至少增加1000 } - logger.Info("Dynamic expansion of data channel: %d -> %d", oldCap, newCap) + logger.Debug("Dynamic expansion of data channel: %d -> %d", oldCap, newCap) // 创建新的更大的通道 newChan := make(chan interface{}, newCap) @@ -1306,7 +1408,7 @@ migration_done: s.dataChan = newChan s.dataChanMux.Unlock() - logger.Info("Channel expansion completed: migrated %d items", migratedCount) + logger.Debug("Channel expansion completed: migrated %d items", migratedCount) } // persistAndRetryData 持久化数据并重试 (改进版本,具备指数退避和资源控制)