mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-16 07:17:25 +00:00
feat:增加打印table到控制台
This commit is contained in:
@@ -0,0 +1,82 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql"
|
||||
)
|
||||
|
||||
// main 演示PrintTable方法的使用
|
||||
func main() {
|
||||
fmt.Println("=== StreamSQL PrintTable 示例 ===")
|
||||
|
||||
// 创建StreamSQL实例
|
||||
ssql := streamsql.New()
|
||||
|
||||
// 示例1: 聚合查询 - 按设备分组统计温度
|
||||
fmt.Println("\n示例1: 聚合查询结果")
|
||||
err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp, MAX(temperature) as max_temp FROM stream GROUP BY device, TumblingWindow('3s')")
|
||||
if err != nil {
|
||||
fmt.Printf("执行SQL失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 使用PrintTable方法以表格形式输出结果
|
||||
ssql.PrintTable()
|
||||
|
||||
// 发送测试数据
|
||||
testData := []map[string]interface{}{
|
||||
{"device": "sensor1", "temperature": 25.5, "timestamp": time.Now()},
|
||||
{"device": "sensor1", "temperature": 26.0, "timestamp": time.Now()},
|
||||
{"device": "sensor2", "temperature": 23.8, "timestamp": time.Now()},
|
||||
{"device": "sensor2", "temperature": 24.2, "timestamp": time.Now()},
|
||||
{"device": "sensor1", "temperature": 27.1, "timestamp": time.Now()},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
ssql.Emit(data)
|
||||
}
|
||||
|
||||
// 等待窗口触发
|
||||
time.Sleep(4 * time.Second)
|
||||
|
||||
// 示例2: 非聚合查询
|
||||
fmt.Println("\n示例2: 非聚合查询结果")
|
||||
ssql2 := streamsql.New()
|
||||
err = ssql2.Execute("SELECT device, temperature, temperature * 1.8 + 32 as fahrenheit FROM stream WHERE temperature > 24")
|
||||
if err != nil {
|
||||
fmt.Printf("执行SQL失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
ssql2.PrintTable()
|
||||
|
||||
// 发送测试数据
|
||||
for _, data := range testData {
|
||||
ssql2.Emit(data)
|
||||
}
|
||||
|
||||
// 等待处理完成
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// 示例3: 对比原始Print方法
|
||||
fmt.Println("\n示例3: 原始Print方法输出对比")
|
||||
ssql3 := streamsql.New()
|
||||
err = ssql3.Execute("SELECT device, COUNT(*) as count FROM stream GROUP BY device, TumblingWindow('2s')")
|
||||
if err != nil {
|
||||
fmt.Printf("执行SQL失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("原始PrintTable方法:")
|
||||
ssql3.PrintTable()
|
||||
|
||||
// 发送数据
|
||||
for i := 0; i < 3; i++ {
|
||||
ssql3.Emit(map[string]interface{}{"device": "test_device", "value": i})
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
fmt.Println("\n=== 示例结束 ===")
|
||||
}
|
||||
+30
-10
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/rulego/streamsql/rsql"
|
||||
"github.com/rulego/streamsql/stream"
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/utils/table"
|
||||
)
|
||||
|
||||
// Streamsql 是StreamSQL流处理引擎的主要接口。
|
||||
@@ -41,6 +42,9 @@ type Streamsql struct {
|
||||
|
||||
// 新增:同步处理模式配置
|
||||
enableSyncMode bool // 是否启用同步模式(用于非聚合查询)
|
||||
|
||||
// 保存原始SELECT字段顺序,用于表格输出时保持字段顺序
|
||||
fieldOrder []string
|
||||
}
|
||||
|
||||
// New 创建一个新的StreamSQL实例。
|
||||
@@ -125,6 +129,9 @@ func (s *Streamsql) Execute(sql string) error {
|
||||
return fmt.Errorf("SQL解析失败: %w", err)
|
||||
}
|
||||
|
||||
// 从解析结果中获取字段顺序信息
|
||||
s.fieldOrder = config.FieldOrder
|
||||
|
||||
// 根据性能模式创建流处理器
|
||||
var streamInstance *stream.Stream
|
||||
|
||||
@@ -338,24 +345,37 @@ func (s *Streamsql) AddSink(sink func(interface{})) {
|
||||
}
|
||||
}
|
||||
|
||||
// Print 打印结果到控制台。
|
||||
// 这是一个便捷方法,自动添加一个打印结果的sink函数。
|
||||
// PrintTable 以表格形式打印结果到控制台,类似数据库输出格式。
|
||||
// 首先显示列名,然后逐行显示数据。
|
||||
//
|
||||
// 支持的数据格式:
|
||||
// - []map[string]interface{}: 多行记录
|
||||
// - map[string]interface{}: 单行记录
|
||||
// - 其他类型: 直接打印
|
||||
//
|
||||
// 示例:
|
||||
//
|
||||
// // 简单打印结果
|
||||
// ssql.Print()
|
||||
// // 表格式打印结果
|
||||
// ssql.PrintTable()
|
||||
//
|
||||
// // 等价于:
|
||||
// ssql.AddSink(func(result interface{}) {
|
||||
// fmt.Printf("Ressult: %v\n", result)
|
||||
// })
|
||||
func (s *Streamsql) Print() {
|
||||
// // 输出格式:
|
||||
// // +--------+----------+
|
||||
// // | device | max_temp |
|
||||
// // +--------+----------+
|
||||
// // | aa | 30.0 |
|
||||
// // | bb | 22.0 |
|
||||
// // +--------+----------+
|
||||
func (s *Streamsql) PrintTable() {
|
||||
s.AddSink(func(result interface{}) {
|
||||
fmt.Printf("Ressult: %v\n", result)
|
||||
s.printTableFormat(result)
|
||||
})
|
||||
}
|
||||
|
||||
// printTableFormat 格式化打印表格数据
|
||||
func (s *Streamsql) printTableFormat(result interface{}) {
|
||||
table.FormatTableData(result, s.fieldOrder)
|
||||
}
|
||||
|
||||
// ToChannel 返回结果通道,用于异步获取处理结果。
|
||||
// 通过此通道可以以非阻塞方式获取流处理结果。
|
||||
//
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package streamsql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestPrintTable 测试PrintTable方法的基本功能
|
||||
func TestPrintTable(t *testing.T) {
|
||||
// 创建StreamSQL实例并测试PrintTable
|
||||
ssql := New()
|
||||
err := ssql.Execute("SELECT device, AVG(temperature) as avg_temp FROM stream GROUP BY device, TumblingWindow('2s')")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// 使用PrintTable方法(不验证输出内容,只确保不会panic)
|
||||
assert.NotPanics(t, func() {
|
||||
ssql.PrintTable()
|
||||
}, "PrintTable方法不应该panic")
|
||||
|
||||
// 发送测试数据
|
||||
testData := []map[string]interface{}{
|
||||
{"device": "sensor1", "temperature": 25.0},
|
||||
{"device": "sensor2", "temperature": 30.0},
|
||||
}
|
||||
|
||||
for _, data := range testData {
|
||||
ssql.Emit(data)
|
||||
}
|
||||
|
||||
// 等待窗口触发
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
// TestPrintTableFormat 测试printTableFormat方法处理不同数据类型
|
||||
func TestPrintTableFormat(t *testing.T) {
|
||||
ssql := New()
|
||||
|
||||
// 测试不同类型的数据,确保不会panic
|
||||
assert.NotPanics(t, func() {
|
||||
// 测试空切片
|
||||
ssql.printTableFormat([]map[string]interface{}{})
|
||||
}, "空切片不应该panic")
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
// 测试单个map
|
||||
ssql.printTableFormat(map[string]interface{}{"key": "value"})
|
||||
}, "单个map不应该panic")
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
// 测试其他类型
|
||||
ssql.printTableFormat("string data")
|
||||
}, "字符串数据不应该panic")
|
||||
}
|
||||
@@ -0,0 +1,150 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package table
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// PrintTableFromSlice 从切片数据打印表格
|
||||
// 支持自定义字段顺序,如果fieldOrder为空则使用字母排序
|
||||
func PrintTableFromSlice(data []map[string]interface{}, fieldOrder []string) {
|
||||
if len(data) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 收集所有列名
|
||||
columnSet := make(map[string]bool)
|
||||
for _, row := range data {
|
||||
for col := range row {
|
||||
columnSet[col] = true
|
||||
}
|
||||
}
|
||||
|
||||
// 根据字段顺序排列列名
|
||||
var columns []string
|
||||
if len(fieldOrder) > 0 {
|
||||
// 使用指定的字段顺序
|
||||
for _, field := range fieldOrder {
|
||||
if columnSet[field] {
|
||||
columns = append(columns, field)
|
||||
delete(columnSet, field) // 标记已处理
|
||||
}
|
||||
}
|
||||
// 添加剩余的列(如果有的话)
|
||||
for col := range columnSet {
|
||||
columns = append(columns, col)
|
||||
}
|
||||
} else {
|
||||
// 如果没有指定字段顺序,使用简单排序
|
||||
columns = make([]string, 0, len(columnSet))
|
||||
for col := range columnSet {
|
||||
columns = append(columns, col)
|
||||
}
|
||||
// 简单排序,确保输出一致性
|
||||
for i := 0; i < len(columns)-1; i++ {
|
||||
for j := i + 1; j < len(columns); j++ {
|
||||
if columns[i] > columns[j] {
|
||||
columns[i], columns[j] = columns[j], columns[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 计算每列的最大宽度
|
||||
colWidths := make([]int, len(columns))
|
||||
for i, col := range columns {
|
||||
colWidths[i] = len(col) // 列名长度
|
||||
for _, row := range data {
|
||||
if val, exists := row[col]; exists {
|
||||
valStr := fmt.Sprintf("%v", val)
|
||||
if len(valStr) > colWidths[i] {
|
||||
colWidths[i] = len(valStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 最小宽度为4
|
||||
if colWidths[i] < 4 {
|
||||
colWidths[i] = 4
|
||||
}
|
||||
}
|
||||
|
||||
// 打印顶部边框
|
||||
PrintTableBorder(colWidths)
|
||||
|
||||
// 打印列名
|
||||
fmt.Print("|")
|
||||
for i, col := range columns {
|
||||
fmt.Printf(" %-*s |", colWidths[i], col)
|
||||
}
|
||||
fmt.Println()
|
||||
|
||||
// 打印分隔线
|
||||
PrintTableBorder(colWidths)
|
||||
|
||||
// 打印数据行
|
||||
for _, row := range data {
|
||||
fmt.Print("|")
|
||||
for i, col := range columns {
|
||||
val := ""
|
||||
if v, exists := row[col]; exists {
|
||||
val = fmt.Sprintf("%v", v)
|
||||
}
|
||||
fmt.Printf(" %-*s |", colWidths[i], val)
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
// 打印底部边框
|
||||
PrintTableBorder(colWidths)
|
||||
|
||||
// 打印行数统计
|
||||
fmt.Printf("(%d rows)\n", len(data))
|
||||
}
|
||||
|
||||
// PrintTableBorder 打印表格边框
|
||||
func PrintTableBorder(columnWidths []int) {
|
||||
fmt.Print("+")
|
||||
for _, width := range columnWidths {
|
||||
for i := 0; i < width+2; i++ {
|
||||
fmt.Print("-")
|
||||
}
|
||||
fmt.Print("+")
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
// FormatTableData 格式化表格数据,支持多种数据类型
|
||||
func FormatTableData(result interface{}, fieldOrder []string) {
|
||||
switch v := result.(type) {
|
||||
case []map[string]interface{}:
|
||||
if len(v) == 0 {
|
||||
fmt.Println("(0 rows)")
|
||||
return
|
||||
}
|
||||
PrintTableFromSlice(v, fieldOrder)
|
||||
case map[string]interface{}:
|
||||
if len(v) == 0 {
|
||||
fmt.Println("(0 rows)")
|
||||
return
|
||||
}
|
||||
PrintTableFromSlice([]map[string]interface{}{v}, fieldOrder)
|
||||
default:
|
||||
// 对于非表格数据,直接打印
|
||||
fmt.Printf("Result: %v\n", result)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright 2025 The RuleGo Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package table
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestPrintTableFromSlice 测试表格打印功能
|
||||
func TestPrintTableFromSlice(t *testing.T) {
|
||||
// 测试空数据
|
||||
assert.NotPanics(t, func() {
|
||||
PrintTableFromSlice([]map[string]interface{}{}, nil)
|
||||
}, "空数据不应该panic")
|
||||
|
||||
// 测试正常数据
|
||||
data := []map[string]interface{}{
|
||||
{"name": "Alice", "age": 30, "city": "New York"},
|
||||
{"name": "Bob", "age": 25, "city": "Los Angeles"},
|
||||
}
|
||||
assert.NotPanics(t, func() {
|
||||
PrintTableFromSlice(data, nil)
|
||||
}, "正常数据不应该panic")
|
||||
|
||||
// 测试带字段顺序的数据
|
||||
fieldOrder := []string{"name", "city", "age"}
|
||||
assert.NotPanics(t, func() {
|
||||
PrintTableFromSlice(data, fieldOrder)
|
||||
}, "带字段顺序的数据不应该panic")
|
||||
}
|
||||
|
||||
// TestPrintTableBorder 测试边框打印功能
|
||||
func TestPrintTableBorder(t *testing.T) {
|
||||
// 测试正常宽度
|
||||
assert.NotPanics(t, func() {
|
||||
colWidths := []int{5, 8, 6}
|
||||
PrintTableBorder(colWidths)
|
||||
}, "PrintTableBorder不应该panic")
|
||||
|
||||
// 测试空宽度
|
||||
assert.NotPanics(t, func() {
|
||||
PrintTableBorder([]int{})
|
||||
}, "空宽度数组不应该panic")
|
||||
}
|
||||
|
||||
// TestFormatTableData 测试数据格式化功能
|
||||
func TestFormatTableData(t *testing.T) {
|
||||
// 测试切片数据
|
||||
sliceData := []map[string]interface{}{
|
||||
{"device": "sensor1", "temp": 25.5},
|
||||
}
|
||||
assert.NotPanics(t, func() {
|
||||
FormatTableData(sliceData, nil)
|
||||
}, "切片数据不应该panic")
|
||||
|
||||
// 测试单个map数据
|
||||
mapData := map[string]interface{}{"device": "sensor1", "temp": 25.5}
|
||||
assert.NotPanics(t, func() {
|
||||
FormatTableData(mapData, nil)
|
||||
}, "map数据不应该panic")
|
||||
|
||||
// 测试其他类型数据
|
||||
assert.NotPanics(t, func() {
|
||||
FormatTableData("string data", nil)
|
||||
}, "字符串数据不应该panic")
|
||||
|
||||
// 测试空数据
|
||||
assert.NotPanics(t, func() {
|
||||
FormatTableData([]map[string]interface{}{}, nil)
|
||||
}, "空切片数据不应该panic")
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
FormatTableData(map[string]interface{}{}, nil)
|
||||
}, "空map数据不应该panic")
|
||||
}
|
||||
Reference in New Issue
Block a user