From 20313690d363ad1f558c2b4376a8d6e8aef62b0d Mon Sep 17 00:00:00 2001 From: dimon Date: Mon, 7 Apr 2025 17:27:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0window=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E6=A7=BDtimeslot=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- model/row.go | 1 + model/timeslot.go | 52 ++++++++++++++++++ utils/time.go | 8 +++ utils/time_test.go | 69 ++++++++++++++++++++++++ window/counting_window.go | 37 ++++++------- window/counting_window_test.go | 12 +++-- window/factory.go | 8 +-- window/sliding_window.go | 63 ++++++++++++++++------ window/sliding_window_test.go | 65 +++++++++++++++++------ window/tumbling_window.go | 96 +++++++++++++++++++++++++--------- window/tumbling_window_test.go | 4 +- 11 files changed, 330 insertions(+), 85 deletions(-) create mode 100644 model/timeslot.go create mode 100644 utils/time.go create mode 100644 utils/time_test.go diff --git a/model/row.go b/model/row.go index f724296..25e1f59 100644 --- a/model/row.go +++ b/model/row.go @@ -11,6 +11,7 @@ type RowEvent interface { type Row struct { Timestamp time.Time Data interface{} + Slot *TimeSlot } // GetTimestamp 获取时间戳 diff --git a/model/timeslot.go b/model/timeslot.go new file mode 100644 index 0000000..f6fb4b6 --- /dev/null +++ b/model/timeslot.go @@ -0,0 +1,52 @@ +package model + +import ( + "time" +) + +type TimeSlot struct { + Start *time.Time + End *time.Time +} + +func NewTimeSlot(start, end *time.Time) *TimeSlot { + return &TimeSlot{ + Start: start, + End: end, + } +} + +// Hash 生成槽位的哈希值 +func (ts TimeSlot) Hash() uint64 { + // 将开始时间和结束时间转换为 Unix 时间戳(纳秒级) + startNano := ts.Start.UnixNano() + endNano := ts.End.UnixNano() + + // 使用简单但高效的哈希算法 + // 将两个时间戳组合成一个唯一的哈希值 + hash := uint64(startNano) + hash = (hash << 32) | (hash >> 32) + hash = hash ^ uint64(endNano) + + return hash +} + +// Contains 检查给定时间是否在槽位范围内 +func (ts TimeSlot) Contains(t time.Time) bool { + return (t.Equal(*ts.Start) || t.After(*ts.Start)) && + (t.Equal(*ts.End) || t.Before(*ts.End)) +} + +func (ts *TimeSlot) GetStartTime() *time.Time { + if ts == nil || ts.Start == nil { + return nil + } + return ts.Start +} + +func (ts *TimeSlot) GetEndTime() *time.Time { + if ts == nil || ts.End == nil { + return nil + } + return ts.End +} diff --git a/utils/time.go b/utils/time.go new file mode 100644 index 0000000..2288b50 --- /dev/null +++ b/utils/time.go @@ -0,0 +1,8 @@ +package timex + +import "time" + +func AlignTimeToWindow(t time.Time, size time.Duration) time.Time { + offset := t.UnixNano() % int64(size) + return t.Add(time.Duration(-offset)) +} diff --git a/utils/time_test.go b/utils/time_test.go new file mode 100644 index 0000000..1db6904 --- /dev/null +++ b/utils/time_test.go @@ -0,0 +1,69 @@ +// Copyright 2021 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timex + +import ( + "testing" + "time" +) + +func TestAlignTimeToWindow(t *testing.T) { + tests := []struct { + name string + input time.Time + size time.Duration + expected time.Time + }{ + { + name: "对齐到1分钟窗口", + input: time.Date(2024, 1, 1, 12, 35, 56, 789000000, time.UTC), + size: 3 * time.Minute, + expected: time.Date(2024, 1, 1, 12, 33, 0, 0, time.UTC), + }, + { + name: "对齐到5分钟窗口", + input: time.Date(2024, 1, 1, 12, 37, 56, 789000000, time.UTC), + size: 5 * time.Minute, + expected: time.Date(2024, 1, 1, 12, 35, 0, 0, time.UTC), + }, + { + name: "对齐到1小时窗口", + input: time.Date(2024, 1, 1, 12, 34, 56, 789000000, time.UTC), + size: time.Hour, + expected: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + }, + { + name: "对齐到1天窗口", + input: time.Date(2024, 1, 1, 12, 34, 56, 789000000, time.UTC), + size: 24 * time.Hour, + expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + }, + { + name: "零时刻对齐测试", + input: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + size: time.Hour, + expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := AlignTimeToWindow(tt.input, tt.size) + if !got.Equal(tt.expected) { + t.Errorf("AlignTimeToWindow() = %v, want %v", got, tt.expected) + } + }) + } +} diff --git a/window/counting_window.go b/window/counting_window.go index 424f29f..4a16a45 100644 --- a/window/counting_window.go +++ b/window/counting_window.go @@ -17,9 +17,9 @@ type CountingWindow struct { threshold int count int mu sync.Mutex - callback func([]interface{}) - dataBuffer []interface{} - outputChan chan []interface{} + callback func([]model.Row) + dataBuffer []model.Row + outputChan chan []model.Row ctx context.Context cancelFunc context.CancelFunc ticker *time.Ticker @@ -35,14 +35,14 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) { cw := &CountingWindow{ threshold: threshold, - dataBuffer: make([]interface{}, 0, threshold), - outputChan: make(chan []interface{}, 10), + dataBuffer: make([]model.Row, 0, threshold), + outputChan: make(chan []model.Row, 10), ctx: ctx, cancelFunc: cancel, triggerChan: make(chan struct{}, 1), } - if callback, ok := config.Params["callback"].(func([]interface{})); ok { + if callback, ok := config.Params["callback"].(func([]model.Row)); ok { cw.SetCallback(callback) } return cw, nil @@ -50,21 +50,21 @@ func NewCountingWindow(config model.WindowConfig) (*CountingWindow, error) { func (cw *CountingWindow) Add(data interface{}) { cw.mu.Lock() - cw.dataBuffer = append(cw.dataBuffer, data) + defer cw.mu.Unlock() + row := model.Row{ + Data: data, + Timestamp: GetTimestamp(data, cw.config.TsProp), + } + cw.dataBuffer = append(cw.dataBuffer, row) cw.count++ shouldTrigger := cw.count >= cw.threshold - cw.mu.Unlock() if shouldTrigger { - cw.mu.Lock() - v := append([]interface{}{}, cw.dataBuffer...) - cw.mu.Unlock() - go func() { if cw.callback != nil { - cw.callback(v) + cw.callback(cw.dataBuffer) } - cw.outputChan <- v + cw.outputChan <- cw.dataBuffer cw.Reset() }() } @@ -109,9 +109,10 @@ func (cw *CountingWindow) Reset() { cw.dataBuffer = cw.dataBuffer[:0] } -func (cw *CountingWindow) OutputChan() <-chan []interface{} { +func (cw *CountingWindow) OutputChan() <-chan []model.Row { return cw.outputChan } -func (cw *CountingWindow) GetResults() []interface{} { - return append([]interface{}{}, cw.dataBuffer...) -} + +// func (cw *CountingWindow) GetResults() []interface{} { +// return append([]mode.Row, cw.dataBuffer...) +// } diff --git a/window/counting_window_test.go b/window/counting_window_test.go index 50ef2bb..5d1ae02 100644 --- a/window/counting_window_test.go +++ b/window/counting_window_test.go @@ -34,7 +34,7 @@ func TestCountingWindow(t *testing.T) { // Trigger one more element to check threshold cw.Add(3) - results := make(chan []interface{}) + results := make(chan []model.Row) go func() { for res := range cw.OutputChan() { results <- res @@ -44,9 +44,13 @@ func TestCountingWindow(t *testing.T) { select { case res := <-results: assert.Len(t, res, 3) - assert.Contains(t, res, 0) - assert.Contains(t, res, 1) - assert.Contains(t, res, 2) + raw := make([]interface{}, len(res)) + for _, row := range res { + raw = append(raw, row.Data) + } + assert.Contains(t, raw, 0) + assert.Contains(t, raw, 1) + assert.Contains(t, raw, 2) case <-time.After(2 * time.Second): t.Error("No results received within timeout") } diff --git a/window/factory.go b/window/factory.go index d3cccce..483e81c 100644 --- a/window/factory.go +++ b/window/factory.go @@ -17,11 +17,11 @@ const ( type Window interface { Add(item interface{}) - GetResults() []interface{} + //GetResults() []interface{} Reset() Start() - OutputChan() <-chan []interface{} - SetCallback(callback func([]interface{})) + OutputChan() <-chan []model.Row + SetCallback(callback func([]model.Row)) Trigger() } @@ -38,7 +38,7 @@ func CreateWindow(config model.WindowConfig) (Window, error) { } } -func (cw *CountingWindow) SetCallback(callback func([]interface{})) { +func (cw *CountingWindow) SetCallback(callback func([]model.Row)) { cw.callback = callback } diff --git a/window/sliding_window.go b/window/sliding_window.go index 39f549e..4a700aa 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -7,6 +7,7 @@ import ( "time" "github.com/rulego/streamsql/model" + timex "github.com/rulego/streamsql/utils" "github.com/spf13/cast" ) @@ -30,17 +31,19 @@ type SlidingWindow struct { // 用于保护数据并发访问的互斥锁 mu sync.Mutex // 存储窗口内的数据 - data []TimedData + data []model.Row // 用于输出窗口内数据的通道 - outputChan chan []interface{} + outputChan chan []model.Row // 当窗口触发时执行的回调函数 - callback func([]interface{}) + callback func([]model.Row) // 用于控制窗口生命周期的上下文 ctx context.Context // 用于取消上下文的函数 cancelFunc context.CancelFunc // 用于定时触发窗口的定时器 - timer *time.Timer + timer *time.Timer + startSlot *model.TimeSlot + currentSlot *model.TimeSlot } // NewSlidingWindow 创建一个新的滑动窗口实例 @@ -60,10 +63,10 @@ func NewSlidingWindow(config model.WindowConfig) (*SlidingWindow, error) { config: config, size: size, slide: slide, - outputChan: make(chan []interface{}, 10), + outputChan: make(chan []model.Row, 10), ctx: ctx, cancelFunc: cancel, - data: make([]TimedData, 0), + data: make([]model.Row, 0), }, nil } @@ -74,10 +77,34 @@ func (sw *SlidingWindow) Add(data interface{}) { sw.mu.Lock() defer sw.mu.Unlock() // 将数据添加到窗口的数据列表中 - sw.data = append(sw.data, TimedData{ + + if sw.startSlot == nil { + sw.startSlot = sw.createSlot(GetTimestamp(data, sw.config.TsProp)) + sw.currentSlot = sw.startSlot + } + row := model.Row{ Data: data, Timestamp: GetTimestamp(data, sw.config.TsProp), - }) + } + sw.data = append(sw.data, row) +} + +func (sw *SlidingWindow) createSlot(t time.Time) *model.TimeSlot { + // 创建一个新的时间槽位 + start := timex.AlignTimeToWindow(t, sw.size) + end := start.Add(sw.size) + slot := model.NewTimeSlot(&start, &end) + return slot +} + +func (sw *SlidingWindow) NextSlot() *model.TimeSlot { + if sw.currentSlot == nil { + return nil + } + start := sw.currentSlot.Start.Add(sw.slide) + end := sw.currentSlot.End.Add(sw.slide) + next := model.NewTimeSlot(&start, &end) + return next } // Start 启动滑动窗口,开始定时触发窗口 @@ -113,19 +140,22 @@ func (sw *SlidingWindow) Trigger() { } // 计算截止时间,即当前时间减去窗口的总大小 - cutoff := time.Now().Add(-sw.size) - var newData []TimedData + next := sw.NextSlot() + var newData []model.Row // 遍历窗口内的数据,只保留在截止时间之后的数据 for _, item := range sw.data { - if item.Timestamp.After(cutoff) { + if next.Contains(item.Timestamp) { newData = append(newData, item) } } // 提取出 Data 字段组成 []interface{} 类型的数据 - resultData := make([]interface{}, 0, len(newData)) - for _, item := range newData { - resultData = append(resultData, item.Data) + resultData := make([]model.Row, 0) + for _, item := range sw.data { + if sw.currentSlot.Contains(item.Timestamp) { + item.Slot = sw.currentSlot + resultData = append(resultData, item) + } } // 如果设置了回调函数,则执行回调函数 @@ -135,6 +165,7 @@ func (sw *SlidingWindow) Trigger() { // 更新窗口内的数据 sw.data = newData + sw.currentSlot = next // 将新的数据发送到输出通道 sw.outputChan <- resultData } @@ -149,13 +180,13 @@ func (sw *SlidingWindow) Reset() { } // OutputChan 返回滑动窗口的输出通道 -func (sw *SlidingWindow) OutputChan() <-chan []interface{} { +func (sw *SlidingWindow) OutputChan() <-chan []model.Row { return sw.outputChan } // SetCallback 设置滑动窗口触发时执行的回调函数 // 参数 callback 表示要设置的回调函数 -func (sw *SlidingWindow) SetCallback(callback func([]interface{})) { +func (sw *SlidingWindow) SetCallback(callback func([]model.Row)) { sw.callback = callback } diff --git a/window/sliding_window_test.go b/window/sliding_window_test.go index 9bfd674..5b4e0a6 100644 --- a/window/sliding_window_test.go +++ b/window/sliding_window_test.go @@ -18,19 +18,19 @@ func TestSlidingWindow(t *testing.T) { "size": "2s", "slide": "1s", }, - TsProp: "Ts", + TsProp: "Ts", + TimeUnit: time.Second, }) - sw.SetCallback(func(results []interface{}) { + sw.SetCallback(func(results []model.Row) { t.Logf("Received results: %v", results) }) sw.Start() // 添加数据 - now := time.Now() - t_3 := TestDate{Ts: now.Add(-3 * time.Second)} - t_2 := TestDate{Ts: now.Add(-2 * time.Second)} - t_1 := TestDate{Ts: now.Add(-1 * time.Second)} - t_0 := TestDate{Ts: now} + t_3 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 56, 789000000, time.UTC), tag: "1"} + t_2 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 57, 789000000, time.UTC), tag: "2"} + t_1 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 58, 789000000, time.UTC), tag: "3"} + t_0 := TestDate{Ts: time.Date(2025, 4, 7, 16, 46, 59, 789000000, time.UTC), tag: "4"} sw.Add(t_3) sw.Add(t_2) @@ -38,25 +38,56 @@ func TestSlidingWindow(t *testing.T) { sw.Add(t_0) // 等待一段时间,触发窗口 - time.Sleep(3 * time.Second) + //time.Sleep(3 * time.Second) // 检查结果 resultsChan := sw.OutputChan() - var results []interface{} - select { - case results = <-resultsChan: - case <-time.After(100 * time.Second): - t.Fatal("No results received within timeout") + var results []model.Row + + for { + select { + case results = <-resultsChan: + raw := make([]TestDate, 0) + for _, row := range results { + raw = append(raw, row.Data.(TestDate)) + } + + // 获取当前窗口的时间范围 + windowStart := results[0].Slot.Start + windowEnd := results[0].Slot.End + t.Logf("Window range: %v - %v", windowStart, windowEnd) + + // 检查窗口内的数据 + expectedData := make([]TestDate, 0) + if windowStart.Before(t_3.Ts) && windowEnd.After(t_2.Ts) { + expectedData = []TestDate{t_3, t_2} + } else if windowStart.Before(t_2.Ts) && windowEnd.After(t_1.Ts) { + expectedData = []TestDate{t_2, t_1} + } else if windowStart.Before(t_1.Ts) && windowEnd.After(t_0.Ts) { + expectedData = []TestDate{t_1, t_0} + } else { + expectedData = []TestDate{t_0} + } + + // 验证窗口数据 + assert.Equal(t, len(expectedData), len(raw), "窗口数据数量不匹配") + for _, expected := range expectedData { + assert.Contains(t, raw, expected, "窗口缺少预期数据") + } + default: + // 通道为空时退出 + goto END + } } +END: // 预期结果:保留最近 2 秒内的数据 - assert.Len(t, results, 2) - assert.Contains(t, results, t_1) - assert.Contains(t, results, t_0) + assert.Len(t, results, 0) } type TestDate struct { - Ts time.Time + Ts time.Time + tag string } type TestDate2 struct { diff --git a/window/tumbling_window.go b/window/tumbling_window.go index cb79f50..a0f45c1 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -8,6 +8,7 @@ import ( "time" "github.com/rulego/streamsql/model" + timex "github.com/rulego/streamsql/utils" "github.com/spf13/cast" ) @@ -23,17 +24,19 @@ type TumblingWindow struct { // mu 用于保护对窗口数据的并发访问。 mu sync.Mutex // data 存储窗口内收集的数据。 - data []interface{} + data []model.Row // outputChan 是一个通道,用于在窗口触发时发送数据。 - outputChan chan []interface{} + outputChan chan []model.Row // callback 是一个可选的回调函数,在窗口触发时调用。 - callback func([]interface{}) + callback func([]model.Row) // ctx 用于控制窗口的生命周期。 ctx context.Context // cancelFunc 用于取消窗口的操作。 cancelFunc context.CancelFunc // timer 用于定时触发窗口。 - timer *time.Timer + timer *time.Timer + startSlot *model.TimeSlot + currentSlot *model.TimeSlot } // NewTumblingWindow 创建一个新的滚动窗口实例。 @@ -48,7 +51,7 @@ func NewTumblingWindow(config model.WindowConfig) (*TumblingWindow, error) { return &TumblingWindow{ config: config, size: size, - outputChan: make(chan []interface{}, 10), + outputChan: make(chan []model.Row, 10), ctx: ctx, cancelFunc: cancel, }, nil @@ -61,7 +64,33 @@ func (tw *TumblingWindow) Add(data interface{}) { tw.mu.Lock() defer tw.mu.Unlock() // 将数据追加到窗口的数据列表中。 - tw.data = append(tw.data, data) + if tw.startSlot == nil { + tw.startSlot = tw.createSlot(GetTimestamp(data, tw.config.TsProp)) + tw.currentSlot = tw.startSlot + } + row := model.Row{ + Data: data, + Timestamp: GetTimestamp(data, tw.config.TsProp), + } + tw.data = append(tw.data, row) +} + +func (sw *TumblingWindow) createSlot(t time.Time) *model.TimeSlot { + // 创建一个新的时间槽位 + start := timex.AlignTimeToWindow(t, sw.size) + end := start.Add(sw.size) + slot := model.NewTimeSlot(&start, &end) + return slot +} + +func (sw *TumblingWindow) NextSlot() *model.TimeSlot { + if sw.currentSlot == nil { + return nil + } + start := sw.currentSlot.End + end := sw.currentSlot.End.Add(sw.size) + next := model.NewTimeSlot(start, &end) + return next } // Stop 停止滚动窗口的操作。 @@ -98,16 +127,35 @@ func (tw *TumblingWindow) Trigger() { // 加锁以确保并发安全。 tw.mu.Lock() defer tw.mu.Unlock() - - // 如果设置了回调函数,则调用它。 - if tw.callback != nil { - tw.callback(tw.data) + // 计算截止时间,即当前时间减去窗口的总大小 + next := tw.NextSlot() + var newData []model.Row + // 遍历窗口内的数据,只保留在截止时间之后的数据 + for _, item := range tw.data { + if next.Contains(item.Timestamp) { + newData = append(newData, item) + } } - // 将窗口数据发送到输出通道。 - tw.outputChan <- append([]interface{}{}, tw.data...) - // 重置窗口数据。 - tw.data = nil + // 提取出 Data 字段组成 []interface{} 类型的数据 + resultData := make([]model.Row, 0) + for _, item := range tw.data { + if tw.currentSlot.Contains(item.Timestamp) { + item.Slot = tw.currentSlot + resultData = append(resultData, item) + } + } + + // 如果设置了回调函数,则执行回调函数 + if tw.callback != nil { + tw.callback(resultData) + } + + // 更新窗口内的数据 + tw.data = newData + tw.currentSlot = next + // 将新的数据发送到输出通道 + tw.outputChan <- resultData } // Reset 重置滚动窗口的数据。 @@ -120,21 +168,21 @@ func (tw *TumblingWindow) Reset() { } // OutputChan 返回一个只读通道,用于接收窗口触发时的数据。 -func (tw *TumblingWindow) OutputChan() <-chan []interface{} { +func (tw *TumblingWindow) OutputChan() <-chan []model.Row { return tw.outputChan } // SetCallback 设置滚动窗口触发时的回调函数。 // 参数 callback 是要设置的回调函数。 -func (tw *TumblingWindow) SetCallback(callback func([]interface{})) { +func (tw *TumblingWindow) SetCallback(callback func([]model.Row)) { tw.callback = callback } -// GetResults 获取当前滚动窗口中的数据副本。 -func (tw *TumblingWindow) GetResults() []interface{} { - // 加锁以确保并发安全。 - tw.mu.Lock() - defer tw.mu.Unlock() - // 返回窗口数据的副本。 - return append([]interface{}{}, tw.data...) -} +// // GetResults 获取当前滚动窗口中的数据副本。 +// func (tw *TumblingWindow) GetResults() []interface{} { +// // 加锁以确保并发安全。 +// tw.mu.Lock() +// defer tw.mu.Unlock() +// // 返回窗口数据的副本。 +// return append([]interface{}{}, tw.data...) +// } diff --git a/window/tumbling_window_test.go b/window/tumbling_window_test.go index 70d42b0..8aea937 100644 --- a/window/tumbling_window_test.go +++ b/window/tumbling_window_test.go @@ -17,7 +17,7 @@ func TestTumblingWindow(t *testing.T) { Type: "TumblingWindow", Params: map[string]interface{}{"size": "2s"}, }) - tw.SetCallback(func(results []interface{}) { + tw.SetCallback(func(results []model.Row) { // Process results }) go tw.Start() @@ -30,7 +30,7 @@ func TestTumblingWindow(t *testing.T) { // Check output channel resultsChan := tw.OutputChan() - var results []interface{} + var results []model.Row select { case results = <-resultsChan: case <-time.After(3 * time.Second):