From 25289baf7d6227b8733a01f57fba9d2f6e2db2f9 Mon Sep 17 00:00:00 2001 From: rulego-team Date: Sun, 27 Jul 2025 12:51:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0=E5=90=8C=E6=AD=A5emit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamsql.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/streamsql.go b/streamsql.go index 62f716a..1b30a0f 100644 --- a/streamsql.go +++ b/streamsql.go @@ -38,6 +38,9 @@ type Streamsql struct { // 性能配置模式 performanceMode string // "default", "high_performance", "low_latency", "zero_data_loss", "custom" customConfig *types.PerformanceConfig + + // 新增:同步处理模式配置 + enableSyncMode bool // 是否启用同步模式(用于非聚合查询) } // New 创建一个新的StreamSQL实例。 @@ -190,6 +193,64 @@ func (s *Streamsql) Emit(data interface{}) { } } +// EmitSync 同步处理数据,立即返回处理结果。 +// 仅适用于非聚合查询(如过滤、转换等),聚合查询会返回错误。 +// +// 对于非聚合查询,此方法提供同步的数据处理能力,同时: +// 1. 立即返回处理结果(同步) +// 2. 触发已注册的AddSink回调(异步) +// +// 这确保了同步和异步模式的一致性,用户可以同时获得: +// - 立即可用的处理结果 +// - 异步回调处理(用于日志、监控、持久化等) +// +// 参数: +// - data: 要处理的数据 +// +// 返回值: +// - interface{}: 处理后的结果,如果不匹配过滤条件返回nil +// - error: 处理错误,如果是聚合查询会返回错误 +// +// 示例: +// +// // 添加日志回调 +// ssql.AddSink(func(result interface{}) { +// fmt.Printf("异步日志: %v\n", result) +// }) +// +// // 同步处理并立即获取结果 +// result, err := ssql.EmitSync(map[string]interface{}{ +// "temperature": 25.5, +// "humidity": 60.0, +// }) +// if err != nil { +// // 处理错误 +// } else if result != nil { +// // 立即使用处理结果 +// fmt.Printf("同步结果: %v\n", result) +// // 同时异步回调也会被触发 +// } +func (s *Streamsql) EmitSync(data interface{}) (interface{}, error) { + if s.stream == nil { + return nil, fmt.Errorf("stream未初始化") + } + + // 检查是否为非聚合查询 + if s.stream.IsAggregationQuery() { + return nil, fmt.Errorf("同步模式仅支持非聚合查询,聚合查询请使用Emit()方法") + } + + return s.stream.ProcessSync(data) +} + +// IsAggregationQuery 检查当前查询是否为聚合查询 +func (s *Streamsql) IsAggregationQuery() bool { + if s.stream == nil { + return false + } + return s.stream.IsAggregationQuery() +} + // Stream 返回底层的流处理器实例。 // 通过此方法可以访问更底层的流处理功能。 //