2 Commits

Author SHA1 Message Date
rulego-team 4528d1f1a6 fix(window): 修复事件时间窗口未触发问题并添加调试日志 2025-11-15 13:54:27 +08:00
Whki 6ce76d506c Merge pull request #43 from rulego/dev
Dev
2025-11-15 13:15:05 +08:00
4 changed files with 157 additions and 13 deletions
+20 -3
View File
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
} }
// 第三阶段:发送更多正常数据,推进 watermark // 第三阶段:发送更多正常数据,推进 watermark
// 关键:要触发窗口,需要 watermark >= windowEnd
// watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime >= windowEnd + maxOutOfOrderness
windowSizeMs := int64(2000) // 2秒
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
t.Log("第三阶段:继续发送正常数据,推进 watermark") t.Log("第三阶段:继续发送正常数据,推进 watermark")
for i := 10; i < 15; i++ { for i := 10; i < 15; i++ {
eventTime := baseTime + int64(i*200) eventTime := baseTime + int64(i*200)
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
if i == 10 && eventTime < requiredEventTimeForTrigger {
eventTime = requiredEventTimeForTrigger
}
ssql.Emit(map[string]interface{}{ ssql.Emit(map[string]interface{}{
"deviceId": "sensor001", "deviceId": "sensor001",
"eventTime": eventTime, "eventTime": eventTime,
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
} }
// 推进watermark触发第一个窗口 // 推进watermark触发第一个窗口
// 发送事件时间超过第一个窗口结束时间的数据 // 关键:要触发窗口,需要 watermark >= windowEnd
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间 // watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime >= windowEnd + maxOutOfOrderness
windowSizeMs := int64(2000) // 2秒
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
ssql.Emit(map[string]interface{}{ ssql.Emit(map[string]interface{}{
"deviceId": "sensor001", "deviceId": "sensor001",
"eventTime": firstWindowEnd + int64(2000), "eventTime": requiredEventTimeForTrigger,
"temperature": 100.0, "temperature": 100.0,
}) })
+16 -1
View File
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
}) })
// 模拟完整的延迟数据处理场景 // 模拟完整的延迟数据处理场景
baseTime := time.Now().UnixMilli() - 10000 // 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
windowSizeMs := int64(2000) // 2秒
baseTimeRaw := time.Now().UnixMilli() - 10000
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
// 关键:要触发窗口,需要 watermark >= windowEnd
// watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime - maxOutOfOrderness >= windowEnd
// 即maxEventTime >= windowEnd + maxOutOfOrderness
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
// 第一阶段:发送正常顺序的数据 // 第一阶段:发送正常顺序的数据
t.Log("第一阶段:发送正常顺序的数据") t.Log("第一阶段:发送正常顺序的数据")
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
} }
// 第三阶段:继续发送正常数据,推进 watermark // 第三阶段:继续发送正常数据,推进 watermark
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
t.Log("第三阶段:继续发送正常数据,推进 watermark") t.Log("第三阶段:继续发送正常数据,推进 watermark")
for i := 10; i < 15; i++ { for i := 10; i < 15; i++ {
eventTime := baseTime + int64(i*200) eventTime := baseTime + int64(i*200)
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
if i == 10 && eventTime < requiredEventTimeForTrigger {
eventTime = requiredEventTimeForTrigger
}
ssql.Emit(map[string]interface{}{ ssql.Emit(map[string]interface{}{
"deviceId": "sensor001", "deviceId": "sensor001",
"eventTime": eventTime, "eventTime": eventTime,
+55 -1
View File
@@ -19,6 +19,7 @@ package window
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"sync" "sync"
"time" "time"
@@ -27,6 +28,17 @@ import (
"github.com/rulego/streamsql/types" "github.com/rulego/streamsql/types"
) )
// debugLogSliding emits a formatted debug line through the standard logger,
// prefixed with "[SlidingWindow] ", but only while the package-level
// EnableDebug flag is true. When the flag is false the call is a no-op.
//
// Note that Go evaluates the variadic arguments at the call site before this
// function is entered, so the flag check only saves the formatting and output
// work, not the cost of computing the arguments themselves.
func debugLogSliding(format string, args ...interface{}) {
	if EnableDebug {
		log.Printf("[SlidingWindow] "+format, args...)
	}
}
// Ensure SlidingWindow implements the Window interface // Ensure SlidingWindow implements the Window interface
var _ Window = (*SlidingWindow)(nil) var _ Window = (*SlidingWindow)(nil)
@@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) {
// For event time, align window start to window boundaries // For event time, align window start to window boundaries
alignedStart := alignWindowStart(eventTime, sw.slide) alignedStart := alignWindowStart(eventTime, sw.slide)
sw.currentSlot = sw.createSlotFromStart(alignedStart) sw.currentSlot = sw.createSlotFromStart(alignedStart)
debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
eventTime.UnixMilli(), alignedStart.UnixMilli(),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
} else { } else {
// For processing time, use current time or event time as-is // For processing time, use current time or event time as-is
sw.currentSlot = sw.createSlot(eventTime) sw.currentSlot = sw.createSlot(eventTime)
// Record when first window started (processing time) // Record when first window started (processing time)
sw.firstWindowStartTime = time.Now() sw.firstWindowStartTime = time.Now()
debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
eventTime.UnixMilli(),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
} }
// Don't start timer here, wait for first window to end // Don't start timer here, wait for first window to end
// Send initialization complete signal // Send initialization complete signal
@@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) {
Timestamp: eventTime, Timestamp: eventTime,
} }
sw.data = append(sw.data, row) sw.data = append(sw.data, row)
debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
eventTime.UnixMilli(), len(sw.data),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(),
sw.currentSlot.Contains(eventTime))
// Check if data is late and handle allowedLateness (after data is added) // Check if data is late and handle allowedLateness (after data is added)
if timeChar == types.EventTime && sw.watermark != nil { if timeChar == types.EventTime && sw.watermark != nil {
@@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
defer sw.mu.Unlock() defer sw.mu.Unlock()
if !sw.initialized || sw.currentSlot == nil { if !sw.initialized || sw.currentSlot == nil {
debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil")
return return
} }
@@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness // Which means: maxEventTime >= windowEnd + maxOutOfOrderness
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance) // This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
// Use a small threshold (1ms) only for floating point precision issues // Use a small threshold (1ms) only for floating point precision issues
totalDataCount := len(sw.data)
debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
watermarkTime.UnixMilli(), totalDataCount,
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
for sw.currentSlot != nil { for sw.currentSlot != nil {
windowEnd := sw.currentSlot.End windowEnd := sw.currentSlot.End
windowStart := sw.currentSlot.Start
// Check if watermark >= windowEnd // Check if watermark >= windowEnd
// Use !Before() instead of After() to include equality case // Use !Before() instead of After() to include equality case
// This is equivalent to watermarkTime >= windowEnd // This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd) shouldTrigger := !watermarkTime.Before(*windowEnd)
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
if !shouldTrigger { if !shouldTrigger {
// Watermark hasn't reached windowEnd yet, stop checking // Watermark hasn't reached windowEnd yet, stop checking
debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
break break
} }
// Check if window has data before triggering // Check if window has data before triggering
hasData := false hasData := false
dataInWindow := 0
var dataTimestamps []int64
for _, item := range sw.data { for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) { if sw.currentSlot.Contains(item.Timestamp) {
hasData = true hasData = true
break dataInWindow++
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
} }
} }
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
// Trigger current window only if it has data // Trigger current window only if it has data
if hasData { if hasData {
// Count data in window before triggering // Count data in window before triggering
@@ -439,7 +478,10 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
} }
} }
debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
sw.triggerWindowLocked() sw.triggerWindowLocked()
debugLogSliding("checkAndTriggerWindows: window triggered successfully")
// If allowedLateness > 0, keep window open for late data // If allowedLateness > 0, keep window open for late data
if allowedLateness > 0 { if allowedLateness > 0 {
@@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
closeTime: closeTime, closeTime: closeTime,
snapshotData: snapshotData, // Save snapshot for late updates snapshotData: snapshotData, // Save snapshot for late updates
} }
debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
} }
} else {
debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
windowStart.UnixMilli(), windowEnd.UnixMilli())
} }
// Move to next window (even if current window was empty) // Move to next window (even if current window was empty)
sw.currentSlot = sw.NextSlot() sw.currentSlot = sw.NextSlot()
if sw.currentSlot != nil {
debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)",
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
} else {
debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping")
break
}
} }
// Close windows that have exceeded allowedLateness // Close windows that have exceeded allowedLateness
+58
View File
@@ -19,6 +19,7 @@ package window
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"sync" "sync"
"time" "time"
@@ -26,6 +27,20 @@ import (
"github.com/rulego/streamsql/utils/cast" "github.com/rulego/streamsql/utils/cast"
) )
// EnableDebug switches debug logging for window operations on or off.
// It defaults to false (logging disabled).
var EnableDebug bool

// debugLog emits a formatted debug line through the standard logger,
// prefixed with "[TumblingWindow] ", but only while EnableDebug is true.
// When the flag is false the call is a no-op.
//
// Note that Go evaluates the variadic arguments at the call site before this
// function is entered, so the flag check only saves the formatting and output
// work, not the cost of computing the arguments themselves.
func debugLog(format string, args ...interface{}) {
	if EnableDebug {
		log.Printf("[TumblingWindow] "+format, args...)
	}
}
// Ensure TumblingWindow implements the Window interface // Ensure TumblingWindow implements the Window interface
var _ Window = (*TumblingWindow)(nil) var _ Window = (*TumblingWindow)(nil)
@@ -160,10 +175,16 @@ func (tw *TumblingWindow) Add(data interface{}) {
// Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries) // Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries)
alignedStart := alignWindowStart(eventTime, tw.size) alignedStart := alignWindowStart(eventTime, tw.size)
tw.currentSlot = tw.createSlotFromStart(alignedStart) tw.currentSlot = tw.createSlotFromStart(alignedStart)
debugLog("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
eventTime.UnixMilli(), alignedStart.UnixMilli(),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
} else { } else {
// For processing time, use current time or event time as-is // For processing time, use current time or event time as-is
// No alignment is performed - window starts immediately when first data arrives // No alignment is performed - window starts immediately when first data arrives
tw.currentSlot = tw.createSlot(eventTime) tw.currentSlot = tw.createSlot(eventTime)
debugLog("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
eventTime.UnixMilli(),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
} }
// Only start timer for processing time // Only start timer for processing time
@@ -189,6 +210,10 @@ func (tw *TumblingWindow) Add(data interface{}) {
Timestamp: eventTime, Timestamp: eventTime,
} }
tw.data = append(tw.data, row) tw.data = append(tw.data, row)
debugLog("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
eventTime.UnixMilli(), len(tw.data),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli(),
tw.currentSlot.Contains(eventTime))
// Check if data is late and handle allowedLateness (after data is added) // Check if data is late and handle allowedLateness (after data is added)
if timeChar == types.EventTime && tw.watermark != nil { if timeChar == types.EventTime && tw.watermark != nil {
@@ -388,6 +413,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
defer tw.mu.Unlock() defer tw.mu.Unlock()
if !tw.initialized || tw.currentSlot == nil { if !tw.initialized || tw.currentSlot == nil {
debugLog("checkAndTriggerWindows: not initialized or currentSlot is nil")
return return
} }
@@ -400,8 +426,14 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// watermark may be slightly less than windowEnd. We need to handle this case. // watermark may be slightly less than windowEnd. We need to handle this case.
// If watermark is very close to windowEnd (within a small threshold), we should also trigger. // If watermark is very close to windowEnd (within a small threshold), we should also trigger.
triggeredCount := 0 triggeredCount := 0
totalDataCount := len(tw.data)
debugLog("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
watermarkTime.UnixMilli(), totalDataCount,
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
for tw.currentSlot != nil { for tw.currentSlot != nil {
windowEnd := tw.currentSlot.End windowEnd := tw.currentSlot.End
windowStart := tw.currentSlot.Start
// Trigger if watermark >= windowEnd // Trigger if watermark >= windowEnd
// In Flink, windows are triggered when watermark >= windowEnd. // In Flink, windows are triggered when watermark >= windowEnd.
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness // Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
@@ -413,8 +445,12 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// This is equivalent to watermarkTime >= windowEnd // This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd) shouldTrigger := !watermarkTime.Before(*windowEnd)
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
if !shouldTrigger { if !shouldTrigger {
// Watermark hasn't reached windowEnd yet, stop checking // Watermark hasn't reached windowEnd yet, stop checking
debugLog("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
break break
} }
@@ -425,13 +461,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// Check if window has data before triggering // Check if window has data before triggering
hasData := false hasData := false
dataInWindow := 0 dataInWindow := 0
var dataTimestamps []int64
for _, item := range tw.data { for _, item := range tw.data {
if tw.currentSlot.Contains(item.Timestamp) { if tw.currentSlot.Contains(item.Timestamp) {
hasData = true hasData = true
dataInWindow++ dataInWindow++
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
} }
} }
debugLog("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
// Trigger current window only if it has data // Trigger current window only if it has data
if hasData { if hasData {
@@ -452,8 +493,11 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
} }
} }
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
tw.triggerWindowLocked() tw.triggerWindowLocked()
triggeredCount++ triggeredCount++
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state // triggerWindowLocked releases and re-acquires lock, so we need to re-check state
// If allowedLateness > 0, keep window open for late data // If allowedLateness > 0, keep window open for late data
@@ -466,18 +510,32 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
closeTime: closeTime, closeTime: closeTime,
snapshotData: snapshotData, // Save snapshot for late updates snapshotData: snapshotData, // Save snapshot for late updates
} }
debugLog("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
} }
} else {
debugLog("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
windowStart.UnixMilli(), windowEnd.UnixMilli())
} }
// Move to next window (even if current window was empty) // Move to next window (even if current window was empty)
// Re-check currentSlot in case it was modified // Re-check currentSlot in case it was modified
if tw.currentSlot != nil { if tw.currentSlot != nil {
tw.currentSlot = tw.NextSlot() tw.currentSlot = tw.NextSlot()
if tw.currentSlot != nil {
debugLog("checkAndTriggerWindows: moved to next window [%v, %v)",
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
} else { } else {
debugLog("checkAndTriggerWindows: NextSlot returned nil, stopping")
}
} else {
debugLog("checkAndTriggerWindows: currentSlot is nil, breaking")
break break
} }
} }
debugLog("checkAndTriggerWindows: finished, triggeredCount=%d", triggeredCount)
// Close windows that have exceeded allowedLateness // Close windows that have exceeded allowedLateness
tw.closeExpiredWindows(watermarkTime) tw.closeExpiredWindows(watermarkTime)
} }