forked from GiteaTest2015/streamsql
281 lines
8.6 KiB
Go
281 lines
8.6 KiB
Go
/*
|
|
* Copyright 2025 The RuleGo Authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
/*
|
|
Package types provides core type definitions and data structures for StreamSQL.
|
|
|
|
This package defines fundamental data types, configuration structures, and interfaces
|
|
used throughout the StreamSQL stream processing pipeline. It ensures type safety
|
|
and provides a unified API for data manipulation across components.
|
|
|
|
# Core Features
|
|
|
|
• Data Types - Core data structures for stream processing
|
|
• Configuration Management - Centralized configuration structures
|
|
• Type Safety - Strong typing with validation
|
|
• Serialization Support - JSON serialization support
|
|
• Cross-Component Compatibility - Shared types across packages
|
|
|
|
# Configuration Structures
|
|
|
|
Core configuration types:
|
|
|
|
type Config struct {
|
|
WindowConfig WindowConfig // Window settings
|
|
GroupFields []string // GROUP BY fields
|
|
SelectFields map[string]aggregator.AggregateType // SELECT aggregations
|
|
FieldAlias map[string]string // Field aliases
|
|
SimpleFields []string // Non-aggregated fields
|
|
FieldExpressions map[string]FieldExpression // Computed expressions
|
|
Where string // WHERE clause
|
|
Having string // HAVING clause
|
|
NeedWindow bool // Window requirement
|
|
Distinct bool // DISTINCT flag
|
|
Limit int // LIMIT clause
|
|
PerformanceConfig PerformanceConfig // Performance settings
|
|
}
|
|
|
|
# Window Configuration
|
|
|
|
Unified configuration for all window types:
|
|
|
|
type WindowConfig struct {
|
|
Type string // Window type (e.g. "tumbling", "sliding", "counting", "session")
|
|
Params map[string]interface{} // Type-specific parameters (e.g. "size", "slide", "count", "timeout")
|
|
TsProp string // Timestamp property
|
|
TimeUnit time.Duration // Time unit
|
|
GroupByKey string // Grouping key
|
|
}
|
|
|
|
// Example configurations
|
|
// Tumbling window
|
|
tumblingWindow := WindowConfig{
|
|
Type: "tumbling",
|
|
Params: map[string]interface{}{
|
|
"size": "5s",
|
|
},
|
|
TsProp: "timestamp",
|
|
}
|
|
|
|
// Sliding window
|
|
slidingWindow := WindowConfig{
|
|
Type: "sliding",
|
|
Params: map[string]interface{}{
|
|
"size": "30s",
|
|
"slide": "10s",
|
|
},
|
|
TsProp: "timestamp",
|
|
}
|
|
|
|
// Counting window
|
|
countingWindow := WindowConfig{
|
|
Type: "counting",
|
|
Params: map[string]interface{}{
|
|
"count": 100,
|
|
},
|
|
}
|
|
|
|
// Session window
|
|
sessionWindow := WindowConfig{
|
|
Type: "session",
|
|
Params: map[string]interface{}{
|
|
"timeout": "5m",
|
|
},
|
|
GroupByKey: "user_id",
|
|
}
|
|
|
|
# Performance Configuration
|
|
|
|
Comprehensive performance tuning options:
|
|
|
|
type PerformanceConfig struct {
|
|
// Buffer management
|
|
BufferSize int // Input buffer size
|
|
BatchSize int // Processing batch size
|
|
FlushInterval time.Duration // Automatic flush interval
|
|
HighWaterMark float64 // Buffer high water mark (0.0-1.0)
|
|
LowWaterMark float64 // Buffer low water mark (0.0-1.0)
|
|
|
|
// Worker pool configuration
|
|
WorkerPoolSize int // Number of worker goroutines
|
|
MaxWorkers int // Maximum worker limit
|
|
WorkerIdleTime time.Duration // Worker idle timeout
|
|
|
|
// Overflow handling
|
|
OverflowStrategy string // "drop", "block", "spill", "compress"
|
|
SpillDirectory string // Directory for spill files
|
|
CompressionLevel int // Compression level (1-9)
|
|
|
|
// Memory management
|
|
MaxMemoryUsage int64 // Maximum memory usage in bytes
|
|
GCInterval time.Duration // Garbage collection interval
|
|
MemoryThreshold float64 // Memory usage threshold
|
|
|
|
// Monitoring
|
|
MetricsEnabled bool // Enable metrics collection
|
|
MetricsInterval time.Duration // Metrics collection interval
|
|
HealthCheckPort int // Health check HTTP port
|
|
|
|
// Persistence
|
|
PersistenceEnabled bool // Enable data persistence
|
|
PersistenceType string // "memory", "file", "database"
|
|
PersistencePath string // Persistence storage path
|
|
RecoveryEnabled bool // Enable automatic recovery
|
|
}
|
|
|
|
# Field Management
|
|
|
|
Advanced field handling and expression support:
|
|
|
|
type FieldExpression struct {
|
|
Field string // Field name
|
|
Expression string // Expression
|
|
Fields []string // Referenced fields
|
|
}
|
|
|
|
type Projection struct {
|
|
SourceType ProjectionSourceType // Source type (field, expression, aggregate)
|
|
Source string // Source identifier
|
|
Alias string // Output alias
|
|
DataType string // Expected data type
|
|
}
|
|
|
|
type ProjectionSourceType string
|
|
|
|
const (
|
|
ProjectionSourceField ProjectionSourceType = "field" // Direct field reference
|
|
ProjectionSourceExpression ProjectionSourceType = "expression" // Computed expression
|
|
ProjectionSourceAggregate ProjectionSourceType = "aggregate" // Aggregate function
|
|
ProjectionSourceConstant ProjectionSourceType = "constant" // Constant value
|
|
)
|
|
|
|
# Data Row Representation
|
|
|
|
Data row structure for stream processing (field values are stored untyped, so reads require a type assertion):
|
|
|
|
type Row struct {
|
|
Data map[string]interface{} // Row data
|
|
Timestamp time.Time // Row timestamp
|
|
Metadata map[string]interface{} // Additional metadata
|
|
GroupKey string // Grouping key for aggregation
|
|
WindowID string // Window identifier
|
|
}
|
|
|
|
// Row creation and manipulation
|
|
func NewRow(data map[string]interface{}) *Row
|
|
func (r *Row) GetValue(field string) interface{}
|
|
func (r *Row) SetValue(field string, value interface{})
|
|
func (r *Row) HasField(field string) bool
|
|
func (r *Row) Clone() *Row
|
|
|
|
# Time Management
|
|
|
|
Time-based data structures for window processing:
|
|
|
|
type TimeSlot struct {
|
|
Start time.Time // Slot start time
|
|
End time.Time // Slot end time
|
|
Duration time.Duration // Slot duration
|
|
ID string // Unique slot identifier
|
|
}
|
|
|
|
// Time slot operations
|
|
func NewTimeSlot(start time.Time, duration time.Duration) *TimeSlot
|
|
func (ts *TimeSlot) Contains(timestamp time.Time) bool
|
|
func (ts *TimeSlot) Overlaps(other *TimeSlot) bool
|
|
func (ts *TimeSlot) String() string
|
|
|
|
# Configuration Presets
|
|
|
|
Pre-defined configuration templates for common use cases:
|
|
|
|
// High Performance Configuration
|
|
func NewHighPerformanceConfig() *PerformanceConfig {
|
|
return &PerformanceConfig{
|
|
BufferSize: 50000,
|
|
BatchSize: 1000,
|
|
WorkerPoolSize: 8,
|
|
FlushInterval: 100 * time.Millisecond,
|
|
OverflowStrategy: "spill",
|
|
MetricsEnabled: true,
|
|
}
|
|
}
|
|
|
|
// Low Latency Configuration
|
|
func NewLowLatencyConfig() *PerformanceConfig {
|
|
return &PerformanceConfig{
|
|
BufferSize: 1000,
|
|
BatchSize: 10,
|
|
WorkerPoolSize: 2,
|
|
FlushInterval: 10 * time.Millisecond,
|
|
OverflowStrategy: "drop",
|
|
MetricsEnabled: false,
|
|
}
|
|
}
|
|
|
|
// Zero Data Loss Configuration
|
|
func NewZeroDataLossConfig() *PerformanceConfig {
|
|
return &PerformanceConfig{
|
|
BufferSize: 10000,
|
|
BatchSize: 100,
|
|
WorkerPoolSize: 4,
|
|
FlushInterval: time.Second,
|
|
OverflowStrategy: "block",
|
|
PersistenceEnabled: true,
|
|
RecoveryEnabled: true,
|
|
MetricsEnabled: true,
|
|
}
|
|
}
|
|
|
|
# Usage Examples
|
|
|
|
Basic configuration:
|
|
|
|
config := &Config{
|
|
WindowConfig: WindowConfig{
|
|
Type: "tumbling",
|
|
Params: map[string]interface{}{"size": "5s"},
|
|
},
|
|
GroupFields: []string{"device_id"},
|
|
SelectFields: map[string]aggregator.AggregateType{
|
|
"temperature": aggregator.AggregateTypeAvg,
|
|
},
|
|
NeedWindow: true,
|
|
}
|
|
|
|
Data row operations:
|
|
|
|
row := NewRow(map[string]interface{}{
|
|
"device_id": "sensor001",
|
|
"temperature": 25.5,
|
|
})
|
|
|
|
deviceID, _ := row.GetValue("device_id").(string) // comma-ok: yields "" instead of panicking if missing or not a string
|
|
row.SetValue("processed", true)
|
|
|
|
# Integration
|
|
|
|
Integrates with other StreamSQL components:
|
|
|
|
• Stream Package - Core data types for stream processing
|
|
• Window Package - WindowConfig for window configurations
|
|
• Aggregator Package - AggregateType definitions
|
|
• Condition Package - Data structures for WHERE and HAVING clause evaluation
|
|
• Functions Package - Type definitions for functions
|
|
• RSQL Package - Config structures for query execution
|
|
*/
|
|
package types
|