33 Commits

Author SHA1 Message Date
rulego-team 52653b2143 ci:测试超时设置600s 2025-12-21 11:57:52 +08:00
Whki 2db23f5b99 Merge pull request #45 from rulego/dependabot/go_modules/github.com/expr-lang/expr-1.17.7
chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
2025-12-20 16:24:18 +08:00
rulego-team c7cece15e2 feat:window输出结果增加溢出策略 2025-12-20 16:15:53 +08:00
rulego-team 09fe63102e fix:SessionWindow和CountingWindow bufferSize初始化 2025-12-20 11:14:37 +08:00
rulego-team b405a935b7 增加同步的获取结果的方法 2025-12-20 11:10:21 +08:00
dependabot[bot] 1c781979a6 chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
Bumps [github.com/expr-lang/expr](https://github.com/expr-lang/expr) from 1.17.6 to 1.17.7.
- [Release notes](https://github.com/expr-lang/expr/releases)
- [Commits](https://github.com/expr-lang/expr/compare/v1.17.6...v1.17.7)

---
updated-dependencies:
- dependency-name: github.com/expr-lang/expr
  dependency-version: 1.17.7
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 02:14:17 +00:00
rulego-team 16778f2ac3 test:fix test fail 2025-12-16 17:40:23 +08:00
rulego-team ba2cdd5629 fix:修复有特殊符号出现多余字段 2025-12-16 11:50:34 +08:00
rulego-team 47b7e07c6b fix:修复特殊符号被认为非法 2025-12-16 11:34:30 +08:00
rulego-team 9739f213e9 fix:修复Trigger未清除过期数据问题 2025-12-16 11:15:33 +08:00
rulego-team 4528d1f1a6 fix(window): 修复事件时间窗口未触发问题并添加调试日志 2025-11-15 13:54:27 +08:00
Whki 6ce76d506c Merge pull request #43 from rulego/dev
Dev
2025-11-15 13:15:05 +08:00
rulego-team 8207e6ba8c fix:修复延迟数据处理逻辑 2025-11-15 13:06:06 +08:00
rulego-team b2c638671a ci:增加测试时间 2025-11-15 00:49:49 +08:00
rulego-team 1744502678 feat:增加事件时间窗口处理机制 2025-11-15 00:27:55 +08:00
Whki 8a6e41ab78 Merge pull request #42 from rulego/dev
Dev
2025-11-14 10:53:22 +08:00
rulego-team 14d9a0874b feat:优化stream停止 2025-11-14 10:46:42 +08:00
rulego-team 6f77dc5f7f fix:修复测试用例断言 2025-11-14 08:51:45 +08:00
Whki 2907d7e3a3 Merge pull request #41 from rulego/dev
Dev
2025-11-13 18:47:20 +08:00
rulego-team eb29ed921d fix:取消对齐时间 2025-11-13 18:46:30 +08:00
rulego-team 73f7843996 fix:第一个窗口后启动步长定时器 2025-11-13 18:46:29 +08:00
Whki 3ef10b86a9 Merge pull request #40 from rulego/dev
Dev
2025-11-13 11:18:08 +08:00
rulego-team e435ef2040 fix:修复测试错误 2025-11-13 11:11:49 +08:00
rulego-team c39f915808 refactor:重构窗口函数参数传递问题 2025-11-13 11:03:49 +08:00
rulego-team e05213267d chore:添加测试用例 2025-11-13 11:03:28 +08:00
rulego-team 5f05b0ef61 perf:优化阻塞问题 2025-11-13 11:02:53 +08:00
rulego-team 3662949105 fix:支持混合keys分组 2025-11-13 11:01:52 +08:00
rulego-team c86d22e06a fix:修复窗口对齐问题 2025-11-13 11:01:13 +08:00
rulego-team 645c233762 fix:根据group by分组计数 2025-11-13 10:58:10 +08:00
rulego-team 29ed63ea50 chore(ci):dev 分支增加自动化测试 2025-11-13 10:56:51 +08:00
rulego-team 5f7076de7b chore: add awesome badge 2025-10-08 12:01:10 +08:00
rulego-team 3464ff5f6e fix:unnest function to properly expand arrays into multiple rows in stream processing 2025-09-08 18:40:26 +08:00
rulego-team 1d9e2a3dab test: add test cases 2025-08-30 00:56:48 +08:00
60 changed files with 9614 additions and 1663 deletions
+4 -4
View File
@@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [ main, master, develop ]
branches: [ main, dev ]
pull_request:
branches: [ main, master, develop ]
branches: [ main, dev ]
jobs:
test:
@@ -42,11 +42,11 @@ jobs:
- name: Run tests
if: matrix.go-version != '1.21'
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...
- name: Run tests with coverage
if: matrix.go-version == '1.21'
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...
- name: Upload coverage reports to Codecov
if: matrix.go-version == '1.21'
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
run: go mod download
- name: Run all tests
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...
release:
name: Create Release
+94 -9
View File
@@ -4,6 +4,7 @@
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
English| [简体中文](README_ZH.md)
@@ -150,7 +151,7 @@ func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()
// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
@@ -266,8 +267,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -334,6 +334,11 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
- **Session Window**
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.
### Stream
- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
@@ -342,17 +347,97 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
### Time Semantics
- **Event Time**
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
StreamSQL supports two time concepts that determine how windows are divided and triggered:
- **Processing Time**
- **Definition**: The time when the data arrives at the processing system.
#### Event Time
- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
- **Characteristics**:
- Windows are divided based on timestamp field values in the data
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
- Uses Watermark mechanism to handle out-of-order and late data
- Results are accurate but may have delays (need to wait for late data)
- **Use Cases**:
- Scenarios requiring precise temporal analysis
- Scenarios where data may arrive out of order or delayed
- Historical data replay and analysis
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### Processing Time
- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
- **Characteristics**:
- Windows are divided based on the time data arrives at the system (`time.Now()`)
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
- Uses system clock (Timer) to trigger windows
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
- **Use Cases**:
- Real-time monitoring and alerting scenarios
- Scenarios with high latency requirements and relatively low accuracy requirements
- Scenarios where data arrives in order and delay is controllable
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```
#### Event Time vs Processing Time Comparison
| Feature | Event Time | Processing Time |
|---------|------------|-----------------|
| **Time Source** | Timestamp field in data | System current time |
| **Window Division** | Based on event timestamp | Based on data arrival time |
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
| **Result Accuracy** | Accurate | May be inaccurate |
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
#### Window Time
- **Window Start Time**
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
- **Window End Time**
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
#### Watermark Mechanism (Event Time Windows Only)
- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
- **Configuration Parameters**:
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
)
```
## Contribution Guidelines
+96 -10
View File
@@ -4,6 +4,7 @@
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
[English](README.md)| 简体中文
@@ -150,7 +151,7 @@ import (
func main() {
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义流式SQL查询语句
// 核心概念解析:
// - TumblingWindow('5s'): 滚动窗口每5秒创建一个新窗口窗口之间不重叠
@@ -284,8 +285,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -346,6 +346,11 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。
- **会话窗口Session Window**
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。
### 流Stream
- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
@@ -354,16 +359,97 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
### 时间语义
- **事件时间Event Time**
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:
#### 事件时间Event Time
- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time``timestamp``order_time` 等)。
- **特点**
- 窗口基于数据中的时间戳字段值来划分
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
- 使用 Watermark 机制来处理乱序和延迟数据
- 结果准确,但可能有延迟(需要等待延迟数据)
- **使用场景**
- 需要精确时序分析的场景
- 数据可能乱序或延迟到达的场景
- 历史数据回放和分析
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### 处理时间Processing Time
- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
- **特点**
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
- 使用系统时钟Timer来触发窗口
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
- **使用场景**
- 实时监控和告警场景
- 对延迟要求高,对准确性要求相对较低的场景
- 数据顺序到达且延迟可控的场景
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- 不指定 WITH 子句,默认使用处理时间
```
#### 事件时间 vs 处理时间对比
| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|------|---------------------|-------------------------|
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
| **延迟处理** | 支持Watermark机制 | 不支持 |
| **乱序处理** | 支持Watermark机制 | 不支持 |
| **结果准确性** | 准确 | 可能不准确 |
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认不指定WITH |
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |
#### 窗口时间
- **处理时间Processing Time**
- **定义**:数据到达处理系统的时间。
- **窗口开始时间Window Start Time**
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`窗口开始时间会对齐到5分钟的倍数如 10:00、10:05、10:10
- **窗口结束时间Window End Time**
- **定义**:基于事件时间,窗口的结束时间点通常窗口结束时间是窗口开始时间加上窗口的持续时间。
- 例如,一个滑动窗口的持续时间为 1 分钟,则窗口结束时间窗口开始时间加上 1 分钟
- **事件时间窗口**窗口的结束时间点通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间窗口在系统时钟到达结束时间时触发
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00则窗口结束时间是 10:01。
#### Watermark 机制(仅事件时间窗口)
- **定义**Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
- **计算公式**`Watermark = max(event_time) - MaxOutOfOrderness`
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
- **配置参数**
- `MAXOUTOFORDERNESS`允许的最大乱序时间用于容忍数据乱序默认0不允许乱序
- `ALLOWEDLATENESS`窗口触发后还能接受延迟数据的时间默认0不接受延迟数据
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间默认0禁用
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
IDLETIMEOUT='5s' -- 5秒无数据基于处理时间推进watermark
)
```
## 贡献指南
+148
View File
@@ -0,0 +1,148 @@
package aggregator
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPostAggregationPlaceholder 测试后聚合占位符的完整功能
func TestPostAggregationPlaceholder(t *testing.T) {
t.Run("测试PostAggregationPlaceholder基本功能", func(t *testing.T) {
// 创建PostAggregationPlaceholder实例
placeholder := &PostAggregationPlaceholder{}
require.NotNil(t, placeholder)
// 测试New方法
newPlaceholder := placeholder.New()
require.NotNil(t, newPlaceholder)
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
// 测试Add方法应该不做任何操作
placeholder.Add(10)
placeholder.Add("test")
placeholder.Add(nil)
placeholder.Add([]int{1, 2, 3})
// 测试Result方法应该返回nil
result := placeholder.Result()
assert.Nil(t, result)
})
t.Run("测试通过CreateBuiltinAggregator创建PostAggregationPlaceholder", func(t *testing.T) {
// 使用CreateBuiltinAggregator创建post_aggregation类型的聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
// 测试创建的聚合器功能
newAgg := aggregator.New()
require.NotNil(t, newAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
// 测试添加各种类型的值
newAgg.Add(100)
newAgg.Add("string_value")
newAgg.Add(map[string]interface{}{"key": "value"})
// 验证结果始终为nil
result := newAgg.Result()
assert.Nil(t, result)
})
t.Run("测试PostAggregationPlaceholder的多实例独立性", func(t *testing.T) {
// 创建多个实例
placeholder1 := &PostAggregationPlaceholder{}
placeholder2 := placeholder1.New()
placeholder3 := placeholder1.New()
// 验证实例类型正确
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
// 每个实例都应该返回nil
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
// 验证Add操作不会影响结果因为是占位符
placeholder1.Add("test1")
placeholder2.Add("test2")
placeholder3.Add("test3")
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
})
t.Run("测试PostAggregationPlaceholder在聚合场景中的使用", func(t *testing.T) {
// 创建包含PostAggregationPlaceholder的聚合字段
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
}
// 创建分组聚合器
agg := NewGroupAggregator(groupFields, aggFields)
require.NotNil(t, agg)
// 添加测试数据
testData := []map[string]interface{}{
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
{"category": "B", "value": 30, "placeholder_field": 999},
}
for _, data := range testData {
err := agg.Add(data)
assert.NoError(t, err)
}
// 获取结果
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 2)
// 验证PostAggregationPlaceholder字段的结果为nil
for _, result := range results {
assert.Contains(t, result, "post_agg_field")
assert.Nil(t, result["post_agg_field"])
// 验证正常聚合字段工作正常
assert.Contains(t, result, "sum_value")
assert.NotNil(t, result["sum_value"])
}
})
}
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
aggregator := CreateBuiltinAggregator("post_aggregation")
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试PostAggregation常量", func(t *testing.T) {
// 验证PostAggregation常量值
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
// 使用常量创建聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试与其他聚合类型的区别", func(t *testing.T) {
// 创建不同类型的聚合器
sumAgg := CreateBuiltinAggregator(Sum)
countAgg := CreateBuiltinAggregator(Count)
postAgg := CreateBuiltinAggregator(PostAggregation)
// 验证类型不同
assert.NotEqual(t, sumAgg, postAgg)
assert.NotEqual(t, countAgg, postAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
})
}
+73
View File
@@ -26,6 +26,8 @@ integration with the RuleGo ecosystem.
• Lightweight design - Pure in-memory operations, no external dependencies
• SQL syntax support - Process stream data using familiar SQL syntax
• Multiple window types - Sliding, tumbling, counting, and session windows
• Event time and processing time - Support both time semantics for accurate stream processing
• Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
@@ -107,6 +109,77 @@ StreamSQL supports multiple window types:
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# Event Time vs Processing Time
StreamSQL supports two time semantics for window processing:
## Processing Time (Default)
Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:
// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives
## Event Time
Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
allowing correct handling of out-of-order and late-arriving data:
// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')
// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')
## Watermark and Late Data Handling
Event time windows use watermark mechanism to handle out-of-order and late data:
// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
)
// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
)
// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
)
// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
)
Key concepts:
• MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
• AllowedLateness: Keeps window open after trigger to accept late data and update results
• IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
• Watermark: Indicates that no events with timestamp less than watermark are expected
# Custom Functions
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
+18
View File
@@ -131,7 +131,25 @@ func validateBasicSyntax(exprStr string) error {
}
// Check for invalid characters
inQuotes := false
var quoteChar rune
for i, ch := range trimmed {
// Handle quotes
if ch == '\'' || ch == '"' {
if !inQuotes {
inQuotes = true
quoteChar = ch
} else if ch == quoteChar {
inQuotes = false
}
}
// If inside quotes, skip character validation
if inQuotes {
continue
}
// Allowed characters: letters, numbers, operators, parentheses, dots, underscores, spaces, quotes
if !isValidChar(ch) {
return fmt.Errorf("invalid character '%c' at position %d", ch, i)
File diff suppressed because it is too large Load Diff
+4 -14
View File
@@ -479,12 +479,7 @@ func (f *YearFunction) Validate(args []interface{}) error {
}
func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 首先检查是否是 time.Time 类型
if t, ok := args[0].(time.Time); ok {
return float64(t.Year()), nil
}
// 如果不是 time.Time尝试转换为字符串并解析
// 尝试转换为字符串并解析
dateStr, err := cast.ToStringE(args[0])
if err != nil {
return nil, fmt.Errorf("invalid date: %v", err)
@@ -497,7 +492,7 @@ func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interf
}
}
return float64(t.Year()), nil
return t.Year(), nil
}
// MonthFunction 提取月份函数
@@ -516,12 +511,7 @@ func (f *MonthFunction) Validate(args []interface{}) error {
}
func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 首先检查是否是 time.Time 类型
if t, ok := args[0].(time.Time); ok {
return float64(t.Month()), nil
}
// 如果不是 time.Time尝试转换为字符串并解析
// 转换为字符串并解析
dateStr, err := cast.ToStringE(args[0])
if err != nil {
return nil, fmt.Errorf("invalid date: %v", err)
@@ -534,7 +524,7 @@ func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (inter
}
}
return float64(t.Month()), nil
return int(t.Month()), nil
}
// DayFunction 提取日期函数
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+102 -20
View File
@@ -5,7 +5,13 @@ import (
"reflect"
)
// UnnestFunction 将数组展开为多行
const (
UnnestObjectMarker = "__unnest_object__"
UnnestDataKey = "__data__"
UnnestEmptyMarker = "__empty_unnest__"
DefaultValueKey = "value"
)
type UnnestFunction struct {
*BaseFunction
}
@@ -27,7 +33,13 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
array := args[0]
if array == nil {
return []interface{}{}, nil
// 返回带有unnest标记的空结果
return []interface{}{
map[string]interface{}{
UnnestObjectMarker: true,
UnnestEmptyMarker: true, // 标记这是空unnest结果
},
}, nil
}
// 使用反射检查是否为数组或切片
@@ -36,7 +48,18 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
return nil, fmt.Errorf("unnest requires an array or slice, got %T", array)
}
// 转换为 []interface{}
// 如果数组为空,返回带标记的空数组
if v.Len() == 0 {
// 返回带有unnest标记的空结果
return []interface{}{
map[string]interface{}{
UnnestObjectMarker: true,
UnnestEmptyMarker: true, // 标记这是空unnest结果
},
}, nil
}
// 转换为 []interface{}所有元素都标记为unnest结果
result := make([]interface{}, v.Len())
for i := 0; i < v.Len(); i++ {
elem := v.Index(i).Interface()
@@ -45,39 +68,46 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
if elemMap, ok := elem.(map[string]interface{}); ok {
// 对于对象,我们返回一个特殊的结构来表示需要展开为列
result[i] = map[string]interface{}{
"__unnest_object__": true,
"__data__": elemMap,
UnnestObjectMarker: true,
UnnestDataKey: elemMap,
}
} else {
result[i] = elem
// 对于普通元素也需要标记为unnest结果
result[i] = map[string]interface{}{
UnnestObjectMarker: true,
UnnestDataKey: elem,
}
}
}
return result, nil
}
// UnnestResult 表示 unnest 函数的结果
type UnnestResult struct {
Rows []map[string]interface{}
}
// IsUnnestResult 检查是否为 unnest 结果
func IsUnnestResult(value interface{}) bool {
if slice, ok := value.([]interface{}); ok {
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap["__unnest_object__"]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
return true
}
slice, ok := value.([]interface{})
if !ok || len(slice) == 0 {
return false
}
// 检查数组中是否有任何unnest标记的元素
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
return true
}
}
}
}
// 如果没有找到unnest标记则不是unnest结果
return false
}
// ProcessUnnestResult 处理 unnest 结果,将其转换为多行
func ProcessUnnestResult(value interface{}) []map[string]interface{} {
slice, ok := value.([]interface{})
if !ok {
@@ -87,20 +117,72 @@ func ProcessUnnestResult(value interface{}) []map[string]interface{} {
var rows []map[string]interface{}
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap["__unnest_object__"]; exists {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
if data, exists := itemMap["__data__"]; exists {
if data, exists := itemMap[UnnestDataKey]; exists {
// 检查数据是否为对象map
if dataMap, ok := data.(map[string]interface{}); ok {
// 对象数据直接展开为列
rows = append(rows, dataMap)
} else {
// 普通数据使用默认字段名
row := map[string]interface{}{
DefaultValueKey: data,
}
rows = append(rows, row)
}
}
continue
}
}
}
// 对于非对象元素,创建一个包含单个值的行
// 对于非标记元素,创建一个包含单个值的行(向后兼容)
row := map[string]interface{}{
"value": item,
DefaultValueKey: item,
}
rows = append(rows, row)
}
return rows
}
func ProcessUnnestResultWithFieldName(value interface{}, fieldName string) []map[string]interface{} {
slice, ok := value.([]interface{})
if !ok {
return nil
}
var rows []map[string]interface{}
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
// 检查是否为空unnest结果
if itemMap[UnnestEmptyMarker] == true {
// 空unnest结果返回空数组
return []map[string]interface{}{}
}
if data, exists := itemMap[UnnestDataKey]; exists {
// 检查数据是否为对象map
if dataMap, ok := data.(map[string]interface{}); ok {
// 对象数据直接展开为列
rows = append(rows, dataMap)
} else {
// 普通数据使用指定字段名
row := map[string]interface{}{
fieldName: data,
}
rows = append(rows, row)
}
}
continue
}
}
}
// 对于非标记元素,使用指定的字段名创建行(向后兼容)
row := map[string]interface{}{
fieldName: item,
}
rows = append(rows, row)
}
+46 -6
View File
@@ -15,7 +15,20 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error: %v", err)
}
expected := []interface{}{"a", "b", "c"}
expected := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__data__": "a",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "b",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "c",
},
}
if !reflect.DeepEqual(result, expected) {
t.Errorf("UnnestFunction = %v, want %v", result, expected)
}
@@ -51,8 +64,15 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error for empty array: %v", err)
}
if len(result.([]interface{})) != 0 {
t.Errorf("UnnestFunction should return empty array for empty input")
// 空数组应该返回带有空标记的结果
expectedEmpty := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__empty_unnest__": true,
},
}
if !reflect.DeepEqual(result, expectedEmpty) {
t.Errorf("UnnestFunction empty array = %v, want %v", result, expectedEmpty)
}
// 测试nil参数
@@ -61,8 +81,15 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error for nil: %v", err)
}
if len(result.([]interface{})) != 0 {
t.Errorf("UnnestFunction should return empty array for nil input")
// nil应该返回带有空标记的结果
expectedNil := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__empty_unnest__": true,
},
}
if !reflect.DeepEqual(result, expectedNil) {
t.Errorf("UnnestFunction nil = %v, want %v", result, expectedNil)
}
// 测试错误参数数量
@@ -85,7 +112,20 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should handle arrays: %v", err)
}
expected = []interface{}{"x", "y", "z"}
expected = []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__data__": "x",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "y",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "z",
},
}
if !reflect.DeepEqual(result, expected) {
t.Errorf("UnnestFunction array = %v, want %v", result, expected)
}
+5 -79
View File
@@ -82,7 +82,11 @@ func TestNewStringFunctions(t *testing.T) {
if !exists {
t.Fatalf("Function %s not found", tt.funcName)
}
// 验证参数
if err := fn.Validate(tt.args); err != nil {
t.Errorf("Validate() error = %v", err)
return
}
ctx := &FunctionContext{}
result, err := fn.Execute(ctx, tt.args)
@@ -167,84 +171,6 @@ func TestStringFunctionValidation(t *testing.T) {
args: []interface{}{"hello"},
wantErr: false,
},
{
name: "upper no args",
function: NewUpperFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "upper valid args",
function: NewUpperFunction(),
args: []interface{}{"hello"},
wantErr: false,
},
{
name: "endswith no args",
function: NewEndswithFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "endswith one arg",
function: NewEndswithFunction(),
args: []interface{}{"hello"},
wantErr: true,
},
{
name: "endswith valid args",
function: NewEndswithFunction(),
args: []interface{}{"hello", "lo"},
wantErr: false,
},
{
name: "substring no args",
function: NewSubstringFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "substring one arg",
function: NewSubstringFunction(),
args: []interface{}{"hello"},
wantErr: true,
},
{
name: "substring valid args",
function: NewSubstringFunction(),
args: []interface{}{"hello", 1},
wantErr: false,
},
{
name: "replace no args",
function: NewReplaceFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "replace two args",
function: NewReplaceFunction(),
args: []interface{}{"hello", "world"},
wantErr: true,
},
{
name: "replace valid args",
function: NewReplaceFunction(),
args: []interface{}{"hello", "l", "x"},
wantErr: false,
},
{
name: "regexp_matches no args",
function: NewRegexpMatchesFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "regexp_matches valid args",
function: NewRegexpMatchesFunction(),
args: []interface{}{"hello123", "[0-9]+"},
wantErr: false,
},
}
for _, tt := range tests {
+80 -177
View File
@@ -4,184 +4,8 @@ import (
"testing"
)
// TestTypeFunctions 测试类型检查函数的基本功能
// TestTypeFunctions 测试类型函数
func TestTypeFunctions(t *testing.T) {
tests := []struct {
name string
funcName string
args []interface{}
expected interface{}
}{
{
name: "is_null true",
funcName: "is_null",
args: []interface{}{nil},
expected: true,
},
{
name: "is_null false",
funcName: "is_null",
args: []interface{}{"test"},
expected: false,
},
{
name: "is_not_null true",
funcName: "is_not_null",
args: []interface{}{"test"},
expected: true,
},
{
name: "is_not_null false",
funcName: "is_not_null",
args: []interface{}{nil},
expected: false,
},
{
name: "is_numeric true",
funcName: "is_numeric",
args: []interface{}{123},
expected: true,
},
{
name: "is_numeric false",
funcName: "is_numeric",
args: []interface{}{"test"},
expected: false,
},
{
name: "is_string true",
funcName: "is_string",
args: []interface{}{"test"},
expected: true,
},
{
name: "is_string false",
funcName: "is_string",
args: []interface{}{123},
expected: false,
},
{
name: "is_bool true",
funcName: "is_bool",
args: []interface{}{true},
expected: true,
},
{
name: "is_bool false",
funcName: "is_bool",
args: []interface{}{"test"},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fn, exists := Get(tt.funcName)
if !exists {
t.Fatalf("Function %s not found", tt.funcName)
}
result, err := fn.Execute(&FunctionContext{}, tt.args)
if err != nil {
t.Errorf("Execute() error = %v", err)
return
}
if result != tt.expected {
t.Errorf("Execute() = %v, want %v", result, tt.expected)
}
})
}
}
// TestTypeFunctionValidation 测试类型函数的参数验证
func TestTypeFunctionValidation(t *testing.T) {
tests := []struct {
name string
function Function
args []interface{}
wantErr bool
}{
{
name: "is_null no args",
function: NewIsNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_null too many args",
function: NewIsNullFunction(),
args: []interface{}{"test", "extra"},
wantErr: true,
},
{
name: "is_null valid args",
function: NewIsNullFunction(),
args: []interface{}{"test"},
wantErr: false,
},
{
name: "is_not_null no args",
function: NewIsNotNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_not_null valid args",
function: NewIsNotNullFunction(),
args: []interface{}{nil},
wantErr: false,
},
{
name: "is_numeric no args",
function: NewIsNumericFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_numeric valid args",
function: NewIsNumericFunction(),
args: []interface{}{123},
wantErr: false,
},
{
name: "is_string no args",
function: NewIsStringFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_string valid args",
function: NewIsStringFunction(),
args: []interface{}{"test"},
wantErr: false,
},
{
name: "is_bool no args",
function: NewIsBoolFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_bool valid args",
function: NewIsBoolFunction(),
args: []interface{}{true},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.function.Validate(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestTypeFunctionEdgeCases 测试类型函数的边界情况
func TestTypeFunctionEdgeCases(t *testing.T) {
tests := []struct {
name string
function Function
@@ -242,6 +66,12 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
args: []interface{}{true},
expected: false,
},
{
name: "is_numeric with nil",
function: NewIsNumericFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_string with empty string",
function: NewIsStringFunction(),
@@ -254,10 +84,45 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
args: []interface{}{false},
expected: true,
},
{
name: "is_bool with nil",
function: NewIsBoolFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_array",
function: NewIsArrayFunction(),
args: []interface{}{[]int{1, 2, 3}},
expected: true,
},
{
name: "is_array with nil",
function: NewIsArrayFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_object",
function: NewIsObjectFunction(),
args: []interface{}{map[string]int{"a": 1, "b": 2}},
expected: true,
},
{
name: "is_object with nil",
function: NewIsObjectFunction(),
args: []interface{}{nil},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 验证参数
if err := tt.function.Validate(tt.args); err != nil {
t.Errorf("Validate() error = %v", err)
return
}
result, err := tt.function.Execute(&FunctionContext{}, tt.args)
if err != nil {
t.Errorf("Execute() error = %v", err)
@@ -270,3 +135,41 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
})
}
}
// TestTypeFunctionValidation 测试类型函数的参数验证
func TestTypeFunctionValidation(t *testing.T) {
tests := []struct {
name string
function Function
args []interface{}
wantErr bool
}{
{
name: "is_null no args",
function: NewIsNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_null too many args",
function: NewIsNullFunction(),
args: []interface{}{"test", "extra"},
wantErr: true,
},
{
name: "is_null valid args",
function: NewIsNullFunction(),
args: []interface{}{"test"},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.function.Validate(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
+1 -1
View File
@@ -3,7 +3,7 @@ module github.com/rulego/streamsql
go 1.18
require (
github.com/expr-lang/expr v1.17.6
github.com/expr-lang/expr v1.17.7
github.com/stretchr/testify v1.10.0
)
+2 -2
View File
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/expr-lang/expr v1.17.6 h1:1h6i8ONk9cexhDmowO/A64VPxHScu7qfSl2k8OlINec=
github.com/expr-lang/expr v1.17.6/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+140 -67
View File
@@ -8,6 +8,7 @@ import (
"github.com/rulego/streamsql/functions"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils/cast"
"github.com/rulego/streamsql/window"
"github.com/rulego/streamsql/aggregator"
@@ -34,10 +35,13 @@ type Field struct {
}
type WindowDefinition struct {
Type string
Params []interface{}
TsProp string
TimeUnit time.Duration
Type string
Params []interface{}
TsProp string
TimeUnit time.Duration
MaxOutOfOrderness time.Duration // Maximum allowed out-of-orderness for event time
AllowedLateness time.Duration // Maximum allowed lateness for event time windows
IdleTimeout time.Duration // Idle source timeout: when no data arrives within this duration, watermark advances based on processing time
}
// ToStreamConfig converts AST to Stream configuration
@@ -58,9 +62,16 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
windowType = window.TypeSession
}
params, err := parseWindowParamsWithType(s.Window.Params, windowType)
if err != nil {
return nil, "", fmt.Errorf("failed to parse window parameters: %w", err)
// Parse window parameters - now returns array directly
params := s.Window.Params
// Validate and convert parameters based on window type
if len(params) > 0 {
var err error
params, err = validateWindowParams(params, windowType)
if err != nil {
return nil, "", fmt.Errorf("failed to validate window parameters: %w", err)
}
}
// Check if window processing is needed
@@ -80,16 +91,7 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
if !needWindow && hasAggregation {
needWindow = true
windowType = window.TypeTumbling
params = map[string]interface{}{
"size": 10 * time.Second, // Default 10-second window
}
}
// Handle special configuration for SessionWindow
var groupByKey string
if windowType == window.TypeSession && len(s.GroupBy) > 0 {
// For session window, use the first GROUP BY field as session key
groupByKey = s.GroupBy[0]
params = []interface{}{10 * time.Second} // Default 10-second window
}
// If no aggregation functions, collect simple fields
@@ -105,10 +107,10 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
simpleFields = append(simpleFields, fieldName+":"+field.Alias)
} else {
// For fields without alias, check if it's a string literal
_, n, _, _, err := ParseAggregateTypeWithExpression(fieldName)
if err != nil {
return nil, "", err
}
_, n, _, _, err := ParseAggregateTypeWithExpression(fieldName)
if err != nil {
return nil, "", err
}
if n != "" {
// If string literal, use parsed field name (remove quotes)
simpleFields = append(simpleFields, n)
@@ -134,14 +136,25 @@ func (s *SelectStatement) ToStreamConfig() (*types.Config, string, error) {
return nil, "", err
}
// Determine time characteristic based on whether TIMESTAMP is specified in WITH clause
// If TsProp is set, use EventTime; otherwise use ProcessingTime (default)
timeCharacteristic := types.ProcessingTime
if s.Window.TsProp != "" {
timeCharacteristic = types.EventTime
}
// Build Stream configuration
config := types.Config{
WindowConfig: types.WindowConfig{
Type: windowType,
Params: params,
TsProp: s.Window.TsProp,
TimeUnit: s.Window.TimeUnit,
GroupByKey: groupByKey,
Type: windowType,
Params: params,
TsProp: s.Window.TsProp,
TimeUnit: s.Window.TimeUnit,
TimeCharacteristic: timeCharacteristic,
MaxOutOfOrderness: s.Window.MaxOutOfOrderness,
AllowedLateness: s.Window.AllowedLateness,
IdleTimeout: s.Window.IdleTimeout,
GroupByKeys: extractGroupFields(s),
},
GroupFields: extractGroupFields(s),
SelectFields: aggs,
@@ -245,9 +258,9 @@ func buildSelectFields(fields []Field) (aggMap map[string]aggregator.AggregateTy
for _, f := range fields {
if alias := f.Alias; alias != "" {
t, n, _, _, parseErr := ParseAggregateTypeWithExpression(f.Expression)
if parseErr != nil {
return nil, nil, parseErr
}
if parseErr != nil {
return nil, nil, parseErr
}
if t != "" {
// Use alias as key for aggregator, not field name
selectFields[alias] = t
@@ -287,11 +300,11 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
// 使用正则表达式匹配函数调用模式
pattern := regexp.MustCompile(`(?i)([a-z_]+)\s*\(`)
matches := pattern.FindAllStringSubmatchIndex(expr, -1)
for _, match := range matches {
funcStart := match[0]
funcName := strings.ToLower(expr[match[2]:match[3]])
// 检查函数是否为聚合函数
if fn, exists := functions.Get(funcName); exists {
switch fn.GetType() {
@@ -300,14 +313,14 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
if inAggregation {
return fmt.Errorf("aggregate function calls cannot be nested")
}
// 找到该函数的参数部分
funcEnd := findMatchingParenInternal(expr, funcStart+len(funcName))
if funcEnd > funcStart {
// 提取函数参数
paramStart := funcStart + len(funcName) + 1
params := expr[paramStart:funcEnd]
// 在聚合函数参数内部递归检查
if err := detectNestedAggregationRecursive(params, true); err != nil {
return err
@@ -316,7 +329,7 @@ func detectNestedAggregationRecursive(expr string, inAggregation bool) error {
}
}
}
return nil
}
@@ -697,43 +710,103 @@ func extractSimpleField(fieldExpr string) string {
return fieldExpr
}
func parseWindowParams(params []interface{}) (map[string]interface{}, error) {
return parseWindowParamsWithType(params, "")
}
// validateWindowParams validates and converts window parameters based on window type
// Returns validated parameters array with proper types
func validateWindowParams(params []interface{}, windowType string) ([]interface{}, error) {
if len(params) == 0 {
return params, nil
}
func parseWindowParamsWithType(params []interface{}, windowType string) (map[string]interface{}, error) {
result := make(map[string]interface{})
var key string
for index, v := range params {
if windowType == window.TypeSession {
// First parameter for SessionWindow is timeout
if index == 0 {
key = "timeout"
} else {
key = fmt.Sprintf("param%d", index)
}
} else {
// Parameters for other window types
if index == 0 {
key = "size"
} else if index == 1 {
key = "slide"
} else {
key = "offset"
}
validated := make([]interface{}, 0, len(params))
if windowType == window.TypeCounting {
// CountingWindow expects integer count as first parameter
if len(params) == 0 {
return nil, fmt.Errorf("counting window requires at least one parameter")
}
if s, ok := v.(string); ok {
dur, err := time.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("invalid %s duration: %w", s, err)
}
result[key] = dur
} else {
return nil, fmt.Errorf("%s parameter must be string format (like '5s')", s)
// Convert first parameter to int using cast utility
count, err := cast.ToIntE(params[0])
if err != nil {
return nil, fmt.Errorf("invalid count parameter: %w", err)
}
if count <= 0 {
return nil, fmt.Errorf("counting window count must be positive, got: %d", count)
}
validated = append(validated, count)
// Add any additional parameters
if len(params) > 1 {
validated = append(validated, params[1:]...)
}
return validated, nil
}
// Helper function to convert a value to time.Duration
// For numeric types, treats them as seconds
// For strings, uses time.ParseDuration
convertToDuration := func(val interface{}) (time.Duration, error) {
switch v := val.(type) {
case time.Duration:
return v, nil
case string:
// Use ToDurationE which handles string parsing
return cast.ToDurationE(v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
// Treat numeric integers as seconds
return time.Duration(cast.ToInt(v)) * time.Second, nil
case float32, float64:
// Treat numeric floats as seconds
return time.Duration(int(cast.ToFloat64(v))) * time.Second, nil
default:
// Try ToDurationE as fallback
return cast.ToDurationE(v)
}
}
return result, nil
if windowType == window.TypeSession {
// SessionWindow expects timeout duration as first parameter
if len(params) == 0 {
return nil, fmt.Errorf("session window requires at least one parameter")
}
timeout, err := convertToDuration(params[0])
if err != nil {
return nil, fmt.Errorf("invalid timeout duration: %w", err)
}
if timeout <= 0 {
return nil, fmt.Errorf("session window timeout must be positive, got: %v", timeout)
}
validated = append(validated, timeout)
// Add any additional parameters
if len(params) > 1 {
validated = append(validated, params[1:]...)
}
return validated, nil
}
// For TumblingWindow and SlidingWindow, convert parameters to time.Duration
for index, v := range params {
dur, err := convertToDuration(v)
if err != nil {
return nil, fmt.Errorf("invalid duration parameter at index %d: %w", index, err)
}
if dur <= 0 {
return nil, fmt.Errorf("duration parameter at index %d must be positive, got: %v", index, dur)
}
validated = append(validated, dur)
}
return validated, nil
}
func parseAggregateExpression(expr string) string {
@@ -958,7 +1031,7 @@ func parseComplexAggExpressionInternal(expr string) ([]types.AggregationFieldInf
if err := detectNestedAggregation(expr); err != nil {
return nil, "", err
}
// 使用改进的递归解析方法
aggFields, exprTemplate := parseNestedFunctionsInternal(expr, make([]types.AggregationFieldInfo, 0))
return aggFields, exprTemplate, nil
+2 -2
View File
@@ -251,8 +251,8 @@ func TestSelectStatementEdgeCases(t *testing.T) {
if config2.WindowConfig.Type != window.TypeSession {
t.Errorf("Expected session window, got %v", config2.WindowConfig.Type)
}
if config2.WindowConfig.GroupByKey != "user_id" {
t.Errorf("Expected GroupByKey to be 'user_id', got %s", config2.WindowConfig.GroupByKey)
if len(config2.WindowConfig.GroupByKeys) == 0 || config2.WindowConfig.GroupByKeys[0] != "user_id" {
t.Errorf("Expected GroupByKeys to contain 'user_id', got %v", config2.WindowConfig.GroupByKeys)
}
}
+22 -6
View File
@@ -6,6 +6,7 @@ import (
"github.com/rulego/streamsql/aggregator"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/window"
)
// TestParseSmartParameters 测试智能参数解析函数
@@ -202,6 +203,12 @@ func TestParseWindowParams(t *testing.T) {
windowType: "SLIDINGWINDOW",
expectError: false,
},
{
name: "计数窗口参数",
params: []interface{}{100},
windowType: "COUNTINGWINDOW",
expectError: false,
},
{
name: "无效持续时间",
params: []interface{}{"invalid"},
@@ -212,7 +219,7 @@ func TestParseWindowParams(t *testing.T) {
name: "非字符串参数",
params: []interface{}{123},
windowType: "TUMBLINGWINDOW",
expectError: true,
expectError: false, // 整数参数会被视为秒数,这是有效的
},
{
name: "空参数",
@@ -224,15 +231,24 @@ func TestParseWindowParams(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var result map[string]interface{}
var result []interface{}
var err error
if tt.windowType == "SESSIONWINDOW" {
result, err = parseWindowParamsWithType(tt.params, "SESSIONWINDOW")
} else {
result, err = parseWindowParams(tt.params)
// Convert window type to internal format
windowType := ""
switch tt.windowType {
case "SESSIONWINDOW":
windowType = window.TypeSession
case "TUMBLINGWINDOW":
windowType = window.TypeTumbling
case "SLIDINGWINDOW":
windowType = window.TypeSliding
case "COUNTINGWINDOW":
windowType = window.TypeCounting
}
result, err = validateWindowParams(tt.params, windowType)
if tt.expectError {
if err == nil {
t.Errorf("Expected error but got none")
+1 -1
View File
@@ -88,7 +88,7 @@ func (fv *FunctionValidator) isBuiltinFunction(funcName string) bool {
func (fv *FunctionValidator) isKeyword(word string) bool {
keywords := []string{
"SELECT", "FROM", "WHERE", "GROUP", "BY", "HAVING", "ORDER",
"AS", "DISTINCT", "LIMIT", "WITH", "TIMESTAMP", "TIMEUNIT",
"AS", "DISTINCT", "LIMIT", "WITH", "TIMESTAMP", "TIMEUNIT", "MAXOUTOFORDERNESS", "ALLOWEDLATENESS", "IDLETIMEOUT",
"TUMBLINGWINDOW", "SLIDINGWINDOW", "COUNTINGWINDOW", "SESSIONWINDOW",
"AND", "OR", "NOT", "IN", "LIKE", "IS", "NULL", "TRUE", "FALSE",
"BETWEEN", "IS", "NULL", "TRUE", "FALSE", "CASE", "WHEN",
+9
View File
@@ -41,6 +41,9 @@ const (
TokenWITH
TokenTimestamp
TokenTimeUnit
TokenMaxOutOfOrderness
TokenAllowedLateness
TokenIdleTimeout
TokenOrder
TokenDISTINCT
TokenLIMIT
@@ -349,6 +352,12 @@ func (l *Lexer) lookupIdent(ident string) Token {
return Token{Type: TokenTimestamp, Value: ident}
case "TIMEUNIT":
return Token{Type: TokenTimeUnit, Value: ident}
case "MAXOUTOFORDERNESS":
return Token{Type: TokenMaxOutOfOrderness, Value: ident}
case "ALLOWEDLATENESS":
return Token{Type: TokenAllowedLateness, Value: ident}
case "IDLETIMEOUT":
return Token{Type: TokenIdleTimeout, Value: ident}
case "ORDER":
return Token{Type: TokenOrder, Value: ident}
case "DISTINCT":
+101 -20
View File
@@ -8,6 +8,7 @@ import (
"time"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils/cast"
)
// 解析器配置常量
@@ -506,44 +507,46 @@ func (p *Parser) parseWhere(stmt *SelectStatement) error {
}
func (p *Parser) parseWindowFunction(stmt *SelectStatement, winType string) error {
p.lexer.NextToken() // 跳过(
var params []interface{}
nextTok := p.lexer.NextToken() // 读取下一个 token应该是 '('
if nextTok.Type != TokenLParen {
return fmt.Errorf("expected '(' after window function %s, got %s (type: %v)", winType, nextTok.Value, nextTok.Type)
}
// 设置最大次数限制,防止无限循环
var params []interface{}
maxIterations := 100
iterations := 0
for p.lexer.peekChar() != ')' {
// Parse parameters until we find the closing parenthesis
for {
iterations++
// 安全检查:防止无限循环
if iterations > maxIterations {
return errors.New("window function parameter parsing exceeded maximum iterations, possible syntax error")
return fmt.Errorf("window function parameter parsing exceeded maximum iterations")
}
// Read the next token first
valTok := p.lexer.NextToken()
// If we hit the closing parenthesis or EOF, break
if valTok.Type == TokenRParen || valTok.Type == TokenEOF {
break
}
// Skip commas
if valTok.Type == TokenComma {
continue
}
//valTok := p.lexer.NextToken()
// Handle quoted values
if strings.HasPrefix(valTok.Value, "'") && strings.HasSuffix(valTok.Value, "'") {
valTok.Value = strings.Trim(valTok.Value, "'")
}
// Add the parameter value
params = append(params, convertValue(valTok.Value))
}
if &stmt.Window != nil {
stmt.Window.Params = params
stmt.Window.Type = winType
} else {
stmt.Window = WindowDefinition{
Type: winType,
Params: params,
}
}
stmt.Window.Params = params
stmt.Window.Type = winType
return nil
}
@@ -593,7 +596,9 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
hasWindowFunction := false
if tok.Type == TokenTumbling || tok.Type == TokenSliding || tok.Type == TokenCounting || tok.Type == TokenSession {
hasWindowFunction = true
_ = p.parseWindowFunction(stmt, tok.Value)
if err := p.parseWindowFunction(stmt, tok.Value); err != nil {
return err
}
}
hasGroupBy := false
@@ -633,7 +638,15 @@ func (p *Parser) parseGroupBy(stmt *SelectStatement) error {
continue
}
if tok.Type == TokenTumbling || tok.Type == TokenSliding || tok.Type == TokenCounting || tok.Type == TokenSession {
_ = p.parseWindowFunction(stmt, tok.Value)
if err := p.parseWindowFunction(stmt, tok.Value); err != nil {
return err
}
// After parsing window function, skip adding it to GroupBy and continue
continue
}
// Skip right parenthesis tokens (they should be consumed by parseWindowFunction)
if tok.Type == TokenRParen {
continue
}
@@ -696,7 +709,7 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
}
}
if valTok.Type == TokenTimeUnit {
timeUnit := time.Minute
timeUnit := time.Millisecond // Default to milliseconds
next := p.lexer.NextToken()
if next.Type == TokenEQ {
next = p.lexer.NextToken()
@@ -714,8 +727,10 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
timeUnit = time.Second
case "ms":
timeUnit = time.Millisecond
case "ns":
timeUnit = time.Nanosecond
default:
// If unknown unit, keep default (milliseconds)
}
// Check if Window is initialized; if not, create new WindowDefinition
if stmt.Window.Type == "" {
@@ -727,6 +742,72 @@ func (p *Parser) parseWith(stmt *SelectStatement) error {
}
}
}
if valTok.Type == TokenMaxOutOfOrderness {
next := p.lexer.NextToken()
if next.Type == TokenEQ {
next = p.lexer.NextToken()
durationStr := next.Value
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
durationStr = strings.Trim(durationStr, "'")
}
// Parse duration string like '5s', '2m', '1h', etc.
if duration, err := cast.ToDurationE(durationStr); err == nil {
// Check if Window is initialized; if not, create new WindowDefinition
if stmt.Window.Type == "" {
stmt.Window = WindowDefinition{
MaxOutOfOrderness: duration,
}
} else {
stmt.Window.MaxOutOfOrderness = duration
}
}
// If parsing fails, silently ignore (keep default 0)
}
}
if valTok.Type == TokenAllowedLateness {
next := p.lexer.NextToken()
if next.Type == TokenEQ {
next = p.lexer.NextToken()
durationStr := next.Value
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
durationStr = strings.Trim(durationStr, "'")
}
// Parse duration string like '5s', '2m', '1h', etc.
if duration, err := cast.ToDurationE(durationStr); err == nil {
// Check if Window is initialized; if not, create new WindowDefinition
if stmt.Window.Type == "" {
stmt.Window = WindowDefinition{
AllowedLateness: duration,
}
} else {
stmt.Window.AllowedLateness = duration
}
}
// If parsing fails, silently ignore (keep default 0)
}
}
if valTok.Type == TokenIdleTimeout {
next := p.lexer.NextToken()
if next.Type == TokenEQ {
next = p.lexer.NextToken()
durationStr := next.Value
if strings.HasPrefix(durationStr, "'") && strings.HasSuffix(durationStr, "'") {
durationStr = strings.Trim(durationStr, "'")
}
// Parse duration string like '5s', '2m', '1h', etc.
if duration, err := cast.ToDurationE(durationStr); err == nil {
// Check if Window is initialized; if not, create new WindowDefinition
if stmt.Window.Type == "" {
stmt.Window = WindowDefinition{
IdleTimeout: duration,
}
} else {
stmt.Window.IdleTimeout = duration
}
}
// If parsing fails, silently ignore (keep default 0)
}
}
}
return nil
+5 -5
View File
@@ -37,7 +37,7 @@ func TestDataProcessor_ApplyDistinct(t *testing.T) {
},
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -79,7 +79,7 @@ func TestDataProcessor_ApplyHavingFilter(t *testing.T) {
Having: "temperature > 25",
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -120,7 +120,7 @@ func TestDataProcessor_ApplyHavingWithCaseExpression(t *testing.T) {
Having: "CASE WHEN temperature > 30 THEN 1 WHEN status = 'active' THEN 1 ELSE 0 END",
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -161,7 +161,7 @@ func TestDataProcessor_ApplyHavingWithCondition(t *testing.T) {
Having: "temperature > 25",
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -541,7 +541,7 @@ func TestStream_ProcessSync(t *testing.T) {
},
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
aggStream, err := NewStream(aggConfig)
+9
View File
@@ -42,7 +42,16 @@ func (s *Stream) safeGetDataChan() chan map[string]interface{} {
// safeSendToDataChan safely sends data to dataChan
func (s *Stream) safeSendToDataChan(data map[string]interface{}) bool {
// Check if stream is stopped before attempting to send
if atomic.LoadInt32(&s.stopped) == 1 {
return false
}
dataChan := s.safeGetDataChan()
if dataChan == nil {
return false
}
select {
case dataChan <- data:
return true
+32 -2
View File
@@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
s.sinksMux.RLock()
defer s.sinksMux.RUnlock()
if len(s.sinks) == 0 {
if len(s.sinks) == 0 && len(s.syncSinks) == 0 {
return
}
@@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
for _, sink := range s.sinks {
s.submitSinkTask(sink, results)
}
// Execute synchronous sinks (blocking, sequential)
for _, sink := range s.syncSinks {
// Recover panic for each sync sink to prevent crashing the stream
func() {
defer func() {
if r := recover(); r != nil {
logger.Error("Sync sink execution exception: %v", r)
}
}()
sink(results)
}()
}
}
// submitSinkTask submits sink task
@@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m
}
// Non-blocking task submission
// Note: Since we use a worker pool, tasks may be executed out of order
select {
case s.sinkWorkerPool <- task:
// Successfully submitted task
default:
// Worker pool is full, execute directly in current goroutine (degraded handling)
go task()
// This also helps with backpressure
task()
}
}
// AddSink adds a sink function
// Parameters:
// - sink: result processing function that receives []map[string]interface{} type result data
//
// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed.
// If you need strict ordering, use GetResultsChan() instead.
func (s *Stream) AddSink(sink func([]map[string]interface{})) {
s.sinksMux.Lock()
defer s.sinksMux.Unlock()
s.sinks = append(s.sinks, sink)
}
// AddSyncSink adds a synchronous sink function
// Parameters:
// - sink: result processing function that receives []map[string]interface{} type result data
//
// Note: Sync sinks are executed sequentially in the result processing goroutine.
// They block subsequent processing, so they should be fast.
func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) {
s.sinksMux.Lock()
defer s.sinksMux.Unlock()
s.syncSinks = append(s.syncSinks, sink)
}
// GetResultsChan gets the result channel
func (s *Stream) GetResultsChan() <-chan []map[string]interface{} {
return s.resultChan
+10 -1
View File
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
dataChanCap := int64(cap(s.dataChan))
s.dataChanMux.RUnlock()
return map[string]int64{
stats := map[string]int64{
InputCount: atomic.LoadInt64(&s.inputCount),
OutputCount: atomic.LoadInt64(&s.outputCount),
DroppedCount: atomic.LoadInt64(&s.droppedCount),
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
Expanding: int64(atomic.LoadInt32(&s.expanding)),
}
if s.Window != nil {
winStats := s.Window.GetStats()
for k, v := range winStats {
stats[k] = v
}
}
return stats
}
// GetDetailedStats gets detailed performance statistics
+62 -4
View File
@@ -59,6 +59,11 @@ func (dp *DataProcessor) Process() {
currentDataChan := dp.stream.dataChan
dp.stream.dataChanMux.RUnlock()
// Check if dataChan is nil (stream has been stopped)
if currentDataChan == nil {
return
}
select {
case data, ok := <-currentDataChan:
if !ok {
@@ -305,8 +310,19 @@ func (dp *DataProcessor) startWindowProcessing() {
}
}()
for batch := range dp.stream.Window.OutputChan() {
dp.processWindowBatch(batch)
outputChan := dp.stream.Window.OutputChan()
for {
select {
case batch, ok := <-outputChan:
if !ok {
// Channel closed, exit
return
}
dp.processWindowBatch(batch)
case <-dp.stream.done:
// Stream stopped, exit
return
}
}
}()
}
@@ -522,8 +538,8 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) {
}
}
// Wrap result as array
results := []map[string]interface{}{result}
// Check if any field contains unnest function result and expand to multiple rows
results := dp.expandUnnestResults(result, dataMap)
// Non-blocking send result to resultChan
dp.stream.sendResultNonBlocking(results)
@@ -531,3 +547,45 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) {
// Asynchronously call all sinks, avoid blocking
dp.stream.callSinksAsync(results)
}
// expandUnnestResults inspects a single result row for unnest-function output
// and, when found, fans it out into multiple rows — one per unnest element.
// Each expanded row inherits every non-unnest field of the original result;
// keys coming from the unnest element win on collision. When the query uses
// no unnest function, or the row contains no unnest marker, the original row
// is returned wrapped in a one-element slice. An unnest field that expands to
// zero elements yields an empty (non-nil) result slice. The originalData
// argument is currently unused by this implementation.
//
// NOTE(review): the loop expands the first unnest field encountered, and Go
// map iteration order is random — if a query could ever produce more than one
// unnest field in a single row, the choice would be nondeterministic; confirm
// upstream guarantees at most one.
func (dp *DataProcessor) expandUnnestResults(result map[string]interface{}, originalData map[string]interface{}) []map[string]interface{} {
	single := []map[string]interface{}{result}

	// Fast path: the compiled query contains no unnest() call, or there is
	// nothing to expand. This check avoids per-field marker inspection on
	// the common, unnest-free path.
	if !dp.stream.hasUnnestFunction || len(result) == 0 {
		return single
	}

	for unnestField, value := range result {
		if !functions.IsUnnestResult(value) {
			continue
		}
		rows := functions.ProcessUnnestResultWithFieldName(value, unnestField)
		// An unnest over an empty collection produces no output rows.
		if len(rows) == 0 {
			return []map[string]interface{}{}
		}
		expanded := make([]map[string]interface{}, 0, len(rows))
		for _, row := range rows {
			merged := make(map[string]interface{}, len(result)+len(row))
			// Copy every sibling field except the unnest marker itself…
			for k, v := range result {
				if k != unnestField {
					merged[k] = v
				}
			}
			// …then overlay the fields contributed by this unnest element.
			for k, v := range row {
				merged[k] = v
			}
			expanded = append(expanded, merged)
		}
		return expanded
	}

	// No unnest marker found in any field.
	return single
}
+139 -2
View File
@@ -56,7 +56,7 @@ func TestDataProcessor_InitializeAggregator(t *testing.T) {
},
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -90,7 +90,7 @@ func TestDataProcessor_RegisterExpressionCalculator(t *testing.T) {
},
WindowConfig: types.WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": 1 * time.Second},
Params: []interface{}{1 * time.Second},
},
}
stream, err := NewStream(config)
@@ -430,3 +430,140 @@ func TestDataProcessor_ExpressionWithNullValues(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 20.0, result)
}
// TestDataProcessor_ExpandUnnestResults exercises expandUnnestResults across
// its branches: unnest disabled, empty result, no unnest markers present,
// scalar unnest values, object unnest values (fields merged into the row),
// and an empty unnest expansion (which must yield an empty result slice).
func TestDataProcessor_ExpandUnnestResults(t *testing.T) {
	tests := []struct {
		name              string
		hasUnnestFunction bool
		result            map[string]interface{}
		originalData      map[string]interface{}
		expected          []map[string]interface{}
	}{
		{
			name:              "no unnest function - should return single result",
			hasUnnestFunction: false,
			result: map[string]interface{}{
				"name": "test",
				"age":  25,
			},
			originalData: map[string]interface{}{"id": 1},
			expected: []map[string]interface{}{
				{"name": "test", "age": 25},
			},
		},
		{
			name:              "empty result - should return single empty result",
			hasUnnestFunction: true,
			result:            map[string]interface{}{},
			originalData:      map[string]interface{}{"id": 1},
			expected: []map[string]interface{}{
				{},
			},
		},
		{
			name:              "no unnest result - should return single result",
			hasUnnestFunction: true,
			result: map[string]interface{}{
				"name": "test",
				"age":  25,
			},
			originalData: map[string]interface{}{"id": 1},
			expected: []map[string]interface{}{
				{"name": "test", "age": 25},
			},
		},
		{
			name:              "unnest result with simple values",
			hasUnnestFunction: true,
			result: map[string]interface{}{
				"name": "test",
				"items": []interface{}{
					map[string]interface{}{
						"__unnest_object__": true,
						"__data__":          "item1",
					},
					map[string]interface{}{
						"__unnest_object__": true,
						"__data__":          "item2",
					},
				},
			},
			originalData: map[string]interface{}{"id": 1},
			expected: []map[string]interface{}{
				{"name": "test", "items": "item1"},
				{"name": "test", "items": "item2"},
			},
		},
		{
			name:              "unnest result with object values",
			hasUnnestFunction: true,
			result: map[string]interface{}{
				"name": "test",
				"orders": []interface{}{
					map[string]interface{}{
						"__unnest_object__": true,
						"__data__": map[string]interface{}{
							"order_id": 1,
							"amount":   100,
						},
					},
					map[string]interface{}{
						"__unnest_object__": true,
						"__data__": map[string]interface{}{
							"order_id": 2,
							"amount":   200,
						},
					},
				},
			},
			originalData: map[string]interface{}{"id": 1},
			expected: []map[string]interface{}{
				{"name": "test", "order_id": 1, "amount": 100},
				{"name": "test", "order_id": 2, "amount": 200},
			},
		},
		{
			name:              "empty unnest result - should return empty array",
			hasUnnestFunction: true,
			result: map[string]interface{}{
				"name": "test",
				"items": []interface{}{
					map[string]interface{}{
						"__unnest_object__": true,
						"__empty_unnest__":  true,
					},
				},
			},
			originalData: map[string]interface{}{"id": 1},
			expected:     []map[string]interface{}{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Build a minimal stream and processor for the test.
			config := types.Config{
				SimpleFields: []string{"name", "age"},
			}
			stream, err := NewStream(config)
			require.NoError(t, err)
			defer func() {
				if stream != nil {
					close(stream.done)
				}
			}()
			// Toggle the unnest fast-path flag the function checks first.
			stream.hasUnnestFunction = tt.hasUnnestFunction
			processor := NewDataProcessor(stream)
			// Invoke the function under test.
			result := processor.expandUnnestResults(tt.result, tt.originalData)
			// Verify the expanded rows match exactly (order included).
			assert.Equal(t, tt.expected, result)
		})
	}
}
+59 -2
View File
@@ -61,7 +61,32 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
}
// Parse alias
parts := strings.Split(fieldSpec, ":")
var parts []string
// Helper to split field spec considering quotes
splitFieldSpec := func(spec string) []string {
inQuote := false
var quoteChar byte
for i := 0; i < len(spec); i++ {
c := spec[i]
if inQuote {
if c == quoteChar {
inQuote = false
}
} else {
if c == '\'' || c == '"' || c == '`' {
inQuote = true
quoteChar = c
} else if c == ':' {
// Found separator
return []string{spec[:i], spec[i+1:]}
}
}
}
// No separator found outside quotes
return []string{spec}
}
parts = splitFieldSpec(fieldSpec)
info.fieldName = parts[0]
// Remove backticks from field name
if len(info.fieldName) >= 2 && info.fieldName[0] == '`' && info.fieldName[len(info.fieldName)-1] == '`' {
@@ -98,6 +123,8 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
// compileExpressionInfo pre-compiles expression processing information
func (s *Stream) compileExpressionInfo() {
// Initialize unnest function detection flag
s.hasUnnestFunction = false
bridge := functions.GetExprBridge()
for fieldName, fieldExpr := range s.config.FieldExpressions {
@@ -124,6 +151,11 @@ func (s *Stream) compileExpressionInfo() {
exprInfo.hasNestedFields = !exprInfo.isFunctionCall && strings.Contains(fieldExpr.Expression, ".")
exprInfo.needsBacktickPreprocess = bridge.ContainsBacktickIdentifiers(fieldExpr.Expression)
// Check if expression contains unnest function
if exprInfo.isFunctionCall && strings.Contains(strings.ToLower(fieldExpr.Expression), "unnest(") {
s.hasUnnestFunction = true
}
// Pre-compile expression object (only for non-function call expressions)
if !exprInfo.isFunctionCall {
exprToCompile := fieldExpr.Expression
@@ -391,7 +423,32 @@ func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string
}
// Handle alias
parts := strings.Split(fieldSpec, ":")
var parts []string
// Helper to split field spec considering quotes
splitFieldSpec := func(spec string) []string {
inQuote := false
var quoteChar byte
for i := 0; i < len(spec); i++ {
c := spec[i]
if inQuote {
if c == quoteChar {
inQuote = false
}
} else {
if c == '\'' || c == '"' || c == '`' {
inQuote = true
quoteChar = c
} else if c == ':' {
// Found separator
return []string{spec[:i], spec[i+1:]}
}
}
}
// No separator found outside quotes
return []string{spec}
}
parts = splitFieldSpec(fieldSpec)
fieldName := parts[0]
outputName := fieldName
if len(parts) > 1 {

Some files were not shown because too many files have changed in this diff Show More