forked from GiteaTest2015/streamsql
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3464ff5f6e | |||
| 1d9e2a3dab |
@@ -0,0 +1,148 @@
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestPostAggregationPlaceholder 测试后聚合占位符的完整功能
|
||||
func TestPostAggregationPlaceholder(t *testing.T) {
|
||||
t.Run("测试PostAggregationPlaceholder基本功能", func(t *testing.T) {
|
||||
// 创建PostAggregationPlaceholder实例
|
||||
placeholder := &PostAggregationPlaceholder{}
|
||||
require.NotNil(t, placeholder)
|
||||
|
||||
// 测试New方法
|
||||
newPlaceholder := placeholder.New()
|
||||
require.NotNil(t, newPlaceholder)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, newPlaceholder)
|
||||
|
||||
// 测试Add方法(应该不做任何操作)
|
||||
placeholder.Add(10)
|
||||
placeholder.Add("test")
|
||||
placeholder.Add(nil)
|
||||
placeholder.Add([]int{1, 2, 3})
|
||||
|
||||
// 测试Result方法(应该返回nil)
|
||||
result := placeholder.Result()
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("测试通过CreateBuiltinAggregator创建PostAggregationPlaceholder", func(t *testing.T) {
|
||||
// 使用CreateBuiltinAggregator创建post_aggregation类型的聚合器
|
||||
aggregator := CreateBuiltinAggregator(PostAggregation)
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
|
||||
// 测试创建的聚合器功能
|
||||
newAgg := aggregator.New()
|
||||
require.NotNil(t, newAgg)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, newAgg)
|
||||
|
||||
// 测试添加各种类型的值
|
||||
newAgg.Add(100)
|
||||
newAgg.Add("string_value")
|
||||
newAgg.Add(map[string]interface{}{"key": "value"})
|
||||
|
||||
// 验证结果始终为nil
|
||||
result := newAgg.Result()
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregationPlaceholder的多实例独立性", func(t *testing.T) {
|
||||
// 创建多个实例
|
||||
placeholder1 := &PostAggregationPlaceholder{}
|
||||
placeholder2 := placeholder1.New()
|
||||
placeholder3 := placeholder1.New()
|
||||
|
||||
// 验证实例类型正确
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder1)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder2)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, placeholder3)
|
||||
|
||||
// 每个实例都应该返回nil
|
||||
assert.Nil(t, placeholder1.Result())
|
||||
assert.Nil(t, placeholder2.Result())
|
||||
assert.Nil(t, placeholder3.Result())
|
||||
|
||||
// 验证Add操作不会影响结果(因为是占位符)
|
||||
placeholder1.Add("test1")
|
||||
placeholder2.Add("test2")
|
||||
placeholder3.Add("test3")
|
||||
assert.Nil(t, placeholder1.Result())
|
||||
assert.Nil(t, placeholder2.Result())
|
||||
assert.Nil(t, placeholder3.Result())
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregationPlaceholder在聚合场景中的使用", func(t *testing.T) {
|
||||
// 创建包含PostAggregationPlaceholder的聚合字段
|
||||
groupFields := []string{"category"}
|
||||
aggFields := []AggregationField{
|
||||
{InputField: "value", AggregateType: Sum, OutputAlias: "sum_value"},
|
||||
{InputField: "placeholder_field", AggregateType: PostAggregation, OutputAlias: "post_agg_field"},
|
||||
}
|
||||
|
||||
// 创建分组聚合器
|
||||
agg := NewGroupAggregator(groupFields, aggFields)
|
||||
require.NotNil(t, agg)
|
||||
|
||||
// 添加测试数据
|
||||
testData := []map[string]interface{}{
|
||||
{"category": "A", "value": 10, "placeholder_field": "should_be_ignored"},
|
||||
{"category": "A", "value": 20, "placeholder_field": "also_ignored"},
|
||||
{"category": "B", "value": 30, "placeholder_field": 999},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
err := agg.Add(data)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// 获取结果
|
||||
results, err := agg.GetResults()
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, results, 2)
|
||||
|
||||
// 验证PostAggregationPlaceholder字段的结果为nil
|
||||
for _, result := range results {
|
||||
assert.Contains(t, result, "post_agg_field")
|
||||
assert.Nil(t, result["post_agg_field"])
|
||||
// 验证正常聚合字段工作正常
|
||||
assert.Contains(t, result, "sum_value")
|
||||
assert.NotNil(t, result["sum_value"])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestCreateBuiltinAggregatorPostAggregation 测试CreateBuiltinAggregator对post_aggregation类型的处理
|
||||
func TestCreateBuiltinAggregatorPostAggregation(t *testing.T) {
|
||||
t.Run("测试post_aggregation类型聚合器创建", func(t *testing.T) {
|
||||
aggregator := CreateBuiltinAggregator("post_aggregation")
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
})
|
||||
|
||||
t.Run("测试PostAggregation常量", func(t *testing.T) {
|
||||
// 验证PostAggregation常量值
|
||||
assert.Equal(t, AggregateType("post_aggregation"), PostAggregation)
|
||||
|
||||
// 使用常量创建聚合器
|
||||
aggregator := CreateBuiltinAggregator(PostAggregation)
|
||||
require.NotNil(t, aggregator)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, aggregator)
|
||||
})
|
||||
|
||||
t.Run("测试与其他聚合类型的区别", func(t *testing.T) {
|
||||
// 创建不同类型的聚合器
|
||||
sumAgg := CreateBuiltinAggregator(Sum)
|
||||
countAgg := CreateBuiltinAggregator(Count)
|
||||
postAgg := CreateBuiltinAggregator(PostAggregation)
|
||||
|
||||
// 验证类型不同
|
||||
assert.NotEqual(t, sumAgg, postAgg)
|
||||
assert.NotEqual(t, countAgg, postAgg)
|
||||
assert.IsType(t, &PostAggregationPlaceholder{}, postAgg)
|
||||
})
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -479,12 +479,7 @@ func (f *YearFunction) Validate(args []interface{}) error {
|
||||
}
|
||||
|
||||
func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
|
||||
// 首先检查是否是 time.Time 类型
|
||||
if t, ok := args[0].(time.Time); ok {
|
||||
return float64(t.Year()), nil
|
||||
}
|
||||
|
||||
// 如果不是 time.Time,尝试转换为字符串并解析
|
||||
// 尝试转换为字符串并解析
|
||||
dateStr, err := cast.ToStringE(args[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid date: %v", err)
|
||||
@@ -497,7 +492,7 @@ func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interf
|
||||
}
|
||||
}
|
||||
|
||||
return float64(t.Year()), nil
|
||||
return t.Year(), nil
|
||||
}
|
||||
|
||||
// MonthFunction 提取月份函数
|
||||
@@ -516,12 +511,7 @@ func (f *MonthFunction) Validate(args []interface{}) error {
|
||||
}
|
||||
|
||||
func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
|
||||
// 首先检查是否是 time.Time 类型
|
||||
if t, ok := args[0].(time.Time); ok {
|
||||
return float64(t.Month()), nil
|
||||
}
|
||||
|
||||
// 如果不是 time.Time,尝试转换为字符串并解析
|
||||
// 转换为字符串并解析
|
||||
dateStr, err := cast.ToStringE(args[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid date: %v", err)
|
||||
@@ -534,7 +524,7 @@ func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (inter
|
||||
}
|
||||
}
|
||||
|
||||
return float64(t.Month()), nil
|
||||
return int(t.Month()), nil
|
||||
}
|
||||
|
||||
// DayFunction 提取日期函数
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
+102
-20
@@ -5,7 +5,13 @@ import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// UnnestFunction 将数组展开为多行
|
||||
const (
|
||||
UnnestObjectMarker = "__unnest_object__"
|
||||
UnnestDataKey = "__data__"
|
||||
UnnestEmptyMarker = "__empty_unnest__"
|
||||
DefaultValueKey = "value"
|
||||
)
|
||||
|
||||
type UnnestFunction struct {
|
||||
*BaseFunction
|
||||
}
|
||||
@@ -27,7 +33,13 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
|
||||
|
||||
array := args[0]
|
||||
if array == nil {
|
||||
return []interface{}{}, nil
|
||||
// 返回带有unnest标记的空结果
|
||||
return []interface{}{
|
||||
map[string]interface{}{
|
||||
UnnestObjectMarker: true,
|
||||
UnnestEmptyMarker: true, // 标记这是空unnest结果
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// 使用反射检查是否为数组或切片
|
||||
@@ -36,7 +48,18 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
|
||||
return nil, fmt.Errorf("unnest requires an array or slice, got %T", array)
|
||||
}
|
||||
|
||||
// 转换为 []interface{}
|
||||
// 如果数组为空,返回带标记的空数组
|
||||
if v.Len() == 0 {
|
||||
// 返回带有unnest标记的空结果
|
||||
return []interface{}{
|
||||
map[string]interface{}{
|
||||
UnnestObjectMarker: true,
|
||||
UnnestEmptyMarker: true, // 标记这是空unnest结果
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// 转换为 []interface{},所有元素都标记为unnest结果
|
||||
result := make([]interface{}, v.Len())
|
||||
for i := 0; i < v.Len(); i++ {
|
||||
elem := v.Index(i).Interface()
|
||||
@@ -45,39 +68,46 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte
|
||||
if elemMap, ok := elem.(map[string]interface{}); ok {
|
||||
// 对于对象,我们返回一个特殊的结构来表示需要展开为列
|
||||
result[i] = map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": elemMap,
|
||||
UnnestObjectMarker: true,
|
||||
UnnestDataKey: elemMap,
|
||||
}
|
||||
} else {
|
||||
result[i] = elem
|
||||
// 对于普通元素,也需要标记为unnest结果
|
||||
result[i] = map[string]interface{}{
|
||||
UnnestObjectMarker: true,
|
||||
UnnestDataKey: elem,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// UnnestResult 表示 unnest 函数的结果
|
||||
type UnnestResult struct {
|
||||
Rows []map[string]interface{}
|
||||
}
|
||||
|
||||
// IsUnnestResult 检查是否为 unnest 结果
|
||||
func IsUnnestResult(value interface{}) bool {
|
||||
if slice, ok := value.([]interface{}); ok {
|
||||
for _, item := range slice {
|
||||
if itemMap, ok := item.(map[string]interface{}); ok {
|
||||
if unnest, exists := itemMap["__unnest_object__"]; exists {
|
||||
if unnestBool, ok := unnest.(bool); ok && unnestBool {
|
||||
return true
|
||||
}
|
||||
slice, ok := value.([]interface{})
|
||||
if !ok || len(slice) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// 检查数组中是否有任何unnest标记的元素
|
||||
for _, item := range slice {
|
||||
if itemMap, ok := item.(map[string]interface{}); ok {
|
||||
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
|
||||
if unnestBool, ok := unnest.(bool); ok && unnestBool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果没有找到unnest标记,则不是unnest结果
|
||||
return false
|
||||
}
|
||||
|
||||
// ProcessUnnestResult 处理 unnest 结果,将其转换为多行
|
||||
func ProcessUnnestResult(value interface{}) []map[string]interface{} {
|
||||
slice, ok := value.([]interface{})
|
||||
if !ok {
|
||||
@@ -87,20 +117,72 @@ func ProcessUnnestResult(value interface{}) []map[string]interface{} {
|
||||
var rows []map[string]interface{}
|
||||
for _, item := range slice {
|
||||
if itemMap, ok := item.(map[string]interface{}); ok {
|
||||
if unnest, exists := itemMap["__unnest_object__"]; exists {
|
||||
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
|
||||
if unnestBool, ok := unnest.(bool); ok && unnestBool {
|
||||
if data, exists := itemMap["__data__"]; exists {
|
||||
if data, exists := itemMap[UnnestDataKey]; exists {
|
||||
// 检查数据是否为对象(map)
|
||||
if dataMap, ok := data.(map[string]interface{}); ok {
|
||||
// 对象数据直接展开为列
|
||||
rows = append(rows, dataMap)
|
||||
} else {
|
||||
// 普通数据使用默认字段名
|
||||
row := map[string]interface{}{
|
||||
DefaultValueKey: data,
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
// 对于非对象元素,创建一个包含单个值的行
|
||||
// 对于非标记元素,创建一个包含单个值的行(向后兼容)
|
||||
row := map[string]interface{}{
|
||||
"value": item,
|
||||
DefaultValueKey: item,
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return rows
|
||||
}
|
||||
|
||||
func ProcessUnnestResultWithFieldName(value interface{}, fieldName string) []map[string]interface{} {
|
||||
slice, ok := value.([]interface{})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var rows []map[string]interface{}
|
||||
for _, item := range slice {
|
||||
if itemMap, ok := item.(map[string]interface{}); ok {
|
||||
if unnest, exists := itemMap[UnnestObjectMarker]; exists {
|
||||
if unnestBool, ok := unnest.(bool); ok && unnestBool {
|
||||
// 检查是否为空unnest结果
|
||||
if itemMap[UnnestEmptyMarker] == true {
|
||||
// 空unnest结果,返回空数组
|
||||
return []map[string]interface{}{}
|
||||
}
|
||||
|
||||
if data, exists := itemMap[UnnestDataKey]; exists {
|
||||
// 检查数据是否为对象(map)
|
||||
if dataMap, ok := data.(map[string]interface{}); ok {
|
||||
// 对象数据直接展开为列
|
||||
rows = append(rows, dataMap)
|
||||
} else {
|
||||
// 普通数据使用指定字段名
|
||||
row := map[string]interface{}{
|
||||
fieldName: data,
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
// 对于非标记元素,使用指定的字段名创建行(向后兼容)
|
||||
row := map[string]interface{}{
|
||||
fieldName: item,
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,20 @@ func TestUnnestFunction(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("UnnestFunction should not return error: %v", err)
|
||||
}
|
||||
expected := []interface{}{"a", "b", "c"}
|
||||
expected := []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "a",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "b",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "c",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Errorf("UnnestFunction = %v, want %v", result, expected)
|
||||
}
|
||||
@@ -51,8 +64,15 @@ func TestUnnestFunction(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("UnnestFunction should not return error for empty array: %v", err)
|
||||
}
|
||||
if len(result.([]interface{})) != 0 {
|
||||
t.Errorf("UnnestFunction should return empty array for empty input")
|
||||
// 空数组应该返回带有空标记的结果
|
||||
expectedEmpty := []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__empty_unnest__": true,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(result, expectedEmpty) {
|
||||
t.Errorf("UnnestFunction empty array = %v, want %v", result, expectedEmpty)
|
||||
}
|
||||
|
||||
// 测试nil参数
|
||||
@@ -61,8 +81,15 @@ func TestUnnestFunction(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("UnnestFunction should not return error for nil: %v", err)
|
||||
}
|
||||
if len(result.([]interface{})) != 0 {
|
||||
t.Errorf("UnnestFunction should return empty array for nil input")
|
||||
// nil应该返回带有空标记的结果
|
||||
expectedNil := []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__empty_unnest__": true,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(result, expectedNil) {
|
||||
t.Errorf("UnnestFunction nil = %v, want %v", result, expectedNil)
|
||||
}
|
||||
|
||||
// 测试错误参数数量
|
||||
@@ -85,7 +112,20 @@ func TestUnnestFunction(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("UnnestFunction should handle arrays: %v", err)
|
||||
}
|
||||
expected = []interface{}{"x", "y", "z"}
|
||||
expected = []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "x",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "y",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "z",
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Errorf("UnnestFunction array = %v, want %v", result, expected)
|
||||
}
|
||||
|
||||
@@ -82,7 +82,11 @@ func TestNewStringFunctions(t *testing.T) {
|
||||
if !exists {
|
||||
t.Fatalf("Function %s not found", tt.funcName)
|
||||
}
|
||||
|
||||
// 验证参数
|
||||
if err := fn.Validate(tt.args); err != nil {
|
||||
t.Errorf("Validate() error = %v", err)
|
||||
return
|
||||
}
|
||||
ctx := &FunctionContext{}
|
||||
result, err := fn.Execute(ctx, tt.args)
|
||||
|
||||
@@ -167,84 +171,6 @@ func TestStringFunctionValidation(t *testing.T) {
|
||||
args: []interface{}{"hello"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "upper no args",
|
||||
function: NewUpperFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "upper valid args",
|
||||
function: NewUpperFunction(),
|
||||
args: []interface{}{"hello"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "endswith no args",
|
||||
function: NewEndswithFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "endswith one arg",
|
||||
function: NewEndswithFunction(),
|
||||
args: []interface{}{"hello"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "endswith valid args",
|
||||
function: NewEndswithFunction(),
|
||||
args: []interface{}{"hello", "lo"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "substring no args",
|
||||
function: NewSubstringFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "substring one arg",
|
||||
function: NewSubstringFunction(),
|
||||
args: []interface{}{"hello"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "substring valid args",
|
||||
function: NewSubstringFunction(),
|
||||
args: []interface{}{"hello", 1},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "replace no args",
|
||||
function: NewReplaceFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "replace two args",
|
||||
function: NewReplaceFunction(),
|
||||
args: []interface{}{"hello", "world"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "replace valid args",
|
||||
function: NewReplaceFunction(),
|
||||
args: []interface{}{"hello", "l", "x"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "regexp_matches no args",
|
||||
function: NewRegexpMatchesFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "regexp_matches valid args",
|
||||
function: NewRegexpMatchesFunction(),
|
||||
args: []interface{}{"hello123", "[0-9]+"},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
@@ -4,184 +4,8 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestTypeFunctions 测试类型检查函数的基本功能
|
||||
// TestTypeFunctions 测试类型函数
|
||||
func TestTypeFunctions(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
funcName string
|
||||
args []interface{}
|
||||
expected interface{}
|
||||
}{
|
||||
{
|
||||
name: "is_null true",
|
||||
funcName: "is_null",
|
||||
args: []interface{}{nil},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_null false",
|
||||
funcName: "is_null",
|
||||
args: []interface{}{"test"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_not_null true",
|
||||
funcName: "is_not_null",
|
||||
args: []interface{}{"test"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_not_null false",
|
||||
funcName: "is_not_null",
|
||||
args: []interface{}{nil},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_numeric true",
|
||||
funcName: "is_numeric",
|
||||
args: []interface{}{123},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_numeric false",
|
||||
funcName: "is_numeric",
|
||||
args: []interface{}{"test"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_string true",
|
||||
funcName: "is_string",
|
||||
args: []interface{}{"test"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_string false",
|
||||
funcName: "is_string",
|
||||
args: []interface{}{123},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_bool true",
|
||||
funcName: "is_bool",
|
||||
args: []interface{}{true},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_bool false",
|
||||
funcName: "is_bool",
|
||||
args: []interface{}{"test"},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
fn, exists := Get(tt.funcName)
|
||||
if !exists {
|
||||
t.Fatalf("Function %s not found", tt.funcName)
|
||||
}
|
||||
|
||||
result, err := fn.Execute(&FunctionContext{}, tt.args)
|
||||
if err != nil {
|
||||
t.Errorf("Execute() error = %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if result != tt.expected {
|
||||
t.Errorf("Execute() = %v, want %v", result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestTypeFunctionValidation 测试类型函数的参数验证
|
||||
func TestTypeFunctionValidation(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
function Function
|
||||
args []interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "is_null no args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_null too many args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{"test", "extra"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_null valid args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{"test"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "is_not_null no args",
|
||||
function: NewIsNotNullFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_not_null valid args",
|
||||
function: NewIsNotNullFunction(),
|
||||
args: []interface{}{nil},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "is_numeric no args",
|
||||
function: NewIsNumericFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_numeric valid args",
|
||||
function: NewIsNumericFunction(),
|
||||
args: []interface{}{123},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "is_string no args",
|
||||
function: NewIsStringFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_string valid args",
|
||||
function: NewIsStringFunction(),
|
||||
args: []interface{}{"test"},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "is_bool no args",
|
||||
function: NewIsBoolFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_bool valid args",
|
||||
function: NewIsBoolFunction(),
|
||||
args: []interface{}{true},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.function.Validate(tt.args)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestTypeFunctionEdgeCases 测试类型函数的边界情况
|
||||
func TestTypeFunctionEdgeCases(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
function Function
|
||||
@@ -242,6 +66,12 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
|
||||
args: []interface{}{true},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_numeric with nil",
|
||||
function: NewIsNumericFunction(),
|
||||
args: []interface{}{nil},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_string with empty string",
|
||||
function: NewIsStringFunction(),
|
||||
@@ -254,10 +84,45 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
|
||||
args: []interface{}{false},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_bool with nil",
|
||||
function: NewIsBoolFunction(),
|
||||
args: []interface{}{nil},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_array",
|
||||
function: NewIsArrayFunction(),
|
||||
args: []interface{}{[]int{1, 2, 3}},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_array with nil",
|
||||
function: NewIsArrayFunction(),
|
||||
args: []interface{}{nil},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "is_object",
|
||||
function: NewIsObjectFunction(),
|
||||
args: []interface{}{map[string]int{"a": 1, "b": 2}},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "is_object with nil",
|
||||
function: NewIsObjectFunction(),
|
||||
args: []interface{}{nil},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// 验证参数
|
||||
if err := tt.function.Validate(tt.args); err != nil {
|
||||
t.Errorf("Validate() error = %v", err)
|
||||
return
|
||||
}
|
||||
result, err := tt.function.Execute(&FunctionContext{}, tt.args)
|
||||
if err != nil {
|
||||
t.Errorf("Execute() error = %v", err)
|
||||
@@ -270,3 +135,41 @@ func TestTypeFunctionEdgeCases(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestTypeFunctionValidation 测试类型函数的参数验证
|
||||
func TestTypeFunctionValidation(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
function Function
|
||||
args []interface{}
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "is_null no args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_null too many args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{"test", "extra"},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "is_null valid args",
|
||||
function: NewIsNullFunction(),
|
||||
args: []interface{}{"test"},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.function.Validate(tt.args)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -522,8 +522,8 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap result as array
|
||||
results := []map[string]interface{}{result}
|
||||
// Check if any field contains unnest function result and expand to multiple rows
|
||||
results := dp.expandUnnestResults(result, dataMap)
|
||||
|
||||
// Non-blocking send result to resultChan
|
||||
dp.stream.sendResultNonBlocking(results)
|
||||
@@ -531,3 +531,45 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) {
|
||||
// Asynchronously call all sinks, avoid blocking
|
||||
dp.stream.callSinksAsync(results)
|
||||
}
|
||||
|
||||
// expandUnnestResults 检查结果是否包含 unnest 函数输出并展开为多行
|
||||
func (dp *DataProcessor) expandUnnestResults(result map[string]interface{}, originalData map[string]interface{}) []map[string]interface{} {
|
||||
// Early return if no unnest function is used in the query
|
||||
// This optimization significantly improves performance for queries without unnest functions
|
||||
if !dp.stream.hasUnnestFunction {
|
||||
return []map[string]interface{}{result}
|
||||
}
|
||||
|
||||
if len(result) == 0 {
|
||||
return []map[string]interface{}{result}
|
||||
}
|
||||
|
||||
for fieldName, fieldValue := range result {
|
||||
if functions.IsUnnestResult(fieldValue) {
|
||||
expandedRows := functions.ProcessUnnestResultWithFieldName(fieldValue, fieldName)
|
||||
// 如果unnest结果为空,返回空结果数组
|
||||
if len(expandedRows) == 0 {
|
||||
return []map[string]interface{}{}
|
||||
}
|
||||
|
||||
results := make([]map[string]interface{}, len(expandedRows))
|
||||
for i, unnestRow := range expandedRows {
|
||||
newRow := make(map[string]interface{}, len(result)+len(unnestRow))
|
||||
for k, v := range result {
|
||||
if k != fieldName {
|
||||
newRow[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range unnestRow {
|
||||
newRow[k] = v
|
||||
}
|
||||
|
||||
results[i] = newRow
|
||||
}
|
||||
return results
|
||||
}
|
||||
}
|
||||
|
||||
return []map[string]interface{}{result}
|
||||
}
|
||||
|
||||
@@ -430,3 +430,140 @@ func TestDataProcessor_ExpressionWithNullValues(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 20.0, result)
|
||||
}
|
||||
|
||||
// TestDataProcessor_ExpandUnnestResults 测试 expandUnnestResults 函数的各种情况
|
||||
func TestDataProcessor_ExpandUnnestResults(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
hasUnnestFunction bool
|
||||
result map[string]interface{}
|
||||
originalData map[string]interface{}
|
||||
expected []map[string]interface{}
|
||||
}{
|
||||
{
|
||||
name: "no unnest function - should return single result",
|
||||
hasUnnestFunction: false,
|
||||
result: map[string]interface{}{
|
||||
"name": "test",
|
||||
"age": 25,
|
||||
},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{
|
||||
{"name": "test", "age": 25},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty result - should return single empty result",
|
||||
hasUnnestFunction: true,
|
||||
result: map[string]interface{}{},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{
|
||||
{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no unnest result - should return single result",
|
||||
hasUnnestFunction: true,
|
||||
result: map[string]interface{}{
|
||||
"name": "test",
|
||||
"age": 25,
|
||||
},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{
|
||||
{"name": "test", "age": 25},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unnest result with simple values",
|
||||
hasUnnestFunction: true,
|
||||
result: map[string]interface{}{
|
||||
"name": "test",
|
||||
"items": []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "item1",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": "item2",
|
||||
},
|
||||
},
|
||||
},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{
|
||||
{"name": "test", "items": "item1"},
|
||||
{"name": "test", "items": "item2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unnest result with object values",
|
||||
hasUnnestFunction: true,
|
||||
result: map[string]interface{}{
|
||||
"name": "test",
|
||||
"orders": []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": map[string]interface{}{
|
||||
"order_id": 1,
|
||||
"amount": 100,
|
||||
},
|
||||
},
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__data__": map[string]interface{}{
|
||||
"order_id": 2,
|
||||
"amount": 200,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{
|
||||
{"name": "test", "order_id": 1, "amount": 100},
|
||||
{"name": "test", "order_id": 2, "amount": 200},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty unnest result - should return empty array",
|
||||
hasUnnestFunction: true,
|
||||
result: map[string]interface{}{
|
||||
"name": "test",
|
||||
"items": []interface{}{
|
||||
map[string]interface{}{
|
||||
"__unnest_object__": true,
|
||||
"__empty_unnest__": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
originalData: map[string]interface{}{"id": 1},
|
||||
expected: []map[string]interface{}{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// 创建测试用的 stream 和 processor
|
||||
config := types.Config{
|
||||
SimpleFields: []string{"name", "age"},
|
||||
}
|
||||
stream, err := NewStream(config)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
if stream != nil {
|
||||
close(stream.done)
|
||||
}
|
||||
}()
|
||||
|
||||
// 设置 hasUnnestFunction 标志
|
||||
stream.hasUnnestFunction = tt.hasUnnestFunction
|
||||
|
||||
processor := NewDataProcessor(stream)
|
||||
|
||||
// 调用被测试的函数
|
||||
result := processor.expandUnnestResults(tt.result, tt.originalData)
|
||||
|
||||
// 验证结果
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +98,8 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
|
||||
|
||||
// compileExpressionInfo pre-compiles expression processing information
|
||||
func (s *Stream) compileExpressionInfo() {
|
||||
// Initialize unnest function detection flag
|
||||
s.hasUnnestFunction = false
|
||||
bridge := functions.GetExprBridge()
|
||||
|
||||
for fieldName, fieldExpr := range s.config.FieldExpressions {
|
||||
@@ -124,6 +126,11 @@ func (s *Stream) compileExpressionInfo() {
|
||||
exprInfo.hasNestedFields = !exprInfo.isFunctionCall && strings.Contains(fieldExpr.Expression, ".")
|
||||
exprInfo.needsBacktickPreprocess = bridge.ContainsBacktickIdentifiers(fieldExpr.Expression)
|
||||
|
||||
// Check if expression contains unnest function
|
||||
if exprInfo.isFunctionCall && strings.Contains(strings.ToLower(fieldExpr.Expression), "unnest(") {
|
||||
s.hasUnnestFunction = true
|
||||
}
|
||||
|
||||
// Pre-compile expression object (only for non-function call expressions)
|
||||
if !exprInfo.isFunctionCall {
|
||||
exprToCompile := fieldExpr.Expression
|
||||
|
||||
@@ -97,6 +97,11 @@ type Stream struct {
|
||||
compiledFieldInfo map[string]*fieldProcessInfo // Field processing information cache
|
||||
compiledExprInfo map[string]*expressionProcessInfo // Expression processing information cache
|
||||
|
||||
// Unnest function optimization flags
|
||||
// hasUnnestFunction 标识查询是否使用了 unnest 函数,在预处理阶段确定
|
||||
// 用于优化 expandUnnestResults 函数的性能,避免不必要的字段遍历检查
|
||||
hasUnnestFunction bool // Whether the query uses unnest function, determined during preprocessing
|
||||
|
||||
}
|
||||
|
||||
// NewStream creates Stream using unified configuration
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user