Introduced min-pod-age configuration key. Added Pod condition handling

Signed-off-by: Anatolii Dutchak <adutchak-x@tunein.com>
This commit is contained in:
Anatolii Dutchak
2021-04-17 00:41:10 +03:00
parent 8725f02db7
commit f216070630
6 changed files with 146 additions and 11 deletions

View File

@ -111,6 +111,7 @@ metadata:
metric-config.pods.requests-per-second.json-path/scheme: "https"
metric-config.pods.requests-per-second.json-path/aggregator: "max"
metric-config.pods.requests-per-second.json-path/interval: "60s" # optional
metric-config.pods.requests-per-second.json-path/min-pod-age: "30s" # optional
spec:
scaleTargetRef:
apiVersion: apps/v1
@ -175,6 +176,11 @@ metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms
The default for both of the above values is 15 seconds.
The `min-pod-age` configuration option instructs the service to start collecting metrics from the pods only if they are "older" than the specified amount of time.
This is handy when pods need to warm up before HPAs will start tracking their metrics.
The default value is 0 seconds.
## Prometheus collector
The Prometheus collector is a generic collector which can map Prometheus

View File

@ -12,6 +12,7 @@ const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
minPodAgeConfKey = "min-pod-age"
)
type AnnotationConfigs struct {
@ -19,6 +20,7 @@ type AnnotationConfigs struct {
Configs map[string]string
PerReplica bool
Interval time.Duration
MinPodAge time.Duration
}
type MetricConfigKey struct {
@ -89,6 +91,15 @@ func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
continue
}
if parts[1] == minPodAgeConfKey {
minPodAge, err := time.ParseDuration(val)
if err != nil {
return fmt.Errorf("failed to parse min-pod-age value %s for %s: %v", val, key, err)
}
config.MinPodAge = minPodAge
continue
}
config.Configs[parts[1]] = val
}
return nil

View File

@ -28,6 +28,7 @@ func TestParser(t *testing.T) {
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
"metric-config.pods.requests-per-second.json-path/scheme": "https",
"metric-config.pods.requests-per-second.json-path/min-pod-age": "30s",
},
MetricName: "requests-per-second",
MetricType: autoscalingv2.PodsMetricSourceType,

View File

@ -200,6 +200,7 @@ type MetricConfig struct {
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
MinPodAge time.Duration
MetricSpec autoscalingv2.MetricSpec
}
@ -258,6 +259,7 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
config.CollectorType = annotationConfigs.CollectorType
config.Interval = annotationConfigs.Interval
config.PerReplica = annotationConfigs.PerReplica
config.MinPodAge = annotationConfigs.MinPodAge
// configs specified in annotations takes precedence
// over labels
for k, v := range annotationConfigs.Configs {

View File

@ -38,6 +38,7 @@ type PodCollector struct {
namespace string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
minPodAge time.Duration
interval time.Duration
logger *log.Entry
httpClient *http.Client
@ -55,6 +56,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
namespace: hpa.Namespace,
metric: config.Metric,
metricType: config.Type,
minPodAge: config.MinPodAge,
interval: interval,
podLabelSelector: selector,
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
@ -89,12 +91,27 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
ch := make(chan CollectedMetric)
errCh := make(chan error)
skippedPodsCount := 0
for _, pod := range pods.Items {
t := time.Now()
podAge := time.Duration(t.Sub(pod.ObjectMeta.CreationTimestamp.Time).Nanoseconds())
if podAge > c.minPodAge {
if IsPodReady(pod) {
go c.getPodMetric(pod, ch, errCh)
} else {
skippedPodsCount++
c.logger.Warnf("Skipping metrics collection for pod %s because it's status is not Ready.", pod.Name)
}
} else {
skippedPodsCount++
c.logger.Warnf("Skipping metrics collection for pod %s because it's age is %s and min-pod-age is set to %s", pod.Name, podAge, c.minPodAge)
}
}
values := make([]CollectedMetric, 0, len(pods.Items))
for i := 0; i < len(pods.Items); i++ {
values := make([]CollectedMetric, 0, (len(pods.Items) - skippedPodsCount))
for i := 0; i < (len(pods.Items) - skippedPodsCount); i++ {
select {
case err := <-errCh:
c.logger.Error(err)
@ -152,3 +169,18 @@ func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.Horizon
return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
}
// IsPodReady extracts corev1.PodReady condition from the given pod object and
// returns the true if the condition corev1.PodReady is found. Returns -1 and false if the condition is not present.
func IsPodReady(pod corev1.Pod) bool {
conditions := pod.Status.Conditions
if conditions == nil {
return false
}
for i := range conditions {
if conditions[i].Type == corev1.PodReady {
return true
}
}
return false
}

View File

@ -45,9 +45,89 @@ func TestPodCollector(t *testing.T) {
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
makeTestPods(t, host, port, "test-metric", client, 5)
creationTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodAge := time.Duration(0 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionStatus(corev1.PodRunning)}
makeTestPods(t, host, port, "test-metric", client, 5, creationTimestamp, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port)
testConfig := makeTestConfig(port, minPodAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithMinPodAge(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-min-pod-age",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
// Setting pods age to 30 seconds
creationTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
// Pods that are not older that 60 seconds (all in this case) should not be processed
minPodAge := time.Duration(60 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionStatus(corev1.PodRunning)}
makeTestPods(t, host, port, "test-metric", client, 5, creationTimestamp, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithPodCondition(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-pod-condition",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
creationTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodAge := time.Duration(0 * time.Second)
//Pods in state corev1.PodScheduled should not be processed
podCondition := corev1.PodCondition{Type: corev1.PodScheduled, Status: corev1.ConditionStatus(corev1.PodRunning)}
makeTestPods(t, host, port, "test-metric", client, 5, creationTimestamp, podCondition)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodAge)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
@ -95,14 +175,15 @@ func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMe
return url.Hostname(), url.Port(), metricsHandler
}
func makeTestConfig(port string) *MetricConfig {
func makeTestConfig(port string, minPodAge time.Duration) *MetricConfig {
return &MetricConfig{
CollectorType: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
MinPodAge: minPodAge,
}
}
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int, creationTimestamp v1.Time, podCondition corev1.PodCondition) {
for i := 0; i < replicas; i++ {
testPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
@ -111,9 +192,11 @@ func makeTestPods(t *testing.T, testServer string, metricName string, port strin
Annotations: map[string]string{
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
},
CreationTimestamp: creationTimestamp,
},
Status: corev1.PodStatus{
PodIP: testServer,
Conditions: []corev1.PodCondition{podCondition},
},
}
_, err := client.CoreV1().Pods(testNamespace).Create(context.TODO(), testPod, v1.CreateOptions{})