mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-05-12 00:19:49 +00:00
add commandline flags to set GC interval and metrics TTL
Signed-off-by: Harman Singh <iamgrewal7@gmail.com>
This commit is contained in:
@ -62,6 +62,7 @@ type HPAProvider struct {
|
|||||||
recorder kube_record.EventRecorder
|
recorder kube_record.EventRecorder
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
disregardIncompatibleHPAs bool
|
disregardIncompatibleHPAs bool
|
||||||
|
gcInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// metricCollection is a container for sending collected metrics across a
|
// metricCollection is a container for sending collected metrics across a
|
||||||
@ -72,7 +73,7 @@ type metricCollection struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewHPAProvider initializes a new HPAProvider.
|
// NewHPAProvider initializes a new HPAProvider.
|
||||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
|
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool, metricsTTL time.Duration, gcInterval time.Duration) *HPAProvider {
|
||||||
metricsc := make(chan metricCollection)
|
metricsc := make(chan metricCollection)
|
||||||
|
|
||||||
return &HPAProvider{
|
return &HPAProvider{
|
||||||
@ -81,12 +82,13 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
|||||||
collectorInterval: collectorInterval,
|
collectorInterval: collectorInterval,
|
||||||
metricSink: metricsc,
|
metricSink: metricsc,
|
||||||
metricStore: NewMetricStore(func() time.Time {
|
metricStore: NewMetricStore(func() time.Time {
|
||||||
return time.Now().UTC().Add(15 * time.Minute)
|
return time.Now().UTC().Add(metricsTTL)
|
||||||
}),
|
}),
|
||||||
collectorFactory: collectorFactory,
|
collectorFactory: collectorFactory,
|
||||||
recorder: recorder.CreateEventRecorder(client),
|
recorder: recorder.CreateEventRecorder(client),
|
||||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||||
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
||||||
|
gcInterval: gcInterval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +220,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
|||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(10 * time.Minute):
|
case <-time.After(p.gcInterval):
|
||||||
p.metricStore.RemoveExpired()
|
p.metricStore.RemoveExpired()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
p.logger.Info("Stopped metrics store garbage collection.")
|
p.logger.Info("Stopped metrics store garbage collection.")
|
||||||
|
@ -106,7 +106,7 @@ func TestUpdateHPAs(t *testing.T) {
|
|||||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
|
||||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
err = provider.updateHPAs()
|
err = provider.updateHPAs()
|
||||||
@ -171,7 +171,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
eventRecorder := &mockEventRecorder{}
|
eventRecorder := &mockEventRecorder{}
|
||||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true, 1*time.Second, 1*time.Second)
|
||||||
provider.recorder = eventRecorder
|
provider.recorder = eventRecorder
|
||||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
@ -183,7 +183,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
|||||||
|
|
||||||
// check for events when disregardIncompatibleHPAs=false
|
// check for events when disregardIncompatibleHPAs=false
|
||||||
eventRecorder = &mockEventRecorder{}
|
eventRecorder = &mockEventRecorder{}
|
||||||
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
|
||||||
provider.recorder = eventRecorder
|
provider.recorder = eventRecorder
|
||||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
|
@ -116,6 +116,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
|||||||
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
||||||
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
||||||
"disregard failing to create collectors for incompatible HPAs")
|
"disregard failing to create collectors for incompatible HPAs")
|
||||||
|
flags.DurationVar(&o.MetricsTTL, "metrics-ttl", 15*time.Minute, "TTL for metrics that are stored in in-memory cache.")
|
||||||
|
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,7 +245,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
|||||||
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
||||||
}
|
}
|
||||||
|
|
||||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
|
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval)
|
||||||
|
|
||||||
go hpaProvider.Run(ctx)
|
go hpaProvider.Run(ctx)
|
||||||
|
|
||||||
@ -350,4 +352,8 @@ type AdapterServerOptions struct {
|
|||||||
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
||||||
// kube-metrics-adapter beside another Metrics Provider
|
// kube-metrics-adapter beside another Metrics Provider
|
||||||
DisregardIncompatibleHPAs bool
|
DisregardIncompatibleHPAs bool
|
||||||
|
// TTL for metrics that are stored in in-memory cache
|
||||||
|
MetricsTTL time.Duration
|
||||||
|
// Interval to clean up metrics that are stored in in-memory cache
|
||||||
|
GCInterval time.Duration
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user