From a6b8e95f2f7f640040d2b211080b843b7facfab9 Mon Sep 17 00:00:00 2001 From: dexter Date: Fri, 11 Apr 2025 07:19:19 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E8=A7=A6=E5=8F=91=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamsql_test.go | 4 +- utils/time.go | 4 +- window/sliding_window.go | 53 +++++++++++++-------- window/sliding_window_test.go | 87 +++++++++++++++++----------------- window/tumbling_window.go | 45 +++++++++++------- window/tumbling_window_test.go | 1 - 6 files changed, 107 insertions(+), 87 deletions(-) diff --git a/streamsql_test.go b/streamsql_test.go index 47a7295..98c48e1 100644 --- a/streamsql_test.go +++ b/streamsql_test.go @@ -31,10 +31,8 @@ func TestStreamsql(t *testing.T) { strm.AddSink(func(result interface{}) { resultChan <- result }) - // 等待 3 秒触发窗口 - time.Sleep(3 * time.Second) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() var actual interface{} diff --git a/utils/time.go b/utils/time.go index 8c83f33..aecd203 100644 --- a/utils/time.go +++ b/utils/time.go @@ -1,6 +1,8 @@ package timex -import "time" +import ( + "time" +) // AlignTimeToWindow 将时间对齐到窗口的起始时间。 func AlignTimeToWindow(t time.Time, size time.Duration) time.Time { diff --git a/window/sliding_window.go b/window/sliding_window.go index 194be63..88664ce 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -41,8 +41,11 @@ type SlidingWindow struct { // 用于取消上下文的函数 cancelFunc context.CancelFunc // 用于定时触发窗口的定时器 - timer *time.Timer + timer *time.Ticker currentSlot *model.TimeSlot + // 用于初始化窗口的通道 + initChan chan struct{} + initialized bool } // NewSlidingWindow 创建一个新的滑动窗口实例 @@ -59,13 +62,15 @@ func NewSlidingWindow(config model.WindowConfig) (*SlidingWindow, error) { return nil, fmt.Errorf("invalid slide for sliding window: %v", err) } return &SlidingWindow{ - config: config, - size: size, - slide: slide, - outputChan: make(chan []model.Row, 10), - ctx: ctx, - cancelFunc: cancel, - data: make([]model.Row, 0), + config: config, + size: size, + slide: slide, + outputChan: make(chan []model.Row, 10), + ctx: ctx, + cancelFunc: cancel, + data: make([]model.Row, 0), + initChan: make(chan struct{}), + initialized: false, }, nil } @@ -77,30 +82,32 @@ func (sw *SlidingWindow) Add(data interface{}) { defer sw.mu.Unlock() // 将数据添加到窗口的数据列表中 t := GetTimestamp(data, sw.config.TsProp) - if sw.currentSlot == nil { + if !sw.initialized { sw.currentSlot = sw.createSlot(t) + sw.timer = time.NewTicker(sw.slide) + // 发送初始化完成信号 + close(sw.initChan) + sw.initialized = true } - go func() { - row := model.Row{ - Data: data, - Timestamp: t, - } - sw.data = append(sw.data, row) - }() + row := model.Row{ + Data: data, + Timestamp: t, + } + sw.data = append(sw.data, row) } // Start 启动滑动窗口,开始定时触发窗口 func (sw *SlidingWindow) Start() { go func() { - // 创建一个定时器,初始时间为窗口滑动的时间间隔 - sw.timer = time.NewTimer(sw.slide) + // 等待初始化信号 + <-sw.initChan + // 在函数结束时关闭输出通道。 + defer close(sw.outputChan) for { select { // 当定时器到期时,触发窗口 case <-sw.timer.C: sw.Trigger() - // 重置定时器,以便下一次触发 - sw.timer.Reset(sw.slide) // 当上下文被取消时,停止定时器并退出循环 case <-sw.ctx.Done(): sw.timer.Stop() @@ -120,7 +127,9 @@ func (sw *SlidingWindow) Trigger() { if len(sw.data) == 0 { return } - + if !sw.initialized { + return + } // 计算截止时间,即当前时间减去窗口的总大小 next := sw.NextSlot() // 保留下一个窗口的数据 @@ -163,6 +172,8 @@ func (sw *SlidingWindow) Reset() { // 清空窗口内的数据 sw.data = nil sw.currentSlot = nil + sw.initialized = false + sw.initChan = make(chan struct{}) } // OutputChan 返回滑动窗口的输出通道 diff --git a/window/sliding_window_test.go b/window/sliding_window_test.go index b605540..5853d42 100644 --- a/window/sliding_window_test.go +++ b/window/sliding_window_test.go @@ -10,6 +10,13 @@ import ( "github.com/stretchr/testify/assert" ) +type TestResult struct { + size int + data []TestDate + start time.Time + end time.Time +} + func TestSlidingWindow(t *testing.T) { _, cancel := context.WithCancel(context.Background()) defer cancel() @@ -23,7 +30,13 @@ func TestSlidingWindow(t *testing.T) { TimeUnit: time.Second, }) sw.SetCallback(func(results []model.Row) { - t.Logf("Received results: %v", results) + if len(results) == 0 { + return + } + for _, row := range results { + t.Logf("Slot: %v Received row: %v", row.Slot, row.Data) + } + }) sw.Start() @@ -38,65 +51,53 @@ func TestSlidingWindow(t *testing.T) { sw.Add(t_1) sw.Add(t_0) + // 验证每个窗口的数据 + expected := []TestResult{ + {size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)}, + {size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)}, + {size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)}, + {size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)}, + } // 等待一段时间,触发窗口 //time.Sleep(3 * time.Second) // 检查结果 - resultsChan := sw.OutputChan() - var results []model.Row - + // resultsChan := sw.OutputChan() + // results := make(chan []model.Row) + actual := make([]TestResult, 0) + timeout := time.After(6 * time.Second) for { select { - case results = <-resultsChan: + case results := <-sw.OutputChan(): raw := make([]TestDate, 0) for _, row := range results { raw = append(raw, row.Data.(TestDate)) } - - // 获取当前窗口的时间范围 - windowStart := results[0].Slot.Start - windowEnd := results[0].Slot.End - t.Logf("Window range: %v - %v", windowStart, windowEnd) - - // 检查窗口内的数据 - expectedData := make([]TestDate, 0) - - if windowStart.Before(t_3.Ts) && windowEnd.After(t_2.Ts) { - expectedData = []TestDate{t_3, t_2} - start := timex.AlignTimeToWindow(t_3.Ts, sw.size) - assert.Equal(t, start, windowStart) - assert.Equal(t, start.Add(sw.size), windowEnd) - } else if windowStart.Before(t_2.Ts) && windowEnd.After(t_1.Ts) { - expectedData = []TestDate{t_2, t_1} - start := timex.AlignTimeToWindow(t_2.Ts, sw.size) - assert.Equal(t, start, windowStart) - assert.Equal(t, start.Add(sw.size), windowEnd) - } else if windowStart.Before(t_1.Ts) && windowEnd.After(t_0.Ts) { - expectedData = []TestDate{t_1, t_0} - start := timex.AlignTimeToWindow(t_1.Ts, sw.size) - assert.Equal(t, start, windowStart) - assert.Equal(t, start.Add(sw.size), windowEnd) - } else { - expectedData = []TestDate{t_0} - start := timex.AlignTimeToWindow(t_0.Ts, sw.size) - assert.Equal(t, start, windowStart) - assert.Equal(t, start.Add(sw.size), windowEnd) + if len(results) == 0 { + continue } - - // 验证窗口数据 - assert.Equal(t, len(expectedData), len(raw), "窗口数据数量不匹配") - for _, expected := range expectedData { - assert.Contains(t, raw, expected, "窗口缺少预期数据") - } - default: - // 通道为空时退出 + actual = append(actual, TestResult{ + size: len(results), + data: raw, + start: *results[0].Slot.Start, + end: *results[0].Slot.End}) + case <-timeout: goto END + default: } } END: + assert.Equal(t, len(actual), len(expected)) // 预期结果:保留最近 2 秒内的数据 - assert.Len(t, results, 0) + for i, exp := range expected { + assert.Equal(t, actual[i].size, exp.size) + assert.Equal(t, actual[i].start, exp.start) + assert.Equal(t, actual[i].end, exp.end) + for _, d := range exp.data { + assert.Contains(t, actual[i].data, d) + } + } } type TestDate struct { diff --git a/window/tumbling_window.go b/window/tumbling_window.go index 9e15046..ad7fe19 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -34,8 +34,11 @@ type TumblingWindow struct { // cancelFunc 用于取消窗口的操作。 cancelFunc context.CancelFunc // timer 用于定时触发窗口。 - timer *time.Timer + timer *time.Ticker currentSlot *model.TimeSlot + // 用于初始化窗口的通道 + initChan chan struct{} + initialized bool } // NewTumblingWindow 创建一个新的滚动窗口实例。 @@ -48,11 +51,13 @@ func NewTumblingWindow(config model.WindowConfig) (*TumblingWindow, error) { return nil, fmt.Errorf("invalid size for tumbling window: %v", err) } return &TumblingWindow{ - config: config, - size: size, - outputChan: make(chan []model.Row, 10), - ctx: ctx, - cancelFunc: cancel, + config: config, + size: size, + outputChan: make(chan []model.Row, 10), + ctx: ctx, + cancelFunc: cancel, + initChan: make(chan struct{}), + initialized: false, }, nil } @@ -63,16 +68,18 @@ func (tw *TumblingWindow) Add(data interface{}) { tw.mu.Lock() defer tw.mu.Unlock() // 将数据追加到窗口的数据列表中。 - if tw.currentSlot == nil { + if !tw.initialized { tw.currentSlot = tw.createSlot(GetTimestamp(data, tw.config.TsProp)) + tw.timer = time.NewTicker(tw.size) + // 发送初始化完成信号 + close(tw.initChan) + tw.initialized = true } - go func() { - row := model.Row{ - Data: data, - Timestamp: GetTimestamp(data, tw.config.TsProp), - } - tw.data = append(tw.data, row) - }() + row := model.Row{ + Data: data, + Timestamp: GetTimestamp(data, tw.config.TsProp), + } + tw.data = append(tw.data, row) } func (sw *TumblingWindow) createSlot(t time.Time) *model.TimeSlot { @@ -101,8 +108,7 @@ func (tw *TumblingWindow) Stop() { // Start 启动滚动窗口的定时触发机制。 func (tw *TumblingWindow) Start() { go func() { - // 创建一个定时器,初始时间为窗口大小。 - tw.timer = time.NewTimer(tw.size) + <-tw.initChan // 在函数结束时关闭输出通道。 defer close(tw.outputChan) for { @@ -110,8 +116,6 @@ func (tw *TumblingWindow) Start() { // 当定时器到期时,触发窗口。 case <-tw.timer.C: tw.Trigger() - // 重置定时器以开始下一个窗口周期。 - tw.timer.Reset(tw.size) // 当上下文被取消时,停止定时器并退出循环。 case <-tw.ctx.Done(): tw.timer.Stop() @@ -126,6 +130,9 @@ func (tw *TumblingWindow) Trigger() { // 加锁以确保并发安全。 tw.mu.Lock() defer tw.mu.Unlock() + if !tw.initialized { + return + } // 计算下一个窗口槽位 next := tw.NextSlot() // 保留下一个窗口的数据 @@ -168,6 +175,8 @@ func (tw *TumblingWindow) Reset() { // 清空窗口数据。 tw.data = nil tw.currentSlot = nil + tw.initialized = false + tw.initChan = make(chan struct{}) } // OutputChan 返回一个只读通道,用于接收窗口触发时的数据。 diff --git a/window/tumbling_window_test.go b/window/tumbling_window_test.go index 7e9b6f7..d57bf96 100644 --- a/window/tumbling_window_test.go +++ b/window/tumbling_window_test.go @@ -13,7 +13,6 @@ import ( func TestTumblingWindow(t *testing.T) { _, cancel := context.WithCancel(context.Background()) defer cancel() - tw, _ := NewTumblingWindow(model.WindowConfig{ Type: "TumblingWindow", Params: map[string]interface{}{"size": "2s"},