diff --git a/README.md b/README.md index 6c8a300..f3b561f 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,7 @@ metadata: metric-config.pods.requests-per-second.json-path/scheme: "https" metric-config.pods.requests-per-second.json-path/aggregator: "max" metric-config.pods.requests-per-second.json-path/interval: "60s" # optional + metric-config.pods.requests-per-second.json-path/min-pod-ready-age: "30s" # optional spec: scaleTargetRef: apiVersion: apps/v1 @@ -175,6 +176,11 @@ metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms The default for both of the above values is 15 seconds. +The `min-pod-ready-age` configuration option instructs the service to start collecting metrics from the pods only if they are "older" (time elapsed after pod reached "Ready" state) than the specified amount of time. +This is handy when pods need to warm up before HPAs will start tracking their metrics. + +The default value is 0 seconds. + ## Prometheus collector The Prometheus collector is a generic collector which can map Prometheus diff --git a/pkg/annotations/parser.go b/pkg/annotations/parser.go index 39b3b82..5672348 100644 --- a/pkg/annotations/parser.go +++ b/pkg/annotations/parser.go @@ -12,13 +12,15 @@ const ( customMetricsPrefix = "metric-config." perReplicaMetricsConfKey = "per-replica" intervalMetricsConfKey = "interval" + minPodReadyAgeConfKey = "min-pod-ready-age" ) type AnnotationConfigs struct { - CollectorType string - Configs map[string]string - PerReplica bool - Interval time.Duration + CollectorType string + Configs map[string]string + PerReplica bool + Interval time.Duration + MinPodReadyAge time.Duration } type MetricConfigKey struct { @@ -89,6 +91,15 @@ func (m AnnotationConfigMap) Parse(annotations map[string]string) error { continue } + if parts[1] == minPodReadyAgeConfKey { + minPodReadyAge, err := time.ParseDuration(val) + if err != nil { + return fmt.Errorf("failed to parse min-pod-ready-age value %s for %s: %v", val, key, err) + } + config.MinPodReadyAge = minPodReadyAge + continue + } + config.Configs[parts[1]] = val } return nil diff --git a/pkg/annotations/parser_test.go b/pkg/annotations/parser_test.go index 612ff50..0282bd6 100644 --- a/pkg/annotations/parser_test.go +++ b/pkg/annotations/parser_test.go @@ -24,10 +24,11 @@ func TestParser(t *testing.T) { { Name: "pod metrics", Annotations: map[string]string{ - "metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps", - "metric-config.pods.requests-per-second.json-path/path": "/metrics", - "metric-config.pods.requests-per-second.json-path/port": "9090", - "metric-config.pods.requests-per-second.json-path/scheme": "https", + "metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps", + "metric-config.pods.requests-per-second.json-path/path": "/metrics", + "metric-config.pods.requests-per-second.json-path/port": "9090", + "metric-config.pods.requests-per-second.json-path/scheme": "https", + "metric-config.pods.requests-per-second.json-path/min-pod-ready-age": "30s", }, MetricName: "requests-per-second", MetricType: autoscalingv2.PodsMetricSourceType, diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go index d857640..11e77c5 100644 --- a/pkg/collector/collector.go +++ b/pkg/collector/collector.go @@ -200,6 +200,7 @@ type MetricConfig struct { ObjectReference custom_metrics.ObjectReference PerReplica bool Interval time.Duration + MinPodReadyAge time.Duration MetricSpec autoscalingv2.MetricSpec } @@ -258,6 +259,7 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi config.CollectorType = annotationConfigs.CollectorType config.Interval = annotationConfigs.Interval config.PerReplica = annotationConfigs.PerReplica + config.MinPodReadyAge = annotationConfigs.MinPodReadyAge // configs specified in annotations takes precedence // over labels for k, v := range annotationConfigs.Configs { diff --git a/pkg/collector/pod_collector.go b/pkg/collector/pod_collector.go index 499c015..e5cb2db 100644 --- a/pkg/collector/pod_collector.go +++ b/pkg/collector/pod_collector.go @@ -7,7 +7,6 @@ import ( "time" log "github.com/sirupsen/logrus" - "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics" autoscalingv2 "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -15,6 +14,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/metrics/pkg/apis/custom_metrics" + + "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics" ) type PodCollectorPlugin struct { @@ -38,6 +39,7 @@ type PodCollector struct { namespace string metric autoscalingv2.MetricIdentifier metricType autoscalingv2.MetricSourceType + minPodReadyAge time.Duration interval time.Duration logger *log.Entry httpClient *http.Client @@ -55,6 +57,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP namespace: hpa.Namespace, metric: config.Metric, metricType: config.Type, + minPodReadyAge: config.MinPodReadyAge, interval: interval, podLabelSelector: selector, logger: log.WithFields(log.Fields{"Collector": "Pod"}), @@ -89,12 +92,27 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) { ch := make(chan CollectedMetric) errCh := make(chan error) + skippedPodsCount := 0 + for _, pod := range pods.Items { - go c.getPodMetric(pod, ch, errCh) + + isPodReady, podReadyAge := GetPodReadyAge(pod) + + if isPodReady { + if podReadyAge >= c.minPodReadyAge { + go c.getPodMetric(pod, ch, errCh) + } else { + skippedPodsCount++ + c.logger.Warnf("Skipping metrics collection for pod %s/%s because it's ready age is %s and min-pod-ready-age is set to %s", pod.Namespace, pod.Name, podReadyAge, c.minPodReadyAge) + } + } else { + skippedPodsCount++ + c.logger.Warnf("Skipping metrics collection for pod %s/%s because it's status is not Ready.", pod.Namespace, pod.Name) + } } - values := make([]CollectedMetric, 0, len(pods.Items)) - for i := 0; i < len(pods.Items); i++ { + values := make([]CollectedMetric, 0, (len(pods.Items) - skippedPodsCount)) + for i := 0; i < (len(pods.Items) - skippedPodsCount); i++ { select { case err := <-errCh: c.logger.Error(err) @@ -152,3 +170,21 @@ func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.Horizon return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind) } + +// GetPodReadyAge extracts corev1.PodReady condition from the given pod object and +// returns true, time.Duration() for LastTransitionTime if the condition corev1.PodReady is found. Returns time.Duration(0s), false if the condition is not present. +func GetPodReadyAge(pod corev1.Pod) (bool, time.Duration) { + podReadyAge := time.Duration(0 * time.Second) + conditions := pod.Status.Conditions + if conditions == nil { + return false, podReadyAge + } + for i := range conditions { + if conditions[i].Type == corev1.PodReady && conditions[i].Status == corev1.ConditionTrue { + podReadyAge = time.Since(conditions[i].LastTransitionTime.Time) + return true, podReadyAge + } + } + + return false, podReadyAge +} diff --git a/pkg/collector/pod_collector_test.go b/pkg/collector/pod_collector_test.go index c90d394..c7bbe43 100644 --- a/pkg/collector/pod_collector_test.go +++ b/pkg/collector/pod_collector_test.go @@ -45,9 +45,89 @@ func TestPodCollector(t *testing.T) { plugin := NewPodCollectorPlugin(client) makeTestDeployment(t, client) host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics) - makeTestPods(t, host, port, "test-metric", client, 5) + lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second)) + minPodReadyAge := time.Duration(0 * time.Second) + podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp} + makeTestPods(t, host, port, "test-metric", client, 5, podCondition) testHPA := makeTestHPA(t, client) - testConfig := makeTestConfig(port) + testConfig := makeTestConfig(port, minPodReadyAge) + collector, err := plugin.NewCollector(testHPA, testConfig, testInterval) + require.NoError(t, err) + metrics, err := collector.GetMetrics() + require.NoError(t, err) + require.Equal(t, len(metrics), int(metricsHandler.calledCounter)) + var values []int64 + for _, m := range metrics { + values = append(values, m.Custom.Value.Value()) + } + require.ElementsMatch(t, tc.result, values) + }) + } +} + +func TestPodCollectorWithMinPodReadyAge(t *testing.T) { + for _, tc := range []struct { + name string + metrics [][]int64 + result []int64 + }{ + { + name: "simple-with-min-pod-ready-age", + metrics: [][]int64{{1}, {3}, {8}, {5}, {2}}, + result: []int64{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + client := fake.NewSimpleClientset() + plugin := NewPodCollectorPlugin(client) + makeTestDeployment(t, client) + host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics) + // Setting pods age to 30 seconds + lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second)) + // Pods that are not older that 60 seconds (all in this case) should not be processed + minPodReadyAge := time.Duration(60 * time.Second) + podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp} + makeTestPods(t, host, port, "test-metric", client, 5, podCondition) + testHPA := makeTestHPA(t, client) + testConfig := makeTestConfig(port, minPodReadyAge) + collector, err := plugin.NewCollector(testHPA, testConfig, testInterval) + require.NoError(t, err) + metrics, err := collector.GetMetrics() + require.NoError(t, err) + require.Equal(t, len(metrics), int(metricsHandler.calledCounter)) + var values []int64 + for _, m := range metrics { + values = append(values, m.Custom.Value.Value()) + } + require.ElementsMatch(t, tc.result, values) + }) + } +} + +func TestPodCollectorWithPodCondition(t *testing.T) { + for _, tc := range []struct { + name string + metrics [][]int64 + result []int64 + }{ + { + name: "simple-with-pod-condition", + metrics: [][]int64{{1}, {3}, {8}, {5}, {2}}, + result: []int64{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + client := fake.NewSimpleClientset() + plugin := NewPodCollectorPlugin(client) + makeTestDeployment(t, client) + host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics) + lastScheduledTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second)) + minPodReadyAge := time.Duration(0 * time.Second) + //Pods in state corev1.PodReady == corev1.ConditionFalse should not be processed + podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionFalse, LastTransitionTime: lastScheduledTransitionTimeTimestamp} + makeTestPods(t, host, port, "test-metric", client, 5, podCondition) + testHPA := makeTestHPA(t, client) + testConfig := makeTestConfig(port, minPodReadyAge) collector, err := plugin.NewCollector(testHPA, testConfig, testInterval) require.NoError(t, err) metrics, err := collector.GetMetrics() @@ -95,14 +175,15 @@ func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMe return url.Hostname(), url.Port(), metricsHandler } -func makeTestConfig(port string) *MetricConfig { +func makeTestConfig(port string, minPodReadyAge time.Duration) *MetricConfig { return &MetricConfig{ - CollectorType: "json-path", - Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"}, + CollectorType: "json-path", + Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"}, + MinPodReadyAge: minPodReadyAge, } } -func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) { +func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int, podCondition corev1.PodCondition) { for i := 0; i < replicas; i++ { testPod := &corev1.Pod{ ObjectMeta: v1.ObjectMeta{ @@ -113,7 +194,8 @@ func makeTestPods(t *testing.T, testServer string, metricName string, port strin }, }, Status: corev1.PodStatus{ - PodIP: testServer, + PodIP: testServer, + Conditions: []corev1.PodCondition{podCondition}, }, } _, err := client.CoreV1().Pods(testNamespace).Create(context.TODO(), testPod, v1.CreateOptions{})