mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-14 14:27:27 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ba2cdd5629 | |||
| 47b7e07c6b | |||
| 9739f213e9 |
@@ -131,7 +131,25 @@ func validateBasicSyntax(exprStr string) error {
|
||||
}
|
||||
|
||||
// Check for invalid characters
|
||||
inQuotes := false
|
||||
var quoteChar rune
|
||||
|
||||
for i, ch := range trimmed {
|
||||
// Handle quotes
|
||||
if ch == '\'' || ch == '"' {
|
||||
if !inQuotes {
|
||||
inQuotes = true
|
||||
quoteChar = ch
|
||||
} else if ch == quoteChar {
|
||||
inQuotes = false
|
||||
}
|
||||
}
|
||||
|
||||
// If inside quotes, skip character validation
|
||||
if inQuotes {
|
||||
continue
|
||||
}
|
||||
|
||||
// Allowed characters: letters, numbers, operators, parentheses, dots, underscores, spaces, quotes
|
||||
if !isValidChar(ch) {
|
||||
return fmt.Errorf("invalid character '%c' at position %d", ch, i)
|
||||
|
||||
@@ -61,7 +61,32 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
|
||||
}
|
||||
|
||||
// Parse alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
info.fieldName = parts[0]
|
||||
// Remove backticks from field name
|
||||
if len(info.fieldName) >= 2 && info.fieldName[0] == '`' && info.fieldName[len(info.fieldName)-1] == '`' {
|
||||
@@ -398,7 +423,32 @@ func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string
|
||||
}
|
||||
|
||||
// Handle alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
fieldName := parts[0]
|
||||
outputName := fieldName
|
||||
if len(parts) > 1 {
|
||||
|
||||
@@ -658,16 +658,13 @@ func (sw *SlidingWindow) Trigger() {
|
||||
}
|
||||
|
||||
// Retain data that could be in future windows
|
||||
// For sliding windows, we need to keep data that falls within:
|
||||
// - Current window end + size (for overlapping windows)
|
||||
// - Next window end + size (for future windows)
|
||||
// Actually, we should keep all data that could be in any future window
|
||||
// The latest window that could contain a data point is: next.End + size
|
||||
cutoffTime := next.End.Add(sw.size)
|
||||
// For sliding windows, we need to keep data that falls within future windows
|
||||
// Future windows start at next.Start or later (next.Start + k * slide)
|
||||
// So any data with timestamp < next.Start cannot be in any future window
|
||||
newData := make([]types.Row, 0)
|
||||
for _, item := range sw.data {
|
||||
// Keep data that could be in future windows (before cutoffTime)
|
||||
if item.Timestamp.Before(cutoffTime) {
|
||||
// Keep data that could be in future windows (>= next.Start)
|
||||
if !item.Timestamp.Before(*next.Start) {
|
||||
newData = append(newData, item)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user