From c86d22e06ac0a011ff900e5e726c5736e2176fbc Mon Sep 17 00:00:00 2001 From: rulego-team Date: Thu, 13 Nov 2025 11:01:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E7=AA=97=E5=8F=A3?= =?UTF-8?q?=E5=AF=B9=E9=BD=90=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- window/sliding_window.go | 99 +++++++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 31 deletions(-) diff --git a/window/sliding_window.go b/window/sliding_window.go index 67ae00a..fcce7fd 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -73,25 +73,36 @@ type SlidingWindow struct { // NewSlidingWindow creates a new sliding window instance // size parameter represents the total window size, slide represents the sliding interval func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error) { - // Create a cancellable context - ctx, cancel := context.WithCancel(context.Background()) - size, err := cast.ToDurationE(config.Params["size"]) + // Get size parameter from params array + if len(config.Params) < 1 { + return nil, fmt.Errorf("sliding window requires at least 'size' parameter") + } + + sizeVal := config.Params[0] + size, err := cast.ToDurationE(sizeVal) if err != nil { return nil, fmt.Errorf("invalid size for sliding window: %v", err) } - slide, err := cast.ToDurationE(config.Params["slide"]) + + // Get slide parameter from params array + if len(config.Params) < 2 { + return nil, fmt.Errorf("sliding window requires 'slide' parameter") + } + + slideVal := config.Params[1] + slide, err := cast.ToDurationE(slideVal) if err != nil { return nil, fmt.Errorf("invalid slide for sliding window: %v", err) } // Use unified performance config to get window output buffer size bufferSize := 1000 // Default value - if perfConfig, exists := config.Params["performanceConfig"]; exists { - if pc, ok := perfConfig.(types.PerformanceConfig); ok { - bufferSize = pc.BufferConfig.WindowOutputSize - } + if (config.PerformanceConfig != types.PerformanceConfig{}) { + bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize } + // Create a cancellable context + ctx, cancel := context.WithCancel(context.Background()) return &SlidingWindow{ config: config, size: size, @@ -195,29 +206,24 @@ func (sw *SlidingWindow) Stop() { func (sw *SlidingWindow) Trigger() { // Lock to ensure thread safety sw.mu.Lock() - defer sw.mu.Unlock() // Return directly if no data in window if len(sw.data) == 0 { + sw.mu.Unlock() return } if !sw.initialized { + sw.mu.Unlock() return } - // Calculate cutoff time (current time minus window size) + // Calculate next slot for sliding window next := sw.NextSlot() - // Retain data for next window - tms := next.Start.Add(-sw.size) - tme := next.End.Add(sw.size) - temp := types.NewTimeSlot(&tms, &tme) - newData := make([]types.Row, 0) - for _, item := range sw.data { - if temp.Contains(item.Timestamp) { - newData = append(newData, item) - } + if next == nil { + sw.mu.Unlock() + return } - // Extract Data fields to form []interface{} type data + // Extract Data fields to form []interface{} type data for current window resultData := make([]types.Row, 0) for _, item := range sw.data { if sw.currentSlot.Contains(item.Timestamp) { @@ -226,24 +232,55 @@ func (sw *SlidingWindow) Trigger() { } } - // Execute callback function if set - if sw.callback != nil { - sw.callback(resultData) + // Retain data that could be in future windows + // For sliding windows, we need to keep data that falls within: + // - Current window end + size (for overlapping windows) + // - Next window end + size (for future windows) + // Actually, we should keep all data that could be in any future window + // The latest window that could contain a data point is: next.End + size + cutoffTime := next.End.Add(sw.size) + newData := make([]types.Row, 0) + for _, item := range sw.data { + // Keep data that could be in future windows (before cutoffTime) + if item.Timestamp.Before(cutoffTime) { + newData = append(newData, item) + } } // Update window data sw.data = newData sw.currentSlot = next - // Non-blocking send to output channel and update statistics (within lock) + // Get callback reference before releasing lock + callback := sw.callback + + // Release lock before calling callback and sending to channel to avoid blocking + sw.mu.Unlock() + + // Execute callback function if set (outside of lock to avoid blocking) + if callback != nil { + callback(resultData) + } + + // Non-blocking send to output channel and update statistics + var sent bool select { case sw.outputChan <- resultData: - // Successfully sent, update statistics (within lock) - sw.sentCount++ + // Successfully sent + sent = true default: - // Channel full, drop result and update statistics (within lock) + // Channel full, drop result + sent = false + } + + // Re-acquire lock to update statistics + sw.mu.Lock() + if sent { + sw.sentCount++ + } else { sw.droppedCount++ } + sw.mu.Unlock() } // GetStats returns window performance statistics @@ -252,10 +289,10 @@ func (sw *SlidingWindow) GetStats() map[string]int64 { defer sw.mu.RUnlock() return map[string]int64{ - "sent_count": sw.sentCount, - "dropped_count": sw.droppedCount, - "buffer_size": int64(cap(sw.outputChan)), - "buffer_used": int64(len(sw.outputChan)), + "sentCount": sw.sentCount, + "droppedCount": sw.droppedCount, + "bufferSize": int64(cap(sw.outputChan)), + "bufferUsed": int64(len(sw.outputChan)), } }