// Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。 package window import ( "context" "fmt" "sync" "time" "github.com/rulego/streamsql/types" "github.com/rulego/streamsql/utils/cast" "github.com/rulego/streamsql/utils/timex" ) // 确保 TumblingWindow 结构体实现了 Window 接口。 var _ Window = (*TumblingWindow)(nil) // TumblingWindow 表示一个滚动窗口,用于在固定时间间隔内收集数据并触发处理。 type TumblingWindow struct { // config 是窗口的配置信息。 config types.WindowConfig // size 是滚动窗口的时间大小,即窗口的持续时间。 size time.Duration // mu 用于保护对窗口数据的并发访问。 mu sync.RWMutex // data 存储窗口内收集的数据。 data []types.Row // outputChan 是一个通道,用于在窗口触发时发送数据。 outputChan chan []types.Row // callback 是一个可选的回调函数,在窗口触发时调用。 callback func([]types.Row) // ctx 用于控制窗口的生命周期。 ctx context.Context // cancelFunc 用于取消窗口的操作。 cancelFunc context.CancelFunc // timer 用于定时触发窗口。 timer *time.Ticker currentSlot *types.TimeSlot // 用于初始化窗口的通道 initChan chan struct{} initialized bool // 保护timer的锁 timerMu sync.Mutex // 性能统计 droppedCount int64 // 丢弃的结果数量 sentCount int64 // 成功发送的结果数量 } // NewTumblingWindow 创建一个新的滚动窗口实例。 // 参数 size 是窗口的时间大小。 func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error) { // 创建一个可取消的上下文。 ctx, cancel := context.WithCancel(context.Background()) size, err := cast.ToDurationE(config.Params["size"]) if err != nil { return nil, fmt.Errorf("invalid size for tumbling window: %v", err) } // 使用统一的性能配置获取窗口输出缓冲区大小 bufferSize := 1000 // 默认值 if perfConfig, exists := config.Params["performanceConfig"]; exists { if pc, ok := perfConfig.(types.PerformanceConfig); ok { bufferSize = pc.BufferConfig.WindowOutputSize } } return &TumblingWindow{ config: config, size: size, outputChan: make(chan []types.Row, bufferSize), ctx: ctx, cancelFunc: cancel, initChan: make(chan struct{}), initialized: false, }, nil } // Add 向滚动窗口添加数据。 // 参数 data 是要添加的数据。 func (tw *TumblingWindow) Add(data interface{}) { // 加锁以确保并发安全。 tw.mu.Lock() defer tw.mu.Unlock() // 将数据追加到窗口的数据列表中。 if !tw.initialized { tw.currentSlot = tw.createSlot(GetTimestamp(data, tw.config.TsProp, tw.config.TimeUnit)) tw.timerMu.Lock() tw.timer = time.NewTicker(tw.size) tw.timerMu.Unlock() tw.initialized = true // 发送初始化完成信号(在设置timer后) close(tw.initChan) } row := types.Row{ Data: data, Timestamp: GetTimestamp(data, tw.config.TsProp, tw.config.TimeUnit), } tw.data = append(tw.data, row) } func (sw *TumblingWindow) createSlot(t time.Time) *types.TimeSlot { // 创建一个新的时间槽位 start := timex.AlignTimeToWindow(t, sw.size) end := start.Add(sw.size) slot := types.NewTimeSlot(&start, &end) return slot } func (sw *TumblingWindow) NextSlot() *types.TimeSlot { if sw.currentSlot == nil { return nil } start := sw.currentSlot.End end := sw.currentSlot.End.Add(sw.size) return types.NewTimeSlot(start, &end) } // Stop 停止滚动窗口的操作。 func (tw *TumblingWindow) Stop() { // 调用取消函数以停止窗口的操作。 tw.cancelFunc() // 安全地停止timer tw.timerMu.Lock() if tw.timer != nil { tw.timer.Stop() } tw.timerMu.Unlock() } // Start 启动滚动窗口的定时触发机制。 func (tw *TumblingWindow) Start() { go func() { <-tw.initChan // 在函数结束时关闭输出通道。 defer close(tw.outputChan) // 获取timer引用,避免在循环中重复加锁 tw.timerMu.Lock() timer := tw.timer tw.timerMu.Unlock() if timer == nil { return } for { select { // 当定时器到期时,触发窗口。 case <-timer.C: tw.Trigger() // 当上下文被取消时,停止定时器并退出循环。 case <-tw.ctx.Done(): tw.timerMu.Lock() if tw.timer != nil { tw.timer.Stop() } tw.timerMu.Unlock() return } } }() } // Trigger 触发滚动窗口的处理逻辑。 func (tw *TumblingWindow) Trigger() { // 加锁以确保并发安全。 tw.mu.Lock() defer tw.mu.Unlock() if !tw.initialized { return } // 计算下一个窗口槽位 next := tw.NextSlot() // 保留下一个窗口的数据 tms := next.Start.Add(-tw.size) tme := next.End.Add(tw.size) temp := types.NewTimeSlot(&tms, &tme) newData := make([]types.Row, 0) for _, item := range tw.data { if temp.Contains(item.Timestamp) { newData = append(newData, item) } } // 提取出当前窗口数据 resultData := make([]types.Row, 0) for _, item := range tw.data { if tw.currentSlot.Contains(item.Timestamp) { item.Slot = tw.currentSlot resultData = append(resultData, item) } } // 如果设置了回调函数,则执行回调函数 if tw.callback != nil { tw.callback(resultData) } // 更新窗口内的数据 tw.data = newData tw.currentSlot = next // 非阻塞发送到输出通道 tw.sendResultNonBlocking(resultData) } // sendResultNonBlocking 非阻塞地发送结果到输出通道 func (tw *TumblingWindow) sendResultNonBlocking(resultData []types.Row) { select { case tw.outputChan <- resultData: // 成功发送 tw.sentCount++ default: // 通道已满,丢弃结果(可选:记录日志或触发告警) tw.droppedCount++ // 可选:在这里添加日志记录 // log.Printf("Window output channel full, dropped result with %d rows", len(resultData)) } } // Reset 重置滚动窗口的数据。 func (tw *TumblingWindow) Reset() { // 加锁以确保并发安全。 tw.mu.Lock() defer tw.mu.Unlock() // 停止现有的timer tw.timerMu.Lock() if tw.timer != nil { tw.timer.Stop() tw.timer = nil } tw.timerMu.Unlock() // 清空窗口数据。 tw.data = nil tw.currentSlot = nil tw.initialized = false tw.initChan = make(chan struct{}) } // OutputChan 返回一个只读通道,用于接收窗口触发时的数据。 func (tw *TumblingWindow) OutputChan() <-chan []types.Row { return tw.outputChan } // SetCallback 设置滚动窗口触发时的回调函数。 // 参数 callback 是要设置的回调函数。 func (tw *TumblingWindow) SetCallback(callback func([]types.Row)) { tw.mu.Lock() defer tw.mu.Unlock() tw.callback = callback } // GetStats 获取窗口性能统计信息 func (tw *TumblingWindow) GetStats() map[string]int64 { tw.mu.RLock() defer tw.mu.RUnlock() return map[string]int64{ "sent_count": tw.sentCount, "dropped_count": tw.droppedCount, "buffer_size": int64(cap(tw.outputChan)), "buffer_used": int64(len(tw.outputChan)), } } // ResetStats 重置性能统计 func (tw *TumblingWindow) ResetStats() { tw.mu.Lock() defer tw.mu.Unlock() tw.sentCount = 0 tw.droppedCount = 0 } // // GetResults 获取当前滚动窗口中的数据副本。 // func (tw *TumblingWindow) GetResults() []interface{} { // // 加锁以确保并发安全。 // tw.mu.Lock() // defer tw.mu.Unlock() // // 返回窗口数据的副本。 // return append([]interface{}{}, tw.data...) // }