diff --git a/window/sliding_window.go b/window/sliding_window.go index 88e2fec..d587b34 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -212,23 +212,14 @@ func (sw *SlidingWindow) Trigger() { sw.data = newData sw.currentSlot = next - // 非阻塞发送到输出通道 - sw.sendResultNonBlocking(resultData) -} - -// sendResultNonBlocking 非阻塞地发送结果到输出通道 -func (sw *SlidingWindow) sendResultNonBlocking(resultData []types.Row) { + // 非阻塞发送到输出通道并直接更新统计信息(已在锁内) select { case sw.outputChan <- resultData: - // 成功发送 - sw.mu.Lock() + // 成功发送,更新统计信息(已在锁内) sw.sentCount++ - sw.mu.Unlock() default: - // 通道已满,丢弃结果 - sw.mu.Lock() + // 通道已满,丢弃结果并更新统计信息(已在锁内) sw.droppedCount++ - sw.mu.Unlock() } } diff --git a/window/tumbling_window.go b/window/tumbling_window.go index 0009a26..c916048 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -209,23 +209,14 @@ func (tw *TumblingWindow) Trigger() { tw.data = newData tw.currentSlot = next - // 非阻塞发送到输出通道 - tw.sendResultNonBlocking(resultData) -} - -// sendResultNonBlocking 非阻塞地发送结果到输出通道 -func (tw *TumblingWindow) sendResultNonBlocking(resultData []types.Row) { + // 非阻塞发送到输出通道并更新统计信息 select { case tw.outputChan <- resultData: - // 成功发送 - tw.mu.Lock() + // 成功发送,更新统计信息(已在锁内) tw.sentCount++ - tw.mu.Unlock() default: - // 通道已满,丢弃结果(可选:记录日志或触发告警) - tw.mu.Lock() + // 通道已满,丢弃结果并更新统计信息(已在锁内) tw.droppedCount++ - tw.mu.Unlock() // 可选:在这里添加日志记录 // log.Printf("Window output channel full, dropped result with %d rows", len(resultData)) }