mirror of
https://github.com/zalando-incubator/kube-metrics-adapter.git
synced 2026-03-15 04:27:29 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 07c0e179b3 | |||
| 29ee953a16 | |||
| f78ef26857 | |||
| a3c14e9dcb | |||
| b6b13fb31a | |||
| 0a06691d39 | |||
| 2d1d51e829 | |||
| 41761e62df | |||
| ed4c93abbb | |||
| b2194ca136 |
@@ -172,6 +172,10 @@ kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: myapp-hpa
|
||||
annotations:
|
||||
# This annotation is optional.
|
||||
# If specified, then this prometheus server is used,
|
||||
# instead of the prometheus server specified as the CLI argument `--prometheus-server`.
|
||||
metric-config.external.prometheus-query.prometheus/prometheus-server: http://prometheus.my-namespace.svc
|
||||
# metric-config.<metricType>.<metricName>.<collectorName>/<configKey>
|
||||
# <configKey> == query-name
|
||||
metric-config.external.prometheus-query.prometheus/processed-events-per-second: |
|
||||
|
||||
+5
-1
@@ -7,6 +7,8 @@ pipeline:
|
||||
- /go/pkg/mod # pkg cache for Go modules
|
||||
- ~/.cache/go-build # Go build cache
|
||||
type: script
|
||||
env:
|
||||
GOFLAGS: "-mod=readonly"
|
||||
commands:
|
||||
- desc: test
|
||||
cmd: |
|
||||
@@ -18,9 +20,11 @@ pipeline:
|
||||
cmd: |
|
||||
if [[ $CDP_TARGET_BRANCH == master && ! $CDP_PULL_REQUEST_NUMBER ]]; then
|
||||
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter
|
||||
VERSION=$(git describe --tags --always --dirty)
|
||||
VERSION=$(git describe --tags --always)
|
||||
else
|
||||
IMAGE=registry-write.opensource.zalan.do/teapot/kube-metrics-adapter-test
|
||||
VERSION=$CDP_BUILD_VERSION
|
||||
fi
|
||||
IMAGE=$IMAGE VERSION=$VERSION make build.docker
|
||||
git diff --stat --exit-code
|
||||
IMAGE=$IMAGE VERSION=$VERSION make build.push
|
||||
|
||||
@@ -89,3 +89,5 @@ require (
|
||||
k8s.io/kube-openapi v0.0.0-20180731170545-e3762e86a74c // indirect
|
||||
k8s.io/metrics v0.0.0-20190226180357-f3f09b9076d1
|
||||
)
|
||||
|
||||
go 1.13
|
||||
|
||||
@@ -56,11 +56,11 @@ func NewAWSSQSCollector(sessions map[string]*session.Session, config *MetricConf
|
||||
return nil, fmt.Errorf("selector for queue is not specified")
|
||||
}
|
||||
|
||||
name, ok := config.Metric.Selector.MatchLabels[sqsQueueNameLabelKey]
|
||||
name, ok := config.Config[sqsQueueNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue name not specified on metric")
|
||||
}
|
||||
region, ok := config.Metric.Selector.MatchLabels[sqsQueueRegionLabelKey]
|
||||
region, ok := config.Config[sqsQueueRegionLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sqs queue region is not specified on metric")
|
||||
}
|
||||
|
||||
@@ -208,7 +208,9 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
|
||||
Config: map[string]string{},
|
||||
}
|
||||
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType {
|
||||
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
|
||||
metric.External.Metric.Selector != nil &&
|
||||
metric.External.Metric.Selector.MatchLabels != nil {
|
||||
config.Config = metric.External.Metric.Selector.MatchLabels
|
||||
}
|
||||
|
||||
@@ -217,7 +219,11 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
|
||||
config.CollectorName = annotationConfigs.CollectorName
|
||||
config.Interval = annotationConfigs.Interval
|
||||
config.PerReplica = annotationConfigs.PerReplica
|
||||
config.Config = annotationConfigs.Configs
|
||||
// configs specified in annotations takes precedence
|
||||
// over labels
|
||||
for k, v := range annotationConfigs.Configs {
|
||||
config.Config[k] = v
|
||||
}
|
||||
}
|
||||
metricConfigs = append(metricConfigs, config)
|
||||
}
|
||||
|
||||
@@ -18,8 +18,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
PrometheusMetricName = "prometheus-query"
|
||||
prometheusQueryNameLabelKey = "query-name"
|
||||
PrometheusMetricName = "prometheus-query"
|
||||
prometheusQueryNameLabelKey = "query-name"
|
||||
prometheusServerAnnotationKey = "prometheus-server"
|
||||
)
|
||||
|
||||
type NoResultError struct {
|
||||
@@ -38,7 +39,7 @@ type PrometheusCollectorPlugin struct {
|
||||
func NewPrometheusCollectorPlugin(client kubernetes.Interface, prometheusServer string) (*PrometheusCollectorPlugin, error) {
|
||||
cfg := api.Config{
|
||||
Address: prometheusServer,
|
||||
RoundTripper: &http.Transport{},
|
||||
RoundTripper: http.DefaultTransport,
|
||||
}
|
||||
|
||||
promClient, err := api.NewClient(cfg)
|
||||
@@ -90,7 +91,11 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
|
||||
return nil, fmt.Errorf("no prometheus query defined")
|
||||
}
|
||||
case autoscalingv2.ExternalMetricSourceType:
|
||||
queryName, ok := config.Metric.Selector.MatchLabels[prometheusQueryNameLabelKey]
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for prometheus query is not specified")
|
||||
}
|
||||
|
||||
queryName, ok := config.Config[prometheusQueryNameLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("query name not specified on metric")
|
||||
}
|
||||
@@ -101,6 +106,20 @@ func NewPrometheusCollector(client kubernetes.Interface, promAPI promv1.API, hpa
|
||||
} else {
|
||||
return nil, fmt.Errorf("no prometheus query defined for metric")
|
||||
}
|
||||
|
||||
// Use custom Prometheus URL if defined in HPA annotation.
|
||||
if promServer, ok := config.Config[prometheusServerAnnotationKey]; ok {
|
||||
cfg := api.Config{
|
||||
Address: promServer,
|
||||
RoundTripper: http.DefaultTransport,
|
||||
}
|
||||
|
||||
promClient, err := api.NewClient(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.promAPI = promv1.NewAPI(promClient)
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
|
||||
@@ -2,7 +2,9 @@ package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -19,6 +21,10 @@ const (
|
||||
rpsMetricBackendSeparator = ","
|
||||
)
|
||||
|
||||
var (
|
||||
errBackendNameMissing = errors.New("backend name must be specified for requests-per-second when traffic switching is used")
|
||||
)
|
||||
|
||||
// SkipperCollectorPlugin is a collector plugin for initializing metrics
|
||||
// collectors for getting skipper ingress metrics.
|
||||
type SkipperCollectorPlugin struct {
|
||||
@@ -81,36 +87,40 @@ func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hp
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getAnnotationWeight(backendWeights string, backend string) (float64, bool) {
|
||||
func getAnnotationWeight(backendWeights string, backend string) float64 {
|
||||
var weightsMap map[string]int
|
||||
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
return 0
|
||||
}
|
||||
if weight, ok := weightsMap[backend]; ok {
|
||||
return float64(weight) / 100, true
|
||||
return float64(weight) / 100
|
||||
}
|
||||
return 0, false
|
||||
return 0
|
||||
}
|
||||
|
||||
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) float64 {
|
||||
var maxWeight float64 = -1
|
||||
weightSet := false
|
||||
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) (float64, error) {
|
||||
maxWeight := 0.0
|
||||
annotationsPresent := false
|
||||
|
||||
for _, anno := range backendAnnotations {
|
||||
if weightsMap, ok := ingressAnnotations[anno]; ok {
|
||||
weight, isPresent := getAnnotationWeight(weightsMap, backend)
|
||||
if isPresent {
|
||||
weightSet = true
|
||||
if weight > maxWeight {
|
||||
maxWeight = weight
|
||||
}
|
||||
}
|
||||
annotationsPresent = true
|
||||
maxWeight = math.Max(maxWeight, getAnnotationWeight(weightsMap, backend))
|
||||
}
|
||||
}
|
||||
if weightSet {
|
||||
return maxWeight
|
||||
|
||||
// Fallback for ingresses that don't use traffic switching
|
||||
if !annotationsPresent {
|
||||
return 1.0, nil
|
||||
}
|
||||
return 1.0
|
||||
|
||||
// Require backend name here
|
||||
if backend != "" {
|
||||
return maxWeight, nil
|
||||
}
|
||||
|
||||
return 0.0, errBackendNameMissing
|
||||
}
|
||||
|
||||
// getCollector returns a collector for getting the metrics.
|
||||
@@ -120,7 +130,10 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
backendWeight := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
|
||||
backendWeight, err := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := c.config
|
||||
|
||||
var collector Collector
|
||||
|
||||
@@ -106,6 +106,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
backend string
|
||||
ingressName string
|
||||
collectedMetric int
|
||||
expectError bool
|
||||
namespace string
|
||||
backendWeights map[string]map[string]int
|
||||
replicas int32
|
||||
@@ -177,7 +178,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
msg: "test backend is not set",
|
||||
metrics: []int{100, 1500, 700},
|
||||
ingressName: "dummy-ingress",
|
||||
collectedMetric: 1500,
|
||||
collectedMetric: 0,
|
||||
namespace: "default",
|
||||
backend: "backend3",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
@@ -185,6 +186,42 @@ func TestSkipperCollector(t *testing.T) {
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
},
|
||||
{
|
||||
msg: "test no annotations set",
|
||||
metrics: []int{100, 1500, 700},
|
||||
ingressName: "dummy-ingress",
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "backend3",
|
||||
backendWeights: map[string]map[string]int{},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
},
|
||||
{
|
||||
msg: "test annotations are set but backend is missing",
|
||||
metrics: []int{100, 1500, 700},
|
||||
ingressName: "dummy-ingress",
|
||||
expectError: true,
|
||||
namespace: "default",
|
||||
backend: "",
|
||||
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 100, "backend1": 0}},
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
},
|
||||
{
|
||||
msg: "test annotations are missing and backend is unset",
|
||||
metrics: []int{100, 1500, 700},
|
||||
ingressName: "dummy-ingress",
|
||||
collectedMetric: 1500,
|
||||
namespace: "default",
|
||||
backend: "",
|
||||
backendWeights: nil,
|
||||
replicas: 1,
|
||||
readyReplicas: 1,
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation},
|
||||
},
|
||||
{
|
||||
msg: "test partial backend annotations",
|
||||
metrics: []int{100, 1500, 700},
|
||||
@@ -201,7 +238,7 @@ func TestSkipperCollector(t *testing.T) {
|
||||
backendAnnotations: []string{testBackendWeightsAnnotation, testStacksetWeightsAnnotation},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.msg, func(tt *testing.T) {
|
||||
t.Run(tc.msg, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.backendWeights)
|
||||
require.NoError(t, err)
|
||||
@@ -211,11 +248,15 @@ func TestSkipperCollector(t *testing.T) {
|
||||
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
|
||||
require.NoError(t, err)
|
||||
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
|
||||
require.NoError(tt, err, "failed to create skipper collector: %v", err)
|
||||
require.NoError(t, err, "failed to create skipper collector: %v", err)
|
||||
collected, err := collector.GetMetrics()
|
||||
require.NoError(tt, err, "failed to collect metrics: %v", err)
|
||||
require.Len(t, collected, 1, "the number of metrics returned is not 1")
|
||||
require.EqualValues(t, tc.collectedMetric, collected[0].Custom.Value.Value(), "the returned metric is not expected value")
|
||||
if tc.expectError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err, "failed to collect metrics: %v", err)
|
||||
require.Len(t, collected, 1, "the number of metrics returned is not 1")
|
||||
require.EqualValues(t, tc.collectedMetric, collected[0].Custom.Value.Value(), "the returned metric is not expected value")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,11 +44,7 @@ func NewZMONCollectorPlugin(zmon zmon.ZMON) (*ZMONCollectorPlugin, error) {
|
||||
func (c *ZMONCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
|
||||
switch config.Metric.Name {
|
||||
case ZMONCheckMetric:
|
||||
annotations := map[string]string{}
|
||||
if hpa != nil {
|
||||
annotations = hpa.Annotations
|
||||
}
|
||||
return NewZMONCollector(c.zmon, config, annotations, interval)
|
||||
return NewZMONCollector(c.zmon, config, interval)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("metric '%s' not supported", config.Metric.Name)
|
||||
@@ -68,7 +64,11 @@ type ZMONCollector struct {
|
||||
}
|
||||
|
||||
// NewZMONCollector initializes a new ZMONCollector.
|
||||
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[string]string, interval time.Duration) (*ZMONCollector, error) {
|
||||
func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, interval time.Duration) (*ZMONCollector, error) {
|
||||
if config.Metric.Selector == nil {
|
||||
return nil, fmt.Errorf("selector for zmon-check is not specified")
|
||||
}
|
||||
|
||||
checkIDStr, ok := config.Config[zmonCheckIDLabelKey]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("ZMON check ID not specified on metric")
|
||||
@@ -86,11 +86,6 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
|
||||
key = k
|
||||
}
|
||||
|
||||
// annotations takes precedence over label
|
||||
if k, ok := annotations[zmonKeyAnnotationKey]; ok {
|
||||
key = k
|
||||
}
|
||||
|
||||
duration := defaultQueryDuration
|
||||
|
||||
// parse optional duration value
|
||||
@@ -110,16 +105,6 @@ func NewZMONCollector(zmon zmon.ZMON, config *MetricConfig, annotations map[stri
|
||||
}
|
||||
}
|
||||
|
||||
// parse tags from annotations
|
||||
// tags defined in annotations takes precedence over tags defined in
|
||||
// the labels.
|
||||
for k, v := range annotations {
|
||||
if strings.HasPrefix(k, zmonTagPrefixAnnotationKey) {
|
||||
key := strings.TrimPrefix(k, zmonTagPrefixAnnotationKey)
|
||||
tags[key] = v
|
||||
}
|
||||
}
|
||||
|
||||
// default aggregator is last
|
||||
aggregators := []string{"last"}
|
||||
if k, ok := config.Config[zmonAggregatorsLabelKey]; ok {
|
||||
|
||||
@@ -50,20 +50,6 @@ func TestZMONCollectorNewCollector(t *testing.T) {
|
||||
require.Equal(t, []string{"max"}, zmonCollector.aggregators)
|
||||
require.Equal(t, map[string]string{"alias": "cluster_alias"}, zmonCollector.tags)
|
||||
|
||||
// check that annotations overwrites labels
|
||||
hpa.ObjectMeta = metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
zmonKeyAnnotationKey: "annotation_key",
|
||||
zmonTagPrefixAnnotationKey + "alias": "cluster_alias_annotation",
|
||||
},
|
||||
}
|
||||
collector, err = collectPlugin.NewCollector(hpa, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, collector)
|
||||
zmonCollector = collector.(*ZMONCollector)
|
||||
require.Equal(t, "annotation_key", zmonCollector.key)
|
||||
require.Equal(t, map[string]string{"alias": "cluster_alias_annotation"}, zmonCollector.tags)
|
||||
|
||||
// should fail if the metric name isn't ZMON
|
||||
config.Metric = newMetricIdentifier("non-zmon-check")
|
||||
_, err = collectPlugin.NewCollector(nil, config, 1*time.Second)
|
||||
@@ -131,7 +117,7 @@ func TestZMONCollectorGetMetrics(tt *testing.T) {
|
||||
dataPoints: ti.dataPoints,
|
||||
}
|
||||
|
||||
zmonCollector, err := NewZMONCollector(z, config, nil, 1*time.Second)
|
||||
zmonCollector, err := NewZMONCollector(z, config, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics, _ := zmonCollector.GetMetrics()
|
||||
|
||||
Reference in New Issue
Block a user