forked from GiteaTest2015/streamsql
Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 52653b2143 | |||
| 2db23f5b99 | |||
| c7cece15e2 | |||
| 09fe63102e | |||
| b405a935b7 | |||
| 1c781979a6 | |||
| 16778f2ac3 | |||
| ba2cdd5629 | |||
| 47b7e07c6b | |||
| 9739f213e9 | |||
| 4528d1f1a6 | |||
| 6ce76d506c | |||
| 8207e6ba8c | |||
| b2c638671a | |||
| 1744502678 | |||
| 8a6e41ab78 | |||
| 14d9a0874b | |||
| 6f77dc5f7f | |||
| 2907d7e3a3 | |||
| eb29ed921d | |||
| 73f7843996 | |||
| 3ef10b86a9 | |||
| e435ef2040 | |||
| c39f915808 | |||
| e05213267d | |||
| 5f05b0ef61 | |||
| 3662949105 | |||
| c86d22e06a | |||
| 645c233762 | |||
| 29ed63ea50 | |||
| 5f7076de7b |
@@ -2,9 +2,9 @@ name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main, master, develop ]
|
||||
branches: [ main, dev ]
|
||||
pull_request:
|
||||
branches: [ main, master, develop ]
|
||||
branches: [ main, dev ]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@@ -42,11 +42,11 @@ jobs:
|
||||
|
||||
- name: Run tests
|
||||
if: matrix.go-version != '1.21'
|
||||
run: go test -v -race -timeout 300s ./...
|
||||
run: go test -v -race -timeout 600s ./...
|
||||
|
||||
- name: Run tests with coverage
|
||||
if: matrix.go-version == '1.21'
|
||||
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...
|
||||
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...
|
||||
|
||||
- name: Upload coverage reports to Codecov
|
||||
if: matrix.go-version == '1.21'
|
||||
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
run: go mod download
|
||||
|
||||
- name: Run all tests
|
||||
run: go test -v -race -timeout 300s ./...
|
||||
run: go test -v -race -timeout 600s ./...
|
||||
|
||||
release:
|
||||
name: Create Release
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
[](https://codecov.io/gh/rulego/streamsql)
|
||||
[](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
|
||||
|
||||
English| [简体中文](README_ZH.md)
|
||||
|
||||
@@ -150,7 +151,7 @@ func main() {
|
||||
// Step 1: Create StreamSQL Instance
|
||||
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
|
||||
ssql := streamsql.New()
|
||||
|
||||
defer ssql.Stop()
|
||||
// Step 2: Define Stream SQL Query Statement
|
||||
// This SQL statement showcases StreamSQL's core capabilities:
|
||||
// - SELECT: Choose output fields and aggregation functions
|
||||
@@ -266,8 +267,7 @@ func main() {
|
||||
window_end() as end
|
||||
FROM stream
|
||||
WHERE device.info.type = 'temperature'
|
||||
GROUP BY device.location, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
|
||||
GROUP BY device.location, TumblingWindow('5s')`
|
||||
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
@@ -334,6 +334,11 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
|
||||
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
|
||||
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
|
||||
|
||||
- **Session Window**
|
||||
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
|
||||
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
|
||||
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.
|
||||
|
||||
### Stream
|
||||
|
||||
- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
|
||||
@@ -342,17 +347,97 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
|
||||
|
||||
### Time Semantics
|
||||
|
||||
- **Event Time**
|
||||
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
|
||||
StreamSQL supports two time concepts that determine how windows are divided and triggered:
|
||||
|
||||
- **Processing Time**
|
||||
- **Definition**: The time when the data arrives at the processing system.
|
||||
#### Event Time
|
||||
|
||||
- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
|
||||
- **Characteristics**:
|
||||
- Windows are divided based on timestamp field values in the data
|
||||
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
|
||||
- Uses Watermark mechanism to handle out-of-order and late data
|
||||
- Results are accurate but may have delays (need to wait for late data)
|
||||
- **Use Cases**:
|
||||
- Scenarios requiring precise temporal analysis
|
||||
- Scenarios where data may arrive out of order or delayed
|
||||
- Historical data replay and analysis
|
||||
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
|
||||
- **Example**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1m')
|
||||
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
|
||||
```
|
||||
|
||||
#### Processing Time
|
||||
|
||||
- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
|
||||
- **Characteristics**:
|
||||
- Windows are divided based on the time data arrives at the system (`time.Now()`)
|
||||
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
|
||||
- Uses system clock (Timer) to trigger windows
|
||||
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
|
||||
- **Use Cases**:
|
||||
- Real-time monitoring and alerting scenarios
|
||||
- Scenarios with high latency requirements and relatively low accuracy requirements
|
||||
- Scenarios where data arrives in order and delay is controllable
|
||||
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
|
||||
- **Example**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1m')
|
||||
-- No WITH clause specified, defaults to processing time
|
||||
```
|
||||
|
||||
#### Event Time vs Processing Time Comparison
|
||||
|
||||
| Feature | Event Time | Processing Time |
|
||||
|---------|------------|-----------------|
|
||||
| **Time Source** | Timestamp field in data | System current time |
|
||||
| **Window Division** | Based on event timestamp | Based on data arrival time |
|
||||
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
|
||||
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
|
||||
| **Result Accuracy** | Accurate | May be inaccurate |
|
||||
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
|
||||
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
|
||||
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
|
||||
|
||||
#### Window Time
|
||||
|
||||
- **Window Start Time**
|
||||
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
|
||||
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
|
||||
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
|
||||
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
|
||||
|
||||
- **Window End Time**
|
||||
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
|
||||
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
|
||||
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
|
||||
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
|
||||
|
||||
#### Watermark Mechanism (Event Time Windows Only)
|
||||
|
||||
- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
|
||||
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
|
||||
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
|
||||
- **Configuration Parameters**:
|
||||
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
|
||||
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
|
||||
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
|
||||
- **Example**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='eventTime',
|
||||
TIMEUNIT='ms',
|
||||
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
|
||||
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
|
||||
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
|
||||
)
|
||||
```
|
||||
|
||||
## Contribution Guidelines
|
||||
|
||||
|
||||
+96
-10
@@ -4,6 +4,7 @@
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
|
||||
[](https://github.com/rulego/streamsql/actions/workflows/release.yml)
|
||||
[](https://codecov.io/gh/rulego/streamsql)
|
||||
[](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
|
||||
|
||||
[English](README.md)| 简体中文
|
||||
|
||||
@@ -150,7 +151,7 @@ import (
|
||||
func main() {
|
||||
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
|
||||
ssql := streamsql.New()
|
||||
|
||||
defer ssql.Stop()
|
||||
// 2. 定义流式SQL查询语句
|
||||
// 核心概念解析:
|
||||
// - TumblingWindow('5s'): 滚动窗口,每5秒创建一个新窗口,窗口之间不重叠
|
||||
@@ -284,8 +285,7 @@ func main() {
|
||||
window_end() as end
|
||||
FROM stream
|
||||
WHERE device.info.type = 'temperature'
|
||||
GROUP BY device.location, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
|
||||
GROUP BY device.location, TumblingWindow('5s')`
|
||||
|
||||
err := ssql.Execute(rsql)
|
||||
if err != nil {
|
||||
@@ -346,6 +346,11 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
|
||||
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
|
||||
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。
|
||||
|
||||
- **会话窗口(Session Window)**
|
||||
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
|
||||
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
|
||||
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。
|
||||
|
||||
### 流(Stream)
|
||||
|
||||
- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
|
||||
@@ -354,16 +359,97 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
|
||||
|
||||
### 时间语义
|
||||
|
||||
- **事件时间(Event Time)**
|
||||
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
|
||||
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:
|
||||
|
||||
#### 事件时间(Event Time)
|
||||
|
||||
- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time`、`timestamp`、`order_time` 等)。
|
||||
- **特点**:
|
||||
- 窗口基于数据中的时间戳字段值来划分
|
||||
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
|
||||
- 使用 Watermark 机制来处理乱序和延迟数据
|
||||
- 结果准确,但可能有延迟(需要等待延迟数据)
|
||||
- **使用场景**:
|
||||
- 需要精确时序分析的场景
|
||||
- 数据可能乱序或延迟到达的场景
|
||||
- 历史数据回放和分析
|
||||
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
|
||||
- **示例**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1m')
|
||||
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
|
||||
```
|
||||
|
||||
#### 处理时间(Processing Time)
|
||||
|
||||
- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
|
||||
- **特点**:
|
||||
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
|
||||
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
|
||||
- 使用系统时钟(Timer)来触发窗口
|
||||
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
|
||||
- **使用场景**:
|
||||
- 实时监控和告警场景
|
||||
- 对延迟要求高,对准确性要求相对较低的场景
|
||||
- 数据顺序到达且延迟可控的场景
|
||||
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
|
||||
- **示例**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1m')
|
||||
-- 不指定 WITH 子句,默认使用处理时间
|
||||
```
|
||||
|
||||
#### 事件时间 vs 处理时间对比
|
||||
|
||||
| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|
||||
|------|---------------------|-------------------------|
|
||||
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
|
||||
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
|
||||
| **延迟处理** | 支持(Watermark机制) | 不支持 |
|
||||
| **乱序处理** | 支持(Watermark机制) | 不支持 |
|
||||
| **结果准确性** | 准确 | 可能不准确 |
|
||||
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
|
||||
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认(不指定WITH) |
|
||||
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |
|
||||
|
||||
#### 窗口时间
|
||||
|
||||
- **处理时间(Processing Time)**
|
||||
- **定义**:数据到达处理系统的时间。
|
||||
- **窗口开始时间(Window Start Time)**
|
||||
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳。
|
||||
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)。
|
||||
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
|
||||
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`,窗口开始时间会对齐到5分钟的倍数(如 10:00、10:05、10:10)。
|
||||
|
||||
- **窗口结束时间(Window End Time)**
|
||||
- **定义**:基于事件时间,窗口的结束时间点。通常窗口结束时间是窗口开始时间加上窗口的持续时间。
|
||||
- 例如,一个滑动窗口的持续时间为 1 分钟,则窗口结束时间是窗口开始时间加上 1 分钟。
|
||||
- **事件时间窗口**:窗口的结束时间点,通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
|
||||
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间。窗口在系统时钟到达结束时间时触发。
|
||||
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00,则窗口结束时间是 10:01。
|
||||
|
||||
#### Watermark 机制(仅事件时间窗口)
|
||||
|
||||
- **定义**:Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
|
||||
- **计算公式**:`Watermark = max(event_time) - MaxOutOfOrderness`
|
||||
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
|
||||
- **配置参数**:
|
||||
- `MAXOUTOFORDERNESS`:允许的最大乱序时间,用于容忍数据乱序(默认:0,不允许乱序)
|
||||
- `ALLOWEDLATENESS`:窗口触发后还能接受延迟数据的时间(默认:0,不接受延迟数据)
|
||||
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间(默认:0,禁用)
|
||||
- **示例**:
|
||||
```sql
|
||||
SELECT deviceId, COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='eventTime',
|
||||
TIMEUNIT='ms',
|
||||
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
|
||||
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
|
||||
IDLETIMEOUT='5s' -- 5秒无数据,基于处理时间推进watermark
|
||||
)
|
||||
```
|
||||
|
||||
## 贡献指南
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ integration with the RuleGo ecosystem.
|
||||
• Lightweight design - Pure in-memory operations, no external dependencies
|
||||
• SQL syntax support - Process stream data using familiar SQL syntax
|
||||
• Multiple window types - Sliding, tumbling, counting, and session windows
|
||||
• Event time and processing time - Support both time semantics for accurate stream processing
|
||||
• Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
|
||||
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
|
||||
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
|
||||
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
|
||||
@@ -107,6 +109,77 @@ StreamSQL supports multiple window types:
|
||||
// Session window - Automatically closes session after 5-minute timeout
|
||||
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
|
||||
|
||||
# Event Time vs Processing Time
|
||||
|
||||
StreamSQL supports two time semantics for window processing:
|
||||
|
||||
## Processing Time (Default)
|
||||
|
||||
Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:
|
||||
|
||||
// Processing time window (default)
|
||||
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
|
||||
// Windows are triggered every 5 minutes based on when data arrives
|
||||
|
||||
## Event Time
|
||||
|
||||
Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
|
||||
allowing correct handling of out-of-order and late-arriving data:
|
||||
|
||||
// Event time window - Use 'order_time' field as event timestamp
|
||||
SELECT COUNT(*) as order_count
|
||||
FROM stream
|
||||
GROUP BY TumblingWindow('5m')
|
||||
WITH (TIMESTAMP='order_time')
|
||||
|
||||
// Event time with integer timestamp (Unix milliseconds)
|
||||
SELECT AVG(temperature) FROM stream
|
||||
GROUP BY TumblingWindow('1m')
|
||||
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')
|
||||
|
||||
## Watermark and Late Data Handling
|
||||
|
||||
Event time windows use watermark mechanism to handle out-of-order and late data:
|
||||
|
||||
// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
|
||||
SELECT COUNT(*) FROM stream
|
||||
GROUP BY TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='order_time',
|
||||
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
|
||||
)
|
||||
|
||||
// Configure allowed lateness (accept late data for 2 seconds after window closes)
|
||||
SELECT COUNT(*) FROM stream
|
||||
GROUP BY TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='order_time',
|
||||
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
|
||||
)
|
||||
|
||||
// Combine both configurations
|
||||
SELECT COUNT(*) FROM stream
|
||||
GROUP BY TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='order_time',
|
||||
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
|
||||
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
|
||||
)
|
||||
|
||||
// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
|
||||
SELECT COUNT(*) FROM stream
|
||||
GROUP BY TumblingWindow('5m')
|
||||
WITH (
|
||||
TIMESTAMP='order_time',
|
||||
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
|
||||
)
|
||||
|
||||
Key concepts:
|
||||
• MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
|
||||
• AllowedLateness: Keeps window open after trigger to accept late data and update results
|
||||
• IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
|
||||
• Watermark: Indicates that no events with timestamp less than watermark are expected
|
||||
|
||||
# Custom Functions
|
||||
|
||||
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
|
||||
|
||||
@@ -131,7 +131,25 @@ func validateBasicSyntax(exprStr string) error {
|
||||
}
|
||||
|
||||
// Check for invalid characters
|
||||
inQuotes := false
|
||||
var quoteChar rune
|
||||
|
||||
for i, ch := range trimmed {
|
||||
// Handle quotes
|
||||
if ch == '\'' || ch == '"' {
|
||||
if !inQuotes {
|
||||
inQuotes = true
|
||||
quoteChar = ch
|
||||
} else if ch == quoteChar {
|
||||
inQuotes = false
|
||||
}
|
||||
}
|
||||
|
||||
// If inside quotes, skip character validation
|
||||
if inQuotes {
|
||||
continue
|
||||
}
|
||||
|
||||
// Allowed characters: letters, numbers, operators, parentheses, dots, underscores, spaces, quotes
|
||||
if !isValidChar(ch) {
|
||||
return fmt.Errorf("invalid character '%c' at position %d", ch, i)
|
||||
|
||||
@@ -3,7 +3,7 @@ module github.com/rulego/streamsql
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/expr-lang/expr v1.17.6
|
||||
github.com/expr-lang/expr v1.17.7
|
||||
github.com/stretchr/testify v1.10.0
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/expr-lang/expr v1.17.6 h1:1h6i8ONk9cexhDmowO/A64VPxHScu7qfSl2k8OlINec=
|
||||
github.com/expr-lang/expr v1.17.6/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
|
||||
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
|
||||
+140
-67
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/rulego/streamsql/functions"
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
"github.com/rulego/streamsql/window"
|
||||
|
||||
"github.com/rulego/streamsql/aggregator"
|
||||
@@ -34,10 +35,13 @@ type Field struct {
|
||||
}
|
||||
|
||||
type WindowDefinition struct {
|
||||
Type string
|
||||
Params []interface{}
|
||||
TsProp string
|
||||
TimeUnit time.Duration
|
||||
Type string
|
||||
Params []interface{}
|
||||
TsProp string
|
||||
TimeUnit time.Duration
|
||||
MaxOutOfOrderness time.Duration // Maximum allowed out-of-orderness for event time
|
||||
AllowedLateness time.Duration // Maximum allowed lateness for event time windows
|
||||
IdleTimeout time.Duration // Idle source timeout: when no data arrives within this duration, watermark advances based on processing time
|
||||
}
|
||||
|
||||
// ToStreamConfig converts AST to Stream configuration
|
||||
@@ -58,9 +62,16 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
|
||||
windowType = window.TypeSession
|
||||
}
|
||||
|
||||
params, err := parseWindowParamsWithType(s.Window.Params, windowType)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to parse window parameters: %w", err)
|
||||
// Parse window parameters - now returns array directly
|
||||
params := s.Window.Params
|
||||
|
||||
// Validate and convert parameters based on window type
|
||||
if len(params) > 0 {
|
||||
var err error
|
||||
params, err = validateWindowParams(params, windowType)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("failed to validate window parameters: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if window processing is needed
|
||||
@@ -80,16 +91,7 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
|
||||
if !needWindow && hasAggregation {
|
||||
needWindow = true
|
||||
windowType = window.TypeTumbling
|
||||
params = map[string]interface{}{
|
||||
"size": 10 * time.Second, // Default 10-second window
|
||||
}
|
||||
}
|
||||
|
||||
// Handle special configuration for SessionWindow
|
||||
var groupByKey string
|
||||
if windowType == window.TypeSession && len(s.GroupBy) > 0 {
|
||||
// For session window, use the first GROUP BY field as session key
|
||||
groupByKey = s.GroupBy[0]
|
||||
params = []interface{}{10 * time.Second} // Default 10-second window
|
||||
}
|
||||
|
||||
// If no aggregation functions, collect simple fields
|
||||
@@ -105,10 +107,10 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
|
||||
simpleFields = append(simpleFields, fieldName+":"+field.Alias)
|
||||
} else {
|
||||
// For fields without alias, check if it's a string literal
|
||||
_, n, _, _, err := ParseAggregateTypeWithExpression(fieldName)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
_, n, _, _, err := ParseAggregateTypeWithExpression(fieldName)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if n != "" {
|
||||
// If string literal, use parsed field name (remove quotes)
|
||||
simpleFields = append(simpleFields, n)
|
||||
@@ -134,14 +136,25 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
// Determine time characteristic based on whether TIMESTAMP is specified in WITH clause
|
||||
// If TsProp is set, use EventTime; otherwise use ProcessingTime (default)
|
||||
timeCharacteristic := types.ProcessingTime
|
||||
if s.Window.TsProp != "" {
|
||||
timeCharacteristic = types.EventTime
|
||||
}
|
||||
|
||||
// Build Stream configuration
|
||||
config := types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: windowType,
|
||||
Params: params,
|
||||
TsProp: s.Window.TsProp,
|
||||
TimeUnit: s.Window.TimeUnit,
|
||||
GroupByKey: groupByKey,
|
||||
Type: windowType,
|
||||
Params: params,
|
||||
TsProp: s.Window.TsProp,
|
||||
TimeUnit: s.Window.TimeUnit,
|
||||
TimeCharacteristic: timeCharacteristic,
|
||||
MaxOutOfOrderness: s.Window.MaxOutOfOrderness,
|
||||
AllowedLateness: s.Window.AllowedLateness,
|
||||
IdleTimeout: s.Window.IdleTimeout,
|
||||
GroupByKeys: extractGroupFields(s),
|
||||
},
|
||||
GroupFields: extractGroupFields(s),
|
||||
SelectFields: aggs,
|
||||
@@ -245,9 +258,9 @@ func buildSelectFields(fields []Field) (aggMap map[string]aggregator.AggregateTy
|
||||
for _, f := range fields {
|
||||
if alias := f.Alias; alias != "" {
|
||||
t, n, _, _, parseErr := ParseAggregateTypeWithExpression(f.Expression)
|
||||
if parseErr != nil {
|
||||
return nil, nil, parseErr
|
||||
}
|
||||
if parseErr != nil {
|
||||
return nil, nil, parseErr
|
||||
}
|
||||
if t != "" {
|
||||
// Use alias as key for aggregator, not field name
|
||||
selectFields[alias] = t
|
||||
@@ -287,11 +300,11 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
|
||||
// 使用正则表达式匹配函数调用模式
|
||||
pattern := regexp.MustCompile(`(?i)([a-z_]+)\s*\(`)
|
||||
matches := pattern.FindAllStringSubmatchIndex(expr, -1)
|
||||
|
||||
|
||||
for _, match := range matches {
|
||||
funcStart := match[0]
|
||||
funcName := strings.ToLower(expr[match[2]:match[3]])
|
||||
|
||||
|
||||
// 检查函数是否为聚合函数
|
||||
if fn, exists := functions.Get(funcName); exists {
|
||||
switch fn.GetType() {
|
||||
@@ -300,14 +313,14 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
|
||||
if inAggregation {
|
||||
return fmt.Errorf("aggregate function calls cannot be nested")
|
||||
}
|
||||
|
||||
|
||||
// 找到该函数的参数部分
|
||||
funcEnd := findMatchingParenInternal(expr, funcStart+len(funcName))
|
||||
if funcEnd > funcStart {
|
||||
// 提取函数参数
|
||||
paramStart := funcStart + len(funcName) + 1
|
||||
params := expr[paramStart:funcEnd]
|
||||
|
||||
|
||||
// 在聚合函数参数内部递归检查
|
||||
if err := detectNestedAggregationRecursive(params, true); err != nil {
|
||||
return err
|
||||
@@ -316,7 +329,7 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -697,43 +710,103 @@ func extractSimpleField(fieldExpr string) string {
|
||||
return fieldExpr
|
||||
}
|
||||
|
||||
func parseWindowParams(params []interface{}) (map[string]interface{}, error) {
|
||||
return parseWindowParamsWithType(params, "")
|
||||
}
|
||||
// validateWindowParams validates and converts window parameters based on window type
|
||||
// Returns validated parameters array with proper types
|
||||
func validateWindowParams(params []interface{}, windowType string) ([]interface{}, error) {
|
||||
if len(params) == 0 {
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func parseWindowParamsWithType(params []interface{}, windowType string) (map[string]interface{}, error) {
|
||||
result := make(map[string]interface{})
|
||||
var key string
|
||||
for index, v := range params {
|
||||
if windowType == window.TypeSession {
|
||||
// First parameter for SessionWindow is timeout
|
||||
if index == 0 {
|
||||
key = "timeout"
|
||||
} else {
|
||||
key = fmt.Sprintf("param%d", index)
|
||||
}
|
||||
} else {
|
||||
// Parameters for other window types
|
||||
if index == 0 {
|
||||
key = "size"
|
||||
} else if index == 1 {
|
||||
key = "slide"
|
||||
} else {
|
||||
key = "offset"
|
||||
}
|
||||
validated := make([]interface{}, 0, len(params))
|
||||
|
||||
if windowType == window.TypeCounting {
|
||||
// CountingWindow expects integer count as first parameter
|
||||
if len(params) == 0 {
|
||||
return nil, fmt.Errorf("counting window requires at least one parameter")
|
||||
}
|
||||
if s, ok := v.(string); ok {
|
||||
dur, err := time.ParseDuration(s)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid %s duration: %w", s, err)
|
||||
}
|
||||
result[key] = dur
|
||||
} else {
|
||||
return nil, fmt.Errorf("%s parameter must be string format (like '5s')", s)
|
||||
|
||||
// Convert first parameter to int using cast utility
|
||||
count, err := cast.ToIntE(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid count parameter: %w", err)
|
||||
}
|
||||
|
||||
if count <= 0 {
|
||||
return nil, fmt.Errorf("counting window count must be positive, got: %d", count)
|
||||
}
|
||||
|
||||
validated = append(validated, count)
|
||||
|
||||
// Add any additional parameters
|
||||
if len(params) > 1 {
|
||||
validated = append(validated, params[1:]...)
|
||||
}
|
||||
|
||||
return validated, nil
|
||||
}
|
||||
|
||||
// Helper function to convert a value to time.Duration
|
||||
// For numeric types, treats them as seconds
|
||||
// For strings, uses time.ParseDuration
|
||||
convertToDuration := func(val interface{}) (time.Duration, error) {
|
||||
switch v := val.(type) {
|
||||
case time.Duration:
|
||||
return v, nil
|
||||
case string:
|
||||
// Use ToDurationE which handles string parsing
|
||||
return cast.ToDurationE(v)
|
||||
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||
// Treat numeric integers as seconds
|
||||
return time.Duration(cast.ToInt(v)) * time.Second, nil
|
||||
case float32, float64:
|
||||
// Treat numeric floats as seconds
|
||||
return time.Duration(int(cast.ToFloat64(v))) * time.Second, nil
|
||||
default:
|
||||
// Try ToDurationE as fallback
|
||||
return cast.ToDurationE(v)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
if windowType == window.TypeSession {
|
||||
// SessionWindow expects timeout duration as first parameter
|
||||
if len(params) == 0 {
|
||||
return nil, fmt.Errorf("session window requires at least one parameter")
|
||||
}
|
||||
|
||||
timeout, err := convertToDuration(params[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid timeout duration: %w", err)
|
||||
}
|
||||
|
||||
if timeout <= 0 {
|
||||
return nil, fmt.Errorf("session window timeout must be positive, got: %v", timeout)
|
||||
}
|
||||
|
||||
validated = append(validated, timeout)
|
||||
|
||||
// Add any additional parameters
|
||||
if len(params) > 1 {
|
||||
validated = append(validated, params[1:]...)
|
||||
}
|
||||
|
||||
return validated, nil
|
||||
}
|
||||
|
||||
// For TumblingWindow and SlidingWindow, convert parameters to time.Duration
|
||||
for index, v := range params {
|
||||
dur, err := convertToDuration(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid duration parameter at index %d: %w", index, err)
|
||||
}
|
||||
|
||||
if dur <= 0 {
|
||||
return nil, fmt.Errorf("duration parameter at index %d must be positive, got: %v", index, dur)
|
||||
}
|
||||
|
||||
validated = append(validated, dur)
|
||||
}
|
||||
|
||||
return validated, nil
|
||||
}
|
||||
|
||||
func parseAggregateExpression(expr string) string {
|
||||
@@ -958,7 +1031,7 @@ func parseComplexAggExpressionInternal(expr string) ([]types.AggregationFieldInf
|
||||
if err := detectNestedAggregation(expr); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
|
||||
// 使用改进的递归解析方法
|
||||
aggFields, exprTemplate := parseNestedFunctionsInternal(expr, make([]types.AggregationFieldInfo, 0))
|
||||
return aggFields, exprTemplate, nil
|
||||
|
||||
+2
-2
@@ -251,8 +251,8 @@ func TestSelectStatementEdgeCases(t *testing.T) {
|
||||
if config2.WindowConfig.Type != window.TypeSession {
|
||||
t.Errorf("Expected session window, got %v", config2.WindowConfig.Type)
|
||||
}
|
||||
if config2.WindowConfig.GroupByKey != "user_id" {
|
||||
t.Errorf("Expected GroupByKey to be 'user_id', got %s", config2.WindowConfig.GroupByKey)
|
||||
if len(config2.WindowConfig.GroupByKeys) == 0 || config2.WindowConfig.GroupByKeys[0] != "user_id" {
|
||||
t.Errorf("Expected GroupByKeys to contain 'user_id', got %v", config2.WindowConfig.GroupByKeys)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+22
-6
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/rulego/streamsql/aggregator"
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/window"
|
||||
)
|
||||
|
||||
// TestParseSmartParameters 测试智能参数解析函数
|
||||
@@ -202,6 +203,12 @@ func TestParseWindowParams(t *testing.T) {
|
||||
windowType: "SLIDINGWINDOW",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "计数窗口参数",
|
||||
params: []interface{}{100},
|
||||
windowType: "COUNTINGWINDOW",
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "无效持续时间",
|
||||
params: []interface{}{"invalid"},
|
||||
@@ -212,7 +219,7 @@ func TestParseWindowParams(t *testing.T) {
|
||||
name: "非字符串参数",
|
||||
params: []interface{}{123},
|
||||
windowType: "TUMBLINGWINDOW",
|
||||
expectError: true,
|
||||
expectError: false, // 整数参数会被视为秒数,这是有效的
|
||||
},
|
||||
{
|
||||
name: "空参数",
|
||||
@@ -224,15 +231,24 @@ func TestParseWindowParams(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var result map[string]interface{}
|
||||
var result []interface{}
|
||||
var err error
|
||||
|
||||
if tt.windowType == "SESSIONWINDOW" {
|
||||
result, err = parseWindowParamsWithType(tt.params, "SESSIONWINDOW")
|
||||
} else {
|
||||
result, err = parseWindowParams(tt.params)
|
||||
// Convert window type to internal format
|
||||
windowType := ""
|
||||
switch tt.windowType {
|
||||
case "SESSIONWINDOW":
|
||||
windowType = window.TypeSession
|
||||
case "TUMBLINGWINDOW":
|
||||
windowType = window.TypeTumbling
|
||||
case "SLIDINGWINDOW":
|
||||
windowType = window.TypeSliding
|
||||
case "COUNTINGWINDOW":
|
||||
windowType = window.TypeCounting
|
||||
}
|
||||
|
||||
result, err = validateWindowParams(tt.params, windowType)
|
||||
|
||||
if tt.expectError {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error but got none")
|
||||
|
||||
@@ -88,7 +88,7 @@ func (fv *FunctionValidator) isBuiltinFunction(funcName string) bool {
|
||||
func (fv *FunctionValidator) isKeyword(word string) bool {
|
||||
keywords := []string{
|
||||
"SELECT", "FROM", "WHERE", "GROUP", "BY", "HAVING", "ORDER",
|
||||
"AS", "DISTINCT", "LIMIT", "WITH", "TIMESTAMP", "TIMEUNIT",
|
||||
"AS", "DISTINCT", "LIMIT", "WITH", "TIMESTAMP", "TIMEUNIT", "MAXOUTOFORDERNESS", "ALLOWEDLATENESS", "IDLETIMEOUT",
|
||||
"TUMBLINGWINDOW", "SLIDINGWINDOW", "COUNTINGWINDOW", "SESSIONWINDOW",
|
||||
"AND", "OR", "NOT", "IN", "LIKE", "IS", "NULL", "TRUE", "FALSE",
|
||||
"BETWEEN", "IS", "NULL", "TRUE", "FALSE", "CASE", "WHEN",
|
||||
|
||||
@@ -41,6 +41,9 @@ const (
|
||||
TokenWITH
|
||||
TokenTimestamp
|
||||
TokenTimeUnit
|
||||
TokenMaxOutOfOrderness
|
||||
TokenAllowedLateness
|
||||
TokenIdleTimeout
|
||||
TokenOrder
|
||||
TokenDISTINCT
|
||||
TokenLIMIT
|
||||
@@ -349,6 +352,12 @@ func (l *Lexer) lookupIdent(ident string) Token {
|
||||
return Token{Type: TokenTimestamp, Value: ident}
|
||||
case "TIMEUNIT":
|
||||
return Token{Type: TokenTimeUnit, Value: ident}
|
||||
case "MAXOUTOFORDERNESS":
|
||||
return Token{Type: TokenMaxOutOfOrderness, Value: ident}
|
||||
case "ALLOWEDLATENESS":
|
||||
return Token{Type: TokenAllowedLateness, Value: ident}
|
||||
case "IDLETIMEOUT":
|
||||
return Token{Type: TokenIdleTimeout, Value: ident}
|
||||
case "ORDER":
|
||||
return Token{Type: TokenOrder, Value: ident}
|
||||
case "DISTINCT":
|
||||
|
||||
+101
-20
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
)
|
||||
|
||||
// 解析器配置常量
|
||||
@@ -506,44 +507,46 @@ func (p *Parser) parseWhere(stmt *SelectStatement) error {
|
||||
}
|
||||
|
||||
func (p *Parser) parseWindowFunction(stmt *SelectStatement, winType string) error {
|
||||
p.lexer.NextToken() // 跳过(
|
||||
var params []interface{}
|
||||
nextTok := p.lexer.NextToken() // 读取下一个 token,应该是 '('
|
||||
if nextTok.Type != TokenLParen {
|
||||
return fmt.Errorf("expected '(' after window function %s, got %s (type: %v)", winType, nextTok.Value, nextTok.Type)
|
||||
}
|
||||
|
||||
// 设置最大次数限制,防止无限循环
|
||||
var params []interface{}
|
||||
maxIterations := 100
|
||||
iterations := 0
|
||||
|
||||
for p.lexer.peekChar() != ')' {
|
||||
// Parse parameters until we find the closing parenthesis
|
||||
for {
|
||||
iterations++
|
||||
// 安全检查:防止无限循环
|
||||
if iterations > maxIterations {
|
||||
return errors.New("window function parameter parsing exceeded maximum iterations, possible syntax error")
|
||||
return fmt.Errorf("window function parameter parsing exceeded maximum iterations")
|
||||
}
|
||||
|
||||
// Read the next token first
|
||||
valTok := p.lexer.NextToken()
|
||||
|
||||
// If we hit the closing parenthesis or EOF, break
|
||||
if valTok.Type == TokenRParen || valTok.Type == TokenEOF {
|
||||
break
|
||||
}
|
||||
|
||||
// Skip commas
|
||||
if valTok.Type == TokenComma {
|
||||
continue
|
||||
}
|
||||
//valTok := p.lexer.NextToken()
|
||||
|
||||
// Handle quoted values
|
||||
if strings.HasPrefix(valTok.Value, "'") && strings.HasSuffix(valTok.Value, "'") {
|
||||
valTok.Value = strings.Trim(valTok.Value, "'")
|
||||
}
|
||||
|
||||
// Add the parameter value
|
||||
params = append(params, convertValue(valTok.Value))
|
||||
}
|
||||
|
||||
if &stmt.Window != nil {
|
||||
stmt.Window.Params = params
|
||||
stmt.Window.Type = winType
|
||||
} else {
|
||||
stmt.Window = WindowDefinition{
|
||||
Type: winType,
|
||||
Params: params,
|
||||
}
|
||||
}
|
||||
stmt.Window.Params = params
|
||||
stmt.Window.Type = winType
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -593,7 +596,9 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
|
||||
hasWindowFunction := false
|
||||
if tok.Type == TokenTumbling || tok.Type == TokenSliding || tok.Type == TokenCounting || tok.Type == TokenSession {
|
||||
hasWindowFunction = true
|
||||
_ = p.parseWindowFunction(stmt, tok.Value)
|
||||
if err := p.parseWindowFunction(stmt, tok.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
hasGroupBy := false
|
||||
@@ -633,7 +638,15 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
|
||||
continue
|
||||
}
|
||||
if tok.Type == TokenTumbling || tok.Type == TokenSliding || tok.Type == TokenCounting || tok.Type == TokenSession {
|
||||
_ = p.parseWindowFunction(stmt, tok.Value)
|
||||
if err := p.parseWindowFunction(stmt, tok.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
// After parsing window function, skip adding it to GroupBy and continue
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip right parenthesis tokens (they should be consumed by parseWindowFunction)
|
||||
if tok.Type == TokenRParen {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -696,7 +709,7 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
|
||||
}
|
||||
}
|
||||
if valTok.Type == TokenTimeUnit {
|
||||
timeUnit := time.Minute
|
||||
timeUnit := time.Millisecond // Default to milliseconds
|
||||
next := p.lexer.NextToken()
|
||||
if next.Type == TokenEQ {
|
||||
next = p.lexer.NextToken()
|
||||
@@ -714,8 +727,10 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
|
||||
timeUnit = time.Second
|
||||
case "ms":
|
||||
timeUnit = time.Millisecond
|
||||
case "ns":
|
||||
timeUnit = time.Nanosecond
|
||||
default:
|
||||
|
||||
// If unknown unit, keep default (milliseconds)
|
||||
}
|
||||
// Check if Window is initialized; if not, create new WindowDefinition
|
||||
if stmt.Window.Type == "" {
|
||||
@@ -727,6 +742,72 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if valTok.Type == TokenMaxOutOfOrderness {
|
||||
next := p.lexer.NextToken()
|
||||
if next.Type == TokenEQ {
|
||||
next = p.lexer.NextToken()
|
||||
durationStr := next.Value
|
||||
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
|
||||
durationStr = strings.Trim(durationStr, "'")
|
||||
}
|
||||
// Parse duration string like '5s', '2m', '1h', etc.
|
||||
if duration, err := cast.ToDurationE(durationStr); err == nil {
|
||||
// Check if Window is initialized; if not, create new WindowDefinition
|
||||
if stmt.Window.Type == "" {
|
||||
stmt.Window = WindowDefinition{
|
||||
MaxOutOfOrderness: duration,
|
||||
}
|
||||
} else {
|
||||
stmt.Window.MaxOutOfOrderness = duration
|
||||
}
|
||||
}
|
||||
// If parsing fails, silently ignore (keep default 0)
|
||||
}
|
||||
}
|
||||
if valTok.Type == TokenAllowedLateness {
|
||||
next := p.lexer.NextToken()
|
||||
if next.Type == TokenEQ {
|
||||
next = p.lexer.NextToken()
|
||||
durationStr := next.Value
|
||||
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
|
||||
durationStr = strings.Trim(durationStr, "'")
|
||||
}
|
||||
// Parse duration string like '5s', '2m', '1h', etc.
|
||||
if duration, err := cast.ToDurationE(durationStr); err == nil {
|
||||
// Check if Window is initialized; if not, create new WindowDefinition
|
||||
if stmt.Window.Type == "" {
|
||||
stmt.Window = WindowDefinition{
|
||||
AllowedLateness: duration,
|
||||
}
|
||||
} else {
|
||||
stmt.Window.AllowedLateness = duration
|
||||
}
|
||||
}
|
||||
// If parsing fails, silently ignore (keep default 0)
|
||||
}
|
||||
}
|
||||
if valTok.Type == TokenIdleTimeout {
|
||||
next := p.lexer.NextToken()
|
||||
if next.Type == TokenEQ {
|
||||
next = p.lexer.NextToken()
|
||||
durationStr := next.Value
|
||||
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
|
||||
durationStr = strings.Trim(durationStr, "'")
|
||||
}
|
||||
// Parse duration string like '5s', '2m', '1h', etc.
|
||||
if duration, err := cast.ToDurationE(durationStr); err == nil {
|
||||
// Check if Window is initialized; if not, create new WindowDefinition
|
||||
if stmt.Window.Type == "" {
|
||||
stmt.Window = WindowDefinition{
|
||||
IdleTimeout: duration,
|
||||
}
|
||||
} else {
|
||||
stmt.Window.IdleTimeout = duration
|
||||
}
|
||||
}
|
||||
// If parsing fails, silently ignore (keep default 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -37,7 +37,7 @@ func TestDataProcessor_ApplyDistinct(t *testing.T) {
|
||||
},
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -79,7 +79,7 @@ func TestDataProcessor_ApplyHavingFilter(t *testing.T) {
|
||||
Having: "temperature > 25",
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -120,7 +120,7 @@ func TestDataProcessor_ApplyHavingWithCaseExpression(t *testing.T) {
|
||||
Having: "CASE WHEN temperature > 30 THEN 1 WHEN status = 'active' THEN 1 ELSE 0 END",
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -161,7 +161,7 @@ func TestDataProcessor_ApplyHavingWithCondition(t *testing.T) {
|
||||
Having: "temperature > 25",
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -541,7 +541,7 @@ func TestStream_ProcessSync(t *testing.T) {
|
||||
},
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
aggStream, err := NewStream(aggConfig)
|
||||
|
||||
@@ -42,7 +42,16 @@ func (s *Stream) safeGetDataChan() chan map[string]interface{} {
|
||||
|
||||
// safeSendToDataChan safely sends data to dataChan
|
||||
func (s *Stream) safeSendToDataChan(data map[string]interface{}) bool {
|
||||
// Check if stream is stopped before attempting to send
|
||||
if atomic.LoadInt32(&s.stopped) == 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
dataChan := s.safeGetDataChan()
|
||||
if dataChan == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
select {
|
||||
case dataChan <- data:
|
||||
return true
|
||||
|
||||
@@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
s.sinksMux.RLock()
|
||||
defer s.sinksMux.RUnlock()
|
||||
|
||||
if len(s.sinks) == 0 {
|
||||
if len(s.sinks) == 0 && len(s.syncSinks) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
for _, sink := range s.sinks {
|
||||
s.submitSinkTask(sink, results)
|
||||
}
|
||||
|
||||
// Execute synchronous sinks (blocking, sequential)
|
||||
for _, sink := range s.syncSinks {
|
||||
// Recover panic for each sync sink to prevent crashing the stream
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Error("Sync sink execution exception: %v", r)
|
||||
}
|
||||
}()
|
||||
sink(results)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// submitSinkTask submits sink task
|
||||
@@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m
|
||||
}
|
||||
|
||||
// Non-blocking task submission
|
||||
// Note: Since we use a worker pool, tasks may be executed out of order
|
||||
select {
|
||||
case s.sinkWorkerPool <- task:
|
||||
// Successfully submitted task
|
||||
default:
|
||||
// Worker pool is full, execute directly in current goroutine (degraded handling)
|
||||
go task()
|
||||
// This also helps with backpressure
|
||||
task()
|
||||
}
|
||||
}
|
||||
|
||||
// AddSink adds a sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed.
|
||||
// If you need strict ordering, use GetResultsChan() instead.
|
||||
func (s *Stream) AddSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.sinks = append(s.sinks, sink)
|
||||
}
|
||||
|
||||
// AddSyncSink adds a synchronous sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// They block subsequent processing, so they should be fast.
|
||||
func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.syncSinks = append(s.syncSinks, sink)
|
||||
}
|
||||
|
||||
// GetResultsChan gets the result channel
|
||||
func (s *Stream) GetResultsChan() <-chan []map[string]interface{} {
|
||||
return s.resultChan
|
||||
|
||||
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
dataChanCap := int64(cap(s.dataChan))
|
||||
s.dataChanMux.RUnlock()
|
||||
|
||||
return map[string]int64{
|
||||
stats := map[string]int64{
|
||||
InputCount: atomic.LoadInt64(&s.inputCount),
|
||||
OutputCount: atomic.LoadInt64(&s.outputCount),
|
||||
DroppedCount: atomic.LoadInt64(&s.droppedCount),
|
||||
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
|
||||
Expanding: int64(atomic.LoadInt32(&s.expanding)),
|
||||
}
|
||||
|
||||
if s.Window != nil {
|
||||
winStats := s.Window.GetStats()
|
||||
for k, v := range winStats {
|
||||
stats[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// GetDetailedStats gets detailed performance statistics
|
||||
|
||||
@@ -59,6 +59,11 @@ func (dp *DataProcessor) Process() {
|
||||
currentDataChan := dp.stream.dataChan
|
||||
dp.stream.dataChanMux.RUnlock()
|
||||
|
||||
// Check if dataChan is nil (stream has been stopped)
|
||||
if currentDataChan == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case data, ok := <-currentDataChan:
|
||||
if !ok {
|
||||
@@ -305,8 +310,19 @@ func (dp *DataProcessor) startWindowProcessing() {
|
||||
}
|
||||
}()
|
||||
|
||||
for batch := range dp.stream.Window.OutputChan() {
|
||||
dp.processWindowBatch(batch)
|
||||
outputChan := dp.stream.Window.OutputChan()
|
||||
for {
|
||||
select {
|
||||
case batch, ok := <-outputChan:
|
||||
if !ok {
|
||||
// Channel closed, exit
|
||||
return
|
||||
}
|
||||
dp.processWindowBatch(batch)
|
||||
case <-dp.stream.done:
|
||||
// Stream stopped, exit
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func TestDataProcessor_InitializeAggregator(t *testing.T) {
|
||||
},
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -90,7 +90,7 @@ func TestDataProcessor_RegisterExpressionCalculator(t *testing.T) {
|
||||
},
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
|
||||
@@ -61,7 +61,32 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
|
||||
}
|
||||
|
||||
// Parse alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
info.fieldName = parts[0]
|
||||
// Remove backticks from field name
|
||||
if len(info.fieldName) >= 2 && info.fieldName[0] == '`' && info.fieldName[len(info.fieldName)-1] == '`' {
|
||||
@@ -398,7 +423,32 @@ func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string
|
||||
}
|
||||
|
||||
// Handle alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
fieldName := parts[0]
|
||||
outputName := fieldName
|
||||
if len(parts) > 1 {
|
||||
|
||||
+38
-4
@@ -66,11 +66,23 @@ func NewBlockingStrategy() *BlockingStrategy {
|
||||
|
||||
// ProcessData implements blocking mode data processing
|
||||
func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
|
||||
// Check if stream is stopped
|
||||
if atomic.LoadInt32(&bs.stream.stopped) == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
if bs.stream.blockingTimeout <= 0 {
|
||||
// No timeout limit, block permanently until success
|
||||
dataChan := bs.stream.safeGetDataChan()
|
||||
dataChan <- data
|
||||
return
|
||||
if dataChan == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case dataChan <- data:
|
||||
return
|
||||
case <-bs.stream.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Blocking with timeout
|
||||
@@ -78,6 +90,10 @@ func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
|
||||
defer timer.Stop()
|
||||
|
||||
dataChan := bs.stream.safeGetDataChan()
|
||||
if dataChan == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case dataChan <- data:
|
||||
// Successfully added data
|
||||
@@ -87,7 +103,17 @@ func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
|
||||
logger.Error("Data addition timeout, but continue waiting to avoid data loss")
|
||||
// Continue blocking indefinitely, re-get current channel reference
|
||||
finalDataChan := bs.stream.safeGetDataChan()
|
||||
finalDataChan <- data
|
||||
if finalDataChan == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case finalDataChan <- data:
|
||||
return
|
||||
case <-bs.stream.done:
|
||||
return
|
||||
}
|
||||
case <-bs.stream.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,7 +161,15 @@ func (es *ExpansionStrategy) ProcessData(data map[string]interface{}) {
|
||||
|
||||
// If still full after expansion, block and wait
|
||||
dataChan := es.stream.safeGetDataChan()
|
||||
dataChan <- data
|
||||
if dataChan == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case dataChan <- data:
|
||||
return
|
||||
case <-es.stream.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// GetStrategyName gets strategy name
|
||||
|
||||
+15
-1
@@ -61,7 +61,8 @@ type Stream struct {
|
||||
aggregator aggregator.Aggregator
|
||||
config types.Config
|
||||
sinks []func([]map[string]interface{})
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
seenResults *sync.Map
|
||||
done chan struct{} // Used to close processing goroutines
|
||||
sinkWorkerPool chan func() // Sink worker pool to avoid blocking
|
||||
@@ -220,6 +221,19 @@ func (s *Stream) Stop() {
|
||||
|
||||
close(s.done)
|
||||
|
||||
// Stop window operations first to prevent new window triggers
|
||||
if s.Window != nil {
|
||||
s.Window.Stop()
|
||||
}
|
||||
|
||||
// Close dataChan to signal DataProcessor to exit
|
||||
s.dataChanMux.Lock()
|
||||
if s.dataChan != nil {
|
||||
close(s.dataChan)
|
||||
s.dataChan = nil // Set to nil to prevent sending to closed channel
|
||||
}
|
||||
s.dataChanMux.Unlock()
|
||||
|
||||
// Stop and clean up data processing strategy resources
|
||||
if s.dataStrategy != nil {
|
||||
if err := s.dataStrategy.Stop(); err != nil {
|
||||
|
||||
@@ -99,11 +99,8 @@ func (sf *StreamFactory) createStreamWithUnifiedConfig(config types.Config) (*St
|
||||
func (sf *StreamFactory) createWindow(config types.Config) (window.Window, error) {
|
||||
// Pass unified performance configuration to window
|
||||
windowConfig := config.WindowConfig
|
||||
if windowConfig.Params == nil {
|
||||
windowConfig.Params = make(map[string]interface{})
|
||||
}
|
||||
// Pass complete performance configuration to window
|
||||
windowConfig.Params[PerformanceConfigKey] = config.PerformanceConfig
|
||||
// Set performance configuration directly
|
||||
windowConfig.PerformanceConfig = config.PerformanceConfig
|
||||
|
||||
return window.CreateWindow(windowConfig)
|
||||
}
|
||||
@@ -179,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi
|
||||
// startWorkerRoutines starts worker goroutines
|
||||
func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) {
|
||||
go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
|
||||
go stream.startResultConsumer()
|
||||
}
|
||||
|
||||
+26
-47
@@ -53,7 +53,7 @@ func TestStreamBasicOperations(t *testing.T) {
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
TimeUnit: 1000,
|
||||
Params: map[string]interface{}{"size": 1 * time.Second},
|
||||
Params: []interface{}{1 * time.Second},
|
||||
},
|
||||
},
|
||||
testFunc: "withWindow",
|
||||
@@ -146,7 +146,7 @@ func TestStreamBasicFunctionality(t *testing.T) {
|
||||
config: types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 500 * time.Millisecond},
|
||||
Params: []interface{}{500 * time.Millisecond},
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
@@ -170,7 +170,7 @@ func TestStreamBasicFunctionality(t *testing.T) {
|
||||
config: types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 500 * time.Millisecond},
|
||||
Params: []interface{}{500 * time.Millisecond},
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
@@ -255,7 +255,7 @@ func TestStreamWithoutFilter(t *testing.T) {
|
||||
config := types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "sliding",
|
||||
Params: map[string]interface{}{"size": 2 * time.Second, "slide": 1 * time.Second},
|
||||
Params: []interface{}{2 * time.Second, 1 * time.Second},
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
@@ -510,10 +510,8 @@ func TestStreamAggregationQuery(t *testing.T) {
|
||||
},
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
@@ -730,7 +728,7 @@ func TestStreamWithWindowAndAggregation(t *testing.T) {
|
||||
SimpleFields: []string{"name", "age"},
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 100 * time.Millisecond},
|
||||
Params: []interface{}{100 * time.Millisecond},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"avg_age": aggregator.Avg,
|
||||
@@ -1214,10 +1212,8 @@ func TestStreamWindowEdgeCasesEnhanced(t *testing.T) {
|
||||
config: func() types.Config {
|
||||
c := types.NewConfig()
|
||||
c.WindowConfig = types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": 1 * time.Nanosecond, // 极小时间窗口
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{1 * time.Nanosecond}, // 极小时间窗口
|
||||
TimeUnit: 1 * time.Nanosecond,
|
||||
}
|
||||
c.NeedWindow = true
|
||||
@@ -1230,10 +1226,8 @@ func TestStreamWindowEdgeCasesEnhanced(t *testing.T) {
|
||||
name: "极大时间窗口",
|
||||
config: types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": 8760 * time.Hour, // 1年
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{8760 * time.Hour}, // 1年
|
||||
TimeUnit: 8760 * time.Hour,
|
||||
},
|
||||
NeedWindow: true,
|
||||
@@ -1245,11 +1239,8 @@ func TestStreamWindowEdgeCasesEnhanced(t *testing.T) {
|
||||
name: "滑动窗口零滑动",
|
||||
config: types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "sliding",
|
||||
Params: map[string]interface{}{
|
||||
"size": 1 * time.Second,
|
||||
"slide": 1 * time.Millisecond, // 很小的滑动间隔
|
||||
},
|
||||
Type: "sliding",
|
||||
Params: []interface{}{1 * time.Second, 1 * time.Millisecond}, // 很小的滑动间隔
|
||||
TimeUnit: 1 * time.Second,
|
||||
},
|
||||
NeedWindow: true,
|
||||
@@ -1298,10 +1289,8 @@ func TestStreamUnifiedConfigIntegration(t *testing.T) {
|
||||
config := types.Config{
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"value": aggregator.Count,
|
||||
@@ -1338,10 +1327,8 @@ func TestStreamUnifiedConfigPerformanceImpact(t *testing.T) {
|
||||
config := types.Config{
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "1s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{time.Second},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"value": aggregator.Sum,
|
||||
@@ -1410,10 +1397,8 @@ func TestStreamUnifiedConfigErrorHandling(t *testing.T) {
|
||||
config: types.Config{
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "invalid_window_type",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "invalid_window_type",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"value": aggregator.Count,
|
||||
@@ -1429,7 +1414,7 @@ func TestStreamUnifiedConfigErrorHandling(t *testing.T) {
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{},
|
||||
Params: []interface{}{},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"value": aggregator.Count,
|
||||
@@ -1444,10 +1429,8 @@ func TestStreamUnifiedConfigErrorHandling(t *testing.T) {
|
||||
config: types.Config{
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
"value": aggregator.Count,
|
||||
@@ -1802,10 +1785,8 @@ func TestStreamFactory_CreateStreamWithWindow(t *testing.T) {
|
||||
SimpleFields: []string{"name", "age"},
|
||||
NeedWindow: true,
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1843,10 +1824,8 @@ func TestStreamFactory_CreateWindow(t *testing.T) {
|
||||
factory := NewStreamFactory()
|
||||
config := types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
Type: "tumbling",
|
||||
Params: []interface{}{5 * time.Second},
|
||||
},
|
||||
PerformanceConfig: types.DefaultPerformanceConfig(),
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestWindowSlotAggregation(t *testing.T) {
|
||||
config := types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "sliding",
|
||||
Params: map[string]interface{}{"size": 2 * time.Second, "slide": 1 * time.Second},
|
||||
Params: []interface{}{2 * time.Second, 1 * time.Second},
|
||||
TsProp: "ts",
|
||||
},
|
||||
GroupFields: []string{"device"},
|
||||
@@ -103,44 +103,37 @@ func TestWindowTypes(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
windowType string
|
||||
windowParams map[string]interface{}
|
||||
windowParams []interface{}
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "Tumbling Window",
|
||||
windowType: "tumbling",
|
||||
windowParams: map[string]interface{}{
|
||||
"size": "5s",
|
||||
},
|
||||
expectError: false,
|
||||
name: "Tumbling Window",
|
||||
windowType: "tumbling",
|
||||
windowParams: []interface{}{5 * time.Second},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Sliding Window",
|
||||
windowType: "sliding",
|
||||
windowParams: map[string]interface{}{
|
||||
"size": "10s",
|
||||
"slide": "5s",
|
||||
},
|
||||
expectError: false,
|
||||
name: "Sliding Window",
|
||||
windowType: "sliding",
|
||||
windowParams: []interface{}{10 * time.Second, 5 * time.Second},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Session Window",
|
||||
windowType: "session",
|
||||
windowParams: map[string]interface{}{
|
||||
"timeout": "30s",
|
||||
},
|
||||
expectError: false,
|
||||
name: "Session Window",
|
||||
windowType: "session",
|
||||
windowParams: []interface{}{30 * time.Second},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "Invalid Window Type",
|
||||
windowType: "invalid_window_type",
|
||||
windowParams: map[string]interface{}{"size": "5s"},
|
||||
windowParams: []interface{}{5 * time.Second},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "Missing Size Parameter",
|
||||
windowType: "tumbling",
|
||||
windowParams: map[string]interface{}{},
|
||||
windowParams: []interface{}{},
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
@@ -195,7 +188,7 @@ func TestAggregationTypes(t *testing.T) {
|
||||
config := types.Config{
|
||||
WindowConfig: types.WindowConfig{
|
||||
Type: "tumbling",
|
||||
Params: map[string]interface{}{"size": 500 * time.Millisecond},
|
||||
Params: []interface{}{500 * time.Millisecond},
|
||||
},
|
||||
GroupFields: []string{"group"},
|
||||
SelectFields: map[string]aggregator.AggregateType{
|
||||
|
||||
@@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) {
|
||||
}
|
||||
}
|
||||
|
||||
// AddSyncSink directly adds synchronous result processing callback functions.
|
||||
// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls.
|
||||
//
|
||||
// Parameters:
|
||||
// - sink: Result processing function, receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// Use this when order of execution matters.
|
||||
func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
if s.stream != nil {
|
||||
s.stream.AddSyncSink(sink)
|
||||
}
|
||||
}
|
||||
|
||||
// PrintTable prints results to console in table format, similar to database output.
|
||||
// Displays column names first, then data rows.
|
||||
//
|
||||
|
||||
+44
-53
@@ -61,13 +61,14 @@ func TestCaseExpressionInSQL(t *testing.T) {
|
||||
|
||||
// TestCaseExpressionInAggregation 测试CASE表达式在聚合查询中的使用
|
||||
func TestCaseExpressionInAggregation(t *testing.T) {
|
||||
// 使用处理时间窗口,避免需要推进watermark的复杂性
|
||||
// 这个测试主要验证CASE表达式在聚合函数中的使用,而不是事件时间窗口
|
||||
sql := `SELECT deviceId,
|
||||
COUNT(*) as total_count,
|
||||
SUM(CASE WHEN temperature > 30 THEN 1 ELSE 0 END) as hot_count,
|
||||
AVG(CASE status WHEN 'active' THEN temperature ELSE 0 END) as avg_active_temp
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('1s')`
|
||||
|
||||
// 创建StreamSQL实例
|
||||
streamSQL := New()
|
||||
@@ -76,14 +77,13 @@ func TestCaseExpressionInAggregation(t *testing.T) {
|
||||
err := streamSQL.Execute(sql)
|
||||
assert.NoError(t, err, "执行SQL应该成功")
|
||||
|
||||
// 模拟数据
|
||||
baseTime := time.Now()
|
||||
// 模拟数据(不需要时间戳字段,因为使用处理时间窗口)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 35.0, "status": "active", "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 25.0, "status": "inactive", "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 32.0, "status": "active", "ts": baseTime},
|
||||
{"deviceId": "device2", "temperature": 28.0, "status": "active", "ts": baseTime},
|
||||
{"deviceId": "device2", "temperature": 22.0, "status": "inactive", "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 35.0, "status": "active"},
|
||||
{"deviceId": "device1", "temperature": 25.0, "status": "inactive"},
|
||||
{"deviceId": "device1", "temperature": 32.0, "status": "active"},
|
||||
{"deviceId": "device2", "temperature": 28.0, "status": "active"},
|
||||
{"deviceId": "device2", "temperature": 22.0, "status": "inactive"},
|
||||
}
|
||||
|
||||
// 添加数据并获取结果
|
||||
@@ -208,12 +208,11 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) {
|
||||
WHEN temperature > 25 THEN 0.5
|
||||
ELSE 0 END) as complex_score
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
GROUP BY deviceId, TumblingWindow('1s')`,
|
||||
data: []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 35.0, "humidity": 70.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 28.0, "humidity": 50.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 20.0, "humidity": 40.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 35.0, "humidity": 70.0},
|
||||
{"deviceId": "device1", "temperature": 28.0, "humidity": 50.0},
|
||||
{"deviceId": "device1", "temperature": 20.0, "humidity": 40.0},
|
||||
},
|
||||
description: "测试多条件CASE表达式在SUM聚合中的使用",
|
||||
},
|
||||
@@ -222,12 +221,11 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) {
|
||||
sql: `SELECT deviceId,
|
||||
AVG(CASE WHEN ABS(temperature - 25) < 5 THEN temperature ELSE 0 END) as normalized_avg
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
GROUP BY deviceId, TumblingWindow('1s')`,
|
||||
data: []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 23.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 27.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 35.0, "ts": time.Now()}, // 这个会被排除
|
||||
{"deviceId": "device1", "temperature": 23.0},
|
||||
{"deviceId": "device1", "temperature": 27.0},
|
||||
{"deviceId": "device1", "temperature": 35.0}, // 这个会被排除
|
||||
},
|
||||
description: "测试带函数的CASE表达式在AVG聚合中的使用",
|
||||
},
|
||||
@@ -236,12 +234,11 @@ func TestComplexCaseExpressionsInAggregation(t *testing.T) {
|
||||
sql: `SELECT deviceId,
|
||||
COUNT(CASE WHEN temperature * 1.8 + 32 > 80 THEN 1 END) as fahrenheit_hot_count
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
GROUP BY deviceId, TumblingWindow('1s')`,
|
||||
data: []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 25.0, "ts": time.Now()}, // 77F
|
||||
{"deviceId": "device1", "temperature": 30.0, "ts": time.Now()}, // 86F
|
||||
{"deviceId": "device1", "temperature": 35.0, "ts": time.Now()}, // 95F
|
||||
{"deviceId": "device1", "temperature": 25.0}, // 77F
|
||||
{"deviceId": "device1", "temperature": 30.0}, // 86F
|
||||
{"deviceId": "device1", "temperature": 35.0}, // 95F
|
||||
},
|
||||
description: "测试算术表达式CASE在COUNT聚合中的使用",
|
||||
},
|
||||
@@ -501,14 +498,13 @@ func TestCaseExpressionAggregated(t *testing.T) {
|
||||
COUNT(CASE WHEN temperature <= 25 THEN 1 END) as normal_temp_count,
|
||||
COUNT(*) as total_count
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
GROUP BY deviceId, TumblingWindow('1s')`,
|
||||
testData: []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 30.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 20.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 35.0, "ts": time.Now()},
|
||||
{"deviceId": "device2", "temperature": 22.0, "ts": time.Now()},
|
||||
{"deviceId": "device2", "temperature": 28.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 30.0},
|
||||
{"deviceId": "device1", "temperature": 20.0},
|
||||
{"deviceId": "device1", "temperature": 35.0},
|
||||
{"deviceId": "device2", "temperature": 22.0},
|
||||
{"deviceId": "device2", "temperature": 28.0},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
@@ -524,12 +520,11 @@ func TestCaseExpressionAggregated(t *testing.T) {
|
||||
ELSE NULL
|
||||
END) as avg_high_humidity
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('1s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
GROUP BY deviceId, TumblingWindow('1s')`,
|
||||
testData: []map[string]interface{}{
|
||||
{"deviceId": "device1", "temperature": 30.0, "humidity": 60.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 20.0, "humidity": 40.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 35.0, "humidity": 70.0, "ts": time.Now()},
|
||||
{"deviceId": "device1", "temperature": 30.0, "humidity": 60.0},
|
||||
{"deviceId": "device1", "temperature": 20.0, "humidity": 40.0},
|
||||
{"deviceId": "device1", "temperature": 35.0, "humidity": 70.0},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
@@ -762,8 +757,7 @@ func TestHavingWithCaseExpression(t *testing.T) {
|
||||
AVG(CASE WHEN temperature > 30 THEN temperature ELSE 0 END) as conditional_avg
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
HAVING conditional_avg > 25
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
HAVING conditional_avg > 25`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
@@ -777,8 +771,7 @@ func TestHavingWithCaseExpression(t *testing.T) {
|
||||
END) as weighted_score
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
HAVING weighted_score > 3
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`,
|
||||
HAVING weighted_score > 3`,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
@@ -819,8 +812,7 @@ func TestHavingWithCaseExpressionFunctional(t *testing.T) {
|
||||
SUM(CASE WHEN temperature > 30 THEN 1 ELSE 0 END) as hot_count
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('2s')
|
||||
HAVING hot_count >= 2
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
HAVING hot_count >= 2`
|
||||
|
||||
// 创建StreamSQL实例
|
||||
streamSQL := New()
|
||||
@@ -830,23 +822,22 @@ func TestHavingWithCaseExpressionFunctional(t *testing.T) {
|
||||
assert.NoError(t, err, "执行SQL应该成功")
|
||||
|
||||
// 模拟数据
|
||||
baseTime := time.Now()
|
||||
testData := []map[string]interface{}{
|
||||
// device1: 3条高温记录,应该通过HAVING条件
|
||||
{"deviceId": "device1", "temperature": 35.0, "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 32.0, "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 31.0, "ts": baseTime},
|
||||
{"deviceId": "device1", "temperature": 25.0, "ts": baseTime}, // 不是高温
|
||||
{"deviceId": "device1", "temperature": 35.0},
|
||||
{"deviceId": "device1", "temperature": 32.0},
|
||||
{"deviceId": "device1", "temperature": 31.0},
|
||||
{"deviceId": "device1", "temperature": 25.0}, // 不是高温
|
||||
|
||||
// device2: 1条高温记录,不应该通过HAVING条件
|
||||
{"deviceId": "device2", "temperature": 33.0, "ts": baseTime},
|
||||
{"deviceId": "device2", "temperature": 28.0, "ts": baseTime},
|
||||
{"deviceId": "device2", "temperature": 26.0, "ts": baseTime},
|
||||
{"deviceId": "device2", "temperature": 33.0},
|
||||
{"deviceId": "device2", "temperature": 28.0},
|
||||
{"deviceId": "device2", "temperature": 26.0},
|
||||
|
||||
// device3: 2条高温记录,应该通过HAVING条件
|
||||
{"deviceId": "device3", "temperature": 34.0, "ts": baseTime},
|
||||
{"deviceId": "device3", "temperature": 31.0, "ts": baseTime},
|
||||
{"deviceId": "device3", "temperature": 29.0, "ts": baseTime},
|
||||
{"deviceId": "device3", "temperature": 34.0},
|
||||
{"deviceId": "device3", "temperature": 31.0},
|
||||
{"deviceId": "device3", "temperature": 29.0},
|
||||
}
|
||||
|
||||
// 添加数据并获取结果
|
||||
|
||||
@@ -0,0 +1,161 @@
|
||||
package streamsql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSQLCountingWindow_GroupByDevice(t *testing.T) {
|
||||
ssql := New()
|
||||
defer ssql.Stop()
|
||||
|
||||
sql := `
|
||||
SELECT deviceId,
|
||||
COUNT(*) as cnt
|
||||
FROM stream
|
||||
GROUP BY deviceId, CountingWindow(10)
|
||||
`
|
||||
err := ssql.Execute(sql)
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := make(chan []map[string]interface{}, 4)
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
ch <- results
|
||||
})
|
||||
|
||||
for i := 0; i < 30; i++ {
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"temperature": i,
|
||||
"timestamp": time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
// Expect 3 batches, each with one row for deviceId=sensor001
|
||||
for batch := 0; batch < 3; batch++ {
|
||||
select {
|
||||
case res := <-ch:
|
||||
require.Len(t, res, 1)
|
||||
row := res[0]
|
||||
assert.Equal(t, "sensor001", row["deviceId"])
|
||||
assert.Equal(t, float64(10), row["cnt"])
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("timeout waiting for batch %d", batch+1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSQLCountingWindow_GroupedCounting_MixedDevices(t *testing.T) {
|
||||
ssql := New()
|
||||
defer ssql.Stop()
|
||||
|
||||
sql := `
|
||||
SELECT deviceId,
|
||||
AVG(temperature) as avg_temp
|
||||
FROM stream
|
||||
GROUP BY deviceId, CountingWindow(10)
|
||||
`
|
||||
err := ssql.Execute(sql)
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := make(chan []map[string]interface{}, 8)
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// channel 已关闭,忽略错误
|
||||
}
|
||||
}()
|
||||
ch <- results
|
||||
})
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "A", "temperature": i, "timestamp": time.Now()})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "B", "temperature": i, "timestamp": time.Now()})
|
||||
}
|
||||
|
||||
ids := make(map[string]bool)
|
||||
for k := 0; k < 2; k++ {
|
||||
select {
|
||||
case res := <-ch:
|
||||
require.Len(t, res, 1)
|
||||
id := res[0]["deviceId"].(string)
|
||||
ids[id] = true
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
}
|
||||
assert.True(t, ids["A"])
|
||||
assert.True(t, ids["B"])
|
||||
}
|
||||
|
||||
func TestSQLCountingWindow_MultiKeyGroupedCounting(t *testing.T) {
|
||||
ssql := New()
|
||||
defer ssql.Stop()
|
||||
|
||||
sql := `
|
||||
SELECT deviceId, region,
|
||||
COUNT(*) as cnt,
|
||||
AVG(temperature) as avg_temp,
|
||||
MIN(temperature) as min_temp
|
||||
FROM stream
|
||||
GROUP BY deviceId, region, CountingWindow(5)
|
||||
`
|
||||
err := ssql.Execute(sql)
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := make(chan []map[string]interface{}, 8)
|
||||
ssql.AddSink(func(results []map[string]interface{}) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// channel 已关闭,忽略错误
|
||||
}
|
||||
}()
|
||||
ch <- results
|
||||
})
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "A", "region": "R1", "temperature": i, "timestamp": time.Now()})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "B", "region": "R1", "temperature": i + 10, "timestamp": time.Now()})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "A", "region": "R2", "temperature": i + 20, "timestamp": time.Now()})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "B", "region": "R2", "temperature": i + 30, "timestamp": time.Now()})
|
||||
}
|
||||
|
||||
type agg struct {
|
||||
cnt float64
|
||||
avg float64
|
||||
min float64
|
||||
}
|
||||
got := make(map[string]agg)
|
||||
for k := 0; k < 4; k++ {
|
||||
select {
|
||||
case res := <-ch:
|
||||
require.Len(t, res, 1)
|
||||
id := res[0]["deviceId"].(string)
|
||||
region := res[0]["region"].(string)
|
||||
cnt := res[0]["cnt"].(float64)
|
||||
avg := res[0]["avg_temp"].(float64)
|
||||
min := res[0]["min_temp"].(float64)
|
||||
got[id+"|"+region] = agg{cnt: cnt, avg: avg, min: min}
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
}
|
||||
// Expect 4 combinations all counted to 5, with known avg/min
|
||||
assert.Equal(t, float64(5), got["A|R1"].cnt)
|
||||
assert.Equal(t, float64(5), got["B|R1"].cnt)
|
||||
assert.Equal(t, float64(5), got["A|R2"].cnt)
|
||||
assert.Equal(t, float64(5), got["B|R2"].cnt)
|
||||
|
||||
assert.InEpsilon(t, 2.0, got["A|R1"].avg, 0.0001)
|
||||
assert.InEpsilon(t, 12.0, got["B|R1"].avg, 0.0001)
|
||||
assert.InEpsilon(t, 22.0, got["A|R2"].avg, 0.0001)
|
||||
assert.InEpsilon(t, 32.0, got["B|R2"].avg, 0.0001)
|
||||
|
||||
assert.Equal(t, 0.0, got["A|R1"].min)
|
||||
assert.InEpsilon(t, 10.0, got["B|R1"].min, 0.0001)
|
||||
assert.InEpsilon(t, 20.0, got["A|R2"].min, 0.0001)
|
||||
assert.InEpsilon(t, 30.0, got["B|R2"].min, 0.0001)
|
||||
}
|
||||
@@ -19,8 +19,19 @@ func createTestEnvironment(t *testing.T, rsql string) (*Streamsql, chan interfac
|
||||
require.NoError(t, err)
|
||||
|
||||
resultChan := make(chan interface{}, 10)
|
||||
t.Cleanup(func() { close(resultChan) })
|
||||
|
||||
ssql.AddSink(func(result []map[string]interface{}) {
|
||||
resultChan <- result
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// channel 已关闭,忽略错误
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case resultChan <- result:
|
||||
default:
|
||||
// 非阻塞发送
|
||||
}
|
||||
})
|
||||
|
||||
return ssql, resultChan
|
||||
@@ -35,20 +46,27 @@ func sendDataAndCollectResults(t *testing.T, ssql *Streamsql, resultChan chan in
|
||||
// 等待窗口触发
|
||||
time.Sleep(time.Duration(windowSizeSeconds+1) * time.Second)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
// 使用更严格的超时机制
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var results []map[string]interface{}
|
||||
maxIterations := 10 // 最多收集10次结果
|
||||
iteration := 0
|
||||
|
||||
collecting:
|
||||
for {
|
||||
for iteration < maxIterations {
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
if resultSlice, ok := result.([]map[string]interface{}); ok {
|
||||
results = append(results, resultSlice...)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
iteration++
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
// 500ms 没有新结果,退出
|
||||
break collecting
|
||||
case <-ctx.Done():
|
||||
// 超时退出
|
||||
break collecting
|
||||
}
|
||||
}
|
||||
@@ -68,16 +86,14 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
(SUM(value) / COUNT(*)) as calcAvg,
|
||||
(SUM(value) + AVG(value)) as sumPlusAvg
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "dev1", "value": 10.0, "ts": baseTime},
|
||||
{"deviceId": "dev1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "dev1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "dev1", "value": 10.0},
|
||||
{"deviceId": "dev1", "value": 20.0},
|
||||
{"deviceId": "dev1", "value": 30.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -102,17 +118,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
IF_NULL(LAST_VALUE(value), 0) as lastOrZero,
|
||||
IF_NULL(AVG(value), 0) as avgOrZero
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": nil, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": nil, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": nil},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": nil},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -136,17 +150,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
MAX(IF_NULL(value, 0)) as maxVal,
|
||||
MIN(IF_NULL(value, 0)) as minVal
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": nil, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": nil, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": nil},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": nil},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -172,16 +184,14 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
(SUM(value) + LATEST(value)) as totalPlusLatest,
|
||||
(AVG(value) * LATEST(value)) as avgTimesLatest
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": 20.0},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -210,17 +220,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
CEIL((AVG(value) / COUNT(*))) as ceilResult,
|
||||
ROUND((SUM(value) * AVG(value) / 1000), 2) as roundResult
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": 20.0},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
{"deviceId": "sensor1", "value": 40.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -275,20 +283,18 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
window_start() as start,
|
||||
window_end() as end
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
// 设备1的数据
|
||||
{"deviceId": "meter001", "displayNum": 100.0, "ts": baseTime},
|
||||
{"deviceId": "meter001", "displayNum": 115.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "meter001", "displayNum": 100.0},
|
||||
{"deviceId": "meter001", "displayNum": 115.0},
|
||||
|
||||
// 设备2的数据
|
||||
{"deviceId": "meter002", "displayNum": 200.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "meter002", "displayNum": 206.0, "ts": baseTime.Add(4 * time.Second)},
|
||||
{"deviceId": "meter002", "displayNum": 200.0},
|
||||
{"deviceId": "meter002", "displayNum": 206.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -345,17 +351,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
ROUND(SQRT(ABS(AVG(value) - MIN(value))), 2) as nestedMathFunc,
|
||||
UPPER(CONCAT('RESULT_', CAST(ROUND(SUM(value), 0) as STRING))) as nestedStrMathFunc
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": 20.0},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
{"deviceId": "sensor1", "value": 40.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -407,17 +411,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
CEIL(AVG(FLOOR(SQRT(value)))) as tripleNested2,
|
||||
ABS(MIN(ROUND(value / 5, 2))) as tripleNested3
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 16.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 25.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 36.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 49.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 16.0},
|
||||
{"deviceId": "sensor1", "value": 25.0},
|
||||
{"deviceId": "sensor1", "value": 36.0},
|
||||
{"deviceId": "sensor1", "value": 49.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -481,17 +483,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
(COUNT(*) * NTH_VALUE(value, 2)) as countTimesSecond,
|
||||
(SUM(value) + LEAD(value, 1)) as sumPlusLead
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 10.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 20.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 30.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 40.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 10.0},
|
||||
{"deviceId": "sensor1", "value": 20.0},
|
||||
{"deviceId": "sensor1", "value": 30.0},
|
||||
{"deviceId": "sensor1", "value": 40.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
@@ -515,17 +515,15 @@ func TestPostAggregationExpressions(t *testing.T) {
|
||||
NTH_VALUE(value, 3) as thirdValue,
|
||||
NTH_VALUE(value, 4) as fourthValue
|
||||
FROM stream
|
||||
GROUP BY deviceId, TumblingWindow('5s')
|
||||
WITH (TIMESTAMP='ts', TIMEUNIT='ss')`
|
||||
GROUP BY deviceId, TumblingWindow('5s')`
|
||||
|
||||
ssql, resultChan := createTestEnvironment(t, rsql)
|
||||
|
||||
baseTime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
testData := []map[string]interface{}{
|
||||
{"deviceId": "sensor1", "value": 100.0, "ts": baseTime},
|
||||
{"deviceId": "sensor1", "value": 200.0, "ts": baseTime.Add(1 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 300.0, "ts": baseTime.Add(2 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 400.0, "ts": baseTime.Add(3 * time.Second)},
|
||||
{"deviceId": "sensor1", "value": 100.0},
|
||||
{"deviceId": "sensor1", "value": 200.0},
|
||||
{"deviceId": "sensor1", "value": 300.0},
|
||||
{"deviceId": "sensor1", "value": 400.0},
|
||||
}
|
||||
|
||||
results := sendDataAndCollectResults(t, ssql, resultChan, testData, 5)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user