diff --git a/window/counting_window.go b/window/counting_window.go index 53ccc04..4780b15 100644 --- a/window/counting_window.go +++ b/window/counting_window.go @@ -19,8 +19,9 @@ package window import ( "context" "fmt" + "reflect" + "strings" "sync" - "time" "github.com/rulego/streamsql/utils/cast" "github.com/rulego/streamsql/utils/timex" @@ -31,60 +32,82 @@ import ( var _ Window = (*CountingWindow)(nil) type CountingWindow struct { - config types.WindowConfig - threshold int - count int - mu sync.Mutex - callback func([]types.Row) - dataBuffer []types.Row - outputChan chan []types.Row - ctx context.Context - cancelFunc context.CancelFunc - ticker *time.Ticker - triggerChan chan types.Row + config types.WindowConfig + threshold int + count int + mu sync.Mutex + callback func([]types.Row) + dataBuffer []types.Row + outputChan chan []types.Row + ctx context.Context + cancelFunc context.CancelFunc + triggerChan chan types.Row + keyedBuffer map[string][]types.Row + keyedCount map[string]int + sentCount int64 + droppedCount int64 + stopped bool } func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) { ctx, cancel := context.WithCancel(context.Background()) - threshold := cast.ToInt(config.Params["count"]) + defer func() { + if cancel != nil { + // cancel will be used in the returned struct + } + }() + + // Get count parameter from params array + if len(config.Params) == 0 { + cancel() + return nil, fmt.Errorf("counting window requires 'count' parameter") + } + + countVal := config.Params[0] + threshold := cast.ToInt(countVal) if threshold <= 0 { - return nil, fmt.Errorf("threshold must be a positive integer") + return nil, fmt.Errorf("threshold must be a positive integer, got: %v", countVal) } // Use unified performance config to get window output buffer size bufferSize := 100 // Default value, counting windows usually have smaller buffers - if perfConfig, exists := config.Params["performanceConfig"]; exists { - if pc, ok := perfConfig.(types.PerformanceConfig); ok { - bufferSize = pc.BufferConfig.WindowOutputSize / 10 // Counting window uses 1/10 of buffer - if bufferSize < 10 { - bufferSize = 10 // Minimum value - } + if (config.PerformanceConfig != types.PerformanceConfig{}) { + bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Counting window uses 1/10 of buffer + if bufferSize < 10 { + bufferSize = 10 // Minimum value } } cw := &CountingWindow{ + config: config, threshold: threshold, dataBuffer: make([]types.Row, 0, threshold), outputChan: make(chan []types.Row, bufferSize), ctx: ctx, cancelFunc: cancel, - triggerChan: make(chan types.Row, 3), + triggerChan: make(chan types.Row, bufferSize), + keyedBuffer: make(map[string][]types.Row), + keyedCount: make(map[string]int), } - if callback, ok := config.Params["callback"].(func([]types.Row)); ok { - cw.SetCallback(callback) + // Set callback if provided + if config.Callback != nil { + cw.SetCallback(config.Callback) } return cw, nil } func (cw *CountingWindow) Add(data interface{}) { - // Add data to window data list t := GetTimestamp(data, cw.config.TsProp, cw.config.TimeUnit) row := types.Row{ Data: data, Timestamp: t, } - cw.triggerChan <- row + + select { + case cw.triggerChan <- row: + case <-cw.ctx.Done(): + } } func (cw *CountingWindow) Start() { go func() { @@ -97,39 +120,44 @@ func (cw *CountingWindow) Start() { // Channel closed, exit loop return } + key := cw.getKey(row.Data) cw.mu.Lock() - cw.dataBuffer = append(cw.dataBuffer, row) - cw.count++ - shouldTrigger := cw.count >= cw.threshold - if shouldTrigger { - // Process immediately while holding lock - slot := cw.createSlot(cw.dataBuffer[:cw.threshold]) + buf := append(cw.keyedBuffer[key], row) + cw.keyedBuffer[key] = buf + cw.keyedCount[key] = len(buf) + if cw.keyedCount[key] >= cw.threshold { + slot := cw.createSlot(buf[:cw.threshold]) data := make([]types.Row, cw.threshold) - copy(data, cw.dataBuffer[:cw.threshold]) - // Set Slot field to copied data to avoid modifying original dataBuffer + copy(data, buf[:cw.threshold]) for i := range data { data[i].Slot = slot } - - if len(cw.dataBuffer) > cw.threshold { - remaining := len(cw.dataBuffer) - cw.threshold - newBuffer := make([]types.Row, remaining, cw.threshold) - copy(newBuffer, cw.dataBuffer[cw.threshold:]) - cw.dataBuffer = newBuffer + if len(buf) > cw.threshold { + rem := make([]types.Row, len(buf)-cw.threshold, cw.threshold) + copy(rem, buf[cw.threshold:]) + cw.keyedBuffer[key] = rem } else { - cw.dataBuffer = make([]types.Row, 0, cw.threshold) + cw.keyedBuffer[key] = make([]types.Row, 0, cw.threshold) } - // Reset count - cw.count = len(cw.dataBuffer) + cw.keyedCount[key] = len(cw.keyedBuffer[key]) cw.mu.Unlock() - // Handle callback after releasing lock - go func(data []types.Row) { - if cw.callback != nil { - cw.callback(data) - } - cw.outputChan <- data - }(data) + if cw.callback != nil { + cw.callback(data) + } + + select { + case cw.outputChan <- data: + cw.mu.Lock() + cw.sentCount++ + cw.mu.Unlock() + case <-cw.ctx.Done(): + return + default: + cw.mu.Lock() + cw.droppedCount++ + cw.mu.Unlock() + } } else { cw.mu.Unlock() } @@ -146,11 +174,42 @@ func (cw *CountingWindow) Trigger() { // This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method } +func (cw *CountingWindow) Stop() { + cw.mu.Lock() + stopped := cw.stopped + if !stopped { + cw.stopped = true + } + cw.mu.Unlock() + + if !stopped { + close(cw.triggerChan) + cw.cancelFunc() + } +} + func (cw *CountingWindow) Reset() { cw.mu.Lock() defer cw.mu.Unlock() + cw.count = 0 cw.dataBuffer = nil + cw.keyedBuffer = make(map[string][]types.Row) + cw.keyedCount = make(map[string]int) + cw.sentCount = 0 + cw.droppedCount = 0 +} + +func (cw *CountingWindow) GetStats() map[string]int64 { + cw.mu.Lock() + defer cw.mu.Unlock() + + return map[string]int64{ + "sentCount": cw.sentCount, + "droppedCount": cw.droppedCount, + "bufferSize": int64(cap(cw.outputChan)), + "bufferUsed": int64(len(cw.outputChan)), + } } func (cw *CountingWindow) OutputChan() <-chan []types.Row { @@ -177,3 +236,32 @@ func (cw *CountingWindow) createSlot(data []types.Row) *types.TimeSlot { return slot } } + +func (cw *CountingWindow) getKey(data interface{}) string { + // Use GroupByKeys array + keys := cw.config.GroupByKeys + if len(keys) == 0 { + return "__global__" + } + v := reflect.ValueOf(data) + keyParts := make([]string, 0, len(keys)) + for _, k := range keys { + var part string + switch v.Kind() { + case reflect.Map: + if v.Type().Key().Kind() == reflect.String { + mv := v.MapIndex(reflect.ValueOf(k)) + if mv.IsValid() { + part = cast.ToString(mv.Interface()) + } + } + case reflect.Struct: + f := v.FieldByName(k) + if f.IsValid() { + part = cast.ToString(f.Interface()) + } + } + keyParts = append(keyParts, part) + } + return strings.Join(keyParts, "|") +}