/* * Copyright 2025 The RuleGo Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package streamsql import ( "fmt" "sync/atomic" "github.com/rulego/streamsql/rsql" "github.com/rulego/streamsql/stream" "github.com/rulego/streamsql/types" "github.com/rulego/streamsql/utils/table" ) // Streamsql is the main interface for the StreamSQL streaming engine. // It encapsulates core functionality including SQL parsing, stream processing, and window management. // // Usage example: // // ssql := streamsql.New() // err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')") // ssql.Emit(map[string]interface{}{"temperature": 25.5}) type Streamsql struct { stream *stream.Stream // Performance configuration mode performanceMode string // "default", "high_performance", "low_latency", "zero_data_loss", "custom" customConfig *types.PerformanceConfig // Save original SELECT field order to maintain field order for table output fieldOrder []string // Flag to track if Execute has been called executed int32 } // New creates a new StreamSQL instance. // Supports configuration through optional Option parameters. // // Parameters: // - options: Variable configuration options for customizing StreamSQL behavior // // Returns: // - *Streamsql: Newly created StreamSQL instance // // Examples: // // // Create default instance // ssql := streamsql.New() // // // Create high performance instance // ssql := streamsql.New(streamsql.WithHighPerformance()) // // // Create zero data loss instance // ssql := streamsql.New(streamsql.WithZeroDataLoss()) func New(options ...Option) *Streamsql { s := &Streamsql{ performanceMode: "default", // Default to standard performance configuration } // Apply all configuration options for _, option := range options { option(s) } return s } // Execute parses and executes SQL queries, creating corresponding stream processing pipelines. // This is the core method of StreamSQL, responsible for converting SQL into actual stream processing logic. // // Supported SQL syntax: // - SELECT clause: Select fields and aggregate functions // - FROM clause: Specify data source (usually 'stream') // - WHERE clause: Data filtering conditions // - GROUP BY clause: Grouping fields and window functions // - HAVING clause: Aggregate result filtering // - LIMIT clause: Limit result count // - DISTINCT: Result deduplication // // Window functions: // - TumblingWindow('5s'): Tumbling window // - SlidingWindow('30s', '10s'): Sliding window // - CountingWindow(100): Counting window // - SessionWindow('5m'): Session window // // Parameters: // - sql: SQL query statement to execute // // Returns: // - error: Returns error if SQL parsing or execution fails // // Examples: // // // Basic aggregation query // err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')") // // // Query with filtering conditions // err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30") // // // Complex window aggregation // err := ssql.Execute(` // SELECT deviceId, // AVG(temperature) as avg_temp, // MAX(humidity) as max_humidity // FROM stream // WHERE deviceId != 'test' // GROUP BY deviceId, SlidingWindow('1m', '30s') // HAVING avg_temp > 25 // LIMIT 100 // `) func (s *Streamsql) Execute(sql string) error { // Try to acquire execution lock using CAS operation if !atomic.CompareAndSwapInt32(&s.executed, 0, 1) { return fmt.Errorf("Execute() has already been called, create a new Streamsql instance for different queries") } // Parse SQL statement config, condition, err := rsql.Parse(sql) if err != nil { // Reset executed flag on error atomic.StoreInt32(&s.executed, 0) return fmt.Errorf("SQL parsing failed: %w", err) } // Get field order information from parsing result s.fieldOrder = config.FieldOrder // Create stream processor based on performance mode var streamInstance *stream.Stream switch s.performanceMode { case "high_performance": streamInstance, err = stream.NewStreamWithHighPerformance(*config) case "low_latency": streamInstance, err = stream.NewStreamWithLowLatency(*config) case "custom": if s.customConfig != nil { streamInstance, err = stream.NewStreamWithCustomPerformance(*config, *s.customConfig) } else { streamInstance, err = stream.NewStream(*config) } default: // "default" streamInstance, err = stream.NewStream(*config) } if err != nil { // Reset executed flag on error atomic.StoreInt32(&s.executed, 0) return fmt.Errorf("failed to create stream processor: %w", err) } s.stream = streamInstance // Register filter condition if err = s.stream.RegisterFilter(condition); err != nil { // Reset executed flag on error atomic.StoreInt32(&s.executed, 0) return fmt.Errorf("failed to register filter condition: %w", err) } // Start stream processing s.stream.Start() return nil } // Emit adds data to the stream processing pipeline. // Accepts type-safe map[string]interface{} format data. // // Parameters: // - data: Data to add, must be map[string]interface{} type // // Examples: // // // Add device data // ssql.Emit(map[string]interface{}{ // "deviceId": "sensor001", // "temperature": 25.5, // "humidity": 60.0, // "timestamp": time.Now(), // }) // // // Add user behavior data // ssql.Emit(map[string]interface{}{ // "userId": "user123", // "action": "click", // "page": "/home", // }) func (s *Streamsql) Emit(data map[string]interface{}) { if s.stream != nil { s.stream.Emit(data) } } // EmitSync processes data synchronously, returning results immediately. // Only applicable for non-aggregation queries, aggregation queries will return an error. // Accepts type-safe map[string]interface{} format data. // // Parameters: // - data: Data to process, must be map[string]interface{} type // // Returns: // - map[string]interface{}: Processed result data, returns nil if filter conditions don't match // - error: Processing error // // Examples: // // result, err := ssql.EmitSync(map[string]interface{}{ // "deviceId": "sensor001", // "temperature": 25.5, // }) // if err != nil { // log.Printf("processing error: %v", err) // } else if result != nil { // // Use processed result immediately (result is map[string]interface{} type) // fmt.Printf("Processing result: %v\n", result) // } func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error) { if s.stream == nil { return nil, fmt.Errorf("stream not initialized") } // Check if it's a non-aggregation query if s.stream.IsAggregationQuery() { return nil, fmt.Errorf("synchronous mode only supports non-aggregation queries, use Emit() method for aggregation queries") } return s.stream.ProcessSync(data) } // IsAggregationQuery checks if the current query is an aggregation query func (s *Streamsql) IsAggregationQuery() bool { if s.stream == nil { return false } return s.stream.IsAggregationQuery() } // Stream returns the underlying stream processor instance. // Provides access to lower-level stream processing functionality. // // Returns: // - *stream.Stream: Underlying stream processor instance, returns nil if SQL not executed // // Common use cases: // - Add result processing callbacks // - Get result channel // - Manually control stream processing lifecycle // // Examples: // // // Add result processing callback // ssql.Stream().AddSink(func(results []map[string]interface{}) { // fmt.Printf("Processing results: %v\n", results) // }) // // // Get result channel // resultChan := ssql.Stream().GetResultsChan() // go func() { // for result := range resultChan { // // Process result // } // }() func (s *Streamsql) Stream() *stream.Stream { return s.stream } // GetStats returns stream processing statistics func (s *Streamsql) GetStats() map[string]int64 { if s.stream != nil { return s.stream.GetStats() } return make(map[string]int64) } // GetDetailedStats returns detailed performance statistics func (s *Streamsql) GetDetailedStats() map[string]interface{} { if s.stream != nil { return s.stream.GetDetailedStats() } return make(map[string]interface{}) } // Stop stops the stream processor and releases related resources. // After calling this method, the stream processor will stop receiving and processing new data. // // Recommended to call this method for cleanup before application exit: // // defer ssql.Stop() // // Note: StreamSQL instance cannot be restarted after stopping, create a new instance. func (s *Streamsql) Stop() { if s.stream != nil { s.stream.Stop() } } // AddSink directly adds result processing callback functions. // Convenience wrapper for Stream().AddSink() for cleaner API calls. // // Parameters: // - sink: Result processing function, receives []map[string]interface{} type result data // // Examples: // // // Directly add result processing // ssql.AddSink(func(results []map[string]interface{}) { // fmt.Printf("Processing results: %v\n", results) // }) // // // Add multiple processors // ssql.AddSink(func(results []map[string]interface{}) { // // Save to database // saveToDatabase(results) // }) // ssql.AddSink(func(results []map[string]interface{}) { // // Send to message queue // sendToQueue(results) // }) func (s *Streamsql) AddSink(sink func([]map[string]interface{})) { if s.stream != nil { s.stream.AddSink(sink) } } // PrintTable prints results to console in table format, similar to database output. // Displays column names first, then data rows. // // Supported data formats: // - []map[string]interface{}: Multiple rows // - map[string]interface{}: Single row // - Other types: Direct print // // Example: // // // Print results in table format // ssql.PrintTable() // // // Output format: // // +--------+----------+ // // | device | max_temp | // // +--------+----------+ // // | aa | 30.0 | // // | bb | 22.0 | // // +--------+----------+ func (s *Streamsql) PrintTable() { s.AddSink(func(results []map[string]interface{}) { s.printTableFormat(results) }) } // printTableFormat formats and prints table data // Parameters: // - results: Result data of type []map[string]interface{} func (s *Streamsql) printTableFormat(results []map[string]interface{}) { table.FormatTableData(results, s.fieldOrder) } // ToChannel returns result channel for asynchronous result retrieval. // Provides non-blocking access to stream processing results. // // Returns: // - <-chan interface{}: Read-only result channel, returns nil if SQL not executed // // Example: // // // Get result channel // resultChan := ssql.ToChannel() // if resultChan != nil { // go func() { // for result := range resultChan { // fmt.Printf("Async result: %v\n", result) // } // }() // } // ToChannel converts query results to channel output // Returns a read-only channel for receiving query results // // Notes: // - Consumer must continuously read from channel to prevent stream processing blocking // - Channel transmits batch result data func (s *Streamsql) ToChannel() <-chan []map[string]interface{} { if s.stream != nil { return s.stream.GetResultsChan() } return nil }