mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-04-13 15:48:52 +00:00
590 lines
16 KiB
Go
590 lines
16 KiB
Go
/*
|
|
* Copyright 2025 The RuleGo Authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package window
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rulego/streamsql/types"
|
|
"github.com/rulego/streamsql/utils/cast"
|
|
)
|
|
|
|
// Ensure SessionWindow struct implements Window interface
|
|
var _ Window = (*SessionWindow)(nil)
|
|
|
|
// SessionWindow represents a session window
|
|
// Session window is an event-time based window that closes when no events arrive for a period of time
|
|
type SessionWindow struct {
|
|
// config is the window configuration information
|
|
config types.WindowConfig
|
|
// timeout is the session timeout duration, session will close if no new events within this time
|
|
timeout time.Duration
|
|
// mu is used to protect concurrent access to window data
|
|
mu sync.RWMutex
|
|
// sessionMap stores session data for different keys
|
|
sessionMap map[string]*session
|
|
// outputChan is a channel for sending data when window triggers
|
|
outputChan chan []types.Row
|
|
// callback is an optional callback function called when window triggers
|
|
callback func([]types.Row)
|
|
// ctx is used to control window lifecycle
|
|
ctx context.Context
|
|
// cancelFunc is used to cancel window operations
|
|
cancelFunc context.CancelFunc
|
|
// Channel for initializing window
|
|
initChan chan struct{}
|
|
initialized bool
|
|
// Lock to protect ticker
|
|
tickerMu sync.Mutex
|
|
ticker *time.Ticker
|
|
// watermark for event time processing (only used for EventTime)
|
|
watermark *Watermark
|
|
// triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness)
|
|
triggeredSessions map[string]*sessionInfo
|
|
}
|
|
|
|
// sessionInfo stores information about a triggered session that is still open for late data
|
|
type sessionInfo struct {
|
|
session *session
|
|
closeTime time.Time // session end + allowedLateness
|
|
}
|
|
|
|
// session stores data and state for a session
|
|
type session struct {
|
|
data []types.Row
|
|
lastActive time.Time
|
|
slot *types.TimeSlot
|
|
}
|
|
|
|
// NewSessionWindow creates a new session window instance
|
|
func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) {
|
|
// Get timeout parameter from params array
|
|
if len(config.Params) == 0 {
|
|
return nil, fmt.Errorf("session window requires 'timeout' parameter")
|
|
}
|
|
|
|
// Create a cancellable context
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
timeoutVal := config.Params[0]
|
|
timeout, err := cast.ToDurationE(timeoutVal)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("invalid timeout for session window: %v", err)
|
|
}
|
|
|
|
// Use unified performance configuration to get window output buffer size
|
|
bufferSize := 100 // Default value, session windows typically have smaller buffers
|
|
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
|
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Session window uses 1/10 of buffer
|
|
if bufferSize < 10 {
|
|
bufferSize = 10 // Minimum value
|
|
}
|
|
}
|
|
|
|
// Determine time characteristic (default to ProcessingTime for backward compatibility)
|
|
timeChar := config.TimeCharacteristic
|
|
if timeChar == "" {
|
|
timeChar = types.ProcessingTime
|
|
}
|
|
|
|
// Initialize watermark for event time
|
|
var watermark *Watermark
|
|
if timeChar == types.EventTime {
|
|
maxOutOfOrderness := config.MaxOutOfOrderness
|
|
if maxOutOfOrderness == 0 {
|
|
maxOutOfOrderness = 0 // Default: no out-of-orderness allowed
|
|
}
|
|
watermarkInterval := config.WatermarkInterval
|
|
if watermarkInterval == 0 {
|
|
watermarkInterval = 200 * time.Millisecond // Default: 200ms
|
|
}
|
|
idleTimeout := config.IdleTimeout
|
|
// Default: 0 means disabled, no idle source mechanism
|
|
watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
|
|
}
|
|
|
|
return &SessionWindow{
|
|
config: config,
|
|
timeout: timeout,
|
|
sessionMap: make(map[string]*session),
|
|
outputChan: make(chan []types.Row, bufferSize),
|
|
ctx: ctx,
|
|
cancelFunc: cancel,
|
|
initChan: make(chan struct{}),
|
|
initialized: false,
|
|
watermark: watermark,
|
|
triggeredSessions: make(map[string]*sessionInfo),
|
|
}, nil
|
|
}
|
|
|
|
// Add adds data to session window
|
|
func (sw *SessionWindow) Add(data interface{}) {
|
|
// Lock to ensure thread safety
|
|
sw.mu.Lock()
|
|
defer sw.mu.Unlock()
|
|
|
|
if !sw.initialized {
|
|
// Safely close initChan to avoid closing an already closed channel
|
|
select {
|
|
case <-sw.initChan:
|
|
// Already closed, do nothing
|
|
default:
|
|
close(sw.initChan)
|
|
}
|
|
sw.initialized = true
|
|
}
|
|
|
|
// Get data timestamp
|
|
timestamp := GetTimestamp(data, sw.config.TsProp, sw.config.TimeUnit)
|
|
|
|
// Determine time characteristic (default to ProcessingTime for backward compatibility)
|
|
timeChar := sw.config.TimeCharacteristic
|
|
if timeChar == "" {
|
|
timeChar = types.ProcessingTime
|
|
}
|
|
|
|
// For event time, update watermark and check for late data
|
|
if timeChar == types.EventTime && sw.watermark != nil {
|
|
sw.watermark.UpdateEventTime(timestamp)
|
|
// Check if data is late and handle allowedLateness
|
|
if sw.watermark.IsEventTimeLate(timestamp) {
|
|
// Data is late, check if it's within allowedLateness
|
|
allowedLateness := sw.config.AllowedLateness
|
|
if allowedLateness > 0 {
|
|
// Check if this late data belongs to any triggered session that's still open
|
|
sw.handleLateData(timestamp, allowedLateness)
|
|
}
|
|
// If allowedLateness is 0 or data is too late, we still add it but it won't trigger updates
|
|
}
|
|
}
|
|
|
|
// Create Row object
|
|
row := types.Row{
|
|
Data: data,
|
|
Timestamp: timestamp,
|
|
}
|
|
|
|
// Extract session key (supports multiple group by keys)
|
|
key := extractSessionCompositeKey(data, sw.config.GroupByKeys)
|
|
|
|
// Get or create session
|
|
s, exists := sw.sessionMap[key]
|
|
if !exists {
|
|
// Create new session
|
|
// Use the actual timestamp of the first data point as session start
|
|
// No alignment needed - session starts from when first data arrives
|
|
start := timestamp
|
|
end := start.Add(sw.timeout)
|
|
slot := types.NewTimeSlot(&start, &end)
|
|
|
|
s = &session{
|
|
data: []types.Row{},
|
|
lastActive: timestamp,
|
|
slot: slot,
|
|
}
|
|
sw.sessionMap[key] = s
|
|
} else {
|
|
// Update session end time
|
|
if timestamp.After(s.lastActive) {
|
|
s.lastActive = timestamp
|
|
// Extend session end time
|
|
newEnd := timestamp.Add(sw.timeout)
|
|
if newEnd.After(*s.slot.End) {
|
|
s.slot.End = &newEnd
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add data to session
|
|
row.Slot = s.slot
|
|
s.data = append(s.data, row)
|
|
}
|
|
|
|
// Start starts the session window's periodic check mechanism
|
|
// Start starts the session window, begins periodic checking of expired sessions
|
|
// Uses lazy initialization mode to avoid infinite waiting when no data, while ensuring subsequent data can be processed normally
|
|
func (sw *SessionWindow) Start() {
|
|
// Determine time characteristic (default to ProcessingTime for backward compatibility)
|
|
timeChar := sw.config.TimeCharacteristic
|
|
if timeChar == "" {
|
|
timeChar = types.ProcessingTime
|
|
}
|
|
|
|
if timeChar == types.EventTime {
|
|
// Event time: trigger based on watermark
|
|
sw.startEventTime()
|
|
} else {
|
|
// Processing time: trigger based on system clock
|
|
sw.startProcessingTime()
|
|
}
|
|
}
|
|
|
|
// startProcessingTime starts the processing time trigger mechanism
|
|
func (sw *SessionWindow) startProcessingTime() {
|
|
go func() {
|
|
// Close output channel when function ends
|
|
defer close(sw.outputChan)
|
|
|
|
// Wait for initialization completion or context cancellation
|
|
select {
|
|
case <-sw.initChan:
|
|
// Normal initialization completed, continue processing
|
|
case <-sw.ctx.Done():
|
|
// Context cancelled, exit directly
|
|
return
|
|
}
|
|
|
|
// Periodically check expired sessions
|
|
sw.tickerMu.Lock()
|
|
sw.ticker = time.NewTicker(sw.timeout / 2)
|
|
ticker := sw.ticker
|
|
sw.tickerMu.Unlock()
|
|
|
|
defer func() {
|
|
sw.tickerMu.Lock()
|
|
if sw.ticker != nil {
|
|
sw.ticker.Stop()
|
|
}
|
|
sw.tickerMu.Unlock()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
sw.checkExpiredSessions()
|
|
case <-sw.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// startEventTime starts the event time trigger mechanism based on watermark
|
|
func (sw *SessionWindow) startEventTime() {
|
|
go func() {
|
|
// Close output channel when function ends
|
|
defer close(sw.outputChan)
|
|
if sw.watermark != nil {
|
|
defer sw.watermark.Stop()
|
|
}
|
|
|
|
// Wait for initialization completion or context cancellation
|
|
select {
|
|
case <-sw.initChan:
|
|
// Normal initialization completed, continue processing
|
|
case <-sw.ctx.Done():
|
|
// Context cancelled, exit directly
|
|
return
|
|
}
|
|
|
|
// Process watermark updates
|
|
if sw.watermark != nil {
|
|
for {
|
|
select {
|
|
case watermarkTime := <-sw.watermark.WatermarkChan():
|
|
sw.checkAndTriggerSessions(watermarkTime)
|
|
case <-sw.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
} else {
|
|
// If watermark is nil, just wait for context cancellation
|
|
<-sw.ctx.Done()
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop stops session window operations
|
|
func (sw *SessionWindow) Stop() {
|
|
// Call cancel function to stop window operations
|
|
sw.cancelFunc()
|
|
|
|
// Safely stop ticker
|
|
sw.tickerMu.Lock()
|
|
if sw.ticker != nil {
|
|
sw.ticker.Stop()
|
|
}
|
|
sw.tickerMu.Unlock()
|
|
|
|
// Stop watermark (for event time)
|
|
if sw.watermark != nil {
|
|
sw.watermark.Stop()
|
|
}
|
|
|
|
// Ensure initChan is closed if it hasn't been closed yet
|
|
// This prevents Start() goroutine from blocking on initChan
|
|
sw.mu.Lock()
|
|
if !sw.initialized && sw.initChan != nil {
|
|
select {
|
|
case <-sw.initChan:
|
|
// Already closed, do nothing
|
|
default:
|
|
close(sw.initChan)
|
|
}
|
|
}
|
|
sw.mu.Unlock()
|
|
}
|
|
|
|
func (sw *SessionWindow) checkExpiredSessions() {
|
|
sw.mu.Lock()
|
|
now := time.Now()
|
|
resultsToSend := sw.collectExpiredSessions(now)
|
|
sw.mu.Unlock()
|
|
|
|
sw.sendResults(resultsToSend)
|
|
}
|
|
|
|
func (sw *SessionWindow) checkAndTriggerSessions(watermarkTime time.Time) {
|
|
sw.mu.Lock()
|
|
resultsToSend := sw.collectExpiredSessions(watermarkTime)
|
|
sw.closeExpiredSessions(watermarkTime)
|
|
sw.mu.Unlock()
|
|
|
|
sw.sendResults(resultsToSend)
|
|
}
|
|
|
|
func (sw *SessionWindow) collectExpiredSessions(currentTime time.Time) [][]types.Row {
|
|
expiredKeys := []string{}
|
|
for key, s := range sw.sessionMap {
|
|
// For event time, use slot.End to determine if session expired
|
|
// Session expires when watermark >= session end time
|
|
// For processing time, use lastActive + timeout
|
|
if s.slot.End != nil && !currentTime.Before(*s.slot.End) {
|
|
expiredKeys = append(expiredKeys, key)
|
|
} else if currentTime.Sub(s.lastActive) > sw.timeout {
|
|
expiredKeys = append(expiredKeys, key)
|
|
}
|
|
}
|
|
|
|
resultsToSend := make([][]types.Row, 0)
|
|
allowedLateness := sw.config.AllowedLateness
|
|
|
|
for _, key := range expiredKeys {
|
|
s := sw.sessionMap[key]
|
|
if len(s.data) > 0 {
|
|
result := make([]types.Row, len(s.data))
|
|
copy(result, s.data)
|
|
resultsToSend = append(resultsToSend, result)
|
|
|
|
if allowedLateness > 0 {
|
|
closeTime := s.slot.End.Add(allowedLateness)
|
|
sw.triggeredSessions[key] = &sessionInfo{
|
|
session: s,
|
|
closeTime: closeTime,
|
|
}
|
|
}
|
|
}
|
|
delete(sw.sessionMap, key)
|
|
}
|
|
|
|
return resultsToSend
|
|
}
|
|
|
|
func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) {
|
|
for _, result := range resultsToSend {
|
|
// Skip empty results to avoid filling up channels
|
|
if len(result) == 0 {
|
|
continue
|
|
}
|
|
|
|
if sw.callback != nil {
|
|
sw.callback(result)
|
|
}
|
|
|
|
select {
|
|
case sw.outputChan <- result:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trigger manually triggers all session windows
|
|
func (sw *SessionWindow) Trigger() {
|
|
sw.mu.Lock()
|
|
|
|
// Collect all results first
|
|
resultsToSend := make([][]types.Row, 0)
|
|
for _, s := range sw.sessionMap {
|
|
if len(s.data) > 0 {
|
|
// Trigger session window
|
|
result := make([]types.Row, len(s.data))
|
|
copy(result, s.data)
|
|
resultsToSend = append(resultsToSend, result)
|
|
}
|
|
}
|
|
// Clear all sessions
|
|
sw.sessionMap = make(map[string]*session)
|
|
|
|
// Release lock before sending to channel and calling callback to avoid blocking
|
|
sw.mu.Unlock()
|
|
|
|
// Send results and call callbacks outside of lock to avoid blocking
|
|
for _, result := range resultsToSend {
|
|
// Skip empty results to avoid filling up channels
|
|
if len(result) == 0 {
|
|
continue
|
|
}
|
|
|
|
// If callback function is set, execute it
|
|
if sw.callback != nil {
|
|
sw.callback(result)
|
|
}
|
|
|
|
// Non-blocking send to output channel
|
|
select {
|
|
case sw.outputChan <- result:
|
|
// Successfully sent
|
|
default:
|
|
// Channel full, drop result (could add statistics here if needed)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reset resets session window data
|
|
func (sw *SessionWindow) Reset() {
|
|
sw.mu.Lock()
|
|
defer sw.mu.Unlock()
|
|
|
|
// Stop existing ticker
|
|
sw.tickerMu.Lock()
|
|
if sw.ticker != nil {
|
|
sw.ticker.Stop()
|
|
sw.ticker = nil
|
|
}
|
|
sw.tickerMu.Unlock()
|
|
|
|
// Stop watermark (for event time)
|
|
if sw.watermark != nil {
|
|
sw.watermark.Stop()
|
|
// Recreate watermark
|
|
timeChar := sw.config.TimeCharacteristic
|
|
if timeChar == "" {
|
|
timeChar = types.ProcessingTime
|
|
}
|
|
if timeChar == types.EventTime {
|
|
maxOutOfOrderness := sw.config.MaxOutOfOrderness
|
|
if maxOutOfOrderness == 0 {
|
|
maxOutOfOrderness = 0
|
|
}
|
|
watermarkInterval := sw.config.WatermarkInterval
|
|
if watermarkInterval == 0 {
|
|
watermarkInterval = 200 * time.Millisecond
|
|
}
|
|
idleTimeout := sw.config.IdleTimeout
|
|
sw.watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
|
|
}
|
|
}
|
|
|
|
// Clear session data
|
|
sw.sessionMap = make(map[string]*session)
|
|
sw.triggeredSessions = make(map[string]*sessionInfo)
|
|
sw.initialized = false
|
|
sw.initChan = make(chan struct{})
|
|
}
|
|
|
|
// OutputChan returns a read-only channel for receiving data when window triggers
|
|
func (sw *SessionWindow) OutputChan() <-chan []types.Row {
|
|
return sw.outputChan
|
|
}
|
|
|
|
// SetCallback sets the callback function when session window triggers
|
|
func (sw *SessionWindow) SetCallback(callback func([]types.Row)) {
|
|
sw.mu.Lock()
|
|
defer sw.mu.Unlock()
|
|
sw.callback = callback
|
|
}
|
|
|
|
// handleLateData handles late data that arrives within allowedLateness
|
|
func (sw *SessionWindow) handleLateData(eventTime time.Time, allowedLateness time.Duration) {
|
|
sw.mu.Lock()
|
|
defer sw.mu.Unlock()
|
|
|
|
// Find which triggered session this late data belongs to
|
|
for _, info := range sw.triggeredSessions {
|
|
if info.session.slot.Contains(eventTime) {
|
|
// This late data belongs to a triggered session that's still open
|
|
// Trigger session again with updated data (late update)
|
|
sw.triggerLateUpdateLocked(info.session)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// triggerLateUpdateLocked triggers a late update for a session (must be called with lock held)
|
|
func (sw *SessionWindow) triggerLateUpdateLocked(s *session) {
|
|
if len(s.data) == 0 {
|
|
return
|
|
}
|
|
|
|
// Extract session data including late data
|
|
resultData := make([]types.Row, len(s.data))
|
|
copy(resultData, s.data)
|
|
|
|
// Get callback reference before releasing lock
|
|
callback := sw.callback
|
|
|
|
// Release lock before calling callback and sending to channel to avoid blocking
|
|
sw.mu.Unlock()
|
|
|
|
if callback != nil {
|
|
callback(resultData)
|
|
}
|
|
|
|
// Non-blocking send to output channel
|
|
select {
|
|
case sw.outputChan <- resultData:
|
|
// Successfully sent
|
|
default:
|
|
// Channel full, drop result
|
|
}
|
|
|
|
// Re-acquire lock
|
|
sw.mu.Lock()
|
|
}
|
|
|
|
// closeExpiredSessions closes sessions that have exceeded allowedLateness
|
|
func (sw *SessionWindow) closeExpiredSessions(watermarkTime time.Time) {
|
|
for key, info := range sw.triggeredSessions {
|
|
if !watermarkTime.Before(info.closeTime) {
|
|
// Session has expired, remove it
|
|
delete(sw.triggeredSessions, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// extractSessionCompositeKey derives the session key for a record from the
// given group-by fields. The values of the fields are formatted with %v
// semantics and joined with "|"; a field missing from the record contributes
// "<nil>". The key "default" is returned when no fields are given or when data
// is not a map[string]interface{}.
func extractSessionCompositeKey(data interface{}, keys []string) string {
	const fallbackKey = "default"

	fields, isMap := data.(map[string]interface{})
	if len(keys) == 0 || !isMap {
		return fallbackKey
	}

	var b strings.Builder
	for i, name := range keys {
		if i > 0 {
			b.WriteByte('|')
		}
		fmt.Fprint(&b, fields[name])
	}
	return b.String()
}
|