mirror of
https://gitee.com/rulego/streamsql.git
synced 2026-03-15 14:57:24 +00:00
151 lines
5.7 KiB
Go
151 lines
5.7 KiB
Go
package streamsql
|
||
|
||
import (
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/rulego/streamsql/types"
|
||
"github.com/stretchr/testify/assert"
|
||
"github.com/stretchr/testify/require"
|
||
)
|
||
|
||
// TestSQLIntegration_StrategyBlock 测试 SQL 集成下的阻塞策略
|
||
func TestSQLIntegration_StrategyBlock(t *testing.T) {
|
||
// 配置:输出缓冲为 1,阻塞策略,超时 100ms
|
||
ssql := New(WithCustomPerformance(types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 100,
|
||
ResultChannelSize: 100,
|
||
WindowOutputSize: 1,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: types.OverflowStrategyBlock,
|
||
BlockTimeout: 100 * time.Millisecond,
|
||
AllowDataLoss: true,
|
||
},
|
||
WorkerConfig: types.WorkerConfig{
|
||
SinkPoolSize: 0, // 无缓冲任务队列
|
||
SinkWorkerCount: 1, // 1个 worker
|
||
},
|
||
}))
|
||
defer ssql.Stop()
|
||
|
||
// SQL: 每条数据触发一次窗口
|
||
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
|
||
err := ssql.Execute(rsql)
|
||
require.NoError(t, err)
|
||
|
||
// 添加同步 Sink 阻塞 Stream 处理,从而反压 Window
|
||
// 注意:必须在 Execute 之后添加,因为 Execute 才会创建 stream
|
||
ssql.AddSyncSink(func(results []map[string]interface{}) {
|
||
time.Sleep(500 * time.Millisecond)
|
||
})
|
||
|
||
// 发送 5 条数据
|
||
// d1: Worker 处理中 (阻塞 500ms)
|
||
// d2: Stream 尝试写入 WorkerPool -> 阻塞 (无缓冲)
|
||
// d3: Window OutputChan (size 1) -> 填满
|
||
// d4: Window OutputChan 满 -> 尝试写入 -> 阻塞 (Window Add) -> 放入 TriggerChan (size=1)
|
||
// d5: Window Add -> TriggerChan 满 -> 阻塞? No, Emit 是异步的?
|
||
// Emit 往 dataChan 写. DataProcessor 读 dataChan -> Window.Add.
|
||
// Window.Add 往 triggerChan 写.
|
||
//
|
||
// 修正分析:
|
||
// Window.Add 是非阻塞的 (如果 triggerChan 不满).
|
||
// CountingWindow triggerChan size = bufferSize = 1.
|
||
// Worker 协程: 从 triggerChan 读 -> 处理 -> sendResult (到 OutputChan).
|
||
//
|
||
// d1: Worker读triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy).
|
||
// d2: Worker读triggerChan -> OutputChan -> Stream -> Blocked on WorkerPool.
|
||
// 此时 Stream 持有 d2. OutputChan 空.
|
||
// Worker 协程 阻塞在 sendResult(d2)? No, Stream 取走了 d2, Stream 阻塞在 dispatch.
|
||
// 所以 OutputChan 是空的!
|
||
// Wait, Stream loop:
|
||
// result := <-OutputChan. (Stream has d2).
|
||
// handleResult(d2) -> Blocked.
|
||
// So OutputChan is empty.
|
||
// d3: Worker读triggerChan -> OutputChan (d3). Success.
|
||
// OutputChan has d3.
|
||
// d4: Worker读triggerChan -> OutputChan (d4). Blocked (OutputChan full).
|
||
// Worker 协程 阻塞在 sendResult(d4).
|
||
// d5: Add -> triggerChan (d5). Success (triggerChan size 1).
|
||
// d6: Add -> triggerChan (d6). Blocked (triggerChan full).
|
||
// Add blocks. DataProcessor blocks. Emit succeeds (dataChan).
|
||
//
|
||
// 所以 Window Worker 只有在 sendResult 阻塞时才触发 Drop logic.
|
||
// sendResult 只有在 OutputChan 满且超时时才 Drop.
|
||
//
|
||
// d4 阻塞在 sendResult.
|
||
// 100ms 后超时 -> Drop d4.
|
||
// Worker 继续.
|
||
//
|
||
// 所以 d4 应该是被 Drop 的那个.
|
||
// Sent: d1, d2, d3. (d5 在 triggerChan, d6 在 dataChan).
|
||
// Wait, d5 is in triggerChan, not processed yet.
|
||
// So Sent = 3. Dropped = 1 (d4).
|
||
|
||
for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
|
||
ssql.Emit(map[string]interface{}{"deviceId": id})
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
|
||
// 等待足够长的时间让 Stream 醒来并处理完,以及 Window 丢弃逻辑执行
|
||
time.Sleep(1000 * time.Millisecond)
|
||
|
||
// 获取统计信息
|
||
// d1: Stream 处理完
|
||
// d2: Stream 处理完 (Worker 醒来后处理 d2)
|
||
// d3: Dropped (Worker 阻塞 -> 超时)
|
||
// d4: Dropped (Worker 阻塞 -> 超时)
|
||
// d5: Dropped (Worker 阻塞 -> 超时)
|
||
// Total Sent: 2 (d1, d2).
|
||
// Dropped: 3 (d3, d4, d5).
|
||
stats := ssql.stream.GetStats()
|
||
assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow")
|
||
assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result")
|
||
}
|
||
|
||
// TestSQLIntegration_StrategyDrop 测试 SQL 集成下的丢弃策略
|
||
func TestSQLIntegration_StrategyDrop(t *testing.T) {
|
||
// 配置:输出缓冲为 1,丢弃策略
|
||
ssql := New(WithCustomPerformance(types.PerformanceConfig{
|
||
BufferConfig: types.BufferConfig{
|
||
DataChannelSize: 100,
|
||
ResultChannelSize: 100,
|
||
WindowOutputSize: 1,
|
||
},
|
||
OverflowConfig: types.OverflowConfig{
|
||
Strategy: types.OverflowStrategyDrop,
|
||
},
|
||
}))
|
||
defer ssql.Stop()
|
||
|
||
// SQL: 每条数据触发一次窗口
|
||
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
|
||
err := ssql.Execute(rsql)
|
||
require.NoError(t, err)
|
||
|
||
// 连续发送 3 条数据
|
||
ssql.Emit(map[string]interface{}{"deviceId": "d1"})
|
||
ssql.Emit(map[string]interface{}{"deviceId": "d2"})
|
||
ssql.Emit(map[string]interface{}{"deviceId": "d3"})
|
||
|
||
// 等待处理完成
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 对于 StrategyDrop,它会挤掉旧数据,所以 sentCount 应该持续增加
|
||
stats := ssql.stream.GetStats()
|
||
// d1, d2, d3 都会成功发送(虽然 d1, d2 可能被挤掉,但 sendResult 逻辑中挤掉旧的后写入新的算发送成功)
|
||
assert.Equal(t, int64(3), stats["sentCount"])
|
||
|
||
// 验证最终留在缓冲区的是最后一条数据 (d3)
|
||
// 注意:AddSink 会启动 worker 从 OutputChan 读。
|
||
// 为了验证,我们直接从 Window 的 OutputChan 读
|
||
select {
|
||
case result := <-ssql.stream.Window.OutputChan():
|
||
assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
|
||
case <-time.After(100 * time.Millisecond):
|
||
// 如果已经被 AddSink 的 worker 读走了也正常,但由于我们没加 Sink,所以应该在里面
|
||
}
|
||
}
|