package stream

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/rulego/streamsql/expr"
	"github.com/rulego/streamsql/functions"
	"github.com/rulego/streamsql/logger"
	"github.com/rulego/streamsql/utils/fieldpath"
)

// fieldProcessInfo caches the pre-compiled processing plan for one entry of
// config.SimpleFields so the field spec does not have to be re-parsed for
// every record.
type fieldProcessInfo struct {
	fieldName       string // Original field name (backticks removed)
	outputName      string // Output field name (alias if given, otherwise fieldName)
	isFunctionCall  bool   // Whether fieldName contains both "(" and ")"
	hasNestedField  bool   // Whether fieldName is a nested path (never set for function calls)
	isSelectAll     bool   // Whether the spec is SELECT *
	isStringLiteral bool   // Whether fieldName is enclosed in single or double quotes
	stringValue     string // Pre-processed string literal value (quotes removed)
	alias           string // Field alias for quick access (same as outputName)
}

// expressionProcessInfo caches the pre-compiled processing plan for one entry
// of config.FieldExpressions.
type expressionProcessInfo struct {
	originalExpr            string           // Original expression
	processedExpr           string           // Expression after IS NULL / LIKE preprocessing
	isFunctionCall          bool             // Whether the expression contains both "(" and ")"
	hasNestedFields         bool             // Whether a non-function expression contains "."
	compiledExpr            *expr.Expression // Pre-compiled expression object (nil if not compiled)
	needsBacktickPreprocess bool             // Whether backtick preprocessing is needed
}

// compileFieldProcessInfo pre-compiles field processing information to avoid
// runtime re-parsing. It replaces both caches (compiledFieldInfo and
// compiledExprInfo) with freshly built maps.
func (s *Stream) compileFieldProcessInfo() {
	s.compiledFieldInfo = make(map[string]*fieldProcessInfo, len(s.config.SimpleFields))
	s.compiledExprInfo = make(map[string]*expressionProcessInfo, len(s.config.FieldExpressions))

	// Compile SimpleFields information
	for _, fieldSpec := range s.config.SimpleFields {
		s.compiledFieldInfo[fieldSpec] = s.compileSimpleFieldInfo(fieldSpec)
	}

	// Pre-compile expression field information
	s.compileExpressionInfo()
}

// stripEnclosingPair removes one leading and one trailing quote byte from s
// when s is at least two bytes long and both ends equal quote. The boolean
// reports whether the pair was present; otherwise s is returned unchanged.
func stripEnclosingPair(s string, quote byte) (string, bool) {
	if len(s) >= 2 && s[0] == quote && s[len(s)-1] == quote {
		return s[1 : len(s)-1], true
	}
	return s, false
}

// compileSimpleFieldInfo compiles a single field spec of the form "field" or
// "field:alias" (or "*") into a fieldProcessInfo.
//
// NOTE: the spec is split on every ":"; only the first two parts are used, so
// a ":" inside a quoted literal or a function argument is also treated as the
// alias separator.
func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
	if fieldSpec == "*" {
		return &fieldProcessInfo{
			isSelectAll: true,
			outputName:  "*",
			alias:       "*",
		}
	}

	info := &fieldProcessInfo{}

	// Parse alias and remove backticks from both the field and the output name
	parts := strings.Split(fieldSpec, ":")
	info.fieldName, _ = stripEnclosingPair(parts[0], '`')
	info.outputName = info.fieldName
	if len(parts) > 1 {
		info.outputName, _ = stripEnclosingPair(parts[1], '`')
	}

	// Pre-determine field characteristics
	info.isFunctionCall = strings.Contains(info.fieldName, "(") && strings.Contains(info.fieldName, ")")
	info.hasNestedField = !info.isFunctionCall && fieldpath.IsNestedField(info.fieldName)

	// Detect a string literal ('...' or "...") and pre-compute its unquoted value
	if value, ok := stripEnclosingPair(info.fieldName, '\''); ok {
		info.isStringLiteral = true
		info.stringValue = value
	} else if value, ok := stripEnclosingPair(info.fieldName, '"'); ok {
		info.isStringLiteral = true
		info.stringValue = value
	}

	// Set alias for quick access
	info.alias = info.outputName
	return info
}

// preprocessOperatorSyntax rewrites IS NULL and LIKE syntax in expression via
// the expression bridge. A rewrite step that returns an error is skipped and
// the expression is left as it was before that step.
func preprocessOperatorSyntax(expression string) string {
	bridge := functions.GetExprBridge()
	if bridge.ContainsIsNullOperator(expression) {
		if processed, err := bridge.PreprocessIsNullExpression(expression); err == nil {
			expression = processed
		}
	}
	if bridge.ContainsLikeOperator(expression) {
		if processed, err := bridge.PreprocessLikeExpression(expression); err == nil {
			expression = processed
		}
	}
	return expression
}

// compileFieldExpression applies backtick preprocessing (when the expression
// contains backtick identifiers and preprocessing succeeds) and then compiles
// the expression with expr.NewExpression.
func compileFieldExpression(expression string) (*expr.Expression, error) {
	bridge := functions.GetExprBridge()
	if bridge.ContainsBacktickIdentifiers(expression) {
		if processed, err := bridge.PreprocessBacktickIdentifiers(expression); err == nil {
			expression = processed
		}
	}
	return expr.NewExpression(expression)
}

// evaluateCompiledExpression evaluates a compiled expression against dataMap
// and returns the value to store for fieldName. It returns nil when the
// expression evaluates to NULL or when evaluation fails (the failure is
// logged).
func evaluateCompiledExpression(fieldName string, compiled *expr.Expression, dataMap map[string]interface{}) interface{} {
	// Use EvaluateValueWithNull to get actual value (including strings)
	value, isNull, err := compiled.EvaluateValueWithNull(dataMap)
	if err != nil {
		logger.Error("Expression evaluation failed for field %s: %v", fieldName, err)
		return nil
	}
	if isNull {
		return nil
	}
	return value
}

// compileExpressionInfo pre-compiles expression processing information for
// every entry of config.FieldExpressions and stores it in compiledExprInfo.
// It also resets and recomputes hasUnnestFunction.
func (s *Stream) compileExpressionInfo() {
	// Initialize unnest function detection flag
	s.hasUnnestFunction = false

	bridge := functions.GetExprBridge()
	for fieldName, fieldExpr := range s.config.FieldExpressions {
		expression := fieldExpr.Expression
		isFunctionCall := strings.Contains(expression, "(") && strings.Contains(expression, ")")

		exprInfo := &expressionProcessInfo{
			originalExpr:            expression,
			processedExpr:           preprocessOperatorSyntax(expression),
			isFunctionCall:          isFunctionCall,
			hasNestedFields:         !isFunctionCall && strings.Contains(expression, "."),
			needsBacktickPreprocess: bridge.ContainsBacktickIdentifiers(expression),
		}

		if isFunctionCall {
			// Check if expression contains unnest function
			if strings.Contains(strings.ToLower(expression), "unnest(") {
				s.hasUnnestFunction = true
			}
		} else if compiled, err := compileFieldExpression(expression); err == nil {
			// Only non-function expressions are pre-compiled. A compile error is
			// tolerated here: compiledExpr stays nil and evaluation falls back
			// to dynamic compilation at runtime.
			exprInfo.compiledExpr = compiled
		}

		s.compiledExprInfo[fieldName] = exprInfo
	}
}

// processExpressionField evaluates the expression field fieldName against
// dataMap and stores the outcome in result[fieldName]. On any evaluation
// failure the error is logged and nil is stored.
//
// Evaluation strategy:
//   - no pre-compiled info: processExpressionFieldFallback
//   - function call: expression bridge only
//   - nested-field expression: pre-compiled expression object
//   - anything else: expression bridge first, pre-compiled object if that fails
//
// When a pre-compiled object is required but missing, the fallback is used.
func (s *Stream) processExpressionField(fieldName string, dataMap map[string]interface{}, result map[string]interface{}) {
	exprInfo := s.compiledExprInfo[fieldName]
	if exprInfo == nil {
		// Fallback to original logic
		s.processExpressionFieldFallback(fieldName, dataMap, result)
		return
	}

	if exprInfo.isFunctionCall || !exprInfo.hasNestedFields {
		bridge := functions.GetExprBridge()
		value, err := bridge.EvaluateExpression(exprInfo.processedExpr, dataMap)
		if err == nil {
			result[fieldName] = value
			return
		}
		if exprInfo.isFunctionCall {
			// Function calls have no compiled form to retry with
			logger.Error("Function call evaluation failed for field %s: %v", fieldName, err)
			result[fieldName] = nil
			return
		}
		// Bridge failed for a plain expression: retry with the compiled object below
	}

	if exprInfo.compiledExpr == nil {
		// Fallback to dynamic compilation
		s.processExpressionFieldFallback(fieldName, dataMap, result)
		return
	}
	result[fieldName] = evaluateCompiledExpression(fieldName, exprInfo.compiledExpr, dataMap)
}

// processExpressionFieldFallback evaluates the expression field fieldName
// without any pre-compiled information, re-deriving everything from
// config.FieldExpressions. It follows the same strategy as
// processExpressionField but compiles the expression dynamically. The outcome
// (nil on an unknown field or on any failure) is stored in result[fieldName].
func (s *Stream) processExpressionFieldFallback(fieldName string, dataMap map[string]interface{}, result map[string]interface{}) {
	fieldExpr, exists := s.config.FieldExpressions[fieldName]
	if !exists {
		result[fieldName] = nil
		return
	}

	// Check if expression is a function call (contains parentheses)
	isFunctionCall := strings.Contains(fieldExpr.Expression, "(") && strings.Contains(fieldExpr.Expression, ")")
	// Check if expression contains nested fields (but exclude dots in function calls)
	hasNestedFields := !isFunctionCall && strings.Contains(fieldExpr.Expression, ".")

	if !hasNestedFields {
		// Use bridge to calculate expression, supports IS NULL and other syntax
		bridge := functions.GetExprBridge()
		value, err := bridge.EvaluateExpression(preprocessOperatorSyntax(fieldExpr.Expression), dataMap)
		if err == nil {
			result[fieldName] = value
			return
		}
		if isFunctionCall {
			logger.Error("Function call evaluation failed for field %s: %v", fieldName, err)
			result[fieldName] = nil
			return
		}
		// Bridge failed for a plain expression: fall through to the expression engine
	}

	// Nested fields (or bridge failure): use the custom expression engine on
	// the original expression.
	expression, parseErr := compileFieldExpression(fieldExpr.Expression)
	if parseErr != nil {
		logger.Error("Expression parse failed for field %s: %v", fieldName, parseErr)
		result[fieldName] = nil
		return
	}
	result[fieldName] = evaluateCompiledExpression(fieldName, expression, dataMap)
}

// copyNonExpressionFields implements SELECT *: it copies every entry of
// dataMap into result except keys that are expression fields, so expression
// results are not overwritten by raw input values.
func (s *Stream) copyNonExpressionFields(dataMap map[string]interface{}, result map[string]interface{}) {
	for k, v := range dataMap {
		if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
			result[k] = v
		}
	}
}

// applySimpleFieldInfo writes the value described by info into result.
//
// dataMap is used for flat lookups and function arguments; data is passed to
// fieldpath.GetNestedField for nested paths. Fields whose output name is also
// an expression field are skipped. Missing fields and failed function calls
// (logged) are stored as nil.
func (s *Stream) applySimpleFieldInfo(info *fieldProcessInfo, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
	if info.isSelectAll {
		s.copyNonExpressionFields(dataMap, result)
		return
	}

	// Skip fields already processed by expression fields
	if _, isExpression := s.config.FieldExpressions[info.outputName]; isExpression {
		return
	}

	switch {
	case info.isStringLiteral:
		// String literal processing: use pre-compiled string value
		result[info.alias] = info.stringValue

	case info.isFunctionCall:
		funcResult, err := s.executeFunction(info.fieldName, dataMap)
		if err != nil {
			logger.Error("Function execution error %s: %v", info.fieldName, err)
			result[info.outputName] = nil
			return
		}
		result[info.outputName] = funcResult

	default:
		// Ordinary field processing
		var (
			value  interface{}
			exists bool
		)
		if info.hasNestedField {
			value, exists = fieldpath.GetNestedField(data, info.fieldName)
		} else {
			value, exists = dataMap[info.fieldName]
		}
		if !exists {
			value = nil
		}
		result[info.outputName] = value
	}
}

// processSimpleField processes one simple field spec using its pre-compiled
// info from compiledFieldInfo; when no info is cached it delegates to
// processSingleFieldFallback.
func (s *Stream) processSimpleField(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
	info := s.compiledFieldInfo[fieldSpec]
	if info == nil {
		// If no pre-compiled info, fallback to original logic (safety guarantee)
		s.processSingleFieldFallback(fieldSpec, dataMap, data, result)
		return
	}
	s.applySimpleFieldInfo(info, dataMap, data, result)
}

// processSingleFieldFallback fallback processing for single field (when
// pre-compiled info is missing). The spec is compiled on the fly with
// compileSimpleFieldInfo and applied through the same logic as the
// pre-compiled path, so backtick stripping, string literals and alias handling
// behave identically in both paths. The compiled info is not stored in
// compiledFieldInfo.
func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
	s.applySimpleFieldInfo(s.compileSimpleFieldInfo(fieldSpec), dataMap, data, result)
}

// isSingleCallExpression reports whether the parenthesis that closes the first
// "(" in funcExpr is the last non-blank character, i.e. the expression is one
// call such as "f(a, g(b))" rather than a compound such as "f(a) + g(b)".
// Parentheses inside single or double quotes are ignored. If the first "(" is
// never closed, true is returned so argument parsing reports the problem.
func isSingleCallExpression(funcExpr string) bool {
	open := strings.Index(funcExpr, "(")
	if open == -1 {
		return false
	}

	depth := 0
	var quote byte
	for i := open; i < len(funcExpr); i++ {
		ch := funcExpr[i]
		switch {
		case quote != 0:
			if ch == quote {
				quote = 0
			}
		case ch == '\'' || ch == '"':
			quote = ch
		case ch == '(':
			depth++
		case ch == ')':
			depth--
			if depth == 0 {
				return strings.TrimSpace(funcExpr[i+1:]) == ""
			}
		}
	}
	return true
}

// executeFunction executes function call.
//
// When funcExpr is a single call to a function registered in the function
// system, its arguments are parsed and the function is executed directly.
// Everything else (unregistered functions, compound expressions that merely
// start with a call) is evaluated through the expression bridge; a bridge
// error is returned wrapped.
func (s *Stream) executeFunction(funcExpr string, data map[string]interface{}) (interface{}, error) {
	// Only take the direct path for a lone call: parseFunctionArgs slices from
	// the first "(" to the last ")", which is wrong for "f(a) + g(b)".
	if funcName := extractFunctionName(funcExpr); funcName != "" && isSingleCallExpression(funcExpr) {
		// Use function system directly
		if fn, exists := functions.Get(funcName); exists {
			args, err := s.parseFunctionArgs(funcExpr, data)
			if err != nil {
				return nil, err
			}
			ctx := &functions.FunctionContext{Data: data}
			return fn.Execute(ctx, args)
		}
	}

	// For complex nested function calls, use ExprBridge directly
	// This avoids the float64 type limitation of Expression.Evaluate
	bridge := functions.GetExprBridge()
	result, err := bridge.EvaluateExpression(funcExpr, data)
	if err != nil {
		return nil, fmt.Errorf("evaluate function expression failed: %w", err)
	}
	return result, nil
}

// extractFunctionName extracts function name from expression: the trimmed
// text before the first "(". It returns "" when there is no "(" or when that
// text contains a space or an operator character, which means the expression
// does not start with a plain function call.
func extractFunctionName(funcExpr string) string {
	parenIndex := strings.Index(funcExpr, "(")
	if parenIndex == -1 {
		return ""
	}
	funcName := strings.TrimSpace(funcExpr[:parenIndex])
	if strings.ContainsAny(funcName, " +-*/=<>!&|") {
		return ""
	}
	return funcName
}

// parseFunctionArgs parses function arguments, supports nested function calls.
//
// The argument list is the text between the first "(" and the last ")" of
// funcExpr. Each argument is resolved in this order:
//  1. quoted string ('...' or "...") -> the string without its enclosing quotes
//  2. contains "(" -> evaluated recursively through executeFunction
//  3. key present in data -> the field value
//  4. parses as float64 -> the number
//  5. otherwise -> the raw argument text
//
// An error is returned for a malformed expression or a failing nested call.
// An empty argument list yields an empty, non-nil slice.
func (s *Stream) parseFunctionArgs(funcExpr string, data map[string]interface{}) ([]interface{}, error) {
	// Extract parameters within parentheses
	start := strings.Index(funcExpr, "(")
	end := strings.LastIndex(funcExpr, ")")
	if start == -1 || end == -1 || end <= start {
		return nil, fmt.Errorf("invalid function expression: %s", funcExpr)
	}

	argsStr := strings.TrimSpace(funcExpr[start+1 : end])
	if argsStr == "" {
		return []interface{}{}, nil
	}

	// Smart split arguments, handle nested functions and quotes
	argParts, err := s.smartSplitArgs(argsStr)
	if err != nil {
		return nil, err
	}

	args := make([]interface{}, len(argParts))
	for i, arg := range argParts {
		arg = strings.TrimSpace(arg)

		// String constant: strip exactly one enclosing quote pair so quote
		// characters that belong to the value are preserved.
		if literal, ok := stripEnclosingPair(arg, '\''); ok {
			args[i] = literal
			continue
		}
		if literal, ok := stripEnclosingPair(arg, '"'); ok {
			args[i] = literal
			continue
		}

		// If parameter contains function call, execute recursively
		if strings.Contains(arg, "(") {
			nested, err := s.executeFunction(arg, data)
			if err != nil {
				return nil, fmt.Errorf("failed to execute nested function '%s': %w", arg, err)
			}
			args[i] = nested
			continue
		}

		// Data fields take precedence over numeric parsing
		if value, exists := data[arg]; exists {
			args[i] = value
			continue
		}

		// Try to parse as number, otherwise keep the raw text
		if number, err := strconv.ParseFloat(arg, 64); err == nil {
			args[i] = number
		} else {
			args[i] = arg
		}
	}
	return args, nil
}

// smartSplitArgs intelligently splits arguments, considering bracket nesting
// and quotes: a comma separates arguments only when it is outside quotes and
// at parenthesis depth zero. Each argument is whitespace-trimmed. Nothing is
// appended for a trailing argument with no characters at all (e.g. after a
// final comma). The returned error is currently always nil.
func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) {
	var (
		args       []string
		current    strings.Builder
		parenDepth int
		quoteChar  byte // 0 when outside quotes, otherwise the opening quote
	)

	for i := 0; i < len(argsStr); i++ {
		ch := argsStr[i]
		inQuotes := quoteChar != 0

		switch {
		case ch == '\'' || ch == '"':
			if !inQuotes {
				quoteChar = ch
			} else if quoteChar == ch {
				quoteChar = 0
			}
		case inQuotes:
			// Everything else inside quotes is literal text
		case ch == '(':
			parenDepth++
		case ch == ')':
			parenDepth--
		case ch == ',' && parenDepth == 0:
			// Found parameter separator
			args = append(args, strings.TrimSpace(current.String()))
			current.Reset()
			continue
		}
		current.WriteByte(ch)
	}

	// Add the last parameter
	if current.Len() > 0 {
		args = append(args, strings.TrimSpace(current.String()))
	}
	return args, nil
}