mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-08-01 00:05:43 +00:00
Compare commits
59 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
dbed1570ba | ||
![]() |
ef24244074 | ||
![]() |
cadf2dff3c | ||
![]() |
db83268343 | ||
![]() |
78c2ef742b | ||
![]() |
f0b817629c | ||
![]() |
3e7b66070c | ||
![]() |
f3d33b6132 | ||
![]() |
7b7af889fb | ||
![]() |
2bcaa80ce0 | ||
![]() |
a2e6980b25 | ||
![]() |
f0783dcdd8 | ||
![]() |
560fb40dae | ||
![]() |
41229e370d | ||
![]() |
338393c11e | ||
![]() |
8e7d1bc0d6 | ||
![]() |
dc7fb5875b | ||
![]() |
8f70d3493e | ||
![]() |
97ec13d010 | ||
![]() |
5e20dcd961 | ||
![]() |
f27dbc3939 | ||
![]() |
6443613930 | ||
![]() |
d9c2e50f2c | ||
![]() |
a8cd761f85 | ||
![]() |
43fa626ca1 | ||
![]() |
2d3ddc53ef | ||
![]() |
02880fad2d | ||
![]() |
405e347620 | ||
![]() |
f15aacad5c | ||
![]() |
c0a9f525b8 | ||
![]() |
24207d285c | ||
![]() |
860aba807e | ||
![]() |
73659f8ac6 | ||
![]() |
280d358538 | ||
![]() |
735bb164e2 | ||
![]() |
d28712abd1 | ||
![]() |
670692b23c | ||
![]() |
33683fe88d | ||
![]() |
abaef5e491 | ||
![]() |
670f081a5e | ||
![]() |
55a743e5ac | ||
![]() |
830d385a4a | ||
![]() |
f2638c6843 | ||
![]() |
b77fba2e7b | ||
![]() |
e77d51f97a | ||
![]() |
69ac42ed7d | ||
![]() |
582f05539c | ||
![]() |
46f2e5c4fd | ||
![]() |
31bc1771df | ||
![]() |
836c78d08b | ||
![]() |
6f5ab042e6 | ||
![]() |
5abc7388fb | ||
![]() |
5bf87cb10e | ||
![]() |
c6610750e4 | ||
![]() |
04ae6d955e | ||
![]() |
2d56c202a7 | ||
![]() |
c9fa15c7d4 | ||
![]() |
e3330dcf43 | ||
![]() |
8e4662b26c |
@@ -2,7 +2,7 @@ language: go
|
|||||||
dist: xenial
|
dist: xenial
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- "1.13.x"
|
- "1.14.x"
|
||||||
|
|
||||||
env:
|
env:
|
||||||
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
|
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
|
||||||
|
87
README.md
87
README.md
@@ -155,6 +155,25 @@ values of JSONPath expressions that evaluate to arrays/slices of numbers.
|
|||||||
It's optional but when the expression evaluates to an array/slice, it's absence will
|
It's optional but when the expression evaluates to an array/slice, it's absence will
|
||||||
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
|
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
|
||||||
|
|
||||||
|
The `raw-query` configuration option specifies the query params to send along to the endpoint:
|
||||||
|
```yaml
|
||||||
|
metric-config.pods.requests-per-second.json-path/path: /metrics
|
||||||
|
metric-config.pods.requests-per-second.json-path/port: "9090"
|
||||||
|
metric-config.pods.requests-per-second.json-path/raw-query: "foo=bar&baz=bop"
|
||||||
|
```
|
||||||
|
will create a URL like this:
|
||||||
|
```
|
||||||
|
http://<podIP>:9090/metrics?foo=bar&baz=bop
|
||||||
|
```
|
||||||
|
|
||||||
|
There are also configuration options for custom (connect and request) timeouts when querying pods for metrics:
|
||||||
|
```yaml
|
||||||
|
metric-config.pods.requests-per-second.json-path/request-timeout: 2s
|
||||||
|
metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms
|
||||||
|
```
|
||||||
|
|
||||||
|
The default for both of the above values is 15 seconds.
|
||||||
|
|
||||||
## Prometheus collector
|
## Prometheus collector
|
||||||
|
|
||||||
The Prometheus collector is a generic collector which can map Prometheus
|
The Prometheus collector is a generic collector which can map Prometheus
|
||||||
@@ -369,10 +388,11 @@ metadata:
|
|||||||
# instead of using the ones specified via CLI. Respectively:
|
# instead of using the ones specified via CLI. Respectively:
|
||||||
# - --influxdb-address
|
# - --influxdb-address
|
||||||
# - --influxdb-token
|
# - --influxdb-token
|
||||||
# - --influxdb-org-id
|
# - --influxdb-org
|
||||||
metric-config.external.flux-query.influxdb/address: "http://influxdbv2.my-namespace.svc"
|
metric-config.external.flux-query.influxdb/address: "http://influxdbv2.my-namespace.svc"
|
||||||
metric-config.external.flux-query.influxdb/token: "secret-token"
|
metric-config.external.flux-query.influxdb/token: "secret-token"
|
||||||
metric-config.external.flux-query.influxdb/org-id: "deadbeef"
|
# This could be either the organization name or the ID.
|
||||||
|
metric-config.external.flux-query.influxdb/org: "deadbeef"
|
||||||
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||||
# <configKey> == query-name
|
# <configKey> == query-name
|
||||||
metric-config.external.flux-query.influxdb/queue_depth: |
|
metric-config.external.flux-query.influxdb/queue_depth: |
|
||||||
@@ -523,9 +543,9 @@ spec:
|
|||||||
tag-application: my-custom-app
|
tag-application: my-custom-app
|
||||||
aggregators: avg # comma separated list of aggregation functions, default: last
|
aggregators: avg # comma separated list of aggregation functions, default: last
|
||||||
duration: 5m # default: 10m
|
duration: 5m # default: 10m
|
||||||
target:
|
target:
|
||||||
averageValue: "30"
|
averageValue: "30"
|
||||||
type: AverageValue
|
type: AverageValue
|
||||||
```
|
```
|
||||||
|
|
||||||
The `check-id` specifies the ZMON check to query for the metrics. `key`
|
The `check-id` specifies the ZMON check to query for the metrics. `key`
|
||||||
@@ -573,3 +593,60 @@ you need to define a `key` or other `tag` with a "star" query syntax like
|
|||||||
`values.*`. This *hack* is in place because it's not allowed to use `*` in the
|
`values.*`. This *hack* is in place because it's not allowed to use `*` in the
|
||||||
metric label definitions. If both annotations and corresponding label is
|
metric label definitions. If both annotations and corresponding label is
|
||||||
defined, then the annotation takes precedence.
|
defined, then the annotation takes precedence.
|
||||||
|
|
||||||
|
## HTTP Collector
|
||||||
|
|
||||||
|
The http collector allows collecting metrics from an external endpoint specified in the HPA.
|
||||||
|
Currently only `json-path` collection is supported.
|
||||||
|
|
||||||
|
### Supported metrics
|
||||||
|
|
||||||
|
| Metric | Description | Type | K8s Versions |
|
||||||
|
| ------------ | -------------- | ------- | -- |
|
||||||
|
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods | `>=1.12` |
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
This is an example of using the HTTP collector to collect metrics from a json
|
||||||
|
metrics endpoint specified in the annotations.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
apiVersion: autoscaling/v2beta2
|
||||||
|
kind: HorizontalPodAutoscaler
|
||||||
|
metadata:
|
||||||
|
name: myapp-hpa
|
||||||
|
annotations:
|
||||||
|
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||||
|
metric-config.external.http.json/json-key: "$.some-metric.value"
|
||||||
|
metric-config.external.http.json/endpoint: "http://metric-source.app-namespace:8080/metrics"
|
||||||
|
metric-config.external.http.json/aggregator: "max"
|
||||||
|
spec:
|
||||||
|
scaleTargetRef:
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
name: myapp
|
||||||
|
minReplicas: 1
|
||||||
|
maxReplicas: 10
|
||||||
|
metrics:
|
||||||
|
- type: External
|
||||||
|
external:
|
||||||
|
metric:
|
||||||
|
name: http
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
identifier: unique-metric-name
|
||||||
|
target:
|
||||||
|
averageValue: 1
|
||||||
|
type: AverageValue
|
||||||
|
```
|
||||||
|
|
||||||
|
The HTTP collector similar to the Pod Metrics collector. The metric name should always be `http`.
|
||||||
|
This value is also used in the annotations to configure the metrics adapter to query the required
|
||||||
|
target. The following configuration values are supported:
|
||||||
|
|
||||||
|
- `json-key` to specify the JSON path of the metric to be queried
|
||||||
|
- `endpoint` the fully formed path to query for the metric. In the above example a Kubernetes _Service_
|
||||||
|
in the namespace `app-namespace` is called.
|
||||||
|
- `aggregator` is only required if the metric is an array of values and specifies how the values
|
||||||
|
are aggregated. Currently this option can support the values: `sum`, `max`, `min`, `avg`.
|
||||||
|
|
||||||
|
42
go.mod
42
go.mod
@@ -2,31 +2,31 @@ module github.com/zalando-incubator/kube-metrics-adapter
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.16.6
|
github.com/aws/aws-sdk-go v1.30.24
|
||||||
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
|
|
||||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
|
||||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
github.com/influxdata/influxdb-client-go v0.1.5
|
||||||
github.com/influxdata/influxdb-client-go v0.1.4
|
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200323093244-5046ce1afe6b
|
||||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
|
github.com/lib/pq v1.2.0 // indirect
|
||||||
|
github.com/mattn/go-colorable v0.1.4 // indirect
|
||||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||||
github.com/prometheus/client_golang v0.9.2
|
github.com/onsi/ginkgo v1.11.0 // indirect
|
||||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
github.com/onsi/gomega v1.8.1 // indirect
|
||||||
github.com/sirupsen/logrus v1.4.2
|
github.com/prometheus/client_golang v1.6.0
|
||||||
github.com/soheilhy/cmux v0.1.4 // indirect
|
github.com/prometheus/common v0.9.1
|
||||||
github.com/spf13/cobra v0.0.3
|
github.com/sirupsen/logrus v1.6.0
|
||||||
github.com/stretchr/testify v1.3.0
|
github.com/spf13/cobra v0.0.7
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
|
github.com/stretchr/testify v1.5.1
|
||||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
||||||
|
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
|
||||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
golang.org/x/tools v0.0.0-20200204192400-7124308813f3 // indirect
|
||||||
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
|
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||||
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
|
k8s.io/api v0.17.3
|
||||||
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
|
k8s.io/apimachinery v0.17.4
|
||||||
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
|
k8s.io/client-go v0.17.3
|
||||||
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
|
k8s.io/component-base v0.17.3
|
||||||
k8s.io/klog v0.4.0
|
k8s.io/klog v1.0.0
|
||||||
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
|
k8s.io/metrics v0.17.3
|
||||||
)
|
)
|
||||||
|
|
||||||
go 1.13
|
go 1.13
|
||||||
|
1
main.go
1
main.go
@@ -18,6 +18,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
|
@@ -86,6 +86,21 @@ func TestParser(t *testing.T) {
|
|||||||
"org-id": "deadbeef",
|
"org-id": "deadbeef",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "http metrics",
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"metric-config.external.http.json/json-key": "$.metric.value",
|
||||||
|
"metric-config.external.http.json/endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||||
|
"metric-config.external.http.json/aggregator": "avg",
|
||||||
|
},
|
||||||
|
MetricName: "http",
|
||||||
|
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
ExpectedConfig: map[string]string{
|
||||||
|
"json-key": "$.metric.value",
|
||||||
|
"endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||||
|
"aggregator": "avg",
|
||||||
|
},
|
||||||
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(tc.Name, func(t *testing.T) {
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
hpaMap := make(AnnotationConfigMap)
|
hpaMap := make(AnnotationConfigMap)
|
||||||
|
@@ -46,6 +46,14 @@ type CollectorPlugin interface {
|
|||||||
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PluginNotFoundError struct {
|
||||||
|
metricTypeName MetricTypeName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PluginNotFoundError) Error() string {
|
||||||
|
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
||||||
if metricCollector == "" {
|
if metricCollector == "" {
|
||||||
c.podsPlugins.Any = plugin
|
c.podsPlugins.Any = plugin
|
||||||
@@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
|
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricTypeName struct {
|
type MetricTypeName struct {
|
||||||
|
103
pkg/collector/http_collector.go
Normal file
103
pkg/collector/http_collector.go
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
package collector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
"k8s.io/api/autoscaling/v2beta2"
|
||||||
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
HTTPMetricName = "http"
|
||||||
|
HTTPEndpointAnnotationKey = "endpoint"
|
||||||
|
HTTPJsonPathAnnotationKey = "json-key"
|
||||||
|
identifierLabel = "identifier"
|
||||||
|
)
|
||||||
|
|
||||||
|
type HTTPCollectorPlugin struct{}
|
||||||
|
|
||||||
|
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
|
||||||
|
return &HTTPCollectorPlugin{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||||
|
collector := &HTTPCollector{}
|
||||||
|
var (
|
||||||
|
value string
|
||||||
|
ok bool
|
||||||
|
)
|
||||||
|
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
|
||||||
|
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
|
||||||
|
}
|
||||||
|
jsonPath, err := jsonpath.Compile(value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse json path: %v", err)
|
||||||
|
}
|
||||||
|
collector.jsonPath = jsonPath
|
||||||
|
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
|
||||||
|
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
|
||||||
|
}
|
||||||
|
collector.endpoint, err = url.Parse(value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
collector.interval = interval
|
||||||
|
collector.metricType = config.Type
|
||||||
|
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil {
|
||||||
|
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name)
|
||||||
|
}
|
||||||
|
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok {
|
||||||
|
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name)
|
||||||
|
}
|
||||||
|
collector.metric = config.Metric
|
||||||
|
var aggFunc httpmetrics.AggregatorFunc
|
||||||
|
|
||||||
|
if val, ok := config.Config["aggregator"]; ok {
|
||||||
|
aggFunc, err = httpmetrics.ParseAggregator(val)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
collector.metricsGetter = httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
|
||||||
|
return collector, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type HTTPCollector struct {
|
||||||
|
endpoint *url.URL
|
||||||
|
jsonPath *jsonpath.Compiled
|
||||||
|
interval time.Duration
|
||||||
|
metricType v2beta2.MetricSourceType
|
||||||
|
metricsGetter *httpmetrics.JSONPathMetricsGetter
|
||||||
|
metric v2beta2.MetricIdentifier
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||||
|
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
value := CollectedMetric{
|
||||||
|
Type: c.metricType,
|
||||||
|
External: external_metrics.ExternalMetricValue{
|
||||||
|
MetricName: c.metric.Name,
|
||||||
|
MetricLabels: c.metric.Selector.MatchLabels,
|
||||||
|
Timestamp: metav1.Time{
|
||||||
|
Time: time.Now(),
|
||||||
|
},
|
||||||
|
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return []CollectedMetric{value}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *HTTPCollector) Interval() time.Duration {
|
||||||
|
return c.interval
|
||||||
|
}
|
94
pkg/collector/http_collector_test.go
Normal file
94
pkg/collector/http_collector_test.go
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
package collector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"k8s.io/api/autoscaling/v2beta2"
|
||||||
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testExternalMetricsHandler struct {
|
||||||
|
values []int64
|
||||||
|
test *testing.T
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
response, err := json.Marshal(testMetricResponse{t.values})
|
||||||
|
require.NoError(t.test, err)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, err = w.Write(response)
|
||||||
|
require.NoError(t.test, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeHTTPTestServer(t *testing.T, values []int64) string {
|
||||||
|
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t})
|
||||||
|
return server.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHTTPCollector(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
values []int64
|
||||||
|
output int
|
||||||
|
aggregator string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "basic",
|
||||||
|
values: []int64{3},
|
||||||
|
output: 3,
|
||||||
|
aggregator: "sum",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "sum",
|
||||||
|
values: []int64{3, 5, 6},
|
||||||
|
aggregator: "sum",
|
||||||
|
output: 14,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "average",
|
||||||
|
values: []int64{3, 5, 6},
|
||||||
|
aggregator: "sum",
|
||||||
|
output: 14,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
testServer := makeHTTPTestServer(t, tc.values)
|
||||||
|
plugin, err := NewHTTPCollectorPlugin()
|
||||||
|
require.NoError(t, err)
|
||||||
|
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
|
||||||
|
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
|
||||||
|
require.NoError(t, err)
|
||||||
|
metrics, err := collector.GetMetrics()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, metrics)
|
||||||
|
require.Len(t, metrics, 1)
|
||||||
|
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
|
||||||
|
config := &MetricConfig{
|
||||||
|
MetricTypeName: MetricTypeName{
|
||||||
|
Type: v2beta2.ExternalMetricSourceType,
|
||||||
|
Metric: v2beta2.MetricIdentifier{
|
||||||
|
Name: "test-metric",
|
||||||
|
Selector: &v1.LabelSelector{
|
||||||
|
MatchLabels: map[string]string{identifierLabel: "test-metric"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Config: map[string]string{
|
||||||
|
HTTPJsonPathAnnotationKey: "$.values",
|
||||||
|
HTTPEndpointAnnotationKey: endpoint,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if aggregator != "" {
|
||||||
|
config.Config["aggregator"] = aggregator
|
||||||
|
}
|
||||||
|
return config
|
||||||
|
}
|
65
pkg/collector/httpmetrics/aggregator.go
Normal file
65
pkg/collector/httpmetrics/aggregator.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AggregatorFunc func(...float64) float64
|
||||||
|
|
||||||
|
// Average implements the average mathematical function over a slice of float64
|
||||||
|
func Average(values ...float64) float64 {
|
||||||
|
sum := Sum(values...)
|
||||||
|
return sum / float64(len(values))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Minimum implements the absolute minimum mathematical function over a slice of float64
|
||||||
|
func Minimum(values ...float64) float64 {
|
||||||
|
// initialized with positive infinity, all finite numbers are smaller than it
|
||||||
|
curMin := math.Inf(1)
|
||||||
|
for _, v := range values {
|
||||||
|
if v < curMin {
|
||||||
|
curMin = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return curMin
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maximum implements the absolute maximum mathematical function over a slice of float64
|
||||||
|
func Maximum(values ...float64) float64 {
|
||||||
|
// initialized with negative infinity, all finite numbers are bigger than it
|
||||||
|
curMax := math.Inf(-1)
|
||||||
|
for _, v := range values {
|
||||||
|
if v > curMax {
|
||||||
|
curMax = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return curMax
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sum implements the summation mathematical function over a slice of float64
|
||||||
|
func Sum(values ...float64) float64 {
|
||||||
|
res := 0.0
|
||||||
|
|
||||||
|
for _, v := range values {
|
||||||
|
res += v
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||||
|
func ParseAggregator(aggregator string) (AggregatorFunc, error) {
|
||||||
|
switch aggregator {
|
||||||
|
case "avg":
|
||||||
|
return Average, nil
|
||||||
|
case "min":
|
||||||
|
return Minimum, nil
|
||||||
|
case "max":
|
||||||
|
return Maximum, nil
|
||||||
|
case "sum":
|
||||||
|
return Sum, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator)
|
||||||
|
}
|
||||||
|
}
|
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReduce(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
input []float64
|
||||||
|
output float64
|
||||||
|
aggregator string
|
||||||
|
parseError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
input: []float64{1, 2, 3},
|
||||||
|
output: 2.0,
|
||||||
|
aggregator: "avg",
|
||||||
|
parseError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: []float64{1, 2, 3},
|
||||||
|
output: 1.0,
|
||||||
|
aggregator: "min",
|
||||||
|
parseError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: []float64{1, 2, 3},
|
||||||
|
output: 3.0,
|
||||||
|
aggregator: "max",
|
||||||
|
parseError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: []float64{1, 2, 3},
|
||||||
|
output: 6.0,
|
||||||
|
aggregator: "sum",
|
||||||
|
parseError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
input: []float64{1, 2, 3},
|
||||||
|
aggregator: "non-existent",
|
||||||
|
parseError: true,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("Test function: %s", tc.aggregator), func(t *testing.T) {
|
||||||
|
aggFunc, err := ParseAggregator(tc.aggregator)
|
||||||
|
if tc.parseError {
|
||||||
|
require.Error(t, err)
|
||||||
|
} else {
|
||||||
|
val := aggFunc(tc.input...)
|
||||||
|
require.Equal(t, tc.output, val)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
136
pkg/collector/httpmetrics/json_path.go
Normal file
136
pkg/collector/httpmetrics/json_path.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
||||||
|
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||||
|
// the json path query.
|
||||||
|
type JSONPathMetricsGetter struct {
|
||||||
|
jsonPath *jsonpath.Compiled
|
||||||
|
aggregator AggregatorFunc
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||||
|
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, compiledPath *jsonpath.Compiled) *JSONPathMetricsGetter {
|
||||||
|
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: compiledPath}
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultRequestTimeout = 15 * time.Second
|
||||||
|
var DefaultConnectTimeout = 15 * time.Second
|
||||||
|
|
||||||
|
func CustomMetricsHTTPClient(requestTimeout time.Duration, connectTimeout time.Duration) *http.Client {
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: connectTimeout,
|
||||||
|
}).DialContext,
|
||||||
|
MaxIdleConns: 50,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
},
|
||||||
|
Timeout: requestTimeout,
|
||||||
|
}
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultMetricsHTTPClient() *http.Client {
|
||||||
|
return CustomMetricsHTTPClient(DefaultRequestTimeout, DefaultConnectTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||||
|
// endpoint and extracting the desired value using the specified json path
|
||||||
|
// query.
|
||||||
|
func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
|
||||||
|
data, err := g.fetchMetrics(metricsURL)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse data
|
||||||
|
var jsonData interface{}
|
||||||
|
err = json.Unmarshal(data, &jsonData)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := g.jsonPath.Lookup(jsonData)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch res := res.(type) {
|
||||||
|
case int:
|
||||||
|
return float64(res), nil
|
||||||
|
case float32:
|
||||||
|
return float64(res), nil
|
||||||
|
case float64:
|
||||||
|
return res, nil
|
||||||
|
case []interface{}:
|
||||||
|
if g.aggregator == nil {
|
||||||
|
return 0, fmt.Errorf("no aggregator function has been specified")
|
||||||
|
}
|
||||||
|
s, err := castSlice(res)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return g.aggregator(s...), nil
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("unsupported type %T", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// castSlice takes a slice of interface and returns a slice of float64 if all
|
||||||
|
// values in slice were castable, else returns an error
|
||||||
|
func castSlice(in []interface{}) ([]float64, error) {
|
||||||
|
var out []float64
|
||||||
|
|
||||||
|
for _, v := range in {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case int:
|
||||||
|
out = append(out, float64(v))
|
||||||
|
case float32:
|
||||||
|
out = append(out, float64(v))
|
||||||
|
case float64:
|
||||||
|
out = append(out, v)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
|
||||||
|
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := g.client.Do(request)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
data, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
89
pkg/collector/httpmetrics/json_path_test.go
Normal file
89
pkg/collector/httpmetrics/json_path_test.go
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCastSlice(t *testing.T) {
|
||||||
|
res1, err1 := castSlice([]interface{}{1, 2, 3})
|
||||||
|
require.NoError(t, err1)
|
||||||
|
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
|
||||||
|
|
||||||
|
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
|
||||||
|
|
||||||
|
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
|
||||||
|
require.NoError(t, err3)
|
||||||
|
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
|
||||||
|
|
||||||
|
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
|
||||||
|
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
|
||||||
|
require.Equal(t, []float64(nil), res4)
|
||||||
|
}
|
||||||
|
|
||||||
|
type testValueResponse struct {
|
||||||
|
Value int64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type testValueArrayResponse struct {
|
||||||
|
Value []int64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestHTTPServer(t *testing.T, values ...int64) *httptest.Server {
|
||||||
|
h := func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Equal(t, r.URL.Path, "/metrics")
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
var (
|
||||||
|
response []byte
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if len(values) == 1 {
|
||||||
|
response, err = json.Marshal(testValueResponse{Value: values[0]})
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
response, err = json.Marshal(testValueArrayResponse{Value: values})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
_, err = w.Write(response)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
return httptest.NewServer(http.HandlerFunc(h))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJSONPathMetricsGetter(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
input []int64
|
||||||
|
output float64
|
||||||
|
aggregator AggregatorFunc
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "basic average",
|
||||||
|
input: []int64{3, 4, 5},
|
||||||
|
output: 4,
|
||||||
|
aggregator: Average,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
server := makeTestHTTPServer(t, tc.input...)
|
||||||
|
defer server.Close()
|
||||||
|
path, err := jsonpath.Compile("$.value")
|
||||||
|
require.NoError(t, err)
|
||||||
|
getter := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, path)
|
||||||
|
url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
|
||||||
|
require.NoError(t, err)
|
||||||
|
metric, err := getter.GetMetric(*url)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.output, metric)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
120
pkg/collector/httpmetrics/pod_metrics.go
Normal file
120
pkg/collector/httpmetrics/pod_metrics.go
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PodMetricsGetter interface {
|
||||||
|
GetMetric(pod *v1.Pod) (float64, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PodMetricsJSONPathGetter struct {
|
||||||
|
scheme string
|
||||||
|
path string
|
||||||
|
rawQuery string
|
||||||
|
port int
|
||||||
|
metricGetter *JSONPathMetricsGetter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
|
||||||
|
if pod.Status.PodIP == "" {
|
||||||
|
return 0, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
|
||||||
|
}
|
||||||
|
metricsURL := g.buildMetricsURL(pod.Status.PodIP)
|
||||||
|
return g.metricGetter.GetMetric(metricsURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
|
||||||
|
getter := PodMetricsJSONPathGetter{}
|
||||||
|
var (
|
||||||
|
jsonPath *jsonpath.Compiled
|
||||||
|
aggregator AggregatorFunc
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if v, ok := config["json-key"]; ok {
|
||||||
|
path, err := jsonpath.Compile(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonPath = path
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["scheme"]; ok {
|
||||||
|
getter.scheme = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["path"]; ok {
|
||||||
|
getter.path = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["raw-query"]; ok {
|
||||||
|
getter.rawQuery = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["port"]; ok {
|
||||||
|
n, err := strconv.Atoi(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
getter.port = n
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["aggregator"]; ok {
|
||||||
|
aggregator, err = ParseAggregator(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
requestTimeout := DefaultRequestTimeout
|
||||||
|
connectTimeout := DefaultConnectTimeout
|
||||||
|
|
||||||
|
if v, ok := config["request-timeout"]; ok {
|
||||||
|
d, err := time.ParseDuration(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if d < 0 {
|
||||||
|
return nil, fmt.Errorf("Invalid request-timeout config value: %s", v)
|
||||||
|
}
|
||||||
|
requestTimeout = d
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := config["connect-timeout"]; ok {
|
||||||
|
d, err := time.ParseDuration(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if d < 0 {
|
||||||
|
return nil, fmt.Errorf("Invalid connect-timeout config value: %s", v)
|
||||||
|
}
|
||||||
|
connectTimeout = d
|
||||||
|
}
|
||||||
|
|
||||||
|
getter.metricGetter = NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
|
||||||
|
return &getter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
|
||||||
|
func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
|
||||||
|
var scheme = g.scheme
|
||||||
|
|
||||||
|
if scheme == "" {
|
||||||
|
scheme = "http"
|
||||||
|
}
|
||||||
|
|
||||||
|
return url.URL{
|
||||||
|
Scheme: scheme,
|
||||||
|
Host: fmt.Sprintf("%s:%d", podIP, g.port),
|
||||||
|
Path: g.path,
|
||||||
|
RawQuery: g.rawQuery,
|
||||||
|
}
|
||||||
|
}
|
198
pkg/collector/httpmetrics/pod_metrics_test.go
Normal file
198
pkg/collector/httpmetrics/pod_metrics_test.go
Normal file
@@ -0,0 +1,198 @@
|
|||||||
|
package httpmetrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oliveagle/jsonpath"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func compareMetricsGetter(t *testing.T, first, second *PodMetricsJSONPathGetter) {
|
||||||
|
require.Equal(t, first.metricGetter.jsonPath, second.metricGetter.jsonPath)
|
||||||
|
require.Equal(t, first.scheme, second.scheme)
|
||||||
|
require.Equal(t, first.path, second.path)
|
||||||
|
require.Equal(t, first.port, second.port)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewPodJSONPathMetricsGetter(t *testing.T) {
|
||||||
|
configNoAggregator := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
}
|
||||||
|
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
||||||
|
getterNoAggregator, err1 := NewPodMetricsJSONPathGetter(configNoAggregator)
|
||||||
|
|
||||||
|
require.NoError(t, err1)
|
||||||
|
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||||
|
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath1},
|
||||||
|
scheme: "http",
|
||||||
|
path: "/metrics",
|
||||||
|
port: 9090,
|
||||||
|
}, getterNoAggregator)
|
||||||
|
|
||||||
|
configAggregator := map[string]string{
|
||||||
|
"json-key": "$.values",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
"aggregator": "avg",
|
||||||
|
}
|
||||||
|
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
||||||
|
getterAggregator, err2 := NewPodMetricsJSONPathGetter(configAggregator)
|
||||||
|
|
||||||
|
require.NoError(t, err2)
|
||||||
|
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||||
|
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath2, aggregator: Average},
|
||||||
|
scheme: "http",
|
||||||
|
path: "/metrics",
|
||||||
|
port: 9090,
|
||||||
|
}, getterAggregator)
|
||||||
|
|
||||||
|
configErrorJSONPath := map[string]string{
|
||||||
|
"json-key": "{}",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err3 := NewPodMetricsJSONPathGetter(configErrorJSONPath)
|
||||||
|
require.Error(t, err3)
|
||||||
|
|
||||||
|
configErrorPort := map[string]string{
|
||||||
|
"json-key": "$.values",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "a9090",
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err4 := NewPodMetricsJSONPathGetter(configErrorPort)
|
||||||
|
require.Error(t, err4)
|
||||||
|
|
||||||
|
configWithRawQuery := map[string]string{
|
||||||
|
"json-key": "$.values",
|
||||||
|
"scheme": "http",
|
||||||
|
"path": "/metrics",
|
||||||
|
"port": "9090",
|
||||||
|
"raw-query": "foo=bar&baz=bop",
|
||||||
|
}
|
||||||
|
jpath5, _ := jsonpath.Compile(configWithRawQuery["json-key"])
|
||||||
|
getterWithRawQuery, err5 := NewPodMetricsJSONPathGetter(configWithRawQuery)
|
||||||
|
|
||||||
|
require.NoError(t, err5)
|
||||||
|
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||||
|
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath5},
|
||||||
|
scheme: "http",
|
||||||
|
path: "/metrics",
|
||||||
|
port: 9090,
|
||||||
|
rawQuery: "foo=bar&baz=bop",
|
||||||
|
}, getterWithRawQuery)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildMetricsURL(t *testing.T) {
|
||||||
|
scheme := "http"
|
||||||
|
ip := "1.2.3.4"
|
||||||
|
port := "9090"
|
||||||
|
path := "/v1/test/"
|
||||||
|
rawQuery := "foo=bar&baz=bop"
|
||||||
|
|
||||||
|
// Test building URL with rawQuery
|
||||||
|
configWithRawQuery := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
"raw-query": rawQuery,
|
||||||
|
}
|
||||||
|
_, err := jsonpath.Compile(configWithRawQuery["json-key"])
|
||||||
|
require.NoError(t, err)
|
||||||
|
getterWithRawQuery, err1 := NewPodMetricsJSONPathGetter(configWithRawQuery)
|
||||||
|
require.NoError(t, err1)
|
||||||
|
|
||||||
|
expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
|
||||||
|
receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
|
||||||
|
require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)
|
||||||
|
|
||||||
|
// Test building URL without rawQuery
|
||||||
|
configWithNoQuery := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
}
|
||||||
|
_, err2 := jsonpath.Compile(configWithNoQuery["json-key"])
|
||||||
|
require.NoError(t, err2)
|
||||||
|
getterWithNoQuery, err3 := NewPodMetricsJSONPathGetter(configWithNoQuery)
|
||||||
|
require.NoError(t, err3)
|
||||||
|
|
||||||
|
expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
|
||||||
|
receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
|
||||||
|
require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomTimeouts(t *testing.T) {
|
||||||
|
scheme := "http"
|
||||||
|
port := "9090"
|
||||||
|
path := "/v1/test/"
|
||||||
|
|
||||||
|
// Test no custom options results in default timeouts
|
||||||
|
defaultConfig := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
}
|
||||||
|
defaultTime := time.Duration(15000) * time.Millisecond
|
||||||
|
|
||||||
|
defaultGetter, err1 := NewPodMetricsJSONPathGetter(defaultConfig)
|
||||||
|
require.NoError(t, err1)
|
||||||
|
require.Equal(t, defaultGetter.metricGetter.client.Timeout, defaultTime)
|
||||||
|
|
||||||
|
// Test with custom request timeout
|
||||||
|
configWithRequestTimeout := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
"request-timeout": "978ms",
|
||||||
|
}
|
||||||
|
exectedTimeout := time.Duration(978) * time.Millisecond
|
||||||
|
customRequestGetter, err2 := NewPodMetricsJSONPathGetter(configWithRequestTimeout)
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.Equal(t, customRequestGetter.metricGetter.client.Timeout, exectedTimeout)
|
||||||
|
|
||||||
|
// Test with custom connect timeout. Unfortunately, it seems there's no way to access the
|
||||||
|
// connect timeout of the client struct to actually verify it's set :/
|
||||||
|
configWithConnectTimeout := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
"connect-timeout": "512ms",
|
||||||
|
}
|
||||||
|
customRequestGetter, err3 := NewPodMetricsJSONPathGetter(configWithConnectTimeout)
|
||||||
|
require.NoError(t, err3)
|
||||||
|
|
||||||
|
configWithInvalidTimeout := map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
"request-timeout": "-256ms",
|
||||||
|
}
|
||||||
|
_, err4 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
|
||||||
|
require.Error(t, err4)
|
||||||
|
|
||||||
|
configWithInvalidTimeout = map[string]string{
|
||||||
|
"json-key": "$.value",
|
||||||
|
"scheme": scheme,
|
||||||
|
"path": path,
|
||||||
|
"port": port,
|
||||||
|
"connect-timeout": "-256ms",
|
||||||
|
}
|
||||||
|
_, err5 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
|
||||||
|
require.Error(t, err5)
|
||||||
|
}
|
@@ -18,7 +18,7 @@ const (
|
|||||||
InfluxDBMetricName = "flux-query"
|
InfluxDBMetricName = "flux-query"
|
||||||
influxDBAddressKey = "address"
|
influxDBAddressKey = "address"
|
||||||
influxDBTokenKey = "token"
|
influxDBTokenKey = "token"
|
||||||
influxDBOrgIDKey = "org-id"
|
influxDBOrgKey = "org"
|
||||||
influxDBQueryNameLabelKey = "query-name"
|
influxDBQueryNameLabelKey = "query-name"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -26,26 +26,26 @@ type InfluxDBCollectorPlugin struct {
|
|||||||
kubeClient kubernetes.Interface
|
kubeClient kubernetes.Interface
|
||||||
address string
|
address string
|
||||||
token string
|
token string
|
||||||
orgID string
|
org string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
|
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
|
||||||
return &InfluxDBCollectorPlugin{
|
return &InfluxDBCollectorPlugin{
|
||||||
kubeClient: client,
|
kubeClient: client,
|
||||||
address: address,
|
address: address,
|
||||||
token: token,
|
token: token,
|
||||||
orgID: orgID,
|
org: org,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||||
return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
|
return NewInfluxDBCollector(p.address, p.token, p.org, config, interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
type InfluxDBCollector struct {
|
type InfluxDBCollector struct {
|
||||||
address string
|
address string
|
||||||
token string
|
token string
|
||||||
orgID string
|
org string
|
||||||
|
|
||||||
influxDBClient *influxdb.Client
|
influxDBClient *influxdb.Client
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
@@ -54,7 +54,7 @@ type InfluxDBCollector struct {
|
|||||||
query string
|
query string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
func NewInfluxDBCollector(address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
|
||||||
collector := &InfluxDBCollector{
|
collector := &InfluxDBCollector{
|
||||||
interval: interval,
|
interval: interval,
|
||||||
metric: config.Metric,
|
metric: config.Metric,
|
||||||
@@ -87,8 +87,8 @@ func NewInfluxDBCollector(address string, token string, orgID string, config *Me
|
|||||||
if v, ok := config.Config[influxDBTokenKey]; ok {
|
if v, ok := config.Config[influxDBTokenKey]; ok {
|
||||||
token = v
|
token = v
|
||||||
}
|
}
|
||||||
if v, ok := config.Config[influxDBOrgIDKey]; ok {
|
if v, ok := config.Config[influxDBOrgKey]; ok {
|
||||||
orgID = v
|
org = v
|
||||||
}
|
}
|
||||||
influxDbClient, err := influxdb.New(address, token)
|
influxDbClient, err := influxdb.New(address, token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -96,7 +96,7 @@ func NewInfluxDBCollector(address string, token string, orgID string, config *Me
|
|||||||
}
|
}
|
||||||
collector.address = address
|
collector.address = address
|
||||||
collector.token = token
|
collector.token = token
|
||||||
collector.orgID = orgID
|
collector.org = org
|
||||||
collector.influxDBClient = influxDbClient
|
collector.influxDBClient = influxDbClient
|
||||||
return collector, nil
|
return collector, nil
|
||||||
}
|
}
|
||||||
@@ -109,7 +109,7 @@ type queryResult struct {
|
|||||||
|
|
||||||
// getValue returns the first result gathered from an InfluxDB instance.
|
// getValue returns the first result gathered from an InfluxDB instance.
|
||||||
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
|
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
|
||||||
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
|
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.org)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resource.Quantity{}, err
|
return resource.Quantity{}, err
|
||||||
}
|
}
|
||||||
|
@@ -36,7 +36,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if got, want := c.orgID, "deadbeef"; want != got {
|
if got, want := c.org, "deadbeef"; want != got {
|
||||||
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
|
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
|
||||||
}
|
}
|
||||||
if got, want := c.address, "http://localhost:9999"; want != got {
|
if got, want := c.address, "http://localhost:9999"; want != got {
|
||||||
@@ -69,7 +69,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
|||||||
"range3m": `from(bucket: "?") |> range(start: -3m)`,
|
"range3m": `from(bucket: "?") |> range(start: -3m)`,
|
||||||
"address": "http://localhost:9999",
|
"address": "http://localhost:9999",
|
||||||
"token": "sEcr3TT0ken",
|
"token": "sEcr3TT0ken",
|
||||||
"org-id": "deadbeef1234",
|
"org": "deadbeef1234",
|
||||||
"query-name": "range3m",
|
"query-name": "range3m",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -77,7 +77,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if got, want := c.orgID, "deadbeef1234"; want != got {
|
if got, want := c.org, "deadbeef1234"; want != got {
|
||||||
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
|
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
|
||||||
}
|
}
|
||||||
if got, want := c.address, "http://localhost:9999"; want != got {
|
if got, want := c.address, "http://localhost:9999"; want != got {
|
||||||
|
@@ -1,227 +0,0 @@
|
|||||||
package collector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"math"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/oliveagle/jsonpath"
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
|
||||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
|
||||||
// the json path query.
|
|
||||||
type JSONPathMetricsGetter struct {
|
|
||||||
jsonPath *jsonpath.Compiled
|
|
||||||
scheme string
|
|
||||||
path string
|
|
||||||
port int
|
|
||||||
aggregator string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
|
||||||
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
|
|
||||||
getter := &JSONPathMetricsGetter{}
|
|
||||||
|
|
||||||
if v, ok := config["json-key"]; ok {
|
|
||||||
path, err := jsonpath.Compile(v)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
getter.jsonPath = path
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, ok := config["scheme"]; ok {
|
|
||||||
getter.scheme = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, ok := config["path"]; ok {
|
|
||||||
getter.path = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, ok := config["port"]; ok {
|
|
||||||
n, err := strconv.Atoi(v)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
getter.port = n
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, ok := config["aggregator"]; ok {
|
|
||||||
getter.aggregator = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return getter, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
|
||||||
// endpoint and extracting the desired value using the specified json path
|
|
||||||
// query.
|
|
||||||
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
|
||||||
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// parse data
|
|
||||||
var jsonData interface{}
|
|
||||||
err = json.Unmarshal(data, &jsonData)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := g.jsonPath.Lookup(jsonData)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch res := res.(type) {
|
|
||||||
case int:
|
|
||||||
return float64(res), nil
|
|
||||||
case float32:
|
|
||||||
return float64(res), nil
|
|
||||||
case float64:
|
|
||||||
return res, nil
|
|
||||||
case []interface{}:
|
|
||||||
s, err := castSlice(res)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return reduce(s, g.aggregator)
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("unsupported type %T", res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// castSlice takes a slice of interface and returns a slice of float64 if all
|
|
||||||
// values in slice were castable, else returns an error
|
|
||||||
func castSlice(in []interface{}) ([]float64, error) {
|
|
||||||
out := []float64{}
|
|
||||||
|
|
||||||
for _, v := range in {
|
|
||||||
switch v := v.(type) {
|
|
||||||
case int:
|
|
||||||
out = append(out, float64(v))
|
|
||||||
case float32:
|
|
||||||
out = append(out, float64(v))
|
|
||||||
case float64:
|
|
||||||
out = append(out, v)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
|
||||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
|
||||||
if pod.Status.PodIP == "" {
|
|
||||||
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
|
||||||
}
|
|
||||||
|
|
||||||
httpClient := &http.Client{
|
|
||||||
Timeout: 15 * time.Second,
|
|
||||||
Transport: &http.Transport{},
|
|
||||||
}
|
|
||||||
|
|
||||||
if scheme == "" {
|
|
||||||
scheme = "http"
|
|
||||||
}
|
|
||||||
|
|
||||||
metricsURL := url.URL{
|
|
||||||
Scheme: scheme,
|
|
||||||
Host: fmt.Sprintf("%s:%d", pod.Status.PodIP, port),
|
|
||||||
Path: path,
|
|
||||||
}
|
|
||||||
|
|
||||||
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := httpClient.Do(request)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
data, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
|
||||||
func reduce(values []float64, aggregator string) (float64, error) {
|
|
||||||
switch aggregator {
|
|
||||||
case "avg":
|
|
||||||
return avg(values), nil
|
|
||||||
case "min":
|
|
||||||
return min(values), nil
|
|
||||||
case "max":
|
|
||||||
return max(values), nil
|
|
||||||
case "sum":
|
|
||||||
return sum(values), nil
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// avg implements the average mathematical function over a slice of float64
|
|
||||||
func avg(values []float64) float64 {
|
|
||||||
sum := sum(values)
|
|
||||||
return sum / float64(len(values))
|
|
||||||
}
|
|
||||||
|
|
||||||
// min implements the absolute minimum mathematical function over a slice of float64
|
|
||||||
func min(values []float64) float64 {
|
|
||||||
// initialized with positive infinity, all finite numbers are smaller than it
|
|
||||||
curMin := math.Inf(1)
|
|
||||||
|
|
||||||
for _, v := range values {
|
|
||||||
if v < curMin {
|
|
||||||
curMin = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return curMin
|
|
||||||
}
|
|
||||||
|
|
||||||
// max implements the absolute maximum mathematical function over a slice of float64
|
|
||||||
func max(values []float64) float64 {
|
|
||||||
// initialized with negative infinity, all finite numbers are bigger than it
|
|
||||||
curMax := math.Inf(-1)
|
|
||||||
|
|
||||||
for _, v := range values {
|
|
||||||
if v > curMax {
|
|
||||||
curMax = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return curMax
|
|
||||||
}
|
|
||||||
|
|
||||||
// sum implements the summation mathematical function over a slice of float64
|
|
||||||
func sum(values []float64) float64 {
|
|
||||||
res := 0.0
|
|
||||||
|
|
||||||
for _, v := range values {
|
|
||||||
res += v
|
|
||||||
}
|
|
||||||
|
|
||||||
return res
|
|
||||||
}
|
|
@@ -1,105 +0,0 @@
|
|||||||
package collector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/oliveagle/jsonpath"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
|
||||||
configNoAggregator := map[string]string{
|
|
||||||
"json-key": "$.value",
|
|
||||||
"scheme": "http",
|
|
||||||
"path": "/metrics",
|
|
||||||
"port": "9090",
|
|
||||||
}
|
|
||||||
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
|
||||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
|
||||||
|
|
||||||
require.NoError(t, err1)
|
|
||||||
require.Equal(t, &JSONPathMetricsGetter{
|
|
||||||
jsonPath: jpath1,
|
|
||||||
scheme: "http",
|
|
||||||
path: "/metrics",
|
|
||||||
port: 9090,
|
|
||||||
}, getterNoAggregator)
|
|
||||||
|
|
||||||
configAggregator := map[string]string{
|
|
||||||
"json-key": "$.values",
|
|
||||||
"scheme": "http",
|
|
||||||
"path": "/metrics",
|
|
||||||
"port": "9090",
|
|
||||||
"aggregator": "avg",
|
|
||||||
}
|
|
||||||
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
|
||||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
|
||||||
|
|
||||||
require.NoError(t, err2)
|
|
||||||
require.Equal(t, &JSONPathMetricsGetter{
|
|
||||||
jsonPath: jpath2,
|
|
||||||
scheme: "http",
|
|
||||||
path: "/metrics",
|
|
||||||
port: 9090,
|
|
||||||
aggregator: "avg",
|
|
||||||
}, getterAggregator)
|
|
||||||
|
|
||||||
configErrorJSONPath := map[string]string{
|
|
||||||
"json-key": "{}",
|
|
||||||
"scheme": "http",
|
|
||||||
"path": "/metrics",
|
|
||||||
"port": "9090",
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
|
|
||||||
require.Error(t, err3)
|
|
||||||
|
|
||||||
configErrorPort := map[string]string{
|
|
||||||
"json-key": "$.values",
|
|
||||||
"scheme": "http",
|
|
||||||
"path": "/metrics",
|
|
||||||
"port": "a9090",
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
|
|
||||||
require.Error(t, err4)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCastSlice(t *testing.T) {
|
|
||||||
res1, err1 := castSlice([]interface{}{1, 2, 3})
|
|
||||||
require.NoError(t, err1)
|
|
||||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
|
|
||||||
|
|
||||||
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
|
|
||||||
require.NoError(t, err2)
|
|
||||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
|
|
||||||
|
|
||||||
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
|
|
||||||
require.NoError(t, err3)
|
|
||||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
|
|
||||||
|
|
||||||
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
|
|
||||||
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
|
|
||||||
require.Equal(t, []float64(nil), res4)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReduce(t *testing.T) {
|
|
||||||
average, err1 := reduce([]float64{1, 2, 3}, "avg")
|
|
||||||
require.NoError(t, err1)
|
|
||||||
require.Equal(t, 2.0, average)
|
|
||||||
|
|
||||||
min, err2 := reduce([]float64{1, 2, 3}, "min")
|
|
||||||
require.NoError(t, err2)
|
|
||||||
require.Equal(t, 1.0, min)
|
|
||||||
|
|
||||||
max, err3 := reduce([]float64{1, 2, 3}, "max")
|
|
||||||
require.NoError(t, err3)
|
|
||||||
require.Equal(t, 3.0, max)
|
|
||||||
|
|
||||||
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
|
|
||||||
require.NoError(t, err4)
|
|
||||||
require.Equal(t, 6.0, sum)
|
|
||||||
|
|
||||||
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
|
|
||||||
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
|
|
||||||
}
|
|
@@ -2,13 +2,15 @@ package collector
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
|
||||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||||
@@ -30,17 +32,14 @@ func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutosc
|
|||||||
|
|
||||||
type PodCollector struct {
|
type PodCollector struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
Getter PodMetricsGetter
|
Getter httpmetrics.PodMetricsGetter
|
||||||
podLabelSelector *metav1.LabelSelector
|
podLabelSelector *metav1.LabelSelector
|
||||||
namespace string
|
namespace string
|
||||||
metric autoscalingv2.MetricIdentifier
|
metric autoscalingv2.MetricIdentifier
|
||||||
metricType autoscalingv2.MetricSourceType
|
metricType autoscalingv2.MetricSourceType
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
}
|
httpClient *http.Client
|
||||||
|
|
||||||
type PodMetricsGetter interface {
|
|
||||||
GetMetric(pod *corev1.Pod) (float64, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
|
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
|
||||||
@@ -60,11 +59,11 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
|
|||||||
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
|
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
|
||||||
}
|
}
|
||||||
|
|
||||||
var getter PodMetricsGetter
|
var getter httpmetrics.PodMetricsGetter
|
||||||
switch config.CollectorName {
|
switch config.CollectorName {
|
||||||
case "json-path":
|
case "json-path":
|
||||||
var err error
|
var err error
|
||||||
getter, err = NewJSONPathMetricsGetter(config.Config)
|
getter, err = httpmetrics.NewPodMetricsJSONPathGetter(config.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -87,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
values := make([]CollectedMetric, 0, len(pods.Items))
|
ch := make(chan CollectedMetric)
|
||||||
|
errCh := make(chan error)
|
||||||
// TODO: get metrics in parallel
|
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
value, err := c.Getter.GetMetric(&pod)
|
go c.getPodMetric(pod, ch, errCh)
|
||||||
if err != nil {
|
}
|
||||||
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
metricValue := CollectedMetric{
|
values := make([]CollectedMetric, 0, len(pods.Items))
|
||||||
Type: c.metricType,
|
for i := 0; i < len(pods.Items); i++ {
|
||||||
Custom: custom_metrics.MetricValue{
|
select {
|
||||||
DescribedObject: custom_metrics.ObjectReference{
|
case err := <- errCh:
|
||||||
APIVersion: "v1",
|
c.logger.Error(err)
|
||||||
Kind: "Pod",
|
case resp := <- ch:
|
||||||
Name: pod.Name,
|
values = append(values, resp)
|
||||||
Namespace: pod.Namespace,
|
|
||||||
},
|
|
||||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
|
||||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
|
||||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
values = append(values, metricValue)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return values, nil
|
return values, nil
|
||||||
@@ -122,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration {
|
|||||||
return c.interval
|
return c.interval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
|
||||||
|
value, err := c.Getter.GetMetric(&pod)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ch <- CollectedMetric{
|
||||||
|
Type: c.metricType,
|
||||||
|
Custom: custom_metrics.MetricValue{
|
||||||
|
DescribedObject: custom_metrics.ObjectReference{
|
||||||
|
APIVersion: "v1",
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: pod.Name,
|
||||||
|
Namespace: pod.Namespace,
|
||||||
|
},
|
||||||
|
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||||
|
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||||
|
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
|
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
|
||||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||||
case "Deployment":
|
case "Deployment":
|
||||||
|
155
pkg/collector/pod_collector_test.go
Normal file
155
pkg/collector/pod_collector_test.go
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
package collector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testNamespace = "test-namespace"
|
||||||
|
applicationLabelName = "application"
|
||||||
|
applicationLabelValue = "test-application"
|
||||||
|
testDeploymentName = "test-application"
|
||||||
|
testInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPodCollector(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
metrics [][]int64
|
||||||
|
result []int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple",
|
||||||
|
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
|
||||||
|
result: []int64{1, 3, 8, 5, 2},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
client := fake.NewSimpleClientset()
|
||||||
|
plugin := NewPodCollectorPlugin(client)
|
||||||
|
makeTestDeployment(t, client)
|
||||||
|
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
|
||||||
|
makeTestPods(t, host, port, "test-metric", client, 5)
|
||||||
|
testHPA := makeTestHPA(t, client)
|
||||||
|
testConfig := makeTestConfig(port)
|
||||||
|
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
|
||||||
|
require.NoError(t, err)
|
||||||
|
metrics, err := collector.GetMetrics()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
|
||||||
|
var values []int64
|
||||||
|
for _, m := range metrics {
|
||||||
|
values = append(values, m.Custom.Value.Value())
|
||||||
|
}
|
||||||
|
require.ElementsMatch(t, tc.result, values)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testMetricResponse struct {
|
||||||
|
Values []int64 `json:"values"`
|
||||||
|
}
|
||||||
|
type testMetricsHandler struct {
|
||||||
|
values [][]int64
|
||||||
|
calledCounter uint
|
||||||
|
t *testing.T
|
||||||
|
metricsPath string
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
h.Lock()
|
||||||
|
defer h.Unlock()
|
||||||
|
|
||||||
|
require.Equal(h.t, h.metricsPath, r.URL.Path)
|
||||||
|
require.Less(h.t, int(h.calledCounter), len(h.values))
|
||||||
|
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
|
||||||
|
require.NoError(h.t, err)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, err = w.Write(response)
|
||||||
|
require.NoError(h.t, err)
|
||||||
|
h.calledCounter++
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMetricsHandler) {
|
||||||
|
metricsHandler := &testMetricsHandler{values: values, t: t, metricsPath: "/metrics"}
|
||||||
|
server := httptest.NewServer(metricsHandler)
|
||||||
|
url, err := url.Parse(server.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return url.Hostname(), url.Port(), metricsHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestConfig(port string) *MetricConfig {
|
||||||
|
return &MetricConfig{
|
||||||
|
CollectorName: "json-path",
|
||||||
|
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
|
||||||
|
for i := 0; i < replicas; i++ {
|
||||||
|
testPod := &corev1.Pod{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("test-pod-%d", i),
|
||||||
|
Labels: map[string]string{applicationLabelName: applicationLabelValue},
|
||||||
|
Annotations: map[string]string{
|
||||||
|
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: corev1.PodStatus{
|
||||||
|
PodIP: testServer,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := client.CoreV1().Pods(testNamespace).Create(testPod)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestDeployment(t *testing.T, client kubernetes.Interface) *appsv1.Deployment {
|
||||||
|
deployment := appsv1.Deployment{
|
||||||
|
ObjectMeta: v1.ObjectMeta{Name: testDeploymentName},
|
||||||
|
Spec: appsv1.DeploymentSpec{
|
||||||
|
Selector: &v1.LabelSelector{
|
||||||
|
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := client.AppsV1().Deployments(testNamespace).Create(&deployment)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return &deployment
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestHPA(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
|
||||||
|
hpa := &autoscalingv2.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Name: "test-hpa",
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||||
|
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||||
|
Kind: "Deployment",
|
||||||
|
Name: testDeploymentName,
|
||||||
|
APIVersion: "apps/v1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers("test-namespace").Create(hpa)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return hpa
|
||||||
|
}
|
@@ -128,7 +128,7 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
|
|||||||
|
|
||||||
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||||
// TODO: use real context
|
// TODO: use real context
|
||||||
value, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
|
value, _, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -125,7 +125,7 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
|
|||||||
|
|
||||||
// getCollector returns a collector for getting the metrics.
|
// getCollector returns a collector for getting the metrics.
|
||||||
func (c *SkipperCollector) getCollector() (Collector, error) {
|
func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||||
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -10,7 +10,7 @@ import (
|
|||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/api/extensions/v1beta1"
|
v1beta1 "k8s.io/api/networking/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
@@ -331,8 +331,8 @@ func TestSkipperCollector(t *testing.T) {
|
|||||||
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
|
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
plugin := makePlugin(tc.metric)
|
plugin := makePlugin(tc.metric)
|
||||||
hpa := makeHPA(tc.ingressName, tc.backend)
|
hpa := makeHPA(tc.namespace, tc.ingressName, tc.backend)
|
||||||
config := makeConfig(tc.backend, tc.fakedAverage)
|
config := makeConfig(tc.ingressName, tc.namespace, tc.backend, tc.fakedAverage)
|
||||||
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
|
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
|
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
|
||||||
@@ -341,6 +341,7 @@ func TestSkipperCollector(t *testing.T) {
|
|||||||
if tc.expectError {
|
if tc.expectError {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
} else {
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
|
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
|
||||||
require.NoError(t, err, "failed to collect metrics: %v", err)
|
require.NoError(t, err, "failed to collect metrics: %v", err)
|
||||||
require.Len(t, collected, 1, "the number of metrics returned is not 1")
|
require.Len(t, collected, 1, "the number of metrics returned is not 1")
|
||||||
@@ -381,12 +382,13 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
|
|||||||
Host: hostname,
|
Host: hostname,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress)
|
_, err := client.NetworkingV1beta1().Ingresses(namespace).Create(ingress)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
|
func makeHPA(namespace, ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
|
||||||
return &autoscalingv2.HorizontalPodAutoscaler{
|
return &autoscalingv2.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: namespace},
|
||||||
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||||
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||||
Kind: "Deployment",
|
Kind: "Deployment",
|
||||||
@@ -404,9 +406,13 @@ func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func makeConfig(backend string, fakedAverage bool) *MetricConfig {
|
func makeConfig(ingressName, namespace, backend string, fakedAverage bool) *MetricConfig {
|
||||||
config := &MetricConfig{
|
config := &MetricConfig{
|
||||||
MetricTypeName: MetricTypeName{Metric: autoscalingv2.MetricIdentifier{Name: fmt.Sprintf("%s,%s", rpsMetricName, backend)}},
|
MetricTypeName: MetricTypeName{Metric: autoscalingv2.MetricIdentifier{Name: fmt.Sprintf("%s,%s", rpsMetricName, backend)}},
|
||||||
|
ObjectReference: custom_metrics.ObjectReference{
|
||||||
|
Name: ingressName,
|
||||||
|
Namespace: namespace,
|
||||||
|
},
|
||||||
MetricSpec: autoscalingv2.MetricSpec{
|
MetricSpec: autoscalingv2.MetricSpec{
|
||||||
Object: &autoscalingv2.ObjectMetricSource{
|
Object: &autoscalingv2.ObjectMetricSource{
|
||||||
Target: autoscalingv2.MetricTarget{},
|
Target: autoscalingv2.MetricTarget{},
|
||||||
|
@@ -2,6 +2,7 @@ package provider
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -49,16 +50,17 @@ var (
|
|||||||
// HPAProvider is a base provider for initializing metric collectors based on
|
// HPAProvider is a base provider for initializing metric collectors based on
|
||||||
// HPA resources.
|
// HPA resources.
|
||||||
type HPAProvider struct {
|
type HPAProvider struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
collectorScheduler *CollectorScheduler
|
collectorScheduler *CollectorScheduler
|
||||||
collectorInterval time.Duration
|
collectorInterval time.Duration
|
||||||
metricSink chan metricCollection
|
metricSink chan metricCollection
|
||||||
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
||||||
metricStore *MetricStore
|
metricStore *MetricStore
|
||||||
collectorFactory *collector.CollectorFactory
|
collectorFactory *collector.CollectorFactory
|
||||||
recorder kube_record.EventRecorder
|
recorder kube_record.EventRecorder
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
|
disregardIncompatibleHPAs bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// metricCollection is a container for sending collected metrics across a
|
// metricCollection is a container for sending collected metrics across a
|
||||||
@@ -69,7 +71,7 @@ type metricCollection struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewHPAProvider initializes a new HPAProvider.
|
// NewHPAProvider initializes a new HPAProvider.
|
||||||
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
|
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
|
||||||
metricsc := make(chan metricCollection)
|
metricsc := make(chan metricCollection)
|
||||||
|
|
||||||
return &HPAProvider{
|
return &HPAProvider{
|
||||||
@@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
|
|||||||
metricStore: NewMetricStore(func() time.Time {
|
metricStore: NewMetricStore(func() time.Time {
|
||||||
return time.Now().UTC().Add(15 * time.Minute)
|
return time.Now().UTC().Add(15 * time.Minute)
|
||||||
}),
|
}),
|
||||||
collectorFactory: collectorFactory,
|
collectorFactory: collectorFactory,
|
||||||
recorder: recorder.CreateEventRecorder(client),
|
recorder: recorder.CreateEventRecorder(client),
|
||||||
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
logger: log.WithFields(log.Fields{"provider": "hpa"}),
|
||||||
|
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
|
|||||||
interval = p.collectorInterval
|
interval = p.collectorInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
|
||||||
|
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
|
||||||
|
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
|
||||||
|
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
cache = false
|
cache = false
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.logger.Infof("Adding new metrics collector: %T", collector)
|
p.logger.Infof("Adding new metrics collector: %T", c)
|
||||||
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
|
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
|
||||||
}
|
}
|
||||||
newHPAs++
|
newHPAs++
|
||||||
|
|
||||||
|
@@ -77,7 +77,7 @@ func TestUpdateHPAs(t *testing.T) {
|
|||||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
|
||||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
err = provider.updateHPAs()
|
err = provider.updateHPAs()
|
||||||
@@ -94,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
|
|||||||
|
|
||||||
require.Len(t, provider.collectorScheduler.table, 1)
|
require.Len(t, provider.collectorScheduler.table, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||||
|
// Test HPAProvider with disregardIncompatibleHPAs = true
|
||||||
|
|
||||||
|
value := resource.MustParse("1k")
|
||||||
|
|
||||||
|
hpa := &autoscaling.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "hpa1",
|
||||||
|
Namespace: "default",
|
||||||
|
Annotations: map[string]string{},
|
||||||
|
},
|
||||||
|
Spec: autoscaling.HorizontalPodAutoscalerSpec{
|
||||||
|
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
|
||||||
|
Kind: "Deployment",
|
||||||
|
Name: "app",
|
||||||
|
APIVersion: "apps/v1",
|
||||||
|
},
|
||||||
|
MinReplicas: &[]int32{1}[0],
|
||||||
|
MaxReplicas: 10,
|
||||||
|
Metrics: []autoscaling.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscaling.ExternalMetricSourceType,
|
||||||
|
External: &autoscaling.ExternalMetricSource{
|
||||||
|
Metric: autoscaling.MetricIdentifier{
|
||||||
|
Name: "some-other-metric",
|
||||||
|
},
|
||||||
|
Target: autoscaling.MetricTarget{
|
||||||
|
Type: autoscaling.AverageValueMetricType,
|
||||||
|
AverageValue: &value,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeClient := fake.NewSimpleClientset()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
collectorFactory := collector.NewCollectorFactory()
|
||||||
|
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
|
||||||
|
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||||
|
|
||||||
|
err = provider.updateHPAs()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
@@ -69,9 +69,15 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
|
|||||||
Resource: "pods",
|
Resource: "pods",
|
||||||
}
|
}
|
||||||
case "Ingress":
|
case "Ingress":
|
||||||
|
// group can be either `extentions` or `networking.k8s.io`
|
||||||
|
group := "extensions"
|
||||||
|
gv, err := schema.ParseGroupVersion(value.DescribedObject.APIVersion)
|
||||||
|
if err == nil {
|
||||||
|
group = gv.Group
|
||||||
|
}
|
||||||
groupResource = schema.GroupResource{
|
groupResource = schema.GroupResource{
|
||||||
Resource: "ingresses",
|
Resource: "ingresses",
|
||||||
Group: "extensions",
|
Group: group,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -92,7 +92,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
|||||||
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
|
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
|
||||||
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
|
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
|
||||||
"token for InfluxDB 2.x server to query")
|
"token for InfluxDB 2.x server to query")
|
||||||
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
|
flags.StringVar(&o.InfluxDBOrg, "influxdb-org", o.InfluxDBOrg, ""+
|
||||||
"organization ID for InfluxDB 2.x server to query")
|
"organization ID for InfluxDB 2.x server to query")
|
||||||
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
|
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
|
||||||
"url of ZMON KariosDB endpoint to query for ZMON checks")
|
"url of ZMON KariosDB endpoint to query for ZMON checks")
|
||||||
@@ -110,7 +110,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
|||||||
"whether to enable AWS external metrics")
|
"whether to enable AWS external metrics")
|
||||||
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
|
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
|
||||||
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
|
||||||
|
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
|
||||||
|
"disregard failing to create collectors for incompatible HPAs")
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,13 +183,15 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
|||||||
}
|
}
|
||||||
|
|
||||||
if o.InfluxDBAddress != "" {
|
if o.InfluxDBAddress != "" {
|
||||||
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
|
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
|
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
|
||||||
}
|
}
|
||||||
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
|
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
plugin, _ := collector.NewHTTPCollectorPlugin()
|
||||||
|
collectorFactory.RegisterExternalCollector([]string{collector.HTTPMetricName}, plugin)
|
||||||
// register generic pod collector
|
// register generic pod collector
|
||||||
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
|
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -228,7 +231,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
|||||||
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
|
||||||
}
|
}
|
||||||
|
|
||||||
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory)
|
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
|
||||||
|
|
||||||
go hpaProvider.Run(ctx)
|
go hpaProvider.Run(ctx)
|
||||||
|
|
||||||
@@ -308,8 +311,8 @@ type AdapterServerOptions struct {
|
|||||||
InfluxDBAddress string
|
InfluxDBAddress string
|
||||||
// InfluxDBToken is the token used for querying InfluxDB
|
// InfluxDBToken is the token used for querying InfluxDB
|
||||||
InfluxDBToken string
|
InfluxDBToken string
|
||||||
// InfluxDBOrgID is the organization ID used for querying InfluxDB
|
// InfluxDBOrg is the organization ID used for querying InfluxDB
|
||||||
InfluxDBOrgID string
|
InfluxDBOrg string
|
||||||
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
|
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
|
||||||
// kariosDB endpoint
|
// kariosDB endpoint
|
||||||
ZMONKariosDBEndpoint string
|
ZMONKariosDBEndpoint string
|
||||||
@@ -332,4 +335,7 @@ type AdapterServerOptions struct {
|
|||||||
MetricsAddress string
|
MetricsAddress string
|
||||||
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
|
// SkipperBackendWeightAnnotation is the annotation on the ingress indicating the backend weights
|
||||||
SkipperBackendWeightAnnotation []string
|
SkipperBackendWeightAnnotation []string
|
||||||
|
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
|
||||||
|
// kube-metrics-adapter beside another Metrics Provider
|
||||||
|
DisregardIncompatibleHPAs bool
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user