Compare commits


64 Commits

Author SHA1 Message Date
9e211b181a Merge pull request #101 from zalando-incubator/update-to-v2beta2
Only support autoscaling/v2beta2
2020-01-29 16:47:15 +01:00
9d78fff1b5 Only support autoscaling/v2beta2
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-01-29 15:59:20 +01:00
1c6f9e2ea6 Merge pull request #100 from affo/feat/influxdb-collector
feat(collector): add InfluxDB collector
2020-01-24 10:56:52 +01:00
c0eda7cd1e adding tests for collector creation
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:54:35 +01:00
75f3e48f70 address szuecs review
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:07:56 +01:00
5b55bea994 feat(collector): add InfluxDB collector
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-22 10:09:29 +01:00
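For reference, a hedged sketch of how the new collector is driven through HPA annotations (the keys match those exercised in the parser tests later in this diff; the query body, address, token, and org ID are placeholders):

```yaml
metadata:
  annotations:
    # Named Flux query; a `query-name` entry in the metric selector picks which one to run.
    metric-config.external.flux-query.influxdb/range1m: |
      from(bucket: "example") |> range(start: -1m)
    # Optional per-HPA overrides for the InfluxDB connection.
    metric-config.external.flux-query.influxdb/address: "http://localhost:9999"
    metric-config.external.flux-query.influxdb/token: "sEcr3TT0ken"
    metric-config.external.flux-query.influxdb/org-id: "deadbeef"
```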
4412e3dca4 Merge pull request #92 from zalando-incubator/njuettner-patch
Updating golangci
2019-11-26 14:49:02 +01:00
8f9277258c Increase timeout for golangci-lint
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-11-25 11:36:18 +01:00
8c3fef45fd Updating golangci
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Signed-off-by: Nick Jüttner <nick@zalando.de>
2019-11-25 10:56:22 +01:00
120950078c Fix #89 by copying the MatchLabels map instead of referencing it. (#90)
Signed-off-by: Johann Fuechsl <johann@fuechsl.co>
2019-11-07 14:38:26 +01:00
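The idea of the fix, as a minimal sketch (the helper name is illustrative, not the code from the commit):

```go
// copyMatchLabels returns a fresh copy of a selector's MatchLabels map, so
// that later mutations cannot write back into the shared HPA object.
func copyMatchLabels(matchLabels map[string]string) map[string]string {
	out := make(map[string]string, len(matchLabels))
	for k, v := range matchLabels {
		out[k] = v
	}
	return out
}
```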
0790bc351a This fixes an issue with the type switch that was never able to fall (#88)
into cases of []<number>, where <number> is a numeric type such as int,
float32, or float64. This is because Go can't type-assert slices of
interface{} outright, since it's impossible to know the true types of
the slice members beforehand.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-11-05 09:43:25 +01:00
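A small, self-contained illustration of the Go behavior this works around (not code from this repository):

```go
package main

import "fmt"

func main() {
	// A decoded JSON array has dynamic type []interface{}, never []float64.
	var v interface{} = []interface{}{1.0, 2.0, 3.0}

	switch s := v.(type) {
	case []float64:
		fmt.Println("never reached:", s)
	case []interface{}:
		// Elements must be asserted one by one.
		out := make([]float64, 0, len(s))
		for _, e := range s {
			if f, ok := e.(float64); ok {
				out = append(out, f)
			}
		}
		fmt.Println(out) // [1 2 3]
	}
}
```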
f6b2aede5b Support for JSONPath expressions that return arrays of values (#85)
* This is the initial implementation of support for JSONPath expressions
that return arrays of values instead of a single value.

This extends the collector to define a few handy reducer functions that take
in the slice of float64 and return a single value. It also allows the user to
define which reducer function to use via the
"metric-config.<metricType>.<metricName>.json-path/reducer-func" annotation,
which can have the values 'avg', 'min', 'max' and 'sum'.

For instance, the Ruby puma webserver exposes metrics of the form `$.worker_status[*].last_status.pool_capacity` that have to be consumed as an array of values to be properly targeted.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Renames "reducerFunc" to "aggregator" for consistency with other
collectors. Renames the annotation from
"metric-config.<metricType>.<metricName>.json-path/reducer-func" to "metric-config.<metricType>.<metricName>.json-path/aggregator".

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Return error instead of defaulting to the avg aggregator, when no valid
aggregator name was specified and the JSONPath value is a slice of
numbers.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix index out of range on initialized output slice that was found while
writing tests.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add tests for all added functions + NewJSONPathMetricsGetter

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add documentation on the `aggregator` option.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* reducer function -> aggregator function

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix comment to account for returned error.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-10-24 18:15:10 +02:00
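A hedged example of the resulting annotation surface, using the renamed `aggregator` key (the metric name and JSONPath are illustrative):

```yaml
metadata:
  annotations:
    # JSONPath returning an array of values, one per puma worker.
    metric-config.pods.pool-capacity.json-path/json-key: "$.worker_status[*].last_status.pool_capacity"
    # Reduce the array to a single value; one of avg, min, max, sum.
    metric-config.pods.pool-capacity.json-path/aggregator: "avg"
```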
7d5e719eb0 Merge pull request #86 from pinkavaj/fix-var-name
Fix variable name typo
2019-10-24 10:16:58 +02:00
7497a61a2c Fix variable name typo
Signed-off-by: Jiri Pinkava <jiri.pinkava@rossum.ai>
2019-10-24 09:45:22 +02:00
a72380125f Update SECURITY.md (#84)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-10-22 16:11:29 +02:00
70c7fb843d Merge pull request #83 from zalando-incubator/ingress-collector
Skipper: simplify metrics collection
2019-10-22 16:07:49 +02:00
79533a5a93 Skipper: simplify metrics collection
* Drop MaxWeightedCollector (we don't want max anyway, we want sum)
 * Use Prometheus to add up all matching metrics and scale them; this
   has a nice side effect of ensuring that unused hostnames don't cause
   an error when collecting the metrics
 * Update the tests a bit

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-10-21 14:05:30 +02:00
2765ff9811 Merge pull request #68 from zalando-incubator/skipper-collector-averagevalue
Add support for averageValue for request-per-second Skipper metric
2019-10-10 08:09:30 +02:00
76d2f74743 Add support for averageValue for request-per-second Skipper metric
This adds support for `averageValue` for the `request-per-second` metric
based on Ingress Objects. This is only supported from Kubernetes
`>=v1.14` (https://github.com/kubernetes/kubernetes/pull/72872).

When defining the HPA with `autoscaling/v2beta1` you still need to
define `targetValue` even though it won't be used when `averageValue` is
set. Once we default to `autoscaling/v2beta2` this awkward API will be
gone.

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-10-08 17:10:28 +02:00
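A sketch of the awkward v2beta1 shape described above, modeled on the example manifest changed later in this diff (the numbers are placeholders):

```yaml
# autoscaling/v2beta1: targetValue is still required, but averageValue is used.
- type: Object
  object:
    metricName: requests-per-second
    target:
      apiVersion: extensions/v1beta1
      kind: Ingress
      name: custom-metrics-consumer
    targetValue: 10   # required by the v2beta1 API, ignored when averageValue is set
    averageValue: 10  # honored on Kubernetes >= v1.14
```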
0de5042d3d Update dependencies (#80)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-10-08 16:20:47 +02:00
07c0e179b3 Fail on dirty and/or non-exact versions on master (#79)
* Fail on dirty and/or non-exact versions on master

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Prevent go from modifying go.mod

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Fix go.mod version

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>

* Allow non-exact tag matches

Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-10-01 11:54:37 +02:00
29ee953a16 Merge pull request #78 from zalando-incubator/return-err
When traffic switching is used, require a backend for the RPS metric
2019-09-27 17:56:35 +02:00
f78ef26857 When traffic switching is used, require a backend for the RPS metric
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-09-27 15:02:46 +02:00
a3c14e9dcb Merge pull request #76 from zalando-incubator/refactor-parsing
Prevent panic when parsing HPAs
2019-08-23 09:08:07 +02:00
b6b13fb31a Prevent panic when parsing HPAs
This is a slight refactoring/unification of how metric
labels/annotations are parsed and handled across collectors. This is
done to prevent crashes when labels are not defined on external metrics.

Fix #69

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-08-22 08:09:28 +02:00
0a06691d39 Merge pull request #75 from edganiukov/master
collector/prometheus: add Prometheus URL (optional) as an annotation in HPA
2019-08-21 09:55:10 +02:00
2d1d51e829 collector/prometheus: add prometheus server (optional) as an annotation in HPA.
Signed-off-by: Eduard Ganiukov <eduard.ganiukov@swisscom.com>
2019-08-14 13:05:50 +02:00
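This presumably surfaces as a per-metric config key on the HPA; a hypothetical example (the `query` key appears in the parser tests below, but the exact server-override key name is not shown in this diff, so treat it as an assumption):

```yaml
metadata:
  annotations:
    metric-config.object.processed-events-per-second.prometheus/query: |
      scalar(sum(rate(event-service_events_count{processed="true"}[1m])))
    # Hypothetical key: point this one metric at a specific Prometheus server.
    metric-config.object.processed-events-per-second.prometheus/prometheus-server: "http://prometheus.kube-system"
```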
41761e62df Merge pull request #71 from zalando-incubator/fix-rps-test
Skipper: fix the no annotation test so it makes more sense
2019-07-30 17:57:11 +02:00
ed4c93abbb Skipper: fix the no annotation test so it makes more sense
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-07-30 17:50:39 +02:00
b2194ca136 Correctly handle zero-weight backends (#70)
Signed-off-by: Alexey Ermakov <alexey.ermakov@zalando.de>
2019-07-30 17:32:36 +02:00
bd0dd10e72 Use proper tags for docker images (#66)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-26 20:36:03 +02:00
461869c69b Fix response on no metrics found (#67)
Fixes the response from `GetMetricsBySelector` in case no metrics are
found. This issue caused a panic in kube-controller-manager:
https://github.com/kubernetes/kubernetes/pull/80392

Fix #40

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-26 20:35:17 +02:00
9950851cad Merge pull request #64 from zalando-incubator/v2beta1-conversion
Support autoscaling v2beta1
2019-07-26 19:27:31 +02:00
d85fee795e Don't import v2beta2 twice
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 12:36:32 +02:00
990f8eab14 Ignore files with upstream code
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 12:11:59 +02:00
9a396bde68 Support autoscaling v2beta1
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-07-21 11:22:55 +02:00
aa8d24dbcf Merge pull request #63 from zalando-incubator/document-pod-https
Document how to use HTTPS for pod collector
2019-07-15 09:09:39 +02:00
19e9be9671 Document how to use HTTPS for pod collector
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-07-14 15:49:00 +02:00
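The HTTPS setup comes down to the pod collector's json-path config keys; the `scheme` annotation below also appears in the parser tests later in this diff:

```yaml
metadata:
  annotations:
    metric-config.pods.requests-per-second.json-path/json-key: "$.http_server.rps"
    metric-config.pods.requests-per-second.json-path/path: "/metrics"
    metric-config.pods.requests-per-second.json-path/port: "9090"
    # Fetch pod metrics over HTTPS instead of the default HTTP.
    metric-config.pods.requests-per-second.json-path/scheme: "https"
```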
8fed8538ad Merge pull request #53 from zalando-incubator/prometheus-external-metric
Allow Prometheus metrics for External target
2019-05-19 23:19:29 +02:00
9a234cbdac add AWS IAM policy as requirement to integrate with AWS SQS (#58)
Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
2019-05-17 11:07:18 +02:00
ffff8c2040 Prevent leaking collectors when HPA gets updated (#54)
* Prevent leaking collectors when HPA gets updated

This fixes an issue where collectors would be leaking when HPAs are
getting updated.

Fix this by stopping the collector started for the previous version of
the HPA.

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>

* Add tests to verify old collector is removed

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-05-08 10:34:49 +02:00
9d2760e3fc Allow Prometheus metrics for External target
Fix #45

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-30 23:22:18 +02:00
5598b4d012 Merge pull request #52 from zalando-incubator/fix/golangci-lint-errors
Fix all errors from golangci-lint command
2019-04-27 16:11:35 +02:00
888e76b748 Fix all errors from golangci-lint command
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 15:54:15 +02:00
7c848a1282 Max collector should ignore only no result errors (#50)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 14:54:20 +02:00
445c7c874a Added golangci linter (#51)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-27 13:35:55 +02:00
2eed3e64d0 Return a value when at least one of the metrics returns a value (#47)
* Return a value when at least one of the metrics returns a value

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Added test for max weighted collector

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-26 17:54:25 +02:00
f097e63401 Add build cache for CDP build (#49)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-26 16:47:00 +02:00
ca4e2008c4 Merge pull request #48 from zalando-incubator/fix-config-key
Remove unused Configuration key from MetricConfig
2019-04-26 16:34:48 +02:00
3f019a1ceb Remove unused Configuration key from MetricConfig
This fixes an issue of setting up a ZMON collector where the incorrect
key `Configuration` was used, which was not initialized in the metrics
config parser. The `Config` key is the right one to use.

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-04-26 16:19:53 +02:00
5a6f4997bd Add the labels from the zmon check into the config object (#46)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-18 13:34:11 +02:00
8db22f38a3 Fixed metric labels so that metrics are tagged correctly. Also added nil check (#44)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-05 16:05:37 +02:00
d5b803d923 Merge pull request #43 from zalando-incubator/fix/panic
Fix nil dereference panics for the Annotation config parser
2019-04-04 15:00:46 +02:00
14f13495af Fix nil dereference panics
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-04 10:57:08 +02:00
dfeae82cae Upgrade all packages to autoscalingv2beta2 (#39)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-04-03 10:23:52 +02:00
04b212175e Added clarification of dummy-pod (#42)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-03-25 16:15:54 +01:00
478c97d5cb Added instructions on configuring adapter to collect ingress metrics (#34)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-02-24 21:03:55 +01:00
f4efa2898b Handle condition when backend weights are only sometimes present (#33)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-02-01 10:59:42 +01:00
7258cb7800 Merge pull request #31 from zalando-incubator/fix/no-backend
Fix case when backend is not set
2019-01-23 11:44:54 +01:00
56dd8b52e0 Fix case when backend is not set
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-23 11:36:09 +01:00
248acf0311 Added logic and test case for when a backend's weight is 0 (#29)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-18 17:39:01 +01:00
75633d3082 Changed request-per-second metric separator to a comma (#28)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-17 13:46:57 +01:00
72aa672f51 Added weighting of RPS metrics based on backend weights (#27)
* Added weighting of rps metrics based on backend weights

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Updated documentation with instructions on how to use the backend weighting

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Changed separator for RPS metric and added flag to specify backend weights annotation.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Allow for multiple backends for weighting.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2019-01-17 13:13:52 +01:00
f49f7821dc Fix json serialization naming for zmon queries (#25)
Signed-off-by: Cagdas Senol <cagdas.senol@zalando.de>
2019-01-08 16:24:46 +01:00
35 changed files with 2435 additions and 752 deletions

.golangci.yml (new file, +19 lines)

@@ -0,0 +1,19 @@
run:
linters-settings:
golint:
min-confidence: 0.9
linters:
disable-all: true
enable:
- staticcheck
- ineffassign
- golint
- goimports
- errcheck
issues:
exclude-rules:
# Exclude some staticcheck messages
- linters:
- staticcheck
text: "SA9003:"

.travis.yml

@@ -2,17 +2,19 @@ language: go
dist: xenial
go:
- "1.11.x"
- "1.13.x"
env:
- GO111MODULE=on
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
before_install:
- go get github.com/mattn/goveralls
- go get github.com/lawrencewoodman/roveralls
- GO111MODULE=off go get github.com/mattn/goveralls
- GO111MODULE=off go get github.com/lawrencewoodman/roveralls
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
script:
- make test
- make build.docker
- make check
- roveralls
- goveralls -v -coverprofile=roveralls.coverprofile -service=travis-ci

Makefile

@@ -19,8 +19,8 @@ test:
go test -v $(GOPKGS)
check:
golint $(GOPKGS)
go vet -v $(GOPKGS)
go mod download
golangci-lint run --timeout=2m ./...
build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)

README.md (295 lines changed; diff suppressed because it is too large)

SECURITY.md

@@ -1,7 +1,8 @@
We acknowledge that every line of code that we write may potentially contain security issues.
We are trying to deal with it responsibly and provide patches as quickly as possible.
We are trying to deal with it responsibly and provide patches as quickly as possible. If you have anything to report to us please use the following channels:
We host our bug bounty program on HackerOne. It is currently private, so if you would like to report a vulnerability and get rewarded for it, please ask to join our program by filling in this form:
Email: Tech-Security@zalando.de
OR
Submit your vulnerability report through our bug bounty program at: https://hackerone.com/zalando
https://corporate.zalando.com/en/services-and-contact#security-form
You can also send your report via this form if you do not want to join our bug bounty program and just want to report a vulnerability or security issue.

@@ -2,7 +2,13 @@ version: "2017-09-20"
pipeline:
- id: build
overlay: ci/golang
cache:
paths:
- /go/pkg/mod # pkg cache for Go modules
- ~/.cache/go-build # Go build cache
type: script
env:
GOFLAGS: "-mod=readonly"
commands:
- desc: test
cmd: |
@@ -14,7 +20,11 @@ pipeline:
cmd: |
if [[ $CDP_TARGET_BRANCH == master && ! $CDP_PULL_REQUEST_NUMBER ]]; then
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter
VERSION=$(git describe --tags --always)
else
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter-test
VERSION=$CDP_BUILD_VERSION
fi
IMAGE=$IMAGE VERSION=$CDP_BUILD_VERSION make build.push
IMAGE=$IMAGE VERSION=$VERSION make build.docker
git diff --stat --exit-code
IMAGE=$IMAGE VERSION=$VERSION make build.push

@@ -1,4 +1,4 @@
apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: custom-metrics-consumer
@@ -25,24 +25,36 @@ spec:
# - type: Resource
# resource:
# name: cpu
# targetAverageUtilization: 50
# current:
# averageUtilization: 50
- type: Pods
pods:
metricName: queue-length
targetAverageValue: 1k
metric:
name: queue-length
target:
averageValue: 1k
type: AverageValue
- type: Object
object:
metricName: requests-per-second
target:
describedObject:
apiVersion: extensions/v1beta1
kind: Ingress
name: custom-metrics-consumer
targetValue: 10 # this will be treated as targetAverageValue
metric:
name: requests-per-second
target:
averageValue: "10"
type: AverageValue
- type: External
external:
metricName: sqs-queue-length
metricSelector:
matchLabels:
queue-name: foobar
region: eu-central-1
targetAverageValue: 30
metric:
name: sqs-queue-length
selector:
matchLabels:
queue-name: foobar
region: eu-central-1
target:
averageValue: "30"
type: AverageValue

@@ -3,13 +3,15 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
"time"
)
func metricsHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
_, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
if err != nil {
log.Fatalf("failed to write: %v", err)
}
}
var (
@@ -29,5 +31,5 @@ func main() {
ReadTimeout: 5 * time.Second,
}
server.ListenAndServe()
log.Fatal(server.ListenAndServe())
}

go.mod (91 lines changed)

@@ -1,91 +1,32 @@
module github.com/zalando-incubator/kube-metrics-adapter
require (
bitbucket.org/ww/goautoneg v0.0.0-20120707110453-75cd24fc2f2c // indirect
github.com/BurntSushi/toml v0.3.0 // indirect
github.com/NYTimes/gziphandler v1.0.1 // indirect
github.com/PuerkitoBio/purell v1.1.0 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/aws/aws-sdk-go v1.16.6
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/coreos/bbolt v1.3.0 // indirect
github.com/coreos/etcd v3.3.9+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful v2.8.0+incompatible // indirect
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
github.com/evanphx/json-patch v3.0.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect
github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect
github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect
github.com/go-openapi/swag v0.0.0-20180715190254-becd2f08beaf // indirect
github.com/gogo/protobuf v1.1.1 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gorilla/websocket v1.3.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.5 // indirect
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20180824182428-26e5299457d3
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/influxdata/influxdb-client-go v0.1.4
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/onsi/ginkgo v1.6.0 // indirect
github.com/onsi/gomega v1.4.1 // indirect
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.9.0-pre1.0.20180824101016-4eb539fa85a2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect
github.com/sirupsen/logrus v1.0.6
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.2 // indirect
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 // indirect
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
google.golang.org/appengine v1.2.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
google.golang.org/grpc v1.14.0 // indirect
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
k8s.io/api v0.0.0-20180628040859-072894a440bd
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d
k8s.io/apiserver v0.0.0-20180628044425-01459b68eb5f
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
k8s.io/metrics v0.0.0-20180718014405-6efa0bfaa5c1
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
k8s.io/klog v0.4.0
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
)
go 1.13

go.sum (479 lines changed; diff suppressed because it is too large)

how-to/skipper_setup.md (new file, +40 lines)

@@ -0,0 +1,40 @@
# Skipper Prometheus Metrics Collection
The skipper-ingress pods should be configured to be scraped by Prometheus. This
can be done via Prometheus service discovery of Kubernetes services or
Kubernetes pods:
```yaml
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "9911"
prometheus.io/scrape: "true"
labels:
application: skipper-ingress
name: skipper-ingress
spec:
ports:
- port: 80
protocol: TCP
targetPort: 9999
selector:
application: skipper-ingress
type: ClusterIP
```
This [configuration](https://github.com/zalando-incubator/kubernetes-on-aws/blob/dev/cluster/manifests/prometheus/configmap.yaml#L69)
shows how Prometheus is configured in our clusters to scrape service endpoints.
The annotations `prometheus.io/path`, `prometheus.io/port` and `prometheus.io/scrape`
instruct Prometheus to scrape all pods of this service on port _9911_ at
the path `/metrics`.
When `kube-metrics-adapter` is started, the flag `--prometheus-server` should be set so that
the adapter can query Prometheus for aggregated metrics. When running in Kubernetes this can
be the address of the Prometheus service, e.g. `http://prometheus.kube-system`, as in the
sketch below.
With these settings `kube-metrics-adapter` can provide `request-per-second` metrics for Ingress
objects present in the cluster: the Prometheus instances scrape the metrics from the
`skipper-ingress` pods, and the adapter queries Prometheus for the aggregated metric and
provides it to the API server when requested.
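As a sketch of the corresponding wiring (container name and image are placeholders; only the `--prometheus-server` flag is taken from the text above):

```yaml
# Excerpt from an assumed kube-metrics-adapter Deployment spec.
containers:
  - name: kube-metrics-adapter
    image: example.registry/kube-metrics-adapter:latest
    args:
      - --prometheus-server=http://prometheus.kube-system
```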

@@ -23,7 +23,7 @@ import (
"github.com/zalando-incubator/kube-metrics-adapter/pkg/server"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/component-base/logs"
)
func main() {

pkg/annotations/parser.go (new file, +101 lines)

@@ -0,0 +1,101 @@
package annotations
import (
"fmt"
"strings"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)
const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
)
type AnnotationConfigs struct {
CollectorName string
Configs map[string]string
PerReplica bool
Interval time.Duration
}
type MetricConfigKey struct {
Type autoscalingv2.MetricSourceType
MetricName string
}
type AnnotationConfigMap map[MetricConfigKey]*AnnotationConfigs
func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
for key, val := range annotations {
if !strings.HasPrefix(key, customMetricsPrefix) {
continue
}
parts := strings.Split(key, "/")
if len(parts) != 2 {
// TODO: error?
continue
}
configs := strings.Split(parts[0], ".")
if len(configs) != 4 {
// TODO: error?
continue
}
key := MetricConfigKey{
MetricName: configs[2],
}
switch configs[1] {
case "pods":
key.Type = autoscalingv2.PodsMetricSourceType
case "object":
key.Type = autoscalingv2.ObjectMetricSourceType
default:
key.Type = autoscalingv2.ExternalMetricSourceType
}
metricCollector := configs[3]
config, ok := m[key]
if !ok {
config = &AnnotationConfigs{
CollectorName: metricCollector,
Configs: map[string]string{},
}
m[key] = config
}
// TODO: fail if collector name doesn't match
if config.CollectorName != metricCollector {
continue
}
if parts[1] == perReplicaMetricsConfKey {
config.PerReplica = true
continue
}
if parts[1] == intervalMetricsConfKey {
interval, err := time.ParseDuration(val)
if err != nil {
return fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
}
config.Interval = interval
continue
}
config.Configs[parts[1]] = val
}
return nil
}
func (m AnnotationConfigMap) GetAnnotationConfig(metricName string, metricType autoscalingv2.MetricSourceType) (*AnnotationConfigs, bool) {
key := MetricConfigKey{MetricName: metricName, Type: metricType}
config, ok := m[key]
return config, ok
}

pkg/annotations/parser_test.go (new file, +106 lines)

@@ -0,0 +1,106 @@
package annotations
import (
"testing"
"github.com/stretchr/testify/require"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)
func TestParser(t *testing.T) {
for _, tc := range []struct {
Name string
Annotations map[string]string
MetricName string
MetricType autoscalingv2.MetricSourceType
ExpectedConfig map[string]string
PerReplica bool
}{
{
Name: "no annotations",
Annotations: map[string]string{},
ExpectedConfig: map[string]string{},
},
{
Name: "pod metrics",
Annotations: map[string]string{
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
"metric-config.pods.requests-per-second.json-path/scheme": "https",
},
MetricName: "requests-per-second",
MetricType: autoscalingv2.PodsMetricSourceType,
ExpectedConfig: map[string]string{
"json-key": "$.http_server.rps",
"path": "/metrics",
"port": "9090",
"scheme": "https",
},
},
{
Name: "prometheus metrics",
Annotations: map[string]string{
"metric-config.object.processed-events-per-second.prometheus/query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
"metric-config.object.processed-events-per-second.prometheus/per-replica": "true",
},
MetricName: "processed-events-per-second",
MetricType: autoscalingv2.ObjectMetricSourceType,
ExpectedConfig: map[string]string{
"query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
},
PerReplica: true,
},
{
Name: "zmon collector",
Annotations: map[string]string{
"metric-config.external.zmon-check.zmon/key": "custom.*",
"metric-config.external.zmon-check.zmon/tag-application": "my-custom-app-*",
},
MetricName: "zmon-check",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"key": "custom.*",
"tag-application": "my-custom-app-*",
},
PerReplica: false,
},
{
Name: "influxdb metrics",
Annotations: map[string]string{
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
},
MetricName: "flux-query",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef",
},
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)
err := hpaMap.Parse(tc.Annotations)
require.NoError(t, err)
config, present := hpaMap.GetAnnotationConfig(tc.MetricName, tc.MetricType)
if len(tc.ExpectedConfig) == 0 {
require.False(t, present)
return
}
require.True(t, present)
for k, v := range tc.ExpectedConfig {
require.Equal(t, v, config.Configs[k])
}
require.Equal(t, tc.PerReplica, config.PerReplica)
})
}
}

@@ -9,7 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -32,13 +32,13 @@ func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPl
}
// NewCollector initializes a new AWS SQS collector from the specified HPA.
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Metric.Name {
case AWSSQSQueueLengthMetric:
return NewAWSSQSCollector(c.sessions, config, interval)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
type AWSSQSCollector struct {
@@ -47,18 +47,20 @@ type AWSSQSCollector struct {
region string
queueURL string
queueName string
labels map[string]string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
}
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for queue is not specified")
}
name, ok := config.Labels[sqsQueueNameLabelKey]
name, ok := config.Config[sqsQueueNameLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue name not specified on metric")
}
region, ok := config.Labels[sqsQueueRegionLabelKey]
region, ok := config.Config[sqsQueueRegionLabelKey]
if !ok {
return nil, fmt.Errorf("sqs queue region is not specified on metric")
}
@@ -83,9 +85,8 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
interval: interval,
queueURL: aws.StringValue(resp.QueueUrl),
queueName: name,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
labels: config.Labels,
}, nil
}
@@ -109,8 +110,8 @@ func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
metricValue := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metricName,
MetricLabels: c.labels,
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewQuantity(int64(i), resource.DecimalSI),
},

@@ -2,22 +2,16 @@ package collector
import (
"fmt"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/annotations"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
customMetricsPrefix = "metric-config."
perReplicaMetricsConfKey = "per-replica"
intervalMetricsConfKey = "interval"
)
type ObjectReference struct {
autoscalingv2beta1.CrossVersionObjectReference
autoscalingv2.CrossVersionObjectReference
Namespace string
}
@@ -49,7 +43,7 @@ func NewCollectorFactory() *CollectorFactory {
}
type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
@@ -106,9 +100,9 @@ func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin Co
}
}
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Type {
case autoscalingv2beta1.PodsMetricSourceType:
case autoscalingv2.PodsMetricSourceType:
// first try to find a plugin by format
if plugin, ok := c.podsPlugins.Named[config.CollectorName]; ok {
return plugin.NewCollector(hpa, config, interval)
@@ -118,7 +112,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
if c.podsPlugins.Any != nil {
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta1.ObjectMetricSourceType:
case autoscalingv2.ObjectMetricSourceType:
// first try to find a plugin by kind
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
if plugin, ok := kinds.Named[config.CollectorName]; ok {
@@ -139,8 +133,8 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
if c.objectPlugins.Any.Any != nil {
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
}
case autoscalingv2beta1.ExternalMetricSourceType:
if plugin, ok := c.externalPlugins[config.Name]; ok {
case autoscalingv2.ExternalMetricSourceType:
if plugin, ok := c.externalPlugins[config.Metric.Name]; ok {
return plugin.NewCollector(hpa, config, interval)
}
}
@@ -148,31 +142,15 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
}
func getObjectReference(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string) (custom_metrics.ObjectReference, error) {
for _, metric := range hpa.Spec.Metrics {
if metric.Type == autoscalingv2beta1.ObjectMetricSourceType && metric.Object.MetricName == metricName {
return custom_metrics.ObjectReference{
APIVersion: metric.Object.Target.APIVersion,
Kind: metric.Object.Target.Kind,
Name: metric.Object.Target.Name,
Namespace: hpa.Namespace,
}, nil
}
}
return custom_metrics.ObjectReference{}, fmt.Errorf("failed to find object reference")
}
type MetricTypeName struct {
Type autoscalingv2beta1.MetricSourceType
Name string
Type autoscalingv2.MetricSourceType
Metric autoscalingv2.MetricIdentifier
}
type CollectedMetric struct {
Type autoscalingv2beta1.MetricSourceType
Type autoscalingv2.MetricSourceType
Custom custom_metrics.MetricValue
External external_metrics.ExternalMetricValue
Labels map[string]string
}
type Collector interface {
@@ -187,83 +165,17 @@ type MetricConfig struct {
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
Labels map[string]string
}
func parseCustomMetricsAnnotations(annotations map[string]string) (map[MetricTypeName]*MetricConfig, error) {
metrics := make(map[MetricTypeName]*MetricConfig)
for key, val := range annotations {
if !strings.HasPrefix(key, customMetricsPrefix) {
continue
}
parts := strings.Split(key, "/")
if len(parts) != 2 {
// TODO: error?
continue
}
configs := strings.Split(parts[0], ".")
if len(configs) != 4 {
// TODO: error?
continue
}
metricTypeName := MetricTypeName{
Name: configs[2],
}
switch configs[1] {
case "pods":
metricTypeName.Type = autoscalingv2beta1.PodsMetricSourceType
case "object":
metricTypeName.Type = autoscalingv2beta1.ObjectMetricSourceType
}
metricCollector := configs[3]
config, ok := metrics[metricTypeName]
if !ok {
config = &MetricConfig{
MetricTypeName: metricTypeName,
CollectorName: metricCollector,
Config: map[string]string{},
}
metrics[metricTypeName] = config
}
// TODO: fail if collector name doesn't match
if config.CollectorName != metricCollector {
continue
}
if parts[1] == perReplicaMetricsConfKey {
config.PerReplica = true
continue
}
if parts[1] == intervalMetricsConfKey {
interval, err := time.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
}
config.Interval = interval
continue
}
config.Config[parts[1]] = val
}
return metrics, nil
MetricSpec autoscalingv2.MetricSpec
}
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
metricConfigs := make([]*MetricConfig, 0, len(hpa.Spec.Metrics))
// TODO: validate that the specified metric names are defined
// in the HPA
configs, err := parseCustomMetricsAnnotations(hpa.Annotations)
parser := make(annotations.AnnotationConfigMap)
err := parser.Parse(hpa.Annotations)
if err != nil {
return nil, err
}
@@ -275,39 +187,49 @@ func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*Metric
var ref custom_metrics.ObjectReference
switch metric.Type {
case autoscalingv2beta1.PodsMetricSourceType:
typeName.Name = metric.Pods.MetricName
case autoscalingv2beta1.ObjectMetricSourceType:
typeName.Name = metric.Object.MetricName
case autoscalingv2.PodsMetricSourceType:
typeName.Metric = metric.Pods.Metric
case autoscalingv2.ObjectMetricSourceType:
typeName.Metric = metric.Object.Metric
ref = custom_metrics.ObjectReference{
APIVersion: metric.Object.Target.APIVersion,
Kind: metric.Object.Target.Kind,
Name: metric.Object.Target.Name,
APIVersion: metric.Object.DescribedObject.APIVersion,
Kind: metric.Object.DescribedObject.Kind,
Name: metric.Object.DescribedObject.Name,
Namespace: hpa.Namespace,
}
case autoscalingv2beta1.ExternalMetricSourceType:
typeName.Name = metric.External.MetricName
case autoscalingv2beta1.ResourceMetricSourceType:
case autoscalingv2.ExternalMetricSourceType:
typeName.Metric = metric.External.Metric
case autoscalingv2.ResourceMetricSourceType:
continue // kube-metrics-adapter does not collect resource metrics
}
if config, ok := configs[typeName]; ok {
config.ObjectReference = ref
metricConfigs = append(metricConfigs, config)
continue
}
config := &MetricConfig{
MetricTypeName: typeName,
ObjectReference: ref,
Config: map[string]string{},
MetricSpec: metric,
}
if metric.Type == autoscalingv2beta1.ExternalMetricSourceType {
config.Labels = metric.External.MetricSelector.MatchLabels
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
for k, v := range metric.External.Metric.Selector.MatchLabels {
config.Config[k] = v
}
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
if present {
config.CollectorName = annotationConfigs.CollectorName
config.Interval = annotationConfigs.Interval
config.PerReplica = annotationConfigs.PerReplica
// configs specified in annotations takes precedence
// over labels
for k, v := range annotationConfigs.Configs {
config.Config[k] = v
}
}
metricConfigs = append(metricConfigs, config)
}
return metricConfigs, nil
}

@@ -0,0 +1,152 @@
package collector
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go"
"k8s.io/api/autoscaling/v2beta2"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
InfluxDBMetricName = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgIDKey = "org-id"
influxDBQueryNameLabelKey = "query-name"
)
type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
orgID string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
orgID: orgID,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
}
type InfluxDBCollector struct {
address string
token string
orgID string
influxDBClient *influxdb.Client
interval time.Duration
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
query string
}
func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
metricType: config.Type,
}
switch configType := config.Type; configType {
case autoscalingv2.ObjectMetricSourceType:
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
case autoscalingv2.ExternalMetricSourceType:
// `metricSelector` is flattened into the MetricConfig.Config.
queryName, ok := config.Config[influxDBQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("selector for Flux query is not specified, "+
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
}
if query, ok := config.Config[queryName]; ok {
// TODO(affo): validate the query once this is done:
// https://github.com/influxdata/influxdb-client-go/issues/73.
collector.query = query
} else {
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
}
default:
return nil, fmt.Errorf("unknown metric type: %v", configType)
}
// Use custom InfluxDB config if defined in HPA annotation.
if v, ok := config.Config[influxDBAddressKey]; ok {
address = v
}
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgIDKey]; ok {
orgID = v
}
influxDbClient, err := influxdb.New(address, token)
if err != nil {
return nil, err
}
collector.address = address
collector.token = token
collector.orgID = orgID
collector.influxDBClient = influxDbClient
return collector, nil
}
// queryResult is for unmarshaling the result from InfluxDB.
// The Flux query should ensure that the resulting table contains a column named "metricvalue".
type queryResult struct {
MetricValue float64
}
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
if err != nil {
return resource.Quantity{}, err
}
defer res.Close()
// Keeping just the first result.
if res.Next() {
qr := queryResult{}
if err := res.Unmarshal(&qr); err != nil {
return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
}
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
}
if err := res.Err; err != nil {
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
}
return resource.Quantity{}, fmt.Errorf("empty result returned")
}
func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.getValue()
if err != nil {
return nil, err
}
cm := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now().UTC(),
},
Value: v,
},
}
return []CollectedMetric{cm}, nil
}
func (c *InfluxDBCollector) Interval() time.Duration {
return c.interval
}

@@ -0,0 +1,155 @@
package collector
import (
"strings"
"testing"
"time"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestInfluxDBCollector_New(t *testing.T) {
t.Run("simple", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
// This is actually useless, because the selector should be flattened in Config when parsing.
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "range2m",
},
}
c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "secret"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
t.Run("override params", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef1234",
"query-name": "range3m",
},
}
c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef1234"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "sEcr3TT0ken"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
// Errors.
for _, tc := range []struct {
name string
mTypeName MetricTypeName
config map[string]string
errorStartsWith string
}{
{
name: "object metric",
mTypeName: MetricTypeName{
Type: v2beta2.ObjectMetricSourceType,
},
errorStartsWith: "InfluxDB does not support object",
},
{
name: "no selector",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
// The selector should be flattened into the config by the parsing step, but it isn't.
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
},
errorStartsWith: "selector for Flux query is not specified",
},
{
name: "referencing non-existing query",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "rangeXm",
},
errorStartsWith: "no Flux query defined for metric",
},
} {
t.Run("error - "+tc.name, func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: tc.mTypeName,
CollectorName: "influxdb",
Config: tc.config,
}
_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err == nil {
t.Fatal("expected error got none")
}
if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
t.Fatalf("%s should start with %s", got, want)
}
})
}
}

@@ -4,23 +4,25 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pod's metrics endpoint and looking up the metric value as
// defined by the JSONPath query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
jsonPath *jsonpath.Compiled
scheme string
path string
port int
aggregator string
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
@@ -28,12 +30,12 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter := &JSONPathMetricsGetter{}
if v, ok := config["json-key"]; ok {
pat, err := jsonpath.Compile(v)
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = pat
getter.jsonPath = path
}
if v, ok := config["scheme"]; ok {
@@ -52,13 +54,17 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter.port = n
}
if v, ok := config["aggregator"]; ok {
getter.aggregator = v
}
return getter, nil
}
// GetMetric gets a metric from a pod by fetching JSON metrics from the pod's
// metrics endpoint and extracting the desired value using the specified
// JSONPath query.
func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
@@ -83,13 +89,40 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
return float64(res), nil
case float64:
return res, nil
case []interface{}:
s, err := castSlice(res)
if err != nil {
return 0, err
}
return reduce(s, g.aggregator)
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface{} and returns a slice of float64 if all
// values in the slice were castable; otherwise it returns an error.
func castSlice(in []interface{}) ([]float64, error) {
out := []float64{}
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
@@ -131,3 +164,64 @@ func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
return data, nil
}
// reduce will reduce a slice of numbers given an aggregator function's name. If it's empty or not recognized, an error is returned.
func reduce(values []float64, aggregator string) (float64, error) {
switch aggregator {
case "avg":
return avg(values), nil
case "min":
return min(values), nil
case "max":
return max(values), nil
case "sum":
return sum(values), nil
default:
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
}
}
// avg implements the average mathematical function over a slice of float64
func avg(values []float64) float64 {
sum := sum(values)
return sum / float64(len(values))
}
// min implements the absolute minimum mathematical function over a slice of float64
func min(values []float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// max implements the absolute maximum mathematical function over a slice of float64
func max(values []float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// sum implements the summation mathematical function over a slice of float64
func sum(values []float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}

@@ -0,0 +1,105 @@
package collector
import (
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",
port: 9090,
aggregator: "avg",
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
require.Error(t, err4)
}
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
func TestReduce(t *testing.T) {
average, err1 := reduce([]float64{1, 2, 3}, "avg")
require.NoError(t, err1)
require.Equal(t, 2.0, average)
min, err2 := reduce([]float64{1, 2, 3}, "min")
require.NoError(t, err2)
require.Equal(t, 1.0, min)
max, err3 := reduce([]float64{1, 2, 3}, "max")
require.NoError(t, err3)
require.Equal(t, 3.0, max)
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
require.NoError(t, err4)
require.Equal(t, 6.0, sum)
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
}


@@ -1,42 +0,0 @@
package collector
import "time"
// MaxCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxCollector struct {
collectors []Collector
interval time.Duration
}
// NewMaxCollector initializes a new MaxCollector.
func NewMaxCollector(interval time.Duration, collectors ...Collector) *MaxCollector {
return &MaxCollector{
collectors: collectors,
interval: interval,
}
}
// GetMetrics gets metrics from all collectors and returns the highest value.
func (c *MaxCollector) GetMetrics() ([]CollectedMetric, error) {
var max CollectedMetric
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
return nil, err
}
for _, value := range values {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
}
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxCollector) Interval() time.Duration {
return c.interval
}


@@ -1,20 +0,0 @@
package collector
import autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
type ObjectMetricsGetter interface {
GetObjectMetric(namespace string, reference *autoscalingv2beta1.CrossVersionObjectReference) (float64, error)
}
// type PodCollector struct {
// client kubernetes.Interface
// Getter PodMetricsGetter
// podLabelSelector string
// namespace string
// metricName string
// interval time.Duration
// }
// func NewObjectCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string, config *MetricConfig, interval time.Duration) (Collector, error) {
// switch
// }


@@ -5,8 +5,8 @@ import (
"time"
log "github.com/sirupsen/logrus"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/api/core/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -24,26 +24,26 @@ func NewPodCollectorPlugin(client kubernetes.Interface) *PodCollectorPlugin {
}
}
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPodCollector(p.client, hpa, config, interval)
}
type PodCollector struct {
client kubernetes.Interface
Getter PodMetricsGetter
podLabelSelector string
podLabelSelector *metav1.LabelSelector
namespace string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
}
type PodMetricsGetter interface {
GetMetric(pod *v1.Pod) (float64, error)
GetMetric(pod *corev1.Pod) (float64, error)
}
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
// get pod selector based on HPA scale target ref
selector, err := getPodLabelSelector(client, hpa)
if err != nil {
@@ -53,7 +53,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
c := &PodCollector{
client: client,
namespace: hpa.Namespace,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
interval: interval,
podLabelSelector: selector,
@@ -79,7 +79,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
opts := metav1.ListOptions{
LabelSelector: c.podLabelSelector,
LabelSelector: labels.Set(c.podLabelSelector.MatchLabels).String(),
}
pods, err := c.client.CoreV1().Pods(c.namespace).List(opts)
@@ -106,11 +106,10 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
Name: pod.Name,
Namespace: pod.Namespace,
},
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
Labels: pod.Labels,
}
values = append(values, metricValue)
@@ -123,21 +122,21 @@ func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (string, error) {
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, err
}
return labels.Set(deployment.Spec.Selector.MatchLabels).String(), nil
return deployment.Spec.Selector, nil
case "StatefulSet":
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, err
}
return labels.Set(sts.Spec.Selector.MatchLabels).String(), nil
return sts.Spec.Selector, nil
}
return "", fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
}


@@ -3,19 +3,35 @@ package collector
import (
"context"
"fmt"
"math"
"net/http"
"time"
"github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
PrometheusMetricName = "prometheus-query"
prometheusQueryNameLabelKey = "query-name"
prometheusServerAnnotationKey = "prometheus-server"
)
type NoResultError struct {
query string
}
func (r NoResultError) Error() string {
return fmt.Sprintf("query '%s' did not result a valid response", r.query)
}
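Returning a dedicated error type lets callers tell an empty query result apart from a genuine failure. A minimal sketch of such a check (isNoResult is a hypothetical helper; a plain type assertion suffices because the collector returns the error unwrapped):
func isNoResult(err error) bool {
	// the collector returns &NoResultError{...}, so assert on the pointer type
	_, ok := err.(*NoResultError)
	return ok
}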
type PrometheusCollectorPlugin struct {
promAPI promv1.API
client kubernetes.Interface
@@ -24,7 +40,7 @@ type PrometheusCollectorPlugin struct {
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
cfg := api.Config{
Address: prometheusServer,
RoundTripper: &http.Transport{},
RoundTripper: http.DefaultTransport,
}
promClient, err := api.NewClient(cfg)
@@ -38,7 +54,7 @@ func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer
}, nil
}
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewPrometheusCollector(p.client, p.promAPI, hpa, config, interval)
}
@@ -46,31 +62,65 @@ type PrometheusCollector struct {
client kubernetes.Interface
promAPI promv1.API
query string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
objectReference custom_metrics.ObjectReference
interval time.Duration
perReplica bool
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
hpa *autoscalingv2.HorizontalPodAutoscaler
}
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
c := &PrometheusCollector{
client: client,
objectReference: config.ObjectReference,
metricName: config.Name,
metricType: config.Type,
interval: interval,
promAPI: promAPI,
perReplica: config.PerReplica,
hpa: hpa,
client: client,
promAPI: promAPI,
interval: interval,
hpa: hpa,
metric: config.Metric,
metricType: config.Type,
}
if v, ok := config.Config["query"]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined")
switch config.Type {
case autoscalingv2.ObjectMetricSourceType:
c.objectReference = config.ObjectReference
c.perReplica = config.PerReplica
if v, ok := config.Config["query"]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined")
}
case autoscalingv2.ExternalMetricSourceType:
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for prometheus query is not specified")
}
queryName, ok := config.Config[prometheusQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("query name not specified on metric")
}
if v, ok := config.Config[queryName]; ok {
// TODO: validate query
c.query = v
} else {
return nil, fmt.Errorf("no prometheus query defined for metric")
}
// Use custom Prometheus URL if defined in HPA annotation.
if promServer, ok := config.Config[prometheusServerAnnotationKey]; ok {
cfg := api.Config{
Address: promServer,
RoundTripper: http.DefaultTransport,
}
promClient, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
c.promAPI = promv1.NewAPI(promClient)
}
}
return c, nil
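The switch above means one collector now accepts two shapes of configuration: an Object metric carries its query under the fixed "query" key, while an External metric names the entry holding its query via "query-name" and may override the Prometheus endpoint via "prometheus-server". A hedged sketch of the two Config maps as the constructor expects them (key names per the constants above, all values invented):
// Object metric source: a single fixed "query" entry.
objectCfg := map[string]string{
	"query": `sum(rate(http_requests_total[1m]))`,
}

// External metric source: "query-name" points at the entry holding the
// query; "prometheus-server" optionally overrides the Prometheus URL.
externalCfg := map[string]string{
	"query-name":        "events-per-second",
	"events-per-second": `scalar(sum(rate(event_count[1m])))`,
	"prometheus-server": "http://prometheus.kube-system.svc",
}
_, _ = objectCfg, externalCfg // silence unused-variable errors in this sketch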
@@ -88,7 +138,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
case model.ValVector:
samples := value.(model.Vector)
if len(samples) == 0 {
return nil, fmt.Errorf("query '%s' returned no samples", c.query)
return nil, &NoResultError{query: c.query}
}
sampleValue = samples[0].Value
@@ -97,8 +147,8 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
sampleValue = scalar.Value
}
if sampleValue.String() == "NaN" {
return nil, fmt.Errorf("query '%s' returned no samples: %s", c.query, sampleValue.String())
if math.IsNaN(float64(sampleValue)) {
return nil, &NoResultError{query: c.query}
}
if c.perReplica {
@@ -113,14 +163,28 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
sampleValue = model.SampleValue(float64(sampleValue) / float64(replicas))
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: c.objectReference,
MetricName: c.metricName,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
var metricValue CollectedMetric
switch c.metricType {
case autoscalingv2.ObjectMetricSourceType:
metricValue = CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: c.objectReference,
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.metric.Selector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
}
case autoscalingv2.ExternalMetricSourceType:
metricValue = CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
},
}
}
return []CollectedMetric{metricValue}, nil


@@ -1,11 +1,15 @@
package collector
import (
"encoding/json"
"errors"
"fmt"
"math"
"regexp"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -13,60 +17,112 @@ import (
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)`
rpsMetricName = "requests-per-second"
rpsMetricBackendSeparator = ","
)
var (
errBackendNameMissing = errors.New("backend name must be specified for requests-per-second when traffic switching is used")
)
// SkipperCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting skipper ingress metrics.
type SkipperCollectorPlugin struct {
client kubernetes.Interface
plugin CollectorPlugin
client kubernetes.Interface
plugin CollectorPlugin
backendAnnotations []string
}
// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin) (*SkipperCollectorPlugin, error) {
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
return &SkipperCollectorPlugin{
client: client,
plugin: prometheusPlugin,
client: client,
plugin: prometheusPlugin,
backendAnnotations: backendAnnotations,
}, nil
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
case rpsMetricName:
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval)
default:
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
if strings.HasPrefix(config.Metric.Name, rpsMetricName) {
backend := ""
if len(config.Metric.Name) > len(rpsMetricName) {
metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator)
if len(metricNameParts) == 2 {
backend = metricNameParts[1]
}
}
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
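The backend is thus encoded in the metric name itself, after a comma. A quick sketch of how such a name is taken apart (names invented):
name := "requests-per-second,backend-v2"
parts := strings.Split(name, rpsMetricBackendSeparator)
// parts[0] == "requests-per-second" (the metric), parts[1] == "backend-v2" (the backend)
_ = parts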
// SkipperCollector is a metrics collector for getting skipper ingress metrics.
// It depends on the prometheus collector for getting the metrics.
type SkipperCollector struct {
client kubernetes.Interface
metricName string
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
client kubernetes.Interface
metric autoscalingv2.MetricIdentifier
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
backend string
backendAnnotations []string
}
// NewSkipperCollector initializes a new SkipperCollector.
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*SkipperCollector, error) {
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
return &SkipperCollector{
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metricName: config.Name,
interval: interval,
plugin: plugin,
config: *config,
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metric: config.Metric,
interval: interval,
plugin: plugin,
config: *config,
backend: backend,
backendAnnotations: backendAnnotations,
}, nil
}
func getAnnotationWeight(backendWeights string, backend string) float64 {
var weightsMap map[string]int
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
if err != nil {
return 0
}
if weight, ok := weightsMap[backend]; ok {
return float64(weight) / 100
}
return 0
}
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
maxWeight := 0.0
annotationsPresent := false
for _, anno := range backendAnnotations {
if weightsMap, ok := ingressAnnotations[anno]; ok {
annotationsPresent = true
maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend))
}
}
// Fallback for ingresses that don't use traffic switching
if !annotationsPresent {
return 1.0, nil
}
// When traffic-switching annotations are present, a backend name must be specified
if backend != "" {
return maxWeight, nil
}
return 0.0, errBackendNameMissing
}
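For example, an annotation value of {"backend-v1": 80, "backend-v2": 20} yields a weight of 0.8 for backend-v1 and 0.2 for backend-v2; getWeights takes the maximum across all configured annotations, falls back to 1.0 when none are present, and errors when annotations exist but no backend was named. A small sketch assuming it runs in the same package (the annotation key is invented; real keys come from the backendAnnotations flag):
weights := `{"backend-v1": 80, "backend-v2": 20}`
annotations := map[string]string{"example.org/backend-weights": weights}
backendAnnotations := []string{"example.org/backend-weights"}

w, _ := getWeights(annotations, backendAnnotations, "backend-v1")
// w == 0.8

_, err := getWeights(annotations, backendAnnotations, "")
// err == errBackendNameMissing: annotations exist but no backend was named
_, _ = w, err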
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
@@ -74,32 +130,31 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
return nil, err
}
backendWeight, err := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
if err != nil {
return nil, err
}
config := c.config
var collector Collector
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
var escapedHostnames []string
for _, rule := range ingress.Spec.Rules {
host := strings.Replace(rule.Host, ".", "_", -1)
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, host),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
if err != nil {
return nil, err
}
collectors = append(collectors, collector)
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
}
if len(collectors) > 1 {
collector = NewMaxCollector(c.interval, collectors...)
} else if len(collectors) == 1 {
collector = collectors[0]
} else {
if len(escapedHostnames) == 0 {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
if err != nil {
return nil, err
}
return collector, nil
}
@@ -119,22 +174,26 @@ func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(values))
}
// get current replicas for the targeted scale object. This is used to
// calculate an average metric instead of total.
// targetAverageValue will be available in Kubernetes v1.12
// https://github.com/kubernetes/kubernetes/pull/64097
replicas, err := targetRefReplicas(c.client, c.hpa)
if err != nil {
return nil, err
}
if replicas < 1 {
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
}
value := values[0]
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
// For Kubernetes <v1.14 we have to fall back to manual average
if c.config.MetricSpec.Object.Target.AverageValue == nil {
// get current replicas for the targeted scale object. This is used to
// calculate an average metric instead of total.
// targetAverageValue will be available in Kubernetes v1.12
// https://github.com/kubernetes/kubernetes/pull/64097
replicas, err := targetRefReplicas(c.client, c.hpa)
if err != nil {
return nil, err
}
if replicas < 1 {
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
}
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
}
return []CollectedMetric{value}, nil
}
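The fallback branch does the averaging by hand. Worked through: with a total of 300 requests per second across the matched hosts and 3 replicas, MilliValue() is 300000, the per-replica share is 100000 milli-units, and the reported quantity is 100. A sketch of just that arithmetic:
total := resource.NewMilliQuantity(300000, resource.DecimalSI) // 300 rps in milli-units
replicas := int32(3)
avgValue := float64(total.MilliValue()) / float64(replicas) // 100000
perReplica := resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
// perReplica.String() == "100"
_ = perReplica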
@@ -144,7 +203,7 @@ func (c *SkipperCollector) Interval() time.Duration {
return c.interval
}
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (int32, error) {
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (int32, error) {
var replicas int32
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":

File diff suppressed because it is too large.

@@ -7,7 +7,7 @@ import (
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -41,17 +41,13 @@ func NewZMONCollectorPlugin(zmon zmon.ZMON) (*ZMONCollectorPlugin, error) {
}
// NewCollector initializes a new ZMON collector from the specified HPA.
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Metric.Name {
case ZMONCheckMetric:
annotations := map[string]string{}
if hpa != nil {
annotations = hpa.Annotations
}
return NewZMONCollector(c.zmon, config, annotations, interval)
return NewZMONCollector(c.zmon, config, interval)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
}
// ZMONCollector defines a collector that is able to collect metrics from ZMON.
@@ -60,17 +56,20 @@ type ZMONCollector struct {
interval time.Duration
checkID int
key string
labels map[string]string
tags map[string]string
duration time.Duration
aggregators []string
metricName string
metricType autoscalingv2beta1.MetricSourceType
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
}
// NewZMONCollector initializes a new ZMONCollector.
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[string]string, interval time.Duration) (*ZMONCollector, error) {
checkIDStr, ok := config.Labels[zmonCheckIDLabelKey]
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
if config.Metric.Selector == nil {
return nil, fmt.Errorf("selector for zmon-check is not specified")
}
checkIDStr, ok := config.Config[zmonCheckIDLabelKey]
if !ok {
return nil, fmt.Errorf("ZMON check ID not specified on metric")
}
@@ -83,19 +82,14 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
key := ""
// get optional key
if k, ok := config.Labels[zmonKeyLabelKey]; ok {
key = k
}
// annotations takes precedence over label
if k, ok := annotations[zmonKeyAnnotationKey]; ok {
if k, ok := config.Config[zmonKeyLabelKey]; ok {
key = k
}
duration := defaultQueryDuration
// parse optional duration value
if d, ok := config.Labels[zmonDurationLabelKey]; ok {
if d, ok := config.Config[zmonDurationLabelKey]; ok {
duration, err = time.ParseDuration(d)
if err != nil {
return nil, err
@@ -104,26 +98,16 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
// parse tags
tags := make(map[string]string)
for k, v := range config.Labels {
for k, v := range config.Config {
if strings.HasPrefix(k, zmonTagPrefixLabelKey) {
key := strings.TrimPrefix(k, zmonTagPrefixLabelKey)
tags[key] = v
}
}
// parse tags from annotations
// tags defined in annotations takes precedence over tags defined in
// the labels.
for k, v := range annotations {
if strings.HasPrefix(k, zmonTagPrefixAnnotationKey) {
key := strings.TrimPrefix(k, zmonTagPrefixAnnotationKey)
tags[key] = v
}
}
// default aggregator is last
aggregators := []string{"last"}
if k, ok := config.Labels[zmonAggregatorsLabelKey]; ok {
if k, ok := config.Config[zmonAggregatorsLabelKey]; ok {
aggregators = strings.Split(k, ",")
}
@@ -135,9 +119,8 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
tags: tags,
duration: duration,
aggregators: aggregators,
metricName: config.Name,
metric: config.Metric,
metricType: config.Type,
labels: config.Labels,
}, nil
}
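Every ZMON option now comes from the metric's Config map rather than from HPA labels or annotations. A hedged example of a full Config, mirroring the keys exercised by the tests below (the check ID is required, everything else is optional; the values are invented):
cfg := map[string]string{
	zmonCheckIDLabelKey:             "1234",               // required ZMON check ID
	zmonKeyLabelKey:                 "custom.queue_length", // optional key
	zmonDurationLabelKey:            "5m",                  // optional query window
	zmonAggregatorsLabelKey:         "max",                 // comma-separated, default "last"
	zmonTagPrefixLabelKey + "alias": "my-cluster",          // one tag per key suffix
}
_ = cfg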
@@ -159,8 +142,8 @@ func (c *ZMONCollector) GetMetrics() ([]CollectedMetric, error) {
metricValue := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metricName,
MetricLabels: c.labels,
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: point.Time},
Value: *resource.NewMilliQuantity(int64(point.Value*1000), resource.DecimalSI),
},


@@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -26,9 +26,9 @@ func TestZMONCollectorNewCollector(t *testing.T) {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Name: ZMONCheckMetric,
Metric: newMetricIdentifier(ZMONCheckMetric),
},
Labels: map[string]string{
Config: map[string]string{
zmonCheckIDLabelKey: "1234",
zmonAggregatorsLabelKey: "max",
zmonTagPrefixLabelKey + "alias": "cluster_alias",
@@ -37,7 +37,7 @@ func TestZMONCollectorNewCollector(t *testing.T) {
},
}
hpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
collector, err := collectPlugin.NewCollector(hpa, config, 1*time.Second)
require.NoError(t, err)
@@ -50,39 +50,31 @@ func TestZMONCollectorNewCollector(t *testing.T) {
require.Equal(t, []string{"max"}, zmonCollector.aggregators)
require.Equal(t, map[string]string{"alias": "cluster_alias"}, zmonCollector.tags)
// check that annotations overwrite labels
hpa.ObjectMeta = metav1.ObjectMeta{
Annotations: map[string]string{
zmonKeyAnnotationKey: "annotation_key",
zmonTagPrefixAnnotationKey + "alias": "cluster_alias_annotation",
},
}
collector, err = collectPlugin.NewCollector(hpa, config, 1*time.Second)
require.NoError(t, err)
require.NotNil(t, collector)
zmonCollector = collector.(*ZMONCollector)
require.Equal(t, "annotation_key", zmonCollector.key)
require.Equal(t, map[string]string{"alias": "cluster_alias_annotation"}, zmonCollector.tags)
// should fail if the metric name isn't ZMON
config.Name = "non-zmon-check"
config.Metric = newMetricIdentifier("non-zmon-check")
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
require.Error(t, err)
// should fail if the check id is not specified.
delete(config.Labels, zmonCheckIDLabelKey)
config.Name = ZMONCheckMetric
delete(config.Config, zmonCheckIDLabelKey)
config.Metric.Name = ZMONCheckMetric
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
require.Error(t, err)
}
func newMetricIdentifier(metricName string) autoscalingv2.MetricIdentifier {
selector := metav1.LabelSelector{}
return autoscalingv2.MetricIdentifier{Name: metricName, Selector: &selector}
}
func TestZMONCollectorGetMetrics(tt *testing.T) {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Name: ZMONCheckMetric,
Type: "foo",
Metric: newMetricIdentifier(ZMONCheckMetric),
Type: "foo",
},
Labels: map[string]string{
Config: map[string]string{
zmonCheckIDLabelKey: "1234",
zmonAggregatorsLabelKey: "max",
zmonTagPrefixLabelKey + "alias": "cluster_alias",
@@ -108,8 +100,8 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
{
Type: config.Type,
External: external_metrics.ExternalMetricValue{
MetricName: config.Name,
MetricLabels: config.Labels,
MetricName: config.Metric.Name,
MetricLabels: config.Metric.Selector.MatchLabels,
Timestamp: metav1.Time{Time: time.Time{}},
Value: *resource.NewMilliQuantity(int64(1.0)*1000, resource.DecimalSI),
},
@@ -125,7 +117,7 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
dataPoints: ti.dataPoints,
}
zmonCollector, err := NewZMONCollector(z, config, nil, 1*time.Second)
zmonCollector, err := NewZMONCollector(z, config, 1*time.Second)
require.NoError(t, err)
metrics, _ := zmonCollector.GetMetrics()


@@ -12,7 +12,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -46,10 +46,6 @@ var (
})
)
type objectCollector struct {
ObjectReference *autoscalingv2beta1.CrossVersionObjectReference
}
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
@@ -58,7 +54,7 @@ type HPAProvider struct {
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
@@ -120,23 +116,31 @@ func (p *HPAProvider) Run(ctx context.Context) {
func (p *HPAProvider) updateHPAs() error {
p.logger.Info("Looking for HPAs")
hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
return err
}
newHPACache := make(map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler, len(hpas.Items))
newHPACache := make(map[resourceReference]autoscalingv2.HorizontalPodAutoscaler, len(hpas.Items))
newHPAs := 0
for _, hpa := range hpas.Items {
hpa := hpa
resourceRef := resourceReference{
Name: hpa.Name,
Namespace: hpa.Namespace,
}
if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) {
cachedHPA, ok := p.hpaCache[resourceRef]
hpaUpdated := !equalHPA(cachedHPA, hpa)
if !ok || hpaUpdated {
// if the HPA has changed then remove the previously
// scheduled collector.
if hpaUpdated {
p.logger.Infof("Removing previously scheduled metrics collector: %s", resourceRef)
p.collectorScheduler.Remove(resourceRef)
}
metricConfigs, err := collector.ParseHPAMetrics(&hpa)
if err != nil {
p.logger.Errorf("Failed to parse HPA metrics: %v", err)
@@ -187,7 +191,7 @@ func (p *HPAProvider) updateHPAs() error {
}
// equalHPA returns true if two HPAs are identical (apart from their status).
func equalHPA(a, b autoscalingv2beta1.HorizontalPodAutoscaler) bool {
func equalHPA(a, b autoscalingv2.HorizontalPodAutoscaler) bool {
// reset resource version to not compare it since this will change
// whenever the status of the object is updated. We only want to
// compare the metadata and the spec.
@@ -225,15 +229,15 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
p.logger.Infof("Collected %d new metric(s)", len(collection.Values))
for _, value := range collection.Values {
switch value.Type {
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
value.Custom.MetricName,
value.Custom.Metric.Name,
value.Custom.Value.String(),
value.Custom.DescribedObject.Kind,
value.Custom.DescribedObject.Namespace,
value.Custom.DescribedObject.Name,
)
case autoscalingv2beta1.ExternalMetricSourceType:
case autoscalingv2.ExternalMetricSourceType:
p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
value.External.MetricName,
value.External.Value.String(),
@@ -250,7 +254,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
}
// GetMetricByName gets a single metric by name.
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*custom_metrics.MetricValue, error) {
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
metric := p.metricStore.GetMetricsByName(name, info)
if metric == nil {
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
@@ -260,7 +264,7 @@ func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.C
// GetMetricBySelector returns metrics for namespaced resources by
// label selector.
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo) (*custom_metrics.MetricValueList, error) {
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
return p.metricStore.GetMetricsBySelector(namespace, selector, info), nil
}

pkg/provider/hpa_test.go (new file, 96 lines)

@@ -0,0 +1,96 @@
package provider
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscaling "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)
type mockCollectorPlugin struct{}
func (m mockCollectorPlugin) NewCollector(hpa *autoscaling.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
return mockCollector{}, nil
}
type mockCollector struct{}
func (c mockCollector) GetMetrics() ([]collector.CollectedMetric, error) {
return nil, nil
}
func (c mockCollector) Interval() time.Duration {
return 1 * time.Second
}
func TestUpdateHPAs(t *testing.T) {
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
"metric-config.pods.requests-per-second.json-path/port": "9090",
},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "requests-per-second",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
require.Len(t, provider.collectorScheduler.table, 1)
// update HPA
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa)
require.NoError(t, err)
err = provider.updateHPAs()
require.NoError(t, err)
require.Len(t, provider.collectorScheduler.table, 1)
}

Some files were not shown because too many files have changed in this diff.