Compare commits

...

46 Commits

Author SHA1 Message Date
Mikkel Oscar Lyderik Larsen 5e6d304ecd Support networking.k8s.io/v1beta1 Ingresses
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-10-08 16:55:49 +02:00
Mikkel Oscar Lyderik Larsen 0de5042d3d Update dependencies (#80)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-10-08 16:20:47 +02:00
aermakov-zalando 07c0e179b3 Fail on dirty and/or non-exact versions on master (#79)
* Fail on dirty and/or non-exact versions on master

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Prevent go from modifying go.mod

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Fix go.mod version

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Allow non-exact tag matches

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-10-01 11:54:37 +02:00
aermakov-zalando 29ee953a16 Merge pull request #78 from zalando-incubator/return-err
When traffic switching is used, require a backend for the RPS metric
2019-09-27 17:56:35 +02:00
Alexey Ermakov f78ef26857 When traffic switching is used, require a backend for the RPS metric
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-09-27 15:02:46 +02:00
Mikkel Oscar Lyderik Larsen a3c14e9dcb Merge pull request #76 from zalando-incubator/refactor-parsing
Prevent panic when parsing HPAs
2019-08-23 09:08:07 +02:00
Mikkel Oscar Lyderik Larsen b6b13fb31a Prevent panic when parsing HPAs
This is a slight refactoring/unification of how metric
labels/annotations are parsed and handled across collectors. This is
done to prevent crashes when labels are not defined on external metrics.

Fix #69

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-08-22 08:09:28 +02:00
Mikkel Oscar Lyderik Larsen 0a06691d39 Merge pull request #75 from edganiukov/master
collector/prometheus: add Prometheus URL (optional) as an annotation in HPA
2019-08-21 09:55:10 +02:00
Eduard Ganiukov 2d1d51e829 collector/prometheus: add prometheus server (optional) as an annotation in HPA.
Signed-off-by: Eduard Ganiukov <eduard.ganiukov@swisscom.com>
2019-08-14 13:05:50 +02:00
aermakov-zalando 41761e62df Merge pull request #71 from zalando-incubator/fix-rps-test
Skipper: fix the no annotation test so it makes more sense
2019-07-30 17:57:11 +02:00
Alexey Ermakov ed4c93abbb Skipper: fix the no annotation test so it makes more sense
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-07-30 17:50:39 +02:00
aermakov-zalando b2194ca136 Correctly handle zero-weight backends (#70)
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-07-30 17:32:36 +02:00
Mikkel Oscar Lyderik Larsen bd0dd10e72 Use proper tags for docker images (#66)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-26 20:36:03 +02:00
Mikkel Oscar Lyderik Larsen 461869c69b Fix response on no metrics found (#67)
Fixes the response from `GetMetricsBySelector` in case no metrics are
found. This issue caused a panic in kube-controller-manager:
https://github.com/kubernetes/kubernetes/pull/80392

Fix #40

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-26 20:35:17 +02:00
Mikkel Oscar Lyderik Larsen 9950851cad Merge pull request #64 from zalando-incubator/v2beta1-conversion
Support autoscaling v2beta1
2019-07-26 19:27:31 +02:00
Mikkel Oscar Lyderik Larsen d85fee795e Don't import v2beta2 twice
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 12:36:32 +02:00
Mikkel Oscar Lyderik Larsen 990f8eab14 Ignore files with upstream code
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 12:11:59 +02:00
Mikkel Oscar Lyderik Larsen 9a396bde68 Support autoscaling v2beta1
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 11:22:55 +02:00
Mikkel Oscar Lyderik Larsen aa8d24dbcf Merge pull request #63 from zalando-incubator/document-pod-https
Document how to use HTTPS for pod collector
2019-07-15 09:09:39 +02:00
Arjun Naik 19e9be9671 Document how to use HTTPS for pod collector
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-07-14 15:49:00 +02:00
Mikkel Oscar Lyderik Larsen 8fed8538ad Merge pull request #53 from zalando-incubator/prometheus-external-metric
Allow Prometheus metrics for External target
2019-05-19 23:19:29 +02:00
Sandor Szücs 9a234cbdac add AWS IAM policy as requirement to integrate with AWS SQS (#58)
Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
2019-05-17 11:07:18 +02:00
Mikkel Oscar Lyderik Larsen ffff8c2040 Prevent leaking collectors when HPA gets updated (#54)
* Prevent leaking collectors when HPA gets updated

This fixes an issue where collectors would be leaking when HPAs are
getting updated.

Fix this by stopping the collector started for the previous version of
the HPA.

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Add tests to verify old collector is removed

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-05-08 10:34:49 +02:00
Mikkel Oscar Lyderik Larsen 9d2760e3fc Allow Prometheus metrics for External target
Fix #45

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-30 23:22:18 +02:00
Mikkel Oscar Lyderik Larsen 5598b4d012 Merge pull request #52 from zalando-incubator/fix/golangci-lint-errors
Fix all errors from golangci-lint command
2019-04-27 16:11:35 +02:00
Arjun Naik 888e76b748 Fix all errors from golangci-lint command
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 15:54:15 +02:00
Arjun 7c848a1282 Max collector should ignore only no result errors (#50)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 14:54:20 +02:00
Arjun 445c7c874a Added golangci linter (#51)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 13:35:55 +02:00
Arjun 2eed3e64d0 Return a value when at least one of the metrics returns a value (#47)
* Return a value when at least one of the metrics returns a value

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Added test for max weighted collector

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-26 17:54:25 +02:00
Mikkel Oscar Lyderik Larsen f097e63401 Add build cache for CDP build (#49)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-26 16:47:00 +02:00
Mikkel Oscar Lyderik Larsen ca4e2008c4 Merge pull request #48 from zalando-incubator/fix-config-key
Remove unused Configuration key from MetricConfig
2019-04-26 16:34:48 +02:00
Mikkel Oscar Lyderik Larsen 3f019a1ceb Remove unused Configuration key from MetricConfig
This fixes an issue of setting up a ZMON collector where the incorrect
key `Configuration` was used, which was not initialized in the metrics
config parser. The `Config` key is the right one to use.

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-26 16:19:53 +02:00
Arjun 5a6f4997bd Add the labels from the zmon check into the config object (#46)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-18 13:34:11 +02:00
Arjun 8db22f38a3 Fixed metric labels so that metrics are tagged correctly. Also added nil check (#44)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-05 16:05:37 +02:00
Mikkel Oscar Lyderik Larsen d5b803d923 Merge pull request #43 from zalando-incubator/fix/panic
Fix nil dereference panics for the Annotation config parser
2019-04-04 15:00:46 +02:00
Arjun Naik 14f13495af Fix nil dereference panics
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-04 10:57:08 +02:00
Arjun dfeae82cae Upgrade all packages to autoscalingv2beta2 (#39)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-03 10:23:52 +02:00
Arjun 04b212175e Added clarification of dummy-pod (#42)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-03-25 16:15:54 +01:00
Arjun 478c97d5cb Added instructions on configuring adapter to collect ingress metrics (#34)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-02-24 21:03:55 +01:00
Arjun f4efa2898b Handle condition when backend weights only sometimes present (#33)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-02-01 10:59:42 +01:00
Mikkel Oscar Lyderik Larsen 7258cb7800 Merge pull request #31 from zalando-incubator/fix/no-backend
Fix case when backend is not set
2019-01-23 11:44:54 +01:00
Arjun Naik 56dd8b52e0 Fix case when backend is not set
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-23 11:36:09 +01:00
Arjun 248acf0311 Added logic and test case for a backend weight of 0 (#29)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-18 17:39:01 +01:00
Arjun 75633d3082 Changed request-per-second metric separator to a comma (#28)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-17 13:46:57 +01:00
Arjun 72aa672f51 Added weighting of RPS metrics based on backend weights (#27)
* Added weighting of rps metrics based on backend weights

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Updated documented with instructions on how to use the backend weighting

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Changed separator for RPS metric and added flag to specify backend weights annotation.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Allow for multiple backends for weighting.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-17 13:13:52 +01:00
Çağdaş Şenol f49f7821dc Fix json serialization naming for zmon queries (#25)
Signed-off-by: Cagdas Senol <cagdas.senol@zalando.de>
2019-01-08 16:24:46 +01:00
33 changed files with 2382 additions and 627 deletions
+22
@@ -0,0 +1,22 @@
run:
skip-files:
- "pkg/provider/generated.conversion.go"
- "pkg/provider/conversion.go"
linters-settings:
golint:
min-confidence: 0.9
linters:
disable-all: true
enable:
- staticcheck
- ineffassign
- golint
- goimports
- errcheck
issues:
exclude-rules:
# Exclude some staticcheck messages
- linters:
- staticcheck
text: "SA9003:"
+6 -4
@@ -2,16 +2,18 @@ language: go
dist: xenial
go:
- "1.11.x"
- "1.13.x"
env:
- GO111MODULE=on
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
before_install:
- go get github.com/mattn/goveralls
- go get github.com/lawrencewoodman/roveralls
- GO111MODULE=off go get github.com/mattn/goveralls
- GO111MODULE=off go get github.com/lawrencewoodman/roveralls
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
script:
- make check
- make test
- make build.docker
- roveralls
+1 -2
@@ -19,8 +19,7 @@ test:
go test -v $(GOPKGS)
check:
golint $(GOPKGS)
go vet -v $(GOPKGS)
golangci-lint run ./...
build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)
+92 -6
@@ -90,6 +90,7 @@ metadata:
metric-config.pods.requests-per-second.json-path/json-key: "$.http_server.rps"
metric-config.pods.requests-per-second.json-path/path: /metrics
metric-config.pods.requests-per-second.json-path/port: "9090"
metric-config.pods.requests-per-second.json-path/scheme: "https"
spec:
scaleTargetRef:
apiVersion: apps/v1
@@ -123,9 +124,9 @@ The json-path query support depends on the
See the README for possible queries. It's expected that the metric you query
returns something that can be turned into a `float64`.
The other configuration options `path` and `port` specifies where the metrics
endpoint is exposed on the pod. There's no default values, so they must be
defined.
The other configuration options `path`, `port` and `scheme` specify where the metrics
endpoint is exposed on the pod. The `path` and `port` options do not have default values
so they must be defined. The `scheme` is optional and defaults to `http`.
## Prometheus collector
@@ -152,9 +153,55 @@ the trade-offs between the two approaches.
| Metric | Description | Type | Kind |
| ------------ | -------------- | ------- | -- |
| `prometheus-query` | Generic metric which requires a user defined query. | External | |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* |
### Example
### Example: External Metric
This is an example of an HPA configured to get metrics based on a Prometheus
query. The query is defined in the annotation
`metric-config.external.prometheus-query.prometheus/processed-events-per-second`
where `processed-events-per-second` is the query name which will be associated
with the result of the query. A matching `query-name` label must be defined in
the `matchLabels` of the metric definition. This allows having multiple
Prometheus queries associated with a single HPA.
```yaml
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
annotations:
# This annotation is optional.
# If specified, then this prometheus server is used,
# instead of the prometheus server specified as the CLI argument `--prometheus-server`.
metric-config.external.prometheus-query.prometheus/prometheus-server: http://prometheus.my-namespace.svc
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
# <configKey> == query-name
metric-config.external.prometheus-query.prometheus/processed-events-per-second: |
scalar(sum(rate(event-service_events_count{application="event-service",processed="true"}[1m])))
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: custom-metrics-consumer
minReplicas: 1
maxReplicas: 10
metrics:
- type: External
external:
metricName: prometheus-query
metricSelector:
matchLabels:
query-name: processed-events-per-second
targetAverageValue: 10
```
### Example: Object Metric [DEPRECATED]
> _Note: Prometheus Object metrics are **deprecated** and will most likely be
> removed in the future. Use the Prometheus External metrics instead as described
> above._
This is an example of an HPA configured to get metrics based on a Prometheus
query. The query is defined in the annotation
@@ -192,11 +239,15 @@ spec:
metricName: processed-events-per-second
target:
apiVersion: v1
kind: Service
name: event-service
kind: Pod
name: dummy-pod
targetValue: 10 # this will be treated as targetAverageValue
```
_Note:_ The HPA object requires an `Object` to be specified. However, when a Prometheus metric is used there is no need
for this object, so to satisfy the schema we specify a dummy pod called `dummy-pod`.
## Skipper collector
The skipper collector is a simple wrapper around the Prometheus collector to
@@ -240,6 +291,17 @@ spec:
targetValue: 10 # this will be treated as targetAverageValue
```
### Metric weighting based on backend
Skipper supports sending traffic to different backends based on annotations present on the
`Ingress` object. When the metric name is specified without a backend, as `requests-per-second`,
the number of replicas is calculated based on the full traffic served by that ingress.
If only the traffic being routed to a specific backend should be used, the
backend name can be appended to the metric name, as in `requests-per-second,backend1`, which
returns the requests per second being sent to `backend1`. The Ingress annotation from which
the backend weights are read can be specified through the flag `--skipper-backends-annotation`.
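For illustration, an HPA scoped to a single backend might look like the following sketch. The deployment, ingress and `backend1` names are assumptions; the metric shape follows the Skipper collector example earlier in this README:

```yaml
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: myapp-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: myapp
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Object
    object:
      # ",backend1" restricts the metric to traffic routed to that backend
      metricName: requests-per-second,backend1
      target:
        apiVersion: extensions/v1beta1
        kind: Ingress
        name: myapp-ingress
      targetValue: 10 # this will be treated as targetAverageValue
```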
**Note:** As of Kubernetes v1.10 the HPA does not support `targetAverageValue` for
metrics of type `Object`. In case of requests per second it does not make sense
to scale on a summed value because you can not make the total requests per
@@ -252,6 +314,30 @@ instead of a total sum.
The AWS collector allows scaling based on external metrics exposed by AWS
services e.g. SQS queue lengths.
### AWS IAM role
To integrate with AWS, the controller needs to run on nodes with
access to the AWS API. Additionally, the controller must have a role
with the following policy to get all required data from AWS:
```yaml
PolicyDocument:
Statement:
- Action: 'sqs:GetQueueUrl'
Effect: Allow
Resource: '*'
- Action: 'sqs:GetQueueAttributes'
Effect: Allow
Resource: '*'
- Action: 'sqs:ListQueues'
Effect: Allow
Resource: '*'
- Action: 'sqs:ListQueueTags'
Effect: Allow
Resource: '*'
Version: 2012-10-17
```
### Supported metrics
| Metric | Description | Type |
+11 -1
@@ -2,7 +2,13 @@ version: "2017-09-20"
pipeline:
- id: build
overlay: ci/golang
cache:
paths:
- /go/pkg/mod # pkg cache for Go modules
- ~/.cache/go-build # Go build cache
type: script
env:
GOFLAGS: "-mod=readonly"
commands:
- desc: test
cmd: |
@@ -14,7 +20,11 @@ pipeline:
cmd: |
if [[ $CDP_TARGET_BRANCH == master && ! $CDP_PULL_REQUEST_NUMBER ]]; then
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter
VERSION=$(git describe --tags --always)
else
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter-test
VERSION=$CDP_BUILD_VERSION
fi
IMAGE=$IMAGE VERSION=$CDP_BUILD_VERSION make build.push
IMAGE=$IMAGE VERSION=$VERSION make build.docker
git diff --stat --exit-code
IMAGE=$IMAGE VERSION=$VERSION make build.push
+4 -2
@@ -3,13 +3,15 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
"time"
)
func metricsHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
_, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
if err != nil {
log.Fatalf("failed to write: %v", err)
}
}
var (
@@ -29,5 +31,5 @@ func main() {
ReadTimeout: 5 * time.Second,
}
server.ListenAndServe()
log.Fatal(server.ListenAndServe())
}
+15 -73
@@ -1,91 +1,33 @@
module github.com/zalando-incubator/kube-metrics-adapter
require (
bitbucket.org/ww/goautoneg v0.0.0-20120707110453-75cd24fc2f2c // indirect
github.com/BurntSushi/toml v0.3.0 // indirect
github.com/NYTimes/gziphandler v1.0.1 // indirect
github.com/PuerkitoBio/purell v1.1.0 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/aws/aws-sdk-go v1.16.6
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/coreos/bbolt v1.3.0 // indirect
github.com/coreos/etcd v3.3.9+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful v2.8.0+incompatible // indirect
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
github.com/evanphx/json-patch v3.0.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect
github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect
github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect
github.com/go-openapi/swag v0.0.0-20180715190254-becd2f08beaf // indirect
github.com/gogo/protobuf v1.1.1 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gorilla/websocket v1.3.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.5 // indirect
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20180824182428-26e5299457d3
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/onsi/ginkgo v1.6.0 // indirect
github.com/onsi/gomega v1.4.1 // indirect
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.9.0-pre1.0.20180824101016-4eb539fa85a2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect
github.com/sirupsen/logrus v1.0.6
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.2 // indirect
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 // indirect
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
google.golang.org/appengine v1.2.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
google.golang.org/grpc v1.14.0 // indirect
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
k8s.io/api v0.0.0-20180628040859-072894a440bd
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d
k8s.io/apiserver v0.0.0-20180628044425-01459b68eb5f
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
k8s.io/metrics v0.0.0-20180718014405-6efa0bfaa5c1
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
k8s.io/klog v0.4.0
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
)
go 1.13
+255 -94
File diff suppressed because it is too large.
+40
@@ -0,0 +1,40 @@
# Skipper Prometheus Metrics Collection
The skipper-ingress pods should be configured to be scraped by Prometheus. This
can be done via Prometheus service discovery of Kubernetes services
or Kubernetes pods:
```yaml
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "9911"
prometheus.io/scrape: "true"
labels:
application: skipper-ingress
name: skipper-ingress
spec:
ports:
- port: 80
protocol: TCP
targetPort: 9999
selector:
application: skipper-ingress
type: ClusterIP
```
This [configuration](https://github.com/zalando-incubator/kubernetes-on-aws/blob/dev/cluster/manifests/prometheus/configmap.yaml#L69)
shows how Prometheus is configured in our clusters to scrape service endpoints.
The annotations `prometheus.io/path`, `prometheus.io/port` and `prometheus.io/scrape`
instruct Prometheus to scrape all pods of this service on port _9911_ and
the path `/metrics`.
When `kube-metrics-adapter` is started, the flag `--prometheus-server` should be set so that
the adapter can query Prometheus for aggregated metrics. When running in Kubernetes it can
be the address of the Prometheus service, e.g. `http://prometheus.kube-system`.
With these settings `kube-metrics-adapter` can provide `requests-per-second` metrics for Ingress
objects present in the cluster. The Prometheus instances scrape the metrics from
the `skipper-ingress` pods; the adapter then queries Prometheus for the metric and
provides it to the API server when requested.
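In a cluster deployment this usually means passing the flag in the adapter's container spec. A minimal sketch, where the image reference and namespace are assumptions:

```yaml
containers:
- name: kube-metrics-adapter
  image: registry.opensource.zalan.do/teapot/kube-metrics-adapter:latest
  args:
  # point the adapter at the in-cluster Prometheus service
  - --prometheus-server=http://prometheus.kube-system
```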
+1 -1
@@ -23,7 +23,7 @@ import (
"github.com/zalando-incubator/kube-metrics-adapter/pkg/server"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/component-base/logs"
)
func main() {
+101
@@ -0,0 +1,101 @@
package annotations
import (
"fmt"
"strings"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)
const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
)
type AnnotationConfigs struct {
CollectorName string
Configs map[string]string
PerReplica bool
Interval time.Duration
}
type MetricConfigKey struct {
Type autoscalingv2.MetricSourceType
MetricName string
}
type AnnotationConfigMap map[MetricConfigKey]*AnnotationConfigs
func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
for key, val := range annotations {
if !strings.HasPrefix(key, customMetricsPrefix) {
continue
}
parts := strings.Split(key, "/")
if len(parts) != 2 {
// TODO: error?
continue
}
configs := strings.Split(parts[0], ".")
if len(configs) != 4 {
// TODO: error?
continue
}
key := MetricConfigKey{
MetricName: configs[2],
}
switch configs[1] {
case "pods":
key.Type = autoscalingv2.PodsMetricSourceType
case "object":
key.Type = autoscalingv2.ObjectMetricSourceType
default:
key.Type = autoscalingv2.ExternalMetricSourceType
}
metricCollector := configs[3]
config, ok := m[key]
if !ok {
config = &AnnotationConfigs{
CollectorName: metricCollector,
Configs: map[string]string{},
}
m[key] = config
}
// TODO: fail if collector name doesn't match
if config.CollectorName != metricCollector {
continue
}
if parts[1] == perReplicaMetricsConfKey {
config.PerReplica = true
continue
}
if parts[1] == intervalMetricsConfKey {
interval, err := time.ParseDuration(val)
if err != nil {
return fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
}
config.Interval = interval
continue
}
config.Configs[parts[1]] = val
}
return nil
}
func (m AnnotationConfigMap) GetAnnotationConfig(metricName string, metricType autoscalingv2.MetricSourceType) (*AnnotationConfigs, bool) {
key := MetricConfigKey{MetricName: metricName, Type: metricType}
config, ok := m[key]
return config, ok
}
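As a standalone sketch (not part of the diff above), the annotation key format handled by `Parse` can be exercised in isolation; `parseKey` is a hypothetical helper that mirrors the same splits on `/` and `.`:

```go
package main

import (
	"fmt"
	"strings"
)

// parseKey decomposes an annotation key such as
// "metric-config.pods.requests-per-second.json-path/port" into its
// metric type, metric name, collector name and config key, mirroring
// the split logic used by AnnotationConfigMap.Parse.
func parseKey(key string) (metricType, metricName, collector, configKey string, err error) {
	parts := strings.Split(key, "/")
	if len(parts) != 2 {
		return "", "", "", "", fmt.Errorf("expected exactly one '/' in %q", key)
	}
	configs := strings.Split(parts[0], ".")
	if len(configs) != 4 {
		return "", "", "", "", fmt.Errorf("expected 4 dot-separated parts in %q", parts[0])
	}
	// configs[0] is the "metric-config" prefix and is not returned.
	return configs[1], configs[2], configs[3], parts[1], nil
}

func main() {
	mt, mn, c, k, err := parseKey("metric-config.pods.requests-per-second.json-path/port")
	if err != nil {
		panic(err)
	}
	fmt.Println(mt, mn, c, k)
}
```

Running it prints the four components of the key in order (`pods requests-per-second json-path port`), matching the fields that `Parse` uses to build the `MetricConfigKey` and collector name.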
+85
@@ -0,0 +1,85 @@
package annotations
import (
"testing"
"github.com/stretchr/testify/require"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)
func TestParser(t *testing.T) {
for _, tc := range []struct {
Name string
Annotations map[string]string
MetricName string
MetricType autoscalingv2.MetricSourceType
ExpectedConfig map[string]string
PerReplica bool
}{
{
Name: "no annotations",
Annotations: map[string]string{},
ExpectedConfig: map[string]string{},
},
{
Name: "pod metrics",
Annotations: map[string]string{
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
"metric-config.pods.requests-per-second.json-path/scheme": "https",
},
MetricName: "requests-per-second",
MetricType: autoscalingv2.PodsMetricSourceType,
ExpectedConfig: map[string]string{
"json-key": "$.http_server.rps",
"path": "/metrics",
"port": "9090",
"scheme": "https",
},
},
{
Name: "prometheus metrics",
Annotations: map[string]string{
"metric-config.object.processed-events-per-second.prometheus/query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
"metric-config.object.processed-events-per-second.prometheus/per-replica": "true",
},
MetricName: "processed-events-per-second",
MetricType: autoscalingv2.ObjectMetricSourceType,
ExpectedConfig: map[string]string{
"query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
},
PerReplica: true,
},
{
Name: "zmon collector",
Annotations: map[string]string{
"metric-config.external.zmon-check.zmon/key": "custom.*",
"metric-config.external.zmon-check.zmon/tag-application": "my-custom-app-*",
},
MetricName: "zmon-check",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"key": "custom.*",
"tag-application": "my-custom-app-*",
},
PerReplica: false,
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)
err := hpaMap.Parse(tc.Annotations)
require.NoError(t, err)
config, present := hpaMap.GetAnnotationConfig(tc.MetricName, tc.MetricType)
if len(tc.ExpectedConfig) == 0 {
require.False(t, present)
return
}
require.True(t, present)
for k, v := range tc.ExpectedConfig {
require.Equal(t, v, config.Configs[k])
}
require.Equal(t, tc.PerReplica, config.PerReplica)
})
}
}
+14 -13
@@ -9,7 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -32,13 +32,13 @@ func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPl
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Metric.Name {
case AWSSQSQueueLengthMetric:
return NewAWSSQSCollector(c.sessions, config, interval)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
type AWSSQSCollector struct {
@@ -47,18 +47,20 @@ type AWSSQSCollector struct {
region string
queueURL string
queueName string
labels map[string]string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
}
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for queue is not specified")
}
name, ok := config.Labels[sqsQueueNameLabelKey]
name, ok := config.Config[sqsQueueNameLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue name not specified on metric")
}
region, ok := config.Labels[sqsQueueRegionLabelKey]
region, ok := config.Config[sqsQueueRegionLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue region is not specified on metric")
}
@@ -83,9 +85,8 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
interval: interval,
queueURL: aws.StringValue(resp.QueueUrl),
queueName: name,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
labels: config.Labels,
}, nil
}
@@ -109,8 +110,8 @@ func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
metricValue := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metricName,
MetricLabels: c.labels,
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewQuantity(int64(i), resource.DecimalSI),
},
+41 -123
@@ -2,22 +2,16 @@ package collector
import (
"fmt"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/annotations"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
)
type ObjectReference struct {
autoscalingv2beta1.CrossVersionObjectReference
autoscalingv2.CrossVersionObjectReference
Namespace string
}
@@ -49,7 +43,7 @@ func NewCollectorFactory() *CollectorFactory {
}
type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
@@ -106,9 +100,9 @@ func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin Co
}
}
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Type {
case autoscalingv2beta1.PodsMetricSourceType:
case autoscalingv2.PodsMetricSourceType:
// first try to find a plugin by format
if plugin, ok := c.podsPlugins.Named[config.CollectorName]; ok {
return plugin.NewCollector(hpa, config, interval)
@@ -118,7 +112,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
if c.podsPlugins.Any != nil {
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta1.ObjectMetricSourceType:
case autoscalingv2.ObjectMetricSourceType:
// first try to find a plugin by kind
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
if plugin, ok := kinds.Named[config.CollectorName]; ok {
@@ -139,8 +133,8 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
if c.objectPlugins.Any.Any != nil {
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta1.ExternalMetricSourceType:
if plugin, ok := c.externalPlugins[config.Name]; ok {
case autoscalingv2.ExternalMetricSourceType:
if plugin, ok := c.externalPlugins[config.Metric.Name]; ok {
return plugin.NewCollector(hpa, config, interval)
}
}
@@ -148,31 +142,15 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
}
func getObjectReference(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string) (custom_metrics.ObjectReference, error) {
for _, metric := range hpa.Spec.Metrics {
if metric.Type == autoscalingv2beta1.ObjectMetricSourceType && metric.Object.MetricName == metricName {
return custom_metrics.ObjectReference{
APIVersion: metric.Object.Target.APIVersion,
Kind: metric.Object.Target.Kind,
Name: metric.Object.Target.Name,
Namespace: hpa.Namespace,
}, nil
}
}
return custom_metrics.ObjectReference{}, fmt.Errorf("failed to find object reference")
}
type MetricTypeName struct {
Type autoscalingv2beta1.MetricSourceType
Name string
Type autoscalingv2.MetricSourceType
Metric autoscalingv2.MetricIdentifier
}
type CollectedMetric struct {
Type autoscalingv2beta1.MetricSourceType
Type autoscalingv2.MetricSourceType
Custom custom_metrics.MetricValue
External external_metrics.ExternalMetricValue
Labels map[string]string
}
type Collector interface {
@@ -187,83 +165,16 @@ type MetricConfig struct {
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
Labels map[string]string
}
func parseCustomMetricsAnnotations(annotations map[string]string) (map[MetricTypeName]*MetricConfig, error) {
metrics := make(map[MetricTypeName]*MetricConfig)
for key, val := range annotations {
if !strings.HasPrefix(key, customMetricsPrefix) {
continue
}
parts := strings.Split(key, "/")
if len(parts) != 2 {
// TODO: error?
continue
}
configs := strings.Split(parts[0], ".")
if len(configs) != 4 {
// TODO: error?
continue
}
metricTypeName := MetricTypeName{
Name: configs[2],
}
switch configs[1] {
case "pods":
metricTypeName.Type = autoscalingv2beta1.PodsMetricSourceType
case "object":
metricTypeName.Type = autoscalingv2beta1.ObjectMetricSourceType
}
metricCollector := configs[3]
config, ok := metrics[metricTypeName]
if !ok {
config = &MetricConfig{
MetricTypeName: metricTypeName,
CollectorName: metricCollector,
Config: map[string]string{},
}
metrics[metricTypeName] = config
}
// TODO: fail if collector name doesn't match
if config.CollectorName != metricCollector {
continue
}
if parts[1] == perReplicaMetricsConfKey {
config.PerReplica = true
continue
}
if parts[1] == intervalMetricsConfKey {
interval, err := time.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
}
config.Interval = interval
continue
}
config.Config[parts[1]] = val
}
return metrics, nil
}
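The removed parser accepted annotation keys of the form `metric-config.<type>.<metric-name>.<collector>/<config-key>`; the new `annotations` package now owns this parsing. A self-contained sketch of the old key format (struct and error messages are illustrative, not the package's actual API):

```go
package main

import (
	"fmt"
	"strings"
)

// parsedKey captures what parseCustomMetricsAnnotations extracted from one
// annotation key of the form:
//   metric-config.<type>.<metric-name>.<collector>/<config-key>
type parsedKey struct {
	MetricType string
	MetricName string
	Collector  string
	ConfigKey  string
}

func parseAnnotationKey(key string) (parsedKey, error) {
	const prefix = "metric-config."
	if !strings.HasPrefix(key, prefix) {
		return parsedKey{}, fmt.Errorf("key %q lacks prefix %q", key, prefix)
	}
	// Split into the dotted metric part and the per-metric config key.
	parts := strings.Split(key, "/")
	if len(parts) != 2 {
		return parsedKey{}, fmt.Errorf("key %q must contain exactly one '/'", key)
	}
	configs := strings.Split(parts[0], ".")
	if len(configs) != 4 {
		return parsedKey{}, fmt.Errorf("key %q must have four dot-separated segments", key)
	}
	return parsedKey{
		MetricType: configs[1],
		MetricName: configs[2],
		Collector:  configs[3],
		ConfigKey:  parts[1],
	}, nil
}

func main() {
	k, err := parseAnnotationKey("metric-config.pods.requests-per-second.json-path/interval")
	fmt.Println(k, err)
}
```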
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
metricConfigs := make([]*MetricConfig, 0, len(hpa.Spec.Metrics))
// TODO: validate that the specified metric names are defined
// in the HPA
configs, err := parseCustomMetricsAnnotations(hpa.Annotations)
parser := make(annotations.AnnotationConfigMap)
err := parser.Parse(hpa.Annotations)
if err != nil {
return nil, err
}
@@ -275,39 +186,46 @@ func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*Metric
var ref custom_metrics.ObjectReference
switch metric.Type {
case autoscalingv2beta1.PodsMetricSourceType:
typeName.Name = metric.Pods.MetricName
case autoscalingv2beta1.ObjectMetricSourceType:
typeName.Name = metric.Object.MetricName
case autoscalingv2.PodsMetricSourceType:
typeName.Metric = metric.Pods.Metric
case autoscalingv2.ObjectMetricSourceType:
typeName.Metric = metric.Object.Metric
ref = custom_metrics.ObjectReference{
APIVersion: metric.Object.Target.APIVersion,
Kind: metric.Object.Target.Kind,
Name: metric.Object.Target.Name,
APIVersion: metric.Object.DescribedObject.APIVersion,
Kind: metric.Object.DescribedObject.Kind,
Name: metric.Object.DescribedObject.Name,
Namespace: hpa.Namespace,
}
case autoscalingv2beta1.ExternalMetricSourceType:
typeName.Name = metric.External.MetricName
case autoscalingv2beta1.ResourceMetricSourceType:
case autoscalingv2.ExternalMetricSourceType:
typeName.Metric = metric.External.Metric
case autoscalingv2.ResourceMetricSourceType:
continue // kube-metrics-adapter does not collect resource metrics
}
if config, ok := configs[typeName]; ok {
config.ObjectReference = ref
metricConfigs = append(metricConfigs, config)
continue
}
config := &MetricConfig{
MetricTypeName: typeName,
ObjectReference: ref,
Config: map[string]string{},
}
if metric.Type == autoscalingv2beta1.ExternalMetricSourceType {
config.Labels = metric.External.MetricSelector.MatchLabels
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
config.Config = metric.External.Metric.Selector.MatchLabels
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
if present {
config.CollectorName = annotationConfigs.CollectorName
config.Interval = annotationConfigs.Interval
config.PerReplica = annotationConfigs.PerReplica
// configs specified in annotations take precedence
// over labels
for k, v := range annotationConfigs.Configs {
config.Config[k] = v
}
}
metricConfigs = append(metricConfigs, config)
}
return metricConfigs, nil
}
+3 -3
@@ -10,7 +10,7 @@ import (
"time"
"github.com/oliveagle/jsonpath"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
@@ -58,7 +58,7 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
@@ -89,7 +89,7 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
}
+41 -16
@@ -1,42 +1,67 @@
package collector
import "time"
import (
"fmt"
"strings"
"time"
// MaxCollector is a simple aggregator collector that returns the maximum value
"k8s.io/apimachinery/pkg/api/resource"
)
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxCollector struct {
type MaxWeightedCollector struct {
collectors []Collector
interval time.Duration
weight float64
}
// NewMaxCollector initializes a new MacCollector.
func NewMaxCollector(interval time.Duration, collectors ...Collector) *MaxCollector {
return &MaxCollector{
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
return &MaxWeightedCollector{
collectors: collectors,
interval: interval,
weight: weight,
}
}
// GetMetrics gets metrics from all collectors and returns the highest value.
func (c *MaxCollector) GetMetrics() ([]CollectedMetric, error) {
var max CollectedMetric
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
errors := make([]error, 0)
collectedMetrics := make([]CollectedMetric, 0)
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
if _, ok := err.(NoResultError); ok {
errors = append(errors, err)
continue
}
return nil, err
}
for _, value := range values {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
collectedMetrics = append(collectedMetrics, values...)
}
if len(collectedMetrics) == 0 {
if len(errors) == 0 {
return nil, fmt.Errorf("no metrics collected, cannot determine max")
}
errorStrings := make([]string, len(errors))
for i, e := range errors {
errorStrings[i] = e.Error()
}
allErrors := strings.Join(errorStrings, ",")
return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors)
}
max := collectedMetrics[0]
for _, value := range collectedMetrics {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxCollector) Interval() time.Duration {
func (c *MaxWeightedCollector) Interval() time.Duration {
return c.interval
}
+99
@@ -0,0 +1,99 @@
package collector
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type dummyCollector struct {
value int64
}
func (c dummyCollector) Interval() time.Duration {
return time.Second
}
func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) {
switch c.value {
case 0:
return nil, NoResultError{query: "invalid query"}
case -1:
return nil, fmt.Errorf("test error")
default:
quantity := resource.NewQuantity(c.value, resource.DecimalSI)
return []CollectedMetric{
{
Custom: custom_metrics.MetricValue{
Value: *quantity,
},
},
}, nil
}
}
func TestMaxCollector(t *testing.T) {
for _, tc := range []struct {
name string
values []int64
expected int
weight float64
errored bool
}{
{
name: "basic",
values: []int64{100, 10, 9},
expected: 100,
weight: 1,
errored: false,
},
{
name: "weighted",
values: []int64{100, 10, 9},
expected: 20,
weight: 0.2,
errored: false,
},
{
name: "with error",
values: []int64{10, 9, -1},
weight: 0.5,
errored: true,
},
{
name: "some invalid results",
values: []int64{0, 1, 0, 10, 9},
expected: 5,
weight: 0.5,
errored: false,
},
{
name: "both invalid results and errors",
values: []int64{0, 1, 0, -1, 10, 9},
weight: 0.5,
errored: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
collectors := make([]Collector, len(tc.values))
for i, v := range tc.values {
collectors[i] = dummyCollector{value: v}
}
wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...)
metrics, err := wc.GetMetrics()
if tc.errored {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Len(t, metrics, 1)
require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value())
}
})
}
}
-20
@@ -1,20 +0,0 @@
package collector
import autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
type ObjectMetricsGetter interface {
GetObjectMetric(namespace string, reference *autoscalingv2beta1.CrossVersionObjectReference) (float64, error)
}
// type PodCollector struct {
// client kubernetes.Interface
// Getter PodMetricsGetter
// podLabelSelector string
// namespace string
// metricName string
// interval time.Duration
// }
// func NewObjectCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string, config *MetricConfig, interval time.Duration) (Collector, error) {
// switch
// }
+19 -20
@@ -5,8 +5,8 @@ import (
"time"
log "github.com/sirupsen/logrus"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/api/core/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -24,26 +24,26 @@ func NewPodCollectorPlugin(client kubernetes.Interface) *PodCollectorPlugin {
}
}
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPodCollector(p.client, hpa, config, interval)
}
type PodCollector struct {
client kubernetes.Interface
Getter PodMetricsGetter
podLabelSelector string
podLabelSelector *metav1.LabelSelector
namespace string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
}
type PodMetricsGetter interface {
GetMetric(pod *v1.Pod) (float64, error)
GetMetric(pod *corev1.Pod) (float64, error)
}
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
// get pod selector based on HPA scale target ref
selector, err := getPodLabelSelector(client, hpa)
if err != nil {
@@ -53,7 +53,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
c := &PodCollector{
client: client,
namespace: hpa.Namespace,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
interval: interval,
podLabelSelector: selector,
@@ -79,7 +79,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
opts := metav1.ListOptions{
LabelSelector: c.podLabelSelector,
LabelSelector: labels.Set(c.podLabelSelector.MatchLabels).String(),
}
pods, err := c.client.CoreV1().Pods(c.namespace).List(opts)
@@ -106,11 +106,10 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
Name: pod.Name,
Namespace: pod.Namespace,
},
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
Labels: pod.Labels,
}
values = append(values, metricValue)
@@ -123,21 +122,21 @@ func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (string, error) {
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, err
}
return labels.Set(deployment.Spec.Selector.MatchLabels).String(), nil
return deployment.Spec.Selector, nil
case "StatefulSet":
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, err
}
return labels.Set(sts.Spec.Selector.MatchLabels).String(), nil
return sts.Spec.Selector, nil
}
return "", fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
}
+93 -30
@@ -9,13 +9,28 @@ import (
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
PrometheusMetricName = "prometheus-query"
prometheusQueryNameLabelKey = "query-name"
prometheusServerAnnotationKey = "prometheus-server"
)
type NoResultError struct {
query string
}
func (r NoResultError) Error() string {
return fmt.Sprintf("query '%s' did not return a valid response", r.query)
}
type PrometheusCollectorPlugin struct {
promAPI promv1.API
client kubernetes.Interface
@@ -24,7 +39,7 @@ type PrometheusCollectorPlugin struct {
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
cfg := api.Config{
Address: prometheusServer,
RoundTripper: &http.Transport{},
RoundTripper: http.DefaultTransport,
}
promClient, err := api.NewClient(cfg)
@@ -38,7 +53,7 @@ func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer
}, nil
}
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPrometheusCollector(p.client, p.promAPI, hpa, config, interval)
}
@@ -46,31 +61,65 @@ type PrometheusCollector struct {
client kubernetes.Interface
promAPI promv1.API
query string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
objectReference custom_metrics.ObjectReference
interval time.Duration
perReplica bool
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
hpa *autoscalingv2.HorizontalPodAutoscaler
}
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
c := &PrometheusCollector{
client: client,
objectReference: config.ObjectReference,
metricName: config.Name,
metricType: config.Type,
interval: interval,
promAPI: promAPI,
perReplica: config.PerReplica,
hpa: hpa,
client: client,
promAPI: promAPI,
interval: interval,
hpa: hpa,
metric: config.Metric,
metricType: config.Type,
}
if v, ok := config.Config["query"]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined")
switch config.Type {
case autoscalingv2.ObjectMetricSourceType:
c.objectReference = config.ObjectReference
c.perReplica = config.PerReplica
if v, ok := config.Config["query"]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined")
}
case autoscalingv2.ExternalMetricSourceType:
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for prometheus query is not specified")
}
queryName, ok := config.Config[prometheusQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("query name not specified on metric")
}
if v, ok := config.Config[queryName]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined for metric")
}
// Use custom Prometheus URL if defined in HPA annotation.
if promServer, ok := config.Config[prometheusServerAnnotationKey]; ok {
cfg := api.Config{
Address: promServer,
RoundTripper: http.DefaultTransport,
}
promClient, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
c.promAPI = promv1.NewAPI(promClient)
}
}
return c, nil
@@ -88,7 +137,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
case model.ValVector:
samples := value.(model.Vector)
if len(samples) == 0 {
return nil, fmt.Errorf("query '%s' returned no samples", c.query)
return nil, &NoResultError{query: c.query}
}
sampleValue = samples[0].Value
@@ -98,7 +147,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
}
if sampleValue.String() == "NaN" {
return nil, fmt.Errorf("query '%s' returned no samples: %s", c.query, sampleValue.String())
return nil, &NoResultError{query: c.query}
}
if c.perReplica {
@@ -113,14 +162,28 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
sampleValue = model.SampleValue(float64(sampleValue) / float64(replicas))
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: c.objectReference,
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
var metricValue CollectedMetric
switch c.metricType {
case autoscalingv2.ObjectMetricSourceType:
metricValue = CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: c.objectReference,
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.metric.Selector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
}
case autoscalingv2.ExternalMetricSourceType:
metricValue = CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
}
}
return []CollectedMetric{metricValue}, nil
+117 -39
@@ -1,11 +1,14 @@
package collector
import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -13,73 +16,149 @@ import (
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsMetricBackendSeparator = ","
)
var (
errBackendNameMissing = errors.New("backend name must be specified for requests-per-second when traffic switching is used")
)
// SkipperCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting skipper ingress metrics.
type SkipperCollectorPlugin struct {
client kubernetes.Interface
plugin CollectorPlugin
client kubernetes.Interface
plugin CollectorPlugin
backendAnnotations []string
}
// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin) (*SkipperCollectorPlugin, error) {
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
return &SkipperCollectorPlugin{
client: client,
plugin: prometheusPlugin,
client: client,
plugin: prometheusPlugin,
backendAnnotations: backendAnnotations,
}, nil
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
case rpsMetricName:
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval)
default:
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
if strings.HasPrefix(config.Metric.Name, rpsMetricName) {
backend := ""
if len(config.Metric.Name) > len(rpsMetricName) {
metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator)
if len(metricNameParts) == 2 {
backend = metricNameParts[1]
}
}
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
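The backend extraction above can be sketched on its own: a metric named `requests-per-second,backend-a` targets `backend-a`, while a plain `requests-per-second` leaves the backend empty (the function name here is a hypothetical helper, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

const (
	rpsMetricName             = "requests-per-second"
	rpsMetricBackendSeparator = ","
)

// backendFromMetricName mirrors NewCollector: the optional backend name is
// appended to the RPS metric name after a comma separator.
func backendFromMetricName(name string) (backend string, ok bool) {
	if !strings.HasPrefix(name, rpsMetricName) {
		return "", false // not an RPS metric at all
	}
	if len(name) > len(rpsMetricName) {
		parts := strings.Split(name, rpsMetricBackendSeparator)
		if len(parts) == 2 {
			backend = parts[1]
		}
	}
	return backend, true
}

func main() {
	b, ok := backendFromMetricName("requests-per-second,backend-a")
	fmt.Println(b, ok)
}
```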
// SkipperCollector is a metrics collector for getting skipper ingress metrics.
// It depends on the prometheus collector for getting the metrics.
type SkipperCollector struct {
client kubernetes.Interface
metricName string
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
client kubernetes.Interface
metric autoscalingv2.MetricIdentifier
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
backend string
backendAnnotations []string
}
// NewSkipperCollector initializes a new SkipperCollector.
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*SkipperCollector, error) {
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
return &SkipperCollector{
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metricName: config.Name,
interval: interval,
plugin: plugin,
config: *config,
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
plugin: plugin,
config: *config,
backend: backend,
backendAnnotations: backendAnnotations,
}, nil
}
func getAnnotationWeight(backendWeights string, backend string) float64 {
var weightsMap map[string]int
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
if err != nil {
return 0
}
if weight, ok := weightsMap[backend]; ok {
return float64(weight) / 100
}
return 0
}
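The annotation value parsed above is a JSON object mapping backend names to integer traffic percentages, e.g. `{"backend-1": 80, "backend-2": 20}`. A standalone sketch of the same conversion to a fractional weight:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// annotationWeight mirrors getAnnotationWeight: the ingress annotation value
// is JSON mapping backend name to an integer percentage, converted to a
// fraction. Malformed JSON or an unknown backend yields 0.
func annotationWeight(backendWeights, backend string) float64 {
	var weights map[string]int
	if err := json.Unmarshal([]byte(backendWeights), &weights); err != nil {
		return 0
	}
	if w, ok := weights[backend]; ok {
		return float64(w) / 100
	}
	return 0
}

func main() {
	fmt.Println(annotationWeight(`{"backend-1": 80, "backend-2": 20}`, "backend-1"))
}
```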
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
maxWeight := 0.0
annotationsPresent := false
for _, anno := range backendAnnotations {
if weightsMap, ok := ingressAnnotations[anno]; ok {
annotationsPresent = true
maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend))
}
}
// Fallback for ingresses that don't use traffic switching
if !annotationsPresent {
return 1.0, nil
}
// Traffic switching is in use, so a backend name is required
if backend != "" {
return maxWeight, nil
}
return 0.0, errBackendNameMissing
}
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
var annotations map[string]string
var hosts []string
switch c.objectReference.APIVersion {
case "extensions/v1beta1":
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
annotations = ingress.Annotations
for _, rule := range ingress.Spec.Rules {
hosts = append(hosts, rule.Host)
}
case "networking.k8s.io/v1beta1":
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
annotations = ingress.Annotations
for _, rule := range ingress.Spec.Rules {
hosts = append(hosts, rule.Host)
}
}
backendWeight, err := getWeights(annotations, c.backendAnnotations, c.backend)
if err != nil {
return nil, err
}
config := c.config
var collector Collector
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
for _, rule := range ingress.Spec.Rules {
host := strings.Replace(rule.Host, ".", "_", -1)
collectors := make([]Collector, 0, len(hosts))
for _, host := range hosts {
host := strings.Replace(host, ".", "_", -1)
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, host),
}
@@ -92,10 +171,9 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
collectors = append(collectors, collector)
}
if len(collectors) > 1 {
collector = NewMaxCollector(c.interval, collectors...)
} else if len(collectors) == 1 {
collector = collectors[0]
if len(collectors) > 0 {
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
} else {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
@@ -144,7 +222,7 @@ func (c *SkipperCollector) Interval() time.Duration {
return c.interval
}
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (int32, error) {
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (int32, error) {
var replicas int32
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
File diff suppressed because it is too large
+20 -37
@@ -7,7 +7,7 @@ import (
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -41,17 +41,13 @@ func NewZMONCollectorPlugin(zmon zmon.ZMON) (*ZMONCollectorPlugin, error) {
}
// NewCollector initializes a new ZMON collector from the specified HPA.
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Metric.Name {
case ZMONCheckMetric:
annotations := map[string]string{}
if hpa != nil {
annotations = hpa.Annotations
}
return NewZMONCollector(c.zmon, config, annotations, interval)
return NewZMONCollector(c.zmon, config, interval)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
// ZMONCollector defines a collector that is able to collect metrics from ZMON.
@@ -60,17 +56,20 @@ type ZMONCollector struct {
interval time.Duration
checkID int
key string
labels map[string]string
tags map[string]string
duration time.Duration
aggregators []string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
}
// NewZMONCollector initializes a new ZMONCollector.
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[string]string, interval time.Duration) (*ZMONCollector, error) {
checkIDStr, ok := config.Labels[zmonCheckIDLabelKey]
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for zmon-check is not specified")
}
checkIDStr, ok := config.Config[zmonCheckIDLabelKey]
if !ok {
return nil, fmt.Errorf("ZMON check ID not specified on metric")
}
@@ -83,19 +82,14 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
key := ""
// get optional key
if k, ok := config.Labels[zmonKeyLabelKey]; ok {
key = k
}
// annotations takes precedence over label
if k, ok := annotations[zmonKeyAnnotationKey]; ok {
if k, ok := config.Config[zmonKeyLabelKey]; ok {
key = k
}
duration := defaultQueryDuration
// parse optional duration value
if d, ok := config.Labels[zmonDurationLabelKey]; ok {
if d, ok := config.Config[zmonDurationLabelKey]; ok {
duration, err = time.ParseDuration(d)
if err != nil {
return nil, err
@@ -104,26 +98,16 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
// parse tags
tags := make(map[string]string)
for k, v := range config.Labels {
for k, v := range config.Config {
if strings.HasPrefix(k, zmonTagPrefixLabelKey) {
key := strings.TrimPrefix(k, zmonTagPrefixLabelKey)
tags[key] = v
}
}
// parse tags from annotations
// tags defined in annotations takes precedence over tags defined in
// the labels.
for k, v := range annotations {
if strings.HasPrefix(k, zmonTagPrefixAnnotationKey) {
key := strings.TrimPrefix(k, zmonTagPrefixAnnotationKey)
tags[key] = v
}
}
// default aggregator is last
aggregators := []string{"last"}
if k, ok := config.Labels[zmonAggregatorsLabelKey]; ok {
if k, ok := config.Config[zmonAggregatorsLabelKey]; ok {
aggregators = strings.Split(k, ",")
}
@@ -135,9 +119,8 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
tags: tags,
duration: duration,
aggregators: aggregators,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
labels: config.Labels,
}, nil
}
@@ -159,8 +142,8 @@ func (c *ZMONCollector) GetMetrics() ([]CollectedMetric, error) {
metricValue := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metricName,
MetricLabels: c.labels,
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: point.Time},
Value: *resource.NewMilliQuantity(int64(point.Value*1000), resource.DecimalSI),
},
@@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -26,9 +26,9 @@ func TestZMONCollectorNewCollector(t *testing.T) {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Name: ZMONCheckMetric,
Metric: newMetricIdentifier(ZMONCheckMetric),
},
Labels: map[string]string{
Config: map[string]string{
zmonCheckIDLabelKey: "1234",
zmonAggregatorsLabelKey: "max",
zmonTagPrefixLabelKey + "alias": "cluster_alias",
@@ -37,7 +37,7 @@ func TestZMONCollectorNewCollector(t *testing.T) {
},
}
hpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
collector, err := collectPlugin.NewCollector(hpa, config, 1*time.Second)
require.NoError(t, err)
@@ -50,39 +50,31 @@ func TestZMONCollectorNewCollector(t *testing.T) {
require.Equal(t, []string{"max"}, zmonCollector.aggregators)
require.Equal(t, map[string]string{"alias": "cluster_alias"}, zmonCollector.tags)
// check that annotations overwrites labels
hpa.ObjectMeta = metav1.ObjectMeta{
Annotations: map[string]string{
zmonKeyAnnotationKey: "annotation_key",
zmonTagPrefixAnnotationKey + "alias": "cluster_alias_annotation",
},
}
collector, err = collectPlugin.NewCollector(hpa, config, 1*time.Second)
require.NoError(t, err)
require.NotNil(t, collector)
zmonCollector = collector.(*ZMONCollector)
require.Equal(t, "annotation_key", zmonCollector.key)
require.Equal(t, map[string]string{"alias": "cluster_alias_annotation"}, zmonCollector.tags)
// should fail if the metric name isn't ZMON
config.Name = "non-zmon-check"
config.Metric = newMetricIdentifier("non-zmon-check")
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
require.Error(t, err)
// should fail if the check id is not specified.
delete(config.Labels, zmonCheckIDLabelKey)
config.Name = ZMONCheckMetric
delete(config.Config, zmonCheckIDLabelKey)
config.Metric.Name = ZMONCheckMetric
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
require.Error(t, err)
}
func newMetricIdentifier(metricName string) autoscalingv2.MetricIdentifier {
selector := metav1.LabelSelector{}
return autoscalingv2.MetricIdentifier{Name: metricName, Selector: &selector}
}
func TestZMONCollectorGetMetrics(tt *testing.T) {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Name: ZMONCheckMetric,
Type: "foo",
Metric: newMetricIdentifier(ZMONCheckMetric),
Type: "foo",
},
Labels: map[string]string{
Config: map[string]string{
zmonCheckIDLabelKey: "1234",
zmonAggregatorsLabelKey: "max",
zmonTagPrefixLabelKey + "alias": "cluster_alias",
@@ -108,8 +100,8 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
{
Type: config.Type,
External: external_metrics.ExternalMetricValue{
MetricName: config.Name,
MetricLabels: config.Labels,
MetricName: config.Metric.Name,
MetricLabels: config.Metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Time{}},
Value: *resource.NewMilliQuantity(int64(1.0)*1000, resource.DecimalSI),
},
@@ -125,7 +117,7 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
dataPoints: ti.dataPoints,
}
zmonCollector, err := NewZMONCollector(z, config, nil, 1*time.Second)
zmonCollector, err := NewZMONCollector(z, config, 1*time.Second)
require.NoError(t, err)
metrics, _ := zmonCollector.GetMetrics()
@@ -0,0 +1,247 @@
package provider
import (
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscaling "k8s.io/api/autoscaling/v2beta2"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
)
// from: https://github.com/kubernetes/kubernetes/blob/v1.14.4/pkg/apis/autoscaling/v2beta1/conversion.go
func Convert_autoscaling_MetricTarget_To_v2beta1_CrossVersionObjectReference(in *autoscaling.MetricTarget, out *autoscalingv2beta1.CrossVersionObjectReference, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_CrossVersionObjectReference_To_autoscaling_MetricTarget(in *autoscalingv2beta1.CrossVersionObjectReference, out *autoscaling.MetricTarget, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_ResourceMetricStatus_To_autoscaling_ResourceMetricStatus(in *autoscalingv2beta1.ResourceMetricStatus, out *autoscaling.ResourceMetricStatus, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.CurrentAverageUtilization
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
AverageValue: &averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricStatus_To_v2beta1_ResourceMetricStatus(in *autoscaling.ResourceMetricStatus, out *autoscalingv2beta1.ResourceMetricStatus, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.CurrentAverageUtilization = in.Current.AverageUtilization
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
return nil
}
func Convert_v2beta1_ResourceMetricSource_To_autoscaling_ResourceMetricSource(in *autoscalingv2beta1.ResourceMetricSource, out *autoscaling.ResourceMetricSource, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.TargetAverageUtilization
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if utilization == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.UtilizationMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricSource_To_v2beta1_ResourceMetricSource(in *autoscaling.ResourceMetricSource, out *autoscalingv2beta1.ResourceMetricSource, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.TargetAverageUtilization = in.Target.AverageUtilization
out.TargetAverageValue = in.Target.AverageValue
return nil
}
func Convert_autoscaling_ExternalMetricSource_To_v2beta1_ExternalMetricSource(in *autoscaling.ExternalMetricSource, out *autoscalingv2beta1.ExternalMetricSource, s conversion.Scope) error {
out.MetricName = in.Metric.Name
out.TargetValue = in.Target.Value
out.TargetAverageValue = in.Target.AverageValue
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricSource_To_autoscaling_ExternalMetricSource(in *autoscalingv2beta1.ExternalMetricSource, out *autoscaling.ExternalMetricSource, s conversion.Scope) error {
value := in.TargetValue
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if value == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.ValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricSource_To_v2beta1_ObjectMetricSource(in *autoscaling.ObjectMetricSource, out *autoscalingv2beta1.ObjectMetricSource, s conversion.Scope) error {
if in.Target.Value != nil {
out.TargetValue = *in.Target.Value
}
out.AverageValue = in.Target.AverageValue
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv2beta1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
var metricType autoscaling.MetricTargetType
if in.AverageValue == nil {
metricType = autoscaling.ValueMetricType
} else {
metricType = autoscaling.AverageValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: &in.TargetValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricSource_To_v2beta1_PodsMetricSource(in *autoscaling.PodsMetricSource, out *autoscalingv2beta1.PodsMetricSource, s conversion.Scope) error {
if in.Target.AverageValue != nil {
targetAverageValue := *in.Target.AverageValue
out.TargetAverageValue = targetAverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricSource_To_autoscaling_PodsMetricSource(in *autoscalingv2beta1.PodsMetricSource, out *autoscaling.PodsMetricSource, s conversion.Scope) error {
targetAverageValue := &in.TargetAverageValue
metricType := autoscaling.AverageValueMetricType
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: targetAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_ExternalMetricStatus_To_v2beta1_ExternalMetricStatus(in *autoscaling.ExternalMetricStatus, out *autoscalingv2beta1.ExternalMetricStatus, s conversion.Scope) error {
if in.Current.AverageValue != nil {
out.CurrentAverageValue = in.Current.AverageValue
}
out.MetricName = in.Metric.Name
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricStatus_To_autoscaling_ExternalMetricStatus(in *autoscalingv2beta1.ExternalMetricStatus, out *autoscaling.ExternalMetricStatus, s conversion.Scope) error {
value := in.CurrentValue
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
Value: &value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricStatus_To_v2beta1_ObjectMetricStatus(in *autoscaling.ObjectMetricStatus, out *autoscalingv2beta1.ObjectMetricStatus, s conversion.Scope) error {
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
if in.Current.AverageValue != nil {
currentAverageValue := *in.Current.AverageValue
out.AverageValue = &currentAverageValue
}
return nil
}
func Convert_v2beta1_ObjectMetricStatus_To_autoscaling_ObjectMetricStatus(in *autoscalingv2beta1.ObjectMetricStatus, out *autoscaling.ObjectMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
Value: &in.CurrentValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricStatus_To_v2beta1_PodsMetricStatus(in *autoscaling.PodsMetricStatus, out *autoscalingv2beta1.PodsMetricStatus, s conversion.Scope) error {
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricStatus_To_autoscaling_PodsMetricStatus(in *autoscalingv2beta1.PodsMetricStatus, out *autoscaling.PodsMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
AverageValue: &in.CurrentAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
File diff suppressed because it is too large
@@ -12,7 +12,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -46,10 +46,6 @@ var (
})
)
type objectCollector struct {
ObjectReference *autoscalingv2beta1.CrossVersionObjectReference
}
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
@@ -58,7 +54,7 @@ type HPAProvider struct {
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
@@ -125,18 +121,34 @@ func (p *HPAProvider) updateHPAs() error {
return err
}
newHPACache := make(map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler, len(hpas.Items))
newHPACache := make(map[resourceReference]autoscalingv2.HorizontalPodAutoscaler, len(hpas.Items))
newHPAs := 0
for _, hpa := range hpas.Items {
hpa := hpa
for _, hpav1 := range hpas.Items {
hpav1 := hpav1
hpa := autoscalingv2.HorizontalPodAutoscaler{}
err := Convert_v2beta1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(&hpav1, &hpa, nil)
if err != nil {
p.logger.Errorf("Failed to convert HPA to v2beta2: %v", err)
continue
}
resourceRef := resourceReference{
Name: hpa.Name,
Namespace: hpa.Namespace,
}
if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) {
cachedHPA, ok := p.hpaCache[resourceRef]
hpaUpdated := !equalHPA(cachedHPA, hpa)
if !ok || hpaUpdated {
// if the hpa has changed then remove the previous
// scheduled collector.
if hpaUpdated {
p.logger.Infof("Removing previously scheduled metrics collector: %s", resourceRef)
p.collectorScheduler.Remove(resourceRef)
}
metricConfigs, err := collector.ParseHPAMetrics(&hpa)
if err != nil {
p.logger.Errorf("Failed to parse HPA metrics: %v", err)
@@ -187,7 +199,7 @@ func (p *HPAProvider) updateHPAs() error {
}
// equalHPA returns true if two HPAs are identical (apart from their status).
func equalHPA(a, b autoscalingv2beta1.HorizontalPodAutoscaler) bool {
func equalHPA(a, b autoscalingv2.HorizontalPodAutoscaler) bool {
// reset resource version to not compare it since this will change
// whenever the status of the object is updated. We only want to
// compare the metadata and the spec.
@@ -225,15 +237,15 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
p.logger.Infof("Collected %d new metric(s)", len(collection.Values))
for _, value := range collection.Values {
switch value.Type {
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
value.Custom.MetricName,
value.Custom.Metric.Name,
value.Custom.Value.String(),
value.Custom.DescribedObject.Kind,
value.Custom.DescribedObject.Namespace,
value.Custom.DescribedObject.Name,
)
case autoscalingv2beta1.ExternalMetricSourceType:
case autoscalingv2.ExternalMetricSourceType:
p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
value.External.MetricName,
value.External.Value.String(),
@@ -250,7 +262,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
}
// GetMetricByName gets a single metric by name.
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*custom_metrics.MetricValue, error) {
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
metric := p.metricStore.GetMetricsByName(name, info)
if metric == nil {
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
@@ -260,7 +272,7 @@ func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.C
// GetMetricBySelector returns metrics for namespaced resources by
// label selector.
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo) (*custom_metrics.MetricValueList, error) {
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
return p.metricStore.GetMetricsBySelector(namespace, selector, info), nil
}
@@ -0,0 +1,92 @@
package provider
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscalingv1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)
type mockCollectorPlugin struct{}
func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
return mockCollector{}, nil
}
type mockCollector struct{}
func (c mockCollector) GetMetrics() ([]collector.CollectedMetric, error) {
return nil, nil
}
func (c mockCollector) Interval() time.Duration {
return 1 * time.Second
}
func TestUpdateHPAs(t *testing.T) {
value := resource.MustParse("1k")
hpa := &autoscalingv1.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
},
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscalingv1.MetricSpec{
{
Type: autoscalingv1.PodsMetricSourceType,
Pods: &autoscalingv1.PodsMetricSource{
MetricName: "requests-per-second",
TargetAverageValue: value,
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
hpa, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
require.Len(t, provider.collectorScheduler.table, 1)
// update HPA
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
_, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Update(hpa)
require.NoError(t, err)
err = provider.updateHPAs()
require.NoError(t, err)
require.Len(t, provider.collectorScheduler.table, 1)
}
@@ -9,7 +9,7 @@ import (
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -20,9 +20,8 @@ import (
// customMetricsStoredMetric is a wrapper around custom_metrics.MetricValue with a metricsTTL used
// to clean up stale metrics from the customMetricsStore.
type customMetricsStoredMetric struct {
Value custom_metrics.MetricValue
Labels map[string]string
TTL time.Time
Value custom_metrics.MetricValue
TTL time.Time
}
type externalMetricsStoredMetric struct {
@@ -50,15 +49,15 @@ func NewMetricStore(ttlCalculator func() time.Time) *MetricStore {
// Insert inserts a collected metric into the metric customMetricsStore.
func (s *MetricStore) Insert(value collector.CollectedMetric) {
switch value.Type {
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
s.insertCustomMetric(value.Custom, value.Labels)
case autoscalingv2beta1.ExternalMetricSourceType:
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
s.insertCustomMetric(value.Custom)
case autoscalingv2.ExternalMetricSourceType:
s.insertExternalMetric(value.External)
}
}
// insertCustomMetric inserts a custom metric plus labels into the store.
func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, labels map[string]string) {
func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
s.Lock()
defer s.Unlock()
@@ -77,15 +76,14 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, label
}
metric := customMetricsStoredMetric{
Value: value,
Labels: labels,
TTL: s.metricsTTLCalculator(), // TODO: make TTL configurable
Value: value,
TTL: s.metricsTTLCalculator(), // TODO: make TTL configurable
}
metrics, ok := s.customMetricsStore[value.MetricName]
metrics, ok := s.customMetricsStore[value.Metric.Name]
if !ok {
s.customMetricsStore[value.MetricName] = map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric{
groupResource: map[string]map[string]customMetricsStoredMetric{
s.customMetricsStore[value.Metric.Name] = map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric{
groupResource: {
value.DescribedObject.Namespace: map[string]customMetricsStoredMetric{
value.DescribedObject.Name: metric,
},
@@ -97,7 +95,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, label
group, ok := metrics[groupResource]
if !ok {
metrics[groupResource] = map[string]map[string]customMetricsStoredMetric{
value.DescribedObject.Namespace: map[string]customMetricsStoredMetric{
value.DescribedObject.Namespace: {
value.DescribedObject.Name: metric,
},
}
@@ -157,25 +155,25 @@ func (s *MetricStore) GetMetricsBySelector(namespace string, selector labels.Sel
metrics, ok := s.customMetricsStore[info.Metric]
if !ok {
return nil
return &custom_metrics.MetricValueList{}
}
group, ok := metrics[info.GroupResource]
if !ok {
return nil
return &custom_metrics.MetricValueList{}
}
if !info.Namespaced {
for _, metricMap := range group {
for _, metric := range metricMap {
if selector.Matches(labels.Set(metric.Labels)) {
if metric.Value.Metric.Selector != nil && selector.Matches(labels.Set(metric.Value.Metric.Selector.MatchLabels)) {
matchedMetrics = append(matchedMetrics, metric.Value)
}
}
}
} else if metricMap, ok := group[namespace]; ok {
for _, metric := range metricMap {
if selector.Matches(labels.Set(metric.Labels)) {
if metric.Value.Metric.Selector != nil && selector.Matches(labels.Set(metric.Value.Metric.Selector.MatchLabels)) {
matchedMetrics = append(matchedMetrics, metric.Value)
}
}
@@ -222,13 +220,13 @@ func (s *MetricStore) ListAllMetrics() []provider.CustomMetricInfo {
metrics := make([]provider.CustomMetricInfo, 0, len(s.customMetricsStore))
for metricName, customMetricsStoredMetrics := range s.customMetricsStore {
for metric, customMetricsStoredMetrics := range s.customMetricsStore {
for groupResource, group := range customMetricsStoredMetrics {
for namespace := range group {
metric := provider.CustomMetricInfo{
GroupResource: groupResource,
Namespaced: namespace != "",
Metric: metricName,
Metric: metric,
}
metrics = append(metrics, metric)
}
@@ -1,20 +1,29 @@
package provider
import (
"testing"
"time"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
"k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
"testing"
"time"
)
func newMetricIdentifier(metricName string) custom_metrics.MetricIdentifier {
selector := metav1.LabelSelector{}
return custom_metrics.MetricIdentifier{
Name: metricName,
Selector: &selector,
}
}
func TestInternalMetricStorage(t *testing.T) {
var metricStoreTests = []struct {
test string
@@ -33,10 +42,10 @@ func TestInternalMetricStorage(t *testing.T) {
{
test: "insert/list/get a namespaced resource metric",
insert: collector.CollectedMetric{
Type: v2beta1.MetricSourceType("Object"),
Type: autoscalingv2.MetricSourceType("Object"),
Custom: custom_metrics.MetricValue{
MetricName: "metric-per-unit",
Value: *resource.NewQuantity(0, ""),
Metric: newMetricIdentifier("metric-per-unit"),
Value: *resource.NewQuantity(0, ""),
DescribedObject: custom_metrics.ObjectReference{
Name: "metricObject",
Namespace: "default",
@@ -80,10 +89,10 @@ func TestInternalMetricStorage(t *testing.T) {
{
test: "insert/list/get a Pod metric",
insert: collector.CollectedMetric{
Type: v2beta1.MetricSourceType("Object"),
Type: autoscalingv2.MetricSourceType("Object"),
Custom: custom_metrics.MetricValue{
MetricName: "metric-per-unit",
Value: *resource.NewQuantity(0, ""),
Metric: newMetricIdentifier("metric-per-unit"),
Value: *resource.NewQuantity(0, ""),
DescribedObject: custom_metrics.ObjectReference{
Name: "metricObject",
Namespace: "default",
@@ -136,10 +145,10 @@ func TestInternalMetricStorage(t *testing.T) {
{
test: "insert/list/get a non-namespaced resource metric",
 insert: collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Node",
@@ -182,10 +191,10 @@ func TestInternalMetricStorage(t *testing.T) {
 {
 test: "insert/list/get an Ingress metric",
 insert: collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Ingress",
@@ -286,10 +295,10 @@ func TestMultipleMetricValues(t *testing.T) {
 test: "insert/list/get multiple Ingress metrics",
 insert: []collector.CollectedMetric{
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Ingress",
@@ -298,10 +307,10 @@ func TestMultipleMetricValues(t *testing.T) {
 },
 },
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(1, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(1, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Ingress",
@@ -355,10 +364,10 @@ func TestMultipleMetricValues(t *testing.T) {
 test: "insert/list/get multiple namespaced resource metrics",
 insert: []collector.CollectedMetric{
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "default",
@@ -368,10 +377,10 @@ func TestMultipleMetricValues(t *testing.T) {
 },
 },
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(1, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(1, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "default",
@@ -489,10 +498,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
 {
 test: "test that not all Kinds are mapped to a group/resource",
 insert: collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "default",
@@ -559,7 +568,7 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
 require.Nil(t, metric)
 metrics := metricsStore.GetMetricsBySelector(tc.byLabel.namespace, tc.byLabel.selector, tc.byLabel.info)
-require.Nil(t, metrics)
+require.Equal(t, &custom_metrics.MetricValueList{}, metrics)
 })
 }
@@ -581,10 +590,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
 test: "insert/list/get multiple metrics in different groups",
 insert: []collector.CollectedMetric{
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "default",
@@ -594,10 +603,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
 },
 },
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(1, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(1, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "default",
@@ -607,10 +616,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
 },
 },
 {
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(1, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(1, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Namespace: "new-namespace",
@@ -684,7 +693,7 @@ func TestExternalMetricStorage(t *testing.T) {
 {
 test: "insert/list/get an external metric",
 insert: collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("External"),
+Type: autoscalingv2.MetricSourceType("External"),
 External: external_metrics.ExternalMetricValue{
 MetricName: "metric-per-unit",
 Value: *resource.NewQuantity(0, ""),
@@ -744,7 +753,7 @@ func TestMultipleExternalMetricStorage(t *testing.T) {
 test: "the latest value overrides the last one",
 insert: []collector.CollectedMetric{
 {
-Type: v2beta1.MetricSourceType("External"),
+Type: autoscalingv2.MetricSourceType("External"),
 External: external_metrics.ExternalMetricValue{
 MetricName: "metric-per-unit",
 Value: *resource.NewQuantity(0, ""),
@@ -752,7 +761,7 @@ func TestMultipleExternalMetricStorage(t *testing.T) {
 },
 },
 {
-Type: v2beta1.MetricSourceType("External"),
+Type: autoscalingv2.MetricSourceType("External"),
 External: external_metrics.ExternalMetricValue{
 MetricName: "metric-per-unit",
 Value: *resource.NewQuantity(1, ""),
@@ -808,10 +817,10 @@ func TestMetricsExpiration(t *testing.T) {
 })
 customMetric := collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Node",
@@ -821,7 +830,7 @@ func TestMetricsExpiration(t *testing.T) {
 }
 externalMetric := collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("External"),
+Type: autoscalingv2.MetricSourceType("External"),
 External: external_metrics.ExternalMetricValue{
 MetricName: "metric-per-unit",
 Value: *resource.NewQuantity(0, ""),
@@ -847,10 +856,10 @@ func TestMetricsNonExpiration(t *testing.T) {
 })
 customMetric := collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("Object"),
+Type: autoscalingv2.MetricSourceType("Object"),
 Custom: custom_metrics.MetricValue{
-MetricName: "metric-per-unit",
-Value: *resource.NewQuantity(0, ""),
+Metric: newMetricIdentifier("metric-per-unit"),
+Value: *resource.NewQuantity(0, ""),
 DescribedObject: custom_metrics.ObjectReference{
 Name: "metricObject",
 Kind: "Node",
@@ -860,7 +869,7 @@ func TestMetricsNonExpiration(t *testing.T) {
 }
 externalMetric := collector.CollectedMetric{
-Type: v2beta1.MetricSourceType("External"),
+Type: autoscalingv2.MetricSourceType("External"),
 External: external_metrics.ExternalMetricValue{
 MetricName: "metric-per-unit",
 Value: *resource.NewQuantity(0, ""),
