mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2026-03-06 00:04:58 +00:00
Compare commits
46 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5e6d304ecd | |||
| 0de5042d3d | |||
| 07c0e179b3 | |||
| 29ee953a16 | |||
| f78ef26857 | |||
| a3c14e9dcb | |||
| b6b13fb31a | |||
| 0a06691d39 | |||
| 2d1d51e829 | |||
| 41761e62df | |||
| ed4c93abbb | |||
| b2194ca136 | |||
| bd0dd10e72 | |||
| 461869c69b | |||
| 9950851cad | |||
| d85fee795e | |||
| 990f8eab14 | |||
| 9a396bde68 | |||
| aa8d24dbcf | |||
| 19e9be9671 | |||
| 8fed8538ad | |||
| 9a234cbdac | |||
| ffff8c2040 | |||
| 9d2760e3fc | |||
| 5598b4d012 | |||
| 888e76b748 | |||
| 7c848a1282 | |||
| 445c7c874a | |||
| 2eed3e64d0 | |||
| f097e63401 | |||
| ca4e2008c4 | |||
| 3f019a1ceb | |||
| 5a6f4997bd | |||
| 8db22f38a3 | |||
| d5b803d923 | |||
| 14f13495af | |||
| dfeae82cae | |||
| 04b212175e | |||
| 478c97d5cb | |||
| f4efa2898b | |||
| 7258cb7800 | |||
| 56dd8b52e0 | |||
| 248acf0311 | |||
| 75633d3082 | |||
| 72aa672f51 | |||
| f49f7821dc |
@@ -0,0 +1,22 @@
|
||||
run:
|
||||
skip-files:
|
||||
- "pkg/provider/generated.conversion.go"
|
||||
- "pkg/provider/conversion.go"
|
||||
linters-settings:
|
||||
golint:
|
||||
min-confidence: 0.9
|
||||
|
||||
linters:
|
||||
disable-all: true
|
||||
enable:
|
||||
- staticcheck
|
||||
- ineffassign
|
||||
- golint
|
||||
- goimports
|
||||
- errcheck
|
||||
issues:
|
||||
exclude-rules:
|
||||
# Exclude some staticcheck messages
|
||||
- linters:
|
||||
- staticcheck
|
||||
text: "SA9003:"
|
||||
+6
-4
@@ -2,16 +2,18 @@ language: go
|
||||
dist: xenial
|
||||
|
||||
go:
|
||||
- "1.11.x"
|
||||
- "1.13.x"
|
||||
|
||||
env:
|
||||
- GO111MODULE=on
|
||||
- GO111MODULE=on GOLANGCI_RELEASE="v1.16.0"
|
||||
|
||||
before_install:
|
||||
- go get github.com/mattn/goveralls
|
||||
- go get github.com/lawrencewoodman/roveralls
|
||||
- GO111MODULE=off go get github.com/mattn/goveralls
|
||||
- GO111MODULE=off go get github.com/lawrencewoodman/roveralls
|
||||
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
|
||||
|
||||
script:
|
||||
- make check
|
||||
- make test
|
||||
- make build.docker
|
||||
- roveralls
|
||||
|
||||
@@ -19,8 +19,7 @@ test:
|
||||
go test -v $(GOPKGS)
|
||||
|
||||
check:
|
||||
golint $(GOPKGS)
|
||||
go vet -v $(GOPKGS)
|
||||
golangci-lint run ./...
|
||||
|
||||
build.local: build/$(BINARY)
|
||||
build.linux: build/linux/$(BINARY)
|
||||
|
||||
@@ -90,6 +90,7 @@ metadata:
|
||||
metric-config.pods.requests-per-second.json-path/json-key: "$.http_server.rps"
|
||||
metric-config.pods.requests-per-second.json-path/path: /metrics
|
||||
metric-config.pods.requests-per-second.json-path/port: "9090"
|
||||
metric-config.pods.requests-per-second.json-path/scheme: "https"
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
@@ -123,9 +124,9 @@ The json-path query support depends on the
|
||||
See the README for possible queries. It's expected that the metric you query
|
||||
returns something that can be turned into a `float64`.
|
||||
|
||||
The other configuration options `path` and `port` specifies where the metrics
|
||||
endpoint is exposed on the pod. There's no default values, so they must be
|
||||
defined.
|
||||
The other configuration options `path`, `port` and `scheme` specify where the metrics
|
||||
endpoint is exposed on the pod. The `path` and `port` options do not have default values
|
||||
so they must be defined. The `scheme` is optional and defaults to `http`.
|
||||
|
||||
## Prometheus collector
|
||||
|
||||
@@ -152,9 +153,55 @@ the trade-offs between the two approaches.
|
||||
|
||||
| Metric | Description | Type | Kind |
|
||||
| ------------ | -------------- | ------- | -- |
|
||||
| `prometheus-query` | Generic metric which requires a user defined query. | External | |
|
||||
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* |
|
||||
|
||||
### Example
|
||||
### Example: External Metric
|
||||
|
||||
This is an example of an HPA configured to get metrics based on a Prometheus
|
||||
query. The query is defined in the annotation
|
||||
`metric-config.external.prometheus-query.prometheus/processed-events-per-second`
|
||||
where `processed-events-per-second` is the query name which will be associated
|
||||
with the result of the query. A matching `query-name` label must be defined in
|
||||
the `matchLabels` of the metric definition. This allows having multiple
|
||||
prometheus queries associated with a single HPA.
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2beta1
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
# This annotation is optional.
|
||||
# If specified, then this prometheus server is used,
|
||||
# instead of the prometheus server specified as the CLI argument `--prometheus-server`.
|
||||
metric-config.external.prometheus-query.prometheus/prometheus-server: http://prometheus.my-namespace.svc
|
||||
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||
# <configKey> == query-name
|
||||
metric-config.external.prometheus-query.prometheus/processed-events-per-second: |
|
||||
scalar(sum(rate(event-service_events_count{application="event-service",processed="true"}[1m])))
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: custom-metrics-consumer
|
||||
minReplicas: 1
|
||||
maxReplicas: 10
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metricName: prometheus-query
|
||||
metricSelector:
|
||||
matchLabels:
|
||||
query-name: processed-events-per-second
|
||||
targetAverageValue: 10
|
||||
```
|
||||
|
||||
### Example: Object Metric [DEPRECATED]
|
||||
|
||||
> _Note: Prometheus Object metrics are **deprecated** and will most likely be
|
||||
> removed in the future. Use the Prometheus External metrics instead as described
|
||||
> above._
|
||||
|
||||
This is an example of an HPA configured to get metrics based on a Prometheus
|
||||
query. The query is defined in the annotation
|
||||
@@ -192,11 +239,15 @@ spec:
|
||||
metricName: processed-events-per-second
|
||||
target:
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
name: event-service
|
||||
kind: Pod
|
||||
name: dummy-pod
|
||||
targetValue: 10 # this will be treated as targetAverageValue
|
||||
```
|
||||
|
||||
_Note:_ The HPA object requires an `Object` to be specified. However when a Prometheus metric is used there is no need
|
||||
for this object. But to satisfy the schema we specify a dummy pod called `dummy-pod`.
|
||||
|
||||
|
||||
## Skipper collector
|
||||
|
||||
The skipper collector is a simple wrapper around the Prometheus collector to
|
||||
@@ -240,6 +291,17 @@ spec:
|
||||
targetValue: 10 # this will be treated as targetAverageValue
|
||||
```
|
||||
|
||||
### Metric weighting based on backend
|
||||
|
||||
Skipper supports sending traffic to different backend based on annotations present on the
|
||||
`Ingress` object. When the metric name is specified without a backend as `requests-per-second`
|
||||
then the number of replicas will be calculated based on the full traffic served by that ingress.
|
||||
If however only the traffic being routed to a specific backend should be used then the
|
||||
backend name can be specified as a metric name like `requests-per-second,backend1` which would
|
||||
return the requests-per-second being sent to the `backend1`. The ingress annotation where
|
||||
the backend weights can be obtained can be specified through the flag `--skipper-backends-annotation`.
|
||||
|
||||
|
||||
**Note:** As of Kubernetes v1.10 the HPA does not support `targetAverageValue` for
|
||||
metrics of type `Object`. In case of requests per second it does not make sense
|
||||
to scale on a summed value because you can not make the total requests per
|
||||
@@ -252,6 +314,30 @@ instead of a total sum.
|
||||
The AWS collector allows scaling based on external metrics exposed by AWS
|
||||
services e.g. SQS queue lengths.
|
||||
|
||||
### AWS IAM role
|
||||
|
||||
To integrate with AWS, the controller needs to run on nodes with
|
||||
access to AWS API. Additionally the controller have to have a role
|
||||
with the following policy to get all required data from AWS:
|
||||
|
||||
```yaml
|
||||
PolicyDocument:
|
||||
Statement:
|
||||
- Action: 'sqs:GetQueueUrl'
|
||||
Effect: Allow
|
||||
Resource: '*'
|
||||
- Action: 'sqs:GetQueueAttributes'
|
||||
Effect: Allow
|
||||
Resource: '*'
|
||||
- Action: 'sqs:ListQueues'
|
||||
Effect: Allow
|
||||
Resource: '*'
|
||||
- Action: 'sqs:ListQueueTags'
|
||||
Effect: Allow
|
||||
Resource: '*'
|
||||
Version: 2012-10-17
|
||||
```
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type |
|
||||
|
||||
+11
-1
@@ -2,7 +2,13 @@ version: "2017-09-20"
|
||||
pipeline:
|
||||
- id: build
|
||||
overlay: ci/golang
|
||||
cache:
|
||||
paths:
|
||||
- /go/pkg/mod # pkg cache for Go modules
|
||||
- ~/.cache/go-build # Go build cache
|
||||
type: script
|
||||
env:
|
||||
GOFLAGS: "-mod=readonly"
|
||||
commands:
|
||||
- desc: test
|
||||
cmd: |
|
||||
@@ -14,7 +20,11 @@ pipeline:
|
||||
cmd: |
|
||||
if [[ $CDP_TARGET_BRANCH == master && ! $CDP_PULL_REQUEST_NUMBER ]]; then
|
||||
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter
|
||||
VERSION=$(git describe --tags --always)
|
||||
else
|
||||
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter-test
|
||||
VERSION=$CDP_BUILD_VERSION
|
||||
fi
|
||||
IMAGE=$IMAGE VERSION=$CDP_BUILD_VERSION make build.push
|
||||
IMAGE=$IMAGE VERSION=$VERSION make build.docker
|
||||
git diff --stat --exit-code
|
||||
IMAGE=$IMAGE VERSION=$VERSION make build.push
|
||||
|
||||
+4
-2
@@ -3,13 +3,15 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func metricsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
|
||||
_, err := w.Write([]byte(fmt.Sprintf(`{"queue": {"length": %d}}`, size)))
|
||||
log.Fatalf("failed to write: %v", err)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -29,5 +31,5 @@ func main() {
|
||||
ReadTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
server.ListenAndServe()
|
||||
log.Fatal(server.ListenAndServe())
|
||||
}
|
||||
|
||||
@@ -1,91 +1,33 @@
|
||||
module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
bitbucket.org/ww/goautoneg v0.0.0-20120707110453-75cd24fc2f2c // indirect
|
||||
github.com/BurntSushi/toml v0.3.0 // indirect
|
||||
github.com/NYTimes/gziphandler v1.0.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.0 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/aws/aws-sdk-go v1.16.6
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
|
||||
github.com/boltdb/bolt v1.3.1 // indirect
|
||||
github.com/coreos/bbolt v1.3.0 // indirect
|
||||
github.com/coreos/etcd v3.3.9+incompatible // indirect
|
||||
github.com/coreos/go-semver v0.2.0 // indirect
|
||||
github.com/coreos/go-systemd v0.0.0-20180705093442-88bfeed483d3 // indirect
|
||||
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
|
||||
github.com/emicklei/go-restful v2.8.0+incompatible // indirect
|
||||
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
|
||||
github.com/evanphx/json-patch v3.0.0+incompatible // indirect
|
||||
github.com/fsnotify/fsnotify v1.4.7 // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.0.0-20180322222829-3a0015ad55fa // indirect
|
||||
github.com/go-openapi/jsonreference v0.0.0-20180322222742-3fb327e6747d // indirect
|
||||
github.com/go-openapi/spec v0.0.0-20180801175345-384415f06ee2 // indirect
|
||||
github.com/go-openapi/swag v0.0.0-20180715190254-becd2f08beaf // indirect
|
||||
github.com/gogo/protobuf v1.1.1 // indirect
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
||||
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
|
||||
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
|
||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||
github.com/gorilla/websocket v1.3.0 // indirect
|
||||
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.4.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47 // indirect
|
||||
github.com/hpcloud/tail v1.0.0 // indirect
|
||||
github.com/imdario/mergo v0.3.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/jonboulle/clockwork v0.1.0 // indirect
|
||||
github.com/json-iterator/go v1.1.5 // indirect
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20180824182428-26e5299457d3
|
||||
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20190918110929-3d9be26a50eb
|
||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||
github.com/onsi/ginkgo v1.6.0 // indirect
|
||||
github.com/onsi/gomega v1.4.1 // indirect
|
||||
github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 // indirect
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v0.9.0-pre1.0.20180824101016-4eb539fa85a2
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
|
||||
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e
|
||||
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect
|
||||
github.com/sirupsen/logrus v1.0.6
|
||||
github.com/prometheus/client_golang v0.9.2
|
||||
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
github.com/soheilhy/cmux v0.1.4 // indirect
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/spf13/pflag v1.0.2 // indirect
|
||||
github.com/stretchr/testify v1.2.2
|
||||
github.com/stretchr/testify v1.3.0
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
|
||||
github.com/ugorji/go v1.1.1 // indirect
|
||||
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20180921141935-824b77fb1f84
|
||||
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
|
||||
golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
|
||||
golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 // indirect
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
|
||||
google.golang.org/appengine v1.2.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
|
||||
google.golang.org/grpc v1.14.0 // indirect
|
||||
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.1 // indirect
|
||||
k8s.io/api v0.0.0-20180628040859-072894a440bd
|
||||
k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d
|
||||
k8s.io/apiserver v0.0.0-20180628044425-01459b68eb5f
|
||||
k8s.io/client-go v8.0.0+incompatible
|
||||
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
|
||||
k8s.io/metrics v0.0.0-20180718014405-6efa0bfaa5c1
|
||||
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f
|
||||
k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655
|
||||
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad // indirect
|
||||
k8s.io/client-go v0.0.0-20190918160344-1fbdaa4c8d90
|
||||
k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090
|
||||
k8s.io/klog v0.4.0
|
||||
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
|
||||
)
|
||||
|
||||
go 1.13
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
# Skipper Prometheus Metrics Collection
|
||||
|
||||
The skipper-ingress pods should be configured to be scraped by Prometheus. This
|
||||
can be done by Prometheus service discovery using discovery of Kubernetes services
|
||||
or Kubernetes pods:
|
||||
|
||||
```yaml
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
annotations:
|
||||
prometheus.io/path: /metrics
|
||||
prometheus.io/port: "9911"
|
||||
prometheus.io/scrape: "true"
|
||||
labels:
|
||||
application: skipper-ingress
|
||||
name: skipper-ingress
|
||||
spec:
|
||||
ports:
|
||||
- port: 80
|
||||
protocol: TCP
|
||||
targetPort: 9999
|
||||
selector:
|
||||
application: skipper-ingress
|
||||
type: ClusterIP
|
||||
```
|
||||
This [configuration](https://github.com/zalando-incubator/kubernetes-on-aws/blob/dev/cluster/manifests/prometheus/configmap.yaml#L69)
|
||||
shows how prometheus is configured in our clusters to scrape service endpoints.
|
||||
The annotations `prometheus.io/path`, `prometheus.io/port` and `prometheus.io/scrape`
|
||||
instruct Prometheus to scrape all pods of this service on the port _9911_ and
|
||||
the path `/metrics`.
|
||||
|
||||
When the `kube-metrics-adapter` is started the flag `--prometheus-server` should be set so that
|
||||
the adapter can query prometheus to get aggregated metrics. When running in kubernetes it can
|
||||
be the service address of the prometheus service like `http://prometheus.kube-system`.
|
||||
|
||||
With these settings the `kube-metrics-adapter` can provide `request-per-second` metrics for ingress
|
||||
objects which are present in the cluster. The prometheus instances scrape the metrics from
|
||||
the `skipper-ingress` pods. The adapter then queries prometheus to get the metric and then
|
||||
provides them to the API server when requested.
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/server"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/util/logs"
|
||||
"k8s.io/component-base/logs"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
package annotations
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
)
|
||||
|
||||
const (
|
||||
customMetricsPrefix = "metric-config."
|
||||
perReplicaMetricsConfKey = "per-replica"
|
||||
intervalMetricsConfKey = "interval"
|
||||
)
|
||||
|
||||
type AnnotationConfigs struct {
|
||||
CollectorName string
|
||||
Configs map[string]string
|
||||
PerReplica bool
|
||||
Interval time.Duration
|
||||
}
|
||||
|
||||
type MetricConfigKey struct {
|
||||
Type autoscalingv2.MetricSourceType
|
||||
MetricName string
|
||||
}
|
||||
|
||||
type AnnotationConfigMap map[MetricConfigKey]*AnnotationConfigs
|
||||
|
||||
func (m AnnotationConfigMap) Parse(annotations map[string]string) error {
|
||||
for key, val := range annotations {
|
||||
if !strings.HasPrefix(key, customMetricsPrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 2 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
configs := strings.Split(parts[0], ".")
|
||||
if len(configs) != 4 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
key := MetricConfigKey{
|
||||
MetricName: configs[2],
|
||||
}
|
||||
|
||||
switch configs[1] {
|
||||
case "pods":
|
||||
key.Type = autoscalingv2.PodsMetricSourceType
|
||||
case "object":
|
||||
key.Type = autoscalingv2.ObjectMetricSourceType
|
||||
default:
|
||||
key.Type = autoscalingv2.ExternalMetricSourceType
|
||||
}
|
||||
|
||||
metricCollector := configs[3]
|
||||
|
||||
config, ok := m[key]
|
||||
if !ok {
|
||||
config = &AnnotationConfigs{
|
||||
CollectorName: metricCollector,
|
||||
Configs: map[string]string{},
|
||||
}
|
||||
m[key] = config
|
||||
}
|
||||
|
||||
// TODO: fail if collector name doesn't match
|
||||
if config.CollectorName != metricCollector {
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == perReplicaMetricsConfKey {
|
||||
config.PerReplica = true
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == intervalMetricsConfKey {
|
||||
interval, err := time.ParseDuration(val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
|
||||
}
|
||||
config.Interval = interval
|
||||
continue
|
||||
}
|
||||
|
||||
config.Configs[parts[1]] = val
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m AnnotationConfigMap) GetAnnotationConfig(metricName string, metricType autoscalingv2.MetricSourceType) (*AnnotationConfigs, bool) {
|
||||
key := MetricConfigKey{MetricName: metricName, Type: metricType}
|
||||
config, ok := m[key]
|
||||
return config, ok
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package annotations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
)
|
||||
|
||||
func TestParser(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
Name string
|
||||
Annotations map[string]string
|
||||
MetricName string
|
||||
MetricType autoscalingv2.MetricSourceType
|
||||
ExpectedConfig map[string]string
|
||||
PerReplica bool
|
||||
}{
|
||||
{
|
||||
Name: "no annotations",
|
||||
Annotations: map[string]string{},
|
||||
ExpectedConfig: map[string]string{},
|
||||
},
|
||||
{
|
||||
Name: "pod metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
|
||||
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
|
||||
"metric-config.pods.requests-per-second.json-path/port": "9090",
|
||||
"metric-config.pods.requests-per-second.json-path/scheme": "https",
|
||||
},
|
||||
MetricName: "requests-per-second",
|
||||
MetricType: autoscalingv2.PodsMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"json-key": "$.http_server.rps",
|
||||
"path": "/metrics",
|
||||
"port": "9090",
|
||||
"scheme": "https",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "prometheus metrics",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.object.processed-events-per-second.prometheus/query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
|
||||
"metric-config.object.processed-events-per-second.prometheus/per-replica": "true",
|
||||
},
|
||||
MetricName: "processed-events-per-second",
|
||||
MetricType: autoscalingv2.ObjectMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"query": "scalar(sum(rate(event-service_events_count{application=\"event-service\",processed=\"true\"}[1m])))",
|
||||
},
|
||||
PerReplica: true,
|
||||
},
|
||||
{
|
||||
Name: "zmon collector",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.external.zmon-check.zmon/key": "custom.*",
|
||||
"metric-config.external.zmon-check.zmon/tag-application": "my-custom-app-*",
|
||||
},
|
||||
MetricName: "zmon-check",
|
||||
MetricType: autoscalingv2.ExternalMetricSourceType,
|
||||
ExpectedConfig: map[string]string{
|
||||
"key": "custom.*",
|
||||
"tag-application": "my-custom-app-*",
|
||||
},
|
||||
PerReplica: false,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
hpaMap := make(AnnotationConfigMap)
|
||||
err := hpaMap.Parse(tc.Annotations)
|
||||
require.NoError(t, err)
|
||||
config, present := hpaMap.GetAnnotationConfig(tc.MetricName, tc.MetricType)
|
||||
if len(tc.ExpectedConfig) == 0 {
|
||||
require.False(t, present)
|
||||
return
|
||||
}
|
||||
require.True(t, present)
|
||||
for k, v := range tc.ExpectedConfig {
|
||||
require.Equal(t, v, config.Configs[k])
|
||||
}
|
||||
require.Equal(t, tc.PerReplica, config.PerReplica)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/sqs"
|
||||
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
@@ -32,13 +32,13 @@ func NewAWSCollectorPlugin(sessions map[string]*session.Session) *AWSCollectorPl
|
||||
}
|
||||
|
||||
// NewCollector initializes a new skipper collector from the specified HPA.
|
||||
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Name {
|
||||
func (c *AWSCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Metric.Name {
|
||||
case AWSSQSQueueLengthMetric:
|
||||
return NewAWSSQSCollector(c.sessions, config, interval)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
|
||||
}
|
||||
|
||||
type AWSSQSCollector struct {
|
||||
@@ -47,18 +47,20 @@ type AWSSQSCollector struct {
|
||||
region string
|
||||
queueURL string
|
||||
queueName string
|
||||
labels map[string]string
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
}
|
||||
|
||||
func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConfig, interval time.Duration) (*AWSSQSCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for queue is not specified")
|
||||
}
|
||||
|
||||
name, ok := config.Labels[sqsQueueNameLabelKey]
|
||||
name, ok := config.Config[sqsQueueNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue name not specified on metric")
|
||||
}
|
||||
region, ok := config.Labels[sqsQueueRegionLabelKey]
|
||||
region, ok := config.Config[sqsQueueRegionLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue region is not specified on metric")
|
||||
}
|
||||
@@ -83,9 +85,8 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
|
||||
interval: interval,
|
||||
queueURL: aws.StringValue(resp.QueueUrl),
|
||||
queueName: name,
|
||||
metricName: config.Name,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
labels: config.Labels,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -109,8 +110,8 @@ func (c *AWSSQSCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metricName,
|
||||
MetricLabels: c.labels,
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewQuantity(int64(i), resource.DecimalSI),
|
||||
},
|
||||
|
||||
+41
-123
@@ -2,22 +2,16 @@ package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/annotations"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
customMetricsPrefix = "metric-config."
|
||||
perReplicaMetricsConfKey = "per-replica"
|
||||
intervalMetricsConfKey = "interval"
|
||||
)
|
||||
|
||||
type ObjectReference struct {
|
||||
autoscalingv2beta1.CrossVersionObjectReference
|
||||
autoscalingv2.CrossVersionObjectReference
|
||||
Namespace string
|
||||
}
|
||||
|
||||
@@ -49,7 +43,7 @@ func NewCollectorFactory() *CollectorFactory {
|
||||
}
|
||||
|
||||
type CollectorPlugin interface {
|
||||
NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||
NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error)
|
||||
}
|
||||
|
||||
func (c *CollectorFactory) RegisterPodsCollector(metricCollector string, plugin CollectorPlugin) error {
|
||||
@@ -106,9 +100,9 @@ func (c *CollectorFactory) RegisterExternalCollector(metrics []string, plugin Co
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
func (c *CollectorFactory) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Type {
|
||||
case autoscalingv2beta1.PodsMetricSourceType:
|
||||
case autoscalingv2.PodsMetricSourceType:
|
||||
// first try to find a plugin by format
|
||||
if plugin, ok := c.podsPlugins.Named[config.CollectorName]; ok {
|
||||
return plugin.NewCollector(hpa, config, interval)
|
||||
@@ -118,7 +112,7 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
|
||||
if c.podsPlugins.Any != nil {
|
||||
return c.podsPlugins.Any.NewCollector(hpa, config, interval)
|
||||
}
|
||||
case autoscalingv2beta1.ObjectMetricSourceType:
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
// first try to find a plugin by kind
|
||||
if kinds, ok := c.objectPlugins.Named[config.ObjectReference.Kind]; ok {
|
||||
if plugin, ok := kinds.Named[config.CollectorName]; ok {
|
||||
@@ -139,8 +133,8 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
|
||||
if c.objectPlugins.Any.Any != nil {
|
||||
return c.objectPlugins.Any.Any.NewCollector(hpa, config, interval)
|
||||
}
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
if plugin, ok := c.externalPlugins[config.Name]; ok {
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
if plugin, ok := c.externalPlugins[config.Metric.Name]; ok {
|
||||
return plugin.NewCollector(hpa, config, interval)
|
||||
}
|
||||
}
|
||||
@@ -148,31 +142,15 @@ func (c *CollectorFactory) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAut
|
||||
return nil, fmt.Errorf("no plugin found for %s", config.MetricTypeName)
|
||||
}
|
||||
|
||||
func getObjectReference(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string) (custom_metrics.ObjectReference, error) {
|
||||
for _, metric := range hpa.Spec.Metrics {
|
||||
if metric.Type == autoscalingv2beta1.ObjectMetricSourceType && metric.Object.MetricName == metricName {
|
||||
return custom_metrics.ObjectReference{
|
||||
APIVersion: metric.Object.Target.APIVersion,
|
||||
Kind: metric.Object.Target.Kind,
|
||||
Name: metric.Object.Target.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return custom_metrics.ObjectReference{}, fmt.Errorf("failed to find object reference")
|
||||
}
|
||||
|
||||
type MetricTypeName struct {
|
||||
Type autoscalingv2beta1.MetricSourceType
|
||||
Name string
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Metric autoscalingv2.MetricIdentifier
|
||||
}
|
||||
|
||||
type CollectedMetric struct {
|
||||
Type autoscalingv2beta1.MetricSourceType
|
||||
Type autoscalingv2.MetricSourceType
|
||||
Custom custom_metrics.MetricValue
|
||||
External external_metrics.ExternalMetricValue
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
type Collector interface {
|
||||
@@ -187,83 +165,16 @@ type MetricConfig struct {
|
||||
ObjectReference custom_metrics.ObjectReference
|
||||
PerReplica bool
|
||||
Interval time.Duration
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
func parseCustomMetricsAnnotations(annotations map[string]string) (map[MetricTypeName]*MetricConfig, error) {
|
||||
metrics := make(map[MetricTypeName]*MetricConfig)
|
||||
for key, val := range annotations {
|
||||
if !strings.HasPrefix(key, customMetricsPrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 2 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
configs := strings.Split(parts[0], ".")
|
||||
if len(configs) != 4 {
|
||||
// TODO: error?
|
||||
continue
|
||||
}
|
||||
|
||||
metricTypeName := MetricTypeName{
|
||||
Name: configs[2],
|
||||
}
|
||||
|
||||
switch configs[1] {
|
||||
case "pods":
|
||||
metricTypeName.Type = autoscalingv2beta1.PodsMetricSourceType
|
||||
case "object":
|
||||
metricTypeName.Type = autoscalingv2beta1.ObjectMetricSourceType
|
||||
}
|
||||
|
||||
metricCollector := configs[3]
|
||||
|
||||
config, ok := metrics[metricTypeName]
|
||||
if !ok {
|
||||
config = &MetricConfig{
|
||||
MetricTypeName: metricTypeName,
|
||||
CollectorName: metricCollector,
|
||||
Config: map[string]string{},
|
||||
}
|
||||
metrics[metricTypeName] = config
|
||||
}
|
||||
|
||||
// TODO: fail if collector name doesn't match
|
||||
if config.CollectorName != metricCollector {
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == perReplicaMetricsConfKey {
|
||||
config.PerReplica = true
|
||||
continue
|
||||
}
|
||||
|
||||
if parts[1] == intervalMetricsConfKey {
|
||||
interval, err := time.ParseDuration(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse interval value %s for %s: %v", val, key, err)
|
||||
}
|
||||
config.Interval = interval
|
||||
continue
|
||||
}
|
||||
|
||||
config.Config[parts[1]] = val
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
|
||||
func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
|
||||
func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfig, error) {
|
||||
metricConfigs := make([]*MetricConfig, 0, len(hpa.Spec.Metrics))
|
||||
|
||||
// TODO: validate that the specified metric names are defined
|
||||
// in the HPA
|
||||
configs, err := parseCustomMetricsAnnotations(hpa.Annotations)
|
||||
parser := make(annotations.AnnotationConfigMap)
|
||||
err := parser.Parse(hpa.Annotations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -275,39 +186,46 @@ func ParseHPAMetrics(hpa *autoscalingv2beta1.HorizontalPodAutoscaler) ([]*Metric
|
||||
|
||||
var ref custom_metrics.ObjectReference
|
||||
switch metric.Type {
|
||||
case autoscalingv2beta1.PodsMetricSourceType:
|
||||
typeName.Name = metric.Pods.MetricName
|
||||
case autoscalingv2beta1.ObjectMetricSourceType:
|
||||
typeName.Name = metric.Object.MetricName
|
||||
case autoscalingv2.PodsMetricSourceType:
|
||||
typeName.Metric = metric.Pods.Metric
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
typeName.Metric = metric.Object.Metric
|
||||
ref = custom_metrics.ObjectReference{
|
||||
APIVersion: metric.Object.Target.APIVersion,
|
||||
Kind: metric.Object.Target.Kind,
|
||||
Name: metric.Object.Target.Name,
|
||||
APIVersion: metric.Object.DescribedObject.APIVersion,
|
||||
Kind: metric.Object.DescribedObject.Kind,
|
||||
Name: metric.Object.DescribedObject.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
}
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
typeName.Name = metric.External.MetricName
|
||||
case autoscalingv2beta1.ResourceMetricSourceType:
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
typeName.Metric = metric.External.Metric
|
||||
case autoscalingv2.ResourceMetricSourceType:
|
||||
continue // kube-metrics-adapter does not collect resource metrics
|
||||
}
|
||||
|
||||
if config, ok := configs[typeName]; ok {
|
||||
config.ObjectReference = ref
|
||||
metricConfigs = append(metricConfigs, config)
|
||||
continue
|
||||
}
|
||||
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: typeName,
|
||||
ObjectReference: ref,
|
||||
Config: map[string]string{},
|
||||
}
|
||||
|
||||
if metric.Type == autoscalingv2beta1.ExternalMetricSourceType {
|
||||
config.Labels = metric.External.MetricSelector.MatchLabels
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
|
||||
metric.External.Metric.Selector != nil &&
|
||||
metric.External.Metric.Selector.MatchLabels != nil {
|
||||
config.Config = metric.External.Metric.Selector.MatchLabels
|
||||
}
|
||||
|
||||
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)
|
||||
if present {
|
||||
config.CollectorName = annotationConfigs.CollectorName
|
||||
config.Interval = annotationConfigs.Interval
|
||||
config.PerReplica = annotationConfigs.PerReplica
|
||||
// configs specified in annotations takes precedence
|
||||
// over labels
|
||||
for k, v := range annotationConfigs.Configs {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
metricConfigs = append(metricConfigs, config)
|
||||
}
|
||||
|
||||
return metricConfigs, nil
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/oliveagle/jsonpath"
|
||||
"k8s.io/api/core/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// JSONPathMetricsGetter is a metrics getter which looks up pod metrics by
|
||||
@@ -58,7 +58,7 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
|
||||
// GetMetric gets metric from pod by fetching json metrics from the pods metric
|
||||
// endpoint and extracting the desired value using the specified json path
|
||||
// query.
|
||||
func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
|
||||
func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
|
||||
data, err := getPodMetrics(pod, g.scheme, g.path, g.port)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -89,7 +89,7 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *v1.Pod) (float64, error) {
|
||||
}
|
||||
|
||||
// getPodMetrics returns the content of the pods metrics endpoint.
|
||||
func getPodMetrics(pod *v1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
|
||||
if pod.Status.PodIP == "" {
|
||||
return nil, fmt.Errorf("pod %s/%s does not have a pod IP", pod.Namespace, pod.Namespace)
|
||||
}
|
||||
|
||||
@@ -1,42 +1,67 @@
|
||||
package collector
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
// MaxCollector is a simple aggregator collector that returns the maximum value
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
)
|
||||
|
||||
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
|
||||
// of metrics from all collectors.
|
||||
type MaxCollector struct {
|
||||
type MaxWeightedCollector struct {
|
||||
collectors []Collector
|
||||
interval time.Duration
|
||||
weight float64
|
||||
}
|
||||
|
||||
// NewMaxCollector initializes a new MacCollector.
|
||||
func NewMaxCollector(interval time.Duration, collectors ...Collector) *MaxCollector {
|
||||
return &MaxCollector{
|
||||
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
|
||||
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
|
||||
return &MaxWeightedCollector{
|
||||
collectors: collectors,
|
||||
interval: interval,
|
||||
weight: weight,
|
||||
}
|
||||
}
|
||||
|
||||
// GetMetrics gets metrics from all collectors and return the higest value.
|
||||
func (c *MaxCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
var max CollectedMetric
|
||||
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
errors := make([]error, 0)
|
||||
collectedMetrics := make([]CollectedMetric, 0)
|
||||
for _, collector := range c.collectors {
|
||||
values, err := collector.GetMetrics()
|
||||
if err != nil {
|
||||
if _, ok := err.(NoResultError); ok {
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
|
||||
max = value
|
||||
}
|
||||
}
|
||||
|
||||
collectedMetrics = append(collectedMetrics, values...)
|
||||
}
|
||||
if len(collectedMetrics) == 0 {
|
||||
if len(errors) == 0 {
|
||||
return nil, fmt.Errorf("no metrics collected, cannot determine max")
|
||||
}
|
||||
errorStrings := make([]string, len(errors))
|
||||
for i, e := range errors {
|
||||
errorStrings[i] = e.Error()
|
||||
}
|
||||
allErrors := strings.Join(errorStrings, ",")
|
||||
return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors)
|
||||
}
|
||||
max := collectedMetrics[0]
|
||||
for _, value := range collectedMetrics {
|
||||
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
|
||||
max = value
|
||||
}
|
||||
}
|
||||
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
|
||||
return []CollectedMetric{max}, nil
|
||||
}
|
||||
|
||||
// Interval returns the interval at which the collector should run.
|
||||
func (c *MaxCollector) Interval() time.Duration {
|
||||
func (c *MaxWeightedCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
)
|
||||
|
||||
type dummyCollector struct {
|
||||
value int64
|
||||
}
|
||||
|
||||
func (c dummyCollector) Interval() time.Duration {
|
||||
return time.Second
|
||||
}
|
||||
|
||||
func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
switch c.value {
|
||||
case 0:
|
||||
return nil, NoResultError{query: "invalid query"}
|
||||
case -1:
|
||||
return nil, fmt.Errorf("test error")
|
||||
default:
|
||||
quantity := resource.NewQuantity(c.value, resource.DecimalSI)
|
||||
return []CollectedMetric{
|
||||
{
|
||||
Custom: custom_metrics.MetricValue{
|
||||
Value: *quantity,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxCollector(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
values []int64
|
||||
expected int
|
||||
weight float64
|
||||
errored bool
|
||||
}{
|
||||
{
|
||||
name: "basic",
|
||||
values: []int64{100, 10, 9},
|
||||
expected: 100,
|
||||
weight: 1,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "weighted",
|
||||
values: []int64{100, 10, 9},
|
||||
expected: 20,
|
||||
weight: 0.2,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "with error",
|
||||
values: []int64{10, 9, -1},
|
||||
weight: 0.5,
|
||||
errored: true,
|
||||
},
|
||||
{
|
||||
name: "some invalid results",
|
||||
values: []int64{0, 1, 0, 10, 9},
|
||||
expected: 5,
|
||||
weight: 0.5,
|
||||
errored: false,
|
||||
},
|
||||
{
|
||||
name: "both invalid results and errors",
|
||||
values: []int64{0, 1, 0, -1, 10, 9},
|
||||
weight: 0.5,
|
||||
errored: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
collectors := make([]Collector, len(tc.values))
|
||||
for i, v := range tc.values {
|
||||
collectors[i] = dummyCollector{value: v}
|
||||
}
|
||||
wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...)
|
||||
metrics, err := wc.GetMetrics()
|
||||
if tc.errored {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, metrics, 1)
|
||||
require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value())
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package collector
|
||||
|
||||
import autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
|
||||
type ObjectMetricsGetter interface {
|
||||
GetObjectMetric(namespace string, reference *autoscalingv2beta1.CrossVersionObjectReference) (float64, error)
|
||||
}
|
||||
|
||||
// type PodCollector struct {
|
||||
// client kubernetes.Interface
|
||||
// Getter PodMetricsGetter
|
||||
// podLabelSelector string
|
||||
// namespace string
|
||||
// metricName string
|
||||
// interval time.Duration
|
||||
// }
|
||||
|
||||
// func NewObjectCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, metricName string, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
// switch
|
||||
// }
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
"k8s.io/api/core/v1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -24,26 +24,26 @@ func NewPodCollectorPlugin(client kubernetes.Interface) *PodCollectorPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
func (p *PodCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewPodCollector(p.client, hpa, config, interval)
|
||||
}
|
||||
|
||||
type PodCollector struct {
|
||||
client kubernetes.Interface
|
||||
Getter PodMetricsGetter
|
||||
podLabelSelector string
|
||||
podLabelSelector *metav1.LabelSelector
|
||||
namespace string
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
interval time.Duration
|
||||
logger *log.Entry
|
||||
}
|
||||
|
||||
type PodMetricsGetter interface {
|
||||
GetMetric(pod *v1.Pod) (float64, error)
|
||||
GetMetric(pod *corev1.Pod) (float64, error)
|
||||
}
|
||||
|
||||
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
|
||||
func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PodCollector, error) {
|
||||
// get pod selector based on HPA scale target ref
|
||||
selector, err := getPodLabelSelector(client, hpa)
|
||||
if err != nil {
|
||||
@@ -53,7 +53,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
|
||||
c := &PodCollector{
|
||||
client: client,
|
||||
namespace: hpa.Namespace,
|
||||
metricName: config.Name,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
interval: interval,
|
||||
podLabelSelector: selector,
|
||||
@@ -79,7 +79,7 @@ func NewPodCollector(client kubernetes.Interface, hpa *autoscalingv2beta1.Horizo
|
||||
|
||||
func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
opts := metav1.ListOptions{
|
||||
LabelSelector: c.podLabelSelector,
|
||||
LabelSelector: labels.Set(c.podLabelSelector.MatchLabels).String(),
|
||||
}
|
||||
|
||||
pods, err := c.client.CoreV1().Pods(c.namespace).List(opts)
|
||||
@@ -106,11 +106,10 @@ func (c *PodCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
MetricName: c.metricName,
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.podLabelSelector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(value*1000), resource.DecimalSI),
|
||||
},
|
||||
Labels: pod.Labels,
|
||||
}
|
||||
|
||||
values = append(values, metricValue)
|
||||
@@ -123,21 +122,21 @@ func (c *PodCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
|
||||
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (string, error) {
|
||||
func getPodLabelSelector(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (*metav1.LabelSelector, error) {
|
||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||
case "Deployment":
|
||||
deployment, err := client.AppsV1().Deployments(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
return labels.Set(deployment.Spec.Selector.MatchLabels).String(), nil
|
||||
return deployment.Spec.Selector, nil
|
||||
case "StatefulSet":
|
||||
sts, err := client.AppsV1().StatefulSets(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
return labels.Set(sts.Spec.Selector.MatchLabels).String(), nil
|
||||
return sts.Spec.Selector, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
|
||||
return nil, fmt.Errorf("unable to get pod label selector for scale target ref '%s'", hpa.Spec.ScaleTargetRef.Kind)
|
||||
}
|
||||
|
||||
@@ -9,13 +9,28 @@ import (
|
||||
"github.com/prometheus/client_golang/api"
|
||||
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
"github.com/prometheus/common/model"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
PrometheusMetricName = "prometheus-query"
|
||||
prometheusQueryNameLabelKey = "query-name"
|
||||
prometheusServerAnnotationKey = "prometheus-server"
|
||||
)
|
||||
|
||||
type NoResultError struct {
|
||||
query string
|
||||
}
|
||||
|
||||
func (r NoResultError) Error() string {
|
||||
return fmt.Sprintf("query '%s' did not result a valid response", r.query)
|
||||
}
|
||||
|
||||
type PrometheusCollectorPlugin struct {
|
||||
promAPI promv1.API
|
||||
client kubernetes.Interface
|
||||
@@ -24,7 +39,7 @@ type PrometheusCollectorPlugin struct {
|
||||
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
|
||||
cfg := api.Config{
|
||||
Address: prometheusServer,
|
||||
RoundTripper: &http.Transport{},
|
||||
RoundTripper: http.DefaultTransport,
|
||||
}
|
||||
|
||||
promClient, err := api.NewClient(cfg)
|
||||
@@ -38,7 +53,7 @@ func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
func (p *PrometheusCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewPrometheusCollector(p.client, p.promAPI, hpa, config, interval)
|
||||
}
|
||||
|
||||
@@ -46,31 +61,65 @@ type PrometheusCollector struct {
|
||||
client kubernetes.Interface
|
||||
promAPI promv1.API
|
||||
query string
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
objectReference custom_metrics.ObjectReference
|
||||
interval time.Duration
|
||||
perReplica bool
|
||||
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
|
||||
hpa *autoscalingv2.HorizontalPodAutoscaler
|
||||
}
|
||||
|
||||
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
|
||||
func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*PrometheusCollector, error) {
|
||||
c := &PrometheusCollector{
|
||||
client: client,
|
||||
objectReference: config.ObjectReference,
|
||||
metricName: config.Name,
|
||||
metricType: config.Type,
|
||||
interval: interval,
|
||||
promAPI: promAPI,
|
||||
perReplica: config.PerReplica,
|
||||
hpa: hpa,
|
||||
client: client,
|
||||
promAPI: promAPI,
|
||||
interval: interval,
|
||||
hpa: hpa,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
}
|
||||
|
||||
if v, ok := config.Config["query"]; ok {
|
||||
// TODO: validate query
|
||||
c.query = v
|
||||
} else {
|
||||
return nil, fmt.Errorf("no prometheus query defined")
|
||||
switch config.Type {
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
c.objectReference = config.ObjectReference
|
||||
c.perReplica = config.PerReplica
|
||||
|
||||
if v, ok := config.Config["query"]; ok {
|
||||
// TODO: validate query
|
||||
c.query = v
|
||||
} else {
|
||||
return nil, fmt.Errorf("no prometheus query defined")
|
||||
}
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for prometheus query is not specified")
|
||||
}
|
||||
|
||||
queryName, ok := config.Config[prometheusQueryNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("query name not specified on metric")
|
||||
}
|
||||
|
||||
if v, ok := config.Config[queryName]; ok {
|
||||
// TODO: validate query
|
||||
c.query = v
|
||||
} else {
|
||||
return nil, fmt.Errorf("no prometheus query defined for metric")
|
||||
}
|
||||
|
||||
// Use custom Prometheus URL if defined in HPA annotation.
|
||||
if promServer, ok := config.Config[prometheusServerAnnotationKey]; ok {
|
||||
cfg := api.Config{
|
||||
Address: promServer,
|
||||
RoundTripper: http.DefaultTransport,
|
||||
}
|
||||
|
||||
promClient, err := api.NewClient(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.promAPI = promv1.NewAPI(promClient)
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
@@ -88,7 +137,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
case model.ValVector:
|
||||
samples := value.(model.Vector)
|
||||
if len(samples) == 0 {
|
||||
return nil, fmt.Errorf("query '%s' returned no samples", c.query)
|
||||
return nil, &NoResultError{query: c.query}
|
||||
}
|
||||
|
||||
sampleValue = samples[0].Value
|
||||
@@ -98,7 +147,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
}
|
||||
|
||||
if sampleValue.String() == "NaN" {
|
||||
return nil, fmt.Errorf("query '%s' returned no samples: %s", c.query, sampleValue.String())
|
||||
return nil, &NoResultError{query: c.query}
|
||||
}
|
||||
|
||||
if c.perReplica {
|
||||
@@ -113,14 +162,28 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
sampleValue = model.SampleValue(float64(sampleValue) / float64(replicas))
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: c.objectReference,
|
||||
MetricName: c.metricName,
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
|
||||
},
|
||||
var metricValue CollectedMetric
|
||||
switch c.metricType {
|
||||
case autoscalingv2.ObjectMetricSourceType:
|
||||
metricValue = CollectedMetric{
|
||||
Type: c.metricType,
|
||||
Custom: custom_metrics.MetricValue{
|
||||
DescribedObject: c.objectReference,
|
||||
Metric: custom_metrics.MetricIdentifier{Name: c.metric.Name, Selector: c.metric.Selector},
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
metricValue = CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{Time: time.Now().UTC()},
|
||||
Value: *resource.NewMilliQuantity(int64(sampleValue*1000), resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return []CollectedMetric{metricValue}, nil
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@@ -13,73 +16,149 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
|
||||
rpsMetricName = "requests-per-second"
|
||||
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
|
||||
rpsMetricName = "requests-per-second"
|
||||
rpsMetricBackendSeparator = ","
|
||||
)
|
||||
|
||||
var (
|
||||
errBackendNameMissing = errors.New("backend name must be specified for requests-per-second when traffic switching is used")
|
||||
)
|
||||
|
||||
// SkipperCollectorPlugin is a collector plugin for initializing metrics
|
||||
// collectors for getting skipper ingress metrics.
|
||||
type SkipperCollectorPlugin struct {
|
||||
client kubernetes.Interface
|
||||
plugin CollectorPlugin
|
||||
client kubernetes.Interface
|
||||
plugin CollectorPlugin
|
||||
backendAnnotations []string
|
||||
}
|
||||
|
||||
// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
|
||||
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin) (*SkipperCollectorPlugin, error) {
|
||||
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
|
||||
return &SkipperCollectorPlugin{
|
||||
client: client,
|
||||
plugin: prometheusPlugin,
|
||||
client: client,
|
||||
plugin: prometheusPlugin,
|
||||
backendAnnotations: backendAnnotations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewCollector initializes a new skipper collector from the specified HPA.
|
||||
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Name {
|
||||
case rpsMetricName:
|
||||
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval)
|
||||
default:
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
|
||||
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
if strings.HasPrefix(config.Metric.Name, rpsMetricName) {
|
||||
backend := ""
|
||||
if len(config.Metric.Name) > len(rpsMetricName) {
|
||||
metricNameParts := strings.Split(config.Metric.Name, rpsMetricBackendSeparator)
|
||||
if len(metricNameParts) == 2 {
|
||||
backend = metricNameParts[1]
|
||||
}
|
||||
}
|
||||
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
|
||||
}
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
|
||||
}
|
||||
|
||||
// SkipperCollector is a metrics collector for getting skipper ingress metrics.
|
||||
// It depends on the prometheus collector for getting the metrics.
|
||||
type SkipperCollector struct {
|
||||
client kubernetes.Interface
|
||||
metricName string
|
||||
objectReference custom_metrics.ObjectReference
|
||||
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
|
||||
interval time.Duration
|
||||
plugin CollectorPlugin
|
||||
config MetricConfig
|
||||
client kubernetes.Interface
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
objectReference custom_metrics.ObjectReference
|
||||
hpa *autoscalingv2.HorizontalPodAutoscaler
|
||||
interval time.Duration
|
||||
plugin CollectorPlugin
|
||||
config MetricConfig
|
||||
backend string
|
||||
backendAnnotations []string
|
||||
}
|
||||
|
||||
// NewSkipperCollector initializes a new SkipperCollector.
|
||||
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*SkipperCollector, error) {
|
||||
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler,
|
||||
config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
|
||||
return &SkipperCollector{
|
||||
client: client,
|
||||
objectReference: config.ObjectReference,
|
||||
hpa: hpa,
|
||||
metricName: config.Name,
|
||||
interval: interval,
|
||||
plugin: plugin,
|
||||
config: *config,
|
||||
client: client,
|
||||
objectReference: config.ObjectReference,
|
||||
hpa: hpa,
|
||||
metric: config.Metric,
|
||||
interval: interval,
|
||||
plugin: plugin,
|
||||
config: *config,
|
||||
backend: backend,
|
||||
backendAnnotations: backendAnnotations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getAnnotationWeight(backendWeights string, backend string) float64 {
|
||||
var weightsMap map[string]int
|
||||
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
if weight, ok := weightsMap[backend]; ok {
|
||||
return float64(weight) / 100
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
|
||||
maxWeight := 0.0
|
||||
annotationsPresent := false
|
||||
|
||||
for _, anno := range backendAnnotations {
|
||||
if weightsMap, ok := ingressAnnotations[anno]; ok {
|
||||
annotationsPresent = true
|
||||
maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend))
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback for ingresses that don't use traffic switching
|
||||
if !annotationsPresent {
|
||||
return 1.0, nil
|
||||
}
|
||||
|
||||
// Require backend name here
|
||||
if backend != "" {
|
||||
return maxWeight, nil
|
||||
}
|
||||
|
||||
return 0.0, errBackendNameMissing
|
||||
}
|
||||
|
||||
// getCollector returns a collector for getting the metrics.
|
||||
func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
||||
var annotations map[string]string
|
||||
var hosts []string
|
||||
|
||||
switch c.objectReference.APIVersion {
|
||||
case "extensions/v1beta1":
|
||||
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
annotations = ingress.Annotations
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
hosts = append(hosts, rule.Host)
|
||||
}
|
||||
case "networking.k8s.io/v1beta1":
|
||||
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
annotations = ingress.Annotations
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
hosts = append(hosts, rule.Host)
|
||||
}
|
||||
}
|
||||
|
||||
backendWeight, err := getWeights(annotations, c.backendAnnotations, c.backend)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config := c.config
|
||||
|
||||
var collector Collector
|
||||
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
|
||||
for _, rule := range ingress.Spec.Rules {
|
||||
host := strings.Replace(rule.Host, ".", "_", -1)
|
||||
collectors := make([]Collector, 0, len(hosts))
|
||||
for _, host := range hosts {
|
||||
host := strings.Replace(host, ".", "_", -1)
|
||||
config.Config = map[string]string{
|
||||
"query": fmt.Sprintf(rpsQuery, host),
|
||||
}
|
||||
@@ -92,10 +171,9 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||
|
||||
collectors = append(collectors, collector)
|
||||
}
|
||||
if len(collectors) > 1 {
|
||||
collector = NewMaxCollector(c.interval, collectors...)
|
||||
} else if len(collectors) == 1 {
|
||||
collector = collectors[0]
|
||||
|
||||
if len(collectors) > 0 {
|
||||
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
|
||||
} else {
|
||||
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
|
||||
}
|
||||
@@ -144,7 +222,7 @@ func (c *SkipperCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
|
||||
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2beta1.HorizontalPodAutoscaler) (int32, error) {
|
||||
func targetRefReplicas(client kubernetes.Interface, hpa *autoscalingv2.HorizontalPodAutoscaler) (int32, error) {
|
||||
var replicas int32
|
||||
switch hpa.Spec.ScaleTargetRef.Kind {
|
||||
case "Deployment":
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -7,7 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
@@ -41,17 +41,13 @@ func NewZMONCollectorPlugin(zmon zmon.ZMON) (*ZMONCollectorPlugin, error) {
|
||||
}
|
||||
|
||||
// NewCollector initializes a new ZMON collector from the specified HPA.
|
||||
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Name {
|
||||
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Metric.Name {
|
||||
case ZMONCheckMetric:
|
||||
annotations := map[string]string{}
|
||||
if hpa != nil {
|
||||
annotations = hpa.Annotations
|
||||
}
|
||||
return NewZMONCollector(c.zmon, config, annotations, interval)
|
||||
return NewZMONCollector(c.zmon, config, interval)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
|
||||
}
|
||||
|
||||
// ZMONCollector defines a collector that is able to collect metrics from ZMON.
|
||||
@@ -60,17 +56,20 @@ type ZMONCollector struct {
|
||||
interval time.Duration
|
||||
checkID int
|
||||
key string
|
||||
labels map[string]string
|
||||
tags map[string]string
|
||||
duration time.Duration
|
||||
aggregators []string
|
||||
metricName string
|
||||
metricType autoscalingv2beta1.MetricSourceType
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
}
|
||||
|
||||
// NewZMONCollector initializes a new ZMONCollector.
|
||||
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[string]string, interval time.Duration) (*ZMONCollector, error) {
|
||||
checkIDStr, ok := config.Labels[zmonCheckIDLabelKey]
|
||||
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for zmon-check is not specified")
|
||||
}
|
||||
|
||||
checkIDStr, ok := config.Config[zmonCheckIDLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("ZMON check ID not specified on metric")
|
||||
}
|
||||
@@ -83,19 +82,14 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
|
||||
key := ""
|
||||
|
||||
// get optional key
|
||||
if k, ok := config.Labels[zmonKeyLabelKey]; ok {
|
||||
key = k
|
||||
}
|
||||
|
||||
// annotations takes precedence over label
|
||||
if k, ok := annotations[zmonKeyAnnotationKey]; ok {
|
||||
if k, ok := config.Config[zmonKeyLabelKey]; ok {
|
||||
key = k
|
||||
}
|
||||
|
||||
duration := defaultQueryDuration
|
||||
|
||||
// parse optional duration value
|
||||
if d, ok := config.Labels[zmonDurationLabelKey]; ok {
|
||||
if d, ok := config.Config[zmonDurationLabelKey]; ok {
|
||||
duration, err = time.ParseDuration(d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -104,26 +98,16 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
|
||||
|
||||
// parse tags
|
||||
tags := make(map[string]string)
|
||||
for k, v := range config.Labels {
|
||||
for k, v := range config.Config {
|
||||
if strings.HasPrefix(k, zmonTagPrefixLabelKey) {
|
||||
key := strings.TrimPrefix(k, zmonTagPrefixLabelKey)
|
||||
tags[key] = v
|
||||
}
|
||||
}
|
||||
|
||||
// parse tags from annotations
|
||||
// tags defined in annotations takes precedence over tags defined in
|
||||
// the labels.
|
||||
for k, v := range annotations {
|
||||
if strings.HasPrefix(k, zmonTagPrefixAnnotationKey) {
|
||||
key := strings.TrimPrefix(k, zmonTagPrefixAnnotationKey)
|
||||
tags[key] = v
|
||||
}
|
||||
}
|
||||
|
||||
// default aggregator is last
|
||||
aggregators := []string{"last"}
|
||||
if k, ok := config.Labels[zmonAggregatorsLabelKey]; ok {
|
||||
if k, ok := config.Config[zmonAggregatorsLabelKey]; ok {
|
||||
aggregators = strings.Split(k, ",")
|
||||
}
|
||||
|
||||
@@ -135,9 +119,8 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
|
||||
tags: tags,
|
||||
duration: duration,
|
||||
aggregators: aggregators,
|
||||
metricName: config.Name,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
labels: config.Labels,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -159,8 +142,8 @@ func (c *ZMONCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
metricValue := CollectedMetric{
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metricName,
|
||||
MetricLabels: c.labels,
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{Time: point.Time},
|
||||
Value: *resource.NewMilliQuantity(int64(point.Value*1000), resource.DecimalSI),
|
||||
},
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
@@ -26,9 +26,9 @@ func TestZMONCollectorNewCollector(t *testing.T) {
|
||||
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{
|
||||
Name: ZMONCheckMetric,
|
||||
Metric: newMetricIdentifier(ZMONCheckMetric),
|
||||
},
|
||||
Labels: map[string]string{
|
||||
Config: map[string]string{
|
||||
zmonCheckIDLabelKey: "1234",
|
||||
zmonAggregatorsLabelKey: "max",
|
||||
zmonTagPrefixLabelKey + "alias": "cluster_alias",
|
||||
@@ -37,7 +37,7 @@ func TestZMONCollectorNewCollector(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
hpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
|
||||
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
|
||||
|
||||
collector, err := collectPlugin.NewCollector(hpa, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
@@ -50,39 +50,31 @@ func TestZMONCollectorNewCollector(t *testing.T) {
|
||||
require.Equal(t, []string{"max"}, zmonCollector.aggregators)
|
||||
require.Equal(t, map[string]string{"alias": "cluster_alias"}, zmonCollector.tags)
|
||||
|
||||
// check that annotations overwrites labels
|
||||
hpa.ObjectMeta = metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
zmonKeyAnnotationKey: "annotation_key",
|
||||
zmonTagPrefixAnnotationKey + "alias": "cluster_alias_annotation",
|
||||
},
|
||||
}
|
||||
collector, err = collectPlugin.NewCollector(hpa, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, collector)
|
||||
zmonCollector = collector.(*ZMONCollector)
|
||||
require.Equal(t, "annotation_key", zmonCollector.key)
|
||||
require.Equal(t, map[string]string{"alias": "cluster_alias_annotation"}, zmonCollector.tags)
|
||||
|
||||
// should fail if the metric name isn't ZMON
|
||||
config.Name = "non-zmon-check"
|
||||
config.Metric = newMetricIdentifier("non-zmon-check")
|
||||
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
|
||||
require.Error(t, err)
|
||||
|
||||
// should fail if the check id is not specified.
|
||||
delete(config.Labels, zmonCheckIDLabelKey)
|
||||
config.Name = ZMONCheckMetric
|
||||
delete(config.Config, zmonCheckIDLabelKey)
|
||||
config.Metric.Name = ZMONCheckMetric
|
||||
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func newMetricIdentifier(metricName string) autoscalingv2.MetricIdentifier {
|
||||
selector := metav1.LabelSelector{}
|
||||
return autoscalingv2.MetricIdentifier{Name: metricName, Selector: &selector}
|
||||
}
|
||||
|
||||
func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
|
||||
config := &MetricConfig{
|
||||
MetricTypeName: MetricTypeName{
|
||||
Name: ZMONCheckMetric,
|
||||
Type: "foo",
|
||||
Metric: newMetricIdentifier(ZMONCheckMetric),
|
||||
Type: "foo",
|
||||
},
|
||||
Labels: map[string]string{
|
||||
Config: map[string]string{
|
||||
zmonCheckIDLabelKey: "1234",
|
||||
zmonAggregatorsLabelKey: "max",
|
||||
zmonTagPrefixLabelKey + "alias": "cluster_alias",
|
||||
@@ -108,8 +100,8 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
{
|
||||
Type: config.Type,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: config.Name,
|
||||
MetricLabels: config.Labels,
|
||||
MetricName: config.Metric.Name,
|
||||
MetricLabels: config.Metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Time{Time: time.Time{}},
|
||||
Value: *resource.NewMilliQuantity(int64(1.0)*1000, resource.DecimalSI),
|
||||
},
|
||||
@@ -125,7 +117,7 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
dataPoints: ti.dataPoints,
|
||||
}
|
||||
|
||||
zmonCollector, err := NewZMONCollector(z, config, nil, 1*time.Second)
|
||||
zmonCollector, err := NewZMONCollector(z, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics, _ := zmonCollector.GetMetrics()
|
||||
|
||||
@@ -0,0 +1,247 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscaling "k8s.io/api/autoscaling/v2beta2"
|
||||
core "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
)
|
||||
|
||||
// from: https://github.com/kubernetes/kubernetes/blob/v1.14.4/pkg/apis/autoscaling/v2beta1/conversion.go
|
||||
|
||||
func Convert_autoscaling_MetricTarget_To_v2beta1_CrossVersionObjectReference(in *autoscaling.MetricTarget, out *autoscalingv2beta1.CrossVersionObjectReference, s conversion.Scope) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_CrossVersionObjectReference_To_autoscaling_MetricTarget(in *autoscalingv2beta1.CrossVersionObjectReference, out *autoscaling.MetricTarget, s conversion.Scope) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ResourceMetricStatus_To_autoscaling_ResourceMetricStatus(in *autoscalingv2beta1.ResourceMetricStatus, out *autoscaling.ResourceMetricStatus, s conversion.Scope) error {
|
||||
out.Name = core.ResourceName(in.Name)
|
||||
utilization := in.CurrentAverageUtilization
|
||||
averageValue := in.CurrentAverageValue
|
||||
out.Current = autoscaling.MetricValueStatus{
|
||||
AverageValue: &averageValue,
|
||||
AverageUtilization: utilization,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ResourceMetricStatus_To_v2beta1_ResourceMetricStatus(in *autoscaling.ResourceMetricStatus, out *autoscalingv2beta1.ResourceMetricStatus, s conversion.Scope) error {
|
||||
out.Name = v1.ResourceName(in.Name)
|
||||
out.CurrentAverageUtilization = in.Current.AverageUtilization
|
||||
if in.Current.AverageValue != nil {
|
||||
out.CurrentAverageValue = *in.Current.AverageValue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ResourceMetricSource_To_autoscaling_ResourceMetricSource(in *autoscalingv2beta1.ResourceMetricSource, out *autoscaling.ResourceMetricSource, s conversion.Scope) error {
|
||||
out.Name = core.ResourceName(in.Name)
|
||||
utilization := in.TargetAverageUtilization
|
||||
averageValue := in.TargetAverageValue
|
||||
|
||||
var metricType autoscaling.MetricTargetType
|
||||
if utilization == nil {
|
||||
metricType = autoscaling.AverageValueMetricType
|
||||
} else {
|
||||
metricType = autoscaling.UtilizationMetricType
|
||||
}
|
||||
out.Target = autoscaling.MetricTarget{
|
||||
Type: metricType,
|
||||
AverageValue: averageValue,
|
||||
AverageUtilization: utilization,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ResourceMetricSource_To_v2beta1_ResourceMetricSource(in *autoscaling.ResourceMetricSource, out *autoscalingv2beta1.ResourceMetricSource, s conversion.Scope) error {
|
||||
out.Name = v1.ResourceName(in.Name)
|
||||
out.TargetAverageUtilization = in.Target.AverageUtilization
|
||||
out.TargetAverageValue = in.Target.AverageValue
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ExternalMetricSource_To_v2beta1_ExternalMetricSource(in *autoscaling.ExternalMetricSource, out *autoscalingv2beta1.ExternalMetricSource, s conversion.Scope) error {
|
||||
out.MetricName = in.Metric.Name
|
||||
out.TargetValue = in.Target.Value
|
||||
out.TargetAverageValue = in.Target.AverageValue
|
||||
out.MetricSelector = in.Metric.Selector
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ExternalMetricSource_To_autoscaling_ExternalMetricSource(in *autoscalingv2beta1.ExternalMetricSource, out *autoscaling.ExternalMetricSource, s conversion.Scope) error {
|
||||
value := in.TargetValue
|
||||
averageValue := in.TargetAverageValue
|
||||
|
||||
var metricType autoscaling.MetricTargetType
|
||||
if value == nil {
|
||||
metricType = autoscaling.AverageValueMetricType
|
||||
} else {
|
||||
metricType = autoscaling.ValueMetricType
|
||||
}
|
||||
|
||||
out.Target = autoscaling.MetricTarget{
|
||||
Type: metricType,
|
||||
Value: value,
|
||||
AverageValue: averageValue,
|
||||
}
|
||||
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.MetricSelector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ObjectMetricSource_To_v2beta1_ObjectMetricSource(in *autoscaling.ObjectMetricSource, out *autoscalingv2beta1.ObjectMetricSource, s conversion.Scope) error {
|
||||
if in.Target.Value != nil {
|
||||
out.TargetValue = *in.Target.Value
|
||||
}
|
||||
out.AverageValue = in.Target.AverageValue
|
||||
|
||||
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
|
||||
Kind: in.DescribedObject.Kind,
|
||||
Name: in.DescribedObject.Name,
|
||||
APIVersion: in.DescribedObject.APIVersion,
|
||||
}
|
||||
out.MetricName = in.Metric.Name
|
||||
out.Selector = in.Metric.Selector
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ObjectMetricSource_To_autoscaling_ObjectMetricSource(in *autoscalingv2beta1.ObjectMetricSource, out *autoscaling.ObjectMetricSource, s conversion.Scope) error {
|
||||
var metricType autoscaling.MetricTargetType
|
||||
if in.AverageValue == nil {
|
||||
metricType = autoscaling.ValueMetricType
|
||||
} else {
|
||||
metricType = autoscaling.AverageValueMetricType
|
||||
}
|
||||
out.Target = autoscaling.MetricTarget{
|
||||
Type: metricType,
|
||||
Value: &in.TargetValue,
|
||||
AverageValue: in.AverageValue,
|
||||
}
|
||||
out.DescribedObject = autoscaling.CrossVersionObjectReference{
|
||||
Kind: in.Target.Kind,
|
||||
Name: in.Target.Name,
|
||||
APIVersion: in.Target.APIVersion,
|
||||
}
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.Selector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_PodsMetricSource_To_v2beta1_PodsMetricSource(in *autoscaling.PodsMetricSource, out *autoscalingv2beta1.PodsMetricSource, s conversion.Scope) error {
|
||||
if in.Target.AverageValue != nil {
|
||||
targetAverageValue := *in.Target.AverageValue
|
||||
out.TargetAverageValue = targetAverageValue
|
||||
}
|
||||
|
||||
out.MetricName = in.Metric.Name
|
||||
out.Selector = in.Metric.Selector
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_PodsMetricSource_To_autoscaling_PodsMetricSource(in *autoscalingv2beta1.PodsMetricSource, out *autoscaling.PodsMetricSource, s conversion.Scope) error {
|
||||
targetAverageValue := &in.TargetAverageValue
|
||||
var metricType autoscaling.MetricTargetType
|
||||
metricType = autoscaling.AverageValueMetricType
|
||||
|
||||
out.Target = autoscaling.MetricTarget{
|
||||
Type: metricType,
|
||||
AverageValue: targetAverageValue,
|
||||
}
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.Selector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ExternalMetricStatus_To_v2beta1_ExternalMetricStatus(in *autoscaling.ExternalMetricStatus, out *autoscalingv2beta1.ExternalMetricStatus, s conversion.Scope) error {
|
||||
if &in.Current.AverageValue != nil {
|
||||
out.CurrentAverageValue = in.Current.AverageValue
|
||||
}
|
||||
out.MetricName = in.Metric.Name
|
||||
if in.Current.Value != nil {
|
||||
out.CurrentValue = *in.Current.Value
|
||||
}
|
||||
out.MetricSelector = in.Metric.Selector
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ExternalMetricStatus_To_autoscaling_ExternalMetricStatus(in *autoscalingv2beta1.ExternalMetricStatus, out *autoscaling.ExternalMetricStatus, s conversion.Scope) error {
|
||||
value := in.CurrentValue
|
||||
averageValue := in.CurrentAverageValue
|
||||
out.Current = autoscaling.MetricValueStatus{
|
||||
Value: &value,
|
||||
AverageValue: averageValue,
|
||||
}
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.MetricSelector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_ObjectMetricStatus_To_v2beta1_ObjectMetricStatus(in *autoscaling.ObjectMetricStatus, out *autoscalingv2beta1.ObjectMetricStatus, s conversion.Scope) error {
|
||||
if in.Current.Value != nil {
|
||||
out.CurrentValue = *in.Current.Value
|
||||
}
|
||||
out.Target = autoscalingv2beta1.CrossVersionObjectReference{
|
||||
Kind: in.DescribedObject.Kind,
|
||||
Name: in.DescribedObject.Name,
|
||||
APIVersion: in.DescribedObject.APIVersion,
|
||||
}
|
||||
out.MetricName = in.Metric.Name
|
||||
out.Selector = in.Metric.Selector
|
||||
if in.Current.AverageValue != nil {
|
||||
currentAverageValue := *in.Current.AverageValue
|
||||
out.AverageValue = ¤tAverageValue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_ObjectMetricStatus_To_autoscaling_ObjectMetricStatus(in *autoscalingv2beta1.ObjectMetricStatus, out *autoscaling.ObjectMetricStatus, s conversion.Scope) error {
|
||||
out.Current = autoscaling.MetricValueStatus{
|
||||
Value: &in.CurrentValue,
|
||||
AverageValue: in.AverageValue,
|
||||
}
|
||||
out.DescribedObject = autoscaling.CrossVersionObjectReference{
|
||||
Kind: in.Target.Kind,
|
||||
Name: in.Target.Name,
|
||||
APIVersion: in.Target.APIVersion,
|
||||
}
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.Selector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_autoscaling_PodsMetricStatus_To_v2beta1_PodsMetricStatus(in *autoscaling.PodsMetricStatus, out *autoscalingv2beta1.PodsMetricStatus, s conversion.Scope) error {
|
||||
if in.Current.AverageValue != nil {
|
||||
out.CurrentAverageValue = *in.Current.AverageValue
|
||||
}
|
||||
out.MetricName = in.Metric.Name
|
||||
out.Selector = in.Metric.Selector
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_v2beta1_PodsMetricStatus_To_autoscaling_PodsMetricStatus(in *autoscalingv2beta1.PodsMetricStatus, out *autoscaling.PodsMetricStatus, s conversion.Scope) error {
|
||||
out.Current = autoscaling.MetricValueStatus{
|
||||
AverageValue: &in.CurrentAverageValue,
|
||||
}
|
||||
out.Metric = autoscaling.MetricIdentifier{
|
||||
Name: in.MetricName,
|
||||
Selector: in.Selector,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
+28
-16
@@ -12,7 +12,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/recorder"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -46,10 +46,6 @@ var (
|
||||
})
|
||||
)
|
||||
|
||||
type objectCollector struct {
|
||||
ObjectReference *autoscalingv2beta1.CrossVersionObjectReference
|
||||
}
|
||||
|
||||
// HPAProvider is a base provider for initializing metric collectors based on
|
||||
// HPA resources.
|
||||
type HPAProvider struct {
|
||||
@@ -58,7 +54,7 @@ type HPAProvider struct {
|
||||
collectorScheduler *CollectorScheduler
|
||||
collectorInterval time.Duration
|
||||
metricSink chan metricCollection
|
||||
hpaCache map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler
|
||||
hpaCache map[resourceReference]autoscalingv2.HorizontalPodAutoscaler
|
||||
metricStore *MetricStore
|
||||
collectorFactory *collector.CollectorFactory
|
||||
recorder kube_record.EventRecorder
|
||||
@@ -125,18 +121,34 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
return err
|
||||
}
|
||||
|
||||
newHPACache := make(map[resourceReference]autoscalingv2beta1.HorizontalPodAutoscaler, len(hpas.Items))
|
||||
newHPACache := make(map[resourceReference]autoscalingv2.HorizontalPodAutoscaler, len(hpas.Items))
|
||||
|
||||
newHPAs := 0
|
||||
|
||||
for _, hpa := range hpas.Items {
|
||||
hpa := hpa
|
||||
for _, hpav1 := range hpas.Items {
|
||||
hpav1 := hpav1
|
||||
hpa := autoscalingv2.HorizontalPodAutoscaler{}
|
||||
err := Convert_v2beta1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(&hpav1, &hpa, nil)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Failed to convert HPA to v2beta2: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
resourceRef := resourceReference{
|
||||
Name: hpa.Name,
|
||||
Namespace: hpa.Namespace,
|
||||
}
|
||||
|
||||
if cachedHPA, ok := p.hpaCache[resourceRef]; !ok || !equalHPA(cachedHPA, hpa) {
|
||||
cachedHPA, ok := p.hpaCache[resourceRef]
|
||||
hpaUpdated := !equalHPA(cachedHPA, hpa)
|
||||
if !ok || hpaUpdated {
|
||||
// if the hpa has changed then remove the previous
|
||||
// scheduled collector.
|
||||
if hpaUpdated {
|
||||
p.logger.Infof("Removing previously scheduled metrics collector: %s", resourceRef)
|
||||
p.collectorScheduler.Remove(resourceRef)
|
||||
}
|
||||
|
||||
metricConfigs, err := collector.ParseHPAMetrics(&hpa)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Failed to parse HPA metrics: %v", err)
|
||||
@@ -187,7 +199,7 @@ func (p *HPAProvider) updateHPAs() error {
|
||||
}
|
||||
|
||||
// equalHPA returns true if two HPAs are identical (apart from their status).
|
||||
func equalHPA(a, b autoscalingv2beta1.HorizontalPodAutoscaler) bool {
|
||||
func equalHPA(a, b autoscalingv2.HorizontalPodAutoscaler) bool {
|
||||
// reset resource version to not compare it since this will change
|
||||
// whenever the status of the object is updated. We only want to
|
||||
// compare the metadata and the spec.
|
||||
@@ -225,15 +237,15 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
p.logger.Infof("Collected %d new metric(s)", len(collection.Values))
|
||||
for _, value := range collection.Values {
|
||||
switch value.Type {
|
||||
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
|
||||
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
|
||||
p.logger.Infof("Collected new custom metric '%s' (%s) for %s %s/%s",
|
||||
value.Custom.MetricName,
|
||||
value.Custom.Metric.Name,
|
||||
value.Custom.Value.String(),
|
||||
value.Custom.DescribedObject.Kind,
|
||||
value.Custom.DescribedObject.Namespace,
|
||||
value.Custom.DescribedObject.Name,
|
||||
)
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
p.logger.Infof("Collected new external metric '%s' (%s) [%s]",
|
||||
value.External.MetricName,
|
||||
value.External.Value.String(),
|
||||
@@ -250,7 +262,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
|
||||
}
|
||||
|
||||
// GetMetricByName gets a single metric by name.
|
||||
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo) (*custom_metrics.MetricValue, error) {
|
||||
func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
|
||||
metric := p.metricStore.GetMetricsByName(name, info)
|
||||
if metric == nil {
|
||||
return nil, provider.NewMetricNotFoundForError(info.GroupResource, info.Metric, name.Name)
|
||||
@@ -260,7 +272,7 @@ func (p *HPAProvider) GetMetricByName(name types.NamespacedName, info provider.C
|
||||
|
||||
// GetMetricBySelector returns metrics for namespaced resources by
|
||||
// label selector.
|
||||
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo) (*custom_metrics.MetricValueList, error) {
|
||||
func (p *HPAProvider) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
|
||||
return p.metricStore.GetMetricsBySelector(namespace, selector, info), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
autoscalingv1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
|
||||
type mockCollectorPlugin struct{}
|
||||
|
||||
func (m mockCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *collector.MetricConfig, interval time.Duration) (collector.Collector, error) {
|
||||
return mockCollector{}, nil
|
||||
}
|
||||
|
||||
type mockCollector struct{}
|
||||
|
||||
func (c mockCollector) GetMetrics() ([]collector.CollectedMetric, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c mockCollector) Interval() time.Duration {
|
||||
return 1 * time.Second
|
||||
}
|
||||
|
||||
func TestUpdateHPAs(t *testing.T) {
|
||||
value := resource.MustParse("1k")
|
||||
|
||||
hpa := &autoscalingv1.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hpa1",
|
||||
Namespace: "default",
|
||||
Annotations: map[string]string{
|
||||
"metric-config.pods.requests-per-second.json-path/json-key": "$.http_server.rps",
|
||||
"metric-config.pods.requests-per-second.json-path/path": "/metrics",
|
||||
"metric-config.pods.requests-per-second.json-path/port": "9090",
|
||||
},
|
||||
},
|
||||
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
|
||||
Kind: "Deployment",
|
||||
Name: "app",
|
||||
APIVersion: "apps/v1",
|
||||
},
|
||||
MinReplicas: &[]int32{1}[0],
|
||||
MaxReplicas: 10,
|
||||
Metrics: []autoscalingv1.MetricSpec{
|
||||
{
|
||||
Type: autoscalingv1.PodsMetricSourceType,
|
||||
Pods: &autoscalingv1.PodsMetricSource{
|
||||
MetricName: "requests-per-second",
|
||||
TargetAverageValue: value,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
|
||||
var err error
|
||||
hpa, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Create(hpa)
|
||||
require.NoError(t, err)
|
||||
|
||||
collectorFactory := collector.NewCollectorFactory()
|
||||
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
|
||||
require.NoError(t, err)
|
||||
|
||||
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory)
|
||||
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, provider.collectorScheduler.table, 1)
|
||||
|
||||
// update HPA
|
||||
hpa.Annotations["metric-config.pods.requests-per-second.json-path/port"] = "8080"
|
||||
_, err = fakeClient.AutoscalingV2beta1().HorizontalPodAutoscalers("default").Update(hpa)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = provider.updateHPAs()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, provider.collectorScheduler.table, 1)
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -20,9 +20,8 @@ import (
|
||||
// customMetricsStoredMetric is a wrapper around custom_metrics.MetricValue with a metricsTTL used
|
||||
// to clean up stale metrics from the customMetricsStore.
|
||||
type customMetricsStoredMetric struct {
|
||||
Value custom_metrics.MetricValue
|
||||
Labels map[string]string
|
||||
TTL time.Time
|
||||
Value custom_metrics.MetricValue
|
||||
TTL time.Time
|
||||
}
|
||||
|
||||
type externalMetricsStoredMetric struct {
|
||||
@@ -50,15 +49,15 @@ func NewMetricStore(ttlCalculator func() time.Time) *MetricStore {
|
||||
// Insert inserts a collected metric into the metric customMetricsStore.
|
||||
func (s *MetricStore) Insert(value collector.CollectedMetric) {
|
||||
switch value.Type {
|
||||
case autoscalingv2beta1.ObjectMetricSourceType, autoscalingv2beta1.PodsMetricSourceType:
|
||||
s.insertCustomMetric(value.Custom, value.Labels)
|
||||
case autoscalingv2beta1.ExternalMetricSourceType:
|
||||
case autoscalingv2.ObjectMetricSourceType, autoscalingv2.PodsMetricSourceType:
|
||||
s.insertCustomMetric(value.Custom)
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
s.insertExternalMetric(value.External)
|
||||
}
|
||||
}
|
||||
|
||||
// insertCustomMetric inserts a custom metric plus labels into the store.
|
||||
func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, labels map[string]string) {
|
||||
func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
@@ -77,15 +76,14 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, label
|
||||
}
|
||||
|
||||
metric := customMetricsStoredMetric{
|
||||
Value: value,
|
||||
Labels: labels,
|
||||
TTL: s.metricsTTLCalculator(), // TODO: make TTL configurable
|
||||
Value: value,
|
||||
TTL: s.metricsTTLCalculator(), // TODO: make TTL configurable
|
||||
}
|
||||
|
||||
metrics, ok := s.customMetricsStore[value.MetricName]
|
||||
metrics, ok := s.customMetricsStore[value.Metric.Name]
|
||||
if !ok {
|
||||
s.customMetricsStore[value.MetricName] = map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric{
|
||||
groupResource: map[string]map[string]customMetricsStoredMetric{
|
||||
s.customMetricsStore[value.Metric.Name] = map[schema.GroupResource]map[string]map[string]customMetricsStoredMetric{
|
||||
groupResource: {
|
||||
value.DescribedObject.Namespace: map[string]customMetricsStoredMetric{
|
||||
value.DescribedObject.Name: metric,
|
||||
},
|
||||
@@ -97,7 +95,7 @@ func (s *MetricStore) insertCustomMetric(value custom_metrics.MetricValue, label
|
||||
group, ok := metrics[groupResource]
|
||||
if !ok {
|
||||
metrics[groupResource] = map[string]map[string]customMetricsStoredMetric{
|
||||
value.DescribedObject.Namespace: map[string]customMetricsStoredMetric{
|
||||
value.DescribedObject.Namespace: {
|
||||
value.DescribedObject.Name: metric,
|
||||
},
|
||||
}
|
||||
@@ -157,25 +155,25 @@ func (s *MetricStore) GetMetricsBySelector(namespace string, selector labels.Sel
|
||||
|
||||
metrics, ok := s.customMetricsStore[info.Metric]
|
||||
if !ok {
|
||||
return nil
|
||||
return &custom_metrics.MetricValueList{}
|
||||
}
|
||||
|
||||
group, ok := metrics[info.GroupResource]
|
||||
if !ok {
|
||||
return nil
|
||||
return &custom_metrics.MetricValueList{}
|
||||
}
|
||||
|
||||
if !info.Namespaced {
|
||||
for _, metricMap := range group {
|
||||
for _, metric := range metricMap {
|
||||
if selector.Matches(labels.Set(metric.Labels)) {
|
||||
if selector.Matches(labels.Set(metric.Value.Metric.Selector.MatchLabels)) {
|
||||
matchedMetrics = append(matchedMetrics, metric.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if metricMap, ok := group[namespace]; ok {
|
||||
for _, metric := range metricMap {
|
||||
if selector.Matches(labels.Set(metric.Labels)) {
|
||||
if metric.Value.Metric.Selector != nil && selector.Matches(labels.Set(metric.Value.Metric.Selector.MatchLabels)) {
|
||||
matchedMetrics = append(matchedMetrics, metric.Value)
|
||||
}
|
||||
}
|
||||
@@ -222,13 +220,13 @@ func (s *MetricStore) ListAllMetrics() []provider.CustomMetricInfo {
|
||||
|
||||
metrics := make([]provider.CustomMetricInfo, 0, len(s.customMetricsStore))
|
||||
|
||||
for metricName, customMetricsStoredMetrics := range s.customMetricsStore {
|
||||
for metric, customMetricsStoredMetrics := range s.customMetricsStore {
|
||||
for groupResource, group := range customMetricsStoredMetrics {
|
||||
for namespace := range group {
|
||||
metric := provider.CustomMetricInfo{
|
||||
GroupResource: groupResource,
|
||||
Namespaced: namespace != "",
|
||||
Metric: metricName,
|
||||
Metric: metric,
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
||||
@@ -1,20 +1,29 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
"k8s.io/api/autoscaling/v2beta1"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func newMetricIdentifier(metricName string) custom_metrics.MetricIdentifier {
|
||||
selector := metav1.LabelSelector{}
|
||||
return custom_metrics.MetricIdentifier{
|
||||
Name: metricName,
|
||||
Selector: &selector,
|
||||
}
|
||||
}
|
||||
func TestInternalMetricStorage(t *testing.T) {
|
||||
var metricStoreTests = []struct {
|
||||
test string
|
||||
@@ -33,10 +42,10 @@ func TestInternalMetricStorage(t *testing.T) {
|
||||
{
|
||||
test: "insert/list/get a namespaced resource metric",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -80,10 +89,10 @@ func TestInternalMetricStorage(t *testing.T) {
|
||||
{
|
||||
test: "insert/list/get a Pod metric",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -136,10 +145,10 @@ func TestInternalMetricStorage(t *testing.T) {
|
||||
{
|
||||
test: "insert/list/get a non-namespaced resource metric",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Node",
|
||||
@@ -182,10 +191,10 @@ func TestInternalMetricStorage(t *testing.T) {
|
||||
{
|
||||
test: "insert/list/get an Ingress metric",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Ingress",
|
||||
@@ -286,10 +295,10 @@ func TestMultipleMetricValues(t *testing.T) {
|
||||
test: "insert/list/get multiple Ingress metrics",
|
||||
insert: []collector.CollectedMetric{
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Ingress",
|
||||
@@ -298,10 +307,10 @@ func TestMultipleMetricValues(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Ingress",
|
||||
@@ -355,10 +364,10 @@ func TestMultipleMetricValues(t *testing.T) {
|
||||
test: "insert/list/get multiple namespaced resource metrics",
|
||||
insert: []collector.CollectedMetric{
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -368,10 +377,10 @@ func TestMultipleMetricValues(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -489,10 +498,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
|
||||
{
|
||||
test: "test that not all Kinds are mapped to a group/resource",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -559,7 +568,7 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
|
||||
require.Nil(t, metric)
|
||||
|
||||
metrics := metricsStore.GetMetricsBySelector(tc.byLabel.namespace, tc.byLabel.selector, tc.byLabel.info)
|
||||
require.Nil(t, metrics)
|
||||
require.Equal(t, &custom_metrics.MetricValueList{}, metrics)
|
||||
|
||||
})
|
||||
}
|
||||
@@ -581,10 +590,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
|
||||
test: "insert/list/get multiple metrics in different groups",
|
||||
insert: []collector.CollectedMetric{
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -594,10 +603,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "default",
|
||||
@@ -607,10 +616,10 @@ func TestCustomMetricsStorageErrors(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Namespace: "new-namespace",
|
||||
@@ -684,7 +693,7 @@ func TestExternalMetricStorage(t *testing.T) {
|
||||
{
|
||||
test: "insert/list/get an external metric",
|
||||
insert: collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("External"),
|
||||
Type: autoscalingv2.MetricSourceType("External"),
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
@@ -744,7 +753,7 @@ func TestMultipleExternalMetricStorage(t *testing.T) {
|
||||
test: "the latest value overrides the last one",
|
||||
insert: []collector.CollectedMetric{
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("External"),
|
||||
Type: autoscalingv2.MetricSourceType("External"),
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
@@ -752,7 +761,7 @@ func TestMultipleExternalMetricStorage(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: v2beta1.MetricSourceType("External"),
|
||||
Type: autoscalingv2.MetricSourceType("External"),
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(1, ""),
|
||||
@@ -808,10 +817,10 @@ func TestMetricsExpiration(t *testing.T) {
|
||||
})
|
||||
|
||||
customMetric := collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Node",
|
||||
@@ -821,7 +830,7 @@ func TestMetricsExpiration(t *testing.T) {
|
||||
}
|
||||
|
||||
externalMetric := collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("External"),
|
||||
Type: autoscalingv2.MetricSourceType("External"),
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
@@ -847,10 +856,10 @@ func TestMetricsNonExpiration(t *testing.T) {
|
||||
})
|
||||
|
||||
customMetric := collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("Object"),
|
||||
Type: autoscalingv2.MetricSourceType("Object"),
|
||||
Custom: custom_metrics.MetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
Metric: newMetricIdentifier("metric-per-unit"),
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
DescribedObject: custom_metrics.ObjectReference{
|
||||
Name: "metricObject",
|
||||
Kind: "Node",
|
||||
@@ -860,7 +869,7 @@ func TestMetricsNonExpiration(t *testing.T) {
|
||||
}
|
||||
|
||||
externalMetric := collector.CollectedMetric{
|
||||
Type: v2beta1.MetricSourceType("External"),
|
||||
Type: autoscalingv2.MetricSourceType("External"),
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: "metric-per-unit",
|
||||
Value: *resource.NewQuantity(0, ""),
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user