Merge pull request #308 from adutchak-x/feature/improve-pod-collector

Feature/improve pod collector
This commit is contained in:
Jonathan Juares Beber
2021-04-21 19:39:49 +02:00
committed by GitHub
6 changed files with 157 additions and 19 deletions

View File

@ -111,6 +111,7 @@ metadata:
metric-config.pods.requests-per-second.json-path/scheme: "https"
metric-config.pods.requests-per-second.json-path/aggregator: "max"
metric-config.pods.requests-per-second.json-path/interval: "60s" # optional
metric-config.pods.requests-per-second.json-path/min-pod-ready-age: "30s" # optional
spec:
scaleTargetRef:
apiVersion: apps/v1
@ -175,6 +176,11 @@ metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms
The default for both of the above values is 15 seconds.
The `min-pod-ready-age` configuration option instructs the service to start collecting metrics from the pods only if they are "older" (time elapsed after pod reached "Ready" state) than the specified amount of time.
This is handy when pods need to warm up before HPAs will start tracking their metrics.
The default value is 0 seconds.
## Prometheus collector
The Prometheus collector is a generic collector which can map Prometheus

View File

@ -12,13 +12,15 @@ const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
minPodReadyAgeConfKey = "min-pod-ready-age"
)
type AnnotationConfigs struct {
CollectorType string
Configs map[string]string
PerReplica bool
Interval time.Duration
CollectorType string
Configs map[string]string
PerReplica bool
Interval time.Duration
MinPodReadyAge time.Duration
}
type MetricConfigKey struct {
@ -89,6 +91,15 @@ func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
continue
}
if parts[1] == minPodReadyAgeConfKey {
minPodReadyAge, err := time.ParseDuration(val)
if err != nil {
return fmt.Errorf("failed to parse min-pod-ready-age value %s for %s: %v", val, key, err)
}
config.MinPodReadyAge = minPodReadyAge
continue
}
config.Configs[parts[1]] = val
}
return nil

View File

@ -24,10 +24,11 @@ func TestParser(t *testing.T) {
{
Name: "pod metrics",
Annotations: map[string]string{
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
"metric-config.pods.requests-per-second.json-path/scheme": "https",
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
"metric-config.pods.requests-per-second.json-path/scheme": "https",
"metric-config.pods.requests-per-second.json-path/min-pod-ready-age": "30s",
},
MetricName: "requests-per-second",
MetricType: autoscalingv2.PodsMetricSourceType,

View File

@ -200,6 +200,7 @@ type MetricConfig struct {
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
MinPodReadyAge time.Duration
MetricSpec autoscalingv2.MetricSpec
}
@ -258,6 +259,7 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
config.CollectorType = annotationConfigs.CollectorType
config.Interval = annotationConfigs.Interval
config.PerReplica = annotationConfigs.PerReplica
config.MinPodReadyAge = annotationConfigs.MinPodReadyAge
// configs specified in annotations takes precedence
// over labels
for k, v := range annotationConfigs.Configs {

View File

@ -7,7 +7,6 @@ import (
"time"
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -15,6 +14,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
)
type PodCollectorPlugin struct {
@ -38,6 +39,7 @@ type PodCollector struct {
namespace string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
minPodReadyAge time.Duration
interval time.Duration
logger *log.Entry
httpClient *http.Client
@ -55,6 +57,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
namespace: hpa.Namespace,
metric: config.Metric,
metricType: config.Type,
minPodReadyAge: config.MinPodReadyAge,
interval: interval,
podLabelSelector: selector,
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
@ -89,12 +92,27 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
ch := make(chan CollectedMetric)
errCh := make(chan error)
skippedPodsCount := 0
for _, pod := range pods.Items {
go c.getPodMetric(pod, ch, errCh)
isPodReady, podReadyAge := GetPodReadyAge(pod)
if isPodReady {
if podReadyAge >= c.minPodReadyAge {
go c.getPodMetric(pod, ch, errCh)
} else {
skippedPodsCount++
c.logger.Warnf("Skipping metrics collection for pod %s/%s because it's ready age is %s and min-pod-ready-age is set to %s", pod.Namespace, pod.Name, podReadyAge, c.minPodReadyAge)
}
} else {
skippedPodsCount++
c.logger.Warnf("Skipping metrics collection for pod %s/%s because it's status is not Ready.", pod.Namespace, pod.Name)
}
}
values := make([]CollectedMetric, 0, len(pods.Items))
for i := 0; i < len(pods.Items); i++ {
values := make([]CollectedMetric, 0, (len(pods.Items) - skippedPodsCount))
for i := 0; i < (len(pods.Items) - skippedPodsCount); i++ {
select {
case err := <-errCh:
c.logger.Error(err)
@ -152,3 +170,21 @@ func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.Horizon
return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
}
// GetPodReadyAge extracts corev1.PodReady condition from the given pod object and
// returns true, time.Duration() for LastTransitionTime if the condition corev1.PodReady is found. Returns time.Duration(0s), false if the condition is not present.
func GetPodReadyAge(pod corev1.Pod) (bool, time.Duration) {
podReadyAge := time.Duration(0 * time.Second)
conditions := pod.Status.Conditions
if conditions == nil {
return false, podReadyAge
}
for i := range conditions {
if conditions[i].Type == corev1.PodReady && conditions[i].Status == corev1.ConditionTrue {
podReadyAge = time.Since(conditions[i].LastTransitionTime.Time)
return true, podReadyAge
}
}
return false, podReadyAge
}

View File

@ -45,9 +45,89 @@ func TestPodCollector(t *testing.T) {
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
makeTestPods(t, host, port, "test-metric", client, 5)
lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithMinPodReadyAge(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-min-pod-ready-age",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
// Setting pods age to 30 seconds
lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
// Pods that are not older that 60 seconds (all in this case) should not be processed
minPodReadyAge := time.Duration(60 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithPodCondition(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-pod-condition",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
lastScheduledTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
//Pods in state corev1.PodReady == corev1.ConditionFalse should not be processed
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionFalse, LastTransitionTime: lastScheduledTransitionTimeTimestamp}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
@ -95,14 +175,15 @@ func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMe
return url.Hostname(), url.Port(), metricsHandler
}
func makeTestConfig(port string) *MetricConfig {
func makeTestConfig(port string, minPodReadyAge time.Duration) *MetricConfig {
return &MetricConfig{
CollectorType: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
CollectorType: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
MinPodReadyAge: minPodReadyAge,
}
}
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int, podCondition corev1.PodCondition) {
for i := 0; i < replicas; i++ {
testPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
@ -113,7 +194,8 @@ func makeTestPods(t *testing.T, testServer string, metricName string, port strin
},
},
Status: corev1.PodStatus{
PodIP: testServer,
PodIP: testServer,
Conditions: []corev1.PodCondition{podCondition},
},
}
_, err := client.CoreV1().Pods(testNamespace).Create(context.TODO(), testPod, v1.CreateOptions{})