mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2024-12-22 19:16:06 +00:00
Merge pull request #152 from doyshinda/parallel_get_metrics
Fetch pod metrics in parallel
This commit is contained in:
commit
78c2ef742b
3
go.sum
3
go.sum
@ -166,6 +166,7 @@ github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
|
||||
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
@ -504,6 +505,7 @@ golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU=
|
||||
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@ -563,6 +565,7 @@ google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLY
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
@ -85,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
values := make([]CollectedMetric, 0, len(pods.Items))
|
||||
|
||||
// TODO: get metrics in parallel
|
||||
ch := make(chan CollectedMetric)
|
||||
errCh := make(chan error)
|
||||
for _, pod := range pods.Items {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
continue
|
||||
}
|
||||
go c.getPodMetric(pod, ch, errCh)
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
},
|
||||
values := make([]CollectedMetric, 0, len(pods.Items))
|
||||
for i := 0; i < len(pods.Items); i++ {
|
||||
select {
|
||||
case err := <- errCh:
|
||||
c.logger.Error(err)
|
||||
case resp := <- ch:
|
||||
values = append(values, resp)
|
||||
}
|
||||
|
||||
values = append(values, metricValue)
|
||||
}
|
||||
|
||||
return values, nil
|
||||
@ -120,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
|
||||
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
ch <- CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
|
||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||
case "Deployment":
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
@ -55,7 +56,7 @@ func TestPodCollector(t *testing.T) {
|
||||
for _, m := range metrics {
|
||||
values = append(values, m.Custom.Value.Value())
|
||||
}
|
||||
require.Equal(t, tc.result, values)
|
||||
require.ElementsMatch(t, tc.result, values)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -68,9 +69,13 @@ type testMetricsHandler struct {
|
||||
calledCounter uint
|
||||
t *testing.T
|
||||
metricsPath string
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
require.Equal(h.t, h.metricsPath, r.URL.Path)
|
||||
require.Less(h.t, int(h.calledCounter), len(h.values))
|
||||
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
|
||||
|
Loading…
x
Reference in New Issue
Block a user