mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2026-03-10 10:10:32 +00:00
Compare commits
72 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 49f9f5ffe0 | |||
| 43dd2e9741 | |||
| 4aeebc853c | |||
| 45a09c12a0 | |||
| 789c2dee79 | |||
| bbe5a8a1f7 | |||
| 32e9a39be0 | |||
| afea8de1d9 | |||
| a17ad004d3 | |||
| ec77258ba4 | |||
| 3ef92536c1 | |||
| ae5c4af76f | |||
| b130ce5fa7 | |||
| 8cd076b01d | |||
| 15cffaee88 | |||
| 82a96580a2 | |||
| 08825c4920 | |||
| ac3c500d84 | |||
| 656a05fe07 | |||
| e54f3b88cf | |||
| f5124993a2 | |||
| 7c1339a6f2 | |||
| fc2f6aa5df | |||
| 7448688788 | |||
| 0e1302a97e | |||
| 749c1a7e25 | |||
| e7eb21a773 | |||
| 2c3247bf57 | |||
| 6860217a71 | |||
| a66ce04128 | |||
| 9eabbd53d9 | |||
| ec8622c028 | |||
| 166865edbb | |||
| 85f1c3e13d | |||
| 1a5297d6b2 | |||
| be0e0d485e | |||
| a07eb0b79c | |||
| 34dc968e77 | |||
| 9174f2550a | |||
| 6c4dfd682b | |||
| aa3f8ab969 | |||
| 437bca4de8 | |||
| c2fe13d21d | |||
| bbfd982fb6 | |||
| e9112a7114 | |||
| 2c526dd498 | |||
| 29b782160d | |||
| 28f3d96061 | |||
| 24b7276282 | |||
| 542d90d9fe | |||
| 0f359920af | |||
| 54e2d2d564 | |||
| 16ec43c361 | |||
| ffcbfcee48 | |||
| 69f95534e8 | |||
| c2179a35ba | |||
| 35e3fe83e8 | |||
| 69df60e724 | |||
| d171e049bf | |||
| b89ca19e6a | |||
| a276b64576 | |||
| ff6d479f1a | |||
| cd986058e4 | |||
| d6a33fed63 | |||
| b8532b756b | |||
| f28653de74 | |||
| 2f5d3f5a42 | |||
| 153d754353 | |||
| 02ec2282ab | |||
| 65dd585813 | |||
| aa7b64e637 | |||
| 7633ac551e |
@@ -0,0 +1,91 @@
|
||||
name: gh-package-deploy
|
||||
permissions: {}
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
env:
|
||||
REGISTRY: ghcr.io
|
||||
IMAGE_NAME: "${{ github.repository }}"
|
||||
|
||||
jobs:
|
||||
docker:
|
||||
if: ${{ github.actor != 'dependabot[bot]' }}
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
# Adding this block will overridw default values to None if not specified in the block
|
||||
# https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
contents: read
|
||||
actions: read
|
||||
packages: write # to push packages
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9
|
||||
|
||||
- uses: actions/setup-go@fac708d6674e30b6ba41289acaab6d4b75aa0753
|
||||
with:
|
||||
# https://www.npmjs.com/package/semver#caret-ranges-123-025-004
|
||||
go-version: '^1.21'
|
||||
|
||||
- name: Login to Github Container Registry
|
||||
uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- uses: actions-ecosystem/action-get-latest-tag@b7c32daec3395a9616f88548363a42652b22d435
|
||||
id: get-latest-tag
|
||||
|
||||
- name: Build binaries
|
||||
run: |
|
||||
make build.linux.amd64 build.linux.arm64
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@2b82ce82d56a2a04d2637cd93a637ae1b359c0a7
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@4c0219f9ac95b02789c1075625400b2acbff50b1
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Docker meta
|
||||
uses: docker/metadata-action@818d4b7b91585d195f67373fd9cb0332e31a7175
|
||||
id: meta
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
tags: |
|
||||
type=semver,pattern=v{{version}}
|
||||
type=semver,pattern=v{{major}}.{{minor}}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=alpine:3
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' && startsWith(github.ref, 'refs/tags/v') }}
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
# Build and push latest tag
|
||||
- name: Build and push latest
|
||||
uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=alpine:3
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
@@ -4,7 +4,6 @@ run:
|
||||
linters:
|
||||
disable-all: true
|
||||
enable:
|
||||
- deadcode
|
||||
- errcheck
|
||||
- gosimple
|
||||
- govet
|
||||
@@ -12,4 +11,3 @@ linters:
|
||||
- staticcheck
|
||||
- typecheck
|
||||
- unused
|
||||
- varcheck
|
||||
|
||||
+1
-1
@@ -50,7 +50,7 @@ contribution is in line with our goals.
|
||||
- Make sure you sign-off on your commits `git commit -s -m "adding X to change Y"`
|
||||
- Write good commit messages (see below).
|
||||
- Push your changes to a topic branch in your fork of the repository.
|
||||
- As you push your changes, update the pull request with new infomation and tasks as you complete them
|
||||
- As you push your changes, update the pull request with new information and tasks as you complete them
|
||||
- Project maintainers might comment on your work as you progress
|
||||
- When you are done, remove the `work in progess` label and ping the maintainers for a review
|
||||
- Your pull request must receive a :thumbsup: from two [maintainers](MAINTAINERS)
|
||||
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
ARG BASE_IMAGE=registry.opensource.zalan.do/library/alpine-3.13:latest
|
||||
ARG BASE_IMAGE=registry.opensource.zalan.do/library/alpine-3:latest
|
||||
FROM ${BASE_IMAGE}
|
||||
LABEL maintainer="Team Teapot @ Zalando SE <team-teapot@zalando.de>"
|
||||
|
||||
|
||||
@@ -402,6 +402,64 @@ the `backend` label under `matchLabels` for the metric. The ingress annotation
|
||||
where the backend weights can be obtained can be specified through the flag
|
||||
`--skipper-backends-annotation`.
|
||||
|
||||
## External RPS collector
|
||||
|
||||
The External RPS collector, like Skipper collector, is a simple wrapper around the Prometheus collector to
|
||||
make it easy to define an HPA for scaling based on the RPS measured for a given hostname. When
|
||||
[skipper](https://github.com/zalando/skipper) is used as the ingress
|
||||
implementation in your cluster everything should work automatically, in case another reverse proxy is used as ingress, like [Nginx](https://github.com/kubernetes/ingress-nginx) for example, its necessary to configure which prometheus metric should be used through `--external-rps-metric-name <metric-name>` flag. Assuming `skipper-ingress` is being used or the appropriate metric name is passed using the flag mentioned previously this collector provides the correct Prometheus queries out of the
|
||||
box so users don't have to define those manually.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric | Description | Type | Kind | K8s Versions |
|
||||
| ------------ | -------------- | ------- | -- | -- |
|
||||
| `requests-per-second` | Scale based on requests per second for a certain hostname. | External | | `>=1.12` |
|
||||
|
||||
### Example: External Metric
|
||||
|
||||
This is an example of an HPA that will scale based on `requests-per-second` for the RPS measured in the hostnames called: `www.example1.com` and `www.example2.com`; and weighted by 42%.
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
metric-config.external.example-rps.requests-per-second/hostname: www.example1.com,www.example2.com
|
||||
metric-config.external.example-rps.requests-per-second/weight: "42"
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: custom-metrics-consumer
|
||||
minReplicas: 1
|
||||
maxReplicas: 10
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metric:
|
||||
name: example-rps
|
||||
selector:
|
||||
matchLabels:
|
||||
type: requests-per-second
|
||||
target:
|
||||
type: AverageValue
|
||||
averageValue: "42"
|
||||
```
|
||||
### Multiple hostnames per metric
|
||||
|
||||
This metric supports a relation of n:1 between hostnames and metrics. The way it works is the measured RPS is the sum of the RPS rate of each of the specified hostnames. This value is further modified by the weight parameter explained below.
|
||||
|
||||
### Metric weighting based on backend
|
||||
|
||||
There are ingress-controllers, like skipper-ingress, that supports sending traffic to different backends based on some kind of configuration, in case of skipper annotations
|
||||
present on the `Ingress` object, or weights on the RouteGroup backends. By
|
||||
default the number of replicas will be calculated based on the full traffic
|
||||
served by these components. If however only the traffic being routed to
|
||||
a specific hostname should be used then the weight for the configured hostname(s) might be specified via the `weight` annotation `metric-config.external.<metric-name>.request-per-second/weight` for the metric being configured.
|
||||
|
||||
|
||||
## InfluxDB collector
|
||||
|
||||
The InfluxDB collector maps [Flux](https://github.com/influxdata/flux) queries to metrics that can be used for scaling.
|
||||
@@ -644,6 +702,95 @@ you need to define a `key` or other `tag` with a "star" query syntax like
|
||||
metric label definitions. If both annotations and corresponding label is
|
||||
defined, then the annotation takes precedence.
|
||||
|
||||
|
||||
## Nakadi collector
|
||||
|
||||
The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/)
|
||||
Subscription API stats metrics `consumer_lag_seconds` or `unconsumed_events`.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric Type | Description | Type | K8s Versions |
|
||||
|------------------------|-----------------------------------------------------------------------------|----------|--------------|
|
||||
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi subscription | External | `>=1.24` |
|
||||
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi subscription | External | `>=1.24` |
|
||||
| ------------ | ------- | -- | -- |
|
||||
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi Subscription | External | `>=1.24` |
|
||||
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi Subscription | External | `>=1.24` |
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
# metric-config.<metricType>.<metricName>.<collectorType>/<configKey>
|
||||
metric-config.external.my-nakadi-consumer.nakadi/interval: "60s" # optional
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: custom-metrics-consumer
|
||||
minReplicas: 0
|
||||
maxReplicas: 8 # should match number of partitions for the event type
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metric:
|
||||
name: my-nakadi-consumer
|
||||
selector:
|
||||
matchLabels:
|
||||
type: nakadi
|
||||
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
|
||||
metric-type: "consumer-lag-seconds|unconsumed-events"
|
||||
target:
|
||||
# value is compatible with the consumer-lag-seconds metric type.
|
||||
# It describes the amount of consumer lag in seconds before scaling
|
||||
# additionally up.
|
||||
# if an event-type has multiple partitions the value of
|
||||
# consumer-lag-seconds is the max of all the partitions.
|
||||
value: "600" # 10m
|
||||
# averageValue is compatible with unconsumed-events metric type.
|
||||
# This means for every 30 unconsumed events a pod is added.
|
||||
# unconsumed-events is the sum of of unconsumed_events over all
|
||||
# partitions.
|
||||
averageValue: "30"
|
||||
type: AverageValue
|
||||
```
|
||||
|
||||
The `subscription-id` is the Subscription ID of the relevant consumer. The
|
||||
`metric-type` indicates whether to scale on `consumer-lag-seconds` or
|
||||
`unconsumed-events` as outlined below.
|
||||
|
||||
`unconsumed-events` - is the total number of unconsumed events over all
|
||||
partitions. When using this `metric-type` you should also use the target
|
||||
`averageValue` which indicates the number of events which can be handled per
|
||||
pod. To best estimate the number of events per pods, you need to understand the
|
||||
average time for processing an event as well as the rate of events.
|
||||
|
||||
*Example*: You have an event type producing 100 events per second between 00:00
|
||||
and 08:00. Between 08:01 to 23:59 it produces 400 events per second.
|
||||
Let's assume that on average a single pod can consume 100 events per second,
|
||||
then we can define 100 as `averageValue` and the HPA would scale to 1 between
|
||||
00:00 and 08:00, and scale to 4 between 08:01 and 23:59. If there for some
|
||||
reason is a short spike of 800 events per second, then it would scale to 8 pods
|
||||
to process those events until the rate goes down again.
|
||||
|
||||
`consumer-lag-seconds` - describes the age of the oldest unconsumed event for
|
||||
a subscription. If the event type has multiple partitions the lag is defined as
|
||||
the max age over all partitions. When using this `metric-type` you should use
|
||||
the target `value` to indicate the max lag (in seconds) before the HPA should
|
||||
scale.
|
||||
|
||||
*Example*: You have a subscription with a defined SLO of "99.99 of events are
|
||||
consumed within 30 min.". In this case you can define a target `value` of e.g.
|
||||
20 min. (1200s) (to include a safety buffer) such that the HPA only scales up
|
||||
from 1 to 2 if the target of 20 min. is breached and it needs to work faster
|
||||
with more consumers.
|
||||
For this case you should also account for the average time for processing an
|
||||
event when defining the target.
|
||||
|
||||
|
||||
## HTTP Collector
|
||||
|
||||
The http collector allows collecting metrics from an external endpoint specified in the HPA.
|
||||
|
||||
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
annotations:
|
||||
controller-gen.kubebuilder.io/version: v0.8.0
|
||||
controller-gen.kubebuilder.io/version: v0.9.0
|
||||
creationTimestamp: null
|
||||
name: clusterscalingschedules.zalando.org
|
||||
spec:
|
||||
@@ -56,7 +56,7 @@ spec:
|
||||
properties:
|
||||
date:
|
||||
description: Defines the starting date of a OneTime schedule.
|
||||
It has to be a RFC3339 formated date.
|
||||
It has to be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
durationMinutes:
|
||||
@@ -65,7 +65,7 @@ spec:
|
||||
type: integer
|
||||
endDate:
|
||||
description: Defines the ending date of a OneTime schedule.
|
||||
It must be a RFC3339 formated date.
|
||||
It must be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
period:
|
||||
@@ -140,9 +140,3 @@ spec:
|
||||
storage: true
|
||||
subresources:
|
||||
status: {}
|
||||
status:
|
||||
acceptedNames:
|
||||
kind: ""
|
||||
plural: ""
|
||||
conditions: []
|
||||
storedVersions: []
|
||||
|
||||
@@ -22,7 +22,7 @@ spec:
|
||||
serviceAccountName: custom-metrics-apiserver
|
||||
containers:
|
||||
- name: kube-metrics-adapter
|
||||
image: registry.opensource.zalan.do/teapot/kube-metrics-adapter:latest
|
||||
image: ghcr.io/zalando-incubator/kube-metrics-adapter:latest
|
||||
args:
|
||||
# - --v=9
|
||||
- --prometheus-server=http://prometheus.kube-system.svc.cluster.local
|
||||
|
||||
@@ -56,7 +56,7 @@ spec:
|
||||
properties:
|
||||
date:
|
||||
description: Defines the starting date of a OneTime schedule.
|
||||
It has to be a RFC3339 formated date.
|
||||
It has to be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
durationMinutes:
|
||||
@@ -65,7 +65,7 @@ spec:
|
||||
type: integer
|
||||
endDate:
|
||||
description: Defines the ending date of a OneTime schedule.
|
||||
It must be a RFC3339 formated date.
|
||||
It must be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
period:
|
||||
|
||||
@@ -58,7 +58,7 @@ spec:
|
||||
properties:
|
||||
date:
|
||||
description: Defines the starting date of a OneTime schedule.
|
||||
It has to be a RFC3339 formated date.
|
||||
It has to be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
durationMinutes:
|
||||
@@ -67,7 +67,7 @@ spec:
|
||||
type: integer
|
||||
endDate:
|
||||
description: Defines the ending date of a OneTime schedule.
|
||||
It must be a RFC3339 formated date.
|
||||
It must be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
period:
|
||||
|
||||
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
annotations:
|
||||
controller-gen.kubebuilder.io/version: v0.8.0
|
||||
controller-gen.kubebuilder.io/version: v0.9.0
|
||||
creationTimestamp: null
|
||||
name: scalingschedules.zalando.org
|
||||
spec:
|
||||
@@ -58,7 +58,7 @@ spec:
|
||||
properties:
|
||||
date:
|
||||
description: Defines the starting date of a OneTime schedule.
|
||||
It has to be a RFC3339 formated date.
|
||||
It has to be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
durationMinutes:
|
||||
@@ -67,7 +67,7 @@ spec:
|
||||
type: integer
|
||||
endDate:
|
||||
description: Defines the ending date of a OneTime schedule.
|
||||
It must be a RFC3339 formated date.
|
||||
It must be a RFC3339 formatted date.
|
||||
format: date-time
|
||||
type: string
|
||||
period:
|
||||
@@ -142,9 +142,3 @@ spec:
|
||||
storage: true
|
||||
subresources:
|
||||
status: {}
|
||||
status:
|
||||
acceptedNames:
|
||||
kind: ""
|
||||
plural: ""
|
||||
conditions: []
|
||||
storedVersions: []
|
||||
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
FROM registry.opensource.zalan.do/library/alpine-3.13:latest
|
||||
FROM registry.opensource.zalan.do/library/alpine-3:latest
|
||||
LABEL maintainer="Team Teapot @ Zalando SE <team-teapot@zalando.de>"
|
||||
|
||||
# add binary
|
||||
|
||||
@@ -1,37 +1,36 @@
|
||||
module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go v1.44.254
|
||||
github.com/aws/aws-sdk-go v1.48.10
|
||||
github.com/influxdata/influxdb-client-go v0.2.0
|
||||
github.com/prometheus/client_golang v1.15.0
|
||||
github.com/prometheus/common v0.42.0
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
github.com/spf13/cobra v1.7.0
|
||||
github.com/spyzhov/ajson v0.7.2
|
||||
github.com/stretchr/testify v1.8.2
|
||||
github.com/prometheus/client_golang v1.17.0
|
||||
github.com/prometheus/common v0.45.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/cobra v1.8.0
|
||||
github.com/spyzhov/ajson v0.9.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/szuecs/routegroup-client v0.21.1
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20230223125308-aff25efae501
|
||||
golang.org/x/net v0.9.0
|
||||
golang.org/x/oauth2 v0.7.0
|
||||
golang.org/x/sync v0.1.0
|
||||
k8s.io/api v0.23.0
|
||||
k8s.io/apimachinery v0.23.0
|
||||
k8s.io/apiserver v0.23.0
|
||||
k8s.io/client-go v0.23.0
|
||||
k8s.io/code-generator v0.23.0
|
||||
k8s.io/component-base v0.23.0
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20230601114834-6ed1bba3c85d
|
||||
golang.org/x/net v0.19.0
|
||||
golang.org/x/oauth2 v0.13.0
|
||||
golang.org/x/sync v0.5.0
|
||||
k8s.io/api v0.24.17
|
||||
k8s.io/apimachinery v0.24.17
|
||||
k8s.io/apiserver v0.24.17
|
||||
k8s.io/client-go v0.24.17
|
||||
k8s.io/code-generator v0.24.17
|
||||
k8s.io/component-base v0.24.17
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf
|
||||
k8s.io/metrics v0.22.17
|
||||
sigs.k8s.io/controller-tools v0.8.0
|
||||
sigs.k8s.io/custom-metrics-apiserver v1.22.0
|
||||
k8s.io/kube-openapi v0.0.0-20230614213217-ba0abe644833
|
||||
k8s.io/metrics v0.24.17
|
||||
sigs.k8s.io/controller-tools v0.9.0
|
||||
sigs.k8s.io/custom-metrics-apiserver v1.24.0
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go/compute/metadata v0.2.3 // indirect
|
||||
github.com/NYTimes/gziphandler v1.1.1 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/blang/semver v3.5.1+incompatible // indirect
|
||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/coreos/go-semver v0.3.0 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
|
||||
@@ -41,22 +40,22 @@ require (
|
||||
github.com/fatih/color v1.13.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.3 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-logr/logr v1.2.4 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||
github.com/go-openapi/swag v0.22.3 // indirect
|
||||
github.com/gobuffalo/flect v0.2.3 // indirect
|
||||
github.com/go-openapi/swag v0.22.4 // indirect
|
||||
github.com/gobuffalo/flect v0.2.5 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/gnostic v0.6.9 // indirect
|
||||
github.com/google/go-cmp v0.5.9 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||
github.com/imdario/mergo v0.3.13 // indirect
|
||||
github.com/imdario/mergo v0.3.16 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
@@ -65,62 +64,65 @@ require (
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
|
||||
github.com/prometheus/procfs v0.11.1 // indirect
|
||||
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.6 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
|
||||
go.etcd.io/etcd/client/v2 v2.305.6 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.6 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
|
||||
go.etcd.io/etcd/client/v2 v2.305.7 // indirect
|
||||
go.etcd.io/etcd/client/v3 v3.5.9 // indirect
|
||||
go.opentelemetry.io/contrib v0.20.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel v1.11.1 // indirect
|
||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.11.1 // indirect
|
||||
go.opentelemetry.io/otel/sdk v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.11.1 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
go.uber.org/zap v1.21.0 // indirect
|
||||
golang.org/x/crypto v0.6.0 // indirect
|
||||
golang.org/x/mod v0.8.0 // indirect
|
||||
golang.org/x/sys v0.7.0 // indirect
|
||||
golang.org/x/term v0.7.0 // indirect
|
||||
golang.org/x/text v0.9.0 // indirect
|
||||
golang.org/x/crypto v0.16.0 // indirect
|
||||
golang.org/x/mod v0.12.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/term v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
golang.org/x/tools v0.6.0 // indirect
|
||||
golang.org/x/tools v0.11.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
|
||||
google.golang.org/grpc v1.52.0 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||
google.golang.org/grpc v1.56.3 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.23.0 // indirect
|
||||
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c // indirect
|
||||
k8s.io/klog/v2 v2.90.0 // indirect
|
||||
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.24.0 // indirect
|
||||
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
|
||||
k8s.io/klog/v2 v2.100.1 // indirect
|
||||
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.37 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
)
|
||||
|
||||
replace (
|
||||
go.opentelemetry.io/cotrib v0.20.0 => go.opentelemetry.io/contrib v0.44.0
|
||||
go.opentelemetry.io/otel => go.opentelemetry.io/otel v0.20.0
|
||||
go.opentelemetry.io/otel/sdk => go.opentelemetry.io/otel/sdk v0.20.0
|
||||
go.opentelemetry.io/otel/trace => go.opentelemetry.io/otel/trace v0.20.0
|
||||
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
|
||||
)
|
||||
|
||||
go 1.20
|
||||
go 1.21
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -68,11 +68,11 @@ type Schedule struct {
|
||||
// +optional
|
||||
Period *SchedulePeriod `json:"period,omitempty"`
|
||||
// Defines the starting date of a OneTime schedule. It has to
|
||||
// be a RFC3339 formated date.
|
||||
// be a RFC3339 formatted date.
|
||||
// +optional
|
||||
Date *ScheduleDate `json:"date,omitempty"`
|
||||
// Defines the ending date of a OneTime schedule. It must be
|
||||
// a RFC3339 formated date.
|
||||
// a RFC3339 formatted date.
|
||||
// +optional
|
||||
EndDate *ScheduleDate `json:"endDate,omitempty"`
|
||||
// The duration in minutes (default 0) that the configured value will be
|
||||
|
||||
@@ -61,6 +61,10 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
|
||||
func NewForConfig(c *rest.Config) (*Clientset, error) {
|
||||
configShallowCopy := *c
|
||||
|
||||
if configShallowCopy.UserAgent == "" {
|
||||
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
|
||||
}
|
||||
|
||||
// share the transport between all clients
|
||||
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
|
||||
if err != nil {
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
ExternalRPSMetricType = "requests-per-second"
|
||||
ExternalRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])) * %.4f)`
|
||||
)
|
||||
|
||||
type ExternalRPSCollectorPlugin struct {
|
||||
metricName string
|
||||
promPlugin CollectorPlugin
|
||||
pattern *regexp.Regexp
|
||||
}
|
||||
|
||||
type ExternalRPSCollector struct {
|
||||
interval time.Duration
|
||||
promCollector Collector
|
||||
}
|
||||
|
||||
func NewExternalRPSCollectorPlugin(
|
||||
promPlugin CollectorPlugin,
|
||||
metricName string,
|
||||
) (*ExternalRPSCollectorPlugin, error) {
|
||||
if metricName == "" {
|
||||
return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
|
||||
}
|
||||
|
||||
p, err := regexp.Compile("^[a-zA-Z0-9.-]+$")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create regular expression to match hostname format")
|
||||
}
|
||||
|
||||
return &ExternalRPSCollectorPlugin{
|
||||
metricName: metricName,
|
||||
promPlugin: promPlugin,
|
||||
pattern: p,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewCollector initializes a new skipper collector from the specified HPA.
|
||||
func (p *ExternalRPSCollectorPlugin) NewCollector(
|
||||
hpa *autoscalingv2.HorizontalPodAutoscaler,
|
||||
config *MetricConfig,
|
||||
interval time.Duration,
|
||||
) (Collector, error) {
|
||||
if config == nil {
|
||||
return nil, fmt.Errorf("metric config not present, it is not possible to initialize the collector")
|
||||
}
|
||||
// Need to copy config and add a promQL query in order to get
|
||||
// RPS data from a specific hostname from prometheus. The idea
|
||||
// of the copy is to not modify the original config struct.
|
||||
confCopy := *config
|
||||
|
||||
if _, ok := config.Config["hostnames"]; !ok {
|
||||
return nil, fmt.Errorf("Hostname is not specified, unable to create collector")
|
||||
}
|
||||
|
||||
hostnames := strings.Split(config.Config["hostnames"], ",")
|
||||
if p.pattern == nil {
|
||||
return nil, fmt.Errorf("plugin did not specify hostname regex pattern, unable to create collector")
|
||||
}
|
||||
for _, h := range hostnames {
|
||||
if ok := p.pattern.MatchString(h); !ok {
|
||||
return nil, fmt.Errorf(
|
||||
"invalid hostname format, unable to create collector: %s",
|
||||
h,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
weight := 1.0
|
||||
if w, ok := config.Config["weight"]; ok {
|
||||
num, err := strconv.ParseFloat(w, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse weight annotation, unable to create collector: %s", w)
|
||||
}
|
||||
weight = num / 100.0
|
||||
}
|
||||
|
||||
confCopy.Config = map[string]string{
|
||||
"query": fmt.Sprintf(
|
||||
ExternalRPSQuery,
|
||||
p.metricName,
|
||||
strings.ReplaceAll(strings.Join(hostnames, "|"), ".", "_"),
|
||||
weight,
|
||||
),
|
||||
}
|
||||
|
||||
c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ExternalRPSCollector{
|
||||
interval: interval,
|
||||
promCollector: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMetrics gets hostname metrics from Prometheus
|
||||
func (c *ExternalRPSCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
v, err := c.promCollector.GetMetrics()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(v) != 1 {
|
||||
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// Interval returns the interval at which the collector should run.
|
||||
func (c *ExternalRPSCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,57 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/metrics/pkg/apis/custom_metrics"
|
||||
)
|
||||
|
||||
type FakeCollectorPlugin struct {
|
||||
metrics []CollectedMetric
|
||||
config map[string]string
|
||||
}
|
||||
|
||||
type FakeCollector struct {
|
||||
metrics []CollectedMetric
|
||||
interval time.Duration
|
||||
stub func() ([]CollectedMetric, error)
|
||||
}
|
||||
|
||||
func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
if c.stub != nil {
|
||||
v, err := c.stub()
|
||||
return v, err
|
||||
}
|
||||
|
||||
return c.metrics, nil
|
||||
}
|
||||
|
||||
func (FakeCollector) Interval() time.Duration {
|
||||
return time.Minute
|
||||
}
|
||||
|
||||
func (p *FakeCollectorPlugin) NewCollector(
|
||||
hpa *autoscalingv2.HorizontalPodAutoscaler,
|
||||
config *MetricConfig,
|
||||
interval time.Duration,
|
||||
) (Collector, error) {
|
||||
|
||||
p.config = config.Config
|
||||
return &FakeCollector{metrics: p.metrics, interval: interval}, nil
|
||||
}
|
||||
|
||||
func makePlugin(metric int) *FakeCollectorPlugin {
|
||||
return &FakeCollectorPlugin{
|
||||
metrics: []CollectedMetric{
|
||||
{
|
||||
Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func makeCollectorWithStub(f func() ([]CollectedMetric, error)) *FakeCollector {
|
||||
return &FakeCollector{stub: f}
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
// NakadiMetricType defines the metric type for metrics based on Nakadi
|
||||
// subscriptions.
|
||||
NakadiMetricType = "nakadi"
|
||||
nakadiSubscriptionIDKey = "subscription-id"
|
||||
nakadiMetricTypeKey = "metric-type"
|
||||
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
|
||||
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
|
||||
)
|
||||
|
||||
// NakadiCollectorPlugin defines a plugin for creating collectors that can get
|
||||
// unconsumed events from Nakadi.
|
||||
type NakadiCollectorPlugin struct {
|
||||
nakadi nakadi.Nakadi
|
||||
}
|
||||
|
||||
// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
|
||||
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
|
||||
return &NakadiCollectorPlugin{
|
||||
nakadi: nakadi,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewCollector initializes a new Nakadi collector from the specified HPA.
|
||||
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewNakadiCollector(c.nakadi, hpa, config, interval)
|
||||
}
|
||||
|
||||
// NakadiCollector defines a collector that is able to collect metrics from
|
||||
// Nakadi.
|
||||
type NakadiCollector struct {
|
||||
nakadi nakadi.Nakadi
|
||||
interval time.Duration
|
||||
subscriptionID string
|
||||
nakadiMetricType string
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
namespace string
|
||||
}
|
||||
|
||||
// NewNakadiCollector initializes a new NakadiCollector.
|
||||
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for nakadi is not specified")
|
||||
}
|
||||
|
||||
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("subscription-id not specified on metric")
|
||||
}
|
||||
|
||||
metricType, ok := config.Config[nakadiMetricTypeKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("metric-type not specified on metric")
|
||||
}
|
||||
|
||||
if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
|
||||
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
|
||||
}
|
||||
|
||||
return &NakadiCollector{
|
||||
nakadi: nakadi,
|
||||
interval: interval,
|
||||
subscriptionID: subscriptionID,
|
||||
nakadiMetricType: metricType,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
namespace: hpa.Namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
|
||||
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
var value int64
|
||||
var err error
|
||||
switch c.nakadiMetricType {
|
||||
case nakadiMetricTypeConsumerLagSeconds:
|
||||
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case nakadiMetricTypeUnconsumedEvents:
|
||||
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Now(),
|
||||
Value: *resource.NewQuantity(value, resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
|
||||
return []CollectedMetric{metricValue}, nil
|
||||
}
|
||||
|
||||
// Interval returns the interval at which the collector should run.
|
||||
func (c *NakadiCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
||||
@@ -61,10 +61,10 @@ func TestTargetRefReplicasStatefulSets(t *testing.T) {
|
||||
require.Equal(t, statefulSet.Status.Replicas, replicas)
|
||||
}
|
||||
|
||||
func newHPA(namesapce string, refName string, refKind string) *autoscalingv2.HorizontalPodAutoscaler {
|
||||
func newHPA(namespace string, refName string, refKind string) *autoscalingv2.HorizontalPodAutoscaler {
|
||||
return &autoscalingv2.HorizontalPodAutoscaler{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namesapce,
|
||||
Name: namespace,
|
||||
},
|
||||
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
|
||||
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
|
||||
@@ -658,38 +658,3 @@ func makeConfig(resourceName, namespace, kind, backend string, fakedAverage bool
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
type FakeCollectorPlugin struct {
|
||||
metrics []CollectedMetric
|
||||
config map[string]string
|
||||
}
|
||||
|
||||
type FakeCollector struct {
|
||||
metrics []CollectedMetric
|
||||
}
|
||||
|
||||
func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
return c.metrics, nil
|
||||
}
|
||||
|
||||
func (FakeCollector) Interval() time.Duration {
|
||||
return time.Minute
|
||||
}
|
||||
|
||||
func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
if p.config != nil {
|
||||
return nil, fmt.Errorf("config already assigned once: %v", p.config)
|
||||
}
|
||||
p.config = config.Config
|
||||
return &FakeCollector{metrics: p.metrics}, nil
|
||||
}
|
||||
|
||||
func makePlugin(metric int) *FakeCollectorPlugin {
|
||||
return &FakeCollectorPlugin{
|
||||
metrics: []CollectedMetric{
|
||||
{
|
||||
Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
package nakadi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Nakadi defines an interface for talking to the Nakadi API.
|
||||
type Nakadi interface {
|
||||
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
|
||||
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
|
||||
}
|
||||
|
||||
// Client defines client for interfacing with the Nakadi API.
|
||||
type Client struct {
|
||||
nakadiEndpoint string
|
||||
http *http.Client
|
||||
}
|
||||
|
||||
// NewNakadiClient initializes a new Nakadi Client.
|
||||
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
|
||||
return &Client{
|
||||
nakadiEndpoint: nakadiEndpoint,
|
||||
http: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
|
||||
stats, err := c.stats(ctx, subscriptionID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var maxConsumerLagSeconds int64
|
||||
for _, eventType := range stats {
|
||||
for _, partition := range eventType.Partitions {
|
||||
maxConsumerLagSeconds = max(maxConsumerLagSeconds, partition.ConsumerLagSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
return maxConsumerLagSeconds, nil
|
||||
}
|
||||
|
||||
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
|
||||
stats, err := c.stats(ctx, subscriptionID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var unconsumedEvents int64
|
||||
for _, eventType := range stats {
|
||||
for _, partition := range eventType.Partitions {
|
||||
unconsumedEvents += partition.UnconsumedEvents
|
||||
}
|
||||
}
|
||||
|
||||
return unconsumedEvents, nil
|
||||
}
|
||||
|
||||
type statsResp struct {
|
||||
Items []statsEventType `json:"items"`
|
||||
}
|
||||
|
||||
type statsEventType struct {
|
||||
EventType string `json:"event_type"`
|
||||
Partitions []statsPartition `json:"partitions"`
|
||||
}
|
||||
|
||||
type statsPartition struct {
|
||||
Partiton string `json:"partition"`
|
||||
State string `json:"state"`
|
||||
UnconsumedEvents int64 `json:"unconsumed_events"`
|
||||
ConsumerLagSeconds int64 `json:"consumer_lag_seconds"`
|
||||
StreamID string `json:"stream_id"`
|
||||
AssignmentType string `json:"assignment_type"`
|
||||
}
|
||||
|
||||
// stats returns the Nakadi stats for a given subscription ID.
|
||||
//
|
||||
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
|
||||
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
|
||||
endpoint, err := url.Parse(c.nakadiEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)
|
||||
|
||||
q := endpoint.Query()
|
||||
q.Set("show_time_lag", "true")
|
||||
endpoint.RawQuery = q.Encode()
|
||||
|
||||
resp, err := c.http.Get(endpoint.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
d, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
|
||||
}
|
||||
|
||||
var result statsResp
|
||||
err = json.Unmarshal(d, &result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(result.Items) == 0 {
|
||||
return nil, errors.New("expected at least 1 event-type, 0 returned")
|
||||
}
|
||||
|
||||
return result.Items, nil
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
package nakadi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestQuery(tt *testing.T) {
|
||||
client := &http.Client{}
|
||||
for _, ti := range []struct {
|
||||
msg string
|
||||
status int
|
||||
responseBody string
|
||||
err error
|
||||
unconsumedEvents int64
|
||||
consumerLagSeconds int64
|
||||
}{
|
||||
{
|
||||
msg: "test getting a single event-type",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": [
|
||||
{
|
||||
"event_type": "example-event",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 2,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`,
|
||||
unconsumedEvents: 9,
|
||||
consumerLagSeconds: 2,
|
||||
},
|
||||
{
|
||||
msg: "test getting multiple event-types",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": [
|
||||
{
|
||||
"event_type": "example-event",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 2,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"event_type": "example-event-2",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 6,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`,
|
||||
unconsumedEvents: 18,
|
||||
consumerLagSeconds: 6,
|
||||
},
|
||||
{
|
||||
msg: "test call with invalid response",
|
||||
status: http.StatusInternalServerError,
|
||||
responseBody: `{"error": 500}`,
|
||||
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
|
||||
},
|
||||
{
|
||||
msg: "test getting back a single data point",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": []
|
||||
}`,
|
||||
err: errors.New("expected at least 1 event-type, 0 returned"),
|
||||
},
|
||||
} {
|
||||
tt.Run(ti.msg, func(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(
|
||||
func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(ti.status)
|
||||
_, err := w.Write([]byte(ti.responseBody))
|
||||
assert.NoError(t, err)
|
||||
}),
|
||||
)
|
||||
defer ts.Close()
|
||||
|
||||
nakadiClient := NewNakadiClient(ts.URL, client)
|
||||
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id")
|
||||
assert.Equal(t, ti.err, err)
|
||||
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
|
||||
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id")
|
||||
assert.Equal(t, ti.err, err)
|
||||
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
+1
-1
@@ -363,7 +363,7 @@ func collectorRunner(ctx context.Context, collector collector.Collector, metrics
|
||||
}
|
||||
}
|
||||
|
||||
// Remove removes a collector from the Collector schduler. The collector is
|
||||
// Remove removes a collector from the Collector scheduler. The collector is
|
||||
// stopped before it's removed.
|
||||
func (t *CollectorScheduler) Remove(resourceRef resourceReference) {
|
||||
t.Lock()
|
||||
|
||||
+55
-3
@@ -35,6 +35,7 @@ import (
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/provider"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
|
||||
"golang.org/x/oauth2"
|
||||
@@ -64,7 +65,9 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
EnableExternalMetricsAPI: true,
|
||||
MetricsAddress: ":7979",
|
||||
ZMONTokenName: "zmon",
|
||||
NakadiTokenName: "nakadi",
|
||||
CredentialsDir: "/meta/credentials",
|
||||
ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count",
|
||||
}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
@@ -109,8 +112,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
"url of ZMON KariosDB endpoint to query for ZMON checks")
|
||||
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
|
||||
"name of the token used to query ZMON")
|
||||
flags.StringVar(&o.NakadiEndpoint, "nakadi-endpoint", o.NakadiEndpoint, ""+
|
||||
"url of Nakadi endpoint to for nakadi subscription stats")
|
||||
flags.StringVar(&o.NakadiTokenName, "nakadi-token-name", o.NakadiTokenName, ""+
|
||||
"name of the token used to call nakadi subscription API")
|
||||
flags.StringVar(&o.Token, "token", o.Token, ""+
|
||||
"static oauth2 token to use when calling external services like ZMON")
|
||||
"static oauth2 token to use when calling external services like ZMON and Nakadi")
|
||||
flags.StringVar(&o.CredentialsDir, "credentials-dir", o.CredentialsDir, ""+
|
||||
"path to the credentials dir where tokens are stored")
|
||||
flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+
|
||||
@@ -132,6 +139,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
|
||||
flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.")
|
||||
flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
|
||||
flags.StringVar(&o.ExternalRPSMetricName, "external-rps-metric-name", o.ExternalRPSMetricName, ""+
|
||||
"The name of the metric that should be used to query prometheus for RPS per hostname.")
|
||||
flags.BoolVar(&o.ExternalRPSMetrics, "external-rps-metrics", o.ExternalRPSMetrics, ""+
|
||||
"whether to enable external RPS metric collector or not")
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -218,6 +229,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// External RPS collector, like skipper's, depends on prometheus being enabled.
|
||||
// Also, to enable hostname metric its necessary to pass the metric name that
|
||||
// will be used. This was built this way so we can support hostname metrics to
|
||||
// any ingress provider, e.g. Skipper, Nginx, envoy etc, in a simple way.
|
||||
if o.ExternalRPSMetrics && o.ExternalRPSMetricName != "" {
|
||||
externalRPSPlugin, err := collector.NewExternalRPSCollectorPlugin(promPlugin, o.ExternalRPSMetricName)
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.ExternalRPSMetricType}, externalRPSPlugin)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register hostname collector plugin: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if o.InfluxDBAddress != "" {
|
||||
@@ -257,6 +280,27 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.ZMONMetricType, collector.ZMONCheckMetricLegacy}, zmonPlugin)
|
||||
}
|
||||
|
||||
// enable Nakadi based metrics
|
||||
if o.NakadiEndpoint != "" {
|
||||
var tokenSource oauth2.TokenSource
|
||||
if o.Token != "" {
|
||||
tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token})
|
||||
} else {
|
||||
tokenSource = platformiam.NewTokenSource(o.NakadiTokenName, o.CredentialsDir)
|
||||
}
|
||||
|
||||
httpClient := newOauth2HTTPClient(ctx, tokenSource)
|
||||
|
||||
nakadiClient := nakadi.NewNakadiClient(o.NakadiEndpoint, httpClient)
|
||||
|
||||
nakadiPlugin, err := collector.NewNakadiCollectorPlugin(nakadiClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize Nakadi collector plugin: %v", err)
|
||||
}
|
||||
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.NakadiMetricType}, nakadiPlugin)
|
||||
}
|
||||
|
||||
awsSessions := make(map[string]*session.Session, len(o.AWSRegions))
|
||||
for _, region := range o.AWSRegions {
|
||||
awsSessions[region], err = session.NewSessionWithOptions(session.Options{
|
||||
@@ -315,7 +359,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
return fmt.Errorf("failed to register ScalingSchedule object collector plugin: %v", err)
|
||||
}
|
||||
|
||||
// setup ScheduledScaling controller to continously update
|
||||
// setup ScheduledScaling controller to continuously update
|
||||
// status of ScalingSchedule and ClusterScalingSchedule
|
||||
// resources.
|
||||
scheduledScalingController := scheduledscaling.NewController(scalingScheduleClient.ZalandoV1(), scalingSchedulesStore, clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.DefaultTimeZone)
|
||||
@@ -383,7 +427,7 @@ func newOauth2HTTPClient(ctx context.Context, tokenSource oauth2.TokenSource) *h
|
||||
// add HTTP client to context (this is how the oauth2 lib gets it).
|
||||
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
|
||||
|
||||
// instantiate an http.Client containg the token source.
|
||||
// instantiate an http.Client containing the token source.
|
||||
return oauth2.NewClient(ctx, tokenSource)
|
||||
}
|
||||
|
||||
@@ -410,6 +454,10 @@ type AdapterServerOptions struct {
|
||||
ZMONKariosDBEndpoint string
|
||||
// ZMONTokenName is the name of the token used to query ZMON
|
||||
ZMONTokenName string
|
||||
// NakadiEndpoint enables Nakadi metrics from the specified endpoint
|
||||
NakadiEndpoint string
|
||||
// NakadiTokenName is the name of the token used to call Nakadi
|
||||
NakadiTokenName string
|
||||
// Token is an oauth2 token used to authenticate with services like
|
||||
// ZMON.
|
||||
Token string
|
||||
@@ -445,4 +493,8 @@ type AdapterServerOptions struct {
|
||||
RampSteps int
|
||||
// Default time zone to use for ScalingSchedules.
|
||||
DefaultTimeZone string
|
||||
// Feature flag to enable external rps metric collector
|
||||
ExternalRPSMetrics bool
|
||||
// Name of the Prometheus metric that stores RPS by hostname for external RPS metrics.
|
||||
ExternalRPSMetricName string
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user