Files
streamsql/functions/functions_window_test.go
2025-08-06 17:18:50 +08:00

576 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package functions
import (
"reflect"
"testing"
)
// isWindowFunction reports whether funcName is one of the window function
// names this test file treats specially: row_number, window_start,
// window_end, lead, lag, first_value and last_value.
// The comparison is exact (case-sensitive, no trimming); any other name,
// including the empty string, yields false.
func isWindowFunction(funcName string) bool {
	switch funcName {
	case "row_number",
		"window_start",
		"window_end",
		"lead",
		"lag",
		"first_value",
		"last_value":
		return true
	default:
		return false
	}
}
// TestNewWindowFunctions is a table-driven test for the first_value,
// last_value, lag, lead and nth_value functions obtained through Get.
//
// For every case it:
//  1. looks the function up with Get and requires it to implement
//     AggregatorFunction;
//  2. calls Validate(args) on the looked-up function and compares the
//     presence of an error with wantErr (cases that expect an error stop
//     here);
//  3. creates a fresh aggregator with New and feeds it through setup;
//  4. for names not recognised by isWindowFunction, additionally calls
//     Execute and requires it to succeed;
//  5. compares the aggregator's Result with want.
//
// The final comparison uses reflect.DeepEqual and is performed for every
// non-error case, so cases whose expected result is nil are verified too
// (a plain "want != nil && got != want" check would silently skip them) and
// results of non-comparable dynamic types cannot panic the comparison.
func TestNewWindowFunctions(t *testing.T) {
	tests := []struct {
		name     string
		funcName string
		args     []interface{}
		want     interface{}
		wantErr  bool
		setup    func(fn AggregatorFunction)
	}{
		// first_value cases
		{
			name:     "first_value basic",
			funcName: "first_value",
			args:     []interface{}{"test"},
			want:     "first",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "first_value empty",
			funcName: "first_value",
			args:     []interface{}{"test"},
			want:     nil,
			wantErr:  false,
			setup:    func(fn AggregatorFunction) {},
		},
		// last_value cases
		{
			name:     "last_value basic",
			funcName: "last_value",
			args:     []interface{}{"test"},
			want:     "third",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "last_value empty",
			funcName: "last_value",
			args:     []interface{}{"test"},
			want:     nil,
			wantErr:  false,
			setup:    func(fn AggregatorFunction) {},
		},
		// lag cases
		{
			name:     "lag default offset",
			funcName: "lag",
			args:     []interface{}{"test"},
			want:     "second",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "lag with offset 2",
			funcName: "lag",
			args:     []interface{}{"test", 2},
			want:     "first",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "lag with default value",
			funcName: "lag",
			args:     []interface{}{"test", 5, "default"},
			want:     "default",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
			},
		},
		{
			name:     "lag invalid offset type",
			funcName: "lag",
			args:     []interface{}{"test", "invalid"},
			wantErr:  true,
			setup:    func(fn AggregatorFunction) {},
		},
		// lead cases
		{
			name:     "lead default offset",
			funcName: "lead",
			args:     []interface{}{"test"},
			// Per the original author's note, the simplified lead
			// implementation returns nil here.
			want:    nil,
			wantErr: false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "lead with default value",
			funcName: "lead",
			args:     []interface{}{"test", 1, "default"},
			want:     "default",
			wantErr:  false,
			setup:    func(fn AggregatorFunction) {},
		},
		{
			name:     "lead invalid offset type",
			funcName: "lead",
			args:     []interface{}{"test", "invalid"},
			wantErr:  true,
			setup:    func(fn AggregatorFunction) {},
		},
		// nth_value cases
		{
			name:     "nth_value first",
			funcName: "nth_value",
			args:     []interface{}{"test", 1},
			want:     "first",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "nth_value second",
			funcName: "nth_value",
			args:     []interface{}{"test", 2},
			want:     "second",
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
				fn.Add("third")
			},
		},
		{
			name:     "nth_value out of range",
			funcName: "nth_value",
			args:     []interface{}{"test", 5},
			want:     nil,
			wantErr:  false,
			setup: func(fn AggregatorFunction) {
				fn.Add("first")
				fn.Add("second")
			},
		},
		{
			name:     "nth_value invalid n type",
			funcName: "nth_value",
			args:     []interface{}{"test", "invalid"},
			wantErr:  true,
			setup:    func(fn AggregatorFunction) {},
		},
		{
			name:     "nth_value zero n",
			funcName: "nth_value",
			args:     []interface{}{"test", 0},
			wantErr:  true,
			setup:    func(fn AggregatorFunction) {},
		},
		{
			name:     "nth_value negative n",
			funcName: "nth_value",
			args:     []interface{}{"test", -1},
			wantErr:  true,
			setup:    func(fn AggregatorFunction) {},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			fn, exists := Get(tt.funcName)
			if !exists {
				t.Fatalf("Function %s not found", tt.funcName)
			}
			// The function must also be usable as an aggregator.
			aggFn, ok := fn.(AggregatorFunction)
			if !ok {
				t.Fatalf("Function %s does not implement AggregatorFunction", tt.funcName)
			}
			// Validate runs first; the original author notes it is what
			// sets the function's parameters from args.
			err := fn.Validate(tt.args)
			if (err != nil) != tt.wantErr {
				t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// An expected validation failure has been observed: the case
			// passes and there is nothing further to check.
			if tt.wantErr {
				return
			}
			// Work on a fresh aggregator instance and load the test data.
			aggInstance := aggFn.New()
			if tt.setup != nil {
				tt.setup(aggInstance)
			}
			// Names recognised by isWindowFunction are checked through
			// Result only. For every other name Execute is exercised as
			// well, preferably on the aggregator instance itself.
			if !isWindowFunction(tt.funcName) {
				if aggFunc, ok := aggInstance.(Function); ok {
					_, err = aggFunc.Execute(nil, tt.args)
				} else {
					_, err = fn.Execute(nil, tt.args)
				}
			}
			// wantErr is necessarily false from here on, so any error is a
			// failure.
			if err != nil {
				t.Errorf("Execute() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// Always compare, including when want is nil.
			if got := aggInstance.Result(); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("AggregatorFunction.Result() = %v, want %v", got, tt.want)
			}
		})
	}
}
// TestWindowFunctionBasics exercises the basic behaviour of the row_number,
// window_start and window_end functions obtained through Get.
func TestWindowFunctionBasics(t *testing.T) {
	// row_number: after a reset, successive Execute calls are expected to
	// return int64(1) and then int64(2).
	t.Run("RowNumberFunction", func(t *testing.T) {
		counter, found := Get("row_number")
		if !found {
			t.Fatal("row_number function not found")
		}
		// Reset the function's state when it is the concrete
		// *RowNumberFunction type.
		if concrete, isRowNumber := counter.(*RowNumberFunction); isRowNumber {
			concrete.Reset()
		}
		// Each step is one Execute call with no context and no arguments.
		steps := []struct {
			want   int64
			format string
		}{
			{want: 1, format: "First call should return 1, got %v"},
			{want: 2, format: "Second call should return 2, got %v"},
		}
		for _, step := range steps {
			got, err := counter.Execute(nil, []interface{}{})
			if err != nil {
				t.Errorf("Execute() error = %v", err)
			}
			if got != step.want {
				t.Errorf(step.format, got)
			}
		}
	})
	// window_start / window_end: executing against a context that carries
	// no window information must not return an error.
	t.Run("WindowStartEndFunctions", func(t *testing.T) {
		startFn, found := Get("window_start")
		if !found {
			t.Fatal("window_start function not found")
		}
		endFn, found := Get("window_end")
		if !found {
			t.Fatal("window_end function not found")
		}
		emptyCtx := &FunctionContext{
			Data: map[string]interface{}{},
		}
		// The returned values are deliberately not asserted; the original
		// author notes they should be nil or a default value when no window
		// information is available.
		if _, err := startFn.Execute(emptyCtx, []interface{}{}); err != nil {
			t.Errorf("Execute() error = %v", err)
		}
		if _, err := endFn.Execute(emptyCtx, []interface{}{}); err != nil {
			t.Errorf("Execute() error = %v", err)
		}
	})
}
// TestWindowFunctionResetAndClone checks Reset and Clone on the aggregators
// produced by WindowStartFunction and WindowEndFunction: after Add calls
// followed by Reset, Result must be nil, and a Clone must carry the same
// windowStart / windowEnd field value as its source.
func TestWindowFunctionResetAndClone(t *testing.T) {
	// WindowStartFunction
	startAgg := NewWindowStartFunction().New().(*WindowStartFunction)
	startAgg.Add("data1")
	startAgg.Add("data2")
	startAgg.Reset()
	if got := startAgg.Result(); got != nil {
		t.Errorf("WindowStart Reset failed, result = %v, want nil", got)
	}
	startCopy := startAgg.Clone().(*WindowStartFunction)
	if startCopy.windowStart != startAgg.windowStart {
		t.Errorf("WindowStart Clone failed")
	}

	// WindowEndFunction
	endAgg := NewWindowEndFunction().New().(*WindowEndFunction)
	endAgg.Add("data1")
	endAgg.Add("data2")
	endAgg.Reset()
	if got := endAgg.Result(); got != nil {
		t.Errorf("WindowEnd Reset failed, result = %v, want nil", got)
	}
	endCopy := endAgg.Clone().(*WindowEndFunction)
	if endCopy.windowEnd != endAgg.windowEnd {
		t.Errorf("WindowEnd Clone failed")
	}
}
// TestExpressionFunction is a smoke test for ExpressionFunction: Execute
// must not return an error, Result must be nil after Reset, and a Clone must
// have a values field deeply equal to its source's.
func TestExpressionFunction(t *testing.T) {
	exprFn := NewExpressionFunction()

	// Only the absence of an error is checked; per the original author's
	// note, the returned value depends on the expression evaluator.
	if _, err := exprFn.Execute(&FunctionContext{}, []interface{}{"x + y", 1, 2}); err != nil {
		t.Errorf("Execute error: %v", err)
	}

	// Aggregator methods: Add followed by Result is only required not to
	// panic, so the result is discarded.
	acc := exprFn.New().(*ExpressionFunction)
	acc.Add("test")
	_ = acc.Result()

	// Reset
	acc.Reset()
	if got := acc.Result(); got != nil {
		t.Errorf("Reset failed, result = %v, want nil", got)
	}

	// Clone
	dup := acc.Clone().(*ExpressionFunction)
	if !reflect.DeepEqual(dup.values, acc.values) {
		t.Errorf("Clone failed")
	}
}
// TestExpressionAggregatorFunction is a smoke test for
// ExpressionAggregatorFunction: Execute must not return an error, Result
// must be nil after Reset, and a Clone must carry the same lastResult as its
// source.
func TestExpressionAggregatorFunction(t *testing.T) {
	exprAgg := NewExpressionAggregatorFunction()

	// Only the absence of an error is checked; per the original author's
	// note, the returned value depends on the expression evaluator.
	if _, err := exprAgg.Execute(&FunctionContext{}, []interface{}{"sum(x)", 1, 2, 3}); err != nil {
		t.Errorf("Execute error: %v", err)
	}

	// Aggregator methods: Add followed by Result is only required not to
	// panic, so the result is discarded.
	acc := exprAgg.New().(*ExpressionAggregatorFunction)
	acc.Add("test")
	_ = acc.Result()

	// Reset
	acc.Reset()
	if got := acc.Result(); got != nil {
		t.Errorf("Reset failed, result = %v, want nil", got)
	}

	// Clone
	dup := acc.Clone().(*ExpressionAggregatorFunction)
	if dup.lastResult != acc.lastResult {
		t.Errorf("Clone failed")
	}
}
// TestFirstValueFunction checks the aggregator side of FirstValueFunction:
// after adding "x", "y", "z" the result must be "x"; after Reset the result
// must be nil; and a Clone must carry the same firstValue as its source.
func TestFirstValueFunction(t *testing.T) {
	acc := NewFirstValueFunction().New().(*FirstValueFunction)
	for _, v := range []string{"x", "y", "z"} {
		acc.Add(v)
	}
	if got := acc.Result(); got != "x" {
		t.Errorf("Agg first_value result = %v, want x", got)
	}

	// Reset
	acc.Reset()
	if got := acc.Result(); got != nil {
		t.Errorf("Reset failed, result = %v, want nil", got)
	}

	// Clone
	dup := acc.Clone().(*FirstValueFunction)
	if dup.firstValue != acc.firstValue {
		t.Errorf("Clone failed")
	}
}
// TestLeadFunction checks the aggregator side of LeadFunction: with no
// arguments validated beforehand, Result is expected to be nil both after
// adding values and after Reset, and a Clone must carry the same
// defaultValue and offset as its source.
func TestLeadFunction(t *testing.T) {
	acc := NewLeadFunction().New().(*LeadFunction)
	for _, v := range []string{"x", "y", "z"} {
		acc.Add(v)
	}
	if got := acc.Result(); got != nil {
		t.Errorf("Agg lead result = %v, want nil", got)
	}

	// Reset
	acc.Reset()
	if got := acc.Result(); got != nil {
		t.Errorf("Reset failed, result = %v, want nil", got)
	}

	// Clone: both configuration fields must match the source.
	dup := acc.Clone().(*LeadFunction)
	sameConfig := dup.defaultValue == acc.defaultValue && dup.offset == acc.offset
	if !sameConfig {
		t.Errorf("Clone failed")
	}
}
// TestNthValueFunction checks the aggregator side of NthValueFunction: with
// no arguments validated beforehand, adding "x", "y", "z" is expected to
// yield "x" (presumably because n defaults to 1 — confirm against the
// implementation); after Reset the result must be nil; and a Clone must
// carry the same n as its source.
func TestNthValueFunction(t *testing.T) {
	acc := NewNthValueFunction().New().(*NthValueFunction)
	for _, v := range []string{"x", "y", "z"} {
		acc.Add(v)
	}
	if got := acc.Result(); got != "x" {
		t.Errorf("Agg nth_value result = %v, want x", got)
	}

	// Reset
	acc.Reset()
	if got := acc.Result(); got != nil {
		t.Errorf("Reset failed, result = %v, want nil", got)
	}

	// Clone
	dup := acc.Clone().(*NthValueFunction)
	if dup.n != acc.n {
		t.Errorf("Clone failed")
	}
}
// TestWindowFunctionEdgeCases covers boundary behaviour of the window
// functions. For lead, nth_value, first_value and last_value it requires
// both Validate and Execute to reject an empty argument list, then calls
// Reset and Clone on a fresh aggregator purely to confirm they do not panic.
// The window_start and window_end aggregators get the same Reset/Clone
// smoke check.
func TestWindowFunctionEdgeCases(t *testing.T) {
	// LeadFunction
	{
		leadFn := NewLeadFunction()
		if leadFn.Validate([]interface{}{}) == nil {
			t.Error("LeadFunction.Validate should fail for insufficient args")
		}
		if _, err := leadFn.Execute(nil, []interface{}{}); err == nil {
			t.Error("LeadFunction.Execute should fail for empty args")
		}
		leadAgg := leadFn.New().(*LeadFunction)
		leadAgg.Reset()
		_ = leadAgg.Clone()
	}

	// NthValueFunction
	{
		nthFn := NewNthValueFunction()
		if nthFn.Validate([]interface{}{}) == nil {
			t.Error("NthValueFunction.Validate should fail for insufficient args")
		}
		if _, err := nthFn.Execute(nil, []interface{}{}); err == nil {
			t.Error("NthValueFunction.Execute should fail for empty args")
		}
		nthAgg := nthFn.New().(*NthValueFunction)
		nthAgg.Reset()
		_ = nthAgg.Clone()
	}

	// FirstValueFunction
	{
		firstFn := NewFirstValueFunction()
		if firstFn.Validate([]interface{}{}) == nil {
			t.Error("FirstValueFunction.Validate should fail for insufficient args")
		}
		if _, err := firstFn.Execute(nil, []interface{}{}); err == nil {
			t.Error("FirstValueFunction.Execute should fail for empty args")
		}
		firstAgg := firstFn.New().(*FirstValueFunction)
		firstAgg.Reset()
		_ = firstAgg.Clone()
	}

	// LastValueFunction
	{
		lastFn := NewLastValueFunction()
		if lastFn.Validate([]interface{}{}) == nil {
			t.Error("LastValueFunction.Validate should fail for insufficient args")
		}
		if _, err := lastFn.Execute(nil, []interface{}{}); err == nil {
			t.Error("LastValueFunction.Execute should fail for empty args")
		}
		lastAgg := lastFn.New().(*LastValueFunction)
		lastAgg.Reset()
		_ = lastAgg.Clone()
	}

	// WindowStartFunction / WindowEndFunction: Reset and Clone on a fresh
	// aggregator.
	startAgg := NewWindowStartFunction().New().(*WindowStartFunction)
	startAgg.Reset()
	_ = startAgg.Clone()

	endAgg := NewWindowEndFunction().New().(*WindowEndFunction)
	endAgg.Reset()
	_ = endAgg.Clone()
}