mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-22 02:00:36 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4528d1f1a6 | |||
| 6ce76d506c |
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
|
||||
}
|
||||
|
||||
// 第三阶段:发送更多正常数据,推进 watermark
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||
for i := 10; i < 15; i++ {
|
||||
eventTime := baseTime + int64(i*200)
|
||||
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||
eventTime = requiredEventTimeForTrigger
|
||||
}
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": eventTime,
|
||||
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
|
||||
}
|
||||
|
||||
// 推进watermark,触发第一个窗口
|
||||
// 发送事件时间超过第一个窗口结束时间的数据
|
||||
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": firstWindowEnd + int64(2000),
|
||||
"eventTime": requiredEventTimeForTrigger,
|
||||
"temperature": 100.0,
|
||||
})
|
||||
|
||||
|
||||
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
||||
})
|
||||
|
||||
// 模拟完整的延迟数据处理场景
|
||||
baseTime := time.Now().UnixMilli() - 10000
|
||||
// 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
baseTimeRaw := time.Now().UnixMilli() - 10000
|
||||
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime - maxOutOfOrderness >= windowEnd
|
||||
// 即:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
|
||||
// 第一阶段:发送正常顺序的数据
|
||||
t.Log("第一阶段:发送正常顺序的数据")
|
||||
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
||||
}
|
||||
|
||||
// 第三阶段:继续发送正常数据,推进 watermark
|
||||
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
|
||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||
for i := 10; i < 15; i++ {
|
||||
eventTime := baseTime + int64(i*200)
|
||||
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||
eventTime = requiredEventTimeForTrigger
|
||||
}
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": eventTime,
|
||||
|
||||
@@ -19,6 +19,7 @@ package window
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -27,6 +28,17 @@ import (
|
||||
"github.com/rulego/streamsql/types"
|
||||
)
|
||||
|
||||
// debugLogSliding logs debug information only when EnableDebug is true
|
||||
// This function is optimized to avoid unnecessary string formatting when debug is disabled
|
||||
func debugLogSliding(format string, args ...interface{}) {
|
||||
// Fast path: if debug is disabled, return immediately without evaluating args
|
||||
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
|
||||
if !EnableDebug {
|
||||
return
|
||||
}
|
||||
log.Printf("[SlidingWindow] "+format, args...)
|
||||
}
|
||||
|
||||
// Ensure SlidingWindow implements the Window interface
|
||||
var _ Window = (*SlidingWindow)(nil)
|
||||
|
||||
@@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
||||
// For event time, align window start to window boundaries
|
||||
alignedStart := alignWindowStart(eventTime, sw.slide)
|
||||
sw.currentSlot = sw.createSlotFromStart(alignedStart)
|
||||
debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
|
||||
eventTime.UnixMilli(), alignedStart.UnixMilli(),
|
||||
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||
} else {
|
||||
// For processing time, use current time or event time as-is
|
||||
sw.currentSlot = sw.createSlot(eventTime)
|
||||
// Record when first window started (processing time)
|
||||
sw.firstWindowStartTime = time.Now()
|
||||
debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
|
||||
eventTime.UnixMilli(),
|
||||
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||
}
|
||||
// Don't start timer here, wait for first window to end
|
||||
// Send initialization complete signal
|
||||
@@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
||||
Timestamp: eventTime,
|
||||
}
|
||||
sw.data = append(sw.data, row)
|
||||
debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
|
||||
eventTime.UnixMilli(), len(sw.data),
|
||||
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(),
|
||||
sw.currentSlot.Contains(eventTime))
|
||||
|
||||
// Check if data is late and handle allowedLateness (after data is added)
|
||||
if timeChar == types.EventTime && sw.watermark != nil {
|
||||
@@ -211,7 +233,7 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If not belonging to triggered window, check if it belongs to currentSlot
|
||||
// This handles the case where watermark has advanced but window hasn't triggered yet
|
||||
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
|
||||
@@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
defer sw.mu.Unlock()
|
||||
|
||||
if !sw.initialized || sw.currentSlot == nil {
|
||||
debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
|
||||
// Use a small threshold (1ms) only for floating point precision issues
|
||||
totalDataCount := len(sw.data)
|
||||
debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
|
||||
watermarkTime.UnixMilli(), totalDataCount,
|
||||
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||
|
||||
for sw.currentSlot != nil {
|
||||
windowEnd := sw.currentSlot.End
|
||||
|
||||
windowStart := sw.currentSlot.Start
|
||||
|
||||
// Check if watermark >= windowEnd
|
||||
// Use !Before() instead of After() to include equality case
|
||||
// This is equivalent to watermarkTime >= windowEnd
|
||||
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
||||
|
||||
|
||||
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
|
||||
|
||||
if !shouldTrigger {
|
||||
// Watermark hasn't reached windowEnd yet, stop checking
|
||||
debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
|
||||
break
|
||||
}
|
||||
// Check if window has data before triggering
|
||||
hasData := false
|
||||
dataInWindow := 0
|
||||
var dataTimestamps []int64
|
||||
for _, item := range sw.data {
|
||||
if sw.currentSlot.Contains(item.Timestamp) {
|
||||
hasData = true
|
||||
break
|
||||
dataInWindow++
|
||||
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
|
||||
}
|
||||
}
|
||||
|
||||
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
|
||||
|
||||
// Trigger current window only if it has data
|
||||
if hasData {
|
||||
// Count data in window before triggering
|
||||
@@ -421,7 +460,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
dataInWindow++
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Save snapshot data before triggering (for Flink-like late update behavior)
|
||||
var snapshotData []types.Row
|
||||
if allowedLateness > 0 {
|
||||
@@ -438,8 +477,11 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
|
||||
sw.triggerWindowLocked()
|
||||
debugLogSliding("checkAndTriggerWindows: window triggered successfully")
|
||||
|
||||
// If allowedLateness > 0, keep window open for late data
|
||||
if allowedLateness > 0 {
|
||||
@@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
closeTime: closeTime,
|
||||
snapshotData: snapshotData, // Save snapshot for late updates
|
||||
}
|
||||
debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
|
||||
}
|
||||
} else {
|
||||
debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli())
|
||||
}
|
||||
|
||||
// Move to next window (even if current window was empty)
|
||||
sw.currentSlot = sw.NextSlot()
|
||||
if sw.currentSlot != nil {
|
||||
debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)",
|
||||
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||
} else {
|
||||
debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Close windows that have exceeded allowedLateness
|
||||
@@ -804,7 +858,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
||||
|
||||
// Collect all data for this window: original snapshot + late data from sw.data
|
||||
resultData := make([]types.Row, 0)
|
||||
|
||||
|
||||
// First, add original snapshot data (if exists)
|
||||
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
|
||||
// Create copies of snapshot data
|
||||
@@ -816,7 +870,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Then, add late data from sw.data (newly arrived late data)
|
||||
lateDataCount := 0
|
||||
for _, item := range sw.data {
|
||||
@@ -884,7 +938,7 @@ func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) {
|
||||
delete(sw.triggeredWindows, key)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Clean up data that belongs to expired windows (if any)
|
||||
if len(expiredWindows) > 0 {
|
||||
newData := make([]types.Row, 0)
|
||||
|
||||
@@ -19,6 +19,7 @@ package window
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -26,6 +27,20 @@ import (
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
)
|
||||
|
||||
// EnableDebug enables debug logging for window operations
|
||||
var EnableDebug = false
|
||||
|
||||
// debugLog logs debug information only when EnableDebug is true
|
||||
// This function is optimized to avoid unnecessary string formatting when debug is disabled
|
||||
func debugLog(format string, args ...interface{}) {
|
||||
// Fast path: if debug is disabled, return immediately without evaluating args
|
||||
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
|
||||
if !EnableDebug {
|
||||
return
|
||||
}
|
||||
log.Printf("[TumblingWindow] "+format, args...)
|
||||
}
|
||||
|
||||
// Ensure TumblingWindow implements the Window interface
|
||||
var _ Window = (*TumblingWindow)(nil)
|
||||
|
||||
@@ -160,10 +175,16 @@ func (tw *TumblingWindow) Add(data interface{}) {
|
||||
// Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries)
|
||||
alignedStart := alignWindowStart(eventTime, tw.size)
|
||||
tw.currentSlot = tw.createSlotFromStart(alignedStart)
|
||||
debugLog("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
|
||||
eventTime.UnixMilli(), alignedStart.UnixMilli(),
|
||||
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||
} else {
|
||||
// For processing time, use current time or event time as-is
|
||||
// No alignment is performed - window starts immediately when first data arrives
|
||||
tw.currentSlot = tw.createSlot(eventTime)
|
||||
debugLog("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
|
||||
eventTime.UnixMilli(),
|
||||
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||
}
|
||||
|
||||
// Only start timer for processing time
|
||||
@@ -189,6 +210,10 @@ func (tw *TumblingWindow) Add(data interface{}) {
|
||||
Timestamp: eventTime,
|
||||
}
|
||||
tw.data = append(tw.data, row)
|
||||
debugLog("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
|
||||
eventTime.UnixMilli(), len(tw.data),
|
||||
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli(),
|
||||
tw.currentSlot.Contains(eventTime))
|
||||
|
||||
// Check if data is late and handle allowedLateness (after data is added)
|
||||
if timeChar == types.EventTime && tw.watermark != nil {
|
||||
@@ -388,6 +413,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
defer tw.mu.Unlock()
|
||||
|
||||
if !tw.initialized || tw.currentSlot == nil {
|
||||
debugLog("checkAndTriggerWindows: not initialized or currentSlot is nil")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -400,8 +426,14 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
// watermark may be slightly less than windowEnd. We need to handle this case.
|
||||
// If watermark is very close to windowEnd (within a small threshold), we should also trigger.
|
||||
triggeredCount := 0
|
||||
totalDataCount := len(tw.data)
|
||||
debugLog("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
|
||||
watermarkTime.UnixMilli(), totalDataCount,
|
||||
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||
|
||||
for tw.currentSlot != nil {
|
||||
windowEnd := tw.currentSlot.End
|
||||
windowStart := tw.currentSlot.Start
|
||||
// Trigger if watermark >= windowEnd
|
||||
// In Flink, windows are triggered when watermark >= windowEnd.
|
||||
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
|
||||
@@ -413,8 +445,12 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
// This is equivalent to watermarkTime >= windowEnd
|
||||
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
||||
|
||||
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
|
||||
|
||||
if !shouldTrigger {
|
||||
// Watermark hasn't reached windowEnd yet, stop checking
|
||||
debugLog("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
|
||||
break
|
||||
}
|
||||
|
||||
@@ -425,13 +461,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
// Check if window has data before triggering
|
||||
hasData := false
|
||||
dataInWindow := 0
|
||||
var dataTimestamps []int64
|
||||
for _, item := range tw.data {
|
||||
if tw.currentSlot.Contains(item.Timestamp) {
|
||||
hasData = true
|
||||
dataInWindow++
|
||||
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
|
||||
}
|
||||
}
|
||||
|
||||
debugLog("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
|
||||
|
||||
// Trigger current window only if it has data
|
||||
if hasData {
|
||||
|
||||
@@ -452,8 +493,11 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
|
||||
tw.triggerWindowLocked()
|
||||
triggeredCount++
|
||||
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
|
||||
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state
|
||||
|
||||
// If allowedLateness > 0, keep window open for late data
|
||||
@@ -466,18 +510,32 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
closeTime: closeTime,
|
||||
snapshotData: snapshotData, // Save snapshot for late updates
|
||||
}
|
||||
debugLog("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
|
||||
}
|
||||
} else {
|
||||
debugLog("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli())
|
||||
}
|
||||
|
||||
// Move to next window (even if current window was empty)
|
||||
// Re-check currentSlot in case it was modified
|
||||
if tw.currentSlot != nil {
|
||||
tw.currentSlot = tw.NextSlot()
|
||||
if tw.currentSlot != nil {
|
||||
debugLog("checkAndTriggerWindows: moved to next window [%v, %v)",
|
||||
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||
} else {
|
||||
debugLog("checkAndTriggerWindows: NextSlot returned nil, stopping")
|
||||
}
|
||||
} else {
|
||||
debugLog("checkAndTriggerWindows: currentSlot is nil, breaking")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
debugLog("checkAndTriggerWindows: finished, triggeredCount=%d", triggeredCount)
|
||||
|
||||
// Close windows that have exceeded allowedLateness
|
||||
tw.closeExpiredWindows(watermarkTime)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user