diff --git a/window/counting_window.go b/window/counting_window.go index a327c7d..069f671 100644 --- a/window/counting_window.go +++ b/window/counting_window.go @@ -24,7 +24,7 @@ type CountingWindow struct { ctx context.Context cancelFunc context.CancelFunc ticker *time.Ticker - triggerChan chan struct{} + triggerChan chan model.Row } func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) { @@ -40,7 +40,7 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) { outputChan: make(chan []model.Row, 10), ctx: ctx, cancelFunc: cancel, - triggerChan: make(chan struct{}, 1), + triggerChan: make(chan model.Row, 3), } if callback, ok := config.Params["callback"].(func([]model.Row)); ok { @@ -50,57 +50,35 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) { } func (cw *CountingWindow) Add(data interface{}) { - cw.mu.Lock() - defer cw.mu.Unlock() // 将数据添加到窗口的数据列表中 t := GetTimestamp(data, cw.config.TsProp) row := model.Row{ Data: data, Timestamp: t, } - cw.dataBuffer = append(cw.dataBuffer, row) - cw.count++ - shouldTrigger := cw.count >= cw.threshold - - if shouldTrigger { - slot := cw.createSlot(cw.dataBuffer[:cw.threshold]) - for _, r := range cw.dataBuffer[:cw.threshold] { - // 由于Row是值类型,这里需要通过指针来修改Slot字段 - (&r).Slot = slot - } - data := cw.dataBuffer[:cw.threshold] - if len(cw.dataBuffer) > cw.threshold { - remaining := len(cw.dataBuffer) - cw.threshold - newBuffer := make([]model.Row, remaining, cw.threshold) - copy(newBuffer, cw.dataBuffer[cw.threshold:]) - cw.dataBuffer = newBuffer - } else { - cw.dataBuffer = make([]model.Row, 0, cw.threshold) - } - go func() { - cw.mu.Lock() - if cw.callback != nil { - cw.callback(data) - } - cw.outputChan <- data - cw.count = 0 - //cw.Reset() - cw.mu.Unlock() - }() - } + cw.triggerChan <- row } func (cw *CountingWindow) Start() { go func() { - cw.ticker = time.NewTicker(1 * time.Second) - defer func() { - cw.ticker.Stop() - cw.cancelFunc() - }() + defer cw.cancelFunc() for { select { - case <-cw.ticker.C: - //cw.Trigger() + case row, ok := <-cw.triggerChan: + if !ok { + // 通道已关闭,退出循环 + return + } + cw.mu.Lock() + cw.dataBuffer = append(cw.dataBuffer, row) + cw.count++ + shouldTrigger := cw.count >= cw.threshold + cw.mu.Unlock() + // 只有当达到阈值时才触发 + if shouldTrigger { + cw.Trigger() + } + case <-cw.ctx.Done(): return } @@ -109,34 +87,39 @@ func (cw *CountingWindow) Start() { } func (cw *CountingWindow) Trigger() { - cw.triggerChan <- struct{}{} + //cw.triggerChan <- struct{}{} + cw.mu.Lock() + defer cw.mu.Unlock() - go func() { - cw.mu.Lock() - defer cw.mu.Unlock() - - if cw.callback != nil && len(cw.dataBuffer) > 0 { - var resultData []model.Row - if len(cw.dataBuffer) > cw.threshold { - resultData = cw.dataBuffer[:cw.threshold] - } else { - resultData = cw.dataBuffer - } - slot := cw.createSlot(resultData) - for _, r := range resultData { - r.Slot = slot - } - cw.callback(resultData) + slot := cw.createSlot(cw.dataBuffer[:cw.threshold]) + for _, r := range cw.dataBuffer[:cw.threshold] { + // 由于Row是值类型,这里需要通过指针来修改Slot字段 + (&r).Slot = slot + } + data := cw.dataBuffer[:cw.threshold] + if len(cw.dataBuffer) > cw.threshold { + remaining := len(cw.dataBuffer) - cw.threshold + newBuffer := make([]model.Row, remaining, cw.threshold) + copy(newBuffer, cw.dataBuffer[cw.threshold:]) + cw.dataBuffer = newBuffer + } else { + cw.dataBuffer = make([]model.Row, 0, cw.threshold) + } + // 重置计数 + cw.count = len(cw.dataBuffer) + go func(data []model.Row) { + if cw.callback != nil { + cw.callback(data) } - cw.Reset() - }() + cw.outputChan <- data + }(data) } func (cw *CountingWindow) Reset() { cw.mu.Lock() - defer cw.mu.Unlock() cw.count = 0 - cw.dataBuffer = cw.dataBuffer[:0] + cw.mu.Unlock() + cw.dataBuffer = nil } func (cw *CountingWindow) OutputChan() <-chan []model.Row {