diff --git a/builtin/function.go b/builtin/function.go index 3b8fe54..c55578c 100644 --- a/builtin/function.go +++ b/builtin/function.go @@ -16,6 +16,8 @@ package builtin +import "github.com/rulego/streamsql/types" + //type Function struct { // AuthorName string // Func func(args ...any) (any, error) @@ -35,4 +37,6 @@ type AggregateFunction struct { Func func(input []float64, args ...any) (any, error) //验证参数 ValidateArgs func(input []float64, args ...any) (any, error) + //流计算处理 + StreamHandler types.StreamFunc } diff --git a/collector/collector.go b/collector/collector.go index 0ec5fbf..e472796 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -32,7 +32,7 @@ type DefaultSelectStreamSqlContext struct { msgAsMap map[string]interface{} msg types.Msg locker sync.RWMutex - groupValues types.GroupValues + groupValues dataset.GroupValues } func NewDefaultSelectStreamSqlContext(ctx context.Context, collector *StreamCollector, msg types.Msg) *DefaultSelectStreamSqlContext { @@ -68,17 +68,17 @@ func (c *DefaultSelectStreamSqlContext) RawInput() types.Msg { return c.msg } -func (c *DefaultSelectStreamSqlContext) SetGroupByKey(groupByKey types.GroupFields) { +func (c *DefaultSelectStreamSqlContext) SetGroupByKey(groupByKey dataset.GroupFields) { c.Collector.SetGroupByKey(groupByKey) } -func (c *DefaultSelectStreamSqlContext) GetGroupByKey() types.GroupFields { +func (c *DefaultSelectStreamSqlContext) GetGroupByKey() dataset.GroupFields { return c.Collector.GetGroupByKey() } -func (c *DefaultSelectStreamSqlContext) IsInitWindow(groupValues types.GroupValues) bool { +func (c *DefaultSelectStreamSqlContext) IsInitWindow(groupValues dataset.GroupValues) bool { return c.Collector.IsInitWindow(groupValues) } -func (c *DefaultSelectStreamSqlContext) AddWindow(groupByKey types.GroupValues, window types.Window) { +func (c *DefaultSelectStreamSqlContext) AddWindow(groupByKey dataset.GroupValues, window types.Window) { c.Collector.AddWindow(groupByKey, window) } @@ -86,19 +86,19 @@ func (c *DefaultSelectStreamSqlContext) CreteWindowObserver() types.WindowObserv return c.Collector.CreteWindowObserver() } -func (c *DefaultSelectStreamSqlContext) GetWindow(groupByKey types.GroupValues) types.Window { +func (c *DefaultSelectStreamSqlContext) GetWindow(groupByKey dataset.GroupValues) types.Window { return c.Collector.GetWindow(groupByKey) } -func (c *DefaultSelectStreamSqlContext) GetRow(groupValues types.GroupValues) (*dataset.Row, bool) { +func (c *DefaultSelectStreamSqlContext) GetRow(groupValues dataset.GroupValues) (*dataset.Row, bool) { return c.Collector.GetRow(groupValues) } -func (c *DefaultSelectStreamSqlContext) AddColumn(groupValues types.GroupValues, kv dataset.KeyValue) { +func (c *DefaultSelectStreamSqlContext) AddColumn(groupValues dataset.GroupValues, kv dataset.KeyValue) { c.Collector.AddColumn(groupValues, kv) } -func (c *DefaultSelectStreamSqlContext) GetColumn(groupValues types.GroupValues, key dataset.Key) (dataset.KeyValue, bool) { +func (c *DefaultSelectStreamSqlContext) GetColumn(groupValues dataset.GroupValues, key dataset.Key) (dataset.KeyValue, bool) { return c.Collector.GetColumn(groupValues, key) } @@ -116,24 +116,24 @@ func (c *DefaultSelectStreamSqlContext) GetColumn(groupValues types.GroupValues, // return v, ok //} -func (c *DefaultSelectStreamSqlContext) SetCurrentGroupValues(groupValues types.GroupValues) { +func (c *DefaultSelectStreamSqlContext) SetCurrentGroupValues(groupValues dataset.GroupValues) { c.groupValues = groupValues } -func (c *DefaultSelectStreamSqlContext) GetCurrentGroupValues() types.GroupValues { +func (c *DefaultSelectStreamSqlContext) GetCurrentGroupValues() dataset.GroupValues { return c.groupValues } -func (c *DefaultSelectStreamSqlContext) AddFieldAggregateValue(groupValues types.GroupValues, fieldId string, value float64) { +func (c *DefaultSelectStreamSqlContext) AddFieldAggregateValue(groupValues dataset.GroupValues, fieldId string, value float64) { c.Collector.AddFieldAggregateValue(groupValues, fieldId, value) } -func (c *DefaultSelectStreamSqlContext) GetFieldAggregateValue(groupValues types.GroupValues, fieldId string) []float64 { +func (c *DefaultSelectStreamSqlContext) GetFieldAggregateValue(groupValues dataset.GroupValues, fieldId string) []float64 { return c.Collector.GetFieldAggregateValue(groupValues, fieldId) } type aggregateFieldValue struct { - GroupFields types.GroupFields + GroupFields dataset.GroupFields FieldWindows map[string]types.Window } @@ -152,32 +152,32 @@ func (afv *aggregateFieldValue) GetFieldValues(fieldId string) []float64 { // StreamCollector 收集器 type StreamCollector struct { context.Context - keyedWindow map[types.GroupValues]types.Window + keyedWindow map[dataset.GroupValues]types.Window planner types.LogicalPlan aggregateOperators []types.AggregateOperator - groupByKey types.GroupFields - aggregateFieldValues map[types.GroupValues]*aggregateFieldValue + groupByKey dataset.GroupFields + aggregateFieldValues map[dataset.GroupValues]*aggregateFieldValue rows *dataset.Rows sync.Mutex } -func (c *StreamCollector) SetGroupByKey(groupByKey types.GroupFields) { +func (c *StreamCollector) SetGroupByKey(groupByKey dataset.GroupFields) { c.groupByKey = groupByKey } -func (c *StreamCollector) GetGroupByKey() types.GroupFields { +func (c *StreamCollector) GetGroupByKey() dataset.GroupFields { return c.groupByKey } -func (c *StreamCollector) IsInitWindow(groupValues types.GroupValues) bool { +func (c *StreamCollector) IsInitWindow(groupValues dataset.GroupValues) bool { _, ok := c.keyedWindow[groupValues] return ok } -func (c *StreamCollector) AddWindow(groupByKey types.GroupValues, window types.Window) { +func (c *StreamCollector) AddWindow(groupByKey dataset.GroupValues, window types.Window) { c.keyedWindow[groupByKey] = window } -func (c *StreamCollector) GetWindow(groupByKey types.GroupValues) types.Window { +func (c *StreamCollector) GetWindow(groupByKey dataset.GroupValues) types.Window { return c.keyedWindow[groupByKey] } @@ -203,7 +203,7 @@ func (c *StreamCollector) CreteWindowObserver() types.WindowObserver { }, } } -func (c *StreamCollector) AddFieldAggregateValue(groupValues types.GroupValues, fieldId string, data float64) { +func (c *StreamCollector) AddFieldAggregateValue(groupValues dataset.GroupValues, fieldId string, data float64) { c.Lock() defer c.Unlock() if w, ok := c.keyedWindow[groupValues]; ok { @@ -211,7 +211,7 @@ func (c *StreamCollector) AddFieldAggregateValue(groupValues types.GroupValues, } } -func (c *StreamCollector) GetFieldAggregateValue(groupValues types.GroupValues, fieldId string) []float64 { +func (c *StreamCollector) GetFieldAggregateValue(groupValues dataset.GroupValues, fieldId string) []float64 { c.Lock() defer c.Unlock() if _, ok := c.keyedWindow[groupValues]; ok { @@ -220,19 +220,19 @@ func (c *StreamCollector) GetFieldAggregateValue(groupValues types.GroupValues, return nil } -func (c *StreamCollector) GetRow(groupValues types.GroupValues) (*dataset.Row, bool) { +func (c *StreamCollector) GetRow(groupValues dataset.GroupValues) (*dataset.Row, bool) { c.Lock() defer c.Unlock() return c.rows.GetRow(groupValues) } -func (c *StreamCollector) AddColumn(groupValues types.GroupValues, kv dataset.KeyValue) { +func (c *StreamCollector) AddColumn(groupValues dataset.GroupValues, kv dataset.KeyValue) { c.Lock() defer c.Unlock() c.rows.AddColumn(groupValues, kv) } -func (c *StreamCollector) GetColumn(groupValues types.GroupValues, key dataset.Key) (dataset.KeyValue, bool) { +func (c *StreamCollector) GetColumn(groupValues dataset.GroupValues, key dataset.Key) (dataset.KeyValue, bool) { c.Lock() defer c.Unlock() return c.rows.GetColumn(groupValues, key) @@ -242,8 +242,8 @@ func NewStreamCollector(planner types.LogicalPlan) *StreamCollector { c := &StreamCollector{ planner: planner, aggregateOperators: planner.AggregateOperators(), - keyedWindow: make(map[types.GroupValues]types.Window), - aggregateFieldValues: make(map[types.GroupValues]*aggregateFieldValue), + keyedWindow: make(map[dataset.GroupValues]types.Window), + aggregateFieldValues: make(map[dataset.GroupValues]*aggregateFieldValue), rows: dataset.NewRows(), } return c diff --git a/types/group_fields.go b/dataset/group_fields.go similarity index 98% rename from types/group_fields.go rename to dataset/group_fields.go index b55a305..172b5d9 100644 --- a/types/group_fields.go +++ b/dataset/group_fields.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package types +package dataset import "strings" diff --git a/dataset/kv.go b/dataset/kv.go index 03f6cfe..1ccdb73 100644 --- a/dataset/kv.go +++ b/dataset/kv.go @@ -84,3 +84,20 @@ func StringSlice(k string, v []string) KeyValue { func Stringer(k string, v fmt.Stringer) KeyValue { return Key(k).String(v.String()) } + +func Interface(k string, data interface{}) KeyValue { + switch v := data.(type) { + case string: + return String(k, v) + case int: + return Int(k, v) + case int64: + return Int64(k, v) + case float64: + return Float64(k, v) + case bool: + return Bool(k, v) + default: + return String(k, fmt.Sprintf("%v", data)) + } +} diff --git a/dataset/row.go b/dataset/row.go index 0b68838..ce97c4c 100644 --- a/dataset/row.go +++ b/dataset/row.go @@ -16,14 +16,12 @@ package dataset -import "github.com/rulego/streamsql/types" - type Row struct { - GroupValues types.GroupValues + GroupValues GroupValues Columns map[Key]KeyValue } -func NewRow(groupValues types.GroupValues) *Row { +func NewRow(groupValues GroupValues) *Row { r := &Row{ GroupValues: groupValues, Columns: make(map[Key]KeyValue), @@ -47,19 +45,19 @@ func (r *Row) GetColumn(key Key) (KeyValue, bool) { } type Rows struct { - Rows map[types.GroupValues]*Row + Rows map[GroupValues]*Row } func NewRows() *Rows { r := &Rows{ - Rows: make(map[types.GroupValues]*Row), + Rows: make(map[GroupValues]*Row), } return r } -func (r *Rows) AddColumn(groupValues types.GroupValues, kv KeyValue) { +func (r *Rows) AddColumn(groupValues GroupValues, kv KeyValue) { if r.Rows == nil { - r.Rows = make(map[types.GroupValues]*Row) + r.Rows = make(map[GroupValues]*Row) } row, ok := r.Rows[groupValues] if !ok { @@ -69,7 +67,7 @@ func (r *Rows) AddColumn(groupValues types.GroupValues, kv KeyValue) { row.AddColumn(kv) } -func (r *Rows) GetColumn(groupValues types.GroupValues, key Key) (KeyValue, bool) { +func (r *Rows) GetColumn(groupValues GroupValues, key Key) (KeyValue, bool) { if r.Rows == nil { return KeyValue{}, false } @@ -80,7 +78,7 @@ func (r *Rows) GetColumn(groupValues types.GroupValues, key Key) (KeyValue, bool return row.GetColumn(key) } -func (r *Rows) GetRow(groupValues types.GroupValues) (*Row, bool) { +func (r *Rows) GetRow(groupValues GroupValues) (*Row, bool) { if r.Rows == nil { return nil, false } @@ -93,7 +91,7 @@ func (r *Rows) AddRow(row *Row) { return } if r.Rows == nil { - r.Rows = make(map[types.GroupValues]*Row) + r.Rows = make(map[GroupValues]*Row) } r.Rows[row.GroupValues] = row } diff --git a/dataset/value.go b/dataset/value.go index fbebca4..83e8b50 100644 --- a/dataset/value.go +++ b/dataset/value.go @@ -260,7 +260,7 @@ func (v Value) Emit() string { // MarshalJSON returns the JSON encoding of the Value. func (v Value) MarshalJSON() ([]byte, error) { var jsonVal struct { - Type string + Type Type Value interface{} } jsonVal.Type = v.Type() diff --git a/operator/aggregate_op.go b/operator/aggregate_op.go index b581b7a..9f7d62b 100644 --- a/operator/aggregate_op.go +++ b/operator/aggregate_op.go @@ -30,15 +30,19 @@ type AggregateOp struct { BaseOp //WindowType rsql.WindowType AggregateFunc *rsql.FunctionCall - //dataList *queue.Queue - //groupDataList map[types.GroupFields]*queue.Queue - ArgsProgram []*vm.Program + ArgsProgram []*vm.Program + fn *builtin.AggregateFunction + streamFunc types.StreamFunc } func (o *AggregateOp) Init(context types.StreamSqlContext) error { - //o.groupDataList = make(map[types.GroupFields]*queue.Queue) - - //o.dataList = queue.NewCircleQueue(10000) + if f, ok := builtin.AggregateBuiltins[o.AggregateFunc.Name]; ok { + o.fn = f + if o.fn.StreamHandler != nil { + newF := f.StreamHandler.New() + o.streamFunc = newF + } + } return nil } @@ -61,29 +65,38 @@ func (o *AggregateOp) Apply(context types.StreamSqlContext) error { return nil } +func (o *AggregateOp) New() types.StreamFunc { + return &AggregateOp{} +} + // AddHandler 窗口添加数据事件 func (o *AggregateOp) AddHandler(context types.StreamSqlOperatorContext, data float64) { - fmt.Println(data) + if o.streamFunc != nil { + o.streamFunc.AddHandler(context, data) + } } // ArchiveHandler 清除原始数据,观察者需要保存中间过程 func (o *AggregateOp) ArchiveHandler(context types.StreamSqlOperatorContext, dataList []float64) { - + if o.streamFunc != nil { + o.streamFunc.ArchiveHandler(context, dataList) + } } // StartHandler 窗口开始事件 func (o *AggregateOp) StartHandler(context types.StreamSqlOperatorContext) { - + if o.streamFunc != nil { + o.streamFunc.StartHandler(context) + } } // EndHandler 窗口结束事件 func (o *AggregateOp) EndHandler(context types.StreamSqlOperatorContext, dataList []float64) { - if f, ok := builtin.AggregateBuiltins[o.AggregateFunc.Name]; ok { - - fieldId := o.AggregateFunc.Args[0].(*rsql.ExpressionLang).Val - values := context.GetFieldAggregateValue(fieldId) - result, err := f.Func(values) + if o.streamFunc != nil { + o.streamFunc.EndHandler(context, dataList) + } else { + var input []float64 + result, err := o.fn.Func(input) fmt.Println(result, err) - //o.dataList.Reset() } } diff --git a/operator/group_by_op.go b/operator/group_by_op.go index ee74dc6..75123ed 100644 --- a/operator/group_by_op.go +++ b/operator/group_by_op.go @@ -17,6 +17,7 @@ package operator import ( + "github.com/rulego/streamsql/dataset" "github.com/rulego/streamsql/types" "github.com/rulego/streamsql/utils/cast" ) @@ -28,7 +29,7 @@ import ( type GroupByOp struct { *BaseOp - GroupByKey types.GroupFields + GroupByKey dataset.GroupFields keys []string } @@ -45,13 +46,13 @@ func (o *GroupByOp) Apply(context types.StreamSqlContext) error { if groupValues, ok := o.getGroupValues(selectStreamSqlContext.InputAsMap()); ok { selectStreamSqlContext.SetCurrentGroupValues(groupValues) } else { - selectStreamSqlContext.SetCurrentGroupValues(types.EmptyGroupValues) + selectStreamSqlContext.SetCurrentGroupValues(dataset.EmptyGroupValues) } } return nil } -func (o *GroupByOp) getGroupValues(data map[string]interface{}) (types.GroupValues, bool) { +func (o *GroupByOp) getGroupValues(data map[string]interface{}) (dataset.GroupValues, bool) { var list []string for _, key := range o.keys { if v, ok := data[key]; ok { @@ -60,7 +61,7 @@ func (o *GroupByOp) getGroupValues(data map[string]interface{}) (types.GroupValu return "", false } } - return types.NewGroupValues(list...), true + return dataset.NewGroupValues(list...), true } //// 检查是否是当前分组数据 diff --git a/operator/lookup_field_op.go b/operator/lookup_field_op.go index 2716968..3307c05 100644 --- a/operator/lookup_field_op.go +++ b/operator/lookup_field_op.go @@ -18,6 +18,7 @@ package operator import ( "github.com/expr-lang/expr/vm" + "github.com/rulego/streamsql/dataset" "github.com/rulego/streamsql/types" ) @@ -34,7 +35,7 @@ func (o *LookupFieldOp) Apply(context types.StreamSqlContext) error { if ctx, ok := context.(types.SelectStreamSqlContext); ok { if o.Field.EvalProgram != nil { if result, err := o.eval(o.Field.EvalProgram, ctx.InputAsMap()); err == nil { - ctx.AddField(o.Field.FieldId, result) + ctx.AddColumn(ctx.GetCurrentGroupValues(), dataset.Interface(o.Field.FieldId, result)) } else { return err } diff --git a/planner/group_by_plan.go b/planner/group_by_plan.go index 2ee3c9e..1c0f8b1 100644 --- a/planner/group_by_plan.go +++ b/planner/group_by_plan.go @@ -17,6 +17,7 @@ package planner import ( + "github.com/rulego/streamsql/dataset" "github.com/rulego/streamsql/operator" "github.com/rulego/streamsql/rsql" "github.com/rulego/streamsql/types" @@ -39,7 +40,7 @@ func (p *GroupByPlan) Plan(statement rsql.Statement) error { }) } else { p.AddOperators(&operator.GroupByOp{ - GroupByKey: types.GroupFields(item.(*rsql.Identifier).Val), + GroupByKey: dataset.GroupFields(item.(*rsql.Identifier).Val), }) } } diff --git a/types/types.go b/types/types.go index 3e9351b..80fe799 100644 --- a/types/types.go +++ b/types/types.go @@ -53,25 +53,38 @@ type StreamSqlContext interface { type StreamSqlOperatorContext interface { StreamSqlContext - AddWindow(groupValues GroupValues, window Window) + AddWindow(groupValues dataset.GroupValues, window Window) CreteWindowObserver() WindowObserver //GetWindow 获取窗口实例 - GetWindow(groupValues GroupValues) Window - IsInitWindow(groupValues GroupValues) bool - AddFieldAggregateValue(groupValues GroupValues, fieldId string, value float64) - GetFieldAggregateValue(groupValues GroupValues, fieldId string) []float64 - SetGroupByKey(groupByKey GroupFields) - GetRow(groupValues GroupValues) (*dataset.Row, bool) - AddColumn(groupValues GroupValues, kv dataset.KeyValue) - GetColumn(groupValues GroupValues, key dataset.Key) (dataset.KeyValue, bool) + GetWindow(groupValues dataset.GroupValues) Window + IsInitWindow(groupValues dataset.GroupValues) bool + AddFieldAggregateValue(groupValues dataset.GroupValues, fieldId string, value float64) + GetFieldAggregateValue(groupValues dataset.GroupValues, fieldId string) []float64 + SetGroupByKey(groupByKey dataset.GroupFields) + GetRow(groupValues dataset.GroupValues) (*dataset.Row, bool) + AddColumn(groupValues dataset.GroupValues, kv dataset.KeyValue) + GetColumn(groupValues dataset.GroupValues, key dataset.Key) (dataset.KeyValue, bool) } type SelectStreamSqlContext interface { StreamSqlOperatorContext InputAsMap() map[string]interface{} RawInput() Msg - SetCurrentGroupValues(groupValues GroupValues) - GetCurrentGroupValues() GroupValues + SetCurrentGroupValues(groupValues dataset.GroupValues) + GetCurrentGroupValues() dataset.GroupValues +} + +type StreamFunc interface { + //New 返回一个新的实例 + New() StreamFunc + // AddHandler 窗口添加数据事件 + AddHandler(context StreamSqlOperatorContext, data float64) + //ArchiveHandler 清除原始数据,观察者需要保存中间过程 + ArchiveHandler(context StreamSqlOperatorContext, dataList []float64) + //StartHandler 窗口开始事件 + StartHandler(context StreamSqlOperatorContext) + //EndHandler 窗口结束事件 + EndHandler(context StreamSqlOperatorContext, dataList []float64) } // WindowObserver 窗口事件观察者 @@ -97,10 +110,12 @@ type Operator interface { type AggregateOperator interface { Operator - AddHandler(context StreamSqlOperatorContext, data float64) - ArchiveHandler(context StreamSqlOperatorContext, dataList []float64) - StartHandler(context StreamSqlOperatorContext) - EndHandler(context StreamSqlOperatorContext, dataList []float64) + StreamFunc + + //AddHandler(context StreamSqlOperatorContext, data float64) + //ArchiveHandler(context StreamSqlOperatorContext, dataList []float64) + //StartHandler(context StreamSqlOperatorContext) + //EndHandler(context StreamSqlOperatorContext, dataList []float64) } // LogicalPlan 逻辑计划接口