mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-01-09 09:51:38 +00:00
Compare commits
73 Commits
scheduled-
...
v0.1.9
Author | SHA1 | Date | |
---|---|---|---|
|
e61071a36d | ||
|
c108fab55a | ||
|
be7567efea | ||
|
b677e814be | ||
|
deec4727ee | ||
|
cf369639d8 | ||
|
262a35c2ec | ||
|
607386834b | ||
|
4bdce4da4d | ||
|
801e5d7a47 | ||
|
4df21ae2b4 | ||
|
7f639baeee | ||
|
88ddb6f10e | ||
|
d9cf2e0858 | ||
|
c3b18e784b | ||
|
c3a03fd758 | ||
|
05704c0a6b | ||
|
f3a608fcf7 | ||
|
223ec9fd89 | ||
|
81255aa956 | ||
|
dac44965e3 | ||
|
dbed1570ba | ||
|
ef24244074 | ||
|
cadf2dff3c | ||
|
db83268343 | ||
|
78c2ef742b | ||
|
f0b817629c | ||
|
3e7b66070c | ||
|
f3d33b6132 | ||
|
7b7af889fb | ||
|
2bcaa80ce0 | ||
|
a2e6980b25 | ||
|
f0783dcdd8 | ||
|
560fb40dae | ||
|
41229e370d | ||
|
338393c11e | ||
|
8e7d1bc0d6 | ||
|
dc7fb5875b | ||
|
8f70d3493e | ||
|
97ec13d010 | ||
|
5e20dcd961 | ||
|
f27dbc3939 | ||
|
6443613930 | ||
|
d9c2e50f2c | ||
|
a8cd761f85 | ||
|
43fa626ca1 | ||
|
2d3ddc53ef | ||
|
02880fad2d | ||
|
405e347620 | ||
|
f15aacad5c | ||
|
c0a9f525b8 | ||
|
24207d285c | ||
|
860aba807e | ||
|
73659f8ac6 | ||
|
280d358538 | ||
|
735bb164e2 | ||
|
d28712abd1 | ||
|
670692b23c | ||
|
33683fe88d | ||
|
abaef5e491 | ||
|
670f081a5e | ||
|
55a743e5ac | ||
|
830d385a4a | ||
|
f2638c6843 | ||
|
b77fba2e7b | ||
|
e77d51f97a | ||
|
69ac42ed7d | ||
|
582f05539c | ||
|
46f2e5c4fd | ||
|
31bc1771df | ||
|
836c78d08b | ||
|
6f5ab042e6 | ||
|
5abc7388fb |
35
.github/CODEOWNERS
vendored
35
.github/CODEOWNERS
vendored
@ -1,35 +0,0 @@
|
||||
# These owners will be the default owners for everything in
|
||||
# the repo.
|
||||
* @arjunrn
|
||||
|
||||
|
||||
|
||||
# Samples for assigning codeowners below:
|
||||
# Order is important; the last matching pattern takes the most
|
||||
# precedence. When someone opens a pull request that only
|
||||
# modifies JS files, only @js-owner and not the global
|
||||
# owner(s) will be requested for a review.
|
||||
# *.js @js-owner
|
||||
|
||||
# You can also use email addresses if you prefer. They'll be
|
||||
# used to look up users just like we do for commit author
|
||||
# emails.
|
||||
# *.go docs@example.com
|
||||
|
||||
# In this example, @doctocat owns any files in the build/logs
|
||||
# directory at the root of the repository and any of its
|
||||
# subdirectories.
|
||||
# /build/logs/ @doctocat
|
||||
|
||||
# The `docs/*` pattern will match files like
|
||||
# `docs/getting-started.md` but not further nested files like
|
||||
# `docs/build-app/troubleshooting.md`.
|
||||
# docs/* docs@example.com
|
||||
|
||||
# In this example, @octocat owns any file in an apps directory
|
||||
# anywhere in your repository.
|
||||
# apps/ @octocat
|
||||
|
||||
# In this example, @doctocat owns any file in the `/docs`
|
||||
# directory in the root of your repository.
|
||||
# /docs/ @doctocat
|
14
.github/dependabot.yml
vendored
Normal file
14
.github/dependabot.yml
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: gomod
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: monthly
|
||||
time: "07:00"
|
||||
open-pull-requests-limit: 10
|
||||
- package-ecosystem: docker
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: monthly
|
||||
time: "07:00"
|
||||
open-pull-requests-limit: 10
|
@ -2,7 +2,7 @@ language: go
|
||||
dist: xenial
|
||||
|
||||
go:
|
||||
- "1.13.x"
|
||||
- "1.14.x"
|
||||
|
||||
env:
|
||||
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
|
||||
|
@ -1,3 +1,2 @@
|
||||
Mikkel Larsen <mikkel.larsen@zalando.de>
|
||||
Arjun Naik <arjun.naik@zalando.de>
|
||||
Team Teapot <team-teapot@zalando.de>
|
||||
|
73
README.md
73
README.md
@ -142,7 +142,7 @@ example the following JSON data would be expected:
|
||||
```
|
||||
|
||||
The json-path query support depends on the
|
||||
[github.com/oliveagle/jsonpath](https://github.com/oliveagle/jsonpath) library.
|
||||
[github.com/spyzhov/ajson](https://github.com/spyzhov/ajson) library.
|
||||
See the README for possible queries. It's expected that the metric you query
|
||||
returns something that can be turned into a `float64`.
|
||||
|
||||
@ -166,6 +166,14 @@ will create a URL like this:
|
||||
http://<podIP>:9090/metrics?foo=bar&baz=bop
|
||||
```
|
||||
|
||||
There are also configuration options for custom (connect and request) timeouts when querying pods for metrics:
|
||||
```yaml
|
||||
metric-config.pods.requests-per-second.json-path/request-timeout: 2s
|
||||
metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms
|
||||
```
|
||||
|
||||
The default for both of the above values is 15 seconds.
|
||||
|
||||
## Prometheus collector
|
||||
|
||||
The Prometheus collector is a generic collector which can map Prometheus
|
||||
@ -535,9 +543,9 @@ spec:
|
||||
tag-application: my-custom-app
|
||||
aggregators: avg # comma separated list of aggregation functions, default: last
|
||||
duration: 5m # default: 10m
|
||||
target:
|
||||
averageValue: "30"
|
||||
type: AverageValue
|
||||
target:
|
||||
averageValue: "30"
|
||||
type: AverageValue
|
||||
```
|
||||
|
||||
The `check-id` specifies the ZMON check to query for the metrics. `key`
|
||||
@ -585,3 +593,60 @@ you need to define a `key` or other `tag` with a "star" query syntax like
|
||||
`values.*`. This *hack* is in place because it's not allowed to use `*` in the
|
||||
metric label definitions. If both annotations and corresponding label is
|
||||
defined, then the annotation takes precedence.
|
||||
|
||||
## HTTP Collector
|
||||
|
||||
The http collector allows collecting metrics from an external endpoint specified in the HPA.
|
||||
Currently only `json-path` collection is supported.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type | K8s Versions |
|
||||
| ------------ | -------------- | ------- | -- |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods | `>=1.12` |
|
||||
|
||||
### Example
|
||||
|
||||
This is an example of using the HTTP collector to collect metrics from a json
|
||||
metrics endpoint specified in the annotations.
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2beta2
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||
metric-config.external.http.json/json-key: "$.some-metric.value"
|
||||
metric-config.external.http.json/endpoint: "http://metric-source.app-namespace:8080/metrics"
|
||||
metric-config.external.http.json/aggregator: "max"
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: myapp
|
||||
minReplicas: 1
|
||||
maxReplicas: 10
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metric:
|
||||
name: http
|
||||
selector:
|
||||
matchLabels:
|
||||
identifier: unique-metric-name
|
||||
target:
|
||||
averageValue: 1
|
||||
type: AverageValue
|
||||
```
|
||||
|
||||
The HTTP collector similar to the Pod Metrics collector. The metric name should always be `http`.
|
||||
This value is also used in the annotations to configure the metrics adapter to query the required
|
||||
target. The following configuration values are supported:
|
||||
|
||||
- `json-key` to specify the JSON path of the metric to be queried
|
||||
- `endpoint` the fully formed path to query for the metric. In the above example a Kubernetes _Service_
|
||||
in the namespace `app-namespace` is called.
|
||||
- `aggregator` is only required if the metric is an array of values and specifies how the values
|
||||
are aggregated. Currently this option can support the values: `sum`, `max`, `min`, `avg`.
|
||||
|
||||
|
46
go.mod
46
go.mod
@ -2,31 +2,33 @@ module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
||||
github.com/aws/aws-sdk-go v1.29.4
|
||||
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
||||
github.com/aws/aws-sdk-go v1.35.0
|
||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
||||
github.com/influxdata/influxdb-client-go v0.1.4
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
|
||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||
github.com/prometheus/client_golang v0.9.2
|
||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
github.com/soheilhy/cmux v0.1.4 // indirect
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/stretchr/testify v1.4.0
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
|
||||
github.com/influxdata/influxdb-client-go v0.1.5
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200618121405-54026617ec44
|
||||
github.com/lib/pq v1.2.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.4 // indirect
|
||||
github.com/onsi/gomega v1.8.1 // indirect
|
||||
github.com/prometheus/client_golang v1.7.1
|
||||
github.com/prometheus/common v0.14.0
|
||||
github.com/sirupsen/logrus v1.7.0
|
||||
github.com/spf13/cobra v0.0.7
|
||||
github.com/spyzhov/ajson v0.4.2
|
||||
github.com/stretchr/testify v1.6.1
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
|
||||
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
|
||||
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
|
||||
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
|
||||
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
|
||||
k8s.io/klog v0.4.0
|
||||
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
|
||||
golang.org/x/tools v0.0.0-20200204192400-7124308813f3 // indirect
|
||||
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e // indirect
|
||||
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||
k8s.io/api v0.18.8
|
||||
k8s.io/apimachinery v0.18.8
|
||||
k8s.io/client-go v0.18.8
|
||||
k8s.io/component-base v0.18.8
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/metrics v0.18.8
|
||||
)
|
||||
|
||||
// use version that provides: https://github.com/kubernetes-sigs/custom-metrics-apiserver/pull/65
|
||||
replace github.com/kubernetes-incubator/custom-metrics-apiserver => github.com/mikkeloscar/custom-metrics-apiserver v0.0.0-20200626202535-cd2d550cf55d
|
||||
|
||||
go 1.13
|
||||
|
@ -86,6 +86,21 @@ func TestParser(t *testing.T) {
|
||||
"org-id": "deadbeef",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "http metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.http.json/json-key": "$.metric.value",
|
||||
"metric-config.external.http.json/endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||
"metric-config.external.http.json/aggregator": "avg",
|
||||
},
|
||||
MetricName: "http",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"json-key": "$.metric.value",
|
||||
"endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||
"aggregator": "avg",
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
hpaMap := make(AnnotationConfigMap)
|
||||
|
103
pkg/collector/http_collector.go
Normal file
103
pkg/collector/http_collector.go
Normal file
@ -0,0 +1,103 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
|
||||
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
HTTPMetricName = "http"
|
||||
HTTPEndpointAnnotationKey = "endpoint"
|
||||
HTTPJsonPathAnnotationKey = "json-key"
|
||||
identifierLabel = "identifier"
|
||||
)
|
||||
|
||||
type HTTPCollectorPlugin struct{}
|
||||
|
||||
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
|
||||
return &HTTPCollectorPlugin{}, nil
|
||||
}
|
||||
|
||||
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
collector := &HTTPCollector{}
|
||||
var (
|
||||
value string
|
||||
ok bool
|
||||
)
|
||||
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
|
||||
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
|
||||
}
|
||||
jsonPath := value
|
||||
|
||||
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
|
||||
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
|
||||
}
|
||||
var err error
|
||||
collector.endpoint, err = url.Parse(value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
collector.interval = interval
|
||||
collector.metricType = config.Type
|
||||
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil {
|
||||
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name)
|
||||
}
|
||||
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok {
|
||||
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name)
|
||||
}
|
||||
collector.metric = config.Metric
|
||||
var aggFunc httpmetrics.AggregatorFunc
|
||||
|
||||
if val, ok := config.Config["aggregator"]; ok {
|
||||
aggFunc, err = httpmetrics.ParseAggregator(val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
jsonPathGetter, err := httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
collector.metricsGetter = jsonPathGetter
|
||||
return collector, nil
|
||||
}
|
||||
|
||||
type HTTPCollector struct {
|
||||
endpoint *url.URL
|
||||
interval time.Duration
|
||||
metricType v2beta2.MetricSourceType
|
||||
metricsGetter *httpmetrics.JSONPathMetricsGetter
|
||||
metric v2beta2.MetricIdentifier
|
||||
}
|
||||
|
||||
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{
|
||||
Time: time.Now(),
|
||||
},
|
||||
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
return []CollectedMetric{value}, nil
|
||||
}
|
||||
|
||||
func (c *HTTPCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
94
pkg/collector/http_collector_test.go
Normal file
94
pkg/collector/http_collector_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
type testExternalMetricsHandler struct {
|
||||
values []int64
|
||||
test *testing.T
|
||||
}
|
||||
|
||||
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
response, err := json.Marshal(testMetricResponse{t.values})
|
||||
require.NoError(t.test, err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, err = w.Write(response)
|
||||
require.NoError(t.test, err)
|
||||
}
|
||||
|
||||
func makeHTTPTestServer(t *testing.T, values []int64) string {
|
||||
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t})
|
||||
return server.URL
|
||||
}
|
||||
|
||||
func TestHTTPCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
values []int64
|
||||
output int
|
||||
aggregator string
|
||||
}{
|
||||
{
|
||||
name: "basic",
|
||||
values: []int64{3},
|
||||
output: 3,
|
||||
aggregator: "sum",
|
||||
},
|
||||
{
|
||||
name: "sum",
|
||||
values: []int64{3, 5, 6},
|
||||
aggregator: "sum",
|
||||
output: 14,
|
||||
},
|
||||
{
|
||||
name: "average",
|
||||
values: []int64{3, 5, 6},
|
||||
aggregator: "sum",
|
||||
output: 14,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
testServer := makeHTTPTestServer(t, tc.values)
|
||||
plugin, err := NewHTTPCollectorPlugin()
|
||||
require.NoError(t, err)
|
||||
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
|
||||
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
|
||||
require.NoError(t, err)
|
||||
metrics, err := collector.GetMetrics()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, metrics)
|
||||
require.Len(t, metrics, 1)
|
||||
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{
|
||||
Type: v2beta2.ExternalMetricSourceType,
|
||||
Metric: v2beta2.MetricIdentifier{
|
||||
Name: "test-metric",
|
||||
Selector: &v1.LabelSelector{
|
||||
MatchLabels: map[string]string{identifierLabel: "test-metric"},
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: map[string]string{
|
||||
HTTPJsonPathAnnotationKey: "$.values",
|
||||
HTTPEndpointAnnotationKey: endpoint,
|
||||
},
|
||||
}
|
||||
if aggregator != "" {
|
||||
config.Config["aggregator"] = aggregator
|
||||
}
|
||||
return config
|
||||
}
|
65
pkg/collector/httpmetrics/aggregator.go
Normal file
65
pkg/collector/httpmetrics/aggregator.go
Normal file
@ -0,0 +1,65 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
)
|
||||
|
||||
type AggregatorFunc func(...float64) float64
|
||||
|
||||
// Average implements the average mathematical function over a slice of float64
|
||||
func Average(values ...float64) float64 {
|
||||
sum := Sum(values...)
|
||||
return sum / float64(len(values))
|
||||
}
|
||||
|
||||
// Minimum implements the absolute minimum mathematical function over a slice of float64
|
||||
func Minimum(values ...float64) float64 {
|
||||
// initialized with positive infinity, all finite numbers are smaller than it
|
||||
curMin := math.Inf(1)
|
||||
for _, v := range values {
|
||||
if v < curMin {
|
||||
curMin = v
|
||||
}
|
||||
}
|
||||
return curMin
|
||||
}
|
||||
|
||||
// Maximum implements the absolute maximum mathematical function over a slice of float64
|
||||
func Maximum(values ...float64) float64 {
|
||||
// initialized with negative infinity, all finite numbers are bigger than it
|
||||
curMax := math.Inf(-1)
|
||||
for _, v := range values {
|
||||
if v > curMax {
|
||||
curMax = v
|
||||
}
|
||||
}
|
||||
return curMax
|
||||
}
|
||||
|
||||
// Sum implements the summation mathematical function over a slice of float64
|
||||
func Sum(values ...float64) float64 {
|
||||
res := 0.0
|
||||
|
||||
for _, v := range values {
|
||||
res += v
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||
func ParseAggregator(aggregator string) (AggregatorFunc, error) {
|
||||
switch aggregator {
|
||||
case "avg":
|
||||
return Average, nil
|
||||
case "min":
|
||||
return Minimum, nil
|
||||
case "max":
|
||||
return Maximum, nil
|
||||
case "sum":
|
||||
return Sum, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator)
|
||||
}
|
||||
}
|
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReduce(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
input []float64
|
||||
output float64
|
||||
aggregator string
|
||||
parseError bool
|
||||
}{
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 2.0,
|
||||
aggregator: "avg",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 1.0,
|
||||
aggregator: "min",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 3.0,
|
||||
aggregator: "max",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 6.0,
|
||||
aggregator: "sum",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
aggregator: "non-existent",
|
||||
parseError: true,
|
||||
},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("Test function: %s", tc.aggregator), func(t *testing.T) {
|
||||
aggFunc, err := ParseAggregator(tc.aggregator)
|
||||
if tc.parseError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
val := aggFunc(tc.input...)
|
||||
require.Equal(t, tc.output, val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
128
pkg/collector/httpmetrics/json_path.go
Normal file
128
pkg/collector/httpmetrics/json_path.go
Normal file
@ -0,0 +1,128 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/spyzhov/ajson"
|
||||
)
|
||||
|
||||
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||
// the json path query.
|
||||
type JSONPathMetricsGetter struct {
|
||||
jsonPath string
|
||||
aggregator AggregatorFunc
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, jsonPath string) (*JSONPathMetricsGetter, error) {
|
||||
// check that jsonPath parses
|
||||
_, err := ajson.ParseJSONPath(jsonPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: jsonPath}, nil
|
||||
}
|
||||
|
||||
var DefaultRequestTimeout = 15 * time.Second
|
||||
var DefaultConnectTimeout = 15 * time.Second
|
||||
|
||||
func CustomMetricsHTTPClient(requestTimeout time.Duration, connectTimeout time.Duration) *http.Client {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: connectTimeout,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 50,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
},
|
||||
Timeout: requestTimeout,
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func DefaultMetricsHTTPClient() *http.Client {
|
||||
return CustomMetricsHTTPClient(DefaultRequestTimeout, DefaultConnectTimeout)
|
||||
}
|
||||
|
||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||
// endpoint and extracting the desired value using the specified json path
|
||||
// query.
|
||||
func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
|
||||
data, err := g.fetchMetrics(metricsURL)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// parse data
|
||||
root, err := ajson.Unmarshal(data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
nodes, err := root.JSONPath(g.jsonPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(nodes) != 1 {
|
||||
return 0, fmt.Errorf("unexpected json: expected single numeric or array value")
|
||||
}
|
||||
|
||||
node := nodes[0]
|
||||
if node.IsArray() {
|
||||
if g.aggregator == nil {
|
||||
return 0, fmt.Errorf("no aggregator function has been specified")
|
||||
}
|
||||
values := make([]float64, 0, len(nodes))
|
||||
items, _ := node.GetArray()
|
||||
for _, item := range items {
|
||||
value, err := item.GetNumeric()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("did not find numeric type: %w", err)
|
||||
}
|
||||
values = append(values, value)
|
||||
}
|
||||
return g.aggregator(values...), nil
|
||||
} else if node.IsNumeric() {
|
||||
res, _ := node.GetNumeric()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
value, err := node.Value()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to check value of jsonPath result: %w", err)
|
||||
}
|
||||
return 0, fmt.Errorf("unsupported type %T", value)
|
||||
}
|
||||
|
||||
func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
|
||||
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := g.client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
84
pkg/collector/httpmetrics/json_path_test.go
Normal file
84
pkg/collector/httpmetrics/json_path_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func makeTestHTTPServer(t *testing.T, response []byte) *httptest.Server {
|
||||
h := func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Equal(t, r.URL.Path, "/metrics")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, err := w.Write(response)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
return httptest.NewServer(http.HandlerFunc(h))
|
||||
}
|
||||
|
||||
func TestJSONPathMetricsGetter(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
jsonResponse []byte
|
||||
jsonPath string
|
||||
result float64
|
||||
aggregator AggregatorFunc
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "basic single value",
|
||||
jsonResponse: []byte(`{"value":3}`),
|
||||
jsonPath: "$.value",
|
||||
result: 3,
|
||||
aggregator: Average,
|
||||
},
|
||||
{
|
||||
name: "basic average",
|
||||
jsonResponse: []byte(`{"value":[3,4,5]}`),
|
||||
jsonPath: "$.value",
|
||||
result: 4,
|
||||
aggregator: Average,
|
||||
},
|
||||
{
|
||||
name: "dotted key",
|
||||
jsonResponse: []byte(`{"metric.value":5}`),
|
||||
jsonPath: "$['metric.value']",
|
||||
result: 5,
|
||||
aggregator: Average,
|
||||
},
|
||||
{
|
||||
name: "json path not resulting in array or number should lead to error",
|
||||
jsonResponse: []byte(`{"metric.value":5}`),
|
||||
jsonPath: "$['invalid.metric.values']",
|
||||
err: errors.New("unexpected json: expected single numeric or array value"),
|
||||
},
|
||||
{
|
||||
name: "invalid json should error",
|
||||
jsonResponse: []byte(`{`),
|
||||
jsonPath: "$['invalid.metric.values']",
|
||||
err: errors.New("unexpected end of file"),
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
server := makeTestHTTPServer(t, tc.jsonResponse)
|
||||
defer server.Close()
|
||||
getter, err := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, tc.jsonPath)
|
||||
require.NoError(t, err)
|
||||
url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
|
||||
require.NoError(t, err)
|
||||
metric, err := getter.GetMetric(*url)
|
||||
if tc.err != nil {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, tc.err.Error(), err.Error())
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.result, metric)
|
||||
})
|
||||
}
|
||||
}
|
118
pkg/collector/httpmetrics/pod_metrics.go
Normal file
118
pkg/collector/httpmetrics/pod_metrics.go
Normal file
@ -0,0 +1,118 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
type PodMetricsGetter interface {
|
||||
GetMetric(pod *v1.Pod) (float64, error)
|
||||
}
|
||||
|
||||
type PodMetricsJSONPathGetter struct {
|
||||
scheme string
|
||||
path string
|
||||
rawQuery string
|
||||
port int
|
||||
metricGetter *JSONPathMetricsGetter
|
||||
}
|
||||
|
||||
func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
return 0, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
|
||||
}
|
||||
metricsURL := g.buildMetricsURL(pod.Status.PodIP)
|
||||
return g.metricGetter.GetMetric(metricsURL)
|
||||
}
|
||||
|
||||
func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
|
||||
getter := PodMetricsJSONPathGetter{}
|
||||
var (
|
||||
jsonPath string
|
||||
aggregator AggregatorFunc
|
||||
err error
|
||||
)
|
||||
|
||||
if v, ok := config["json-key"]; ok {
|
||||
jsonPath = v
|
||||
}
|
||||
|
||||
if v, ok := config["scheme"]; ok {
|
||||
getter.scheme = v
|
||||
}
|
||||
|
||||
if v, ok := config["path"]; ok {
|
||||
getter.path = v
|
||||
}
|
||||
|
||||
if v, ok := config["raw-query"]; ok {
|
||||
getter.rawQuery = v
|
||||
}
|
||||
|
||||
if v, ok := config["port"]; ok {
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getter.port = n
|
||||
}
|
||||
|
||||
if v, ok := config["aggregator"]; ok {
|
||||
aggregator, err = ParseAggregator(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
requestTimeout := DefaultRequestTimeout
|
||||
connectTimeout := DefaultConnectTimeout
|
||||
|
||||
if v, ok := config["request-timeout"]; ok {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if d < 0 {
|
||||
return nil, fmt.Errorf("Invalid request-timeout config value: %s", v)
|
||||
}
|
||||
requestTimeout = d
|
||||
}
|
||||
|
||||
if v, ok := config["connect-timeout"]; ok {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if d < 0 {
|
||||
return nil, fmt.Errorf("Invalid connect-timeout config value: %s", v)
|
||||
}
|
||||
connectTimeout = d
|
||||
}
|
||||
|
||||
jsonPathGetter, err := NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getter.metricGetter = jsonPathGetter
|
||||
return &getter, nil
|
||||
}
|
||||
|
||||
// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
|
||||
func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
|
||||
var scheme = g.scheme
|
||||
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
}
|
||||
|
||||
return url.URL{
|
||||
Scheme: scheme,
|
||||
Host: fmt.Sprintf("%s:%d", podIP, g.port),
|
||||
Path: g.path,
|
||||
RawQuery: g.rawQuery,
|
||||
}
|
||||
}
|
190
pkg/collector/httpmetrics/pod_metrics_test.go
Normal file
190
pkg/collector/httpmetrics/pod_metrics_test.go
Normal file
@ -0,0 +1,190 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func compareMetricsGetter(t *testing.T, first, second *PodMetricsJSONPathGetter) {
|
||||
require.Equal(t, first.metricGetter.jsonPath, second.metricGetter.jsonPath)
|
||||
require.Equal(t, first.scheme, second.scheme)
|
||||
require.Equal(t, first.path, second.path)
|
||||
require.Equal(t, first.port, second.port)
|
||||
}
|
||||
|
||||
func TestNewPodJSONPathMetricsGetter(t *testing.T) {
|
||||
configNoAggregator := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
getterNoAggregator, err1 := NewPodMetricsJSONPathGetter(configNoAggregator)
|
||||
|
||||
require.NoError(t, err1)
|
||||
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||
metricGetter: &JSONPathMetricsGetter{jsonPath: configNoAggregator["json-key"]},
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
}, getterNoAggregator)
|
||||
|
||||
configAggregator := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"aggregator": "avg",
|
||||
}
|
||||
getterAggregator, err2 := NewPodMetricsJSONPathGetter(configAggregator)
|
||||
|
||||
require.NoError(t, err2)
|
||||
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||
metricGetter: &JSONPathMetricsGetter{jsonPath: configAggregator["json-key"], aggregator: Average},
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
}, getterAggregator)
|
||||
|
||||
configErrorJSONPath := map[string]string{
|
||||
"json-key": "{}",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
|
||||
_, err3 := NewPodMetricsJSONPathGetter(configErrorJSONPath)
|
||||
require.Error(t, err3)
|
||||
|
||||
configErrorPort := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "a9090",
|
||||
}
|
||||
|
||||
_, err4 := NewPodMetricsJSONPathGetter(configErrorPort)
|
||||
require.Error(t, err4)
|
||||
|
||||
configWithRawQuery := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"raw-query": "foo=bar&baz=bop",
|
||||
}
|
||||
getterWithRawQuery, err5 := NewPodMetricsJSONPathGetter(configWithRawQuery)
|
||||
|
||||
require.NoError(t, err5)
|
||||
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
|
||||
metricGetter: &JSONPathMetricsGetter{jsonPath: configWithRawQuery["json-key"]},
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
rawQuery: "foo=bar&baz=bop",
|
||||
}, getterWithRawQuery)
|
||||
}
|
||||
|
||||
func TestBuildMetricsURL(t *testing.T) {
|
||||
scheme := "http"
|
||||
ip := "1.2.3.4"
|
||||
port := "9090"
|
||||
path := "/v1/test/"
|
||||
rawQuery := "foo=bar&baz=bop"
|
||||
|
||||
// Test building URL with rawQuery
|
||||
configWithRawQuery := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"raw-query": rawQuery,
|
||||
}
|
||||
getterWithRawQuery, err1 := NewPodMetricsJSONPathGetter(configWithRawQuery)
|
||||
require.NoError(t, err1)
|
||||
|
||||
expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
|
||||
receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
|
||||
require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)
|
||||
|
||||
// Test building URL without rawQuery
|
||||
configWithNoQuery := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
}
|
||||
getterWithNoQuery, err3 := NewPodMetricsJSONPathGetter(configWithNoQuery)
|
||||
require.NoError(t, err3)
|
||||
|
||||
expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
|
||||
receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
|
||||
require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
|
||||
}
|
||||
|
||||
func TestCustomTimeouts(t *testing.T) {
|
||||
scheme := "http"
|
||||
port := "9090"
|
||||
path := "/v1/test/"
|
||||
|
||||
// Test no custom options results in default timeouts
|
||||
defaultConfig := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
}
|
||||
defaultTime := time.Duration(15000) * time.Millisecond
|
||||
|
||||
defaultGetter, err1 := NewPodMetricsJSONPathGetter(defaultConfig)
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, defaultGetter.metricGetter.client.Timeout, defaultTime)
|
||||
|
||||
// Test with custom request timeout
|
||||
configWithRequestTimeout := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"request-timeout": "978ms",
|
||||
}
|
||||
exectedTimeout := time.Duration(978) * time.Millisecond
|
||||
customRequestGetter, err2 := NewPodMetricsJSONPathGetter(configWithRequestTimeout)
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, customRequestGetter.metricGetter.client.Timeout, exectedTimeout)
|
||||
|
||||
// Test with custom connect timeout. Unfortunately, it seems there's no way to access the
|
||||
// connect timeout of the client struct to actually verify it's set :/
|
||||
configWithConnectTimeout := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"connect-timeout": "512ms",
|
||||
}
|
||||
_, err3 := NewPodMetricsJSONPathGetter(configWithConnectTimeout)
|
||||
require.NoError(t, err3)
|
||||
|
||||
configWithInvalidTimeout := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"request-timeout": "-256ms",
|
||||
}
|
||||
_, err4 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
|
||||
require.Error(t, err4)
|
||||
|
||||
configWithInvalidTimeout = map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"connect-timeout": "-256ms",
|
||||
}
|
||||
_, err5 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
|
||||
require.Error(t, err5)
|
||||
}
|
@ -1,253 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/oliveagle/jsonpath"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||
// the json path query.
|
||||
type JSONPathMetricsGetter struct {
|
||||
jsonPath *jsonpath.Compiled
|
||||
scheme string
|
||||
path string
|
||||
port int
|
||||
aggregator string
|
||||
client *http.Client
|
||||
rawQuery string
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
|
||||
httpClient := defaultHTTPClient()
|
||||
getter := &JSONPathMetricsGetter{client: httpClient}
|
||||
|
||||
if v, ok := config["json-key"]; ok {
|
||||
path, err := jsonpath.Compile(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
|
||||
}
|
||||
|
||||
getter.jsonPath = path
|
||||
}
|
||||
|
||||
if v, ok := config["scheme"]; ok {
|
||||
getter.scheme = v
|
||||
}
|
||||
|
||||
if v, ok := config["path"]; ok {
|
||||
getter.path = v
|
||||
}
|
||||
|
||||
if v, ok := config["raw-query"]; ok {
|
||||
getter.rawQuery = v
|
||||
}
|
||||
|
||||
if v, ok := config["port"]; ok {
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getter.port = n
|
||||
}
|
||||
|
||||
if v, ok := config["aggregator"]; ok {
|
||||
getter.aggregator = v
|
||||
}
|
||||
|
||||
return getter, nil
|
||||
}
|
||||
|
||||
func defaultHTTPClient() *http.Client {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 15 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 50,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
},
|
||||
Timeout: 15 * time.Second,
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||
// endpoint and extracting the desired value using the specified json path
|
||||
// query.
|
||||
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||
data, err := g.getPodMetrics(pod)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// parse data
|
||||
var jsonData interface{}
|
||||
err = json.Unmarshal(data, &jsonData)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
res, err := g.jsonPath.Lookup(jsonData)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
switch res := res.(type) {
|
||||
case int:
|
||||
return float64(res), nil
|
||||
case float32:
|
||||
return float64(res), nil
|
||||
case float64:
|
||||
return res, nil
|
||||
case []interface{}:
|
||||
s, err := castSlice(res)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return reduce(s, g.aggregator)
|
||||
default:
|
||||
return 0, fmt.Errorf("unsupported type %T", res)
|
||||
}
|
||||
}
|
||||
|
||||
// castSlice takes a slice of interface and returns a slice of float64 if all
|
||||
// values in slice were castable, else returns an error
|
||||
func castSlice(in []interface{}) ([]float64, error) {
|
||||
out := []float64{}
|
||||
|
||||
for _, v := range in {
|
||||
switch v := v.(type) {
|
||||
case int:
|
||||
out = append(out, float64(v))
|
||||
case float32:
|
||||
out = append(out, float64(v))
|
||||
case float64:
|
||||
out = append(out, v)
|
||||
default:
|
||||
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod) ([]byte, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
||||
}
|
||||
|
||||
metricsURL := g.buildMetricsURL(pod.Status.PodIP)
|
||||
|
||||
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := g.client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
|
||||
func (g *JSONPathMetricsGetter) buildMetricsURL(podIP string) url.URL {
|
||||
var scheme = g.scheme
|
||||
|
||||
if scheme == "" {
|
||||
scheme = "http"
|
||||
}
|
||||
|
||||
return url.URL{
|
||||
Scheme: scheme,
|
||||
Host: fmt.Sprintf("%s:%d", podIP, g.port),
|
||||
Path: g.path,
|
||||
RawQuery: g.rawQuery,
|
||||
}
|
||||
}
|
||||
|
||||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||
func reduce(values []float64, aggregator string) (float64, error) {
|
||||
switch aggregator {
|
||||
case "avg":
|
||||
return avg(values), nil
|
||||
case "min":
|
||||
return min(values), nil
|
||||
case "max":
|
||||
return max(values), nil
|
||||
case "sum":
|
||||
return sum(values), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
|
||||
}
|
||||
}
|
||||
|
||||
// avg implements the average mathematical function over a slice of float64
|
||||
func avg(values []float64) float64 {
|
||||
sum := sum(values)
|
||||
return sum / float64(len(values))
|
||||
}
|
||||
|
||||
// min implements the absolute minimum mathematical function over a slice of float64
|
||||
func min(values []float64) float64 {
|
||||
// initialized with positive infinity, all finite numbers are smaller than it
|
||||
curMin := math.Inf(1)
|
||||
|
||||
for _, v := range values {
|
||||
if v < curMin {
|
||||
curMin = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMin
|
||||
}
|
||||
|
||||
// max implements the absolute maximum mathematical function over a slice of float64
|
||||
func max(values []float64) float64 {
|
||||
// initialized with negative infinity, all finite numbers are bigger than it
|
||||
curMax := math.Inf(-1)
|
||||
|
||||
for _, v := range values {
|
||||
if v > curMax {
|
||||
curMax = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMax
|
||||
}
|
||||
|
||||
// sum implements the summation mathematical function over a slice of float64
|
||||
func sum(values []float64) float64 {
|
||||
res := 0.0
|
||||
|
||||
for _, v := range values {
|
||||
res += v
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
@ -1,173 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/oliveagle/jsonpath"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
|
||||
require.Equal(t, first.jsonPath, second.jsonPath)
|
||||
require.Equal(t, first.scheme, second.scheme)
|
||||
require.Equal(t, first.path, second.path)
|
||||
require.Equal(t, first.port, second.port)
|
||||
}
|
||||
|
||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
configNoAggregator := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||
|
||||
require.NoError(t, err1)
|
||||
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath1,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
}, getterNoAggregator)
|
||||
|
||||
configAggregator := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"aggregator": "avg",
|
||||
}
|
||||
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||
|
||||
require.NoError(t, err2)
|
||||
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath2,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
aggregator: "avg",
|
||||
}, getterAggregator)
|
||||
|
||||
configErrorJSONPath := map[string]string{
|
||||
"json-key": "{}",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
|
||||
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
|
||||
require.Error(t, err3)
|
||||
|
||||
configErrorPort := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "a9090",
|
||||
}
|
||||
|
||||
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
|
||||
require.Error(t, err4)
|
||||
|
||||
configWithRawQuery := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"raw-query": "foo=bar&baz=bop",
|
||||
}
|
||||
jpath5, _ := jsonpath.Compile(configWithRawQuery["json-key"])
|
||||
getterWithRawQuery, err5 := NewJSONPathMetricsGetter(configWithRawQuery)
|
||||
|
||||
require.NoError(t, err5)
|
||||
compareMetricsGetter(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath5,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
rawQuery: "foo=bar&baz=bop",
|
||||
}, getterWithRawQuery)
|
||||
}
|
||||
|
||||
func TestCastSlice(t *testing.T) {
|
||||
res1, err1 := castSlice([]interface{}{1, 2, 3})
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
|
||||
|
||||
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
|
||||
|
||||
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
|
||||
|
||||
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
|
||||
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
|
||||
require.Equal(t, []float64(nil), res4)
|
||||
}
|
||||
|
||||
func TestReduce(t *testing.T) {
|
||||
average, err1 := reduce([]float64{1, 2, 3}, "avg")
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, 2.0, average)
|
||||
|
||||
min, err2 := reduce([]float64{1, 2, 3}, "min")
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, 1.0, min)
|
||||
|
||||
max, err3 := reduce([]float64{1, 2, 3}, "max")
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, 3.0, max)
|
||||
|
||||
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
|
||||
require.NoError(t, err4)
|
||||
require.Equal(t, 6.0, sum)
|
||||
|
||||
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
|
||||
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
|
||||
}
|
||||
|
||||
func TestBuildMetricsURL(t *testing.T) {
|
||||
scheme := "http"
|
||||
ip := "1.2.3.4"
|
||||
port := "9090"
|
||||
path := "/v1/test/"
|
||||
rawQuery := "foo=bar&baz=bop"
|
||||
|
||||
// Test building URL with rawQuery
|
||||
configWithRawQuery := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
"raw-query": rawQuery,
|
||||
}
|
||||
_, err := jsonpath.Compile(configWithRawQuery["json-key"])
|
||||
require.NoError(t, err)
|
||||
getterWithRawQuery, err1 := NewJSONPathMetricsGetter(configWithRawQuery)
|
||||
require.NoError(t, err1)
|
||||
|
||||
expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
|
||||
receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
|
||||
require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)
|
||||
|
||||
// Test building URL without rawQuery
|
||||
configWithNoQuery := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": scheme,
|
||||
"path": path,
|
||||
"port": port,
|
||||
}
|
||||
_, err2 := jsonpath.Compile(configWithNoQuery["json-key"])
|
||||
require.NoError(t, err2)
|
||||
getterWithNoQuery, err3 := NewJSONPathMetricsGetter(configWithNoQuery)
|
||||
require.NoError(t, err3)
|
||||
|
||||
expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
|
||||
receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
|
||||
require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
|
||||
}
|
@ -1,11 +1,13 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
@ -31,7 +33,7 @@ func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutosc
|
||||
|
||||
type PodCollector struct {
|
||||
client kubernetes.Interface
|
||||
Getter PodMetricsGetter
|
||||
Getter httpmetrics.PodMetricsGetter
|
||||
podLabelSelector *metav1.LabelSelector
|
||||
namespace string
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
@ -41,10 +43,6 @@ type PodCollector struct {
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
type PodMetricsGetter interface {
|
||||
GetMetric(pod *corev1.Pod) (float64, error)
|
||||
}
|
||||
|
||||
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
|
||||
// get pod selector based on HPA scale target ref
|
||||
selector, err := getPodLabelSelector(client, hpa)
|
||||
@ -62,11 +60,11 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
|
||||
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
|
||||
}
|
||||
|
||||
var getter PodMetricsGetter
|
||||
var getter httpmetrics.PodMetricsGetter
|
||||
switch config.CollectorName {
|
||||
case "json-path":
|
||||
var err error
|
||||
getter, err = NewJSONPathMetricsGetter(config.Config)
|
||||
getter, err = httpmetrics.NewPodMetricsJSONPathGetter(config.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -84,37 +82,25 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
LabelSelector: labels.Set(c.podLabelSelector.MatchLabels).String(),
|
||||
}
|
||||
|
||||
pods, err := c.client.CoreV1().Pods(c.namespace).List(opts)
|
||||
pods, err := c.client.CoreV1().Pods(c.namespace).List(context.TODO(), opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
values := make([]CollectedMetric, 0, len(pods.Items))
|
||||
|
||||
// TODO: get metrics in parallel
|
||||
ch := make(chan CollectedMetric)
|
||||
errCh := make(chan error)
|
||||
for _, pod := range pods.Items {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
continue
|
||||
}
|
||||
go c.getPodMetric(pod, ch, errCh)
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
},
|
||||
values := make([]CollectedMetric, 0, len(pods.Items))
|
||||
for i := 0; i < len(pods.Items); i++ {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
c.logger.Error(err)
|
||||
case resp := <-ch:
|
||||
values = append(values, resp)
|
||||
}
|
||||
|
||||
values = append(values, metricValue)
|
||||
}
|
||||
|
||||
return values, nil
|
||||
@ -124,16 +110,39 @@ func (c *PodCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
|
||||
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
|
||||
value, err := c.Getter.GetMetric(&pod)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
ch <- CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Pod",
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
|
||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||
case "Deployment":
|
||||
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(context.TODO(), hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return deployment.Spec.Selector, nil
|
||||
case "StatefulSet":
|
||||
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(context.TODO(), hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
156
pkg/collector/pod_collector_test.go
Normal file
156
pkg/collector/pod_collector_test.go
Normal file
@ -0,0 +1,156 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
|
||||
const (
|
||||
testNamespace = "test-namespace"
|
||||
applicationLabelName = "application"
|
||||
applicationLabelValue = "test-application"
|
||||
testDeploymentName = "test-application"
|
||||
testInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
func TestPodCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
metrics [][]int64
|
||||
result []int64
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
|
||||
result: []int64{1, 3, 8, 5, 2},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
plugin := NewPodCollectorPlugin(client)
|
||||
makeTestDeployment(t, client)
|
||||
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
|
||||
makeTestPods(t, host, port, "test-metric", client, 5)
|
||||
testHPA := makeTestHPA(t, client)
|
||||
testConfig := makeTestConfig(port)
|
||||
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
|
||||
require.NoError(t, err)
|
||||
metrics, err := collector.GetMetrics()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
|
||||
var values []int64
|
||||
for _, m := range metrics {
|
||||
values = append(values, m.Custom.Value.Value())
|
||||
}
|
||||
require.ElementsMatch(t, tc.result, values)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testMetricResponse struct {
|
||||
Values []int64 `json:"values"`
|
||||
}
|
||||
type testMetricsHandler struct {
|
||||
values [][]int64
|
||||
calledCounter uint
|
||||
t *testing.T
|
||||
metricsPath string
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
require.Equal(h.t, h.metricsPath, r.URL.Path)
|
||||
require.Less(h.t, int(h.calledCounter), len(h.values))
|
||||
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
|
||||
require.NoError(h.t, err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, err = w.Write(response)
|
||||
require.NoError(h.t, err)
|
||||
h.calledCounter++
|
||||
}
|
||||
|
||||
func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMetricsHandler) {
|
||||
metricsHandler := &testMetricsHandler{values: values, t: t, metricsPath: "/metrics"}
|
||||
server := httptest.NewServer(metricsHandler)
|
||||
url, err := url.Parse(server.URL)
|
||||
require.NoError(t, err)
|
||||
return url.Hostname(), url.Port(), metricsHandler
|
||||
}
|
||||
|
||||
func makeTestConfig(port string) *MetricConfig {
|
||||
return &MetricConfig{
|
||||
CollectorName: "json-path",
|
||||
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
|
||||
for i := 0; i < replicas; i++ {
|
||||
testPod := &corev1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: fmt.Sprintf("test-pod-%d", i),
|
||||
Labels: map[string]string{applicationLabelName: applicationLabelValue},
|
||||
Annotations: map[string]string{
|
||||
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
|
||||
},
|
||||
},
|
||||
Status: corev1.PodStatus{
|
||||
PodIP: testServer,
|
||||
},
|
||||
}
|
||||
_, err := client.CoreV1().Pods(testNamespace).Create(context.TODO(), testPod, v1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestDeployment(t *testing.T, client kubernetes.Interface) *appsv1.Deployment {
|
||||
deployment := appsv1.Deployment{
|
||||
ObjectMeta: v1.ObjectMeta{Name: testDeploymentName},
|
||||
Spec: appsv1.DeploymentSpec{
|
||||
Selector: &v1.LabelSelector{
|
||||
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err := client.AppsV1().Deployments(testNamespace).Create(context.TODO(), &deployment, v1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
return &deployment
|
||||
|
||||
}
|
||||
|
||||
func makeTestHPA(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
|
||||
hpa := &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: "test-hpa",
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||
Kind: "Deployment",
|
||||
Name: testDeploymentName,
|
||||
APIVersion: "apps/v1",
|
||||
},
|
||||
},
|
||||
}
|
||||
_, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers("test-namespace").Create(context.TODO(), hpa, v1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
return hpa
|
||||
}
|
@ -128,7 +128,7 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
|
||||
|
||||
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
// TODO: use real context
|
||||
value, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
|
||||
value, _, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -87,16 +88,16 @@ func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hp
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getAnnotationWeight(backendWeights string, backend string) float64 {
|
||||
var weightsMap map[string]int
|
||||
func getAnnotationWeight(backendWeights string, backend string) (float64, error) {
|
||||
var weightsMap map[string]float64
|
||||
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
|
||||
if err != nil {
|
||||
return 0
|
||||
return 0, err
|
||||
}
|
||||
if weight, ok := weightsMap[backend]; ok {
|
||||
return float64(weight) / 100
|
||||
return float64(weight) / 100, nil
|
||||
}
|
||||
return 0
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
|
||||
@ -106,7 +107,11 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
|
||||
for _, anno := range backendAnnotations {
|
||||
if weightsMap, ok := ingressAnnotations[anno]; ok {
|
||||
annotationsPresent = true
|
||||
maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend))
|
||||
weight, err := getAnnotationWeight(weightsMap, backend)
|
||||
if err != nil {
|
||||
return 0.0, err
|
||||
}
|
||||
maxWeight = math.Max(maxWeight, weight)
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +130,7 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
|
||||
|
||||
// getCollector returns a collector for getting the metrics.
|
||||
func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
||||
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(context.TODO(), c.objectReference.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -207,13 +212,13 @@ func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2.Horizonta
|
||||
var replicas int32
|
||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||
case "Deployment":
|
||||
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(context.TODO(), hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
replicas = deployment.Status.Replicas
|
||||
case "StatefulSet":
|
||||
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(context.TODO(), hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
@ -10,7 +11,7 @@ import (
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
v1beta1 "k8s.io/api/networking/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@ -32,7 +33,7 @@ func TestTargetRefReplicasDeployments(t *testing.T) {
|
||||
|
||||
// Create an HPA with the deployment as ref
|
||||
hpa, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers(deployment.Namespace).
|
||||
Create(newHPA(defaultNamespace, name, "Deployment"))
|
||||
Create(context.TODO(), newHPA(defaultNamespace, name, "Deployment"), metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
replicas, err := targetRefReplicas(client, hpa)
|
||||
@ -49,7 +50,7 @@ func TestTargetRefReplicasStatefulSets(t *testing.T) {
|
||||
|
||||
// Create an HPA with the statefulSet as ref
|
||||
hpa, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers(statefulSet.Namespace).
|
||||
Create(newHPA(defaultNamespace, name, "StatefulSet"))
|
||||
Create(context.TODO(), newHPA(defaultNamespace, name, "StatefulSet"), metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
replicas, err := targetRefReplicas(client, hpa)
|
||||
@ -73,7 +74,7 @@ func newHPA(namesapce string, refName string, refKind string) *autoscalingv2.Hor
|
||||
}
|
||||
|
||||
func newDeployment(client *fake.Clientset, namespace string, name string, replicas, readyReplicas int32) (*appsv1.Deployment, error) {
|
||||
return client.AppsV1().Deployments(namespace).Create(&appsv1.Deployment{
|
||||
return client.AppsV1().Deployments(namespace).Create(context.TODO(), &appsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
@ -83,11 +84,11 @@ func newDeployment(client *fake.Clientset, namespace string, name string, replic
|
||||
ReadyReplicas: replicas,
|
||||
Replicas: readyReplicas,
|
||||
},
|
||||
})
|
||||
}, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func newStatefulSet(client *fake.Clientset, namespace string, name string) (*appsv1.StatefulSet, error) {
|
||||
return client.AppsV1().StatefulSets(namespace).Create(&appsv1.StatefulSet{
|
||||
return client.AppsV1().StatefulSets(namespace).Create(context.TODO(), &appsv1.StatefulSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
@ -96,7 +97,7 @@ func newStatefulSet(client *fake.Clientset, namespace string, name string) (*app
|
||||
ReadyReplicas: 1,
|
||||
Replicas: 2,
|
||||
},
|
||||
})
|
||||
}, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func TestSkipperCollector(t *testing.T) {
|
||||
@ -111,7 +112,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
expectError bool
|
||||
fakedAverage bool
|
||||
namespace string
|
||||
backendWeights map[string]map[string]int
|
||||
backendWeights map[string]map[string]float64
|
||||
replicas int32
|
||||
readyReplicas int32
|
||||
backendAnnotations []string
|
||||
@ -137,7 +138,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 1000,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 60.0, "backend1": 40}},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -151,7 +152,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 1000,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -166,7 +167,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
fakedAverage: true,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
|
||||
replicas: 5,
|
||||
readyReplicas: 5,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -180,7 +181,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
|
||||
replicas: 5, // this is not taken into account
|
||||
readyReplicas: 5,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -194,7 +195,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 0,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
replicas: 5,
|
||||
readyReplicas: 5,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -209,7 +210,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
fakedAverage: true,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{
|
||||
backendWeights: map[string]map[string]float64{
|
||||
testBackendWeightsAnnotation: {"backend2": 20, "backend1": 80},
|
||||
testStacksetWeightsAnnotation: {"backend2": 0, "backend1": 100},
|
||||
},
|
||||
@ -226,7 +227,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "backend1",
|
||||
backendWeights: map[string]map[string]int{
|
||||
backendWeights: map[string]map[string]float64{
|
||||
testBackendWeightsAnnotation: {"backend2": 20, "backend1": 80},
|
||||
testStacksetWeightsAnnotation: {"backend2": 0, "backend1": 100},
|
||||
},
|
||||
@ -243,7 +244,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 0,
|
||||
namespace: "default",
|
||||
backend: "backend3",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -257,7 +258,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "backend3",
|
||||
backendWeights: map[string]map[string]int{},
|
||||
backendWeights: map[string]map[string]float64{},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -271,7 +272,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
expectError: true,
|
||||
namespace: "default",
|
||||
backend: "",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
backendWeights: map[string]map[string]float64{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
@ -300,7 +301,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
fakedAverage: true,
|
||||
namespace: "default",
|
||||
backend: "backend2",
|
||||
backendWeights: map[string]map[string]int{
|
||||
backendWeights: map[string]map[string]float64{
|
||||
testBackendWeightsAnnotation: {"backend2": 20, "backend1": 80},
|
||||
testStacksetWeightsAnnotation: {"backend1": 100},
|
||||
},
|
||||
@ -313,12 +314,12 @@ func TestSkipperCollector(t *testing.T) {
|
||||
metric: 1500,
|
||||
ingressName: "dummy-ingress",
|
||||
hostnames: []string{"example.org"},
|
||||
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2000)`,
|
||||
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2050)`,
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "backend2",
|
||||
backendWeights: map[string]map[string]int{
|
||||
testBackendWeightsAnnotation: {"backend2": 20, "backend1": 80},
|
||||
backendWeights: map[string]map[string]float64{
|
||||
testBackendWeightsAnnotation: {"backend2": 20.5, "backend1": 79.5},
|
||||
testStacksetWeightsAnnotation: {"backend1": 100},
|
||||
},
|
||||
replicas: 5,
|
||||
@ -331,8 +332,8 @@ func TestSkipperCollector(t *testing.T) {
|
||||
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
|
||||
require.NoError(t, err)
|
||||
plugin := makePlugin(tc.metric)
|
||||
hpa := makeHPA(tc.ingressName, tc.backend)
|
||||
config := makeConfig(tc.backend, tc.fakedAverage)
|
||||
hpa := makeHPA(tc.namespace, tc.ingressName, tc.backend)
|
||||
config := makeConfig(tc.ingressName, tc.namespace, tc.backend, tc.fakedAverage)
|
||||
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
|
||||
require.NoError(t, err)
|
||||
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
|
||||
@ -341,6 +342,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
if tc.expectError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
|
||||
require.NoError(t, err, "failed to collect metrics: %v", err)
|
||||
require.Len(t, collected, 1, "the number of metrics returned is not 1")
|
||||
@ -350,7 +352,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, hostnames []string, backendWeights map[string]map[string]int) error {
|
||||
func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, hostnames []string, backendWeights map[string]map[string]float64) error {
|
||||
annotations := make(map[string]string)
|
||||
for anno, weights := range backendWeights {
|
||||
sWeights, err := json.Marshal(weights)
|
||||
@ -381,12 +383,13 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
|
||||
Host: hostname,
|
||||
})
|
||||
}
|
||||
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress)
|
||||
_, err := client.NetworkingV1beta1().Ingresses(namespace).Create(context.TODO(), ingress, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
|
||||
func makeHPA(namespace, ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
|
||||
return &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace},
|
||||
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||
Kind: "Deployment",
|
||||
@ -404,9 +407,13 @@ func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler
|
||||
},
|
||||
}
|
||||
}
|
||||
func makeConfig(backend string, fakedAverage bool) *MetricConfig {
|
||||
func makeConfig(ingressName, namespace, backend string, fakedAverage bool) *MetricConfig {
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{Metric: autoscalingv2.MetricIdentifier{Name: fmt.Sprintf("%s,%s", rpsMetricName, backend)}},
|
||||
ObjectReference: custom_metrics.ObjectReference{
|
||||
Name: ingressName,
|
||||
Namespace: namespace,
|
||||
},
|
||||
MetricSpec: autoscalingv2.MetricSpec{
|
||||
Object: &autoscalingv2.ObjectMetricSource{
|
||||
Target: autoscalingv2.MetricTarget{},
|
||||
|
@ -119,7 +119,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
|
||||
func (p *HPAProvider) updateHPAs() error {
|
||||
p.logger.Info("Looking for HPAs")
|
||||
|
||||
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
|
||||
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ func TestUpdateHPAs(t *testing.T) {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
|
||||
var err error
|
||||
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
|
||||
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(context.TODO(), hpa, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
collectorFactory := collector.NewCollectorFactory()
|
||||
@ -86,7 +86,7 @@ func TestUpdateHPAs(t *testing.T) {
|
||||
|
||||
// update HPA
|
||||
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
|
||||
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa)
|
||||
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(context.TODO(), hpa, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
@ -134,7 +134,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
|
||||
var err error
|
||||
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
|
||||
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(context.TODO(), hpa, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
collectorFactory := collector.NewCollectorFactory()
|
||||
|
@ -69,9 +69,15 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
|
||||
Resource: "pods",
|
||||
}
|
||||
case "Ingress":
|
||||
// group can be either `extentions` or `networking.k8s.io`
|
||||
group := "extensions"
|
||||
gv, err := schema.ParseGroupVersion(value.DescribedObject.APIVersion)
|
||||
if err == nil {
|
||||
group = gv.Group
|
||||
}
|
||||
groupResource = schema.GroupResource{
|
||||
Resource: "ingresses",
|
||||
Group: "extensions",
|
||||
Group: group,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,6 +126,8 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
return err
|
||||
}
|
||||
|
||||
config.GenericConfig.OpenAPIConfig.Info.Title = "kube-metrics-adapter"
|
||||
|
||||
var clientConfig *rest.Config
|
||||
if len(o.RemoteKubeConfigFile) > 0 {
|
||||
loadingRules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: o.RemoteKubeConfigFile}
|
||||
@ -190,6 +192,8 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
|
||||
}
|
||||
|
||||
plugin, _ := collector.NewHTTPCollectorPlugin()
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.HTTPMetricName}, plugin)
|
||||
// register generic pod collector
|
||||
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user