Added weighting of RPS metrics based on backend weights (#27)

* Added weighting of RPS metrics based on backend weights

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Updated documentation with instructions on how to use the backend weighting

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Changed separator for RPS metric and added flag to specify backend weights annotation.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>

* Allow for multiple backends for weighting.

Signed-off-by: Arjun Naik <arjun.rn@gmail.com>
Arjun authored on 2019-01-17 13:13:52 +01:00, committed by GitHub
commit 72aa672f51, parent f49f7821dc
5 changed files with 301 additions and 51 deletions


@@ -240,6 +240,17 @@ spec:
targetValue: 10 # this will be treated as targetAverageValue
```
### Metric weighting based on backend
Skipper supports sending traffic to different backends based on annotations present on the
`Ingress` object. When the metric name is specified without a backend, e.g. `requests-per-second`,
the number of replicas is calculated based on the full traffic served by that ingress.
If only the traffic routed to a specific backend should be used instead, the backend name can be
appended to the metric name, e.g. `requests-per-second/backend1`, which returns the requests per
second sent to `backend1`. The ingress annotations from which the backend weights are read can be
specified with the `--skipper-backends-annotation` flag.
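For example, assuming the ingress carries a weights annotation such as `zalando.org/backend-weights: {"backend1": 80, "backend2": 20}` and that annotation name is passed via `--skipper-backends-annotation`, an HPA metric that only counts the traffic reaching `backend1` could look like the following sketch (the ingress name `myapp` is a placeholder):
```yaml
metrics:
- type: Object
  object:
    metricName: requests-per-second/backend1
    target:
      apiVersion: extensions/v1beta1
      kind: Ingress
      name: myapp
    targetValue: 10 # this will be treated as targetAverageValue
```
With the weights above only 80% of the ingress traffic is attributed to `backend1` when calculating the desired number of replicas.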
**Note:** As of Kubernetes v1.10 the HPA does not support `targetAverageValue` for
metrics of type `Object`. In case of requests per second it does not make sense
to scale on a summed value because you can not make the total requests per


@@ -1,42 +1,54 @@
package collector
import "time"
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"time"
)
// MaxCollector is a simple aggregator collector that returns the maximum value
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxCollector struct {
type MaxWeightedCollector struct {
collectors []Collector
interval time.Duration
weight float64
}
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
func NewMaxCollector(interval time.Duration, collectors ...Collector) *MaxCollector {
return &MaxCollector{
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
return &MaxWeightedCollector{
collectors: collectors,
interval: interval,
weight: weight,
}
}
// GetMetrics gets metrics from all collectors and returns the highest value scaled by the collector's weight.
func (c *MaxCollector) GetMetrics() ([]CollectedMetric, error) {
var max CollectedMetric
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
collectedMetrics := make([]CollectedMetric, 0)
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
return nil, err
}
for _, value := range values {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
for _, v := range values {
collectedMetrics = append(collectedMetrics, v)
}
}
if len(collectedMetrics) == 0 {
return nil, fmt.Errorf("no metrics collected, cannot determine max")
}
max := collectedMetrics[0]
for _, value := range collectedMetrics {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
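// Scale the maximum collected value by the backend weight so only this backend's share of the traffic is reported.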
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxCollector) Interval() time.Duration {
func (c *MaxWeightedCollector) Interval() time.Duration {
return c.interval
}


@@ -1,72 +1,114 @@
package collector
import (
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"strings"
"time"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsMetricName = "requests-per-second"
rpsMetricBackendSeparator = "/"
)
// SkipperCollectorPlugin is a collector plugin for initializing metrics
// collectors for getting skipper ingress metrics.
type SkipperCollectorPlugin struct {
client kubernetes.Interface
plugin CollectorPlugin
client kubernetes.Interface
plugin CollectorPlugin
backendAnnotations []string
}
// NewSkipperCollectorPlugin initializes a new SkipperCollectorPlugin.
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin) (*SkipperCollectorPlugin, error) {
func NewSkipperCollectorPlugin(client kubernetes.Interface, prometheusPlugin *PrometheusCollectorPlugin, backendAnnotations []string) (*SkipperCollectorPlugin, error) {
return &SkipperCollectorPlugin{
client: client,
plugin: prometheusPlugin,
client: client,
plugin: prometheusPlugin,
backendAnnotations: backendAnnotations,
}, nil
}
// NewCollector initializes a new skipper collector from the specified HPA.
func (c *SkipperCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
switch config.Name {
case rpsMetricName:
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval)
default:
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
if strings.HasPrefix(config.Name, rpsMetricName) {
backend := ""
if len(config.Name) > len(rpsMetricName) {
metricNameParts := strings.Split(config.Name, rpsMetricBackendSeparator)
if len(metricNameParts) == 2 {
backend = metricNameParts[1]
}
}
return NewSkipperCollector(c.client, c.plugin, hpa, config, interval, c.backendAnnotations, backend)
}
return nil, fmt.Errorf("metric '%s' not supported", config.Name)
}
// SkipperCollector is a metrics collector for getting skipper ingress metrics.
// It depends on the prometheus collector for getting the metrics.
type SkipperCollector struct {
client kubernetes.Interface
metricName string
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
client kubernetes.Interface
metricName string
objectReference custom_metrics.ObjectReference
hpa *autoscalingv2beta1.HorizontalPodAutoscaler
interval time.Duration
plugin CollectorPlugin
config MetricConfig
backend string
backendAnnotations []string
}
// NewSkipperCollector initializes a new SkipperCollector.
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*SkipperCollector, error) {
func NewSkipperCollector(client kubernetes.Interface, plugin CollectorPlugin, hpa *autoscalingv2beta1.HorizontalPodAutoscaler,
config *MetricConfig, interval time.Duration, backendAnnotations []string, backend string) (*SkipperCollector, error) {
return &SkipperCollector{
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metricName: config.Name,
interval: interval,
plugin: plugin,
config: *config,
client: client,
objectReference: config.ObjectReference,
hpa: hpa,
metricName: config.Name,
interval: interval,
plugin: plugin,
config: *config,
backend: backend,
backendAnnotations: backendAnnotations,
}, nil
}
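// getAnnotationWeight parses the JSON backend-weights map stored in an annotation value and
// returns the weight of the given backend as a fraction of 100 (e.g. 60 becomes 0.6).
// It returns 0 if the value cannot be parsed or the backend is not listed.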
func getAnnotationWeight(backendWeights string, backend string) float64 {
var weightsMap map[string]int
err := json.Unmarshal([]byte(backendWeights), &weightsMap)
if err != nil {
return 0
}
if weight, ok := weightsMap[backend]; ok {
return float64(weight) / 100
}
return 0
}
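// getWeights returns the highest weight found for the backend across all configured
// backend-weight annotations on the ingress. If no annotation yields a positive weight,
// the backend is assumed to receive the full traffic and 1.0 is returned.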
func getWeights(ingressAnnotations map[string]string, backendAnnotations []string, backend string) float64 {
var maxWeight float64 = 0
for _, anno := range backendAnnotations {
if weightsMap, ok := ingressAnnotations[anno]; ok {
weight := getAnnotationWeight(weightsMap, backend)
if weight > maxWeight {
maxWeight = weight
}
}
}
if maxWeight > 0 {
return maxWeight
}
return 1.0
}
// getCollector returns a collector for getting the metrics.
func (c *SkipperCollector) getCollector() (Collector, error) {
ingress, err := c.client.ExtensionsV1beta1().Ingresses(c.objectReference.Namespace).Get(c.objectReference.Name, metav1.GetOptions{})
@@ -74,6 +116,7 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
return nil, err
}
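// Look up the share of traffic routed to this backend from the ingress annotations.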
backendWeight := getWeights(ingress.Annotations, c.backendAnnotations, c.backend)
config := c.config
var collector Collector
@@ -92,10 +135,9 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
collectors = append(collectors, collector)
}
if len(collectors) > 1 {
collector = NewMaxCollector(c.interval, collectors...)
} else if len(collectors) == 1 {
collector = collectors[0]
if len(collectors) > 0 {
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
} else {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}


@@ -1,7 +1,16 @@
package collector
import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/apis/custom_metrics"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/api/apps/v1"
@@ -10,11 +19,16 @@ import (
"k8s.io/client-go/kubernetes/fake"
)
const (
testBackendWeightsAnnotation = "zalando.org/backend-weights"
testStacksetWeightsAnnotation = "zalando.org/stack-set-weights"
)
func TestTargetRefReplicasDeployments(t *testing.T) {
client := fake.NewSimpleClientset()
name := "some-app"
defaultNamespace := "default"
deployment, err := newDeployment(client, defaultNamespace, name)
deployment, err := newDeployment(client, defaultNamespace, name, 2, 1)
require.NoError(t, err)
// Create an HPA with the deployment as ref
@@ -59,7 +73,7 @@ func newHPA(namesapce string, refName string, refKind string) *autoscalingv2beta
}
}
func newDeployment(client *fake.Clientset, namespace string, name string) (*v1.Deployment, error) {
func newDeployment(client *fake.Clientset, namespace string, name string, replicas, readyReplicas int32) (*v1.Deployment, error) {
return client.AppsV1().Deployments(namespace).Create(&v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -67,8 +81,8 @@ func newDeployment(client *fake.Clientset, namespace string, name string) (*v1.D
},
Spec: v1.DeploymentSpec{},
Status: v1.DeploymentStatus{
ReadyReplicas: 1,
Replicas: 2,
ReadyReplicas: readyReplicas,
Replicas: replicas,
},
})
}
@@ -85,3 +99,170 @@ func newStatefulSet(client *fake.Clientset, namespace string, name string) (*v1.
},
})
}
func TestSkipperCollector(t *testing.T) {
for _, tc := range []struct {
msg string
metrics []int
backend string
ingressName string
collectedMetric int
namespace string
backendWeights map[string]map[string]int
replicas int32
readyReplicas int32
backendAnnotations []string
}{
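// unweighted: no backend-weight annotations are set, so the full ingress traffic counts; max(1000, 1000, 2000) = 2000 for a single ready replica.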
{
msg: "test unweighted hpa",
metrics: []int{1000, 1000, 2000},
ingressName: "dummy-ingress",
collectedMetric: 2000,
namespace: "default",
backend: "dummy-backend",
replicas: 1,
readyReplicas: 1,
},
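// weighted backend: max(100, 1500, 700) = 1500 and backend1 receives 40% of the traffic, so 1500 * 0.4 = 600.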
{
msg: "test weighted backend",
metrics: []int{100, 1500, 700},
ingressName: "dummy-ingress",
collectedMetric: 600,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
replicas: 1,
readyReplicas: 1,
backendAnnotations: []string{testBackendWeightsAnnotation},
},
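// multiple replicas: 1500 * 0.5 = 750, averaged over 5 ready replicas = 150.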
{
msg: "test multiple replicas",
metrics: []int{100, 1500, 700},
ingressName: "dummy-ingress",
collectedMetric: 150,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
replicas: 5,
readyReplicas: 5,
backendAnnotations: []string{testBackendWeightsAnnotation},
},
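// multiple annotations: the highest weight across annotations wins (100% from the stack-set annotation), so 1500 * 1.0 / 5 ready replicas = 300.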
{
msg: "test multiple backend annotation",
metrics: []int{100, 1500, 700},
ingressName: "dummy-ingress",
collectedMetric: 300,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{
testBackendWeightsAnnotation: {"backend2": 20, "backend1": 80},
testStacksetWeightsAnnotation: {"backend2": 0, "backend1": 100},
},
replicas: 5,
readyReplicas: 5,
backendAnnotations: []string{testBackendWeightsAnnotation, testStacksetWeightsAnnotation},
},
} {
t.Run(tc.msg, func(tt *testing.T) {
client := fake.NewSimpleClientset()
makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.backendWeights)
plugin := makePlugin(tc.metrics)
hpa := makeHPA(tc.ingressName, tc.backend)
config := makeConfig(tc.backend)
newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
collector, err := NewSkipperCollector(client, plugin, hpa, config, time.Minute, tc.backendAnnotations, tc.backend)
assert.NoError(tt, err, "failed to create skipper collector: %v", err)
collected, err := collector.GetMetrics()
assert.NoError(tt, err, "failed to collect metrics: %v", err)
assert.Len(tt, collected, 1, "the number of metrics returned is not 1")
assert.EqualValues(tt, tc.collectedMetric, collected[0].Custom.Value.Value(), "the returned metric is not the expected value")
})
}
}
func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, backendWeights map[string]map[string]int) error {
annotations := make(map[string]string)
for anno, weights := range backendWeights {
sWeights, err := json.Marshal(weights)
if err != nil {
return err
}
annotations[anno] = string(sWeights)
}
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: ingressName,
Annotations: annotations,
},
Spec: v1beta1.IngressSpec{
Backend: &v1beta1.IngressBackend{
ServiceName: backend,
},
TLS: nil,
Rules: []v1beta1.IngressRule{
{
Host: "example.org",
},
},
},
Status: v1beta1.IngressStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: nil,
},
},
})
return err
}
func makeHPA(ingressName, backend string) *autoscalingv2beta1.HorizontalPodAutoscaler {
return &autoscalingv2beta1.HorizontalPodAutoscaler{
Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
Kind: "Deployment",
Name: backend,
},
Metrics: []autoscalingv2beta1.MetricSpec{
{
Type: autoscalingv2beta1.ObjectMetricSourceType,
Object: &autoscalingv2beta1.ObjectMetricSource{
Target: autoscalingv2beta1.CrossVersionObjectReference{Name: ingressName, APIVersion: "extensions/v1", Kind: "Ingress"},
MetricName: fmt.Sprintf("%s%s%s", rpsMetricName, rpsMetricBackendSeparator, backend),
},
},
},
},
}
}
func makeConfig(backend string) *MetricConfig {
return &MetricConfig{
MetricTypeName: MetricTypeName{Name: fmt.Sprintf("%s%s%s", rpsMetricName, rpsMetricBackendSeparator, backend)},
}
}
type FakeCollectorPlugin struct {
metrics []CollectedMetric
}
type FakeCollector struct {
metrics []CollectedMetric
}
func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
return c.metrics, nil
}
func (FakeCollector) Interval() time.Duration {
return time.Minute
}
func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2beta1.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
return &FakeCollector{metrics: p.metrics}, nil
}
func makePlugin(metrics []int) CollectorPlugin {
m := make([]CollectedMetric, len(metrics))
for i, v := range metrics {
m[i] = CollectedMetric{Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(v), resource.DecimalSI)}}
}
return &FakeCollectorPlugin{metrics: m}
}


@@ -98,6 +98,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
"path to the credentials dir where tokens are stored")
flags.BoolVar(&o.SkipperIngressMetrics, "skipper-ingress-metrics", o.SkipperIngressMetrics, ""+
"whether to enable skipper ingress metrics")
flags.StringArrayVar(&o.SkipperBackendWeightAnnotation, "skipper-backends-annotation", o.SkipperBackendWeightAnnotation, ""+
"the annotation to get backend weights so that the returned metric can be weighted")
flags.BoolVar(&o.AWSExternalMetrics, "aws-external-metrics", o.AWSExternalMetrics, ""+
"whether to enable AWS external metrics")
flags.StringSliceVar(&o.AWSRegions, "aws-region", o.AWSRegions, "the AWS regions which should be monitored. eg: eu-central, eu-west-1")
@@ -159,7 +161,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
// skipper collector can only be enabled if prometheus is.
if o.SkipperIngressMetrics {
skipperPlugin, err := collector.NewSkipperCollectorPlugin(client, promPlugin)
skipperPlugin, err := collector.NewSkipperCollectorPlugin(client, promPlugin, o.SkipperBackendWeightAnnotation)
if err != nil {
return fmt.Errorf("failed to initialize skipper collector plugin: %v", err)
}
@@ -306,4 +308,6 @@ type AdapterServerOptions struct {
AWSRegions []string
// MetricsAddress is the address where to serve prometheus metrics.
MetricsAddress string
// SkipperBackendWeightAnnotation is the list of ingress annotations from which backend weights are read
SkipperBackendWeightAnnotation []string
}