feat:优化stream停止

This commit is contained in:
rulego-team
2025-11-14 09:59:25 +08:00
parent 6f77dc5f7f
commit 14d9a0874b
11 changed files with 863 additions and 728 deletions
+9
View File
@@ -42,7 +42,16 @@ func (s *Stream) safeGetDataChan() chan map[string]interface{} {
// safeSendToDataChan safely sends data to dataChan
func (s *Stream) safeSendToDataChan(data map[string]interface{}) bool {
// Check if stream is stopped before attempting to send
if atomic.LoadInt32(&s.stopped) == 1 {
return false
}
dataChan := s.safeGetDataChan()
if dataChan == nil {
return false
}
select {
case dataChan <- data:
return true
+13 -2
View File
@@ -305,8 +305,19 @@ func (dp *DataProcessor) startWindowProcessing() {
}
}()
for batch := range dp.stream.Window.OutputChan() {
dp.processWindowBatch(batch)
outputChan := dp.stream.Window.OutputChan()
for {
select {
case batch, ok := <-outputChan:
if !ok {
// Channel closed, exit
return
}
dp.processWindowBatch(batch)
case <-dp.stream.done:
// Stream stopped, exit
return
}
}
}()
}
+38 -4
View File
@@ -66,11 +66,23 @@ func NewBlockingStrategy() *BlockingStrategy {
// ProcessData implements blocking mode data processing
func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
// Check if stream is stopped
if atomic.LoadInt32(&bs.stream.stopped) == 1 {
return
}
if bs.stream.blockingTimeout <= 0 {
// No timeout limit, block permanently until success
dataChan := bs.stream.safeGetDataChan()
dataChan <- data
return
if dataChan == nil {
return
}
select {
case dataChan <- data:
return
case <-bs.stream.done:
return
}
}
// Blocking with timeout
@@ -78,6 +90,10 @@ func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
defer timer.Stop()
dataChan := bs.stream.safeGetDataChan()
if dataChan == nil {
return
}
select {
case dataChan <- data:
// Successfully added data
@@ -87,7 +103,17 @@ func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
logger.Error("Data addition timeout, but continue waiting to avoid data loss")
// Continue blocking indefinitely, re-get current channel reference
finalDataChan := bs.stream.safeGetDataChan()
finalDataChan <- data
if finalDataChan == nil {
return
}
select {
case finalDataChan <- data:
return
case <-bs.stream.done:
return
}
case <-bs.stream.done:
return
}
}
@@ -135,7 +161,15 @@ func (es *ExpansionStrategy) ProcessData(data map[string]interface{}) {
// If still full after expansion, block and wait
dataChan := es.stream.safeGetDataChan()
dataChan <- data
if dataChan == nil {
return
}
select {
case dataChan <- data:
return
case <-es.stream.done:
return
}
}
// GetStrategyName gets strategy name
+13
View File
@@ -220,6 +220,19 @@ func (s *Stream) Stop() {
close(s.done)
// Stop window operations first to prevent new window triggers
if s.Window != nil {
s.Window.Stop()
}
// Close dataChan to signal DataProcessor to exit
s.dataChanMux.Lock()
if s.dataChan != nil {
close(s.dataChan)
s.dataChan = nil // Set to nil to prevent sending to closed channel
}
s.dataChanMux.Unlock()
// Stop and clean up data processing strategy resources
if s.dataStrategy != nil {
if err := s.dataStrategy.Stop(); err != nil {
File diff suppressed because it is too large Load Diff
+10
View File
@@ -97,6 +97,16 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) {
}
func (cw *CountingWindow) Add(data interface{}) {
// Check if window is stopped before adding data
cw.mu.Lock()
stopped := cw.stopped
cw.mu.Unlock()
if stopped {
// Window is stopped, ignore the data
return
}
t := GetTimestamp(data, cw.config.TsProp, cw.config.TimeUnit)
row := types.Row{
Data: data,
+1
View File
@@ -47,6 +47,7 @@ All window types implement a unified Window interface:
Add(row types.Row) error // Add data to window
Reset() error // Reset window state
Start() error // Start window processing
Stop() // Stop window operations and clean up resources
OutputChan() <-chan []types.Row // Get output channel
SetCallback(func([]types.Row)) // Set callback function
Trigger() error // Manual trigger
+1
View File
@@ -38,6 +38,7 @@ type Window interface {
//GetResults() []interface{}
Reset()
Start()
Stop() // Stop window operations and clean up resources
OutputChan() <-chan []types.Row
SetCallback(callback func([]types.Row))
Trigger()
+20 -1
View File
@@ -108,7 +108,13 @@ func (sw *SessionWindow) Add(data interface{}) {
defer sw.mu.Unlock()
if !sw.initialized {
close(sw.initChan)
// Safely close initChan to avoid closing an already closed channel
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
sw.initialized = true
}
@@ -209,6 +215,19 @@ func (sw *SessionWindow) Stop() {
sw.ticker.Stop()
}
sw.tickerMu.Unlock()
// Ensure initChan is closed if it hasn't been closed yet
// This prevents Start() goroutine from blocking on initChan
sw.mu.Lock()
if !sw.initialized && sw.initChan != nil {
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
}
sw.mu.Unlock()
}
// checkExpiredSessions checks and triggers expired sessions
+20 -1
View File
@@ -131,7 +131,13 @@ func (sw *SlidingWindow) Add(data interface{}) {
sw.firstWindowStartTime = time.Now()
// Don't start timer here, wait for first window to end
// Send initialization complete signal
close(sw.initChan)
// Safely close initChan to avoid closing an already closed channel
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
sw.initialized = true
}
row := types.Row{
@@ -249,6 +255,19 @@ func (sw *SlidingWindow) Stop() {
sw.timer.Stop()
}
sw.timerMu.Unlock()
// Ensure initChan is closed if it hasn't been closed yet
// This prevents Start() goroutine from blocking on initChan
sw.mu.Lock()
if !sw.initialized && sw.initChan != nil {
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
}
sw.mu.Unlock()
}
// Trigger triggers the sliding window to process data within the window
+20 -1
View File
@@ -108,7 +108,13 @@ func (tw *TumblingWindow) Add(data interface{}) {
tw.timerMu.Unlock()
tw.initialized = true
// Send initialization complete signal (after setting timer)
close(tw.initChan)
// Safely close initChan to avoid closing an already closed channel
select {
case <-tw.initChan:
// Already closed, do nothing
default:
close(tw.initChan)
}
}
row := types.Row{
Data: data,
@@ -145,6 +151,19 @@ func (tw *TumblingWindow) Stop() {
tw.timer.Stop()
}
tw.timerMu.Unlock()
// Ensure initChan is closed if it hasn't been closed yet
// This prevents Start() goroutine from blocking on initChan
tw.mu.Lock()
if !tw.initialized && tw.initChan != nil {
select {
case <-tw.initChan:
// Already closed, do nothing
default:
close(tw.initChan)
}
}
tw.mu.Unlock()
}
// Start starts the tumbling window's periodic trigger mechanism