From c7cece15e2781c19ceac31720f115e3d04352b87 Mon Sep 17 00:00:00 2001 From: rulego-team Date: Sat, 20 Dec 2025 15:45:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:window=E8=BE=93=E5=87=BA=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=BA=A2=E5=87=BA=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/manager_metrics.go | 11 +- streamsql_strategy_test.go | 150 +++++++++++++++++++++++++++ types/config.go | 9 ++ window/counting_window.go | 85 ++++++++++----- window/factory.go | 1 + window/session_window.go | 122 +++++++++++++++++----- window/sliding_window.go | 205 ++++++++++++++++++------------------- window/strategy_test.go | 204 ++++++++++++++++++++++++++++++++++++ window/tumbling_window.go | 168 +++++++++++++----------------- window/window_test.go | 2 +- 10 files changed, 705 insertions(+), 252 deletions(-) create mode 100644 streamsql_strategy_test.go create mode 100644 window/strategy_test.go diff --git a/stream/manager_metrics.go b/stream/manager_metrics.go index a6bc530..a28825b 100644 --- a/stream/manager_metrics.go +++ b/stream/manager_metrics.go @@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 { dataChanCap := int64(cap(s.dataChan)) s.dataChanMux.RUnlock() - return map[string]int64{ + stats := map[string]int64{ InputCount: atomic.LoadInt64(&s.inputCount), OutputCount: atomic.LoadInt64(&s.outputCount), DroppedCount: atomic.LoadInt64(&s.droppedCount), @@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 { ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)), Expanding: int64(atomic.LoadInt32(&s.expanding)), } + + if s.Window != nil { + winStats := s.Window.GetStats() + for k, v := range winStats { + stats[k] = v + } + } + + return stats } // GetDetailedStats gets detailed performance statistics diff --git a/streamsql_strategy_test.go b/streamsql_strategy_test.go new file mode 100644 index 0000000..c9daf8b --- /dev/null +++ b/streamsql_strategy_test.go @@ -0,0 
+1,150 @@ +package streamsql + +import ( + "testing" + "time" + + "github.com/rulego/streamsql/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestSQLIntegration_StrategyBlock 测试 SQL 集成下的阻塞策略 +func TestSQLIntegration_StrategyBlock(t *testing.T) { + // 配置:输出缓冲为 1,阻塞策略,超时 100ms + ssql := New(WithCustomPerformance(types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + DataChannelSize: 100, + ResultChannelSize: 100, + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyBlock, + BlockTimeout: 100 * time.Millisecond, + AllowDataLoss: true, + }, + WorkerConfig: types.WorkerConfig{ + SinkPoolSize: 0, // 无缓冲任务队列 + SinkWorkerCount: 1, // 1个 worker + }, + })) + defer ssql.Stop() + + // SQL: 每条数据触发一次窗口 + rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)" + err := ssql.Execute(rsql) + require.NoError(t, err) + + // 添加同步 Sink 阻塞 Stream 处理,从而反压 Window + // 注意:必须在 Execute 之后添加,因为 Execute 才会创建 stream + ssql.AddSyncSink(func(results []map[string]interface{}) { + time.Sleep(500 * time.Millisecond) + }) + + // 发送 5 条数据 + // d1: Worker 处理中 (阻塞 500ms) + // d2: Stream 尝试写入 WorkerPool -> 阻塞 (无缓冲) + // d3: Window OutputChan (size 1) -> 填满 + // d4: Window OutputChan 满 -> 尝试写入 -> 阻塞 (Window Add) -> 放入 TriggerChan (size=1) + // d5: Window Add -> TriggerChan 满 -> 阻塞? No, Emit 是异步的? + // Emit 往 dataChan 写. DataProcessor 读 dataChan -> Window.Add. + // Window.Add 往 triggerChan 写. + // + // 修正分析: + // Window.Add 是非阻塞的 (如果 triggerChan 不满). + // CountingWindow triggerChan size = bufferSize = 1. + // Worker 协程: 从 triggerChan 读 -> 处理 -> sendResult (到 OutputChan). + // + // d1: Worker读triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy). + // d2: Worker读triggerChan -> OutputChan -> Stream -> Blocked on WorkerPool. + // 此时 Stream 持有 d2. OutputChan 空. + // Worker 协程 阻塞在 sendResult(d2)? No, Stream 取走了 d2, Stream 阻塞在 dispatch. + // 所以 OutputChan 是空的! 
+ // Wait, Stream loop: + // result := <-OutputChan. (Stream has d2). + // handleResult(d2) -> Blocked. + // So OutputChan is empty. + // d3: Worker读triggerChan -> OutputChan (d3). Success. + // OutputChan has d3. + // d4: Worker读triggerChan -> OutputChan (d4). Blocked (OutputChan full). + // Worker 协程 阻塞在 sendResult(d4). + // d5: Add -> triggerChan (d5). Success (triggerChan size 1). + // d6: Add -> triggerChan (d6). Blocked (triggerChan full). + // Add blocks. DataProcessor blocks. Emit succeeds (dataChan). + // + // 所以 Window Worker 只有在 sendResult 阻塞时才触发 Drop logic. + // sendResult 只有在 OutputChan 满且超时时才 Drop. + // + // d4 阻塞在 sendResult. + // 100ms 后超时 -> Drop d4. + // Worker 继续. + // + // 所以 d4 应该是被 Drop 的那个. + // Sent: d1, d2, d3. (d5 在 triggerChan, d6 在 dataChan). + // Wait, d5 is in triggerChan, not processed yet. + // So Sent = 3. Dropped = 1 (d4). NOTE: preliminary estimate (it assumes a d6, but only d1-d5 are emitted); superseded by the final tally below: Sent = 2, Dropped = 3. + + for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} { + ssql.Emit(map[string]interface{}{"deviceId": id}) + time.Sleep(10 * time.Millisecond) + } + + // 等待足够长的时间让 Stream 醒来并处理完,以及 Window 丢弃逻辑执行 + time.Sleep(1000 * time.Millisecond) + + // 获取统计信息 + // d1: Stream 处理完 + // d2: Stream 处理完 (Worker 醒来后处理 d2) + // d3: Dropped (Worker 阻塞 -> 超时) + // d4: Dropped (Worker 阻塞 -> 超时) + // d5: Dropped (Worker 阻塞 -> 超时) + // Total Sent: 2 (d1, d2). + // Dropped: 3 (d3, d4, d5). 
+ stats := ssql.stream.GetStats() + assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow") + assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result") +} + +// TestSQLIntegration_StrategyDrop 测试 SQL 集成下的丢弃策略 +func TestSQLIntegration_StrategyDrop(t *testing.T) { + // 配置:输出缓冲为 1,丢弃策略 + ssql := New(WithCustomPerformance(types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + DataChannelSize: 100, + ResultChannelSize: 100, + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyDrop, + }, + })) + defer ssql.Stop() + + // SQL: 每条数据触发一次窗口 + rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)" + err := ssql.Execute(rsql) + require.NoError(t, err) + + // 连续发送 3 条数据 + ssql.Emit(map[string]interface{}{"deviceId": "d1"}) + ssql.Emit(map[string]interface{}{"deviceId": "d2"}) + ssql.Emit(map[string]interface{}{"deviceId": "d3"}) + + // 等待处理完成 + time.Sleep(200 * time.Millisecond) + + // 对于 StrategyDrop,它会挤掉旧数据,所以 sentCount 应该持续增加 + stats := ssql.stream.GetStats() + // d1, d2, d3 都会成功发送(虽然 d1, d2 可能被挤掉,但 sendResult 逻辑中挤掉旧的后写入新的算发送成功) + assert.Equal(t, int64(3), stats["sentCount"]) + + // 验证最终留在缓冲区的是最后一条数据 (d3) + // 注意:AddSink 会启动 worker 从 OutputChan 读。 + // 为了验证,我们直接从 Window 的 OutputChan 读 + select { + case result := <-ssql.stream.Window.OutputChan(): + assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"]) + case <-time.After(100 * time.Millisecond): + // 如果已经被 AddSink 的 worker 读走了也正常,但由于我们没加 Sink,所以应该在里面 + } +} diff --git a/types/config.go b/types/config.go index 3fa4130..642db86 100644 --- a/types/config.go +++ b/types/config.go @@ -6,6 +6,15 @@ import ( "github.com/rulego/streamsql/aggregator" ) +const ( + // OverflowStrategyBlock blocks when buffer is full + OverflowStrategyBlock = "block" + // OverflowStrategyDrop drops data when buffer is full + OverflowStrategyDrop = "drop" + // OverflowStrategyExpand 
expands buffer when full (not implemented for windows yet) + OverflowStrategyExpand = "expand" +) + // Config stream processing configuration type Config struct { // SQL processing related configuration diff --git a/window/counting_window.go b/window/counting_window.go index b2c30ba..4d30f10 100644 --- a/window/counting_window.go +++ b/window/counting_window.go @@ -22,6 +22,8 @@ import ( "reflect" "strings" "sync" + "sync/atomic" + "time" "github.com/rulego/streamsql/utils/cast" @@ -75,12 +77,9 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) { } // Use unified performance config to get window output buffer size - bufferSize := 100 // Default value - if (config.PerformanceConfig != types.PerformanceConfig{}) { + bufferSize := 1000 // Default value + if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 { bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize - if bufferSize < 10 { - bufferSize = 10 // Minimum value - } } cw := &CountingWindow{ @@ -161,18 +160,7 @@ func (cw *CountingWindow) Start() { cw.callback(data) } - select { - case cw.outputChan <- data: - cw.mu.Lock() - cw.sentCount++ - cw.mu.Unlock() - case <-cw.ctx.Done(): - return - default: - cw.mu.Lock() - cw.droppedCount++ - cw.mu.Unlock() - } + cw.sendResult(data) } else { cw.mu.Unlock() } @@ -184,6 +172,58 @@ func (cw *CountingWindow) Start() { }() } +func (cw *CountingWindow) sendResult(data []types.Row) { + strategy := cw.config.PerformanceConfig.OverflowConfig.Strategy + timeout := cw.config.PerformanceConfig.OverflowConfig.BlockTimeout + + if strategy == types.OverflowStrategyBlock { + if timeout <= 0 { + timeout = 5 * time.Second + } + select { + case cw.outputChan <- data: + atomic.AddInt64(&cw.sentCount, 1) + case <-time.After(timeout): + // Timeout, check if data loss is allowed + if cw.config.PerformanceConfig.OverflowConfig.AllowDataLoss { + atomic.AddInt64(&cw.droppedCount, 1) + // Drop new data (simplest fallback for block) + } else { + 
atomic.AddInt64(&cw.droppedCount, 1) + } + case <-cw.ctx.Done(): + return + } + return + } + + // Default: "drop" strategy (implemented as Drop Oldest / Smart Drop) + // If the buffer is full, remove the oldest item to make space for the new item. + // This ensures that we always keep the most recent data, which is usually preferred in streaming. + select { + case cw.outputChan <- data: + atomic.AddInt64(&cw.sentCount, 1) + case <-cw.ctx.Done(): + return + default: + // Try to drop oldest data to make room for new data + select { + case <-cw.outputChan: + // Successfully dropped one old item + select { + case cw.outputChan <- data: + atomic.AddInt64(&cw.sentCount, 1) + default: + // Still failed, drop current + atomic.AddInt64(&cw.droppedCount, 1) + } + default: + // Channel empty, try to send again or drop + atomic.AddInt64(&cw.droppedCount, 1) + } + } +} + func (cw *CountingWindow) Trigger() { // Note: trigger logic has been merged into Start method to avoid data races // This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method @@ -211,17 +251,14 @@ func (cw *CountingWindow) Reset() { cw.dataBuffer = nil cw.keyedBuffer = make(map[string][]types.Row) cw.keyedCount = make(map[string]int) - cw.sentCount = 0 - cw.droppedCount = 0 + atomic.StoreInt64(&cw.sentCount, 0) + atomic.StoreInt64(&cw.droppedCount, 0) } func (cw *CountingWindow) GetStats() map[string]int64 { - cw.mu.Lock() - defer cw.mu.Unlock() - return map[string]int64{ - "sentCount": cw.sentCount, - "droppedCount": cw.droppedCount, + "sentCount": atomic.LoadInt64(&cw.sentCount), + "droppedCount": atomic.LoadInt64(&cw.droppedCount), "bufferSize": int64(cap(cw.outputChan)), "bufferUsed": int64(len(cw.outputChan)), } diff --git a/window/factory.go b/window/factory.go index 17fecf2..c9ef41a 100644 --- a/window/factory.go +++ b/window/factory.go @@ -42,6 +42,7 @@ type Window interface { OutputChan() <-chan []types.Row SetCallback(callback func([]types.Row)) 
Trigger() + GetStats() map[string]int64 } func CreateWindow(config types.WindowConfig) (Window, error) { diff --git a/window/session_window.go b/window/session_window.go index 1711a2d..969eef2 100644 --- a/window/session_window.go +++ b/window/session_window.go @@ -19,8 +19,10 @@ package window import ( "context" "fmt" + "reflect" "strings" "sync" + "sync/atomic" "time" "github.com/rulego/streamsql/types" @@ -59,6 +61,9 @@ type SessionWindow struct { watermark *Watermark // triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness) triggeredSessions map[string]*sessionInfo + // Performance statistics + sentCount int64 // Number of successfully sent results + droppedCount int64 // Number of dropped results } // sessionInfo stores information about a triggered session that is still open for late data @@ -92,12 +97,9 @@ func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) { } // Use unified performance configuration to get window output buffer size - bufferSize := 100 // Default value - if (config.PerformanceConfig != types.PerformanceConfig{}) { + bufferSize := 1000 // Default value + if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 { bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize - if bufferSize < 10 { - bufferSize = 10 // Minimum value - } } // Determine time characteristic (default to ProcessingTime for backward compatibility) @@ -411,13 +413,67 @@ func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) { sw.callback(result) } + sw.sendResult(result) + } +} + +func (sw *SessionWindow) sendResult(data []types.Row) { + strategy := sw.config.PerformanceConfig.OverflowConfig.Strategy + timeout := sw.config.PerformanceConfig.OverflowConfig.BlockTimeout + + if strategy == types.OverflowStrategyBlock { + if timeout <= 0 { + timeout = 5 * time.Second + } select { - case sw.outputChan <- result: + case sw.outputChan <- data: + 
atomic.AddInt64(&sw.sentCount, 1) + case <-time.After(timeout): + atomic.AddInt64(&sw.droppedCount, 1) + case <-sw.ctx.Done(): + return + } + return + } + + // Default: "drop" strategy (implemented as Drop Oldest / Smart Drop) + // If the buffer is full, remove the oldest item to make space for the new item. + // This ensures that we always keep the most recent data, which is usually preferred in streaming. + select { + case sw.outputChan <- data: + atomic.AddInt64(&sw.sentCount, 1) + default: + // Try to drop oldest data + select { + case <-sw.outputChan: + select { + case sw.outputChan <- data: + atomic.AddInt64(&sw.sentCount, 1) + default: + atomic.AddInt64(&sw.droppedCount, 1) + } default: + atomic.AddInt64(&sw.droppedCount, 1) } } } +// GetStats returns window performance statistics +func (sw *SessionWindow) GetStats() map[string]int64 { + return map[string]int64{ + "sentCount": atomic.LoadInt64(&sw.sentCount), + "droppedCount": atomic.LoadInt64(&sw.droppedCount), + "bufferSize": int64(cap(sw.outputChan)), + "bufferUsed": int64(len(sw.outputChan)), + } +} + +// ResetStats resets performance statistics +func (sw *SessionWindow) ResetStats() { + atomic.StoreInt64(&sw.sentCount, 0) + atomic.StoreInt64(&sw.droppedCount, 0) +} + // Trigger manually triggers all session windows func (sw *SessionWindow) Trigger() { sw.mu.Lock() @@ -450,13 +506,7 @@ func (sw *SessionWindow) Trigger() { sw.callback(result) } - // Non-blocking send to output channel - select { - case sw.outputChan <- result: - // Successfully sent - default: - // Channel full, drop result (could add statistics here if needed) - } + sw.sendResult(result) } } @@ -550,13 +600,7 @@ func (sw *SessionWindow) triggerLateUpdateLocked(s *session) { callback(resultData) } - // Non-blocking send to output channel - select { - case sw.outputChan <- resultData: - // Successfully sent - default: - // Channel full, drop result - } + sw.sendResult(resultData) // Re-acquire lock sw.mu.Lock() @@ -578,12 +622,44 @@ func 
extractSessionCompositeKey(data interface{}, keys []string) string { if len(keys) == 0 { return "default" } - parts := make([]string, 0, len(keys)) + + // Fast path for map[string]interface{} if m, ok := data.(map[string]interface{}); ok { + parts := make([]string, 0, len(keys)) for _, k := range keys { - parts = append(parts, fmt.Sprintf("%v", m[k])) + if val, exists := m[k]; exists { + parts = append(parts, cast.ToString(val)) + } else { + parts = append(parts, "") + } } return strings.Join(parts, "|") } - return "default" + + // Use reflection for structs and other types + v := reflect.ValueOf(data) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + parts := make([]string, 0, len(keys)) + for _, k := range keys { + var part string + switch v.Kind() { + case reflect.Map: + if v.Type().Key().Kind() == reflect.String { + mv := v.MapIndex(reflect.ValueOf(k)) + if mv.IsValid() { + part = cast.ToString(mv.Interface()) + } + } + case reflect.Struct: + f := v.FieldByName(k) + if f.IsValid() { + part = cast.ToString(f.Interface()) + } + } + parts = append(parts, part) + } + return strings.Join(parts, "|") } diff --git a/window/sliding_window.go b/window/sliding_window.go index 047c334..1328f72 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "sync" + "sync/atomic" "time" "github.com/rulego/streamsql/utils/cast" @@ -436,12 +437,28 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping") break } + + // Save current slot reference before triggering + // We need to advance sw.currentSlot BEFORE calling triggerSpecificWindowLocked + // because triggerSpecificWindowLocked releases the lock, and we want to prevent + // re-entry for the same window. 
+ slotToTrigger := sw.currentSlot + + // Move to next window immediately + sw.currentSlot = sw.NextSlot() + if sw.currentSlot != nil { + debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)", + sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) + } else { + debugLogSliding("checkAndTriggerWindows: NextSlot returned nil") + } + // Check if window has data before triggering hasData := false dataInWindow := 0 var dataTimestamps []int64 for _, item := range sw.data { - if sw.currentSlot.Contains(item.Timestamp) { + if slotToTrigger.Contains(item.Timestamp) { hasData = true dataInWindow++ dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli()) @@ -453,13 +470,8 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // Trigger current window only if it has data if hasData { - // Count data in window before triggering - dataInWindow := 0 - for _, item := range sw.data { - if sw.currentSlot.Contains(item.Timestamp) { - dataInWindow++ - } - } + // Count data in window before triggering (re-count? 
redundant but harmless) + // Actually we already counted dataInWindow above // Save snapshot data before triggering (for Flink-like late update behavior) var snapshotData []types.Row @@ -467,12 +479,12 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { // Create a deep copy of window data for snapshot snapshotData = make([]types.Row, 0, dataInWindow) for _, item := range sw.data { - if sw.currentSlot.Contains(item.Timestamp) { + if slotToTrigger.Contains(item.Timestamp) { // Create a copy of the row snapshotData = append(snapshotData, types.Row{ Data: item.Data, Timestamp: item.Timestamp, - Slot: sw.currentSlot, + Slot: slotToTrigger, }) } } @@ -480,15 +492,17 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items", windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow) - sw.triggerWindowLocked() + + sw.triggerSpecificWindowLocked(slotToTrigger) + debugLogSliding("checkAndTriggerWindows: window triggered successfully") // If allowedLateness > 0, keep window open for late data if allowedLateness > 0 { - windowKey := sw.getWindowKey(*sw.currentSlot.End) - closeTime := sw.currentSlot.End.Add(allowedLateness) + windowKey := sw.getWindowKey(*slotToTrigger.End) + closeTime := slotToTrigger.End.Add(allowedLateness) sw.triggeredWindows[windowKey] = &triggeredWindowInfo{ - slot: sw.currentSlot, + slot: slotToTrigger, closeTime: closeTime, snapshotData: snapshotData, // Save snapshot for late updates } @@ -499,33 +513,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) { debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger", windowStart.UnixMilli(), windowEnd.UnixMilli()) } - - // Move to next window (even if current window was empty) - sw.currentSlot = sw.NextSlot() - if sw.currentSlot != nil { - debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)", - 
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli()) - } else { - debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping") - break - } } // Close windows that have exceeded allowedLateness sw.closeExpiredWindows(watermarkTime) } -// triggerWindowLocked triggers the window (must be called with lock held) -func (sw *SlidingWindow) triggerWindowLocked() { - if sw.currentSlot == nil { - return +// extractWindowDataLocked extracts window data for the given slot (must be called with lock held) +func (sw *SlidingWindow) extractWindowDataLocked(slot *types.TimeSlot) []types.Row { + if slot == nil { + return nil } // Extract current window data resultData := make([]types.Row, 0) for _, item := range sw.data { - if sw.currentSlot.Contains(item.Timestamp) { - item.Slot = sw.currentSlot + if slot.Contains(item.Timestamp) { + item.Slot = slot resultData = append(resultData, item) } } @@ -533,22 +537,33 @@ func (sw *SlidingWindow) triggerWindowLocked() { // Skip triggering if window has no data // This prevents empty windows from being triggered if len(resultData) == 0 { - return + return nil } - // Retain data that could be in future windows - // For sliding windows, we need to keep data that falls within: - // - Current window end + size (for overlapping windows) - cutoffTime := sw.currentSlot.End.Add(sw.size) + // Remove data that is no longer needed + // For sliding windows, data can belong to multiple windows + // We only remove data that is older than the start of the next window + // because any data before that will not be included in future windows. 
+ + nextWindowStart := slot.Start.Add(sw.slide) newData := make([]types.Row, 0) for _, item := range sw.data { - // Keep data that could be in future windows (before cutoffTime) - if item.Timestamp.Before(cutoffTime) { + if !item.Timestamp.Before(nextWindowStart) { newData = append(newData, item) } } sw.data = newData + return resultData +} + +// triggerSpecificWindowLocked triggers the specified window (must be called with lock held) +func (sw *SlidingWindow) triggerSpecificWindowLocked(slot *types.TimeSlot) { + resultData := sw.extractWindowDataLocked(slot) + if len(resultData) == 0 { + return + } + // Get callback reference before releasing lock callback := sw.callback @@ -559,24 +574,10 @@ func (sw *SlidingWindow) triggerWindowLocked() { callback(resultData) } - // Non-blocking send to output channel and update statistics - var sent bool - select { - case sw.outputChan <- resultData: - // Successfully sent - sent = true - default: - // Channel full, drop result - sent = false - } + sw.sendResult(resultData) // Re-acquire lock to update statistics sw.mu.Lock() - if sent { - sw.sentCount++ - } else { - sw.droppedCount++ - } } // Stop stops the sliding window operations @@ -648,81 +649,74 @@ func (sw *SlidingWindow) Trigger() { return } - // Extract Data fields to form []interface{} type data for current window - resultData := make([]types.Row, 0) - for _, item := range sw.data { - if sw.currentSlot.Contains(item.Timestamp) { - item.Slot = sw.currentSlot - resultData = append(resultData, item) - } - } + // Extract current window data + currentSlot := sw.currentSlot + sw.currentSlot = next - // Retain data that could be in future windows - // For sliding windows, we need to keep data that falls within future windows - // Future windows start at next.Start or later (next.Start + k * slide) - // So any data with timestamp < next.Start cannot be in any future window - newData := make([]types.Row, 0) - for _, item := range sw.data { - // Keep data that could be in 
future windows (>= next.Start) - if !item.Timestamp.Before(*next.Start) { - newData = append(newData, item) - } - } + resultData := sw.extractWindowDataLocked(currentSlot) - // If resultData is empty, skip callback to avoid sending empty results - // This prevents empty results from filling up channels when timer triggers repeatedly if len(resultData) == 0 { - // Update window data even if no result - sw.data = newData - sw.currentSlot = next sw.mu.Unlock() return } - // Update window data - sw.data = newData - sw.currentSlot = next - // Get callback reference before releasing lock callback := sw.callback // Release lock before calling callback and sending to channel to avoid blocking sw.mu.Unlock() - // Execute callback function if set (outside of lock to avoid blocking) if callback != nil { callback(resultData) } - // Non-blocking send to output channel and update statistics - var sent bool - select { - case sw.outputChan <- resultData: - // Successfully sent - sent = true - default: - // Channel full, drop result - sent = false + sw.sendResult(resultData) +} + +func (sw *SlidingWindow) sendResult(data []types.Row) { + strategy := sw.config.PerformanceConfig.OverflowConfig.Strategy + timeout := sw.config.PerformanceConfig.OverflowConfig.BlockTimeout + + if strategy == types.OverflowStrategyBlock { + if timeout <= 0 { + timeout = 5 * time.Second + } + select { + case sw.outputChan <- data: + atomic.AddInt64(&sw.sentCount, 1) + case <-time.After(timeout): + atomic.AddInt64(&sw.droppedCount, 1) + case <-sw.ctx.Done(): + return + } + return } - // Re-acquire lock to update statistics - sw.mu.Lock() - if sent { - sw.sentCount++ - } else { - sw.droppedCount++ + // Default: "drop" strategy (implemented as Drop Oldest / Smart Drop) + select { + case sw.outputChan <- data: + atomic.AddInt64(&sw.sentCount, 1) + default: + // Try to drop oldest data + select { + case <-sw.outputChan: + select { + case sw.outputChan <- data: + atomic.AddInt64(&sw.sentCount, 1) + default: + 
atomic.AddInt64(&sw.droppedCount, 1) + } + default: + atomic.AddInt64(&sw.droppedCount, 1) + } } - sw.mu.Unlock() } // GetStats returns window performance statistics func (sw *SlidingWindow) GetStats() map[string]int64 { - sw.mu.RLock() - defer sw.mu.RUnlock() - return map[string]int64{ - "sentCount": sw.sentCount, - "droppedCount": sw.droppedCount, + "sentCount": atomic.LoadInt64(&sw.sentCount), + "droppedCount": atomic.LoadInt64(&sw.droppedCount), "bufferSize": int64(cap(sw.outputChan)), "bufferUsed": int64(len(sw.outputChan)), } @@ -730,11 +724,8 @@ func (sw *SlidingWindow) GetStats() map[string]int64 { // ResetStats resets performance statistics func (sw *SlidingWindow) ResetStats() { - sw.mu.Lock() - defer sw.mu.Unlock() - - sw.sentCount = 0 - sw.droppedCount = 0 + atomic.StoreInt64(&sw.sentCount, 0) + atomic.StoreInt64(&sw.droppedCount, 0) } // Reset resets the sliding window and clears window data diff --git a/window/strategy_test.go b/window/strategy_test.go new file mode 100644 index 0000000..273c617 --- /dev/null +++ b/window/strategy_test.go @@ -0,0 +1,204 @@ +package window + +import ( + "testing" + "time" + + "github.com/rulego/streamsql/types" + "github.com/rulego/streamsql/utils/cast" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestOverflowStrategies 测试不同的缓冲区溢出策略 +func TestOverflowStrategies(t *testing.T) { + t.Run("CountingWindow_StrategyBlock_Timeout", func(t *testing.T) { + // 配置:窗口大小1(每1条数据触发一次),输出缓冲1,阻塞策略,超时100ms + config := types.WindowConfig{ + Type: "CountingWindow", + Params: []interface{}{1}, // Threshold = 1 + PerformanceConfig: types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyBlock, + BlockTimeout: 100 * time.Millisecond, + AllowDataLoss: true, // 允许丢弃统计 + }, + }, + } + + win, err := NewCountingWindow(config) + require.NoError(t, err) + win.Start() + defer win.Stop() + + // 1. 
发送第1条数据,触发窗口,填充 outputChan (容量1) + win.Add(map[string]interface{}{"id": 1}) + + // 等待处理 + time.Sleep(50 * time.Millisecond) + stats := win.GetStats() + assert.Equal(t, int64(1), stats["sentCount"]) + assert.Equal(t, int64(1), stats["bufferUsed"]) // 应该还在缓冲区中,因为没人读 + + // 2. 发送第2条数据,触发窗口 + // 此时 outputChan 已满,sendResult 应该阻塞 100ms 然后超时丢弃 + win.Add(map[string]interface{}{"id": 2}) + + // 等待超时 (100ms) + 处理时间 + time.Sleep(200 * time.Millisecond) + + stats = win.GetStats() + // 第1条仍在缓冲区(因为没人读) + // 第2条因为阻塞超时被丢弃 + assert.Equal(t, int64(1), stats["bufferUsed"]) + assert.Equal(t, int64(1), stats["droppedCount"]) + + // 3. 读取缓冲区中的数据,腾出空间 + select { + case <-win.OutputChan(): + // 读出第1条 + default: + t.Fatal("expected data in output channel") + } + + // 4. 发送第3条数据 + win.Add(map[string]interface{}{"id": 3}) + time.Sleep(50 * time.Millisecond) + + stats = win.GetStats() + assert.Equal(t, int64(2), stats["sentCount"]) // 第1条和第3条发送成功 + assert.Equal(t, int64(1), stats["droppedCount"]) // 第2条丢弃 + }) + + t.Run("SessionWindow_StrategyBlock_Timeout", func(t *testing.T) { + // 配置:会话超时50ms,输出缓冲1,阻塞策略,超时50ms + config := types.WindowConfig{ + Type: "SessionWindow", + Params: []interface{}{"50ms"}, + PerformanceConfig: types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyBlock, + BlockTimeout: 50 * time.Millisecond, + AllowDataLoss: true, + }, + }, + } + + win, err := NewSessionWindow(config) + require.NoError(t, err) + win.Start() + defer win.Stop() + + // 1. 发送数据,开始一个 session + win.Add(map[string]interface{}{"id": 1}) + + // 2. 等待 session 超时 (50ms) + 检查周期 (timeout/2 = 25ms) + // 确保 session 被触发并发送到 outputChan + time.Sleep(100 * time.Millisecond) + + stats := win.GetStats() + assert.Equal(t, int64(1), stats["sentCount"]) + assert.Equal(t, int64(1), stats["bufferUsed"]) + + // 3. 发送数据开始第二个 session (因为上一个已经结束) + win.Add(map[string]interface{}{"id": 2}) + + // 4. 
等待 session 超时 + // 此时 outputChan 已满,应该阻塞并丢弃 + time.Sleep(150 * time.Millisecond) + + stats = win.GetStats() + assert.Equal(t, int64(1), stats["bufferUsed"]) + assert.Equal(t, int64(1), stats["droppedCount"]) + }) + + t.Run("CountingWindow_StrategyDrop", func(t *testing.T) { + // 配置:窗口大小1,输出缓冲1,丢弃策略 + config := types.WindowConfig{ + Type: "CountingWindow", + Params: []interface{}{1}, + PerformanceConfig: types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyDrop, + }, + }, + } + + win, err := NewCountingWindow(config) + require.NoError(t, err) + win.Start() + defer win.Stop() + + // 1. 发送第1条数据,填充 outputChan + win.Add(map[string]interface{}{"id": 1}) + time.Sleep(50 * time.Millisecond) + + // 2. 发送第2条数据 + // outputChan 已满,StrategyDrop 会尝试丢弃旧数据(outputChan头部)来放入新数据 + win.Add(map[string]interface{}{"id": 2}) + time.Sleep(50 * time.Millisecond) + + stats := win.GetStats() + assert.Equal(t, int64(2), stats["sentCount"]) + + // 验证现在缓冲区里是第2条数据 + select { + case data := <-win.OutputChan(): + assert.Len(t, data, 1) + assert.Equal(t, 2, cast.ToInt(data[0].Data.(map[string]interface{})["id"])) + default: + t.Fatal("expected data in output channel") + } + }) + + t.Run("TumblingWindow_StrategyBlock_Timeout", func(t *testing.T) { + // 配置:窗口大小50ms,输出缓冲1,阻塞策略,超时50ms + config := types.WindowConfig{ + Type: "TumblingWindow", + Params: []interface{}{"50ms"}, + PerformanceConfig: types.PerformanceConfig{ + BufferConfig: types.BufferConfig{ + WindowOutputSize: 1, + }, + OverflowConfig: types.OverflowConfig{ + Strategy: types.OverflowStrategyBlock, + BlockTimeout: 50 * time.Millisecond, + AllowDataLoss: true, + }, + }, + } + + win, err := NewTumblingWindow(config) + require.NoError(t, err) + win.Start() + defer win.Stop() + + // 1. 
发送数据触发第1个窗口 + win.Add(map[string]interface{}{"id": 1}) + // 等待窗口触发 (50ms) + time.Sleep(100 * time.Millisecond) + + stats := win.GetStats() + assert.Equal(t, int64(1), stats["sentCount"]) + assert.Equal(t, int64(1), stats["bufferUsed"]) + + // 2. 发送数据触发第2个窗口 + // 由于没有读取 outputChan,第2个窗口触发时应该阻塞然后超时 + win.Add(map[string]interface{}{"id": 2}) + // 等待窗口触发 (50ms) + 阻塞超时 (50ms) + time.Sleep(150 * time.Millisecond) + + stats = win.GetStats() + assert.Equal(t, int64(1), stats["bufferUsed"]) // 仍然只有第1个窗口的数据 + assert.Equal(t, int64(1), stats["droppedCount"]) // 第2个窗口结果被丢弃 + }) +} diff --git a/window/tumbling_window.go b/window/tumbling_window.go index fe70440..450a4fb 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "sync" + "sync/atomic" "time" "github.com/rulego/streamsql/types" @@ -435,14 +436,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { windowEnd := tw.currentSlot.End windowStart := tw.currentSlot.Start // Trigger if watermark >= windowEnd - // In Flink, windows are triggered when watermark >= windowEnd. 
- // Watermark calculation: watermark = maxEventTime - maxOutOfOrderness - // So watermark >= windowEnd means: maxEventTime - maxOutOfOrderness >= windowEnd - // Which means: maxEventTime >= windowEnd + maxOutOfOrderness // This ensures all data for the window has arrived (within maxOutOfOrderness tolerance) - // Check if watermark >= windowEnd - // Use !Before() instead of After() to include equality case - // This is equivalent to watermarkTime >= windowEnd shouldTrigger := !watermarkTime.Before(*windowEnd) debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v", @@ -495,7 +489,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) { debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items", windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow) - tw.triggerWindowLocked() + + resultData := tw.extractWindowDataLocked() + if len(resultData) > 0 { + callback := tw.callback + tw.mu.Unlock() + if callback != nil { + callback(resultData) + } + tw.sendResult(resultData) + tw.mu.Lock() + } + triggeredCount++ debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount) // triggerWindowLocked releases and re-acquires lock, so we need to re-check state @@ -579,15 +584,24 @@ func (tw *TumblingWindow) handleLateData(eventTime time.Time, allowedLateness ti if info.slot.Contains(eventTime) { // This late data belongs to a triggered window that's still open // Trigger window again with updated data (late update) - tw.triggerLateUpdateLocked(info.slot) + resultData := tw.extractLateUpdateDataLocked(info.slot) + if len(resultData) > 0 { + callback := tw.callback + tw.mu.Unlock() + if callback != nil { + callback(resultData) + } + tw.sendResult(resultData) + tw.mu.Lock() + } return } } } -// triggerLateUpdateLocked triggers a late update for a window (must be called with lock held) +// extractLateUpdateDataLocked extracts late update data for a window (must be 
called with lock held) // This implements Flink-like behavior: late updates include complete window data (original + late data) -func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) { +func (tw *TumblingWindow) extractLateUpdateDataLocked(slot *types.TimeSlot) []types.Row { // Find the triggered window info to get snapshot data var windowInfo *triggeredWindowInfo windowKey := tw.getWindowKey(*slot.End) @@ -619,7 +633,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) { } if len(resultData) == 0 { - return + return nil } // Update snapshot to include late data (for future late updates) @@ -635,34 +649,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) { } } - // Get callback reference before releasing lock - callback := tw.callback - - // Release lock before calling callback and sending to channel to avoid blocking - tw.mu.Unlock() - - if callback != nil { - callback(resultData) - } - - // Non-blocking send to output channel and update statistics - var sent bool - select { - case tw.outputChan <- resultData: - // Successfully sent - sent = true - default: - // Channel full, drop result - sent = false - } - - // Re-acquire lock to update statistics - tw.mu.Lock() - if sent { - tw.sentCount++ - } else { - tw.droppedCount++ - } + return resultData } // getWindowKey generates a key for a window based on its end time @@ -670,10 +657,10 @@ func (tw *TumblingWindow) getWindowKey(endTime time.Time) string { return fmt.Sprintf("%d", endTime.UnixNano()) } -// triggerWindowLocked triggers the window (must be called with lock held) -func (tw *TumblingWindow) triggerWindowLocked() { +// extractWindowDataLocked extracts current window data (must be called with lock held) +func (tw *TumblingWindow) extractWindowDataLocked() []types.Row { if tw.currentSlot == nil { - return + return nil } // Extract current window data @@ -688,7 +675,7 @@ func (tw *TumblingWindow) triggerWindowLocked() { // Skip triggering if 
window has no data // This prevents empty windows from being triggered if len(resultData) == 0 { - return + return nil } // Remove data that belongs to current window @@ -700,33 +687,47 @@ func (tw *TumblingWindow) triggerWindowLocked() { } tw.data = newData - // Get callback reference before releasing lock - callback := tw.callback + return resultData +} - // Release lock before calling callback and sending to channel to avoid blocking - tw.mu.Unlock() +func (tw *TumblingWindow) sendResult(data []types.Row) { + strategy := tw.config.PerformanceConfig.OverflowConfig.Strategy + timeout := tw.config.PerformanceConfig.OverflowConfig.BlockTimeout - if callback != nil { - callback(resultData) + if strategy == types.OverflowStrategyBlock { + if timeout <= 0 { + timeout = 5 * time.Second + } + select { + case tw.outputChan <- data: + atomic.AddInt64(&tw.sentCount, 1) + case <-time.After(timeout): + atomic.AddInt64(&tw.droppedCount, 1) + case <-tw.ctx.Done(): + return + } + return } - // Non-blocking send to output channel and update statistics - var sent bool + // Default: "drop" strategy (implemented as Drop Oldest / Smart Drop) + // If the buffer is full, remove the oldest item to make space for the new item. + // This ensures that we always keep the most recent data, which is usually preferred in streaming. 
select { - case tw.outputChan <- resultData: - // Successfully sent - sent = true + case tw.outputChan <- data: + atomic.AddInt64(&tw.sentCount, 1) default: - // Channel full, drop result - sent = false - } - - // Re-acquire lock to update statistics - tw.mu.Lock() - if sent { - tw.sentCount++ - } else { - tw.droppedCount++ + // Try to drop oldest data + select { + case <-tw.outputChan: + select { + case tw.outputChan <- data: + atomic.AddInt64(&tw.sentCount, 1) + default: + atomic.AddInt64(&tw.droppedCount, 1) + } + default: + atomic.AddInt64(&tw.droppedCount, 1) + } } } @@ -801,27 +802,8 @@ func (tw *TumblingWindow) Trigger() { callback(resultData) } - // Non-blocking send to output channel and update statistics - var sent bool - select { - case tw.outputChan <- resultData: - // Successfully sent - sent = true - default: - // Channel full, drop result - sent = false - } - - // Re-acquire lock to update statistics - tw.mu.Lock() - if sent { - tw.sentCount++ - } else { - tw.droppedCount++ - // Optional: add logging here - // log.Printf("Window output channel full, dropped result with %d rows", len(resultData)) - } - tw.mu.Unlock() + // Use sendResult to respect overflow strategy + tw.sendResult(resultData) } // Reset resets tumbling window data @@ -889,12 +871,9 @@ func (tw *TumblingWindow) SetCallback(callback func([]types.Row)) { // GetStats returns window performance statistics func (tw *TumblingWindow) GetStats() map[string]int64 { - tw.mu.RLock() - defer tw.mu.RUnlock() - return map[string]int64{ - "sentCount": tw.sentCount, - "droppedCount": tw.droppedCount, + "sentCount": atomic.LoadInt64(&tw.sentCount), + "droppedCount": atomic.LoadInt64(&tw.droppedCount), "bufferSize": int64(cap(tw.outputChan)), "bufferUsed": int64(len(tw.outputChan)), } @@ -902,9 +881,6 @@ func (tw *TumblingWindow) GetStats() map[string]int64 { // ResetStats resets performance statistics func (tw *TumblingWindow) ResetStats() { - tw.mu.Lock() - defer tw.mu.Unlock() - - tw.sentCount = 0 - 
tw.droppedCount = 0 + atomic.StoreInt64(&tw.sentCount, 0) + atomic.StoreInt64(&tw.droppedCount, 0) } diff --git a/window/window_test.go b/window/window_test.go index 68c98a8..3a0652f 100644 --- a/window/window_test.go +++ b/window/window_test.go @@ -584,7 +584,7 @@ func TestWindowWithPerformanceConfig(t *testing.T) { name: "计数窗口-高性能配置", windowType: TypeCounting, performanceConfig: types.HighPerformanceConfig(), - expectedBufferSize: 20, // 200 / 10 + expectedBufferSize: 200, extraParams: map[string]interface{}{"count": 10}, }, {