From 73f7843996bf42fa344ba302f93b7305d1ae54e0 Mon Sep 17 00:00:00 2001 From: rulego-team Date: Thu, 13 Nov 2025 18:45:55 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=E7=AC=AC=E4=B8=80=E4=B8=AA=E7=AA=97?= =?UTF-8?q?=E5=8F=A3=E5=90=8E=E5=90=AF=E5=8A=A8=E6=AD=A5=E9=95=BF=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- window/sliding_window.go | 60 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/window/sliding_window.go b/window/sliding_window.go index fcce7fd..b9c8785 100644 --- a/window/sliding_window.go +++ b/window/sliding_window.go @@ -23,7 +23,6 @@ import ( "time" "github.com/rulego/streamsql/utils/cast" - "github.com/rulego/streamsql/utils/timex" "github.com/rulego/streamsql/types" ) @@ -65,6 +64,8 @@ type SlidingWindow struct { initialized bool // timerMu protects timer access timerMu sync.Mutex + // firstWindowStartTime records when first window started (processing time) + firstWindowStartTime time.Time // Performance statistics droppedCount int64 // Number of dropped results sentCount int64 // Number of successfully sent results @@ -126,9 +127,9 @@ func (sw *SlidingWindow) Add(data interface{}) { t := GetTimestamp(data, sw.config.TsProp, sw.config.TimeUnit) if !sw.initialized { sw.currentSlot = sw.createSlot(t) - sw.timerMu.Lock() - sw.timer = time.NewTicker(sw.slide) - sw.timerMu.Unlock() + // Record when first window started (processing time) + sw.firstWindowStartTime = time.Now() + // Don't start timer here, wait for first window to end // Send initialization complete signal close(sw.initChan) sw.initialized = true @@ -142,6 +143,7 @@ func (sw *SlidingWindow) Add(data interface{}) { // Start starts the sliding window with periodic triggering // Uses lazy initialization to avoid infinite waiting when no data, ensuring subsequent data can be processed normally +// First window triggers when it ends, then subsequent windows trigger at slide intervals func (sw *SlidingWindow) Start() { go func() { // Close output channel when function ends @@ -156,6 +158,53 @@ func (sw *SlidingWindow) Start() { return } + // Wait for first window to end, then trigger it + // After initChan is closed, firstWindowStartTime should be set by Add() + sw.mu.RLock() + firstWindowStartTime := sw.firstWindowStartTime + sw.mu.RUnlock() + + // Verify that firstWindowStartTime is valid (not zero) + // If zero, it means Add() hasn't been called yet, which shouldn't happen + // but we handle it gracefully by waiting for window size + if firstWindowStartTime.IsZero() { + // This shouldn't happen if Add() is called before Start(), + // but if it does, wait for window size from now + firstWindowStartTime = time.Now() + } + + // Calculate time until first window ends (window size from processing time) + now := time.Now() + elapsed := now.Sub(firstWindowStartTime) + var waitDuration time.Duration + if elapsed < sw.size { + // Wait until window size time has passed + waitDuration = sw.size - elapsed + } else { + // First window already ended, trigger immediately + waitDuration = 0 + } + + // Wait for first window to end + if waitDuration > 0 { + select { + case <-time.After(waitDuration): + // First window ended, trigger it + sw.Trigger() + case <-sw.ctx.Done(): + return + } + } else { + // First window already ended, trigger immediately + sw.Trigger() + } + + // Now start the sliding step timer for subsequent windows + sw.timerMu.Lock() + sw.timer = time.NewTicker(sw.slide) + sw.timerMu.Unlock() + + // Continue with periodic triggering at slide intervals for { // Safely get timer in each loop iteration sw.timerMu.Lock() @@ -327,6 +376,7 @@ func (sw *SlidingWindow) Reset() { sw.currentSlot = nil sw.initialized = false sw.initChan = make(chan struct{}) + sw.firstWindowStartTime = time.Time{} // Recreate context for next startup sw.ctx, sw.cancelFunc = context.WithCancel(context.Background()) @@ -356,7 +406,7 @@ func (sw *SlidingWindow) NextSlot() *types.TimeSlot { func (sw *SlidingWindow) createSlot(t time.Time) *types.TimeSlot { // Create a new time slot - start := timex.AlignTimeToWindow(t, sw.slide) + start := t end := start.Add(sw.size) slot := types.NewTimeSlot(&start, &end) return slot From eb29ed921d3686def73531ffd9b44daa69aab01a Mon Sep 17 00:00:00 2001 From: rulego-team Date: Thu, 13 Nov 2025 18:46:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=E5=8F=96=E6=B6=88=E5=AF=B9=E9=BD=90?= =?UTF-8?q?=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- streamsql_sliding_window_test.go | 378 ++++++++++++++++++++++++++++--- window/counting_window.go | 11 +- window/session_window.go | 5 +- window/tumbling_window.go | 3 +- 4 files changed, 358 insertions(+), 39 deletions(-) diff --git a/streamsql_sliding_window_test.go b/streamsql_sliding_window_test.go index 1282fb6..22978aa 100644 --- a/streamsql_sliding_window_test.go +++ b/streamsql_sliding_window_test.go @@ -22,22 +22,33 @@ func TestSQLSlidingWindow_Basic(t *testing.T) { require.NoError(t, err) ch := make(chan []map[string]interface{}, 10) + windowResults := make([][]map[string]interface{}, 0) ssql.AddSink(func(results []map[string]interface{}) { - ch <- results + if len(results) > 0 { + windowResults = append(windowResults, results) + ch <- results + } }) - baseTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) - for i := 0; i < 12; i++ { - ssql.Emit(map[string]interface{}{ - "deviceId": "sensor001", - "temperature": i, - "timestamp": baseTime.Add(time.Duration(i) * time.Second), - }) - time.Sleep(10 * time.Millisecond) - } + // 每秒发送一条数据,持续发送15秒,确保有足够的数据 + // 数据会在处理时间到达时被添加到窗口 + done := make(chan bool) + go func() { + for i := 0; i < 15; i++ { // 发送15条数据,约15秒 + ssql.Emit(map[string]interface{}{ + "deviceId": "sensor001", + "temperature": i, + }) + time.Sleep(1 * time.Second) + } + done <- true + }() + + // 等待窗口触发(第一个窗口应该在10秒后触发) + time.Sleep(12 * time.Second) results := make([][]map[string]interface{}, 0) - timeout := time.After(15 * time.Second) + timeout := time.After(3 * time.Second) for { select { case res := <-ch: @@ -50,12 +61,44 @@ func TestSQLSlidingWindow_Basic(t *testing.T) { } END: - assert.Greater(t, len(results), 0) - if len(results) > 0 { - firstWindow := results[0] - require.Len(t, firstWindow, 1) + assert.Greater(t, len(results), 0, "应该至少触发一个窗口") + require.Greater(t, len(windowResults), 0, "应该至少有一个窗口结果") + + if len(windowResults) > 0 { + firstWindow := windowResults[0] + require.Len(t, firstWindow, 1, "第一个窗口应该只有一行结果") cnt := firstWindow[0]["cnt"].(float64) - assert.Greater(t, cnt, 0.0) + + // 验证第一个窗口包含数据 + assert.Greater(t, cnt, 0.0, "第一个窗口应该包含数据") + + // 使用处理时间时,窗口基于数据到达的处理时间 + // 窗口大小10秒,每秒发送一条数据 + // 第一个窗口应该在窗口大小时间(10秒)后触发 + // 在10秒内,应该会发送10条数据(每秒1条) + // 但由于窗口对齐到滑动步长(2秒),实际窗口范围可能略有不同 + + // 验证第一个窗口的数据量应该在合理范围内 + // 使用处理时间时,窗口包含的是在窗口大小时间内到达的所有数据 + // 窗口大小10秒,每秒1条,应该包含接近10条数据 + assert.GreaterOrEqual(t, cnt, 5.0, + "第一个窗口应该包含足够的数据(窗口大小10秒,每秒1条),实际: %.0f", cnt) + assert.LessOrEqual(t, cnt, 15.0, + "第一个窗口不应该超过15条数据,实际: %.0f", cnt) + + t.Logf("第一个窗口数据量: %.0f(使用处理时间,窗口大小10秒,每秒1条数据)", cnt) + } + + // 验证有多个窗口被触发(滑动窗口应该每2秒触发一次) + if len(windowResults) > 1 { + t.Logf("总共触发了 %d 个窗口", len(windowResults)) + // 验证后续窗口也包含数据 + for i := 1; i < len(windowResults) && i < 5; i++ { + if len(windowResults[i]) > 0 { + cnt := windowResults[i][0]["cnt"].(float64) + assert.Greater(t, cnt, 0.0, "窗口 %d 应该包含数据", i+1) + } + } } } @@ -80,17 +123,14 @@ func TestSQLSlidingWindow_WithAggregations(t *testing.T) { ch <- results }) - baseTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + // 使用处理时间,每秒发送一条数据 for i := 0; i < 15; i++ { - timestamp := baseTime.Add(time.Duration(i) * time.Second) temperature := float64(i) - ssql.Emit(map[string]interface{}{ "deviceId": "sensor001", "temperature": temperature, - "timestamp": timestamp, }) - time.Sleep(10 * time.Millisecond) + time.Sleep(1 * time.Second) } time.Sleep(5 * time.Second) @@ -167,14 +207,13 @@ func TestSQLSlidingWindow_MultipleWindowsAlignment(t *testing.T) { ch <- results }) - baseTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + // 使用处理时间,每秒发送一条数据 for i := 0; i < 15; i++ { ssql.Emit(map[string]interface{}{ "deviceId": "sensor001", "temperature": float64(i), - "timestamp": baseTime.Add(time.Duration(i) * time.Second), }) - time.Sleep(10 * time.Millisecond) + time.Sleep(1 * time.Second) } time.Sleep(8 * time.Second) @@ -269,33 +308,29 @@ func TestSQLSlidingWindow_MultiKeyGrouped(t *testing.T) { ch <- results }) - baseTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + // 使用处理时间,每秒发送一组数据 for i := 0; i < 8; i++ { ssql.Emit(map[string]interface{}{ "deviceId": "A", "region": "R1", "temperature": float64(i), - "timestamp": baseTime.Add(time.Duration(i) * time.Second), }) ssql.Emit(map[string]interface{}{ "deviceId": "B", "region": "R1", "temperature": float64(i + 10), - "timestamp": baseTime.Add(time.Duration(i) * time.Second), }) ssql.Emit(map[string]interface{}{ "deviceId": "A", "region": "R2", "temperature": float64(i + 20), - "timestamp": baseTime.Add(time.Duration(i) * time.Second), }) ssql.Emit(map[string]interface{}{ "deviceId": "B", "region": "R2", "temperature": float64(i + 30), - "timestamp": baseTime.Add(time.Duration(i) * time.Second), }) - time.Sleep(10 * time.Millisecond) + time.Sleep(1 * time.Second) } time.Sleep(3 * time.Second) @@ -353,3 +388,286 @@ END: } } } + +// TestSQLSlidingWindow_FirstWindowTiming 测试第一个窗口的触发时机 +// 验证第一个窗口应该在窗口大小时间后触发,而不是滑动步长时间后触发 +func TestSQLSlidingWindow_FirstWindowTiming(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := ` + SELECT deviceId, + COUNT(*) as cnt + FROM stream + GROUP BY deviceId, SlidingWindow('10s', '2s') + WITH (TIMESTAMP='timestamp') + ` + err := ssql.Execute(sql) + require.NoError(t, err) + + ch := make(chan []map[string]interface{}, 20) + windowTimings := make([]time.Time, 0) + ssql.AddSink(func(results []map[string]interface{}) { + if len(results) > 0 { + windowTimings = append(windowTimings, time.Now()) + ch <- results + } + }) + + // 记录第一个数据发送时间 + firstDataTime := time.Now() + + // 使用处理时间,每秒发送一条数据,共发送10条 + for i := 0; i < 10; i++ { + ssql.Emit(map[string]interface{}{ + "deviceId": "sensor001", + "temperature": float64(i), + }) + time.Sleep(1 * time.Second) + } + + // 等待第一个窗口触发(应该在窗口大小10秒后,而不是滑动步长2秒后) + timeout := time.After(12 * time.Second) + firstWindowReceived := false + + for { + select { + case res := <-ch: + if len(res) > 0 && !firstWindowReceived { + firstWindowReceived = true + firstWindowTime := windowTimings[0] + elapsed := firstWindowTime.Sub(firstDataTime) + + // 第一个窗口应该在窗口大小时间(10秒)后触发 + // 允许一些误差(±1秒),因为数据处理和调度可能有延迟 + assert.GreaterOrEqual(t, elapsed, 9*time.Second, + "第一个窗口应该在窗口大小时间(10秒)后触发,实际耗时: %v", elapsed) + assert.LessOrEqual(t, elapsed, 12*time.Second, + "第一个窗口不应该太晚触发,实际耗时: %v", elapsed) + + // 验证第一个窗口不应该在滑动步长时间(2秒)后就触发 + assert.Greater(t, elapsed, 3*time.Second, + "第一个窗口不应该在滑动步长时间(2秒)后就触发,实际耗时: %v", elapsed) + + cnt := res[0]["cnt"].(float64) + assert.Greater(t, cnt, 0.0, "第一个窗口应该包含数据") + t.Logf("第一个窗口触发时间: %v, 从第一个数据到触发耗时: %v, 窗口数据量: %.0f", + firstWindowTime, elapsed, cnt) + } + case <-timeout: + goto END + } + } + +END: + assert.True(t, firstWindowReceived, "应该至少收到第一个窗口") +} + +// TestSQLSlidingWindow_DataOverlap 测试滑动窗口的数据重叠正确性 +// 验证数据在多个窗口中正确保留,不会过早清理 +func TestSQLSlidingWindow_DataOverlap(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := ` + SELECT deviceId, + COUNT(*) as cnt, + MIN(temperature) as min_temp, + MAX(temperature) as max_temp + FROM stream + GROUP BY deviceId, SlidingWindow('10s', '2s') + ` + err := ssql.Execute(sql) + require.NoError(t, err) + + ch := make(chan []map[string]interface{}, 20) + windowResults := make([][]map[string]interface{}, 0) + ssql.AddSink(func(results []map[string]interface{}) { + if len(results) > 0 { + windowResults = append(windowResults, results) + ch <- results + } + }) + + // 使用处理时间,每秒发送一条数据,共发送15条 + // 窗口大小10秒,滑动步长2秒 + // 使用处理时间时,窗口基于数据到达的处理时间 + for i := 0; i < 15; i++ { + ssql.Emit(map[string]interface{}{ + "deviceId": "sensor001", + "temperature": float64(i), + }) + time.Sleep(1 * time.Second) + } + + // 等待足够的时间让多个窗口触发 + time.Sleep(12 * time.Second) + + timeout := time.After(3 * time.Second) + for { + select { + case <-ch: + // 继续收集结果 + case <-timeout: + goto END + } + } + +END: + require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口") + + // 验证第一个窗口包含数据0-9 + if len(windowResults) > 0 { + firstWindow := windowResults[0] + require.Len(t, firstWindow, 1) + firstRow := firstWindow[0] + firstCnt := firstRow["cnt"].(float64) + firstMin := firstRow["min_temp"].(float64) + firstMax := firstRow["max_temp"].(float64) + + // 使用处理时间时,第一个窗口应该包含在窗口大小时间内到达的数据 + // 窗口大小10秒,每秒1条数据,应该包含约10条数据 + // 但由于窗口对齐和数据处理延迟,实际数量可能略有不同 + assert.GreaterOrEqual(t, firstCnt, 5.0, + "第一个窗口应该包含足够的数据(窗口大小10秒,每秒1条),实际: %.0f", firstCnt) + assert.LessOrEqual(t, firstCnt, 15.0, + "第一个窗口不应该超过15条数据,实际: %.0f", firstCnt) + // 第一个窗口的最小值应该是0或接近0 + assert.LessOrEqual(t, firstMin, 1.0, + "第一个窗口的最小值应该接近0,实际: %.0f", firstMin) + // 第一个窗口的最大值应该大于0 + assert.GreaterOrEqual(t, firstMax, 0.0, + "第一个窗口的最大值应该大于等于0,实际: %.0f", firstMax) + + t.Logf("第一个窗口: cnt=%.0f, min=%.0f, max=%.0f", firstCnt, firstMin, firstMax) + } + + // 验证第二个窗口与第一个窗口有重叠 + if len(windowResults) > 1 { + secondWindow := windowResults[1] + require.Len(t, secondWindow, 1) + secondRow := secondWindow[0] + secondCnt := secondRow["cnt"].(float64) + secondMin := secondRow["min_temp"].(float64) + secondMax := secondRow["max_temp"].(float64) + + // 使用处理时间时,第二个窗口也应该包含足够的数据 + // 窗口大小10秒,每秒1条数据,应该包含约10条数据 + assert.GreaterOrEqual(t, secondCnt, 5.0, + "第二个窗口应该包含足够的数据(窗口大小10秒,每秒1条),实际: %.0f", secondCnt) + + // 验证重叠:第二个窗口的最小值应该大于第一个窗口的最小值 + // 因为窗口滑动,第二个窗口应该从数据2开始 + if len(windowResults) > 0 { + firstMin := windowResults[0][0]["min_temp"].(float64) + assert.GreaterOrEqual(t, secondMin, firstMin, + "第二个窗口的最小值应该大于等于第一个窗口的最小值,说明窗口正确滑动") + } + + t.Logf("第二个窗口: cnt=%.0f, min=%.0f, max=%.0f", secondCnt, secondMin, secondMax) + } + + // 验证窗口数据不会过早丢失 + // 检查是否有窗口的数据量异常少(可能是数据被过早清理) + for i, res := range windowResults { + if len(res) > 0 { + cnt := res[0]["cnt"].(float64) + // 对于前几个窗口,数据量不应该异常少 + // 使用处理时间时,窗口大小10秒,每秒1条,应该包含约10条数据 + if i < 3 { + assert.GreaterOrEqual(t, cnt, 5.0, + "窗口 %d 的数据量不应该异常少,可能是数据被过早清理,实际: %.0f", i+1, cnt) + } + } + } +} + +// TestSQLSlidingWindow_DataRetention 测试滑动窗口的数据保留逻辑 +// 验证数据在后续窗口中正确保留,不会过早清理 +func TestSQLSlidingWindow_DataRetention(t *testing.T) { + ssql := New() + defer ssql.Stop() + + sql := ` + SELECT deviceId, + COUNT(*) as cnt, + MIN(temperature) as min_temp, + MAX(temperature) as max_temp + FROM stream + GROUP BY deviceId, SlidingWindow('10s', '2s') + ` + err := ssql.Execute(sql) + require.NoError(t, err) + + ch := make(chan []map[string]interface{}, 20) + windowResults := make([][]map[string]interface{}, 0) + ssql.AddSink(func(results []map[string]interface{}) { + if len(results) > 0 { + windowResults = append(windowResults, results) + ch <- results + } + }) + + // 使用处理时间,每秒发送一条数据,共发送12条 + // 窗口大小10秒,滑动步长2秒 + // 使用处理时间时,窗口基于数据到达的处理时间 + for i := 0; i < 12; i++ { + ssql.Emit(map[string]interface{}{ + "deviceId": "sensor001", + "temperature": float64(i), + }) + time.Sleep(1 * time.Second) + } + + // 等待多个窗口触发 + time.Sleep(12 * time.Second) + + timeout := time.After(3 * time.Second) + for { + select { + case <-ch: + // 继续收集结果 + case <-timeout: + goto END + } + } + +END: + require.GreaterOrEqual(t, len(windowResults), 3, "应该至少触发3个窗口") + + // 验证数据保留:检查最小值的变化趋势 + // 由于窗口滑动,后续窗口的最小值应该逐渐增大 + // 但如果数据保留逻辑正确,不应该突然跳跃 + minTemps := make([]float64, 0) + for _, res := range windowResults { + if len(res) > 0 { + minTemp := res[0]["min_temp"].(float64) + minTemps = append(minTemps, minTemp) + } + } + + // 验证最小值是递增或保持稳定的(不应该突然跳跃) + for i := 1; i < len(minTemps); i++ { + prevMin := minTemps[i-1] + currMin := minTemps[i] + // 最小值应该递增或保持不变(窗口滑动导致) + // 但差值不应该太大(说明数据没有被过早清理) + assert.GreaterOrEqual(t, currMin, prevMin-1.0, + "窗口 %d 的最小值不应该比前一个窗口小太多,可能是数据被过早清理", i+1) + } + + // 验证窗口数据量:前几个窗口的数据量应该足够 + // 使用处理时间时,如果数据保留逻辑正确,窗口数据量应该逐渐减少(因为旧数据逐渐过期) + // 但减少应该是平滑的,不应该突然大幅减少 + for i := 0; i < len(windowResults) && i < 5; i++ { + if len(windowResults[i]) > 0 { + cnt := windowResults[i][0]["cnt"].(float64) + // 前几个窗口应该包含足够的数据(使用处理时间) + // 窗口大小10秒,每秒1条数据,应该包含约10条数据 + if i < 3 { + assert.GreaterOrEqual(t, cnt, 5.0, + "窗口 %d 应该包含足够的数据(窗口大小10秒,每秒1条),实际: %.0f", i+1, cnt) + } + } + } +} diff --git a/window/counting_window.go b/window/counting_window.go index 4780b15..3d8404f 100644 --- a/window/counting_window.go +++ b/window/counting_window.go @@ -24,7 +24,6 @@ import ( "sync" "github.com/rulego/streamsql/utils/cast" - "github.com/rulego/streamsql/utils/timex" "github.com/rulego/streamsql/types" ) @@ -225,13 +224,15 @@ func (cw *CountingWindow) createSlot(data []types.Row) *types.TimeSlot { if len(data) == 0 { return nil } else if len(data) < cw.threshold { - start := timex.AlignTime(data[0].Timestamp, cw.config.TimeUnit, true) - end := timex.AlignTime(data[len(data)-1].Timestamp, cw.config.TimeUnit, false) + // Use actual timestamps without alignment + start := data[0].Timestamp + end := data[len(data)-1].Timestamp slot := types.NewTimeSlot(&start, &end) return slot } else { - start := timex.AlignTime(data[0].Timestamp, cw.config.TimeUnit, true) - end := timex.AlignTime(data[cw.threshold-1].Timestamp, cw.config.TimeUnit, false) + // Use actual timestamps without alignment + start := data[0].Timestamp + end := data[cw.threshold-1].Timestamp slot := types.NewTimeSlot(&start, &end) return slot } diff --git a/window/session_window.go b/window/session_window.go index 5550999..9695589 100644 --- a/window/session_window.go +++ b/window/session_window.go @@ -25,7 +25,6 @@ import ( "github.com/rulego/streamsql/types" "github.com/rulego/streamsql/utils/cast" - "github.com/rulego/streamsql/utils/timex" ) // Ensure SessionWindow struct implements Window interface @@ -128,7 +127,9 @@ func (sw *SessionWindow) Add(data interface{}) { s, exists := sw.sessionMap[key] if !exists { // Create new session - start := timex.AlignTime(timestamp, sw.config.TimeUnit, true) + // Use the actual timestamp of the first data point as session start + // No alignment needed - session starts from when first data arrives + start := timestamp end := start.Add(sw.timeout) slot := types.NewTimeSlot(&start, &end) diff --git a/window/tumbling_window.go b/window/tumbling_window.go index 6536221..bea7a71 100644 --- a/window/tumbling_window.go +++ b/window/tumbling_window.go @@ -24,7 +24,6 @@ import ( "github.com/rulego/streamsql/types" "github.com/rulego/streamsql/utils/cast" - "github.com/rulego/streamsql/utils/timex" ) // Ensure TumblingWindow implements the Window interface @@ -120,7 +119,7 @@ func (tw *TumblingWindow) Add(data interface{}) { func (sw *TumblingWindow) createSlot(t time.Time) *types.TimeSlot { // Create a new time slot - start := timex.AlignTimeToWindow(t, sw.size) + start := t end := start.Add(sw.size) slot := types.NewTimeSlot(&start, &end) return slot