forked from GiteaTest2015/streamsql
576 lines
14 KiB
Go
576 lines
14 KiB
Go
package functions
|
||
|
||
import (
|
||
"reflect"
|
||
"testing"
|
||
)
|
||
|
||
// isWindowFunction 判断是否为窗口函数
|
||
func isWindowFunction(funcName string) bool {
|
||
windowFunctions := map[string]bool{
|
||
"row_number": true,
|
||
"window_start": true,
|
||
"window_end": true,
|
||
"lead": true,
|
||
"lag": true,
|
||
"first_value": true,
|
||
"last_value": true,
|
||
}
|
||
return windowFunctions[funcName]
|
||
}
|
||
|
||
func TestNewWindowFunctions(t *testing.T) {
|
||
tests := []struct {
|
||
name string
|
||
funcName string
|
||
args []interface{}
|
||
want interface{}
|
||
wantErr bool
|
||
setup func(fn AggregatorFunction)
|
||
}{
|
||
// first_value 函数测试
|
||
{
|
||
name: "first_value basic",
|
||
funcName: "first_value",
|
||
args: []interface{}{"test"},
|
||
want: "first",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "first_value empty",
|
||
funcName: "first_value",
|
||
args: []interface{}{"test"},
|
||
want: nil,
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
|
||
// last_value 函数测试
|
||
{
|
||
name: "last_value basic",
|
||
funcName: "last_value",
|
||
args: []interface{}{"test"},
|
||
want: "third",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "last_value empty",
|
||
funcName: "last_value",
|
||
args: []interface{}{"test"},
|
||
want: nil,
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
|
||
// lag 函数测试
|
||
{
|
||
name: "lag default offset",
|
||
funcName: "lag",
|
||
args: []interface{}{"test"},
|
||
want: "second",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "lag with offset 2",
|
||
funcName: "lag",
|
||
args: []interface{}{"test", 2},
|
||
want: "first",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "lag with default value",
|
||
funcName: "lag",
|
||
args: []interface{}{"test", 5, "default"},
|
||
want: "default",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
},
|
||
},
|
||
{
|
||
name: "lag invalid offset type",
|
||
funcName: "lag",
|
||
args: []interface{}{"test", "invalid"},
|
||
wantErr: true,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
|
||
// lead 函数测试
|
||
{
|
||
name: "lead default offset",
|
||
funcName: "lead",
|
||
args: []interface{}{"test"},
|
||
want: nil, // Lead函数简化实现返回nil
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "lead with default value",
|
||
funcName: "lead",
|
||
args: []interface{}{"test", 1, "default"},
|
||
want: "default",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
{
|
||
name: "lead invalid offset type",
|
||
funcName: "lead",
|
||
args: []interface{}{"test", "invalid"},
|
||
wantErr: true,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
|
||
// nth_value 函数测试
|
||
{
|
||
name: "nth_value first",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", 1},
|
||
want: "first",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "nth_value second",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", 2},
|
||
want: "second",
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
fn.Add("third")
|
||
},
|
||
},
|
||
{
|
||
name: "nth_value out of range",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", 5},
|
||
want: nil,
|
||
wantErr: false,
|
||
setup: func(fn AggregatorFunction) {
|
||
fn.Add("first")
|
||
fn.Add("second")
|
||
},
|
||
},
|
||
{
|
||
name: "nth_value invalid n type",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", "invalid"},
|
||
wantErr: true,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
{
|
||
name: "nth_value zero n",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", 0},
|
||
wantErr: true,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
{
|
||
name: "nth_value negative n",
|
||
funcName: "nth_value",
|
||
args: []interface{}{"test", -1},
|
||
wantErr: true,
|
||
setup: func(fn AggregatorFunction) {},
|
||
},
|
||
}
|
||
|
||
for _, tt := range tests {
|
||
t.Run(tt.name, func(t *testing.T) {
|
||
fn, exists := Get(tt.funcName)
|
||
if !exists {
|
||
t.Fatalf("Function %s not found", tt.funcName)
|
||
}
|
||
|
||
// 检查函数是否实现了AggregatorFunction接口
|
||
aggFn, ok := fn.(AggregatorFunction)
|
||
if !ok {
|
||
t.Fatalf("Function %s does not implement AggregatorFunction", tt.funcName)
|
||
}
|
||
|
||
// 先执行函数的Validate方法来设置参数
|
||
err := fn.Validate(tt.args)
|
||
if (err != nil) != tt.wantErr {
|
||
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||
return
|
||
}
|
||
|
||
// 如果期望错误且Validate已经失败,则测试通过
|
||
if tt.wantErr && err != nil {
|
||
return
|
||
}
|
||
|
||
// 创建新的聚合器实例
|
||
aggInstance := aggFn.New()
|
||
|
||
// 执行setup函数添加测试数据
|
||
if tt.setup != nil {
|
||
tt.setup(aggInstance)
|
||
}
|
||
|
||
// 对于窗口函数测试,不需要调用Execute方法
|
||
// Execute方法主要用于流式处理,这里我们直接测试聚合器的Result方法
|
||
// 如果需要测试Execute方法,应该在原始函数实例上调用
|
||
if !isWindowFunction(tt.funcName) {
|
||
// 对于非窗口函数,在聚合器实例上执行
|
||
if aggFunc, ok := aggInstance.(Function); ok {
|
||
_, err = aggFunc.Execute(nil, tt.args)
|
||
} else {
|
||
// 执行函数
|
||
_, err = fn.Execute(nil, tt.args)
|
||
}
|
||
}
|
||
|
||
if (err != nil) != tt.wantErr {
|
||
t.Errorf("Execute() error = %v, wantErr %v", err, tt.wantErr)
|
||
return
|
||
}
|
||
|
||
if !tt.wantErr {
|
||
// 对于窗口函数,我们主要测试聚合器的Result方法
|
||
aggResult := aggInstance.Result()
|
||
if tt.want != nil && aggResult != tt.want {
|
||
t.Errorf("AggregatorFunction.Result() = %v, want %v", aggResult, tt.want)
|
||
}
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
// 测试窗口函数的基本功能
|
||
func TestWindowFunctionBasics(t *testing.T) {
|
||
// 测试row_number函数
|
||
t.Run("RowNumberFunction", func(t *testing.T) {
|
||
rowNumFunc, exists := Get("row_number")
|
||
if !exists {
|
||
t.Fatal("row_number function not found")
|
||
}
|
||
|
||
// 重置函数状态
|
||
if rowNum, ok := rowNumFunc.(*RowNumberFunction); ok {
|
||
rowNum.Reset()
|
||
}
|
||
|
||
// 测试行号递增
|
||
result1, err := rowNumFunc.Execute(nil, []interface{}{})
|
||
if err != nil {
|
||
t.Errorf("Execute() error = %v", err)
|
||
}
|
||
if result1 != int64(1) {
|
||
t.Errorf("First call should return 1, got %v", result1)
|
||
}
|
||
|
||
result2, err := rowNumFunc.Execute(nil, []interface{}{})
|
||
if err != nil {
|
||
t.Errorf("Execute() error = %v", err)
|
||
}
|
||
if result2 != int64(2) {
|
||
t.Errorf("Second call should return 2, got %v", result2)
|
||
}
|
||
})
|
||
|
||
// 测试window_start和window_end函数
|
||
t.Run("WindowStartEndFunctions", func(t *testing.T) {
|
||
windowStartFunc, exists := Get("window_start")
|
||
if !exists {
|
||
t.Fatal("window_start function not found")
|
||
}
|
||
|
||
windowEndFunc, exists := Get("window_end")
|
||
if !exists {
|
||
t.Fatal("window_end function not found")
|
||
}
|
||
|
||
// 测试无窗口信息时的行为
|
||
ctx := &FunctionContext{
|
||
Data: map[string]interface{}{},
|
||
}
|
||
_, err := windowStartFunc.Execute(ctx, []interface{}{})
|
||
if err != nil {
|
||
t.Errorf("Execute() error = %v", err)
|
||
}
|
||
// 无窗口信息时应该返回nil或默认值
|
||
|
||
_, err = windowEndFunc.Execute(ctx, []interface{}{})
|
||
if err != nil {
|
||
t.Errorf("Execute() error = %v", err)
|
||
}
|
||
// 无窗口信息时应该返回nil或默认值
|
||
})
|
||
}
|
||
|
||
func TestWindowFunctionResetAndClone(t *testing.T) {
|
||
// 测试WindowStartFunction的Reset和Clone
|
||
windowStart := NewWindowStartFunction()
|
||
agg := windowStart.New().(*WindowStartFunction)
|
||
agg.Add("data1")
|
||
agg.Add("data2")
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res := agg.Result()
|
||
if res != nil {
|
||
t.Errorf("WindowStart Reset failed, result = %v, want nil", res)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*WindowStartFunction)
|
||
if clone.windowStart != agg.windowStart {
|
||
t.Errorf("WindowStart Clone failed")
|
||
}
|
||
|
||
// 测试WindowEndFunction的Reset和Clone
|
||
windowEnd := NewWindowEndFunction()
|
||
agg2 := windowEnd.New().(*WindowEndFunction)
|
||
agg2.Add("data1")
|
||
agg2.Add("data2")
|
||
|
||
// 测试Reset
|
||
agg2.Reset()
|
||
res2 := agg2.Result()
|
||
if res2 != nil {
|
||
t.Errorf("WindowEnd Reset failed, result = %v, want nil", res2)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone2 := agg2.Clone().(*WindowEndFunction)
|
||
if clone2.windowEnd != agg2.windowEnd {
|
||
t.Errorf("WindowEnd Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestExpressionFunction(t *testing.T) {
|
||
fn := NewExpressionFunction()
|
||
ctx := &FunctionContext{}
|
||
|
||
// 测试Execute方法
|
||
_, err := fn.Execute(ctx, []interface{}{"x + y", 1, 2})
|
||
if err != nil {
|
||
t.Errorf("Execute error: %v", err)
|
||
}
|
||
// 注意:这里的结果取决于表达式求值器的实现
|
||
// 我们主要测试函数调用是否成功
|
||
|
||
// 测试聚合器方法
|
||
agg := fn.New().(*ExpressionFunction)
|
||
agg.Add("test")
|
||
_ = agg.Result()
|
||
// 结果取决于表达式求值器的实现
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res := agg.Result()
|
||
if res != nil {
|
||
t.Errorf("Reset failed, result = %v, want nil", res)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*ExpressionFunction)
|
||
if !reflect.DeepEqual(clone.values, agg.values) {
|
||
t.Errorf("Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestExpressionAggregatorFunction(t *testing.T) {
|
||
fn := NewExpressionAggregatorFunction()
|
||
ctx := &FunctionContext{}
|
||
|
||
// 测试Execute方法
|
||
_, err := fn.Execute(ctx, []interface{}{"sum(x)", 1, 2, 3})
|
||
if err != nil {
|
||
t.Errorf("Execute error: %v", err)
|
||
}
|
||
// 注意:这里的结果取决于表达式求值器的实现
|
||
|
||
// 测试聚合器方法
|
||
agg := fn.New().(*ExpressionAggregatorFunction)
|
||
agg.Add("test")
|
||
_ = agg.Result()
|
||
// 结果取决于表达式求值器的实现
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res := agg.Result()
|
||
if res != nil {
|
||
t.Errorf("Reset failed, result = %v, want nil", res)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*ExpressionAggregatorFunction)
|
||
if clone.lastResult != agg.lastResult {
|
||
t.Errorf("Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestFirstValueFunction(t *testing.T) {
|
||
fn := NewFirstValueFunction()
|
||
|
||
// 测试聚合器方法
|
||
agg := fn.New().(*FirstValueFunction)
|
||
agg.Add("x")
|
||
agg.Add("y")
|
||
agg.Add("z")
|
||
res := agg.Result()
|
||
if res != "x" {
|
||
t.Errorf("Agg first_value result = %v, want x", res)
|
||
}
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res2 := agg.Result()
|
||
if res2 != nil {
|
||
t.Errorf("Reset failed, result = %v, want nil", res2)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*FirstValueFunction)
|
||
if clone.firstValue != agg.firstValue {
|
||
t.Errorf("Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestLeadFunction(t *testing.T) {
|
||
fn := NewLeadFunction()
|
||
|
||
// 测试聚合器方法
|
||
agg := fn.New().(*LeadFunction)
|
||
agg.Add("x")
|
||
agg.Add("y")
|
||
agg.Add("z")
|
||
res := agg.Result()
|
||
if res != nil {
|
||
t.Errorf("Agg lead result = %v, want nil", res)
|
||
}
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res2 := agg.Result()
|
||
if res2 != nil {
|
||
t.Errorf("Reset failed, result = %v, want nil", res2)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*LeadFunction)
|
||
if clone.defaultValue != agg.defaultValue || clone.offset != agg.offset {
|
||
t.Errorf("Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestNthValueFunction(t *testing.T) {
|
||
fn := NewNthValueFunction()
|
||
|
||
// 测试聚合器方法
|
||
agg := fn.New().(*NthValueFunction)
|
||
agg.Add("x")
|
||
agg.Add("y")
|
||
agg.Add("z")
|
||
res := agg.Result()
|
||
if res != "x" {
|
||
t.Errorf("Agg nth_value result = %v, want x", res)
|
||
}
|
||
|
||
// 测试Reset
|
||
agg.Reset()
|
||
res = agg.Result()
|
||
if res != nil {
|
||
t.Errorf("Reset failed, result = %v, want nil", res)
|
||
}
|
||
|
||
// 测试Clone
|
||
clone := agg.Clone().(*NthValueFunction)
|
||
if clone.n != agg.n {
|
||
t.Errorf("Clone failed")
|
||
}
|
||
}
|
||
|
||
func TestWindowFunctionEdgeCases(t *testing.T) {
|
||
// LeadFunction Validate/Execute边界
|
||
lead := NewLeadFunction()
|
||
if err := lead.Validate([]interface{}{}); err == nil {
|
||
t.Error("LeadFunction.Validate should fail for insufficient args")
|
||
}
|
||
_, err := lead.Execute(nil, []interface{}{})
|
||
if err == nil {
|
||
t.Error("LeadFunction.Execute should fail for empty args")
|
||
}
|
||
agg := lead.New().(*LeadFunction)
|
||
agg.Reset()
|
||
_ = agg.Clone()
|
||
|
||
// NthValueFunction Validate/Execute边界
|
||
nth := NewNthValueFunction()
|
||
if err := nth.Validate([]interface{}{}); err == nil {
|
||
t.Error("NthValueFunction.Validate should fail for insufficient args")
|
||
}
|
||
_, err = nth.Execute(nil, []interface{}{})
|
||
if err == nil {
|
||
t.Error("NthValueFunction.Execute should fail for empty args")
|
||
}
|
||
agg2 := nth.New().(*NthValueFunction)
|
||
agg2.Reset()
|
||
_ = agg2.Clone()
|
||
|
||
// FirstValueFunction Validate/Execute边界
|
||
first := NewFirstValueFunction()
|
||
if err := first.Validate([]interface{}{}); err == nil {
|
||
t.Error("FirstValueFunction.Validate should fail for insufficient args")
|
||
}
|
||
_, err = first.Execute(nil, []interface{}{})
|
||
if err == nil {
|
||
t.Error("FirstValueFunction.Execute should fail for empty args")
|
||
}
|
||
agg3 := first.New().(*FirstValueFunction)
|
||
agg3.Reset()
|
||
_ = agg3.Clone()
|
||
|
||
// LastValueFunction Validate/Execute边界
|
||
last := NewLastValueFunction()
|
||
if err := last.Validate([]interface{}{}); err == nil {
|
||
t.Error("LastValueFunction.Validate should fail for insufficient args")
|
||
}
|
||
_, err = last.Execute(nil, []interface{}{})
|
||
if err == nil {
|
||
t.Error("LastValueFunction.Execute should fail for empty args")
|
||
}
|
||
agg4 := last.New().(*LastValueFunction)
|
||
agg4.Reset()
|
||
_ = agg4.Clone()
|
||
|
||
// WindowStartFunction/WindowEndFunction Reset/Clone边界
|
||
ws := NewWindowStartFunction().New().(*WindowStartFunction)
|
||
ws.Reset()
|
||
_ = ws.Clone()
|
||
we := NewWindowEndFunction().New().(*WindowEndFunction)
|
||
we.Reset()
|
||
_ = we.Clone()
|
||
}
|