From 3e7b66070cd54d28e579904a3db7585dc588873a Mon Sep 17 00:00:00 2001 From: Abe Friesen <2319792+doyshinda@users.noreply.github.com> Date: Wed, 13 May 2020 20:05:29 -0600 Subject: [PATCH] Fetch pod metrics in parallel Fetching metrics from pods sequentially can result in poor performance when there is a large number of pods and some of them have been terminated by the HPA in a normal scale-down event. Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com> --- go.sum | 3 ++ pkg/collector/pod_collector.go | 58 +++++++++++++++++------------ pkg/collector/pod_collector_test.go | 7 +++- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/go.sum b/go.sum index 8f14b6f..40bff7f 100644 --- a/go.sum +++ b/go.sum @@ -166,6 +166,7 @@ github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -504,6 +505,7 @@ golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
+golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -563,6 +565,7 @@ google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLY google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/collector/pod_collector.go b/pkg/collector/pod_collector.go index e245935..5502f7c 100644 --- a/pkg/collector/pod_collector.go +++ b/pkg/collector/pod_collector.go @@ -10,6 +10,7 @@ import ( autoscalingv2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/metrics/pkg/apis/custom_metrics" @@ -85,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) { return nil, err } - values := make([]CollectedMetric, 0, len(pods.Items)) - - // TODO: get metrics in parallel + ch := make(chan CollectedMetric) + errCh := make(chan error) for _, pod := range pods.Items { - 
value, err := c.Getter.GetMetric(&pod) - if err != nil { - c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err) - continue - } + go c.getPodMetric(pod, ch, errCh) + } - metricValue := CollectedMetric{ - Type: c.metricType, - Custom: custom_metrics.MetricValue{ - DescribedObject: custom_metrics.ObjectReference{ - APIVersion: "v1", - Kind: "Pod", - Name: pod.Name, - Namespace: pod.Namespace, - }, - Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector}, - Timestamp: metav1.Time{Time: time.Now().UTC()}, - Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI), - }, + values := make([]CollectedMetric, 0, len(pods.Items)) + for i := 0; i < len(pods.Items); i++ { + select { + case err := <- errCh: + c.logger.Error(err) + case resp := <- ch: + values = append(values, resp) } - - values = append(values, metricValue) } return values, nil @@ -120,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration { return c.interval } +func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) { + value, err := c.Getter.GetMetric(&pod) + if err != nil { + errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err) + return + } + + ch <- CollectedMetric{ + Type: c.metricType, + Custom: custom_metrics.MetricValue{ + DescribedObject: custom_metrics.ObjectReference{ + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + Namespace: pod.Namespace, + }, + Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector}, + Timestamp: metav1.Time{Time: time.Now().UTC()}, + Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI), + }, + } +} + func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) { switch hpa.Spec.ScaleTargetRef.Kind { case "Deployment": diff --git a/pkg/collector/pod_collector_test.go 
b/pkg/collector/pod_collector_test.go index 45a7f93..ef9751c 100644 --- a/pkg/collector/pod_collector_test.go +++ b/pkg/collector/pod_collector_test.go @@ -8,6 +8,7 @@ import ( "net/url" "testing" "time" + "sync" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -55,7 +56,7 @@ func TestPodCollector(t *testing.T) { for _, m := range metrics { values = append(values, m.Custom.Value.Value()) } - require.Equal(t, tc.result, values) + require.ElementsMatch(t, tc.result, values) }) } } @@ -68,9 +69,13 @@ type testMetricsHandler struct { calledCounter uint t *testing.T metricsPath string + sync.RWMutex } func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.Lock() + defer h.Unlock() + require.Equal(h.t, h.metricsPath, r.URL.Path) require.Less(h.t, int(h.calledCounter), len(h.values)) response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})