Fetch pod metrics in parallel

Fetching metrics from pods sequentially, with a large number of
pods, can result in poor performance when some of those pods have
been terminated by the HPA in a normal scale down event.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
This commit is contained in:
Abe Friesen
2020-05-13 20:05:29 -06:00
parent 7b7af889fb
commit 3e7b66070c
3 changed files with 44 additions and 24 deletions
+35 -23
View File
@@ -10,6 +10,7 @@ import (
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
@@ -85,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, err
}
values := make([]CollectedMetric, 0, len(pods.Items))
// TODO: get metrics in parallel
ch := make(chan CollectedMetric)
errCh := make(chan error)
for _, pod := range pods.Items {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
continue
}
go c.getPodMetric(pod, ch, errCh)
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
values := make([]CollectedMetric, 0, len(pods.Items))
for i := 0; i < len(pods.Items); i++ {
select {
case err := <- errCh:
c.logger.Error(err)
case resp := <- ch:
values = append(values, resp)
}
values = append(values, metricValue)
}
return values, nil
@@ -120,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
return
}
ch <- CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
}
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
+6 -1
View File
@@ -8,6 +8,7 @@ import (
"net/url"
"testing"
"time"
"sync"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
@@ -55,7 +56,7 @@ func TestPodCollector(t *testing.T) {
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.Equal(t, tc.result, values)
require.ElementsMatch(t, tc.result, values)
})
}
}
@@ -68,9 +69,13 @@ type testMetricsHandler struct {
calledCounter uint
t *testing.T
metricsPath string
sync.RWMutex
}
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
require.Equal(h.t, h.metricsPath, r.URL.Path)
require.Less(h.t, int(h.calledCounter), len(h.values))
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})