Compare commits

...

73 Commits

Author SHA1 Message Date
dbed1570ba Merge pull request #156 from zalando-incubator/networking.k8s.io
Add support for Ingress of API Group networking.k8s.io
2020-05-26 15:09:18 +02:00
ef24244074 Add support for networking.k8s.io
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-05-26 14:47:46 +02:00
cadf2dff3c Merge pull request #153 from doyshinda/custom_timeouts
Add support for custom timeouts
2020-05-18 10:27:17 +02:00
db83268343 Merge pull request #150 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.24
Bump github.com/aws/aws-sdk-go from 1.30.19 to 1.30.24
2020-05-18 10:25:50 +02:00
78c2ef742b Merge pull request #152 from doyshinda/parallel_get_metrics
Fetch pod metrics in parallel
2020-05-18 09:26:14 +02:00
f0b817629c Add support for custom timeouts
Allow configuration per metric for connect and request timeouts
when querying pods for JSON metrics.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-05-16 07:15:58 -06:00
3e7b66070c Fetch pod metrics in parallel
Fetching metrics from pods sequentially, with a large number of
pods, can result in poor performance when some of those pods have
been terminated by the HPA in a normal scale down event.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-05-16 06:47:13 -06:00
f3d33b6132 Bump github.com/aws/aws-sdk-go from 1.30.19 to 1.30.24
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.19 to 1.30.24.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.19...v1.30.24)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-11 10:18:26 +00:00
7b7af889fb Merge pull request #148 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.19
Bump github.com/aws/aws-sdk-go from 1.30.14 to 1.30.19
2020-05-06 12:45:54 +02:00
2bcaa80ce0 Merge pull request #147 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-1.6.0
Bump github.com/prometheus/client_golang from 1.5.1 to 1.6.0
2020-05-06 12:45:46 +02:00
a2e6980b25 Merge pull request #146 from zalando-incubator/dependabot/go_modules/github.com/sirupsen/logrus-1.6.0
Bump github.com/sirupsen/logrus from 1.5.0 to 1.6.0
2020-05-06 12:45:23 +02:00
f0783dcdd8 Bump github.com/aws/aws-sdk-go from 1.30.14 to 1.30.19
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.14 to 1.30.19.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.14...v1.30.19)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:59:27 +00:00
560fb40dae Bump github.com/prometheus/client_golang from 1.5.1 to 1.6.0
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.5.1 to 1.6.0.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.5.1...v1.6.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:58:05 +00:00
41229e370d Bump github.com/sirupsen/logrus from 1.5.0 to 1.6.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.5.0 to 1.6.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.5.0...v1.6.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-05-04 09:57:46 +00:00
338393c11e Merge pull request #145 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.14
Bump github.com/aws/aws-sdk-go from 1.30.9 to 1.30.14
2020-05-02 21:28:27 +02:00
8e7d1bc0d6 Bump github.com/aws/aws-sdk-go from 1.30.9 to 1.30.14
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.9 to 1.30.14.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.9...v1.30.14)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-27 10:18:59 +00:00
dc7fb5875b Merge pull request #143 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.9
Bump github.com/aws/aws-sdk-go from 1.30.7 to 1.30.9
2020-04-21 06:20:06 +02:00
8f70d3493e Bump github.com/aws/aws-sdk-go from 1.30.7 to 1.30.9
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.7 to 1.30.9.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.7...v1.30.9)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-20 10:00:12 +00:00
97ec13d010 Update Travis to Go 1.14 (#140)
Signed-off-by: Gábor Lipták <gliptak@gmail.com>
2020-04-12 22:54:49 +02:00
5e20dcd961 Merge pull request #139 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.7
Bump github.com/aws/aws-sdk-go from 1.30.3 to 1.30.7
2020-04-09 15:52:18 +02:00
f27dbc3939 Bump github.com/aws/aws-sdk-go from 1.30.3 to 1.30.7
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.3 to 1.30.7.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.3...v1.30.7)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-09 08:48:10 +00:00
6443613930 Merge pull request #135 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.3
Bump github.com/aws/aws-sdk-go from 1.30.2 to 1.30.3
2020-04-03 12:35:00 +02:00
d9c2e50f2c Bump github.com/aws/aws-sdk-go from 1.30.2 to 1.30.3
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.2 to 1.30.3.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.2...v1.30.3)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-03 08:48:22 +00:00
a8cd761f85 Merge pull request #122 from zalando-incubator/http-collector
Add basic HTTP collector
2020-04-02 14:34:47 +02:00
43fa626ca1 Merge pull request #134 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.2
Bump github.com/aws/aws-sdk-go from 1.30.0 to 1.30.2
2020-04-02 12:01:48 +02:00
2d3ddc53ef Add and HTTP metrics collector
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-04-02 11:05:37 +02:00
02880fad2d Bump github.com/aws/aws-sdk-go from 1.30.0 to 1.30.2
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.30.0 to 1.30.2.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.30.0...v1.30.2)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-04-02 08:34:49 +00:00
405e347620 Merge pull request #132 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.30.0
Bump github.com/aws/aws-sdk-go from 1.29.33 to 1.30.0
2020-03-31 19:14:02 +02:00
f15aacad5c Merge pull request #123 from zalando-incubator/dependabot/go_modules/github.com/spf13/cobra-0.0.7
Bump github.com/spf13/cobra from 0.0.6 to 0.0.7
2020-03-31 19:13:31 +02:00
c0a9f525b8 Merge pull request #131 from vetinari/patch-1
fix error to not contain twice the namespace
2020-03-31 19:13:05 +02:00
24207d285c Merge pull request #126 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-1.5.1
Bump github.com/prometheus/client_golang from 1.0.0 to 1.5.1
2020-03-31 19:12:03 +02:00
860aba807e fix error to not contain twice the namespace
but to contain also the name of the pod

Signed-off-by: Hanno Hecker <hanno@zalando.de>
2020-03-31 10:44:55 +02:00
73659f8ac6 Bump github.com/aws/aws-sdk-go from 1.29.33 to 1.30.0
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.33 to 1.30.0.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.33...v1.30.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-31 08:42:58 +00:00
280d358538 Bump github.com/prometheus/client_golang from 1.0.0 to 1.5.1
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.0.0 to 1.5.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.0.0...v1.5.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-30 09:50:33 +00:00
735bb164e2 Bump github.com/spf13/cobra from 0.0.6 to 0.0.7
Bumps [github.com/spf13/cobra](https://github.com/spf13/cobra) from 0.0.6 to 0.0.7.
- [Release notes](https://github.com/spf13/cobra/releases)
- [Commits](https://github.com/spf13/cobra/compare/v0.0.6...0.0.7)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-30 09:49:38 +00:00
d28712abd1 Merge pull request #110 from zalando-incubator/update-dependencies
Updated dependencies based on version 1.17
2020-03-27 11:05:59 +01:00
670692b23c Updated dependencies based on version 1.17
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-03-27 10:54:06 +01:00
33683fe88d Merge pull request #121 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.29.33
Bump github.com/aws/aws-sdk-go from 1.29.31 to 1.29.33
2020-03-27 10:23:34 +01:00
abaef5e491 Bump github.com/aws/aws-sdk-go from 1.29.31 to 1.29.33
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.31 to 1.29.33.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.31...v1.29.33)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-27 08:31:09 +00:00
670f081a5e Merge pull request #115 from zalando-incubator/dependabot/go_modules/github.com/spf13/cobra-0.0.6
Bump github.com/spf13/cobra from 0.0.3 to 0.0.6
2020-03-26 12:10:03 +01:00
55a743e5ac Merge pull request #119 from zalando-incubator/dependabot/go_modules/github.com/aws/aws-sdk-go-1.29.31
Bump github.com/aws/aws-sdk-go from 1.29.30 to 1.29.31
2020-03-25 14:06:01 +01:00
830d385a4a Bump github.com/spf13/cobra from 0.0.3 to 0.0.6
Bumps [github.com/spf13/cobra](https://github.com/spf13/cobra) from 0.0.3 to 0.0.6.
- [Release notes](https://github.com/spf13/cobra/releases)
- [Commits](https://github.com/spf13/cobra/compare/v0.0.3...v0.0.6)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-25 12:27:25 +00:00
f2638c6843 Bump github.com/aws/aws-sdk-go from 1.29.30 to 1.29.31
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.30 to 1.29.31.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.30...v1.29.31)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-25 08:33:33 +00:00
b77fba2e7b Merge pull request #114 from zalando-incubator/dependabot/go_modules/github.com/stretchr/testify-1.5.1
Bump github.com/stretchr/testify from 1.4.0 to 1.5.1
2020-03-24 10:49:11 +01:00
e77d51f97a Merge pull request #112 from zalando-incubator/dependabot/go_modules/github.com/influxdata/influxdb-client-go-0.1.5
Bump github.com/influxdata/influxdb-client-go from 0.1.4 to 0.1.5
2020-03-24 10:48:43 +01:00
69ac42ed7d Merge pull request #111 from zalando-incubator/dependabot/go_modules/github.com/prometheus/client_golang-0.9.4
Bump github.com/prometheus/client_golang from 0.9.2 to 0.9.4
2020-03-24 10:48:02 +01:00
582f05539c Bump github.com/sirupsen/logrus from 1.4.2 to 1.5.0 (#117)
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.4.2 to 1.5.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.4.2...v1.5.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
2020-03-24 10:46:59 +01:00
46f2e5c4fd Bump github.com/aws/aws-sdk-go from 1.29.4 to 1.29.30 (#116)
Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.29.4 to 1.29.30.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.29.4...v1.29.30)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>

Co-authored-by: dependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>
2020-03-24 10:46:51 +01:00
31bc1771df Bump github.com/sirupsen/logrus from 1.4.2 to 1.5.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.4.2 to 1.5.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.4.2...v1.5.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-24 08:49:15 +00:00
836c78d08b Bump github.com/stretchr/testify from 1.4.0 to 1.5.1
Bumps [github.com/stretchr/testify](https://github.com/stretchr/testify) from 1.4.0 to 1.5.1.
- [Release notes](https://github.com/stretchr/testify/releases)
- [Commits](https://github.com/stretchr/testify/compare/v1.4.0...v1.5.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:21:19 +00:00
6f5ab042e6 Bump github.com/influxdata/influxdb-client-go from 0.1.4 to 0.1.5
Bumps [github.com/influxdata/influxdb-client-go](https://github.com/influxdata/influxdb-client-go) from 0.1.4 to 0.1.5.
- [Release notes](https://github.com/influxdata/influxdb-client-go/releases)
- [Commits](https://github.com/influxdata/influxdb-client-go/compare/v0.1.4...v0.1.5)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:20:20 +00:00
5abc7388fb Bump github.com/prometheus/client_golang from 0.9.2 to 0.9.4
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 0.9.2 to 0.9.4.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/master/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v0.9.2...v0.9.4)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-03-22 14:19:58 +00:00
5bf87cb10e Add support for passing URL query params to pod metric endpoints (#109)
Adds a new metric-config option named `rawQuery`. The value of
this option will be appended to the metric `path` as URL query
parameters to be used by the pod's application. E.g.,:
```
metric-config.pods.requests-per-second.json-path/rawQuery: "foo=bar&baz=bop"
```
will apppend `?foo=bar&baz=bop` to the URL.

Signed-off-by: Abe Friesen <2319792+doyshinda@users.noreply.github.com>
2020-03-10 10:45:28 +01:00
c6610750e4 Merge pull request #107 from affo/fix/influxdb-token-org-swap
fix(influxdb): fix swap of org/token
2020-02-24 09:27:34 +01:00
04ae6d955e fix(influxdb): fix swap of org/token parameter passing to
InfluxDBCollector

Convert from `orgID` to `org`.

Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-02-21 19:13:17 +01:00
2d56c202a7 Update AWS SDK to support AWS IAM solution (#106)
Fix #105

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-02-18 09:59:31 +01:00
c9fa15c7d4 Updated the tests (#103)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-02-04 09:48:50 +01:00
e3330dcf43 Reuse the HTTP client for scraping pods (#102)
Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
2020-01-30 17:49:22 +01:00
8e4662b26c Permit disregarding incompatible HPAs (#95)
* This commit adds a --disregard-incompatible-hpas that makes the HPA
provider stop erroring out when a collector cannot be created for a
metric in a HPA. Useful when kube-metrics-adapter runs alongside another
metrics provider. Fixes issue #94.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Make tests pass

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Wraps the Plugin Not Found error in a new type that can be checked by the caller of a function to determine if its contents should be logged or added as an event to the HPA, when this HPA is incompatible.
The disregardIncompatibleHPAs is now targetting only the log or addition of the same event.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Invert if expression to select when we should log
CreateNewMetricsCollector errors: don't log when both conditions are true - it's not a PluginNotFoundError
and disregardIncompatibleHPAs flag is set to true. This way, if an error
is NOT PluginNotFoundError it will always be logged, and when it IS
PluginNotFoundError it will only be logged when
disregardIncompatibleHPAs is false.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Remove redundant "whether to"

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add test case for updating HPAs via HPA Provider while disregarding
incompatible HPAs.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2020-01-30 11:33:15 +01:00
9e211b181a Merge pull request #101 from zalando-incubator/update-to-v2beta2
Only support autoscaling/v2beta2
2020-01-29 16:47:15 +01:00
9d78fff1b5 Only support autoscaling/v2beta2
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2020-01-29 15:59:20 +01:00
1c6f9e2ea6 Merge pull request #100 from affo/feat/influxdb-collector
feat(collector): add InfluxDB collector
2020-01-24 10:56:52 +01:00
c0eda7cd1e adding tests for collector creation
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:54:35 +01:00
75f3e48f70 address szuecs review
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-24 09:07:56 +01:00
5b55bea994 feat(collector): add InfluxDB collector
Signed-off-by: Lorenzo Affetti <lorenzo.affetti@gmail.com>
2020-01-22 10:09:29 +01:00
4412e3dca4 Merge pull request #92 from zalando-incubator/njuettner-patch
Updating golangci
2019-11-26 14:49:02 +01:00
8f9277258c Increase timeout for golangci-lint
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
2019-11-25 11:36:18 +01:00
8c3fef45fd Updating golangci
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Signed-off-by: Nick Jüttner <nick@zalando.de>
2019-11-25 10:56:22 +01:00
120950078c Fix #89 by copying the MatchLabels map instead of referencing it. (#90)
Signed-off-by: Johann Fuechsl <johann@fuechsl.co>
2019-11-07 14:38:26 +01:00
0790bc351a This fixes an issue with the type switch that was never able to fall (#88)
into cases of []<number>, being <number> a number type such as int,
float32, float64. This is because Go can't type cast slices of
interface{} out right because it's impossible to know the true types of
the slice members beforehand.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-11-05 09:43:25 +01:00
f6b2aede5b Support for JSONPath expressions that return arrays of values (#85)
* This is the initial implementation of support for JSONPath expressions
that return arrays of values instead of a single value.

This extends the
collector to define a few handy reducer functions that take in the slice
of float64 and return a single value. It also allows the user to define
which reducer function to use via the
"metric-config.<metricType>.<metricName>.json-path/reducer-func"
annotation, which
can have the values of 'avg', 'min', 'max' and 'sum'.

For instance, the Ruby puma webserver exposes metrics of the form of $.worker_status[*].last_status.pool_capacity that have to be consumed as an array of values to be properly targetted.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Renames "reducerFunc" to "aggregator" for consistency with other
collectors. Renames the annotation from
"metric-config.<metricType>.<metricName>.json-path/reducer-func" to "metric-config.<metricType>.<metricName>.json-path/aggregator".

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Return error instead of defaulting to the avg aggregator, when no valid
aggregator name was specified and the JSONPath value is a slice of
numbers.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix index out of range on initialized output slice that was found while
writing tests.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add tests for all added functions + NewJSONPathMetricsGetter

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Add documentation on the `aggregator` option.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* reducer function -> aggregator function

Signed-off-by: Tomás Pinho <me@tomaspinho.com>

* Fix comment to account for returned error.

Signed-off-by: Tomás Pinho <me@tomaspinho.com>
2019-10-24 18:15:10 +02:00
7d5e719eb0 Merge pull request #86 from pinkavaj/fix-var-name
Fix variable name typo
2019-10-24 10:16:58 +02:00
7497a61a2c Fix variable name typo
Signed-off-by: Jiri Pinkava <jiri.pinkava@rossum.ai>
2019-10-24 09:45:22 +02:00
32 changed files with 2235 additions and 1217 deletions

View File

@ -1,7 +1,4 @@
run:
skip-files:
- "pkg/provider/generated.conversion.go"
- "pkg/provider/conversion.go"
linters-settings:
golint:
min-confidence: 0.9

View File

@ -2,10 +2,10 @@ language: go
dist: xenial
go:
- "1.13.x"
- "1.14.x"
env:
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"
before_install:
- GO111MODULE=off go get github.com/mattn/goveralls
@ -13,8 +13,8 @@ before_install:
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
script:
- make check
- make test
- make build.docker
- make check
- roveralls
- goveralls -v -coverprofile=roveralls.coverprofile -service=travis-ci

View File

@ -19,7 +19,8 @@ test:
go test -v $(GOPKGS)
check:
golangci-lint run ./...
go mod download
golangci-lint run --timeout=2m ./...
build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)

280
README.md

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: custom-metrics-consumer
@ -25,25 +25,36 @@ spec:
# - type: Resource
# resource:
# name: cpu
# targetAverageUtilization: 50
# current:
# averageUtilization: 50
- type: Pods
pods:
metricName: queue-length
targetAverageValue: 1k
metric:
name: queue-length
target:
averageValue: 1k
type: AverageValue
- type: Object
object:
metricName: requests-per-second
target:
describedObject:
apiVersion: extensions/v1beta1
kind: Ingress
name: custom-metrics-consumer
averageValue: 10
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
metric:
name: requests-per-second
target:
averageValue: "10"
type: AverageValue
- type: External
external:
metricName: sqs-queue-length
metricSelector:
matchLabels:
queue-name: foobar
region: eu-central-1
targetAverageValue: 30
metric:
name: sqs-queue-length
selector:
matchLabels:
queue-name: foobar
region: eu-central-1
target:
averageValue: "30"
type: AverageValue

43
go.mod
View File

@ -2,32 +2,31 @@ module github.com/zalando-incubator/kube-metrics-adapter
require (
github.com/NYTimes/gziphandler v1.0.1 // indirect
github.com/aws/aws-sdk-go v1.16.6
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
github.com/aws/aws-sdk-go v1.30.24
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
github.com/influxdata/influxdb-client-go v0.1.5
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200323093244-5046ce1afe6b
github.com/lib/pq v1.2.0 // indirect
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.4.2
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.3
github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v0.0.7
github.com/stretchr/testify v1.5.1
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
k8s.io/klog v0.4.0
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
golang.org/x/tools v0.0.0-20200204192400-7124308813f3 // indirect
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.17.3
k8s.io/component-base v0.17.3
k8s.io/klog v1.0.0
k8s.io/metrics v0.17.3
)
go 1.13

510
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
_ "net/http/pprof"
"os"
"runtime"

View File

@ -65,6 +65,42 @@ func TestParser(t *testing.T) {
},
PerReplica: false,
},
{
Name: "influxdb metrics",
Annotations: map[string]string{
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
},
MetricName: "flux-query",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org-id": "deadbeef",
},
},
{
Name: "http metrics",
Annotations: map[string]string{
"metric-config.external.http.json/json-key": "$.metric.value",
"metric-config.external.http.json/endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
"metric-config.external.http.json/aggregator": "avg",
},
MetricName: "http",
MetricType: autoscalingv2.ExternalMetricSourceType,
ExpectedConfig: map[string]string{
"json-key": "$.metric.value",
"endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
"aggregator": "avg",
},
},
} {
t.Run(tc.Name, func(t *testing.T) {
hpaMap := make(AnnotationConfigMap)

View File

@ -46,6 +46,14 @@ type CollectorPlugin interface {
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
}
type PluginNotFoundError struct {
metricTypeName MetricTypeName
}
func (p *PluginNotFoundError) Error() string {
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
}
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
if metricCollector == "" {
c.podsPlugins.Any = plugin
@ -139,7 +147,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscal
}
}
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
}
type MetricTypeName struct {
@ -213,7 +221,9 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
config.Config = metric.External.Metric.Selector.MatchLabels
for k, v := range metric.External.Metric.Selector.MatchLabels {
config.Config[k] = v
}
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)

View File

@ -0,0 +1,103 @@
package collector
import (
"fmt"
"net/url"
"time"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
"github.com/oliveagle/jsonpath"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
HTTPMetricName = "http"
HTTPEndpointAnnotationKey = "endpoint"
HTTPJsonPathAnnotationKey = "json-key"
identifierLabel = "identifier"
)
type HTTPCollectorPlugin struct{}
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
return &HTTPCollectorPlugin{}, nil
}
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
collector := &HTTPCollector{}
var (
value string
ok bool
)
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
}
jsonPath, err := jsonpath.Compile(value)
if err != nil {
return nil, fmt.Errorf("failed to parse json path: %v", err)
}
collector.jsonPath = jsonPath
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
}
collector.endpoint, err = url.Parse(value)
if err != nil {
return nil, err
}
collector.interval = interval
collector.metricType = config.Type
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil {
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name)
}
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok {
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name)
}
collector.metric = config.Metric
var aggFunc httpmetrics.AggregatorFunc
if val, ok := config.Config["aggregator"]; ok {
aggFunc, err = httpmetrics.ParseAggregator(val)
if err != nil {
return nil, err
}
}
collector.metricsGetter = httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
return collector, nil
}
type HTTPCollector struct {
endpoint *url.URL
jsonPath *jsonpath.Compiled
interval time.Duration
metricType v2beta2.MetricSourceType
metricsGetter *httpmetrics.JSONPathMetricsGetter
metric v2beta2.MetricIdentifier
}
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
if err != nil {
return nil, err
}
value := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now(),
},
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI),
},
}
return []CollectedMetric{value}, nil
}
func (c *HTTPCollector) Interval() time.Duration {
return c.interval
}

View File

@ -0,0 +1,94 @@
package collector
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type testExternalMetricsHandler struct {
values []int64
test *testing.T
}
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
response, err := json.Marshal(testMetricResponse{t.values})
require.NoError(t.test, err)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(response)
require.NoError(t.test, err)
}
func makeHTTPTestServer(t *testing.T, values []int64) string {
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t})
return server.URL
}
func TestHTTPCollector(t *testing.T) {
for _, tc := range []struct {
name string
values []int64
output int
aggregator string
}{
{
name: "basic",
values: []int64{3},
output: 3,
aggregator: "sum",
},
{
name: "sum",
values: []int64{3, 5, 6},
aggregator: "sum",
output: 14,
},
{
name: "average",
values: []int64{3, 5, 6},
aggregator: "sum",
output: 14,
},
} {
t.Run(tc.name, func(t *testing.T) {
testServer := makeHTTPTestServer(t, tc.values)
plugin, err := NewHTTPCollectorPlugin()
require.NoError(t, err)
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.NotNil(t, metrics)
require.Len(t, metrics, 1)
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output)
})
}
}
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
config := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "test-metric",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{identifierLabel: "test-metric"},
},
},
},
Config: map[string]string{
HTTPJsonPathAnnotationKey: "$.values",
HTTPEndpointAnnotationKey: endpoint,
},
}
if aggregator != "" {
config.Config["aggregator"] = aggregator
}
return config
}

View File

@ -0,0 +1,65 @@
package httpmetrics
import (
"fmt"
"math"
)
type AggregatorFunc func(...float64) float64
// Average implements the average mathematical function over a slice of float64
func Average(values ...float64) float64 {
sum := Sum(values...)
return sum / float64(len(values))
}
// Minimum implements the absolute minimum mathematical function over a slice of float64
func Minimum(values ...float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// Maximum implements the absolute maximum mathematical function over a slice of float64
func Maximum(values ...float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// Sum implements the summation mathematical function over a slice of float64
func Sum(values ...float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}
// reduce will reduce a slice of numbers given a aggregator function's name. If it's empty or not recognized, an error is returned.
func ParseAggregator(aggregator string) (AggregatorFunc, error) {
switch aggregator {
case "avg":
return Average, nil
case "min":
return Minimum, nil
case "max":
return Maximum, nil
case "sum":
return Sum, nil
default:
return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator)
}
}

View File

@ -0,0 +1,57 @@
package httpmetrics
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestReduce(t *testing.T) {
for _, tc := range []struct {
input []float64
output float64
aggregator string
parseError bool
}{
{
input: []float64{1, 2, 3},
output: 2.0,
aggregator: "avg",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 1.0,
aggregator: "min",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 3.0,
aggregator: "max",
parseError: false,
},
{
input: []float64{1, 2, 3},
output: 6.0,
aggregator: "sum",
parseError: false,
},
{
input: []float64{1, 2, 3},
aggregator: "non-existent",
parseError: true,
},
} {
t.Run(fmt.Sprintf("Test function: %s", tc.aggregator), func(t *testing.T) {
aggFunc, err := ParseAggregator(tc.aggregator)
if tc.parseError {
require.Error(t, err)
} else {
val := aggFunc(tc.input...)
require.Equal(t, tc.output, val)
}
})
}
}

View File

@ -0,0 +1,136 @@
package httpmetrics
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"time"
"github.com/oliveagle/jsonpath"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
aggregator AggregatorFunc
client *http.Client
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, compiledPath *jsonpath.Compiled) *JSONPathMetricsGetter {
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: compiledPath}
}
var DefaultRequestTimeout = 15 * time.Second
var DefaultConnectTimeout = 15 * time.Second
func CustomMetricsHTTPClient(requestTimeout time.Duration, connectTimeout time.Duration) *http.Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: connectTimeout,
}).DialContext,
MaxIdleConns: 50,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: requestTimeout,
}
return client
}
func DefaultMetricsHTTPClient() *http.Client {
return CustomMetricsHTTPClient(DefaultRequestTimeout, DefaultConnectTimeout)
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
data, err := g.fetchMetrics(metricsURL)
if err != nil {
return 0, err
}
// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return 0, err
}
res, err := g.jsonPath.Lookup(jsonData)
if err != nil {
return 0, err
}
switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
case []interface{}:
if g.aggregator == nil {
return 0, fmt.Errorf("no aggregator function has been specified")
}
s, err := castSlice(res)
if err != nil {
return 0, err
}
return g.aggregator(s...), nil
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
var out []float64
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := g.client.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
}
return data, nil
}

View File

@ -0,0 +1,89 @@
package httpmetrics
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
type testValueResponse struct {
Value int64 `json:"value"`
}
type testValueArrayResponse struct {
Value []int64 `json:"value"`
}
func makeTestHTTPServer(t *testing.T, values ...int64) *httptest.Server {
h := func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.URL.Path, "/metrics")
w.Header().Set("Content-Type", "application/json")
var (
response []byte
err error
)
if len(values) == 1 {
response, err = json.Marshal(testValueResponse{Value: values[0]})
require.NoError(t, err)
} else {
response, err = json.Marshal(testValueArrayResponse{Value: values})
require.NoError(t, err)
}
_, err = w.Write(response)
require.NoError(t, err)
}
return httptest.NewServer(http.HandlerFunc(h))
}
func TestJSONPathMetricsGetter(t *testing.T) {
for _, tc := range []struct {
name string
input []int64
output float64
aggregator AggregatorFunc
}{
{
name: "basic average",
input: []int64{3, 4, 5},
output: 4,
aggregator: Average,
},
} {
t.Run(tc.name, func(t *testing.T) {
server := makeTestHTTPServer(t, tc.input...)
defer server.Close()
path, err := jsonpath.Compile("$.value")
require.NoError(t, err)
getter := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, path)
url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
require.NoError(t, err)
metric, err := getter.GetMetric(*url)
require.NoError(t, err)
require.Equal(t, tc.output, metric)
})
}
}

View File

@ -0,0 +1,120 @@
package httpmetrics
import (
"fmt"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
v1 "k8s.io/api/core/v1"
)
type PodMetricsGetter interface {
GetMetric(pod *v1.Pod) (float64, error)
}
type PodMetricsJSONPathGetter struct {
scheme string
path string
rawQuery string
port int
metricGetter *JSONPathMetricsGetter
}
func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
if pod.Status.PodIP == "" {
return 0, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
}
metricsURL := g.buildMetricsURL(pod.Status.PodIP)
return g.metricGetter.GetMetric(metricsURL)
}
func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
getter := PodMetricsJSONPathGetter{}
var (
jsonPath *jsonpath.Compiled
aggregator AggregatorFunc
err error
)
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
jsonPath = path
}
if v, ok := config["scheme"]; ok {
getter.scheme = v
}
if v, ok := config["path"]; ok {
getter.path = v
}
if v, ok := config["raw-query"]; ok {
getter.rawQuery = v
}
if v, ok := config["port"]; ok {
n, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
getter.port = n
}
if v, ok := config["aggregator"]; ok {
aggregator, err = ParseAggregator(v)
if err != nil {
return nil, err
}
}
requestTimeout := DefaultRequestTimeout
connectTimeout := DefaultConnectTimeout
if v, ok := config["request-timeout"]; ok {
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
if d < 0 {
return nil, fmt.Errorf("Invalid request-timeout config value: %s", v)
}
requestTimeout = d
}
if v, ok := config["connect-timeout"]; ok {
d, err := time.ParseDuration(v)
if err != nil {
return nil, err
}
if d < 0 {
return nil, fmt.Errorf("Invalid connect-timeout config value: %s", v)
}
connectTimeout = d
}
getter.metricGetter = NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
return &getter, nil
}
// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
var scheme = g.scheme
if scheme == "" {
scheme = "http"
}
return url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%d", podIP, g.port),
Path: g.path,
RawQuery: g.rawQuery,
}
}

View File

@ -0,0 +1,198 @@
package httpmetrics
import (
"fmt"
"testing"
"time"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func compareMetricsGetter(t *testing.T, first, second *PodMetricsJSONPathGetter) {
require.Equal(t, first.metricGetter.jsonPath, second.metricGetter.jsonPath)
require.Equal(t, first.scheme, second.scheme)
require.Equal(t, first.path, second.path)
require.Equal(t, first.port, second.port)
}
func TestNewPodJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewPodMetricsJSONPathGetter(configNoAggregator)
require.NoError(t, err1)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath1},
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewPodMetricsJSONPathGetter(configAggregator)
require.NoError(t, err2)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath2, aggregator: Average},
scheme: "http",
path: "/metrics",
port: 9090,
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewPodMetricsJSONPathGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewPodMetricsJSONPathGetter(configErrorPort)
require.Error(t, err4)
configWithRawQuery := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"raw-query": "foo=bar&baz=bop",
}
jpath5, _ := jsonpath.Compile(configWithRawQuery["json-key"])
getterWithRawQuery, err5 := NewPodMetricsJSONPathGetter(configWithRawQuery)
require.NoError(t, err5)
compareMetricsGetter(t, &PodMetricsJSONPathGetter{
metricGetter: &JSONPathMetricsGetter{jsonPath: jpath5},
scheme: "http",
path: "/metrics",
port: 9090,
rawQuery: "foo=bar&baz=bop",
}, getterWithRawQuery)
}
func TestBuildMetricsURL(t *testing.T) {
scheme := "http"
ip := "1.2.3.4"
port := "9090"
path := "/v1/test/"
rawQuery := "foo=bar&baz=bop"
// Test building URL with rawQuery
configWithRawQuery := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"raw-query": rawQuery,
}
_, err := jsonpath.Compile(configWithRawQuery["json-key"])
require.NoError(t, err)
getterWithRawQuery, err1 := NewPodMetricsJSONPathGetter(configWithRawQuery)
require.NoError(t, err1)
expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)
// Test building URL without rawQuery
configWithNoQuery := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
}
_, err2 := jsonpath.Compile(configWithNoQuery["json-key"])
require.NoError(t, err2)
getterWithNoQuery, err3 := NewPodMetricsJSONPathGetter(configWithNoQuery)
require.NoError(t, err3)
expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
}
func TestCustomTimeouts(t *testing.T) {
scheme := "http"
port := "9090"
path := "/v1/test/"
// Test no custom options results in default timeouts
defaultConfig := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
}
defaultTime := time.Duration(15000) * time.Millisecond
defaultGetter, err1 := NewPodMetricsJSONPathGetter(defaultConfig)
require.NoError(t, err1)
require.Equal(t, defaultGetter.metricGetter.client.Timeout, defaultTime)
// Test with custom request timeout
configWithRequestTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"request-timeout": "978ms",
}
exectedTimeout := time.Duration(978) * time.Millisecond
customRequestGetter, err2 := NewPodMetricsJSONPathGetter(configWithRequestTimeout)
require.NoError(t, err2)
require.Equal(t, customRequestGetter.metricGetter.client.Timeout, exectedTimeout)
// Test with custom connect timeout. Unfortunately, it seems there's no way to access the
// connect timeout of the client struct to actually verify it's set :/
configWithConnectTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"connect-timeout": "512ms",
}
customRequestGetter, err3 := NewPodMetricsJSONPathGetter(configWithConnectTimeout)
require.NoError(t, err3)
configWithInvalidTimeout := map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"request-timeout": "-256ms",
}
_, err4 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
require.Error(t, err4)
configWithInvalidTimeout = map[string]string{
"json-key": "$.value",
"scheme": scheme,
"path": path,
"port": port,
"connect-timeout": "-256ms",
}
_, err5 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
require.Error(t, err5)
}

View File

@ -0,0 +1,152 @@
package collector
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go"
"k8s.io/api/autoscaling/v2beta2"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/external_metrics"
)
const (
InfluxDBMetricName = "flux-query"
influxDBAddressKey = "address"
influxDBTokenKey = "token"
influxDBOrgKey = "org"
influxDBQueryNameLabelKey = "query-name"
)
type InfluxDBCollectorPlugin struct {
kubeClient kubernetes.Interface
address string
token string
org string
}
func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
return &InfluxDBCollectorPlugin{
kubeClient: client,
address: address,
token: token,
org: org,
}, nil
}
func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return NewInfluxDBCollector(p.address, p.token, p.org, config, interval)
}
type InfluxDBCollector struct {
address string
token string
org string
influxDBClient *influxdb.Client
interval time.Duration
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
query string
}
func NewInfluxDBCollector(address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
collector := &InfluxDBCollector{
interval: interval,
metric: config.Metric,
metricType: config.Type,
}
switch configType := config.Type; configType {
case autoscalingv2.ObjectMetricSourceType:
return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
case autoscalingv2.ExternalMetricSourceType:
// `metricSelector` is flattened into the MetricConfig.Config.
queryName, ok := config.Config[influxDBQueryNameLabelKey]
if !ok {
return nil, fmt.Errorf("selector for Flux query is not specified, "+
"please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
}
if query, ok := config.Config[queryName]; ok {
// TODO(affo): validate the query once this is done:
// https://github.com/influxdata/influxdb-client-go/issues/73.
collector.query = query
} else {
return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
}
default:
return nil, fmt.Errorf("unknown metric type: %v", configType)
}
// Use custom InfluxDB config if defined in HPA annotation.
if v, ok := config.Config[influxDBAddressKey]; ok {
address = v
}
if v, ok := config.Config[influxDBTokenKey]; ok {
token = v
}
if v, ok := config.Config[influxDBOrgKey]; ok {
org = v
}
influxDbClient, err := influxdb.New(address, token)
if err != nil {
return nil, err
}
collector.address = address
collector.token = token
collector.org = org
collector.influxDBClient = influxDbClient
return collector, nil
}
// queryResult is for unmarshaling the result from InfluxDB.
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
MetricValue float64
}
// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.org)
if err != nil {
return resource.Quantity{}, err
}
defer res.Close()
// Keeping just the first result.
if res.Next() {
qr := queryResult{}
if err := res.Unmarshal(&qr); err != nil {
return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
}
return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
}
if err := res.Err; err != nil {
return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
}
return resource.Quantity{}, fmt.Errorf("empty result returned")
}
func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.getValue()
if err != nil {
return nil, err
}
cm := CollectedMetric{
Type: c.metricType,
External: external_metrics.ExternalMetricValue{
MetricName: c.metric.Name,
MetricLabels: c.metric.Selector.MatchLabels,
Timestamp: metav1.Time{
Time: time.Now().UTC(),
},
Value: v,
},
}
return []CollectedMetric{cm}, nil
}
func (c *InfluxDBCollector) Interval() time.Duration {
return c.interval
}

View File

@ -0,0 +1,155 @@
package collector
import (
"strings"
"testing"
"time"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestInfluxDBCollector_New(t *testing.T) {
t.Run("simple", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
// This is actually useless, because the selector should be flattened in Config when parsing.
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "range2m",
},
}
c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.org, "deadbeef"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "secret"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
t.Run("override params", func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"query-name": "range2m",
},
},
},
},
CollectorName: "influxdb",
Config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"address": "http://localhost:9999",
"token": "sEcr3TT0ken",
"org": "deadbeef1234",
"query-name": "range3m",
},
}
c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.org, "deadbeef1234"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.address, "http://localhost:9999"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.token, "sEcr3TT0ken"; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
}
})
// Errors.
for _, tc := range []struct {
name string
mTypeName MetricTypeName
config map[string]string
errorStartsWith string
}{
{
name: "object metric",
mTypeName: MetricTypeName{
Type: v2beta2.ObjectMetricSourceType,
},
errorStartsWith: "InfluxDB does not support object",
},
{
name: "no selector",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
// The selector should be flattened into the config by the parsing step, but it isn't.
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
},
errorStartsWith: "selector for Flux query is not specified",
},
{
name: "referencing non-existing query",
mTypeName: MetricTypeName{
Type: v2beta2.ExternalMetricSourceType,
Metric: v2beta2.MetricIdentifier{
Name: "flux-query",
},
},
config: map[string]string{
"range1m": `from(bucket: "?") |> range(start: -1m)`,
"range2m": `from(bucket: "?") |> range(start: -2m)`,
"range3m": `from(bucket: "?") |> range(start: -3m)`,
"query-name": "rangeXm",
},
errorStartsWith: "no Flux query defined for metric",
},
} {
t.Run("error - "+tc.name, func(t *testing.T) {
m := &MetricConfig{
MetricTypeName: tc.mTypeName,
CollectorName: "influxdb",
Config: tc.config,
}
_, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
if err == nil {
t.Fatal("expected error got none")
}
if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
t.Fatalf("%s should start with %s", got, want)
}
})
}
}

View File

@ -1,133 +0,0 @@
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/oliveagle/jsonpath"
corev1 "k8s.io/api/core/v1"
)
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
jsonPath *jsonpath.Compiled
scheme string
path string
port int
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
getter := &JSONPathMetricsGetter{}
if v, ok := config["json-key"]; ok {
pat, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = pat
}
if v, ok := config["scheme"]; ok {
getter.scheme = v
}
if v, ok := config["path"]; ok {
getter.path = v
}
if v, ok := config["port"]; ok {
n, err := strconv.Atoi(v)
if err != nil {
return nil, err
}
getter.port = n
}
return getter, nil
}
// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
if err != nil {
return 0, err
}
// parse data
var jsonData interface{}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return 0, err
}
res, err := g.jsonPath.Lookup(jsonData)
if err != nil {
return 0, err
}
switch res := res.(type) {
case int:
return float64(res), nil
case float32:
return float64(res), nil
case float64:
return res, nil
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
}
httpClient := &http.Client{
Timeout: 15 * time.Second,
Transport: &http.Transport{},
}
if scheme == "" {
scheme = "http"
}
metricsURL := url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s:%d", pod.Status.PodIP, port),
Path: path,
}
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := httpClient.Do(request)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
}
return data, nil
}

View File

@ -2,13 +2,15 @@ package collector
import (
"fmt"
"net/http"
"time"
log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
@ -30,17 +32,14 @@ func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutosc
type PodCollector struct {
client kubernetes.Interface
Getter PodMetricsGetter
Getter httpmetrics.PodMetricsGetter
podLabelSelector *metav1.LabelSelector
namespace string
metric autoscalingv2.MetricIdentifier
metricType autoscalingv2.MetricSourceType
interval time.Duration
logger *log.Entry
}
type PodMetricsGetter interface {
GetMetric(pod *corev1.Pod) (float64, error)
httpClient *http.Client
}
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
@ -60,11 +59,11 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalP
logger: log.WithFields(log.Fields{"Collector": "Pod"}),
}
var getter PodMetricsGetter
var getter httpmetrics.PodMetricsGetter
switch config.CollectorName {
case "json-path":
var err error
getter, err = NewJSONPathMetricsGetter(config.Config)
getter, err = httpmetrics.NewPodMetricsJSONPathGetter(config.Config)
if err != nil {
return nil, err
}
@ -87,32 +86,20 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, err
}
values := make([]CollectedMetric, 0, len(pods.Items))
// TODO: get metrics in parallel
ch := make(chan CollectedMetric)
errCh := make(chan error)
for _, pod := range pods.Items {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
c.logger.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
continue
}
go c.getPodMetric(pod, ch, errCh)
}
metricValue := CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
values := make([]CollectedMetric, 0, len(pods.Items))
for i := 0; i < len(pods.Items); i++ {
select {
case err := <- errCh:
c.logger.Error(err)
case resp := <- ch:
values = append(values, resp)
}
values = append(values, metricValue)
}
return values, nil
@ -122,6 +109,29 @@ func (c *PodCollector) Interval() time.Duration {
return c.interval
}
func (c *PodCollector) getPodMetric(pod corev1.Pod, ch chan CollectedMetric, errCh chan error) {
value, err := c.Getter.GetMetric(&pod)
if err != nil {
errCh <- fmt.Errorf("Failed to get metrics from pod '%s/%s': %v", pod.Namespace, pod.Name, err)
return
}
ch <- CollectedMetric{
Type: c.metricType,
Custom: custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
},
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
Timestamp: metav1.Time{Time: time.Now().UTC()},
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
},
}
}
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
switch hpa.Spec.ScaleTargetRef.Kind {
case "Deployment":

View File

@ -0,0 +1,155 @@
package collector
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"sync"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
const (
testNamespace = "test-namespace"
applicationLabelName = "application"
applicationLabelValue = "test-application"
testDeploymentName = "test-application"
testInterval = 10 * time.Second
)
func TestPodCollector(t *testing.T) {
for _, tc := range []struct {
name string
metrics [][]int64
result []int64
}{
{
name: "simple",
metrics: [][]int64{{1}, {3}, {8}, {5}, {2}},
result: []int64{1, 3, 8, 5, 2},
},
} {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
plugin := NewPodCollectorPlugin(client)
makeTestDeployment(t, client)
host, port, metricsHandler := makeTestHTTPServer(t, tc.metrics)
makeTestPods(t, host, port, "test-metric", client, 5)
testHPA := makeTestHPA(t, client)
testConfig := makeTestConfig(port)
collector, err := plugin.NewCollector(testHPA, testConfig, testInterval)
require.NoError(t, err)
metrics, err := collector.GetMetrics()
require.NoError(t, err)
require.Equal(t, len(metrics), int(metricsHandler.calledCounter))
var values []int64
for _, m := range metrics {
values = append(values, m.Custom.Value.Value())
}
require.ElementsMatch(t, tc.result, values)
})
}
}
type testMetricResponse struct {
Values []int64 `json:"values"`
}
type testMetricsHandler struct {
values [][]int64
calledCounter uint
t *testing.T
metricsPath string
sync.RWMutex
}
func (h *testMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
require.Equal(h.t, h.metricsPath, r.URL.Path)
require.Less(h.t, int(h.calledCounter), len(h.values))
response, err := json.Marshal(testMetricResponse{Values: h.values[h.calledCounter]})
require.NoError(h.t, err)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(response)
require.NoError(h.t, err)
h.calledCounter++
}
func makeTestHTTPServer(t *testing.T, values [][]int64) (string, string, *testMetricsHandler) {
metricsHandler := &testMetricsHandler{values: values, t: t, metricsPath: "/metrics"}
server := httptest.NewServer(metricsHandler)
url, err := url.Parse(server.URL)
require.NoError(t, err)
return url.Hostname(), url.Port(), metricsHandler
}
func makeTestConfig(port string) *MetricConfig {
return &MetricConfig{
CollectorName: "json-path",
Config: map[string]string{"json-key": "$.values", "port": port, "path": "/metrics", "aggregator": "sum"},
}
}
func makeTestPods(t *testing.T, testServer string, metricName string, port string, client kubernetes.Interface, replicas int) {
for i := 0; i < replicas; i++ {
testPod := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Labels: map[string]string{applicationLabelName: applicationLabelValue},
Annotations: map[string]string{
fmt.Sprintf("metric-config.pods.%s.json-path/port", metricName): port,
},
},
Status: corev1.PodStatus{
PodIP: testServer,
},
}
_, err := client.CoreV1().Pods(testNamespace).Create(testPod)
require.NoError(t, err)
}
}
func makeTestDeployment(t *testing.T, client kubernetes.Interface) *appsv1.Deployment {
deployment := appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{Name: testDeploymentName},
Spec: appsv1.DeploymentSpec{
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{applicationLabelName: applicationLabelValue},
},
},
}
_, err := client.AppsV1().Deployments(testNamespace).Create(&deployment)
require.NoError(t, err)
return &deployment
}
func makeTestHPA(t *testing.T, client kubernetes.Interface) *autoscalingv2.HorizontalPodAutoscaler {
hpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: v1.ObjectMeta{
Name: "test-hpa",
Namespace: testNamespace,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
Name: testDeploymentName,
APIVersion: "apps/v1",
},
},
}
_, err := client.AutoscalingV2beta2().HorizontalPodAutoscalers("test-namespace").Create(hpa)
require.NoError(t, err)
return hpa
}

View File

@ -128,7 +128,7 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
// TODO: use real context
value, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
value, _, err := c.promAPI.Query(context.Background(), c.query, time.Now().UTC())
if err != nil {
return nil, err
}

View File

@ -125,7 +125,7 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

View File

@ -10,7 +10,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
v1beta1 "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -331,8 +331,8 @@ func TestSkipperCollector(t *testing.T) {
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
require.NoError(t, err)
plugin := makePlugin(tc.metric)
hpa := makeHPA(tc.ingressName, tc.backend)
config := makeConfig(tc.backend, tc.fakedAverage)
hpa := makeHPA(tc.namespace, tc.ingressName, tc.backend)
config := makeConfig(tc.ingressName, tc.namespace, tc.backend, tc.fakedAverage)
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
require.NoError(t, err)
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
@ -341,6 +341,7 @@ func TestSkipperCollector(t *testing.T) {
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
require.NoError(t, err, "failed to collect metrics: %v", err)
require.Len(t, collected, 1, "the number of metrics returned is not 1")
@ -381,12 +382,13 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
Host: hostname,
})
}
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress)
_, err := client.NetworkingV1beta1().Ingresses(namespace).Create(ingress)
return err
}
func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
func makeHPA(namespace, ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler {
return &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
@ -404,9 +406,13 @@ func makeHPA(ingressName, backend string) *autoscalingv2.HorizontalPodAutoscaler
},
}
}
func makeConfig(backend string, fakedAverage bool) *MetricConfig {
func makeConfig(ingressName, namespace, backend string, fakedAverage bool) *MetricConfig {
config := &MetricConfig{
MetricTypeName: MetricTypeName{Metric: autoscalingv2.MetricIdentifier{Name: fmt.Sprintf("%s,%s", rpsMetricName, backend)}},
ObjectReference: custom_metrics.ObjectReference{
Name: ingressName,
Namespace: namespace,
},
MetricSpec: autoscalingv2.MetricSpec{
Object: &autoscalingv2.ObjectMetricSource{
Target: autoscalingv2.MetricTarget{},

View File

@ -1,247 +0,0 @@
package provider
import (
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscaling "k8s.io/api/autoscaling/v2beta2"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
)
// from: https://github.com/kubernetes/kubernetes/blob/v1.14.4/pkg/apis/autoscaling/v2beta1/conversion.go
func Convert_autoscaling_MetricTarget_To_v2beta1_CrossVersionObjectReference(in *autoscaling.MetricTarget, out *autoscalingv2beta1.CrossVersionObjectReference, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_CrossVersionObjectReference_To_autoscaling_MetricTarget(in *autoscalingv2beta1.CrossVersionObjectReference, out *autoscaling.MetricTarget, s conversion.Scope) error {
return nil
}
func Convert_v2beta1_ResourceMetricStatus_To_autoscaling_ResourceMetricStatus(in *autoscalingv2beta1.ResourceMetricStatus, out *autoscaling.ResourceMetricStatus, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.CurrentAverageUtilization
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
AverageValue: &averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricStatus_To_v2beta1_ResourceMetricStatus(in *autoscaling.ResourceMetricStatus, out *autoscalingv2beta1.ResourceMetricStatus, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.CurrentAverageUtilization = in.Current.AverageUtilization
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
return nil
}
func Convert_v2beta1_ResourceMetricSource_To_autoscaling_ResourceMetricSource(in *autoscalingv2beta1.ResourceMetricSource, out *autoscaling.ResourceMetricSource, s conversion.Scope) error {
out.Name = core.ResourceName(in.Name)
utilization := in.TargetAverageUtilization
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if utilization == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.UtilizationMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: averageValue,
AverageUtilization: utilization,
}
return nil
}
func Convert_autoscaling_ResourceMetricSource_To_v2beta1_ResourceMetricSource(in *autoscaling.ResourceMetricSource, out *autoscalingv2beta1.ResourceMetricSource, s conversion.Scope) error {
out.Name = v1.ResourceName(in.Name)
out.TargetAverageUtilization = in.Target.AverageUtilization
out.TargetAverageValue = in.Target.AverageValue
return nil
}
func Convert_autoscaling_ExternalMetricSource_To_v2beta1_ExternalMetricSource(in *autoscaling.ExternalMetricSource, out *autoscalingv2beta1.ExternalMetricSource, s conversion.Scope) error {
out.MetricName = in.Metric.Name
out.TargetValue = in.Target.Value
out.TargetAverageValue = in.Target.AverageValue
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricSource_To_autoscaling_ExternalMetricSource(in *autoscalingv2beta1.ExternalMetricSource, out *autoscaling.ExternalMetricSource, s conversion.Scope) error {
value := in.TargetValue
averageValue := in.TargetAverageValue
var metricType autoscaling.MetricTargetType
if value == nil {
metricType = autoscaling.AverageValueMetricType
} else {
metricType = autoscaling.ValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricSource_To_v2beta1_ObjectMetricSource(in *autoscaling.ObjectMetricSource, out *autoscalingv2beta1.ObjectMetricSource, s conversion.Scope) error {
if in.Target.Value != nil {
out.TargetValue = *in.Target.Value
}
out.AverageValue = in.Target.AverageValue
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv2beta1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
var metricType autoscaling.MetricTargetType
if in.AverageValue == nil {
metricType = autoscaling.ValueMetricType
} else {
metricType = autoscaling.AverageValueMetricType
}
out.Target = autoscaling.MetricTarget{
Type: metricType,
Value: &in.TargetValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricSource_To_v2beta1_PodsMetricSource(in *autoscaling.PodsMetricSource, out *autoscalingv2beta1.PodsMetricSource, s conversion.Scope) error {
if in.Target.AverageValue != nil {
targetAverageValue := *in.Target.AverageValue
out.TargetAverageValue = targetAverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricSource_To_autoscaling_PodsMetricSource(in *autoscalingv2beta1.PodsMetricSource, out *autoscaling.PodsMetricSource, s conversion.Scope) error {
targetAverageValue := &in.TargetAverageValue
var metricType autoscaling.MetricTargetType
metricType = autoscaling.AverageValueMetricType
out.Target = autoscaling.MetricTarget{
Type: metricType,
AverageValue: targetAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_ExternalMetricStatus_To_v2beta1_ExternalMetricStatus(in *autoscaling.ExternalMetricStatus, out *autoscalingv2beta1.ExternalMetricStatus, s conversion.Scope) error {
if &in.Current.AverageValue != nil {
out.CurrentAverageValue = in.Current.AverageValue
}
out.MetricName = in.Metric.Name
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.MetricSelector = in.Metric.Selector
return nil
}
func Convert_v2beta1_ExternalMetricStatus_To_autoscaling_ExternalMetricStatus(in *autoscalingv2beta1.ExternalMetricStatus, out *autoscaling.ExternalMetricStatus, s conversion.Scope) error {
value := in.CurrentValue
averageValue := in.CurrentAverageValue
out.Current = autoscaling.MetricValueStatus{
Value: &value,
AverageValue: averageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.MetricSelector,
}
return nil
}
func Convert_autoscaling_ObjectMetricStatus_To_v2beta1_ObjectMetricStatus(in *autoscaling.ObjectMetricStatus, out *autoscalingv2beta1.ObjectMetricStatus, s conversion.Scope) error {
if in.Current.Value != nil {
out.CurrentValue = *in.Current.Value
}
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
Kind: in.DescribedObject.Kind,
Name: in.DescribedObject.Name,
APIVersion: in.DescribedObject.APIVersion,
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
if in.Current.AverageValue != nil {
currentAverageValue := *in.Current.AverageValue
out.AverageValue = &currentAverageValue
}
return nil
}
func Convert_v2beta1_ObjectMetricStatus_To_autoscaling_ObjectMetricStatus(in *autoscalingv2beta1.ObjectMetricStatus, out *autoscaling.ObjectMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
Value: &in.CurrentValue,
AverageValue: in.AverageValue,
}
out.DescribedObject = autoscaling.CrossVersionObjectReference{
Kind: in.Target.Kind,
Name: in.Target.Name,
APIVersion: in.Target.APIVersion,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}
func Convert_autoscaling_PodsMetricStatus_To_v2beta1_PodsMetricStatus(in *autoscaling.PodsMetricStatus, out *autoscalingv2beta1.PodsMetricStatus, s conversion.Scope) error {
if in.Current.AverageValue != nil {
out.CurrentAverageValue = *in.Current.AverageValue
}
out.MetricName = in.Metric.Name
out.Selector = in.Metric.Selector
return nil
}
func Convert_v2beta1_PodsMetricStatus_To_autoscaling_PodsMetricStatus(in *autoscalingv2beta1.PodsMetricStatus, out *autoscaling.PodsMetricStatus, s conversion.Scope) error {
out.Current = autoscaling.MetricValueStatus{
AverageValue: &in.CurrentAverageValue,
}
out.Metric = autoscaling.MetricIdentifier{
Name: in.MetricName,
Selector: in.Selector,
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@ package provider
import (
"context"
"errors"
"reflect"
"sync"
"time"
@ -49,16 +50,17 @@ var (
// HPAProvider is a base provider for initializing metric collectors based on
// HPA resources.
type HPAProvider struct {
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
client kubernetes.Interface
interval time.Duration
collectorScheduler *CollectorScheduler
collectorInterval time.Duration
metricSink chan metricCollection
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
metricStore *MetricStore
collectorFactory *collector.CollectorFactory
recorder kube_record.EventRecorder
logger *log.Entry
disregardIncompatibleHPAs bool
}
// metricCollection is a container for sending collected metrics across a
@ -69,7 +71,7 @@ type metricCollection struct {
}
// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory) *HPAProvider {
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
metricsc := make(chan metricCollection)
return &HPAProvider{
@ -80,9 +82,10 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute)
}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
}
}
@ -116,7 +119,7 @@ func (p *HPAProvider) Run(ctx context.Context) {
func (p *HPAProvider) updateHPAs() error {
p.logger.Info("Looking for HPAs")
hpas, err := p.client.AutoscalingV2beta1().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
hpas, err := p.client.AutoscalingV2beta2().HorizontalPodAutoscalers(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
return err
}
@ -125,15 +128,7 @@ func (p *HPAProvider) updateHPAs() error {
newHPAs := 0
for _, hpav1 := range hpas.Items {
hpav1 := hpav1
hpa := autoscalingv2.HorizontalPodAutoscaler{}
err := Convert_v2beta1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(&hpav1, &hpa, nil)
if err != nil {
p.logger.Errorf("Failed to convert HPA to v2beta2: %v", err)
continue
}
for _, hpa := range hpas.Items {
resourceRef := resourceReference{
Name: hpa.Name,
Namespace: hpa.Namespace,
@ -162,15 +157,20 @@ func (p *HPAProvider) updateHPAs() error {
interval = p.collectorInterval
}
collector, err := p.collectorFactory.NewCollector(&hpa, config, interval)
c, err := p.collectorFactory.NewCollector(&hpa, config, interval)
if err != nil {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
// Only log when it's not a PluginNotFoundError AND flag disregardIncompatibleHPAs is true
if !(errors.Is(err, &collector.PluginNotFoundError{}) && p.disregardIncompatibleHPAs) {
p.recorder.Eventf(&hpa, apiv1.EventTypeWarning, "CreateNewMetricsCollector", "Failed to create new metrics collector: %v", err)
}
cache = false
continue
}
p.logger.Infof("Adding new metrics collector: %T", collector)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, collector)
p.logger.Infof("Adding new metrics collector: %T", c)
p.collectorScheduler.Add(resourceRef, config.MetricTypeName, c)
}
newHPAs++

View File

@ -7,8 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
autoscalingv1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
autoscaling "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
@ -16,7 +15,7 @@ import (
type mockCollectorPlugin struct{}
func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
func (m mockCollectorPlugin) NewCollector(hpa *autoscaling.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
return mockCollector{}, nil
}
@ -33,7 +32,7 @@ func (c mockCollector) Interval() time.Duration {
func TestUpdateHPAs(t *testing.T) {
value := resource.MustParse("1k")
hpa := &autoscalingv1.HorizontalPodAutoscaler{
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
@ -43,20 +42,25 @@ func TestUpdateHPAs(t *testing.T) {
"metric-config.pods.requests-per-second.json-path/port": "9090",
},
},
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscalingv1.MetricSpec{
Metrics: []autoscaling.MetricSpec{
{
Type: autoscalingv1.PodsMetricSourceType,
Pods: &autoscalingv1.PodsMetricSource{
MetricName: "requests-per-second",
TargetAverageValue: value,
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "requests-per-second",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
@ -66,14 +70,14 @@ func TestUpdateHPAs(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
var err error
hpa, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Create(hpa)
hpa, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
@ -82,7 +86,7 @@ func TestUpdateHPAs(t *testing.T) {
// update HPA
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
_, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Update(hpa)
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Update(hpa)
require.NoError(t, err)
err = provider.updateHPAs()
@ -90,3 +94,56 @@ func TestUpdateHPAs(t *testing.T) {
require.Len(t, provider.collectorScheduler.table, 1)
}
func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
// Test HPAProvider with disregardIncompatibleHPAs = true
value := resource.MustParse("1k")
hpa := &autoscaling.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: "hpa1",
Namespace: "default",
Annotations: map[string]string{},
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app",
APIVersion: "apps/v1",
},
MinReplicas: &[]int32{1}[0],
MaxReplicas: 10,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ExternalMetricSourceType,
External: &autoscaling.ExternalMetricSource{
Metric: autoscaling.MetricIdentifier{
Name: "some-other-metric",
},
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &value,
},
},
},
},
},
}
fakeClient := fake.NewSimpleClientset()
var err error
_, err = fakeClient.AutoscalingV2beta2().HorizontalPodAutoscalers("default").Create(hpa)
require.NoError(t, err)
collectorFactory := collector.NewCollectorFactory()
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
err = provider.updateHPAs()
require.NoError(t, err)
}

Some files were not shown because too many files have changed in this diff Show More