diff --git a/pkg/collector/hostname_collector.go b/pkg/collector/hostname_collector.go
new file mode 100644
index 0000000..d5a2c2c
--- /dev/null
+++ b/pkg/collector/hostname_collector.go
@@ -0,0 +1,104 @@
+package collector
+
+import (
+	"fmt"
+	"time"
+
+	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
+)
+
+const (
+	// HostnameMetricType is the metric type under which the hostname RPS
+	// collector plugin is registered.
+	HostnameMetricType = "hostname-rps"
+	// HostnameRPSQuery is the PromQL template used to compute RPS for a
+	// hostname. The first verb receives the metric name and the second the
+	// value matched (as a regular expression, via =~) against the host label.
+	HostnameRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])))`
+)
+
+// HostnameCollectorPlugin creates hostname RPS collectors that delegate the
+// actual querying to a wrapped collector plugin.
+type HostnameCollectorPlugin struct {
+	metricName string
+	promPlugin CollectorPlugin
+}
+
+// HostnameCollector wraps a collector created by the underlying plugin and
+// enforces that exactly one metric value is returned.
+type HostnameCollector struct {
+	interval      time.Duration
+	promCollector Collector
+}
+
+// NewHostnameCollectorPlugin returns a plugin that builds hostname RPS
+// collectors on top of promPlugin, querying the metric named metricName.
+// It returns an error if metricName is empty.
+func NewHostnameCollectorPlugin(
+	promPlugin CollectorPlugin,
+	metricName string,
+) (*HostnameCollectorPlugin, error) {
+	if metricName == "" {
+		return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
+	}
+
+	return &HostnameCollectorPlugin{
+		metricName: metricName,
+		promPlugin: promPlugin,
+	}, nil
+}
+
+// NewCollector initializes a new hostname collector from the specified HPA.
+// The config must carry a non-empty "hostname" entry; any other entries,
+// including a user supplied "query", are replaced by the generated query.
+func (p *HostnameCollectorPlugin) NewCollector(
+	hpa *autoscalingv2.HorizontalPodAutoscaler,
+	config *MetricConfig,
+	interval time.Duration,
+) (Collector, error) {
+	if config == nil {
+		return nil, fmt.Errorf("Metric config not present, it is not possible to initialize the collector.")
+	}
+	// Need to copy config and add a promQL query in order to get
+	// RPS data from a specific hostname from prometheus. The idea
+	// of the copy is to not modify the original config struct.
+	confCopy := *config
+	hostname := config.Config["hostname"]
+
+	if hostname == "" {
+		return nil, fmt.Errorf("hostname not specified, unable to create collector")
+	}
+
+	confCopy.Config = map[string]string{
+		"query": fmt.Sprintf(HostnameRPSQuery, p.metricName, hostname),
+	}
+
+	c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
+	if err != nil {
+		return nil, err
+	}
+
+	return &HostnameCollector{
+		interval:      interval,
+		promCollector: c,
+	}, nil
+}
+
+// GetMetrics gets hostname metrics from the wrapped collector and returns an
+// error unless exactly one metric value was collected.
+func (c *HostnameCollector) GetMetrics() ([]CollectedMetric, error) {
+	v, err := c.promCollector.GetMetrics()
+	if err != nil {
+		return nil, err
+	}
+
+	if len(v) != 1 {
+		return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
+	}
+	return v, nil
+}
+
+// Interval returns the interval at which the collector should run.
+func (c *HostnameCollector) Interval() time.Duration {
+	return c.interval
+}
diff --git a/pkg/collector/hostname_collector_test.go b/pkg/collector/hostname_collector_test.go
new file mode 100644
index 0000000..1239ca0
--- /dev/null
+++ b/pkg/collector/hostname_collector_test.go
@@ -0,0 +1,261 @@
+package collector
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/metrics/pkg/apis/external_metrics"
+)
+
+func TestHostnameCollectorPluginConstructor(tt *testing.T) {
+	for _, testcase := range []struct {
+		msg     string
+		name    string
+		isValid bool
+	}{
+		{"No metric name", "", false},
+		{"Valid metric name", "a_valid_metric_name", true},
+	} {
+		tt.Run(testcase.msg, func(t *testing.T) {
+
+			fakePlugin := &FakeCollectorPlugin{}
+			plugin, err := NewHostnameCollectorPlugin(fakePlugin, testcase.name)
+
+			if testcase.isValid {
+				require.NoError(t, err)
+				require.NotNil(t, plugin)
+				require.Equal(t, testcase.name, plugin.metricName)
+				require.Equal(t, fakePlugin, plugin.promPlugin)
+			} else {
+				require.NotNil(t, err)
+				require.Nil(t, plugin)
+			}
+		})
+	}
+}
+
+func TestHostnamePluginNewCollector(tt *testing.T) {
+	fakePlugin := &FakeCollectorPlugin{}
+
+	plugin := &HostnameCollectorPlugin{
+		metricName: "a_valid_one",
+		promPlugin: fakePlugin,
+	}
+	interval := time.Duration(42)
+	expectedQuery := `scalar(sum(rate(a_valid_one{host=~"foo.bar.baz"}[1m])))`
+
+	for _, testcase := range []struct {
+		msg        string
+		config     *MetricConfig
+		shouldWork bool
+	}{
+		{"No hostname config", &MetricConfig{Config: make(map[string]string)}, false},
+		{"Nil metric config", nil, false},
+		{"Valid hostname no prom query config", &MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz"}}, true},
+		{"Valid hostname with prom query config", &MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz", "query": "some_other_query"}}, true},
+	} {
+		tt.Run(testcase.msg, func(t *testing.T) {
+			c, err := plugin.NewCollector(
+				&autoscalingv2.HorizontalPodAutoscaler{},
+				testcase.config,
+				interval,
+			)
+
+			if testcase.shouldWork {
+				require.NotNil(t, c)
+				require.Nil(t, err)
+				require.Equal(t, fakePlugin.config["query"], expectedQuery)
+			} else {
+				require.Nil(t, c)
+				require.NotNil(t, err)
+			}
+		})
+	}
+}
+
+func TestHostnameCollectorGetMetrics(tt *testing.T) {
+	genericErr := fmt.Errorf("This is an error")
+	expectedMetric := *resource.NewQuantity(int64(42), resource.DecimalSI)
+
+	for _, testcase := range []struct {
+		msg        string
+		stub       func() ([]CollectedMetric, error)
+		shouldWork bool
+	}{
+		{
+			"Internal collector error",
+			func() ([]CollectedMetric, error) {
+				return nil, genericErr
+			},
+			false,
+		},
+		{
+			"Invalid metric collection from internal collector",
+			func() ([]CollectedMetric, error) {
+				return []CollectedMetric{
+					{External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(24), resource.DecimalSI)}},
+					{External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}},
+				}, nil
+			},
+			false,
+		},
+		{
+			"Internal collector return single metric",
+			func() ([]CollectedMetric, error) {
+				return []CollectedMetric{
+					{External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}},
+				}, nil
+			},
+			true,
+		},
+	} {
+		tt.Run(testcase.msg, func(t *testing.T) {
+			fake := makeCollectorWithStub(testcase.stub)
+			c := &HostnameCollector{promCollector: fake}
+			m, err := c.GetMetrics()
+
+			if testcase.shouldWork {
+				require.Nil(t, err)
+				require.NotNil(t, m)
+				require.Len(t, m, 1)
+				require.Equal(t, m[0].External.Value, expectedMetric)
+			} else {
+				require.NotNil(t, err)
+				require.Nil(t, m)
+			}
+		})
+	}
+}
+
+func TestHostnameCollectorInterval(t *testing.T) {
+	interval := time.Duration(42)
+	fakePlugin := &FakeCollectorPlugin{}
+	plugin := &HostnameCollectorPlugin{
+		metricName: "a_valid_one",
+		promPlugin: fakePlugin,
+	}
+	c, err := plugin.NewCollector(
+		&autoscalingv2.HorizontalPodAutoscaler{},
+		&MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz"}},
+		interval,
+	)
+
+	require.NotNil(t, c)
+	require.Nil(t, err)
+	require.Equal(t, interval, c.Interval())
+}
+
+func TestHostnameCollectorAndCollectorFabricInteraction(t *testing.T) {
+	expectedQuery := `scalar(sum(rate(a_metric{host=~"just.testing.com"}[1m])))`
+	hpa := &autoscalingv2.HorizontalPodAutoscaler{
+		ObjectMeta: metav1.ObjectMeta{
+			Annotations: map[string]string{
+				"metric-config.external.foo.hostname-rps/hostname": "just.testing.com",
+			},
+		},
+		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
+			Metrics: []autoscalingv2.MetricSpec{
+				{
+					Type: autoscalingv2.ExternalMetricSourceType,
+					External: &autoscalingv2.ExternalMetricSource{
+						Metric: autoscalingv2.MetricIdentifier{
+							Name: "foo",
+							Selector: &metav1.LabelSelector{
+								MatchLabels: map[string]string{"type": "hostname-rps"},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	factory := NewCollectorFactory()
+	fakePlugin := makePlugin(42)
+	hostnamePlugin, err := NewHostnameCollectorPlugin(fakePlugin, "a_metric")
+	require.NoError(t, err)
+	factory.RegisterExternalCollector([]string{HostnameMetricType}, hostnamePlugin)
+	conf, err := ParseHPAMetrics(hpa)
+	require.NoError(t, err)
+	require.Len(t, conf, 1)
+
+	c, err := factory.NewCollector(hpa, conf[0], 0)
+
+	require.NoError(t, err)
+	_, ok := c.(*HostnameCollector)
+	require.True(t, ok)
+	require.Equal(t, fakePlugin.config["query"], expectedQuery)
+
+}
+
+func TestHostnamePrometheusCollectorInteraction(t *testing.T) {
+	hostnameQuery := `scalar(sum(rate(a_metric{host=~"just.testing.com"}[1m])))`
+	promQuery := "sum(rate(rps[1m]))"
+	hpa := &autoscalingv2.HorizontalPodAutoscaler{
+		ObjectMeta: metav1.ObjectMeta{
+			Annotations: map[string]string{
+				"metric-config.external.foo.hostname-rps/hostname": "just.testing.com",
+				"metric-config.external.bar.prometheus/query":      promQuery,
+			},
+		},
+		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
+			Metrics: []autoscalingv2.MetricSpec{
+				{
+					Type: autoscalingv2.ExternalMetricSourceType,
+					External: &autoscalingv2.ExternalMetricSource{
+						Metric: autoscalingv2.MetricIdentifier{
+							Name: "foo",
+							Selector: &metav1.LabelSelector{
+								MatchLabels: map[string]string{"type": "hostname-rps"},
+							},
+						},
+					},
+				},
+				{
+					Type: autoscalingv2.ExternalMetricSourceType,
+					External: &autoscalingv2.ExternalMetricSource{
+						Metric: autoscalingv2.MetricIdentifier{
+							Name: "bar",
+							Selector: &metav1.LabelSelector{
+								MatchLabels: map[string]string{"type": "prometheus"},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	factory := NewCollectorFactory()
+	promPlugin, err := NewPrometheusCollectorPlugin(nil, "http://prometheus")
+	require.NoError(t, err)
+	factory.RegisterExternalCollector([]string{PrometheusMetricType, PrometheusMetricNameLegacy}, promPlugin)
+	hostnamePlugin, err := NewHostnameCollectorPlugin(promPlugin, "a_metric")
+	require.NoError(t, err)
+	factory.RegisterExternalCollector([]string{HostnameMetricType}, hostnamePlugin)
+
+	conf, err := ParseHPAMetrics(hpa)
+	require.NoError(t, err)
+	require.Len(t, conf, 2)
+
+	collectors := make(map[string]Collector)
+	collectors["hostname"], err = factory.NewCollector(hpa, conf[0], 0)
+	require.NoError(t, err)
+	collectors["prom"], err = factory.NewCollector(hpa, conf[1], 0)
+	require.NoError(t, err)
+
+	prom, ok := collectors["prom"].(*PrometheusCollector)
+	require.True(t, ok)
+	hostname, ok := collectors["hostname"].(*HostnameCollector)
+	require.True(t, ok)
+	hostnameProm, ok := hostname.promCollector.(*PrometheusCollector)
+	require.True(t, ok)
+
+	require.Equal(t, prom.query, promQuery)
+	require.Equal(t, hostnameProm.query, hostnameQuery)
+}
diff --git a/pkg/server/start.go b/pkg/server/start.go
index 86388c5..90da3a2 100644
--- a/pkg/server/start.go
+++ b/pkg/server/start.go
@@ -65,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
 		MetricsAddress: ":7979",
 		ZMONTokenName: "zmon",
 		CredentialsDir: "/meta/credentials",
+		HostnameRPSMetricName: "skipper_serve_host_duration_seconds_count",
 	}
 
 	cmd := &cobra.Command{
@@ -132,6 +133,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
 	flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
 	flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.")
 	flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
+	flags.StringVar(&o.HostnameRPSMetricName, "hostname-rps-metric-name", o.HostnameRPSMetricName, ""+
+		"The name of the metric that should be used to query prometheus for RPS per hostname.")
+	flags.BoolVar(&o.HostnameRPSMetrics, "hostname-rps-metrics", o.HostnameRPSMetrics, ""+
+		"whether to enable hostname RPS metric collector or not")
 
 	return cmd
 }
@@ -218,6 +223,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
 				}
 			}
 		}
+
+		// Hostname collector, like skipper's, depends on prometheus being enabled.
+		// Also, to enable hostname metric it's necessary to pass the metric name that
+		// will be used. This was built this way so we can support hostname metrics to
+		// any ingress provider, e.g. Skipper, Nginx, envoy etc, in a simple way.
+		if o.HostnameRPSMetrics && o.HostnameRPSMetricName != "" {
+			hostnamePlugin, err := collector.NewHostnameCollectorPlugin(promPlugin, o.HostnameRPSMetricName)
+			if err != nil {
+				return fmt.Errorf("failed to register hostname collector plugin: %v", err)
+			}
+			collectorFactory.RegisterExternalCollector([]string{collector.HostnameMetricType}, hostnamePlugin)
+		}
 	}
 
 	if o.InfluxDBAddress != "" {
@@ -445,4 +462,8 @@ type AdapterServerOptions struct {
 	RampSteps int
 	// Default time zone to use for ScalingSchedules.
 	DefaultTimeZone string
+	// Feature flag to enable hostname rps metric collector
+	HostnameRPSMetrics bool
+	// Name of the Prometheus metric that stores RPS by hostname for Hostname RPS metrics.
+	HostnameRPSMetricName string
 }