Files
streamsql/window/tumbling_window.go
2025-08-04 14:45:43 +08:00

308 lines
8.0 KiB
Go

/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package window
import (
"context"
"fmt"
"sync"
"time"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils/cast"
"github.com/rulego/streamsql/utils/timex"
)
// Compile-time assertion that *TumblingWindow satisfies the Window interface;
// the build fails here if a required method is missing.
var _ Window = (*TumblingWindow)(nil)
// TumblingWindow collects rows into consecutive, fixed-length time slots and
// emits the rows of the current slot each time its ticker fires (or Trigger is
// called directly). Results are delivered through an optional callback and a
// buffered output channel.
type TumblingWindow struct {
// config holds the window configuration (Params, TsProp, TimeUnit are read in this file)
config types.WindowConfig
// size is the duration of one window; it is also used as the ticker period
size time.Duration
// mu guards data, currentSlot, initialized, callback and the statistics counters;
// Reset also holds it while replacing initChan, ctx and cancelFunc
mu sync.RWMutex
// data stores the rows collected so far that have not yet been discarded by Trigger
data []types.Row
// outputChan receives one []types.Row per trigger; sends are non-blocking
outputChan chan []types.Row
// callback is an optional function invoked with the emitted rows on every trigger
callback func([]types.Row)
// ctx controls the lifetime of the goroutine launched by Start
ctx context.Context
// cancelFunc cancels ctx; called by Stop and Reset
cancelFunc context.CancelFunc
// timer is the ticker that drives periodic triggering; nil until the first Add
timer *time.Ticker
// currentSlot is the time slot whose rows the next Trigger will emit;
// set by the first Add and advanced by Trigger
currentSlot *types.TimeSlot
// initChan is closed by the first Add to signal that the ticker and slot exist
initChan chan struct{}
// initialized reports whether the first Add has happened since creation or Reset
initialized bool
// timerMu protects access to the timer field
timerMu sync.Mutex
// Performance statistics
droppedCount int64 // Number of results dropped because outputChan was full
sentCount int64 // Number of results successfully sent to outputChan
}
// NewTumblingWindow creates a new tumbling window from config.
//
// config.Params["size"] is converted with cast.ToDurationE and becomes the
// window length; it must be a positive duration because it is later used as a
// ticker period. If config.Params["performanceConfig"] holds a
// types.PerformanceConfig with a positive BufferConfig.WindowOutputSize, that
// value is used as the output channel capacity; otherwise the default of 1000
// is kept.
//
// An error is returned when the size cannot be converted or is not positive.
// The window is returned idle: call Start to begin periodic triggering.
func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error) {
	// Default capacity of the output channel when no usable override is given.
	const defaultOutputSize = 1000

	size, err := cast.ToDurationE(config.Params["size"])
	if err != nil {
		return nil, fmt.Errorf("invalid size for tumbling window: %w", err)
	}
	// time.NewTicker panics on a non-positive period, so reject it up front
	// instead of letting the first Add crash.
	if size <= 0 {
		return nil, fmt.Errorf("invalid size for tumbling window: must be positive, got %v", size)
	}

	// Use unified performance config to get window output buffer size.
	// Non-positive values are ignored: a negative capacity would panic in make,
	// and a zero capacity would make the non-blocking send drop most results.
	bufferSize := defaultOutputSize
	if perfConfig, exists := config.Params["performanceConfig"]; exists {
		if pc, ok := perfConfig.(types.PerformanceConfig); ok && pc.BufferConfig.WindowOutputSize > 0 {
			bufferSize = pc.BufferConfig.WindowOutputSize
		}
	}

	// Create the cancellable context only after validation so that no cancel
	// function is left uncalled on an error return.
	ctx, cancel := context.WithCancel(context.Background())

	return &TumblingWindow{
		config:      config,
		size:        size,
		outputChan:  make(chan []types.Row, bufferSize),
		ctx:         ctx,
		cancelFunc:  cancel,
		initChan:    make(chan struct{}),
		initialized: false,
	}, nil
}
// Add appends one record to the tumbling window.
//
// The record's timestamp is resolved once through GetTimestamp using the
// configured TsProp and TimeUnit. The first record added after creation (or
// after Reset) also initializes the window: it defines the current time slot,
// creates the ticker with a period of one window length, and closes initChan
// so the goroutine launched by Start can begin triggering.
func (tw *TumblingWindow) Add(data interface{}) {
	// Lock to ensure thread safety
	tw.mu.Lock()
	defer tw.mu.Unlock()

	// Resolve the timestamp a single time so the slot derived from the first
	// record and the row stored for it are guaranteed to use the same instant.
	ts := GetTimestamp(data, tw.config.TsProp, tw.config.TimeUnit)

	if !tw.initialized {
		tw.currentSlot = tw.createSlot(ts)
		tw.timerMu.Lock()
		tw.timer = time.NewTicker(tw.size)
		tw.timerMu.Unlock()
		tw.initialized = true
		// Send initialization complete signal (after setting timer)
		close(tw.initChan)
	}

	// Append data to window's data list
	tw.data = append(tw.data, types.Row{
		Data:      data,
		Timestamp: ts,
	})
}
// createSlot returns the time slot for t: its start is t aligned by
// timex.AlignTimeToWindow for the window size, and its end is one window
// length after that start.
func (tw *TumblingWindow) createSlot(t time.Time) *types.TimeSlot {
	windowStart := timex.AlignTimeToWindow(t, tw.size)
	windowEnd := windowStart.Add(tw.size)
	return types.NewTimeSlot(&windowStart, &windowEnd)
}
// NextSlot returns the slot that directly follows the current one, or nil when
// no current slot exists yet. The new slot starts at the current slot's End
// (the same pointer is reused, not copied) and ends one window length later.
// It does not modify the window; callers are expected to hold tw.mu.
func (tw *TumblingWindow) NextSlot() *types.TimeSlot {
	cur := tw.currentSlot
	if cur == nil {
		return nil
	}
	nextEnd := cur.End.Add(tw.size)
	return types.NewTimeSlot(cur.End, &nextEnd)
}
// Stop cancels the window's context, which ends the goroutine launched by
// Start, and stops the ticker if one has been created.
func (tw *TumblingWindow) Stop() {
	// Signal cancellation first so the background goroutine can exit
	tw.cancelFunc()

	// Stop the ticker under its own lock; it stays nil until the first Add
	tw.timerMu.Lock()
	defer tw.timerMu.Unlock()
	if ticker := tw.timer; ticker != nil {
		ticker.Stop()
	}
}
// Start launches the background goroutine that triggers the window on every
// tick.
//
// Initialization is lazy: the goroutine first waits until the first Add closes
// initChan (or the context is cancelled), so an idle window never spins. The
// goroutine closes the output channel when it exits.
//
// The goroutine is bound to the context and init channel that are current at
// the moment Start is called. Reset replaces both fields; re-reading them from
// the struct would let a goroutine from before the Reset watch the new,
// un-cancelled context and keep running alongside the next Start.
func (tw *TumblingWindow) Start() {
	// Snapshot under the same lock Reset holds while it swaps these fields.
	tw.mu.RLock()
	ctx, initChan := tw.ctx, tw.initChan
	tw.mu.RUnlock()

	go func() {
		// Close output channel when function ends
		defer close(tw.outputChan)

		// Wait for initialization complete or context cancellation
		select {
		case <-initChan:
			// Initialization completed normally, continue processing
		case <-ctx.Done():
			// Context cancelled, exit directly
			return
		}

		for {
			// Safely get timer in each loop iteration
			tw.timerMu.Lock()
			timer := tw.timer
			tw.timerMu.Unlock()

			// Check cancellation after reading the ticker: Stop and Reset cancel
			// the context before touching the ticker, so a live context here
			// means the ticker read above belongs to this run. Stop and Reset
			// stop the ticker themselves, so nothing is leaked by returning.
			if ctx.Err() != nil {
				return
			}

			if timer == nil {
				// If timer is nil, wait briefly and retry
				select {
				case <-time.After(10 * time.Millisecond):
					continue
				case <-ctx.Done():
					return
				}
			}

			select {
			// Trigger window when timer expires
			case <-timer.C:
				tw.Trigger()
			// Stop timer and exit loop when context is cancelled
			case <-ctx.Done():
				// Stop only the ticker this goroutine was waiting on; tw.timer
				// may already refer to a ticker created for a later run.
				tw.timerMu.Lock()
				timer.Stop()
				tw.timerMu.Unlock()
				return
			}
		}
	}()
}
// Trigger closes the current window: it emits the rows whose timestamps fall
// in the current slot, advances to the next slot, and prunes stored rows.
//
// It is a no-op until the first Add has initialized the window. Emitted rows
// are copies tagged with the slot they belong to. They are passed to the
// callback (if set) and then offered to the output channel without blocking;
// when the channel is full the result is dropped and counted.
//
// The callback runs while tw.mu is held, so it must not call methods of this
// window that acquire the same lock.
func (tw *TumblingWindow) Trigger() {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	if !tw.initialized {
		return
	}

	closing := tw.currentSlot
	upcoming := tw.NextSlot()

	// Rows are kept when they fall in the range that starts one window length
	// before the upcoming slot and ends one window length after it.
	keepFrom := upcoming.Start.Add(-tw.size)
	keepUntil := upcoming.End.Add(tw.size)
	keepRange := types.NewTimeSlot(&keepFrom, &keepUntil)

	retained := make([]types.Row, 0)
	emitted := make([]types.Row, 0)
	for _, row := range tw.data {
		// Retain the untouched row first; the slot tag below applies only to
		// the emitted copy.
		if keepRange.Contains(row.Timestamp) {
			retained = append(retained, row)
		}
		if closing.Contains(row.Timestamp) {
			row.Slot = closing
			emitted = append(emitted, row)
		}
	}

	// Notify the callback before the window state is advanced
	if cb := tw.callback; cb != nil {
		cb(emitted)
	}

	tw.data = retained
	tw.currentSlot = upcoming

	// Non-blocking hand-off; the counters are updated while the lock is held.
	// A log line for dropped results could be added in the default branch.
	select {
	case tw.outputChan <- emitted:
		tw.sentCount++
	default:
		tw.droppedCount++
	}
}
// Reset returns the window to its uninitialized state so that the next Add
// starts a fresh window.
//
// It cancels the current context, stops and clears the ticker, discards the
// stored rows and the current slot, and installs a new init channel and a new
// context. The callback, the output channel and the statistics counters are
// left as they are.
//
// NOTE(review): a goroutine launched by Start closes the output channel when
// it exits on cancellation, and Reset does not recreate that channel — confirm
// whether calling Start again after Reset is a supported sequence.
func (tw *TumblingWindow) Reset() {
	// Cancel before taking the lock so a running goroutine is told to stop
	tw.cancelFunc()

	tw.mu.Lock()
	defer tw.mu.Unlock()

	// Stop and forget the ticker
	tw.timerMu.Lock()
	if ticker := tw.timer; ticker != nil {
		ticker.Stop()
		tw.timer = nil
	}
	tw.timerMu.Unlock()

	// Drop window state
	tw.data, tw.currentSlot = nil, nil
	tw.initialized = false
	tw.initChan = make(chan struct{})

	// Fresh context for the next run
	tw.ctx, tw.cancelFunc = context.WithCancel(context.Background())
}
// OutputChan exposes the window's output channel as receive-only. Each value
// received is the slice of rows emitted by one trigger.
func (tw *TumblingWindow) OutputChan() <-chan []types.Row {
	var results <-chan []types.Row = tw.outputChan
	return results
}
// SetCallback installs the function that Trigger invokes with the emitted
// rows. Passing nil disables the callback. The assignment is made under tw.mu.
func (tw *TumblingWindow) SetCallback(callback func([]types.Row)) {
	tw.mu.Lock()
	tw.callback = callback
	tw.mu.Unlock()
}
// GetStats returns a snapshot of the window's output statistics, read under
// the read lock. Keys: "sent_count" and "dropped_count" are the trigger
// results delivered to or dropped from the output channel; "buffer_size" and
// "buffer_used" are that channel's capacity and current length.
func (tw *TumblingWindow) GetStats() map[string]int64 {
	tw.mu.RLock()
	defer tw.mu.RUnlock()

	stats := make(map[string]int64, 4)
	stats["sent_count"] = tw.sentCount
	stats["dropped_count"] = tw.droppedCount
	stats["buffer_size"] = int64(cap(tw.outputChan))
	stats["buffer_used"] = int64(len(tw.outputChan))
	return stats
}
// ResetStats zeroes the sent and dropped counters under the write lock.
func (tw *TumblingWindow) ResetStats() {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	tw.sentCount, tw.droppedCount = 0, 0
}