From ffff8c20400bb9b0b028000830e3be8bfa1bbead Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Wed, 8 May 2019 10:34:50 +0200 Subject: [PATCH] Prevent leaking collectors when HPA gets updated (#54) * Prevent leaking collectors when HPA gets updated This fixes an issue where collectors would be leaking when HPAs are getting updated. Fix this by stopping the collector started for the previous version of the HPA. Signed-off-by: Mikkel Oscar Lyderik Larsen * Add tests to verify old collector is removed Signed-off-by: Mikkel Oscar Lyderik Larsen --- pkg/provider/hpa.go | 11 ++++- pkg/provider/hpa_test.go | 95 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 pkg/provider/hpa_test.go diff --git a/pkg/provider/hpa.go b/pkg/provider/hpa.go index 0d6b673..61a64f4 100644 --- a/pkg/provider/hpa.go +++ b/pkg/provider/hpa.go @@ -132,7 +132,16 @@ func (p *HPAProvider) updateHPAs() error { Namespace: hpa.Namespace, } - if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) { + cachedHPA, ok := p.hpaCache[resourceRef] + hpaUpdated := !equalHPA(cachedHPA, hpa) + if !ok || hpaUpdated { + // if the hpa has changed then remove the previous + // scheduled collector. + if hpaUpdated { + p.logger.Infof("Removing previously scheduled metrics collector: %s", resourceRef) + p.collectorScheduler.Remove(resourceRef) + } + metricConfigs, err := collector.ParseHPAMetrics(&hpa) if err != nil { p.logger.Errorf("Failed to parse HPA metrics: %v", err) diff --git a/pkg/provider/hpa_test.go b/pkg/provider/hpa_test.go new file mode 100644 index 0000000..3d6e0c5 --- /dev/null +++ b/pkg/provider/hpa_test.go @@ -0,0 +1,95 @@ +package provider + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector" + autoscalingv2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +type mockCollectorPlugin struct{} + +func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) { + return mockCollector{}, nil +} + +type mockCollector struct{} + +func (c mockCollector) GetMetrics() ([]collector.CollectedMetric, error) { + return nil, nil +} + +func (c mockCollector) Interval() time.Duration { + return 1 * time.Second +} + +func TestUpdateHPAs(t *testing.T) { + value := resource.MustParse("1k") + hpa := &autoscalingv2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hpa1", + Namespace: "default", + Annotations: map[string]string{ + "metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps", + "metric-config.pods.requests-per-second.json-path/path": "/metrics", + "metric-config.pods.requests-per-second.json-path/port": "9090", + }, + }, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + Kind: "Deployment", + Name: "app", + APIVersion: "apps/v1", + }, + MinReplicas: &[]int32{1}[0], + MaxReplicas: 10, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.PodsMetricSourceType, + Pods: &autoscalingv2.PodsMetricSource{ + Metric: autoscalingv2.MetricIdentifier{ + Name: "requests-per-second", + }, + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.AverageValueMetricType, + AverageValue: &value, + }, + }, + }, + }, + }, + } + + fakeClient := fake.NewSimpleClientset() + + var err error + hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa) + require.NoError(t, err) + + collectorFactory := collector.NewCollectorFactory() + err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{}) + require.NoError(t, err) + + provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory) + provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink) + + err = provider.updateHPAs() + require.NoError(t, err) + require.Len(t, provider.collectorScheduler.table, 1) + + // update HPA + hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080" + _, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa) + require.NoError(t, err) + + err = provider.updateHPAs() + require.NoError(t, err) + + require.Len(t, provider.collectorScheduler.table, 1) +}