From b405a935b7eab656dff03573e56a7c6b239accdc Mon Sep 17 00:00:00 2001 From: rulego-team Date: Sat, 20 Dec 2025 11:10:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=90=8C=E6=AD=A5=E7=9A=84?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E7=BB=93=E6=9E=9C=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/handler_result.go | 34 ++++++++++++++++++++++++++++++++-- stream/stream.go | 3 ++- stream/stream_factory.go | 1 - streamsql.go | 14 ++++++++++++++ 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/stream/handler_result.go b/stream/handler_result.go index 693ac65..2a9d9b4 100644 --- a/stream/handler_result.go +++ b/stream/handler_result.go @@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) { s.sinksMux.RLock() defer s.sinksMux.RUnlock() - if len(s.sinks) == 0 { + if len(s.sinks) == 0 && len(s.syncSinks) == 0 { return } @@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) { for _, sink := range s.sinks { s.submitSinkTask(sink, results) } + + // Execute synchronous sinks (blocking, sequential) + for _, sink := range s.syncSinks { + // Recover panic for each sync sink to prevent crashing the stream + func() { + defer func() { + if r := recover(); r != nil { + logger.Error("Sync sink execution exception: %v", r) + } + }() + sink(results) + }() + } } // submitSinkTask submits sink task @@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m } // Non-blocking task submission + // Note: Since we use a worker pool, tasks may be executed out of order select { case s.sinkWorkerPool <- task: // Successfully submitted task default: // Worker pool is full, execute directly in current goroutine (degraded handling) - go task() + // This also helps with backpressure + task() } } // AddSink adds a sink function // Parameters: // - sink: result processing function that receives []map[string]interface{} type result data +// +// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed. +// If you need strict ordering, use GetResultsChan() instead. func (s *Stream) AddSink(sink func([]map[string]interface{})) { s.sinksMux.Lock() defer s.sinksMux.Unlock() s.sinks = append(s.sinks, sink) } +// AddSyncSink adds a synchronous sink function +// Parameters: +// - sink: result processing function that receives []map[string]interface{} type result data +// +// Note: Sync sinks are executed sequentially in the result processing goroutine. +// They block subsequent processing, so they should be fast. +func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) { + s.sinksMux.Lock() + defer s.sinksMux.Unlock() + s.syncSinks = append(s.syncSinks, sink) +} + // GetResultsChan gets the result channel func (s *Stream) GetResultsChan() <-chan []map[string]interface{} { return s.resultChan diff --git a/stream/stream.go b/stream/stream.go index 9114ca0..191c1d2 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -61,7 +61,8 @@ type Stream struct { aggregator aggregator.Aggregator config types.Config sinks []func([]map[string]interface{}) - resultChan chan []map[string]interface{} // Result channel + syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially + resultChan chan []map[string]interface{} // Result channel seenResults *sync.Map done chan struct{} // Used to close processing goroutines sinkWorkerPool chan func() // Sink worker pool to avoid blocking diff --git a/stream/stream_factory.go b/stream/stream_factory.go index 9558c75..208ff96 100644 --- a/stream/stream_factory.go +++ b/stream/stream_factory.go @@ -176,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi // startWorkerRoutines starts worker goroutines func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) { go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount) - go stream.startResultConsumer() } diff --git a/streamsql.go b/streamsql.go index a96afc4..f64d4e7 100644 --- a/streamsql.go +++ b/streamsql.go @@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) { } } +// AddSyncSink directly adds synchronous result processing callback functions. +// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls. +// +// Parameters: +// - sink: Result processing function, receives []map[string]interface{} type result data +// +// Note: Sync sinks are executed sequentially in the result processing goroutine. +// Use this when order of execution matters. +func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) { + if s.stream != nil { + s.stream.AddSyncSink(sink) + } +} + // PrintTable prints results to console in table format, similar to database output. // Displays column names first, then data rows. //