From 4528d1f1a61d8141b165d68f8b2c643ce3317dad Mon Sep 17 00:00:00 2001 From: rulego-team Date: Sat, 15 Nov 2025 13:54:27 +0800 Subject: [PATCH] =?UTF-8?q?fix(window):=20=E4=BF=AE=E5=A4=8D=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E6=97=B6=E9=97=B4=E7=AA=97=E5=8F=A3=E6=9C=AA=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E9=97=AE=E9=A2=98=E5=B9=B6=E6=B7=BB=E5=8A=A0=E8=B0=83?= =?UTF-8?q?=E8=AF=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamsql_sliding_window_test.go | 23 ++++++++-- streamsql_tumbling_window_test.go | 17 +++++++- window/sliding_window.go | 72 +++++++++++++++++++++++++++---- window/tumbling_window.go | 58 +++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 13 deletions(-) diff --git a/streamsql_sliding_window_test.go b/streamsql_sliding_window_test.go index 1b6213c..2f398f1 100644 --- a/streamsql_sliding_window_test.go +++ b/streamsql_sliding_window_test.go @@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) { } // 第三阶段:发送更多正常数据,推进 watermark + // 关键:要触发窗口,需要 watermark >= windowEnd + // watermark = maxEventTime - maxOutOfOrderness + // 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness + windowSizeMs := int64(2000) // 2秒 + maxOutOfOrdernessMs := int64(1000) // 1秒 + firstWindowEnd := baseTime + windowSizeMs + requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs t.Log("第三阶段:继续发送正常数据,推进 watermark") for i := 10; i < 15; i++ { eventTime := baseTime + int64(i*200) + // 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger + if i == 10 && eventTime < requiredEventTimeForTrigger { + eventTime = requiredEventTimeForTrigger + } ssql.Emit(map[string]interface{}{ "deviceId": "sensor001", "eventTime": eventTime, @@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) { } // 推进watermark,触发第一个窗口 - // 发送事件时间超过第一个窗口结束时间的数据 - firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间 + // 关键:要触发窗口,需要 watermark >= windowEnd + // watermark = maxEventTime - maxOutOfOrderness + // 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness + windowSizeMs := int64(2000) // 2秒 + maxOutOfOrdernessMs := int64(1000) // 1秒 + firstWindowEnd := baseTime + windowSizeMs + requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs + // 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd ssql.Emit(map[string]interface{}{ "deviceId": "sensor001", - "eventTime": firstWindowEnd + int64(2000), + "eventTime": requiredEventTimeForTrigger, "temperature": 100.0, }) diff --git a/streamsql_tumbling_window_test.go b/streamsql_tumbling_window_test.go index b7dc81f..2db63e4 100644 --- a/streamsql_tumbling_window_test.go +++ b/streamsql_tumbling_window_test.go @@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) { }) // 模拟完整的延迟数据处理场景 - baseTime := time.Now().UnixMilli() - 10000 + // 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测 + windowSizeMs := int64(2000) // 2秒 + baseTimeRaw := time.Now().UnixMilli() - 10000 + baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界 + maxOutOfOrdernessMs := int64(1000) // 1秒 + firstWindowEnd := baseTime + windowSizeMs + // 关键:要触发窗口,需要 watermark >= windowEnd + // watermark = maxEventTime - maxOutOfOrderness + // 所以需要:maxEventTime - maxOutOfOrderness >= windowEnd + // 即:maxEventTime >= windowEnd + maxOutOfOrderness + requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs // 第一阶段:发送正常顺序的数据 t.Log("第一阶段:发送正常顺序的数据") @@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) { } // 第三阶段:继续发送正常数据,推进 watermark + // 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd t.Log("第三阶段:继续发送正常数据,推进 watermark") for i := 10; i < 15; i++ { eventTime := baseTime + int64(i*200) + // 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger + if i == 10 && eventTime < requiredEventTimeForTrigger { + eventTime = requiredEventTimeForTrigger + } ssql.Emit(map[string]interface{}{ "deviceId": "sensor001", "eventTime": eventTime, diff --git a/window/sliding_window.go b/window/sliding_window.go index 1ad077d..12b0a20 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -19,6 +19,7 @@ package window import ( "context" "fmt" + "log" "sync" "time" @@ -27,6 +28,17 @@ import ( "github.com/rulego/streamsql/types" ) +// debugLogSliding logs debug information only when EnableDebug is true +// This function is optimized to avoid unnecessary string formatting when debug is disabled +func debugLogSliding(format string, args ...interface{}) { + // Fast path: if debug is disabled, return immediately without evaluating args + // The compiler should optimize this check away when EnableDebug is a compile-time constant false + if !EnableDebug { + return + } + log.Printf("[SlidingWindow] "+format, args...) +} + // Ensure SlidingWindow implements the Window interface var _ Window = (*SlidingWindow)(nil) @@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) { // For event time, align window start to window boundaries alignedStart := alignWindowStart(eventTime, sw.slide) sw.currentSlot = sw.createSlotFromStart(alignedStart) + debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)", + eventTime.UnixMilli(), alignedStart.UnixMilli(), + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) } else { // For processing time, use current time or event time as-is sw.currentSlot = sw.createSlot(eventTime) // Record when first window started (processing time) sw.firstWindowStartTime = time.Now() + debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)", + eventTime.UnixMilli(), + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) } // Don't start timer here, wait for first window to end // Send initialization complete signal @@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) { Timestamp: eventTime, } sw.data = append(sw.data, row) + debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v", + eventTime.UnixMilli(), len(sw.data), + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(), + sw.currentSlot.Contains(eventTime)) // Check if data is late and handle allowedLateness (after data is added) if timeChar == types.EventTime && sw.watermark != nil { @@ -211,7 +233,7 @@ func (sw *SlidingWindow) Add(data interface{}) { break } } - + // If not belonging to triggered window, check if it belongs to currentSlot // This handles the case where watermark has advanced but window hasn't triggered yet if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) { @@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { defer sw.mu.Unlock() if !sw.initialized || sw.currentSlot == nil { + debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil") return } @@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // Which means: maxEventTime >= windowEnd + maxOutOfOrderness // This ensures all data for the window has arrived (within maxOutOfOrderness tolerance) // Use a small threshold (1ms) only for floating point precision issues + totalDataCount := len(sw.data) + debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)", + watermarkTime.UnixMilli(), totalDataCount, + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) + for sw.currentSlot != nil { windowEnd := sw.currentSlot.End - + windowStart := sw.currentSlot.Start + // Check if watermark >= windowEnd // Use !Before() instead of After() to include equality case // This is equivalent to watermarkTime >= windowEnd shouldTrigger := !watermarkTime.Before(*windowEnd) - + + debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger) + if !shouldTrigger { // Watermark hasn't reached windowEnd yet, stop checking + debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping") break } // Check if window has data before triggering hasData := false + dataInWindow := 0 + var dataTimestamps []int64 for _, item := range sw.data { if sw.currentSlot.Contains(item.Timestamp) { hasData = true - break + dataInWindow++ + dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli()) } } + debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps) + // Trigger current window only if it has data if hasData { // Count data in window before triggering @@ -421,7 +460,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { dataInWindow++ } } - + // Save snapshot data before triggering (for Flink-like late update behavior) var snapshotData []types.Row if allowedLateness > 0 { @@ -438,8 +477,11 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { } } } - + + debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items", + windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow) sw.triggerWindowLocked() + debugLogSliding("checkAndTriggerWindows: window triggered successfully") // If allowedLateness > 0, keep window open for late data if allowedLateness > 0 { @@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { closeTime: closeTime, snapshotData: snapshotData, // Save snapshot for late updates } + debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli()) } + } else { + debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger", + windowStart.UnixMilli(), windowEnd.UnixMilli()) } // Move to next window (even if current window was empty) sw.currentSlot = sw.NextSlot() + if sw.currentSlot != nil { + debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)", + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) + } else { + debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping") + break + } } // Close windows that have exceeded allowedLateness @@ -804,7 +858,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) { // Collect all data for this window: original snapshot + late data from sw.data resultData := make([]types.Row, 0) - + // First, add original snapshot data (if exists) if windowInfo != nil && len(windowInfo.snapshotData) > 0 { // Create copies of snapshot data @@ -816,7 +870,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) { }) } } - + // Then, add late data from sw.data (newly arrived late data) lateDataCount := 0 for _, item := range sw.data { @@ -884,7 +938,7 @@ func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) { delete(sw.triggeredWindows, key) } } - + // Clean up data that belongs to expired windows (if any) if len(expiredWindows) > 0 { newData := make([]types.Row, 0) diff --git a/window/tumbling_window.go b/window/tumbling_window.go index 4656cdb..fe70440 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -19,6 +19,7 @@ package window import ( "context" "fmt" + "log" "sync" "time" @@ -26,6 +27,20 @@ import ( "github.com/rulego/streamsql/utils/cast" ) +// EnableDebug enables debug logging for window operations +var EnableDebug = false + +// debugLog logs debug information only when EnableDebug is true +// This function is optimized to avoid unnecessary string formatting when debug is disabled +func debugLog(format string, args ...interface{}) { + // Fast path: if debug is disabled, return immediately without evaluating args + // The compiler should optimize this check away when EnableDebug is a compile-time constant false + if !EnableDebug { + return + } + log.Printf("[TumblingWindow] "+format, args...) +} + // Ensure TumblingWindow implements the Window interface var _ Window = (*TumblingWindow)(nil) @@ -160,10 +175,16 @@ func (tw *TumblingWindow) Add(data interface{}) { // Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries) alignedStart := alignWindowStart(eventTime, tw.size) tw.currentSlot = tw.createSlotFromStart(alignedStart) + debugLog("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)", + eventTime.UnixMilli(), alignedStart.UnixMilli(), + tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli()) } else { // For processing time, use current time or event time as-is // No alignment is performed - window starts immediately when first data arrives tw.currentSlot = tw.createSlot(eventTime) + debugLog("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)", + eventTime.UnixMilli(), + tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli()) } // Only start timer for processing time @@ -189,6 +210,10 @@ func (tw *TumblingWindow) Add(data interface{}) { Timestamp: eventTime, } tw.data = append(tw.data, row) + debugLog("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v", + eventTime.UnixMilli(), len(tw.data), + tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli(), + tw.currentSlot.Contains(eventTime)) // Check if data is late and handle allowedLateness (after data is added) if timeChar == types.EventTime && tw.watermark != nil { @@ -388,6 +413,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { defer tw.mu.Unlock() if !tw.initialized || tw.currentSlot == nil { + debugLog("checkAndTriggerWindows: not initialized or currentSlot is nil") return } @@ -400,8 +426,14 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // watermark may be slightly less than windowEnd. We need to handle this case. // If watermark is very close to windowEnd (within a small threshold), we should also trigger. triggeredCount := 0 + totalDataCount := len(tw.data) + debugLog("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)", + watermarkTime.UnixMilli(), totalDataCount, + tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli()) + for tw.currentSlot != nil { windowEnd := tw.currentSlot.End + windowStart := tw.currentSlot.Start // Trigger if watermark >= windowEnd // In Flink, windows are triggered when watermark >= windowEnd. // Watermark calculation: watermark = maxEventTime - maxOutOfOrderness @@ -413,8 +445,12 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // This is equivalent to watermarkTime >= windowEnd shouldTrigger := !watermarkTime.Before(*windowEnd) + debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger) + if !shouldTrigger { // Watermark hasn't reached windowEnd yet, stop checking + debugLog("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping") break } @@ -425,13 +461,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // Check if window has data before triggering hasData := false dataInWindow := 0 + var dataTimestamps []int64 for _, item := range tw.data { if tw.currentSlot.Contains(item.Timestamp) { hasData = true dataInWindow++ + dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli()) } } + debugLog("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps) + // Trigger current window only if it has data if hasData { @@ -452,8 +493,11 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { } } + debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items", + windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow) tw.triggerWindowLocked() triggeredCount++ + debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount) // triggerWindowLocked releases and re-acquires lock, so we need to re-check state // If allowedLateness > 0, keep window open for late data @@ -466,18 +510,32 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { closeTime: closeTime, snapshotData: snapshotData, // Save snapshot for late updates } + debugLog("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v", + windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli()) } + } else { + debugLog("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger", + windowStart.UnixMilli(), windowEnd.UnixMilli()) } // Move to next window (even if current window was empty) // Re-check currentSlot in case it was modified if tw.currentSlot != nil { tw.currentSlot = tw.NextSlot() + if tw.currentSlot != nil { + debugLog("checkAndTriggerWindows: moved to next window [%v, %v)", + tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli()) + } else { + debugLog("checkAndTriggerWindows: NextSlot returned nil, stopping") + } } else { + debugLog("checkAndTriggerWindows: currentSlot is nil, breaking") break } } + debugLog("checkAndTriggerWindows: finished, triggeredCount=%d", triggeredCount) + // Close windows that have exceeded allowedLateness tw.closeExpiredWindows(watermarkTime) }