Compare commits
48 Commits
update-dep
...
v0.2.2
Author | SHA1 | Date | |
---|---|---|---|
|
49f9f5ffe0 | ||
|
43dd2e9741 | ||
|
4aeebc853c | ||
|
45a09c12a0 | ||
|
789c2dee79 | ||
|
bbe5a8a1f7 | ||
|
32e9a39be0 | ||
|
afea8de1d9 | ||
|
a17ad004d3 | ||
|
ec77258ba4 | ||
|
3ef92536c1 | ||
|
ae5c4af76f | ||
|
b130ce5fa7 | ||
|
8cd076b01d | ||
|
15cffaee88 | ||
|
82a96580a2 | ||
|
08825c4920 | ||
|
ac3c500d84 | ||
|
656a05fe07 | ||
|
e54f3b88cf | ||
|
f5124993a2 | ||
|
7c1339a6f2 | ||
|
fc2f6aa5df | ||
|
7448688788 | ||
|
0e1302a97e | ||
|
749c1a7e25 | ||
|
e7eb21a773 | ||
|
2c3247bf57 | ||
|
6860217a71 | ||
|
a66ce04128 | ||
|
9eabbd53d9 | ||
|
ec8622c028 | ||
|
166865edbb | ||
|
85f1c3e13d | ||
|
1a5297d6b2 | ||
|
be0e0d485e | ||
|
a07eb0b79c | ||
|
34dc968e77 | ||
|
9174f2550a | ||
|
6c4dfd682b | ||
|
aa3f8ab969 | ||
|
437bca4de8 | ||
|
c2fe13d21d | ||
|
bbfd982fb6 | ||
|
e9112a7114 | ||
|
2c526dd498 | ||
|
29b782160d | ||
|
28f3d96061 |
91
.github/workflows/gh-packages.yaml
vendored
Normal file
91
.github/workflows/gh-packages.yaml
vendored
Normal file
@ -0,0 +1,91 @@
|
||||
name: gh-package-deploy
|
||||
permissions: {}
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
env:
|
||||
REGISTRY: ghcr.io
|
||||
IMAGE_NAME: "${{ github.repository }}"
|
||||
|
||||
jobs:
|
||||
docker:
|
||||
if: ${{ github.actor != 'dependabot[bot]' }}
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
# Adding this block will override default values to None if not specified in the block
|
||||
# https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
|
||||
permissions:
|
||||
contents: read
|
||||
actions: read
|
||||
packages: write # to push packages
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9
|
||||
|
||||
- uses: actions/setup-go@fac708d6674e30b6ba41289acaab6d4b75aa0753
|
||||
with:
|
||||
# https://www.npmjs.com/package/semver#caret-ranges-123-025-004
|
||||
go-version: '^1.21'
|
||||
|
||||
- name: Login to Github Container Registry
|
||||
uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- uses: actions-ecosystem/action-get-latest-tag@b7c32daec3395a9616f88548363a42652b22d435
|
||||
id: get-latest-tag
|
||||
|
||||
- name: Build binaries
|
||||
run: |
|
||||
make build.linux.amd64 build.linux.arm64
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@2b82ce82d56a2a04d2637cd93a637ae1b359c0a7
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@4c0219f9ac95b02789c1075625400b2acbff50b1
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@465a07811f14bebb1938fbed4728c6a1ff8901fc
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Docker meta
|
||||
uses: docker/metadata-action@818d4b7b91585d195f67373fd9cb0332e31a7175
|
||||
id: meta
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
tags: |
|
||||
type=semver,pattern=v{{version}}
|
||||
type=semver,pattern=v{{major}}.{{minor}}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=alpine:3
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' && startsWith(github.ref, 'refs/tags/v') }}
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
# Build and push latest tag
|
||||
- name: Build and push latest
|
||||
uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
|
||||
with:
|
||||
context: .
|
||||
build-args: BASE_IMAGE=alpine:3
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
@ -1,4 +1,4 @@
|
||||
ARG BASE_IMAGE=registry.opensource.zalan.do/library/alpine-3.13:latest
|
||||
ARG BASE_IMAGE=registry.opensource.zalan.do/library/alpine-3:latest
|
||||
FROM ${BASE_IMAGE}
|
||||
LABEL maintainer="Team Teapot @ Zalando SE <team-teapot@zalando.de>"
|
||||
|
||||
|
89
README.md
89
README.md
@ -702,6 +702,95 @@ you need to define a `key` or other `tag` with a "star" query syntax like
|
||||
metric label definitions. If both annotations and corresponding label is
|
||||
defined, then the annotation takes precedence.
|
||||
|
||||
|
||||
## Nakadi collector
|
||||
|
||||
The Nakadi collector allows scaling based on [Nakadi](https://nakadi.io/)
|
||||
Subscription API stats metrics `consumer_lag_seconds` or `unconsumed_events`.
|
||||
|
||||
### Supported metrics
|
||||
|
||||
| Metric Type | Description | Type | K8s Versions |
|
||||
|------------------------|-----------------------------------------------------------------------------|----------|--------------|
|
||||
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi subscription | External | `>=1.24` |
|
||||
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi subscription | External | `>=1.24` |
|
||||
| ------------ | ------- | -- | -- |
|
||||
| `unconsumed-events` | Scale based on number of unconsumed events for a Nakadi Subscription | External | `>=1.24` |
|
||||
| `consumer-lag-seconds` | Scale based on number of max consumer lag seconds for a Nakadi Subscription | External | `>=1.24` |
|
||||
|
||||
```yaml
|
||||
apiVersion: autoscaling/v2
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
# metric-config.<metricType>.<metricName>.<collectorType>/<configKey>
|
||||
metric-config.external.my-nakadi-consumer.nakadi/interval: "60s" # optional
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: custom-metrics-consumer
|
||||
minReplicas: 0
|
||||
maxReplicas: 8 # should match number of partitions for the event type
|
||||
metrics:
|
||||
- type: External
|
||||
external:
|
||||
metric:
|
||||
name: my-nakadi-consumer
|
||||
selector:
|
||||
matchLabels:
|
||||
type: nakadi
|
||||
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
|
||||
metric-type: "consumer-lag-seconds|unconsumed-events"
|
||||
target:
|
||||
# value is compatible with the consumer-lag-seconds metric type.
|
||||
# It describes the amount of consumer lag in seconds before scaling
|
||||
# additionally up.
|
||||
# if an event-type has multiple partitions the value of
|
||||
# consumer-lag-seconds is the max of all the partitions.
|
||||
value: "600" # 10m
|
||||
# averageValue is compatible with unconsumed-events metric type.
|
||||
# This means for every 30 unconsumed events a pod is added.
|
||||
# unconsumed-events is the sum of unconsumed_events over all
|
||||
# partitions.
|
||||
averageValue: "30"
|
||||
type: AverageValue
|
||||
```
|
||||
|
||||
The `subscription-id` is the Subscription ID of the relevant consumer. The
|
||||
`metric-type` indicates whether to scale on `consumer-lag-seconds` or
|
||||
`unconsumed-events` as outlined below.
|
||||
|
||||
`unconsumed-events` - is the total number of unconsumed events over all
|
||||
partitions. When using this `metric-type` you should also use the target
|
||||
`averageValue` which indicates the number of events which can be handled per
|
||||
pod. To best estimate the number of events per pods, you need to understand the
|
||||
average time for processing an event as well as the rate of events.
|
||||
|
||||
*Example*: You have an event type producing 100 events per second between 00:00
|
||||
and 08:00. Between 08:01 to 23:59 it produces 400 events per second.
|
||||
Let's assume that on average a single pod can consume 100 events per second,
|
||||
then we can define 100 as `averageValue` and the HPA would scale to 1 between
|
||||
00:00 and 08:00, and scale to 4 between 08:01 and 23:59. If there for some
|
||||
reason is a short spike of 800 events per second, then it would scale to 8 pods
|
||||
to process those events until the rate goes down again.
|
||||
|
||||
`consumer-lag-seconds` - describes the age of the oldest unconsumed event for
|
||||
a subscription. If the event type has multiple partitions the lag is defined as
|
||||
the max age over all partitions. When using this `metric-type` you should use
|
||||
the target `value` to indicate the max lag (in seconds) before the HPA should
|
||||
scale.
|
||||
|
||||
*Example*: You have a subscription with a defined SLO of "99.99% of events are
|
||||
consumed within 30 min.". In this case you can define a target `value` of e.g.
|
||||
20 min. (1200s) (to include a safety buffer) such that the HPA only scales up
|
||||
from 1 to 2 if the target of 20 min. is breached and it needs to work faster
|
||||
with more consumers.
|
||||
For this case you should also account for the average time for processing an
|
||||
event when defining the target.
|
||||
|
||||
|
||||
## HTTP Collector
|
||||
|
||||
The http collector allows collecting metrics from an external endpoint specified in the HPA.
|
||||
|
@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
annotations:
|
||||
controller-gen.kubebuilder.io/version: v0.8.0
|
||||
controller-gen.kubebuilder.io/version: v0.9.0
|
||||
creationTimestamp: null
|
||||
name: clusterscalingschedules.zalando.org
|
||||
spec:
|
||||
@ -140,9 +140,3 @@ spec:
|
||||
storage: true
|
||||
subresources:
|
||||
status: {}
|
||||
status:
|
||||
acceptedNames:
|
||||
kind: ""
|
||||
plural: ""
|
||||
conditions: []
|
||||
storedVersions: []
|
||||
|
@ -22,7 +22,7 @@ spec:
|
||||
serviceAccountName: custom-metrics-apiserver
|
||||
containers:
|
||||
- name: kube-metrics-adapter
|
||||
image: registry.opensource.zalan.do/teapot/kube-metrics-adapter:latest
|
||||
image: ghcr.io/zalando-incubator/kube-metrics-adapter:latest
|
||||
args:
|
||||
# - --v=9
|
||||
- --prometheus-server=http://prometheus.kube-system.svc.cluster.local
|
||||
|
@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
annotations:
|
||||
controller-gen.kubebuilder.io/version: v0.8.0
|
||||
controller-gen.kubebuilder.io/version: v0.9.0
|
||||
creationTimestamp: null
|
||||
name: scalingschedules.zalando.org
|
||||
spec:
|
||||
@ -142,9 +142,3 @@ spec:
|
||||
storage: true
|
||||
subresources:
|
||||
status: {}
|
||||
status:
|
||||
acceptedNames:
|
||||
kind: ""
|
||||
plural: ""
|
||||
conditions: []
|
||||
storedVersions: []
|
||||
|
@ -1,4 +1,4 @@
|
||||
FROM registry.opensource.zalan.do/library/alpine-3.13:latest
|
||||
FROM registry.opensource.zalan.do/library/alpine-3:latest
|
||||
LABEL maintainer="Team Teapot @ Zalando SE <team-teapot@zalando.de>"
|
||||
|
||||
# add binary
|
||||
|
69
go.mod
69
go.mod
@ -1,36 +1,36 @@
|
||||
module github.com/zalando-incubator/kube-metrics-adapter
|
||||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go v1.44.299
|
||||
github.com/aws/aws-sdk-go v1.48.10
|
||||
github.com/influxdata/influxdb-client-go v0.2.0
|
||||
github.com/prometheus/client_golang v1.16.0
|
||||
github.com/prometheus/common v0.44.0
|
||||
github.com/prometheus/client_golang v1.17.0
|
||||
github.com/prometheus/common v0.45.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/cobra v1.7.0
|
||||
github.com/spyzhov/ajson v0.8.0
|
||||
github.com/spf13/cobra v1.8.0
|
||||
github.com/spyzhov/ajson v0.9.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/szuecs/routegroup-client v0.21.1
|
||||
github.com/zalando-incubator/cluster-lifecycle-manager v0.0.0-20230601114834-6ed1bba3c85d
|
||||
golang.org/x/net v0.12.0
|
||||
golang.org/x/oauth2 v0.10.0
|
||||
golang.org/x/sync v0.3.0
|
||||
k8s.io/api v0.23.17
|
||||
k8s.io/apimachinery v0.23.17
|
||||
k8s.io/apiserver v0.23.17
|
||||
k8s.io/client-go v0.23.17
|
||||
k8s.io/code-generator v0.23.17
|
||||
k8s.io/component-base v0.23.17
|
||||
golang.org/x/net v0.19.0
|
||||
golang.org/x/oauth2 v0.13.0
|
||||
golang.org/x/sync v0.5.0
|
||||
k8s.io/api v0.24.17
|
||||
k8s.io/apimachinery v0.24.17
|
||||
k8s.io/apiserver v0.24.17
|
||||
k8s.io/client-go v0.24.17
|
||||
k8s.io/code-generator v0.24.17
|
||||
k8s.io/component-base v0.24.17
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/kube-openapi v0.0.0-20230614213217-ba0abe644833
|
||||
k8s.io/metrics v0.23.17
|
||||
sigs.k8s.io/controller-tools v0.8.0
|
||||
sigs.k8s.io/custom-metrics-apiserver v1.22.0
|
||||
k8s.io/metrics v0.24.17
|
||||
sigs.k8s.io/controller-tools v0.9.0
|
||||
sigs.k8s.io/custom-metrics-apiserver v1.24.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/NYTimes/gziphandler v1.1.1 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/blang/semver v3.5.1+incompatible // indirect
|
||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/coreos/go-semver v0.3.0 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
|
||||
@ -44,15 +44,15 @@ require (
|
||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||
github.com/go-openapi/swag v0.22.4 // indirect
|
||||
github.com/gobuffalo/flect v0.2.3 // indirect
|
||||
github.com/gobuffalo/flect v0.2.5 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/gnostic v0.6.9 // indirect
|
||||
github.com/google/go-cmp v0.5.9 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||
github.com/imdario/mergo v0.3.16 // indirect
|
||||
@ -64,14 +64,14 @@ require (
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.0 // indirect
|
||||
github.com/prometheus/procfs v0.10.1 // indirect
|
||||
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
|
||||
github.com/prometheus/procfs v0.11.1 // indirect
|
||||
github.com/rogpeppe/go-internal v1.11.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
|
||||
@ -84,7 +84,7 @@ require (
|
||||
go.opentelemetry.io/otel v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.14.0 // indirect
|
||||
@ -92,36 +92,37 @@ require (
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
go.uber.org/zap v1.21.0 // indirect
|
||||
golang.org/x/crypto v0.11.0 // indirect
|
||||
golang.org/x/crypto v0.16.0 // indirect
|
||||
golang.org/x/mod v0.12.0 // indirect
|
||||
golang.org/x/sys v0.10.0 // indirect
|
||||
golang.org/x/term v0.10.0 // indirect
|
||||
golang.org/x/text v0.11.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/term v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
golang.org/x/tools v0.11.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||
google.golang.org/grpc v1.55.0 // indirect
|
||||
google.golang.org/grpc v1.56.3 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.23.0 // indirect
|
||||
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c // indirect
|
||||
k8s.io/apiextensions-apiserver v0.24.0 // indirect
|
||||
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
|
||||
k8s.io/klog/v2 v2.100.1 // indirect
|
||||
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 // indirect
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.37 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
)
|
||||
|
||||
replace (
|
||||
go.opentelemetry.io/cotrib v0.20.0 => go.opentelemetry.io/contrib v0.44.0
|
||||
go.opentelemetry.io/otel => go.opentelemetry.io/otel v0.20.0
|
||||
go.opentelemetry.io/otel/sdk => go.opentelemetry.io/otel/sdk v0.20.0
|
||||
go.opentelemetry.io/otel/trace => go.opentelemetry.io/otel/trace v0.20.0
|
||||
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65
|
||||
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42
|
||||
)
|
||||
|
||||
go 1.20
|
||||
go 1.21
|
||||
|
File diff suppressed because it is too large
Load Diff
120
pkg/collector/nakadi_collector.go
Normal file
120
pkg/collector/nakadi_collector.go
Normal file
@ -0,0 +1,120 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/metrics/pkg/apis/external_metrics"
|
||||
)
|
||||
|
||||
// Identifiers used to select and configure the Nakadi collector.
const (
	// NakadiMetricType is the collector type name for metrics derived from
	// Nakadi subscriptions.
	NakadiMetricType = "nakadi"

	// Keys looked up in the metric config.
	nakadiSubscriptionIDKey = "subscription-id"
	nakadiMetricTypeKey     = "metric-type"

	// Accepted values for the nakadiMetricTypeKey config entry.
	nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
	nakadiMetricTypeUnconsumedEvents   = "unconsumed-events"
)
|
||||
|
||||
// NakadiCollectorPlugin defines a plugin for creating collectors that can get
|
||||
// unconsumed events from Nakadi.
|
||||
type NakadiCollectorPlugin struct {
|
||||
nakadi nakadi.Nakadi
|
||||
}
|
||||
|
||||
// NewNakadiCollectorPlugin initializes a new NakadiCollectorPlugin.
|
||||
func NewNakadiCollectorPlugin(nakadi nakadi.Nakadi) (*NakadiCollectorPlugin, error) {
|
||||
return &NakadiCollectorPlugin{
|
||||
nakadi: nakadi,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewCollector initializes a new Nakadi collector from the specified HPA.
|
||||
func (c *NakadiCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
return NewNakadiCollector(c.nakadi, hpa, config, interval)
|
||||
}
|
||||
|
||||
// NakadiCollector defines a collector that is able to collect metrics from
|
||||
// Nakadi.
|
||||
type NakadiCollector struct {
|
||||
nakadi nakadi.Nakadi
|
||||
interval time.Duration
|
||||
subscriptionID string
|
||||
nakadiMetricType string
|
||||
metric autoscalingv2.MetricIdentifier
|
||||
metricType autoscalingv2.MetricSourceType
|
||||
namespace string
|
||||
}
|
||||
|
||||
// NewNakadiCollector initializes a new NakadiCollector.
|
||||
func NewNakadiCollector(nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for nakadi is not specified")
|
||||
}
|
||||
|
||||
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("subscription-id not specified on metric")
|
||||
}
|
||||
|
||||
metricType, ok := config.Config[nakadiMetricTypeKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("metric-type not specified on metric")
|
||||
}
|
||||
|
||||
if metricType != nakadiMetricTypeConsumerLagSeconds && metricType != nakadiMetricTypeUnconsumedEvents {
|
||||
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
|
||||
}
|
||||
|
||||
return &NakadiCollector{
|
||||
nakadi: nakadi,
|
||||
interval: interval,
|
||||
subscriptionID: subscriptionID,
|
||||
nakadiMetricType: metricType,
|
||||
metric: config.Metric,
|
||||
metricType: config.Type,
|
||||
namespace: hpa.Namespace,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetMetrics returns a list of collected metrics for the Nakadi subscription ID.
|
||||
func (c *NakadiCollector) GetMetrics() ([]CollectedMetric, error) {
|
||||
var value int64
|
||||
var err error
|
||||
switch c.nakadiMetricType {
|
||||
case nakadiMetricTypeConsumerLagSeconds:
|
||||
value, err = c.nakadi.ConsumerLagSeconds(context.TODO(), c.subscriptionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case nakadiMetricTypeUnconsumedEvents:
|
||||
value, err = c.nakadi.UnconsumedEvents(context.TODO(), c.subscriptionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
metricValue := CollectedMetric{
|
||||
Namespace: c.namespace,
|
||||
Type: c.metricType,
|
||||
External: external_metrics.ExternalMetricValue{
|
||||
MetricName: c.metric.Name,
|
||||
MetricLabels: c.metric.Selector.MatchLabels,
|
||||
Timestamp: metav1.Now(),
|
||||
Value: *resource.NewQuantity(value, resource.DecimalSI),
|
||||
},
|
||||
}
|
||||
|
||||
return []CollectedMetric{metricValue}, nil
|
||||
}
|
||||
|
||||
// Interval returns the interval at which the collector should run.
|
||||
func (c *NakadiCollector) Interval() time.Duration {
|
||||
return c.interval
|
||||
}
|
124
pkg/nakadi/nakadi.go
Normal file
124
pkg/nakadi/nakadi.go
Normal file
@ -0,0 +1,124 @@
|
||||
package nakadi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Nakadi defines an interface for talking to the Nakadi API.
type Nakadi interface {
	// ConsumerLagSeconds returns the highest consumer lag in seconds over
	// all partitions of all event types of the subscription.
	ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
	// UnconsumedEvents returns the number of unconsumed events summed over
	// all partitions of all event types of the subscription.
	UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
}

// Compile-time check that Client implements Nakadi.
var _ Nakadi = (*Client)(nil)

// Client defines client for interfacing with the Nakadi API.
type Client struct {
	nakadiEndpoint string
	http           *http.Client
}

// NewNakadiClient initializes a new Nakadi Client which sends its requests
// to nakadiEndpoint using the given HTTP client.
func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
	return &Client{
		nakadiEndpoint: nakadiEndpoint,
		http:           client,
	}
}

// ConsumerLagSeconds returns the maximum consumer_lag_seconds reported over
// all partitions of all event types of the subscription.
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
	stats, err := c.stats(ctx, subscriptionID)
	if err != nil {
		return 0, err
	}

	var maxConsumerLagSeconds int64
	for _, eventType := range stats {
		for _, partition := range eventType.Partitions {
			maxConsumerLagSeconds = max(maxConsumerLagSeconds, partition.ConsumerLagSeconds)
		}
	}

	return maxConsumerLagSeconds, nil
}

// UnconsumedEvents returns the sum of unconsumed_events reported over all
// partitions of all event types of the subscription.
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
	stats, err := c.stats(ctx, subscriptionID)
	if err != nil {
		return 0, err
	}

	var unconsumedEvents int64
	for _, eventType := range stats {
		for _, partition := range eventType.Partitions {
			unconsumedEvents += partition.UnconsumedEvents
		}
	}

	return unconsumedEvents, nil
}

// statsResp is the JSON body returned by the subscription stats endpoint.
type statsResp struct {
	Items []statsEventType `json:"items"`
}

// statsEventType holds the per-partition stats of one event type.
type statsEventType struct {
	EventType  string           `json:"event_type"`
	Partitions []statsPartition `json:"partitions"`
}

// statsPartition holds the stats of a single partition.
type statsPartition struct {
	Partition          string `json:"partition"`
	State              string `json:"state"`
	UnconsumedEvents   int64  `json:"unconsumed_events"`
	ConsumerLagSeconds int64  `json:"consumer_lag_seconds"`
	StreamID           string `json:"stream_id"`
	AssignmentType     string `json:"assignment_type"`
}

// stats returns the Nakadi stats for a given subscription ID.
//
// The request is bound to ctx, so cancelling ctx or exceeding its deadline
// aborts the call. An error is returned for any response status other than
// 200, for a body that is not valid JSON and for a response without any
// event type.
//
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
	endpoint, err := url.Parse(c.nakadiEndpoint)
	if err != nil {
		return nil, err
	}

	endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)

	// show_time_lag makes Nakadi include consumer_lag_seconds in the stats.
	q := endpoint.Query()
	q.Set("show_time_lag", "true")
	endpoint.RawQuery = q.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.http.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// The body is read before the status check because it is part of the
	// error message for unexpected status codes.
	d, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
	}

	var result statsResp
	err = json.Unmarshal(d, &result)
	if err != nil {
		return nil, err
	}

	if len(result.Items) == 0 {
		return nil, errors.New("expected at least 1 event-type, 0 returned")
	}

	return result.Items, nil
}
|
141
pkg/nakadi/nakadi_test.go
Normal file
141
pkg/nakadi/nakadi_test.go
Normal file
@ -0,0 +1,141 @@
|
||||
package nakadi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestQuery(tt *testing.T) {
|
||||
client := &http.Client{}
|
||||
for _, ti := range []struct {
|
||||
msg string
|
||||
status int
|
||||
responseBody string
|
||||
err error
|
||||
unconsumedEvents int64
|
||||
consumerLagSeconds int64
|
||||
}{
|
||||
{
|
||||
msg: "test getting a single event-type",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": [
|
||||
{
|
||||
"event_type": "example-event",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 2,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`,
|
||||
unconsumedEvents: 9,
|
||||
consumerLagSeconds: 2,
|
||||
},
|
||||
{
|
||||
msg: "test getting multiple event-types",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": [
|
||||
{
|
||||
"event_type": "example-event",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 2,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"event_type": "example-event-2",
|
||||
"partitions": [
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 4,
|
||||
"consumer_lag_seconds": 6,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
},
|
||||
{
|
||||
"partition": "0",
|
||||
"state": "assigned",
|
||||
"unconsumed_events": 5,
|
||||
"consumer_lag_seconds": 1,
|
||||
"stream_id": "example-id",
|
||||
"assignment_type": "auto"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`,
|
||||
unconsumedEvents: 18,
|
||||
consumerLagSeconds: 6,
|
||||
},
|
||||
{
|
||||
msg: "test call with invalid response",
|
||||
status: http.StatusInternalServerError,
|
||||
responseBody: `{"error": 500}`,
|
||||
err: errors.New("[nakadi stats] unexpected response code: 500 ({\"error\": 500})"),
|
||||
},
|
||||
{
|
||||
msg: "test getting back a single data point",
|
||||
status: http.StatusOK,
|
||||
responseBody: `{
|
||||
"items": []
|
||||
}`,
|
||||
err: errors.New("expected at least 1 event-type, 0 returned"),
|
||||
},
|
||||
} {
|
||||
tt.Run(ti.msg, func(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(
|
||||
func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(ti.status)
|
||||
_, err := w.Write([]byte(ti.responseBody))
|
||||
assert.NoError(t, err)
|
||||
}),
|
||||
)
|
||||
defer ts.Close()
|
||||
|
||||
nakadiClient := NewNakadiClient(ts.URL, client)
|
||||
consumerLagSeconds, err := nakadiClient.ConsumerLagSeconds(context.Background(), "id")
|
||||
assert.Equal(t, ti.err, err)
|
||||
assert.Equal(t, ti.consumerLagSeconds, consumerLagSeconds)
|
||||
unconsumedEvents, err := nakadiClient.UnconsumedEvents(context.Background(), "id")
|
||||
assert.Equal(t, ti.err, err)
|
||||
assert.Equal(t, ti.unconsumedEvents, unconsumedEvents)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
@ -35,6 +35,7 @@ import (
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/client/clientset/versioned"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/controller/scheduledscaling"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/provider"
|
||||
"github.com/zalando-incubator/kube-metrics-adapter/pkg/zmon"
|
||||
"golang.org/x/oauth2"
|
||||
@ -64,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
EnableExternalMetricsAPI: true,
|
||||
MetricsAddress: ":7979",
|
||||
ZMONTokenName: "zmon",
|
||||
NakadiTokenName: "nakadi",
|
||||
CredentialsDir: "/meta/credentials",
|
||||
ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count",
|
||||
}
|
||||
@ -110,8 +112,12 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
|
||||
"url of ZMON KariosDB endpoint to query for ZMON checks")
|
||||
flags.StringVar(&o.ZMONTokenName, "zmon-token-name", o.ZMONTokenName, ""+
|
||||
"name of the token used to query ZMON")
|
||||
flags.StringVar(&o.NakadiEndpoint, "nakadi-endpoint", o.NakadiEndpoint, ""+
|
||||
"url of Nakadi endpoint to for nakadi subscription stats")
|
||||
flags.StringVar(&o.NakadiTokenName, "nakadi-token-name", o.NakadiTokenName, ""+
|
||||
"name of the token used to call nakadi subscription API")
|
||||
flags.StringVar(&o.Token, "token", o.Token, ""+
|
||||
"static oauth2 token to use when calling external services like ZMON")
|
||||
"static oauth2 token to use when calling external services like ZMON and Nakadi")
|
||||
flags.StringVar(&o.CredentialsDir, "credentials-dir", o.CredentialsDir, ""+
|
||||
"path to the credentials dir where tokens are stored")
|
||||
flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+
|
||||
@ -274,6 +280,27 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.ZMONMetricType, collector.ZMONCheckMetricLegacy}, zmonPlugin)
|
||||
}
|
||||
|
||||
// enable Nakadi based metrics
|
||||
if o.NakadiEndpoint != "" {
|
||||
var tokenSource oauth2.TokenSource
|
||||
if o.Token != "" {
|
||||
tokenSource = oauth2.StaticTokenSource(&oauth2.Token{AccessToken: o.Token})
|
||||
} else {
|
||||
tokenSource = platformiam.NewTokenSource(o.NakadiTokenName, o.CredentialsDir)
|
||||
}
|
||||
|
||||
httpClient := newOauth2HTTPClient(ctx, tokenSource)
|
||||
|
||||
nakadiClient := nakadi.NewNakadiClient(o.NakadiEndpoint, httpClient)
|
||||
|
||||
nakadiPlugin, err := collector.NewNakadiCollectorPlugin(nakadiClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize Nakadi collector plugin: %v", err)
|
||||
}
|
||||
|
||||
collectorFactory.RegisterExternalCollector([]string{collector.NakadiMetricType}, nakadiPlugin)
|
||||
}
|
||||
|
||||
awsSessions := make(map[string]*session.Session, len(o.AWSRegions))
|
||||
for _, region := range o.AWSRegions {
|
||||
awsSessions[region], err = session.NewSessionWithOptions(session.Options{
|
||||
@ -427,6 +454,10 @@ type AdapterServerOptions struct {
|
||||
ZMONKariosDBEndpoint string
|
||||
// ZMONTokenName is the name of the token used to query ZMON
|
||||
ZMONTokenName string
|
||||
// NakadiEndpoint enables Nakadi metrics from the specified endpoint
|
||||
NakadiEndpoint string
|
||||
// NakadiTokenName is the name of the token used to call Nakadi
|
||||
NakadiTokenName string
|
||||
// Token is an oauth2 token used to authenticate with services like
|
||||
// ZMON.
|
||||
Token string
|
||||
|
Reference in New Issue
Block a user