mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2025-07-18 12:56:01 +00:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
120950078c | ||
![]() |
0790bc351a | ||
![]() |
f6b2aede5b | ||
![]() |
7d5e719eb0 | ||
![]() |
7497a61a2c | ||
![]() |
a72380125f | ||
![]() |
70c7fb843d | ||
![]() |
79533a5a93 | ||
![]() |
2765ff9811 | ||
![]() |
76d2f74743 | ||
![]() |
0de5042d3d |
@@ -2,14 +2,14 @@ language: go
|
||||
dist: xenial
|
||||
|
||||
go:
|
||||
- "1.11.x"
|
||||
- "1.13.x"
|
||||
|
||||
env:
|
||||
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
|
||||
|
||||
before_install:
|
||||
- go get github.com/mattn/goveralls
|
||||
- go get github.com/lawrencewoodman/roveralls
|
||||
- GO111MODULE=off go get github.com/mattn/goveralls
|
||||
- GO111MODULE=off go get github.com/lawrencewoodman/roveralls
|
||||
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
|
||||
|
||||
script:
|
||||
|
64
README.md
64
README.md
@@ -1,10 +1,13 @@
|
||||
# kube-metrics-adapter
|
||||
[](https://travis-ci.org/zalando-incubator/kube-metrics-adapter)
|
||||
[](https://coveralls.io/github/zalando-incubator/kube-metrics-adapter?branch=master)
|
||||
|
||||
Kube Metrics Adapter is a general purpose metrics adapter for Kubernetes that
|
||||
can collect and serve custom and external metrics for Horizontal Pod
|
||||
Autoscaling.
|
||||
|
||||
It supports scaling based on [Prometheus metrics](https://prometheus.io/), [SQS queues](https://aws.amazon.com/sqs/) and others out of the box.
|
||||
|
||||
It discovers Horizontal Pod Autoscaling resources and starts to collect the
|
||||
requested metrics and stores them in memory. It's implemented using the
|
||||
[custom-metrics-apiserver](https://github.com/kubernetes-incubator/custom-metrics-apiserver)
|
||||
@@ -41,6 +44,18 @@ The `metric-config.*` annotations are used by the `kube-metrics-adapter` to
|
||||
configure a collector for getting the metrics. In the above example it
|
||||
configures a *json-path pod collector*.
|
||||
|
||||
## Kubernetes compatibility
|
||||
|
||||
Like the [support
|
||||
policy](https://kubernetes.io/docs/setup/release/version-skew-policy/) offered
|
||||
for Kubernetes, this project aims to support the latest three minor releases of
|
||||
Kubernetes.
|
||||
|
||||
Currently the default supported API is `autoscaling/v2beta1`. However we aim to
|
||||
move to `autoscaling/v2beta2` (available since `v1.12`) in the near future as
|
||||
this adds a lot of improvements over `v2beta1`. The move to `v2beta2` will most
|
||||
likely happen as soon as [GKE adds support for it](https://issuetracker.google.com/issues/135624588).
|
||||
|
||||
## Building
|
||||
|
||||
This project uses [Go modules](https://github.com/golang/go/wiki/Modules) as
|
||||
@@ -71,9 +86,9 @@ Currently only `json-path` collection is supported.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type |
|
||||
| ------------ | -------------- | ------- |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods |
|
||||
| Metric | Description | Type | K8s Versions |
|
||||
| ------------ | -------------- | ------- | -- |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods | `>=1.10` |
|
||||
|
||||
### Example
|
||||
|
||||
@@ -91,6 +106,7 @@ metadata:
|
||||
metric-config.pods.requests-per-second.json-path/path: /metrics
|
||||
metric-config.pods.requests-per-second.json-path/port: "9090"
|
||||
metric-config.pods.requests-per-second.json-path/scheme: "https"
|
||||
metric-config.pods.requests-per-second.json-path/aggregator: "max"
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
@@ -128,6 +144,11 @@ The other configuration options `path`, `port` and `scheme` specify where the me
|
||||
endpoint is exposed on the pod. The `path` and `port` options do not have default values
|
||||
so they must be defined. The `scheme` is optional and defaults to `http`.
|
||||
|
||||
The `aggregator` configuration option specifies the aggregation function used to aggregate
|
||||
values of JSONPath expressions that evaluate to arrays/slices of numbers.
|
||||
It's optional but when the expression evaluates to an array/slice, it's absence will
|
||||
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
|
||||
|
||||
## Prometheus collector
|
||||
|
||||
The Prometheus collector is a generic collector which can map Prometheus
|
||||
@@ -151,10 +172,10 @@ the trade-offs between the two approaches.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type | Kind |
|
||||
| ------------ | -------------- | ------- | -- |
|
||||
| `prometheus-query` | Generic metric which requires a user defined query. | External | |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* |
|
||||
| Metric | Description | Type | Kind | K8s Versions |
|
||||
| ------------ | -------------- | ------- | -- | -- |
|
||||
| `prometheus-query` | Generic metric which requires a user defined query. | External | | `>=1.10` |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* | `>=1.10` |
|
||||
|
||||
### Example: External Metric
|
||||
|
||||
@@ -259,9 +280,9 @@ box so users don't have to define those manually.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type | Kind |
|
||||
| ----------- | -------------- | ------ | ---- |
|
||||
| `requests-per-second` | Scale based on requests per second for a certain ingress. | Object | `Ingress` |
|
||||
| Metric | Description | Type | Kind | K8s Versions |
|
||||
| ----------- | -------------- | ------ | ---- | ---- |
|
||||
| `requests-per-second` | Scale based on requests per second for a certain ingress. | Object | `Ingress` | `>=1.14` (can work with `>=1.10`) |
|
||||
|
||||
### Example
|
||||
|
||||
@@ -288,7 +309,10 @@ spec:
|
||||
apiVersion: extensions/v1beta1
|
||||
kind: Ingress
|
||||
name: myapp
|
||||
targetValue: 10 # this will be treated as targetAverageValue
|
||||
averageValue: 10 # Only works with Kubernetes >=1.14
|
||||
# for Kubernetes <1.14 you can use `targetValue` instead:
|
||||
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
|
||||
# Otherwise it will be treated as targetAverageValue
|
||||
```
|
||||
|
||||
### Metric weighting based on backend
|
||||
@@ -302,13 +326,17 @@ return the requests-per-second being sent to the `backend1`. The ingress annotat
|
||||
the backend weights can be obtained can be specified through the flag `--skipper-backends-annotation`.
|
||||
|
||||
|
||||
**Note:** As of Kubernetes v1.10 the HPA does not support `targetAverageValue` for
|
||||
**Note:** For Kubernetes `<v1.14` the HPA does not support `averageValue` for
|
||||
metrics of type `Object`. In case of requests per second it does not make sense
|
||||
to scale on a summed value because you can not make the total requests per
|
||||
second go down by adding more pods. For this reason the skipper collector will
|
||||
automatically treat the value you define in `targetValue` as an average per pod
|
||||
instead of a total sum.
|
||||
|
||||
**ONLY use `targetValue` if you are on Kubernetes
|
||||
`<1.14`, it is not as percise as using `averageValue` and will not be supported
|
||||
after Kubernetes `v1.16` is released according to the [support policy](https://kubernetes.io/docs/setup/release/version-skew-policy/).**
|
||||
|
||||
## AWS collector
|
||||
|
||||
The AWS collector allows scaling based on external metrics exposed by AWS
|
||||
@@ -340,9 +368,9 @@ PolicyDocument:
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type |
|
||||
| ------------ | ------- | -- |
|
||||
| `sqs-queue-length` | Scale based on SQS queue length | External |
|
||||
| Metric | Description | Type | K8s Versions |
|
||||
| ------------ | ------- | -- | -- |
|
||||
| `sqs-queue-length` | Scale based on SQS queue length | External | `>=1.10` |
|
||||
|
||||
### Example
|
||||
|
||||
@@ -388,9 +416,9 @@ The ZMON collector allows scaling based on external metrics exposed by
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type |
|
||||
| ------------ | ------- | -- |
|
||||
| `zmon-check` | Scale based on any ZMON check results | External |
|
||||
| Metric | Description | Type | K8s Versions |
|
||||
| ------------ | ------- | -- | -- |
|
||||
| `zmon-check` | Scale based on any ZMON check results | External | `>=1.10` |
|
||||
|
||||
### Example
|
||||
|
||||
|
@@ -1,7 +1,8 @@
|
||||
We acknowledge that every line of code that we write may potentially contain security issues.
|
||||
We are trying to deal with it responsibly and provide patches as quickly as possible.
|
||||
|
||||
We are trying to deal with it responsibly and provide patches as quickly as possible. If you have anything to report to us please use the following channels:
|
||||
We host our bug bounty program on HackerOne, it is currently private, therefore if you would like to report a vulnerability and get rewarded for it, please ask to join our program by filling this form:
|
||||
|
||||
Email: Tech-Security@zalando.de
|
||||
OR
|
||||
Submit your vulnerability report through our bug bounty program at: https://hackerone.com/zalando
|
||||
https://corporate.zalando.com/en/services-and-contact#security-form
|
||||
|
||||
You can also send you report via this form if you do not want to join our bug bounty program and just want to report a vulnerability or security issue.
|
||||
|
@@ -37,7 +37,8 @@ spec:
|
||||
apiVersion: extensions/v1beta1
|
||||
kind: Ingress
|
||||
name: custom-metrics-consumer
|
||||
targetValue: 10 # this will be treated as targetAverageValue
|
||||
averageValue: 10
|
||||
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
|
||||
- type: External
|
||||
external:
|
||||
metricName: sqs-queue-length
|
||||
|
84
go.mod
84
go.mod
@@ -1,92 +1,32 @@
|
||||
module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
bitbucket.org/ww/goautoneg v0.0.0-20120707110453-75cd24fc2f2c // indirect
|
||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
|
||||
github.com/BurntSushi/toml v0.3.0 // indirect
|
||||
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.0 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/Sirupsen/logrus v1.0.6 // indirect
|
||||
github.com/aws/aws-sdk-go v1.16.6
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/coreos/bbolt v1.3.0 // indirect
|
||||
github.com/coreos/etcd v3.3.9+incompatible // indirect
|
||||
github.com/coreos/go-semver v0.2.0 // indirect
|
||||
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
|
||||
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // indirect
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||
github.com/docker/docker v1.13.1 // indirect
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
|
||||
github.com/emicklei/go-restful v2.8.0+incompatible // indirect
|
||||
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
|
||||
github.com/evanphx/json-patch v4.1.1-0.20190203004735-bbf30d639737+incompatible // indirect
|
||||
github.com/fsnotify/fsnotify v1.4.7 // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect
|
||||
github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect
|
||||
github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect
|
||||
github.com/go-openapi/swag v0.0.0-20180715190254-becd2f08beaf // indirect
|
||||
github.com/gogo/protobuf v1.1.1 // indirect
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
|
||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||
github.com/gorilla/websocket v1.3.0 // indirect
|
||||
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47 // indirect
|
||||
github.com/hpcloud/tail v1.0.0 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.1.0 // indirect
|
||||
github.com/json-iterator/go v1.1.5 // indirect
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190116221620-b7016fc85e1c
|
||||
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
|
||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||
github.com/onsi/ginkgo v1.6.0 // indirect
|
||||
github.com/onsi/gomega v1.4.1 // indirect
|
||||
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 // indirect
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
|
||||
github.com/prometheus/client_golang v0.9.0-pre1.0.20180824101016-4eb539fa85a2
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
|
||||
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect
|
||||
github.com/sirupsen/logrus v1.3.0
|
||||
github.com/prometheus/client_golang v0.9.2
|
||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
github.com/soheilhy/cmux v0.1.4 // indirect
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/spf13/pflag v1.0.2 // indirect
|
||||
github.com/stretchr/testify v1.2.2
|
||||
github.com/stretchr/testify v1.3.0
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
|
||||
github.com/ugorji/go v1.1.1 // indirect
|
||||
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
|
||||
golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
|
||||
google.golang.org/appengine v1.2.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
|
||||
google.golang.org/grpc v1.14.0 // indirect
|
||||
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.1 // indirect
|
||||
k8s.io/api v0.0.0-20190226173710-145d52631d00
|
||||
k8s.io/apimachinery v0.0.0-20190221084156-01f179d85dbc
|
||||
k8s.io/apiserver v0.0.0-20190226174732-cf2f1d68202d
|
||||
k8s.io/client-go v2.0.0-alpha.0.0.20190226174127-78295b709ec6+incompatible
|
||||
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
|
||||
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
|
||||
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
|
||||
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
|
||||
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
|
||||
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
|
||||
k8s.io/klog v0.4.0
|
||||
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
|
||||
)
|
||||
|
||||
|
2
main.go
2
main.go
@@ -23,7 +23,7 @@ import (
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/server"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/util/logs"
|
||||
"k8s.io/component-base/logs"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@@ -165,6 +165,7 @@ type MetricConfig struct {
|
||||
ObjectReference custom_metrics.ObjectReference
|
||||
PerReplica bool
|
||||
Interval time.Duration
|
||||
MetricSpec autoscalingv2.MetricSpec
|
||||
}
|
||||
|
||||
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
|
||||
@@ -206,12 +207,15 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
|
||||
MetricTypeName: typeName,
|
||||
ObjectReference: ref,
|
||||
Config: map[string]string{},
|
||||
MetricSpec: metric,
|
||||
}
|
||||
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
|
||||
metric.External.Metric.Selector != nil &&
|
||||
metric.External.Metric.Selector.MatchLabels != nil {
|
||||
config.Config = metric.External.Metric.Selector.MatchLabels
|
||||
for k, v := range metric.External.Metric.Selector.MatchLabels {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@@ -17,10 +18,11 @@ import (
|
||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||
// the json path query.
|
||||
type JSONPathMetricsGetter struct {
|
||||
jsonPath *jsonpath.Compiled
|
||||
scheme string
|
||||
path string
|
||||
port int
|
||||
jsonPath *jsonpath.Compiled
|
||||
scheme string
|
||||
path string
|
||||
port int
|
||||
aggregator string
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
@@ -28,12 +30,12 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
getter := &JSONPathMetricsGetter{}
|
||||
|
||||
if v, ok := config["json-key"]; ok {
|
||||
pat, err := jsonpath.Compile(v)
|
||||
path, err := jsonpath.Compile(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
|
||||
}
|
||||
|
||||
getter.jsonPath = pat
|
||||
getter.jsonPath = path
|
||||
}
|
||||
|
||||
if v, ok := config["scheme"]; ok {
|
||||
@@ -52,6 +54,10 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
getter.port = n
|
||||
}
|
||||
|
||||
if v, ok := config["aggregator"]; ok {
|
||||
getter.aggregator = v
|
||||
}
|
||||
|
||||
return getter, nil
|
||||
}
|
||||
|
||||
@@ -83,11 +89,38 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||
return float64(res), nil
|
||||
case float64:
|
||||
return res, nil
|
||||
case []interface{}:
|
||||
s, err := castSlice(res)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return reduce(s, g.aggregator)
|
||||
default:
|
||||
return 0, fmt.Errorf("unsupported type %T", res)
|
||||
}
|
||||
}
|
||||
|
||||
// castSlice takes a slice of interface and returns a slice of float64 if all
|
||||
// values in slice were castable, else returns an error
|
||||
func castSlice(in []interface{}) ([]float64, error) {
|
||||
out := []float64{}
|
||||
|
||||
for _, v := range in {
|
||||
switch v := v.(type) {
|
||||
case int:
|
||||
out = append(out, float64(v))
|
||||
case float32:
|
||||
out = append(out, float64(v))
|
||||
case float64:
|
||||
out = append(out, v)
|
||||
default:
|
||||
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
@@ -131,3 +164,64 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
|
||||
func reduce(values []float64, aggregator string) (float64, error) {
|
||||
switch aggregator {
|
||||
case "avg":
|
||||
return avg(values), nil
|
||||
case "min":
|
||||
return min(values), nil
|
||||
case "max":
|
||||
return max(values), nil
|
||||
case "sum":
|
||||
return sum(values), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
|
||||
}
|
||||
}
|
||||
|
||||
// avg implements the average mathematical function over a slice of float64
|
||||
func avg(values []float64) float64 {
|
||||
sum := sum(values)
|
||||
return sum / float64(len(values))
|
||||
}
|
||||
|
||||
// min implements the absolute minimum mathematical function over a slice of float64
|
||||
func min(values []float64) float64 {
|
||||
// initialized with positive infinity, all finite numbers are smaller than it
|
||||
curMin := math.Inf(1)
|
||||
|
||||
for _, v := range values {
|
||||
if v < curMin {
|
||||
curMin = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMin
|
||||
}
|
||||
|
||||
// max implements the absolute maximum mathematical function over a slice of float64
|
||||
func max(values []float64) float64 {
|
||||
// initialized with negative infinity, all finite numbers are bigger than it
|
||||
curMax := math.Inf(-1)
|
||||
|
||||
for _, v := range values {
|
||||
if v > curMax {
|
||||
curMax = v
|
||||
}
|
||||
}
|
||||
|
||||
return curMax
|
||||
}
|
||||
|
||||
// sum implements the summation mathematical function over a slice of float64
|
||||
func sum(values []float64) float64 {
|
||||
res := 0.0
|
||||
|
||||
for _, v := range values {
|
||||
res += v
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
105
pkg/collector/json_path_collector_test.go
Normal file
105
pkg/collector/json_path_collector_test.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/oliveagle/jsonpath"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewJSONPathMetricsGetter(t *testing.T) {
|
||||
configNoAggregator := map[string]string{
|
||||
"json-key": "$.value",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
|
||||
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
|
||||
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath1,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
}, getterNoAggregator)
|
||||
|
||||
configAggregator := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"aggregator": "avg",
|
||||
}
|
||||
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
|
||||
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
|
||||
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, &JSONPathMetricsGetter{
|
||||
jsonPath: jpath2,
|
||||
scheme: "http",
|
||||
path: "/metrics",
|
||||
port: 9090,
|
||||
aggregator: "avg",
|
||||
}, getterAggregator)
|
||||
|
||||
configErrorJSONPath := map[string]string{
|
||||
"json-key": "{}",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
}
|
||||
|
||||
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
|
||||
require.Error(t, err3)
|
||||
|
||||
configErrorPort := map[string]string{
|
||||
"json-key": "$.values",
|
||||
"scheme": "http",
|
||||
"path": "/metrics",
|
||||
"port": "a9090",
|
||||
}
|
||||
|
||||
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
|
||||
require.Error(t, err4)
|
||||
}
|
||||
|
||||
func TestCastSlice(t *testing.T) {
|
||||
res1, err1 := castSlice([]interface{}{1, 2, 3})
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
|
||||
|
||||
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
|
||||
|
||||
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
|
||||
|
||||
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
|
||||
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
|
||||
require.Equal(t, []float64(nil), res4)
|
||||
}
|
||||
|
||||
func TestReduce(t *testing.T) {
|
||||
average, err1 := reduce([]float64{1, 2, 3}, "avg")
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, 2.0, average)
|
||||
|
||||
min, err2 := reduce([]float64{1, 2, 3}, "min")
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, 1.0, min)
|
||||
|
||||
max, err3 := reduce([]float64{1, 2, 3}, "max")
|
||||
require.NoError(t, err3)
|
||||
require.Equal(t, 3.0, max)
|
||||
|
||||
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
|
||||
require.NoError(t, err4)
|
||||
require.Equal(t, 6.0, sum)
|
||||
|
||||
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
|
||||
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
|
||||
}
|
@@ -1,67 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
)
|
||||
|
||||
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
|
||||
// of metrics from all collectors.
|
||||
type MaxWeightedCollector struct {
|
||||
collectors []Collector
|
||||
interval time.Duration
|
||||
weight float64
|
||||
}
|
||||
|
||||
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
|
||||
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
|
||||
return &MaxWeightedCollector{
|
||||
collectors: collectors,
|
||||
interval: interval,
|
||||
weight: weight,
|
||||
}
|
||||
}
|
||||
|
||||
// GetMetrics gets metrics from all collectors and return the higest value.
|
||||
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
errors := make([]error, 0)
|
||||
collectedMetrics := make([]CollectedMetric, 0)
|
||||
for _, collector := range c.collectors {
|
||||
values, err := collector.GetMetrics()
|
||||
if err != nil {
|
||||
if _, ok := err.(NoResultError); ok {
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
collectedMetrics = append(collectedMetrics, values...)
|
||||
}
|
||||
if len(collectedMetrics) == 0 {
|
||||
if len(errors) == 0 {
|
||||
return nil, fmt.Errorf("no metrics collected, cannot determine max")
|
||||
}
|
||||
errorStrings := make([]string, len(errors))
|
||||
for i, e := range errors {
|
||||
errorStrings[i] = e.Error()
|
||||
}
|
||||
allErrors := strings.Join(errorStrings, ",")
|
||||
return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors)
|
||||
}
|
||||
max := collectedMetrics[0]
|
||||
for _, value := range collectedMetrics {
|
||||
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
|
||||
max = value
|
||||
}
|
||||
}
|
||||
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
|
||||
return []CollectedMetric{max}, nil
|
||||
}
|
||||
|
||||
// Interval returns the interval at which the collector should run.
|
||||
func (c *MaxWeightedCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
@@ -1,99 +0,0 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
)
|
||||
|
||||
type dummyCollector struct {
|
||||
value int64
|
||||
}
|
||||
|
||||
func (c dummyCollector) Interval() time.Duration {
|
||||
return time.Second
|
||||
}
|
||||
|
||||
func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
switch c.value {
|
||||
case 0:
|
||||
return nil, NoResultError{query: "invalid query"}
|
||||
case -1:
|
||||
return nil, fmt.Errorf("test error")
|
||||
default:
|
||||
quantity := resource.NewQuantity(c.value, resource.DecimalSI)
|
||||
return []CollectedMetric{
|
||||
{
|
||||
Custom: custom_metrics.MetricValue{
|
||||
Value: *quantity,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
values []int64
|
||||
expected int
|
||||
weight float64
|
||||
errored bool
|
||||
}{
|
||||
{
|
||||
name: "basic",
|
||||
values: []int64{100, 10, 9},
|
||||
expected: 100,
|
||||
weight: 1,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "weighted",
|
||||
values: []int64{100, 10, 9},
|
||||
expected: 20,
|
||||
weight: 0.2,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "with error",
|
||||
values: []int64{10, 9, -1},
|
||||
weight: 0.5,
|
||||
errored: true,
|
||||
},
|
||||
{
|
||||
name: "some invalid results",
|
||||
values: []int64{0, 1, 0, 10, 9},
|
||||
expected: 5,
|
||||
weight: 0.5,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "both invalid results and errors",
|
||||
values: []int64{0, 1, 0, -1, 10, 9},
|
||||
weight: 0.5,
|
||||
errored: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
collectors := make([]Collector, len(tc.values))
|
||||
for i, v := range tc.values {
|
||||
collectors[i] = dummyCollector{value: v}
|
||||
}
|
||||
wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...)
|
||||
metrics, err := wc.GetMetrics()
|
||||
if tc.errored {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, metrics, 1)
|
||||
require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value())
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
}
|
@@ -3,6 +3,7 @@ package collector
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@@ -146,7 +147,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
sampleValue = scalar.Value
|
||||
}
|
||||
|
||||
if sampleValue.String() == "NaN" {
|
||||
if math.IsNaN(float64(sampleValue)) {
|
||||
return nil, &NoResultError{query: c.query}
|
||||
}
|
||||
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -16,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
|
||||
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)`
|
||||
rpsMetricName = "requests-per-second"
|
||||
rpsMetricBackendSeparator = ","
|
||||
)
|
||||
@@ -72,8 +73,7 @@ type SkipperCollector struct {
|
||||
}
|
||||
|
||||
// NewSkipperCollector initializes a new SkipperCollector.
|
||||
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler,
|
||||
config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
|
||||
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
|
||||
return &SkipperCollector{
|
||||
client: client,
|
||||
objectReference: config.ObjectReference,
|
||||
@@ -136,29 +136,25 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||
}
|
||||
config := c.config
|
||||
|
||||
var collector Collector
|
||||
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
|
||||
var escapedHostnames []string
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
host := strings.Replace(rule.Host, ".", "_", -1)
|
||||
config.Config = map[string]string{
|
||||
"query": fmt.Sprintf(rpsQuery, host),
|
||||
}
|
||||
|
||||
config.PerReplica = false // per replica is handled outside of the prometheus collector
|
||||
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
collectors = append(collectors, collector)
|
||||
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
|
||||
}
|
||||
|
||||
if len(collectors) > 0 {
|
||||
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
|
||||
} else {
|
||||
if len(escapedHostnames) == 0 {
|
||||
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
|
||||
}
|
||||
|
||||
config.Config = map[string]string{
|
||||
"query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight),
|
||||
}
|
||||
|
||||
config.PerReplica = false // per replica is handled outside of the prometheus collector
|
||||
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return collector, nil
|
||||
}
|
||||
|
||||
@@ -178,22 +174,26 @@ func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(values))
|
||||
}
|
||||
|
||||
// get current replicas for the targeted scale object. This is used to
|
||||
// calculate an average metric instead of total.
|
||||
// targetAverageValue will be available in Kubernetes v1.12
|
||||
// https://github.com/kubernetes/kubernetes/pull/64097
|
||||
replicas, err := targetRefReplicas(c.client, c.hpa)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if replicas < 1 {
|
||||
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
|
||||
}
|
||||
|
||||
value := values[0]
|
||||
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
|
||||
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
|
||||
|
||||
// For Kubernetes <v1.14 we have to fall back to manual average
|
||||
if c.config.MetricSpec.Object.Target.AverageValue == nil {
|
||||
// get current replicas for the targeted scale object. This is used to
|
||||
// calculate an average metric instead of total.
|
||||
// targetAverageValue will be available in Kubernetes v1.12
|
||||
// https://github.com/kubernetes/kubernetes/pull/64097
|
||||
replicas, err := targetRefReplicas(c.client, c.hpa)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if replicas < 1 {
|
||||
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
|
||||
}
|
||||
|
||||
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
|
||||
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
|
||||
}
|
||||
|
||||
return []CollectedMetric{value}, nil
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -262,7 +262,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
}
|
||||
|
||||
// GetMetricByName gets a single metric by name.
|
||||
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*custom_metrics.MetricValue, error) {
|
||||
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
|
||||
metric := p.metricStore.GetMetricsByName(name, info)
|
||||
if metric == nil {
|
||||
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
|
||||
@@ -272,7 +272,7 @@ func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.C
|
||||
|
||||
// GetMetricBySelector returns metrics for namespaced resources by
|
||||
// label selector.
|
||||
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo) (*custom_metrics.MetricValueList, error) {
|
||||
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
return p.metricStore.GetMetricsBySelector(namespace, selector, info), nil
|
||||
}
|
||||
|
||||
|
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/golang/glog"
|
||||
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/cmd/server"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -38,6 +37,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -111,7 +111,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct{}) error {
|
||||
go func() {
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
glog.Fatal(http.ListenAndServe(o.MetricsAddress, nil))
|
||||
klog.Fatal(http.ListenAndServe(o.MetricsAddress, nil))
|
||||
}()
|
||||
|
||||
config, err := o.Config()
|
||||
|
Reference in New Issue
Block a user