Mirror of https://github.com/zalando-incubator/kube-metrics-adapter.git (synced 2025-01-09 09:51:38 +00:00)

Compare commits: feature/k8 ... v0.1.9 (144 commits)
Author | SHA1 | Date
---|---|---
144 commits, e61071a36d … f49f7821dc
.github/CODEOWNERS (vendored, deleted, 35 lines)

@@ -1,35 +0,0 @@
# These owners will be the default owners for everything in
# the repo.
* @arjunrn

# Samples for assigning codeowners below:
# Order is important; the last matching pattern takes the most
# precedence. When someone opens a pull request that only
# modifies JS files, only @js-owner and not the global
# owner(s) will be requested for a review.
# *.js @js-owner

# You can also use email addresses if you prefer. They'll be
# used to look up users just like we do for commit author
# emails.
# *.go docs@example.com

# In this example, @doctocat owns any files in the build/logs
# directory at the root of the repository and any of its
# subdirectories.
# /build/logs/ @doctocat

# The `docs/*` pattern will match files like
# `docs/getting-started.md` but not further nested files like
# `docs/build-app/troubleshooting.md`.
# docs/* docs@example.com

# In this example, @octocat owns any file in an apps directory
# anywhere in your repository.
# apps/ @octocat

# In this example, @doctocat owns any file in the `/docs`
# directory in the root of your repository.
# /docs/ @doctocat
.github/dependabot.yml (vendored, new file, 14 lines)

@@ -0,0 +1,14 @@
version: 2
updates:
- package-ecosystem: gomod
  directory: "/"
  schedule:
    interval: monthly
    time: "07:00"
  open-pull-requests-limit: 10
- package-ecosystem: docker
  directory: "/"
  schedule:
    interval: monthly
    time: "07:00"
  open-pull-requests-limit: 10
.golangci.yml (new file, 19 lines)

@@ -0,0 +1,19 @@
run:
linters-settings:
  golint:
    min-confidence: 0.9

linters:
  disable-all: true
  enable:
  - staticcheck
  - ineffassign
  - golint
  - goimports
  - errcheck
issues:
  exclude-rules:
    # Exclude some staticcheck messages
    - linters:
        - staticcheck
      text: "SA9003:"
.travis.yml (10 changed lines)

@@ -2,17 +2,19 @@ language: go
dist: xenial

go:
- "1.11.x"
- "1.14.x"

env:
- GO111MODULE=on
- GO111MODULE=on GOLANGCI_RELEASE="v1.21.0"

before_install:
- go get github.com/mattn/goveralls
- go get github.com/lawrencewoodman/roveralls
- GO111MODULE=off go get github.com/mattn/goveralls
- GO111MODULE=off go get github.com/lawrencewoodman/roveralls
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}

script:
- make test
- make build.docker
- make check
- roveralls
- goveralls -v -coverprofile=roveralls.coverprofile -service=travis-ci
@@ -1,3 +1,2 @@
Mikkel Larsen <mikkel.larsen@zalando.de>
Arjun Naik <arjun.naik@zalando.de>
Team Teapot <team-teapot@zalando.de>
Makefile (4 changed lines)

@@ -19,8 +19,8 @@ test:
	go test -v $(GOPKGS)

check:
	golint $(GOPKGS)
	go vet -v $(GOPKGS)
	go mod download
	golangci-lint run --timeout=2m ./...

build.local: build/$(BINARY)
build.linux: build/linux/$(BINARY)
@@ -1,7 +1,8 @@
We acknowledge that every line of code that we write may potentially contain security issues.
We are trying to deal with it responsibly and provide patches as quickly as possible.

We are trying to deal with it responsibly and provide patches as quickly as possible. If you have anything to report to us, please use the following channels:
We host our bug bounty program on HackerOne. It is currently private; therefore, if you would like to report a vulnerability and get rewarded for it, please ask to join our program by filling in this form:

Email: Tech-Security@zalando.de
OR
Submit your vulnerability report through our bug bounty program at: https://hackerone.com/zalando
https://corporate.zalando.com/en/services-and-contact#security-form

You can also send your report via this form if you do not want to join our bug bounty program and just want to report a vulnerability or security issue.
@@ -2,7 +2,13 @@ version: "2017-09-20"
pipeline:
- id: build
  overlay: ci/golang
  cache:
    paths:
    - /go/pkg/mod       # pkg cache for Go modules
    - ~/.cache/go-build # Go build cache
  type: script
  env:
    GOFLAGS: "-mod=readonly"
  commands:
  - desc: test
    cmd: |
@@ -14,7 +20,11 @@ pipeline:
    cmd: |
      if [[ $CDP_TARGET_BRANCH == master && ! $CDP_PULL_REQUEST_NUMBER ]]; then
        IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter
        VERSION=$(git describe --tags --always)
      else
        IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter-test
        VERSION=$CDP_BUILD_VERSION
      fi
      IMAGE=$IMAGE VERSION=$CDP_BUILD_VERSION make build.push
      IMAGE=$IMAGE VERSION=$VERSION make build.docker
      git diff --stat --exit-code
      IMAGE=$IMAGE VERSION=$VERSION make build.push
@@ -1,4 +1,4 @@
apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: custom-metrics-consumer
@@ -25,24 +25,36 @@ spec:
  # - type: Resource
  #   resource:
  #     name: cpu
  #     targetAverageUtilization: 50
  #     current:
  #       averageUtilization: 50

  - type: Pods
    pods:
      metricName: queue-length
      targetAverageValue: 1k
      metric:
        name: queue-length
      target:
        averageValue: 1k
        type: AverageValue

  - type: Object
    object:
      metricName: requests-per-second
      target:
      describedObject:
        apiVersion: extensions/v1beta1
        kind: Ingress
        name: custom-metrics-consumer
      targetValue: 10 # this will be treated as targetAverageValue
      metric:
        name: requests-per-second
      target:
        averageValue: "10"
        type: AverageValue
  - type: External
    external:
      metricName: sqs-queue-length
      metricSelector:
        matchLabels:
          queue-name: foobar
          region: eu-central-1
      targetAverageValue: 30
      metric:
        name: sqs-queue-length
        selector:
          matchLabels:
            queue-name: foobar
            region: eu-central-1
      target:
        averageValue: "30"
        type: AverageValue
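For reference, the new `autoscaling/v2beta2` shape of the External metric above is what maps onto the Go API types consumed by the collector code further down in this comparison. A minimal sketch of the same `sqs-queue-length` example expressed as those types (this struct literal is an illustration, not code from this repository):

```go
package main

import (
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// In v2beta2 the metric name and selector move under Metric,
	// and the target value moves under Target with an explicit type.
	spec := autoscalingv2.MetricSpec{
		Type: autoscalingv2.ExternalMetricSourceType,
		External: &autoscalingv2.ExternalMetricSource{
			Metric: autoscalingv2.MetricIdentifier{
				Name: "sqs-queue-length",
				Selector: &metav1.LabelSelector{
					MatchLabels: map[string]string{
						"queue-name": "foobar",
						"region":     "eu-central-1",
					},
				},
			},
			Target: autoscalingv2.MetricTarget{
				Type:         autoscalingv2.AverageValueMetricType,
				AverageValue: resource.NewQuantity(30, resource.DecimalSI),
			},
		},
	}
	fmt.Println(spec.External.Metric.Name, spec.External.Target.AverageValue)
}
```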
@@ -3,13 +3,15 @@ package main
import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"time"
)

func metricsHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(200)
	w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
	_, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
	log.Fatalf("failed to write: %v", err)
}

var (
@ -29,5 +31,5 @@ func main() {
		ReadTimeout: 5 * time.Second,
	}

	server.ListenAndServe()
	log.Fatal(server.ListenAndServe())
}
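Note that the error check added above calls `log.Fatalf` unconditionally, so this example consumer would exit even when the write succeeds. A minimal corrected sketch of the handler, with a tiny `main` added so it runs standalone (the package-level `size` variable and the `/metrics` route are assumptions standing in for the parts of the file not shown in this hunk):

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

// size stands in for the package-level queue-length variable defined elsewhere in this file.
var size int

// metricsHandler reports the queue length as JSON and only logs when the write fails.
func metricsHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	if _, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size))); err != nil {
		log.Printf("failed to write: %v", err)
	}
}

func main() {
	http.HandleFunc("/metrics", metricsHandler)
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```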
111
go.mod
111
go.mod
@ -1,91 +1,34 @@
|
||||
module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
bitbucket.org/ww/goautoneg v0.0.0-20120707110453-75cd24fc2f2c // indirect
|
||||
github.com/BurntSushi/toml v0.3.0 // indirect
|
||||
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.0 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/aws/aws-sdk-go v1.16.6
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/coreos/bbolt v1.3.0 // indirect
|
||||
github.com/coreos/etcd v3.3.9+incompatible // indirect
|
||||
github.com/coreos/go-semver v0.2.0 // indirect
|
||||
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
|
||||
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
|
||||
github.com/emicklei/go-restful v2.8.0+incompatible // indirect
|
||||
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
|
||||
github.com/evanphx/json-patch v3.0.0+incompatible // indirect
|
||||
github.com/fsnotify/fsnotify v1.4.7 // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect
|
||||
github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect
|
||||
github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect
|
||||
github.com/go-openapi/swag v0.0.0-20180715190254-becd2f08beaf // indirect
|
||||
github.com/gogo/protobuf v1.1.1 // indirect
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
|
||||
github.com/aws/aws-sdk-go v1.35.0
|
||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||
github.com/gorilla/websocket v1.3.0 // indirect
|
||||
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47 // indirect
|
||||
github.com/hpcloud/tail v1.0.0 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.1.0 // indirect
|
||||
github.com/json-iterator/go v1.1.5 // indirect
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20180824182428-26e5299457d3
|
||||
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||
github.com/onsi/ginkgo v1.6.0 // indirect
|
||||
github.com/onsi/gomega v1.4.1 // indirect
|
||||
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 // indirect
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v0.9.0-pre1.0.20180824101016-4eb539fa85a2
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
|
||||
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect
|
||||
github.com/sirupsen/logrus v1.0.6
|
||||
github.com/soheilhy/cmux v0.1.4 // indirect
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/spf13/pflag v1.0.2 // indirect
|
||||
github.com/stretchr/testify v1.2.2
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
|
||||
github.com/ugorji/go v1.1.1 // indirect
|
||||
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
|
||||
github.com/influxdata/influxdb-client-go v0.1.5
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200618121405-54026617ec44
|
||||
github.com/lib/pq v1.2.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.4 // indirect
|
||||
github.com/onsi/gomega v1.8.1 // indirect
|
||||
github.com/prometheus/client_golang v1.7.1
|
||||
github.com/prometheus/common v0.14.0
|
||||
github.com/sirupsen/logrus v1.7.0
|
||||
github.com/spf13/cobra v0.0.7
|
||||
github.com/spyzhov/ajson v0.4.2
|
||||
github.com/stretchr/testify v1.6.1
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
|
||||
golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
|
||||
golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 // indirect
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
|
||||
google.golang.org/appengine v1.2.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
|
||||
google.golang.org/grpc v1.14.0 // indirect
|
||||
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.1 // indirect
|
||||
k8s.io/api v0.0.0-20180628040859-072894a440bd
|
||||
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d
|
||||
k8s.io/apiserver v0.0.0-20180628044425-01459b68eb5f
|
||||
k8s.io/client-go v8.0.0+incompatible
|
||||
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
|
||||
k8s.io/metrics v0.0.0-20180718014405-6efa0bfaa5c1
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
golang.org/x/tools v0.0.0-20200204192400-7124308813f3 // indirect
|
||||
gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e // indirect
|
||||
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||
k8s.io/api v0.18.8
|
||||
k8s.io/apimachinery v0.18.8
|
||||
k8s.io/client-go v0.18.8
|
||||
k8s.io/component-base v0.18.8
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/metrics v0.18.8
|
||||
)
|
||||
|
||||
// use version that provides: https://github.com/kubernetes-sigs/custom-metrics-apiserver/pull/65
|
||||
replace github.com/kubernetes-incubator/custom-metrics-apiserver => github.com/mikkeloscar/custom-metrics-apiserver v0.0.0-20200626202535-cd2d550cf55d
|
||||
|
||||
go 1.13
|
||||
|
how-to/skipper_setup.md (new file, 40 lines)

@@ -0,0 +1,40 @@
# Skipper Prometheus Metrics Collection

The skipper-ingress pods should be configured to be scraped by Prometheus. This
can be done with Prometheus service discovery of Kubernetes services
or Kubernetes pods:

```yaml
apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /metrics
    prometheus.io/port: "9911"
    prometheus.io/scrape: "true"
  labels:
    application: skipper-ingress
  name: skipper-ingress
spec:
  ports:
  - port: 80
    protocol: TCP
    targetPort: 9999
  selector:
    application: skipper-ingress
  type: ClusterIP
```

This [configuration](https://github.com/zalando-incubator/kubernetes-on-aws/blob/dev/cluster/manifests/prometheus/configmap.yaml#L69)
shows how Prometheus is configured in our clusters to scrape service endpoints.
The annotations `prometheus.io/path`, `prometheus.io/port`, and `prometheus.io/scrape`
instruct Prometheus to scrape all pods of this service on port _9911_ and
the path `/metrics`.

When `kube-metrics-adapter` is started, the flag `--prometheus-server` should be set so that
the adapter can query Prometheus for aggregated metrics. When running in Kubernetes, this can
be the address of the Prometheus service, for example `http://prometheus.kube-system`.

With these settings the `kube-metrics-adapter` can provide `requests-per-second` metrics for Ingress
objects present in the cluster. The Prometheus instances scrape the metrics from
the `skipper-ingress` pods; the adapter then queries Prometheus for the metric and
provides it to the API server when requested.
main.go (3 changed lines)

@@ -18,12 +18,13 @@ package main

import (
	"flag"
	_ "net/http/pprof"
	"os"
	"runtime"

	"github.com/zalando-incubator/kube-metrics-adapter/pkg/server"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/util/logs"
	"k8s.io/component-base/logs"
)

func main() {
pkg/annotations/parser.go (new file, 101 lines)

@@ -0,0 +1,101 @@
package annotations

import (
	"fmt"
	"strings"
	"time"

	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)

const (
	customMetricsPrefix      = "metric-config."
	perReplicaMetricsConfKey = "per-replica"
	intervalMetricsConfKey   = "interval"
)

type AnnotationConfigs struct {
	CollectorName string
	Configs       map[string]string
	PerReplica    bool
	Interval      time.Duration
}

type MetricConfigKey struct {
	Type       autoscalingv2.MetricSourceType
	MetricName string
}

type AnnotationConfigMap map[MetricConfigKey]*AnnotationConfigs

func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
	for key, val := range annotations {
		if !strings.HasPrefix(key, customMetricsPrefix) {
			continue
		}

		parts := strings.Split(key, "/")
		if len(parts) != 2 {
			// TODO: error?
			continue
		}

		configs := strings.Split(parts[0], ".")
		if len(configs) != 4 {
			// TODO: error?
			continue
		}

		key := MetricConfigKey{
			MetricName: configs[2],
		}

		switch configs[1] {
		case "pods":
			key.Type = autoscalingv2.PodsMetricSourceType
		case "object":
			key.Type = autoscalingv2.ObjectMetricSourceType
		default:
			key.Type = autoscalingv2.ExternalMetricSourceType
		}

		metricCollector := configs[3]

		config, ok := m[key]
		if !ok {
			config = &AnnotationConfigs{
				CollectorName: metricCollector,
				Configs:       map[string]string{},
			}
			m[key] = config
		}

		// TODO: fail if collector name doesn't match
		if config.CollectorName != metricCollector {
			continue
		}

		if parts[1] == perReplicaMetricsConfKey {
			config.PerReplica = true
			continue
		}

		if parts[1] == intervalMetricsConfKey {
			interval, err := time.ParseDuration(val)
			if err != nil {
				return fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
			}
			config.Interval = interval
			continue
		}

		config.Configs[parts[1]] = val
	}
	return nil
}

func (m AnnotationConfigMap) GetAnnotationConfig(metricName string, metricType autoscalingv2.MetricSourceType) (*AnnotationConfigs, bool) {
	key := MetricConfigKey{MetricName: metricName, Type: metricType}
	config, ok := m[key]
	return config, ok
}
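A short usage sketch of the parser above. The annotation keys mirror the test cases in the next file; the `interval` key and the printed values are assumptions based on how `Parse` handles `intervalMetricsConfKey` and `Configs`:

```go
package main

import (
	"fmt"
	"log"

	"github.com/zalando-incubator/kube-metrics-adapter/pkg/annotations"
	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)

func main() {
	// Keys follow the metric-config.<metric-type>.<metric-name>.<collector>/<config-key>
	// scheme split by Parse above; "interval" and "per-replica" are handled specially.
	hpaAnnotations := map[string]string{
		"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
		"metric-config.pods.requests-per-second.json-path/port":     "9090",
		"metric-config.pods.requests-per-second.json-path/interval": "30s",
	}

	configs := make(annotations.AnnotationConfigMap)
	if err := configs.Parse(hpaAnnotations); err != nil {
		log.Fatal(err)
	}

	if cfg, ok := configs.GetAnnotationConfig("requests-per-second", autoscalingv2.PodsMetricSourceType); ok {
		fmt.Println(cfg.CollectorName, cfg.Interval, cfg.Configs["port"]) // json-path 30s 9090
	}
}
```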
121
pkg/annotations/parser_test.go
Normal file
121
pkg/annotations/parser_test.go
Normal file
@ -0,0 +1,121 @@
|
||||
package annotations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
)
|
||||
|
||||
func TestParser(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
Name string
|
||||
Annotations map[string]string
|
||||
MetricName string
|
||||
MetricType autoscalingv2.MetricSourceType
|
||||
ExpectedConfig map[string]string
|
||||
PerReplica bool
|
||||
}{
|
||||
{
|
||||
Name: "no annotations",
|
||||
Annotations: map[string]string{},
|
||||
ExpectedConfig: map[string]string{},
|
||||
},
|
||||
{
|
||||
Name: "pod metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
|
||||
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
|
||||
"metric-config.pods.requests-per-second.json-path/port": "9090",
|
||||
"metric-config.pods.requests-per-second.json-path/scheme": "https",
|
||||
},
|
||||
MetricName: "requests-per-second",
|
||||
MetricType: autoscalingv2.PodsMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"json-key": "$.http_server.rps",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"scheme": "https",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "prometheus metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.object.processed-events-per-second.prometheus/query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
|
||||
"metric-config.object.processed-events-per-second.prometheus/per-replica": "true",
|
||||
},
|
||||
MetricName: "processed-events-per-second",
|
||||
MetricType: autoscalingv2.ObjectMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
|
||||
},
|
||||
PerReplica: true,
|
||||
},
|
||||
{
|
||||
Name: "zmon collector",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.zmon-check.zmon/key": "custom.*",
|
||||
"metric-config.external.zmon-check.zmon/tag-application": "my-custom-app-*",
|
||||
},
|
||||
MetricName: "zmon-check",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"key": "custom.*",
|
||||
"tag-application": "my-custom-app-*",
|
||||
},
|
||||
PerReplica: false,
|
||||
},
|
||||
{
|
||||
Name: "influxdb metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.flux-query.influxdb/range1m": `from(bucket: "?") |> range(start: -1m)`,
|
||||
"metric-config.external.flux-query.influxdb/range2m": `from(bucket: "?") |> range(start: -2m)`,
|
||||
"metric-config.external.flux-query.influxdb/range3m": `from(bucket: "?") |> range(start: -3m)`,
|
||||
"metric-config.external.flux-query.influxdb/address": "http://localhost:9999",
|
||||
"metric-config.external.flux-query.influxdb/token": "sEcr3TT0ken",
|
||||
"metric-config.external.flux-query.influxdb/org-id": "deadbeef",
|
||||
},
|
||||
MetricName: "flux-query",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"range1m": `from(bucket: "?") |> range(start: -1m)`,
|
||||
"range2m": `from(bucket: "?") |> range(start: -2m)`,
|
||||
"range3m": `from(bucket: "?") |> range(start: -3m)`,
|
||||
"address": "http://localhost:9999",
|
||||
"token": "sEcr3TT0ken",
|
||||
"org-id": "deadbeef",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "http metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.http.json/json-key": "$.metric.value",
|
||||
"metric-config.external.http.json/endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||
"metric-config.external.http.json/aggregator": "avg",
|
||||
},
|
||||
MetricName: "http",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"json-key": "$.metric.value",
|
||||
"endpoint": "http://metric-source.source-namespace.svc.cluster.local:8000/metrics",
|
||||
"aggregator": "avg",
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
hpaMap := make(AnnotationConfigMap)
|
||||
err := hpaMap.Parse(tc.Annotations)
|
||||
require.NoError(t, err)
|
||||
config, present := hpaMap.GetAnnotationConfig(tc.MetricName, tc.MetricType)
|
||||
if len(tc.ExpectedConfig) == 0 {
|
||||
require.False(t, present)
|
||||
return
|
||||
}
|
||||
require.True(t, present)
|
||||
for k, v := range tc.ExpectedConfig {
|
||||
require.Equal(t, v, config.Configs[k])
|
||||
}
|
||||
require.Equal(t, tc.PerReplica, config.PerReplica)
|
||||
})
|
||||
}
|
||||
}
|
@ -9,7 +9,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
@ -32,13 +32,13 @@ func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPl
|
||||
}
|
||||
|
||||
// NewCollector initializes a new skipper collector from the specified HPA.
|
||||
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Name {
|
||||
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Metric.Name {
|
||||
case AWSSQSQueueLengthMetric:
|
||||
return NewAWSSQSCollector(c.sessions, config, interval)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
|
||||
}
|
||||
|
||||
type AWSSQSCollector struct {
|
||||
@ -47,18 +47,20 @@ type AWSSQSCollector struct {
|
||||
region string
|
||||
queueURL string
|
||||
queueName string
|
||||
labels map[string]string
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
}
|
||||
|
||||
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for queue is not specified")
|
||||
}
|
||||
|
||||
name, ok := config.Labels[sqsQueueNameLabelKey]
|
||||
name, ok := config.Config[sqsQueueNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue name not specified on metric")
|
||||
}
|
||||
region, ok := config.Labels[sqsQueueRegionLabelKey]
|
||||
region, ok := config.Config[sqsQueueRegionLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue region is not specified on metric")
|
||||
}
|
||||
@ -83,9 +85,8 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
|
||||
interval: interval,
|
||||
queueURL: aws.StringValue(resp.QueueUrl),
|
||||
queueName: name,
|
||||
metricName: config.Name,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
labels: config.Labels,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -109,8 +110,8 @@ func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metricName,
|
||||
MetricLabels: c.labels,
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewQuantity(int64(i), resource.DecimalSI),
|
||||
},
|
||||
|
@ -2,22 +2,16 @@ package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/annotations"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
customMetricsPrefix = "metric-config."
|
||||
perReplicaMetricsConfKey = "per-replica"
|
||||
intervalMetricsConfKey = "interval"
|
||||
)
|
||||
|
||||
type ObjectReference struct {
|
||||
autoscalingv2beta1.CrossVersionObjectReference
|
||||
autoscalingv2.CrossVersionObjectReference
|
||||
Namespace string
|
||||
}
|
||||
|
||||
@ -49,7 +43,15 @@ func NewCollectorFactory() *CollectorFactory {
|
||||
}
|
||||
|
||||
type CollectorPlugin interface {
|
||||
NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||
}
|
||||
|
||||
type PluginNotFoundError struct {
|
||||
metricTypeName MetricTypeName
|
||||
}
|
||||
|
||||
func (p *PluginNotFoundError) Error() string {
|
||||
return fmt.Sprintf("no plugin found for %s", p.metricTypeName)
|
||||
}
|
||||
|
||||
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
||||
@ -106,9 +108,9 @@ func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin Co
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Type {
|
||||
case autoscalingv2beta1.PodsMetricSourceType:
|
||||
case autoscalingv2.PodsMetricSourceType:
|
||||
// first try to find a plugin by format
|
||||
if plugin, ok := c.podsPlugins.Named[config.CollectorName]; ok {
|
||||
return plugin.NewCollector(hpa, config, interval)
|
||||
@ -118,7 +120,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
|
||||
if c.podsPlugins.Any != nil {
|
||||
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
|
||||
}
|
||||
case autoscalingv2beta1.ObjectMetricSourceType:
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
// first try to find a plugin by kind
|
||||
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
|
||||
if plugin, ok := kinds.Named[config.CollectorName]; ok {
|
||||
@ -139,40 +141,24 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
|
||||
if c.objectPlugins.Any.Any != nil {
|
||||
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
|
||||
}
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
if plugin, ok := c.externalPlugins[config.Name]; ok {
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
if plugin, ok := c.externalPlugins[config.Metric.Name]; ok {
|
||||
return plugin.NewCollector(hpa, config, interval)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
|
||||
}
|
||||
|
||||
func getObjectReference(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string) (custom_metrics.ObjectReference, error) {
|
||||
for _, metric := range hpa.Spec.Metrics {
|
||||
if metric.Type == autoscalingv2beta1.ObjectMetricSourceType && metric.Object.MetricName == metricName {
|
||||
return custom_metrics.ObjectReference{
|
||||
APIVersion: metric.Object.Target.APIVersion,
|
||||
Kind: metric.Object.Target.Kind,
|
||||
Name: metric.Object.Target.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return custom_metrics.ObjectReference{}, fmt.Errorf("failed to find object reference")
|
||||
return nil, &PluginNotFoundError{metricTypeName: config.MetricTypeName}
|
||||
}
|
||||
|
||||
type MetricTypeName struct {
|
||||
Type autoscalingv2beta1.MetricSourceType
|
||||
Name string
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Metric autoscalingv2.MetricIdentifier
|
||||
}
|
||||
|
||||
type CollectedMetric struct {
|
||||
Type autoscalingv2beta1.MetricSourceType
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Custom custom_metrics.MetricValue
|
||||
External external_metrics.ExternalMetricValue
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
type Collector interface {
|
||||
@ -187,83 +173,17 @@ type MetricConfig struct {
|
||||
ObjectReference custom_metrics.ObjectReference
|
||||
PerReplica bool
|
||||
Interval time.Duration
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
func parseCustomMetricsAnnotations(annotations map[string]string) (map[MetricTypeName]*MetricConfig, error) {
|
||||
metrics := make(map[MetricTypeName]*MetricConfig)
|
||||
for key, val := range annotations {
|
||||
if !strings.HasPrefix(key, customMetricsPrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 2 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
configs := strings.Split(parts[0], ".")
|
||||
if len(configs) != 4 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
metricTypeName := MetricTypeName{
|
||||
Name: configs[2],
|
||||
}
|
||||
|
||||
switch configs[1] {
|
||||
case "pods":
|
||||
metricTypeName.Type = autoscalingv2beta1.PodsMetricSourceType
|
||||
case "object":
|
||||
metricTypeName.Type = autoscalingv2beta1.ObjectMetricSourceType
|
||||
}
|
||||
|
||||
metricCollector := configs[3]
|
||||
|
||||
config, ok := metrics[metricTypeName]
|
||||
if !ok {
|
||||
config = &MetricConfig{
|
||||
MetricTypeName: metricTypeName,
|
||||
CollectorName: metricCollector,
|
||||
Config: map[string]string{},
|
||||
}
|
||||
metrics[metricTypeName] = config
|
||||
}
|
||||
|
||||
// TODO: fail if collector name doesn't match
|
||||
if config.CollectorName != metricCollector {
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == perReplicaMetricsConfKey {
|
||||
config.PerReplica = true
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == intervalMetricsConfKey {
|
||||
interval, err := time.ParseDuration(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
|
||||
}
|
||||
config.Interval = interval
|
||||
continue
|
||||
}
|
||||
|
||||
config.Config[parts[1]] = val
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
MetricSpec autoscalingv2.MetricSpec
|
||||
}
|
||||
|
||||
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
|
||||
func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
|
||||
func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
|
||||
metricConfigs := make([]*MetricConfig, 0, len(hpa.Spec.Metrics))
|
||||
|
||||
// TODO: validate that the specified metric names are defined
|
||||
// in the HPA
|
||||
configs, err := parseCustomMetricsAnnotations(hpa.Annotations)
|
||||
parser := make(annotations.AnnotationConfigMap)
|
||||
err := parser.Parse(hpa.Annotations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -275,39 +195,49 @@ func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*Metric
|
||||
|
||||
var ref custom_metrics.ObjectReference
|
||||
switch metric.Type {
|
||||
case autoscalingv2beta1.PodsMetricSourceType:
|
||||
typeName.Name = metric.Pods.MetricName
|
||||
case autoscalingv2beta1.ObjectMetricSourceType:
|
||||
typeName.Name = metric.Object.MetricName
|
||||
case autoscalingv2.PodsMetricSourceType:
|
||||
typeName.Metric = metric.Pods.Metric
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
typeName.Metric = metric.Object.Metric
|
||||
ref = custom_metrics.ObjectReference{
|
||||
APIVersion: metric.Object.Target.APIVersion,
|
||||
Kind: metric.Object.Target.Kind,
|
||||
Name: metric.Object.Target.Name,
|
||||
APIVersion: metric.Object.DescribedObject.APIVersion,
|
||||
Kind: metric.Object.DescribedObject.Kind,
|
||||
Name: metric.Object.DescribedObject.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
}
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
typeName.Name = metric.External.MetricName
|
||||
case autoscalingv2beta1.ResourceMetricSourceType:
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
typeName.Metric = metric.External.Metric
|
||||
case autoscalingv2.ResourceMetricSourceType:
|
||||
continue // kube-metrics-adapter does not collect resource metrics
|
||||
}
|
||||
|
||||
if config, ok := configs[typeName]; ok {
|
||||
config.ObjectReference = ref
|
||||
metricConfigs = append(metricConfigs, config)
|
||||
continue
|
||||
}
|
||||
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: typeName,
|
||||
ObjectReference: ref,
|
||||
Config: map[string]string{},
|
||||
MetricSpec: metric,
|
||||
}
|
||||
|
||||
if metric.Type == autoscalingv2beta1.ExternalMetricSourceType {
|
||||
config.Labels = metric.External.MetricSelector.MatchLabels
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
|
||||
metric.External.Metric.Selector != nil &&
|
||||
metric.External.Metric.Selector.MatchLabels != nil {
|
||||
for k, v := range metric.External.Metric.Selector.MatchLabels {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
|
||||
if present {
|
||||
config.CollectorName = annotationConfigs.CollectorName
|
||||
config.Interval = annotationConfigs.Interval
|
||||
config.PerReplica = annotationConfigs.PerReplica
|
||||
// configs specified in annotations takes precedence
|
||||
// over labels
|
||||
for k, v := range annotationConfigs.Configs {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
metricConfigs = append(metricConfigs, config)
|
||||
}
|
||||
|
||||
return metricConfigs, nil
|
||||
}
|
||||
|
103
pkg/collector/http_collector.go
Normal file
103
pkg/collector/http_collector.go
Normal file
@ -0,0 +1,103 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
|
||||
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
HTTPMetricName = "http"
|
||||
HTTPEndpointAnnotationKey = "endpoint"
|
||||
HTTPJsonPathAnnotationKey = "json-key"
|
||||
identifierLabel = "identifier"
|
||||
)
|
||||
|
||||
type HTTPCollectorPlugin struct{}
|
||||
|
||||
func NewHTTPCollectorPlugin() (*HTTPCollectorPlugin, error) {
|
||||
return &HTTPCollectorPlugin{}, nil
|
||||
}
|
||||
|
||||
func (p *HTTPCollectorPlugin) NewCollector(_ *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
collector := &HTTPCollector{}
|
||||
var (
|
||||
value string
|
||||
ok bool
|
||||
)
|
||||
if value, ok = config.Config[HTTPJsonPathAnnotationKey]; !ok {
|
||||
return nil, fmt.Errorf("config value %s not found", HTTPJsonPathAnnotationKey)
|
||||
}
|
||||
jsonPath := value
|
||||
|
||||
if value, ok = config.Config[HTTPEndpointAnnotationKey]; !ok {
|
||||
return nil, fmt.Errorf("config value %s not found", HTTPEndpointAnnotationKey)
|
||||
}
|
||||
var err error
|
||||
collector.endpoint, err = url.Parse(value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
collector.interval = interval
|
||||
collector.metricType = config.Type
|
||||
if config.Metric.Selector == nil || config.Metric.Selector.MatchLabels == nil {
|
||||
return nil, fmt.Errorf("no label selector specified for metric: %s", config.Metric.Name)
|
||||
}
|
||||
if _, ok := config.Metric.Selector.MatchLabels[identifierLabel]; !ok {
|
||||
return nil, fmt.Errorf("%s is not specified as a label for metric %s", identifierLabel, config.Metric.Name)
|
||||
}
|
||||
collector.metric = config.Metric
|
||||
var aggFunc httpmetrics.AggregatorFunc
|
||||
|
||||
if val, ok := config.Config["aggregator"]; ok {
|
||||
aggFunc, err = httpmetrics.ParseAggregator(val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
jsonPathGetter, err := httpmetrics.NewJSONPathMetricsGetter(httpmetrics.DefaultMetricsHTTPClient(), aggFunc, jsonPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
collector.metricsGetter = jsonPathGetter
|
||||
return collector, nil
|
||||
}
|
||||
|
||||
type HTTPCollector struct {
|
||||
endpoint *url.URL
|
||||
interval time.Duration
|
||||
metricType v2beta2.MetricSourceType
|
||||
metricsGetter *httpmetrics.JSONPathMetricsGetter
|
||||
metric v2beta2.MetricIdentifier
|
||||
}
|
||||
|
||||
func (c *HTTPCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
metric, err := c.metricsGetter.GetMetric(*c.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{
|
||||
Time: time.Now(),
|
||||
},
|
||||
Value: *resource.NewMilliQuantity(int64(metric*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
return []CollectedMetric{value}, nil
|
||||
}
|
||||
|
||||
func (c *HTTPCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
94
pkg/collector/http_collector_test.go
Normal file
94
pkg/collector/http_collector_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/api/autoscaling/v2beta2"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
type testExternalMetricsHandler struct {
|
||||
values []int64
|
||||
test *testing.T
|
||||
}
|
||||
|
||||
func (t testExternalMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
response, err := json.Marshal(testMetricResponse{t.values})
|
||||
require.NoError(t.test, err)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, err = w.Write(response)
|
||||
require.NoError(t.test, err)
|
||||
}
|
||||
|
||||
func makeHTTPTestServer(t *testing.T, values []int64) string {
|
||||
server := httptest.NewServer(&testExternalMetricsHandler{values: values, test: t})
|
||||
return server.URL
|
||||
}
|
||||
|
||||
func TestHTTPCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
values []int64
|
||||
output int
|
||||
aggregator string
|
||||
}{
|
||||
{
|
||||
name: "basic",
|
||||
values: []int64{3},
|
||||
output: 3,
|
||||
aggregator: "sum",
|
||||
},
|
||||
{
|
||||
name: "sum",
|
||||
values: []int64{3, 5, 6},
|
||||
aggregator: "sum",
|
||||
output: 14,
|
||||
},
|
||||
{
|
||||
name: "average",
|
||||
values: []int64{3, 5, 6},
|
||||
aggregator: "sum",
|
||||
output: 14,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
testServer := makeHTTPTestServer(t, tc.values)
|
||||
plugin, err := NewHTTPCollectorPlugin()
|
||||
require.NoError(t, err)
|
||||
testConfig := makeTestHTTPCollectorConfig(testServer, tc.aggregator)
|
||||
collector, err := plugin.NewCollector(nil, testConfig, testInterval)
|
||||
require.NoError(t, err)
|
||||
metrics, err := collector.GetMetrics()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, metrics)
|
||||
require.Len(t, metrics, 1)
|
||||
require.EqualValues(t, metrics[0].External.Value.Value(), tc.output)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestHTTPCollectorConfig(endpoint, aggregator string) *MetricConfig {
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{
|
||||
Type: v2beta2.ExternalMetricSourceType,
|
||||
Metric: v2beta2.MetricIdentifier{
|
||||
Name: "test-metric",
|
||||
Selector: &v1.LabelSelector{
|
||||
MatchLabels: map[string]string{identifierLabel: "test-metric"},
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: map[string]string{
|
||||
HTTPJsonPathAnnotationKey: "$.values",
|
||||
HTTPEndpointAnnotationKey: endpoint,
|
||||
},
|
||||
}
|
||||
if aggregator != "" {
|
||||
config.Config["aggregator"] = aggregator
|
||||
}
|
||||
return config
|
||||
}
|
pkg/collector/httpmetrics/aggregator.go (new file, 65 lines)

@@ -0,0 +1,65 @@
package httpmetrics

import (
	"fmt"
	"math"
)

type AggregatorFunc func(...float64) float64

// Average implements the average mathematical function over a slice of float64
func Average(values ...float64) float64 {
	sum := Sum(values...)
	return sum / float64(len(values))
}

// Minimum implements the absolute minimum mathematical function over a slice of float64
func Minimum(values ...float64) float64 {
	// initialized with positive infinity, all finite numbers are smaller than it
	curMin := math.Inf(1)
	for _, v := range values {
		if v < curMin {
			curMin = v
		}
	}
	return curMin
}

// Maximum implements the absolute maximum mathematical function over a slice of float64
func Maximum(values ...float64) float64 {
	// initialized with negative infinity, all finite numbers are bigger than it
	curMax := math.Inf(-1)
	for _, v := range values {
		if v > curMax {
			curMax = v
		}
	}
	return curMax
}

// Sum implements the summation mathematical function over a slice of float64
func Sum(values ...float64) float64 {
	res := 0.0

	for _, v := range values {
		res += v
	}

	return res
}

// ParseAggregator returns the aggregator function for the given name. If the name is empty or not recognized, an error is returned.
func ParseAggregator(aggregator string) (AggregatorFunc, error) {
	switch aggregator {
	case "avg":
		return Average, nil
	case "min":
		return Minimum, nil
	case "max":
		return Maximum, nil
	case "sum":
		return Sum, nil
	default:
		return nil, fmt.Errorf("aggregator function: %s is unknown", aggregator)
	}
}
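A small usage sketch of the aggregator helpers above (the import path follows this repository's module path; the chosen name and values are only an example):

```go
package main

import (
	"fmt"
	"log"

	"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
)

func main() {
	// "avg", "min", "max", and "sum" are the names ParseAggregator accepts above.
	agg, err := httpmetrics.ParseAggregator("avg")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(agg(3, 4, 5)) // prints 4
}
```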
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
57
pkg/collector/httpmetrics/aggregator_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReduce(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
input []float64
|
||||
output float64
|
||||
aggregator string
|
||||
parseError bool
|
||||
}{
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 2.0,
|
||||
aggregator: "avg",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 1.0,
|
||||
aggregator: "min",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 3.0,
|
||||
aggregator: "max",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
output: 6.0,
|
||||
aggregator: "sum",
|
||||
parseError: false,
|
||||
},
|
||||
{
|
||||
input: []float64{1, 2, 3},
|
||||
aggregator: "non-existent",
|
||||
parseError: true,
|
||||
},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("Test function: %s", tc.aggregator), func(t *testing.T) {
|
||||
aggFunc, err := ParseAggregator(tc.aggregator)
|
||||
if tc.parseError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
val := aggFunc(tc.input...)
|
||||
require.Equal(t, tc.output, val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
128
pkg/collector/httpmetrics/json_path.go
Normal file
128
pkg/collector/httpmetrics/json_path.go
Normal file
@ -0,0 +1,128 @@
|
||||
package httpmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/spyzhov/ajson"
|
||||
)
|
||||
|
||||
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
||||
// querying the pods metrics endpoint and lookup the metric value as defined by
|
||||
// the json path query.
|
||||
type JSONPathMetricsGetter struct {
|
||||
jsonPath string
|
||||
aggregator AggregatorFunc
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
|
||||
func NewJSONPathMetricsGetter(httpClient *http.Client, aggregatorFunc AggregatorFunc, jsonPath string) (*JSONPathMetricsGetter, error) {
|
||||
// check that jsonPath parses
|
||||
_, err := ajson.ParseJSONPath(jsonPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &JSONPathMetricsGetter{client: httpClient, aggregator: aggregatorFunc, jsonPath: jsonPath}, nil
|
||||
}
|
||||
|
||||
var DefaultRequestTimeout = 15 * time.Second
|
||||
var DefaultConnectTimeout = 15 * time.Second
|
||||
|
||||
func CustomMetricsHTTPClient(requestTimeout time.Duration, connectTimeout time.Duration) *http.Client {
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: connectTimeout,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 50,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
},
|
||||
Timeout: requestTimeout,
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func DefaultMetricsHTTPClient() *http.Client {
|
||||
return CustomMetricsHTTPClient(DefaultRequestTimeout, DefaultConnectTimeout)
|
||||
}
|
||||
|
||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||
// endpoint and extracting the desired value using the specified json path
|
||||
// query.
|
||||
func (g *JSONPathMetricsGetter) GetMetric(metricsURL url.URL) (float64, error) {
|
||||
data, err := g.fetchMetrics(metricsURL)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// parse data
|
||||
root, err := ajson.Unmarshal(data)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
nodes, err := root.JSONPath(g.jsonPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(nodes) != 1 {
|
||||
return 0, fmt.Errorf("unexpected json: expected single numeric or array value")
|
||||
}
|
||||
|
||||
node := nodes[0]
|
||||
if node.IsArray() {
|
||||
if g.aggregator == nil {
|
||||
return 0, fmt.Errorf("no aggregator function has been specified")
|
||||
}
|
||||
values := make([]float64, 0, len(nodes))
|
||||
items, _ := node.GetArray()
|
||||
for _, item := range items {
|
||||
value, err := item.GetNumeric()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("did not find numeric type: %w", err)
|
||||
}
|
||||
values = append(values, value)
|
||||
}
|
||||
return g.aggregator(values...), nil
|
||||
} else if node.IsNumeric() {
|
||||
res, _ := node.GetNumeric()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
value, err := node.Value()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to check value of jsonPath result: %w", err)
|
||||
}
|
||||
return 0, fmt.Errorf("unsupported type %T", value)
|
||||
}
|
||||
|
||||
func (g *JSONPathMetricsGetter) fetchMetrics(metricsURL url.URL) ([]byte, error) {
|
||||
request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := g.client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
84
pkg/collector/httpmetrics/json_path_test.go
Normal file
84
pkg/collector/httpmetrics/json_path_test.go
Normal file
@ -0,0 +1,84 @@
package httpmetrics

import (
    "errors"
    "fmt"
    "net/http"
    "net/http/httptest"
    "net/url"
    "testing"

    "github.com/stretchr/testify/require"
)

func makeTestHTTPServer(t *testing.T, response []byte) *httptest.Server {
    h := func(w http.ResponseWriter, r *http.Request) {
        require.Equal(t, r.URL.Path, "/metrics")
        w.Header().Set("Content-Type", "application/json")
        _, err := w.Write(response)
        require.NoError(t, err)
    }
    return httptest.NewServer(http.HandlerFunc(h))
}

func TestJSONPathMetricsGetter(t *testing.T) {
    for _, tc := range []struct {
        name         string
        jsonResponse []byte
        jsonPath     string
        result       float64
        aggregator   AggregatorFunc
        err          error
    }{
        {
            name:         "basic single value",
            jsonResponse: []byte(`{"value":3}`),
            jsonPath:     "$.value",
            result:       3,
            aggregator:   Average,
        },
        {
            name:         "basic average",
            jsonResponse: []byte(`{"value":[3,4,5]}`),
            jsonPath:     "$.value",
            result:       4,
            aggregator:   Average,
        },
        {
            name:         "dotted key",
            jsonResponse: []byte(`{"metric.value":5}`),
            jsonPath:     "$['metric.value']",
            result:       5,
            aggregator:   Average,
        },
        {
            name:         "json path not resulting in array or number should lead to error",
            jsonResponse: []byte(`{"metric.value":5}`),
            jsonPath:     "$['invalid.metric.values']",
            err:          errors.New("unexpected json: expected single numeric or array value"),
        },
        {
            name:         "invalid json should error",
            jsonResponse: []byte(`{`),
            jsonPath:     "$['invalid.metric.values']",
            err:          errors.New("unexpected end of file"),
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            server := makeTestHTTPServer(t, tc.jsonResponse)
            defer server.Close()
            getter, err := NewJSONPathMetricsGetter(DefaultMetricsHTTPClient(), tc.aggregator, tc.jsonPath)
            require.NoError(t, err)
            url, err := url.Parse(fmt.Sprintf("%s/metrics", server.URL))
            require.NoError(t, err)
            metric, err := getter.GetMetric(*url)
            if tc.err != nil {
                require.Error(t, err)
                require.Equal(t, tc.err.Error(), err.Error())
                return
            }
            require.NoError(t, err)
            require.Equal(t, tc.result, metric)
        })
    }
}
pkg/collector/httpmetrics/pod_metrics.go (new file, 118 lines)
@@ -0,0 +1,118 @@
package httpmetrics

import (
    "fmt"
    "net/url"
    "strconv"
    "time"

    v1 "k8s.io/api/core/v1"
)

type PodMetricsGetter interface {
    GetMetric(pod *v1.Pod) (float64, error)
}

type PodMetricsJSONPathGetter struct {
    scheme       string
    path         string
    rawQuery     string
    port         int
    metricGetter *JSONPathMetricsGetter
}

func (g PodMetricsJSONPathGetter) GetMetric(pod *v1.Pod) (float64, error) {
    if pod.Status.PodIP == "" {
        return 0, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Name)
    }
    metricsURL := g.buildMetricsURL(pod.Status.PodIP)
    return g.metricGetter.GetMetric(metricsURL)
}

func NewPodMetricsJSONPathGetter(config map[string]string) (*PodMetricsJSONPathGetter, error) {
    getter := PodMetricsJSONPathGetter{}
    var (
        jsonPath   string
        aggregator AggregatorFunc
        err        error
    )

    if v, ok := config["json-key"]; ok {
        jsonPath = v
    }

    if v, ok := config["scheme"]; ok {
        getter.scheme = v
    }

    if v, ok := config["path"]; ok {
        getter.path = v
    }

    if v, ok := config["raw-query"]; ok {
        getter.rawQuery = v
    }

    if v, ok := config["port"]; ok {
        n, err := strconv.Atoi(v)
        if err != nil {
            return nil, err
        }
        getter.port = n
    }

    if v, ok := config["aggregator"]; ok {
        aggregator, err = ParseAggregator(v)
        if err != nil {
            return nil, err
        }
    }

    requestTimeout := DefaultRequestTimeout
    connectTimeout := DefaultConnectTimeout

    if v, ok := config["request-timeout"]; ok {
        d, err := time.ParseDuration(v)
        if err != nil {
            return nil, err
        }
        if d < 0 {
            return nil, fmt.Errorf("invalid request-timeout config value: %s", v)
        }
        requestTimeout = d
    }

    if v, ok := config["connect-timeout"]; ok {
        d, err := time.ParseDuration(v)
        if err != nil {
            return nil, err
        }
        if d < 0 {
            return nil, fmt.Errorf("invalid connect-timeout config value: %s", v)
        }
        connectTimeout = d
    }

    jsonPathGetter, err := NewJSONPathMetricsGetter(CustomMetricsHTTPClient(requestTimeout, connectTimeout), aggregator, jsonPath)
    if err != nil {
        return nil, err
    }
    getter.metricGetter = jsonPathGetter
    return &getter, nil
}

// buildMetricsURL will build the full URL needed to hit the pod metric endpoint.
func (g *PodMetricsJSONPathGetter) buildMetricsURL(podIP string) url.URL {
    var scheme = g.scheme

    if scheme == "" {
        scheme = "http"
    }

    return url.URL{
        Scheme:   scheme,
        Host:     fmt.Sprintf("%s:%d", podIP, g.port),
        Path:     g.path,
        RawQuery: g.rawQuery,
    }
}
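As a rough sketch (not part of the diff), the config map consumed by NewPodMetricsJSONPathGetter above maps directly onto the keys parsed in the constructor; the values and the pod below are invented for illustration, and the import path is assumed to match the repository module.

package main

import (
    "fmt"
    "log"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector/httpmetrics"
)

func main() {
    // Keys mirror the ones read by NewPodMetricsJSONPathGetter; all values are examples.
    config := map[string]string{
        "json-key":        "$.queue.length",
        "scheme":          "http",
        "path":            "/metrics",
        "port":            "9090",
        "aggregator":      "avg",
        "request-timeout": "2s",
        "connect-timeout": "500ms",
    }
    getter, err := httpmetrics.NewPodMetricsJSONPathGetter(config)
    if err != nil {
        log.Fatal(err)
    }

    // A pod with a routable IP and a JSON metrics endpoint on port 9090 is assumed to exist.
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example-pod"},
        Status:     v1.PodStatus{PodIP: "10.2.3.4"},
    }
    value, err := getter.GetMetric(pod)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("pod metric:", value)
}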
pkg/collector/httpmetrics/pod_metrics_test.go (new file, 190 lines)
@@ -0,0 +1,190 @@
package httpmetrics

import (
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func compareMetricsGetter(t *testing.T, first, second *PodMetricsJSONPathGetter) {
    require.Equal(t, first.metricGetter.jsonPath, second.metricGetter.jsonPath)
    require.Equal(t, first.scheme, second.scheme)
    require.Equal(t, first.path, second.path)
    require.Equal(t, first.port, second.port)
}

func TestNewPodJSONPathMetricsGetter(t *testing.T) {
    configNoAggregator := map[string]string{
        "json-key": "$.value",
        "scheme":   "http",
        "path":     "/metrics",
        "port":     "9090",
    }
    getterNoAggregator, err1 := NewPodMetricsJSONPathGetter(configNoAggregator)

    require.NoError(t, err1)
    compareMetricsGetter(t, &PodMetricsJSONPathGetter{
        metricGetter: &JSONPathMetricsGetter{jsonPath: configNoAggregator["json-key"]},
        scheme:       "http",
        path:         "/metrics",
        port:         9090,
    }, getterNoAggregator)

    configAggregator := map[string]string{
        "json-key":   "$.values",
        "scheme":     "http",
        "path":       "/metrics",
        "port":       "9090",
        "aggregator": "avg",
    }
    getterAggregator, err2 := NewPodMetricsJSONPathGetter(configAggregator)

    require.NoError(t, err2)
    compareMetricsGetter(t, &PodMetricsJSONPathGetter{
        metricGetter: &JSONPathMetricsGetter{jsonPath: configAggregator["json-key"], aggregator: Average},
        scheme:       "http",
        path:         "/metrics",
        port:         9090,
    }, getterAggregator)

    configErrorJSONPath := map[string]string{
        "json-key": "{}",
        "scheme":   "http",
        "path":     "/metrics",
        "port":     "9090",
    }

    _, err3 := NewPodMetricsJSONPathGetter(configErrorJSONPath)
    require.Error(t, err3)

    configErrorPort := map[string]string{
        "json-key": "$.values",
        "scheme":   "http",
        "path":     "/metrics",
        "port":     "a9090",
    }

    _, err4 := NewPodMetricsJSONPathGetter(configErrorPort)
    require.Error(t, err4)

    configWithRawQuery := map[string]string{
        "json-key":  "$.values",
        "scheme":    "http",
        "path":      "/metrics",
        "port":      "9090",
        "raw-query": "foo=bar&baz=bop",
    }
    getterWithRawQuery, err5 := NewPodMetricsJSONPathGetter(configWithRawQuery)

    require.NoError(t, err5)
    compareMetricsGetter(t, &PodMetricsJSONPathGetter{
        metricGetter: &JSONPathMetricsGetter{jsonPath: configWithRawQuery["json-key"]},
        scheme:       "http",
        path:         "/metrics",
        port:         9090,
        rawQuery:     "foo=bar&baz=bop",
    }, getterWithRawQuery)
}

func TestBuildMetricsURL(t *testing.T) {
    scheme := "http"
    ip := "1.2.3.4"
    port := "9090"
    path := "/v1/test/"
    rawQuery := "foo=bar&baz=bop"

    // Test building URL with rawQuery
    configWithRawQuery := map[string]string{
        "json-key":  "$.value",
        "scheme":    scheme,
        "path":      path,
        "port":      port,
        "raw-query": rawQuery,
    }
    getterWithRawQuery, err1 := NewPodMetricsJSONPathGetter(configWithRawQuery)
    require.NoError(t, err1)

    expectedURLWithQuery := fmt.Sprintf("%s://%s:%s%s?%s", scheme, ip, port, path, rawQuery)
    receivedURLWithQuery := getterWithRawQuery.buildMetricsURL(ip)
    require.Equal(t, receivedURLWithQuery.String(), expectedURLWithQuery)

    // Test building URL without rawQuery
    configWithNoQuery := map[string]string{
        "json-key": "$.value",
        "scheme":   scheme,
        "path":     path,
        "port":     port,
    }
    getterWithNoQuery, err3 := NewPodMetricsJSONPathGetter(configWithNoQuery)
    require.NoError(t, err3)

    expectedURLNoQuery := fmt.Sprintf("%s://%s:%s%s", scheme, ip, port, path)
    receivedURLNoQuery := getterWithNoQuery.buildMetricsURL(ip)
    require.Equal(t, receivedURLNoQuery.String(), expectedURLNoQuery)
}

func TestCustomTimeouts(t *testing.T) {
    scheme := "http"
    port := "9090"
    path := "/v1/test/"

    // Test no custom options results in default timeouts
    defaultConfig := map[string]string{
        "json-key": "$.value",
        "scheme":   scheme,
        "path":     path,
        "port":     port,
    }
    defaultTime := time.Duration(15000) * time.Millisecond

    defaultGetter, err1 := NewPodMetricsJSONPathGetter(defaultConfig)
    require.NoError(t, err1)
    require.Equal(t, defaultGetter.metricGetter.client.Timeout, defaultTime)

    // Test with custom request timeout
    configWithRequestTimeout := map[string]string{
        "json-key":        "$.value",
        "scheme":          scheme,
        "path":            path,
        "port":            port,
        "request-timeout": "978ms",
    }
    expectedTimeout := time.Duration(978) * time.Millisecond
    customRequestGetter, err2 := NewPodMetricsJSONPathGetter(configWithRequestTimeout)
    require.NoError(t, err2)
    require.Equal(t, customRequestGetter.metricGetter.client.Timeout, expectedTimeout)

    // Test with custom connect timeout. Unfortunately, it seems there's no way to access the
    // connect timeout of the client struct to actually verify it's set :/
    configWithConnectTimeout := map[string]string{
        "json-key":        "$.value",
        "scheme":          scheme,
        "path":            path,
        "port":            port,
        "connect-timeout": "512ms",
    }
    _, err3 := NewPodMetricsJSONPathGetter(configWithConnectTimeout)
    require.NoError(t, err3)

    configWithInvalidTimeout := map[string]string{
        "json-key":        "$.value",
        "scheme":          scheme,
        "path":            path,
        "port":            port,
        "request-timeout": "-256ms",
    }
    _, err4 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
    require.Error(t, err4)

    configWithInvalidTimeout = map[string]string{
        "json-key":        "$.value",
        "scheme":          scheme,
        "path":            path,
        "port":            port,
        "connect-timeout": "-256ms",
    }
    _, err5 := NewPodMetricsJSONPathGetter(configWithInvalidTimeout)
    require.Error(t, err5)
}
pkg/collector/influxdb_collector.go (new file, 152 lines)
@@ -0,0 +1,152 @@
package collector

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go"
    "k8s.io/api/autoscaling/v2beta2"
    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/metrics/pkg/apis/external_metrics"
)

const (
    InfluxDBMetricName        = "flux-query"
    influxDBAddressKey        = "address"
    influxDBTokenKey          = "token"
    influxDBOrgKey            = "org"
    influxDBQueryNameLabelKey = "query-name"
)

type InfluxDBCollectorPlugin struct {
    kubeClient kubernetes.Interface
    address    string
    token      string
    org        string
}

func NewInfluxDBCollectorPlugin(client kubernetes.Interface, address, token, org string) (*InfluxDBCollectorPlugin, error) {
    return &InfluxDBCollectorPlugin{
        kubeClient: client,
        address:    address,
        token:      token,
        org:        org,
    }, nil
}

func (p *InfluxDBCollectorPlugin) NewCollector(hpa *v2beta2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
    return NewInfluxDBCollector(p.address, p.token, p.org, config, interval)
}

type InfluxDBCollector struct {
    address string
    token   string
    org     string

    influxDBClient *influxdb.Client
    interval       time.Duration
    metric         autoscalingv2.MetricIdentifier
    metricType     autoscalingv2.MetricSourceType
    query          string
}

func NewInfluxDBCollector(address string, token string, org string, config *MetricConfig, interval time.Duration) (*InfluxDBCollector, error) {
    collector := &InfluxDBCollector{
        interval:   interval,
        metric:     config.Metric,
        metricType: config.Type,
    }
    switch configType := config.Type; configType {
    case autoscalingv2.ObjectMetricSourceType:
        return nil, fmt.Errorf("InfluxDB does not support object, but only external custom metrics")
    case autoscalingv2.ExternalMetricSourceType:
        // `metricSelector` is flattened into the MetricConfig.Config.
        queryName, ok := config.Config[influxDBQueryNameLabelKey]
        if !ok {
            return nil, fmt.Errorf("selector for Flux query is not specified, "+
                "please add metricSelector.matchLabels.%s: <...> to .yml description", influxDBQueryNameLabelKey)
        }
        if query, ok := config.Config[queryName]; ok {
            // TODO(affo): validate the query once this is done:
            //  https://github.com/influxdata/influxdb-client-go/issues/73.
            collector.query = query
        } else {
            return nil, fmt.Errorf("no Flux query defined for metric \"%s\"", config.Metric.Name)
        }
    default:
        return nil, fmt.Errorf("unknown metric type: %v", configType)
    }
    // Use custom InfluxDB config if defined in HPA annotation.
    if v, ok := config.Config[influxDBAddressKey]; ok {
        address = v
    }
    if v, ok := config.Config[influxDBTokenKey]; ok {
        token = v
    }
    if v, ok := config.Config[influxDBOrgKey]; ok {
        org = v
    }
    influxDbClient, err := influxdb.New(address, token)
    if err != nil {
        return nil, err
    }
    collector.address = address
    collector.token = token
    collector.org = org
    collector.influxDBClient = influxDbClient
    return collector, nil
}

// queryResult is for unmarshaling the result from InfluxDB.
// The FluxQuery should make it so that the resulting table contains the column "metricvalue".
type queryResult struct {
    MetricValue float64
}

// getValue returns the first result gathered from an InfluxDB instance.
func (c *InfluxDBCollector) getValue() (resource.Quantity, error) {
    res, err := c.influxDBClient.QueryCSV(context.Background(), c.query, c.org)
    if err != nil {
        return resource.Quantity{}, err
    }
    defer res.Close()
    // Keeping just the first result.
    if res.Next() {
        qr := queryResult{}
        if err := res.Unmarshal(&qr); err != nil {
            return resource.Quantity{}, fmt.Errorf("error in unmarshaling query result: %v", err)
        }
        return *resource.NewMilliQuantity(int64(qr.MetricValue*1000), resource.DecimalSI), nil
    }
    if err := res.Err; err != nil {
        return resource.Quantity{}, fmt.Errorf("error in query result: %v", err)
    }
    return resource.Quantity{}, fmt.Errorf("empty result returned")
}

func (c *InfluxDBCollector) GetMetrics() ([]CollectedMetric, error) {
    v, err := c.getValue()
    if err != nil {
        return nil, err
    }
    cm := CollectedMetric{
        Type: c.metricType,
        External: external_metrics.ExternalMetricValue{
            MetricName:   c.metric.Name,
            MetricLabels: c.metric.Selector.MatchLabels,
            Timestamp: metav1.Time{
                Time: time.Now().UTC(),
            },
            Value: v,
        },
    }
    return []CollectedMetric{cm}, nil
}

func (c *InfluxDBCollector) Interval() time.Duration {
    return c.interval
}
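A small sketch (not part of the diff) of the value conversion used in getValue above: the float read from the "metricvalue" column is stored as a milli-quantity so fractional results survive the resource.Quantity type. The metric value here is made up.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    metricValue := 4.2 // hypothetical value from the Flux result's "metricvalue" column
    // Same conversion as InfluxDBCollector.getValue: scale to milli-units before storing.
    q := resource.NewMilliQuantity(int64(metricValue*1000), resource.DecimalSI)
    fmt.Println(q.String()) // expected output: 4200m
}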
pkg/collector/influxdb_collector_test.go (new file, 155 lines)
@@ -0,0 +1,155 @@
package collector

import (
    "strings"
    "testing"
    "time"

    "k8s.io/api/autoscaling/v2beta2"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestInfluxDBCollector_New(t *testing.T) {
    t.Run("simple", func(t *testing.T) {
        m := &MetricConfig{
            MetricTypeName: MetricTypeName{
                Type: v2beta2.ExternalMetricSourceType,
                Metric: v2beta2.MetricIdentifier{
                    Name: "flux-query",
                    // This is actually useless, because the selector should be flattened in Config when parsing.
                    Selector: &v1.LabelSelector{
                        MatchLabels: map[string]string{
                            "query-name": "range2m",
                        },
                    },
                },
            },
            CollectorName: "influxdb",
            Config: map[string]string{
                "range1m":    `from(bucket: "?") |> range(start: -1m)`,
                "range2m":    `from(bucket: "?") |> range(start: -2m)`,
                "range3m":    `from(bucket: "?") |> range(start: -3m)`,
                "query-name": "range2m",
            },
        }
        c, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        if got, want := c.org, "deadbeef"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.address, "http://localhost:9999"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.token, "secret"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.query, `from(bucket: "?") |> range(start: -2m)`; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
    })
    t.Run("override params", func(t *testing.T) {
        m := &MetricConfig{
            MetricTypeName: MetricTypeName{
                Type: v2beta2.ExternalMetricSourceType,
                Metric: v2beta2.MetricIdentifier{
                    Name: "flux-query",
                    Selector: &v1.LabelSelector{
                        MatchLabels: map[string]string{
                            "query-name": "range2m",
                        },
                    },
                },
            },
            CollectorName: "influxdb",
            Config: map[string]string{
                "range1m":    `from(bucket: "?") |> range(start: -1m)`,
                "range2m":    `from(bucket: "?") |> range(start: -2m)`,
                "range3m":    `from(bucket: "?") |> range(start: -3m)`,
                "address":    "http://localhost:9999",
                "token":      "sEcr3TT0ken",
                "org":        "deadbeef1234",
                "query-name": "range3m",
            },
        }
        c, err := NewInfluxDBCollector("http://localhost:8888", "secret", "deadbeef", m, time.Second)
        if err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
        if got, want := c.org, "deadbeef1234"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.address, "http://localhost:9999"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.token, "sEcr3TT0ken"; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
        if got, want := c.query, `from(bucket: "?") |> range(start: -3m)`; want != got {
            t.Errorf("unexpected value -want/+got:\n\t-%s\n\t+%s", want, got)
        }
    })
    // Errors.
    for _, tc := range []struct {
        name            string
        mTypeName       MetricTypeName
        config          map[string]string
        errorStartsWith string
    }{
        {
            name: "object metric",
            mTypeName: MetricTypeName{
                Type: v2beta2.ObjectMetricSourceType,
            },
            errorStartsWith: "InfluxDB does not support object",
        },
        {
            name: "no selector",
            mTypeName: MetricTypeName{
                Type: v2beta2.ExternalMetricSourceType,
                Metric: v2beta2.MetricIdentifier{
                    Name: "flux-query",
                },
            },
            // The selector should be flattened into the config by the parsing step, but it isn't.
            config: map[string]string{
                "range1m": `from(bucket: "?") |> range(start: -1m)`,
                "range2m": `from(bucket: "?") |> range(start: -2m)`,
                "range3m": `from(bucket: "?") |> range(start: -3m)`,
            },
            errorStartsWith: "selector for Flux query is not specified",
        },
        {
            name: "referencing non-existing query",
            mTypeName: MetricTypeName{
                Type: v2beta2.ExternalMetricSourceType,
                Metric: v2beta2.MetricIdentifier{
                    Name: "flux-query",
                },
            },
            config: map[string]string{
                "range1m":    `from(bucket: "?") |> range(start: -1m)`,
                "range2m":    `from(bucket: "?") |> range(start: -2m)`,
                "range3m":    `from(bucket: "?") |> range(start: -3m)`,
                "query-name": "rangeXm",
            },
            errorStartsWith: "no Flux query defined for metric",
        },
    } {
        t.Run("error - "+tc.name, func(t *testing.T) {
            m := &MetricConfig{
                MetricTypeName: tc.mTypeName,
                CollectorName:  "influxdb",
                Config:         tc.config,
            }
            _, err := NewInfluxDBCollector("http://localhost:9999", "secret", "deadbeef", m, time.Second)
            if err == nil {
                t.Fatal("expected error got none")
            }
            if want, got := tc.errorStartsWith, err.Error(); !strings.HasPrefix(got, want) {
                t.Fatalf("%s should start with %s", got, want)
            }
        })
    }
}
(deleted file)
@@ -1,133 +0,0 @@
package collector

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "net/url"
    "strconv"
    "time"

    "github.com/oliveagle/jsonpath"
    "k8s.io/api/core/v1"
)

// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
// querying the pods metrics endpoint and lookup the metric value as defined by
// the json path query.
type JSONPathMetricsGetter struct {
    jsonPath *jsonpath.Compiled
    scheme   string
    path     string
    port     int
}

// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter, error) {
    getter := &JSONPathMetricsGetter{}

    if v, ok := config["json-key"]; ok {
        pat, err := jsonpath.Compile(v)
        if err != nil {
            return nil, fmt.Errorf("failed to parse json path definition: %v", err)
        }

        getter.jsonPath = pat
    }

    if v, ok := config["scheme"]; ok {
        getter.scheme = v
    }

    if v, ok := config["path"]; ok {
        getter.path = v
    }

    if v, ok := config["port"]; ok {
        n, err := strconv.Atoi(v)
        if err != nil {
            return nil, err
        }
        getter.port = n
    }

    return getter, nil
}

// GetMetric gets metric from pod by fetching json metrics from the pods metric
// endpoint and extracting the desired value using the specified json path
// query.
func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
    data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
    if err != nil {
        return 0, err
    }

    // parse data
    var jsonData interface{}
    err = json.Unmarshal(data, &jsonData)
    if err != nil {
        return 0, err
    }

    res, err := g.jsonPath.Lookup(jsonData)
    if err != nil {
        return 0, err
    }

    switch res := res.(type) {
    case int:
        return float64(res), nil
    case float32:
        return float64(res), nil
    case float64:
        return res, nil
    default:
        return 0, fmt.Errorf("unsupported type %T", res)
    }
}

// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
    if pod.Status.PodIP == "" {
        return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
    }

    httpClient := &http.Client{
        Timeout:   15 * time.Second,
        Transport: &http.Transport{},
    }

    if scheme == "" {
        scheme = "http"
    }

    metricsURL := url.URL{
        Scheme: scheme,
        Host:   fmt.Sprintf("%s:%d", pod.Status.PodIP, port),
        Path:   path,
    }

    request, err := http.NewRequest(http.MethodGet, metricsURL.String(), nil)
    if err != nil {
        return nil, err
    }

    resp, err := httpClient.Do(request)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    data, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("unsuccessful response: %s", resp.Status)
    }

    return data, nil
}
Some files were not shown because too many files have changed in this diff.