2 Commits

Author SHA1 Message Date
rulego-team 4528d1f1a6 fix(window): 修复事件时间窗口未触发问题并添加调试日志 2025-11-15 13:54:27 +08:00
Whki 6ce76d506c Merge pull request #43 from rulego/dev
Dev
2025-11-15 13:15:05 +08:00
4 changed files with 157 additions and 13 deletions
+20 -3
View File
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
}
// 第三阶段:发送更多正常数据,推进 watermark
// 关键:要触发窗口,需要 watermark >= windowEnd
// watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime >= windowEnd + maxOutOfOrderness
windowSizeMs := int64(2000) // 2秒
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
t.Log("第三阶段:继续发送正常数据,推进 watermark")
for i := 10; i < 15; i++ {
eventTime := baseTime + int64(i*200)
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
if i == 10 && eventTime < requiredEventTimeForTrigger {
eventTime = requiredEventTimeForTrigger
}
ssql.Emit(map[string]interface{}{
"deviceId": "sensor001",
"eventTime": eventTime,
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
}
// 推进watermark触发第一个窗口
// 发送事件时间超过第一个窗口结束时间的数据
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间
// 关键:要触发窗口,需要 watermark >= windowEnd
// watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime >= windowEnd + maxOutOfOrderness
windowSizeMs := int64(2000) // 2秒
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
ssql.Emit(map[string]interface{}{
"deviceId": "sensor001",
"eventTime": firstWindowEnd + int64(2000),
"eventTime": requiredEventTimeForTrigger,
"temperature": 100.0,
})
+16 -1
View File
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
})
// 模拟完整的延迟数据处理场景
baseTime := time.Now().UnixMilli() - 10000
// 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
windowSizeMs := int64(2000) // 2秒
baseTimeRaw := time.Now().UnixMilli() - 10000
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
maxOutOfOrdernessMs := int64(1000) // 1秒
firstWindowEnd := baseTime + windowSizeMs
// 关键:要触发窗口,需要 watermark >= windowEnd
// watermark = maxEventTime - maxOutOfOrderness
// 所以需要maxEventTime - maxOutOfOrderness >= windowEnd
// 即maxEventTime >= windowEnd + maxOutOfOrderness
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
// 第一阶段:发送正常顺序的数据
t.Log("第一阶段:发送正常顺序的数据")
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
}
// 第三阶段:继续发送正常数据,推进 watermark
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
t.Log("第三阶段:继续发送正常数据,推进 watermark")
for i := 10; i < 15; i++ {
eventTime := baseTime + int64(i*200)
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
if i == 10 && eventTime < requiredEventTimeForTrigger {
eventTime = requiredEventTimeForTrigger
}
ssql.Emit(map[string]interface{}{
"deviceId": "sensor001",
"eventTime": eventTime,
+63 -9
View File
@@ -19,6 +19,7 @@ package window
import (
"context"
"fmt"
"log"
"sync"
"time"
@@ -27,6 +28,17 @@ import (
"github.com/rulego/streamsql/types"
)
// debugLogSliding logs debug information only when EnableDebug is true
// This function is optimized to avoid unnecessary string formatting when debug is disabled
func debugLogSliding(format string, args ...interface{}) {
// Fast path: if debug is disabled, return immediately without evaluating args
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
if !EnableDebug {
return
}
log.Printf("[SlidingWindow] "+format, args...)
}
// Ensure SlidingWindow implements the Window interface
var _ Window = (*SlidingWindow)(nil)
@@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) {
// For event time, align window start to window boundaries
alignedStart := alignWindowStart(eventTime, sw.slide)
sw.currentSlot = sw.createSlotFromStart(alignedStart)
debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
eventTime.UnixMilli(), alignedStart.UnixMilli(),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
} else {
// For processing time, use current time or event time as-is
sw.currentSlot = sw.createSlot(eventTime)
// Record when first window started (processing time)
sw.firstWindowStartTime = time.Now()
debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
eventTime.UnixMilli(),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
}
// Don't start timer here, wait for first window to end
// Send initialization complete signal
@@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) {
Timestamp: eventTime,
}
sw.data = append(sw.data, row)
debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
eventTime.UnixMilli(), len(sw.data),
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(),
sw.currentSlot.Contains(eventTime))
// Check if data is late and handle allowedLateness (after data is added)
if timeChar == types.EventTime && sw.watermark != nil {
@@ -211,7 +233,7 @@ func (sw *SlidingWindow) Add(data interface{}) {
break
}
}
// If not belonging to triggered window, check if it belongs to currentSlot
// This handles the case where watermark has advanced but window hasn't triggered yet
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
@@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
defer sw.mu.Unlock()
if !sw.initialized || sw.currentSlot == nil {
debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil")
return
}
@@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
// Use a small threshold (1ms) only for floating point precision issues
totalDataCount := len(sw.data)
debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
watermarkTime.UnixMilli(), totalDataCount,
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
for sw.currentSlot != nil {
windowEnd := sw.currentSlot.End
windowStart := sw.currentSlot.Start
// Check if watermark >= windowEnd
// Use !Before() instead of After() to include equality case
// This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd)
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
if !shouldTrigger {
// Watermark hasn't reached windowEnd yet, stop checking
debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
break
}
// Check if window has data before triggering
hasData := false
dataInWindow := 0
var dataTimestamps []int64
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
hasData = true
break
dataInWindow++
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
}
}
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
// Trigger current window only if it has data
if hasData {
// Count data in window before triggering
@@ -421,7 +460,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
dataInWindow++
}
}
// Save snapshot data before triggering (for Flink-like late update behavior)
var snapshotData []types.Row
if allowedLateness > 0 {
@@ -438,8 +477,11 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
}
}
}
debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
sw.triggerWindowLocked()
debugLogSliding("checkAndTriggerWindows: window triggered successfully")
// If allowedLateness > 0, keep window open for late data
if allowedLateness > 0 {
@@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
closeTime: closeTime,
snapshotData: snapshotData, // Save snapshot for late updates
}
debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
}
} else {
debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
windowStart.UnixMilli(), windowEnd.UnixMilli())
}
// Move to next window (even if current window was empty)
sw.currentSlot = sw.NextSlot()
if sw.currentSlot != nil {
debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)",
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
} else {
debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping")
break
}
}
// Close windows that have exceeded allowedLateness
@@ -804,7 +858,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
// Collect all data for this window: original snapshot + late data from sw.data
resultData := make([]types.Row, 0)
// First, add original snapshot data (if exists)
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
// Create copies of snapshot data
@@ -816,7 +870,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
})
}
}
// Then, add late data from sw.data (newly arrived late data)
lateDataCount := 0
for _, item := range sw.data {
@@ -884,7 +938,7 @@ func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) {
delete(sw.triggeredWindows, key)
}
}
// Clean up data that belongs to expired windows (if any)
if len(expiredWindows) > 0 {
newData := make([]types.Row, 0)
+58
View File
@@ -19,6 +19,7 @@ package window
import (
"context"
"fmt"
"log"
"sync"
"time"
@@ -26,6 +27,20 @@ import (
"github.com/rulego/streamsql/utils/cast"
)
// EnableDebug enables debug logging for window operations
var EnableDebug = false
// debugLog logs debug information only when EnableDebug is true
// This function is optimized to avoid unnecessary string formatting when debug is disabled
func debugLog(format string, args ...interface{}) {
// Fast path: if debug is disabled, return immediately without evaluating args
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
if !EnableDebug {
return
}
log.Printf("[TumblingWindow] "+format, args...)
}
// Ensure TumblingWindow implements the Window interface
var _ Window = (*TumblingWindow)(nil)
@@ -160,10 +175,16 @@ func (tw *TumblingWindow) Add(data interface{}) {
// Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries)
alignedStart := alignWindowStart(eventTime, tw.size)
tw.currentSlot = tw.createSlotFromStart(alignedStart)
debugLog("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
eventTime.UnixMilli(), alignedStart.UnixMilli(),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
} else {
// For processing time, use current time or event time as-is
// No alignment is performed - window starts immediately when first data arrives
tw.currentSlot = tw.createSlot(eventTime)
debugLog("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
eventTime.UnixMilli(),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
}
// Only start timer for processing time
@@ -189,6 +210,10 @@ func (tw *TumblingWindow) Add(data interface{}) {
Timestamp: eventTime,
}
tw.data = append(tw.data, row)
debugLog("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
eventTime.UnixMilli(), len(tw.data),
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli(),
tw.currentSlot.Contains(eventTime))
// Check if data is late and handle allowedLateness (after data is added)
if timeChar == types.EventTime && tw.watermark != nil {
@@ -388,6 +413,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
defer tw.mu.Unlock()
if !tw.initialized || tw.currentSlot == nil {
debugLog("checkAndTriggerWindows: not initialized or currentSlot is nil")
return
}
@@ -400,8 +426,14 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// watermark may be slightly less than windowEnd. We need to handle this case.
// If watermark is very close to windowEnd (within a small threshold), we should also trigger.
triggeredCount := 0
totalDataCount := len(tw.data)
debugLog("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
watermarkTime.UnixMilli(), totalDataCount,
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
for tw.currentSlot != nil {
windowEnd := tw.currentSlot.End
windowStart := tw.currentSlot.Start
// Trigger if watermark >= windowEnd
// In Flink, windows are triggered when watermark >= windowEnd.
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
@@ -413,8 +445,12 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd)
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
if !shouldTrigger {
// Watermark hasn't reached windowEnd yet, stop checking
debugLog("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
break
}
@@ -425,13 +461,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
// Check if window has data before triggering
hasData := false
dataInWindow := 0
var dataTimestamps []int64
for _, item := range tw.data {
if tw.currentSlot.Contains(item.Timestamp) {
hasData = true
dataInWindow++
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
}
}
debugLog("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
// Trigger current window only if it has data
if hasData {
@@ -452,8 +493,11 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
}
}
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
tw.triggerWindowLocked()
triggeredCount++
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state
// If allowedLateness > 0, keep window open for late data
@@ -466,18 +510,32 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
closeTime: closeTime,
snapshotData: snapshotData, // Save snapshot for late updates
}
debugLog("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
}
} else {
debugLog("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
windowStart.UnixMilli(), windowEnd.UnixMilli())
}
// Move to next window (even if current window was empty)
// Re-check currentSlot in case it was modified
if tw.currentSlot != nil {
tw.currentSlot = tw.NextSlot()
if tw.currentSlot != nil {
debugLog("checkAndTriggerWindows: moved to next window [%v, %v)",
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
} else {
debugLog("checkAndTriggerWindows: NextSlot returned nil, stopping")
}
} else {
debugLog("checkAndTriggerWindows: currentSlot is nil, breaking")
break
}
}
debugLog("checkAndTriggerWindows: finished, triggeredCount=%d", triggeredCount)
// Close windows that have exceeded allowedLateness
tw.closeExpiredWindows(watermarkTime)
}