mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-03 03:06:24 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4528d1f1a6 | |||
| 6ce76d506c |
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 第三阶段:发送更多正常数据,推进 watermark
|
// 第三阶段:发送更多正常数据,推进 watermark
|
||||||
|
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||||
|
// watermark = maxEventTime - maxOutOfOrderness
|
||||||
|
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||||
|
windowSizeMs := int64(2000) // 2秒
|
||||||
|
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||||
|
firstWindowEnd := baseTime + windowSizeMs
|
||||||
|
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||||
for i := 10; i < 15; i++ {
|
for i := 10; i < 15; i++ {
|
||||||
eventTime := baseTime + int64(i*200)
|
eventTime := baseTime + int64(i*200)
|
||||||
|
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||||
|
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||||
|
eventTime = requiredEventTimeForTrigger
|
||||||
|
}
|
||||||
ssql.Emit(map[string]interface{}{
|
ssql.Emit(map[string]interface{}{
|
||||||
"deviceId": "sensor001",
|
"deviceId": "sensor001",
|
||||||
"eventTime": eventTime,
|
"eventTime": eventTime,
|
||||||
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 推进watermark,触发第一个窗口
|
// 推进watermark,触发第一个窗口
|
||||||
// 发送事件时间超过第一个窗口结束时间的数据
|
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||||
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间
|
// watermark = maxEventTime - maxOutOfOrderness
|
||||||
|
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||||
|
windowSizeMs := int64(2000) // 2秒
|
||||||
|
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||||
|
firstWindowEnd := baseTime + windowSizeMs
|
||||||
|
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||||
|
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
|
||||||
ssql.Emit(map[string]interface{}{
|
ssql.Emit(map[string]interface{}{
|
||||||
"deviceId": "sensor001",
|
"deviceId": "sensor001",
|
||||||
"eventTime": firstWindowEnd + int64(2000),
|
"eventTime": requiredEventTimeForTrigger,
|
||||||
"temperature": 100.0,
|
"temperature": 100.0,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// 模拟完整的延迟数据处理场景
|
// 模拟完整的延迟数据处理场景
|
||||||
baseTime := time.Now().UnixMilli() - 10000
|
// 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
|
||||||
|
windowSizeMs := int64(2000) // 2秒
|
||||||
|
baseTimeRaw := time.Now().UnixMilli() - 10000
|
||||||
|
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
|
||||||
|
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||||
|
firstWindowEnd := baseTime + windowSizeMs
|
||||||
|
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||||
|
// watermark = maxEventTime - maxOutOfOrderness
|
||||||
|
// 所以需要:maxEventTime - maxOutOfOrderness >= windowEnd
|
||||||
|
// 即:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||||
|
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||||
|
|
||||||
// 第一阶段:发送正常顺序的数据
|
// 第一阶段:发送正常顺序的数据
|
||||||
t.Log("第一阶段:发送正常顺序的数据")
|
t.Log("第一阶段:发送正常顺序的数据")
|
||||||
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 第三阶段:继续发送正常数据,推进 watermark
|
// 第三阶段:继续发送正常数据,推进 watermark
|
||||||
|
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
|
||||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||||
for i := 10; i < 15; i++ {
|
for i := 10; i < 15; i++ {
|
||||||
eventTime := baseTime + int64(i*200)
|
eventTime := baseTime + int64(i*200)
|
||||||
|
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||||
|
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||||
|
eventTime = requiredEventTimeForTrigger
|
||||||
|
}
|
||||||
ssql.Emit(map[string]interface{}{
|
ssql.Emit(map[string]interface{}{
|
||||||
"deviceId": "sensor001",
|
"deviceId": "sensor001",
|
||||||
"eventTime": eventTime,
|
"eventTime": eventTime,
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package window
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -27,6 +28,17 @@ import (
|
|||||||
"github.com/rulego/streamsql/types"
|
"github.com/rulego/streamsql/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// debugLogSliding logs debug information only when EnableDebug is true
|
||||||
|
// This function is optimized to avoid unnecessary string formatting when debug is disabled
|
||||||
|
func debugLogSliding(format string, args ...interface{}) {
|
||||||
|
// Fast path: if debug is disabled, return immediately without evaluating args
|
||||||
|
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
|
||||||
|
if !EnableDebug {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Printf("[SlidingWindow] "+format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure SlidingWindow implements the Window interface
|
// Ensure SlidingWindow implements the Window interface
|
||||||
var _ Window = (*SlidingWindow)(nil)
|
var _ Window = (*SlidingWindow)(nil)
|
||||||
|
|
||||||
@@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
|||||||
// For event time, align window start to window boundaries
|
// For event time, align window start to window boundaries
|
||||||
alignedStart := alignWindowStart(eventTime, sw.slide)
|
alignedStart := alignWindowStart(eventTime, sw.slide)
|
||||||
sw.currentSlot = sw.createSlotFromStart(alignedStart)
|
sw.currentSlot = sw.createSlotFromStart(alignedStart)
|
||||||
|
debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
|
||||||
|
eventTime.UnixMilli(), alignedStart.UnixMilli(),
|
||||||
|
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||||
} else {
|
} else {
|
||||||
// For processing time, use current time or event time as-is
|
// For processing time, use current time or event time as-is
|
||||||
sw.currentSlot = sw.createSlot(eventTime)
|
sw.currentSlot = sw.createSlot(eventTime)
|
||||||
// Record when first window started (processing time)
|
// Record when first window started (processing time)
|
||||||
sw.firstWindowStartTime = time.Now()
|
sw.firstWindowStartTime = time.Now()
|
||||||
|
debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
|
||||||
|
eventTime.UnixMilli(),
|
||||||
|
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||||
}
|
}
|
||||||
// Don't start timer here, wait for first window to end
|
// Don't start timer here, wait for first window to end
|
||||||
// Send initialization complete signal
|
// Send initialization complete signal
|
||||||
@@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
|||||||
Timestamp: eventTime,
|
Timestamp: eventTime,
|
||||||
}
|
}
|
||||||
sw.data = append(sw.data, row)
|
sw.data = append(sw.data, row)
|
||||||
|
debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
|
||||||
|
eventTime.UnixMilli(), len(sw.data),
|
||||||
|
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(),
|
||||||
|
sw.currentSlot.Contains(eventTime))
|
||||||
|
|
||||||
// Check if data is late and handle allowedLateness (after data is added)
|
// Check if data is late and handle allowedLateness (after data is added)
|
||||||
if timeChar == types.EventTime && sw.watermark != nil {
|
if timeChar == types.EventTime && sw.watermark != nil {
|
||||||
@@ -211,7 +233,7 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If not belonging to triggered window, check if it belongs to currentSlot
|
// If not belonging to triggered window, check if it belongs to currentSlot
|
||||||
// This handles the case where watermark has advanced but window hasn't triggered yet
|
// This handles the case where watermark has advanced but window hasn't triggered yet
|
||||||
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
|
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
|
||||||
@@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
defer sw.mu.Unlock()
|
defer sw.mu.Unlock()
|
||||||
|
|
||||||
if !sw.initialized || sw.currentSlot == nil {
|
if !sw.initialized || sw.currentSlot == nil {
|
||||||
|
debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
|
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
|
||||||
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
|
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
|
||||||
// Use a small threshold (1ms) only for floating point precision issues
|
// Use a small threshold (1ms) only for floating point precision issues
|
||||||
|
totalDataCount := len(sw.data)
|
||||||
|
debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
|
||||||
|
watermarkTime.UnixMilli(), totalDataCount,
|
||||||
|
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||||
|
|
||||||
for sw.currentSlot != nil {
|
for sw.currentSlot != nil {
|
||||||
windowEnd := sw.currentSlot.End
|
windowEnd := sw.currentSlot.End
|
||||||
|
windowStart := sw.currentSlot.Start
|
||||||
|
|
||||||
// Check if watermark >= windowEnd
|
// Check if watermark >= windowEnd
|
||||||
// Use !Before() instead of After() to include equality case
|
// Use !Before() instead of After() to include equality case
|
||||||
// This is equivalent to watermarkTime >= windowEnd
|
// This is equivalent to watermarkTime >= windowEnd
|
||||||
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
||||||
|
|
||||||
|
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
|
||||||
|
|
||||||
if !shouldTrigger {
|
if !shouldTrigger {
|
||||||
// Watermark hasn't reached windowEnd yet, stop checking
|
// Watermark hasn't reached windowEnd yet, stop checking
|
||||||
|
debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Check if window has data before triggering
|
// Check if window has data before triggering
|
||||||
hasData := false
|
hasData := false
|
||||||
|
dataInWindow := 0
|
||||||
|
var dataTimestamps []int64
|
||||||
for _, item := range sw.data {
|
for _, item := range sw.data {
|
||||||
if sw.currentSlot.Contains(item.Timestamp) {
|
if sw.currentSlot.Contains(item.Timestamp) {
|
||||||
hasData = true
|
hasData = true
|
||||||
break
|
dataInWindow++
|
||||||
|
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
|
||||||
|
|
||||||
// Trigger current window only if it has data
|
// Trigger current window only if it has data
|
||||||
if hasData {
|
if hasData {
|
||||||
// Count data in window before triggering
|
// Count data in window before triggering
|
||||||
@@ -421,7 +460,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
dataInWindow++
|
dataInWindow++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save snapshot data before triggering (for Flink-like late update behavior)
|
// Save snapshot data before triggering (for Flink-like late update behavior)
|
||||||
var snapshotData []types.Row
|
var snapshotData []types.Row
|
||||||
if allowedLateness > 0 {
|
if allowedLateness > 0 {
|
||||||
@@ -438,8 +477,11 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
|
||||||
sw.triggerWindowLocked()
|
sw.triggerWindowLocked()
|
||||||
|
debugLogSliding("checkAndTriggerWindows: window triggered successfully")
|
||||||
|
|
||||||
// If allowedLateness > 0, keep window open for late data
|
// If allowedLateness > 0, keep window open for late data
|
||||||
if allowedLateness > 0 {
|
if allowedLateness > 0 {
|
||||||
@@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
closeTime: closeTime,
|
closeTime: closeTime,
|
||||||
snapshotData: snapshotData, // Save snapshot for late updates
|
snapshotData: snapshotData, // Save snapshot for late updates
|
||||||
}
|
}
|
||||||
|
debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to next window (even if current window was empty)
|
// Move to next window (even if current window was empty)
|
||||||
sw.currentSlot = sw.NextSlot()
|
sw.currentSlot = sw.NextSlot()
|
||||||
|
if sw.currentSlot != nil {
|
||||||
|
debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)",
|
||||||
|
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
|
||||||
|
} else {
|
||||||
|
debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping")
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close windows that have exceeded allowedLateness
|
// Close windows that have exceeded allowedLateness
|
||||||
@@ -804,7 +858,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
|||||||
|
|
||||||
// Collect all data for this window: original snapshot + late data from sw.data
|
// Collect all data for this window: original snapshot + late data from sw.data
|
||||||
resultData := make([]types.Row, 0)
|
resultData := make([]types.Row, 0)
|
||||||
|
|
||||||
// First, add original snapshot data (if exists)
|
// First, add original snapshot data (if exists)
|
||||||
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
|
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
|
||||||
// Create copies of snapshot data
|
// Create copies of snapshot data
|
||||||
@@ -816,7 +870,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then, add late data from sw.data (newly arrived late data)
|
// Then, add late data from sw.data (newly arrived late data)
|
||||||
lateDataCount := 0
|
lateDataCount := 0
|
||||||
for _, item := range sw.data {
|
for _, item := range sw.data {
|
||||||
@@ -884,7 +938,7 @@ func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) {
|
|||||||
delete(sw.triggeredWindows, key)
|
delete(sw.triggeredWindows, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up data that belongs to expired windows (if any)
|
// Clean up data that belongs to expired windows (if any)
|
||||||
if len(expiredWindows) > 0 {
|
if len(expiredWindows) > 0 {
|
||||||
newData := make([]types.Row, 0)
|
newData := make([]types.Row, 0)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package window
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -26,6 +27,20 @@ import (
|
|||||||
"github.com/rulego/streamsql/utils/cast"
|
"github.com/rulego/streamsql/utils/cast"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// EnableDebug enables debug logging for window operations
|
||||||
|
var EnableDebug = false
|
||||||
|
|
||||||
|
// debugLog logs debug information only when EnableDebug is true
|
||||||
|
// This function is optimized to avoid unnecessary string formatting when debug is disabled
|
||||||
|
func debugLog(format string, args ...interface{}) {
|
||||||
|
// Fast path: if debug is disabled, return immediately without evaluating args
|
||||||
|
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
|
||||||
|
if !EnableDebug {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Printf("[TumblingWindow] "+format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure TumblingWindow implements the Window interface
|
// Ensure TumblingWindow implements the Window interface
|
||||||
var _ Window = (*TumblingWindow)(nil)
|
var _ Window = (*TumblingWindow)(nil)
|
||||||
|
|
||||||
@@ -160,10 +175,16 @@ func (tw *TumblingWindow) Add(data interface{}) {
|
|||||||
// Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries)
|
// Alignment granularity equals window size (e.g., 2s window aligns to 2s boundaries)
|
||||||
alignedStart := alignWindowStart(eventTime, tw.size)
|
alignedStart := alignWindowStart(eventTime, tw.size)
|
||||||
tw.currentSlot = tw.createSlotFromStart(alignedStart)
|
tw.currentSlot = tw.createSlotFromStart(alignedStart)
|
||||||
|
debugLog("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
|
||||||
|
eventTime.UnixMilli(), alignedStart.UnixMilli(),
|
||||||
|
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||||
} else {
|
} else {
|
||||||
// For processing time, use current time or event time as-is
|
// For processing time, use current time or event time as-is
|
||||||
// No alignment is performed - window starts immediately when first data arrives
|
// No alignment is performed - window starts immediately when first data arrives
|
||||||
tw.currentSlot = tw.createSlot(eventTime)
|
tw.currentSlot = tw.createSlot(eventTime)
|
||||||
|
debugLog("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
|
||||||
|
eventTime.UnixMilli(),
|
||||||
|
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only start timer for processing time
|
// Only start timer for processing time
|
||||||
@@ -189,6 +210,10 @@ func (tw *TumblingWindow) Add(data interface{}) {
|
|||||||
Timestamp: eventTime,
|
Timestamp: eventTime,
|
||||||
}
|
}
|
||||||
tw.data = append(tw.data, row)
|
tw.data = append(tw.data, row)
|
||||||
|
debugLog("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
|
||||||
|
eventTime.UnixMilli(), len(tw.data),
|
||||||
|
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli(),
|
||||||
|
tw.currentSlot.Contains(eventTime))
|
||||||
|
|
||||||
// Check if data is late and handle allowedLateness (after data is added)
|
// Check if data is late and handle allowedLateness (after data is added)
|
||||||
if timeChar == types.EventTime && tw.watermark != nil {
|
if timeChar == types.EventTime && tw.watermark != nil {
|
||||||
@@ -388,6 +413,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
defer tw.mu.Unlock()
|
defer tw.mu.Unlock()
|
||||||
|
|
||||||
if !tw.initialized || tw.currentSlot == nil {
|
if !tw.initialized || tw.currentSlot == nil {
|
||||||
|
debugLog("checkAndTriggerWindows: not initialized or currentSlot is nil")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -400,8 +426,14 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
// watermark may be slightly less than windowEnd. We need to handle this case.
|
// watermark may be slightly less than windowEnd. We need to handle this case.
|
||||||
// If watermark is very close to windowEnd (within a small threshold), we should also trigger.
|
// If watermark is very close to windowEnd (within a small threshold), we should also trigger.
|
||||||
triggeredCount := 0
|
triggeredCount := 0
|
||||||
|
totalDataCount := len(tw.data)
|
||||||
|
debugLog("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
|
||||||
|
watermarkTime.UnixMilli(), totalDataCount,
|
||||||
|
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||||
|
|
||||||
for tw.currentSlot != nil {
|
for tw.currentSlot != nil {
|
||||||
windowEnd := tw.currentSlot.End
|
windowEnd := tw.currentSlot.End
|
||||||
|
windowStart := tw.currentSlot.Start
|
||||||
// Trigger if watermark >= windowEnd
|
// Trigger if watermark >= windowEnd
|
||||||
// In Flink, windows are triggered when watermark >= windowEnd.
|
// In Flink, windows are triggered when watermark >= windowEnd.
|
||||||
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
|
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
|
||||||
@@ -413,8 +445,12 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
// This is equivalent to watermarkTime >= windowEnd
|
// This is equivalent to watermarkTime >= windowEnd
|
||||||
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
||||||
|
|
||||||
|
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
|
||||||
|
|
||||||
if !shouldTrigger {
|
if !shouldTrigger {
|
||||||
// Watermark hasn't reached windowEnd yet, stop checking
|
// Watermark hasn't reached windowEnd yet, stop checking
|
||||||
|
debugLog("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -425,13 +461,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
// Check if window has data before triggering
|
// Check if window has data before triggering
|
||||||
hasData := false
|
hasData := false
|
||||||
dataInWindow := 0
|
dataInWindow := 0
|
||||||
|
var dataTimestamps []int64
|
||||||
for _, item := range tw.data {
|
for _, item := range tw.data {
|
||||||
if tw.currentSlot.Contains(item.Timestamp) {
|
if tw.currentSlot.Contains(item.Timestamp) {
|
||||||
hasData = true
|
hasData = true
|
||||||
dataInWindow++
|
dataInWindow++
|
||||||
|
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debugLog("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
|
||||||
|
|
||||||
// Trigger current window only if it has data
|
// Trigger current window only if it has data
|
||||||
if hasData {
|
if hasData {
|
||||||
|
|
||||||
@@ -452,8 +493,11 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
|
||||||
tw.triggerWindowLocked()
|
tw.triggerWindowLocked()
|
||||||
triggeredCount++
|
triggeredCount++
|
||||||
|
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
|
||||||
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state
|
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state
|
||||||
|
|
||||||
// If allowedLateness > 0, keep window open for late data
|
// If allowedLateness > 0, keep window open for late data
|
||||||
@@ -466,18 +510,32 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
|||||||
closeTime: closeTime,
|
closeTime: closeTime,
|
||||||
snapshotData: snapshotData, // Save snapshot for late updates
|
snapshotData: snapshotData, // Save snapshot for late updates
|
||||||
}
|
}
|
||||||
|
debugLog("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
debugLog("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
|
||||||
|
windowStart.UnixMilli(), windowEnd.UnixMilli())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to next window (even if current window was empty)
|
// Move to next window (even if current window was empty)
|
||||||
// Re-check currentSlot in case it was modified
|
// Re-check currentSlot in case it was modified
|
||||||
if tw.currentSlot != nil {
|
if tw.currentSlot != nil {
|
||||||
tw.currentSlot = tw.NextSlot()
|
tw.currentSlot = tw.NextSlot()
|
||||||
|
if tw.currentSlot != nil {
|
||||||
|
debugLog("checkAndTriggerWindows: moved to next window [%v, %v)",
|
||||||
|
tw.currentSlot.Start.UnixMilli(), tw.currentSlot.End.UnixMilli())
|
||||||
|
} else {
|
||||||
|
debugLog("checkAndTriggerWindows: NextSlot returned nil, stopping")
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
debugLog("checkAndTriggerWindows: currentSlot is nil, breaking")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debugLog("checkAndTriggerWindows: finished, triggeredCount=%d", triggeredCount)
|
||||||
|
|
||||||
// Close windows that have exceeded allowedLateness
|
// Close windows that have exceeded allowedLateness
|
||||||
tw.closeExpiredWindows(watermarkTime)
|
tw.closeExpiredWindows(watermarkTime)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user