Files
2025-11-15 13:06:06 +08:00

908 lines
26 KiB
Go

/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package window
import (
"context"
"fmt"
"sync"
"time"
"github.com/rulego/streamsql/utils/cast"
"github.com/rulego/streamsql/types"
)
// Ensure SlidingWindow implements the Window interface
var _ Window = (*SlidingWindow)(nil)
// TimedData wraps data with timestamp
type TimedData struct {
Data interface{}
Timestamp time.Time
}
// SlidingWindow represents a sliding window for processing data within time ranges
type SlidingWindow struct {
// config holds window configuration
config types.WindowConfig
// size is the total window size (time range covered by the window)
size time.Duration
// slide is the sliding interval for the window
slide time.Duration
// mu protects concurrent data access
mu sync.RWMutex
// data stores window data
data []types.Row
// outputChan is the channel for outputting window data
outputChan chan []types.Row
// callback function executed when window triggers
callback func([]types.Row)
// ctx controls window lifecycle
ctx context.Context
// cancelFunc cancels the context
cancelFunc context.CancelFunc
// timer for triggering window periodically (used for ProcessingTime)
timer *time.Ticker
currentSlot *types.TimeSlot
// initChan for window initialization
initChan chan struct{}
initialized bool
// timerMu protects timer access
timerMu sync.Mutex
// firstWindowStartTime records when first window started (processing time)
firstWindowStartTime time.Time
// watermark for event time processing (only used for EventTime)
watermark *Watermark
// triggeredWindows stores windows that have been triggered but are still open for late data (for EventTime with allowedLateness)
triggeredWindows map[string]*triggeredWindowInfo // key: window end time string
// Performance statistics
droppedCount int64 // Number of dropped results
sentCount int64 // Number of successfully sent results
}
// NewSlidingWindow creates a new sliding window instance
// size parameter represents the total window size, slide represents the sliding interval
func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error) {
// Get size parameter from params array
if len(config.Params) < 1 {
return nil, fmt.Errorf("sliding window requires at least 'size' parameter")
}
sizeVal := config.Params[0]
size, err := cast.ToDurationE(sizeVal)
if err != nil {
return nil, fmt.Errorf("invalid size for sliding window: %v", err)
}
// Get slide parameter from params array
if len(config.Params) < 2 {
return nil, fmt.Errorf("sliding window requires 'slide' parameter")
}
slideVal := config.Params[1]
slide, err := cast.ToDurationE(slideVal)
if err != nil {
return nil, fmt.Errorf("invalid slide for sliding window: %v", err)
}
// Use unified performance config to get window output buffer size
bufferSize := 1000 // Default value
if (config.PerformanceConfig != types.PerformanceConfig{}) {
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
}
// Determine time characteristic (default to ProcessingTime for backward compatibility)
timeChar := config.TimeCharacteristic
if timeChar == "" {
timeChar = types.ProcessingTime
}
// Initialize watermark for event time
var watermark *Watermark
if timeChar == types.EventTime {
maxOutOfOrderness := config.MaxOutOfOrderness
if maxOutOfOrderness == 0 {
maxOutOfOrderness = 0 // Default: no out-of-orderness allowed
}
watermarkInterval := config.WatermarkInterval
if watermarkInterval == 0 {
watermarkInterval = 200 * time.Millisecond // Default: 200ms
}
idleTimeout := config.IdleTimeout
// Default: 0 means disabled, no idle source mechanism
watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
}
// Create a cancellable context
ctx, cancel := context.WithCancel(context.Background())
return &SlidingWindow{
config: config,
size: size,
slide: slide,
outputChan: make(chan []types.Row, bufferSize),
ctx: ctx,
cancelFunc: cancel,
data: make([]types.Row, 0),
initChan: make(chan struct{}),
initialized: false,
watermark: watermark,
triggeredWindows: make(map[string]*triggeredWindowInfo),
}, nil
}
// Add adds data to the sliding window
func (sw *SlidingWindow) Add(data interface{}) {
// Lock to ensure thread safety
sw.mu.Lock()
defer sw.mu.Unlock()
// Get timestamp
eventTime := GetTimestamp(data, sw.config.TsProp, sw.config.TimeUnit)
// Determine time characteristic (default to ProcessingTime for backward compatibility)
timeChar := sw.config.TimeCharacteristic
if timeChar == "" {
timeChar = types.ProcessingTime
}
// For event time, update watermark
if timeChar == types.EventTime && sw.watermark != nil {
sw.watermark.UpdateEventTime(eventTime)
}
// Add data to the window's data list first (needed for late data handling)
if !sw.initialized {
if timeChar == types.EventTime {
// For event time, align window start to window boundaries
alignedStart := alignWindowStart(eventTime, sw.slide)
sw.currentSlot = sw.createSlotFromStart(alignedStart)
} else {
// For processing time, use current time or event time as-is
sw.currentSlot = sw.createSlot(eventTime)
// Record when first window started (processing time)
sw.firstWindowStartTime = time.Now()
}
// Don't start timer here, wait for first window to end
// Send initialization complete signal
// Safely close initChan to avoid closing an already closed channel
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
sw.initialized = true
}
row := types.Row{
Data: data,
Timestamp: eventTime,
}
sw.data = append(sw.data, row)
// Check if data is late and handle allowedLateness (after data is added)
if timeChar == types.EventTime && sw.watermark != nil {
if sw.watermark.IsEventTimeLate(eventTime) {
allowedLateness := sw.config.AllowedLateness
if allowedLateness > 0 {
// IMPORTANT: First check if this late data belongs to any triggered window that's still open
// This ensures late data is correctly assigned to its original window, even if
// the event time happens to fall within the current window's range
belongsToTriggeredWindow := false
for _, info := range sw.triggeredWindows {
if info.slot.Contains(eventTime) {
belongsToTriggeredWindow = true
// Trigger late update for this window (data is already in sw.data)
sw.handleLateData(eventTime, allowedLateness)
break
}
}
// If not belonging to triggered window, check if it belongs to currentSlot
// This handles the case where watermark has advanced but window hasn't triggered yet
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
// Data belongs to currentSlot, it will be included when window triggers
// No need to do anything here
} else if !belongsToTriggeredWindow {
// Check if this late data belongs to any triggered window that's still open
sw.handleLateData(eventTime, allowedLateness)
}
}
// If allowedLateness is 0 or data is too late, we still add it but it won't trigger updates
}
}
}
// Start starts the sliding window with periodic triggering
// Uses lazy initialization to avoid infinite waiting when no data, ensuring subsequent data can be processed normally
// First window triggers when it ends, then subsequent windows trigger at slide intervals
func (sw *SlidingWindow) Start() {
// Determine time characteristic (default to ProcessingTime for backward compatibility)
timeChar := sw.config.TimeCharacteristic
if timeChar == "" {
timeChar = types.ProcessingTime
}
if timeChar == types.EventTime {
// Event time: trigger based on watermark
sw.startEventTime()
} else {
// Processing time: trigger based on system clock
sw.startProcessingTime()
}
}
// startProcessingTime starts the processing time trigger mechanism
func (sw *SlidingWindow) startProcessingTime() {
go func() {
// Close output channel when function ends
defer close(sw.outputChan)
// Wait for initialization complete or context cancellation
select {
case <-sw.initChan:
// Initialization completed normally, continue processing
case <-sw.ctx.Done():
// Context cancelled, exit directly
return
}
// Wait for first window to end, then trigger it
// After initChan is closed, firstWindowStartTime should be set by Add()
sw.mu.RLock()
firstWindowStartTime := sw.firstWindowStartTime
sw.mu.RUnlock()
// Verify that firstWindowStartTime is valid (not zero)
// If zero, it means Add() hasn't been called yet, which shouldn't happen
// but we handle it gracefully by waiting for window size
if firstWindowStartTime.IsZero() {
// This shouldn't happen if Add() is called before Start(),
// but if it does, wait for window size from now
firstWindowStartTime = time.Now()
}
// Calculate time until first window ends (window size from processing time)
now := time.Now()
elapsed := now.Sub(firstWindowStartTime)
var waitDuration time.Duration
if elapsed < sw.size {
// Wait until window size time has passed
waitDuration = sw.size - elapsed
} else {
// First window already ended, trigger immediately
waitDuration = 0
}
// Wait for first window to end
if waitDuration > 0 {
select {
case <-time.After(waitDuration):
// First window ended, trigger it
sw.Trigger()
case <-sw.ctx.Done():
return
}
} else {
// First window already ended, trigger immediately
sw.Trigger()
}
// Now start the sliding step timer for subsequent windows
sw.timerMu.Lock()
sw.timer = time.NewTicker(sw.slide)
sw.timerMu.Unlock()
// Continue with periodic triggering at slide intervals
for {
// Safely get timer in each loop iteration
sw.timerMu.Lock()
timer := sw.timer
sw.timerMu.Unlock()
if timer == nil {
// If timer is nil, wait briefly and retry
select {
case <-time.After(10 * time.Millisecond):
continue
case <-sw.ctx.Done():
return
}
}
select {
// Trigger window when timer expires
case <-timer.C:
sw.Trigger()
// Stop timer and exit loop when context is cancelled
case <-sw.ctx.Done():
sw.timerMu.Lock()
if sw.timer != nil {
sw.timer.Stop()
}
sw.timerMu.Unlock()
return
}
}
}()
}
// startEventTime starts the event time trigger mechanism based on watermark
func (sw *SlidingWindow) startEventTime() {
go func() {
// Close output channel when function ends
defer close(sw.outputChan)
if sw.watermark != nil {
defer sw.watermark.Stop()
}
// Wait for initialization complete or context cancellation
select {
case <-sw.initChan:
// Initialization completed normally, continue processing
case <-sw.ctx.Done():
// Context cancelled, exit directly
return
}
// Process watermark updates
if sw.watermark != nil {
for {
select {
case watermarkTime := <-sw.watermark.WatermarkChan():
sw.checkAndTriggerWindows(watermarkTime)
case <-sw.ctx.Done():
return
}
}
}
}()
}
// checkAndTriggerWindows checks if any windows should be triggered based on watermark
func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
sw.mu.Lock()
defer sw.mu.Unlock()
if !sw.initialized || sw.currentSlot == nil {
return
}
allowedLateness := sw.config.AllowedLateness
// Trigger all windows whose end time is <= watermark
// In Flink, windows are triggered when watermark >= windowEnd.
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
// So watermark >= windowEnd means: maxEventTime - maxOutOfOrderness >= windowEnd
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
// Use a small threshold (1ms) only for floating point precision issues
for sw.currentSlot != nil {
windowEnd := sw.currentSlot.End
// Check if watermark >= windowEnd
// Use !Before() instead of After() to include equality case
// This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd)
if !shouldTrigger {
// Watermark hasn't reached windowEnd yet, stop checking
break
}
// Check if window has data before triggering
hasData := false
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
hasData = true
break
}
}
// Trigger current window only if it has data
if hasData {
// Count data in window before triggering
dataInWindow := 0
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
dataInWindow++
}
}
// Save snapshot data before triggering (for Flink-like late update behavior)
var snapshotData []types.Row
if allowedLateness > 0 {
// Create a deep copy of window data for snapshot
snapshotData = make([]types.Row, 0, dataInWindow)
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
// Create a copy of the row
snapshotData = append(snapshotData, types.Row{
Data: item.Data,
Timestamp: item.Timestamp,
Slot: sw.currentSlot,
})
}
}
}
sw.triggerWindowLocked()
// If allowedLateness > 0, keep window open for late data
if allowedLateness > 0 {
windowKey := sw.getWindowKey(*sw.currentSlot.End)
closeTime := sw.currentSlot.End.Add(allowedLateness)
sw.triggeredWindows[windowKey] = &triggeredWindowInfo{
slot: sw.currentSlot,
closeTime: closeTime,
snapshotData: snapshotData, // Save snapshot for late updates
}
}
}
// Move to next window (even if current window was empty)
sw.currentSlot = sw.NextSlot()
}
// Close windows that have exceeded allowedLateness
sw.closeExpiredWindows(watermarkTime)
}
// triggerWindowLocked triggers the window (must be called with lock held)
func (sw *SlidingWindow) triggerWindowLocked() {
if sw.currentSlot == nil {
return
}
// Extract current window data
resultData := make([]types.Row, 0)
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
item.Slot = sw.currentSlot
resultData = append(resultData, item)
}
}
// Skip triggering if window has no data
// This prevents empty windows from being triggered
if len(resultData) == 0 {
return
}
// Retain data that could be in future windows
// For sliding windows, we need to keep data that falls within:
// - Current window end + size (for overlapping windows)
cutoffTime := sw.currentSlot.End.Add(sw.size)
newData := make([]types.Row, 0)
for _, item := range sw.data {
// Keep data that could be in future windows (before cutoffTime)
if item.Timestamp.Before(cutoffTime) {
newData = append(newData, item)
}
}
sw.data = newData
// Get callback reference before releasing lock
callback := sw.callback
// Release lock before calling callback and sending to channel to avoid blocking
sw.mu.Unlock()
if callback != nil {
callback(resultData)
}
// Non-blocking send to output channel and update statistics
var sent bool
select {
case sw.outputChan <- resultData:
// Successfully sent
sent = true
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
sw.mu.Lock()
if sent {
sw.sentCount++
} else {
sw.droppedCount++
}
}
// Stop stops the sliding window operations
func (sw *SlidingWindow) Stop() {
// Call cancel function to stop window operations
sw.cancelFunc()
// Safely stop timer (for processing time)
sw.timerMu.Lock()
if sw.timer != nil {
sw.timer.Stop()
}
sw.timerMu.Unlock()
// Stop watermark (for event time)
if sw.watermark != nil {
sw.watermark.Stop()
}
// Ensure initChan is closed if it hasn't been closed yet
// This prevents Start() goroutine from blocking on initChan
sw.mu.Lock()
if !sw.initialized && sw.initChan != nil {
select {
case <-sw.initChan:
// Already closed, do nothing
default:
close(sw.initChan)
}
}
sw.mu.Unlock()
}
// Trigger triggers the sliding window to process data within the window
// For ProcessingTime: called by timer
// For EventTime: called by watermark updates
func (sw *SlidingWindow) Trigger() {
// Determine time characteristic
timeChar := sw.config.TimeCharacteristic
if timeChar == "" {
timeChar = types.ProcessingTime
}
// Lock to ensure thread safety
sw.mu.Lock()
// Return directly if no data in window
if len(sw.data) == 0 {
sw.mu.Unlock()
return
}
if !sw.initialized {
sw.mu.Unlock()
return
}
if timeChar == types.EventTime {
// For event time, trigger is handled by watermark mechanism
// This method is kept for backward compatibility but shouldn't be called directly
sw.mu.Unlock()
return
}
// Processing time logic
// Calculate next slot for sliding window
next := sw.NextSlot()
if next == nil {
sw.mu.Unlock()
return
}
// Extract Data fields to form []interface{} type data for current window
resultData := make([]types.Row, 0)
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
item.Slot = sw.currentSlot
resultData = append(resultData, item)
}
}
// Retain data that could be in future windows
// For sliding windows, we need to keep data that falls within:
// - Current window end + size (for overlapping windows)
// - Next window end + size (for future windows)
// Actually, we should keep all data that could be in any future window
// The latest window that could contain a data point is: next.End + size
cutoffTime := next.End.Add(sw.size)
newData := make([]types.Row, 0)
for _, item := range sw.data {
// Keep data that could be in future windows (before cutoffTime)
if item.Timestamp.Before(cutoffTime) {
newData = append(newData, item)
}
}
// If resultData is empty, skip callback to avoid sending empty results
// This prevents empty results from filling up channels when timer triggers repeatedly
if len(resultData) == 0 {
// Update window data even if no result
sw.data = newData
sw.currentSlot = next
sw.mu.Unlock()
return
}
// Update window data
sw.data = newData
sw.currentSlot = next
// Get callback reference before releasing lock
callback := sw.callback
// Release lock before calling callback and sending to channel to avoid blocking
sw.mu.Unlock()
// Execute callback function if set (outside of lock to avoid blocking)
if callback != nil {
callback(resultData)
}
// Non-blocking send to output channel and update statistics
var sent bool
select {
case sw.outputChan <- resultData:
// Successfully sent
sent = true
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
sw.mu.Lock()
if sent {
sw.sentCount++
} else {
sw.droppedCount++
}
sw.mu.Unlock()
}
// GetStats returns window performance statistics
func (sw *SlidingWindow) GetStats() map[string]int64 {
sw.mu.RLock()
defer sw.mu.RUnlock()
return map[string]int64{
"sentCount": sw.sentCount,
"droppedCount": sw.droppedCount,
"bufferSize": int64(cap(sw.outputChan)),
"bufferUsed": int64(len(sw.outputChan)),
}
}
// ResetStats resets performance statistics
func (sw *SlidingWindow) ResetStats() {
sw.mu.Lock()
defer sw.mu.Unlock()
sw.sentCount = 0
sw.droppedCount = 0
}
// Reset resets the sliding window and clears window data
func (sw *SlidingWindow) Reset() {
// First cancel context to stop all running goroutines
sw.cancelFunc()
// Lock to ensure thread safety
sw.mu.Lock()
defer sw.mu.Unlock()
// Stop existing timer (for processing time)
sw.timerMu.Lock()
if sw.timer != nil {
sw.timer.Stop()
sw.timer = nil
}
sw.timerMu.Unlock()
// Stop watermark (for event time)
if sw.watermark != nil {
sw.watermark.Stop()
// Recreate watermark
timeChar := sw.config.TimeCharacteristic
if timeChar == "" {
timeChar = types.ProcessingTime
}
if timeChar == types.EventTime {
maxOutOfOrderness := sw.config.MaxOutOfOrderness
if maxOutOfOrderness == 0 {
maxOutOfOrderness = 0
}
watermarkInterval := sw.config.WatermarkInterval
if watermarkInterval == 0 {
watermarkInterval = 200 * time.Millisecond
}
idleTimeout := sw.config.IdleTimeout
sw.watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
}
}
// Clear window data
sw.data = nil
sw.currentSlot = nil
sw.initialized = false
sw.initChan = make(chan struct{})
sw.firstWindowStartTime = time.Time{}
sw.triggeredWindows = make(map[string]*triggeredWindowInfo)
// Recreate context for next startup
sw.ctx, sw.cancelFunc = context.WithCancel(context.Background())
}
// OutputChan returns the sliding window's output channel
func (sw *SlidingWindow) OutputChan() <-chan []types.Row {
return sw.outputChan
}
// SetCallback sets the callback function to execute when sliding window triggers
func (sw *SlidingWindow) SetCallback(callback func([]types.Row)) {
sw.mu.Lock()
defer sw.mu.Unlock()
sw.callback = callback
}
func (sw *SlidingWindow) NextSlot() *types.TimeSlot {
if sw.currentSlot == nil {
return nil
}
start := sw.currentSlot.Start.Add(sw.slide)
end := sw.currentSlot.End.Add(sw.slide)
next := types.NewTimeSlot(&start, &end)
return next
}
func (sw *SlidingWindow) createSlot(t time.Time) *types.TimeSlot {
// Create a new time slot (for processing time, no alignment needed)
start := t
end := start.Add(sw.size)
slot := types.NewTimeSlot(&start, &end)
return slot
}
func (sw *SlidingWindow) createSlotFromStart(start time.Time) *types.TimeSlot {
// Create a new time slot from aligned start time (for event time)
end := start.Add(sw.size)
slot := types.NewTimeSlot(&start, &end)
return slot
}
// getWindowKey generates a key for a window based on its end time
func (sw *SlidingWindow) getWindowKey(endTime time.Time) string {
return fmt.Sprintf("%d", endTime.UnixNano())
}
// handleLateData handles late data that arrives within allowedLateness
func (sw *SlidingWindow) handleLateData(eventTime time.Time, allowedLateness time.Duration) {
// Find which triggered window this late data belongs to
for _, info := range sw.triggeredWindows {
if info.slot.Contains(eventTime) {
// This late data belongs to a triggered window that's still open
// Trigger window again with updated data (late update)
sw.triggerLateUpdateLocked(info.slot)
return
}
}
}
// triggerLateUpdateLocked triggers a late update for a window (must be called with lock held)
// This implements Flink-like behavior: late updates include complete window data (original + late data)
func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
// Find the triggered window info to get snapshot data
var windowInfo *triggeredWindowInfo
windowKey := sw.getWindowKey(*slot.End)
if info, exists := sw.triggeredWindows[windowKey]; exists {
windowInfo = info
}
// Collect all data for this window: original snapshot + late data from sw.data
resultData := make([]types.Row, 0)
// First, add original snapshot data (if exists)
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
// Create copies of snapshot data
for _, item := range windowInfo.snapshotData {
resultData = append(resultData, types.Row{
Data: item.Data,
Timestamp: item.Timestamp,
Slot: slot, // Update slot reference
})
}
}
// Then, add late data from sw.data (newly arrived late data)
lateDataCount := 0
for _, item := range sw.data {
if slot.Contains(item.Timestamp) {
item.Slot = slot
resultData = append(resultData, item)
lateDataCount++
}
}
if len(resultData) == 0 {
return
}
// Update snapshot to include late data (for future late updates)
if windowInfo != nil {
// Update snapshot with complete data (original + late)
windowInfo.snapshotData = make([]types.Row, len(resultData))
for i, item := range resultData {
windowInfo.snapshotData[i] = types.Row{
Data: item.Data,
Timestamp: item.Timestamp,
Slot: slot,
}
}
}
// Get callback reference before releasing lock
callback := sw.callback
// Release lock before calling callback and sending to channel to avoid blocking
sw.mu.Unlock()
if callback != nil {
callback(resultData)
}
// Non-blocking send to output channel and update statistics
var sent bool
select {
case sw.outputChan <- resultData:
// Successfully sent
sent = true
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
sw.mu.Lock()
if sent {
sw.sentCount++
} else {
sw.droppedCount++
}
}
// closeExpiredWindows closes windows that have exceeded allowedLateness
func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) {
expiredWindows := make([]*types.TimeSlot, 0)
for key, info := range sw.triggeredWindows {
if !watermarkTime.Before(info.closeTime) {
// Window has expired, mark for removal
expiredWindows = append(expiredWindows, info.slot)
delete(sw.triggeredWindows, key)
}
}
// Clean up data that belongs to expired windows (if any)
if len(expiredWindows) > 0 {
newData := make([]types.Row, 0)
for _, item := range sw.data {
belongsToExpiredWindow := false
for _, expiredSlot := range expiredWindows {
if expiredSlot.Contains(item.Timestamp) {
belongsToExpiredWindow = true
break
}
}
if !belongsToExpiredWindow {
newData = append(newData, item)
}
}
if len(newData) != len(sw.data) {
sw.data = newData
}
}
}