diff --git a/functions/functions_multirow.go b/functions/functions_multirow.go index 1a8004e..579cb11 100644 --- a/functions/functions_multirow.go +++ b/functions/functions_multirow.go @@ -5,7 +5,13 @@ import ( "reflect" ) -// UnnestFunction 将数组展开为多行 +const ( + UnnestObjectMarker = "__unnest_object__" + UnnestDataKey = "__data__" + UnnestEmptyMarker = "__empty_unnest__" + DefaultValueKey = "value" +) + type UnnestFunction struct { *BaseFunction } @@ -27,7 +33,13 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte array := args[0] if array == nil { - return []interface{}{}, nil + // 返回带有unnest标记的空结果 + return []interface{}{ + map[string]interface{}{ + UnnestObjectMarker: true, + UnnestEmptyMarker: true, // 标记这是空unnest结果 + }, + }, nil } // 使用反射检查是否为数组或切片 @@ -36,7 +48,18 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte return nil, fmt.Errorf("unnest requires an array or slice, got %T", array) } - // 转换为 []interface{} + // 如果数组为空,返回带标记的空数组 + if v.Len() == 0 { + // 返回带有unnest标记的空结果 + return []interface{}{ + map[string]interface{}{ + UnnestObjectMarker: true, + UnnestEmptyMarker: true, // 标记这是空unnest结果 + }, + }, nil + } + + // 转换为 []interface{},所有元素都标记为unnest结果 result := make([]interface{}, v.Len()) for i := 0; i < v.Len(); i++ { elem := v.Index(i).Interface() @@ -45,39 +68,46 @@ func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (inte if elemMap, ok := elem.(map[string]interface{}); ok { // 对于对象,我们返回一个特殊的结构来表示需要展开为列 result[i] = map[string]interface{}{ - "__unnest_object__": true, - "__data__": elemMap, + UnnestObjectMarker: true, + UnnestDataKey: elemMap, } } else { - result[i] = elem + // 对于普通元素,也需要标记为unnest结果 + result[i] = map[string]interface{}{ + UnnestObjectMarker: true, + UnnestDataKey: elem, + } } } return result, nil } -// UnnestResult 表示 unnest 函数的结果 type UnnestResult struct { Rows []map[string]interface{} } -// IsUnnestResult 检查是否为 unnest 结果 func IsUnnestResult(value interface{}) bool { - if slice, ok := value.([]interface{}); ok { - for _, item := range slice { - if itemMap, ok := item.(map[string]interface{}); ok { - if unnest, exists := itemMap["__unnest_object__"]; exists { - if unnestBool, ok := unnest.(bool); ok && unnestBool { - return true - } + slice, ok := value.([]interface{}) + if !ok || len(slice) == 0 { + return false + } + + // 检查数组中是否有任何unnest标记的元素 + for _, item := range slice { + if itemMap, ok := item.(map[string]interface{}); ok { + if unnest, exists := itemMap[UnnestObjectMarker]; exists { + if unnestBool, ok := unnest.(bool); ok && unnestBool { + return true } } } } + + // 如果没有找到unnest标记,则不是unnest结果 return false } -// ProcessUnnestResult 处理 unnest 结果,将其转换为多行 func ProcessUnnestResult(value interface{}) []map[string]interface{} { slice, ok := value.([]interface{}) if !ok { @@ -87,20 +117,72 @@ func ProcessUnnestResult(value interface{}) []map[string]interface{} { var rows []map[string]interface{} for _, item := range slice { if itemMap, ok := item.(map[string]interface{}); ok { - if unnest, exists := itemMap["__unnest_object__"]; exists { + if unnest, exists := itemMap[UnnestObjectMarker]; exists { if unnestBool, ok := unnest.(bool); ok && unnestBool { - if data, exists := itemMap["__data__"]; exists { + if data, exists := itemMap[UnnestDataKey]; exists { + // 检查数据是否为对象(map) if dataMap, ok := data.(map[string]interface{}); ok { + // 对象数据直接展开为列 rows = append(rows, dataMap) + } else { + // 普通数据使用默认字段名 + row := map[string]interface{}{ + DefaultValueKey: data, + } + rows = append(rows, row) } } continue } } } - // 对于非对象元素,创建一个包含单个值的行 + // 对于非标记元素,创建一个包含单个值的行(向后兼容) row := map[string]interface{}{ - "value": item, + DefaultValueKey: item, + } + rows = append(rows, row) + } + + return rows +} + +func ProcessUnnestResultWithFieldName(value interface{}, fieldName string) []map[string]interface{} { + slice, ok := value.([]interface{}) + if !ok { + return nil + } + + var rows []map[string]interface{} + for _, item := range slice { + if itemMap, ok := item.(map[string]interface{}); ok { + if unnest, exists := itemMap[UnnestObjectMarker]; exists { + if unnestBool, ok := unnest.(bool); ok && unnestBool { + // 检查是否为空unnest结果 + if itemMap[UnnestEmptyMarker] == true { + // 空unnest结果,返回空数组 + return []map[string]interface{}{} + } + + if data, exists := itemMap[UnnestDataKey]; exists { + // 检查数据是否为对象(map) + if dataMap, ok := data.(map[string]interface{}); ok { + // 对象数据直接展开为列 + rows = append(rows, dataMap) + } else { + // 普通数据使用指定字段名 + row := map[string]interface{}{ + fieldName: data, + } + rows = append(rows, row) + } + } + continue + } + } + } + // 对于非标记元素,使用指定的字段名创建行(向后兼容) + row := map[string]interface{}{ + fieldName: item, } rows = append(rows, row) } diff --git a/functions/functions_multirow_test.go b/functions/functions_multirow_test.go index cd795de..8c4c086 100644 --- a/functions/functions_multirow_test.go +++ b/functions/functions_multirow_test.go @@ -15,7 +15,20 @@ func TestUnnestFunction(t *testing.T) { if err != nil { t.Errorf("UnnestFunction should not return error: %v", err) } - expected := []interface{}{"a", "b", "c"} + expected := []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "a", + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "b", + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "c", + }, + } if !reflect.DeepEqual(result, expected) { t.Errorf("UnnestFunction = %v, want %v", result, expected) } @@ -51,8 +64,15 @@ func TestUnnestFunction(t *testing.T) { if err != nil { t.Errorf("UnnestFunction should not return error for empty array: %v", err) } - if len(result.([]interface{})) != 0 { - t.Errorf("UnnestFunction should return empty array for empty input") + // 空数组应该返回带有空标记的结果 + expectedEmpty := []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__empty_unnest__": true, + }, + } + if !reflect.DeepEqual(result, expectedEmpty) { + t.Errorf("UnnestFunction empty array = %v, want %v", result, expectedEmpty) } // 测试nil参数 @@ -61,8 +81,15 @@ func TestUnnestFunction(t *testing.T) { if err != nil { t.Errorf("UnnestFunction should not return error for nil: %v", err) } - if len(result.([]interface{})) != 0 { - t.Errorf("UnnestFunction should return empty array for nil input") + // nil应该返回带有空标记的结果 + expectedNil := []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__empty_unnest__": true, + }, + } + if !reflect.DeepEqual(result, expectedNil) { + t.Errorf("UnnestFunction nil = %v, want %v", result, expectedNil) } // 测试错误参数数量 @@ -85,7 +112,20 @@ func TestUnnestFunction(t *testing.T) { if err != nil { t.Errorf("UnnestFunction should handle arrays: %v", err) } - expected = []interface{}{"x", "y", "z"} + expected = []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "x", + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "y", + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "z", + }, + } if !reflect.DeepEqual(result, expected) { t.Errorf("UnnestFunction array = %v, want %v", result, expected) } diff --git a/stream/processor_data.go b/stream/processor_data.go index 3b4df97..9d056ac 100644 --- a/stream/processor_data.go +++ b/stream/processor_data.go @@ -522,8 +522,8 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) { } } - // Wrap result as array - results := []map[string]interface{}{result} + // Check if any field contains unnest function result and expand to multiple rows + results := dp.expandUnnestResults(result, dataMap) // Non-blocking send result to resultChan dp.stream.sendResultNonBlocking(results) @@ -531,3 +531,45 @@ func (dp *DataProcessor) processDirectData(data map[string]interface{}) { // Asynchronously call all sinks, avoid blocking dp.stream.callSinksAsync(results) } + +// expandUnnestResults 检查结果是否包含 unnest 函数输出并展开为多行 +func (dp *DataProcessor) expandUnnestResults(result map[string]interface{}, originalData map[string]interface{}) []map[string]interface{} { + // Early return if no unnest function is used in the query + // This optimization significantly improves performance for queries without unnest functions + if !dp.stream.hasUnnestFunction { + return []map[string]interface{}{result} + } + + if len(result) == 0 { + return []map[string]interface{}{result} + } + + for fieldName, fieldValue := range result { + if functions.IsUnnestResult(fieldValue) { + expandedRows := functions.ProcessUnnestResultWithFieldName(fieldValue, fieldName) + // 如果unnest结果为空,返回空结果数组 + if len(expandedRows) == 0 { + return []map[string]interface{}{} + } + + results := make([]map[string]interface{}, len(expandedRows)) + for i, unnestRow := range expandedRows { + newRow := make(map[string]interface{}, len(result)+len(unnestRow)) + for k, v := range result { + if k != fieldName { + newRow[k] = v + } + } + + for k, v := range unnestRow { + newRow[k] = v + } + + results[i] = newRow + } + return results + } + } + + return []map[string]interface{}{result} +} diff --git a/stream/processor_data_test.go b/stream/processor_data_test.go index 4513857..8225c24 100644 --- a/stream/processor_data_test.go +++ b/stream/processor_data_test.go @@ -430,3 +430,140 @@ func TestDataProcessor_ExpressionWithNullValues(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 20.0, result) } + +// TestDataProcessor_ExpandUnnestResults 测试 expandUnnestResults 函数的各种情况 +func TestDataProcessor_ExpandUnnestResults(t *testing.T) { + tests := []struct { + name string + hasUnnestFunction bool + result map[string]interface{} + originalData map[string]interface{} + expected []map[string]interface{} + }{ + { + name: "no unnest function - should return single result", + hasUnnestFunction: false, + result: map[string]interface{}{ + "name": "test", + "age": 25, + }, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{ + {"name": "test", "age": 25}, + }, + }, + { + name: "empty result - should return single empty result", + hasUnnestFunction: true, + result: map[string]interface{}{}, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{ + {}, + }, + }, + { + name: "no unnest result - should return single result", + hasUnnestFunction: true, + result: map[string]interface{}{ + "name": "test", + "age": 25, + }, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{ + {"name": "test", "age": 25}, + }, + }, + { + name: "unnest result with simple values", + hasUnnestFunction: true, + result: map[string]interface{}{ + "name": "test", + "items": []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "item1", + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": "item2", + }, + }, + }, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{ + {"name": "test", "items": "item1"}, + {"name": "test", "items": "item2"}, + }, + }, + { + name: "unnest result with object values", + hasUnnestFunction: true, + result: map[string]interface{}{ + "name": "test", + "orders": []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__data__": map[string]interface{}{ + "order_id": 1, + "amount": 100, + }, + }, + map[string]interface{}{ + "__unnest_object__": true, + "__data__": map[string]interface{}{ + "order_id": 2, + "amount": 200, + }, + }, + }, + }, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{ + {"name": "test", "order_id": 1, "amount": 100}, + {"name": "test", "order_id": 2, "amount": 200}, + }, + }, + { + name: "empty unnest result - should return empty array", + hasUnnestFunction: true, + result: map[string]interface{}{ + "name": "test", + "items": []interface{}{ + map[string]interface{}{ + "__unnest_object__": true, + "__empty_unnest__": true, + }, + }, + }, + originalData: map[string]interface{}{"id": 1}, + expected: []map[string]interface{}{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 创建测试用的 stream 和 processor + config := types.Config{ + SimpleFields: []string{"name", "age"}, + } + stream, err := NewStream(config) + require.NoError(t, err) + defer func() { + if stream != nil { + close(stream.done) + } + }() + + // 设置 hasUnnestFunction 标志 + stream.hasUnnestFunction = tt.hasUnnestFunction + + processor := NewDataProcessor(stream) + + // 调用被测试的函数 + result := processor.expandUnnestResults(tt.result, tt.originalData) + + // 验证结果 + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/stream/processor_field.go b/stream/processor_field.go index db16671..1701ade 100644 --- a/stream/processor_field.go +++ b/stream/processor_field.go @@ -98,6 +98,8 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo { // compileExpressionInfo pre-compiles expression processing information func (s *Stream) compileExpressionInfo() { + // Initialize unnest function detection flag + s.hasUnnestFunction = false bridge := functions.GetExprBridge() for fieldName, fieldExpr := range s.config.FieldExpressions { @@ -124,6 +126,11 @@ func (s *Stream) compileExpressionInfo() { exprInfo.hasNestedFields = !exprInfo.isFunctionCall && strings.Contains(fieldExpr.Expression, ".") exprInfo.needsBacktickPreprocess = bridge.ContainsBacktickIdentifiers(fieldExpr.Expression) + // Check if expression contains unnest function + if exprInfo.isFunctionCall && strings.Contains(strings.ToLower(fieldExpr.Expression), "unnest(") { + s.hasUnnestFunction = true + } + // Pre-compile expression object (only for non-function call expressions) if !exprInfo.isFunctionCall { exprToCompile := fieldExpr.Expression diff --git a/stream/stream.go b/stream/stream.go index e59f151..658d8ff 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -97,6 +97,11 @@ type Stream struct { compiledFieldInfo map[string]*fieldProcessInfo // Field processing information cache compiledExprInfo map[string]*expressionProcessInfo // Expression processing information cache + // Unnest function optimization flags + // hasUnnestFunction 标识查询是否使用了 unnest 函数,在预处理阶段确定 + // 用于优化 expandUnnestResults 函数的性能,避免不必要的字段遍历检查 + hasUnnestFunction bool // Whether the query uses unnest function, determined during preprocessing + } // NewStream creates Stream using unified configuration diff --git a/streamsql_function_integration_test.go b/streamsql_function_integration_test.go index 6278998..23fac09 100644 --- a/streamsql_function_integration_test.go +++ b/streamsql_function_integration_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/rulego/streamsql/functions" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -986,14 +987,293 @@ func TestNestedFunctionExecutionOrder(t *testing.T) { // 验证错误处理:invalid_field不存在,应该返回nil或默认值 _, exists := item["error_result"] - if exists { - // 如果字段存在,应该是nil或者错误被正确处理 - t.Logf("Error result field exists: %v", item["error_result"]) - } else { - t.Logf("Error result field does not exist (expected behavior)") - } + assert.True(t, exists) case <-ctx.Done(): t.Fatal("测试超时") } }) } + +// flattenUnnestRows 将可能包含 unnest 结果的批次结果展开为多行,便于断言 +// 兼容两种形态: +// 1) 当前实现:返回单行,其中 alias 字段为 []interface{}(需要在测试侧展开) +// 2) 未来实现:引擎直接返回多行(此时原样返回) +func flattenUnnestRows(result []map[string]interface{}, alias string) []map[string]interface{} { + // 如果已经是多行,直接返回 + if len(result) > 1 { + return result + } + if len(result) == 0 { + return result + } + + // 形如:[{ alias: []interface{}{...} , ...}] + if v, ok := result[0][alias]; ok { + if functions.IsUnnestResult(v) { + // 使用ProcessUnnestResultWithFieldName保留字段名,并合并其他字段 + expandedRows := functions.ProcessUnnestResultWithFieldName(v, alias) + if len(expandedRows) == 0 { + return result + } + + // 将其他字段合并到每一行中 + results := make([]map[string]interface{}, len(expandedRows)) + for i, unnestRow := range expandedRows { + newRow := make(map[string]interface{}, len(result[0])+len(unnestRow)) + // 复制原始行的其他字段(除了unnest字段) + for k, v := range result[0] { + if k != alias { + newRow[k] = v + } + } + // 添加unnest展开的字段 + for k, v := range unnestRow { + newRow[k] = v + } + results[i] = newRow + } + return results + } + } + + return result +} + +// TestUnnestFunctionIntegration 验证 unnest(array) 是否按预期将数组展开为多行 +// 该用例集成到完整 SQL 执行路径: +// - 语法: unnest(array) +// - 描述: 将数组展开为多行 +// - 示例: SELECT unnest(tags) as tag FROM stream +func TestUnnestFunctionIntegration(t *testing.T) { + t.Run("PrimitiveArray", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT unnest(tags) as tag FROM stream" + err := ssql.Execute(sql) + require.NoError(t, err) + + strm := ssql.stream + resultChan := make(chan interface{}, 10) + strm.AddSink(func(result []map[string]interface{}) { + resultChan <- result + }) + + // 输入为普通字符串数组 + input := map[string]interface{}{ + "tags": []string{"a", "b", "c"}, + } + strm.Emit(input) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + select { + case raw := <-resultChan: + batch, ok := raw.([]map[string]interface{}) + require.True(t, ok) + // 按两种形态规范化为多行 + rows := flattenUnnestRows(batch, "tag") + require.Len(t, rows, 3) + + expected := []string{"a", "b", "c"} + for i, exp := range expected { + row := rows[i] + // 兼容两种字段命名:引擎直接展开可能使用别名(tag),函数侧展开为默认字段(value) + var got interface{} + if v, ok := row["tag"]; ok { + got = v + } else if v, ok := row["value"]; ok { + got = v + } else { + t.Fatalf("row %d does not contain expected field 'tag' or 'value': %v", i, row) + } + assert.Equal(t, exp, got) + } + case <-ctx.Done(): + t.Fatal("测试超时,未收到结果") + } + }) + + t.Run("CombinedColumns", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + // 测试组合列:SELECT id,unnest(tags) as tag FROM events + sql := "SELECT id, unnest(tags) as tag FROM stream" + err := ssql.Execute(sql) + require.NoError(t, err) + + strm := ssql.stream + resultChan := make(chan interface{}, 10) + strm.AddSink(func(result []map[string]interface{}) { + resultChan <- result + }) + + // 输入包含id字段和tags数组 + input := map[string]interface{}{ + "id": 100, + "tags": []string{"a", "b", "c"}, + } + strm.Emit(input) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + select { + case raw := <-resultChan: + batch, ok := raw.([]map[string]interface{}) + require.True(t, ok) + // 展开unnest结果 + rows := flattenUnnestRows(batch, "tag") + require.Len(t, rows, 3) + + // 验证每行都包含id字段和tag字段 + expectedTags := []string{"a", "b", "c"} + for i, expectedTag := range expectedTags { + row := rows[i] + + // 验证id字段保持不变 + assert.Equal(t, 100, row["id"], "row %d should have id=100", i) + + // 验证tag字段 + var gotTag interface{} + if v, ok := row["tag"]; ok { + gotTag = v + } else if v, ok := row["value"]; ok { + gotTag = v + } else { + t.Fatalf("row %d does not contain expected field 'tag' or 'value': %v", i, row) + } + assert.Equal(t, expectedTag, gotTag, "row %d should have tag=%s", i, expectedTag) + } + case <-ctx.Done(): + t.Fatal("测试超时,未收到结果") + } + }) + t.Run("ObjectArray", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT unnest(props) as prop FROM stream" + err := ssql.Execute(sql) + require.NoError(t, err) + + strm := ssql.stream + resultChan := make(chan interface{}, 10) + strm.AddSink(func(result []map[string]interface{}) { + resultChan <- result + }) + + // 输入为对象数组 + input := map[string]interface{}{ + "props": []map[string]interface{}{ + {"k": "x", "v": 1}, + {"k": "y", "v": 2}, + }, + } + strm.Emit(input) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + select { + case raw := <-resultChan: + batch, ok := raw.([]map[string]interface{}) + require.True(t, ok) + + rows := flattenUnnestRows(batch, "prop") + require.Len(t, rows, 2) + + // 校验每一行包含对象内的字段 + assert.Equal(t, "x", firstOf(rows[0], "k", "prop", "k")) + assert.Equal(t, 1, firstOf(rows[0], "v", "prop", "v")) + assert.Equal(t, "y", firstOf(rows[1], "k", "prop", "k")) + assert.Equal(t, 2, firstOf(rows[1], "v", "prop", "v")) + case <-ctx.Done(): + t.Fatal("测试超时,未收到结果") + } + }) + + t.Run("EmptyArray", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT unnest(tags) as tag FROM stream" + err := ssql.Execute(sql) + require.NoError(t, err) + + strm := ssql.stream + resultChan := make(chan interface{}, 10) + strm.AddSink(func(result []map[string]interface{}) { + resultChan <- result + }) + + // 空数组 + input := map[string]interface{}{ + "tags": []string{}, + } + strm.Emit(input) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + select { + case raw := <-resultChan: + batch, ok := raw.([]map[string]interface{}) + require.True(t, ok) + + rows := flattenUnnestRows(batch, "tag") + assert.Len(t, rows, 0) + case <-ctx.Done(): + t.Fatal("测试超时,未收到结果") + } + }) + + t.Run("NilArray", func(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := "SELECT unnest(tags) as tag FROM stream" + err := ssql.Execute(sql) + require.NoError(t, err) + + strm := ssql.stream + resultChan := make(chan interface{}, 10) + strm.AddSink(func(result []map[string]interface{}) { + resultChan <- result + }) + + // nil 值 + input := map[string]interface{}{ + "tags": nil, + } + strm.Emit(input) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + select { + case raw := <-resultChan: + batch, ok := raw.([]map[string]interface{}) + require.True(t, ok) + + rows := flattenUnnestRows(batch, "tag") + assert.Len(t, rows, 0) + case <-ctx.Done(): + t.Fatal("测试超时,未收到结果") + } + }) +} + +// firstOf 辅助从行中读取字段值,兼容 prop 为对象的形态 +// 优先按 top-level 字段取值,若不存在则尝试从嵌套对象(如 prop[k])获取 +func firstOf(row map[string]interface{}, topLevelKey string, nestedObjKey string, nestedField string) interface{} { + if v, ok := row[topLevelKey]; ok { + return v + } + if m, ok := row[nestedObjKey].(map[string]interface{}); ok { + return m[nestedField] + } + return nil +}