mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-14 14:27:27 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 52653b2143 | |||
| 2db23f5b99 | |||
| c7cece15e2 | |||
| 09fe63102e | |||
| b405a935b7 | |||
| 1c781979a6 | |||
| 16778f2ac3 |
@@ -32,7 +32,7 @@ jobs:
|
||||
run: go mod download
|
||||
|
||||
- name: Run all tests
|
||||
run: go test -v -race -timeout 300s ./...
|
||||
run: go test -v -race -timeout 600s ./...
|
||||
|
||||
release:
|
||||
name: Create Release
|
||||
|
||||
@@ -3,7 +3,7 @@ module github.com/rulego/streamsql
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/expr-lang/expr v1.17.6
|
||||
github.com/expr-lang/expr v1.17.7
|
||||
github.com/stretchr/testify v1.10.0
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/expr-lang/expr v1.17.6 h1:1h6i8ONk9cexhDmowO/A64VPxHScu7qfSl2k8OlINec=
|
||||
github.com/expr-lang/expr v1.17.6/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
|
||||
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
|
||||
@@ -141,7 +141,7 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
s.sinksMux.RLock()
|
||||
defer s.sinksMux.RUnlock()
|
||||
|
||||
if len(s.sinks) == 0 {
|
||||
if len(s.sinks) == 0 && len(s.syncSinks) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -150,6 +150,19 @@ func (s *Stream) callSinksAsync(results []map[string]interface{}) {
|
||||
for _, sink := range s.sinks {
|
||||
s.submitSinkTask(sink, results)
|
||||
}
|
||||
|
||||
// Execute synchronous sinks (blocking, sequential)
|
||||
for _, sink := range s.syncSinks {
|
||||
// Recover panic for each sync sink to prevent crashing the stream
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Error("Sync sink execution exception: %v", r)
|
||||
}
|
||||
}()
|
||||
sink(results)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// submitSinkTask submits sink task
|
||||
@@ -169,24 +182,41 @@ func (s *Stream) submitSinkTask(sink func([]map[string]interface{}), results []m
|
||||
}
|
||||
|
||||
// Non-blocking task submission
|
||||
// Note: Since we use a worker pool, tasks may be executed out of order
|
||||
select {
|
||||
case s.sinkWorkerPool <- task:
|
||||
// Successfully submitted task
|
||||
default:
|
||||
// Worker pool is full, execute directly in current goroutine (degraded handling)
|
||||
go task()
|
||||
// This also helps with backpressure
|
||||
task()
|
||||
}
|
||||
}
|
||||
|
||||
// AddSink adds a sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sinks are executed asynchronously in a worker pool, so execution order is NOT guaranteed.
|
||||
// If you need strict ordering, use GetResultsChan() instead.
|
||||
func (s *Stream) AddSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.sinks = append(s.sinks, sink)
|
||||
}
|
||||
|
||||
// AddSyncSink adds a synchronous sink function
|
||||
// Parameters:
|
||||
// - sink: result processing function that receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// They block subsequent processing, so they should be fast.
|
||||
func (s *Stream) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
s.sinksMux.Lock()
|
||||
defer s.sinksMux.Unlock()
|
||||
s.syncSinks = append(s.syncSinks, sink)
|
||||
}
|
||||
|
||||
// GetResultsChan gets the result channel
|
||||
func (s *Stream) GetResultsChan() <-chan []map[string]interface{} {
|
||||
return s.resultChan
|
||||
|
||||
@@ -42,7 +42,7 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
dataChanCap := int64(cap(s.dataChan))
|
||||
s.dataChanMux.RUnlock()
|
||||
|
||||
return map[string]int64{
|
||||
stats := map[string]int64{
|
||||
InputCount: atomic.LoadInt64(&s.inputCount),
|
||||
OutputCount: atomic.LoadInt64(&s.outputCount),
|
||||
DroppedCount: atomic.LoadInt64(&s.droppedCount),
|
||||
@@ -55,6 +55,15 @@ func (s *Stream) GetStats() map[string]int64 {
|
||||
ActiveRetries: int64(atomic.LoadInt32(&s.activeRetries)),
|
||||
Expanding: int64(atomic.LoadInt32(&s.expanding)),
|
||||
}
|
||||
|
||||
if s.Window != nil {
|
||||
winStats := s.Window.GetStats()
|
||||
for k, v := range winStats {
|
||||
stats[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// GetDetailedStats gets detailed performance statistics
|
||||
|
||||
+2
-1
@@ -61,7 +61,8 @@ type Stream struct {
|
||||
aggregator aggregator.Aggregator
|
||||
config types.Config
|
||||
sinks []func([]map[string]interface{})
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
syncSinks []func([]map[string]interface{}) // Synchronous sinks, executed sequentially
|
||||
resultChan chan []map[string]interface{} // Result channel
|
||||
seenResults *sync.Map
|
||||
done chan struct{} // Used to close processing goroutines
|
||||
sinkWorkerPool chan func() // Sink worker pool to avoid blocking
|
||||
|
||||
@@ -176,5 +176,4 @@ func (sf *StreamFactory) validatePerformanceConfig(config types.PerformanceConfi
|
||||
// startWorkerRoutines starts worker goroutines
|
||||
func (sf *StreamFactory) startWorkerRoutines(stream *Stream, perfConfig types.PerformanceConfig) {
|
||||
go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
|
||||
go stream.startResultConsumer()
|
||||
}
|
||||
|
||||
@@ -338,6 +338,20 @@ func (s *Streamsql) AddSink(sink func([]map[string]interface{})) {
|
||||
}
|
||||
}
|
||||
|
||||
// AddSyncSink directly adds synchronous result processing callback functions.
|
||||
// Convenience wrapper for Stream().AddSyncSink() for cleaner API calls.
|
||||
//
|
||||
// Parameters:
|
||||
// - sink: Result processing function, receives []map[string]interface{} type result data
|
||||
//
|
||||
// Note: Sync sinks are executed sequentially in the result processing goroutine.
|
||||
// Use this when order of execution matters.
|
||||
func (s *Streamsql) AddSyncSink(sink func([]map[string]interface{})) {
|
||||
if s.stream != nil {
|
||||
s.stream.AddSyncSink(sink)
|
||||
}
|
||||
}
|
||||
|
||||
// PrintTable prints results to console in table format, similar to database output.
|
||||
// Displays column names first, then data rows.
|
||||
//
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package streamsql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestSQLIntegration_StrategyBlock exercises the blocking overflow strategy
// through the SQL integration path: a slow synchronous sink back-pressures
// the window, whose output buffer holds a single result.
func TestSQLIntegration_StrategyBlock(t *testing.T) {
	// Config: window output buffer of 1, block strategy, 100ms block timeout.
	ssql := New(WithCustomPerformance(types.PerformanceConfig{
		BufferConfig: types.BufferConfig{
			DataChannelSize:   100,
			ResultChannelSize: 100,
			WindowOutputSize:  1,
		},
		OverflowConfig: types.OverflowConfig{
			Strategy:      types.OverflowStrategyBlock,
			BlockTimeout:  100 * time.Millisecond,
			AllowDataLoss: true,
		},
		WorkerConfig: types.WorkerConfig{
			SinkPoolSize:    0, // unbuffered task queue
			SinkWorkerCount: 1, // a single worker
		},
	}))
	defer ssql.Stop()

	// SQL: every record triggers the window once.
	rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
	err := ssql.Execute(rsql)
	require.NoError(t, err)

	// Add a synchronous sink that blocks stream processing and thereby
	// back-pressures the window.
	// Note: it must be added after Execute, because Execute creates the stream.
	ssql.AddSyncSink(func(results []map[string]interface{}) {
		time.Sleep(500 * time.Millisecond)
	})

	// Emit 5 records.
	//
	// What follows is the author's working walkthrough (translated). It
	// describes stream/window internals that this test does not verify.
	// NOTE(review): the walkthrough concludes Sent=3 / Dropped=1, while the
	// assertions at the end of this test expect Sent=2 / Dropped=3 - confirm
	// which reasoning is current and prune the other.
	//
	// First attempt:
	// d1: being processed by the worker (blocked for 500ms)
	// d2: stream tries to write to the worker pool -> blocks (unbuffered)
	// d3: window OutputChan (size 1) -> filled
	// d4: window OutputChan full -> write attempted -> blocks (Window Add) -> placed in triggerChan (size=1)
	// d5: Window Add -> triggerChan full -> blocks? No - is Emit asynchronous?
	// Emit writes to dataChan. DataProcessor reads dataChan -> Window.Add.
	// Window.Add writes to triggerChan.
	//
	// Corrected analysis:
	// Window.Add is non-blocking (as long as triggerChan is not full).
	// CountingWindow triggerChan size = bufferSize = 1.
	// Worker goroutine: reads triggerChan -> processes -> sendResult (to OutputChan).
	//
	// d1: worker reads triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy).
	// d2: worker reads triggerChan -> OutputChan -> Stream -> blocked on WorkerPool.
	// At this point the stream holds d2. OutputChan is empty.
	// Is the worker goroutine blocked in sendResult(d2)? No, the stream took d2 and is blocked in dispatch.
	// So OutputChan is empty!
	// Wait, stream loop:
	// result := <-OutputChan. (Stream has d2).
	// handleResult(d2) -> blocked.
	// So OutputChan is empty.
	// d3: worker reads triggerChan -> OutputChan (d3). Success.
	// OutputChan has d3.
	// d4: worker reads triggerChan -> OutputChan (d4). Blocked (OutputChan full).
	// The worker goroutine is blocked in sendResult(d4).
	// d5: Add -> triggerChan (d5). Success (triggerChan size 1).
	// d6: Add -> triggerChan (d6). Blocked (triggerChan full).
	// Add blocks. DataProcessor blocks. Emit succeeds (dataChan).
	//
	// So the window worker only reaches the drop logic while sendResult is blocked.
	// sendResult only drops when OutputChan is full and the timeout expires.
	//
	// d4 is blocked in sendResult.
	// After 100ms it times out -> d4 is dropped.
	// The worker continues.
	//
	// So d4 should be the one that is dropped.
	// Sent: d1, d2, d3. (d5 is in triggerChan, d6 is in dataChan).
	// Wait, d5 is in triggerChan, not processed yet.
	// So Sent = 3. Dropped = 1 (d4).

	for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
		ssql.Emit(map[string]interface{}{"deviceId": id})
		time.Sleep(10 * time.Millisecond)
	}

	// Wait long enough for the stream to wake up and finish processing, and
	// for the window's drop logic to run.
	time.Sleep(1000 * time.Millisecond)

	// Fetch the statistics. Expected outcome per the author:
	// d1: processed by the stream
	// d2: processed by the stream (handled once the worker wakes up)
	// d3: dropped (worker blocked -> timeout)
	// d4: dropped (worker blocked -> timeout)
	// d5: dropped (worker blocked -> timeout)
	// Total sent: 2 (d1, d2).
	// Dropped: 3 (d3, d4, d5).
	stats := ssql.stream.GetStats()
	assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow")
	assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result")
}
|
||||
|
||||
// TestSQLIntegration_StrategyDrop exercises the drop overflow strategy
// through the SQL integration path with a window output buffer of 1.
func TestSQLIntegration_StrategyDrop(t *testing.T) {
	// Config: window output buffer of 1, drop strategy.
	ssql := New(WithCustomPerformance(types.PerformanceConfig{
		BufferConfig: types.BufferConfig{
			DataChannelSize:   100,
			ResultChannelSize: 100,
			WindowOutputSize:  1,
		},
		OverflowConfig: types.OverflowConfig{
			Strategy: types.OverflowStrategyDrop,
		},
	}))
	defer ssql.Stop()

	// SQL: every record triggers the window once.
	rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
	err := ssql.Execute(rsql)
	require.NoError(t, err)

	// Emit 3 records back to back.
	ssql.Emit(map[string]interface{}{"deviceId": "d1"})
	ssql.Emit(map[string]interface{}{"deviceId": "d2"})
	ssql.Emit(map[string]interface{}{"deviceId": "d3"})

	// Wait for processing to finish.
	time.Sleep(200 * time.Millisecond)

	// With the drop strategy, older results are evicted to make room, so
	// sentCount should keep increasing.
	stats := ssql.stream.GetStats()
	// d1, d2 and d3 all count as sent: even if d1/d2 are evicted, sendResult
	// counts "evict the old item, then write the new one" as a successful send.
	assert.Equal(t, int64(3), stats["sentCount"])

	// Verify that the item left in the buffer is the last record (d3).
	// Note: AddSink would start a worker reading from OutputChan.
	// To verify, read directly from the window's OutputChan.
	select {
	case result := <-ssql.stream.Window.OutputChan():
		assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
	case <-time.After(100 * time.Millisecond):
		// It would also be fine if a sink worker had already consumed it, but
		// since no sink was added the author expects it to still be buffered.
		// NOTE(review): this branch passes silently, so the d3 check is
		// skipped whenever nothing is readable within 100ms.
	}
}
|
||||
+1
-1
@@ -343,7 +343,7 @@ func TestStreamsqlDistinct(t *testing.T) {
|
||||
|
||||
// 等待窗口触发(处理时间模式)
|
||||
//fmt.Println("等待窗口初始化...")
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
// 手动触发窗口
|
||||
//fmt.Println("手动触发窗口")
|
||||
|
||||
@@ -6,6 +6,15 @@ import (
|
||||
"github.com/rulego/streamsql/aggregator"
|
||||
)
|
||||
|
||||
// Overflow strategy names accepted in OverflowConfig.Strategy. Window
// implementations compare the configured strategy against these values when
// their output buffer is full.
const (
	// OverflowStrategyBlock blocks when buffer is full
	OverflowStrategyBlock = "block"
	// OverflowStrategyDrop drops data when buffer is full
	OverflowStrategyDrop = "drop"
	// OverflowStrategyExpand expands buffer when full (not implemented for windows yet)
	OverflowStrategyExpand = "expand"
)
|
||||
|
||||
// Config stream processing configuration
|
||||
type Config struct {
|
||||
// SQL processing related configuration
|
||||
|
||||
+62
-25
@@ -22,6 +22,8 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
|
||||
@@ -75,12 +77,9 @@ func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error) {
|
||||
}
|
||||
|
||||
// Use unified performance config to get window output buffer size
|
||||
bufferSize := 100 // Default value, counting windows usually have smaller buffers
|
||||
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Counting window uses 1/10 of buffer
|
||||
if bufferSize < 10 {
|
||||
bufferSize = 10 // Minimum value
|
||||
}
|
||||
bufferSize := 1000 // Default value
|
||||
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
|
||||
}
|
||||
|
||||
cw := &CountingWindow{
|
||||
@@ -161,18 +160,7 @@ func (cw *CountingWindow) Start() {
|
||||
cw.callback(data)
|
||||
}
|
||||
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
cw.mu.Lock()
|
||||
cw.sentCount++
|
||||
cw.mu.Unlock()
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
default:
|
||||
cw.mu.Lock()
|
||||
cw.droppedCount++
|
||||
cw.mu.Unlock()
|
||||
}
|
||||
cw.sendResult(data)
|
||||
} else {
|
||||
cw.mu.Unlock()
|
||||
}
|
||||
@@ -184,6 +172,58 @@ func (cw *CountingWindow) Start() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) sendResult(data []types.Row) {
|
||||
strategy := cw.config.PerformanceConfig.OverflowConfig.Strategy
|
||||
timeout := cw.config.PerformanceConfig.OverflowConfig.BlockTimeout
|
||||
|
||||
if strategy == types.OverflowStrategyBlock {
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
case <-time.After(timeout):
|
||||
// Timeout, check if data loss is allowed
|
||||
if cw.config.PerformanceConfig.OverflowConfig.AllowDataLoss {
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
// Drop new data (simplest fallback for block)
|
||||
} else {
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
|
||||
// If the buffer is full, remove the oldest item to make space for the new item.
|
||||
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Try to drop oldest data to make room for new data
|
||||
select {
|
||||
case <-cw.outputChan:
|
||||
// Successfully dropped one old item
|
||||
select {
|
||||
case cw.outputChan <- data:
|
||||
atomic.AddInt64(&cw.sentCount, 1)
|
||||
default:
|
||||
// Still failed, drop current
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
default:
|
||||
// Channel empty, try to send again or drop
|
||||
atomic.AddInt64(&cw.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) Trigger() {
|
||||
// Note: trigger logic has been merged into Start method to avoid data races
|
||||
// This method is kept to satisfy Window interface requirements, but actual triggering is handled in Start method
|
||||
@@ -211,17 +251,14 @@ func (cw *CountingWindow) Reset() {
|
||||
cw.dataBuffer = nil
|
||||
cw.keyedBuffer = make(map[string][]types.Row)
|
||||
cw.keyedCount = make(map[string]int)
|
||||
cw.sentCount = 0
|
||||
cw.droppedCount = 0
|
||||
atomic.StoreInt64(&cw.sentCount, 0)
|
||||
atomic.StoreInt64(&cw.droppedCount, 0)
|
||||
}
|
||||
|
||||
func (cw *CountingWindow) GetStats() map[string]int64 {
|
||||
cw.mu.Lock()
|
||||
defer cw.mu.Unlock()
|
||||
|
||||
return map[string]int64{
|
||||
"sentCount": cw.sentCount,
|
||||
"droppedCount": cw.droppedCount,
|
||||
"sentCount": atomic.LoadInt64(&cw.sentCount),
|
||||
"droppedCount": atomic.LoadInt64(&cw.droppedCount),
|
||||
"bufferSize": int64(cap(cw.outputChan)),
|
||||
"bufferUsed": int64(len(cw.outputChan)),
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ type Window interface {
|
||||
OutputChan() <-chan []types.Row
|
||||
SetCallback(callback func([]types.Row))
|
||||
Trigger()
|
||||
GetStats() map[string]int64
|
||||
}
|
||||
|
||||
func CreateWindow(config types.WindowConfig) (Window, error) {
|
||||
|
||||
+100
-24
@@ -19,8 +19,10 @@ package window
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
@@ -59,6 +61,9 @@ type SessionWindow struct {
|
||||
watermark *Watermark
|
||||
// triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness)
|
||||
triggeredSessions map[string]*sessionInfo
|
||||
// Performance statistics
|
||||
sentCount int64 // Number of successfully sent results
|
||||
droppedCount int64 // Number of dropped results
|
||||
}
|
||||
|
||||
// sessionInfo stores information about a triggered session that is still open for late data
|
||||
@@ -92,12 +97,9 @@ func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) {
|
||||
}
|
||||
|
||||
// Use unified performance configuration to get window output buffer size
|
||||
bufferSize := 100 // Default value, session windows typically have smaller buffers
|
||||
if (config.PerformanceConfig != types.PerformanceConfig{}) {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Session window uses 1/10 of buffer
|
||||
if bufferSize < 10 {
|
||||
bufferSize = 10 // Minimum value
|
||||
}
|
||||
bufferSize := 1000 // Default value
|
||||
if config.PerformanceConfig.BufferConfig.WindowOutputSize > 0 {
|
||||
bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize
|
||||
}
|
||||
|
||||
// Determine time characteristic (default to ProcessingTime for backward compatibility)
|
||||
@@ -411,13 +413,67 @@ func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) {
|
||||
sw.callback(result)
|
||||
}
|
||||
|
||||
sw.sendResult(result)
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *SessionWindow) sendResult(data []types.Row) {
|
||||
strategy := sw.config.PerformanceConfig.OverflowConfig.Strategy
|
||||
timeout := sw.config.PerformanceConfig.OverflowConfig.BlockTimeout
|
||||
|
||||
if strategy == types.OverflowStrategyBlock {
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
select {
|
||||
case sw.outputChan <- result:
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
case <-time.After(timeout):
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
case <-sw.ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
|
||||
// If the buffer is full, remove the oldest item to make space for the new item.
|
||||
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
|
||||
select {
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
default:
|
||||
// Try to drop oldest data
|
||||
select {
|
||||
case <-sw.outputChan:
|
||||
select {
|
||||
case sw.outputChan <- data:
|
||||
atomic.AddInt64(&sw.sentCount, 1)
|
||||
default:
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
}
|
||||
default:
|
||||
atomic.AddInt64(&sw.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns window performance statistics
|
||||
func (sw *SessionWindow) GetStats() map[string]int64 {
|
||||
return map[string]int64{
|
||||
"sentCount": atomic.LoadInt64(&sw.sentCount),
|
||||
"droppedCount": atomic.LoadInt64(&sw.droppedCount),
|
||||
"bufferSize": int64(cap(sw.outputChan)),
|
||||
"bufferUsed": int64(len(sw.outputChan)),
|
||||
}
|
||||
}
|
||||
|
||||
// ResetStats resets performance statistics
|
||||
func (sw *SessionWindow) ResetStats() {
|
||||
atomic.StoreInt64(&sw.sentCount, 0)
|
||||
atomic.StoreInt64(&sw.droppedCount, 0)
|
||||
}
|
||||
|
||||
// Trigger manually triggers all session windows
|
||||
func (sw *SessionWindow) Trigger() {
|
||||
sw.mu.Lock()
|
||||
@@ -450,13 +506,7 @@ func (sw *SessionWindow) Trigger() {
|
||||
sw.callback(result)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel
|
||||
select {
|
||||
case sw.outputChan <- result:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop result (could add statistics here if needed)
|
||||
}
|
||||
sw.sendResult(result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -550,13 +600,7 @@ func (sw *SessionWindow) triggerLateUpdateLocked(s *session) {
|
||||
callback(resultData)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel
|
||||
select {
|
||||
case sw.outputChan <- resultData:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop result
|
||||
}
|
||||
sw.sendResult(resultData)
|
||||
|
||||
// Re-acquire lock
|
||||
sw.mu.Lock()
|
||||
@@ -578,12 +622,44 @@ func extractSessionCompositeKey(data interface{}, keys []string) string {
|
||||
if len(keys) == 0 {
|
||||
return "default"
|
||||
}
|
||||
parts := make([]string, 0, len(keys))
|
||||
|
||||
// Fast path for map[string]interface{}
|
||||
if m, ok := data.(map[string]interface{}); ok {
|
||||
parts := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
parts = append(parts, fmt.Sprintf("%v", m[k]))
|
||||
if val, exists := m[k]; exists {
|
||||
parts = append(parts, cast.ToString(val))
|
||||
} else {
|
||||
parts = append(parts, "")
|
||||
}
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
return "default"
|
||||
|
||||
// Use reflection for structs and other types
|
||||
v := reflect.ValueOf(data)
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
parts := make([]string, 0, len(keys))
|
||||
for _, k := range keys {
|
||||
var part string
|
||||
switch v.Kind() {
|
||||
case reflect.Map:
|
||||
if v.Type().Key().Kind() == reflect.String {
|
||||
mv := v.MapIndex(reflect.ValueOf(k))
|
||||
if mv.IsValid() {
|
||||
part = cast.ToString(mv.Interface())
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
f := v.FieldByName(k)
|
||||
if f.IsValid() {
|
||||
part = cast.ToString(f.Interface())
|
||||
}
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
|
||||
+98
-107
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,204 @@
|
||||
package window
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
"github.com/rulego/streamsql/utils/cast"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestOverflowStrategies exercises the different output-buffer overflow
// strategies directly on the window implementations.
func TestOverflowStrategies(t *testing.T) {
	t.Run("CountingWindow_StrategyBlock_Timeout", func(t *testing.T) {
		// Config: window size 1 (triggers on every record), output buffer 1,
		// block strategy, 100ms timeout.
		config := types.WindowConfig{
			Type:   "CountingWindow",
			Params: []interface{}{1}, // Threshold = 1
			PerformanceConfig: types.PerformanceConfig{
				BufferConfig: types.BufferConfig{
					WindowOutputSize: 1,
				},
				OverflowConfig: types.OverflowConfig{
					Strategy:      types.OverflowStrategyBlock,
					BlockTimeout:  100 * time.Millisecond,
					AllowDataLoss: true, // allow drops to be counted
				},
			},
		}

		win, err := NewCountingWindow(config)
		require.NoError(t, err)
		win.Start()
		defer win.Stop()

		// 1. Send the 1st record: triggers the window and fills outputChan (capacity 1).
		win.Add(map[string]interface{}{"id": 1})

		// Wait for processing.
		time.Sleep(50 * time.Millisecond)
		stats := win.GetStats()
		assert.Equal(t, int64(1), stats["sentCount"])
		assert.Equal(t, int64(1), stats["bufferUsed"]) // should still be buffered because nobody reads

		// 2. Send the 2nd record, triggering the window.
		// outputChan is full now, so sendResult should block for 100ms and
		// then drop on timeout.
		win.Add(map[string]interface{}{"id": 2})

		// Wait for the timeout (100ms) plus processing time.
		time.Sleep(200 * time.Millisecond)

		stats = win.GetStats()
		// The 1st result is still buffered (nobody reads).
		// The 2nd result was dropped after the block timed out.
		assert.Equal(t, int64(1), stats["bufferUsed"])
		assert.Equal(t, int64(1), stats["droppedCount"])

		// 3. Read the buffered result to free up space.
		select {
		case <-win.OutputChan():
			// read out the 1st result
		default:
			t.Fatal("expected data in output channel")
		}

		// 4. Send the 3rd record.
		win.Add(map[string]interface{}{"id": 3})
		time.Sleep(50 * time.Millisecond)

		stats = win.GetStats()
		assert.Equal(t, int64(2), stats["sentCount"])    // 1st and 3rd were sent
		assert.Equal(t, int64(1), stats["droppedCount"]) // 2nd was dropped
	})

	t.Run("SessionWindow_StrategyBlock_Timeout", func(t *testing.T) {
		// Config: session timeout 50ms, output buffer 1, block strategy, 50ms timeout.
		config := types.WindowConfig{
			Type:   "SessionWindow",
			Params: []interface{}{"50ms"},
			PerformanceConfig: types.PerformanceConfig{
				BufferConfig: types.BufferConfig{
					WindowOutputSize: 1,
				},
				OverflowConfig: types.OverflowConfig{
					Strategy:      types.OverflowStrategyBlock,
					BlockTimeout:  50 * time.Millisecond,
					AllowDataLoss: true,
				},
			},
		}

		win, err := NewSessionWindow(config)
		require.NoError(t, err)
		win.Start()
		defer win.Stop()

		// 1. Send a record to start a session.
		win.Add(map[string]interface{}{"id": 1})

		// 2. Wait for the session timeout (50ms) plus the check interval (timeout/2 = 25ms)
		// so that the session is triggered and sent to outputChan.
		time.Sleep(100 * time.Millisecond)

		stats := win.GetStats()
		assert.Equal(t, int64(1), stats["sentCount"])
		assert.Equal(t, int64(1), stats["bufferUsed"])

		// 3. Send a record to start a second session (the previous one has ended).
		win.Add(map[string]interface{}{"id": 2})

		// 4. Wait for the session to time out.
		// outputChan is full now, so the send should block and then drop.
		time.Sleep(150 * time.Millisecond)

		stats = win.GetStats()
		assert.Equal(t, int64(1), stats["bufferUsed"])
		assert.Equal(t, int64(1), stats["droppedCount"])
	})

	t.Run("CountingWindow_StrategyDrop", func(t *testing.T) {
		// Config: window size 1, output buffer 1, drop strategy.
		config := types.WindowConfig{
			Type:   "CountingWindow",
			Params: []interface{}{1},
			PerformanceConfig: types.PerformanceConfig{
				BufferConfig: types.BufferConfig{
					WindowOutputSize: 1,
				},
				OverflowConfig: types.OverflowConfig{
					Strategy: types.OverflowStrategyDrop,
				},
			},
		}

		win, err := NewCountingWindow(config)
		require.NoError(t, err)
		win.Start()
		defer win.Stop()

		// 1. Send the 1st record to fill outputChan.
		win.Add(map[string]interface{}{"id": 1})
		time.Sleep(50 * time.Millisecond)

		// 2. Send the 2nd record.
		// outputChan is full; the drop strategy evicts the oldest item (head
		// of outputChan) to make room for the new one.
		win.Add(map[string]interface{}{"id": 2})
		time.Sleep(50 * time.Millisecond)

		stats := win.GetStats()
		assert.Equal(t, int64(2), stats["sentCount"])

		// Verify that the buffer now holds the 2nd record.
		select {
		case data := <-win.OutputChan():
			assert.Len(t, data, 1)
			assert.Equal(t, 2, cast.ToInt(data[0].Data.(map[string]interface{})["id"]))
		default:
			t.Fatal("expected data in output channel")
		}
	})

	t.Run("TumblingWindow_StrategyBlock_Timeout", func(t *testing.T) {
		// Config: window size 50ms, output buffer 1, block strategy, 50ms timeout.
		config := types.WindowConfig{
			Type:   "TumblingWindow",
			Params: []interface{}{"50ms"},
			PerformanceConfig: types.PerformanceConfig{
				BufferConfig: types.BufferConfig{
					WindowOutputSize: 1,
				},
				OverflowConfig: types.OverflowConfig{
					Strategy:      types.OverflowStrategyBlock,
					BlockTimeout:  50 * time.Millisecond,
					AllowDataLoss: true,
				},
			},
		}

		win, err := NewTumblingWindow(config)
		require.NoError(t, err)
		win.Start()
		defer win.Stop()

		// 1. Send a record to trigger the 1st window.
		win.Add(map[string]interface{}{"id": 1})
		// Wait for the window to fire (50ms).
		time.Sleep(100 * time.Millisecond)

		stats := win.GetStats()
		assert.Equal(t, int64(1), stats["sentCount"])
		assert.Equal(t, int64(1), stats["bufferUsed"])

		// 2. Send a record to trigger the 2nd window.
		// outputChan is not being read, so the 2nd window's send should block
		// and then time out.
		win.Add(map[string]interface{}{"id": 2})
		// Wait for the window to fire (50ms) plus the block timeout (50ms).
		time.Sleep(150 * time.Millisecond)

		stats = win.GetStats()
		assert.Equal(t, int64(1), stats["bufferUsed"])   // still only the 1st window's result
		assert.Equal(t, int64(1), stats["droppedCount"]) // the 2nd window's result was dropped
	})
}
|
||||
+72
-96
@@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rulego/streamsql/types"
|
||||
@@ -435,14 +436,7 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
windowEnd := tw.currentSlot.End
|
||||
windowStart := tw.currentSlot.Start
|
||||
// Trigger if watermark >= windowEnd
|
||||
// In Flink, windows are triggered when watermark >= windowEnd.
|
||||
// Watermark calculation: watermark = maxEventTime - maxOutOfOrderness
|
||||
// So watermark >= windowEnd means: maxEventTime - maxOutOfOrderness >= windowEnd
|
||||
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
|
||||
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
|
||||
// Check if watermark >= windowEnd
|
||||
// Use !Before() instead of After() to include equality case
|
||||
// This is equivalent to watermarkTime >= windowEnd
|
||||
shouldTrigger := !watermarkTime.Before(*windowEnd)
|
||||
|
||||
debugLog("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
|
||||
@@ -495,7 +489,18 @@ func (tw *TumblingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
|
||||
|
||||
debugLog("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
|
||||
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
|
||||
tw.triggerWindowLocked()
|
||||
|
||||
resultData := tw.extractWindowDataLocked()
|
||||
if len(resultData) > 0 {
|
||||
callback := tw.callback
|
||||
tw.mu.Unlock()
|
||||
if callback != nil {
|
||||
callback(resultData)
|
||||
}
|
||||
tw.sendResult(resultData)
|
||||
tw.mu.Lock()
|
||||
}
|
||||
|
||||
triggeredCount++
|
||||
debugLog("checkAndTriggerWindows: window triggered successfully, triggeredCount=%d", triggeredCount)
|
||||
// The lock may have been released and re-acquired above (around the callback and sendResult), so we need to re-check state
|
||||
@@ -579,15 +584,24 @@ func (tw *TumblingWindow) handleLateData(eventTime time.Time, allowedLateness ti
|
||||
if info.slot.Contains(eventTime) {
|
||||
// This late data belongs to a triggered window that's still open
|
||||
// Trigger window again with updated data (late update)
|
||||
tw.triggerLateUpdateLocked(info.slot)
|
||||
resultData := tw.extractLateUpdateDataLocked(info.slot)
|
||||
if len(resultData) > 0 {
|
||||
callback := tw.callback
|
||||
tw.mu.Unlock()
|
||||
if callback != nil {
|
||||
callback(resultData)
|
||||
}
|
||||
tw.sendResult(resultData)
|
||||
tw.mu.Lock()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// triggerLateUpdateLocked triggers a late update for a window (must be called with lock held)
|
||||
// extractLateUpdateDataLocked extracts late update data for a window (must be called with lock held)
|
||||
// This implements Flink-like behavior: late updates include complete window data (original + late data)
|
||||
func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
||||
func (tw *TumblingWindow) extractLateUpdateDataLocked(slot *types.TimeSlot) []types.Row {
|
||||
// Find the triggered window info to get snapshot data
|
||||
var windowInfo *triggeredWindowInfo
|
||||
windowKey := tw.getWindowKey(*slot.End)
|
||||
@@ -619,7 +633,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
||||
}
|
||||
|
||||
if len(resultData) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update snapshot to include late data (for future late updates)
|
||||
@@ -635,34 +649,7 @@ func (tw *TumblingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
|
||||
}
|
||||
}
|
||||
|
||||
// Get callback reference before releasing lock
|
||||
callback := tw.callback
|
||||
|
||||
// Release lock before calling callback and sending to channel to avoid blocking
|
||||
tw.mu.Unlock()
|
||||
|
||||
if callback != nil {
|
||||
callback(resultData)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel and update statistics
|
||||
var sent bool
|
||||
select {
|
||||
case tw.outputChan <- resultData:
|
||||
// Successfully sent
|
||||
sent = true
|
||||
default:
|
||||
// Channel full, drop result
|
||||
sent = false
|
||||
}
|
||||
|
||||
// Re-acquire lock to update statistics
|
||||
tw.mu.Lock()
|
||||
if sent {
|
||||
tw.sentCount++
|
||||
} else {
|
||||
tw.droppedCount++
|
||||
}
|
||||
return resultData
|
||||
}
|
||||
|
||||
// getWindowKey generates a key for a window based on its end time
|
||||
@@ -670,10 +657,10 @@ func (tw *TumblingWindow) getWindowKey(endTime time.Time) string {
|
||||
return fmt.Sprintf("%d", endTime.UnixNano())
|
||||
}
|
||||
|
||||
// triggerWindowLocked triggers the window (must be called with lock held)
|
||||
func (tw *TumblingWindow) triggerWindowLocked() {
|
||||
// extractWindowDataLocked extracts current window data (must be called with lock held)
|
||||
func (tw *TumblingWindow) extractWindowDataLocked() []types.Row {
|
||||
if tw.currentSlot == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Extract current window data
|
||||
@@ -688,7 +675,7 @@ func (tw *TumblingWindow) triggerWindowLocked() {
|
||||
// Skip triggering if window has no data
|
||||
// This prevents empty windows from being triggered
|
||||
if len(resultData) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove data that belongs to current window
|
||||
@@ -700,33 +687,47 @@ func (tw *TumblingWindow) triggerWindowLocked() {
|
||||
}
|
||||
tw.data = newData
|
||||
|
||||
// Get callback reference before releasing lock
|
||||
callback := tw.callback
|
||||
return resultData
|
||||
}
|
||||
|
||||
// Release lock before calling callback and sending to channel to avoid blocking
|
||||
tw.mu.Unlock()
|
||||
func (tw *TumblingWindow) sendResult(data []types.Row) {
|
||||
strategy := tw.config.PerformanceConfig.OverflowConfig.Strategy
|
||||
timeout := tw.config.PerformanceConfig.OverflowConfig.BlockTimeout
|
||||
|
||||
if callback != nil {
|
||||
callback(resultData)
|
||||
if strategy == types.OverflowStrategyBlock {
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
select {
|
||||
case tw.outputChan <- data:
|
||||
atomic.AddInt64(&tw.sentCount, 1)
|
||||
case <-time.After(timeout):
|
||||
atomic.AddInt64(&tw.droppedCount, 1)
|
||||
case <-tw.ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel and update statistics
|
||||
var sent bool
|
||||
// Default: "drop" strategy (implemented as Drop Oldest / Smart Drop)
|
||||
// If the buffer is full, remove the oldest item to make space for the new item.
|
||||
// This ensures that we always keep the most recent data, which is usually preferred in streaming.
|
||||
select {
|
||||
case tw.outputChan <- resultData:
|
||||
// Successfully sent
|
||||
sent = true
|
||||
case tw.outputChan <- data:
|
||||
atomic.AddInt64(&tw.sentCount, 1)
|
||||
default:
|
||||
// Channel full, drop result
|
||||
sent = false
|
||||
}
|
||||
|
||||
// Re-acquire lock to update statistics
|
||||
tw.mu.Lock()
|
||||
if sent {
|
||||
tw.sentCount++
|
||||
} else {
|
||||
tw.droppedCount++
|
||||
// Try to drop oldest data
|
||||
select {
|
||||
case <-tw.outputChan:
|
||||
select {
|
||||
case tw.outputChan <- data:
|
||||
atomic.AddInt64(&tw.sentCount, 1)
|
||||
default:
|
||||
atomic.AddInt64(&tw.droppedCount, 1)
|
||||
}
|
||||
default:
|
||||
atomic.AddInt64(&tw.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -801,27 +802,8 @@ func (tw *TumblingWindow) Trigger() {
|
||||
callback(resultData)
|
||||
}
|
||||
|
||||
// Non-blocking send to output channel and update statistics
|
||||
var sent bool
|
||||
select {
|
||||
case tw.outputChan <- resultData:
|
||||
// Successfully sent
|
||||
sent = true
|
||||
default:
|
||||
// Channel full, drop result
|
||||
sent = false
|
||||
}
|
||||
|
||||
// Re-acquire lock to update statistics
|
||||
tw.mu.Lock()
|
||||
if sent {
|
||||
tw.sentCount++
|
||||
} else {
|
||||
tw.droppedCount++
|
||||
// Optional: add logging here
|
||||
// log.Printf("Window output channel full, dropped result with %d rows", len(resultData))
|
||||
}
|
||||
tw.mu.Unlock()
|
||||
// Use sendResult to respect overflow strategy
|
||||
tw.sendResult(resultData)
|
||||
}
|
||||
|
||||
// Reset resets tumbling window data
|
||||
@@ -889,12 +871,9 @@ func (tw *TumblingWindow) SetCallback(callback func([]types.Row)) {
|
||||
|
||||
// GetStats returns window performance statistics
|
||||
func (tw *TumblingWindow) GetStats() map[string]int64 {
|
||||
tw.mu.RLock()
|
||||
defer tw.mu.RUnlock()
|
||||
|
||||
return map[string]int64{
|
||||
"sentCount": tw.sentCount,
|
||||
"droppedCount": tw.droppedCount,
|
||||
"sentCount": atomic.LoadInt64(&tw.sentCount),
|
||||
"droppedCount": atomic.LoadInt64(&tw.droppedCount),
|
||||
"bufferSize": int64(cap(tw.outputChan)),
|
||||
"bufferUsed": int64(len(tw.outputChan)),
|
||||
}
|
||||
@@ -902,9 +881,6 @@ func (tw *TumblingWindow) GetStats() map[string]int64 {
|
||||
|
||||
// ResetStats resets performance statistics
|
||||
func (tw *TumblingWindow) ResetStats() {
|
||||
tw.mu.Lock()
|
||||
defer tw.mu.Unlock()
|
||||
|
||||
tw.sentCount = 0
|
||||
tw.droppedCount = 0
|
||||
atomic.StoreInt64(&tw.sentCount, 0)
|
||||
atomic.StoreInt64(&tw.droppedCount, 0)
|
||||
}
|
||||
|
||||
@@ -584,7 +584,7 @@ func TestWindowWithPerformanceConfig(t *testing.T) {
|
||||
name: "计数窗口-高性能配置",
|
||||
windowType: TypeCounting,
|
||||
performanceConfig: types.HighPerformanceConfig(),
|
||||
expectedBufferSize: 20, // 200 / 10
|
||||
expectedBufferSize: 200,
|
||||
extraParams: map[string]interface{}{"count": 10},
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user