37 Commits

Author SHA1 Message Date
rulego-team 52653b2143 ci:测试超时设置600s 2025-12-21 11:57:52 +08:00
Whki 2db23f5b99 Merge pull request #45 from rulego/dependabot/go_modules/github.com/expr-lang/expr-1.17.7
chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
2025-12-20 16:24:18 +08:00
rulego-team c7cece15e2 feat:window输出结果增加溢出策略 2025-12-20 16:15:53 +08:00
rulego-team 09fe63102e fix:SessionWindow和CountingWindow bufferSize初始化 2025-12-20 11:14:37 +08:00
rulego-team b405a935b7 增加同步的获取结果的方法 2025-12-20 11:10:21 +08:00
dependabot[bot] 1c781979a6 chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
Bumps [github.com/expr-lang/expr](https://github.com/expr-lang/expr) from 1.17.6 to 1.17.7.
- [Release notes](https://github.com/expr-lang/expr/releases)
- [Commits](https://github.com/expr-lang/expr/compare/v1.17.6...v1.17.7)

---
updated-dependencies:
- dependency-name: github.com/expr-lang/expr
  dependency-version: 1.17.7
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 02:14:17 +00:00
rulego-team 16778f2ac3 test:fix test fail 2025-12-16 17:40:23 +08:00
rulego-team ba2cdd5629 fix:修复有特殊符号出现多余字段 2025-12-16 11:50:34 +08:00
rulego-team 47b7e07c6b fix:修复特殊符号被认为非法 2025-12-16 11:34:30 +08:00
rulego-team 9739f213e9 fix:修复Trigger未清除过期数据问题 2025-12-16 11:15:33 +08:00
rulego-team 4528d1f1a6 fix(window): 修复事件时间窗口未触发问题并添加调试日志 2025-11-15 13:54:27 +08:00
Whki 6ce76d506c Merge pull request #43 from rulego/dev
Dev
2025-11-15 13:15:05 +08:00
rulego-team 8207e6ba8c fix:修复延迟数据处理逻辑 2025-11-15 13:06:06 +08:00
rulego-team b2c638671a ci:增加测试时间 2025-11-15 00:49:49 +08:00
rulego-team 1744502678 feat:增加事件时间窗口处理机制 2025-11-15 00:27:55 +08:00
Whki 8a6e41ab78 Merge pull request #42 from rulego/dev
Dev
2025-11-14 10:53:22 +08:00
rulego-team 14d9a0874b feat:优化stream停止 2025-11-14 10:46:42 +08:00
rulego-team 6f77dc5f7f fix:修复测试用例断言 2025-11-14 08:51:45 +08:00
Whki 2907d7e3a3 Merge pull request #41 from rulego/dev
Dev
2025-11-13 18:47:20 +08:00
rulego-team eb29ed921d fix:取消对齐时间 2025-11-13 18:46:30 +08:00
rulego-team 73f7843996 fix:第一个窗口后启动步长定时器 2025-11-13 18:46:29 +08:00
Whki 3ef10b86a9 Merge pull request #40 from rulego/dev
Dev
2025-11-13 11:18:08 +08:00
rulego-team e435ef2040 fix:修复测试错误 2025-11-13 11:11:49 +08:00
rulego-team c39f915808 refactor:重构窗口函数参数传递问题 2025-11-13 11:03:49 +08:00
rulego-team e05213267d chore:添加测试用例 2025-11-13 11:03:28 +08:00
rulego-team 5f05b0ef61 perf:优化阻塞问题 2025-11-13 11:02:53 +08:00
rulego-team 3662949105 fix:支持混合keys分组 2025-11-13 11:01:52 +08:00
rulego-team c86d22e06a fix:修复窗口对齐问题 2025-11-13 11:01:13 +08:00
rulego-team 645c233762 fix:根据group by分组计数 2025-11-13 10:58:10 +08:00
rulego-team 29ed63ea50 chore(ci):dev 分支增加自动化测试 2025-11-13 10:56:51 +08:00
rulego-team 5f7076de7b chore: add awesome badge 2025-10-08 12:01:10 +08:00
rulego-team 3464ff5f6e fix:unnest function to properly expand arrays into multiple rows in stream processing 2025-09-08 18:40:26 +08:00
rulego-team 1d9e2a3dab test: add test cases 2025-08-30 00:56:48 +08:00
rulego-team b23fdea2cd perf:增加代码覆盖率 2025-08-29 17:30:58 +08:00
rulego-team 696b5f7177 perf:优化sql语法解析 2025-08-29 17:30:11 +08:00
rulego-team 90afdead78 perf:重构聚合函数运算 2025-08-29 17:29:27 +08:00
rulego-team 4615b7a308 feat:支持聚合函数的后运算 #37 2025-08-29 13:49:26 +08:00
79 changed files with 14349 additions and 2896 deletions
+4 -4
View File
@@ -2,9 +2,9 @@ name: CI
on:
push:
branches: [ main, master, develop ]
branches: [ main, dev ]
pull_request:
branches: [ main, master, develop ]
branches: [ main, dev ]
jobs:
test:
@@ -42,11 +42,11 @@ jobs:
- name: Run tests
if: matrix.go-version != '1.21'
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...
- name: Run tests with coverage
if: matrix.go-version == '1.21'
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 300s ./...
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic -timeout 600s ./...
- name: Upload coverage reports to Codecov
if: matrix.go-version == '1.21'
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
run: go mod download
- name: Run all tests
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...
release:
name: Create Release
+94 -9
View File
@@ -4,6 +4,7 @@
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
English| [简体中文](README_ZH.md)
@@ -150,7 +151,7 @@ func main() {
// Step 1: Create StreamSQL Instance
// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
ssql := streamsql.New()
defer ssql.Stop()
// Step 2: Define Stream SQL Query Statement
// This SQL statement showcases StreamSQL's core capabilities:
// - SELECT: Choose output fields and aggregation functions
@@ -266,8 +267,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -334,6 +334,11 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
- **Characteristics**: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
- **Application Scenario**: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
- **Session Window**
- **Definition**: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
- **Characteristics**: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
- **Application Scenario**: In user behavior analysis, maintain a session when users operate continuously, and close the session and count operations within that session when users stop operating for more than 5 minutes.
### Stream
- **Definition**: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
@@ -342,17 +347,97 @@ Since stream data is unbounded, it cannot be processed as a whole. Windows provi
### Time Semantics
- **Event Time**
- **Definition**: The actual time when the data occurred, usually represented by a timestamp generated by the data source.
StreamSQL supports two time concepts that determine how windows are divided and triggered:
- **Processing Time**
- **Definition**: The time when the data arrives at the processing system.
#### Event Time
- **Definition**: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as `event_time`, `timestamp`, `order_time`, etc.).
- **Characteristics**:
- Windows are divided based on timestamp field values in the data
- Even if data arrives late, it can be correctly counted into the corresponding window based on event time
- Uses Watermark mechanism to handle out-of-order and late data
- Results are accurate but may have delays (need to wait for late data)
- **Use Cases**:
- Scenarios requiring precise temporal analysis
- Scenarios where data may arrive out of order or delayed
- Historical data replay and analysis
- **Configuration**: Use `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` to specify the event time field
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### Processing Time
- **Definition**: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
- **Characteristics**:
- Windows are divided based on the time data arrives at the system (`time.Now()`)
- Regardless of the time field value in the data, it is counted into the current window based on arrival time
- Uses system clock (Timer) to trigger windows
- Low latency but results may be inaccurate (cannot handle out-of-order and late data)
- **Use Cases**:
- Real-time monitoring and alerting scenarios
- Scenarios with high latency requirements and relatively low accuracy requirements
- Scenarios where data arrives in order and delay is controllable
- **Configuration**: Default when `WITH (TIMESTAMP=...)` is not specified
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- No WITH clause specified, defaults to processing time
```
#### Event Time vs Processing Time Comparison
| Feature | Event Time | Processing Time |
|---------|------------|-----------------|
| **Time Source** | Timestamp field in data | System current time |
| **Window Division** | Based on event timestamp | Based on data arrival time |
| **Late Data Handling** | Supported (Watermark mechanism) | Not supported |
| **Out-of-Order Handling** | Supported (Watermark mechanism) | Not supported |
| **Result Accuracy** | Accurate | May be inaccurate |
| **Processing Latency** | Higher (need to wait for late data) | Low (real-time trigger) |
| **Configuration** | `WITH (TIMESTAMP='field')` | Default (no WITH clause) |
| **Use Cases** | Precise temporal analysis, historical replay | Real-time monitoring, low latency requirements |
#### Window Time
- **Window Start Time**
- **Definition**: The starting time point of the window based on event time. For example, for a sliding window based on event time, the window start time is the timestamp of the earliest event within the window.
- **Event Time Windows**: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
- **Processing Time Windows**: The starting time point of the window, based on the time data arrives at the system.
- **Example**: For an event-time-based tumbling window `TumblingWindow('5m')`, the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
- **Window End Time**
- **Definition**: The ending time point of the window based on event time. Typically, the window end time is the window start time plus the duration of the window. For example, if the duration of a sliding window is 1 minute, then the window end time is the window start time plus 1 minute.
- **Event Time Windows**: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when `watermark >= window_end`.
- **Processing Time Windows**: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
- **Example**: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
#### Watermark Mechanism (Event Time Windows Only)
- **Definition**: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
- **Calculation Formula**: `Watermark = max(event_time) - MaxOutOfOrderness`
- **Window Trigger Condition**: Windows trigger when `watermark >= window_end`
- **Configuration Parameters**:
- `MAXOUTOFORDERNESS`: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
- `ALLOWEDLATENESS`: Time window can accept late data after triggering (default: 0, no late data accepted)
- `IDLETIMEOUT`: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
- **Example**:
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- Tolerate 5 seconds of out-of-order
ALLOWEDLATENESS='2s', -- Accept 2 seconds of late data after window triggers
IDLETIMEOUT='5s' -- Advance watermark based on processing time after 5s of no data
)
```
## Contribution Guidelines
+96 -10
View File
@@ -4,6 +4,7 @@
[![CI](https://github.com/rulego/streamsql/actions/workflows/ci.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/ci.yml)
[![RELEASE](https://github.com/rulego/streamsql/actions/workflows/release.yml/badge.svg)](https://github.com/rulego/streamsql/actions/workflows/release.yml)
[![codecov](https://codecov.io/gh/rulego/streamsql/graph/badge.svg?token=1CK1O5J1BI)](https://codecov.io/gh/rulego/streamsql)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go?tab=readme-ov-file#stream-processing)
[English](README.md)| 简体中文
@@ -150,7 +151,7 @@ import (
func main() {
// 1. 创建StreamSQL实例 - 这是流式SQL处理引擎的入口
ssql := streamsql.New()
defer ssql.Stop()
// 2. 定义流式SQL查询语句
// 核心概念解析:
// - TumblingWindow('5s'): 滚动窗口每5秒创建一个新窗口窗口之间不重叠
@@ -284,8 +285,7 @@ func main() {
window_end() as end
FROM stream
WHERE device.info.type = 'temperature'
GROUP BY device.location, TumblingWindow('5s')
WITH (TIMESTAMP='timestamp', TIMEUNIT='ss')`
GROUP BY device.location, TumblingWindow('5s')`
err := ssql.Execute(rsql)
if err != nil {
@@ -346,6 +346,11 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
- **特点**:窗口的大小与时间无关,而是根据数据量来划分,适合对数据量进行分段处理。
- **应用场景**:在工业物联网中,每处理 100 条设备状态数据后进行一次聚合计算。
- **会话窗口Session Window**
- **定义**:基于数据活跃度的动态窗口,当数据之间的间隔超过指定的超时时间时,当前会话结束并触发窗口。
- **特点**:窗口大小动态变化,根据数据到达的间隔自动划分会话。当数据连续到达时,会话持续;当数据间隔超过超时时间时,会话结束并触发窗口。
- **应用场景**:在用户行为分析中,当用户连续操作时保持会话,当用户停止操作超过 5 分钟后关闭会话并统计该会话内的操作次数。
### 流Stream
- **定义**:流是数据的连续序列,数据以无界的方式产生,通常来自于传感器、日志系统、用户行为等。
@@ -354,16 +359,97 @@ StreamSQL 支持多种函数类型,包括数学、字符串、转换、聚合
### 时间语义
- **事件时间Event Time**
- **定义**:数据实际发生的时间,通常由数据源生成的时间戳表示。
StreamSQL 支持两种时间概念,它们决定了窗口如何划分和触发:
#### 事件时间Event Time
- **定义**:事件时间是指数据实际产生的时间,通常记录在数据本身的某个字段中(如 `event_time``timestamp``order_time` 等)。
- **特点**
- 窗口基于数据中的时间戳字段值来划分
- 即使数据延迟到达,也能根据事件时间正确统计到对应的窗口
- 使用 Watermark 机制来处理乱序和延迟数据
- 结果准确,但可能有延迟(需要等待延迟数据)
- **使用场景**
- 需要精确时序分析的场景
- 数据可能乱序或延迟到达的场景
- 历史数据回放和分析
- **配置方法**:使用 `WITH (TIMESTAMP='field_name', TIMEUNIT='ms')` 指定事件时间字段
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
```
#### 处理时间Processing Time
- **定义**:处理时间是指数据到达 StreamSQL 处理系统的时间,即系统接收到数据时的当前时间。
- **特点**
- 窗口基于数据到达系统的时间(`time.Now()`)来划分
- 不管数据中的时间字段是什么值,都按到达时间统计到当前窗口
- 使用系统时钟Timer来触发窗口
- 延迟低,但结果可能不准确(无法处理乱序和延迟数据)
- **使用场景**
- 实时监控和告警场景
- 对延迟要求高,对准确性要求相对较低的场景
- 数据顺序到达且延迟可控的场景
- **配置方法**:不指定 `WITH (TIMESTAMP=...)` 时,默认使用处理时间
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
-- 不指定 WITH 子句,默认使用处理时间
```
#### 事件时间 vs 处理时间对比
| 特性 | 事件时间 (Event Time) | 处理时间 (Processing Time) |
|------|---------------------|-------------------------|
| **时间来源** | 数据中的时间戳字段 | 系统当前时间 |
| **窗口划分** | 基于事件时间戳 | 基于数据到达时间 |
| **延迟处理** | 支持Watermark机制 | 不支持 |
| **乱序处理** | 支持Watermark机制 | 不支持 |
| **结果准确性** | 准确 | 可能不准确 |
| **处理延迟** | 较高(需等待延迟数据) | 低(实时触发) |
| **配置方式** | `WITH (TIMESTAMP='field')` | 默认不指定WITH |
| **适用场景** | 精确时序分析、历史回放 | 实时监控、低延迟要求 |
#### 窗口时间
- **处理时间Processing Time**
- **定义**:数据到达处理系统的时间。
- **窗口开始时间Window Start Time**
- **定义**:基于事件时间,窗口的起始时间点。例如,对于一个基于事件时间的滑动窗口,窗口开始时间是窗口内最早事件的时间戳
- **事件时间窗口**:窗口的起始时间点,基于事件时间对齐到窗口边界(如对齐到分钟、小时的整点)
- **处理时间窗口**:窗口的起始时间点,基于数据到达系统的时间。
- **示例**:对于一个基于事件时间的滚动窗口 `TumblingWindow('5m')`窗口开始时间会对齐到5分钟的倍数如 10:00、10:05、10:10
- **窗口结束时间Window End Time**
- **定义**:基于事件时间,窗口的结束时间点通常窗口结束时间是窗口开始时间加上窗口的持续时间。
- 例如,一个滑动窗口的持续时间为 1 分钟,则窗口结束时间窗口开始时间加上 1 分钟
- **事件时间窗口**窗口的结束时间点通常是窗口开始时间加上窗口的持续时间。窗口在 `watermark >= window_end` 时触发。
- **处理时间窗口**:窗口的结束时间点,基于数据到达系统的时间加上窗口持续时间窗口在系统时钟到达结束时间时触发
- **示例**:一个持续时间为 1 分钟的滚动窗口,如果窗口开始时间是 10:00则窗口结束时间是 10:01。
#### Watermark 机制(仅事件时间窗口)
- **定义**Watermark 表示"小于该时间的事件不应该再到达",用于判断窗口是否可以触发。
- **计算公式**`Watermark = max(event_time) - MaxOutOfOrderness`
- **窗口触发条件**:当 `watermark >= window_end` 时,窗口触发
- **配置参数**
- `MAXOUTOFORDERNESS`允许的最大乱序时间用于容忍数据乱序默认0不允许乱序
- `ALLOWEDLATENESS`窗口触发后还能接受延迟数据的时间默认0不接受延迟数据
- `IDLETIMEOUT`:数据源空闲时,基于处理时间推进 Watermark 的超时时间默认0禁用
- **示例**
```sql
SELECT deviceId, COUNT(*) as cnt
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
WITH (
TIMESTAMP='eventTime',
TIMEUNIT='ms',
MAXOUTOFORDERNESS='5s', -- 容忍5秒的乱序
ALLOWEDLATENESS='2s', -- 窗口触发后还能接受2秒的延迟数据
IDLETIMEOUT='5s' -- 5秒无数据基于处理时间推进watermark
)
```
## 贡献指南
+24
View File
@@ -20,6 +20,7 @@ const (
WindowStart = functions.WindowStart
WindowEnd = functions.WindowEnd
Collect = functions.Collect
FirstValue = functions.FirstValue
LastValue = functions.LastValue
MergeAgg = functions.MergeAgg
StdDevS = functions.StdDevS
@@ -33,6 +34,8 @@ const (
HadChanged = functions.HadChanged
// Expression aggregator for handling custom functions
Expression = functions.Expression
// Post-aggregation marker
PostAggregation = functions.PostAggregation
)
// AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
@@ -55,9 +58,30 @@ func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction {
}
}
// Special handling for post-aggregation type (placeholder aggregator)
if aggType == "post_aggregation" {
return &PostAggregationPlaceholder{}
}
return functions.CreateLegacyAggregator(aggType)
}
// PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields
type PostAggregationPlaceholder struct{}
func (p *PostAggregationPlaceholder) New() AggregatorFunction {
return &PostAggregationPlaceholder{}
}
func (p *PostAggregationPlaceholder) Add(value interface{}) {
// Do nothing - this is just a placeholder
}
func (p *PostAggregationPlaceholder) Result() interface{} {
// Return nil - actual result will be computed in post-processing
return nil
}
// ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
type ExpressionAggregatorWrapper struct {
function *functions.ExpressionAggregatorFunction
+148
View File
@@ -0,0 +1,148 @@
package aggregator
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPostAggregationPlaceholder 测试后聚合占位符的完整功能
func TestPostAggregationPlaceholder(t *testing.T) {
t.Run("测试PostAggregationPlaceholder基本功能", func(t *testing.T) {
// 创建PostAggregationPlaceholder实例
placeholder := &PostAggregationPlaceholder{}
require.NotNil(t, placeholder)
// 测试New方法
newPlaceholder := placeholder.New()
require.NotNil(t, newPlaceholder)
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
// 测试Add方法应该不做任何操作
placeholder.Add(10)
placeholder.Add("test")
placeholder.Add(nil)
placeholder.Add([]int{1, 2, 3})
// 测试Result方法应该返回nil
result := placeholder.Result()
assert.Nil(t, result)
})
t.Run("测试通过CreateBuiltinAggregator创建PostAggregationPlaceholder", func(t *testing.T) {
// 使用CreateBuiltinAggregator创建post_aggregation类型的聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
// 测试创建的聚合器功能
newAgg := aggregator.New()
require.NotNil(t, newAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
// 测试添加各种类型的值
newAgg.Add(100)
newAgg.Add("string_value")
newAgg.Add(map[string]interface{}{"key": "value"})
// 验证结果始终为nil
result := newAgg.Result()
assert.Nil(t, result)
})
t.Run("测试PostAggregationPlaceholder的多实例独立性", func(t *testing.T) {
// 创建多个实例
placeholder1 := &PostAggregationPlaceholder{}
placeholder2 := placeholder1.New()
placeholder3 := placeholder1.New()
// 验证实例类型正确
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
// 每个实例都应该返回nil
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
// 验证Add操作不会影响结果因为是占位符
placeholder1.Add("test1")
placeholder2.Add("test2")
placeholder3.Add("test3")
assert.Nil(t, placeholder1.Result())
assert.Nil(t, placeholder2.Result())
assert.Nil(t, placeholder3.Result())
})
t.Run("测试PostAggregationPlaceholder在聚合场景中的使用", func(t *testing.T) {
// 创建包含PostAggregationPlaceholder的聚合字段
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
}
// 创建分组聚合器
agg := NewGroupAggregator(groupFields, aggFields)
require.NotNil(t, agg)
// 添加测试数据
testData := []map[string]interface{}{
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
{"category": "B", "value": 30, "placeholder_field": 999},
}
for _, data := range testData {
err := agg.Add(data)
assert.NoError(t, err)
}
// 获取结果
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 2)
// 验证PostAggregationPlaceholder字段的结果为nil
for _, result := range results {
assert.Contains(t, result, "post_agg_field")
assert.Nil(t, result["post_agg_field"])
// 验证正常聚合字段工作正常
assert.Contains(t, result, "sum_value")
assert.NotNil(t, result["sum_value"])
}
})
}
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
aggregator := CreateBuiltinAggregator("post_aggregation")
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试PostAggregation常量", func(t *testing.T) {
// 验证PostAggregation常量值
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
// 使用常量创建聚合器
aggregator := CreateBuiltinAggregator(PostAggregation)
require.NotNil(t, aggregator)
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
})
t.Run("测试与其他聚合类型的区别", func(t *testing.T) {
// 创建不同类型的聚合器
sumAgg := CreateBuiltinAggregator(Sum)
countAgg := CreateBuiltinAggregator(Count)
postAgg := CreateBuiltinAggregator(PostAggregation)
// 验证类型不同
assert.NotEqual(t, sumAgg, postAgg)
assert.NotEqual(t, countAgg, postAgg)
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
})
}
+20 -4
View File
@@ -140,6 +140,12 @@ func (ga *GroupAggregator) isNumericAggregator(aggType AggregateType) bool {
return false
}
// shouldAllowNullValues 判断聚合函数是否应该允许NULL值
func (ga *GroupAggregator) shouldAllowNullValues(aggType AggregateType) bool {
// FIRST_VALUE和LAST_VALUE函数应该允许NULL值因为它们需要记录第一个/最后一个值即使是NULL
return aggType == FirstValue || aggType == LastValue
}
func (ga *GroupAggregator) Add(data interface{}) error {
ga.mu.Lock()
defer ga.mu.Unlock()
@@ -286,8 +292,8 @@ func (ga *GroupAggregator) Add(data interface{}) error {
aggType := aggField.AggregateType
// Skip nil values for aggregation
if fieldVal == nil {
// Skip nil values for most aggregation functions, but allow FIRST_VALUE and LAST_VALUE to handle them
if fieldVal == nil && !ga.shouldAllowNullValues(aggType) {
continue
}
@@ -301,6 +307,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
// For numeric aggregation functions, try to convert to numeric type
if numVal, err := cast.ToFloat64E(fieldVal); err == nil {
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(numVal)
}
} else {
@@ -309,6 +316,7 @@ func (ga *GroupAggregator) Add(data interface{}) error {
} else {
// For non-numeric aggregation functions, pass original value directly
if groupAgg, exists := ga.groups[key][outputAlias]; exists {
groupAgg.Add(fieldVal)
}
}
@@ -321,8 +329,11 @@ func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
ga.mu.RLock()
defer ga.mu.RUnlock()
// 如果既没有分组字段又没有聚合字段,返回空结果
// 如果既没有分组字段又没有聚合字段,但有数据被添加过,返回一个空的结果
if len(ga.aggregationFields) == 0 && len(ga.groupFields) == 0 {
if len(ga.groups) > 0 {
return []map[string]interface{}{{}}, nil
}
return []map[string]interface{}{}, nil
}
@@ -336,7 +347,12 @@ func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error) {
}
}
for field, agg := range aggregators {
group[field] = agg.Result()
result := agg.Result()
group[field] = result
// Debug: log aggregator results (can be removed in production)
// if strings.HasPrefix(field, "__") {
// fmt.Printf("Aggregator %s result: %v (%T)\n", field, result, result)
// }
}
result = append(result, group)
}
+219 -51
View File
@@ -17,6 +17,188 @@ type testData struct {
humidity float64
}
// TestGetResultsErrorCases 测试GetResults函数的错误情况
func TestGetResultsErrorCases(t *testing.T) {
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
}
agg := NewEnhancedGroupAggregator(groupFields, aggFields)
// 添加一个无效的后聚合表达式
requiredFields := []AggregationFieldInfo{
{FuncName: "invalid", InputField: "value", AggType: Sum},
}
err := agg.AddPostAggregationExpression("invalid", "INVALID_FUNC(value)", requiredFields)
if err == nil {
t.Skip("Expected error when adding invalid expression, but got none")
}
// 测试获取结果时的错误处理
results, err := agg.GetResults()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if results == nil {
t.Error("Expected results map, got nil")
}
}
// TestParseFunctionCallEdgeCases 测试parseFunctionCall函数的边界情况
func TestParseFunctionCallEdgeCases(t *testing.T) {
groupFields := []string{"category"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
}
agg := NewEnhancedGroupAggregator(groupFields, aggFields)
tests := []struct {
name string
expr string
expectError bool
}{
{
name: "Function with nested parentheses",
expr: "SUM(CASE WHEN (value > 0) THEN value ELSE 0 END)",
expectError: false,
},
{
name: "Function with string literals",
expr: "CONCAT('Hello', 'World')",
expectError: false,
},
{
name: "Function with quoted identifiers",
expr: "SUM(`column name`)",
expectError: false,
},
{
name: "Unmatched parentheses",
expr: "SUM(value",
expectError: true,
},
{
name: "Empty function call",
expr: "()",
expectError: true,
},
{
name: "Function with arithmetic",
expr: "SUM(value * 2 + 1)",
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _ = agg.parseFunctionCall(tt.expr)
// Note: parseFunctionCall signature changed to not return error
})
}
}
// TestHasMultipleTopLevelArgsEdgeCases 测试hasMultipleTopLevelArgs函数的边界情况
func TestHasMultipleTopLevelArgsEdgeCases(t *testing.T) {
tests := []struct {
name string
args string
expected bool
}{
{
name: "Single argument",
args: "value",
expected: false,
},
{
name: "Multiple arguments",
args: "value1, value2",
expected: true,
},
{
name: "Arguments with nested function",
args: "SUM(value), COUNT(*)",
expected: true,
},
{
name: "Arguments with parentheses",
args: "(value1 + value2), value3",
expected: true,
},
{
name: "Single complex argument",
args: "(value1, value2)",
expected: false,
},
{
name: "Empty arguments",
args: "",
expected: false,
},
{
name: "Arguments with string literals",
args: "'hello, world', value",
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := hasMultipleTopLevelArgs(tt.args)
if result != tt.expected {
t.Errorf("hasMultipleTopLevelArgs(%q) = %v, want %v", tt.args, result, tt.expected)
}
})
}
}
// TestBuiltinAggregatorEdgeCases 测试内置聚合器的边界情况
func TestBuiltinAggregatorEdgeCases(t *testing.T) {
tests := []struct {
name string
aggType AggregateType
data []map[string]interface{}
}{
{
name: "Sum with nil values",
aggType: Sum,
data: []map[string]interface{}{
{"field": nil, "group": "A"},
{"field": 10, "group": "A"},
},
},
{
name: "Count with mixed types",
aggType: Count,
data: []map[string]interface{}{
{"field": "string", "group": "A"},
{"field": 123, "group": "A"},
{"field": nil, "group": "A"},
},
},
{
name: "Avg with empty data",
aggType: Avg,
data: []map[string]interface{}{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
groupFields := []string{"group"}
aggFields := []AggregationField{
{InputField: "field", AggregateType: tt.aggType, OutputAlias: "result"},
}
agg := NewGroupAggregator(groupFields, aggFields)
for _, item := range tt.data {
agg.Add(item)
}
results, err := agg.GetResults()
assert.NoError(t, err)
assert.NotNil(t, results)
})
}
}
func TestGroupAggregator_MultiFieldSum(t *testing.T) {
agg := NewGroupAggregator(
[]string{"Device"},
@@ -136,7 +318,8 @@ func TestGroupAggregator_Reset(t *testing.T) {
}
// 验证有数据
results, _ := agg.GetResults()
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 1)
// 重置
@@ -596,56 +779,41 @@ func TestGroupAggregatorAdvancedFeatures(t *testing.T) {
// 测试统计聚合函数
t.Run("Statistical Aggregation Functions", func(t *testing.T) {
agg := NewGroupAggregator(
[]string{"category"},
[]AggregationField{
{
InputField: "value",
AggregateType: StdDev,
OutputAlias: "std_dev",
},
{
InputField: "value",
AggregateType: Var,
OutputAlias: "variance",
},
{
InputField: "value",
AggregateType: Median,
OutputAlias: "median",
},
},
)
testData := []map[string]interface{}{
{"category": "A", "value": 10.0},
{"category": "A", "value": 12.0},
{"category": "A", "value": 14.0},
{"category": "B", "value": 5.0},
{"category": "B", "value": 7.0},
{"category": "B", "value": 9.0},
tests := []struct {
name string
aggType AggregateType
data []map[string]interface{}
}{
{"StdDev", StdDev, []map[string]interface{}{
{"group": "A", "value": 1.0},
{"group": "A", "value": 2.0},
{"group": "A", "value": 3.0},
}},
{"Var", Var, []map[string]interface{}{
{"group": "A", "value": 1.0},
{"group": "A", "value": 2.0},
{"group": "A", "value": 3.0},
}},
{"Median", Median, []map[string]interface{}{
{"group": "A", "value": 1.0},
{"group": "A", "value": 2.0},
{"group": "A", "value": 3.0},
}},
}
for _, d := range testData {
agg.Add(d)
}
results, err := agg.GetResults()
assert.NoError(t, err)
assert.Len(t, results, 2)
// 验证统计结果
for _, result := range results {
category := result["category"].(string)
if category == "A" {
assert.InDelta(t, 2.0, result["std_dev"], 0.01)
assert.InDelta(t, 2.6666666666666665, result["variance"], 0.01)
assert.Equal(t, 12.0, result["median"])
} else if category == "B" {
assert.InDelta(t, 2.0, result["std_dev"], 0.01)
assert.InDelta(t, 2.6666666666666665, result["variance"], 0.01)
assert.Equal(t, 7.0, result["median"])
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
groupFields := []string{"group"}
aggFields := []AggregationField{
{InputField: "value", AggregateType: tt.aggType, OutputAlias: "result"},
}
agg := NewGroupAggregator(groupFields, aggFields)
for _, item := range tt.data {
agg.Add(item)
}
results, _ := agg.GetResults()
assert.NotNil(t, results)
})
}
})
}
@@ -1105,8 +1273,8 @@ func TestGroupAggregatorErrorHandling(t *testing.T) {
}
// 空配置应该返回空结果
if len(results) != 0 {
t.Errorf("expected 0 results, got %d", len(results))
if len(results) != 1 {
t.Errorf("expected 1 result, got %d", len(results))
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+73
View File
@@ -26,6 +26,8 @@ integration with the RuleGo ecosystem.
Lightweight design - Pure in-memory operations, no external dependencies
SQL syntax support - Process stream data using familiar SQL syntax
Multiple window types - Sliding, tumbling, counting, and session windows
Event time and processing time - Support both time semantics for accurate stream processing
Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
RuleGo ecosystem integration - Extend input/output sources using RuleGo components
@@ -107,6 +109,77 @@ StreamSQL supports multiple window types:
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
# Event Time vs Processing Time
StreamSQL supports two time semantics for window processing:
## Processing Time (Default)
Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:
// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives
## Event Time
Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps,
allowing correct handling of out-of-order and late-arriving data:
// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')
// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')
## Watermark and Late Data Handling
Event time windows use watermark mechanism to handle out-of-order and late data:
// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s' // Watermark = max(event_time) - 5s
)
// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
ALLOWEDLATENESS='2s' // Window stays open for 2s after trigger
)
// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
MAXOUTOFORDERNESS='5s', // Tolerate 5s out-of-order before trigger
ALLOWEDLATENESS='2s' // Accept 2s late data after trigger
)
// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
TIMESTAMP='order_time',
IDLETIMEOUT='5s' // If no data arrives within 5s, watermark advances based on processing time
)
Key concepts:
MaxOutOfOrderness: Affects watermark calculation, delays window trigger to tolerate out-of-order data
AllowedLateness: Keeps window open after trigger to accept late data and update results
IdleTimeout: When data source is idle (no data arrives within timeout), watermark advances based on processing time to ensure windows can close
Watermark: Indicates that no events with timestamp less than watermark are expected
# Custom Functions
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
+27
View File
@@ -2,6 +2,7 @@ package expr
import (
"fmt"
"sort"
"strconv"
"strings"
@@ -130,7 +131,25 @@ func validateBasicSyntax(exprStr string) error {
}
// Check for invalid characters
inQuotes := false
var quoteChar rune
for i, ch := range trimmed {
// Handle quotes
if ch == '\'' || ch == '"' {
if !inQuotes {
inQuotes = true
quoteChar = ch
} else if ch == quoteChar {
inQuotes = false
}
}
// If inside quotes, skip character validation
if inQuotes {
continue
}
// Allowed characters: letters, numbers, operators, parentheses, dots, underscores, spaces, quotes
if !isValidChar(ch) {
return fmt.Errorf("invalid character '%c' at position %d", ch, i)
@@ -209,6 +228,7 @@ func (e *Expression) evaluateWithExprLang(data map[string]interface{}) (float64,
}
// GetFields gets all fields referenced in the expression
// Returns fields in sorted order to ensure consistent results
func (e *Expression) GetFields() []string {
if e.useExprLang {
// For expr-lang expressions, need to parse field references
@@ -223,10 +243,14 @@ func (e *Expression) GetFields() []string {
for field := range fields {
result = append(result, field)
}
// Sort fields to ensure consistent order
sort.Strings(result)
return result
}
// extractFieldsFromExprLang extracts field references from expr-lang expression (simplified version)
// Returns fields in sorted order to ensure consistent results
func extractFieldsFromExprLang(expression string) []string {
// This is a simplified implementation, should use AST parsing in practice
// Temporarily use regex or simple string parsing
@@ -247,6 +271,9 @@ func extractFieldsFromExprLang(expression string) []string {
for field := range fields {
result = append(result, field)
}
// Sort fields to ensure consistent order
sort.Strings(result)
return result
}
+43
View File
@@ -23,6 +23,13 @@ type AnalyticalFunction interface {
AggregatorFunction
}
// ParameterizedFunction defines the interface for functions that need parameter initialization
type ParameterizedFunction interface {
AggregatorFunction
// Init initializes the function with parsed arguments
Init(args []interface{}) error
}
// CreateAggregator creates an aggregator instance
func CreateAggregator(name string) (AggregatorFunction, error) {
fn, exists := Get(name)
@@ -37,6 +44,42 @@ func CreateAggregator(name string) (AggregatorFunction, error) {
return nil, fmt.Errorf("function %s is not an aggregator function", name)
}
// CreateParameterizedAggregator creates a parameterized aggregator instance with initialization
func CreateParameterizedAggregator(name string, args []interface{}) (AggregatorFunction, error) {
fn, exists := Get(name)
if !exists {
return nil, fmt.Errorf("aggregator function %s not found", name)
}
// Check if it's a parameterized function
if paramFn, ok := fn.(ParameterizedFunction); ok {
newInstance := paramFn.New()
if paramNewInstance, ok := newInstance.(ParameterizedFunction); ok {
if err := paramNewInstance.Init(args); err != nil {
return nil, fmt.Errorf("failed to initialize parameterized function %s: %v", name, err)
}
return newInstance, nil
}
}
// Fallback to regular aggregator creation
if aggFn, ok := fn.(AggregatorFunction); ok {
return aggFn.New(), nil
}
return nil, fmt.Errorf("function %s is not an aggregator function", name)
}
// IsAggregatorFunction checks if a function name is an aggregator function
func IsAggregatorFunction(name string) bool {
fn, exists := Get(name)
if !exists {
return false
}
_, ok := fn.(AggregatorFunction)
return ok
}
// CreateAnalytical creates an analytical function instance
func CreateAnalytical(name string) (AnalyticalFunction, error) {
fn, exists := Get(name)
File diff suppressed because it is too large Load Diff
+6
View File
@@ -18,6 +18,7 @@ const (
WindowStart AggregateType = "window_start"
WindowEnd AggregateType = "window_end"
Collect AggregateType = "collect"
FirstValue AggregateType = "first_value"
LastValue AggregateType = "last_value"
MergeAgg AggregateType = "merge_agg"
StdDev AggregateType = "stddev"
@@ -32,6 +33,8 @@ const (
HadChanged AggregateType = "had_changed"
// Expression aggregator for handling custom functions
Expression AggregateType = "expression"
// Post-aggregation marker for fields that need post-processing
PostAggregation AggregateType = "post_aggregation"
)
// String constant versions for convenience
@@ -46,6 +49,7 @@ const (
WindowStartStr = string(WindowStart)
WindowEndStr = string(WindowEnd)
CollectStr = string(Collect)
FirstValueStr = string(FirstValue)
LastValueStr = string(LastValue)
MergeAggStr = string(MergeAgg)
StdStr = "std"
@@ -61,6 +65,8 @@ const (
HadChangedStr = string(HadChanged)
// Expression aggregator
ExpressionStr = string(Expression)
// Post-aggregation marker
PostAggregationStr = string(PostAggregation)
)
// LegacyAggregatorFunction defines aggregator function interface compatible with legacy aggregator interface
+23 -3
View File
@@ -133,7 +133,7 @@ func TestCreateLegacyAggregatorPanic(t *testing.T) {
func TestFunctionAggregatorWrapper(t *testing.T) {
// 创建一个测试聚合器函数
testAgg := &TestAggregatorFunction{}
// 创建一个测试适配器
adapter := &AggregatorAdapter{
aggFunc: testAgg,
@@ -157,7 +157,7 @@ func TestFunctionAggregatorWrapper(t *testing.T) {
func TestAnalyticalAggregatorWrapper(t *testing.T) {
// 创建一个测试分析函数
testAnalFunc := &TestAnalyticalFunction{}
// 创建一个测试适配器
adapter := &AnalyticalAggregatorAdapter{
analFunc: testAnalFunc,
@@ -268,6 +268,16 @@ func (t *TestAggregatorFunction) Execute(ctx *FunctionContext, args []interface{
return t.Result(), nil
}
// GetMinArgs 返回最小参数数量
func (t *TestAggregatorFunction) GetMinArgs() int {
return 1
}
// GetMaxArgs 返回最大参数数量
func (t *TestAggregatorFunction) GetMaxArgs() int {
return 1
}
// TestAnalyticalFunction 测试用的分析函数实现
type TestAnalyticalFunction struct {
values []interface{}
@@ -335,4 +345,14 @@ func (t *TestAnalyticalFunction) Validate(args []interface{}) error {
// Execute 执行函数
func (t *TestAnalyticalFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
return t.Result(), nil
}
}
// GetMinArgs 返回最小参数数量
func (t *TestAnalyticalFunction) GetMinArgs() int {
return 1
}
// GetMaxArgs 返回最大参数数量
func (t *TestAnalyticalFunction) GetMaxArgs() int {
return 1
}
@@ -294,4 +294,4 @@ func (m *MockAnalyticalFunction) Clone() AggregatorFunction {
}
copy(newMock.values, m.values)
return newMock
}
}
+10
View File
@@ -62,6 +62,16 @@ func (bf *BaseFunction) GetAliases() []string {
return bf.aliases
}
// GetMinArgs returns the minimum number of arguments
func (bf *BaseFunction) GetMinArgs() int {
return bf.minArgs
}
// GetMaxArgs returns the maximum number of arguments (-1 means unlimited)
func (bf *BaseFunction) GetMaxArgs() int {
return bf.maxArgs
}
// ValidateArgCount validates the number of arguments
func (bf *BaseFunction) ValidateArgCount(args []interface{}) error {
argCount := len(args)
+3 -5
View File
@@ -83,6 +83,7 @@ func registerBuiltinFunctions() {
_ = Register(NewMedianAggregatorFunction())
_ = Register(NewPercentileFunction())
_ = Register(NewCollectFunction())
_ = Register(NewFirstValueFunction())
_ = Register(NewLastValueFunction())
_ = Register(NewMergeAggFunction())
_ = Register(NewStdDevSAggregatorFunction())
@@ -91,8 +92,9 @@ func registerBuiltinFunctions() {
_ = Register(NewVarSAggregatorFunction())
// Window functions
_ = Register(NewWindowStartFunction())
_ = Register(NewWindowEndFunction())
_ = Register(NewRowNumberFunction())
_ = Register(NewFirstValueFunction())
_ = Register(NewLeadFunction())
_ = Register(NewNthValueFunction())
@@ -102,10 +104,6 @@ func registerBuiltinFunctions() {
_ = Register(NewChangedColFunction())
_ = Register(NewHadChangedFunction())
// Window functions
_ = Register(NewWindowStartFunction())
_ = Register(NewWindowEndFunction())
// Expression functions
_ = Register(NewExpressionFunction())
_ = Register(NewExprFunction())
+7 -2
View File
@@ -54,12 +54,17 @@ func (bridge *ExprBridge) RegisterStreamSQLFunctionsToExpr() []expr.Option {
// Add function to expr environment
bridge.exprEnv[name] = wrappedFunc
bridge.exprEnv[strings.ToUpper(name)] = wrappedFunc
// Register function type information
// Register function type information for both lowercase and uppercase
options = append(options, expr.Function(
name,
wrappedFunc,
))
options = append(options, expr.Function(
strings.ToUpper(name),
wrappedFunc,
))
}
return options
@@ -143,7 +148,7 @@ func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression str
// 启用一些有用的expr功能
options = append(options,
expr.AllowUndefinedVariables(), // 允许未定义变量
expr.AsBool(), // 期望布尔结果(可根据需要调整)
// 移除 expr.AsBool() 以允许返回任意类型的值
)
return expr.Compile(expression, options...)
+80 -3
View File
@@ -150,7 +150,7 @@ func (f *AvgFunction) Add(value interface{}) {
func (f *AvgFunction) Result() interface{} {
if f.count == 0 {
return nil // Return nil when no valid values instead of 0.0
return nil // Return NULL when no valid values according to SQL standard
}
return f.sum / float64(f.count)
}
@@ -187,6 +187,13 @@ func (f *MinFunction) Validate(args []interface{}) error {
}
func (f *MinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 检查是否有nil参数
for _, arg := range args {
if arg == nil {
return nil, nil
}
}
min := math.Inf(1)
for _, arg := range args {
val, err := cast.ToFloat64E(arg)
@@ -224,7 +231,7 @@ func (f *MinFunction) Add(value interface{}) {
func (f *MinFunction) Result() interface{} {
if f.first {
return nil
return nil // Return NULL when no data according to SQL standard
}
return f.value
}
@@ -261,6 +268,13 @@ func (f *MaxFunction) Validate(args []interface{}) error {
}
func (f *MaxFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 检查是否有nil参数
for _, arg := range args {
if arg == nil {
return nil, nil
}
}
max := math.Inf(-1)
for _, arg := range args {
val, err := cast.ToFloat64E(arg)
@@ -298,7 +312,7 @@ func (f *MaxFunction) Add(value interface{}) {
func (f *MaxFunction) Result() interface{} {
if f.first {
return nil
return nil // Return NULL when no data according to SQL standard
}
return f.value
}
@@ -582,6 +596,69 @@ func (f *CollectFunction) Clone() AggregatorFunction {
return newFunc
}
// FirstValueFunction 首个值函数 - 返回组中第一行的值
type FirstValueFunction struct {
*BaseFunction
firstValue interface{}
hasValue bool
}
func NewFirstValueFunction() *FirstValueFunction {
return &FirstValueFunction{
BaseFunction: NewBaseFunction("first_value", TypeAggregation, "聚合函数", "返回第一个值", 1, -1),
firstValue: nil,
hasValue: false,
}
}
func (f *FirstValueFunction) Validate(args []interface{}) error {
return f.ValidateArgCount(args)
}
func (f *FirstValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
if err := f.Validate(args); err != nil {
return nil, err
}
if len(args) == 0 {
return nil, fmt.Errorf("function %s requires at least one argument", f.GetName())
}
// 返回第一个值
return args[0], nil
}
// 实现AggregatorFunction接口
func (f *FirstValueFunction) New() AggregatorFunction {
return &FirstValueFunction{
BaseFunction: f.BaseFunction,
firstValue: nil,
hasValue: false,
}
}
func (f *FirstValueFunction) Add(value interface{}) {
if !f.hasValue {
f.firstValue = value
f.hasValue = true
}
}
func (f *FirstValueFunction) Result() interface{} {
return f.firstValue
}
func (f *FirstValueFunction) Reset() {
f.firstValue = nil
f.hasValue = false
}
func (f *FirstValueFunction) Clone() AggregatorFunction {
return &FirstValueFunction{
BaseFunction: f.BaseFunction,
firstValue: f.firstValue,
hasValue: f.hasValue,
}
}
// LastValueFunction 最后值函数 - 返回组中最后一行的值
type LastValueFunction struct {
*BaseFunction
+11
View File
@@ -24,6 +24,17 @@ func (f *IfNullFunction) Validate(args []interface{}) error {
func (f *IfNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
if args[0] == nil {
// 当第一个参数为nil时返回第二个参数
// 如果第二个参数是数字0确保返回float64类型以保持一致性
if args[1] != nil {
// 尝试转换为float64以保持数值类型一致性
if val, ok := args[1].(int); ok && val == 0 {
return 0.0, nil
}
if val, ok := args[1].(float32); ok {
return float64(val), nil
}
}
return args[1], nil
}
return args[0], nil
+4 -14
View File
@@ -479,12 +479,7 @@ func (f *YearFunction) Validate(args []interface{}) error {
}
func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 首先检查是否是 time.Time 类型
if t, ok := args[0].(time.Time); ok {
return float64(t.Year()), nil
}
// 如果不是 time.Time尝试转换为字符串并解析
// 尝试转换为字符串并解析
dateStr, err := cast.ToStringE(args[0])
if err != nil {
return nil, fmt.Errorf("invalid date: %v", err)
@@ -497,7 +492,7 @@ func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interf
}
}
return float64(t.Year()), nil
return t.Year(), nil
}
// MonthFunction 提取月份函数
@@ -516,12 +511,7 @@ func (f *MonthFunction) Validate(args []interface{}) error {
}
func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 首先检查是否是 time.Time 类型
if t, ok := args[0].(time.Time); ok {
return float64(t.Month()), nil
}
// 如果不是 time.Time尝试转换为字符串并解析
// 转换为字符串并解析
dateStr, err := cast.ToStringE(args[0])
if err != nil {
return nil, fmt.Errorf("invalid date: %v", err)
@@ -534,7 +524,7 @@ func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (inter
}
}
return float64(t.Month()), nil
return int(t.Month()), nil
}
// DayFunction 提取日期函数
File diff suppressed because it is too large Load Diff
+10
View File
@@ -550,6 +550,11 @@ func (f *RoundFunction) Validate(args []interface{}) error {
}
func (f *RoundFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
// 检查第一个参数是否为nil
if args[0] == nil {
return nil, nil
}
val, err := cast.ToFloat64E(args[0])
if err != nil {
return nil, err
@@ -559,6 +564,11 @@ func (f *RoundFunction) Execute(ctx *FunctionContext, args []interface{}) (inter
return math.Round(val), nil
}
// 检查第二个参数是否为nil如果存在
if args[1] == nil {
return nil, nil
}
precision, err := cast.ToIntE(args[1])
if err != nil {
return nil, err
File diff suppressed because it is too large Load Diff
+102 -20
View File
@@ -5,7 +5,13 @@ import (
"reflect"
)
// UnnestFunction 将数组展开为多行
const (
UnnestObjectMarker = "__unnest_object__"
UnnestDataKey = "__data__"
UnnestEmptyMarker = "__empty_unnest__"
DefaultValueKey = "value"
)
type UnnestFunction struct {
*BaseFunction
}
@@ -27,7 +33,13 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
array := args[0]
if array == nil {
return []interface{}{}, nil
// 返回带有unnest标记的空结果
return []interface{}{
map[string]interface{}{
UnnestObjectMarker: true,
UnnestEmptyMarker: true, // 标记这是空unnest结果
},
}, nil
}
// 使用反射检查是否为数组或切片
@@ -36,7 +48,18 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
return nil, fmt.Errorf("unnest requires an array or slice, got %T", array)
}
// 转换为 []interface{}
// 如果数组为空,返回带标记的空数组
if v.Len() == 0 {
// 返回带有unnest标记的空结果
return []interface{}{
map[string]interface{}{
UnnestObjectMarker: true,
UnnestEmptyMarker: true, // 标记这是空unnest结果
},
}, nil
}
// 转换为 []interface{}所有元素都标记为unnest结果
result := make([]interface{}, v.Len())
for i := 0; i < v.Len(); i++ {
elem := v.Index(i).Interface()
@@ -45,39 +68,46 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
if elemMap, ok := elem.(map[string]interface{}); ok {
// 对于对象,我们返回一个特殊的结构来表示需要展开为列
result[i] = map[string]interface{}{
"__unnest_object__": true,
"__data__": elemMap,
UnnestObjectMarker: true,
UnnestDataKey: elemMap,
}
} else {
result[i] = elem
// 对于普通元素也需要标记为unnest结果
result[i] = map[string]interface{}{
UnnestObjectMarker: true,
UnnestDataKey: elem,
}
}
}
return result, nil
}
// UnnestResult 表示 unnest 函数的结果
type UnnestResult struct {
Rows []map[string]interface{}
}
// IsUnnestResult 检查是否为 unnest 结果
func IsUnnestResult(value interface{}) bool {
if slice, ok := value.([]interface{}); ok {
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap["__unnest_object__"]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
return true
}
slice, ok := value.([]interface{})
if !ok || len(slice) == 0 {
return false
}
// 检查数组中是否有任何unnest标记的元素
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
return true
}
}
}
}
// 如果没有找到unnest标记则不是unnest结果
return false
}
// ProcessUnnestResult 处理 unnest 结果,将其转换为多行
func ProcessUnnestResult(value interface{}) []map[string]interface{} {
slice, ok := value.([]interface{})
if !ok {
@@ -87,20 +117,72 @@ func ProcessUnnestResult(value interface{}) []map[string]interface{} {
var rows []map[string]interface{}
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap["__unnest_object__"]; exists {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
if data, exists := itemMap["__data__"]; exists {
if data, exists := itemMap[UnnestDataKey]; exists {
// 检查数据是否为对象map
if dataMap, ok := data.(map[string]interface{}); ok {
// 对象数据直接展开为列
rows = append(rows, dataMap)
} else {
// 普通数据使用默认字段名
row := map[string]interface{}{
DefaultValueKey: data,
}
rows = append(rows, row)
}
}
continue
}
}
}
// 对于非对象元素,创建一个包含单个值的行
// 对于非标记元素,创建一个包含单个值的行(向后兼容)
row := map[string]interface{}{
"value": item,
DefaultValueKey: item,
}
rows = append(rows, row)
}
return rows
}
func ProcessUnnestResultWithFieldName(value interface{}, fieldName string) []map[string]interface{} {
slice, ok := value.([]interface{})
if !ok {
return nil
}
var rows []map[string]interface{}
for _, item := range slice {
if itemMap, ok := item.(map[string]interface{}); ok {
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
if unnestBool, ok := unnest.(bool); ok && unnestBool {
// 检查是否为空unnest结果
if itemMap[UnnestEmptyMarker] == true {
// 空unnest结果返回空数组
return []map[string]interface{}{}
}
if data, exists := itemMap[UnnestDataKey]; exists {
// 检查数据是否为对象map
if dataMap, ok := data.(map[string]interface{}); ok {
// 对象数据直接展开为列
rows = append(rows, dataMap)
} else {
// 普通数据使用指定字段名
row := map[string]interface{}{
fieldName: data,
}
rows = append(rows, row)
}
}
continue
}
}
}
// 对于非标记元素,使用指定的字段名创建行(向后兼容)
row := map[string]interface{}{
fieldName: item,
}
rows = append(rows, row)
}
+46 -6
View File
@@ -15,7 +15,20 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error: %v", err)
}
expected := []interface{}{"a", "b", "c"}
expected := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__data__": "a",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "b",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "c",
},
}
if !reflect.DeepEqual(result, expected) {
t.Errorf("UnnestFunction = %v, want %v", result, expected)
}
@@ -51,8 +64,15 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error for empty array: %v", err)
}
if len(result.([]interface{})) != 0 {
t.Errorf("UnnestFunction should return empty array for empty input")
// 空数组应该返回带有空标记的结果
expectedEmpty := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__empty_unnest__": true,
},
}
if !reflect.DeepEqual(result, expectedEmpty) {
t.Errorf("UnnestFunction empty array = %v, want %v", result, expectedEmpty)
}
// 测试nil参数
@@ -61,8 +81,15 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should not return error for nil: %v", err)
}
if len(result.([]interface{})) != 0 {
t.Errorf("UnnestFunction should return empty array for nil input")
// nil应该返回带有空标记的结果
expectedNil := []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__empty_unnest__": true,
},
}
if !reflect.DeepEqual(result, expectedNil) {
t.Errorf("UnnestFunction nil = %v, want %v", result, expectedNil)
}
// 测试错误参数数量
@@ -85,7 +112,20 @@ func TestUnnestFunction(t *testing.T) {
if err != nil {
t.Errorf("UnnestFunction should handle arrays: %v", err)
}
expected = []interface{}{"x", "y", "z"}
expected = []interface{}{
map[string]interface{}{
"__unnest_object__": true,
"__data__": "x",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "y",
},
map[string]interface{}{
"__unnest_object__": true,
"__data__": "z",
},
}
if !reflect.DeepEqual(result, expected) {
t.Errorf("UnnestFunction array = %v, want %v", result, expected)
}
+5 -79
View File
@@ -82,7 +82,11 @@ func TestNewStringFunctions(t *testing.T) {
if !exists {
t.Fatalf("Function %s not found", tt.funcName)
}
// 验证参数
if err := fn.Validate(tt.args); err != nil {
t.Errorf("Validate() error = %v", err)
return
}
ctx := &FunctionContext{}
result, err := fn.Execute(ctx, tt.args)
@@ -167,84 +171,6 @@ func TestStringFunctionValidation(t *testing.T) {
args: []interface{}{"hello"},
wantErr: false,
},
{
name: "upper no args",
function: NewUpperFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "upper valid args",
function: NewUpperFunction(),
args: []interface{}{"hello"},
wantErr: false,
},
{
name: "endswith no args",
function: NewEndswithFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "endswith one arg",
function: NewEndswithFunction(),
args: []interface{}{"hello"},
wantErr: true,
},
{
name: "endswith valid args",
function: NewEndswithFunction(),
args: []interface{}{"hello", "lo"},
wantErr: false,
},
{
name: "substring no args",
function: NewSubstringFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "substring one arg",
function: NewSubstringFunction(),
args: []interface{}{"hello"},
wantErr: true,
},
{
name: "substring valid args",
function: NewSubstringFunction(),
args: []interface{}{"hello", 1},
wantErr: false,
},
{
name: "replace no args",
function: NewReplaceFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "replace two args",
function: NewReplaceFunction(),
args: []interface{}{"hello", "world"},
wantErr: true,
},
{
name: "replace valid args",
function: NewReplaceFunction(),
args: []interface{}{"hello", "l", "x"},
wantErr: false,
},
{
name: "regexp_matches no args",
function: NewRegexpMatchesFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "regexp_matches valid args",
function: NewRegexpMatchesFunction(),
args: []interface{}{"hello123", "[0-9]+"},
wantErr: false,
},
}
for _, tt := range tests {
+80 -177
View File
@@ -4,184 +4,8 @@ import (
"testing"
)
// TestTypeFunctions 测试类型检查函数的基本功能
// TestTypeFunctions 测试类型函数
func TestTypeFunctions(t *testing.T) {
tests := []struct {
name string
funcName string
args []interface{}
expected interface{}
}{
{
name: "is_null true",
funcName: "is_null",
args: []interface{}{nil},
expected: true,
},
{
name: "is_null false",
funcName: "is_null",
args: []interface{}{"test"},
expected: false,
},
{
name: "is_not_null true",
funcName: "is_not_null",
args: []interface{}{"test"},
expected: true,
},
{
name: "is_not_null false",
funcName: "is_not_null",
args: []interface{}{nil},
expected: false,
},
{
name: "is_numeric true",
funcName: "is_numeric",
args: []interface{}{123},
expected: true,
},
{
name: "is_numeric false",
funcName: "is_numeric",
args: []interface{}{"test"},
expected: false,
},
{
name: "is_string true",
funcName: "is_string",
args: []interface{}{"test"},
expected: true,
},
{
name: "is_string false",
funcName: "is_string",
args: []interface{}{123},
expected: false,
},
{
name: "is_bool true",
funcName: "is_bool",
args: []interface{}{true},
expected: true,
},
{
name: "is_bool false",
funcName: "is_bool",
args: []interface{}{"test"},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fn, exists := Get(tt.funcName)
if !exists {
t.Fatalf("Function %s not found", tt.funcName)
}
result, err := fn.Execute(&FunctionContext{}, tt.args)
if err != nil {
t.Errorf("Execute() error = %v", err)
return
}
if result != tt.expected {
t.Errorf("Execute() = %v, want %v", result, tt.expected)
}
})
}
}
// TestTypeFunctionValidation 测试类型函数的参数验证
func TestTypeFunctionValidation(t *testing.T) {
tests := []struct {
name string
function Function
args []interface{}
wantErr bool
}{
{
name: "is_null no args",
function: NewIsNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_null too many args",
function: NewIsNullFunction(),
args: []interface{}{"test", "extra"},
wantErr: true,
},
{
name: "is_null valid args",
function: NewIsNullFunction(),
args: []interface{}{"test"},
wantErr: false,
},
{
name: "is_not_null no args",
function: NewIsNotNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_not_null valid args",
function: NewIsNotNullFunction(),
args: []interface{}{nil},
wantErr: false,
},
{
name: "is_numeric no args",
function: NewIsNumericFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_numeric valid args",
function: NewIsNumericFunction(),
args: []interface{}{123},
wantErr: false,
},
{
name: "is_string no args",
function: NewIsStringFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_string valid args",
function: NewIsStringFunction(),
args: []interface{}{"test"},
wantErr: false,
},
{
name: "is_bool no args",
function: NewIsBoolFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_bool valid args",
function: NewIsBoolFunction(),
args: []interface{}{true},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.function.Validate(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
// TestTypeFunctionEdgeCases 测试类型函数的边界情况
func TestTypeFunctionEdgeCases(t *testing.T) {
tests := []struct {
name string
function Function
@@ -242,6 +66,12 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
args: []interface{}{true},
expected: false,
},
{
name: "is_numeric with nil",
function: NewIsNumericFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_string with empty string",
function: NewIsStringFunction(),
@@ -254,10 +84,45 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
args: []interface{}{false},
expected: true,
},
{
name: "is_bool with nil",
function: NewIsBoolFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_array",
function: NewIsArrayFunction(),
args: []interface{}{[]int{1, 2, 3}},
expected: true,
},
{
name: "is_array with nil",
function: NewIsArrayFunction(),
args: []interface{}{nil},
expected: false,
},
{
name: "is_object",
function: NewIsObjectFunction(),
args: []interface{}{map[string]int{"a": 1, "b": 2}},
expected: true,
},
{
name: "is_object with nil",
function: NewIsObjectFunction(),
args: []interface{}{nil},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 验证参数
if err := tt.function.Validate(tt.args); err != nil {
t.Errorf("Validate() error = %v", err)
return
}
result, err := tt.function.Execute(&FunctionContext{}, tt.args)
if err != nil {
t.Errorf("Execute() error = %v", err)
@@ -270,3 +135,41 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
})
}
}
// TestTypeFunctionValidation 测试类型函数的参数验证
func TestTypeFunctionValidation(t *testing.T) {
tests := []struct {
name string
function Function
args []interface{}
wantErr bool
}{
{
name: "is_null no args",
function: NewIsNullFunction(),
args: []interface{}{},
wantErr: true,
},
{
name: "is_null too many args",
function: NewIsNullFunction(),
args: []interface{}{"test", "extra"},
wantErr: true,
},
{
name: "is_null valid args",
function: NewIsNullFunction(),
args: []interface{}{"test"},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.function.Validate(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

Some files were not shown because too many files have changed in this diff Show More