forked from GiteaTest2015/streamsql
96 lines
2.4 KiB
Go
96 lines
2.4 KiB
Go
/*
 * Copyright 2025 The RuleGo Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package window
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/rulego/streamsql/utils/cast"
|
|
|
|
"github.com/rulego/streamsql/types"
|
|
)
|
|
|
|
// Window type identifiers. CreateWindow compares config.Type against these
// values to decide which window constructor to call.
const (
	// TypeTumbling selects the window built by NewTumblingWindow.
	TypeTumbling = "tumbling"
	// TypeSliding selects the window built by NewSlidingWindow.
	TypeSliding = "sliding"
	// TypeCounting selects the window built by NewCountingWindow.
	TypeCounting = "counting"
	// TypeSession selects the window built by NewSessionWindow.
	TypeSession = "session"
)
|
|
|
|
type Window interface {
|
|
Add(item interface{})
|
|
//GetResults() []interface{}
|
|
Reset()
|
|
Start()
|
|
OutputChan() <-chan []types.Row
|
|
SetCallback(callback func([]types.Row))
|
|
Trigger()
|
|
}
|
|
|
|
func CreateWindow(config types.WindowConfig) (Window, error) {
|
|
switch config.Type {
|
|
case TypeTumbling:
|
|
return NewTumblingWindow(config)
|
|
case TypeSliding:
|
|
return NewSlidingWindow(config)
|
|
case TypeCounting:
|
|
return NewCountingWindow(config)
|
|
case TypeSession:
|
|
return NewSessionWindow(config)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported window type: %s", config.Type)
|
|
}
|
|
}
|
|
|
|
func (cw *CountingWindow) SetCallback(callback func([]types.Row)) {
|
|
cw.callback = callback
|
|
}
|
|
|
|
// GetTimestamp extracts timestamp from data
|
|
func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time {
|
|
if ts, ok := data.(interface{ GetTimestamp() time.Time }); ok {
|
|
return ts.GetTimestamp()
|
|
} else if tsProp != "" {
|
|
v := reflect.ValueOf(data)
|
|
|
|
// Handle different types
|
|
switch v.Kind() {
|
|
case reflect.Struct:
|
|
// If it's a struct, use reflection to get field value
|
|
if f := v.FieldByName(tsProp); f.IsValid() {
|
|
if t, ok := f.Interface().(time.Time); ok {
|
|
return t
|
|
}
|
|
}
|
|
case reflect.Map:
|
|
// If it's a map, get value directly through key
|
|
if v.Type().Key().Kind() == reflect.String {
|
|
if value := v.MapIndex(reflect.ValueOf(tsProp)); value.IsValid() {
|
|
if t, ok := value.Interface().(time.Time); ok {
|
|
return t
|
|
} else if timestampInt, isInt := value.Interface().(int64); isInt {
|
|
return cast.ConvertIntToTime(timestampInt, timeUnit)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return time.Now()
|
|
}
|