Compare commits

..

56 Commits

Author SHA1 Message Date
dbed1570ba Merge pull request #156 from zalando-incubator/networking.k8s.io
Add support for Ingress of API Group networking.k8s.io
2020-05-26 15:09:18 +02:00
ef24244074 Add support for networking.k8s.io
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-05-26 14:47:46 +02:00
cadf2dff3c Merge pull request #153 from doyshinda/custom_timeouts
Add support for custom timeouts
2020-05-18 10:27:17 +02:00
db83268343 Merge pull request #150 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.24
Bump github.com/aws/aws-sdk-go from 1.30.19 to 1.30.24
2020-05-18 10:25:50 +02:00
78c2ef742b Merge pull request #152 from doyshinda/parallel_get_metrics
Fetch pod metrics in parallel
2020-05-18 09:26:14 +02:00
f0b817629c Add support for custom timeouts
Allow configuration per metric for connect and request timeouts
when querying pods for JSON metrics.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-05-16 07:15:58 -06:00
3e7b66070c Fetch pod metrics in parallel
Fetching metrics from pods sequentially, with a large number of
pods, can result in poor performance when some of those pods have
been terminated by the HPA in a normal scale down event.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-05-16 06:47:13 -06:00
f3d33b6132 Bump github.com/aws/aws-sdk-go from 1.30.19 to 1.30.24
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.19 to 1.30.24.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.19...v1.30.24)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-11 10:18:26 +00:00
7b7af889fb Merge pull request #148 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.19
Bump github.com/aws/aws-sdk-go from 1.30.14 to 1.30.19
2020-05-06 12:45:54 +02:00
2bcaa80ce0 Merge pull request #147 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-1.6.0
Bump github.com/prometheus/client_golang from 1.5.1 to 1.6.0
2020-05-06 12:45:46 +02:00
a2e6980b25 Merge pull request #146 from zalando-incubator/dependabot/go_modules/github.com/sirupsen/logrus-1.6.0
Bump github.com/sirupsen/logrus from 1.5.0 to 1.6.0
2020-05-06 12:45:23 +02:00
f0783dcdd8 Bump github.com/aws/aws-sdk-go from 1.30.14 to 1.30.19
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.14 to 1.30.19.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.14...v1.30.19)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:59:27 +00:00
560fb40dae Bump github.com/prometheus/client_golang from 1.5.1 to 1.6.0
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.5.1 to 1.6.0.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.5.1...v1.6.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:58:05 +00:00
41229e370d Bump github.com/sirupsen/logrus from 1.5.0 to 1.6.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.5.0 to 1.6.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.5.0...v1.6.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:57:46 +00:00
338393c11e Merge pull request #145 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.14
Bump github.com/aws/aws-sdk-go from 1.30.9 to 1.30.14
2020-05-02 21:28:27 +02:00
8e7d1bc0d6 Bump github.com/aws/aws-sdk-go from 1.30.9 to 1.30.14
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.9 to 1.30.14.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.9...v1.30.14)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-27 10:18:59 +00:00
dc7fb5875b Merge pull request #143 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.9
Bump github.com/aws/aws-sdk-go from 1.30.7 to 1.30.9
2020-04-21 06:20:06 +02:00
8f70d3493e Bump github.com/aws/aws-sdk-go from 1.30.7 to 1.30.9
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.7 to 1.30.9.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.7...v1.30.9)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-20 10:00:12 +00:00
97ec13d010 Update Travis to Go 1.14 (#140)
Signed-off-by: Gábor Lipták <gliptak@gmail.com>
2020-04-12 22:54:49 +02:00
5e20dcd961 Merge pull request #139 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.7
Bump github.com/aws/aws-sdk-go from 1.30.3 to 1.30.7
2020-04-09 15:52:18 +02:00
f27dbc3939 Bump github.com/aws/aws-sdk-go from 1.30.3 to 1.30.7
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.3 to 1.30.7.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.3...v1.30.7)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-09 08:48:10 +00:00
6443613930 Merge pull request #135 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.3
Bump github.com/aws/aws-sdk-go from 1.30.2 to 1.30.3
2020-04-03 12:35:00 +02:00
d9c2e50f2c Bump github.com/aws/aws-sdk-go from 1.30.2 to 1.30.3
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.2 to 1.30.3.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.2...v1.30.3)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-03 08:48:22 +00:00
a8cd761f85 Merge pull request #122 from zalando-incubator/http-collector
Add basic HTTP collector
2020-04-02 14:34:47 +02:00
43fa626ca1 Merge pull request #134 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.2
Bump github.com/aws/aws-sdk-go from 1.30.0 to 1.30.2
2020-04-02 12:01:48 +02:00
2d3ddc53ef Add and HTTP metrics collector
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-04-02 11:05:37 +02:00
02880fad2d Bump github.com/aws/aws-sdk-go from 1.30.0 to 1.30.2
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.0 to 1.30.2.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.0...v1.30.2)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-02 08:34:49 +00:00
405e347620 Merge pull request #132 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.0
Bump github.com/aws/aws-sdk-go from 1.29.33 to 1.30.0
2020-03-31 19:14:02 +02:00
f15aacad5c Merge pull request #123 from zalando-incubator/dependabot/go_modules/github.com/spf13/cobra-0.0.7
Bump github.com/spf13/cobra from 0.0.6 to 0.0.7
2020-03-31 19:13:31 +02:00
c0a9f525b8 Merge pull request #131 from vetinari/patch-1
fix error to not contain twice the namespace
2020-03-31 19:13:05 +02:00
24207d285c Merge pull request #126 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-1.5.1
Bump github.com/prometheus/client_golang from 1.0.0 to 1.5.1
2020-03-31 19:12:03 +02:00
860aba807e fix error to not contain twice the namespace
but to contain also the name of the pod

Signed-off-by: Hanno Hecker <hanno@zalando.de>
2020-03-31 10:44:55 +02:00
73659f8ac6 Bump github.com/aws/aws-sdk-go from 1.29.33 to 1.30.0
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.33 to 1.30.0.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.33...v1.30.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-31 08:42:58 +00:00
280d358538 Bump github.com/prometheus/client_golang from 1.0.0 to 1.5.1
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.0.0 to 1.5.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.0.0...v1.5.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-30 09:50:33 +00:00
735bb164e2 Bump github.com/spf13/cobra from 0.0.6 to 0.0.7
Bumps [github.com/spf13/cobra](https://github.com/spf13/cobra) from 0.0.6 to 0.0.7.
- [Release notes](https://github.com/spf13/cobra/releases)
- [Commits](https://github.com/spf13/cobra/compare/v0.0.6...0.0.7)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-30 09:49:38 +00:00
d28712abd1 Merge pull request #110 from zalando-incubator/update-dependencies
Updated dependencies based on version 1.17
2020-03-27 11:05:59 +01:00
670692b23c Updated dependencies based on version 1.17
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-03-27 10:54:06 +01:00
33683fe88d Merge pull request #121 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.29.33
Bump github.com/aws/aws-sdk-go from 1.29.31 to 1.29.33
2020-03-27 10:23:34 +01:00
abaef5e491 Bump github.com/aws/aws-sdk-go from 1.29.31 to 1.29.33
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.31 to 1.29.33.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.31...v1.29.33)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-27 08:31:09 +00:00
670f081a5e Merge pull request #115 from zalando-incubator/dependabot/go_modules/github.com/spf13/cobra-0.0.6
Bump github.com/spf13/cobra from 0.0.3 to 0.0.6
2020-03-26 12:10:03 +01:00
55a743e5ac Merge pull request #119 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.29.31
Bump github.com/aws/aws-sdk-go from 1.29.30 to 1.29.31
2020-03-25 14:06:01 +01:00
830d385a4a Bump github.com/spf13/cobra from 0.0.3 to 0.0.6
Bumps [github.com/spf13/cobra](https://github.com/spf13/cobra) from 0.0.3 to 0.0.6.
- [Release notes](https://github.com/spf13/cobra/releases)
- [Commits](https://github.com/spf13/cobra/compare/v0.0.3...v0.0.6)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-25 12:27:25 +00:00
f2638c6843 Bump github.com/aws/aws-sdk-go from 1.29.30 to 1.29.31
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.30 to 1.29.31.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.30...v1.29.31)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-25 08:33:33 +00:00
b77fba2e7b Merge pull request #114 from zalando-incubator/dependabot/go_modules/github.com/stretchr/testify-1.5.1
Bump github.com/stretchr/testify from 1.4.0 to 1.5.1
2020-03-24 10:49:11 +01:00
e77d51f97a Merge pull request #112 from zalando-incubator/dependabot/go_modules/github.com/influxdata/influxdb-client-go-0.1.5
Bump github.com/influxdata/influxdb-client-go from 0.1.4 to 0.1.5
2020-03-24 10:48:43 +01:00
69ac42ed7d Merge pull request #111 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-0.9.4
Bump github.com/prometheus/client_golang from 0.9.2 to 0.9.4
2020-03-24 10:48:02 +01:00
582f05539c Bump github.com/sirupsen/logrus from 1.4.2 to 1.5.0 (#117)
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.4.2 to 1.5.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.4.2...v1.5.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
2020-03-24 10:46:59 +01:00
46f2e5c4fd Bump github.com/aws/aws-sdk-go from 1.29.4 to 1.29.30 (#116)
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.4 to 1.29.30.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.4...v1.29.30)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
2020-03-24 10:46:51 +01:00
31bc1771df Bump github.com/sirupsen/logrus from 1.4.2 to 1.5.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.4.2 to 1.5.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.4.2...v1.5.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-24 08:49:15 +00:00
836c78d08b Bump github.com/stretchr/testify from 1.4.0 to 1.5.1
Bumps [github.com/stretchr/testify](https://github.com/stretchr/testify) from 1.4.0 to 1.5.1.
- [Release notes](https://github.com/stretchr/testify/releases)
- [Commits](https://github.com/stretchr/testify/compare/v1.4.0...v1.5.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:21:19 +00:00
6f5ab042e6 Bump github.com/influxdata/influxdb-client-go from 0.1.4 to 0.1.5
Bumps [github.com/influxdata/influxdb-client-go](https://github.com/influxdata/influxdb-client-go) from 0.1.4 to 0.1.5.
- [Release notes](https://github.com/influxdata/influxdb-client-go/releases)
- [Commits](https://github.com/influxdata/influxdb-client-go/compare/v0.1.4...v0.1.5)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:20:20 +00:00
5abc7388fb Bump github.com/prometheus/client_golang from 0.9.2 to 0.9.4
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 0.9.2 to 0.9.4.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v0.9.2...v0.9.4)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:19:58 +00:00
5bf87cb10e Add support for passing URL query params to pod metric endpoints (#109)
Adds a new metric-config option named `rawQuery`. The value of
this option will be appended to the metric `path` as URL query
parameters to be used by the pod's application. E.g.,:
```
metric-config.pods.requests-per-second.json-path/rawQuery: "foo=bar&baz=bop"
```
will apppend `?foo=bar&baz=bop` to the URL.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-03-10 10:45:28 +01:00
c6610750e4 Merge pull request #107 from affo/fix/influxdb-token-org-swap
fix(influxdb): fix swap of org/token
2020-02-24 09:27:34 +01:00
04ae6d955e fix(influxdb): fix swap of org/token parameter passing to
InfluxDBCollector

Convert from `orgID` to `org`.

Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-02-21 19:13:17 +01:00
2d56c202a7 Update AWS SDK to support AWS IAM solution (#106)
Fix #105

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-02-18 09:59:31 +01:00
24 changed files with 1506 additions and 561 deletions

View File

@ -2,7 +2,7 @@ language: go
dist: xenial
go:
- "1.13.x"
- "1.14.x"
env:
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"

View File

@ -155,6 +155,25 @@ values of JSONPath expressions that evaluate to arrays/slices of numbers.
It's optional but when the expression evaluates to an array/slice, it's absence will
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
The `raw-query` configuration option specifies the query params to send along to the endpoint:
```yaml
metric-config.pods.requests-per-second.json-path/path: /metrics
metric-config.pods.requests-per-second.json-path/port: "9090"
metric-config.pods.requests-per-second.json-path/raw-query: "foo=bar&baz=bop"
```
will create a URL like this:
```
http://<podIP>:9090/metrics?foo=bar&baz=bop
```
There are also configuration options for custom (connect and request) timeouts when querying pods for metrics:
```yaml
metric-config.pods.requests-per-second.json-path/request-timeout: 2s
metric-config.pods.requests-per-second.json-path/connect-timeout: 500ms
```
The default for both of the above values is 15 seconds.
## Prometheus collector
The Prometheus collector is a generic collector which can map Prometheus
@ -369,10 +388,11 @@ metadata:
# instead of using the ones specified via CLI. Respectively:
# - --influxdb-address
# - --influxdb-token
# - --influxdb-org-id
# - --influxdb-org
metric-config.external.flux-query.influxdb/address: "http://influxdbv2.my-namespace.svc"
metric-config.external.flux-query.influxdb/token: "secret-token"
metric-config.external.flux-query.influxdb/org-id: "deadbeef"
# This could be either the organization name or the ID.
metric-config.external.flux-query.influxdb/org: "deadbeef"
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
# <configKey> == query-name
metric-config.external.flux-query.influxdb/queue_depth: |
@ -523,9 +543,9 @@ spec:
tag-application: my-custom-app
aggregators: avg # comma separated list of aggregation functions, default: last
duration: 5m # default: 10m
target:
averageValue: "30"
type: AverageValue
target:
averageValue: "30"
type: AverageValue
```
The `check-id` specifies the ZMON check to query for the metrics. `key`
@ -573,3 +593,60 @@ you need to define a `key` or other `tag` with a "star" query syntax like
`values.*`. This *hack* is in place because it's not allowed to use `*` in the
metric label definitions. If both annotations and corresponding label is
defined, then the annotation takes precedence.
## HTTP Collector
The http collector allows collecting metrics from an external endpoint specified in the HPA.
Currently only `json-path` collection is supported.
### Supported metrics
| Metric | Description | Type | K8s Versions |
| ------------ | -------------- | ------- | -- |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods | `>=1.12` |
### Example
This is an example of using the HTTP collector to collect metrics from a json
metrics endpoint specified in the annotations.
```yaml
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
annotations:
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
metric-config.external.http.json/json-key: "$.some-metric.value"
metric-config.external.http.json/endpoint: "http://metric-source.app-namespace:8080/metrics"
metric-config.external.http.json/aggregator: "max"
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: myapp
minReplicas: 1
maxReplicas: 10
metrics:
- type: External
external:
metric:
name: http
selector:
matchLabels:
identifier: unique-metric-name
target:
averageValue: 1
type: AverageValue
```
The HTTP collector similar to the Pod Metrics collector. The metric name should always be `http`.
This value is also used in the annotations to configure the metrics adapter to query the required
target. The following configuration values are supported:
- `json-key` to specify the JSON path of the metric to be queried
- `endpoint` the fully formed path to query for the metric. In the above example a Kubernetes _Service_
in the namespace `app-namespace` is called.
- `aggregator` is only required if the metric is an array of values and specifies how the values
are aggregated. Currently this option can support the values: `sum`, `max`, `min`, `avg`.

42
go.mod
View File

@ -2,31 +2,31 @@ module github.com/zalando-incubator/kube-metrics-adapter
require (
github.com/NYTimes/gziphandler v1.0.1 // indirect
github.com/aws/aws-sdk-go v1.16.6
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/aws/aws-sdk-go v1.30.24
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/influxdata/influxdb-client-go v0.1.4
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/influxdata/influxdb-client-go v0.1.5
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200323093244-5046ce1afe6b
github.com/lib/pq v1.2.0 // indirect
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.3
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v0.0.7
github.com/stretchr/testify v1.5.1
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
k8s.io/klog v0.4.0
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
golang.org/x/tools v0.0.0-20200204192400-7124308813f3 // indirect
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.17.3
k8s.io/component-base v0.17.3
k8s.io/klog v1.0.0
k8s.io/metrics v0.17.3
)
go 1.13

414
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -86,6 +86,21 @@ func TestParser(t *testing.T) {
"org-id": "deadbeef",
},
},
{
Name: "http metrics",
Annotations: map[string]string{
"metric-config.external.http.json/json-key": "$.metric.value",
"metric-config.external.http.json/endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
"metric-config.external.http.json/aggregator": "avg",
},
MetricName: "http",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"json-key": "$.metric.value",
"endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
"aggregator": "avg",
},
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)

View File

@ -0,0 +1,103 @@
package collector
import (
"fmt"
"net/url"
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
"github.com/oliveagle/jsonpath"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
HTTPMetricName = "http"
HTTPEndpointAnnotationKey = "endpoint"
HTTPJsonPathAnnotationKey = "json-key"
identifierLabel = "identifier"
)
type HTTPCollectorPlugin struct{}
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
return &HTTPCollectorPlugin{}, nil
}
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
collector := &HTTPCollector{}
var (
value string
ok bool
)
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
}
jsonPath, err := jsonpath.Compile(value)
if err != nil {
return nil, fmt.Errorf("failed to parse json path: %v", err)
}
collector.jsonPath = jsonPath
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
}
collector.endpoint, err = url.Parse(value)
if err != nil {
return nil, err
}
collector.interval = interval
collector.metricType = config.Type
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil {
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name)
}
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok {
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name)
}
collector.metric = config.Metric
var aggFunc httpmetrics.AggregatorFunc
if val, ok := config.Config["aggregator"]; ok {
aggFunc, err = httpmetrics.ParseAggregator(val)
if err != nil {
return nil, err
}
}
collector.metricsGetter = httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
return collector, nil
}
type HTTPCollector struct {
endpoint *url.URL
jsonPath *jsonpath.Compiled
interval time.Duration
metricType v2beta2.MetricSourceType
metricsGetter *httpmetrics.JSONPathMetricsGetter
metric v2beta2.MetricIdentifier
}
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
if err != nil {
return nil, err
}
value := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now(),
},
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI),
},
}
return []CollectedMetric{value}, nil
}
func (c *HTTPCollector) Interval() time.Duration {
return c.interval
}

View File

@ -0,0 +1,94 @@
package collector
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type testExternalMetricsHandler struct {
values []int64
test *testing.T
}
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
response, err := json.Marshal(testMetricResponse{t.values})
require.NoError(t.test, err)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(response)
require.NoError(t.test, err)
}
func makeHTTPTestServer(t *testing.T, values []int64) string {
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t})
return server.URL
}
func TestHTTPCollector(t *testing.T) {
for _, tc := range []struct {
name string
values []int64
output int
aggregator string
}{
{
name: "basic",
values: []int64{3},
output: 3,
aggregator: "sum",
},
{
name: "sum",
values: []int64{3, 5, 6},
aggregator: "sum",
output: 14,
},
{
name: "average",
values: []int64{3, 5, 6},
aggregator: "sum",
output: 14,
},
} {
t.Run(tc.name, func(t *testing.T) {
testServer := makeHTTPTestServer(t, tc.values)
plugin, err := NewHTTPCollectorPlugin()
require.NoError(t, err)
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.NotNil(t, metrics)
require.Len(t, metrics, 1)
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output)
})
}
}
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "test-metric",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{identifierLabel: "test-metric"},
},
},
},
Config: map[string]string{
HTTPJsonPathAnnotationKey: "$.values",
HTTPEndpointAnnotationKey: endpoint,
},
}
if aggregator != "" {
config.Config["aggregator"] = aggregator
}
return config
}

View File

@ -0,0 +1,65 @@
package httpmetrics
import (
"fmt"
"math"
)
type AggregatorFunc func(...float64) float64
// Average implements the average mathematical function over a slice of float64
func Average(values ...float64) float64 {
sum := Sum(values...)
return sum / float64(len(values))
}
// Minimum implements the absolute minimum mathematical function over a slice of float64
func Minimum(values ...float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// Maximum implements the absolute maximum mathematical function over a slice of float64
func Maximum(values ...float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// Sum implements the summation mathematical function over a slice of float64
func Sum(values ...float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
func ParseAggregator(aggregator string) (AggregatorFunc, error) {
switch aggregator {
case "avg":
return Average, nil
case "min":
return Minimum, nil
case "max":
return Maximum, nil
case "sum":
return Sum, nil
default:
return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator)
}
}

View File

@ -0,0 +1,57 @@
package httpmetrics
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestReduce(t *testing.T) {
for _, tc := range []struct {
input []float64
output float64
aggregator string
parseError bool
}{
{
input: []float64{1, 2, 3},
output: 2.0,
aggregator: "avg",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 1.0,
aggregator: "min",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 3.0,
aggregator: "max",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 6.0,
aggregator: "sum",
parseError: false,
},
{
input: []float64{1, 2, 3},
aggregator: "non-existent",
parseError: true,
},
} {
t.Run(fmt.Sprintf("Test function: %s", tc.aggregator), func(t *testing.T) {
aggFunc, err := ParseAggregator(tc.aggregator)
if tc.parseError {
require.Error(t, err)
} else {
val := aggFunc(tc.input...)
require.Equal(t, tc.output, val)
}
})
}
}

View File

@ -0,0 +1,136 @@
package httpmetrics
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
"github.com/oliveagle/jsonpath"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
aggregator AggregatorFunc
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, compiledPath *jsonpath.Compiled) *JSONPathMetricsGetter {
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: compiledPath}
}
var DefaultRequestTimeout = 15 * time.Second
var DefaultConnectTimeout = 15 * time.Second
func CustomMetricsHTTPClient(requestTimeout time.Duration, connectTimeout time.Duration) *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: connectTimeout,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: requestTimeout,
}
return client
}
func DefaultMetricsHTTPClient() *http.Client {
return CustomMetricsHTTPClient(DefaultRequestTimeout, DefaultConnectTimeout)
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
data, err := g.fetchMetrics(metricsURL)
if err != nil {
return 0, err
}
// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return 0, err
}
res, err := g.jsonPath.Lookup(jsonData)
if err != nil {
return 0, err
}
switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
case []interface{}:
if g.aggregator == nil {
return 0, fmt.Errorf("no aggregator function has been specified")
}
s, err := castSlice(res)
if err != nil {
return 0, err
}
return g.aggregator(s...), nil
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
var out []float64
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
}
return data, nil
}

View File

@ -0,0 +1,89 @@
package httpmetrics
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
type testValueResponse struct {
Value int64 `json:"value"`
}
type testValueArrayResponse struct {
Value []int64 `json:"value"`
}
func makeTestHTTPServer(t *testing.T, values ...int64) *httptest.Server {
h := func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.URL.Path, "/metrics")
w.Header().Set("Content-Type", "application/json")
var (
response []byte
err error
)
if len(values) == 1 {
response, err = json.Marshal(testValueResponse{Value: values[0]})
require.NoError(t, err)
} else {
response, err = json.Marshal(testValueArrayResponse{Value: values})
require.NoError(t, err)
}
_, err = w.Write(response)
require.NoError(t, err)
}
return httptest.NewServer(http.HandlerFunc(h))
}
func TestJSONPathMetricsGetter(t *testing.T) {
for _, tc := range []struct {
name string
input []int64
output float64
aggregator AggregatorFunc
}{
{
name: "basic average",
input: []int64{3, 4, 5},
output: 4,
aggregator: Average,
},
} {
t.Run(tc.name, func(t *testing.T) {
server := makeTestHTTPServer(t, tc.input...)
defer server.Close()
path, err := jsonpath.Compile("$.value")
require.NoError(t, err)
getter := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, path)
url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
require.NoError(t, err)
metric, err := getter.GetMetric(*url)
require.NoError(t, err)
require.Equal(t, tc.output, metric)
})
}
}

View File

@ -0,0 +1,120 @@
package httpmetrics
import (
"fmt"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
v1 "k8s.io/api/core/v1"
)
type PodMetricsGetter interface {
GetMetric(pod *v1.Pod) (float64, error)
}
type PodMetricsJSONPathGetter struct {
scheme string
path string
rawQuery string
port int
metricGetter *JSONPathMetricsGetter
}
func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
if pod.Status.PodIP == "" {
return 0, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
}
metricsURL := g.buildMetricsURL(pod.Status.PodIP)
return g.metricGetter.GetMetric(metricsURL)
}
func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
getter := PodMetricsJSONPathGetter{}
var (
jsonPath *jsonpath.Compiled
aggregator AggregatorFunc
err error
)
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
jsonPath = path
}
if v, ok := config["scheme"]; ok {
getter.scheme = v
}
if v, ok := config["path"]; ok {
getter.path = v
}
if v, ok := config["raw-query"]; ok {
getter.rawQuery = v
}
if v, ok := config["port"]; ok {
n, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
getter.port = n
}
if v, ok := config["aggregator"]; ok {
aggregator, err = ParseAggregator(v)
if err != nil {
return nil, err
}
}
requestTimeout := DefaultRequestTimeout
connectTimeout := DefaultConnectTimeout
if v, ok := config["request-timeout"]; ok {
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
if d < 0 {
return nil, fmt.Errorf("Invalid request-timeout config value: %s", v)
}
requestTimeout = d
}
if v, ok := config["connect-timeout"]; ok {
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
if d < 0 {
return nil, fmt.Errorf("Invalid connect-timeout config value: %s", v)
}
connectTimeout = d
}
getter.metricGetter = NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
return &getter, nil
}
// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
var scheme = g.scheme
if scheme == "" {
scheme = "http"
}
return url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%d", podIP, g.port),
Path: g.path,
RawQuery: g.rawQuery,
}
}

View File

@ -0,0 +1,198 @@
package httpmetrics
import (
"fmt"
"testing"
"time"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *PodMetricsJSONPathGetter) {
require.Equal(t, first.metricGetter.jsonPath, second.metricGetter.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewPodJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewPodMetricsJSONPathGetter(configNoAggregator)
require.NoError(t, err1)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath1},
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewPodMetricsJSONPathGetter(configAggregator)
require.NoError(t, err2)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath2, aggregator: Average},
scheme: "http",
path: "/metrics",
port: 9090,
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewPodMetricsJSONPathGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewPodMetricsJSONPathGetter(configErrorPort)
require.Error(t, err4)
configWithRawQuery := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"raw-query": "foo=bar&baz=bop",
}
jpath5, _ := jsonpath.Compile(configWithRawQuery["json-key"])
getterWithRawQuery, err5 := NewPodMetricsJSONPathGetter(configWithRawQuery)
require.NoError(t, err5)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath5},
scheme: "http",
path: "/metrics",
port: 9090,
rawQuery: "foo=bar&baz=bop",
}, getterWithRawQuery)
}
func TestBuildMetricsURL(t *testing.T) {
scheme := "http"
ip := "1.2.3.4"
port := "9090"
path := "/v1/test/"
rawQuery := "foo=bar&baz=bop"
// Test building URL with rawQuery
configWithRawQuery := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"raw-query": rawQuery,
}
_, err := jsonpath.Compile(configWithRawQuery["json-key"])
require.NoError(t, err)
getterWithRawQuery, err1 := NewPodMetricsJSONPathGetter(configWithRawQuery)
require.NoError(t, err1)
expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)
// Test building URL without rawQuery
configWithNoQuery := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
}
_, err2 := jsonpath.Compile(configWithNoQuery["json-key"])
require.NoError(t, err2)
getterWithNoQuery, err3 := NewPodMetricsJSONPathGetter(configWithNoQuery)
require.NoError(t, err3)
expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
}
func TestCustomTimeouts(t *testing.T) {
scheme := "http"
port := "9090"
path := "/v1/test/"
// Test no custom options results in default timeouts
defaultConfig := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
}
defaultTime := time.Duration(15000) * time.Millisecond
defaultGetter, err1 := NewPodMetricsJSONPathGetter(defaultConfig)
require.NoError(t, err1)
require.Equal(t, defaultGetter.metricGetter.client.Timeout, defaultTime)
// Test with custom request timeout
configWithRequestTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"request-timeout": "978ms",
}
exectedTimeout := time.Duration(978) * time.Millisecond
customRequestGetter, err2 := NewPodMetricsJSONPathGetter(configWithRequestTimeout)
require.NoError(t, err2)
require.Equal(t, customRequestGetter.metricGetter.client.Timeout, exectedTimeout)
// Test with custom connect timeout. Unfortunately, it seems there's no way to access the
// connect timeout of the client struct to actually verify it's set :/
configWithConnectTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"connect-timeout": "512ms",
}
customRequestGetter, err3 := NewPodMetricsJSONPathGetter(configWithConnectTimeout)
require.NoError(t, err3)
configWithInvalidTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"request-timeout": "-256ms",
}
_, err4 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
require.Error(t, err4)
configWithInvalidTimeout = map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"connect-timeout": "-256ms",
}
_, err5 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
require.Error(t, err5)
}

View File

@ -18,7 +18,7 @@ const (
InfluxDBMetricName = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgIDKey = "org-id"
influxDBOrgKey = "org"
influxDBQueryNameLabelKey = "query-name"
)
@ -26,26 +26,26 @@ type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
orgID string
org string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, orgID string) (*InfluxDBCollectorPlugin, error) {
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
orgID: orgID,
org: org,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(p.address, p.orgID, p.token, config, interval)
return NewInfluxDBCollector(p.address, p.token, p.org, config, interval)
}
type InfluxDBCollector struct {
address string
token string
orgID string
org string
influxDBClient *influxdb.Client
interval time.Duration
@ -54,7 +54,7 @@ type InfluxDBCollector struct {
query string
}
func NewInfluxDBCollector(address string, token string, orgID string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
func NewInfluxDBCollector(address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
@ -87,8 +87,8 @@ func NewInfluxDBCollector(address string, token string, orgID string, config *Me
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgIDKey]; ok {
orgID = v
if v, ok := config.Config[influxDBOrgKey]; ok {
org = v
}
influxDbClient, err := influxdb.New(address, token)
if err != nil {
@ -96,7 +96,7 @@ func NewInfluxDBCollector(address string, token string, orgID string, config *Me
}
collector.address = address
collector.token = token
collector.orgID = orgID
collector.org = org
collector.influxDBClient = influxDbClient
return collector, nil
}
@ -109,7 +109,7 @@ type queryResult struct {
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.orgID)
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.org)
if err != nil {
return resource.Quantity{}, err
}

View File

@ -36,7 +36,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef"; want != got {
if got, want := c.org, "deadbeef"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
@ -69,7 +69,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef1234",
"org": "deadbeef1234",
"query-name": "range3m",
},
}
@ -77,7 +77,7 @@ func TestInfluxDBCollector_New(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.orgID, "deadbeef1234"; want != got {
if got, want := c.org, "deadbeef1234"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {

View File

@ -1,240 +0,0 @@
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
corev1 "k8s.io/api/core/v1"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
aggregator string
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
httpClient := defaultHTTPClient()
getter := &JSONPathMetricsGetter{client: httpClient}
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = path
}
if v, ok := config["scheme"]; ok {
getter.scheme = v
}
if v, ok := config["path"]; ok {
getter.path = v
}
if v, ok := config["port"]; ok {
n, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
getter.port = n
}
if v, ok := config["aggregator"]; ok {
getter.aggregator = v
}
return getter, nil
}
func defaultHTTPClient() *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 15 * time.Second,
}
return client
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := g.getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return 0, err
}
res, err := g.jsonPath.Lookup(jsonData)
if err != nil {
return 0, err
}
switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
case []interface{}:
s, err := castSlice(res)
if err != nil {
return 0, err
}
return reduce(s, g.aggregator)
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
out := []float64{}
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
// getPodMetrics returns the content of the pods metrics endpoint.
func (g *JSONPathMetricsGetter) getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
if scheme == "" {
scheme = "http"
}
metricsURL := url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%d", pod.Status.PodIP, port),
Path: path,
}
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
}
return data, nil
}
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
func reduce(values []float64, aggregator string) (float64, error) {
switch aggregator {
case "avg":
return avg(values), nil
case "min":
return min(values), nil
case "max":
return max(values), nil
case "sum":
return sum(values), nil
default:
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
}
}
// avg implements the average mathematical function over a slice of float64
func avg(values []float64) float64 {
sum := sum(values)
return sum / float64(len(values))
}
// min implements the absolute minimum mathematical function over a slice of float64
func min(values []float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// max implements the absolute maximum mathematical function over a slice of float64
func max(values []float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// sum implements the summation mathematical function over a slice of float64
func sum(values []float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}

View File

@ -1,112 +0,0 @@
package collector
import (
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *JSONPathMetricsGetter) {
require.Equal(t, first.jsonPath, second.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
compareMetricsGetter(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",
port: 9090,
aggregator: "avg",
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
require.Error(t, err4)
}
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
func TestReduce(t *testing.T) {
average, err1 := reduce([]float64{1, 2, 3}, "avg")
require.NoError(t, err1)
require.Equal(t, 2.0, average)
min, err2 := reduce([]float64{1, 2, 3}, "min")
require.NoError(t, err2)
require.Equal(t, 1.0, min)
max, err3 := reduce([]float64{1, 2, 3}, "max")
require.NoError(t, err3)
require.Equal(t, 3.0, max)
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
require.NoError(t, err4)
require.Equal(t, 6.0, sum)
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
}

View File

@ -6,10 +6,11 @@ import (
"time"
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
@ -31,7 +32,7 @@ func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutosc
type PodCollector struct {
client kubernetes.Interface
Getter PodMetricsGetter
Getter httpmetrics.PodMetricsGetter
podLabelSelector *metav1.LabelSelector
namespace string
metric autoscalingv2.MetricIdentifier
@ -41,10 +42,6 @@ type PodCollector struct {
httpClient *http.Client
}
type PodMetricsGetter interface {
GetMetric(pod *corev1.Pod) (float64, error)
}
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
// get pod selector based on HPA scale target ref
selector, err := getPodLabelSelector(client, hpa)
@ -62,11 +59,11 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
}
var getter PodMetricsGetter
var getter httpmetrics.PodMetricsGetter
switch config.CollectorName {
case "json-path":
var err error
getter, err = NewJSONPathMetricsGetter(config.Config)
getter, err = httpmetrics.NewPodMetricsJSONPathGetter(config.Config)
if err != nil {
return nil, err
}
@ -89,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, err
}
values := make([]CollectedMetric, 0, len(pods.Items))
// TODO: get metrics in parallel
ch := make(chan CollectedMetric)
errCh := make(chan error)
for _, pod := range pods.Items {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
continue
}
go c.getPodMetric(pod, ch, errCh)
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
values := make([]CollectedMetric, 0, len(pods.Items))
for i := 0; i < len(pods.Items); i++ {
select {
case err := <- errCh:
c.logger.Error(err)
case resp := <- ch:
values = append(values, resp)
}
values = append(values, metricValue)
}
return values, nil
@ -124,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
return
}
ch <- CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
}
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":

View File

@ -0,0 +1,155 @@
package collector
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"sync"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
const (
testNamespace = "test-namespace"
applicationLabelName = "application"
applicationLabelValue = "test-application"
testDeploymentName = "test-application"
testInterval = 10 * time.Second
)
func TestPodCollector(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{1, 3, 8, 5, 2},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
makeTestPods(t, host, port, "test-metric", client, 5)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
type testMetricResponse struct {
Values []int64 `json:"values"`
}
type testMetricsHandler struct {
values [][]int64
calledCounter uint
t *testing.T
metricsPath string
sync.RWMutex
}
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
require.Equal(h.t, h.metricsPath, r.URL.Path)
require.Less(h.t, int(h.calledCounter), len(h.values))
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
require.NoError(h.t, err)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(response)
require.NoError(h.t, err)
h.calledCounter++
}
func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMetricsHandler) {
metricsHandler := &testMetricsHandler{values: values, t: t, metricsPath: "/metrics"}
server := httptest.NewServer(metricsHandler)
url, err := url.Parse(server.URL)
require.NoError(t, err)
return url.Hostname(), url.Port(), metricsHandler
}
func makeTestConfig(port string) *MetricConfig {
return &MetricConfig{
CollectorName: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
}
}
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
for i := 0; i < replicas; i++ {
testPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Labels: map[string]string{applicationLabelName: applicationLabelValue},
Annotations: map[string]string{
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
},
},
Status: corev1.PodStatus{
PodIP: testServer,
},
}
_, err := client.CoreV1().Pods(testNamespace).Create(testPod)
require.NoError(t, err)
}
}
func makeTestDeployment(t *testing.T, client kubernetes.Interface) *appsv1.Deployment {
deployment := appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{Name: testDeploymentName},
Spec: appsv1.DeploymentSpec{
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
},
},
}
_, err := client.AppsV1().Deployments(testNamespace).Create(&deployment)
require.NoError(t, err)
return &deployment
}
func makeTestHPA(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
hpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: v1.ObjectMeta{
Name: "test-hpa",
Namespace: testNamespace,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
Name: testDeploymentName,
APIVersion: "apps/v1",
},
},
}
_, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers("test-namespace").Create(hpa)
require.NoError(t, err)
return hpa
}

View File

@ -128,7 +128,7 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
// TODO: use real context
value, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
value, _, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
if err != nil {
return nil, err
}

View File

@ -125,7 +125,7 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

View File

@ -10,7 +10,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
v1beta1 "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -331,8 +331,8 @@ func TestSkipperCollector(t *testing.T) {
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
require.NoError(t, err)
plugin := makePlugin(tc.metric)
hpa := makeHPA(tc.ingressName, tc.backend)
config := makeConfig(tc.backend, tc.fakedAverage)
hpa := makeHPA(tc.namespace, tc.ingressName, tc.backend)
config := makeConfig(tc.ingressName, tc.namespace, tc.backend, tc.fakedAverage)
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
require.NoError(t, err)
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
@ -341,6 +341,7 @@ func TestSkipperCollector(t *testing.T) {
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
require.NoError(t, err, "failed to collect metrics: %v", err)
require.Len(t, collected, 1, "the number of metrics returned is not 1")
@ -381,12 +382,13 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
Host: hostname,
})
}
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress)
_, err := client.NetworkingV1beta1().Ingresses(namespace).Create(ingress)
return err
}
func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
func makeHPA(namespace, ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
return &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
@ -404,9 +406,13 @@ func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler
},
}
}
func makeConfig(backend string, fakedAverage bool) *MetricConfig {
func makeConfig(ingressName, namespace, backend string, fakedAverage bool) *MetricConfig {
config := &MetricConfig{
MetricTypeName: MetricTypeName{Metric: autoscalingv2.MetricIdentifier{Name: fmt.Sprintf("%s,%s", rpsMetricName, backend)}},
ObjectReference: custom_metrics.ObjectReference{
Name: ingressName,
Namespace: namespace,
},
MetricSpec: autoscalingv2.MetricSpec{
Object: &autoscalingv2.ObjectMetricSource{
Target: autoscalingv2.MetricTarget{},

View File

@ -69,9 +69,15 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
Resource: "pods",
}
case "Ingress":
// group can be either `extentions` or `networking.k8s.io`
group := "extensions"
gv, err := schema.ParseGroupVersion(value.DescribedObject.APIVersion)
if err == nil {
group = gv.Group
}
groupResource = schema.GroupResource{
Resource: "ingresses",
Group: "extensions",
Group: group,
}
}

View File

@ -92,7 +92,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"address of InfluxDB 2.x server to query (e.g. http://localhost:9999)")
flags.StringVar(&o.InfluxDBToken, "influxdb-token", o.InfluxDBToken, ""+
"token for InfluxDB 2.x server to query")
flags.StringVar(&o.InfluxDBOrgID, "influxdb-org-id", o.InfluxDBOrgID, ""+
flags.StringVar(&o.InfluxDBOrg, "influxdb-org", o.InfluxDBOrg, ""+
"organization ID for InfluxDB 2.x server to query")
flags.StringVar(&o.ZMONKariosDBEndpoint, "zmon-kariosdb-endpoint", o.ZMONKariosDBEndpoint, ""+
"url of ZMON KariosDB endpoint to query for ZMON checks")
@ -183,13 +183,15 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
}
if o.InfluxDBAddress != "" {
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrgID)
influxdbPlugin, err := collector.NewInfluxDBCollectorPlugin(client, o.InfluxDBAddress, o.InfluxDBToken, o.InfluxDBOrg)
if err != nil {
return fmt.Errorf("failed to initialize InfluxDB collector plugin: %v", err)
}
collectorFactory.RegisterExternalCollector([]string{collector.InfluxDBMetricName}, influxdbPlugin)
}
plugin, _ := collector.NewHTTPCollectorPlugin()
collectorFactory.RegisterExternalCollector([]string{collector.HTTPMetricName}, plugin)
// register generic pod collector
err = collectorFactory.RegisterPodsCollector("", collector.NewPodCollectorPlugin(client))
if err != nil {
@ -309,8 +311,8 @@ type AdapterServerOptions struct {
InfluxDBAddress string
// InfluxDBToken is the token used for querying InfluxDB
InfluxDBToken string
// InfluxDBOrgID is the organization ID used for querying InfluxDB
InfluxDBOrgID string
// InfluxDBOrg is the organization ID used for querying InfluxDB
InfluxDBOrg string
// ZMONKariosDBEndpoint enables ZMON check queries to the specified
// kariosDB endpoint
ZMONKariosDBEndpoint string