Merge pull request #309 from iamgrewal7/master
add commandline flags to set GC interval and metrics TTL
This commit is contained in:
@ -62,6 +62,7 @@ type HPAProvider struct {
|
||||
recorder kube_record.EventRecorder
|
||||
logger *log.Entry
|
||||
disregardIncompatibleHPAs bool
|
||||
gcInterval time.Duration
|
||||
}
|
||||
|
||||
// metricCollection is a container for sending collected metrics across a
|
||||
@ -72,7 +73,7 @@ type metricCollection struct {
|
||||
}
|
||||
|
||||
// NewHPAProvider initializes a new HPAProvider.
|
||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
|
||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool, metricsTTL time.Duration, gcInterval time.Duration) *HPAProvider {
|
||||
metricsc := make(chan metricCollection)
|
||||
|
||||
return &HPAProvider{
|
||||
@ -81,12 +82,13 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
||||
collectorInterval: collectorInterval,
|
||||
metricSink: metricsc,
|
||||
metricStore: NewMetricStore(func() time.Time {
|
||||
return time.Now().UTC().Add(15 * time.Minute)
|
||||
return time.Now().UTC().Add(metricsTTL)
|
||||
}),
|
||||
collectorFactory: collectorFactory,
|
||||
recorder: recorder.CreateEventRecorder(client),
|
||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
||||
gcInterval: gcInterval,
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,7 +220,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(10 * time.Minute):
|
||||
case <-time.After(p.gcInterval):
|
||||
p.metricStore.RemoveExpired()
|
||||
case <-ctx.Done():
|
||||
p.logger.Info("Stopped metrics store garbage collection.")
|
||||
|
@ -106,7 +106,7 @@ func TestUpdateHPAs(t *testing.T) {
|
||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||
require.NoError(t, err)
|
||||
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
@ -171,7 +171,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
eventRecorder := &mockEventRecorder{}
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true, 1*time.Second, 1*time.Second)
|
||||
provider.recorder = eventRecorder
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
@ -183,7 +183,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||
|
||||
// check for events when disregardIncompatibleHPAs=false
|
||||
eventRecorder = &mockEventRecorder{}
|
||||
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
||||
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
|
||||
provider.recorder = eventRecorder
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
|
@ -116,6 +116,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
||||
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
||||
"disregard failing to create collectors for incompatible HPAs")
|
||||
flags.DurationVar(&o.MetricsTTL, "metrics-ttl", 15*time.Minute, "TTL for metrics that are stored in in-memory cache.")
|
||||
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -243,7 +245,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
||||
}
|
||||
|
||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
|
||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval)
|
||||
|
||||
go hpaProvider.Run(ctx)
|
||||
|
||||
@ -350,4 +352,8 @@ type AdapterServerOptions struct {
|
||||
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
||||
// kube-metrics-adapter beside another Metrics Provider
|
||||
DisregardIncompatibleHPAs bool
|
||||
// TTL for metrics that are stored in in-memory cache
|
||||
MetricsTTL time.Duration
|
||||
// Interval to clean up metrics that are stored in in-memory cache
|
||||
GCInterval time.Duration
|
||||
}
|
||||
|
Reference in New Issue
Block a user