feat:增加同步emit

This commit is contained in:
rulego-team
2025-07-27 12:51:55 +08:00
parent 844e311716
commit 25289baf7d
+61
View File
@@ -38,6 +38,9 @@ type Streamsql struct {
// 性能配置模式
performanceMode string // "default", "high_performance", "low_latency", "zero_data_loss", "custom"
customConfig *types.PerformanceConfig
// 新增:同步处理模式配置
enableSyncMode bool // 是否启用同步模式(用于非聚合查询)
}
// New 创建一个新的StreamSQL实例。
@@ -190,6 +193,64 @@ func (s *Streamsql) Emit(data interface{}) {
}
}
// EmitSync 同步处理数据,立即返回处理结果。
// 仅适用于非聚合查询(如过滤、转换等),聚合查询会返回错误。
//
// 对于非聚合查询,此方法提供同步的数据处理能力,同时:
// 1. 立即返回处理结果(同步)
// 2. 触发已注册的AddSink回调(异步)
//
// 这确保了同步和异步模式的一致性,用户可以同时获得:
// - 立即可用的处理结果
// - 异步回调处理(用于日志、监控、持久化等)
//
// 参数:
// - data: 要处理的数据
//
// 返回值:
// - interface{}: 处理后的结果,如果不匹配过滤条件返回nil
// - error: 处理错误,如果是聚合查询会返回错误
//
// 示例:
//
// // 添加日志回调
// ssql.AddSink(func(result interface{}) {
// fmt.Printf("异步日志: %v\n", result)
// })
//
// // 同步处理并立即获取结果
// result, err := ssql.EmitSync(map[string]interface{}{
// "temperature": 25.5,
// "humidity": 60.0,
// })
// if err != nil {
// // 处理错误
// } else if result != nil {
// // 立即使用处理结果
// fmt.Printf("同步结果: %v\n", result)
// // 同时异步回调也会被触发
// }
func (s *Streamsql) EmitSync(data interface{}) (interface{}, error) {
if s.stream == nil {
return nil, fmt.Errorf("stream未初始化")
}
// 检查是否为非聚合查询
if s.stream.IsAggregationQuery() {
return nil, fmt.Errorf("同步模式仅支持非聚合查询,聚合查询请使用Emit()方法")
}
return s.stream.ProcessSync(data)
}
// IsAggregationQuery 检查当前查询是否为聚合查询
func (s *Streamsql) IsAggregationQuery() bool {
if s.stream == nil {
return false
}
return s.stream.IsAggregationQuery()
}
// Stream 返回底层的流处理器实例。
// 通过此方法可以访问更底层的流处理功能。
//