mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-15 06:47:26 +00:00
fix:Mutex Locking Causes Deadlock
This commit is contained in:
@@ -212,23 +212,14 @@ func (sw *SlidingWindow) Trigger() {
|
||||
sw.data = newData
|
||||
sw.currentSlot = next
|
||||
|
||||
// 非阻塞发送到输出通道
|
||||
sw.sendResultNonBlocking(resultData)
|
||||
}
|
||||
|
||||
// sendResultNonBlocking 非阻塞地发送结果到输出通道
|
||||
func (sw *SlidingWindow) sendResultNonBlocking(resultData []types.Row) {
|
||||
// 非阻塞发送到输出通道并直接更新统计信息(已在锁内)
|
||||
select {
|
||||
case sw.outputChan <- resultData:
|
||||
// 成功发送
|
||||
sw.mu.Lock()
|
||||
// 成功发送,更新统计信息(已在锁内)
|
||||
sw.sentCount++
|
||||
sw.mu.Unlock()
|
||||
default:
|
||||
// 通道已满,丢弃结果
|
||||
sw.mu.Lock()
|
||||
// 通道已满,丢弃结果并更新统计信息(已在锁内)
|
||||
sw.droppedCount++
|
||||
sw.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -209,23 +209,14 @@ func (tw *TumblingWindow) Trigger() {
|
||||
tw.data = newData
|
||||
tw.currentSlot = next
|
||||
|
||||
// 非阻塞发送到输出通道
|
||||
tw.sendResultNonBlocking(resultData)
|
||||
}
|
||||
|
||||
// sendResultNonBlocking 非阻塞地发送结果到输出通道
|
||||
func (tw *TumblingWindow) sendResultNonBlocking(resultData []types.Row) {
|
||||
// 非阻塞发送到输出通道并更新统计信息
|
||||
select {
|
||||
case tw.outputChan <- resultData:
|
||||
// 成功发送
|
||||
tw.mu.Lock()
|
||||
// 成功发送,更新统计信息(已在锁内)
|
||||
tw.sentCount++
|
||||
tw.mu.Unlock()
|
||||
default:
|
||||
// 通道已满,丢弃结果(可选:记录日志或触发告警)
|
||||
tw.mu.Lock()
|
||||
// 通道已满,丢弃结果并更新统计信息(已在锁内)
|
||||
tw.droppedCount++
|
||||
tw.mu.Unlock()
|
||||
// 可选:在这里添加日志记录
|
||||
// log.Printf("Window output channel full, dropped result with %d rows", len(resultData))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user