feat: 增加window时间槽timeslot;

This commit is contained in:
dimon
2025-04-07 17:27:58 +08:00
parent 6fc3e34527
commit 20313690d3
11 changed files with 330 additions and 85 deletions

View File

@ -11,6 +11,7 @@ type RowEvent interface {
// Row is a single record held by a window: the caller's payload together
// with the timestamp and, optionally, the time slot associated with it.
type Row struct {
// Timestamp is the time associated with this row.
Timestamp time.Time
// Data is the payload carried by this row; its concrete type is not
// constrained here.
Data interface{}
// Slot points to the time slot this row is associated with; the zero value
// is nil, so it must be checked before being dereferenced.
Slot *TimeSlot
}
// GetTimestamp 获取时间戳

52
model/timeslot.go Normal file
View File

@ -0,0 +1,52 @@
package model
import (
"time"
)
// TimeSlot describes one window interval by its two boundary instants.
//
// The interval is half-open: Start is part of the slot, End is not. This lets
// back-to-back slots (where one slot's End equals the next slot's Start)
// partition the timeline without any instant belonging to two slots.
//
// Both bounds are pointers and may be nil; every method below tolerates that.
type TimeSlot struct {
	// Start is the inclusive lower bound of the slot.
	Start *time.Time
	// End is the exclusive upper bound of the slot.
	End *time.Time
}

// NewTimeSlot returns a slot that stores the given bounds as-is. The pointers
// are kept, not copied, so later writes through them are visible in the slot.
func NewTimeSlot(start, end *time.Time) *TimeSlot {
	return &TimeSlot{
		Start: start,
		End:   end,
	}
}

// Hash folds the two bounds into a 64-bit value: the start's Unix-nanosecond
// timestamp is rotated by 32 bits and XOR-ed with the end's. Equal bounds
// always give equal hashes; the result is not guaranteed to be collision-free.
// A nil bound contributes 0 instead of causing a nil-pointer panic.
func (ts TimeSlot) Hash() uint64 {
	var startNano, endNano int64
	if ts.Start != nil {
		startNano = ts.Start.UnixNano()
	}
	if ts.End != nil {
		endNano = ts.End.UnixNano()
	}

	hash := uint64(startNano)
	// Swap the high and low halves so start and end do not simply cancel out
	// when they share low-order bits.
	hash = hash<<32 | hash>>32
	return hash ^ uint64(endNano)
}

// Contains reports whether t lies in [Start, End). A slot with a nil bound
// contains nothing.
//
// End is exclusive on purpose: with an inclusive End, an instant exactly on
// the boundary between two adjacent slots would be reported by both of them.
func (ts TimeSlot) Contains(t time.Time) bool {
	if ts.Start == nil || ts.End == nil {
		return false
	}
	return !t.Before(*ts.Start) && t.Before(*ts.End)
}

// GetStartTime returns the slot's Start pointer, or nil when the receiver
// itself is nil.
func (ts *TimeSlot) GetStartTime() *time.Time {
	if ts == nil {
		return nil
	}
	return ts.Start
}

// GetEndTime returns the slot's End pointer, or nil when the receiver itself
// is nil.
func (ts *TimeSlot) GetEndTime() *time.Time {
	if ts == nil {
		return nil
	}
	return ts.End
}

8
utils/time.go Normal file
View File

@ -0,0 +1,8 @@
package timex
import "time"
// AlignTimeToWindow rounds t down to the nearest multiple of size, counted
// from the Unix epoch, and returns the resulting instant.
//
// A zero size means there is nothing to align to, so t is returned unchanged
// (the remainder operation would otherwise panic with a division by zero).
// The sign of size is ignored. Instants before the epoch are also rounded
// down, i.e. towards the past.
func AlignTimeToWindow(t time.Time, size time.Duration) time.Time {
	if size == 0 {
		return t
	}
	step := int64(size)
	if step < 0 {
		step = -step
	}

	offset := t.UnixNano() % step
	// Go's % takes the sign of the dividend, so a pre-epoch instant yields a
	// negative remainder; shift it into [0, step) to keep rounding downwards.
	if offset < 0 {
		offset += step
	}
	return t.Add(time.Duration(-offset))
}

69
utils/time_test.go Normal file
View File

@ -0,0 +1,69 @@
// Copyright 2021 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package timex
import (
"testing"
"time"
)
// TestAlignTimeToWindow checks that AlignTimeToWindow rounds an instant down
// to the start of its window for several window sizes, including an input
// that already sits on a window boundary.
func TestAlignTimeToWindow(t *testing.T) {
	tests := []struct {
		name     string
		input    time.Time
		size     time.Duration
		expected time.Time
	}{
		{
			// The label now matches the 3-minute size actually under test.
			name:     "对齐到3分钟窗口",
			input:    time.Date(2024, 1, 1, 12, 35, 56, 789000000, time.UTC),
			size:     3 * time.Minute,
			expected: time.Date(2024, 1, 1, 12, 33, 0, 0, time.UTC),
		},
		{
			name:     "对齐到5分钟窗口",
			input:    time.Date(2024, 1, 1, 12, 37, 56, 789000000, time.UTC),
			size:     5 * time.Minute,
			expected: time.Date(2024, 1, 1, 12, 35, 0, 0, time.UTC),
		},
		{
			name:     "对齐到1小时窗口",
			input:    time.Date(2024, 1, 1, 12, 34, 56, 789000000, time.UTC),
			size:     time.Hour,
			expected: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
		},
		{
			name:     "对齐到1天窗口",
			input:    time.Date(2024, 1, 1, 12, 34, 56, 789000000, time.UTC),
			size:     24 * time.Hour,
			expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
		},
		{
			name:     "零时刻对齐测试",
			input:    time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
			size:     time.Hour,
			expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := AlignTimeToWindow(tt.input, tt.size)
			// Compare with Equal so only the instant matters, not the
			// internal representation of the time value.
			if !got.Equal(tt.expected) {
				t.Errorf("AlignTimeToWindow() = %v, want %v", got, tt.expected)
			}
		})
	}
}

View File

@ -17,9 +17,9 @@ type CountingWindow struct {
threshold int
count int
mu sync.Mutex
callback func([]interface{})
dataBuffer []interface{}
outputChan chan []interface{}
callback func([]model.Row)
dataBuffer []model.Row
outputChan chan []model.Row
ctx context.Context
cancelFunc context.CancelFunc
ticker *time.Ticker
@ -35,14 +35,14 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) {
cw := &CountingWindow{
threshold: threshold,
dataBuffer: make([]interface{}, 0, threshold),
outputChan: make(chan []interface{}, 10),
dataBuffer: make([]model.Row, 0, threshold),
outputChan: make(chan []model.Row, 10),
ctx: ctx,
cancelFunc: cancel,
triggerChan: make(chan struct{}, 1),
}
if callback, ok := config.Params["callback"].(func([]interface{})); ok {
if callback, ok := config.Params["callback"].(func([]model.Row)); ok {
cw.SetCallback(callback)
}
return cw, nil
@ -50,21 +50,21 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) {
func (cw *CountingWindow) Add(data interface{}) {
cw.mu.Lock()
cw.dataBuffer = append(cw.dataBuffer, data)
defer cw.mu.Unlock()
row := model.Row{
Data: data,
Timestamp: GetTimestamp(data, cw.config.TsProp),
}
cw.dataBuffer = append(cw.dataBuffer, row)
cw.count++
shouldTrigger := cw.count >= cw.threshold
cw.mu.Unlock()
if shouldTrigger {
cw.mu.Lock()
v := append([]interface{}{}, cw.dataBuffer...)
cw.mu.Unlock()
go func() {
if cw.callback != nil {
cw.callback(v)
cw.callback(cw.dataBuffer)
}
cw.outputChan <- v
cw.outputChan <- cw.dataBuffer
cw.Reset()
}()
}
@ -109,9 +109,10 @@ func (cw *CountingWindow) Reset() {
cw.dataBuffer = cw.dataBuffer[:0]
}
func (cw *CountingWindow) OutputChan() <-chan []interface{} {
func (cw *CountingWindow) OutputChan() <-chan []model.Row {
return cw.outputChan
}
func (cw *CountingWindow) GetResults() []interface{} {
return append([]interface{}{}, cw.dataBuffer...)
}
// func (cw *CountingWindow) GetResults() []interface{} {
// return append([]mode.Row, cw.dataBuffer...)
// }

View File

@ -34,7 +34,7 @@ func TestCountingWindow(t *testing.T) {
// Trigger one more element to check threshold
cw.Add(3)
results := make(chan []interface{})
results := make(chan []model.Row)
go func() {
for res := range cw.OutputChan() {
results <- res
@ -44,9 +44,13 @@ func TestCountingWindow(t *testing.T) {
select {
case res := <-results:
assert.Len(t, res, 3)
assert.Contains(t, res, 0)
assert.Contains(t, res, 1)
assert.Contains(t, res, 2)
raw := make([]interface{}, len(res))
for _, row := range res {
raw = append(raw, row.Data)
}
assert.Contains(t, raw, 0)
assert.Contains(t, raw, 1)
assert.Contains(t, raw, 2)
case <-time.After(2 * time.Second):
t.Error("No results received within timeout")
}

View File

@ -17,11 +17,11 @@ const (
type Window interface {
Add(item interface{})
GetResults() []interface{}
//GetResults() []interface{}
Reset()
Start()
OutputChan() <-chan []interface{}
SetCallback(callback func([]interface{}))
OutputChan() <-chan []model.Row
SetCallback(callback func([]model.Row))
Trigger()
}
@ -38,7 +38,7 @@ func CreateWindow(config model.WindowConfig) (Window, error) {
}
}
func (cw *CountingWindow) SetCallback(callback func([]interface{})) {
func (cw *CountingWindow) SetCallback(callback func([]model.Row)) {
cw.callback = callback
}

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/rulego/streamsql/model"
timex "github.com/rulego/streamsql/utils"
"github.com/spf13/cast"
)
@ -30,17 +31,19 @@ type SlidingWindow struct {
// 用于保护数据并发访问的互斥锁
mu sync.Mutex
// 存储窗口内的数据
data []TimedData
data []model.Row
// 用于输出窗口内数据的通道
outputChan chan []interface{}
outputChan chan []model.Row
// 当窗口触发时执行的回调函数
callback func([]interface{})
callback func([]model.Row)
// 用于控制窗口生命周期的上下文
ctx context.Context
// 用于取消上下文的函数
cancelFunc context.CancelFunc
// 用于定时触发窗口的定时器
timer *time.Timer
timer *time.Timer
startSlot *model.TimeSlot
currentSlot *model.TimeSlot
}
// NewSlidingWindow 创建一个新的滑动窗口实例
@ -60,10 +63,10 @@ func NewSlidingWindow(config model.WindowConfig) (*SlidingWindow, error) {
config: config,
size: size,
slide: slide,
outputChan: make(chan []interface{}, 10),
outputChan: make(chan []model.Row, 10),
ctx: ctx,
cancelFunc: cancel,
data: make([]TimedData, 0),
data: make([]model.Row, 0),
}, nil
}
@ -74,10 +77,34 @@ func (sw *SlidingWindow) Add(data interface{}) {
sw.mu.Lock()
defer sw.mu.Unlock()
// 将数据添加到窗口的数据列表中
sw.data = append(sw.data, TimedData{
if sw.startSlot == nil {
sw.startSlot = sw.createSlot(GetTimestamp(data, sw.config.TsProp))
sw.currentSlot = sw.startSlot
}
row := model.Row{
Data: data,
Timestamp: GetTimestamp(data, sw.config.TsProp),
})
}
sw.data = append(sw.data, row)
}
// createSlot builds the time slot that t falls into: the slot starts at t
// aligned to the window size and spans exactly one window size.
func (sw *SlidingWindow) createSlot(t time.Time) *model.TimeSlot {
	begin := timex.AlignTimeToWindow(t, sw.size)
	finish := begin.Add(sw.size)
	return model.NewTimeSlot(&begin, &finish)
}
// NextSlot returns the slot that follows the current one, obtained by moving
// both bounds of the current slot forward by the slide interval. It returns
// nil when no current slot has been established yet.
func (sw *SlidingWindow) NextSlot() *model.TimeSlot {
	cur := sw.currentSlot
	if cur == nil {
		return nil
	}
	shiftedStart, shiftedEnd := cur.Start.Add(sw.slide), cur.End.Add(sw.slide)
	return model.NewTimeSlot(&shiftedStart, &shiftedEnd)
}
// Start 启动滑动窗口,开始定时触发窗口
@ -113,19 +140,22 @@ func (sw *SlidingWindow) Trigger() {
}
// 计算截止时间,即当前时间减去窗口的总大小
cutoff := time.Now().Add(-sw.size)
var newData []TimedData
next := sw.NextSlot()
var newData []model.Row
// 遍历窗口内的数据,只保留在截止时间之后的数据
for _, item := range sw.data {
if item.Timestamp.After(cutoff) {
if next.Contains(item.Timestamp) {
newData = append(newData, item)
}
}
// 提取出 Data 字段组成 []interface{} 类型的数据
resultData := make([]interface{}, 0, len(newData))
for _, item := range newData {
resultData = append(resultData, item.Data)
resultData := make([]model.Row, 0)
for _, item := range sw.data {
if sw.currentSlot.Contains(item.Timestamp) {
item.Slot = sw.currentSlot
resultData = append(resultData, item)
}
}
// 如果设置了回调函数,则执行回调函数
@ -135,6 +165,7 @@ func (sw *SlidingWindow) Trigger() {
// 更新窗口内的数据
sw.data = newData
sw.currentSlot = next
// 将新的数据发送到输出通道
sw.outputChan <- resultData
}
@ -149,13 +180,13 @@ func (sw *SlidingWindow) Reset() {
}
// OutputChan 返回滑动窗口的输出通道
func (sw *SlidingWindow) OutputChan() <-chan []interface{} {
func (sw *SlidingWindow) OutputChan() <-chan []model.Row {
return sw.outputChan
}
// SetCallback 设置滑动窗口触发时执行的回调函数
// 参数 callback 表示要设置的回调函数
func (sw *SlidingWindow) SetCallback(callback func([]interface{})) {
func (sw *SlidingWindow) SetCallback(callback func([]model.Row)) {
sw.callback = callback
}

View File

@ -18,19 +18,19 @@ func TestSlidingWindow(t *testing.T) {
"size": "2s",
"slide": "1s",
},
TsProp: "Ts",
TsProp: "Ts",
TimeUnit: time.Second,
})
sw.SetCallback(func(results []interface{}) {
sw.SetCallback(func(results []model.Row) {
t.Logf("Received results: %v", results)
})
sw.Start()
// 添加数据
now := time.Now()
t_3 := TestDate{Ts: now.Add(-3 * time.Second)}
t_2 := TestDate{Ts: now.Add(-2 * time.Second)}
t_1 := TestDate{Ts: now.Add(-1 * time.Second)}
t_0 := TestDate{Ts: now}
t_3 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 56, 789000000, time.UTC), tag: "1"}
t_2 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 57, 789000000, time.UTC), tag: "2"}
t_1 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 58, 789000000, time.UTC), tag: "3"}
t_0 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 59, 789000000, time.UTC), tag: "4"}
sw.Add(t_3)
sw.Add(t_2)
@ -38,25 +38,56 @@ func TestSlidingWindow(t *testing.T) {
sw.Add(t_0)
// 等待一段时间,触发窗口
time.Sleep(3 * time.Second)
//time.Sleep(3 * time.Second)
// 检查结果
resultsChan := sw.OutputChan()
var results []interface{}
select {
case results = <-resultsChan:
case <-time.After(100 * time.Second):
t.Fatal("No results received within timeout")
var results []model.Row
for {
select {
case results = <-resultsChan:
raw := make([]TestDate, 0)
for _, row := range results {
raw = append(raw, row.Data.(TestDate))
}
// 获取当前窗口的时间范围
windowStart := results[0].Slot.Start
windowEnd := results[0].Slot.End
t.Logf("Window range: %v - %v", windowStart, windowEnd)
// 检查窗口内的数据
expectedData := make([]TestDate, 0)
if windowStart.Before(t_3.Ts) && windowEnd.After(t_2.Ts) {
expectedData = []TestDate{t_3, t_2}
} else if windowStart.Before(t_2.Ts) && windowEnd.After(t_1.Ts) {
expectedData = []TestDate{t_2, t_1}
} else if windowStart.Before(t_1.Ts) && windowEnd.After(t_0.Ts) {
expectedData = []TestDate{t_1, t_0}
} else {
expectedData = []TestDate{t_0}
}
// 验证窗口数据
assert.Equal(t, len(expectedData), len(raw), "窗口数据数量不匹配")
for _, expected := range expectedData {
assert.Contains(t, raw, expected, "窗口缺少预期数据")
}
default:
// 通道为空时退出
goto END
}
}
END:
// 预期结果:保留最近 2 秒内的数据
assert.Len(t, results, 2)
assert.Contains(t, results, t_1)
assert.Contains(t, results, t_0)
assert.Len(t, results, 0)
}
type TestDate struct {
Ts time.Time
Ts time.Time
tag string
}
type TestDate2 struct {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/rulego/streamsql/model"
timex "github.com/rulego/streamsql/utils"
"github.com/spf13/cast"
)
@ -23,17 +24,19 @@ type TumblingWindow struct {
// mu 用于保护对窗口数据的并发访问。
mu sync.Mutex
// data 存储窗口内收集的数据。
data []interface{}
data []model.Row
// outputChan 是一个通道,用于在窗口触发时发送数据。
outputChan chan []interface{}
outputChan chan []model.Row
// callback 是一个可选的回调函数,在窗口触发时调用。
callback func([]interface{})
callback func([]model.Row)
// ctx 用于控制窗口的生命周期。
ctx context.Context
// cancelFunc 用于取消窗口的操作。
cancelFunc context.CancelFunc
// timer 用于定时触发窗口。
timer *time.Timer
timer *time.Timer
startSlot *model.TimeSlot
currentSlot *model.TimeSlot
}
// NewTumblingWindow 创建一个新的滚动窗口实例。
@ -48,7 +51,7 @@ func NewTumblingWindow(config model.WindowConfig) (*TumblingWindow, error) {
return &TumblingWindow{
config: config,
size: size,
outputChan: make(chan []interface{}, 10),
outputChan: make(chan []model.Row, 10),
ctx: ctx,
cancelFunc: cancel,
}, nil
@ -61,7 +64,33 @@ func (tw *TumblingWindow) Add(data interface{}) {
tw.mu.Lock()
defer tw.mu.Unlock()
// 将数据追加到窗口的数据列表中。
tw.data = append(tw.data, data)
if tw.startSlot == nil {
tw.startSlot = tw.createSlot(GetTimestamp(data, tw.config.TsProp))
tw.currentSlot = tw.startSlot
}
row := model.Row{
Data: data,
Timestamp: GetTimestamp(data, tw.config.TsProp),
}
tw.data = append(tw.data, row)
}
// createSlot builds the time slot that t falls into: the slot starts at t
// aligned to the window size and spans exactly one window size.
func (tw *TumblingWindow) createSlot(t time.Time) *model.TimeSlot {
	start := timex.AlignTimeToWindow(t, tw.size)
	end := start.Add(tw.size)
	return model.NewTimeSlot(&start, &end)
}
// NextSlot returns the slot immediately after the current one: it starts at
// the current slot's end and spans one window size. It returns nil when no
// current slot has been established yet.
func (tw *TumblingWindow) NextSlot() *model.TimeSlot {
	if tw.currentSlot == nil {
		return nil
	}
	// Copy the boundary instant instead of reusing the current slot's End
	// pointer, so the two slots never share storage and a write through one
	// slot's bound cannot silently move the other's.
	start := *tw.currentSlot.End
	end := start.Add(tw.size)
	return model.NewTimeSlot(&start, &end)
}
// Stop 停止滚动窗口的操作。
@ -98,16 +127,35 @@ func (tw *TumblingWindow) Trigger() {
// 加锁以确保并发安全。
tw.mu.Lock()
defer tw.mu.Unlock()
// 如果设置了回调函数,则调用它。
if tw.callback != nil {
tw.callback(tw.data)
// 计算截止时间,即当前时间减去窗口的总大小
next := tw.NextSlot()
var newData []model.Row
// 遍历窗口内的数据,只保留在截止时间之后的数据
for _, item := range tw.data {
if next.Contains(item.Timestamp) {
newData = append(newData, item)
}
}
// 将窗口数据发送到输出通道。
tw.outputChan <- append([]interface{}{}, tw.data...)
// 重置窗口数据。
tw.data = nil
// 提取出 Data 字段组成 []interface{} 类型的数据
resultData := make([]model.Row, 0)
for _, item := range tw.data {
if tw.currentSlot.Contains(item.Timestamp) {
item.Slot = tw.currentSlot
resultData = append(resultData, item)
}
}
// 如果设置了回调函数,则执行回调函数
if tw.callback != nil {
tw.callback(resultData)
}
// 更新窗口内的数据
tw.data = newData
tw.currentSlot = next
// 将新的数据发送到输出通道
tw.outputChan <- resultData
}
// Reset 重置滚动窗口的数据。
@ -120,21 +168,21 @@ func (tw *TumblingWindow) Reset() {
}
// OutputChan 返回一个只读通道,用于接收窗口触发时的数据。
func (tw *TumblingWindow) OutputChan() <-chan []interface{} {
func (tw *TumblingWindow) OutputChan() <-chan []model.Row {
return tw.outputChan
}
// SetCallback 设置滚动窗口触发时的回调函数。
// 参数 callback 是要设置的回调函数。
func (tw *TumblingWindow) SetCallback(callback func([]interface{})) {
func (tw *TumblingWindow) SetCallback(callback func([]model.Row)) {
tw.callback = callback
}
// GetResults 获取当前滚动窗口中的数据副本。
func (tw *TumblingWindow) GetResults() []interface{} {
// 加锁以确保并发安全。
tw.mu.Lock()
defer tw.mu.Unlock()
// 返回窗口数据的副本。
return append([]interface{}{}, tw.data...)
}
// // GetResults 获取当前滚动窗口中的数据副本。
// func (tw *TumblingWindow) GetResults() []interface{} {
// // 加锁以确保并发安全。
// tw.mu.Lock()
// defer tw.mu.Unlock()
// // 返回窗口数据的副本。
// return append([]interface{}{}, tw.data...)
// }

View File

@ -17,7 +17,7 @@ func TestTumblingWindow(t *testing.T) {
Type: "TumblingWindow",
Params: map[string]interface{}{"size": "2s"},
})
tw.SetCallback(func(results []interface{}) {
tw.SetCallback(func(results []model.Row) {
// Process results
})
go tw.Start()
@ -30,7 +30,7 @@ func TestTumblingWindow(t *testing.T) {
// Check output channel
resultsChan := tw.OutputChan()
var results []interface{}
var results []model.Row
select {
case results = <-resultsChan:
case <-time.After(3 * time.Second):