mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-22 10:05:20 +00:00
feat:Add like pattern matching syntax
This commit is contained in:
@@ -36,6 +36,99 @@ go get github.com/rulego/streamsql
|
||||
|
||||
## Usage
|
||||
|
||||
StreamSQL supports two main processing modes for different business scenarios:
|
||||
|
||||
### Non-Aggregation Mode - Real-time Data Transformation and Filtering
|
||||
|
||||
Suitable for scenarios requiring **real-time response** and **low latency**, where each data record is processed and output immediately.
|
||||
|
||||
**Typical Use Cases:**
|
||||
- **Data Cleaning**: Clean and standardize dirty data from IoT devices
|
||||
- **Real-time Alerting**: Monitor key metrics and alert immediately when thresholds are exceeded
|
||||
- **Data Enrichment**: Add calculated fields and business labels to raw data
|
||||
- **Format Conversion**: Convert data to formats required by downstream systems
|
||||
- **Data Routing**: Route data to different processing channels based on content
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"github.com/rulego/streamsql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Create StreamSQL instance
|
||||
ssql := streamsql.New()
|
||||
defer ssql.Stop()
|
||||
|
||||
// Non-aggregation SQL: Real-time data transformation and filtering
|
||||
// Feature: Each input data is processed immediately, no need to wait for windows
|
||||
rsql := `SELECT deviceId,
|
||||
UPPER(deviceType) as device_type,
|
||||
temperature * 1.8 + 32 as temp_fahrenheit,
|
||||
CASE WHEN temperature > 30 THEN 'hot'
|
||||
WHEN temperature < 15 THEN 'cold'
|
||||
ELSE 'normal' END as temp_category,
|
||||
CONCAT(location, '-', deviceId) as full_identifier,
|
||||
NOW() as processed_time
|
||||
FROM stream
|
||||
WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`
|
||||
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Handle real-time transformation results
|
||||
ssql.Stream().AddSink(func(result interface{}) {
|
||||
fmt.Printf("Real-time result: %+v\n", result)
|
||||
})
|
||||
|
||||
// Simulate sensor data input
|
||||
sensorData := []map[string]interface{}{
|
||||
{
|
||||
"deviceId": "sensor001",
|
||||
"deviceType": "temperature",
|
||||
"temperature": 25.0,
|
||||
"location": "warehouse-A",
|
||||
},
|
||||
{
|
||||
"deviceId": "sensor002",
|
||||
"deviceType": "humidity",
|
||||
"temperature": 32.5,
|
||||
"location": "warehouse-B",
|
||||
},
|
||||
{
|
||||
"deviceId": "pump001", // Will be filtered out
|
||||
"deviceType": "actuator",
|
||||
"temperature": 20.0,
|
||||
"location": "factory",
|
||||
},
|
||||
}
|
||||
|
||||
// Process data one by one, each will output results immediately
|
||||
for _, data := range sensorData {
|
||||
ssql.Stream().AddData(data)
|
||||
time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // Wait for processing completion
|
||||
}
|
||||
```
|
||||
|
||||
### Aggregation Mode - Windowed Statistical Analysis
|
||||
|
||||
Suitable for scenarios requiring **statistical analysis** and **batch processing**, collecting data over a period of time for aggregated computation.
|
||||
|
||||
**Typical Use Cases:**
|
||||
- **Monitoring Dashboard**: Display real-time statistical charts of device operational status
|
||||
- **Performance Analysis**: Analyze key metrics like QPS, latency, etc.
|
||||
- **Anomaly Detection**: Detect data anomalies based on statistical models
|
||||
- **Report Generation**: Generate various business reports periodically
|
||||
- **Trend Analysis**: Analyze data trends and patterns
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
@@ -144,6 +237,152 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
### 🔍 Pattern Matching Notes
|
||||
|
||||
**Note**: StreamSQL currently does not support standard SQL `LIKE` syntax, but provides more powerful alternatives:
|
||||
|
||||
- **Prefix matching**: `STARTSWITH(field, 'prefix')` replaces `field LIKE 'prefix%'`
|
||||
- **Suffix matching**: `ENDSWITH(field, 'suffix')` replaces `field LIKE '%suffix'`
|
||||
- **Contains matching**: `INDEXOF(field, 'substring') >= 0` replaces `field LIKE '%substring%'`
|
||||
- **Regex matching**: `REGEXP_MATCHES(field, '^pattern$')` supports complex pattern matching
|
||||
|
||||
**Examples**:
|
||||
```sql
|
||||
-- Replace: deviceId LIKE 'sensor%'
|
||||
WHERE STARTSWITH(deviceId, 'sensor')
|
||||
|
||||
-- Replace: message LIKE '%error%'
|
||||
WHERE INDEXOF(message, 'error') >= 0
|
||||
|
||||
-- Replace complex pattern: deviceId LIKE 'sensor[0-9]+'
|
||||
WHERE REGEXP_MATCHES(deviceId, '^sensor[0-9]+$')
|
||||
```
|
||||
|
||||
### Non-Aggregation Scenarios
|
||||
|
||||
StreamSQL supports real-time data transformation and filtering without aggregation operations. This mode provides immediate processing and output for each input record, making it ideal for data cleaning, enrichment, and real-time filtering.
|
||||
|
||||
```go
|
||||
// Real-time data transformation and filtering example
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"github.com/rulego/streamsql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ssql := streamsql.New()
|
||||
defer ssql.Stop()
|
||||
|
||||
// Non-aggregation SQL - immediate data transformation
|
||||
rsql := `SELECT deviceId,
|
||||
upper(deviceType) as device_type,
|
||||
temperature * 1.8 + 32 as temp_fahrenheit,
|
||||
concat(location, '-', deviceId) as full_location,
|
||||
now() as processed_time
|
||||
FROM stream
|
||||
WHERE temperature > 0 AND deviceId LIKE 'sensor%'`
|
||||
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Handle real-time transformation results
|
||||
ssql.Stream().AddSink(func(result interface{}) {
|
||||
fmt.Printf("Transformed data: %+v\n", result)
|
||||
})
|
||||
|
||||
// Input raw data
|
||||
rawData := []map[string]interface{}{
|
||||
{
|
||||
"deviceId": "sensor001",
|
||||
"deviceType": "temperature",
|
||||
"temperature": 25.0,
|
||||
"humidity": 60,
|
||||
"location": "warehouse-A",
|
||||
},
|
||||
{
|
||||
"deviceId": "sensor002",
|
||||
"deviceType": "humidity",
|
||||
"temperature": 22.5,
|
||||
"humidity": 55,
|
||||
"location": "warehouse-B",
|
||||
},
|
||||
{
|
||||
"deviceId": "pump001", // Will be filtered out
|
||||
"deviceType": "actuator",
|
||||
"temperature": 30.0,
|
||||
"location": "factory",
|
||||
},
|
||||
}
|
||||
|
||||
// Each data record is processed immediately
|
||||
for _, data := range rawData {
|
||||
ssql.Stream().AddData(data)
|
||||
time.Sleep(100 * time.Millisecond) // Simulate real-time arrival
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // Wait for processing
|
||||
}
|
||||
```
|
||||
|
||||
#### Use Cases for Non-Aggregation Scenarios
|
||||
|
||||
**1. Real-time Data Cleaning and Validation**
|
||||
```sql
|
||||
-- Filter invalid records and normalize data formats
|
||||
SELECT deviceId,
|
||||
CAST(temperature AS FLOAT) as temperature,
|
||||
LOWER(status) as status,
|
||||
COALESCE(location, 'unknown') as location
|
||||
FROM stream
|
||||
WHERE temperature IS NOT NULL AND deviceId != ''
|
||||
```
|
||||
|
||||
**2. Data Enrichment and Transformation**
|
||||
```sql
|
||||
-- Add calculated fields and enrichment
|
||||
SELECT *,
|
||||
temperature * 1.8 + 32 as temp_fahrenheit,
|
||||
CASE WHEN temperature > 30 THEN 'hot'
|
||||
WHEN temperature < 10 THEN 'cold'
|
||||
ELSE 'normal' END as temp_category,
|
||||
FORMAT(humidity, 2) as formatted_humidity
|
||||
FROM stream
|
||||
```
|
||||
|
||||
**3. Real-time Alerting and Monitoring**
|
||||
```sql
|
||||
-- Filter critical events for immediate alerting
|
||||
SELECT deviceId, temperature, humidity, now() as alert_time
|
||||
FROM stream
|
||||
WHERE temperature > 50 OR humidity < 10
|
||||
```
|
||||
|
||||
**4. Data Format Conversion**
|
||||
```sql
|
||||
-- Convert data format for downstream systems
|
||||
SELECT TO_JSON(MAP(
|
||||
'id', deviceId,
|
||||
'metrics', MAP('temp', temperature, 'hum', humidity),
|
||||
'meta', MAP('location', location, 'type', deviceType)
|
||||
)) as json_output
|
||||
FROM stream
|
||||
```
|
||||
|
||||
**5. Real-time Data Routing**
|
||||
```sql
|
||||
-- Route data based on conditions
|
||||
SELECT *,
|
||||
CASE WHEN deviceType = 'sensor' THEN 'sensor_topic'
|
||||
WHEN deviceType = 'actuator' THEN 'actuator_topic'
|
||||
ELSE 'default_topic' END as routing_key
|
||||
FROM stream
|
||||
```
|
||||
|
||||
### Nested Field Access
|
||||
|
||||
StreamSQL supports querying nested structured data using dot notation (`.`) syntax to access nested fields:
|
||||
@@ -204,18 +443,22 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
**Nested Field Access Features:**
|
||||
- Support dot notation syntax: `device.info.name`, `sensor.temperature`
|
||||
- Can be used in all SQL clauses: SELECT, WHERE, GROUP BY
|
||||
- Support aggregate functions: `AVG(sensor.temperature)`, `MAX(device.status.uptime)`
|
||||
- Backward compatible: existing flat field access methods remain unchanged
|
||||
|
||||
## Functions
|
||||
|
||||
StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytic, window, and more. [Documentation](docs/FUNCTIONS_USAGE_GUIDE.md)
|
||||
|
||||
## Concepts
|
||||
|
||||
### Processing Modes
|
||||
|
||||
StreamSQL supports two main processing modes:
|
||||
|
||||
#### Aggregation Mode (Windowed Processing)
|
||||
Used when the SQL query contains aggregate functions (SUM, AVG, COUNT, etc.) or GROUP BY clauses. Data is collected in windows and aggregated results are output when windows are triggered.
|
||||
|
||||
#### Non-Aggregation Mode (Real-time Processing)
|
||||
Used for immediate data transformation and filtering without aggregation operations. Each input record is processed and output immediately, providing ultra-low latency for real-time scenarios like data cleaning, enrichment, and filtering.
|
||||
|
||||
### Windows
|
||||
|
||||
Since stream data is unbounded, it cannot be processed as a whole. Windows provide a mechanism to divide unbounded data into a series of bounded data segments for computation. StreamSQL includes the following types of windows:
|
||||
|
||||
+128
-8
@@ -1,8 +1,8 @@
|
||||
# StreamSQL
|
||||
[](https://pkg.go.dev/github.com/rulego/streamsql)
|
||||
[](https://goreportcard.com/report/github.com/rulego/streamsql)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
|
||||
[English](README.md)| 简体中文
|
||||
|
||||
@@ -39,6 +39,99 @@ go get github.com/rulego/streamsql
|
||||
|
||||
## 使用
|
||||
|
||||
StreamSQL支持两种主要的处理模式,适用于不同的业务场景:
|
||||
|
||||
### 非聚合模式 - 实时数据转换和过滤
|
||||
|
||||
适用于需要**实时响应**、**低延迟**的场景,每条数据立即处理并输出结果。
|
||||
|
||||
**典型应用场景:**
|
||||
- **数据清洗**:清理和标准化IoT设备上报的脏数据
|
||||
- **实时告警**:监控关键指标,超阈值立即告警
|
||||
- **数据富化**:为原始数据添加计算字段和业务标签
|
||||
- **格式转换**:将数据转换为下游系统需要的格式
|
||||
- **数据路由**:根据内容将数据路由到不同的处理通道
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"github.com/rulego/streamsql"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 创建StreamSQL实例
|
||||
ssql := streamsql.New()
|
||||
defer ssql.Stop()
|
||||
|
||||
// 非聚合SQL:实时数据转换和过滤
|
||||
// 特点:每条输入数据立即处理,无需等待窗口
|
||||
rsql := `SELECT deviceId,
|
||||
UPPER(deviceType) as device_type,
|
||||
temperature * 1.8 + 32 as temp_fahrenheit,
|
||||
CASE WHEN temperature > 30 THEN 'hot'
|
||||
WHEN temperature < 15 THEN 'cold'
|
||||
ELSE 'normal' END as temp_category,
|
||||
CONCAT(location, '-', deviceId) as full_identifier,
|
||||
NOW() as processed_time
|
||||
FROM stream
|
||||
WHERE temperature > 0 AND deviceId LIKE 'sensor%'`
|
||||
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 处理实时转换结果
|
||||
ssql.Stream().AddSink(func(result interface{}) {
|
||||
fmt.Printf("实时处理结果: %+v\n", result)
|
||||
})
|
||||
|
||||
// 模拟传感器数据输入
|
||||
sensorData := []map[string]interface{}{
|
||||
{
|
||||
"deviceId": "sensor001",
|
||||
"deviceType": "temperature",
|
||||
"temperature": 25.0,
|
||||
"location": "warehouse-A",
|
||||
},
|
||||
{
|
||||
"deviceId": "sensor002",
|
||||
"deviceType": "humidity",
|
||||
"temperature": 32.5,
|
||||
"location": "warehouse-B",
|
||||
},
|
||||
{
|
||||
"deviceId": "pump001", // 会被过滤掉
|
||||
"deviceType": "actuator",
|
||||
"temperature": 20.0,
|
||||
"location": "factory",
|
||||
},
|
||||
}
|
||||
|
||||
// 逐条处理数据,每条都会立即输出结果
|
||||
for _, data := range sensorData {
|
||||
ssql.Stream().AddData(data)
|
||||
time.Sleep(100 * time.Millisecond) // 模拟实时数据到达
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // 等待处理完成
|
||||
}
|
||||
```
|
||||
|
||||
### 聚合模式 - 窗口统计分析
|
||||
|
||||
适用于需要**统计分析**、**批量处理**的场景,收集一段时间内的数据进行聚合计算。
|
||||
|
||||
**典型应用场景:**
|
||||
- **监控大屏**:展示设备运行状态的实时统计图表
|
||||
- **性能分析**:分析系统的QPS、延迟等关键指标
|
||||
- **异常检测**:基于统计模型检测数据异常
|
||||
- **报表生成**:定时生成各种业务报表
|
||||
- **趋势分析**:分析数据的变化趋势和规律
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
@@ -160,6 +253,39 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
### 🔍 模式匹配功能
|
||||
|
||||
StreamSQL 支持标准 SQL 的 `LIKE` 语法进行模式匹配:
|
||||
|
||||
- **前缀匹配**: `field LIKE 'prefix%'` - 匹配以指定前缀开头的字符串
|
||||
- **后缀匹配**: `field LIKE '%suffix'` - 匹配以指定后缀结尾的字符串
|
||||
- **包含匹配**: `field LIKE '%substring%'` - 匹配包含指定子字符串的字符串
|
||||
- **单字符通配符**: `field LIKE 'patte_n'` - `_` 匹配任意单个字符
|
||||
- **复杂模式**: `field LIKE 'prefix%suffix'` - 组合前缀和后缀匹配
|
||||
|
||||
**示例**:
|
||||
```sql
|
||||
-- 前缀匹配:查找以'sensor'开头的设备ID
|
||||
WHERE deviceId LIKE 'sensor%'
|
||||
|
||||
-- 后缀匹配:查找以'error'结尾的消息
|
||||
WHERE message LIKE '%error'
|
||||
|
||||
-- 包含匹配:查找包含'alert'的日志
|
||||
WHERE logMessage LIKE '%alert%'
|
||||
|
||||
-- 单字符通配符:匹配三位数错误代码如E01, E02等
|
||||
WHERE errorCode LIKE 'E_0'
|
||||
|
||||
-- 复杂模式:匹配log_开头.log结尾的文件
|
||||
WHERE filename LIKE 'log_%.log'
|
||||
```
|
||||
|
||||
**兼容的字符串函数**:
|
||||
- `STARTSWITH(field, 'prefix')` - 等价于 `field LIKE 'prefix%'`
|
||||
- `ENDSWITH(field, 'suffix')` - 等价于 `field LIKE '%suffix'`
|
||||
- `REGEXP_MATCHES(field, '^pattern$')` - 支持更复杂的正则表达式匹配
|
||||
|
||||
### 嵌套字段访问
|
||||
|
||||
StreamSQL 还支持对嵌套结构数据进行查询,可以使用点号(`.`)语法访问嵌套字段:
|
||||
@@ -220,12 +346,6 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
**嵌套字段访问特性:**
|
||||
- 支持点号语法:`device.info.name`、`sensor.temperature`
|
||||
- 可用于 SELECT、WHERE、GROUP BY 等所有 SQL 子句
|
||||
- 支持聚合函数:`AVG(sensor.temperature)`、`MAX(device.status.uptime)`
|
||||
- 向后兼容:现有平坦字段访问方式保持不变
|
||||
|
||||
## 函数
|
||||
|
||||
StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合、分析、窗口等上百个函数。[文档](docs/FUNCTIONS_USAGE_GUIDE.md)
|
||||
|
||||
+72
-1
@@ -1,6 +1,8 @@
|
||||
package condition
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/expr-lang/expr"
|
||||
"github.com/expr-lang/expr/vm"
|
||||
)
|
||||
@@ -14,7 +16,24 @@ type ExprCondition struct {
|
||||
}
|
||||
|
||||
func NewExprCondition(expression string) (Condition, error) {
|
||||
program, err := expr.Compile(expression)
|
||||
// 添加自定义字符串函数支持(startsWith、endsWith、contains是内置操作符)
|
||||
options := []expr.Option{
|
||||
expr.Function("like_match", func(params ...any) (any, error) {
|
||||
if len(params) != 2 {
|
||||
return false, fmt.Errorf("like_match function requires 2 parameters")
|
||||
}
|
||||
text, ok1 := params[0].(string)
|
||||
pattern, ok2 := params[1].(string)
|
||||
if !ok1 || !ok2 {
|
||||
return false, fmt.Errorf("like_match function requires string parameters")
|
||||
}
|
||||
return matchesLikePattern(text, pattern), nil
|
||||
}),
|
||||
expr.AllowUndefinedVariables(),
|
||||
expr.AsBool(),
|
||||
}
|
||||
|
||||
program, err := expr.Compile(expression, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -28,3 +47,55 @@ func (ec *ExprCondition) Evaluate(env interface{}) bool {
|
||||
}
|
||||
return result.(bool)
|
||||
}
|
||||
|
||||
// matchesLikePattern 实现LIKE模式匹配
|
||||
// 支持%(匹配任意字符序列)和_(匹配单个字符)
|
||||
func matchesLikePattern(text, pattern string) bool {
|
||||
return likeMatch(text, pattern, 0, 0)
|
||||
}
|
||||
|
||||
// likeMatch 递归实现LIKE匹配算法
|
||||
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
// 如果模式已经匹配完成
|
||||
if patternIndex >= len(pattern) {
|
||||
return textIndex >= len(text) // 文本也应该匹配完成
|
||||
}
|
||||
|
||||
// 如果文本已经结束,但模式还有非%字符,则不匹配
|
||||
if textIndex >= len(text) {
|
||||
// 检查剩余的模式是否都是%
|
||||
for i := patternIndex; i < len(pattern); i++ {
|
||||
if pattern[i] != '%' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// 处理当前模式字符
|
||||
patternChar := pattern[patternIndex]
|
||||
|
||||
if patternChar == '%' {
|
||||
// %可以匹配0个或多个字符
|
||||
// 尝试匹配0个字符(跳过%)
|
||||
if likeMatch(text, pattern, textIndex, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
// 尝试匹配1个或多个字符
|
||||
for i := textIndex; i < len(text); i++ {
|
||||
if likeMatch(text, pattern, i+1, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
} else if patternChar == '_' {
|
||||
// _匹配恰好一个字符
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
} else {
|
||||
// 普通字符必须精确匹配
|
||||
if text[textIndex] == patternChar {
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
# StreamSQL 非聚合场景使用示例
|
||||
|
||||
本示例展示了StreamSQL在非聚合场景中的各种应用,包括实时数据转换、过滤、清洗等功能。
|
||||
|
||||
## 运行示例
|
||||
|
||||
```bash
|
||||
cd examples/non-aggregation
|
||||
go run main.go
|
||||
```
|
||||
|
||||
## 场景说明
|
||||
|
||||
### 1. 实时数据清洗和标准化
|
||||
|
||||
**场景描述**: 对输入的脏数据进行清洗和标准化处理,包括:
|
||||
- 空值处理(COALESCE)
|
||||
- 字符串规范化(UPPER, TRIM)
|
||||
- 数值精度处理(ROUND)
|
||||
- 状态码转换(CASE WHEN)
|
||||
- 无效数据过滤(WHERE条件)
|
||||
|
||||
**适用场景**:
|
||||
- IoT设备数据清洗
|
||||
- 日志标准化处理
|
||||
- 数据质量保证
|
||||
|
||||
### 2. 数据富化和计算字段
|
||||
|
||||
**场景描述**: 基于原始数据计算和添加新字段,包括:
|
||||
- 单位转换(摄氏度转华氏度)
|
||||
- 分类标签生成(温度分级)
|
||||
- 字符串拼接(全标识符)
|
||||
- 时间戳添加
|
||||
- 比率计算
|
||||
|
||||
**适用场景**:
|
||||
- 数据预处理
|
||||
- 业务规则应用
|
||||
- 指标计算
|
||||
|
||||
### 3. 实时告警和事件过滤
|
||||
|
||||
**场景描述**: 实时检测异常数据并生成告警事件,包括:
|
||||
- 阈值检测
|
||||
- 告警级别分类
|
||||
- 告警消息生成
|
||||
- 时间戳记录
|
||||
|
||||
**适用场景**:
|
||||
- 监控系统
|
||||
- 异常检测
|
||||
- 实时告警
|
||||
|
||||
### 4. 数据格式转换
|
||||
|
||||
**场景描述**: 将数据转换为不同的格式,包括:
|
||||
- JSON格式输出
|
||||
- CSV格式输出
|
||||
- 自定义格式转换
|
||||
|
||||
**适用场景**:
|
||||
- 数据接口适配
|
||||
- 多系统集成
|
||||
- 数据导出
|
||||
|
||||
### 5. 基于条件的数据路由
|
||||
|
||||
**场景描述**: 根据数据内容决定数据的路由目标,包括:
|
||||
- 条件路由规则
|
||||
- 优先级分类
|
||||
- 主题分发
|
||||
|
||||
**适用场景**:
|
||||
- 消息队列路由
|
||||
- 数据分发
|
||||
- 负载均衡
|
||||
|
||||
### 6. 嵌套字段处理
|
||||
|
||||
**场景描述**: 处理复杂的嵌套JSON数据,包括:
|
||||
- 深层字段提取
|
||||
- 嵌套字段组合
|
||||
- 条件判断
|
||||
|
||||
**适用场景**:
|
||||
- JSON数据处理
|
||||
- 复杂数据结构解析
|
||||
- API数据转换
|
||||
|
||||
## 核心特性
|
||||
|
||||
### 实时处理
|
||||
- 每条数据立即处理,无需等待窗口
|
||||
- 超低延迟,适合实时场景
|
||||
- 支持高吞吐量数据流
|
||||
|
||||
### 丰富的函数支持
|
||||
- 字符串处理:UPPER, LOWER, TRIM, CONCAT, SUBSTRING等
|
||||
- 数学计算:ROUND, CAST, 算术运算等
|
||||
- 条件判断:CASE WHEN, COALESCE, IF等
|
||||
- 时间函数:NOW, DATE_FORMAT等
|
||||
- 类型转换:CAST, TO_JSON等
|
||||
|
||||
### 灵活的字段操作
|
||||
- 字段选择和别名
|
||||
- 嵌套字段访问(点号语法)
|
||||
- 计算字段生成
|
||||
- 表达式计算
|
||||
|
||||
### 强大的过滤能力
|
||||
- WHERE条件过滤
|
||||
- 复杂表达式支持
|
||||
- 多条件组合(AND, OR)
|
||||
- 模式匹配(LIKE 语法)
|
||||
|
||||
## 性能特点
|
||||
|
||||
- **低延迟**: 每条数据立即处理输出
|
||||
- **高吞吐**: 支持高频数据流
|
||||
- **内存友好**: 无需缓存数据,即时处理
|
||||
- **CPU高效**: 简单的数据转换操作
|
||||
File diff suppressed because it is too large
Load Diff
+58
-2
@@ -26,7 +26,7 @@ var operatorPrecedence = map[string]int{
|
||||
"OR": 1,
|
||||
"AND": 2,
|
||||
"==": 3, "=": 3, "!=": 3, "<>": 3,
|
||||
">": 4, "<": 4, ">=": 4, "<=": 4,
|
||||
">": 4, "<": 4, ">=": 4, "<=": 4, "LIKE": 4,
|
||||
"+": 5, "-": 5,
|
||||
"*": 6, "/": 6, "%": 6,
|
||||
"^": 7, // 幂运算
|
||||
@@ -760,6 +760,8 @@ func compareValues(left, right interface{}, operator string) (bool, error) {
|
||||
return leftStr < rightStr, nil
|
||||
case "<=":
|
||||
return leftStr <= rightStr, nil
|
||||
case "LIKE":
|
||||
return matchesLikePattern(leftStr, rightStr), nil
|
||||
default:
|
||||
return false, fmt.Errorf("unsupported string comparison operator: %s", operator)
|
||||
}
|
||||
@@ -791,6 +793,58 @@ func compareValues(left, right interface{}, operator string) (bool, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// matchesLikePattern 实现LIKE模式匹配
|
||||
// 支持%(匹配任意字符序列)和_(匹配单个字符)
|
||||
func matchesLikePattern(text, pattern string) bool {
|
||||
return likeMatch(text, pattern, 0, 0)
|
||||
}
|
||||
|
||||
// likeMatch 递归实现LIKE匹配算法
|
||||
func likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
// 如果模式已经匹配完成
|
||||
if patternIndex >= len(pattern) {
|
||||
return textIndex >= len(text) // 文本也应该匹配完成
|
||||
}
|
||||
|
||||
// 如果文本已经结束,但模式还有非%字符,则不匹配
|
||||
if textIndex >= len(text) {
|
||||
// 检查剩余的模式是否都是%
|
||||
for i := patternIndex; i < len(pattern); i++ {
|
||||
if pattern[i] != '%' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
switch pattern[patternIndex] {
|
||||
case '%':
|
||||
// %可以匹配0个或多个字符
|
||||
// 尝试匹配0个字符(跳过%)
|
||||
if likeMatch(text, pattern, textIndex, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
// 尝试匹配1个或多个字符
|
||||
for i := textIndex; i < len(text); i++ {
|
||||
if likeMatch(text, pattern, i+1, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
case '_':
|
||||
// _匹配任意单个字符
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
|
||||
default:
|
||||
// 普通字符必须精确匹配
|
||||
if text[textIndex] == pattern[patternIndex] {
|
||||
return likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// convertToFloat 将值转换为float64
|
||||
func convertToFloat(val interface{}) (float64, error) {
|
||||
switch v := val.(type) {
|
||||
@@ -1013,7 +1067,7 @@ func parseExpression(tokens []string) (*ExprNode, error) {
|
||||
if isIdentifier(token) {
|
||||
// 检查是否是逻辑运算符关键字
|
||||
upperToken := strings.ToUpper(token)
|
||||
if upperToken == "AND" || upperToken == "OR" || upperToken == "NOT" {
|
||||
if upperToken == "AND" || upperToken == "OR" || upperToken == "NOT" || upperToken == "LIKE" {
|
||||
// 处理逻辑运算符
|
||||
for len(operators) > 0 && operators[len(operators)-1] != "(" &&
|
||||
operatorPrecedence[operators[len(operators)-1]] >= operatorPrecedence[upperToken] {
|
||||
@@ -1432,6 +1486,8 @@ func isOperator(s string) bool {
|
||||
return true
|
||||
case "AND", "OR", "NOT":
|
||||
return true
|
||||
case "LIKE":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
+157
-1
@@ -2,6 +2,7 @@ package functions
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -98,6 +99,16 @@ func (bridge *ExprBridge) CreateEnhancedExprEnvironment(data map[string]interfac
|
||||
env["streamsql_min"] = env["min"]
|
||||
env["streamsql_max"] = env["max"]
|
||||
|
||||
// 添加自定义的LIKE匹配函数
|
||||
env["like_match"] = func(text, pattern string) bool {
|
||||
return bridge.matchesLikePattern(text, pattern)
|
||||
}
|
||||
|
||||
// 添加自定义LIKE函数(startsWith、endsWith、contains是内置操作符,不需要在环境中添加)
|
||||
env["like_match"] = func(text, pattern string) bool {
|
||||
return bridge.matchesLikePattern(text, pattern)
|
||||
}
|
||||
|
||||
return env
|
||||
}
|
||||
|
||||
@@ -111,6 +122,21 @@ func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression str
|
||||
streamSQLOptions := bridge.RegisterStreamSQLFunctionsToExpr()
|
||||
options = append(options, streamSQLOptions...)
|
||||
|
||||
// 添加LIKE相关的自定义函数(只需要like_match,其他是内置操作符)
|
||||
options = append(options,
|
||||
expr.Function("like_match", func(params ...any) (any, error) {
|
||||
if len(params) != 2 {
|
||||
return false, fmt.Errorf("like_match function requires 2 parameters")
|
||||
}
|
||||
text, ok1 := params[0].(string)
|
||||
pattern, ok2 := params[1].(string)
|
||||
if !ok1 || !ok2 {
|
||||
return false, fmt.Errorf("like_match function requires string parameters")
|
||||
}
|
||||
return bridge.matchesLikePattern(text, pattern), nil
|
||||
}),
|
||||
)
|
||||
|
||||
// 启用一些有用的expr功能
|
||||
options = append(options,
|
||||
expr.AllowUndefinedVariables(), // 允许未定义变量
|
||||
@@ -122,7 +148,15 @@ func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression str
|
||||
|
||||
// EvaluateExpression 评估表达式,自动选择最合适的引擎
|
||||
func (bridge *ExprBridge) EvaluateExpression(expression string, data map[string]interface{}) (interface{}, error) {
|
||||
// 首先检查是否包含字符串拼接模式
|
||||
// 首先检查是否包含LIKE操作符,如果有则进行预处理
|
||||
if bridge.ContainsLikeOperator(expression) {
|
||||
processedExpr, err := bridge.PreprocessLikeExpression(expression)
|
||||
if err == nil {
|
||||
expression = processedExpr
|
||||
}
|
||||
}
|
||||
|
||||
// 检查是否包含字符串拼接模式
|
||||
if bridge.isStringConcatenationExpression(expression, data) {
|
||||
result, err := bridge.evaluateStringConcatenation(expression, data)
|
||||
if err == nil {
|
||||
@@ -309,6 +343,128 @@ func (bridge *ExprBridge) evaluateSimpleNumericExpression(expression string, dat
|
||||
return nil, fmt.Errorf("unsupported expression: %s", expression)
|
||||
}
|
||||
|
||||
// ContainsLikeOperator 检查表达式是否包含LIKE操作符
|
||||
func (bridge *ExprBridge) ContainsLikeOperator(expression string) bool {
|
||||
// 简单检查是否包含LIKE关键字
|
||||
upperExpr := strings.ToUpper(expression)
|
||||
return strings.Contains(upperExpr, " LIKE ")
|
||||
}
|
||||
|
||||
// PreprocessLikeExpression 预处理LIKE表达式,转换为expr-lang可理解的函数调用
|
||||
func (bridge *ExprBridge) PreprocessLikeExpression(expression string) (string, error) {
|
||||
// 使用正则表达式匹配LIKE模式
|
||||
// 匹配: field LIKE 'pattern' (允许空模式)
|
||||
likePattern := `(\w+(?:\.\w+)*)\s+LIKE\s+'([^']*)'`
|
||||
re, err := regexp.Compile(likePattern)
|
||||
if err != nil {
|
||||
return expression, err
|
||||
}
|
||||
|
||||
// 替换所有LIKE表达式
|
||||
result := re.ReplaceAllStringFunc(expression, func(match string) string {
|
||||
submatches := re.FindStringSubmatch(match)
|
||||
if len(submatches) != 3 {
|
||||
return match // 保持原样
|
||||
}
|
||||
|
||||
field := submatches[1]
|
||||
pattern := submatches[2]
|
||||
|
||||
// 将LIKE模式转换为相应的函数调用
|
||||
return bridge.convertLikeToFunction(field, pattern)
|
||||
})
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// convertLikeToFunction 将LIKE模式转换为expr-lang操作符
|
||||
func (bridge *ExprBridge) convertLikeToFunction(field, pattern string) string {
|
||||
// 处理空模式
|
||||
if pattern == "" {
|
||||
return fmt.Sprintf("%s == ''", field)
|
||||
}
|
||||
|
||||
// 分析模式类型
|
||||
if strings.HasPrefix(pattern, "%") && strings.HasSuffix(pattern, "%") && len(pattern) > 1 {
|
||||
// %pattern% -> contains操作符(但不是单独的%)
|
||||
inner := strings.Trim(pattern, "%")
|
||||
if inner == "" {
|
||||
// %% 表示匹配任何字符串
|
||||
return "true"
|
||||
}
|
||||
return fmt.Sprintf("%s contains '%s'", field, inner)
|
||||
} else if strings.HasPrefix(pattern, "%") && len(pattern) > 1 {
|
||||
// %pattern -> endsWith操作符
|
||||
suffix := strings.TrimPrefix(pattern, "%")
|
||||
return fmt.Sprintf("%s endsWith '%s'", field, suffix)
|
||||
} else if strings.HasSuffix(pattern, "%") && len(pattern) > 1 {
|
||||
// pattern% -> startsWith操作符
|
||||
prefix := strings.TrimSuffix(pattern, "%")
|
||||
return fmt.Sprintf("%s startsWith '%s'", field, prefix)
|
||||
} else if pattern == "%" {
|
||||
// 单独的%匹配任何字符串
|
||||
return "true"
|
||||
} else if strings.Contains(pattern, "%") || strings.Contains(pattern, "_") {
|
||||
// 复杂模式(如prefix%suffix)或包含单字符通配符,使用自定义的like_match函数
|
||||
return fmt.Sprintf("like_match(%s, '%s')", field, pattern)
|
||||
} else {
|
||||
// 精确匹配
|
||||
return fmt.Sprintf("%s == '%s'", field, pattern)
|
||||
}
|
||||
}
|
||||
|
||||
// matchesLikePattern 实现LIKE模式匹配
|
||||
// 支持%(匹配任意字符序列)和_(匹配单个字符)
|
||||
func (bridge *ExprBridge) matchesLikePattern(text, pattern string) bool {
|
||||
return bridge.likeMatch(text, pattern, 0, 0)
|
||||
}
|
||||
|
||||
// likeMatch 递归实现LIKE匹配算法
|
||||
func (bridge *ExprBridge) likeMatch(text, pattern string, textIndex, patternIndex int) bool {
|
||||
// 如果模式已经匹配完成
|
||||
if patternIndex >= len(pattern) {
|
||||
return textIndex >= len(text) // 文本也应该匹配完成
|
||||
}
|
||||
|
||||
// 如果文本已经结束,但模式还有非%字符,则不匹配
|
||||
if textIndex >= len(text) {
|
||||
// 检查剩余的模式是否都是%
|
||||
for i := patternIndex; i < len(pattern); i++ {
|
||||
if pattern[i] != '%' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// 处理当前模式字符
|
||||
patternChar := pattern[patternIndex]
|
||||
|
||||
if patternChar == '%' {
|
||||
// %可以匹配0个或多个字符
|
||||
// 尝试匹配0个字符(跳过%)
|
||||
if bridge.likeMatch(text, pattern, textIndex, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
// 尝试匹配1个或多个字符
|
||||
for i := textIndex; i < len(text); i++ {
|
||||
if bridge.likeMatch(text, pattern, i+1, patternIndex+1) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
} else if patternChar == '_' {
|
||||
// _匹配恰好一个字符
|
||||
return bridge.likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
} else {
|
||||
// 普通字符必须精确匹配
|
||||
if text[textIndex] == patternChar {
|
||||
return bridge.likeMatch(text, pattern, textIndex+1, patternIndex+1)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// toFloat64 将值转换为float64
|
||||
func (bridge *ExprBridge) toFloat64(val interface{}) (float64, error) {
|
||||
switch v := val.(type) {
|
||||
|
||||
@@ -44,6 +44,7 @@ const (
|
||||
TokenDISTINCT
|
||||
TokenLIMIT
|
||||
TokenHAVING
|
||||
TokenLIKE
|
||||
// CASE表达式相关token
|
||||
TokenCASE
|
||||
TokenWHEN
|
||||
@@ -336,6 +337,8 @@ func (l *Lexer) lookupIdent(ident string) Token {
|
||||
return Token{Type: TokenLIMIT, Value: ident}
|
||||
case "HAVING":
|
||||
return Token{Type: TokenHAVING, Value: ident}
|
||||
case "LIKE":
|
||||
return Token{Type: TokenLIKE, Value: ident}
|
||||
// CASE表达式相关关键字
|
||||
case "CASE":
|
||||
return Token{Type: TokenCASE, Value: ident}
|
||||
|
||||
@@ -382,6 +382,8 @@ func (p *Parser) parseWhere(stmt *SelectStatement) error {
|
||||
conditions = append(conditions, "&&")
|
||||
case TokenOR:
|
||||
conditions = append(conditions, "||")
|
||||
case TokenLIKE:
|
||||
conditions = append(conditions, "LIKE")
|
||||
default:
|
||||
// 处理字符串值的引号
|
||||
if len(conditions) > 0 && conditions[len(conditions)-1] == "'" {
|
||||
@@ -826,6 +828,8 @@ func (p *Parser) parseHaving(stmt *SelectStatement) error {
|
||||
conditions = append(conditions, "&&")
|
||||
case TokenOR:
|
||||
conditions = append(conditions, "||")
|
||||
case TokenLIKE:
|
||||
conditions = append(conditions, "LIKE")
|
||||
default:
|
||||
// 处理字符串值的引号
|
||||
if len(conditions) > 0 && conditions[len(conditions)-1] == "'" {
|
||||
|
||||
+21
-2
@@ -192,7 +192,17 @@ func (s *Stream) RegisterFilter(conditionStr string) error {
|
||||
if strings.TrimSpace(conditionStr) == "" {
|
||||
return nil
|
||||
}
|
||||
filter, err := condition.NewExprCondition(conditionStr)
|
||||
|
||||
// 预处理LIKE语法,转换为expr-lang可理解的形式
|
||||
processedCondition := conditionStr
|
||||
bridge := functions.GetExprBridge()
|
||||
if bridge.ContainsLikeOperator(conditionStr) {
|
||||
if processed, err := bridge.PreprocessLikeExpression(conditionStr); err == nil {
|
||||
processedCondition = processed
|
||||
}
|
||||
}
|
||||
|
||||
filter, err := condition.NewExprCondition(processedCondition)
|
||||
if err != nil {
|
||||
return fmt.Errorf("compile filter error: %w", err)
|
||||
}
|
||||
@@ -334,8 +344,17 @@ func (s *Stream) process() {
|
||||
|
||||
// 应用 HAVING 过滤条件
|
||||
if s.config.Having != "" {
|
||||
// 预处理HAVING条件中的LIKE语法,转换为expr-lang可理解的形式
|
||||
processedHaving := s.config.Having
|
||||
bridge := functions.GetExprBridge()
|
||||
if bridge.ContainsLikeOperator(s.config.Having) {
|
||||
if processed, err := bridge.PreprocessLikeExpression(s.config.Having); err == nil {
|
||||
processedHaving = processed
|
||||
}
|
||||
}
|
||||
|
||||
// 创建 HAVING 条件
|
||||
havingFilter, err := condition.NewExprCondition(s.config.Having)
|
||||
havingFilter, err := condition.NewExprCondition(processedHaving)
|
||||
if err != nil {
|
||||
logger.Error("having filter error: %v", err)
|
||||
} else {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user