mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-19 08:40:40 +00:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 52653b2143 | |||
| 2db23f5b99 | |||
| c7cece15e2 | |||
| 09fe63102e | |||
| b405a935b7 | |||
| 1c781979a6 | |||
| 16778f2ac3 | |||
| ba2cdd5629 | |||
| 47b7e07c6b | |||
| 9739f213e9 | |||
| 4528d1f1a6 | |||
| 6ce76d506c |
@@ -32,7 +32,7 @@ jobs:
|
||||
run: go mod download
|
||||
|
||||
- name: Run all tests
|
||||
run: go test -v -race -timeout 300s ./...
|
||||
run: go test -v -race -timeout 600s ./...
|
||||
|
||||
release:
|
||||
name: Create Release
|
||||
|
||||
@@ -131,7 +131,25 @@ func validateBasicSyntax(exprStr string) error {
|
||||
}
|
||||
|
||||
// Check for invalid characters
|
||||
inQuotes := false
|
||||
var quoteChar rune
|
||||
|
||||
for i, ch := range trimmed {
|
||||
// Handle quotes
|
||||
if ch == '\'' || ch == '"' {
|
||||
if !inQuotes {
|
||||
inQuotes = true
|
||||
quoteChar = ch
|
||||
} else if ch == quoteChar {
|
||||
inQuotes = false
|
||||
}
|
||||
}
|
||||
|
||||
// If inside quotes, skip character validation
|
||||
if inQuotes {
|
||||
continue
|
||||
}
|
||||
|
||||
// Allowed characters: letters, numbers, operators, parentheses, dots, underscores, spaces, quotes
|
||||
if !isValidChar(ch) {
|
||||
return fmt.Errorf("invalid character '%c' at position %d", ch, i)
|
||||
|
||||
@@ -3,7 +3,7 @@ module github.com/rulego/streamsql
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/expr-lang/expr v1.17.6
|
||||
github.com/expr-lang/expr v1.17.7
|
||||
github.com/stretchr/testify v1.10.0
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/expr-lang/expr v1.17.6 h1:1h6i8ONk9cexhDmowO/A64VPxHScu7qfSl2k8OlINec=
|
||||
github.com/expr-lang/expr v1.17.6/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
|
||||
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
|
||||
@@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
s.sinksMux.RLock()
|
||||
defer s.sinksMux.RUnlock()
|
||||
|
||||
if len(s.sinks) == 0 {
|
||||
if len(s.sinks) == 0 && len(s.syncSinks) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
for _, sink := range s.sinks {
|
||||
s.submitSinkTask(sink, results)
|
||||
}
|
||||
|
||||
// Execute synchronous sinks (blocking, sequential)
|
||||
for _, sink := range s.syncSinks {
|
||||
// Recover panic for each sync sink to prevent crashing the stream
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Error("Sync sink execution exception: %v", r)
|
||||
}
|
||||
}()
|
||||
sink(results)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// submitSinkTask submits sink task
|
||||
@@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m
|
||||
}
|
||||
|
||||
// Non-blocking task submission
|
||||
// Note: Since we use a worker pool, tasks may be executed out of order
|
||||
select {
|
||||
case s.sinkWorkerPool <- task:
|
||||
// Successfully submitted task
|
||||
default:
|
||||
// Worker pool is full, execute directly in current goroutine (degraded handling)
|
||||
go task()
|
||||
// This also helps with backpressure
|
||||
task()
|
||||
}
|
||||
}
|
||||
|
||||
// AddSink adds a sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed.
|
||||
// If you need strict ordering, use GetResultsChan() instead.
|
||||
func (s *Stream) AddSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.sinks = append(s.sinks, sink)
|
||||
}
|
||||
|
||||
// AddSyncSink adds a synchronous sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// They block subsequent processing, so they should be fast.
|
||||
func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.syncSinks = append(s.syncSinks, sink)
|
||||
}
|
||||
|
||||
// GetResultsChan gets the result channel
|
||||
func (s *Stream) GetResultsChan() <-chan []map[string]interface{} {
|
||||
return s.resultChan
|
||||
|
||||
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
dataChanCap := int64(cap(s.dataChan))
|
||||
s.dataChanMux.RUnlock()
|
||||
|
||||
return map[string]int64{
|
||||
stats := map[string]int64{
|
||||
InputCount: atomic.LoadInt64(&s.inputCount),
|
||||
OutputCount: atomic.LoadInt64(&s.outputCount),
|
||||
DroppedCount: atomic.LoadInt64(&s.droppedCount),
|
||||
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
|
||||
Expanding: int64(atomic.LoadInt32(&s.expanding)),
|
||||
}
|
||||
|
||||
if s.Window != nil {
|
||||
winStats := s.Window.GetStats()
|
||||
for k, v := range winStats {
|
||||
stats[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// GetDetailedStats gets detailed performance statistics
|
||||
|
||||
@@ -61,7 +61,32 @@ func (s *Stream) compileSimpleFieldInfo(fieldSpec string) *fieldProcessInfo {
|
||||
}
|
||||
|
||||
// Parse alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
info.fieldName = parts[0]
|
||||
// Remove backticks from field name
|
||||
if len(info.fieldName) >= 2 && info.fieldName[0] == '`' && info.fieldName[len(info.fieldName)-1] == '`' {
|
||||
@@ -398,7 +423,32 @@ func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string
|
||||
}
|
||||
|
||||
// Handle alias
|
||||
parts := strings.Split(fieldSpec, ":")
|
||||
var parts []string
|
||||
// Helper to split field spec considering quotes
|
||||
splitFieldSpec := func(spec string) []string {
|
||||
inQuote := false
|
||||
var quoteChar byte
|
||||
for i := 0; i < len(spec); i++ {
|
||||
c := spec[i]
|
||||
if inQuote {
|
||||
if c == quoteChar {
|
||||
inQuote = false
|
||||
}
|
||||
} else {
|
||||
if c == '\'' || c == '"' || c == '`' {
|
||||
inQuote = true
|
||||
quoteChar = c
|
||||
} else if c == ':' {
|
||||
// Found separator
|
||||
return []string{spec[:i], spec[i+1:]}
|
||||
}
|
||||
}
|
||||
}
|
||||
// No separator found outside quotes
|
||||
return []string{spec}
|
||||
}
|
||||
|
||||
parts = splitFieldSpec(fieldSpec)
|
||||
fieldName := parts[0]
|
||||
outputName := fieldName
|
||||
if len(parts) > 1 {
|
||||
|
||||
+2
-1
@@ -61,7 +61,8 @@ type Stream struct {
|
||||
aggregator aggregator.Aggregator
|
||||
config types.Config
|
||||
sinks []func([]map[string]interface{})
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
seenResults *sync.Map
|
||||
done chan struct{} // Used to close processing goroutines
|
||||
sinkWorkerPool chan func() // Sink worker pool to avoid blocking
|
||||
|
||||
@@ -176,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi
|
||||
// startWorkerRoutines starts worker goroutines
|
||||
func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) {
|
||||
go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
|
||||
go stream.startResultConsumer()
|
||||
}
|
||||
|
||||
@@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) {
|
||||
}
|
||||
}
|
||||
|
||||
// AddSyncSink directly adds synchronous result processing callback functions.
|
||||
// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls.
|
||||
//
|
||||
// Parameters:
|
||||
// - sink: Result processing function, receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// Use this when order of execution matters.
|
||||
func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
if s.stream != nil {
|
||||
s.stream.AddSyncSink(sink)
|
||||
}
|
||||
}
|
||||
|
||||
// PrintTable prints results to console in table format, similar to database output.
|
||||
// Displays column names first, then data rows.
|
||||
//
|
||||
|
||||
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
|
||||
}
|
||||
|
||||
// 第三阶段:发送更多正常数据,推进 watermark
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||
for i := 10; i < 15; i++ {
|
||||
eventTime := baseTime + int64(i*200)
|
||||
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||
eventTime = requiredEventTimeForTrigger
|
||||
}
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": eventTime,
|
||||
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
|
||||
}
|
||||
|
||||
// 推进watermark,触发第一个窗口
|
||||
// 发送事件时间超过第一个窗口结束时间的数据
|
||||
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": firstWindowEnd + int64(2000),
|
||||
"eventTime": requiredEventTimeForTrigger,
|
||||
"temperature": 100.0,
|
||||
})
|
||||
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package streamsql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestSQLIntegration_StrategyBlock 测试 SQL 集成下的阻塞策略
|
||||
func TestSQLIntegration_StrategyBlock(t *testing.T) {
|
||||
// 配置:输出缓冲为 1,阻塞策略,超时 100ms
|
||||
ssql := New(WithCustomPerformance(types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
DataChannelSize: 100,
|
||||
ResultChannelSize: 100,
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyBlock,
|
||||
BlockTimeout: 100 * time.Millisecond,
|
||||
AllowDataLoss: true,
|
||||
},
|
||||
WorkerConfig: types.WorkerConfig{
|
||||
SinkPoolSize: 0, // 无缓冲任务队列
|
||||
SinkWorkerCount: 1, // 1个 worker
|
||||
},
|
||||
}))
|
||||
defer ssql.Stop()
|
||||
|
||||
// SQL: 每条数据触发一次窗口
|
||||
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
|
||||
err := ssql.Execute(rsql)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 添加同步 Sink 阻塞 Stream 处理,从而反压 Window
|
||||
// 注意:必须在 Execute 之后添加,因为 Execute 才会创建 stream
|
||||
ssql.AddSyncSink(func(results []map[string]interface{}) {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
})
|
||||
|
||||
// 发送 5 条数据
|
||||
// d1: Worker 处理中 (阻塞 500ms)
|
||||
// d2: Stream 尝试写入 WorkerPool -> 阻塞 (无缓冲)
|
||||
// d3: Window OutputChan (size 1) -> 填满
|
||||
// d4: Window OutputChan 满 -> 尝试写入 -> 阻塞 (Window Add) -> 放入 TriggerChan (size=1)
|
||||
// d5: Window Add -> TriggerChan 满 -> 阻塞? No, Emit 是异步的?
|
||||
// Emit 往 dataChan 写. DataProcessor 读 dataChan -> Window.Add.
|
||||
// Window.Add 往 triggerChan 写.
|
||||
//
|
||||
// 修正分析:
|
||||
// Window.Add 是非阻塞的 (如果 triggerChan 不满).
|
||||
// CountingWindow triggerChan size = bufferSize = 1.
|
||||
// Worker 协程: 从 triggerChan 读 -> 处理 -> sendResult (到 OutputChan).
|
||||
//
|
||||
// d1: Worker读triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy).
|
||||
// d2: Worker读triggerChan -> OutputChan -> Stream -> Blocked on WorkerPool.
|
||||
// 此时 Stream 持有 d2. OutputChan 空.
|
||||
// Worker 协程 阻塞在 sendResult(d2)? No, Stream 取走了 d2, Stream 阻塞在 dispatch.
|
||||
// 所以 OutputChan 是空的!
|
||||
// Wait, Stream loop:
|
||||
// result := <-OutputChan. (Stream has d2).
|
||||
// handleResult(d2) -> Blocked.
|
||||
// So OutputChan is empty.
|
||||
// d3: Worker读triggerChan -> OutputChan (d3). Success.
|
||||
// OutputChan has d3.
|
||||
// d4: Worker读triggerChan -> OutputChan (d4). Blocked (OutputChan full).
|
||||
// Worker 协程 阻塞在 sendResult(d4).
|
||||
// d5: Add -> triggerChan (d5). Success (triggerChan size 1).
|
||||
// d6: Add -> triggerChan (d6). Blocked (triggerChan full).
|
||||
// Add blocks. DataProcessor blocks. Emit succeeds (dataChan).
|
||||
//
|
||||
// 所以 Window Worker 只有在 sendResult 阻塞时才触发 Drop logic.
|
||||
// sendResult 只有在 OutputChan 满且超时时才 Drop.
|
||||
//
|
||||
// d4 阻塞在 sendResult.
|
||||
// 100ms 后超时 -> Drop d4.
|
||||
// Worker 继续.
|
||||
//
|
||||
// 所以 d4 应该是被 Drop 的那个.
|
||||
// Sent: d1, d2, d3. (d5 在 triggerChan, d6 在 dataChan).
|
||||
// Wait, d5 is in triggerChan, not processed yet.
|
||||
// So Sent = 3. Dropped = 1 (d4).
|
||||
|
||||
for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
|
||||
ssql.Emit(map[string]interface{}{"deviceId": id})
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
// 等待足够长的时间让 Stream 醒来并处理完,以及 Window 丢弃逻辑执行
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
|
||||
// 获取统计信息
|
||||
// d1: Stream 处理完
|
||||
// d2: Stream 处理完 (Worker 醒来后处理 d2)
|
||||
// d3: Dropped (Worker 阻塞 -> 超时)
|
||||
// d4: Dropped (Worker 阻塞 -> 超时)
|
||||
// d5: Dropped (Worker 阻塞 -> 超时)
|
||||
// Total Sent: 2 (d1, d2).
|
||||
// Dropped: 3 (d3, d4, d5).
|
||||
stats := ssql.stream.GetStats()
|
||||
assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow")
|
||||
assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result")
|
||||
}
|
||||
|
||||
// TestSQLIntegration_StrategyDrop 测试 SQL 集成下的丢弃策略
|
||||
func TestSQLIntegration_StrategyDrop(t *testing.T) {
|
||||
// 配置:输出缓冲为 1,丢弃策略
|
||||
ssql := New(WithCustomPerformance(types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
DataChannelSize: 100,
|
||||
ResultChannelSize: 100,
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyDrop,
|
||||
},
|
||||
}))
|
||||
defer ssql.Stop()
|
||||
|
||||
// SQL: 每条数据触发一次窗口
|
||||
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
|
||||
err := ssql.Execute(rsql)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 连续发送 3 条数据
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "d1"})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "d2"})
|
||||
ssql.Emit(map[string]interface{}{"deviceId": "d3"})
|
||||
|
||||
// 等待处理完成
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// 对于 StrategyDrop,它会挤掉旧数据,所以 sentCount 应该持续增加
|
||||
stats := ssql.stream.GetStats()
|
||||
// d1, d2, d3 都会成功发送(虽然 d1, d2 可能被挤掉,但 sendResult 逻辑中挤掉旧的后写入新的算发送成功)
|
||||
assert.Equal(t, int64(3), stats["sentCount"])
|
||||
|
||||
// 验证最终留在缓冲区的是最后一条数据 (d3)
|
||||
// 注意:AddSink 会启动 worker 从 OutputChan 读。
|
||||
// 为了验证,我们直接从 Window 的 OutputChan 读
|
||||
select {
|
||||
case result := <-ssql.stream.Window.OutputChan():
|
||||
assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// 如果已经被 AddSink 的 worker 读走了也正常,但由于我们没加 Sink,所以应该在里面
|
||||
}
|
||||
}
|
||||
+1
-1
@@ -343,7 +343,7 @@ func TestStreamsqlDistinct(t *testing.T) {
|
||||
|
||||
// 等待窗口触发(处理时间模式)
|
||||
//fmt.Println("等待窗口初始化...")
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
// 手动触发窗口
|
||||
//fmt.Println("手动触发窗口")
|
||||
|
||||
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
||||
})
|
||||
|
||||
// 模拟完整的延迟数据处理场景
|
||||
baseTime := time.Now().UnixMilli() - 10000
|
||||
// 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
|
||||
windowSizeMs := int64(2000) // 2秒
|
||||
baseTimeRaw := time.Now().UnixMilli() - 10000
|
||||
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
|
||||
maxOutOfOrdernessMs := int64(1000) // 1秒
|
||||
firstWindowEnd := baseTime + windowSizeMs
|
||||
// 关键:要触发窗口,需要 watermark >= windowEnd
|
||||
// watermark = maxEventTime - maxOutOfOrderness
|
||||
// 所以需要:maxEventTime - maxOutOfOrderness >= windowEnd
|
||||
// 即:maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
|
||||
|
||||
// 第一阶段:发送正常顺序的数据
|
||||
t.Log("第一阶段:发送正常顺序的数据")
|
||||
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
|
||||
}
|
||||
|
||||
// 第三阶段:继续发送正常数据,推进 watermark
|
||||
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
|
||||
t.Log("第三阶段:继续发送正常数据,推进 watermark")
|
||||
for i := 10; i < 15; i++ {
|
||||
eventTime := baseTime + int64(i*200)
|
||||
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
|
||||
if i == 10 && eventTime < requiredEventTimeForTrigger {
|
||||
eventTime = requiredEventTimeForTrigger
|
||||
}
|
||||
ssql.Emit(map[string]interface{}{
|
||||
"deviceId": "sensor001",
|
||||
"eventTime": eventTime,
|
||||
|
||||
@@ -6,6 +6,15 @@ import (
|
||||
"github.com/rulego/streamsql/aggregator"
|
||||
)
|
||||
|
||||
const (
|
||||
// OverflowStrategyBlock blocks when buffer is full
|
||||
OverflowStrategyBlock = "block"
|
||||
// OverflowStrategyDrop drops data when buffer is full
|
||||
OverflowStrategyDrop = "drop"
|
||||
// OverflowStrategyExpand expands buffer when full (not implemented for windows yet)
|
||||
OverflowStrategyExpand = "expand"
|
||||
)
|
||||
|
||||
// Config stream processing configuration
|
||||
type Config struct {
|
||||
// SQL processing related configuration
|
||||
|
||||
+62
-25
@@ -22,6 +22,8 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
|
||||
@@ -75,12 +77,9 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) {
|
||||
}
|
||||
|
||||
// Use unified performance config to get window output buffer size
|
||||
bufferSize := 100 // Default value, counting windows usually have smaller buffers
|
||||
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Counting window uses 1/10 of buffer
|
||||
if bufferSize < 10 {
|
||||
bufferSize = 10 // Minimum value
|
||||
}
|
||||
bufferSize := 1000 // Default value
|
||||
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
|
||||
}
|
||||
|
||||
cw := &CountingWindow{
|
||||
@@ -161,18 +160,7 @@ func (cw *CountingWindow) Start() {
|
||||
cw.callback(data)
|
||||
}
|
||||
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
cw.mu.Lock()
|
||||
cw.sentCount++
|
||||
cw.mu.Unlock()
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
default:
|
||||
cw.mu.Lock()
|
||||
cw.droppedCount++
|
||||
cw.mu.Unlock()
|
||||
}
|
||||
cw.sendResult(data)
|
||||
} else {
|
||||
cw.mu.Unlock()
|
||||
}
|
||||
@@ -184,6 +172,58 @@ func (cw *CountingWindow) Start() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) sendResult(data []types.Row) {
|
||||
strategy := cw.config.PerformanceConfig.OverflowConfig.Strategy
|
||||
timeout := cw.config.PerformanceConfig.OverflowConfig.BlockTimeout
|
||||
|
||||
if strategy == types.OverflowStrategyBlock {
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
case <-time.After(timeout):
|
||||
// Timeout, check if data loss is allowed
|
||||
if cw.config.PerformanceConfig.OverflowConfig.AllowDataLoss {
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
// Drop new data (simplest fallback for block)
|
||||
} else {
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
|
||||
// If the buffer is full, remove the oldest item to make space for the new item.
|
||||
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Try to drop oldest data to make room for new data
|
||||
select {
|
||||
case <-cw.outputChan:
|
||||
// Successfully dropped one old item
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
default:
|
||||
// Still failed, drop current
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
default:
|
||||
// Channel empty, try to send again or drop
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) Trigger() {
|
||||
// Note: trigger logic has been merged into Start method to avoid data races
|
||||
// This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method
|
||||
@@ -211,17 +251,14 @@ func (cw *CountingWindow) Reset() {
|
||||
cw.dataBuffer = nil
|
||||
cw.keyedBuffer = make(map[string][]types.Row)
|
||||
cw.keyedCount = make(map[string]int)
|
||||
cw.sentCount = 0
|
||||
cw.droppedCount = 0
|
||||
atomic.StoreInt64(&cw.sentCount, 0)
|
||||
atomic.StoreInt64(&cw.droppedCount, 0)
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) GetStats() map[string]int64 {
|
||||
cw.mu.Lock()
|
||||
defer cw.mu.Unlock()
|
||||
|
||||
return map[string]int64{
|
||||
"sentCount": cw.sentCount,
|
||||
"droppedCount": cw.droppedCount,
|
||||
"sentCount": atomic.LoadInt64(&cw.sentCount),
|
||||
"droppedCount": atomic.LoadInt64(&cw.droppedCount),
|
||||
"bufferSize": int64(cap(cw.outputChan)),
|
||||
"bufferUsed": int64(len(cw.outputChan)),
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ type Window interface {
|
||||
OutputChan() <-chan []types.Row
|
||||
SetCallback(callback func([]types.Row))
|
||||
Trigger()
|
||||
GetStats() map[string]int64
|
||||
}
|
||||
|
||||
func CreateWindow(config types.WindowConfig) (Window, error) {
|
||||
|
||||
+100
-24
@@ -19,8 +19,10 @@ package window
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
@@ -59,6 +61,9 @@ type SessionWindow struct {
|
||||
watermark *Watermark
|
||||
// triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness)
|
||||
triggeredSessions map[string]*sessionInfo
|
||||
// Performance statistics
|
||||
sentCount int64 // Number of successfully sent results
|
||||
droppedCount int64 // Number of dropped results
|
||||
}
|
||||
|
||||
// sessionInfo stores information about a triggered session that is still open for late data
|
||||
@@ -92,12 +97,9 @@ func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) {
|
||||
}
|
||||
|
||||
// Use unified performance configuration to get window output buffer size
|
||||
bufferSize := 100 // Default value, session windows typically have smaller buffers
|
||||
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Session window uses 1/10 of buffer
|
||||
if bufferSize < 10 {
|
||||
bufferSize = 10 // Minimum value
|
||||
}
|
||||
bufferSize := 1000 // Default value
|
||||
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
|
||||
}
|
||||
|
||||
// Determine time characteristic (default to ProcessingTime for backward compatibility)
|
||||
@@ -411,13 +413,67 @@ func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) {
|
||||
sw.callback(result)
|
||||
}
|
||||
|
||||
sw.sendResult(result)
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *SessionWindow) sendResult(data []types.Row) {
|
||||
strategy := sw.config.PerformanceConfig.OverflowConfig.Strategy
|
||||
timeout := sw.config.PerformanceConfig.OverflowConfig.BlockTimeout
|
||||
|
||||
if strategy == types.OverflowStrategyBlock {
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
select {
|
||||
case sw.outputChan <- result:
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
case <-time.After(timeout):
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
case <-sw.ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
|
||||
// If the buffer is full, remove the oldest item to make space for the new item.
|
||||
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
|
||||
select {
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
default:
|
||||
// Try to drop oldest data
|
||||
select {
|
||||
case <-sw.outputChan:
|
||||
select {
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
default:
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
}
|
||||
default:
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns window performance statistics
|
||||
func (sw *SessionWindow) GetStats() map[string]int64 {
|
||||
return map[string]int64{
|
||||
"sentCount": atomic.LoadInt64(&sw.sentCount),
|
||||
"droppedCount": atomic.LoadInt64(&sw.droppedCount),
|
||||
"bufferSize": int64(cap(sw.outputChan)),
|
||||
"bufferUsed": int64(len(sw.outputChan)),
|
||||
}
|
||||
}
|
||||
|
||||
// ResetStats resets performance statistics
|
||||
func (sw *SessionWindow) ResetStats() {
|
||||
atomic.StoreInt64(&sw.sentCount, 0)
|
||||
atomic.StoreInt64(&sw.droppedCount, 0)
|
||||
}
|
||||
|
||||
// Trigger manually triggers all session windows
|
||||
func (sw *SessionWindow) Trigger() {
|
||||
sw.mu.Lock()
|
||||
@@ -450,13 +506,7 @@ func (sw *SessionWindow) Trigger() {
|
||||
sw.callback(result)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel
|
||||
select {
|
||||
case sw.outputChan <- result:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop result (could add statistics here if needed)
|
||||
}
|
||||
sw.sendResult(result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -550,13 +600,7 @@ func (sw *SessionWindow) triggerLateUpdateLocked(s *session) {
|
||||
callback(resultData)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel
|
||||
select {
|
||||
case sw.outputChan <- resultData:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop result
|
||||
}
|
||||
sw.sendResult(resultData)
|
||||
|
||||
// Re-acquire lock
|
||||
sw.mu.Lock()
|
||||
@@ -578,12 +622,44 @@ func extractSessionCompositeKey(data interface{}, keys []string) string {
|
||||
if len(keys) == 0 {
|
||||
return "default"
|
||||
}
|
||||
parts := make([]string, 0, len(keys))
|
||||
|
||||
// Fast path for map[string]interface{}
|
||||
if m, ok := data.(map[string]interface{}); ok {
|
||||
parts := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
parts = append(parts, fmt.Sprintf("%v", m[k]))
|
||||
if val, exists := m[k]; exists {
|
||||
parts = append(parts, cast.ToString(val))
|
||||
} else {
|
||||
parts = append(parts, "")
|
||||
}
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
return "default"
|
||||
|
||||
// Use reflection for structs and other types
|
||||
v := reflect.ValueOf(data)
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
parts := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
var part string
|
||||
switch v.Kind() {
|
||||
case reflect.Map:
|
||||
if v.Type().Key().Kind() == reflect.String {
|
||||
mv := v.MapIndex(reflect.ValueOf(k))
|
||||
if mv.IsValid() {
|
||||
part = cast.ToString(mv.Interface())
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
f := v.FieldByName(k)
|
||||
if f.IsValid() {
|
||||
part = cast.ToString(f.Interface())
|
||||
}
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
|
||||
+154
-112
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,204 @@
|
||||
package window
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestOverflowStrategies 测试不同的缓冲区溢出策略
|
||||
func TestOverflowStrategies(t *testing.T) {
|
||||
t.Run("CountingWindow_StrategyBlock_Timeout", func(t *testing.T) {
|
||||
// 配置:窗口大小1(每1条数据触发一次),输出缓冲1,阻塞策略,超时100ms
|
||||
config := types.WindowConfig{
|
||||
Type: "CountingWindow",
|
||||
Params: []interface{}{1}, // Threshold = 1
|
||||
PerformanceConfig: types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyBlock,
|
||||
BlockTimeout: 100 * time.Millisecond,
|
||||
AllowDataLoss: true, // 允许丢弃统计
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
win, err := NewCountingWindow(config)
|
||||
require.NoError(t, err)
|
||||
win.Start()
|
||||
defer win.Stop()
|
||||
|
||||
// 1. 发送第1条数据,触发窗口,填充 outputChan (容量1)
|
||||
win.Add(map[string]interface{}{"id": 1})
|
||||
|
||||
// 等待处理
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
stats := win.GetStats()
|
||||
assert.Equal(t, int64(1), stats["sentCount"])
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"]) // 应该还在缓冲区中,因为没人读
|
||||
|
||||
// 2. 发送第2条数据,触发窗口
|
||||
// 此时 outputChan 已满,sendResult 应该阻塞 100ms 然后超时丢弃
|
||||
win.Add(map[string]interface{}{"id": 2})
|
||||
|
||||
// 等待超时 (100ms) + 处理时间
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
stats = win.GetStats()
|
||||
// 第1条仍在缓冲区(因为没人读)
|
||||
// 第2条因为阻塞超时被丢弃
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"])
|
||||
assert.Equal(t, int64(1), stats["droppedCount"])
|
||||
|
||||
// 3. 读取缓冲区中的数据,腾出空间
|
||||
select {
|
||||
case <-win.OutputChan():
|
||||
// 读出第1条
|
||||
default:
|
||||
t.Fatal("expected data in output channel")
|
||||
}
|
||||
|
||||
// 4. 发送第3条数据
|
||||
win.Add(map[string]interface{}{"id": 3})
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
stats = win.GetStats()
|
||||
assert.Equal(t, int64(2), stats["sentCount"]) // 第1条和第3条发送成功
|
||||
assert.Equal(t, int64(1), stats["droppedCount"]) // 第2条丢弃
|
||||
})
|
||||
|
||||
t.Run("SessionWindow_StrategyBlock_Timeout", func(t *testing.T) {
|
||||
// 配置:会话超时50ms,输出缓冲1,阻塞策略,超时50ms
|
||||
config := types.WindowConfig{
|
||||
Type: "SessionWindow",
|
||||
Params: []interface{}{"50ms"},
|
||||
PerformanceConfig: types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyBlock,
|
||||
BlockTimeout: 50 * time.Millisecond,
|
||||
AllowDataLoss: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
win, err := NewSessionWindow(config)
|
||||
require.NoError(t, err)
|
||||
win.Start()
|
||||
defer win.Stop()
|
||||
|
||||
// 1. 发送数据,开始一个 session
|
||||
win.Add(map[string]interface{}{"id": 1})
|
||||
|
||||
// 2. 等待 session 超时 (50ms) + 检查周期 (timeout/2 = 25ms)
|
||||
// 确保 session 被触发并发送到 outputChan
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
stats := win.GetStats()
|
||||
assert.Equal(t, int64(1), stats["sentCount"])
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"])
|
||||
|
||||
// 3. 发送数据开始第二个 session (因为上一个已经结束)
|
||||
win.Add(map[string]interface{}{"id": 2})
|
||||
|
||||
// 4. 等待 session 超时
|
||||
// 此时 outputChan 已满,应该阻塞并丢弃
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
stats = win.GetStats()
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"])
|
||||
assert.Equal(t, int64(1), stats["droppedCount"])
|
||||
})
|
||||
|
||||
t.Run("CountingWindow_StrategyDrop", func(t *testing.T) {
|
||||
// 配置:窗口大小1,输出缓冲1,丢弃策略
|
||||
config := types.WindowConfig{
|
||||
Type: "CountingWindow",
|
||||
Params: []interface{}{1},
|
||||
PerformanceConfig: types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyDrop,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
win, err := NewCountingWindow(config)
|
||||
require.NoError(t, err)
|
||||
win.Start()
|
||||
defer win.Stop()
|
||||
|
||||
// 1. 发送第1条数据,填充 outputChan
|
||||
win.Add(map[string]interface{}{"id": 1})
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// 2. 发送第2条数据
|
||||
// outputChan 已满,StrategyDrop 会尝试丢弃旧数据(outputChan头部)来放入新数据
|
||||
win.Add(map[string]interface{}{"id": 2})
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
stats := win.GetStats()
|
||||
assert.Equal(t, int64(2), stats["sentCount"])
|
||||
|
||||
// 验证现在缓冲区里是第2条数据
|
||||
select {
|
||||
case data := <-win.OutputChan():
|
||||
assert.Len(t, data, 1)
|
||||
assert.Equal(t, 2, cast.ToInt(data[0].Data.(map[string]interface{})["id"]))
|
||||
default:
|
||||
t.Fatal("expected data in output channel")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("TumblingWindow_StrategyBlock_Timeout", func(t *testing.T) {
|
||||
// 配置:窗口大小50ms,输出缓冲1,阻塞策略,超时50ms
|
||||
config := types.WindowConfig{
|
||||
Type: "TumblingWindow",
|
||||
Params: []interface{}{"50ms"},
|
||||
PerformanceConfig: types.PerformanceConfig{
|
||||
BufferConfig: types.BufferConfig{
|
||||
WindowOutputSize: 1,
|
||||
},
|
||||
OverflowConfig: types.OverflowConfig{
|
||||
Strategy: types.OverflowStrategyBlock,
|
||||
BlockTimeout: 50 * time.Millisecond,
|
||||
AllowDataLoss: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
win, err := NewTumblingWindow(config)
|
||||
require.NoError(t, err)
|
||||
win.Start()
|
||||
defer win.Stop()
|
||||
|
||||
// 1. 发送数据触发第1个窗口
|
||||
win.Add(map[string]interface{}{"id": 1})
|
||||
// 等待窗口触发 (50ms)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
stats := win.GetStats()
|
||||
assert.Equal(t, int64(1), stats["sentCount"])
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"])
|
||||
|
||||
// 2. 发送数据触发第2个窗口
|
||||
// 由于没有读取 outputChan,第2个窗口触发时应该阻塞然后超时
|
||||
win.Add(map[string]interface{}{"id": 2})
|
||||
// 等待窗口触发 (50ms) + 阻塞超时 (50ms)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
stats = win.GetStats()
|
||||
assert.Equal(t, int64(1), stats["bufferUsed"]) // 仍然只有第1个窗口的数据
|
||||
assert.Equal(t, int64(1), stats["droppedCount"]) // 第2个窗口结果被丢弃
|
||||
})
|
||||
}
|
||||
+130
-96
File diff suppressed because it is too large
Load Diff
@@ -584,7 +584,7 @@ func TestWindowWithPerformanceConfig(t *testing.T) {
|
||||
name: "计数窗口-高性能配置",
|
||||
windowType: TypeCounting,
|
||||
performanceConfig: types.HighPerformanceConfig(),
|
||||
expectedBufferSize: 20, // 200 / 10
|
||||
expectedBufferSize: 200,
|
||||
extraParams: map[string]interface{}{"count": 10},
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user