diff --git a/examples/table_print_demo/main.go b/examples/table_print_demo/main.go new file mode 100644 index 0000000..05e174a --- /dev/null +++ b/examples/table_print_demo/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "fmt" + "time" + + "github.com/rulego/streamsql" +) + +// main 演示PrintTable方法的使用 +func main() { + fmt.Println("=== StreamSQL PrintTable 示例 ===") + + // 创建StreamSQL实例 + ssql := streamsql.New() + + // 示例1: 聚合查询 - 按设备分组统计温度 + fmt.Println("\n示例1: 聚合查询结果") + err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp, MAX(temperature) as max_temp FROM stream GROUP BY device, TumblingWindow('3s')") + if err != nil { + fmt.Printf("执行SQL失败: %v\n", err) + return + } + + // 使用PrintTable方法以表格形式输出结果 + ssql.PrintTable() + + // 发送测试数据 + testData := []map[string]interface{}{ + {"device": "sensor1", "temperature": 25.5, "timestamp": time.Now()}, + {"device": "sensor1", "temperature": 26.0, "timestamp": time.Now()}, + {"device": "sensor2", "temperature": 23.8, "timestamp": time.Now()}, + {"device": "sensor2", "temperature": 24.2, "timestamp": time.Now()}, + {"device": "sensor1", "temperature": 27.1, "timestamp": time.Now()}, + } + + for _, data := range testData { + ssql.Emit(data) + } + + // 等待窗口触发 + time.Sleep(4 * time.Second) + + // 示例2: 非聚合查询 + fmt.Println("\n示例2: 非聚合查询结果") + ssql2 := streamsql.New() + err = ssql2.Execute("SELECT device, temperature, temperature * 1.8 + 32 as fahrenheit FROM stream WHERE temperature > 24") + if err != nil { + fmt.Printf("执行SQL失败: %v\n", err) + return + } + + ssql2.PrintTable() + + // 发送测试数据 + for _, data := range testData { + ssql2.Emit(data) + } + + // 等待处理完成 + time.Sleep(1 * time.Second) + + // 示例3: 对比原始Print方法 + fmt.Println("\n示例3: 原始Print方法输出对比") + ssql3 := streamsql.New() + err = ssql3.Execute("SELECT device, COUNT(*) as count FROM stream GROUP BY device, TumblingWindow('2s')") + if err != nil { + fmt.Printf("执行SQL失败: %v\n", err) + return + } + + fmt.Println("原始PrintTable方法:") + ssql3.PrintTable() + + // 发送数据 + for i := 0; i < 3; i++ { + ssql3.Emit(map[string]interface{}{"device": "test_device", "value": i}) + } + + time.Sleep(3 * time.Second) + fmt.Println("\n=== 示例结束 ===") +} \ No newline at end of file diff --git a/streamsql.go b/streamsql.go index 1b30a0f..ee9a378 100644 --- a/streamsql.go +++ b/streamsql.go @@ -22,6 +22,7 @@ import ( "github.com/rulego/streamsql/rsql" "github.com/rulego/streamsql/stream" "github.com/rulego/streamsql/types" + "github.com/rulego/streamsql/utils/table" ) // Streamsql 是StreamSQL流处理引擎的主要接口。 @@ -41,6 +42,9 @@ type Streamsql struct { // 新增:同步处理模式配置 enableSyncMode bool // 是否启用同步模式(用于非聚合查询) + + // 保存原始SELECT字段顺序,用于表格输出时保持字段顺序 + fieldOrder []string } // New 创建一个新的StreamSQL实例。 @@ -125,6 +129,9 @@ func (s *Streamsql) Execute(sql string) error { return fmt.Errorf("SQL解析失败: %w", err) } + // 从解析结果中获取字段顺序信息 + s.fieldOrder = config.FieldOrder + // 根据性能模式创建流处理器 var streamInstance *stream.Stream @@ -338,24 +345,37 @@ func (s *Streamsql) AddSink(sink func(interface{})) { } } -// Print 打印结果到控制台。 -// 这是一个便捷方法,自动添加一个打印结果的sink函数。 +// PrintTable 以表格形式打印结果到控制台,类似数据库输出格式。 +// 首先显示列名,然后逐行显示数据。 +// +// 支持的数据格式: +// - []map[string]interface{}: 多行记录 +// - map[string]interface{}: 单行记录 +// - 其他类型: 直接打印 // // 示例: // -// // 简单打印结果 -// ssql.Print() +// // 表格式打印结果 +// ssql.PrintTable() // -// // 等价于: -// ssql.AddSink(func(result interface{}) { -// fmt.Printf("Ressult: %v\n", result) -// }) -func (s *Streamsql) Print() { +// // 输出格式: +// // +--------+----------+ +// // | device | max_temp | +// // +--------+----------+ +// // | aa | 30.0 | +// // | bb | 22.0 | +// // +--------+----------+ +func (s *Streamsql) PrintTable() { s.AddSink(func(result interface{}) { - fmt.Printf("Ressult: %v\n", result) + s.printTableFormat(result) }) } +// printTableFormat 格式化打印表格数据 +func (s *Streamsql) printTableFormat(result interface{}) { + table.FormatTableData(result, s.fieldOrder) +} + // ToChannel 返回结果通道,用于异步获取处理结果。 // 通过此通道可以以非阻塞方式获取流处理结果。 // diff --git a/streamsql_table_print_test.go b/streamsql_table_print_test.go new file mode 100644 index 0000000..6aaf0d1 --- /dev/null +++ b/streamsql_table_print_test.go @@ -0,0 +1,55 @@ +package streamsql + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestPrintTable 测试PrintTable方法的基本功能 +func TestPrintTable(t *testing.T) { + // 创建StreamSQL实例并测试PrintTable + ssql := New() + err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp FROM stream GROUP BY device, TumblingWindow('2s')") + assert.NoError(t, err) + + // 使用PrintTable方法(不验证输出内容,只确保不会panic) + assert.NotPanics(t, func() { + ssql.PrintTable() + }, "PrintTable方法不应该panic") + + // 发送测试数据 + testData := []map[string]interface{}{ + {"device": "sensor1", "temperature": 25.0}, + {"device": "sensor2", "temperature": 30.0}, + } + + for _, data := range testData { + ssql.Emit(data) + } + + // 等待窗口触发 + time.Sleep(3 * time.Second) +} + +// TestPrintTableFormat 测试printTableFormat方法处理不同数据类型 +func TestPrintTableFormat(t *testing.T) { + ssql := New() + + // 测试不同类型的数据,确保不会panic + assert.NotPanics(t, func() { + // 测试空切片 + ssql.printTableFormat([]map[string]interface{}{}) + }, "空切片不应该panic") + + assert.NotPanics(t, func() { + // 测试单个map + ssql.printTableFormat(map[string]interface{}{"key": "value"}) + }, "单个map不应该panic") + + assert.NotPanics(t, func() { + // 测试其他类型 + ssql.printTableFormat("string data") + }, "字符串数据不应该panic") +} \ No newline at end of file diff --git a/utils/table/table.go b/utils/table/table.go new file mode 100644 index 0000000..7c7334a --- /dev/null +++ b/utils/table/table.go @@ -0,0 +1,150 @@ +/* + * Copyright 2025 The RuleGo Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package table + +import ( + "fmt" +) + +// PrintTableFromSlice 从切片数据打印表格 +// 支持自定义字段顺序,如果fieldOrder为空则使用字母排序 +func PrintTableFromSlice(data []map[string]interface{}, fieldOrder []string) { + if len(data) == 0 { + return + } + + // 收集所有列名 + columnSet := make(map[string]bool) + for _, row := range data { + for col := range row { + columnSet[col] = true + } + } + + // 根据字段顺序排列列名 + var columns []string + if len(fieldOrder) > 0 { + // 使用指定的字段顺序 + for _, field := range fieldOrder { + if columnSet[field] { + columns = append(columns, field) + delete(columnSet, field) // 标记已处理 + } + } + // 添加剩余的列(如果有的话) + for col := range columnSet { + columns = append(columns, col) + } + } else { + // 如果没有指定字段顺序,使用简单排序 + columns = make([]string, 0, len(columnSet)) + for col := range columnSet { + columns = append(columns, col) + } + // 简单排序,确保输出一致性 + for i := 0; i < len(columns)-1; i++ { + for j := i + 1; j < len(columns); j++ { + if columns[i] > columns[j] { + columns[i], columns[j] = columns[j], columns[i] + } + } + } + } + + // 计算每列的最大宽度 + colWidths := make([]int, len(columns)) + for i, col := range columns { + colWidths[i] = len(col) // 列名长度 + for _, row := range data { + if val, exists := row[col]; exists { + valStr := fmt.Sprintf("%v", val) + if len(valStr) > colWidths[i] { + colWidths[i] = len(valStr) + } + } + } + // 最小宽度为4 + if colWidths[i] < 4 { + colWidths[i] = 4 + } + } + + // 打印顶部边框 + PrintTableBorder(colWidths) + + // 打印列名 + fmt.Print("|") + for i, col := range columns { + fmt.Printf(" %-*s |", colWidths[i], col) + } + fmt.Println() + + // 打印分隔线 + PrintTableBorder(colWidths) + + // 打印数据行 + for _, row := range data { + fmt.Print("|") + for i, col := range columns { + val := "" + if v, exists := row[col]; exists { + val = fmt.Sprintf("%v", v) + } + fmt.Printf(" %-*s |", colWidths[i], val) + } + fmt.Println() + } + + // 打印底部边框 + PrintTableBorder(colWidths) + + // 打印行数统计 + fmt.Printf("(%d rows)\n", len(data)) +} + +// PrintTableBorder 打印表格边框 +func PrintTableBorder(columnWidths []int) { + fmt.Print("+") + for _, width := range columnWidths { + for i := 0; i < width+2; i++ { + fmt.Print("-") + } + fmt.Print("+") + } + fmt.Println() +} + +// FormatTableData 格式化表格数据,支持多种数据类型 +func FormatTableData(result interface{}, fieldOrder []string) { + switch v := result.(type) { + case []map[string]interface{}: + if len(v) == 0 { + fmt.Println("(0 rows)") + return + } + PrintTableFromSlice(v, fieldOrder) + case map[string]interface{}: + if len(v) == 0 { + fmt.Println("(0 rows)") + return + } + PrintTableFromSlice([]map[string]interface{}{v}, fieldOrder) + default: + // 对于非表格数据,直接打印 + fmt.Printf("Result: %v\n", result) + } +} \ No newline at end of file diff --git a/utils/table/table_test.go b/utils/table/table_test.go new file mode 100644 index 0000000..21b5851 --- /dev/null +++ b/utils/table/table_test.go @@ -0,0 +1,91 @@ +/* + * Copyright 2025 The RuleGo Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package table + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestPrintTableFromSlice 测试表格打印功能 +func TestPrintTableFromSlice(t *testing.T) { + // 测试空数据 + assert.NotPanics(t, func() { + PrintTableFromSlice([]map[string]interface{}{}, nil) + }, "空数据不应该panic") + + // 测试正常数据 + data := []map[string]interface{}{ + {"name": "Alice", "age": 30, "city": "New York"}, + {"name": "Bob", "age": 25, "city": "Los Angeles"}, + } + assert.NotPanics(t, func() { + PrintTableFromSlice(data, nil) + }, "正常数据不应该panic") + + // 测试带字段顺序的数据 + fieldOrder := []string{"name", "city", "age"} + assert.NotPanics(t, func() { + PrintTableFromSlice(data, fieldOrder) + }, "带字段顺序的数据不应该panic") +} + +// TestPrintTableBorder 测试边框打印功能 +func TestPrintTableBorder(t *testing.T) { + // 测试正常宽度 + assert.NotPanics(t, func() { + colWidths := []int{5, 8, 6} + PrintTableBorder(colWidths) + }, "PrintTableBorder不应该panic") + + // 测试空宽度 + assert.NotPanics(t, func() { + PrintTableBorder([]int{}) + }, "空宽度数组不应该panic") +} + +// TestFormatTableData 测试数据格式化功能 +func TestFormatTableData(t *testing.T) { + // 测试切片数据 + sliceData := []map[string]interface{}{ + {"device": "sensor1", "temp": 25.5}, + } + assert.NotPanics(t, func() { + FormatTableData(sliceData, nil) + }, "切片数据不应该panic") + + // 测试单个map数据 + mapData := map[string]interface{}{"device": "sensor1", "temp": 25.5} + assert.NotPanics(t, func() { + FormatTableData(mapData, nil) + }, "map数据不应该panic") + + // 测试其他类型数据 + assert.NotPanics(t, func() { + FormatTableData("string data", nil) + }, "字符串数据不应该panic") + + // 测试空数据 + assert.NotPanics(t, func() { + FormatTableData([]map[string]interface{}{}, nil) + }, "空切片数据不应该panic") + + assert.NotPanics(t, func() { + FormatTableData(map[string]interface{}{}, nil) + }, "空map数据不应该panic") +} \ No newline at end of file