diff --git a/stream/handler_result.go b/stream/handler_result.go index 693ac65..2a9d9b4 100644 --- a/stream/handler_result.go +++ b/stream/handler_result.go @@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) { s.sinksMux.RLock() defer s.sinksMux.RUnlock() - if len(s.sinks) == 0 { + if len(s.sinks) == 0 && len(s.syncSinks) == 0 { return } @@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) { for _, sink := range s.sinks { s.submitSinkTask(sink, results) } + + // Execute synchronous sinks (blocking, sequential) + for _, sink := range s.syncSinks { + // Recover panic for each sync sink to prevent crashing the stream + func() { + defer func() { + if r := recover(); r != nil { + logger.Error("Sync sink execution exception: %v", r) + } + }() + sink(results) + }() + } } // submitSinkTask submits sink task @@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m } // Non-blocking task submission + // Note: Since we use a worker pool, tasks may be executed out of order select { case s.sinkWorkerPool <- task: // Successfully submitted task default: // Worker pool is full, execute directly in current goroutine (degraded handling) - go task() + // This also helps with backpressure + task() } } // AddSink adds a sink function // Parameters: // - sink: result processing function that receives []map[string]interface{} type result data +// +// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed. +// If you need strict ordering, use GetResultsChan() instead. func (s *Stream) AddSink(sink func([]map[string]interface{})) { s.sinksMux.Lock() defer s.sinksMux.Unlock() s.sinks = append(s.sinks, sink) } +// AddSyncSink adds a synchronous sink function +// Parameters: +// - sink: result processing function that receives []map[string]interface{} type result data +// +// Note: Sync sinks are executed sequentially in the result processing goroutine. +// They block subsequent processing, so they should be fast. +func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) { + s.sinksMux.Lock() + defer s.sinksMux.Unlock() + s.syncSinks = append(s.syncSinks, sink) +} + // GetResultsChan gets the result channel func (s *Stream) GetResultsChan() <-chan []map[string]interface{} { return s.resultChan diff --git a/stream/stream.go b/stream/stream.go index 9114ca0..191c1d2 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -61,7 +61,8 @@ type Stream struct { aggregator aggregator.Aggregator config types.Config sinks []func([]map[string]interface{}) - resultChan chan []map[string]interface{} // Result channel + syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially + resultChan chan []map[string]interface{} // Result channel seenResults *sync.Map done chan struct{} // Used to close processing goroutines sinkWorkerPool chan func() // Sink worker pool to avoid blocking diff --git a/stream/stream_factory.go b/stream/stream_factory.go index 9558c75..208ff96 100644 --- a/stream/stream_factory.go +++ b/stream/stream_factory.go @@ -176,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi // startWorkerRoutines starts worker goroutines func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) { go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount) - go stream.startResultConsumer() } diff --git a/streamsql.go b/streamsql.go index a96afc4..f64d4e7 100644 --- a/streamsql.go +++ b/streamsql.go @@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) { } } +// AddSyncSink directly adds synchronous result processing callback functions. +// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls. +// +// Parameters: +// - sink: Result processing function, receives []map[string]interface{} type result data +// +// Note: Sync sinks are executed sequentially in the result processing goroutine. +// Use this when order of execution matters. +func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) { + if s.stream != nil { + s.stream.AddSyncSink(sink) + } +} + // PrintTable prints results to console in table format, similar to database output. // Displays column names first, then data rows. //