forked from GiteaTest2015/streamsql
600 lines
18 KiB
Go
600 lines
18 KiB
Go
package stream
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/rulego/streamsql/expr"
|
|
"github.com/rulego/streamsql/functions"
|
|
"github.com/rulego/streamsql/logger"
|
|
"github.com/rulego/streamsql/utils/fieldpath"
|
|
)
|
|
|
|
// fieldProcessInfo field processing information for caching pre-compiled field processing logic
|
|
type fieldProcessInfo struct {
|
|
fieldName string // Original field name
|
|
outputName string // Output field name
|
|
isFunctionCall bool // Whether it's a function call
|
|
hasNestedField bool // Whether it contains nested fields
|
|
isSelectAll bool // Whether it's SELECT *
|
|
isStringLiteral bool // Whether it's a string literal
|
|
stringValue string // Pre-processed string literal value (quotes removed)
|
|
alias string // Field alias for quick access
|
|
}
|
|
|
|
// expressionProcessInfo expression processing information for caching pre-compiled expression processing logic
|
|
type expressionProcessInfo struct {
|
|
originalExpr string // Original expression
|
|
processedExpr string // Pre-processed expression
|
|
isFunctionCall bool // Whether it's a function call
|
|
hasNestedFields bool // Whether it contains nested fields
|
|
compiledExpr *expr.Expression // Pre-compiled expression object
|
|
needsBacktickPreprocess bool // Whether backtick preprocessing is needed
|
|
}
|
|
|
|
// compileFieldProcessInfo pre-compiles field processing information to avoid runtime re-parsing
|
|
func (s *Stream) compileFieldProcessInfo() {
|
|
s.compiledFieldInfo = make(map[string]*fieldProcessInfo)
|
|
s.compiledExprInfo = make(map[string]*expressionProcessInfo)
|
|
|
|
// Compile SimpleFields information
|
|
for _, fieldSpec := range s.config.SimpleFields {
|
|
info := s.compileSimpleFieldInfo(fieldSpec)
|
|
s.compiledFieldInfo[fieldSpec] = info
|
|
}
|
|
|
|
// Pre-compile expression field information
|
|
s.compileExpressionInfo()
|
|
}
|
|
|
|
// compileSimpleFieldInfo compiles simple field information
|
|
func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
|
|
info := &fieldProcessInfo{}
|
|
|
|
if fieldSpec == "*" {
|
|
info.isSelectAll = true
|
|
info.fieldName = ""
|
|
info.outputName = "*"
|
|
info.alias = "*"
|
|
return info
|
|
}
|
|
|
|
// Parse alias
|
|
parts := strings.Split(fieldSpec, ":")
|
|
info.fieldName = parts[0]
|
|
// Remove backticks from field name
|
|
if len(info.fieldName) >= 2 && info.fieldName[0] == '`' && info.fieldName[len(info.fieldName)-1] == '`' {
|
|
info.fieldName = info.fieldName[1 : len(info.fieldName)-1]
|
|
}
|
|
info.outputName = info.fieldName
|
|
if len(parts) > 1 {
|
|
info.outputName = parts[1]
|
|
// Remove backticks from output name
|
|
if len(info.outputName) >= 2 && info.outputName[0] == '`' && info.outputName[len(info.outputName)-1] == '`' {
|
|
info.outputName = info.outputName[1 : len(info.outputName)-1]
|
|
}
|
|
}
|
|
|
|
// Pre-determine field characteristics
|
|
info.isFunctionCall = strings.Contains(info.fieldName, "(") && strings.Contains(info.fieldName, ")")
|
|
info.hasNestedField = !info.isFunctionCall && fieldpath.IsNestedField(info.fieldName)
|
|
|
|
// Check if it's a string literal and preprocess value
|
|
info.isStringLiteral = (len(info.fieldName) >= 2 &&
|
|
((info.fieldName[0] == '\'' && info.fieldName[len(info.fieldName)-1] == '\'') ||
|
|
(info.fieldName[0] == '"' && info.fieldName[len(info.fieldName)-1] == '"')))
|
|
|
|
// Preprocess string literal value, remove quotes
|
|
if info.isStringLiteral && len(info.fieldName) >= 2 {
|
|
info.stringValue = info.fieldName[1 : len(info.fieldName)-1]
|
|
}
|
|
|
|
// Set alias for quick access
|
|
info.alias = info.outputName
|
|
|
|
return info
|
|
}
|
|
|
|
// compileExpressionInfo pre-compiles expression processing information
|
|
func (s *Stream) compileExpressionInfo() {
|
|
// Initialize unnest function detection flag
|
|
s.hasUnnestFunction = false
|
|
bridge := functions.GetExprBridge()
|
|
|
|
for fieldName, fieldExpr := range s.config.FieldExpressions {
|
|
exprInfo := &expressionProcessInfo{
|
|
originalExpr: fieldExpr.Expression,
|
|
}
|
|
|
|
// Preprocess expression
|
|
processedExpr := fieldExpr.Expression
|
|
if bridge.ContainsIsNullOperator(processedExpr) {
|
|
if processed, err := bridge.PreprocessIsNullExpression(processedExpr); err == nil {
|
|
processedExpr = processed
|
|
}
|
|
}
|
|
if bridge.ContainsLikeOperator(processedExpr) {
|
|
if processed, err := bridge.PreprocessLikeExpression(processedExpr); err == nil {
|
|
processedExpr = processed
|
|
}
|
|
}
|
|
exprInfo.processedExpr = processedExpr
|
|
|
|
// Pre-judge expression characteristics
|
|
exprInfo.isFunctionCall = strings.Contains(fieldExpr.Expression, "(") && strings.Contains(fieldExpr.Expression, ")")
|
|
exprInfo.hasNestedFields = !exprInfo.isFunctionCall && strings.Contains(fieldExpr.Expression, ".")
|
|
exprInfo.needsBacktickPreprocess = bridge.ContainsBacktickIdentifiers(fieldExpr.Expression)
|
|
|
|
// Check if expression contains unnest function
|
|
if exprInfo.isFunctionCall && strings.Contains(strings.ToLower(fieldExpr.Expression), "unnest(") {
|
|
s.hasUnnestFunction = true
|
|
}
|
|
|
|
// Pre-compile expression object (only for non-function call expressions)
|
|
if !exprInfo.isFunctionCall {
|
|
exprToCompile := fieldExpr.Expression
|
|
if exprInfo.needsBacktickPreprocess {
|
|
if processed, err := bridge.PreprocessBacktickIdentifiers(exprToCompile); err == nil {
|
|
exprToCompile = processed
|
|
}
|
|
}
|
|
if compiledExpr, err := expr.NewExpression(exprToCompile); err == nil {
|
|
exprInfo.compiledExpr = compiledExpr
|
|
}
|
|
}
|
|
|
|
s.compiledExprInfo[fieldName] = exprInfo
|
|
}
|
|
}
|
|
|
|
// processExpressionField processes expression field
|
|
func (s *Stream) processExpressionField(fieldName string, dataMap map[string]interface{}, result map[string]interface{}) {
|
|
exprInfo := s.compiledExprInfo[fieldName]
|
|
if exprInfo == nil {
|
|
// Fallback to original logic
|
|
s.processExpressionFieldFallback(fieldName, dataMap, result)
|
|
return
|
|
}
|
|
|
|
var evalResult interface{}
|
|
bridge := functions.GetExprBridge()
|
|
|
|
if exprInfo.isFunctionCall {
|
|
// For function calls, use bridge processor
|
|
exprResult, err := bridge.EvaluateExpression(exprInfo.processedExpr, dataMap)
|
|
if err != nil {
|
|
logger.Error("Function call evaluation failed for field %s: %v", fieldName, err)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
evalResult = exprResult
|
|
} else if exprInfo.hasNestedFields {
|
|
// Use pre-compiled expression object
|
|
if exprInfo.compiledExpr != nil {
|
|
// Use EvaluateValueWithNull to get actual value (including strings)
|
|
exprResult, isNull, err := exprInfo.compiledExpr.EvaluateValueWithNull(dataMap)
|
|
if err != nil {
|
|
logger.Error("Expression evaluation failed for field %s: %v", fieldName, err)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
if isNull {
|
|
evalResult = nil
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
} else {
|
|
// Fallback to dynamic compilation
|
|
s.processExpressionFieldFallback(fieldName, dataMap, result)
|
|
return
|
|
}
|
|
} else {
|
|
// Try using bridge processor for other expressions
|
|
exprResult, err := bridge.EvaluateExpression(exprInfo.processedExpr, dataMap)
|
|
if err != nil {
|
|
// If bridge fails, use pre-compiled expression object
|
|
if exprInfo.compiledExpr != nil {
|
|
// Use EvaluateValueWithNull to get actual value (including strings)
|
|
exprResult, isNull, evalErr := exprInfo.compiledExpr.EvaluateValueWithNull(dataMap)
|
|
if evalErr != nil {
|
|
logger.Error("Expression evaluation failed for field %s: %v", fieldName, evalErr)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
if isNull {
|
|
evalResult = nil
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
} else {
|
|
// Fallback to dynamic compilation
|
|
s.processExpressionFieldFallback(fieldName, dataMap, result)
|
|
return
|
|
}
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
}
|
|
|
|
result[fieldName] = evalResult
|
|
}
|
|
|
|
// processExpressionFieldFallback fallback logic for expression field processing
|
|
func (s *Stream) processExpressionFieldFallback(fieldName string, dataMap map[string]interface{}, result map[string]interface{}) {
|
|
fieldExpr, exists := s.config.FieldExpressions[fieldName]
|
|
if !exists {
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
|
|
// Use bridge to calculate expression, supports IS NULL and other syntax
|
|
bridge := functions.GetExprBridge()
|
|
|
|
// Preprocess IS NULL and LIKE syntax in expression
|
|
processedExpr := fieldExpr.Expression
|
|
if bridge.ContainsIsNullOperator(processedExpr) {
|
|
if processed, err := bridge.PreprocessIsNullExpression(processedExpr); err == nil {
|
|
processedExpr = processed
|
|
}
|
|
}
|
|
if bridge.ContainsLikeOperator(processedExpr) {
|
|
if processed, err := bridge.PreprocessLikeExpression(processedExpr); err == nil {
|
|
processedExpr = processed
|
|
}
|
|
}
|
|
|
|
// Check if expression is a function call (contains parentheses)
|
|
isFunctionCall := strings.Contains(fieldExpr.Expression, "(") && strings.Contains(fieldExpr.Expression, ")")
|
|
|
|
// Check if expression contains nested fields (but exclude dots in function calls)
|
|
hasNestedFields := false
|
|
if !isFunctionCall && strings.Contains(fieldExpr.Expression, ".") {
|
|
hasNestedFields = true
|
|
}
|
|
|
|
var evalResult interface{}
|
|
|
|
if isFunctionCall {
|
|
// For function calls, prioritize bridge processor
|
|
exprResult, err := bridge.EvaluateExpression(processedExpr, dataMap)
|
|
if err != nil {
|
|
logger.Error("Function call evaluation failed for field %s: %v", fieldName, err)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
evalResult = exprResult
|
|
} else if hasNestedFields {
|
|
// Detected nested fields (non-function call), use custom expression engine
|
|
exprToUse := fieldExpr.Expression
|
|
if bridge.ContainsBacktickIdentifiers(exprToUse) {
|
|
if processed, err := bridge.PreprocessBacktickIdentifiers(exprToUse); err == nil {
|
|
exprToUse = processed
|
|
}
|
|
}
|
|
expression, parseErr := expr.NewExpression(exprToUse)
|
|
if parseErr != nil {
|
|
logger.Error("Expression parse failed for field %s: %v", fieldName, parseErr)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
|
|
// Use EvaluateValueWithNull to get actual value (including strings)
|
|
exprResult, isNull, err := expression.EvaluateValueWithNull(dataMap)
|
|
if err != nil {
|
|
logger.Error("Expression evaluation failed for field %s: %v", fieldName, err)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
if isNull {
|
|
evalResult = nil
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
} else {
|
|
// Try using bridge processor for other expressions
|
|
exprResult, err := bridge.EvaluateExpression(processedExpr, dataMap)
|
|
if err != nil {
|
|
// If bridge fails, fallback to original expression engine
|
|
exprToUse := fieldExpr.Expression
|
|
if bridge.ContainsBacktickIdentifiers(exprToUse) {
|
|
if processed, err := bridge.PreprocessBacktickIdentifiers(exprToUse); err == nil {
|
|
exprToUse = processed
|
|
}
|
|
}
|
|
expression, parseErr := expr.NewExpression(exprToUse)
|
|
if parseErr != nil {
|
|
logger.Error("Expression parse failed for field %s: %v", fieldName, parseErr)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
|
|
// Use EvaluateValueWithNull to get actual value (including strings)
|
|
exprResult, isNull, evalErr := expression.EvaluateValueWithNull(dataMap)
|
|
if evalErr != nil {
|
|
logger.Error("Expression evaluation failed for field %s: %v", fieldName, evalErr)
|
|
result[fieldName] = nil
|
|
return
|
|
}
|
|
if isNull {
|
|
evalResult = nil
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
} else {
|
|
evalResult = exprResult
|
|
}
|
|
}
|
|
|
|
result[fieldName] = evalResult
|
|
}
|
|
|
|
// processSimpleField processes simple field
|
|
func (s *Stream) processSimpleField(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
|
|
info := s.compiledFieldInfo[fieldSpec]
|
|
if info == nil {
|
|
// If no pre-compiled info, fallback to original logic (safety guarantee)
|
|
s.processSingleFieldFallback(fieldSpec, dataMap, data, result)
|
|
return
|
|
}
|
|
|
|
if info.isSelectAll {
|
|
// SELECT *: batch copy all fields, skip expression fields
|
|
for k, v := range dataMap {
|
|
if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
|
|
result[k] = v
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Skip fields already processed by expression fields
|
|
if _, isExpression := s.config.FieldExpressions[info.outputName]; isExpression {
|
|
return
|
|
}
|
|
|
|
if info.isStringLiteral {
|
|
// String literal processing: use pre-compiled string value
|
|
result[info.alias] = info.stringValue
|
|
} else if info.isFunctionCall {
|
|
// Execute function call
|
|
if funcResult, err := s.executeFunction(info.fieldName, dataMap); err == nil {
|
|
result[info.outputName] = funcResult
|
|
} else {
|
|
logger.Error("Function execution error %s: %v", info.fieldName, err)
|
|
result[info.outputName] = nil
|
|
}
|
|
} else {
|
|
// Ordinary field processing
|
|
var value interface{}
|
|
var exists bool
|
|
|
|
if info.hasNestedField {
|
|
value, exists = fieldpath.GetNestedField(data, info.fieldName)
|
|
} else {
|
|
value, exists = dataMap[info.fieldName]
|
|
}
|
|
|
|
if exists {
|
|
result[info.outputName] = value
|
|
} else {
|
|
result[info.outputName] = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// processSingleFieldFallback fallback processing for single field (when pre-compiled info is missing)
|
|
func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
|
|
// Handle special case of SELECT *
|
|
if fieldSpec == "*" {
|
|
// SELECT *: return all fields, but skip fields already processed by expression fields
|
|
for k, v := range dataMap {
|
|
// If field already processed by expression field, skip, maintain expression calculation result
|
|
if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
|
|
result[k] = v
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Handle alias
|
|
parts := strings.Split(fieldSpec, ":")
|
|
fieldName := parts[0]
|
|
outputName := fieldName
|
|
if len(parts) > 1 {
|
|
outputName = parts[1]
|
|
}
|
|
|
|
// Skip fields already processed by expression fields
|
|
if _, isExpression := s.config.FieldExpressions[outputName]; isExpression {
|
|
return
|
|
}
|
|
|
|
// Check if it's a function call
|
|
if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") {
|
|
// Execute function call
|
|
if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil {
|
|
result[outputName] = funcResult
|
|
} else {
|
|
logger.Error("Function execution error %s: %v", fieldName, err)
|
|
result[outputName] = nil
|
|
}
|
|
} else {
|
|
// Ordinary field - supports nested fields
|
|
var value interface{}
|
|
var exists bool
|
|
|
|
if fieldpath.IsNestedField(fieldName) {
|
|
value, exists = fieldpath.GetNestedField(data, fieldName)
|
|
} else {
|
|
value, exists = dataMap[fieldName]
|
|
}
|
|
|
|
if exists {
|
|
result[outputName] = value
|
|
} else {
|
|
result[outputName] = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// executeFunction executes function call
|
|
func (s *Stream) executeFunction(funcExpr string, data map[string]interface{}) (interface{}, error) {
|
|
// Check if it's a custom function
|
|
funcName := extractFunctionName(funcExpr)
|
|
if funcName != "" {
|
|
// Use function system directly
|
|
fn, exists := functions.Get(funcName)
|
|
if exists {
|
|
// Parse parameters
|
|
args, err := s.parseFunctionArgs(funcExpr, data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create function context
|
|
ctx := &functions.FunctionContext{Data: data}
|
|
|
|
// Execute function
|
|
return fn.Execute(ctx, args)
|
|
}
|
|
}
|
|
|
|
// For complex nested function calls, use ExprBridge directly
|
|
// This avoids the float64 type limitation of Expression.Evaluate
|
|
bridge := functions.GetExprBridge()
|
|
result, err := bridge.EvaluateExpression(funcExpr, data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("evaluate function expression failed: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// extractFunctionName extracts function name from expression
|
|
func extractFunctionName(expr string) string {
|
|
parenIndex := strings.Index(expr, "(")
|
|
if parenIndex == -1 {
|
|
return ""
|
|
}
|
|
funcName := strings.TrimSpace(expr[:parenIndex])
|
|
if strings.ContainsAny(funcName, " +-*/=<>!&|") {
|
|
return ""
|
|
}
|
|
return funcName
|
|
}
|
|
|
|
// parseFunctionArgs parses function arguments, supports nested function calls
|
|
func (s *Stream) parseFunctionArgs(funcExpr string, data map[string]interface{}) ([]interface{}, error) {
|
|
// Extract parameters within parentheses
|
|
start := strings.Index(funcExpr, "(")
|
|
end := strings.LastIndex(funcExpr, ")")
|
|
if start == -1 || end == -1 || end <= start {
|
|
return nil, fmt.Errorf("invalid function expression: %s", funcExpr)
|
|
}
|
|
|
|
argsStr := strings.TrimSpace(funcExpr[start+1 : end])
|
|
if argsStr == "" {
|
|
return []interface{}{}, nil
|
|
}
|
|
|
|
// Smart split arguments, handle nested functions and quotes
|
|
argParts, err := s.smartSplitArgs(argsStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
args := make([]interface{}, len(argParts))
|
|
|
|
for i, arg := range argParts {
|
|
arg = strings.TrimSpace(arg)
|
|
|
|
// If parameter is string constant (enclosed in quotes)
|
|
if strings.HasPrefix(arg, "'") && strings.HasSuffix(arg, "'") {
|
|
args[i] = strings.Trim(arg, "'")
|
|
} else if strings.HasPrefix(arg, "\"") && strings.HasSuffix(arg, "\"") {
|
|
args[i] = strings.Trim(arg, "\"")
|
|
} else if strings.Contains(arg, "(") {
|
|
// If parameter contains function call, execute recursively
|
|
result, err := s.executeFunction(arg, data)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute nested function '%s': %v", arg, err)
|
|
}
|
|
args[i] = result
|
|
} else if value, exists := data[arg]; exists {
|
|
// If it's a data field
|
|
args[i] = value
|
|
} else {
|
|
// Try to parse as number
|
|
if val, err := strconv.ParseFloat(arg, 64); err == nil {
|
|
args[i] = val
|
|
} else {
|
|
args[i] = arg
|
|
}
|
|
}
|
|
}
|
|
|
|
return args, nil
|
|
}
|
|
|
|
// smartSplitArgs intelligently splits arguments, considering bracket nesting and quotes
|
|
func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) {
|
|
var args []string
|
|
var current strings.Builder
|
|
parenDepth := 0
|
|
inQuotes := false
|
|
quoteChar := byte(0)
|
|
|
|
for i := 0; i < len(argsStr); i++ {
|
|
ch := argsStr[i]
|
|
|
|
switch ch {
|
|
case '\'':
|
|
if !inQuotes {
|
|
inQuotes = true
|
|
quoteChar = ch
|
|
} else if quoteChar == ch {
|
|
inQuotes = false
|
|
quoteChar = 0
|
|
}
|
|
current.WriteByte(ch)
|
|
case '"':
|
|
if !inQuotes {
|
|
inQuotes = true
|
|
quoteChar = ch
|
|
} else if quoteChar == ch {
|
|
inQuotes = false
|
|
quoteChar = 0
|
|
}
|
|
current.WriteByte(ch)
|
|
case '(':
|
|
if !inQuotes {
|
|
parenDepth++
|
|
}
|
|
current.WriteByte(ch)
|
|
case ')':
|
|
if !inQuotes {
|
|
parenDepth--
|
|
}
|
|
current.WriteByte(ch)
|
|
case ',':
|
|
if !inQuotes && parenDepth == 0 {
|
|
// Found parameter separator
|
|
args = append(args, strings.TrimSpace(current.String()))
|
|
current.Reset()
|
|
} else {
|
|
current.WriteByte(ch)
|
|
}
|
|
default:
|
|
current.WriteByte(ch)
|
|
}
|
|
}
|
|
|
|
// Add the last parameter
|
|
if current.Len() > 0 {
|
|
args = append(args, strings.TrimSpace(current.String()))
|
|
}
|
|
|
|
return args, nil
|
|
}
|