Merge pull request #552 from zalando-incubator/hostname-rps-collector

Add hostname RPS metric collector
This commit is contained in:
Lucas Thiesen 2023-05-26 11:13:42 +02:00 committed by GitHub
commit c2179a35ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 569 additions and 35 deletions

View File

@ -402,6 +402,64 @@ the `backend` label under `matchLabels` for the metric. The ingress annotation
where the backend weights can be obtained can be specified through the flag
`--skipper-backends-annotation`.
## External RPS collector
The External RPS collector, like the Skipper collector, is a simple wrapper around the Prometheus collector that
makes it easy to define an HPA for scaling based on the RPS measured for a given hostname. When
[skipper](https://github.com/zalando/skipper) is used as the ingress
implementation in your cluster, everything should work automatically. If another reverse proxy is used as ingress, for example [Nginx](https://github.com/kubernetes/ingress-nginx), it is necessary to configure which Prometheus metric should be used through the `--external-rps-metric-name <metric-name>` flag. Assuming `skipper-ingress` is being used, or the appropriate metric name is passed via the flag mentioned above, this collector provides the correct Prometheus queries out of the
box so users don't have to define them manually.
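For reference, the query the collector generates per HPA is built from the template below (the constant added by this PR in the collector code); the placeholders are filled with the configured Prometheus metric name, a regex over the configured hostnames, and the weight factor:
```go
// Query template used by the External RPS collector (see the collector code in this PR).
// Placeholders: Prometheus metric name, hostname regex, weight factor.
const ExternalRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])) * %.4f)`
```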
### Supported metrics
| Metric | Description | Type | Kind | K8s Versions |
| ------------ | -------------- | ------- | -- | -- |
| `requests-per-second` | Scale based on requests per second for a certain hostname. | External | | `>=1.12` |
### Example: External Metric
This is an example of an HPA that will scale based on `requests-per-second` measured for the hostnames `www.example1.com` and `www.example2.com`, weighted by 42%.
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: myapp-hpa
  annotations:
    metric-config.external.example-rps.requests-per-second/hostnames: www.example1.com,www.example2.com
    metric-config.external.example-rps.requests-per-second/weight: "42"
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: custom-metrics-consumer
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: External
      external:
        metric:
          name: example-rps
          selector:
            matchLabels:
              type: requests-per-second
        target:
          type: AverageValue
          averageValue: "42"
```
### Multiple hostnames per metric
This metric supports an n:1 relation between hostnames and metrics: the measured RPS is the sum of the RPS rates of each of the specified hostnames. This value is further adjusted by the weight parameter explained below.
### Metric weighting based on backend
Some ingress controllers, like skipper-ingress, support sending traffic to different backends based on some kind of configuration; in Skipper's case, annotations
present on the `Ingress` object or weights on the RouteGroup backends. By
default the number of replicas will be calculated based on the full traffic
served by these components. If, however, only the traffic being routed to
a specific hostname should be used, then the weight for the configured hostname(s) can be specified via the `weight` annotation `metric-config.external.<metric-name>.requests-per-second/weight` for the metric being configured. The weight is interpreted as a percentage, so a value of `42` scales the measured RPS by a factor of 0.42.
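A minimal, self-contained sketch of how the query is assembled (mirroring the collector code added in this PR), using the default metric name `skipper_serve_host_duration_seconds_count` and the hostnames and weight from the HPA example above:
```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same template as the ExternalRPSQuery constant in the collector code.
	const queryTemplate = `scalar(sum(rate(%s{host=~"%s"}[1m])) * %.4f)`

	hostnames := []string{"www.example1.com", "www.example2.com"}
	weight := 42.0 / 100.0 // the weight annotation is a percentage

	// Hostnames are joined into one regex and dots are replaced with
	// underscores to match the format of the host label.
	hostRegex := strings.ReplaceAll(strings.Join(hostnames, "|"), ".", "_")

	fmt.Printf(queryTemplate+"\n", "skipper_serve_host_duration_seconds_count", hostRegex, weight)
	// scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"www_example1_com|www_example2_com"}[1m])) * 0.4200)
}
```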
## InfluxDB collector
The InfluxDB collector maps [Flux](https://github.com/influxdata/flux) queries to metrics that can be used for scaling.

View File

@ -0,0 +1,128 @@
package collector
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
)
const (
ExternalRPSMetricType = "requests-per-second"
ExternalRPSQuery = `scalar(sum(rate(%s{host=~"%s"}[1m])) * %.4f)`
)
type ExternalRPSCollectorPlugin struct {
metricName string
promPlugin CollectorPlugin
pattern *regexp.Regexp
}
type ExternalRPSCollector struct {
interval time.Duration
promCollector Collector
}
func NewExternalRPSCollectorPlugin(
promPlugin CollectorPlugin,
metricName string,
) (*ExternalRPSCollectorPlugin, error) {
if metricName == "" {
return nil, fmt.Errorf("failed to initialize hostname collector plugin, metric name was not defined")
}
p, err := regexp.Compile("^[a-zA-Z0-9.-]+$")
if err != nil {
return nil, fmt.Errorf("failed to create regular expression to match hostname format")
}
return &ExternalRPSCollectorPlugin{
metricName: metricName,
promPlugin: promPlugin,
pattern: p,
}, nil
}
// NewCollector initializes a new external RPS collector from the specified HPA.
func (p *ExternalRPSCollectorPlugin) NewCollector(
hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig,
interval time.Duration,
) (Collector, error) {
if config == nil {
return nil, fmt.Errorf("metric config not present, it is not possible to initialize the collector")
}
// Need to copy config and add a promQL query in order to get
// RPS data from a specific hostname from prometheus. The idea
// of the copy is to not modify the original config struct.
confCopy := *config
if _, ok := config.Config["hostnames"]; !ok {
return nil, fmt.Errorf("hostnames is not specified, unable to create collector")
}
hostnames := strings.Split(config.Config["hostnames"], ",")
if p.pattern == nil {
return nil, fmt.Errorf("plugin did not specify hostname regex pattern, unable to create collector")
}
for _, h := range hostnames {
if ok := p.pattern.MatchString(h); !ok {
return nil, fmt.Errorf(
"invalid hostname format, unable to create collector: %s",
h,
)
}
}
weight := 1.0
if w, ok := config.Config["weight"]; ok {
num, err := strconv.ParseFloat(w, 64)
if err != nil {
return nil, fmt.Errorf("could not parse weight annotation, unable to create collector: %s", w)
}
weight = num / 100.0
}
confCopy.Config = map[string]string{
"query": fmt.Sprintf(
ExternalRPSQuery,
p.metricName,
strings.ReplaceAll(strings.Join(hostnames, "|"), ".", "_"),
weight,
),
}
c, err := p.promPlugin.NewCollector(hpa, &confCopy, interval)
if err != nil {
return nil, err
}
return &ExternalRPSCollector{
interval: interval,
promCollector: c,
}, nil
}
// GetMetrics gets hostname metrics from Prometheus
func (c *ExternalRPSCollector) GetMetrics() ([]CollectedMetric, error) {
v, err := c.promCollector.GetMetrics()
if err != nil {
return nil, err
}
if len(v) != 1 {
return nil, fmt.Errorf("expected to only get one metric value, got %d", len(v))
}
return v, nil
}
// Interval returns the interval at which the collector should run.
func (c *ExternalRPSCollector) Interval() time.Duration {
return c.interval
}
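A test-style sketch of how this plugin can be exercised end to end; it assumes it lives in the same `collector` package and uses the `FakeCollectorPlugin` added later in this PR as a stand-in for the Prometheus plugin (the `MetricConfig` literal only sets the `Config` map, the only field this code path reads):
```go
package collector

import (
	"fmt"
	"time"
)

// sketchExternalRPSUsage shows the hostnames/weight annotations being turned
// into a Prometheus query by ExternalRPSCollectorPlugin.NewCollector.
func sketchExternalRPSUsage() error {
	fake := &FakeCollectorPlugin{}
	plugin, err := NewExternalRPSCollectorPlugin(fake, "skipper_serve_host_duration_seconds_count")
	if err != nil {
		return err
	}

	config := &MetricConfig{
		Config: map[string]string{
			"hostnames": "www.example1.com,www.example2.com",
			"weight":    "42",
		},
	}

	// The fake plugin records the config it receives, including the generated query.
	if _, err := plugin.NewCollector(nil, config, time.Minute); err != nil {
		return err
	}
	fmt.Println(fake.config["query"])
	// scalar(sum(rate(skipper_serve_host_duration_seconds_count{host=~"www_example1_com|www_example2_com"}[1m])) * 0.4200)
	return nil
}
```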

File diff suppressed because it is too large

View File

@ -0,0 +1,57 @@
package collector
import (
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/custom_metrics"
)
type FakeCollectorPlugin struct {
metrics []CollectedMetric
config map[string]string
}
type FakeCollector struct {
metrics []CollectedMetric
interval time.Duration
stub func() ([]CollectedMetric, error)
}
func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
if c.stub != nil {
v, err := c.stub()
return v, err
}
return c.metrics, nil
}
func (FakeCollector) Interval() time.Duration {
return time.Minute
}
func (p *FakeCollectorPlugin) NewCollector(
hpa *autoscalingv2.HorizontalPodAutoscaler,
config *MetricConfig,
interval time.Duration,
) (Collector, error) {
p.config = config.Config
return &FakeCollector{metrics: p.metrics, interval: interval}, nil
}
func makePlugin(metric int) *FakeCollectorPlugin {
return &FakeCollectorPlugin{
metrics: []CollectedMetric{
{
Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)},
},
},
}
}
func makeCollectorWithStub(f func() ([]CollectedMetric, error)) *FakeCollector {
return &FakeCollector{stub: f}
}
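A brief sketch of how these helpers are typically used together, again assuming it runs inside the same `collector` package; the collector produced by `makePlugin` simply returns the canned metric value, while `makeCollectorWithStub` lets a test control `GetMetrics` directly:
```go
package collector

import (
	"fmt"
	"time"
)

func sketchFakeHelpers() {
	// Fake plugin whose collector always reports the value 10.
	plugin := makePlugin(10)
	c, _ := plugin.NewCollector(nil, &MetricConfig{}, time.Minute)
	metrics, _ := c.GetMetrics()
	fmt.Println(metrics[0].Custom.Value.String()) // "10"

	// Collector with a stubbed GetMetrics, useful for simulating errors.
	stubbed := makeCollectorWithStub(func() ([]CollectedMetric, error) {
		return nil, fmt.Errorf("boom")
	})
	if _, err := stubbed.GetMetrics(); err != nil {
		fmt.Println(err) // "boom"
	}
}
```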

View File

@ -658,38 +658,3 @@ func makeConfig(resourceName, namespace, kind, backend string, fakedAverage bool
}
return config
}
type FakeCollectorPlugin struct {
metrics []CollectedMetric
config map[string]string
}
type FakeCollector struct {
metrics []CollectedMetric
}
func (c *FakeCollector) GetMetrics() ([]CollectedMetric, error) {
return c.metrics, nil
}
func (FakeCollector) Interval() time.Duration {
return time.Minute
}
func (p *FakeCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
if p.config != nil {
return nil, fmt.Errorf("config already assigned once: %v", p.config)
}
p.config = config.Config
return &FakeCollector{metrics: p.metrics}, nil
}
func makePlugin(metric int) *FakeCollectorPlugin {
return &FakeCollectorPlugin{
metrics: []CollectedMetric{
{
Custom: custom_metrics.MetricValue{Value: *resource.NewQuantity(int64(metric), resource.DecimalSI)},
},
},
}
}

View File

@ -65,6 +65,7 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
MetricsAddress: ":7979",
ZMONTokenName: "zmon",
CredentialsDir: "/meta/credentials",
ExternalRPSMetricName: "skipper_serve_host_duration_seconds_count",
}
cmd := &cobra.Command{
@ -132,6 +133,10 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default rampup and rampdown window duration for ScalingSchedules")
flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to ramp up and ramp down ScalingSchedules. It's used to guarantee that scaling isn't prevented from reaching the max due to the 10% minimum change rule.")
flags.StringVar(&o.DefaultTimeZone, "scaling-schedule-default-time-zone", "Europe/Berlin", "Default time zone to use for ScalingSchedules.")
flags.StringVar(&o.ExternalRPSMetricName, "external-rps-metric-name", o.ExternalRPSMetricName, ""+
"The name of the metric that should be used to query Prometheus for RPS per hostname.")
flags.BoolVar(&o.ExternalRPSMetrics, "external-rps-metrics", o.ExternalRPSMetrics, ""+
"Whether to enable the external RPS metric collector or not.")
return cmd
}
@ -218,6 +223,18 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
}
}
}
// The External RPS collector, like Skipper's, depends on Prometheus being enabled.
// Also, to enable the hostname metric it is necessary to pass the name of the metric
// that will be used. It was built this way so that hostname metrics can be supported
// for any ingress provider, e.g. Skipper, Nginx, Envoy, etc., in a simple way.
if o.ExternalRPSMetrics && o.ExternalRPSMetricName != "" {
externalRPSPlugin, err := collector.NewExternalRPSCollectorPlugin(promPlugin, o.ExternalRPSMetricName)
if err != nil {
return fmt.Errorf("failed to register hostname collector plugin: %v", err)
}
collectorFactory.RegisterExternalCollector([]string{collector.ExternalRPSMetricType}, externalRPSPlugin)
}
}
if o.InfluxDBAddress != "" {
@ -445,4 +462,8 @@ type AdapterServerOptions struct {
RampSteps int
// Default time zone to use for ScalingSchedules.
DefaultTimeZone string
// Feature flag to enable the external RPS metric collector.
ExternalRPSMetrics bool
// Name of the Prometheus metric that stores RPS by hostname for external RPS metrics.
ExternalRPSMetricName string
}