Merge pull request #25 from rulego/dev

perf:优化stream
This commit is contained in:
Whki
2025-07-25 10:54:42 +08:00
committed by GitHub
+149 -47
View File
@@ -21,6 +21,15 @@ import (
"github.com/rulego/streamsql/window"
)
// fieldProcessInfo 字段处理信息,用于缓存预编译的字段处理逻辑
type fieldProcessInfo struct {
fieldName string // 原始字段名
outputName string // 输出字段名
isFunctionCall bool // 是否为函数调用
hasNestedField bool // 是否包含嵌套字段
isSelectAll bool // 是否为SELECT *
}
type Stream struct {
dataChan chan interface{}
filter condition.Condition
@@ -52,6 +61,13 @@ type Stream struct {
blockingTimeout time.Duration // 阻塞超时时间
overflowStrategy string // 溢出策略: "drop", "block", "expand", "persist"
persistenceManager *PersistenceManager // 持久化管理器
// 性能优化:预编译的AddData函数指针,避免每次switch判断
addDataFunc func(interface{}) // 根据策略预设的函数指针
// 性能优化:预编译字段处理信息,避免重复解析
compiledFieldInfo map[string]*fieldProcessInfo // 字段处理信息缓存
}
// NewStream 使用统一配置创建Stream
@@ -138,6 +154,21 @@ func newStreamWithUnifiedConfig(config types.Config) (*Stream, error) {
}
}
// 性能优化:根据溢出策略预设AddData函数指针,避免运行时switch判断
switch perfConfig.OverflowConfig.Strategy {
case "block":
stream.addDataFunc = stream.addDataBlocking
case "expand":
stream.addDataFunc = stream.addDataWithExpansion
case "persist":
stream.addDataFunc = stream.addDataWithPersistence
default:
stream.addDataFunc = stream.addDataWithDrop
}
// 性能优化:预编译字段处理信息
stream.compileFieldProcessInfo()
// 启动工作协程,使用配置的工作线程数
go stream.startSinkWorkerPool(perfConfig.WorkerConfig.SinkWorkerCount)
go stream.startResultConsumer()
@@ -241,6 +272,36 @@ func convertToAggregationFields(selectFields map[string]aggregator.AggregateType
return fields
}
// compileFieldProcessInfo 预编译字段处理信息,避免运行时重复解析
func (s *Stream) compileFieldProcessInfo() {
s.compiledFieldInfo = make(map[string]*fieldProcessInfo)
// 编译SimpleFields信息
for _, fieldSpec := range s.config.SimpleFields {
info := &fieldProcessInfo{}
if fieldSpec == "*" {
info.isSelectAll = true
info.fieldName = "*"
info.outputName = "*"
} else {
// 解析别名
parts := strings.Split(fieldSpec, ":")
info.fieldName = parts[0]
info.outputName = info.fieldName
if len(parts) > 1 {
info.outputName = parts[1]
}
// 预判断字段特征
info.isFunctionCall = strings.Contains(info.fieldName, "(") && strings.Contains(info.fieldName, ")")
info.hasNestedField = !info.isFunctionCall && fieldpath.IsNestedField(info.fieldName)
}
s.compiledFieldInfo[fieldSpec] = info
}
}
func (s *Stream) Start() {
// 启动处理协程
go s.process()
@@ -543,8 +604,12 @@ func (s *Stream) processDirectData(data interface{}) {
return
}
// 创建结果map
result := make(map[string]interface{})
// 创建结果map,预分配合适容量
estimatedSize := len(s.config.FieldExpressions) + len(s.config.SimpleFields)
if estimatedSize < 8 {
estimatedSize = 8 // 最小容量
}
result := make(map[string]interface{}, estimatedSize)
// 处理表达式字段
for fieldName, fieldExpr := range s.config.FieldExpressions {
@@ -629,14 +694,19 @@ func (s *Stream) processDirectData(data interface{}) {
result[fieldName] = evalResult
}
// 如果指定了字段,只保留这些字段
// 性能优化:使用预编译的字段信息处理SimpleFields
if len(s.config.SimpleFields) > 0 {
for _, fieldSpec := range s.config.SimpleFields {
// 处理SELECT *的特殊情况
if fieldSpec == "*" {
// SELECT *:返回所有字段,但跳过已经通过表达式字段处理的字段
info := s.compiledFieldInfo[fieldSpec]
if info == nil {
// 如果没有预编译信息,回退到原逻辑(安全性保证)
s.processSingleFieldFallback(fieldSpec, dataMap, data, result)
continue
}
if info.isSelectAll {
// SELECT *:批量复制所有字段,跳过表达式字段
for k, v := range dataMap {
// 如果该字段已经通过表达式字段处理,则跳过,保持表达式计算结果
if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
result[k] = v
}
@@ -644,43 +714,34 @@ func (s *Stream) processDirectData(data interface{}) {
continue
}
// 处理别名
parts := strings.Split(fieldSpec, ":")
fieldName := parts[0]
outputName := fieldName
if len(parts) > 1 {
outputName = parts[1]
}
// 跳过已经通过表达式字段处理的字段
if _, isExpression := s.config.FieldExpressions[outputName]; isExpression {
if _, isExpression := s.config.FieldExpressions[info.outputName]; isExpression {
continue
}
// 检查是否是函数调用
if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") {
if info.isFunctionCall {
// 执行函数调用
if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil {
result[outputName] = funcResult
if funcResult, err := s.executeFunction(info.fieldName, dataMap); err == nil {
result[info.outputName] = funcResult
} else {
logger.Error("Function execution error %s: %v", fieldName, err)
result[outputName] = nil
logger.Error("Function execution error %s: %v", info.fieldName, err)
result[info.outputName] = nil
}
} else {
// 普通字段 - 支持嵌套字段
// 普通字段处理
var value interface{}
var exists bool
if fieldpath.IsNestedField(fieldName) {
value, exists = fieldpath.GetNestedField(data, fieldName)
if info.hasNestedField {
value, exists = fieldpath.GetNestedField(data, info.fieldName)
} else {
value, exists = dataMap[fieldName]
value, exists = dataMap[info.fieldName]
}
if exists {
result[outputName] = value
result[info.outputName] = value
} else {
result[outputName] = nil
result[info.outputName] = nil
}
}
}
@@ -701,6 +762,61 @@ func (s *Stream) processDirectData(data interface{}) {
s.callSinksAsync(results)
}
// processSingleFieldFallback 回退处理单个字段(当预编译信息缺失时)
func (s *Stream) processSingleFieldFallback(fieldSpec string, dataMap map[string]interface{}, data interface{}, result map[string]interface{}) {
// 处理SELECT *的特殊情况
if fieldSpec == "*" {
// SELECT *:返回所有字段,但跳过已经通过表达式字段处理的字段
for k, v := range dataMap {
// 如果该字段已经通过表达式字段处理,则跳过,保持表达式计算结果
if _, isExpression := s.config.FieldExpressions[k]; !isExpression {
result[k] = v
}
}
return
}
// 处理别名
parts := strings.Split(fieldSpec, ":")
fieldName := parts[0]
outputName := fieldName
if len(parts) > 1 {
outputName = parts[1]
}
// 跳过已经通过表达式字段处理的字段
if _, isExpression := s.config.FieldExpressions[outputName]; isExpression {
return
}
// 检查是否是函数调用
if strings.Contains(fieldName, "(") && strings.Contains(fieldName, ")") {
// 执行函数调用
if funcResult, err := s.executeFunction(fieldName, dataMap); err == nil {
result[outputName] = funcResult
} else {
logger.Error("Function execution error %s: %v", fieldName, err)
result[outputName] = nil
}
} else {
// 普通字段 - 支持嵌套字段
var value interface{}
var exists bool
if fieldpath.IsNestedField(fieldName) {
value, exists = fieldpath.GetNestedField(data, fieldName)
} else {
value, exists = dataMap[fieldName]
}
if exists {
result[outputName] = value
} else {
result[outputName] = nil
}
}
}
// sendResultNonBlocking 非阻塞方式发送结果到resultChan (智能背压控制)
func (s *Stream) sendResultNonBlocking(results []map[string]interface{}) {
select {
@@ -939,22 +1055,8 @@ func (s *Stream) smartSplitArgs(argsStr string) ([]string, error) {
func (s *Stream) AddData(data interface{}) {
atomic.AddInt64(&s.inputCount, 1)
// 根据溢出策略处理数据
switch s.overflowStrategy {
case "block":
// 阻塞模式:保证数据不丢失
s.addDataBlocking(data)
case "expand":
// 动态扩容模式:自动扩大缓冲区
s.addDataWithExpansion(data)
case "persist":
// 持久化模式:溢出数据写入磁盘
s.addDataWithPersistence(data)
default:
// 默认drop模式:原有逻辑
s.addDataWithDrop(data)
}
// 性能优化:直接调用预编译的函数指针,避免switch判断
s.addDataFunc(data)
}
// addDataBlocking 阻塞模式添加数据,保证零数据丢失 (线程安全版本)
@@ -996,7 +1098,7 @@ func (s *Stream) addDataWithExpansion(data interface{}) {
// 扩容后重试,重新获取通道引用
if s.safeSendToDataChan(data) {
logger.Info("Successfully added data after data channel expansion")
logger.Debug("Successfully added data after data channel expansion")
return
}
@@ -1268,7 +1370,7 @@ func (s *Stream) expandDataChannel() {
newCap = oldCap + 1000 // 至少增加1000
}
logger.Info("Dynamic expansion of data channel: %d -> %d", oldCap, newCap)
logger.Debug("Dynamic expansion of data channel: %d -> %d", oldCap, newCap)
// 创建新的更大的通道
newChan := make(chan interface{}, newCap)
@@ -1306,7 +1408,7 @@ migration_done:
s.dataChan = newChan
s.dataChanMux.Unlock()
logger.Info("Channel expansion completed: migrated %d items", migratedCount)
logger.Debug("Channel expansion completed: migrated %d items", migratedCount)
}
// persistAndRetryData 持久化数据并重试 (改进版本,具备指数退避和资源控制)