kube-metrics-adapter/pkg/collector/pod_collector_test.go
Mikkel Oscar Lyderik Larsen 5a543781d7 Introduce context for collector interface
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2024-05-21 14:00:31 +02:00

369 lines
13 KiB
Go

package collector
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"
argorolloutsv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
argorolloutsfake "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
const (
testNamespace = "test-namespace"
applicationLabelName = "application"
applicationLabelValue = "test-application"
testDeploymentName = "test-application"
testRolloutName = "test-application"
testInterval = 10 * time.Second
)
func TestPodCollector(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{1, 3, 8, 5, 2},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
argoRolloutsClient := argorolloutsfake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client, argoRolloutsClient)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp}
podDeletionTimestamp := time.Time{}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition, podDeletionTimestamp)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(context.Background(), testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithMinPodReadyAge(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-min-pod-ready-age",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
argoRolloutsClient := argorolloutsfake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client, argoRolloutsClient)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
// Setting pods age to 30 seconds
lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
// Pods that are not older that 60 seconds (all in this case) should not be processed
minPodReadyAge := time.Duration(60 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp}
podDeletionTimestamp := time.Time{}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition, podDeletionTimestamp)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(context.Background(), testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithPodCondition(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-pod-condition",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
argoRolloutsClient := argorolloutsfake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client, argoRolloutsClient)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
lastScheduledTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
podDeletionTimestamp := time.Time{}
//Pods in state corev1.PodReady == corev1.ConditionFalse should not be processed
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionFalse, LastTransitionTime: lastScheduledTransitionTimeTimestamp}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition, podDeletionTimestamp)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(context.Background(), testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithPodTerminatingCondition(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-pod-condition",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
argoRolloutsClient := argorolloutsfake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client, argoRolloutsClient)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
lastScheduledTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
//Pods with podDeletionTimestamp should not be processed
podDeletionTimestamp := time.Now()
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastScheduledTransitionTimeTimestamp}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition, podDeletionTimestamp)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(context.Background(), testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
func TestPodCollectorWithRollout(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple-with-rollout",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{1, 3, 8, 5, 2},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
argoRolloutsClient := argorolloutsfake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client, argoRolloutsClient)
makeTestRollout(t, argoRolloutsClient)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
lastReadyTransitionTimeTimestamp := v1.NewTime(time.Now().Add(time.Duration(-30) * time.Second))
minPodReadyAge := time.Duration(0 * time.Second)
podCondition := corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: lastReadyTransitionTimeTimestamp}
podDeletionTimestamp := time.Time{}
makeTestPods(t, host, port, "test-metric", client, 5, podCondition, podDeletionTimestamp)
testHPA := makeTestHPAForRollout(t, client)
testConfig := makeTestConfig(port, minPodReadyAge)
collector, err := plugin.NewCollector(context.Background(), testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics(context.Background())
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
type testMetricResponse struct {
Values []int64 `json:"values"`
}
type testMetricsHandler struct {
values [][]int64
calledCounter uint
t *testing.T
metricsPath string
sync.RWMutex
}
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
require.Equal(h.t, h.metricsPath, r.URL.Path)
require.Less(h.t, int(h.calledCounter), len(h.values))
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
require.NoError(h.t, err)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(response)
require.NoError(h.t, err)
h.calledCounter++
}
func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMetricsHandler) {
metricsHandler := &testMetricsHandler{values: values, t: t, metricsPath: "/metrics"}
server := httptest.NewServer(metricsHandler)
url, err := url.Parse(server.URL)
require.NoError(t, err)
return url.Hostname(), url.Port(), metricsHandler
}
func makeTestConfig(port string, minPodReadyAge time.Duration) *MetricConfig {
return &MetricConfig{
CollectorType: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
MinPodReadyAge: minPodReadyAge,
}
}
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int, podCondition corev1.PodCondition, podDeletionTimestamp time.Time) {
for i := 0; i < replicas; i++ {
testPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Labels: map[string]string{applicationLabelName: applicationLabelValue},
Annotations: map[string]string{
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
},
},
Status: corev1.PodStatus{
PodIP: testServer,
Conditions: []corev1.PodCondition{podCondition},
},
}
if podDeletionTimestamp.IsZero() {
testPod.ObjectMeta.DeletionTimestamp = nil
} else {
testPod.ObjectMeta.DeletionTimestamp = &v1.Time{Time: podDeletionTimestamp}
}
_, err := client.CoreV1().Pods(testNamespace).Create(context.Background(), testPod, v1.CreateOptions{})
require.NoError(t, err)
}
}
func makeTestDeployment(t *testing.T, client kubernetes.Interface) *appsv1.Deployment {
deployment := appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{Name: testDeploymentName},
Spec: appsv1.DeploymentSpec{
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
},
},
}
_, err := client.AppsV1().Deployments(testNamespace).Create(context.Background(), &deployment, v1.CreateOptions{})
require.NoError(t, err)
return &deployment
}
func makeTestRollout(t *testing.T, argoRolloutsClient *argorolloutsfake.Clientset) *argorolloutsv1alpha1.Rollout {
rollout := &argorolloutsv1alpha1.Rollout{
ObjectMeta: v1.ObjectMeta{
Name: testRolloutName,
},
Spec: argorolloutsv1alpha1.RolloutSpec{
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
},
},
}
_, err := argoRolloutsClient.ArgoprojV1alpha1().Rollouts(testNamespace).Create(context.Background(), rollout, v1.CreateOptions{})
require.NoError(t, err)
return rollout
}
func makeTestHPA(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
hpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: v1.ObjectMeta{
Name: "test-hpa",
Namespace: testNamespace,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
Name: testDeploymentName,
APIVersion: "apps/v1",
},
},
}
_, err := client.AutoscalingV2().HorizontalPodAutoscalers("test-namespace").Create(context.Background(), hpa, v1.CreateOptions{})
require.NoError(t, err)
return hpa
}
func makeTestHPAForRollout(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
hpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: v1.ObjectMeta{
Name: "test-hpa-rollout",
Namespace: testNamespace,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Rollout",
Name: testRolloutName,
APIVersion: "argoproj.io/v1alpha1",
},
},
}
_, err := client.AutoscalingV2().HorizontalPodAutoscalers(testNamespace).Create(context.Background(), hpa, v1.CreateOptions{})
require.NoError(t, err)
return hpa
}