mirror of
https://gitee.com/rulego/streamsql.git
synced 2025-07-06 16:08:36 +00:00
refactor: 优化时间触发逻辑
This commit is contained in:
@ -31,10 +31,8 @@ func TestStreamsql(t *testing.T) {
|
||||
strm.AddSink(func(result interface{}) {
|
||||
resultChan <- result
|
||||
})
|
||||
// 等待 3 秒触发窗口
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var actual interface{}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package timex
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// AlignTimeToWindow 将时间对齐到窗口的起始时间。
|
||||
func AlignTimeToWindow(t time.Time, size time.Duration) time.Time {
|
||||
|
@ -41,8 +41,11 @@ type SlidingWindow struct {
|
||||
// 用于取消上下文的函数
|
||||
cancelFunc context.CancelFunc
|
||||
// 用于定时触发窗口的定时器
|
||||
timer *time.Timer
|
||||
timer *time.Ticker
|
||||
currentSlot *model.TimeSlot
|
||||
// 用于初始化窗口的通道
|
||||
initChan chan struct{}
|
||||
initialized bool
|
||||
}
|
||||
|
||||
// NewSlidingWindow 创建一个新的滑动窗口实例
|
||||
@ -59,13 +62,15 @@ func NewSlidingWindow(config model.WindowConfig) (*SlidingWindow, error) {
|
||||
return nil, fmt.Errorf("invalid slide for sliding window: %v", err)
|
||||
}
|
||||
return &SlidingWindow{
|
||||
config: config,
|
||||
size: size,
|
||||
slide: slide,
|
||||
outputChan: make(chan []model.Row, 10),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
data: make([]model.Row, 0),
|
||||
config: config,
|
||||
size: size,
|
||||
slide: slide,
|
||||
outputChan: make(chan []model.Row, 10),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
data: make([]model.Row, 0),
|
||||
initChan: make(chan struct{}),
|
||||
initialized: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -77,30 +82,32 @@ func (sw *SlidingWindow) Add(data interface{}) {
|
||||
defer sw.mu.Unlock()
|
||||
// 将数据添加到窗口的数据列表中
|
||||
t := GetTimestamp(data, sw.config.TsProp)
|
||||
if sw.currentSlot == nil {
|
||||
if !sw.initialized {
|
||||
sw.currentSlot = sw.createSlot(t)
|
||||
sw.timer = time.NewTicker(sw.slide)
|
||||
// 发送初始化完成信号
|
||||
close(sw.initChan)
|
||||
sw.initialized = true
|
||||
}
|
||||
go func() {
|
||||
row := model.Row{
|
||||
Data: data,
|
||||
Timestamp: t,
|
||||
}
|
||||
sw.data = append(sw.data, row)
|
||||
}()
|
||||
row := model.Row{
|
||||
Data: data,
|
||||
Timestamp: t,
|
||||
}
|
||||
sw.data = append(sw.data, row)
|
||||
}
|
||||
|
||||
// Start 启动滑动窗口,开始定时触发窗口
|
||||
func (sw *SlidingWindow) Start() {
|
||||
go func() {
|
||||
// 创建一个定时器,初始时间为窗口滑动的时间间隔
|
||||
sw.timer = time.NewTimer(sw.slide)
|
||||
// 等待初始化信号
|
||||
<-sw.initChan
|
||||
// 在函数结束时关闭输出通道。
|
||||
defer close(sw.outputChan)
|
||||
for {
|
||||
select {
|
||||
// 当定时器到期时,触发窗口
|
||||
case <-sw.timer.C:
|
||||
sw.Trigger()
|
||||
// 重置定时器,以便下一次触发
|
||||
sw.timer.Reset(sw.slide)
|
||||
// 当上下文被取消时,停止定时器并退出循环
|
||||
case <-sw.ctx.Done():
|
||||
sw.timer.Stop()
|
||||
@ -120,7 +127,9 @@ func (sw *SlidingWindow) Trigger() {
|
||||
if len(sw.data) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if !sw.initialized {
|
||||
return
|
||||
}
|
||||
// 计算截止时间,即当前时间减去窗口的总大小
|
||||
next := sw.NextSlot()
|
||||
// 保留下一个窗口的数据
|
||||
@ -163,6 +172,8 @@ func (sw *SlidingWindow) Reset() {
|
||||
// 清空窗口内的数据
|
||||
sw.data = nil
|
||||
sw.currentSlot = nil
|
||||
sw.initialized = false
|
||||
sw.initChan = make(chan struct{})
|
||||
}
|
||||
|
||||
// OutputChan 返回滑动窗口的输出通道
|
||||
|
@ -10,6 +10,13 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type TestResult struct {
|
||||
size int
|
||||
data []TestDate
|
||||
start time.Time
|
||||
end time.Time
|
||||
}
|
||||
|
||||
func TestSlidingWindow(t *testing.T) {
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -23,7 +30,13 @@ func TestSlidingWindow(t *testing.T) {
|
||||
TimeUnit: time.Second,
|
||||
})
|
||||
sw.SetCallback(func(results []model.Row) {
|
||||
t.Logf("Received results: %v", results)
|
||||
if len(results) == 0 {
|
||||
return
|
||||
}
|
||||
for _, row := range results {
|
||||
t.Logf("Slot: %v Received row: %v", row.Slot, row.Data)
|
||||
}
|
||||
|
||||
})
|
||||
sw.Start()
|
||||
|
||||
@ -38,65 +51,53 @@ func TestSlidingWindow(t *testing.T) {
|
||||
sw.Add(t_1)
|
||||
sw.Add(t_0)
|
||||
|
||||
// 验证每个窗口的数据
|
||||
expected := []TestResult{
|
||||
{size: 2, data: []TestDate{t_3, t_2}, start: timex.AlignTime(t_3.Ts, time.Second, true), end: timex.AlignTime(t_2.Ts, time.Second, false)},
|
||||
{size: 2, data: []TestDate{t_2, t_1}, start: timex.AlignTime(t_2.Ts, time.Second, true), end: timex.AlignTime(t_1.Ts, time.Second, false)},
|
||||
{size: 2, data: []TestDate{t_1, t_0}, start: timex.AlignTime(t_1.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, false)},
|
||||
{size: 1, data: []TestDate{t_0}, start: timex.AlignTime(t_0.Ts, time.Second, true), end: timex.AlignTime(t_0.Ts, time.Second, true).Add(sw.size)},
|
||||
}
|
||||
// 等待一段时间,触发窗口
|
||||
//time.Sleep(3 * time.Second)
|
||||
|
||||
// 检查结果
|
||||
resultsChan := sw.OutputChan()
|
||||
var results []model.Row
|
||||
|
||||
// resultsChan := sw.OutputChan()
|
||||
// results := make(chan []model.Row)
|
||||
actual := make([]TestResult, 0)
|
||||
timeout := time.After(6 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case results = <-resultsChan:
|
||||
case results := <-sw.OutputChan():
|
||||
raw := make([]TestDate, 0)
|
||||
for _, row := range results {
|
||||
raw = append(raw, row.Data.(TestDate))
|
||||
}
|
||||
|
||||
// 获取当前窗口的时间范围
|
||||
windowStart := results[0].Slot.Start
|
||||
windowEnd := results[0].Slot.End
|
||||
t.Logf("Window range: %v - %v", windowStart, windowEnd)
|
||||
|
||||
// 检查窗口内的数据
|
||||
expectedData := make([]TestDate, 0)
|
||||
|
||||
if windowStart.Before(t_3.Ts) && windowEnd.After(t_2.Ts) {
|
||||
expectedData = []TestDate{t_3, t_2}
|
||||
start := timex.AlignTimeToWindow(t_3.Ts, sw.size)
|
||||
assert.Equal(t, start, windowStart)
|
||||
assert.Equal(t, start.Add(sw.size), windowEnd)
|
||||
} else if windowStart.Before(t_2.Ts) && windowEnd.After(t_1.Ts) {
|
||||
expectedData = []TestDate{t_2, t_1}
|
||||
start := timex.AlignTimeToWindow(t_2.Ts, sw.size)
|
||||
assert.Equal(t, start, windowStart)
|
||||
assert.Equal(t, start.Add(sw.size), windowEnd)
|
||||
} else if windowStart.Before(t_1.Ts) && windowEnd.After(t_0.Ts) {
|
||||
expectedData = []TestDate{t_1, t_0}
|
||||
start := timex.AlignTimeToWindow(t_1.Ts, sw.size)
|
||||
assert.Equal(t, start, windowStart)
|
||||
assert.Equal(t, start.Add(sw.size), windowEnd)
|
||||
} else {
|
||||
expectedData = []TestDate{t_0}
|
||||
start := timex.AlignTimeToWindow(t_0.Ts, sw.size)
|
||||
assert.Equal(t, start, windowStart)
|
||||
assert.Equal(t, start.Add(sw.size), windowEnd)
|
||||
if len(results) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 验证窗口数据
|
||||
assert.Equal(t, len(expectedData), len(raw), "窗口数据数量不匹配")
|
||||
for _, expected := range expectedData {
|
||||
assert.Contains(t, raw, expected, "窗口缺少预期数据")
|
||||
}
|
||||
default:
|
||||
// 通道为空时退出
|
||||
actual = append(actual, TestResult{
|
||||
size: len(results),
|
||||
data: raw,
|
||||
start: *results[0].Slot.Start,
|
||||
end: *results[0].Slot.End})
|
||||
case <-timeout:
|
||||
goto END
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
assert.Equal(t, len(actual), len(expected))
|
||||
// 预期结果:保留最近 2 秒内的数据
|
||||
assert.Len(t, results, 0)
|
||||
for i, exp := range expected {
|
||||
assert.Equal(t, actual[i].size, exp.size)
|
||||
assert.Equal(t, actual[i].start, exp.start)
|
||||
assert.Equal(t, actual[i].end, exp.end)
|
||||
for _, d := range exp.data {
|
||||
assert.Contains(t, actual[i].data, d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type TestDate struct {
|
||||
|
@ -34,8 +34,11 @@ type TumblingWindow struct {
|
||||
// cancelFunc 用于取消窗口的操作。
|
||||
cancelFunc context.CancelFunc
|
||||
// timer 用于定时触发窗口。
|
||||
timer *time.Timer
|
||||
timer *time.Ticker
|
||||
currentSlot *model.TimeSlot
|
||||
// 用于初始化窗口的通道
|
||||
initChan chan struct{}
|
||||
initialized bool
|
||||
}
|
||||
|
||||
// NewTumblingWindow 创建一个新的滚动窗口实例。
|
||||
@ -48,11 +51,13 @@ func NewTumblingWindow(config model.WindowConfig) (*TumblingWindow, error) {
|
||||
return nil, fmt.Errorf("invalid size for tumbling window: %v", err)
|
||||
}
|
||||
return &TumblingWindow{
|
||||
config: config,
|
||||
size: size,
|
||||
outputChan: make(chan []model.Row, 10),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
config: config,
|
||||
size: size,
|
||||
outputChan: make(chan []model.Row, 10),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
initChan: make(chan struct{}),
|
||||
initialized: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -63,16 +68,18 @@ func (tw *TumblingWindow) Add(data interface{}) {
|
||||
tw.mu.Lock()
|
||||
defer tw.mu.Unlock()
|
||||
// 将数据追加到窗口的数据列表中。
|
||||
if tw.currentSlot == nil {
|
||||
if !tw.initialized {
|
||||
tw.currentSlot = tw.createSlot(GetTimestamp(data, tw.config.TsProp))
|
||||
tw.timer = time.NewTicker(tw.size)
|
||||
// 发送初始化完成信号
|
||||
close(tw.initChan)
|
||||
tw.initialized = true
|
||||
}
|
||||
go func() {
|
||||
row := model.Row{
|
||||
Data: data,
|
||||
Timestamp: GetTimestamp(data, tw.config.TsProp),
|
||||
}
|
||||
tw.data = append(tw.data, row)
|
||||
}()
|
||||
row := model.Row{
|
||||
Data: data,
|
||||
Timestamp: GetTimestamp(data, tw.config.TsProp),
|
||||
}
|
||||
tw.data = append(tw.data, row)
|
||||
}
|
||||
|
||||
func (sw *TumblingWindow) createSlot(t time.Time) *model.TimeSlot {
|
||||
@ -101,8 +108,7 @@ func (tw *TumblingWindow) Stop() {
|
||||
// Start 启动滚动窗口的定时触发机制。
|
||||
func (tw *TumblingWindow) Start() {
|
||||
go func() {
|
||||
// 创建一个定时器,初始时间为窗口大小。
|
||||
tw.timer = time.NewTimer(tw.size)
|
||||
<-tw.initChan
|
||||
// 在函数结束时关闭输出通道。
|
||||
defer close(tw.outputChan)
|
||||
for {
|
||||
@ -110,8 +116,6 @@ func (tw *TumblingWindow) Start() {
|
||||
// 当定时器到期时,触发窗口。
|
||||
case <-tw.timer.C:
|
||||
tw.Trigger()
|
||||
// 重置定时器以开始下一个窗口周期。
|
||||
tw.timer.Reset(tw.size)
|
||||
// 当上下文被取消时,停止定时器并退出循环。
|
||||
case <-tw.ctx.Done():
|
||||
tw.timer.Stop()
|
||||
@ -126,6 +130,9 @@ func (tw *TumblingWindow) Trigger() {
|
||||
// 加锁以确保并发安全。
|
||||
tw.mu.Lock()
|
||||
defer tw.mu.Unlock()
|
||||
if !tw.initialized {
|
||||
return
|
||||
}
|
||||
// 计算下一个窗口槽位
|
||||
next := tw.NextSlot()
|
||||
// 保留下一个窗口的数据
|
||||
@ -168,6 +175,8 @@ func (tw *TumblingWindow) Reset() {
|
||||
// 清空窗口数据。
|
||||
tw.data = nil
|
||||
tw.currentSlot = nil
|
||||
tw.initialized = false
|
||||
tw.initChan = make(chan struct{})
|
||||
}
|
||||
|
||||
// OutputChan 返回一个只读通道,用于接收窗口触发时的数据。
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
func TestTumblingWindow(t *testing.T) {
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
tw, _ := NewTumblingWindow(model.WindowConfig{
|
||||
Type: "TumblingWindow",
|
||||
Params: map[string]interface{}{"size": "2s"},
|
||||
|
Reference in New Issue
Block a user