Merge pull request #4 from dimon-83/main

refactor(window): 重构CountingWindow以优化数据触发逻辑
This commit is contained in:
Whki
2025-04-11 20:31:13 +08:00
committed by GitHub
+45 -62
View File
@@ -24,7 +24,7 @@ type CountingWindow struct {
ctx context.Context
cancelFunc context.CancelFunc
ticker *time.Ticker
triggerChan chan struct{}
triggerChan chan model.Row
}
func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) {
@@ -40,7 +40,7 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) {
outputChan: make(chan []model.Row, 10),
ctx: ctx,
cancelFunc: cancel,
triggerChan: make(chan struct{}, 1),
triggerChan: make(chan model.Row, 3),
}
if callback, ok := config.Params["callback"].(func([]model.Row)); ok {
@@ -50,57 +50,35 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) {
}
func (cw *CountingWindow) Add(data interface{}) {
cw.mu.Lock()
defer cw.mu.Unlock()
// 将数据添加到窗口的数据列表中
t := GetTimestamp(data, cw.config.TsProp)
row := model.Row{
Data: data,
Timestamp: t,
}
cw.dataBuffer = append(cw.dataBuffer, row)
cw.count++
shouldTrigger := cw.count >= cw.threshold
if shouldTrigger {
slot := cw.createSlot(cw.dataBuffer[:cw.threshold])
for _, r := range cw.dataBuffer[:cw.threshold] {
// 由于Row是值类型这里需要通过指针来修改Slot字段
(&r).Slot = slot
}
data := cw.dataBuffer[:cw.threshold]
if len(cw.dataBuffer) > cw.threshold {
remaining := len(cw.dataBuffer) - cw.threshold
newBuffer := make([]model.Row, remaining, cw.threshold)
copy(newBuffer, cw.dataBuffer[cw.threshold:])
cw.dataBuffer = newBuffer
} else {
cw.dataBuffer = make([]model.Row, 0, cw.threshold)
}
go func() {
cw.mu.Lock()
if cw.callback != nil {
cw.callback(data)
}
cw.outputChan <- data
cw.count = 0
//cw.Reset()
cw.mu.Unlock()
}()
}
cw.triggerChan <- row
}
func (cw *CountingWindow) Start() {
go func() {
cw.ticker = time.NewTicker(1 * time.Second)
defer func() {
cw.ticker.Stop()
cw.cancelFunc()
}()
defer cw.cancelFunc()
for {
select {
case <-cw.ticker.C:
//cw.Trigger()
case row, ok := <-cw.triggerChan:
if !ok {
// 通道已关闭,退出循环
return
}
cw.mu.Lock()
cw.dataBuffer = append(cw.dataBuffer, row)
cw.count++
shouldTrigger := cw.count >= cw.threshold
cw.mu.Unlock()
// 只有当达到阈值时才触发
if shouldTrigger {
cw.Trigger()
}
case <-cw.ctx.Done():
return
}
@@ -109,34 +87,39 @@ func (cw *CountingWindow) Start() {
}
func (cw *CountingWindow) Trigger() {
cw.triggerChan <- struct{}{}
//cw.triggerChan <- struct{}{}
cw.mu.Lock()
defer cw.mu.Unlock()
go func() {
cw.mu.Lock()
defer cw.mu.Unlock()
if cw.callback != nil && len(cw.dataBuffer) > 0 {
var resultData []model.Row
if len(cw.dataBuffer) > cw.threshold {
resultData = cw.dataBuffer[:cw.threshold]
} else {
resultData = cw.dataBuffer
}
slot := cw.createSlot(resultData)
for _, r := range resultData {
r.Slot = slot
}
cw.callback(resultData)
slot := cw.createSlot(cw.dataBuffer[:cw.threshold])
for _, r := range cw.dataBuffer[:cw.threshold] {
// 由于Row是值类型这里需要通过指针来修改Slot字段
(&r).Slot = slot
}
data := cw.dataBuffer[:cw.threshold]
if len(cw.dataBuffer) > cw.threshold {
remaining := len(cw.dataBuffer) - cw.threshold
newBuffer := make([]model.Row, remaining, cw.threshold)
copy(newBuffer, cw.dataBuffer[cw.threshold:])
cw.dataBuffer = newBuffer
} else {
cw.dataBuffer = make([]model.Row, 0, cw.threshold)
}
// 重置计数
cw.count = len(cw.dataBuffer)
go func(data []model.Row) {
if cw.callback != nil {
cw.callback(data)
}
cw.Reset()
}()
cw.outputChan <- data
}(data)
}
func (cw *CountingWindow) Reset() {
cw.mu.Lock()
defer cw.mu.Unlock()
cw.count = 0
cw.dataBuffer = cw.dataBuffer[:0]
cw.mu.Unlock()
cw.dataBuffer = nil
}
func (cw *CountingWindow) OutputChan() <-chan []model.Row {