Compare commits


1 Commit

| Author | SHA1 | Message | Date |
| ------ | ---- | ------- | ---- |
| Mikkel Oscar Lyderik Larsen | 5e6d304ecd | Support networking.k8s.io/v1beta1 Ingresses (Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>) | 2019-10-08 16:55:49 +02:00 |
11 changed files with 289 additions and 447 deletions


@@ -1,13 +1,10 @@
# kube-metrics-adapter
[![Build Status](https://travis-ci.org/zalando-incubator/kube-metrics-adapter.svg?branch=master)](https://travis-ci.org/zalando-incubator/kube-metrics-adapter)
[![Coverage Status](https://coveralls.io/repos/github/zalando-incubator/kube-metrics-adapter/badge.svg?branch=master)](https://coveralls.io/github/zalando-incubator/kube-metrics-adapter?branch=master)
Kube Metrics Adapter is a general purpose metrics adapter for Kubernetes that
can collect and serve custom and external metrics for Horizontal Pod
Autoscaling.
It supports scaling based on [Prometheus metrics](https://prometheus.io/), [SQS queues](https://aws.amazon.com/sqs/) and others out of the box.
It discovers Horizontal Pod Autoscaling resources and starts to collect the
requested metrics and stores them in memory. It's implemented using the
[custom-metrics-apiserver](https://github.com/kubernetes-incubator/custom-metrics-apiserver)
@@ -44,18 +41,6 @@ The `metric-config.*` annotations are used by the `kube-metrics-adapter` to
configure a collector for getting the metrics. In the above example it
configures a *json-path pod collector*.
## Kubernetes compatibility
Like the [support
policy](https://kubernetes.io/docs/setup/release/version-skew-policy/) offered
for Kubernetes, this project aims to support the latest three minor releases of
Kubernetes.
Currently the default supported API is `autoscaling/v2beta1`. However we aim to
move to `autoscaling/v2beta2` (available since `v1.12`) in the near future as
this adds a lot of improvements over `v2beta1`. The move to `v2beta2` will most
likely happen as soon as [GKE adds support for it](https://issuetracker.google.com/issues/135624588).
## Building
This project uses [Go modules](https://github.com/golang/go/wiki/Modules) as
@@ -86,9 +71,9 @@ Currently only `json-path` collection is supported.
### Supported metrics
| Metric | Description | Type | K8s Versions |
| ------------ | -------------- | ------- | -- |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods | `>=1.10` |

| Metric | Description | Type |
| ------------ | -------------- | ------- |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Pods |
### Example
@@ -106,7 +91,6 @@ metadata:
metric-config.pods.requests-per-second.json-path/path: /metrics
metric-config.pods.requests-per-second.json-path/port: "9090"
metric-config.pods.requests-per-second.json-path/scheme: "https"
metric-config.pods.requests-per-second.json-path/aggregator: "max"
spec:
scaleTargetRef:
apiVersion: apps/v1
@@ -144,11 +128,6 @@ The other configuration options `path`, `port` and `scheme` specify where the me
endpoint is exposed on the pod. The `path` and `port` options do not have default values
so they must be defined. The `scheme` is optional and defaults to `http`.
The `aggregator` configuration option specifies the aggregation function used to aggregate
values of JSONPath expressions that evaluate to arrays/slices of numbers.
It's optional, but when the expression evaluates to an array/slice, its absence will
produce an error. The supported aggregation functions are `avg`, `max`, `min` and `sum`.
## Prometheus collector
The Prometheus collector is a generic collector which can map Prometheus
@@ -172,10 +151,10 @@ the trade-offs between the two approaches.
### Supported metrics
| Metric | Description | Type | Kind | K8s Versions |
| ------------ | -------------- | ------- | -- | -- |
| `prometheus-query` | Generic metric which requires a user defined query. | External | | `>=1.10` |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* | `>=1.10` |

| Metric | Description | Type | Kind |
| ------------ | -------------- | ------- | -- |
| `prometheus-query` | Generic metric which requires a user defined query. | External | |
| *custom* | No predefined metrics. Metrics are generated from user defined queries. | Object | *any* |
### Example: External Metric
@@ -280,9 +259,9 @@ box so users don't have to define those manually.
### Supported metrics
| Metric | Description | Type | Kind | K8s Versions |
| ----------- | -------------- | ------ | ---- | ---- |
| `requests-per-second` | Scale based on requests per second for a certain ingress. | Object | `Ingress` | `>=1.14` (can work with `>=1.10`) |

| Metric | Description | Type | Kind |
| ----------- | -------------- | ------ | ---- |
| `requests-per-second` | Scale based on requests per second for a certain ingress. | Object | `Ingress` |
### Example
@@ -309,10 +288,7 @@ spec:
apiVersion: extensions/v1beta1
kind: Ingress
name: myapp
averageValue: 10 # Only works with Kubernetes >=1.14
# for Kubernetes <1.14 you can use `targetValue` instead:
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
# Otherwise it will be treated as targetAverageValue
targetValue: 10 # this will be treated as targetAverageValue
```
### Metric weighting based on backend
@@ -326,17 +302,13 @@ return the requests-per-second being sent to the `backend1`. The ingress annotat
the backend weights can be obtained can be specified through the flag `--skipper-backends-annotation`.
**Note:** For Kubernetes `<v1.14` the HPA does not support `averageValue` for
**Note:** As of Kubernetes v1.10 the HPA does not support `targetAverageValue` for
metrics of type `Object`. In case of requests per second it does not make sense
to scale on a summed value because you can not make the total requests per
second go down by adding more pods. For this reason the skipper collector will
automatically treat the value you define in `targetValue` as an average per pod
instead of a total sum.
**ONLY use `targetValue` if you are on Kubernetes
`<1.14`, it is not as precise as using `averageValue` and will not be supported
after Kubernetes `v1.16` is released according to the [support policy](https://kubernetes.io/docs/setup/release/version-skew-policy/).**
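To make the note above concrete, here is a minimal sketch (with made-up numbers) of the arithmetic applied when `targetValue` is treated as a per-pod average: the collected total is divided by the current replica count, mirroring the `GetMetrics` change further down in this diff.

```go
package main

import "fmt"

func main() {
	// Made-up example values: total requests per second hitting the ingress,
	// and the current number of replicas behind it.
	totalRPS := 100.0
	replicas := 5

	// The collector reports the per-pod average, which the HPA then compares
	// against the configured targetValue (e.g. 10 in the example above).
	perPod := totalRPS / float64(replicas)
	fmt.Printf("per-pod average: %.1f requests/s\n", perPod) // 20.0
}
```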
## AWS collector
The AWS collector allows scaling based on external metrics exposed by AWS
@@ -368,9 +340,9 @@ PolicyDocument:
### Supported metrics
| Metric | Description | Type | K8s Versions |
| ------------ | ------- | -- | -- |
| `sqs-queue-length` | Scale based on SQS queue length | External | `>=1.10` |

| Metric | Description | Type |
| ------------ | ------- | -- |
| `sqs-queue-length` | Scale based on SQS queue length | External |
### Example
@@ -416,9 +388,9 @@ The ZMON collector allows scaling based on external metrics exposed by
### Supported metrics
| Metric | Description | Type | K8s Versions |
| ------------ | ------- | -- | -- |
| `zmon-check` | Scale based on any ZMON check results | External | `>=1.10` |

| Metric | Description | Type |
| ------------ | ------- | -- |
| `zmon-check` | Scale based on any ZMON check results | External |
### Example


@@ -1,8 +1,7 @@
We acknowledge that every line of code that we write may potentially contain security issues.
We are trying to deal with it responsibly and provide patches as quickly as possible.
We host our bug bounty program on HackerOne, it is currently private, therefore if you would like to report a vulnerability and get rewarded for it, please ask to join our program by filling this form:
https://corporate.zalando.com/en/services-and-contact#security-form
You can also send your report via this form if you do not want to join our bug bounty program and just want to report a vulnerability or security issue.
We are trying to deal with it responsibly and provide patches as quickly as possible. If you have anything to report to us please use the following channels:
Email: Tech-Security@zalando.de
OR
Submit your vulnerability report through our bug bounty program at: https://hackerone.com/zalando


@@ -37,8 +37,7 @@ spec:
apiVersion: extensions/v1beta1
kind: Ingress
name: custom-metrics-consumer
averageValue: 10
targetValue: 10 # this must be set, but has no effect if `averageValue` is defined.
targetValue: 10 # this will be treated as targetAverageValue
- type: External
external:
metricName: sqs-queue-length


@@ -165,7 +165,6 @@ type MetricConfig struct {
ObjectReference custom_metrics.ObjectReference
PerReplica bool
Interval time.Duration
MetricSpec autoscalingv2.MetricSpec
}
// ParseHPAMetrics parses the HPA object into a list of metric configurations.
@@ -207,15 +206,12 @@ func ParseHPAMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler) ([]*MetricConfi
MetricTypeName: typeName,
ObjectReference: ref,
Config: map[string]string{},
MetricSpec: metric,
}
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
for k, v := range metric.External.Metric.Selector.MatchLabels {
config.Config[k] = v
}
config.Config = metric.External.Metric.Selector.MatchLabels
}
annotationConfigs, present := parser.GetAnnotationConfig(typeName.Metric.Name, typeName.Type)


@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"strconv"
@@ -22,7 +21,6 @@ type JSONPathMetricsGetter struct {
scheme string
path string
port int
aggregator string
}
// NewJSONPathMetricsGetter initializes a new JSONPathMetricsGetter.
@@ -30,12 +28,12 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter := &JSONPathMetricsGetter{}
if v, ok := config["json-key"]; ok {
path, err := jsonpath.Compile(v)
pat, err := jsonpath.Compile(v)
if err != nil {
return nil, fmt.Errorf("failed to parse json path definition: %v", err)
}
getter.jsonPath = path
getter.jsonPath = pat
}
if v, ok := config["scheme"]; ok {
@@ -54,10 +52,6 @@ func NewJSONPathMetricsGetter(config map[string]string) (*JSONPathMetricsGetter,
getter.port = n
}
if v, ok := config["aggregator"]; ok {
getter.aggregator = v
}
return getter, nil
}
@@ -89,38 +83,11 @@ func (g *JSONPathMetricsGetter) GetMetric(pod *corev1.Pod) (float64, error) {
return float64(res), nil
case float64:
return res, nil
case []interface{}:
s, err := castSlice(res)
if err != nil {
return 0, err
}
return reduce(s, g.aggregator)
default:
return 0, fmt.Errorf("unsupported type %T", res)
}
}
// castSlice takes a slice of interface and returns a slice of float64 if all
// values in slice were castable, else returns an error
func castSlice(in []interface{}) ([]float64, error) {
out := []float64{}
for _, v := range in {
switch v := v.(type) {
case int:
out = append(out, float64(v))
case float32:
out = append(out, float64(v))
case float64:
out = append(out, v)
default:
return nil, fmt.Errorf("slice was returned by JSONPath, but value inside is unsupported: %T", v)
}
}
return out, nil
}
// getPodMetrics returns the content of the pods metrics endpoint.
func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, error) {
if pod.Status.PodIP == "" {
@@ -164,64 +131,3 @@ func getPodMetrics(pod *corev1.Pod, scheme, path string, port int) ([]byte, erro
return data, nil
}
// reduce will reduce a slice of numbers given an aggregator function's name. If it's empty or not recognized, an error is returned.
func reduce(values []float64, aggregator string) (float64, error) {
switch aggregator {
case "avg":
return avg(values), nil
case "min":
return min(values), nil
case "max":
return max(values), nil
case "sum":
return sum(values), nil
default:
return 0, fmt.Errorf("slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", aggregator)
}
}
// avg implements the average mathematical function over a slice of float64
func avg(values []float64) float64 {
sum := sum(values)
return sum / float64(len(values))
}
// min implements the absolute minimum mathematical function over a slice of float64
func min(values []float64) float64 {
// initialized with positive infinity, all finite numbers are smaller than it
curMin := math.Inf(1)
for _, v := range values {
if v < curMin {
curMin = v
}
}
return curMin
}
// max implements the absolute maximum mathematical function over a slice of float64
func max(values []float64) float64 {
// initialized with negative infinity, all finite numbers are bigger than it
curMax := math.Inf(-1)
for _, v := range values {
if v > curMax {
curMax = v
}
}
return curMax
}
// sum implements the summation mathematical function over a slice of float64
func sum(values []float64) float64 {
res := 0.0
for _, v := range values {
res += v
}
return res
}
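For reference, a standalone sketch of the aggregation path being removed above: a `json-key` expression that evaluates to an array of numbers, which then needs an `aggregator` (here the equivalent of `avg`) to be reduced to a single value. The payload and expression are invented for illustration; only `Compile`/`Lookup` from `github.com/oliveagle/jsonpath` are assumed, as already used by the getter.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/oliveagle/jsonpath"
)

func main() {
	// Hypothetical pod metrics payload exposing a slice of numbers.
	payload := []byte(`{"queues": [3, 7, 5]}`)

	var data interface{}
	if err := json.Unmarshal(payload, &data); err != nil {
		log.Fatal(err)
	}

	// Corresponds to a json-key annotation of "$.queues".
	jp, err := jsonpath.Compile("$.queues")
	if err != nil {
		log.Fatal(err)
	}
	res, err := jp.Lookup(data)
	if err != nil {
		log.Fatal(err)
	}

	// The expression evaluates to a slice, so an aggregator is required;
	// this mirrors the removed castSlice + reduce("avg") combination.
	values, ok := res.([]interface{})
	if !ok {
		log.Fatalf("expected a slice, got %T", res)
	}
	sum := 0.0
	for _, v := range values {
		sum += v.(float64) // encoding/json decodes JSON numbers as float64
	}
	fmt.Printf("avg = %g\n", sum/float64(len(values))) // avg = 5
}
```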


@@ -1,105 +0,0 @@
package collector
import (
"testing"
"github.com/oliveagle/jsonpath"
"github.com/stretchr/testify/require"
)
func TestNewJSONPathMetricsGetter(t *testing.T) {
configNoAggregator := map[string]string{
"json-key": "$.value",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
jpath1, _ := jsonpath.Compile(configNoAggregator["json-key"])
getterNoAggregator, err1 := NewJSONPathMetricsGetter(configNoAggregator)
require.NoError(t, err1)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath1,
scheme: "http",
path: "/metrics",
port: 9090,
}, getterNoAggregator)
configAggregator := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "9090",
"aggregator": "avg",
}
jpath2, _ := jsonpath.Compile(configAggregator["json-key"])
getterAggregator, err2 := NewJSONPathMetricsGetter(configAggregator)
require.NoError(t, err2)
require.Equal(t, &JSONPathMetricsGetter{
jsonPath: jpath2,
scheme: "http",
path: "/metrics",
port: 9090,
aggregator: "avg",
}, getterAggregator)
configErrorJSONPath := map[string]string{
"json-key": "{}",
"scheme": "http",
"path": "/metrics",
"port": "9090",
}
_, err3 := NewJSONPathMetricsGetter(configErrorJSONPath)
require.Error(t, err3)
configErrorPort := map[string]string{
"json-key": "$.values",
"scheme": "http",
"path": "/metrics",
"port": "a9090",
}
_, err4 := NewJSONPathMetricsGetter(configErrorPort)
require.Error(t, err4)
}
func TestCastSlice(t *testing.T) {
res1, err1 := castSlice([]interface{}{1, 2, 3})
require.NoError(t, err1)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res1)
res2, err2 := castSlice([]interface{}{float32(1.0), float32(2.0), float32(3.0)})
require.NoError(t, err2)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res2)
res3, err3 := castSlice([]interface{}{float64(1.0), float64(2.0), float64(3.0)})
require.NoError(t, err3)
require.Equal(t, []float64{1.0, 2.0, 3.0}, res3)
res4, err4 := castSlice([]interface{}{1, 2, "some string"})
require.Errorf(t, err4, "slice was returned by JSONPath, but value inside is unsupported: %T", "string")
require.Equal(t, []float64(nil), res4)
}
func TestReduce(t *testing.T) {
average, err1 := reduce([]float64{1, 2, 3}, "avg")
require.NoError(t, err1)
require.Equal(t, 2.0, average)
min, err2 := reduce([]float64{1, 2, 3}, "min")
require.NoError(t, err2)
require.Equal(t, 1.0, min)
max, err3 := reduce([]float64{1, 2, 3}, "max")
require.NoError(t, err3)
require.Equal(t, 3.0, max)
sum, err4 := reduce([]float64{1, 2, 3}, "sum")
require.NoError(t, err4)
require.Equal(t, 6.0, sum)
_, err5 := reduce([]float64{1, 2, 3}, "inexistent_function")
require.Errorf(t, err5, "slice of numbers was returned by JSONPath, but no valid aggregator function was specified: %v", "inexistent_function")
}


@@ -0,0 +1,67 @@
package collector
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/resource"
)
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxWeightedCollector struct {
collectors []Collector
interval time.Duration
weight float64
}
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
return &MaxWeightedCollector{
collectors: collectors,
interval: interval,
weight: weight,
}
}
// GetMetrics gets metrics from all collectors and returns the highest value.
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
errors := make([]error, 0)
collectedMetrics := make([]CollectedMetric, 0)
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
if _, ok := err.(NoResultError); ok {
errors = append(errors, err)
continue
}
return nil, err
}
collectedMetrics = append(collectedMetrics, values...)
}
if len(collectedMetrics) == 0 {
if len(errors) == 0 {
return nil, fmt.Errorf("no metrics collected, cannot determine max")
}
errorStrings := make([]string, len(errors))
for i, e := range errors {
errorStrings[i] = e.Error()
}
allErrors := strings.Join(errorStrings, ",")
return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors)
}
max := collectedMetrics[0]
for _, value := range collectedMetrics {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxWeightedCollector) Interval() time.Duration {
return c.interval
}


@@ -0,0 +1,99 @@
package collector
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type dummyCollector struct {
value int64
}
func (c dummyCollector) Interval() time.Duration {
return time.Second
}
func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) {
switch c.value {
case 0:
return nil, NoResultError{query: "invalid query"}
case -1:
return nil, fmt.Errorf("test error")
default:
quantity := resource.NewQuantity(c.value, resource.DecimalSI)
return []CollectedMetric{
{
Custom: custom_metrics.MetricValue{
Value: *quantity,
},
},
}, nil
}
}
func TestMaxCollector(t *testing.T) {
for _, tc := range []struct {
name string
values []int64
expected int
weight float64
errored bool
}{
{
name: "basic",
values: []int64{100, 10, 9},
expected: 100,
weight: 1,
errored: false,
},
{
name: "weighted",
values: []int64{100, 10, 9},
expected: 20,
weight: 0.2,
errored: false,
},
{
name: "with error",
values: []int64{10, 9, -1},
weight: 0.5,
errored: true,
},
{
name: "some invalid results",
values: []int64{0, 1, 0, 10, 9},
expected: 5,
weight: 0.5,
errored: false,
},
{
name: "both invalid results and errors",
values: []int64{0, 1, 0, -1, 10, 9},
weight: 0.5,
errored: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
collectors := make([]Collector, len(tc.values))
for i, v := range tc.values {
collectors[i] = dummyCollector{value: v}
}
wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...)
metrics, err := wc.GetMetrics()
if tc.errored {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Len(t, metrics, 1)
require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value())
}
})
}
}


@@ -3,7 +3,6 @@ package collector
import (
"context"
"fmt"
"math"
"net/http"
"time"
@@ -147,7 +146,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
sampleValue = scalar.Value
}
if math.IsNaN(float64(sampleValue)) {
if sampleValue.String() == "NaN" {
return nil, &NoResultError{query: c.query}
}
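The two versions of the NaN check above detect the same condition; a minimal sketch of the underlying behaviour using a plain float64 (the Prometheus `model.SampleValue` String method is assumed to format values the way `strconv.FormatFloat` does):

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

func main() {
	v := math.NaN()

	// math.IsNaN reports true for NaN values...
	fmt.Println(math.IsNaN(v)) // true

	// ...and Go formats NaN as the string "NaN", which is what a
	// String()-based comparison checks for.
	fmt.Println(strconv.FormatFloat(v, 'f', -1, 64) == "NaN") // true
}
```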


@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"regexp"
"strings"
"time"
@@ -17,7 +16,7 @@ import (
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)`
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsMetricBackendSeparator = ","
)
@@ -73,7 +72,8 @@ type SkipperCollector struct {
}
// NewSkipperCollector initializes a new SkipperCollector.
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
return &SkipperCollector{
client: client,
objectReference: config.ObjectReference,
@@ -125,28 +125,42 @@ func getWeights(ingressAnnotations map[string]string, backendAnnotations []strin
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
var annotations map[string]string
var hosts []string
switch c.objectReference.APIVersion {
case "extensions/v1beta1":
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
annotations = ingress.Annotations
for _, rule := range ingress.Spec.Rules {
hosts = append(hosts, rule.Host)
}
case "networking.k8s.io/v1beta1":
ingress, err := c.client.NetworkingV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
annotations = ingress.Annotations
for _, rule := range ingress.Spec.Rules {
hosts = append(hosts, rule.Host)
}
}
backendWeight, err := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
backendWeight, err := getWeights(annotations, c.backendAnnotations, c.backend)
if err != nil {
return nil, err
}
config := c.config
var escapedHostnames []string
for _, rule := range ingress.Spec.Rules {
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
}
if len(escapedHostnames) == 0 {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
var collector Collector
collectors := make([]Collector, 0, len(hosts))
for _, host := range hosts {
host := strings.Replace(host, ".", "_", -1)
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight),
"query": fmt.Sprintf(rpsQuery, host),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
@@ -155,6 +169,15 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
return nil, err
}
collectors = append(collectors, collector)
}
if len(collectors) > 0 {
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
} else {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
return collector, nil
}
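As a quick illustration of the per-host collectors built above, this sketch (with a made-up host name) shows how one Prometheus query string is derived from an ingress rule host, using the new `rpsQuery` template and the same dot-to-underscore replacement as in `getCollector`:

```go
package main

import (
	"fmt"
	"strings"
)

// Same template as the new rpsQuery constant above.
const rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`

func main() {
	// Hypothetical ingress rule host.
	host := "myapp.example.org"

	// Dots are replaced with underscores, as done per host in getCollector.
	promHost := strings.Replace(host, ".", "_", -1)

	fmt.Printf(rpsQuery+"\n", promHost)
	// scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="myapp_example_org"}[1m])))
}
```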
@@ -174,10 +197,6 @@ func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(values))
}
value := values[0]
// For Kubernetes <v1.14 we have to fall back to manual average
if c.config.MetricSpec.Object.Target.AverageValue == nil {
// get current replicas for the targeted scale object. This is used to
// calculate an average metric instead of total.
// targetAverageValue will be available in Kubernetes v1.12
@@ -191,9 +210,9 @@ func (c *SkipperCollector) GetMetrics() ([]CollectedMetric, error) {
return nil, fmt.Errorf("unable to get average value for %d replicas", replicas)
}
value := values[0]
avgValue := float64(value.Custom.Value.MilliValue()) / float64(replicas)
value.Custom.Value = *resource.NewMilliQuantity(int64(avgValue), resource.DecimalSI)
}
return []CollectedMetric{value}, nil
}

File diff suppressed because it is too large.