Files
2025-08-06 18:11:44 +08:00

124 lines
3.4 KiB
Go

/*
* Copyright 2025 The RuleGo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stream
import (
"sync/atomic"
"time"
"github.com/rulego/streamsql/logger"
)
// DataHandler handles data processing for different strategies
type DataHandler struct {
stream *Stream
}
// NewDataHandler creates a new data handler
func NewDataHandler(stream *Stream) *DataHandler {
return &DataHandler{stream: stream}
}
// safeGetDataChan safely gets dataChan reference
func (s *Stream) safeGetDataChan() chan map[string]interface{} {
s.dataChanMux.RLock()
defer s.dataChanMux.RUnlock()
return s.dataChan
}
// safeSendToDataChan safely sends data to dataChan
func (s *Stream) safeSendToDataChan(data map[string]interface{}) bool {
dataChan := s.safeGetDataChan()
select {
case dataChan <- data:
return true
default:
return false
}
}
// expandDataChannel dynamically expands data channel capacity
func (s *Stream) expandDataChannel() {
// Use atomic operation to check if expansion is in progress, prevent concurrent expansion
if !atomic.CompareAndSwapInt32(&s.expanding, 0, 1) {
logger.Debug("Channel expansion already in progress, skipping")
return
}
defer atomic.StoreInt32(&s.expanding, 0)
// Acquire expansion lock to ensure only one goroutine performs expansion
s.expansionMux.Lock()
defer s.expansionMux.Unlock()
// Double-check if expansion is needed (double-checked locking pattern)
s.dataChanMux.RLock()
oldCap := cap(s.dataChan)
currentLen := len(s.dataChan)
s.dataChanMux.RUnlock()
// No expansion needed if current channel usage is below 80%
if float64(currentLen)/float64(oldCap) < 0.8 {
logger.Debug("Channel usage below threshold, expansion not needed")
return
}
newCap := int(float64(oldCap) * 1.5) // Expand by 50%
if newCap < oldCap+1000 {
newCap = oldCap + 1000 // At least increase by 1000
}
logger.Debug("Dynamic expansion of data channel: %d -> %d", oldCap, newCap)
// Create new larger channel
newChan := make(chan map[string]interface{}, newCap)
// Safely migrate data using write lock
s.dataChanMux.Lock()
oldChan := s.dataChan
// Quickly migrate data from old channel to new channel
migrationTimeout := time.NewTimer(5 * time.Second) // 5 second migration timeout
defer migrationTimeout.Stop()
migratedCount := 0
for {
select {
case data := <-oldChan:
select {
case newChan <- data:
migratedCount++
case <-migrationTimeout.C:
logger.Warn("Data migration timeout, some data may be lost during expansion")
goto migration_done
}
case <-migrationTimeout.C:
logger.Warn("Data migration timeout during channel drain")
goto migration_done
default:
// Old channel is empty, migration completed
goto migration_done
}
}
migration_done:
// Atomically update channel reference
s.dataChan = newChan
s.dataChanMux.Unlock()
logger.Debug("Channel expansion completed: migrated %d items", migratedCount)
}