Add hostname RPS metric collector
Signed-off-by: Lucas Thiesen <lucas.thiesen@zalando.de>
pkg/collector/hostname_collector.go (new file, 89 lines)
@@ -0,0 +1,89 @@
package collector

import (
    "fmt"
    "time"

    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)

const (
    HostnameMetricType = "hostname-rps"
    HostnameRPSQuery   = `scalar(sum(rate(%s{host=~"%s"}[1m])))`
)

type HostnameCollectorPlugin struct {
    metricName string
    promPlugin CollectorPlugin
}

type HostnameCollector struct {
    interval      time.Duration
    promCollector Collector
}

func NewHostnameCollectorPlugin(
    promPlugin CollectorPlugin,
    metricName string,
) (*HostnameCollectorPlugin, error) {
    if metricName == "" {
        return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
    }

    return &HostnameCollectorPlugin{
        metricName: metricName,
        promPlugin: promPlugin,
    }, nil
}

// NewCollector initializes a new hostname collector from the specified HPA.
func (p *HostnameCollectorPlugin) NewCollector(
    hpa *autoscalingv2.HorizontalPodAutoscaler,
    config *MetricConfig,
    interval time.Duration,
) (Collector, error) {
    if config == nil {
        return nil, fmt.Errorf("Metric config not present, it is not possible to initialize the collector.")
    }
    // Need to copy config and add a promQL query in order to get
    // RPS data from a specific hostname from prometheus. The idea
    // of the copy is to not modify the original config struct.
    confCopy := *config
    hostname := config.Config["hostname"]

    if hostname == "" {
        return nil, fmt.Errorf("hostname not specified, unable to create collector")
    }

    confCopy.Config = map[string]string{
        "query": fmt.Sprintf(HostnameRPSQuery, p.metricName, hostname),
    }

    c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
    if err != nil {
        return nil, err
    }

    return &HostnameCollector{
        interval:      interval,
        promCollector: c,
    }, nil
}

// GetMetrics gets hostname metrics from Prometheus.
func (c *HostnameCollector) GetMetrics() ([]CollectedMetric, error) {
    v, err := c.promCollector.GetMetrics()
    if err != nil {
        return nil, err
    }

    if len(v) != 1 {
        return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
    }
    return v, nil
}

// Interval returns the interval at which the collector should run.
func (c *HostnameCollector) Interval() time.Duration {
    return c.interval
}
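For a concrete sense of the PromQL that HostnameRPSQuery expands to, here is a minimal standalone sketch, not part of the commit itself; the metric name is the adapter default introduced below, and the hostname is a placeholder:

package main

import "fmt"

func main() {
    // Mirrors the HostnameRPSQuery template above; both arguments are example values.
    const hostnameRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])))`
    fmt.Printf(hostnameRPSQuery+"\n", "skipper_serve_host_duration_seconds_count", "example.org")
    // Output: scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example.org"}[1m])))
}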
pkg/collector/hostname_collector_test.go (new file, 261 lines)
@@ -0,0 +1,261 @@
package collector

import (
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/metrics/pkg/apis/external_metrics"
)

func TestHostnameCollectorPluginConstructor(tt *testing.T) {
    for _, testcase := range []struct {
        msg     string
        name    string
        isValid bool
    }{
        {"No metric name", "", false},
        {"Valid metric name", "a_valid_metric_name", true},
    } {
        tt.Run(testcase.msg, func(t *testing.T) {

            fakePlugin := &FakeCollectorPlugin{}
            plugin, err := NewHostnameCollectorPlugin(fakePlugin, testcase.name)

            if testcase.isValid {
                require.NoError(t, err)
                require.NotNil(t, plugin)
                require.Equal(t, testcase.name, plugin.metricName)
                require.Equal(t, fakePlugin, plugin.promPlugin)
            } else {
                require.NotNil(t, err)
                require.Nil(t, plugin)
            }
        })
    }
}

func TestHostnamePluginNewCollector(tt *testing.T) {
    fakePlugin := &FakeCollectorPlugin{}

    plugin := &HostnameCollectorPlugin{
        metricName: "a_valid_one",
        promPlugin: fakePlugin,
    }
    interval := time.Duration(42)
    expectedQuery := `scalar(sum(rate(a_valid_one{host=~"foo.bar.baz"}[1m])))`

    for _, testcase := range []struct {
        msg        string
        config     *MetricConfig
        shouldWork bool
    }{
        {"No hostname config", &MetricConfig{Config: make(map[string]string)}, false},
        {"Nil metric config", nil, false},
        {"Valid hostname no prom query config", &MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz"}}, true},
        {"Valid hostname with prom query config", &MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz", "query": "some_other_query"}}, true},
    } {
        tt.Run(testcase.msg, func(t *testing.T) {
            c, err := plugin.NewCollector(
                &autoscalingv2.HorizontalPodAutoscaler{},
                testcase.config,
                interval,
            )

            if testcase.shouldWork {
                require.NotNil(t, c)
                require.Nil(t, err)
                require.Equal(t, fakePlugin.config["query"], expectedQuery)
            } else {
                require.Nil(t, c)
                require.NotNil(t, err)
            }
        })
    }
}

func TestHostnameCollectorGetMetrics(tt *testing.T) {
    genericErr := fmt.Errorf("This is an error")
    expectedMetric := *resource.NewQuantity(int64(42), resource.DecimalSI)

    for _, testcase := range []struct {
        msg        string
        stub       func() ([]CollectedMetric, error)
        shouldWork bool
    }{
        {
            "Internal collector error",
            func() ([]CollectedMetric, error) {
                return nil, genericErr
            },
            false,
        },
        {
            "Invalid metric collection from internal collector",
            func() ([]CollectedMetric, error) {
                return []CollectedMetric{
                    {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(24), resource.DecimalSI)}},
                    {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}},
                }, nil
            },
            false,
        },
        {
            "Internal collector return single metric",
            func() ([]CollectedMetric, error) {
                return []CollectedMetric{
                    {External: external_metrics.ExternalMetricValue{Value: *resource.NewQuantity(int64(42), resource.DecimalSI)}},
                }, nil
            },
            true,
        },
    } {
        tt.Run(testcase.msg, func(t *testing.T) {
            fake := makeCollectorWithStub(testcase.stub)
            c := &HostnameCollector{promCollector: fake}
            m, err := c.GetMetrics()

            if testcase.shouldWork {
                require.Nil(t, err)
                require.NotNil(t, m)
                require.Len(t, m, 1)
                require.Equal(t, m[0].External.Value, expectedMetric)
            } else {
                require.NotNil(t, err)
                require.Nil(t, m)
            }
        })
    }
}

func TestHostnameCollectorInterval(t *testing.T) {
    interval := time.Duration(42)
    fakePlugin := &FakeCollectorPlugin{}
    plugin := &HostnameCollectorPlugin{
        metricName: "a_valid_one",
        promPlugin: fakePlugin,
    }
    c, err := plugin.NewCollector(
        &autoscalingv2.HorizontalPodAutoscaler{},
        &MetricConfig{Config: map[string]string{"hostname": "foo.bar.baz"}},
        interval,
    )

    require.NotNil(t, c)
    require.Nil(t, err)
    require.Equal(t, interval, c.Interval())
}

func TestHostnameCollectorAndCollectorFabricInteraction(t *testing.T) {
    expectedQuery := `scalar(sum(rate(a_metric{host=~"just.testing.com"}[1m])))`
    hpa := &autoscalingv2.HorizontalPodAutoscaler{
        ObjectMeta: metav1.ObjectMeta{
            Annotations: map[string]string{
                "metric-config.external.foo.hostname-rps/hostname": "just.testing.com",
            },
        },
        Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
            Metrics: []autoscalingv2.MetricSpec{
                {
                    Type: autoscalingv2.ExternalMetricSourceType,
                    External: &autoscalingv2.ExternalMetricSource{
                        Metric: autoscalingv2.MetricIdentifier{
                            Name: "foo",
                            Selector: &metav1.LabelSelector{
                                MatchLabels: map[string]string{"type": "hostname-rps"},
                            },
                        },
                    },
                },
            },
        },
    }

    factory := NewCollectorFactory()
    fakePlugin := makePlugin(42)
    hostnamePlugin, err := NewHostnameCollectorPlugin(fakePlugin, "a_metric")
    require.NoError(t, err)
    factory.RegisterExternalCollector([]string{HostnameMetricType}, hostnamePlugin)
    conf, err := ParseHPAMetrics(hpa)
    require.NoError(t, err)
    require.Len(t, conf, 1)

    c, err := factory.NewCollector(hpa, conf[0], 0)

    require.NoError(t, err)
    _, ok := c.(*HostnameCollector)
    require.True(t, ok)
    require.Equal(t, fakePlugin.config["query"], expectedQuery)

}

func TestHostnamePrometheusCollectorInteraction(t *testing.T) {
    hostnameQuery := `scalar(sum(rate(a_metric{host=~"just.testing.com"}[1m])))`
    promQuery := "sum(rate(rps[1m]))"
    hpa := &autoscalingv2.HorizontalPodAutoscaler{
        ObjectMeta: metav1.ObjectMeta{
            Annotations: map[string]string{
                "metric-config.external.foo.hostname-rps/hostname": "just.testing.com",
                "metric-config.external.bar.prometheus/query":      promQuery,
            },
        },
        Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
            Metrics: []autoscalingv2.MetricSpec{
                {
                    Type: autoscalingv2.ExternalMetricSourceType,
                    External: &autoscalingv2.ExternalMetricSource{
                        Metric: autoscalingv2.MetricIdentifier{
                            Name: "foo",
                            Selector: &metav1.LabelSelector{
                                MatchLabels: map[string]string{"type": "hostname-rps"},
                            },
                        },
                    },
                },
                {
                    Type: autoscalingv2.ExternalMetricSourceType,
                    External: &autoscalingv2.ExternalMetricSource{
                        Metric: autoscalingv2.MetricIdentifier{
                            Name: "bar",
                            Selector: &metav1.LabelSelector{
                                MatchLabels: map[string]string{"type": "prometheus"},
                            },
                        },
                    },
                },
            },
        },
    }

    factory := NewCollectorFactory()
    promPlugin, err := NewPrometheusCollectorPlugin(nil, "http://prometheus")
    require.NoError(t, err)
    factory.RegisterExternalCollector([]string{PrometheusMetricType, PrometheusMetricNameLegacy}, promPlugin)
    hostnamePlugin, err := NewHostnameCollectorPlugin(promPlugin, "a_metric")
    require.NoError(t, err)
    factory.RegisterExternalCollector([]string{HostnameMetricType}, hostnamePlugin)

    conf, err := ParseHPAMetrics(hpa)
    require.NoError(t, err)
    require.Len(t, conf, 2)

    collectors := make(map[string]Collector)
    collectors["hostname"], err = factory.NewCollector(hpa, conf[0], 0)
    require.NoError(t, err)
    collectors["prom"], err = factory.NewCollector(hpa, conf[1], 0)
    require.NoError(t, err)

    prom, ok := collectors["prom"].(*PrometheusCollector)
    require.True(t, ok)
    hostname, ok := collectors["hostname"].(*HostnameCollector)
    require.True(t, ok)
    hostnameProm, ok := hostname.promCollector.(*PrometheusCollector)
    require.True(t, ok)

    require.Equal(t, prom.query, promQuery)
    require.Equal(t, hostnameProm.query, hostnameQuery)
}
@@ -65,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
         MetricsAddress: ":7979",
         ZMONTokenName: "zmon",
         CredentialsDir: "/meta/credentials",
+        HostnameRPSMetricName: "skipper_serve_host_duration_seconds_count",
     }

     cmd := &cobra.Command{

@@ -132,6 +133,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
     flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
     flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to rampup and rampdown ScalingSchedules. It's used to guarantee won't avoid reaching the max scaling due to the 10% minimum change rule.")
     flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
+    flags.StringVar(&o.HostnameRPSMetricName, "hostname-rps-metric-name", o.HostnameRPSMetricName, ""+
+        "The name of the metric that should be used to query prometheus for RPS per hostname.")
+    flags.BoolVar(&o.HostnameRPSMetrics, "hostname-rps-metrics", o.HostnameRPSMetrics, ""+
+        "whether to enable hostname RPS metric collector or not")
     return cmd
 }

@@ -218,6 +223,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
                 }
             }
         }
+
+        // Hostname collector, like skipper's, depends on prometheus being enabled.
+        // Also, to enable the hostname metric it's necessary to pass the metric name
+        // that will be used. It was built this way so hostname metrics can be supported
+        // for any ingress provider, e.g. Skipper, Nginx, Envoy, etc., in a simple way.
+        if o.HostnameRPSMetrics && o.HostnameRPSMetricName != "" {
+            hostnamePlugin, err := collector.NewHostnameCollectorPlugin(promPlugin, o.HostnameRPSMetricName)
+            collectorFactory.RegisterExternalCollector([]string{collector.HostnameMetricType}, hostnamePlugin)
+            if err != nil {
+                return fmt.Errorf("failed to register hostname collector plugin: %v", err)
+            }
+        }
     }

     if o.InfluxDBAddress != "" {

@@ -445,4 +462,8 @@ type AdapterServerOptions struct {
     RampSteps int
     // Default time zone to use for ScalingSchedules.
     DefaultTimeZone string
+    // Feature flag to enable the hostname RPS metric collector.
+    HostnameRPSMetrics bool
+    // Name of the Prometheus metric that stores RPS by hostname for hostname RPS metrics.
+    HostnameRPSMetricName string
 }
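Putting the pieces together, the following condensed sketch mirrors how the new collector is registered and resolved, as exercised by the tests above; the module import path, the Prometheus endpoint, and the example hostname are illustrative assumptions rather than part of this change:

package main

import (
    "fmt"
    "log"

    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    // Assumed module path for the collector package changed in this commit.
    "github.com/zalando-incubator/kube-metrics-adapter/pkg/collector"
)

func main() {
    // Register the Prometheus plugin, then the hostname plugin on top of it,
    // as RunCustomMetricsAdapterServer does when --hostname-rps-metrics is set.
    factory := collector.NewCollectorFactory()
    promPlugin, err := collector.NewPrometheusCollectorPlugin(nil, "http://prometheus")
    if err != nil {
        log.Fatal(err)
    }
    hostnamePlugin, err := collector.NewHostnameCollectorPlugin(promPlugin, "skipper_serve_host_duration_seconds_count")
    if err != nil {
        log.Fatal(err)
    }
    factory.RegisterExternalCollector([]string{collector.HostnameMetricType}, hostnamePlugin)

    // An HPA opts in through the hostname-rps annotation plus a matching external metric selector.
    hpa := &autoscalingv2.HorizontalPodAutoscaler{
        ObjectMeta: metav1.ObjectMeta{
            Annotations: map[string]string{
                "metric-config.external.foo.hostname-rps/hostname": "example.org",
            },
        },
        Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
            Metrics: []autoscalingv2.MetricSpec{{
                Type: autoscalingv2.ExternalMetricSourceType,
                External: &autoscalingv2.ExternalMetricSource{
                    Metric: autoscalingv2.MetricIdentifier{
                        Name:     "foo",
                        Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"type": "hostname-rps"}},
                    },
                },
            }},
        },
    }

    // The factory returns a HostnameCollector whose inner Prometheus collector runs
    // scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"example.org"}[1m]))).
    configs, err := collector.ParseHPAMetrics(hpa)
    if err != nil {
        log.Fatal(err)
    }
    c, err := factory.NewCollector(hpa, configs[0], 0)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%T\n", c) // *collector.HostnameCollector
}

In a running adapter, this is the wiring that the new --hostname-rps-metrics and --hostname-rps-metric-name flags enable at startup.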