Merge pull request #83 from zalando-incubator/ingress-collector

Skipper: simplify metrics collection
This commit is contained in:
aermakov-zalando
2019-10-22 16:07:49 +02:00
committed by GitHub
5 changed files with 103 additions and 220 deletions

View File

@ -1,67 +0,0 @@
package collector
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/resource"
)
// MaxWeightedCollector is a simple aggregator collector that returns the maximum value
// of metrics from all collectors.
type MaxWeightedCollector struct {
collectors []Collector
interval time.Duration
weight float64
}
// NewMaxWeightedCollector initializes a new MaxWeightedCollector.
func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector {
return &MaxWeightedCollector{
collectors: collectors,
interval: interval,
weight: weight,
}
}
// GetMetrics gets metrics from all collectors and return the higest value.
func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) {
errors := make([]error, 0)
collectedMetrics := make([]CollectedMetric, 0)
for _, collector := range c.collectors {
values, err := collector.GetMetrics()
if err != nil {
if _, ok := err.(NoResultError); ok {
errors = append(errors, err)
continue
}
return nil, err
}
collectedMetrics = append(collectedMetrics, values...)
}
if len(collectedMetrics) == 0 {
if len(errors) == 0 {
return nil, fmt.Errorf("no metrics collected, cannot determine max")
}
errorStrings := make([]string, len(errors))
for i, e := range errors {
errorStrings[i] = e.Error()
}
allErrors := strings.Join(errorStrings, ",")
return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors)
}
max := collectedMetrics[0]
for _, value := range collectedMetrics {
if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() {
max = value
}
}
max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI)
return []CollectedMetric{max}, nil
}
// Interval returns the interval at which the collector should run.
func (c *MaxWeightedCollector) Interval() time.Duration {
return c.interval
}

View File

@ -1,99 +0,0 @@
package collector
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type dummyCollector struct {
value int64
}
func (c dummyCollector) Interval() time.Duration {
return time.Second
}
func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) {
switch c.value {
case 0:
return nil, NoResultError{query: "invalid query"}
case -1:
return nil, fmt.Errorf("test error")
default:
quantity := resource.NewQuantity(c.value, resource.DecimalSI)
return []CollectedMetric{
{
Custom: custom_metrics.MetricValue{
Value: *quantity,
},
},
}, nil
}
}
func TestMaxCollector(t *testing.T) {
for _, tc := range []struct {
name string
values []int64
expected int
weight float64
errored bool
}{
{
name: "basic",
values: []int64{100, 10, 9},
expected: 100,
weight: 1,
errored: false,
},
{
name: "weighted",
values: []int64{100, 10, 9},
expected: 20,
weight: 0.2,
errored: false,
},
{
name: "with error",
values: []int64{10, 9, -1},
weight: 0.5,
errored: true,
},
{
name: "some invalid results",
values: []int64{0, 1, 0, 10, 9},
expected: 5,
weight: 0.5,
errored: false,
},
{
name: "both invalid results and errors",
values: []int64{0, 1, 0, -1, 10, 9},
weight: 0.5,
errored: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
collectors := make([]Collector, len(tc.values))
for i, v := range tc.values {
collectors[i] = dummyCollector{value: v}
}
wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...)
metrics, err := wc.GetMetrics()
if tc.errored {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Len(t, metrics, 1)
require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value())
}
})
}
}

View File

@ -3,6 +3,7 @@ package collector
import (
"context"
"fmt"
"math"
"net/http"
"time"
@ -146,7 +147,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) {
sampleValue = scalar.Value
}
if sampleValue.String() == "NaN" {
if math.IsNaN(float64(sampleValue)) {
return nil, &NoResultError{query: c.query}
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"regexp"
"strings"
"time"
@ -16,7 +17,7 @@ import (
)
const (
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))`
rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)`
rpsMetricName = "requests-per-second"
rpsMetricBackendSeparator = ","
)
@ -135,29 +136,25 @@ func (c *SkipperCollector) getCollector() (Collector, error) {
}
config := c.config
var collector Collector
collectors := make([]Collector, 0, len(ingress.Spec.Rules))
var escapedHostnames []string
for _, rule := range ingress.Spec.Rules {
host := strings.Replace(rule.Host, ".", "_", -1)
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, host),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
if err != nil {
return nil, err
}
collectors = append(collectors, collector)
escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1)))
}
if len(collectors) > 0 {
collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...)
} else {
if len(escapedHostnames) == 0 {
return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name)
}
config.Config = map[string]string{
"query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight),
}
config.PerReplica = false // per replica is handled outside of the prometheus collector
collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval)
if err != nil {
return nil, err
}
return collector, nil
}

View File

@ -102,9 +102,11 @@ func newStatefulSet(client *fake.Clientset, namespace string, name string) (*app
func TestSkipperCollector(t *testing.T) {
for _, tc := range []struct {
msg string
metrics []int
metric int
backend string
ingressName string
hostnames []string
expectedQuery string
collectedMetric int
expectError bool
fakedAverage bool
@ -116,9 +118,11 @@ func TestSkipperCollector(t *testing.T) {
}{
{
msg: "test unweighted hpa",
metrics: []int{1000, 1000, 2000},
metric: 1000,
ingressName: "dummy-ingress",
collectedMetric: 2000,
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
collectedMetric: 1000,
namespace: "default",
backend: "dummy-backend",
replicas: 1,
@ -126,9 +130,25 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test weighted backend",
metrics: []int{100, 1500, 700},
metric: 1000,
ingressName: "dummy-ingress",
collectedMetric: 600,
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.4000)`,
collectedMetric: 1000,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
replicas: 1,
readyReplicas: 1,
backendAnnotations: []string{testBackendWeightsAnnotation},
},
{
msg: "test multiple hostnames",
metric: 1000,
ingressName: "dummy-ingress",
hostnames: []string{"example.org", "foo.bar.com", "test.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org|foo_bar_com|test_org"}[1m])) * 0.4000)`,
collectedMetric: 1000,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}},
@ -138,9 +158,11 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test multiple replicas",
metrics: []int{100, 1500, 700},
metric: 1000,
ingressName: "dummy-ingress",
collectedMetric: 150,
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.5000)`,
collectedMetric: 200,
fakedAverage: true,
namespace: "default",
backend: "backend1",
@ -151,9 +173,11 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test multiple replicas not calculating average internally",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
collectedMetric: 750, // 50% of 1500
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.5000)`,
collectedMetric: 1500,
namespace: "default",
backend: "backend1",
backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}},
@ -163,8 +187,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test zero weight backends",
metrics: []int{100, 1500, 700},
metric: 0,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.0000)`,
collectedMetric: 0,
namespace: "default",
backend: "backend1",
@ -175,8 +201,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test multiple backend annotation",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
collectedMetric: 300,
fakedAverage: true,
namespace: "default",
@ -191,8 +219,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test multiple backend annotation not calculating average internally",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
collectedMetric: 1500,
namespace: "default",
backend: "backend1",
@ -206,8 +236,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test backend is not set",
metrics: []int{100, 1500, 700},
metric: 0,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.0000)`,
collectedMetric: 0,
namespace: "default",
backend: "backend3",
@ -218,8 +250,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test no annotations set",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
collectedMetric: 1500,
namespace: "default",
backend: "backend3",
@ -230,8 +264,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test annotations are set but backend is missing",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
expectError: true,
namespace: "default",
backend: "",
@ -242,8 +278,10 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test annotations are missing and backend is unset",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`,
collectedMetric: 1500,
namespace: "default",
backend: "",
@ -254,9 +292,11 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test partial backend annotations",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
collectedMetric: 60,
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2000)`,
collectedMetric: 300,
fakedAverage: true,
namespace: "default",
backend: "backend2",
@ -270,9 +310,11 @@ func TestSkipperCollector(t *testing.T) {
},
{
msg: "test partial backend annotations not calculating average internally",
metrics: []int{100, 1500, 700},
metric: 1500,
ingressName: "dummy-ingress",
collectedMetric: 300,
hostnames: []string{"example.org"},
expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2000)`,
collectedMetric: 1500,
namespace: "default",
backend: "backend2",
backendWeights: map[string]map[string]int{
@ -286,9 +328,9 @@ func TestSkipperCollector(t *testing.T) {
} {
t.Run(tc.msg, func(t *testing.T) {
client := fake.NewSimpleClientset()
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.backendWeights)
err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights)
require.NoError(t, err)
plugin := makePlugin(tc.metrics)
plugin := makePlugin(tc.metric)
hpa := makeHPA(tc.ingressName, tc.backend)
config := makeConfig(tc.backend, tc.fakedAverage)
_, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas)
@ -299,6 +341,7 @@ func TestSkipperCollector(t *testing.T) {
if tc.expectError {
require.Error(t, err)
} else {
require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config)
require.NoError(t, err, "failed to collect metrics: %v", err)
require.Len(t, collected, 1, "the number of metrics returned is not 1")
require.EqualValues(t, tc.collectedMetric, collected[0].Custom.Value.Value(), "the returned metric is not expected value")
@ -307,7 +350,7 @@ func TestSkipperCollector(t *testing.T) {
}
}
func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, backendWeights map[string]map[string]int) error {
func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, hostnames []string, backendWeights map[string]map[string]int) error {
annotations := make(map[string]string)
for anno, weights := range backendWeights {
sWeights, err := json.Marshal(weights)
@ -316,7 +359,7 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
}
annotations[anno] = string(sWeights)
}
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(&v1beta1.Ingress{
ingress := &v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: ingressName,
Annotations: annotations,
@ -326,18 +369,19 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st
ServiceName: backend,
},
TLS: nil,
Rules: []v1beta1.IngressRule{
{
Host: "example.org",
},
},
},
Status: v1beta1.IngressStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: nil,
},
},
})
}
for _, hostname := range hostnames {
ingress.Spec.Rules = append(ingress.Spec.Rules, v1beta1.IngressRule{
Host: hostname,
})
}
_, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress)
return err
}
@ -380,6 +424,7 @@ func makeConfig(backend string, fakedAverage bool) *MetricConfig {
type FakeCollectorPlugin struct {
metrics []CollectedMetric
config map[string]string
}
type FakeCollector struct {
@ -395,13 +440,19 @@ func (FakeCollector) Interval() time.Duration {
}
func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
if p.config != nil {
return nil, fmt.Errorf("config already assigned once: %v", p.config)
}
p.config = config.Config
return &FakeCollector{metrics: p.metrics}, nil
}
func makePlugin(metrics []int) CollectorPlugin {
m := make([]CollectedMetric, len(metrics))
for i, v := range metrics {
m[i] = CollectedMetric{Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(v), resource.DecimalSI)}}
func makePlugin(metric int) *FakeCollectorPlugin {
return &FakeCollectorPlugin{
metrics: []CollectedMetric{
{
Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)},
},
},
}
return &FakeCollectorPlugin{metrics: m}
}