7 Commits

Author SHA1 Message Date
rulego-team 52653b2143 ci:测试超时设置600s 2025-12-21 11:57:52 +08:00
Whki 2db23f5b99 Merge pull request #45 from rulego/dependabot/go_modules/github.com/expr-lang/expr-1.17.7
chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
2025-12-20 16:24:18 +08:00
rulego-team c7cece15e2 feat:window输出结果增加溢出策略 2025-12-20 16:15:53 +08:00
rulego-team 09fe63102e fix:SessionWindow和CountingWindow bufferSize初始化 2025-12-20 11:14:37 +08:00
rulego-team b405a935b7 增加同步的获取结果的方法 2025-12-20 11:10:21 +08:00
dependabot[bot] 1c781979a6 chore(deps): bump github.com/expr-lang/expr from 1.17.6 to 1.17.7
Bumps [github.com/expr-lang/expr](https://github.com/expr-lang/expr) from 1.17.6 to 1.17.7.
- [Release notes](https://github.com/expr-lang/expr/releases)
- [Commits](https://github.com/expr-lang/expr/compare/v1.17.6...v1.17.7)

---
updated-dependencies:
- dependency-name: github.com/expr-lang/expr
  dependency-version: 1.17.7
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 02:14:17 +00:00
rulego-team 16778f2ac3 test:fix test fail 2025-12-16 17:40:23 +08:00
18 changed files with 760 additions and 263 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
run: go mod download
- name: Run all tests
run: go test -v -race -timeout 300s ./...
run: go test -v -race -timeout 600s ./...
release:
name: Create Release
+1 -1
View File
@@ -3,7 +3,7 @@ module github.com/rulego/streamsql
go 1.18
require (
github.com/expr-lang/expr v1.17.6
github.com/expr-lang/expr v1.17.7
github.com/stretchr/testify v1.10.0
)
+2 -2
View File
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/expr-lang/expr v1.17.6 h1:1h6i8ONk9cexhDmowO/A64VPxHScu7qfSl2k8OlINec=
github.com/expr-lang/expr v1.17.6/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+32 -2
View File
@@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
s.sinksMux.RLock()
defer s.sinksMux.RUnlock()
if len(s.sinks) == 0 {
if len(s.sinks) == 0 && len(s.syncSinks) == 0 {
return
}
@@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
for _, sink := range s.sinks {
s.submitSinkTask(sink, results)
}
// Execute synchronous sinks (blocking, sequential)
for _, sink := range s.syncSinks {
// Recover panic for each sync sink to prevent crashing the stream
func() {
defer func() {
if r := recover(); r != nil {
logger.Error("Sync sink execution exception: %v", r)
}
}()
sink(results)
}()
}
}
// submitSinkTask submits sink task
@@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m
}
// Non-blocking task submission
// Note: Since we use a worker pool, tasks may be executed out of order
select {
case s.sinkWorkerPool <- task:
// Successfully submitted task
default:
// Worker pool is full, execute directly in current goroutine (degraded handling)
go task()
// This also helps with backpressure
task()
}
}
// AddSink adds a sink function
// Parameters:
// - sink: result processing function that receives []map[string]interface{} type result data
//
// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed.
// If you need strict ordering, use GetResultsChan() instead.
func (s *Stream) AddSink(sink func([]map[string]interface{})) {
s.sinksMux.Lock()
defer s.sinksMux.Unlock()
s.sinks = append(s.sinks, sink)
}
// AddSyncSink adds a synchronous sink function
// Parameters:
// - sink: result processing function that receives []map[string]interface{} type result data
//
// Note: Sync sinks are executed sequentially in the result processing goroutine.
// They block subsequent processing, so they should be fast.
func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) {
s.sinksMux.Lock()
defer s.sinksMux.Unlock()
s.syncSinks = append(s.syncSinks, sink)
}
// GetResultsChan gets the result channel
func (s *Stream) GetResultsChan() <-chan []map[string]interface{} {
return s.resultChan
+10 -1
View File
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
dataChanCap := int64(cap(s.dataChan))
s.dataChanMux.RUnlock()
return map[string]int64{
stats := map[string]int64{
InputCount: atomic.LoadInt64(&s.inputCount),
OutputCount: atomic.LoadInt64(&s.outputCount),
DroppedCount: atomic.LoadInt64(&s.droppedCount),
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
Expanding: int64(atomic.LoadInt32(&s.expanding)),
}
if s.Window != nil {
winStats := s.Window.GetStats()
for k, v := range winStats {
stats[k] = v
}
}
return stats
}
// GetDetailedStats gets detailed performance statistics
+2 -1
View File
@@ -61,7 +61,8 @@ type Stream struct {
aggregator aggregator.Aggregator
config types.Config
sinks []func([]map[string]interface{})
resultChan chan []map[string]interface{} // Result channel
syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially
resultChan chan []map[string]interface{} // Result channel
seenResults *sync.Map
done chan struct{} // Used to close processing goroutines
sinkWorkerPool chan func() // Sink worker pool to avoid blocking
-1
View File
@@ -176,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi
// startWorkerRoutines starts worker goroutines
func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) {
go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
go stream.startResultConsumer()
}
+14
View File
@@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) {
}
}
// AddSyncSink directly adds synchronous result processing callback functions.
// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls.
//
// Parameters:
// - sink: Result processing function, receives []map[string]interface{} type result data
//
// Note: Sync sinks are executed sequentially in the result processing goroutine.
// Use this when order of execution matters.
func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) {
if s.stream != nil {
s.stream.AddSyncSink(sink)
}
}
// PrintTable prints results to console in table format, similar to database output.
// Displays column names first, then data rows.
//
+150
View File
@@ -0,0 +1,150 @@
package streamsql
import (
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSQLIntegration_StrategyBlock 测试 SQL 集成下的阻塞策略
func TestSQLIntegration_StrategyBlock(t *testing.T) {
// 配置:输出缓冲为 1阻塞策略超时 100ms
ssql := New(WithCustomPerformance(types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 100,
ResultChannelSize: 100,
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyBlock,
BlockTimeout: 100 * time.Millisecond,
AllowDataLoss: true,
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 0, // 无缓冲任务队列
SinkWorkerCount: 1, // 1个 worker
},
}))
defer ssql.Stop()
// SQL: 每条数据触发一次窗口
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
err := ssql.Execute(rsql)
require.NoError(t, err)
// 添加同步 Sink 阻塞 Stream 处理,从而反压 Window
// 注意:必须在 Execute 之后添加,因为 Execute 才会创建 stream
ssql.AddSyncSink(func(results []map[string]interface{}) {
time.Sleep(500 * time.Millisecond)
})
// 发送 5 条数据
// d1: Worker 处理中 (阻塞 500ms)
// d2: Stream 尝试写入 WorkerPool -> 阻塞 (无缓冲)
// d3: Window OutputChan (size 1) -> 填满
// d4: Window OutputChan 满 -> 尝试写入 -> 阻塞 (Window Add) -> 放入 TriggerChan (size=1)
// d5: Window Add -> TriggerChan 满 -> 阻塞? No, Emit 是异步的?
// Emit 往 dataChan 写. DataProcessor 读 dataChan -> Window.Add.
// Window.Add 往 triggerChan 写.
//
// 修正分析:
// Window.Add 是非阻塞的 (如果 triggerChan 不满).
// CountingWindow triggerChan size = bufferSize = 1.
// Worker 协程: 从 triggerChan 读 -> 处理 -> sendResult (到 OutputChan).
//
// d1: Worker读triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy).
// d2: Worker读triggerChan -> OutputChan -> Stream -> Blocked on WorkerPool.
// 此时 Stream 持有 d2. OutputChan 空.
// Worker 协程 阻塞在 sendResult(d2)? No, Stream 取走了 d2, Stream 阻塞在 dispatch.
// 所以 OutputChan 是空的!
// Wait, Stream loop:
// result := <-OutputChan. (Stream has d2).
// handleResult(d2) -> Blocked.
// So OutputChan is empty.
// d3: Worker读triggerChan -> OutputChan (d3). Success.
// OutputChan has d3.
// d4: Worker读triggerChan -> OutputChan (d4). Blocked (OutputChan full).
// Worker 协程 阻塞在 sendResult(d4).
// d5: Add -> triggerChan (d5). Success (triggerChan size 1).
// d6: Add -> triggerChan (d6). Blocked (triggerChan full).
// Add blocks. DataProcessor blocks. Emit succeeds (dataChan).
//
// 所以 Window Worker 只有在 sendResult 阻塞时才触发 Drop logic.
// sendResult 只有在 OutputChan 满且超时时才 Drop.
//
// d4 阻塞在 sendResult.
// 100ms 后超时 -> Drop d4.
// Worker 继续.
//
// 所以 d4 应该是被 Drop 的那个.
// Sent: d1, d2, d3. (d5 在 triggerChan, d6 在 dataChan).
// Wait, d5 is in triggerChan, not processed yet.
// So Sent = 3. Dropped = 1 (d4).
for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
ssql.Emit(map[string]interface{}{"deviceId": id})
time.Sleep(10 * time.Millisecond)
}
// 等待足够长的时间让 Stream 醒来并处理完,以及 Window 丢弃逻辑执行
time.Sleep(1000 * time.Millisecond)
// 获取统计信息
// d1: Stream 处理完
// d2: Stream 处理完 (Worker 醒来后处理 d2)
// d3: Dropped (Worker 阻塞 -> 超时)
// d4: Dropped (Worker 阻塞 -> 超时)
// d5: Dropped (Worker 阻塞 -> 超时)
// Total Sent: 2 (d1, d2).
// Dropped: 3 (d3, d4, d5).
stats := ssql.stream.GetStats()
assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow")
assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result")
}
// TestSQLIntegration_StrategyDrop 测试 SQL 集成下的丢弃策略
func TestSQLIntegration_StrategyDrop(t *testing.T) {
// 配置:输出缓冲为 1丢弃策略
ssql := New(WithCustomPerformance(types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 100,
ResultChannelSize: 100,
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyDrop,
},
}))
defer ssql.Stop()
// SQL: 每条数据触发一次窗口
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
err := ssql.Execute(rsql)
require.NoError(t, err)
// 连续发送 3 条数据
ssql.Emit(map[string]interface{}{"deviceId": "d1"})
ssql.Emit(map[string]interface{}{"deviceId": "d2"})
ssql.Emit(map[string]interface{}{"deviceId": "d3"})
// 等待处理完成
time.Sleep(200 * time.Millisecond)
// 对于 StrategyDrop它会挤掉旧数据所以 sentCount 应该持续增加
stats := ssql.stream.GetStats()
// d1, d2, d3 都会成功发送(虽然 d1, d2 可能被挤掉,但 sendResult 逻辑中挤掉旧的后写入新的算发送成功)
assert.Equal(t, int64(3), stats["sentCount"])
// 验证最终留在缓冲区的是最后一条数据 (d3)
// 注意AddSink 会启动 worker 从 OutputChan 读。
// 为了验证,我们直接从 Window 的 OutputChan 读
select {
case result := <-ssql.stream.Window.OutputChan():
assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
case <-time.After(100 * time.Millisecond):
// 如果已经被 AddSink 的 worker 读走了也正常,但由于我们没加 Sink所以应该在里面
}
}
+1 -1
View File
@@ -343,7 +343,7 @@ func TestStreamsqlDistinct(t *testing.T) {
// 等待窗口触发(处理时间模式)
//fmt.Println("等待窗口初始化...")
time.Sleep(1 * time.Second)
time.Sleep(1500 * time.Millisecond)
// 手动触发窗口
//fmt.Println("手动触发窗口")
+9
View File
@@ -6,6 +6,15 @@ import (
"github.com/rulego/streamsql/aggregator"
)
const (
// OverflowStrategyBlock blocks when buffer is full
OverflowStrategyBlock = "block"
// OverflowStrategyDrop drops data when buffer is full
OverflowStrategyDrop = "drop"
// OverflowStrategyExpand expands buffer when full (not implemented for windows yet)
OverflowStrategyExpand = "expand"
)
// Config stream processing configuration
type Config struct {
// SQL processing related configuration
+62 -25
View File
@@ -22,6 +22,8 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rulego/streamsql/utils/cast"
@@ -75,12 +77,9 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) {
}
// Use unified performance config to get window output buffer size
bufferSize := 100 // Default value, counting windows usually have smaller buffers
if (config.PerformanceConfig != types.PerformanceConfig{}) {
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Counting window uses 1/10 of buffer
if bufferSize < 10 {
bufferSize = 10 // Minimum value
}
bufferSize := 1000 // Default value
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
}
cw := &CountingWindow{
@@ -161,18 +160,7 @@ func (cw *CountingWindow) Start() {
cw.callback(data)
}
select {
case cw.outputChan <- data:
cw.mu.Lock()
cw.sentCount++
cw.mu.Unlock()
case <-cw.ctx.Done():
return
default:
cw.mu.Lock()
cw.droppedCount++
cw.mu.Unlock()
}
cw.sendResult(data)
} else {
cw.mu.Unlock()
}
@@ -184,6 +172,58 @@ func (cw *CountingWindow) Start() {
}()
}
func (cw *CountingWindow) sendResult(data []types.Row) {
strategy := cw.config.PerformanceConfig.OverflowConfig.Strategy
timeout := cw.config.PerformanceConfig.OverflowConfig.BlockTimeout
if strategy == types.OverflowStrategyBlock {
if timeout <= 0 {
timeout = 5 * time.Second
}
select {
case cw.outputChan <- data:
atomic.AddInt64(&cw.sentCount, 1)
case <-time.After(timeout):
// Timeout, check if data loss is allowed
if cw.config.PerformanceConfig.OverflowConfig.AllowDataLoss {
atomic.AddInt64(&cw.droppedCount, 1)
// Drop new data (simplest fallback for block)
} else {
atomic.AddInt64(&cw.droppedCount, 1)
}
case <-cw.ctx.Done():
return
}
return
}
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
// If the buffer is full, remove the oldest item to make space for the new item.
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
select {
case cw.outputChan <- data:
atomic.AddInt64(&cw.sentCount, 1)
case <-cw.ctx.Done():
return
default:
// Try to drop oldest data to make room for new data
select {
case <-cw.outputChan:
// Successfully dropped one old item
select {
case cw.outputChan <- data:
atomic.AddInt64(&cw.sentCount, 1)
default:
// Still failed, drop current
atomic.AddInt64(&cw.droppedCount, 1)
}
default:
// Channel empty, try to send again or drop
atomic.AddInt64(&cw.droppedCount, 1)
}
}
}
func (cw *CountingWindow) Trigger() {
// Note: trigger logic has been merged into Start method to avoid data races
// This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method
@@ -211,17 +251,14 @@ func (cw *CountingWindow) Reset() {
cw.dataBuffer = nil
cw.keyedBuffer = make(map[string][]types.Row)
cw.keyedCount = make(map[string]int)
cw.sentCount = 0
cw.droppedCount = 0
atomic.StoreInt64(&cw.sentCount, 0)
atomic.StoreInt64(&cw.droppedCount, 0)
}
func (cw *CountingWindow) GetStats() map[string]int64 {
cw.mu.Lock()
defer cw.mu.Unlock()
return map[string]int64{
"sentCount": cw.sentCount,
"droppedCount": cw.droppedCount,
"sentCount": atomic.LoadInt64(&cw.sentCount),
"droppedCount": atomic.LoadInt64(&cw.droppedCount),
"bufferSize": int64(cap(cw.outputChan)),
"bufferUsed": int64(len(cw.outputChan)),
}
+1
View File
@@ -42,6 +42,7 @@ type Window interface {
OutputChan() <-chan []types.Row
SetCallback(callback func([]types.Row))
Trigger()
GetStats() map[string]int64
}
func CreateWindow(config types.WindowConfig) (Window, error) {
+100 -24
View File
@@ -19,8 +19,10 @@ package window
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rulego/streamsql/types"
@@ -59,6 +61,9 @@ type SessionWindow struct {
watermark *Watermark
// triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness)
triggeredSessions map[string]*sessionInfo
// Performance statistics
sentCount int64 // Number of successfully sent results
droppedCount int64 // Number of dropped results
}
// sessionInfo stores information about a triggered session that is still open for late data
@@ -92,12 +97,9 @@ func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) {
}
// Use unified performance configuration to get window output buffer size
bufferSize := 100 // Default value, session windows typically have smaller buffers
if (config.PerformanceConfig != types.PerformanceConfig{}) {
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Session window uses 1/10 of buffer
if bufferSize < 10 {
bufferSize = 10 // Minimum value
}
bufferSize := 1000 // Default value
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
}
// Determine time characteristic (default to ProcessingTime for backward compatibility)
@@ -411,13 +413,67 @@ func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) {
sw.callback(result)
}
sw.sendResult(result)
}
}
func (sw *SessionWindow) sendResult(data []types.Row) {
strategy := sw.config.PerformanceConfig.OverflowConfig.Strategy
timeout := sw.config.PerformanceConfig.OverflowConfig.BlockTimeout
if strategy == types.OverflowStrategyBlock {
if timeout <= 0 {
timeout = 5 * time.Second
}
select {
case sw.outputChan <- result:
case sw.outputChan <- data:
atomic.AddInt64(&sw.sentCount, 1)
case <-time.After(timeout):
atomic.AddInt64(&sw.droppedCount, 1)
case <-sw.ctx.Done():
return
}
return
}
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
// If the buffer is full, remove the oldest item to make space for the new item.
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
select {
case sw.outputChan <- data:
atomic.AddInt64(&sw.sentCount, 1)
default:
// Try to drop oldest data
select {
case <-sw.outputChan:
select {
case sw.outputChan <- data:
atomic.AddInt64(&sw.sentCount, 1)
default:
atomic.AddInt64(&sw.droppedCount, 1)
}
default:
atomic.AddInt64(&sw.droppedCount, 1)
}
}
}
// GetStats returns window performance statistics
func (sw *SessionWindow) GetStats() map[string]int64 {
return map[string]int64{
"sentCount": atomic.LoadInt64(&sw.sentCount),
"droppedCount": atomic.LoadInt64(&sw.droppedCount),
"bufferSize": int64(cap(sw.outputChan)),
"bufferUsed": int64(len(sw.outputChan)),
}
}
// ResetStats resets performance statistics
func (sw *SessionWindow) ResetStats() {
atomic.StoreInt64(&sw.sentCount, 0)
atomic.StoreInt64(&sw.droppedCount, 0)
}
// Trigger manually triggers all session windows
func (sw *SessionWindow) Trigger() {
sw.mu.Lock()
@@ -450,13 +506,7 @@ func (sw *SessionWindow) Trigger() {
sw.callback(result)
}
// Non-blocking send to output channel
select {
case sw.outputChan <- result:
// Successfully sent
default:
// Channel full, drop result (could add statistics here if needed)
}
sw.sendResult(result)
}
}
@@ -550,13 +600,7 @@ func (sw *SessionWindow) triggerLateUpdateLocked(s *session) {
callback(resultData)
}
// Non-blocking send to output channel
select {
case sw.outputChan <- resultData:
// Successfully sent
default:
// Channel full, drop result
}
sw.sendResult(resultData)
// Re-acquire lock
sw.mu.Lock()
@@ -578,12 +622,44 @@ func extractSessionCompositeKey(data interface{}, keys []string) string {
if len(keys) == 0 {
return "default"
}
parts := make([]string, 0, len(keys))
// Fast path for map[string]interface{}
if m, ok := data.(map[string]interface{}); ok {
parts := make([]string, 0, len(keys))
for _, k := range keys {
parts = append(parts, fmt.Sprintf("%v", m[k]))
if val, exists := m[k]; exists {
parts = append(parts, cast.ToString(val))
} else {
parts = append(parts, "")
}
}
return strings.Join(parts, "|")
}
return "default"
// Use reflection for structs and other types
v := reflect.ValueOf(data)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
parts := make([]string, 0, len(keys))
for _, k := range keys {
var part string
switch v.Kind() {
case reflect.Map:
if v.Type().Key().Kind() == reflect.String {
mv := v.MapIndex(reflect.ValueOf(k))
if mv.IsValid() {
part = cast.ToString(mv.Interface())
}
}
case reflect.Struct:
f := v.FieldByName(k)
if f.IsValid() {
part = cast.ToString(f.Interface())
}
}
parts = append(parts, part)
}
return strings.Join(parts, "|")
}
+98 -107
View File
File diff suppressed because it is too large Load Diff
+204
View File
@@ -0,0 +1,204 @@
package window
import (
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/rulego/streamsql/utils/cast"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestOverflowStrategies 测试不同的缓冲区溢出策略
func TestOverflowStrategies(t *testing.T) {
t.Run("CountingWindow_StrategyBlock_Timeout", func(t *testing.T) {
// 配置窗口大小1每1条数据触发一次输出缓冲1阻塞策略超时100ms
config := types.WindowConfig{
Type: "CountingWindow",
Params: []interface{}{1}, // Threshold = 1
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyBlock,
BlockTimeout: 100 * time.Millisecond,
AllowDataLoss: true, // 允许丢弃统计
},
},
}
win, err := NewCountingWindow(config)
require.NoError(t, err)
win.Start()
defer win.Stop()
// 1. 发送第1条数据触发窗口填充 outputChan (容量1)
win.Add(map[string]interface{}{"id": 1})
// 等待处理
time.Sleep(50 * time.Millisecond)
stats := win.GetStats()
assert.Equal(t, int64(1), stats["sentCount"])
assert.Equal(t, int64(1), stats["bufferUsed"]) // 应该还在缓冲区中,因为没人读
// 2. 发送第2条数据触发窗口
// 此时 outputChan 已满sendResult 应该阻塞 100ms 然后超时丢弃
win.Add(map[string]interface{}{"id": 2})
// 等待超时 (100ms) + 处理时间
time.Sleep(200 * time.Millisecond)
stats = win.GetStats()
// 第1条仍在缓冲区因为没人读
// 第2条因为阻塞超时被丢弃
assert.Equal(t, int64(1), stats["bufferUsed"])
assert.Equal(t, int64(1), stats["droppedCount"])
// 3. 读取缓冲区中的数据,腾出空间
select {
case <-win.OutputChan():
// 读出第1条
default:
t.Fatal("expected data in output channel")
}
// 4. 发送第3条数据
win.Add(map[string]interface{}{"id": 3})
time.Sleep(50 * time.Millisecond)
stats = win.GetStats()
assert.Equal(t, int64(2), stats["sentCount"]) // 第1条和第3条发送成功
assert.Equal(t, int64(1), stats["droppedCount"]) // 第2条丢弃
})
t.Run("SessionWindow_StrategyBlock_Timeout", func(t *testing.T) {
// 配置会话超时50ms输出缓冲1阻塞策略超时50ms
config := types.WindowConfig{
Type: "SessionWindow",
Params: []interface{}{"50ms"},
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyBlock,
BlockTimeout: 50 * time.Millisecond,
AllowDataLoss: true,
},
},
}
win, err := NewSessionWindow(config)
require.NoError(t, err)
win.Start()
defer win.Stop()
// 1. 发送数据,开始一个 session
win.Add(map[string]interface{}{"id": 1})
// 2. 等待 session 超时 (50ms) + 检查周期 (timeout/2 = 25ms)
// 确保 session 被触发并发送到 outputChan
time.Sleep(100 * time.Millisecond)
stats := win.GetStats()
assert.Equal(t, int64(1), stats["sentCount"])
assert.Equal(t, int64(1), stats["bufferUsed"])
// 3. 发送数据开始第二个 session (因为上一个已经结束)
win.Add(map[string]interface{}{"id": 2})
// 4. 等待 session 超时
// 此时 outputChan 已满,应该阻塞并丢弃
time.Sleep(150 * time.Millisecond)
stats = win.GetStats()
assert.Equal(t, int64(1), stats["bufferUsed"])
assert.Equal(t, int64(1), stats["droppedCount"])
})
t.Run("CountingWindow_StrategyDrop", func(t *testing.T) {
// 配置窗口大小1输出缓冲1丢弃策略
config := types.WindowConfig{
Type: "CountingWindow",
Params: []interface{}{1},
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyDrop,
},
},
}
win, err := NewCountingWindow(config)
require.NoError(t, err)
win.Start()
defer win.Stop()
// 1. 发送第1条数据填充 outputChan
win.Add(map[string]interface{}{"id": 1})
time.Sleep(50 * time.Millisecond)
// 2. 发送第2条数据
// outputChan 已满StrategyDrop 会尝试丢弃旧数据outputChan头部来放入新数据
win.Add(map[string]interface{}{"id": 2})
time.Sleep(50 * time.Millisecond)
stats := win.GetStats()
assert.Equal(t, int64(2), stats["sentCount"])
// 验证现在缓冲区里是第2条数据
select {
case data := <-win.OutputChan():
assert.Len(t, data, 1)
assert.Equal(t, 2, cast.ToInt(data[0].Data.(map[string]interface{})["id"]))
default:
t.Fatal("expected data in output channel")
}
})
t.Run("TumblingWindow_StrategyBlock_Timeout", func(t *testing.T) {
// 配置窗口大小50ms输出缓冲1阻塞策略超时50ms
config := types.WindowConfig{
Type: "TumblingWindow",
Params: []interface{}{"50ms"},
PerformanceConfig: types.PerformanceConfig{
BufferConfig: types.BufferConfig{
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyBlock,
BlockTimeout: 50 * time.Millisecond,
AllowDataLoss: true,
},
},
}
win, err := NewTumblingWindow(config)
require.NoError(t, err)
win.Start()
defer win.Stop()
// 1. 发送数据触发第1个窗口
win.Add(map[string]interface{}{"id": 1})
// 等待窗口触发 (50ms)
time.Sleep(100 * time.Millisecond)
stats := win.GetStats()
assert.Equal(t, int64(1), stats["sentCount"])
assert.Equal(t, int64(1), stats["bufferUsed"])
// 2. 发送数据触发第2个窗口
// 由于没有读取 outputChan第2个窗口触发时应该阻塞然后超时
win.Add(map[string]interface{}{"id": 2})
// 等待窗口触发 (50ms) + 阻塞超时 (50ms)
time.Sleep(150 * time.Millisecond)
stats = win.GetStats()
assert.Equal(t, int64(1), stats["bufferUsed"]) // 仍然只有第1个窗口的数据
assert.Equal(t, int64(1), stats["droppedCount"]) // 第2个窗口结果被丢弃
})
}
+72 -96
View File
@@ -21,6 +21,7 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/rulego/streamsql/types"
@@ -435,14 +436,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
windowEnd := tw.currentSlot.End
windowStart := tw.currentSlot.Start
// Trigger if watermark >= windowEnd
// In Flink, windows are triggered when watermark >= windowEnd.
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
// So watermark >= windowEnd means: maxEventTime - maxOutOfOrderness >= windowEnd
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
// Check if watermark >= windowEnd
// Use !Before() instead of After() to include equality case
// This is equivalent to watermarkTime >= windowEnd
shouldTrigger := !watermarkTime.Before(*windowEnd)
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
@@ -495,7 +489,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
tw.triggerWindowLocked()
resultData := tw.extractWindowDataLocked()
if len(resultData) > 0 {
callback := tw.callback
tw.mu.Unlock()
if callback != nil {
callback(resultData)
}
tw.sendResult(resultData)
tw.mu.Lock()
}
triggeredCount++
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
// triggerWindowLocked releases and re-acquires lock, so we need to re-check state
@@ -579,15 +584,24 @@ func (tw *TumblingWindow) handleLateData(eventTime time.Time, allowedLateness ti
if info.slot.Contains(eventTime) {
// This late data belongs to a triggered window that's still open
// Trigger window again with updated data (late update)
tw.triggerLateUpdateLocked(info.slot)
resultData := tw.extractLateUpdateDataLocked(info.slot)
if len(resultData) > 0 {
callback := tw.callback
tw.mu.Unlock()
if callback != nil {
callback(resultData)
}
tw.sendResult(resultData)
tw.mu.Lock()
}
return
}
}
}
// triggerLateUpdateLocked triggers a late update for a window (must be called with lock held)
// extractLateUpdateDataLocked extracts late update data for a window (must be called with lock held)
// This implements Flink-like behavior: late updates include complete window data (original + late data)
func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
func (tw *TumblingWindow) extractLateUpdateDataLocked(slot *types.TimeSlot) []types.Row {
// Find the triggered window info to get snapshot data
var windowInfo *triggeredWindowInfo
windowKey := tw.getWindowKey(*slot.End)
@@ -619,7 +633,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
}
if len(resultData) == 0 {
return
return nil
}
// Update snapshot to include late data (for future late updates)
@@ -635,34 +649,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
}
}
// Get callback reference before releasing lock
callback := tw.callback
// Release lock before calling callback and sending to channel to avoid blocking
tw.mu.Unlock()
if callback != nil {
callback(resultData)
}
// Non-blocking send to output channel and update statistics
var sent bool
select {
case tw.outputChan <- resultData:
// Successfully sent
sent = true
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
tw.mu.Lock()
if sent {
tw.sentCount++
} else {
tw.droppedCount++
}
return resultData
}
// getWindowKey generates a key for a window based on its end time
@@ -670,10 +657,10 @@ func (tw *TumblingWindow) getWindowKey(endTime time.Time) string {
return fmt.Sprintf("%d", endTime.UnixNano())
}
// triggerWindowLocked triggers the window (must be called with lock held)
func (tw *TumblingWindow) triggerWindowLocked() {
// extractWindowDataLocked extracts current window data (must be called with lock held)
func (tw *TumblingWindow) extractWindowDataLocked() []types.Row {
if tw.currentSlot == nil {
return
return nil
}
// Extract current window data
@@ -688,7 +675,7 @@ func (tw *TumblingWindow) triggerWindowLocked() {
// Skip triggering if window has no data
// This prevents empty windows from being triggered
if len(resultData) == 0 {
return
return nil
}
// Remove data that belongs to current window
@@ -700,33 +687,47 @@ func (tw *TumblingWindow) triggerWindowLocked() {
}
tw.data = newData
// Get callback reference before releasing lock
callback := tw.callback
return resultData
}
// Release lock before calling callback and sending to channel to avoid blocking
tw.mu.Unlock()
func (tw *TumblingWindow) sendResult(data []types.Row) {
strategy := tw.config.PerformanceConfig.OverflowConfig.Strategy
timeout := tw.config.PerformanceConfig.OverflowConfig.BlockTimeout
if callback != nil {
callback(resultData)
if strategy == types.OverflowStrategyBlock {
if timeout <= 0 {
timeout = 5 * time.Second
}
select {
case tw.outputChan <- data:
atomic.AddInt64(&tw.sentCount, 1)
case <-time.After(timeout):
atomic.AddInt64(&tw.droppedCount, 1)
case <-tw.ctx.Done():
return
}
return
}
// Non-blocking send to output channel and update statistics
var sent bool
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
// If the buffer is full, remove the oldest item to make space for the new item.
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
select {
case tw.outputChan <- resultData:
// Successfully sent
sent = true
case tw.outputChan <- data:
atomic.AddInt64(&tw.sentCount, 1)
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
tw.mu.Lock()
if sent {
tw.sentCount++
} else {
tw.droppedCount++
// Try to drop oldest data
select {
case <-tw.outputChan:
select {
case tw.outputChan <- data:
atomic.AddInt64(&tw.sentCount, 1)
default:
atomic.AddInt64(&tw.droppedCount, 1)
}
default:
atomic.AddInt64(&tw.droppedCount, 1)
}
}
}
@@ -801,27 +802,8 @@ func (tw *TumblingWindow) Trigger() {
callback(resultData)
}
// Non-blocking send to output channel and update statistics
var sent bool
select {
case tw.outputChan <- resultData:
// Successfully sent
sent = true
default:
// Channel full, drop result
sent = false
}
// Re-acquire lock to update statistics
tw.mu.Lock()
if sent {
tw.sentCount++
} else {
tw.droppedCount++
// Optional: add logging here
// log.Printf("Window output channel full, dropped result with %d rows", len(resultData))
}
tw.mu.Unlock()
// Use sendResult to respect overflow strategy
tw.sendResult(resultData)
}
// Reset resets tumbling window data
@@ -889,12 +871,9 @@ func (tw *TumblingWindow) SetCallback(callback func([]types.Row)) {
// GetStats returns window performance statistics
func (tw *TumblingWindow) GetStats() map[string]int64 {
tw.mu.RLock()
defer tw.mu.RUnlock()
return map[string]int64{
"sentCount": tw.sentCount,
"droppedCount": tw.droppedCount,
"sentCount": atomic.LoadInt64(&tw.sentCount),
"droppedCount": atomic.LoadInt64(&tw.droppedCount),
"bufferSize": int64(cap(tw.outputChan)),
"bufferUsed": int64(len(tw.outputChan)),
}
@@ -902,9 +881,6 @@ func (tw *TumblingWindow) GetStats() map[string]int64 {
// ResetStats resets performance statistics
func (tw *TumblingWindow) ResetStats() {
tw.mu.Lock()
defer tw.mu.Unlock()
tw.sentCount = 0
tw.droppedCount = 0
atomic.StoreInt64(&tw.sentCount, 0)
atomic.StoreInt64(&tw.droppedCount, 0)
}
+1 -1
View File
@@ -584,7 +584,7 @@ func TestWindowWithPerformanceConfig(t *testing.T) {
name: "计数窗口-高性能配置",
windowType: TypeCounting,
performanceConfig: types.HighPerformanceConfig(),
expectedBufferSize: 20, // 200 / 10
expectedBufferSize: 200,
extraParams: map[string]interface{}{"count": 10},
},
{