Files
2025-08-07 19:23:48 +08:00

122 lines
3.7 KiB
Go

/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package streamsql
import (
"time"
"github.com/rulego/streamsql/logger"
"github.com/rulego/streamsql/types"
)
// Option defines the configuration option type for StreamSQL
type Option func(*Streamsql)
// WithLogLevel sets the log level
func WithLogLevel(level logger.Level) Option {
return func(s *Streamsql) {
logger.GetDefault().SetLevel(level)
}
}
// WithDiscardLog disables log output
func WithDiscardLog() Option {
return func(s *Streamsql) {
logger.SetDefault(logger.NewDiscardLogger())
}
}
// WithHighPerformance uses high-performance configuration
// Suitable for scenarios requiring maximum throughput
func WithHighPerformance() Option {
return func(s *Streamsql) {
s.performanceMode = "high_performance"
}
}
// WithLowLatency uses low-latency configuration
// Suitable for real-time interactive applications, minimizing latency
func WithLowLatency() Option {
return func(s *Streamsql) {
s.performanceMode = "low_latency"
}
}
// WithZeroDataLoss uses zero data loss configuration
// Suitable for critical business data, ensuring no data loss
func WithZeroDataLoss() Option {
return func(s *Streamsql) {
s.performanceMode = "zero_data_loss"
}
}
// WithCustomPerformance uses custom performance configuration
func WithCustomPerformance(config types.PerformanceConfig) Option {
return func(s *Streamsql) {
s.performanceMode = "custom"
s.customConfig = &config
}
}
// WithBufferSizes sets custom buffer sizes
func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option {
return func(s *Streamsql) {
s.performanceMode = "custom"
config := types.DefaultPerformanceConfig()
config.BufferConfig.DataChannelSize = dataChannelSize
config.BufferConfig.ResultChannelSize = resultChannelSize
config.BufferConfig.WindowOutputSize = windowOutputSize
s.customConfig = &config
}
}
// WithOverflowStrategy sets the overflow strategy
func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option {
return func(s *Streamsql) {
s.performanceMode = "custom"
config := types.DefaultPerformanceConfig()
config.OverflowConfig.Strategy = strategy
config.OverflowConfig.BlockTimeout = blockTimeout
config.OverflowConfig.AllowDataLoss = (strategy == "drop")
s.customConfig = &config
}
}
// WithWorkerConfig sets the worker pool configuration
func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option {
return func(s *Streamsql) {
s.performanceMode = "custom"
config := types.DefaultPerformanceConfig()
config.WorkerConfig.SinkPoolSize = sinkPoolSize
config.WorkerConfig.SinkWorkerCount = sinkWorkerCount
config.WorkerConfig.MaxRetryRoutines = maxRetryRoutines
s.customConfig = &config
}
}
// WithMonitoring enables detailed monitoring
func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option {
return func(s *Streamsql) {
s.performanceMode = "custom"
config := types.DefaultPerformanceConfig()
config.MonitoringConfig.EnableMonitoring = true
config.MonitoringConfig.StatsUpdateInterval = updateInterval
config.MonitoringConfig.EnableDetailedStats = enableDetailedStats
s.customConfig = &config
}
}