mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-22 10:05:20 +00:00
fix:修复窗口对齐问题
This commit is contained in:
+68
-31
@@ -73,25 +73,36 @@ type SlidingWindow struct {
|
||||
// NewSlidingWindow creates a new sliding window instance
|
||||
// size parameter represents the total window size, slide represents the sliding interval
|
||||
func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error) {
|
||||
// Create a cancellable context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
size, err := cast.ToDurationE(config.Params["size"])
|
||||
// Get size parameter from params array
|
||||
if len(config.Params) < 1 {
|
||||
return nil, fmt.Errorf("sliding window requires at least 'size' parameter")
|
||||
}
|
||||
|
||||
sizeVal := config.Params[0]
|
||||
size, err := cast.ToDurationE(sizeVal)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid size for sliding window: %v", err)
|
||||
}
|
||||
slide, err := cast.ToDurationE(config.Params["slide"])
|
||||
|
||||
// Get slide parameter from params array
|
||||
if len(config.Params) < 2 {
|
||||
return nil, fmt.Errorf("sliding window requires 'slide' parameter")
|
||||
}
|
||||
|
||||
slideVal := config.Params[1]
|
||||
slide, err := cast.ToDurationE(slideVal)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid slide for sliding window: %v", err)
|
||||
}
|
||||
|
||||
// Use unified performance config to get window output buffer size
|
||||
bufferSize := 1000 // Default value
|
||||
if perfConfig, exists := config.Params["performanceConfig"]; exists {
|
||||
if pc, ok := perfConfig.(types.PerformanceConfig); ok {
|
||||
bufferSize = pc.BufferConfig.WindowOutputSize
|
||||
}
|
||||
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
|
||||
}
|
||||
|
||||
// Create a cancellable context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &SlidingWindow{
|
||||
config: config,
|
||||
size: size,
|
||||
@@ -195,29 +206,24 @@ func (sw *SlidingWindow) Stop() {
|
||||
func (sw *SlidingWindow) Trigger() {
|
||||
// Lock to ensure thread safety
|
||||
sw.mu.Lock()
|
||||
defer sw.mu.Unlock()
|
||||
|
||||
// Return directly if no data in window
|
||||
if len(sw.data) == 0 {
|
||||
sw.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if !sw.initialized {
|
||||
sw.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Calculate cutoff time (current time minus window size)
|
||||
// Calculate next slot for sliding window
|
||||
next := sw.NextSlot()
|
||||
// Retain data for next window
|
||||
tms := next.Start.Add(-sw.size)
|
||||
tme := next.End.Add(sw.size)
|
||||
temp := types.NewTimeSlot(&tms, &tme)
|
||||
newData := make([]types.Row, 0)
|
||||
for _, item := range sw.data {
|
||||
if temp.Contains(item.Timestamp) {
|
||||
newData = append(newData, item)
|
||||
}
|
||||
if next == nil {
|
||||
sw.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Extract Data fields to form []interface{} type data
|
||||
// Extract Data fields to form []interface{} type data for current window
|
||||
resultData := make([]types.Row, 0)
|
||||
for _, item := range sw.data {
|
||||
if sw.currentSlot.Contains(item.Timestamp) {
|
||||
@@ -226,24 +232,55 @@ func (sw *SlidingWindow) Trigger() {
|
||||
}
|
||||
}
|
||||
|
||||
// Execute callback function if set
|
||||
if sw.callback != nil {
|
||||
sw.callback(resultData)
|
||||
// Retain data that could be in future windows
|
||||
// For sliding windows, we need to keep data that falls within:
|
||||
// - Current window end + size (for overlapping windows)
|
||||
// - Next window end + size (for future windows)
|
||||
// Actually, we should keep all data that could be in any future window
|
||||
// The latest window that could contain a data point is: next.End + size
|
||||
cutoffTime := next.End.Add(sw.size)
|
||||
newData := make([]types.Row, 0)
|
||||
for _, item := range sw.data {
|
||||
// Keep data that could be in future windows (before cutoffTime)
|
||||
if item.Timestamp.Before(cutoffTime) {
|
||||
newData = append(newData, item)
|
||||
}
|
||||
}
|
||||
|
||||
// Update window data
|
||||
sw.data = newData
|
||||
sw.currentSlot = next
|
||||
|
||||
// Non-blocking send to output channel and update statistics (within lock)
|
||||
// Get callback reference before releasing lock
|
||||
callback := sw.callback
|
||||
|
||||
// Release lock before calling callback and sending to channel to avoid blocking
|
||||
sw.mu.Unlock()
|
||||
|
||||
// Execute callback function if set (outside of lock to avoid blocking)
|
||||
if callback != nil {
|
||||
callback(resultData)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel and update statistics
|
||||
var sent bool
|
||||
select {
|
||||
case sw.outputChan <- resultData:
|
||||
// Successfully sent, update statistics (within lock)
|
||||
sw.sentCount++
|
||||
// Successfully sent
|
||||
sent = true
|
||||
default:
|
||||
// Channel full, drop result and update statistics (within lock)
|
||||
// Channel full, drop result
|
||||
sent = false
|
||||
}
|
||||
|
||||
// Re-acquire lock to update statistics
|
||||
sw.mu.Lock()
|
||||
if sent {
|
||||
sw.sentCount++
|
||||
} else {
|
||||
sw.droppedCount++
|
||||
}
|
||||
sw.mu.Unlock()
|
||||
}
|
||||
|
||||
// GetStats returns window performance statistics
|
||||
@@ -252,10 +289,10 @@ func (sw *SlidingWindow) GetStats() map[string]int64 {
|
||||
defer sw.mu.RUnlock()
|
||||
|
||||
return map[string]int64{
|
||||
"sent_count": sw.sentCount,
|
||||
"dropped_count": sw.droppedCount,
|
||||
"buffer_size": int64(cap(sw.outputChan)),
|
||||
"buffer_used": int64(len(sw.outputChan)),
|
||||
"sentCount": sw.sentCount,
|
||||
"droppedCount": sw.droppedCount,
|
||||
"bufferSize": int64(cap(sw.outputChan)),
|
||||
"bufferUsed": int64(len(sw.outputChan)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user