diff --git a/pkg/collector/max_collector.go b/pkg/collector/max_collector.go deleted file mode 100644 index c194de6..0000000 --- a/pkg/collector/max_collector.go +++ /dev/null @@ -1,67 +0,0 @@ -package collector - -import ( - "fmt" - "strings" - "time" - - "k8s.io/apimachinery/pkg/api/resource" -) - -// MaxWeightedCollector is a simple aggregator collector that returns the maximum value -// of metrics from all collectors. -type MaxWeightedCollector struct { - collectors []Collector - interval time.Duration - weight float64 -} - -// NewMaxWeightedCollector initializes a new MaxWeightedCollector. -func NewMaxWeightedCollector(interval time.Duration, weight float64, collectors ...Collector) *MaxWeightedCollector { - return &MaxWeightedCollector{ - collectors: collectors, - interval: interval, - weight: weight, - } -} - -// GetMetrics gets metrics from all collectors and return the higest value. -func (c *MaxWeightedCollector) GetMetrics() ([]CollectedMetric, error) { - errors := make([]error, 0) - collectedMetrics := make([]CollectedMetric, 0) - for _, collector := range c.collectors { - values, err := collector.GetMetrics() - if err != nil { - if _, ok := err.(NoResultError); ok { - errors = append(errors, err) - continue - } - return nil, err - } - collectedMetrics = append(collectedMetrics, values...) - } - if len(collectedMetrics) == 0 { - if len(errors) == 0 { - return nil, fmt.Errorf("no metrics collected, cannot determine max") - } - errorStrings := make([]string, len(errors)) - for i, e := range errors { - errorStrings[i] = e.Error() - } - allErrors := strings.Join(errorStrings, ",") - return nil, fmt.Errorf("could not determine maximum due to errors: %s", allErrors) - } - max := collectedMetrics[0] - for _, value := range collectedMetrics { - if value.Custom.Value.MilliValue() > max.Custom.Value.MilliValue() { - max = value - } - } - max.Custom.Value = *resource.NewMilliQuantity(int64(c.weight*float64(max.Custom.Value.MilliValue())), resource.DecimalSI) - return []CollectedMetric{max}, nil -} - -// Interval returns the interval at which the collector should run. -func (c *MaxWeightedCollector) Interval() time.Duration { - return c.interval -} diff --git a/pkg/collector/max_collector_test.go b/pkg/collector/max_collector_test.go deleted file mode 100644 index 195ef9c..0000000 --- a/pkg/collector/max_collector_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package collector - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/metrics/pkg/apis/custom_metrics" -) - -type dummyCollector struct { - value int64 -} - -func (c dummyCollector) Interval() time.Duration { - return time.Second -} - -func (c dummyCollector) GetMetrics() ([]CollectedMetric, error) { - switch c.value { - case 0: - return nil, NoResultError{query: "invalid query"} - case -1: - return nil, fmt.Errorf("test error") - default: - quantity := resource.NewQuantity(c.value, resource.DecimalSI) - return []CollectedMetric{ - { - Custom: custom_metrics.MetricValue{ - Value: *quantity, - }, - }, - }, nil - } -} - -func TestMaxCollector(t *testing.T) { - for _, tc := range []struct { - name string - values []int64 - expected int - weight float64 - errored bool - }{ - { - name: "basic", - values: []int64{100, 10, 9}, - expected: 100, - weight: 1, - errored: false, - }, - { - name: "weighted", - values: []int64{100, 10, 9}, - expected: 20, - weight: 0.2, - errored: false, - }, - { - name: "with error", - values: []int64{10, 9, -1}, - weight: 0.5, - errored: true, - }, - { - name: "some invalid results", - values: []int64{0, 1, 0, 10, 9}, - expected: 5, - weight: 0.5, - errored: false, - }, - { - name: "both invalid results and errors", - values: []int64{0, 1, 0, -1, 10, 9}, - weight: 0.5, - errored: true, - }, - } { - t.Run(tc.name, func(t *testing.T) { - collectors := make([]Collector, len(tc.values)) - for i, v := range tc.values { - collectors[i] = dummyCollector{value: v} - } - wc := NewMaxWeightedCollector(time.Second, tc.weight, collectors...) - metrics, err := wc.GetMetrics() - if tc.errored { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Len(t, metrics, 1) - require.EqualValues(t, tc.expected, metrics[0].Custom.Value.Value()) - } - - }) - - } -} diff --git a/pkg/collector/prometheus_collector.go b/pkg/collector/prometheus_collector.go index ec1c133..c32ac9c 100644 --- a/pkg/collector/prometheus_collector.go +++ b/pkg/collector/prometheus_collector.go @@ -3,6 +3,7 @@ package collector import ( "context" "fmt" + "math" "net/http" "time" @@ -146,7 +147,7 @@ func (c *PrometheusCollector) GetMetrics() ([]CollectedMetric, error) { sampleValue = scalar.Value } - if sampleValue.String() == "NaN" { + if math.IsNaN(float64(sampleValue)) { return nil, &NoResultError{query: c.query} } diff --git a/pkg/collector/skipper_collector.go b/pkg/collector/skipper_collector.go index d660021..7397f53 100644 --- a/pkg/collector/skipper_collector.go +++ b/pkg/collector/skipper_collector.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "regexp" "strings" "time" @@ -16,7 +17,7 @@ import ( ) const ( - rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host="%s"}[1m])))` + rpsQuery = `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"%s"}[1m])) * %.4f)` rpsMetricName = "requests-per-second" rpsMetricBackendSeparator = "," ) @@ -135,29 +136,25 @@ func (c *SkipperCollector) getCollector() (Collector, error) { } config := c.config - var collector Collector - collectors := make([]Collector, 0, len(ingress.Spec.Rules)) + var escapedHostnames []string for _, rule := range ingress.Spec.Rules { - host := strings.Replace(rule.Host, ".", "_", -1) - config.Config = map[string]string{ - "query": fmt.Sprintf(rpsQuery, host), - } - - config.PerReplica = false // per replica is handled outside of the prometheus collector - collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval) - if err != nil { - return nil, err - } - - collectors = append(collectors, collector) + escapedHostnames = append(escapedHostnames, regexp.QuoteMeta(strings.Replace(rule.Host, ".", "_", -1))) } - if len(collectors) > 0 { - collector = NewMaxWeightedCollector(c.interval, backendWeight, collectors...) - } else { + if len(escapedHostnames) == 0 { return nil, fmt.Errorf("no hosts defined on ingress %s/%s, unable to create collector", c.objectReference.Namespace, c.objectReference.Name) } + config.Config = map[string]string{ + "query": fmt.Sprintf(rpsQuery, strings.Join(escapedHostnames, "|"), backendWeight), + } + + config.PerReplica = false // per replica is handled outside of the prometheus collector + collector, err := c.plugin.NewCollector(c.hpa, &config, c.interval) + if err != nil { + return nil, err + } + return collector, nil } diff --git a/pkg/collector/skipper_collector_test.go b/pkg/collector/skipper_collector_test.go index ba6b862..73a67a2 100644 --- a/pkg/collector/skipper_collector_test.go +++ b/pkg/collector/skipper_collector_test.go @@ -102,9 +102,11 @@ func newStatefulSet(client *fake.Clientset, namespace string, name string) (*app func TestSkipperCollector(t *testing.T) { for _, tc := range []struct { msg string - metrics []int + metric int backend string ingressName string + hostnames []string + expectedQuery string collectedMetric int expectError bool fakedAverage bool @@ -116,9 +118,11 @@ func TestSkipperCollector(t *testing.T) { }{ { msg: "test unweighted hpa", - metrics: []int{1000, 1000, 2000}, + metric: 1000, ingressName: "dummy-ingress", - collectedMetric: 2000, + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, + collectedMetric: 1000, namespace: "default", backend: "dummy-backend", replicas: 1, @@ -126,9 +130,25 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test weighted backend", - metrics: []int{100, 1500, 700}, + metric: 1000, ingressName: "dummy-ingress", - collectedMetric: 600, + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.4000)`, + collectedMetric: 1000, + namespace: "default", + backend: "backend1", + backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}}, + replicas: 1, + readyReplicas: 1, + backendAnnotations: []string{testBackendWeightsAnnotation}, + }, + { + msg: "test multiple hostnames", + metric: 1000, + ingressName: "dummy-ingress", + hostnames: []string{"example.org", "foo.bar.com", "test.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org|foo_bar_com|test_org"}[1m])) * 0.4000)`, + collectedMetric: 1000, namespace: "default", backend: "backend1", backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 60, "backend1": 40}}, @@ -138,9 +158,11 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test multiple replicas", - metrics: []int{100, 1500, 700}, + metric: 1000, ingressName: "dummy-ingress", - collectedMetric: 150, + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.5000)`, + collectedMetric: 200, fakedAverage: true, namespace: "default", backend: "backend1", @@ -151,9 +173,11 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test multiple replicas not calculating average internally", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", - collectedMetric: 750, // 50% of 1500 + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.5000)`, + collectedMetric: 1500, namespace: "default", backend: "backend1", backendWeights: map[string]map[string]int{testBackendWeightsAnnotation: {"backend2": 50, "backend1": 50}}, @@ -163,8 +187,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test zero weight backends", - metrics: []int{100, 1500, 700}, + metric: 0, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.0000)`, collectedMetric: 0, namespace: "default", backend: "backend1", @@ -175,8 +201,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test multiple backend annotation", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, collectedMetric: 300, fakedAverage: true, namespace: "default", @@ -191,8 +219,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test multiple backend annotation not calculating average internally", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, collectedMetric: 1500, namespace: "default", backend: "backend1", @@ -206,8 +236,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test backend is not set", - metrics: []int{100, 1500, 700}, + metric: 0, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.0000)`, collectedMetric: 0, namespace: "default", backend: "backend3", @@ -218,8 +250,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test no annotations set", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, collectedMetric: 1500, namespace: "default", backend: "backend3", @@ -230,8 +264,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test annotations are set but backend is missing", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, expectError: true, namespace: "default", backend: "", @@ -242,8 +278,10 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test annotations are missing and backend is unset", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 1.0000)`, collectedMetric: 1500, namespace: "default", backend: "", @@ -254,9 +292,11 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test partial backend annotations", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", - collectedMetric: 60, + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2000)`, + collectedMetric: 300, fakedAverage: true, namespace: "default", backend: "backend2", @@ -270,9 +310,11 @@ func TestSkipperCollector(t *testing.T) { }, { msg: "test partial backend annotations not calculating average internally", - metrics: []int{100, 1500, 700}, + metric: 1500, ingressName: "dummy-ingress", - collectedMetric: 300, + hostnames: []string{"example.org"}, + expectedQuery: `scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example_org"}[1m])) * 0.2000)`, + collectedMetric: 1500, namespace: "default", backend: "backend2", backendWeights: map[string]map[string]int{ @@ -286,9 +328,9 @@ func TestSkipperCollector(t *testing.T) { } { t.Run(tc.msg, func(t *testing.T) { client := fake.NewSimpleClientset() - err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.backendWeights) + err := makeIngress(client, tc.namespace, tc.ingressName, tc.backend, tc.hostnames, tc.backendWeights) require.NoError(t, err) - plugin := makePlugin(tc.metrics) + plugin := makePlugin(tc.metric) hpa := makeHPA(tc.ingressName, tc.backend) config := makeConfig(tc.backend, tc.fakedAverage) _, err = newDeployment(client, tc.namespace, tc.backend, tc.replicas, tc.readyReplicas) @@ -299,6 +341,7 @@ func TestSkipperCollector(t *testing.T) { if tc.expectError { require.Error(t, err) } else { + require.Equal(t, map[string]string{"query": tc.expectedQuery}, plugin.config) require.NoError(t, err, "failed to collect metrics: %v", err) require.Len(t, collected, 1, "the number of metrics returned is not 1") require.EqualValues(t, tc.collectedMetric, collected[0].Custom.Value.Value(), "the returned metric is not expected value") @@ -307,7 +350,7 @@ func TestSkipperCollector(t *testing.T) { } } -func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, backendWeights map[string]map[string]int) error { +func makeIngress(client kubernetes.Interface, namespace, ingressName, backend string, hostnames []string, backendWeights map[string]map[string]int) error { annotations := make(map[string]string) for anno, weights := range backendWeights { sWeights, err := json.Marshal(weights) @@ -316,7 +359,7 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st } annotations[anno] = string(sWeights) } - _, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(&v1beta1.Ingress{ + ingress := &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: ingressName, Annotations: annotations, @@ -326,18 +369,19 @@ func makeIngress(client kubernetes.Interface, namespace, ingressName, backend st ServiceName: backend, }, TLS: nil, - Rules: []v1beta1.IngressRule{ - { - Host: "example.org", - }, - }, }, Status: v1beta1.IngressStatus{ LoadBalancer: corev1.LoadBalancerStatus{ Ingress: nil, }, }, - }) + } + for _, hostname := range hostnames { + ingress.Spec.Rules = append(ingress.Spec.Rules, v1beta1.IngressRule{ + Host: hostname, + }) + } + _, err := client.ExtensionsV1beta1().Ingresses(namespace).Create(ingress) return err } @@ -380,6 +424,7 @@ func makeConfig(backend string, fakedAverage bool) *MetricConfig { type FakeCollectorPlugin struct { metrics []CollectedMetric + config map[string]string } type FakeCollector struct { @@ -395,13 +440,19 @@ func (FakeCollector) Interval() time.Duration { } func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) { + if p.config != nil { + return nil, fmt.Errorf("config already assigned once: %v", p.config) + } + p.config = config.Config return &FakeCollector{metrics: p.metrics}, nil } -func makePlugin(metrics []int) CollectorPlugin { - m := make([]CollectedMetric, len(metrics)) - for i, v := range metrics { - m[i] = CollectedMetric{Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(v), resource.DecimalSI)}} +func makePlugin(metric int) *FakeCollectorPlugin { + return &FakeCollectorPlugin{ + metrics: []CollectedMetric{ + { + Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)}, + }, + }, } - return &FakeCollectorPlugin{metrics: m} }